lib/fear/future.rb
# frozen_string_literal: true
module Fear
# Asynchronous computations that yield futures are created
# with the +Fear.future+ call:
#
# success = "Hello"
# f = Fear.future { success + ' future!' }
# f.on_success do |result|
# puts result
# end
#
# Multiple callbacks may be registered; there is no guarantee
# that they will be executed in a particular order.
#
# The future may contain an exception and this means
# that the future failed. Futures obtained through combinators
# have the same error as the future they were obtained from.
#
# f = Fear.future { 5 }
# g = Fear.future { 3 }
# f.flat_map do |x|
# g.map { |y| x + y }
# end
#
# The same program may be written using +Fear.for+
#
# Fear.for(Fear.future { 5 }, Fear.future { 3 }) do |x, y|
# x + y
# end
#
# Futures use +Concurrent::Promise+ under the hood. +Fear.future+ accepts optional configuration Hash passed
# directly to underlying promise. For example, run it on custom thread pool.
#
# require 'open-uri'
#
# future = Fear.future(executor: :io) { open('https://example.com/') }
#
# future.map(executor: :fast, &:read).each do |body|
# puts "#{body}"
# end
#
# @see https://github.com/scala/scala/blob/2.11.x/src/library/scala/concurrent/Future.scala
#
class Future
include Awaitable
# @param promise [nil, Concurrent::Future] converts
# +Concurrent::Promise+ into +Fear::Future+.
# @param options [see Concurrent::Future] options will be passed
# directly to +Concurrent::Promise+
# @yield given block and evaluate it in the future.
# @api private
# @see Fear.future
#
def initialize(promise = nil, **options, &block)
if block_given? && promise
raise ArgumentError, "pass block or promise"
end
@options = options
@promise = promise || Concurrent::Promise.execute(@options) do
Fear.try(&block)
end
end
attr_reader :promise
private :promise
# Calls the provided callback when this future is completed successfully.
#
# If the future has already been completed with a value,
# this will either be applied immediately or be scheduled asynchronously.
# @yieldparam [any] value
# @return [self]
# @see #transform
#
# @example
# Fear.future { }.on_success do |value|
# # ...
# end
#
def on_success(&block)
on_complete do |result|
result.each(&block)
end
end
# When this future is completed successfully match against its result
#
# If the future has already been completed with a value,
# this will either be applied immediately or be scheduled asynchronously.
# @yieldparam [Fear::PatternMatch] m
# @return [self]
#
# @example
# Fear.future { }.on_success_match do |m|
# m.case(42) { ... }
# end
#
def on_success_match
on_success do |value|
Fear.matcher { |m| yield(m) }.call_or_else(value, &:itself)
end
end
# When this future is completed with a failure apply the provided callback to the error.
#
# If the future has already been completed with a failure,
# this will either be applied immediately or be scheduled asynchronously.
#
# Will not be called in case that the future is completed with a value.
# @yieldparam [StandardError]
# @return [self]
#
# @example
# Fear.future { }.on_failure do |error|
# if error.is_a?(HTTPError)
# # ...
# end
# end
#
def on_failure
on_complete do |result|
if result.failure?
yield result.exception
end
end
end
# When this future is completed with a failure match against the error.
#
# If the future has already been completed with a failure,
# this will either be applied immediately or be scheduled asynchronously.
#
# Will not be called in case that the future is completed with a value.
# @yieldparam [Fear::PatternMatch] m
# @return [self]
#
# @example
# Fear.future { }.on_failure_match do |m|
# m.case(HTTPError) { |error| ... }
# end
#
def on_failure_match
on_failure do |error|
Fear.matcher { |m| yield(m) }.call_or_else(error, &:itself)
end
end
# When this future is completed call the provided block.
#
# If the future has already been completed,
# this will either be applied immediately or be scheduled asynchronously.
# @yieldparam [Fear::Try]
# @return [self]
#
# @example
# Fear.future { }.on_complete do |try|
# try.map(&:do_the_job)
# end
#
def on_complete
promise.add_observer do |_time, try, _error|
yield try
end
self
end
# When this future is completed match against result.
#
# If the future has already been completed,
# this will either be applied immediately or be scheduled asynchronously.
# @yieldparam [Fear::Try::PatternMatch]
# @return [self]
#
# @example
# Fear.future { }.on_complete_match do |m|
# m.success { |result| }
# m.failure { |error| }
# end
#
def on_complete_match
promise.add_observer do |_time, try, _error|
Fear::Try.matcher { |m| yield(m) }.call_or_else(try, &:itself)
end
self
end
# Returns whether the future has already been completed with
# a value or an error.
#
# @return [true, false] +true+ if the future is already
# completed, +false+ otherwise.
#
# @example
# future = Fear.future { }
# future.completed? #=> false
# sleep(1)
# future.completed? #=> true
#
def completed?
promise.fulfilled?
end
# The value of this +Future+.
#
# @return [Fear::Option<Fear::Try>] if the future is not completed
# the returned value will be +Fear::None+. If the future is
# completed the value will be +Fear::Some<Fear::Success>+ if it
# contains a valid result, or +Fear::Some<Fear::Failure>+ if it
# contains an error.
#
def value
Fear.option(promise.value(0))
end
# Asynchronously processes the value in the future once the value
# becomes available.
#
# Will not be called if the future fails.
# @yieldparam [any] yields with successful feature value
# @see {#on_complete}
#
alias each on_success
# Creates a new future by applying the +success+ function to the successful
# result of this future, or the +failure+ function to the failed result.
# If there is any non-fatal error raised when +success+ or +failure+ is
# applied, that error will be propagated to the resulting future.
#
# @yieldparam success [#get] function that transforms a successful result of the
# receiver into a successful result of the returned future
# @yieldparam failure [#exception] function that transforms a failure of the
# receiver into a failure of the returned future
# @return [Fear::Future] a future that will be completed with the
# transformed value
#
# @example
# Fear.future { open('http://example.com').read }
# .transform(
# ->(value) { ... },
# ->(error) { ... },
# )
#
def transform(success, failure)
promise = Promise.new(**@options)
on_complete_match do |m|
m.success { |value| promise.success(success.(value)) }
m.failure { |error| promise.failure(failure.(error)) }
end
promise.to_future
end
# Creates a new future by applying a block to the successful result of
# this future. If this future is completed with an error then the new
# future will also contain this error.
#
# @return [Fear::Future]
#
# @example
# future = Fear.future { 2 }
# future.map { |v| v * 2 } #=> the same as Fear.future { 2 * 2 }
#
def map(&block)
promise = Promise.new(**@options)
on_complete do |try|
promise.complete!(try.map(&block))
end
promise.to_future
end
# Creates a new future by applying a block to the successful result of
# this future, and returns the result of the function as the new future.
# If this future is completed with an exception then the new future will
# also contain this exception.
#
# @yieldparam [any]
# @return [Fear::Future]
#
# @example
# f1 = Fear.future { 5 }
# f2 = Fear.future { 3 }
# f1.flat_map do |v1|
# f1.map do |v2|
# v2 * v1
# end
# end
#
def flat_map
promise = Promise.new(**@options)
on_complete_match do |m|
m.case(Fear::Failure) { |failure| promise.complete!(failure) }
m.success do |value|
yield(value).on_complete { |callback_result| promise.complete!(callback_result) }
rescue StandardError => error
promise.failure!(error)
end
end
promise.to_future
end
# Creates a new future by filtering the value of the current future
# with a predicate.
#
# If the current future contains a value which satisfies the predicate,
# the new future will also hold that value. Otherwise, the resulting
# future will fail with a +NoSuchElementError+.
#
# If the current future fails, then the resulting future also fails.
#
# @yieldparam [#get]
# @return [Fear::Future]
#
# @example
# f = Fear.future { 5 }
# f.select { |value| value % 2 == 1 } # evaluates to 5
# f.select { |value| value % 2 == 0 } # fail with NoSuchElementError
#
def select
map do |result|
if yield(result)
result
else
raise NoSuchElementError, "#select predicate is not satisfied"
end
end
end
# Creates a new future that will handle any matching error that this
# future might contain. If there is no match, or if this future contains
# a valid result then the new future will contain the same.
#
# @return [Fear::Future]
#
# @example
# Fear.future { 6 / 0 }.recover { |error| 0 } # result: 0
# Fear.future { 6 / 0 }.recover do |m|
# m.case(ZeroDivisionError) { 0 }
# m.case(OtherTypeOfError) { |error| ... }
# end # result: 0
#
#
def recover(&block)
promise = Promise.new(**@options)
on_complete do |try|
promise.complete!(try.recover(&block))
end
promise.to_future
end
# Zips the values of +self+ and +other+ future, and creates
# a new future holding the array of their results.
#
# If +self+ future fails, the resulting future is failed
# with the error stored in +self+.
# Otherwise, if +other+ future fails, the resulting future is failed
# with the error stored in +other+.
#
# @param other [Fear::Future]
# @return [Fear::Future]
#
# @example
# future1 = Fear.future { call_service1 }
# future1 = Fear.future { call_service2 }
# future1.zip(future2) #=> returns the same result as Fear.future { [call_service1, call_service2] },
# # but it performs two calls asynchronously
#
def zip(other)
promise = Promise.new(**@options)
on_complete_match do |m|
m.success do |value|
other.on_complete do |other_try|
promise.complete!(
other_try.map do |other_value|
if block_given?
yield(value, other_value)
else
[value, other_value]
end
end,
)
end
end
m.failure do |error|
promise.failure!(error)
end
end
promise.to_future
end
# Creates a new future which holds the result of +self+ future if it
# was completed successfully, or, if not, the result of the +fallback+
# future if +fallback+ is completed successfully.
# If both futures are failed, the resulting future holds the error
# object of the first future.
#
# @param fallback [Fear::Future]
# @return [Fear::Future]
#
# @example
# f = Fear.future { fail 'error' }
# g = Fear.future { 5 }
# f.fallback_to(g) # evaluates to 5
#
def fallback_to(fallback)
promise = Promise.new(**@options)
on_complete_match do |m|
m.success { |value| promise.complete!(value) }
m.failure do |error|
fallback.on_complete_match do |m2|
m2.success { |value| promise.complete!(value) }
m2.failure { promise.failure!(error) }
end
end
end
promise.to_future
end
# Applies the side-effecting block to the result of +self+ future,
# and returns a new future with the result of this future.
#
# This method allows one to enforce that the callbacks are executed in a
# specified order.
#
# @note that if one of the chained +and_then+ callbacks throws
# an error, that error is not propagated to the subsequent
# +and_then+ callbacks. Instead, the subsequent +and_then+ callbacks
# are given the original value of this future.
#
# @example The following example prints out +5+:
# f = Fear.future { 5 }
# f.and_then do
# m.success { }fail| 'runtime error' }
# end.and_then do |m|
# m.success { |value| puts value } # it evaluates this branch
# m.failure { |error| puts error.massage }
# end
#
def and_then
promise = Promise.new(**@options)
on_complete do |try|
Fear.try do
Fear::Try.matcher { |m| yield(m) }.call_or_else(try, &:itself)
end
promise.complete!(try)
end
promise.to_future
end
# @api private
def __result__(at_most)
__ready__(at_most).value.get_or_else { raise "promise not completed" }
end
# @api private
def __ready__(at_most)
if promise.wait(at_most).complete?
self
else
raise Timeout::Error
end
end
class << self
# Creates an already completed +Future+ with the specified error.
# @param exception [StandardError]
# @return [Fear::Future]
#
def failed(exception)
new { raise exception }
.yield_self { |future| Fear::Await.ready(future, 10) }
end
# Creates an already completed +Future+ with the specified result.
# @param result [Object]
# @return [Fear::Future]
#
def successful(result)
new { result }
.yield_self { |future| Fear::Await.ready(future, 10) }
end
end
end
end