lib/t2-server/run.rb
# Copyright (c) 2010-2014 The University of Manchester, UK.
#
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# * Neither the names of The University of Manchester nor the names of its
# contributors may be used to endorse or promote products derived from this
# software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Author: Robert Haines
require 'base64'
require 'time'
require 'taverna-baclava'
module T2Server
# An interface for easily running jobs on a Taverna 2 Server with minimal
# setup and configuration required.
#
# A run can be in one of three states:
# * :initialized - The run has been accepted by the server. It may not yet be
# ready to run though as its input port may not have been set.
# * :running - The run is being run by the server.
# * :finished - The run has finished running and its outputs are available
# for download.
class Run
include XML::Methods
private_class_method :new
# The identifier of this run on the server.
attr_reader :identifier
alias :id :identifier
# The server instance that this run is hosted on.
attr_reader :server
# :stopdoc:
XPATHS = {
# Run XPath queries
:run_desc => "/nsr:runDescription",
:dir => "//nss:dir",
:file => "//nss:file",
:expiry => "//nsr:expiry",
:workflow => "//nsr:creationWorkflow",
:status => "//nsr:status",
:createtime => "//nsr:createTime",
:starttime => "//nsr:startTime",
:finishtime => "//nsr:finishTime",
:wdir => "//nsr:workingDirectory",
:inputs => "//nsr:inputs",
:output => "//nsr:output",
:securectx => "//nsr:securityContext",
:listeners => "//nsr:listeners",
:baclava => "//nsr:baclava",
:inputexp => "//nsr:expected",
:name => "//nsr:name",
:feed => "//nsr:interaction",
:gen_prov => "//nsr:generate-provenance",
:run_bundle => "//nsr:run-bundle",
# Port descriptions XPath queries
:port_in => "//port:input",
:port_out => "//port:output",
# Run security XPath queries
:sec_creds => "//nsr:credentials",
:sec_perms => "//nsr:permissions",
:sec_trusts => "//nsr:trusts",
:sec_perm => "/nsr:permissionsDescriptor/nsr:permission",
:sec_uname => "nsr:userName",
:sec_uperm => "nsr:permission",
:sec_cred => "/nsr:credential",
:sec_suri => "nss:serviceURI",
:sec_trust => "/nsr:trustedIdentities/nsr:trust"
}
@@xpaths = XML::XPathCache.instance
@@xpaths.register_xpaths XPATHS
# The name to be used internally for retrieving results via baclava
BACLAVA_FILE = "out.xml"
# New is private but rdoc does not get it right! Hence :stopdoc: section.
def initialize(server, uri, credentials = nil)
@server = server
@uri = uri
@identifier = Util.get_path_leaf_from_uri(@uri)
@workflow = ""
@baclava_in = false
@baclava_out = false
@provenance = false
@credentials = credentials
# Has this Run finished executing on the server?
@finished = false
# Has this Run object been deleted from the server?
@deleted = false
# The following three fields hold cached data about the run that is only
# downloaded the first time it is requested.
@run_doc = nil
@owner = nil
@links = nil
# initialize ports lists to nil as an empty list means no inputs/outputs
@input_ports = nil
@output_ports = nil
# The interaction reader to use for this run, if required.
@interaction_reader = nil
end
# :startdoc:
# :call-seq:
# Run.create(server, workflow) -> run
# Run.create(server, workflow, connection_parameters) -> run
# Run.create(server, workflow, user_credentials) -> run
# Run.create(server, workflow, ...) {|run| ...}
#
# Create a new run in the :initialized state. The run will be created on
# the server with address supplied by _server_. This can either be a
# String of the form <tt>http://example.com:8888/blah</tt> or an already
# created instance of T2Server::Server. The _workflow_ may be supplied
# as a string in t2flow format, a filename or a File or IO object. User
# credentials and connection parameters can be supplied if required but
# are both optional. If _server_ is an instance of T2Server::Server then
# _connection_parameters_ will be ignored.
#
# This method will _yield_ the newly created Run if a block is given.
def Run.create(server, workflow, *rest)
credentials = nil
uri = nil
conn_params = nil
rest.each do |param|
case param
when URI
uri = param
when ConnectionParameters
conn_params = param
when HttpCredentials
credentials = param
end
end
# If server is not a Server object, get one.
server = Server.new(server, conn_params) if server.class != Server
# If we are not given a URI to a run then we know we need to create one.
uri ||= server.initialize_run(workflow, credentials)
# Create the run object and yield it if necessary.
run = new(server, uri, credentials)
yield(run) if block_given?
run
end
# :call-seq:
# owner -> string
#
# Get the username of the owner of this run. The owner is the user who
# created the run on the server.
def owner
@owner ||= _get_run_owner
end
# :call-seq:
# name -> String
#
# Get the name of this run.
#
# Initially this name is derived by Taverna Server from the name
# annotation in the workflow file and the time at which the run was
# initialized. It can be set with the <tt>name=</tt> method.
#
# For Taverna Server versions prior to version 2.5.0 this is a no-op and
# the empty string is returned for consistency.
def name
return "" if links[:name].nil?
@server.read(links[:name], "text/plain", @credentials)
end
# :call-seq:
# name = new_name -> bool
#
# Set the name of this run. +true+ is returned upon success. The maximum
# length of names supported by the server is 48 characters. Anything
# longer than 48 characters will be truncated before upload.
#
# Initially this name is derived by Taverna Server from the name
# annotation in the workflow file and the time at which the run was
# initialized.
#
# For Taverna Server versions prior to version 2.5.0 this is a no-op but
# +true+ is still returned for consistency.
def name=(name)
return true if links[:name].nil?
@server.update(links[:name], name[0...48], "text/plain", @credentials)
end
# :call-seq:
# delete
#
# Delete this run from the server.
def delete
@server.delete(@uri, @credentials)
@deleted = true
end
# :call-seq:
# deleted? -> true or false
#
# Has this run been deleted from the server?
def deleted?
@deleted
end
# :call-seq:
# input_ports -> hash
#
# Return a hash (name, port) of all the input ports this run expects.
def input_ports
@input_ports ||= _get_input_port_info
end
# :call-seq:
# input_port(port) -> port
#
# Get _port_.
def input_port(port)
input_ports[port]
end
# :call-seq:
# output_ports -> hash
#
# Return a hash (name, port) of all the output ports this run has. Until
# the run is finished this method will return _nil_.
def output_ports
if finished? && @output_ports.nil?
@output_ports = _get_output_port_info
end
@output_ports
end
# :call-seq:
# output_port(port) -> port
#
# Get output port _port_.
def output_port(port)
output_ports[port] if finished?
end
# :call-seq:
# expiry -> string
#
# Return the expiry time of this run as an instance of class Time.
def expiry
Time.parse(@server.read(links[:expiry], "text/plain", @credentials))
end
# :call-seq:
# expiry = time -> true or false
#
# Set the expiry time of this run to _time_. _time_ should either be a Time
# object or something that the Time class can parse. If the value given
# does not specify a date then today's date will be assumed. If a time/date
# in the past is specified, the expiry time will not be changed.
def expiry=(time)
unless time.instance_of? Time
time = Time.parse(time)
end
# need to massage the xmlschema format slightly as the server cannot
# parse timezone offsets with a colon (eg +00:00)
date_str = time.xmlschema(2)
date_str = date_str[0..-4] + date_str[-2..-1]
@server.update(links[:expiry], date_str, "text/plain", @credentials)
end
# :call-seq:
# workflow -> string
#
# Get the workflow that this run represents.
def workflow
if @workflow == ""
@workflow = @server.read(links[:workflow], "application/xml",
@credentials)
end
@workflow
end
# :call-seq:
# status -> string
#
# Get the status of this run. Status can be one of :initialized,
# :running or :finished.
def status
return :deleted if @deleted
return :finished if @finished
state = Status.to_sym(@server.read(links[:status], "text/plain",
@credentials))
@finished = (state == :finished)
state
end
# :call-seq:
# start -> true or false
#
# Start this run on the server. Returns true if the run was started, false
# otherwise.
#
# Raises RunStateError if the run is not in the :initialized state.
def start
state = status
raise RunStateError.new(state, :initialized) if state != :initialized
# set all the inputs
_check_and_set_inputs unless baclava_input?
begin
@server.update(links[:status], Status.to_text(:running), "text/plain",
@credentials)
rescue ServerAtCapacityError => sace
false
end
end
# :call-seq:
# wait(check_interval = 1)
#
# Wait (block) for this run to finish. How often (in seconds) the run is
# tested for completion can be specified with check_interval.
#
# Raises RunStateError if the run is still in the :initialized state.
def wait(interval = 1)
state = status
raise RunStateError.new(state, :running) if state == :initialized
# wait
until finished?
sleep(interval)
end
end
# :call-seq:
# exitcode -> fixnum
#
# Get the return code of the run. Zero indicates success.
def exitcode
@server.read(links[:exitcode], "text/plain", @credentials).to_i
end
# :call-seq:
# stdout -> string
#
# Get anything that the run printed to the standard out stream.
def stdout
@server.read(links[:stdout], "text/plain", @credentials)
end
# :call-seq:
# stderr -> string
#
# Get anything that the run printed to the standard error stream.
def stderr
@server.read(links[:stderr], "text/plain", @credentials)
end
# :call-seq:
# log -> string
# log(filename) -> fixnum
# log(stream) -> fixnum
# log {|chunk| ...}
#
# Get the internal Taverna Server log from this run.
#
# Calling this method with no parameters will simply return a text string.
# Providing a filename will stream the data directly to that file and
# return the number of bytes written. Passing in an object that has a
# +write+ method (for example, an instance of File or IO) will stream the
# text directly to that object and return the number of bytes that were
# streamed. Passing in a block will allow access to the underlying data
# stream:
# run.log do |chunk|
# print chunk
# end
def log(param = nil, &block)
raise ArgumentError,
'both a parameter and block given for baclava_output' if param && block
download_or_stream(param, links[:logfile], "text/plain", &block)
end
# :call-seq:
# mkdir(dir) -> true or false
#
# Create a directory in the run's working directory on the server. This
# could be used to store input data.
def mkdir(dir)
dir = Util.strip_path_slashes(dir)
@server.mkdir(links[:wdir], dir, @credentials)
end
# :call-seq:
# upload_file(filename, params={}) -> string
#
# Upload a file, with name _filename_, to the server. Possible values that
# can be passed in via _params_ are:
# * :dir - The directory to upload to. If this is not left blank the
# corresponding directory will need to have been created by Run#mkdir.
# * :rename - Save the file on the server with a different name.
#
# The name of the file on the server is returned.
def upload_file(filename, params={})
location = params[:dir] || ""
uri = Util.append_to_uri_path(links[:wdir], location)
rename = params[:rename] || ""
file_uri = @server.upload_file(filename, uri, rename, @credentials)
Util.get_path_leaf_from_uri(file_uri)
end
# :call-seq:
# upload_data(data, remote_name, remote_directory = "") -> URI
#
# Upload data to the server and store it in <tt>remote_file</tt>. The
# remote directory to put this file in can also be specified, but if it is
# it must first have been created by a call to Run#mkdir.
#
# Returns the URI of the file on the server in which the data has been
# stored.
def upload_data(data, remote_name, remote_directory = "")
location_uri = Util.append_to_uri_path(links[:wdir], remote_directory)
@server.upload_data(data, remote_name, location_uri, @credentials)
end
# :call-seq:
# baclava_input = filename -> true or false
#
# Use a baclava file for the workflow inputs.
def baclava_input=(filename)
state = status
raise RunStateError.new(state, :initialized) if state != :initialized
file = upload_file(filename)
result = @server.update(links[:baclava], file, "text/plain", @credentials)
@baclava_in = true if result
result
end
# :call-seq:
# generate_baclava_output -> true or false
#
# Set the server to save the outputs of this run in baclava format. This
# must be done before the run is started.
def generate_baclava_output
return if @baclava_out
state = status
raise RunStateError.new(state, :initialized) if state != :initialized
@baclava_out = @server.update(links[:output], BACLAVA_FILE, "text/plain",
@credentials)
end
# :stopdoc:
def request_baclava_output
warn "[DEPRECATED] Run#request_baclava_output is deprecated and will "\
"be removed in the next major release. Please use "\
"Run#generate_baclava_output instead."
generate_baclava_output
end
# :startdoc:
# :call-seq:
# baclava_input? -> true or false
#
# Have the inputs to this run been set by a baclava document?
def baclava_input?
@baclava_in
end
# :call-seq:
# generate_baclava_output? -> true or false
#
# Has this run been set to return results in baclava format?
def generate_baclava_output?
@baclava_out
end
# :stopdoc:
def baclava_output?
warn "[DEPRECATED] Run#baclava_output? is deprecated and will be "\
"removed in the next major release. Please use "\
"Run#generate_baclava_output? instead."
generate_baclava_output?
end
# :startdoc:
# :call-seq:
# baclava_output -> string
# baclava_output(filename) -> fixnum
# baclava_output(stream) -> fixnum
# baclava_output {|chunk| ...}
#
# Get the outputs of this run in baclava format. This can only be done if
# the output has been requested in baclava format by #set_baclava_output
# before starting the run.
#
# Calling this method with no parameters will simply return a blob of
# XML data. Providing a filename will stream the data directly to that
# file and return the number of bytes written. Passing in an object that
# has a +write+ method (for example, an instance of File or IO) will
# stream the XML data directly to that object and return the number of
# bytes that were streamed. Passing in a block will allow access to the
# underlying data stream:
# run.baclava_output do |chunk|
# print chunk
# end
#
# Raises RunStateError if the run has not finished running.
def baclava_output(param = nil, &block)
raise ArgumentError,
'both a parameter and block given for baclava_output' if param && block
state = status
raise RunStateError.new(state, :finished) if state != :finished
raise AccessForbiddenError.new("baclava output") if !@baclava_out
baclava_uri = Util.append_to_uri_path(links[:wdir], BACLAVA_FILE)
download_or_stream(param, baclava_uri, "*/*", &block)
end
# :call-seq:
# generate_provenance(toggle = true) -> true or false
#
# Toggle the generation of provenance for this run on or off. This must be
# done before the run is started. Once the run has completed provenance
# can be retrieved with Run#provenance.
#
# Requesting baclava output for a run will override this setting.
def generate_provenance(toggle = true)
return @provenance if @provenance == toggle || links[:gen_prov].nil?
state = status
raise RunStateError.new(state, :initialized) if state != :initialized
result = @server.update(links[:gen_prov], toggle.to_s, "text/plain",
@credentials)
# If changing the setting worked then return the new setting, otherwise
# return the old one.
@provenance = result ? toggle : @provenance
end
# :call-seq:
# generate_provenance? -> true or false
#
# Has this run been set to generate provenance output?
def generate_provenance?
@provenance
end
# :call-seq:
# provenance -> binary blob
# provenance(filename) -> fixnum
# provenance(stream) -> fixnum
# provenance {|chunk| ...}
#
# Get the provenance of this run from the server in zip format.
#
# Calling this method with no parameters will simply return a blob of
# zipped data. Providing a filename will stream the data directly to that
# file and return the number of bytes written. Passing in an object that
# has a +write+ method (for example, an instance of File or IO) will
# stream the data directly to that object and return the number of bytes
# that were streamed. Passing in a block will allow access to the
# underlying data stream:
# run.provenance do |chunk|
# print chunk
# end
#
# Raises RunStateError if the run has not finished running.
def provenance(param = nil, &block)
raise ArgumentError,
'both a parameter and block given for provenance' if param && block
state = status
raise RunStateError.new(state, :finished) if state != :finished
raise AccessForbiddenError.new("provenance") unless @provenance
download_or_stream(param, links[:run_bundle], "*/*", &block)
end
# :call-seq:
# zip_output -> binary blob
# zip_output(filename) -> fixnum
# zip_output(stream) -> fixnum
# zip_output {|chunk| ...}
#
# Get the working directory of this run directly from the server in zip
# format.
#
# Calling this method with no parameters will simply return a blob of
# zipped data. Providing a filename will stream the data directly to that
# file and return the number of bytes written. Passing in an object that
# has a +write+ method (for example, an instance of File or IO) will
# stream the zip data directly to that object and return the number of
# bytes that were streamed. Passing in a block will allow access to the
# underlying data stream:
# run.zip_output do |chunk|
# print chunk
# end
#
# Raises RunStateError if the run has not finished running.
def zip_output(param = nil, port = "", &block)
raise ArgumentError,
"both a parameter and block given for zip_output" if param && block
state = status
raise RunStateError.new(state, :finished) if state != :finished
path = port.empty? ? "out" : "out/#{port}"
output_uri = Util.append_to_uri_path(links[:wdir], path)
download_or_stream(param, output_uri, "application/zip", &block)
end
# :call-seq:
# initialized? -> true or false
#
# Is this run in the :initialized state?
def initialized?
status == :initialized
end
# :call-seq:
# running? -> true or false
#
# Is this run in the :running state?
def running?
status == :running
end
# :call-seq:
# finished? -> true or false
#
# Is this run in the :finished state?
def finished?
status == :finished
end
# :call-seq:
# error? -> true or false
#
# Are there errors in this run's outputs? Returns false if the run is not
# finished yet.
def error?
return false unless finished?
output_ports.values.each do |output|
return true if output.error?
end
false
end
# :call-seq:
# create_time -> string
#
# Get the creation time of this run as an instance of class Time.
def create_time
Time.parse(@server.read(links[:createtime], "text/plain", @credentials))
end
# :call-seq:
# start_time -> string
#
# Get the start time of this run as an instance of class Time.
def start_time
Time.parse(@server.read(links[:starttime], "text/plain", @credentials))
end
# :call-seq:
# finish_time -> string
#
# Get the finish time of this run as an instance of class Time.
def finish_time
Time.parse(@server.read(links[:finishtime], "text/plain", @credentials))
end
# :call-seq:
# owner? -> true or false
#
# Are the credentials being used to access this run those of the owner?
# The owner of the run can give other users certain access rights to their
# runs but only the owner can change these rights - or even see what they
# are. Sometimes it is useful to know if the user accessing the run is
# actually the owner of it or not.
def owner?
@credentials.username == owner
end
# :call-seq:
# grant_permission(username, permission) -> username
#
# Grant the user the stated permission. A permission can be one of
# <tt>:none</tt>, <tt>:read</tt>, <tt>:update</tt> or <tt>:destroy</tt>.
# Only the owner of a run may grant permissions on it. +nil+ is returned
# if a user other than the owner uses this method.
def grant_permission(username, permission)
return unless owner?
value = xml_permissions_fragment(username, permission.to_s)
@server.create(links[:sec_perms], value, "application/xml", @credentials)
end
# :call-seq:
# permissions -> hash
#
# Return a hash (username => permission) of all the permissions set for
# this run. Only the owner of a run may query its permissions. +nil+ is
# returned if a user other than the owner uses this method.
def permissions
return unless owner?
perms = {}
doc = xml_document(@server.read(links[:sec_perms], "application/xml",
@credentials))
xpath_find(doc, @@xpaths[:sec_perm]).each do |p|
user = xml_node_content(xpath_first(p, @@xpaths[:sec_uname]))
perm = xml_node_content(xpath_first(p, @@xpaths[:sec_uperm])).to_sym
perms[user] = perm
end
perms
end
# :call-seq:
# permission(username) -> permission
#
# Return the permission granted to the supplied username, if any. Only the
# owner of a run may query its permissions. +nil+ is returned if a user
# other than the owner uses this method.
def permission(username)
return unless owner?
permissions[username] || :none
end
# :call-seq:
# revoke_permission(username) -> true or false
#
# Revoke whatever permissions that have been granted to the user. Only the
# owner of a run may revoke permissions on it. +nil+ is returned if a user
# other than the owner uses this method.
def revoke_permission(username)
return unless owner?
uri = Util.append_to_uri_path(links[:sec_perms], username)
@server.delete(uri, @credentials)
end
# :call-seq:
# add_password_credential(service_uri, username, password) -> URI
#
# Provide a username and password credential for the secure service at the
# specified URI. The URI of the credential on the server is returned. Only
# the owner of a run may supply credentials for it. +nil+ is returned if a
# user other than the owner uses this method.
def add_password_credential(uri, username, password)
return unless owner?
# Is this a new credential, or an update?
cred_uri = credential(uri)
# basic uri checks
uri = _check_cred_uri(uri)
value = xml_password_cred_fragment(uri, username, password)
if cred_uri.nil?
@server.create(links[:sec_creds], value, "application/xml",
@credentials)
else
@server.update(cred_uri, value, "application/xml", @credentials)
end
end
# :call-seq:
# add_keypair_credential(service_uri, filename, password,
# alias = "Imported Certificate", type = :pkcs12) -> URI
#
# Provide a client certificate credential for the secure service at the
# specified URI. You will need to provide the password to unlock the
# private key. You will also need to provide the 'alias' or 'friendlyName'
# of the key you wish to use if it differs from the default. The URI of the
# credential on the server is returned. Only the owner of a run may supply
# credentials for it. +nil+ is returned if a user other than the owner uses
# this method.
def add_keypair_credential(uri, filename, password,
name = "Imported Certificate", type = :pkcs12)
return unless owner?
type = type.to_s.upcase
contents = Base64.encode64(IO.read(filename))
# basic uri checks
uri = _check_cred_uri(uri)
value = xml_keypair_cred_fragment(uri, name, contents, type, password)
@server.create(links[:sec_creds], value, "application/xml", @credentials)
end
# :call-seq:
# credentials -> hash
#
# Return a hash (service_uri => credential_uri) of all the credentials
# provided for this run. Only the owner of a run may query its credentials.
# +nil+ is returned if a user other than the owner uses this method.
def credentials
return unless owner?
creds = {}
doc = xml_document(@server.read(links[:sec_creds], "application/xml",
@credentials))
xpath_find(doc, @@xpaths[:sec_cred]).each do |c|
uri = URI.parse(xml_node_content(xpath_first(c, @@xpaths[:sec_suri])))
cred_uri = URI.parse(xml_node_attribute(c, "href"))
creds[uri] = cred_uri
end
creds
end
# :call-seq:
# credential(service_uri) -> URI
#
# Return the URI of the credential set for the supplied service, if
# any. Only the owner of a run may query its credentials. +nil+ is
# returned if a user other than the owner uses this method.
def credential(uri)
return unless owner?
credentials[uri]
end
# :call-seq:
# delete_credential(service_uri) -> true or false
#
# Delete the credential that has been provided for the specified service.
# Only the owner of a run may delete its credentials. +nil+ is returned if
# a user other than the owner uses this method.
def delete_credential(uri)
return unless owner?
@server.delete(credentials[uri], @credentials)
end
# :call-seq:
# delete_all_credentials -> true or false
#
# Delete all credentials associated with this workflow run. Only the owner
# of a run may delete its credentials. +nil+ is returned if a user other
# than the owner uses this method.
def delete_all_credentials
return unless owner?
@server.delete(links[:sec_creds], @credentials)
end
# :call-seq:
# add_trust(filename, type = :x509) -> URI
#
# Add a trusted identity (server public key) to verify peers when using
# https connections to Web Services. The URI of the trust on the server is
# returned. Only the owner of a run may add a trust. +nil+ is returned if
# a user other than the owner uses this method.
def add_trust(filename, type = :x509)
return unless owner?
type = type.to_s.upcase
contents = Base64.encode64(IO.read(filename))
value = xml_trust_fragment(contents, type)
@server.create(links[:sec_trusts], value, "application/xml", @credentials)
end
# :call-seq:
# trusts -> array
#
# Return a list of all the URIs of trusts that have been registered for
# this run. At present there is no way to differentiate between trusts
# without noting the URI returned when originally uploaded. Only the owner
# of a run may query its trusts. +nil+ is returned if a user other than the
# owner uses this method.
def trusts
return unless owner?
t_uris = []
doc = xml_document(@server.read(links[:sec_trusts], "application/xml",
@credentials))
xpath_find(doc, @@xpaths[:sec_trust]). each do |t|
t_uris << URI.parse(xml_node_attribute(t, "href"))
end
t_uris
end
# :call-seq:
# delete_trust(URI) -> true or false
#
# Delete the trust with the provided URI. Only the owner of a run may
# delete its trusts. +nil+ is returned if a user other than the owner uses
# this method.
def delete_trust(uri)
return unless owner?
@server.delete(uri, @credentials)
end
# :call-seq:
# delete_all_trusts -> true or false
#
# Delete all trusted identities associated with this workflow run. Only
# the owner of a run may delete its trusts. +nil+ is returned if a user
# other than the owner uses this method.
def delete_all_trusts
return unless owner?
@server.delete(links[:sec_trusts], @credentials)
end
# :stopdoc:
# Outputs are represented as a directory structure with the eventual list
# items (leaves) as files. This method (not part of the public API)
# downloads a file from the run's working directory.
def download_output_data(uri, range = nil, &block)
@server.read(uri, "application/octet-stream", range, @credentials,
&block)
end
# Read from the run's notification feed.
def read_notification_feed
@server.read(links[:feed], "application/atom+xml", @credentials)
end
# Write to the run's notification feed.
def write_notification(entry)
@server.create(links[:feed], entry, "application/atom+xml", @credentials)
end
# Read a file from the interactions directory for this run on the server.
def read_interaction_data(name)
uri = Util.append_to_uri_path(links[:feeddir], name)
@server.read(uri, "*/*", @credentials)
end
# Write a file to the interactions directory for this run on the server.
def write_interaction_data(name, data)
uri = Util.append_to_uri_path(links[:feeddir], name)
@server.update(uri, data, "*/*", @credentials)
end
# This is a slightly unpleasant hack to help proxy notification
# communications through a third party.
def notifications_uri
links[:feed] || ""
end
# This is a slightly unpleasant hack to help proxy interaction
# communications through a third party.
def interactions_uri
links[:feeddir] || ""
end
# :startdoc:
# :call-seq:
# notifications(type = :new_requests) -> array
#
# Poll the server for notifications and return them in a list. Returns the
# empty list if there are none, or if the server does not support the
# Interaction Service.
#
# The +type+ parameter is used to select which types of notifications are
# returned as follows:
# * <tt>:requests</tt> - Interaction requests.
# * <tt>:replies</tt> - Interaction replies.
# * <tt>:new_requests</tt> - Interaction requests that are new since the
# last time they were polled (default).
# * <tt>:all</tt> - All interaction requests and replies.
def notifications(type = :new_requests)
return [] if links[:feed].nil?
@interaction_reader ||= Interaction::Feed.new(self)
if type == :new_requests
@interaction_reader.new_requests
else
@interaction_reader.notifications(type)
end
end
private
def links
@links ||= _get_run_links
end
# Check each input to see if it requires a list input and call the
# requisite upload method for the entire set of inputs.
def _check_and_set_inputs
lists = false
input_ports.each_value do |port|
if port.depth > 0
lists = true
break
end
end
lists ? _fake_lists : _set_all_inputs
end
# Set all the inputs on the server. The inputs must have been set prior to
# this call using the InputPort API.
def _set_all_inputs
input_ports.each_value do |port|
next unless port.set?
uri = Util.append_to_uri_path(links[:inputs], "input/#{port.name}")
if port.file?
# If we're using a local file upload it first then set the port to
# use a remote file.
port.remote_file = upload_file(port.file) unless port.remote_file?
xml_value = xml_input_fragment(port.file, :file)
else
xml_value = xml_input_fragment(port.value)
end
@server.update(uri, xml_value, "application/xml", @credentials)
end
end
# Fake being able to handle lists as inputs by converting everything into
# one big baclava document and uploading that. This has to be done for all
# inputs or none at all. The inputs must have been set prior to this call
# using the InputPort API.
def _fake_lists
data_map = {}
input_ports.each_value do |port|
next unless port.set?
if port.file?
unless port.remote_file?
file = File.read(port.file)
data_map[port.name] = Taverna::Baclava::Node.new(file)
end
else
data_map[port.name] = Taverna::Baclava::Node.new(port.value)
end
end
# Create and upload the baclava data.
baclava = Taverna::Baclava::Writer.write(data_map)
upload_data(baclava, "in.baclava")
@server.update(links[:baclava], "in.baclava", "text/plain", @credentials)
end
# Check that the uri passed in is suitable for credential use:
# * rserve uris must not have a path.
# * http(s) uris must have at least "/" as their path.
def _check_cred_uri(uri)
u = URI(uri)
case u.scheme
when "rserve"
u.path = ""
when /https?/
u.path = "/" if u.path == ""
end
u.to_s
end
def _get_input_port_info
port_desc = @server.read(links[:inputexp], "application/xml",
@credentials)
doc = xml_document(port_desc)
_get_port_info(doc, :port_in)
end
def _get_output_port_info
begin
port_desc = @server.read(links[:output], "application/xml", @credentials)
rescue AttributeNotFoundError => anfe
return {}
end
doc = xml_document(port_desc)
_get_port_info(doc, :port_out)
end
def _get_port_info(doc, type)
ports = {}
xpath_find(doc, @@xpaths[type]).each do |desc|
port = if type == :port_out
OutputPort.new(self, desc)
else
InputPort.new(self, desc)
end
ports[port.name] = port
end
ports
end
def _get_run_description
if @run_doc.nil?
@run_doc = xml_document(@server.read(@uri, "application/xml",
@credentials))
end
@run_doc
end
def _get_run_owner
doc = _get_run_description
xpath_attr(doc, @@xpaths[:run_desc], "owner")
end
def _get_run_links
doc = _get_run_description
# first parse out the basic stuff
links = get_uris_from_doc(doc, [:expiry, :workflow, :status,
:createtime, :starttime, :finishtime, :wdir, :inputs, :output,
:securectx, :listeners, :name, :feed, :gen_prov, :run_bundle])
# Working dir links
_get_wdir_links(links)
# get inputs
inputs = @server.read(links[:inputs], "application/xml",@credentials)
doc = xml_document(inputs)
links.merge! get_uris_from_doc(doc, [:baclava, :inputexp])
# IO properties links
_get_io_properties_links(links)
# Security properties links - only available to the owner of a run
if owner?
_get_security_links(links)
end
links
end
def _get_wdir_links(links)
# Logs directory
links[:logdir] = Util.append_to_uri_path(links[:wdir], "logs")
# Log file
links[:logfile] = Util.append_to_uri_path(links[:logdir], "detail.log")
# Interaction working directory, if we have a feed.
unless links[:feed].nil?
links[:feeddir] = Util.append_to_uri_path(links[:wdir], "interactions")
end
end
def _get_io_properties_links(links)
links[:io] = Util.append_to_uri_path(links[:listeners], "io")
[:stdout, :stderr, :exitcode].each do |res|
links[res] = Util.append_to_uri_path(links[:io], "properties/#{res}")
end
end
def _get_security_links(links)
securectx = @server.read(links[:securectx], "application/xml",
@credentials)
doc = xml_document(securectx)
links.merge! get_uris_from_doc(doc,
[:sec_creds, :sec_perms, :sec_trusts])
end
def download_or_stream(param, uri, type, &block)
if param.respond_to? :write
@server.read_to_stream(param, uri, type, @credentials)
elsif param.instance_of? String
@server.read_to_file(param, uri, type, @credentials)
else
@server.read(uri, type, @credentials, &block)
end
end
# :stopdoc:
class Status
STATE2TEXT = {
:initialized => "Initialized",
:running => "Operating",
:finished => "Finished",
:stopped => "Stopped",
:deleted => "Deleted"
}
TEXT2STATE = {
"Initialized" => :initialized,
"Operating" => :running,
"Finished" => :finished,
"Stopped" => :stopped
}
def Status.to_text(state)
STATE2TEXT[state.to_sym]
end
def Status.to_sym(text)
TEXT2STATE[text]
end
end
# :startdoc:
end
end