# lib/volt/utils/read_write_lock.rb
#
# Summary
#
# Maintainability
# C
# 7 hrs
# Test Coverage
# Currently, the released version of the concurrent-ruby gem does not include
# ReadWriteLock (it is only on master).

# Ruby read-write lock implementation
# Allows any number of concurrent readers, but only one concurrent writer
# (And if the "write" lock is taken, any readers who come along will have to wait)

# If readers are already active when a writer comes along, the writer will wait for
#   all the readers to finish before going ahead
# But any additional readers who come when the writer is already waiting, will also
#   wait (so writers are not starved)

# Written by Alex Dowad
# Bug fixes contributed by Alex Kliuchnikau
# Suggestion on avoiding reader starvation contributed by maniek
# Thanks to Doug Lea for java.util.concurrent.ReentrantReadWriteLock (used for inspiration)

# Usage:
# lock = ReadWriteLock.new
# lock.with_read_lock  { data.retrieve }
# lock.with_write_lock { data.modify! }

# NOTE: DON'T try to acquire the write lock while already holding a read lock!
# OR try to acquire the write lock while you already have it
# It will lead to deadlock

# Implementation notes:
# A goal is to make the uncontended path for both readers/writers lock-free
# Only if there is reader-writer or writer-writer contention, should locks be used
# Internal state is represented by a single integer ("counter"), and updated
#  using atomic compare-and-swap operations
# When the counter is 0, the lock is free
# Each reader increments the counter by 1 when acquiring a read lock
#   (and decrements by 1 when releasing the read lock)
# The counter is increased by (1 << 15) for each writer waiting to acquire the
#   write lock, and by (1 << 30) if the write lock is taken

require 'rubygems' # for compatibility with JRuby, MRI 1.8, etc
require 'concurrent/atomic'
require 'thread'

