# lib/concurrent-ruby/concurrent/tvar.rb
require 'set'
require 'concurrent/synchronization/object'
module Concurrent
# A `TVar` is a transactional variable - a single-element container that
# is used as part of a transaction - see `Concurrent::atomically`.
#
# @!macro thread_safe_variable_comparison
#
# {include:file:docs-source/tvar.md}
class TVar < Synchronization::Object
  safe_initialization!

  # Build a `TVar` holding `value`, together with the mutex that a
  # transaction takes (via `unsafe_lock`) while it has this variable open.
  #
  # @param value [Object] the initial contents of the variable
  def initialize(value)
    @lock = Mutex.new
    @value = value
  end

  # Read the variable through the transaction machinery. The read runs
  # inside `Concurrent::atomically`, so it joins an enclosing transaction
  # when there is one and otherwise runs as a transaction of its own.
  #
  # @return [Object] the value as seen by the current transaction
  def value
    Concurrent::atomically { Transaction::current.read(self) }
  end

  # Write the variable through the transaction machinery, with the same
  # joining rules as `#value`.
  #
  # @param value [Object] the new contents
  def value=(value)
    Concurrent::atomically { Transaction::current.write(self, value) }
  end

  # Direct access to the stored value, bypassing transactions entirely.
  #
  # @!visibility private
  def unsafe_value # :nodoc:
    @value
  end

  # Direct overwrite of the stored value, bypassing transactions entirely.
  #
  # @!visibility private
  def unsafe_value=(value) # :nodoc:
    @value = value
  end

  # The mutex created for this variable in `initialize`.
  #
  # @!visibility private
  def unsafe_lock # :nodoc:
    @lock
  end
end
# Run a block that reads and writes `TVar`s as a single atomic transaction.
# With respect to the value of `TVar` objects, the transaction is atomic, in
# that it either happens or it does not, consistent, in that the `TVar`
# objects involved will never enter an illegal state, and isolated, in that
# transactions never interfere with each other. You may recognise these
# properties from database transactions.
#
# There are some very important and unusual semantics that you must be aware of:
#
# * Most importantly, the block that you pass to atomically may be executed
#   more than once. In most cases your code should be free of
#   side-effects, except for via TVar.
#
# * If an exception escapes an atomically block it will abort the transaction.
#   The same applies to any other way of leaving the block early, such as
#   `break` or `throw`: nothing is committed.
#
# * It is undefined behaviour to use callcc or Fiber with atomically.
#
# * If you create a new thread within an atomically, it will not be part of
#   the transaction. Creating a thread counts as a side-effect.
#
# Transactions within transactions are flattened to a single transaction.
#
# @example
#   a = Concurrent::TVar.new(100_000)
#   b = Concurrent::TVar.new(100)
#
#   Concurrent::atomically do
#     a.value -= 10
#     b.value += 10
#   end
#
# @yield the body of the transaction; it may be run more than once
# @return [Object, nil] the value of the block from the attempt that
#   committed, or `nil` when the block called `leave_transaction`
# @raise [ArgumentError] if no block is given
def atomically
  raise ArgumentError.new('no block given') unless block_given?

  # Nested transaction - flatten it into the enclosing one and just run the block.
  return yield unless Transaction::current.nil?

  begin
    # Retry loop - every pass is one attempt with a fresh transaction.
    loop do
      transaction = Transaction.new
      Transaction::current = transaction

      # Becomes true immediately before `commit` is called. Until then the
      # `ensure` below is responsible for releasing whatever the attempt
      # acquired; afterwards `commit` is, so nothing is released twice.
      handed_to_commit = false

      begin
        result = yield

        handed_to_commit = true
        # If we can commit, break out of the loop; otherwise go round again.
        break result if transaction.commit
      rescue Transaction::AbortError
        # Explicit abort, or a TVar could not be opened: fall through to the
        # `ensure` and let the loop start a new attempt.
      rescue Transaction::LeaveError
        # Leave without committing and without retrying.
        break nil
      ensure
        # Runs for every exit that did not reach `commit`, including
        # exceptions outside StandardError, `throw`, and `break` from the
        # block, so that an abandoned attempt never keeps its TVars locked.
        transaction.abort unless handed_to_commit
      end
    end
  ensure
    # Clear the current transaction
    Transaction::current = nil
  end
end
# Abort a currently running transaction - see `Concurrent::atomically`.
#
# Signals the abort by raising `Transaction::AbortError`, which
# `atomically` rescues.
#
# @raise [Transaction::AbortError] always
def abort_transaction
  raise Transaction::AbortError
end
# Leave a transaction without committing or aborting - see `Concurrent::atomically`.
#
# Signals the exit by raising `Transaction::LeaveError`, which
# `atomically` rescues.
#
# @raise [Transaction::LeaveError] always
def leave_transaction
  raise Transaction::LeaveError
end
# Make the three methods callable directly on the module, e.g.
# `Concurrent::atomically { ... }`, `Concurrent::abort_transaction`.
module_function :atomically, :abort_transaction, :leave_transaction
# NOTE(review): a bare `private` only affects methods defined after it in
# this module body; it does not hide a constant such as a nested class.
# Presumably it marks what follows as internal - confirm intent.
private
# @!visibility private
#
# State for a single attempt at running an `atomically` block: the set of
# `TVar`s the attempt has opened (and therefore locked), plus the value each
# one has inside the attempt.
class Transaction
  # Marker object that `atomically` compares a block result against.
  ABORTED = ::Object.new

  # Per-TVar record: the value as seen inside the transaction, and whether
  # the transaction has written to it.
  OpenEntry = Struct.new(:value, :modified)

  AbortError = Class.new(StandardError)
  LeaveError = Class.new(StandardError)

  def initialize
    # TVar => OpenEntry for every variable opened so far.
    @open_tvars = {}
  end

  # Value of `tvar` as seen by this transaction, opening it if necessary.
  def read(tvar)
    open(tvar).value
  end

  # Record `value` as this transaction's value for `tvar`, opening it if
  # necessary. The TVar itself is untouched until `commit`.
  #
  # @return [Object] `value`
  def write(tvar, value)
    slot = open(tvar)
    slot.value = value
    slot.modified = true
    value
  end

  # Fetch the entry for `tvar`, creating it on first use. Creating it means
  # taking the TVar's lock without blocking; when the lock is not available
  # the transaction is aborted through `Concurrent::abort_transaction`.
  #
  # @return [OpenEntry]
  def open(tvar)
    known = @open_tvars[tvar]
    return known if known

    Concurrent::abort_transaction unless tvar.unsafe_lock.try_lock

    @open_tvars[tvar] = OpenEntry.new(tvar.unsafe_value, false)
  end

  # Give up on this attempt: release the locks, publish nothing.
  def abort
    unlock
  end

  # Copy every modified entry back into its TVar, then release the locks.
  # Returns whatever `unlock` returns.
  def commit
    @open_tvars.each_pair do |tvar, slot|
      tvar.unsafe_value = slot.value if slot.modified
    end

    unlock
  end

  # Release the lock of every TVar this transaction has opened.
  def unlock
    @open_tvars.each_key { |tvar| tvar.unsafe_lock.unlock }
  end

  class << self
    # The transaction in progress, read from
    # `Thread.current[:current_tvar_transaction]`; `nil` when there is none.
    def current
      Thread.current[:current_tvar_transaction]
    end

    # Install (or, with `nil`, clear) the transaction in progress.
    def current=(transaction)
      Thread.current[:current_tvar_transaction] = transaction
    end
  end
end
end