lib/process_shared/mutex.rb
require 'process_shared'
require 'process_shared/open_with_self'
require 'process_shared/process_error'
module ProcessShared
# This Mutex class is implemented as a Semaphore with a second
# internal Semaphore used to track the locking process and thread.
#
# {ProcessError} is raised if either {#unlock} is called by a
# process + thread different from the locking process + thread, or
# if {#lock} is called while the process + thread already holds the
# lock (i.e. the mutex is not re-entrant). This tracking is not
# without performance cost, of course (current implementation uses
# the additional {Semaphore} and {SharedMemory} segment).
#
# The API is intended to be identical to the {::Mutex} in the core
# Ruby library.
#
# TODO: the core Ruby api has no #close method, but this Mutex must
# release its {Semaphore} and {SharedMemory} resources. For now,
# rely on the object finalizers of those objects...
class Mutex
extend OpenWithSelf
def initialize
@internal_sem = Semaphore.new
@locked_by = SharedMemory.new(:uint64, 2) # [Process ID, Thread ID]
@sem = Semaphore.new
end
# @return [Mutex]
def lock
if (p, t = current_process_and_thread) == locked_by
raise ProcessError, "already locked by this process #{p}, thread #{t}"
end
@sem.wait
self.locked_by = current_process_and_thread
self
end
# @return [Boolean]
def locked?
locked_by != UNLOCKED
end
# Releases the lock and sleeps timeout seconds if it is given and
# non-nil or forever.
#
# @return [Numeric]
def sleep(timeout = nil)
unlock
begin
timeout ? Kernel.sleep(timeout) : Kernel.sleep
ensure
lock
end
end
# @return [Boolean]
def try_lock
with_internal_lock do
if locked?
false # was locked
else
@sem.wait # should return immediately
self.locked_by = current_process_and_thread
true
end
end
end
# @return [Mutex]
def unlock
if (p, t = locked_by) != (cp, ct = current_process_and_thread)
raise ProcessError, "lock is held by process #{p}, thread #{t}: not process #{cp}, thread #{ct}"
end
self.locked_by = UNLOCKED
@sem.post
self
end
# Acquire the lock, yield the block, then ensure the lock is
# unlocked.
#
# @return [Object] the result of the block
def synchronize
lock
begin
yield
ensure
unlock
end
end
protected
# @return [Array<(Fixnum, Fixnum)>]
# If locked, IDs of the locking process and thread, otherwise +UNLOCKED+
def locked_by
with_internal_lock do
@locked_by.read_array_of_uint64(2)
end
end
# @param [Array<(Fixnum, Fixnum)>] ary
# Set the IDs of the locking process and thread, or +UNLOCKED+ if none
def locked_by=(ary)
with_internal_lock do
@locked_by.write_array_of_uint64(ary)
end
end
def with_internal_lock(&block)
@internal_sem.synchronize &block
end
# @return [Array<(Fixnum, Fixnum)>] IDs of the current process and thread
def current_process_and_thread
[::Process.pid, Thread.current.object_id]
end
# Represents the state of being unlocked
UNLOCKED = [0, 0].freeze
end
end