eagletmt/kaede

View on GitHub
lib/kaede/database.rb

Summary

Maintainability
A
1 hr
Test Coverage
require 'forwardable'
require 'sequel'
require 'retryable'
require 'kaede/channel'
require 'kaede/program'

module Kaede
  # Persistence layer built on top of a Sequel database handle.
  #
  # Reads and writes the +channels+, +programs+, +jobs+ and
  # +tracking_titles+ tables. Every public method except {#prepare_tables}
  # and the delegated +transaction+ runs its queries through
  # +retry_on_disconnected+, so Sequel::DatabaseDisconnectError is retried
  # before it reaches the caller.
  class Database
    extend Forwardable
    # Expose Sequel's transaction helper directly on this object.
    def_delegators :@db, :transaction

    # @param path [String] connection string/URL handed to Sequel.connect
    def initialize(path)
      @db = Sequel.connect(path)
    end

    # Creates the tables used by this class. +create_table?+ only creates a
    # table when it does not exist yet, so calling this repeatedly is safe.
    #
    # @return [void]
    def prepare_tables
      @db.create_table?(:channels) do
        primary_key :id
        String :name, size: 255, null: false, unique: true
        Integer :for_recorder, null: false, unique: true
        Integer :for_syoboi, null: false, unique: true
      end
      @db.create_table?(:programs) do
        primary_key :pid
        Integer :tid, null: false
        DateTime :start_time, null: false
        DateTime :end_time, null: false
        foreign_key :channel_id, :channels
        String :count, size: 16, null: false
        Integer :start_offset, null: false
        String :subtitle, size: 255
        String :title, size: 255
        String :comment, size: 255
      end
      @db.create_table?(:jobs) do
        # A job is identified by the program it belongs to.
        foreign_key :pid, :programs, primary_key: true
        DateTime :enqueued_at, null: false
        DateTime :finished_at
        DateTime :created_at, null: false
      end
      @db.create_table?(:tracking_titles) do
        Integer :tid, primary_key: true
        DateTime :created_at, null: false
      end
    end

    # Lists the jobs whose +enqueued_at+ is not in the past.
    #
    # @return [Array<Hash>] rows containing +:pid+ and +:enqueued_at+,
    #   ordered by +enqueued_at+
    def get_jobs
      retry_on_disconnected do
        @db.from(:jobs).select(:pid, :enqueued_at).where(Sequel.qualify(:jobs, :enqueued_at) >= Time.now).order(:enqueued_at).to_a
      end
    end

    # Sets +enqueued_at+ of the job for +pid+, creating the job row (with
    # +created_at+ set to the current time) when none exists yet.
    #
    # @param pid [Integer] program id the job belongs to
    # @param enqueued_at [Time] new enqueue time
    def update_job(pid, enqueued_at)
      retry_on_disconnected do
        # Existence check and write share one transaction.
        @db.transaction do
          if @db.from(:jobs).where(pid: pid).select(1).first
            @db.from(:jobs).where(pid: pid).update(enqueued_at: enqueued_at)
          else
            @db.from(:jobs).insert(pid: pid, enqueued_at: enqueued_at, created_at: Time.now)
          end
        end
      end
    end

    # Removes the job row for +pid+.
    #
    # @param pid [Integer]
    def delete_job(pid)
      retry_on_disconnected do
        @db.from(:jobs).where(pid: pid).delete
      end
    end

    # Fetches a single program.
    #
    # @param pid [Integer]
    # @return [Program, nil] nil when no program row (joined with its
    #   channel) matches +pid+
    def get_program(pid)
      get_programs([pid])[pid]
    end

    # Fetches programs joined with their channel.
    #
    # @param pids [Array<Integer>] program ids to look up
    # @return [Hash{Integer => Program}] found programs keyed by pid; ids
    #   without a matching row are simply absent
    def get_programs(pids)
      retry_on_disconnected do
        # The result hash is created per attempt: if the connection drops
        # midway through iteration, rows collected by the failed attempt must
        # not leak into the result of the retried query.
        programs = {}
        @db.from(:programs).inner_join(:channels, id: :channel_id).where(pid: pids).each do |row|
          program = build_program(row)
          programs[program.pid] = program
        end
        programs
      end
    end

    # Stamps the job for +pid+ with the current time as +finished_at+.
    #
    # @param pid [Integer]
    def mark_finished(pid)
      retry_on_disconnected do
        @db.from(:jobs).where(pid: pid).update(finished_at: Time.now)
      end
    end

    # @return [Array<Channel>] every row of the +channels+ table
    def get_channels
      retry_on_disconnected do
        @db.from(:channels).map do |row|
          Channel.new(row[:id], row[:name], row[:for_recorder], row[:for_syoboi])
        end
      end
    end

    # Inserts a new channel row; the id is assigned by the database.
    #
    # @param channel [Channel] source of +name+, +for_recorder+ and
    #   +for_syoboi+
    def add_channel(channel)
      retry_on_disconnected do
        @db.from(:channels).insert(name: channel.name, for_recorder: channel.for_recorder, for_syoboi: channel.for_syoboi)
      end
    end

    # Inserts the program, or overwrites the stored attributes when a row
    # with the same pid already exists.
    #
    # @param program [Program] program to store
    # @param channel [Channel] channel whose +id+ is stored as +channel_id+
    def update_program(program, channel)
      attributes = {
        tid: program.tid,
        start_time: program.start_time,
        end_time: program.end_time,
        channel_id: channel.id,
        count: program.count,
        start_offset: program.start_offset,
        subtitle: program.subtitle,
        title: program.title,
        comment: program.comment,
      }
      retry_on_disconnected do
        # Existence check and write share one transaction.
        @db.transaction do
          if @db.from(:programs).where(pid: program.pid).select(1).first
            @db.from(:programs).where(pid: program.pid).update(attributes)
          else
            @db.from(:programs).insert(attributes.merge(pid: program.pid))
          end
        end
      end
    end

    # Registers +tid+ as a tracked title, recording the current time as
    # +created_at+.
    #
    # @param tid [Integer] title id
    def add_tracking_title(tid)
      retry_on_disconnected do
        @db.from(:tracking_titles).insert(tid: tid, created_at: Time.now)
      end
    end

    # @return [Array<Integer>] the tid of every tracked title
    def get_tracking_titles
      retry_on_disconnected do
        @db.from(:tracking_titles).select(:tid).map do |row|
          row[:tid]
        end
      end
    end

    private

    # Maps a programs/channels joined row onto a Program.
    def build_program(row)
      Program.new(
        row[:pid],
        row[:tid],
        row[:start_time],
        row[:end_time],
        row[:name],
        row[:for_syoboi],
        row[:for_recorder],
        row[:count],
        row[:start_offset],
        row[:subtitle],
        row[:title],
        row[:comment],
      )
    end

    # Runs the block with two nested retry layers: an inner one without
    # delay and an outer one with a growing delay.
    #
    # @return [Object] whatever the block returns
    def retry_on_disconnected(&block)
      retry_for_disconnection { retry_for_each_connection(&block) }
    end

    # Outer layer: up to 5 tries on Sequel::DatabaseDisconnectError, sleeping
    # 2**n between tries (n is supplied by Retryable). Each failure is logged
    # to $stderr.
    def retry_for_disconnection(&block)
      Retryable.retryable(
        tries: 5,
        sleep: lambda { |n| 2**n },
        on: Sequel::DatabaseDisconnectError,
        exception_cb: lambda { |e| $stderr.puts "[#{Thread.current.object_id}] retry_for_disconnection: #{e.class}: #{e.message}" },
        &block
      )
    end

    # Inner layer: immediate retries, as many tries as the pool's maximum
    # size. NOTE(review): presumably this lets every stale pooled connection
    # fail once before the outer layer backs off -- confirm.
    def retry_for_each_connection(&block)
      Retryable.retryable(
        tries: @db.pool.max_size,
        sleep: 0,
        on: Sequel::DatabaseDisconnectError,
        exception_cb: lambda { |e| $stderr.puts "[#{Thread.current.object_id}] retry_for_each_connection: #{e.class}: #{e.message}" },
        &block
      )
    end
  end
end