lib/pione/command/pione-client.rb
module Pione
module Command
# PioneClient is a command to request processing.
class PioneClient < BasicCommand
include TupleSpace::TupleSpaceInterface
#
# basic informations
#
define(:toplevel, true)
define(:name, "pione-client")
define(:desc, "Process PIONE document")
define(:front, Front::ClientFront)
#
# arguments
#
argument(:location) do |item|
item.type = :location
item.key_name = :package_location
item.desc = "package location"
item.missing = "There are no PIONE documents or packages."
end
#
# options
#
option_pre(:prepare_model) do |item|
item.desc = "Prepare model"
item.assign(:params) {Lang::ParameterSetSequence.new}
end
option(CommonOption.debug)
option(CommonOption.color)
option(CommonOption.communication_address)
option(CommonOption.task_worker_size)
option(CommonOption.parent_front) do |item|
item.requisite = false
end
option(CommonOption.features) do |item|
item.default = Global.features + "& ^Interactive"
end
option(CommonOption.file_cache_method)
option(CommonOption.file_sliding)
option(NotificationOption.notification_targets)
option(NotificationOption.notification_receivers)
option(SessionOption.request_from)
option(SessionOption.session_id)
option(:input_location) do |item|
item.type = :location
item.short = '-i'
item.long = '--input'
item.arg = 'LOCATION'
item.desc = 'Set input directory'
end
option(:base_location) do |item|
item.type = :location
item.short = '-b'
item.long = '--base'
item.arg = 'LOCATION'
item.desc = 'Set process base location'
item.init = "local:./process/"
item.process do |location|
model[:base_location] = location
if location.scheme == "myftp"
model[:myftp] = URI.parse(uri).normalize
end
end
item.exception(ArgumentError) do |e, val|
raise OptionError.new(cmd, "base location '%s' is bad in %s" % [uri, cmd.name])
end
end
option(:stream) do |item|
item.type = :boolean
item.long = '--stream'
item.arg = '[BOOLEAN]'
item.desc = 'Turn on/off stream mode'
item.init = false
item.default = true
end
option(:request_task_worker) do |item|
item.type = :integer
item.long = '--request-task-worker'
item.arg = 'N'
item.desc = 'Set request number of task workers'
item.init = 1
end
option(:params) do |item|
item.type = :param_set
item.long = '--params="{Var:1,...}"'
item.desc = "Set user parameters"
item.assign do |params|
model[:params].merge(params)
end
item.exception(Parslet::ParseFailed) do |e, str|
arg = {str: str, name: cmd.name, reason: e.message}
raise OptionError.new(cmd, 'Invalid parameters "%{str}" in %{name}: %{reason}' % arg)
end
end
option(:stand_alone) do |item|
item.type = :boolean
item.long = '--stand-alone'
item.desc = 'Turn on stand alone mode'
item.init = false
item.default = true
item.process do |val|
model[:without_tuple_space_provider] = val
end
end
option(:dry_run) do |item|
item.long = '--dry-run'
item.desc = 'Turn on dry run mode'
item.type = :boolean
item.init = false
item.default = true
end
option(:rehearse) do |item|
item.type = :string
item.long = '--rehearse'
item.arg = '[SCENARIO]'
item.desc = 'rehearse the scenario'
item.assign {|val| (not(val.nil?) and val.size != 0) ? val : :anything}
end
option(:timeout) do |item|
item.type = :positive_integer
item.long = '--timeout'
item.arg = 'SEC'
item.desc = 'timeout processing after SEC'
end
option(:client_ui) do |item|
item.type = :string
item.long = "--client-ui"
item.arg = "TYPE"
item.desc = "Type of the client's user interface"
end
option_post(:validate_task_worker_size) do |item|
item.desc = "Validate task worker size"
item.process do
test(model[:task_worker_size] == 0)
test(model[:stand_alone])
raise Rootage::OptionError.new(cmd, "option error: invalid task worker size '%s'" % model[:task_worker])
end
end
option_post(:stream_location) do |item|
item.desc = "Validate stream location"
item.process do
test(model[:stream])
test(model[:input_location].nil?)
raise Rootage::OptionError.new(cmd, "option error: no input URI on stream mode")
end
end
#
# command lifecycle: setup phase
#
# setup_phase :timeout => 20 # because of setup for dropbox...
phase(:setup) do |seq|
seq << ProcessAction.connect_parent
seq << :spawner_thread_group
seq << :ftp_server
seq << :tuple_space
seq << :base_location
seq << :lang_environment
seq << :package
seq << :scenario
end
setup(:spawner_thread_group) do |item|
item.desc = "Make a spawner thread group"
item.assign(:spawner_threads) {ThreadGroup.new}
end
setup(:ftp_server) do |item|
item.desc = "Setup FTP server with the URI"
item.process do
test(model[:myftp])
uri = model[:myftp]
location = Location[uri.path]
location.path.mkdir unless location.exist?
if uri.userinfo
Util::FTPServer.auth_info = Util::FTPAuthInfo.new(uri.user, uri.password)
end
if uri.port
Util::FTPServer.port = myftp.port
end
Util::FTPServer.start(Util::FTPLocalFS.new(location))
end
end
setup(:tuple_space) do |item|
item.desc = "Make a tuple space"
item.assign(:tuple_space) do
TupleSpace::TupleSpaceServer.new(task_worker_resource: model[:request_task_worker])
end
item.process do
model[:front].set_tuple_space(model[:tuple_space])
# write tuples
model[:tuple_space].write(TupleSpace::ProcessInfoTuple.new('standalone', 'Standalone'))
model[:tuple_space].write(TupleSpace::DryRunTuple.new(model[:dry_run]))
if model[:request_from]
model[:tuple_space].write(TupleSpace::AttributeTuple.new("request_from", model[:request_from]))
end
if model[:session_id]
model[:tuple_space].write(TupleSpace::AttributeTuple.new("session_id", model[:session_id]))
end
if model[:client_ui]
model[:tuple_space].write(TupleSpace::AttributeTuple.new("client_ui", model[:client_ui]))
else
model[:tuple_space].write(TupleSpace::AttributeTuple.new("client_ui", "GUI"))
end
end
end
setup(:base_location) do |item|
item.desc = "Setup base location"
# setup location
item.process do
case model[:base_location]
when Location::LocalLocation
model[:base_location] = Location[model[:base_location].path.expand_path]
model[:base_location].path.mkpath
when Location::DropboxLocation
Location::DropboxLocation.setup_for_cui_client(tuple_space_server)
end
end
# mkdir
item.process do
test(not(model[:base_location].exist?))
model[:base_location].mkdir
end
# set base location into tuple space
item.process do
model[:tuple_space].set_base_location(model[:base_location])
end
end
setup(:lang_environment) do |item|
item.desc = "Make an environment"
item.assign(:env) do
Lang::Environment.new
end
end
# This setups package sharing and secnario handling also.
setup(:package) do |item|
item.desc = "Read a PIONE package"
# read package
item.assign(:package_handler) do
Package::PackageReader.read(model[:package_location])
end
# merge the package into environment
item.assign(:env) do
model[:package_handler].eval(model[:env])
end
# upload the package
item.process do
model[:package_handler].upload(model[:base_location] + "package")
end
item.exception(Package::InvalidPackage) do |e|
cmd.abort("Package error: " + e.message)
end
item.exception(Lang::ParserError) do |e|
cmd.abort("Pione syntax error: " + e.message)
end
item.exception(Lang::LangError) do |e|
cmd.abort("Pione language error: %s(%s)" % [e.message, e.class.name])
end
end
setup(:scenario) do |item|
item.desc = "Read a scenario"
item.condition do
test(model[:rehearse])
test(not(model[:package_handler].info.scenarios.empty?))
end
item.assign(:scenario_handler) do
model[:package_handler].find_scenario(model[:rehearse])
end
item.assign(:input_location) do
if model[:scenario_handler]
model[:scenario_handler].input
else
cmd.abort("the scenario not found: %s" % model[:rehearse])
end
end
end
#
# command lifecycle: execution phase
#
phase(:execution) do |seq|
seq << :job_terminator
seq << :messenger
seq << :logger
seq << :input_generator
seq << :tuple_space_provider
seq << :task_worker
seq << :job_manager
seq << :check_rehearsal_result
end
execution(:job_terminator) do |item|
item.desc = "Start a job terminator"
item.assign(:job_terminator) do
Agent::JobTerminator.start(model[:tuple_space]) do |status|
if status.error?
cmd.abort("pione-client catched the error: %s" % status.message, exception: status.exception)
else
cmd.terminate
end
end
end
end
execution(:messenger) do |item|
item.desc = "Start a messenger agent"
item.assign(:messenger) do
# select receiver
if model[:parent_front] and model[:parent_front][:message_log_receiver]
# delegate parent's receiver
receiver = model[:parent_front][:message_log_receiver]
else
# CUI receiver
receiver = Log::CUIMessageLogReceiver.new
end
Agent::Messenger.new(model[:tuple_space], receiver, model[:session_id]).start
end
end
# Launch a logger agent.
execution(:logger) do |item|
item.desc = "Start a logger agent"
item.assign(:logger) do
Agent::Logger.start(model[:tuple_space], model[:base_location])
end
end
execution(:input_generator) do |item|
item.desc = "Start an input generator agent"
item.assign(:input_generator) do
Agent::InputGenerator.start(
model[:tuple_space], :dir, model[:input_location], model[:stream]
)
end
end
execution(:tuple_space_provider_spawner) do |item|
item.desc = "Spawn a tuple space provider"
item.assign(:tuple_space_provider) do
spawner = Command::PioneTupleSpaceProvider.spawn(cmd)
spawner.when_terminated do
if cmd.current_phase == :execution
cmd.abort("%s is terminated because child tuple space provider is maybe dead." % cmd.name)
end
end
spawner.child_front
end
item.exception(SpawnError) do |e|
if cmd.current_phase == :termination
Log::Debug.system(e.message)
else
cmd.abort(e)
end
end
end
execution(:tuple_space_provider) do |item|
item.desc = "Spawn a tuple space provider"
item.process do
test(not(model[:without_tuple_space_provider]))
thread = Thread.new do
cmd.phase(:execution).find_item(:tuple_space_provider_spawner).execute(cmd)
end
model[:spawner_threads].add(thread)
end
end
# Spawn a task worker command. This is used from `task_worker` action.
execution(:task_worker_spawner) do |item|
item.desc = "Spawn a task worker"
item.process do
param = {
:features => Global.features,
:tuple_space_id => model[:tuple_space].uuid
}
Command::PioneTaskWorker.spawn(model, param)
end
item.exception(SpawnError) do |e|
if cmd.current_phase == :termination
# ignore the exception if the command is terminating
Log::Debug.system(e.message)
else
cmd.abort(e)
end
end
end
# Launch task worker agents in the client side. If the client is
# stand-alone mode, they are in this thread. Otherwise, in other OS
# process.
execution(:task_worker) do |item|
item.desc = "Start task workers"
item.assign(:task_workers) {Array.new}
# stand-alone mode
item.process do
test(model[:stand_alone])
# start task worker agents in this command
model[:task_worker_size].times do
model[:task_workers] << Agent::TaskWorker.start(
model[:tuple_space], Global.expressional_features, model[:env]
)
end
end
# distribution mode
item.process do
test(not(model[:stand_alone]))
# spawn task worker commands
model[:task_worker_size].times do
# we don't wait workers start up because of performance
thread = Thread.new do
cmd.phase(:execution).find_item(:task_worker_spawner).execute(cmd)
end
model[:spawner_threads].add(thread)
end
end
end
execution(:job_manager) do |item|
item.desc = "Start a job manager agent"
item.assign(:job_manager) do
param_set = Lang::ParameterSet.new
# from option
if model[:params] and not(model[:params].pieces.empty?)
param_set = model[:params].pieces.first
end
# from scenario
if not(model[:scenario_handler].nil?) and model[:scenario_handler].info.textual_param_sets
param_set = Util.parse_param_set(model[:scenario_handler].info.textual_param_sets).pieces.first
end
# start
Agent::JobManager.start(
model[:tuple_space], model[:env], model[:package_handler], param_set, model[:stream]
)
end
item.process do
Timeout::timeout(model[:timeout]) do
model[:job_manager].wait_until_terminated(nil)
end
end
item.exception(Agent::JobError) do |e|
cmd.abort(e)
end
item.exception(Timeout::Error) do |e|
cmd.abort("Job timed out after %{number} sec." % {number: model[:timeout]})
end
end
execution(:check_rehearsal_result) do |item|
item.desc = "Check rehearsal result"
item.process do
test(model[:rehearse])
test(not(model[:package_handler].info.scenarios.empty?))
pscenario = test(model[:package_handler].find_scenario(model[:rehearse]))
errors = pscenario.validate(model[:base_location] + "output")
if errors.empty?
Log::SystemLog.info "Rehearsal Result: Succeeded"
else
puts "Rehearsal Result: Failed"
errors.each {|error| puts "- %s" % error.to_s}
cmd.exit_status = false
end
end
end
#
# command lifecycle: termination phase
#
phase(:termination) do |seq|
seq.configure(:timeout => 10)
seq << :spawner_thread
seq << ProcessAction.terminate_children
seq << :job_manager
seq << :job_terminator
seq << :task_worker
seq << :input_generator
seq << :logger
seq << :messenger
seq << :tuple_space
seq << ProcessAction.disconnect_parent
end
# This action is required for the case that the requested job reaches end
# before task workers finish to be spawned.
termination(:spawner_thread) do |item|
item.desc = "Terminate spawner threads"
item.process do
if model[:spawner_threads]
model[:spawner_threads].list.each {|thread| thread.kill}
end
end
end
# Be careful that main thread of `pione-client` command waits to stop the
# job manager's chain thread, so pione-client cannot terminate until the
# thread terminated.
termination(:job_manager) do |item|
item.desc = "Terminate job manager agent"
item.process do
test(model[:job_manager])
test(not(model[:job_manager].terminated?))
model[:job_manager].terminate
end
end
termination(:job_terminator) do |item|
item.desc = "Terminate job terminator agent"
item.process do
test(model[:job_terminator])
test(not(model[:job_terminator].terminated?))
model[:job_terminator].terminate
end
end
termination(:task_worker) do |item|
item.desc = "Terminate task worker agents"
item.process do
test(model[:stand_alone])
test(model[:task_workers])
model[:task_workers].each {|task_worker| task_worker.terminate}
end
end
termination(:input_generator) do |item|
item.desc = "Terminate input generator agent"
item.process do
test(model[:input_generator])
test(not(model[:input_generator].terminated?))
model[:input_generator].terminate
end
end
termination(:logger) do |item|
item.desc = "Terminate logger agent"
item.process do
test(model[:logger])
test(not(model[:logger].terminated?))
model[:logger].terminate
end
end
termination(:messenger) do |item|
item.desc = "Terminate messenger agent"
item.process do
test(model[:messenger])
test(not(model[:messenger].terminated?))
model[:messenger].terminate
end
end
termination(:tuple_space) do |item|
item.desc = "Terminate tuple space agent"
item.process do
test(model[:tuple_space])
model[:tuple_space].terminate
end
end
end
end
end