# Read-write lock: any number of readers may hold the lock at once, but a writer
# holds it exclusively.
#
# The whole lock state is packed into one atomically-updated integer (@counter):
#   * the low 15 bits count the readers currently holding the lock,
#   * WAITING_WRITER (1 << 15) is added once per writer waiting for the lock,
#   * RUNNING_WRITER (1 << 30) is added while a writer holds the lock.
# The mutexes/condition variables are only touched when a thread has to wait or
# has to wake up waiters.
#
# The lock is NOT reentrant: acquiring the write lock while already holding the
# read lock or the write lock deadlocks.
class ReadWriteLock
  # Added to the counter once for every writer waiting to acquire the write lock.
  WAITING_WRITER  = 1 << 15
  # Added to the counter while the write lock is held.
  RUNNING_WRITER  = 1 << 30
  # Bit mask selecting the reader count; also the maximum number of readers.
  MAX_READERS     = WAITING_WRITER - 1
  # Bit mask selecting the waiting-writer count (the bits between the two above).
  MAX_WRITERS     = RUNNING_WRITER - MAX_READERS - 1

  def initialize
    @counter      = Concurrent::Atomic.new(0) # single integer which represents lock state
    @reader_q     = ConditionVariable.new     # queue for waiting readers
    @reader_mutex = Mutex.new                 # to protect reader queue
    @writer_q     = ConditionVariable.new     # queue for waiting writers
    @writer_mutex = Mutex.new                 # to protect writer queue
  end

  # Runs the given block while holding a read lock.
  #
  # The lock is released even when the block raises or exits non-locally
  # (break/throw); otherwise the leaked read count would block writers forever.
  #
  # @yield the code to run under the read lock
  # @return [Object] whatever the block returns
  # @raise [RuntimeError] if the reader count is already at MAX_READERS
  def with_read_lock
    acquire_read_lock
    # The ensure starts only after a successful acquire, so a failed acquire
    # never triggers a release.
    begin
      yield
    ensure
      release_read_lock
    end
  end

  # Runs the given block while holding the write lock.
  #
  # The lock is released even when the block raises or exits non-locally
  # (break/throw); otherwise every later reader and writer would wait forever.
  #
  # @yield the code to run under the write lock
  # @return [Object] whatever the block returns
  # @raise [RuntimeError] if the waiting-writer count is already saturated
  def with_write_lock
    acquire_write_lock
    # The ensure starts only after a successful acquire, so a failed acquire
    # never triggers a release.
    begin
      yield
    ensure
      release_write_lock
    end
  end

  # Blocks until a read lock is obtained, then increments the reader count.
  #
  # @raise [RuntimeError] if the reader count is already at MAX_READERS
  def acquire_read_lock
    loop do
      c = @counter.value
      raise 'Too many reader threads!' if (c & MAX_READERS) == MAX_READERS

      # If a writer is waiting when we first queue up, we need to wait
      if c >= WAITING_WRITER
        # But it is possible that the writer could finish and decrement @counter right here...
        @reader_mutex.synchronize do
          # So check again inside the synchronized section
          @reader_q.wait(@reader_mutex) if @counter.value >= WAITING_WRITER
        end

        # after a reader has waited once, they are allowed to "barge" ahead of waiting writers
        # but if a writer is *running*, the reader still needs to wait (naturally)
        loop do
          c = @counter.value
          if c >= RUNNING_WRITER
            @reader_mutex.synchronize do
              @reader_q.wait(@reader_mutex) if @counter.value >= RUNNING_WRITER
            end
          else
            return if @counter.compare_and_swap(c, c + 1)
          end
        end
      else
        # Uncontended path: just bump the reader count (retry if the CAS loses a race)
        break if @counter.compare_and_swap(c, c + 1)
      end
    end
  end

  # Decrements the reader count and, if this was the last reader while writers
  # are waiting, wakes one writer.
  def release_read_lock
    loop do
      c = @counter.value
      if @counter.compare_and_swap(c, c - 1)
        # If one or more writers were waiting, and we were the last reader, wake a writer up
        # (c is the value *before* the decrement, hence the comparison with 1)
        if c >= WAITING_WRITER && (c & MAX_READERS) == 1
          @writer_mutex.synchronize { @writer_q.signal }
        end
        break
      end
    end
  end

  # Blocks until the write lock is obtained.
  #
  # @raise [RuntimeError] if the waiting-writer count is already saturated
  def acquire_write_lock
    loop do
      c = @counter.value
      raise 'Too many writers!' if (c & MAX_WRITERS) == MAX_WRITERS

      if c == 0 # no readers OR writers running
        # if we successfully swap the RUNNING_WRITER bit on, then we can go ahead
        break if @counter.compare_and_swap(0, RUNNING_WRITER)
      elsif @counter.compare_and_swap(c, c + WAITING_WRITER)
        loop do
          # Now we have successfully incremented, so no more readers will be able to increment
          #   (they will wait instead)
          # However, readers OR writers could decrement right here, OR another writer could increment
          @writer_mutex.synchronize do
            # So we have to do another check inside the synchronized section
            # If a writer OR reader is running, then go to sleep
            c = @counter.value
            @writer_q.wait(@writer_mutex) if (c >= RUNNING_WRITER) || ((c & MAX_READERS) > 0)
          end

          # We just came out of a wait
          # If we successfully turn the RUNNING_WRITER bit on with an atomic swap,
          # Then we are OK to stop waiting and go ahead
          # Otherwise go back and wait again
          c = @counter.value
          break if (c < RUNNING_WRITER) &&
                   ((c & MAX_READERS) == 0) &&
                   @counter.compare_and_swap(c, c + RUNNING_WRITER - WAITING_WRITER)
        end
        break
      end
    end
  end

  # Clears the running-writer marker, wakes every waiting reader and, if any
  # writers are waiting, wakes one of them.
  def release_write_lock
    loop do
      c = @counter.value
      if @counter.compare_and_swap(c, c - RUNNING_WRITER)
        @reader_mutex.synchronize { @reader_q.broadcast }
        @writer_mutex.synchronize { @writer_q.signal } if (c & MAX_WRITERS) > 0 # if any writers are waiting...
        break
      end
    end
  end

  # Human-readable snapshot of the lock state (running writer or reader count,
  # plus the number of waiting writers), taken from a single counter read.
  #
  # @return [String]
  def to_s
    c = @counter.value
    s = if c >= RUNNING_WRITER
          '1 writer running, '
        elsif (c & MAX_READERS) > 0
          "#{c & MAX_READERS} readers running, "
        else
          ''
        end

    "#<ReadWriteLock:#{object_id.to_s(16)} #{s}#{(c & MAX_WRITERS) / WAITING_WRITER} writers waiting>"
  end
end