lib/transflow/publisher.rb
require 'wisper'
require 'kleisli'
require 'transflow/errors'
module Transflow
class Publisher
include Wisper::Publisher
attr_reader :name
attr_reader :op
def self.[](name, op, options = {})
type =
if options[:monadic]
Monadic
else
self
end
type.new(name, op)
end
class Monadic < Publisher
def call(*args)
op.(*args)
.or { |result| broadcast_failure(*args, result) and Left(result) }
.>-> value { broadcast_success(value) and Right(value) }
end
end
class Curried < Publisher
attr_reader :publisher
attr_reader :arity
attr_reader :curry_args
def initialize(publisher, curry_args = [])
@publisher = publisher
@arity = publisher.arity
@curry_args = curry_args
end
def call(*args)
all_args = curry_args + args
if all_args.size == arity
publisher.call(*all_args)
else
self.class.new(publisher, all_args)
end
end
def subscribe(*args)
publisher.subscribe(*args)
end
end
def initialize(name, op)
@name = name
@op = op
end
def curry
raise "can't curry publisher where operation arity is < 0" if arity < 0
Curried.new(self)
end
def arity
op.is_a?(Proc) ? op.arity : op.method(:call).arity
end
def call(*args)
result = op.call(*args)
broadcast_success(result)
result
rescue StepError => err
broadcast_failure(*args, err) and raise(err)
end
alias_method :[], :call
def subscribe(listeners, *args)
Array(listeners).each { |listener| super(listener, *args) }
end
private
def broadcast_success(result)
broadcast(:"#{name}_success", result)
end
def broadcast_failure(*args, err)
broadcast(:"#{name}_failure", *args, err)
end
end
end