ssut/telegram-rb

View on GitHub
lib/telegram/client.rb

Summary

Maintainability
A
3 hrs
Test Coverage
# encoding: utf-8
require 'eventmachine'
require 'em-synchrony'
require 'em-synchrony/fiber_iterator'
require 'em-http-request'
require 'oj'
require 'date'
require 'tempfile'
require 'fastimage'

require 'telegram/config'
require 'telegram/auth_properties'
require 'telegram/authorization'
require 'telegram/cli_arguments'
require 'telegram/logger'
require 'telegram/connection'
require 'telegram/connection_pool'
require 'telegram/callback'
require 'telegram/api'
require 'telegram/models'
require 'telegram/events'
require 'ext/string'

module Telegram
  # Telegram Client
  #
  # @see API
  # @version 0.1.1
  class Client < API
    include Logging

    # @return [ConnectionPool] Socket connection pool, includes {Connection}
    # @since [0.1.0]
    attr_reader :connection

    # @return [TelegramContact] Current user's profile
    # @since [0.1.0]
    attr_reader :profile

    # @return [Array<TelegramContact>] Current user's contact list
    # @since [0.1.0]
    attr_reader :contacts

    # @return [Array<TelegramChat>] Chats that current user joined
    # @since [0.1.0]
    attr_reader :chats

    # @return [IO, nil] Pipe to the telegram-cli process opened by {#execute};
    #   nil until the daemon has been started
    attr_reader :stdout

    # Event listeners and authorization settings of this client.
    #
    # +on+ is a Hash that maps an event type to the callable invoked with each
    # {Event} of that type. +auth_properties+ is the {Telegram::AuthProperties}
    # instance that is yielded to the constructor block and later handed to
    # {Authorization}.
    #
    # @see EventType
    # @since [0.1.0]
    attr_accessor :on, :auth_properties

    # Initialize Telegram Client
    #
    # Creates a fresh configuration and a fresh set of authorization
    # properties, then hands both to the optional block so the caller can fill
    # them in before anything is started. Without a block the defaults of both
    # objects are kept.
    #
    # @yield [config, auth_properties] Lets the caller customise the client
    # @yieldparam config [Telegram::Config] Configuration of this client
    # @yieldparam auth_properties [Telegram::AuthProperties] Authorization settings
    def initialize(&block)
      @config = Telegram::Config.new
      @auth_properties = Telegram::AuthProperties.new
      # The block is optional: yielding unconditionally would raise
      # LocalJumpError for a plain `Client.new`.
      yield @config, @auth_properties if block_given?
      # Prefer a logger supplied through the configuration, if any.
      @logger = @config.logger if @config.logger
      @connected = 0
      @stdout = nil
      @connect_callback = nil
      # Set here so #on_disconnect never reads an undefined instance variable.
      @disconnect_callback = nil
      @on = {}

      @profile = nil
      @contacts = []
      @chats = []
      @starts_at = nil
      @events = EM::Queue.new

      logger.info("Initialized")
    end

    # Spawn the telegram-cli daemon and process its initial output
    #
    # The command line is the configured daemon path (single-quoted) followed
    # by the arguments built from the configuration. The resulting pipe is kept
    # in {#stdout} and then handed over to the initial-output handling.
    #
    # @api private
    def execute
      arguments = Telegram::CLIArguments.new(@config)
      @stdout = IO.popen("'#{@config.daemon}' #{arguments}", 'a+')
      initialize_stdout_reading
    end

    # Do the long-polling from stdout of the telegram-cli
    #
    # Reads the daemon's output line by line until the stream ends and pushes
    # every decoded JSON object onto the event queue. The call blocks while
    # reading; it is started through EM.defer in {#on_connect}.
    #
    # Anything in front of the first '{' on a line is ignored. Lines that
    # cannot be decoded are logged and skipped, so one bad line never stops
    # the polling.
    #
    # @api private
    def poll
      logger.info("Start polling for events")
      while (line = @stdout.gets)
        # A line without '{' cannot hold a JSON object; skip it explicitly
        # instead of relying on an exception.
        start = line.index('{')
        next unless start

        begin
          # chomp (rather than dropping the last character unconditionally)
          # keeps the closing brace of a final line that has no newline.
          @events << Oj.load(line[start..-1].chomp, mode: :compat)
        rescue StandardError => e
          logger.error("Failed to parse event: #{line.inspect} #{e.inspect}")
        end
      end
    end

    # Process queued data to make {Event} instances
    #
    # Registers a handler on the event queue. For every payload the handler
    # resolves the event type and the action, builds an {Event} and calls the
    # listener stored in {#on} for that type, if there is one. After each
    # payload the handler registers itself again for the next one.
    #
    # Errors raised while handling a payload are logged and do not stop the
    # consumption of later payloads.
    #
    # @api private
    def process_data
      process = proc do |data|
        begin
          # Only 'message' payloads get a type; everything else stays nil and
          # therefore has no listener.
          type =
            if data['event'] == 'message'
              # A message whose sender is not the current profile was received.
              if data['from']['peer_id'] != @profile.id
                EventType::RECEIVE_MESSAGE
              else
                EventType::SEND_MESSAGE
              end
            end

          action =
            if data.key?('action')
              case data['action']
              when 'chat_add_user' then ActionType::CHAT_ADD_USER
              when 'create_group_chat' then ActionType::CREATE_GROUP_CHAT
              when 'add_contact' then ActionType::ADD_CONTACT
              else ActionType::UNKNOWN_ACTION
              end
            else
              ActionType::NO_ACTION
            end

          event = Event.new(self, type, action, data)
          @on[type].call(event) if @on.key?(type)
        rescue StandardError, ScriptError => e
          # Deliberately not `rescue Exception`: exit requests and signals
          # raised inside a listener must be able to stop the process.
          logger.error("Error occurred during the processing: #{data}\n #{e.inspect} #{e.backtrace}")
        end
        @events.pop(&process)
      end
      @events.pop(&process)
    end

    # Launch telegram-cli and connect to it
    #
    # Remembers the given block, starts consuming the event queue and then
    # defers the daemon start-up: {#execute} is the deferred operation,
    # {#create_pool} its callback and +execution_failed+ its error callback.
    #
    # @yield Stored and passed on to +update!+ once every pooled connection
    #   has reported in (see {#on_connect})
    def connect(&block)
      logger.info("Trying to start telegram-cli and then connect")
      @connect_callback = block
      process_data

      operation = method(:execute)
      on_success = method(:create_pool)
      on_failure = method(:execution_failed)
      EM.defer(operation, on_success, on_failure)
    end

    # Build the {ConnectionPool} described by the configuration
    #
    # The pool is created with the configured size. Each connection it asks
    # for is opened on the configured unix domain socket with {Connection} as
    # handler and is wired to report through {#on_connect} and
    # {#on_disconnect}. Any arguments are accepted and ignored.
    #
    # @api private
    def create_pool(*)
      @connection = ConnectionPool.new(@config.size) do
        EM.connect_unix_domain(@config.sock, Connection).tap do |client|
          client.on_connect = method(:on_connect)
          client.on_disconnect = method(:on_disconnect)
        end
      end
    end

    # Listener invoked each time a {Connection} of the pool has connected
    #
    # Counts the connection. Once the count reaches the configured pool size,
    # polling of the daemon output is deferred and +update!+ is called with
    # the block that was given to {#connect}.
    #
    # @api private
    def on_connect
      @connected += 1
      return unless connected?

      logger.info("Successfully connected to the Telegram CLI")
      EM.defer(&method(:poll))
      update!(&@connect_callback)
    end

    # Listener invoked each time a {Connection} of the pool has closed
    #
    # Decrements the connection count. When it drops to zero the daemon is
    # signalled to stop and the disconnect callback, if one was registered
    # through {#on_disconnect=}, is called.
    #
    # @api private
    def on_disconnect
      @connected -= 1
      return unless @connected == 0

      logger.info("Disconnected from Telegram CLI")
      close_stdout
      @disconnect_callback.call if @disconnect_callback
    end

    # Register the callback for a complete disconnect
    #
    # The stored object is called without arguments by {#on_disconnect} once
    # the connection count has dropped to zero.
    #
    # @param callback [#call, nil] Object responding to +call+; nil clears it
    def on_disconnect=(callback)
      @disconnect_callback = callback
    end

    # Tell whether every connection of the pool has connected
    #
    # @return [Boolean] true when the connection count equals the configured
    #   pool size
    # @since [0.1.0]
    def connected?
      pool_size = @config.size
      @connected == pool_size
    end

    private

    # Error callback of the deferred daemon start-up (see {#connect})
    #
    # Logs the failure and signals the daemon to stop.
    #
    # @param e [Object] The failure handed over by EM.defer; it is only
    #   interpolated into the log message (presumably the raised exception)
    def execution_failed(e)
      message = "Failed execution of telegram-cli: #{e}"
      logger.error(message)
      close_stdout
    end

    # Ask the telegram-cli process to stop by sending it SIGINT
    #
    # Does nothing when the daemon was never spawned (stdout is still nil,
    # e.g. because spawning itself failed) or when the process has already
    # exited, so it is safe to call from failure and disconnect paths.
    #
    # @return [Integer, nil] Result of Process.kill, or nil when no signal
    #   was delivered
    def close_stdout
      return unless stdout

      Process.kill('INT', stdout.pid)
    rescue Errno::ESRCH
      # The daemon is already gone; there is nothing left to signal.
      nil
    end

    # Handle the first output of the freshly spawned daemon
    #
    # When authorization properties are present, the {Authorization} flow is
    # performed over the daemon pipe; otherwise a single line is read from it.
    #
    # @return [Object] Result of the authorization, or the line that was read
    def initialize_stdout_reading
      if auth_properties.present?
        Authorization.new(stdout, auth_properties, logger).perform
      else
        stdout.readline
      end
    end
  end
end