lib/synco/scope.rb
# Copyright, 2016, by Samuel G. D. Williams. <http://www.codeotaku.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
require_relative 'script'
require 'process/group'
require 'console'
require 'delegate'
module Synco
class CommandFailure < RuntimeError
def initialize(command, status)
@command = command
@status = status
super "Command #{command.inspect} failed: #{status}!"
end
attr :command
attr :status
end
class Runner
include Console
def initialize(*scripts)
@scripts = scripts
end
attr :scripts
def call
start_time = Time.now
logger.info "===== Starting at #{start_time} ====="
Process::Group.wait do |group|
@scripts.each do |script|
Fiber.new do
ScriptScope.new(script, logger, group).call
end.resume
end
end
ensure
end_time = Time.now
logger.info "[Time]: (#{end_time - start_time}s)."
logger.info "===== Finished backup at #{end_time} ====="
end
end
class ScriptScope
def initialize(script, logger, group)
@script = script
@logger = logger
@group = group
@current_server = ServerScope.new(@script.current_server, self)
@master_server = ServerScope.new(@script.master_server, self, @current_server)
end
attr :script
attr :logger
attr :group
attr :master_server
attr :current_server
def method
@script.method
end
def call
if @script.running_on_master?
logger.info "We are the master server..."
else
logger.info "We are not the master server..."
logger.info "Master server is #{@master}..."
end
@script.try(self) do
# This allows events to run on the master server if specified, before running any backups.
@master_server.try(master_target_server) do
method.try(self) do
logger.info "Running backups for server #{@current_server}..."
run_servers(group)
end
end
end
end
private
def master_target_server
TargetScope.new(self, @master_server)
end
def target_servers
@script.servers.each do |name, server|
# server is always a data destination, therefore server can't be @master_server:
next if @master_server.eql?(server)
yield ServerScope.new(server, self, @current_server)
end
end
# This function runs the method for each directory and server combination specified.
def run_servers(group)
target_servers do |server|
sync_scope = TargetScope.new(self, server)
logger.info "===== Processing ====="
logger.info "[Master]: #{master_server}"
logger.info "[Target]: #{server}"
server.try(sync_scope) do
@script.directories.each do |directory|
directory_scope = DirectoryScope.new(sync_scope, directory)
logger.info "[Directory]: #{directory}"
directory.try(directory_scope) do
method.call(directory_scope)
end
end
end
end
end
end
class LogPipe < DelegateClass(IO)
def initialize(logger, level = :info)
@input, @output = IO.pipe
@logger = logger
super(@output)
@thread = Thread.new do
@input.each{|line| logger.send(level, line.chomp!)}
end
end
def close
# Close the output pipe, we should never be writing to this anyway:
@output.close
# Wait for the thread to read everything and join:
@thread.join
# Close the input pipe because it's already closed on the remote end:
@input.close
end
end
class ServerScope < DelegateClass(Server)
def initialize(server, script_scope, from = nil)
super(server)
@script_scope = script_scope
@from = from
end
def logger
@logger ||= @script_scope.logger
end
def group
@group ||= @script_scope.group
end
def run(*command, from: @from, **options)
# We are invoking a command from the given server, so we need to use the shell to connect..
if from and !from.same_host?(self)
if chdir = options.delete(:chdir)
command = ["synco", "--root", chdir, "spawn"] + command
end
command = self.connection_command + ["--"] + command
end
logger.info("shell") {[command, options]}
options[:out] ||= LogPipe.new(logger)
options[:err] ||= LogPipe.new(logger, :error)
status = self.group.spawn(*command, **options)
logger.debug{"Process finished: #{status}."}
options[:out].close
options[:err].close
unless status.success?
raise CommandFailure.new(command, status)
end
end
end
class TargetScope < DelegateClass(ScriptScope)
def initialize(script_scope, target)
super(script_scope)
@target_server = ServerScope.new(target, script_scope, script_scope.current_server)
end
def run(*arguments)
@target_server.run(*arguments)
end
attr :target_server
end
class DirectoryScope < DelegateClass(TargetScope)
def initialize(sync_scope, directory)
super(sync_scope)
@directory = directory
end
attr :directory
end
end