lib/eq-queueing/backends/leveldb.rb

Summary

Maintainability
A
25 mins
Test Coverage
require 'leveldb'

module EQ::Queueing::Backends

  # @note This is a unoreded storage, so there is no guaranteed work order
  # @note assume there is nothing else than jobs
  class LevelDB
    class JobsCollection < Struct.new(:db, :name)
      include EQ::Logging

      QUEUE              = 'queue'.freeze
      PAYLOAD            = 'payload'.freeze
      CREATED_AT         = 'created_at'.freeze
      STARTED_WORKING_AT = 'started_working_at'.freeze
      NOT_WORKING = ''.freeze

      # @param [EQ::Job] job
      def push job
        job_id = find_free_job_id
        db["#{QUEUE}:#{job_id}"] = job.queue
        db["#{PAYLOAD}:#{job_id}"] = serialize(job.payload) unless job.payload.nil?
        db["#{CREATED_AT}:#{job_id}"] = serialize(Time.now)
        db["#{STARTED_WORKING_AT}:#{job_id}"] = NOT_WORKING
        job_id
      end

      def first_waiting
        db.each do |k,v|
          if k.include?(STARTED_WORKING_AT) && v == NOT_WORKING
            return job_id_from_key(k)
          end
        end
        nil
      end

      def working_iterator
        db.each do |k,v|
          if k.include?(STARTED_WORKING_AT) && v != NOT_WORKING
            yield job_id_from_key(k), deserialize(v)
          end
        end
      end

      # @param [EQ::Job] job without id
      def exists? job
        db.each do |k,v|
          if k.include?(QUEUE) && v == job.queue
            if find_payload(job_id_from_key(k)) == job.payload
              return true
            end
          end
        end
        false
      end

      def delete job_id
        did_exist = !db["#{QUEUE}:#{job_id}"].nil?
        db.batch do |batch|
          batch.delete "#{QUEUE}:#{job_id}"
          batch.delete "#{PAYLOAD}:#{job_id}"
          batch.delete "#{CREATED_AT}:#{job_id}"
          batch.delete "#{STARTED_WORKING_AT}:#{job_id}"
        end
        does_not_exist = db["#{QUEUE}:#{job_id}"].nil?
        did_exist && does_not_exist
      end

      def start_working job_id
        db["#{STARTED_WORKING_AT}:#{job_id}"] = serialize(Time.now)
      end

      def stop_working job_id
        db["#{STARTED_WORKING_AT}:#{job_id}"] = NOT_WORKING
      end

      def find_queue job_id
        db["#{QUEUE}:#{job_id}"]
      end

      def find_payload job_id
        if raw = db["#{PAYLOAD}:#{job_id}"]
          deserialize db["#{PAYLOAD}:#{job_id}"]
        else
          nil
        end
      end

      def find_created_at job_id
        if serialized_time = db["#{CREATED_AT}:#{job_id}"]
          deserialize(serialized_time)
        end
      end

      def find_started_working_at job_id
        if serialized_time = db["#{STARTED_WORKING_AT}:#{job_id}"]
          deserialize(serialized_time)
        end
      end

      def job_id_from_key key
        prefix, job_id = *key.split(':')
        job_id
      end

      # try as hard as you can to find a free slot
      def find_free_job_id
        loop do
          job_id = generate_id
          return job_id unless db.contains? "#{QUEUE}:#{job_id}"
          debug "#{job_id} is not free"
        end
      end

      # Time in milliseconds and 4 digit random
      # @note Maybe this is a stupid idea, but for now it kinda works :)
      def generate_id
        '%d%04d' % [(Time.now.to_f * 1000.0).to_i, Kernel.rand(1000)]
      end

      def serialize data
        Marshal.dump(data)
      end

      def deserialize data
        Marshal.load(data)
      end

      def count
        result = 0
        db.each do |k,v|
          result += 1 if k.include?(QUEUE)
        end
        result
      end

      def count_waiting
        result = 0
        db.each do |k,v|
          if k.include?(STARTED_WORKING_AT) && v == NOT_WORKING
            result += 1
          end
        end
        result
      end

      def count_working
        result = 0
        db.each do |k,v|
          if k.include?(STARTED_WORKING_AT) && v != NOT_WORKING
            result += 1
          end
        end
        result
      end

      def each
        # TODO optimize this (and others) using range queries
        db.each do |k,v|
          if k.include?(QUEUE)
            job_id = job_id_from_key(k)
            yield(id: job_id,
                  queue: v,
                  payload: find_payload(job_id),
                  created_at: find_created_at(job_id),
                  started_working_at: find_started_working_at(job_id))
          end
        end
      end
    end

    attr_reader :db
    attr_reader :jobs

    def initialize config
      @db = ::LevelDB::DB.new config
      @jobs = JobsCollection.new(db)
    end

    # @param [EQ::Job] job
    def push job
      if job.unique? && jobs.exists?(job)
        false
      else
        jobs.push job
      end
    end

    def reserve
      if job_id = jobs.first_waiting
        jobs.start_working job_id
        EQ::Job.new job_id, jobs.find_queue(job_id), jobs.find_payload(job_id)
      end
    end

    def pop job_id
      jobs.delete job_id
    end

    def requeue_timed_out_jobs
      requeued = 0
      jobs.working_iterator do |job_id, started_working_at|
        # older than x
        if started_working_at <= (Time.now - EQ.config.job_timeout)
          jobs.stop_working job_id
          requeued += 1
        end
      end
      requeued
    end

    def clear
      db.each{|k,v| db.delete k}
    end

    def count name=nil
      case name
      when :waiting
        jobs.count_waiting
      when :working
        jobs.count_working
      else
        jobs.count
      end
    end

    def iterator
      jobs.each do |job|
        yield job
      end
    end
  end
end