grokify/ringcentral-sdk-ruby

View on GitHub
lib/ringcentral_sdk/rest/subscription.rb

Summary

Maintainability
B
5 hrs
Test Coverage
require 'base64'
require 'logger'
require 'multi_json'
require 'observer'
require 'openssl'
require 'pubnub'

module RingCentralSdk
  module REST
    # Subscription is an observable class that represents one
    # RingCentral subscription, delivered over the PubNub transport
    # and managed via the Subscription API.
    class Subscription
      include Observable

      RENEW_HANDICAP = 60

      attr_reader :event_filters

      # Builds a subscription bound to +client+ that starts out inactive:
      # no event filters, no PubNub client, no renewal timer, and the blank
      # placeholder subscription produced by #nil_subscription.
      #
      # @param client the SDK client later used for HTTP calls and logging
      def initialize(client)
        @client         = client
        @event_filters  = []
        @_subscription  = nil_subscription
        @_pubnub        = nil
        @_timeout       = nil
        @_logger_prefix = " -- #{self.class.name}: "
      end

      # Returns a freshly built placeholder describing a subscription that
      # does not exist yet: blank strings, a zero expiry, no event filters
      # and an unencrypted PubNub delivery mode with an empty address.
      #
      # @return [Hash] a new hash on every call
      def nil_subscription
        blank_delivery_mode = {
          'transportType' => 'PubNub',
          'encryption'    => false,
          'address'       => '',
          'subscriberKey' => '',
          'secretKey'     => ''
        }

        {
          'eventFilters'   => [],
          'expirationTime' => '', # e.g. 2014-03-12T19:54:35.613Z
          'expiresIn'      => 0,
          'deliveryMode'   => blank_delivery_mode,
          'id'             => '',
          'creationTime'   => '', # e.g. 2014-03-12T19:54:35.613Z
          'status'         => '', # e.g. Active
          'uri'            => ''
        }
      end

      # Returns the PubNub client stored by the most recent
      # #_subscribe_at_pubnub call, or nil when none has been created yet.
      def pubnub
        @_pubnub
      end

      # Ensures the subscription is registered: renews it when it is already
      # alive, otherwise creates it.
      #
      # @param events [Array, nil] optional replacement event filters,
      #   forwarded unchanged to #renew or #subscribe
      # @return whatever #renew or #subscribe returns
      def register(events = nil)
        if alive?
          renew(events)
        else
          subscribe(events)
        end
      end

      # Appends the given event filters to the current filter list.
      #
      # Each element of +events+ is added individually, so the list stays a
      # flat array of filters (it is sent as `eventFilters` by #subscribe).
      # Pushing the array itself would nest it as a single element.
      #
      # @param events [Array] event filters to append
      # @return [Array, nil] the updated filter list, or nil when +events+
      #   is empty and nothing was added
      # @raise [RuntimeError] when +events+ is not an Array
      def add_events(events)
        raise 'Events is not an array.' unless events.is_a? Array
        @event_filters.concat(events) unless events.empty?
      end

      # Replaces the current event filter list with +events+ (the same array
      # object is stored, not a copy).
      #
      # @param events [Array] the new event filters
      # @return [Array] the stored filters
      # @raise [RuntimeError] when +events+ is not an Array
      def set_events(events)
        unless events.is_a?(Array)
          raise 'Events is not an array.'
        end

        @event_filters = events
      end

      # Creates the subscription on the server and starts listening on PubNub.
      #
      # Posts the current event filters to the `subscription` endpoint, stores
      # the response body as the active subscription, attaches the PubNub
      # listener and notifies observers with the HTTP response.
      #
      # The response body is deliberately NOT printed: it carries the PubNub
      # delivery keys, and a library should not write to stdout.
      #
      # @param events [Array, nil] when an Array, replaces the event filters
      #   before subscribing
      # @return the HTTP response returned by the client
      # @raise [RuntimeError] 'Events are undefined' when there are no event
      #   filters; 'Subscribe HTTP Request Error: ...' when the request or
      #   the PubNub hookup fails (state is reset and observers receive the
      #   original exception first)
      def subscribe(events = nil)
        set_events(events) if events.is_a? Array

        raise 'Events are undefined' unless @event_filters.is_a?(Array) && !@event_filters.empty?

        begin
          response = @client.http.post do |req|
            req.url 'subscription'
            req.headers['Content-Type'] = 'application/json'
            req.body = {
              eventFilters: @event_filters,
              deliveryMode: {
                transportType: 'PubNub',
                # NOTE(review): sent as the string 'true', not a boolean;
                # left unchanged, confirm against the Subscription API.
                encryption: 'true'
              }
            }
          end
          set_subscription response.body
          _subscribe_at_pubnub
          changed
          notify_observers response
          response
        rescue StandardError => e
          reset
          changed
          notify_observers(e)
          raise "Subscribe HTTP Request Error: #{e}"
        end
      end

      # Renews the live subscription by posting to its `renew` URI, stores
      # the response body as the current subscription and notifies observers
      # with the HTTP response.
      #
      # @param events [Array, nil] when an Array, replaces the event filters
      #   before renewing
      # @return the HTTP response returned by the client
      # @raise [RuntimeError] 'Subscription is not alive' or 'Events are
      #   undefined' when preconditions fail; 'Renew HTTP Request Error' when
      #   the request fails (the error is logged, state is reset and
      #   observers receive the original exception first)
      def renew(events = nil)
        set_events(events) if events.is_a?(Array)

        raise 'Subscription is not alive' unless alive?
        raise 'Events are undefined' if @event_filters.empty?

        # Stop any pending auto-renew before issuing this one.
        _clear_timeout

        begin
          renewal = @client.http.post do |request|
            request.url uri_join(@_subscription['uri'], 'renew')
            request.headers['Content-Type'] = 'application/json'
          end

          set_subscription(renewal.body)
          changed
          notify_observers(renewal)

          renewal
        rescue StandardError => error
          @client.config.logger.warn "RingCentralSdk::REST::Subscription: RENEW_ERROR #{error}"
          reset
          changed
          notify_observers(error)
          raise 'Renew HTTP Request Error'
        end
      end

      # Deletes the live subscription on the server, then resets local state
      # and notifies observers with the response body.
      #
      # A failure of the HTTP call is not re-raised: local state is still
      # reset and observers are notified with the exception instead.
      #
      # @return the HTTP response on success; on failure, whatever
      #   notify_observers returns
      # @raise [RuntimeError] 'Subscription is not alive' when there is no
      #   live subscription to delete
      def remove
        raise 'Subscription is not alive' unless alive?

        begin
          deletion = @client.http.delete do |request|
            request.url "subscription/#{@_subscription['id']}"
          end
          reset
          changed
          notify_observers(deletion.body)
          deletion
        rescue StandardError => error
          reset
          changed
          notify_observers(error)
        end
      end

      # Reports whether the current subscription data describes a usable
      # PubNub channel: a delivery mode with a subscriber key and a
      # non-empty address.
      #
      # Returns false instead of raising NoMethodError when the stored
      # subscription or its delivery mode is nil or not hash-like (for
      # example after #set_subscription was handed an unparsed body).
      #
      # Note: only the address is checked for emptiness. An empty-string
      # subscriber key still counts as present because '' is truthy in Ruby.
      #
      # @return [Boolean]
      def alive?
        current = @_subscription
        return false unless current.respond_to?(:key?)

        delivery_mode = current['deliveryMode']
        return false unless delivery_mode.respond_to?(:key?)
        return false unless delivery_mode['subscriberKey']

        address = delivery_mode['address']
        !address.nil? && !address.empty?
      end

      # Returns the current subscription data: either the value last passed
      # to #set_subscription or the blank placeholder from #nil_subscription.
      def subscription
        @_subscription
      end

      # Stores +data+ as the current subscription and restarts the
      # auto-renew timer for it.
      #
      # @param data the subscription data (callers in this class pass an
      #   HTTP response body)
      def set_subscription(data)
        # Cancel the timer that belongs to the previous subscription data
        # before replacing it.
        _clear_timeout
        @_subscription = data
        # Schedule the next renewal from the new data's 'expiresIn'.
        _set_timeout
      end

      # Returns the object to its inactive state: cancels the auto-renew
      # timer, unsubscribes from PubNub and replaces the subscription data
      # with the blank placeholder.
      def reset
        _clear_timeout
        # Must run before the data is blanked below: unsubscribing checks
        # alive? and reads the channel address from the current subscription.
        _unsubscribe_at_pubnub
        @_subscription = nil_subscription
      end

      # Tears down local subscription state. Currently identical to #reset;
      # it does not delete the subscription on the server (see #remove).
      def destroy
        reset
      end

      # Creates a PubNub client for the live subscription, registers a
      # listener named :ringcentral and subscribes to the subscription's
      # delivery address.
      #
      # Incoming messages are logged and forwarded to #_notify; presence and
      # status events are only logged.
      #
      # @raise [RuntimeError] 'Subscription is not alive' when there is no
      #   usable delivery mode
      def _subscribe_at_pubnub
        raise 'Subscription is not alive' unless alive?

        subscriber_key = @_subscription['deliveryMode']['subscriberKey']
        @_pubnub = new_pubnub(subscriber_key, false, '')

        on_message = lambda do |envelope|
          @client.config.logger.debug "MESSAGE: #{envelope.result[:data]}"
          _notify envelope.result[:data][:message]
          changed
        end

        on_presence = lambda do |envelope|
          @client.config.logger.info "PRESENCE: #{envelope.result[:data]}"
        end

        on_status = lambda do |envelope|
          @client.config.logger.info "\n\n\n#{envelope.status}\n\n\n"
          if envelope.error?
            @client.config.logger.info "ERROR! #{envelope.status[:category]}"
          elsif envelope.status[:last_timetoken] == 0 # Connected!
            @client.config.logger.info('CONNECTED!')
          end
        end

        listener = Pubnub::SubscribeCallback.new(
          message: on_message,
          presence: on_presence,
          status: on_status
        )
        @_pubnub.add_listener callback: listener, name: :ringcentral

        @_pubnub.subscribe(channels: @_subscription['deliveryMode']['address'])
        @client.config.logger.debug('SUBSCRIBED')
      end

      # Logs how many observers are registered, decrypts +message+ when the
      # subscription is encrypted, and delivers the result to all observers.
      #
      # @param message the raw message payload received from PubNub
      # @return whatever notify_observers returns
      def _notify(message)
        observer_total = count_observers
        @client.config.logger.debug(
          "RingCentralSdk::REST::Subscription NOTIFYING '#{observer_total}' observers"
        )

        payload = _decrypt(message)
        changed
        notify_observers(payload)
      end

      # Decodes an incoming message for the live subscription.
      #
      # When the subscription is not encrypted the message is returned as-is.
      # Otherwise the message is Base64-decoded, decrypted with AES-128-ECB
      # using the Base64-encoded 'encryptionKey' from the delivery mode, and
      # the plaintext is parsed as JSON with string keys.
      #
      # @param message the payload as received
      # @return the original message, or the decoded JSON structure
      # @raise [RuntimeError] 'Subscription is not alive'
      def _decrypt(message)
        raise 'Subscription is not alive' unless alive?
        return message unless _encrypted?

        encoded_key = @_subscription['deliveryMode']['encryptionKey'].to_s

        decipher = OpenSSL::Cipher::AES.new(128, :ECB)
        decipher.decrypt
        decipher.key = Base64.decode64(encoded_key)

        raw_bytes = Base64.decode64(message)
        json_text = decipher.update(raw_bytes) + decipher.final

        MultiJson.decode(json_text, symbolize_keys: false)
      end

      # Tells whether messages for the current subscription are encrypted,
      # i.e. the delivery mode has a truthy 'encryption' flag and a truthy
      # 'encryptionKey'.
      #
      # @return a truthy value (the encryption key itself) when encrypted,
      #   otherwise false or nil
      def _encrypted?
        mode = @_subscription['deliveryMode']

        flag = mode.key?('encryption') && mode['encryption']
        return flag unless flag

        mode.key?('encryptionKey') && mode['encryptionKey']
      end

      # Unsubscribes the PubNub client from the current subscription's
      # delivery address. Does nothing when no PubNub client exists or the
      # subscription is not alive.
      #
      # The unsubscribe status is reported through the client logger, like
      # every other PubNub event in this class, instead of being printed to
      # stdout.
      #
      # @return nil when nothing was done, otherwise whatever the PubNub
      #   client's unsubscribe call returns
      def _unsubscribe_at_pubnub
        return unless @_pubnub && alive?

        @_pubnub.unsubscribe(channel: @_subscription['deliveryMode']['address']) do |envelope|
          @client.config.logger.debug "RingCentralSdk::REST::Subscription UNSUBSCRIBED: #{envelope.status}"
        end
      end

      # Schedules an automatic #renew shortly before the current
      # subscription expires.
      #
      # The renewal runs in a background thread RENEW_HANDICAP seconds before
      # the 'expiresIn' interval elapses. No timer is started when the
      # subscription data has no numeric 'expiresIn' (previously this led to
      # `sleep nil`), and the delay is clamped at zero so a subscription that
      # expires in less than RENEW_HANDICAP seconds is renewed immediately
      # instead of raising on a negative sleep inside the thread.
      #
      # @return [Thread, nil] the timer thread, or nil when none was started
      def _set_timeout
        _clear_timeout

        expires_in = @_subscription['expiresIn'] if @_subscription.respond_to?(:key?)
        return unless expires_in.is_a?(Numeric)

        delay = [expires_in - RENEW_HANDICAP, 0].max

        @_timeout = Thread.new do
          sleep delay
          renew
        end
      end

      # Cancels the pending auto-renew timer, if any, and forgets it.
      #
      # The timer thread is only terminated while it is sleeping, so a timer
      # thread that is already running its renewal is left alone.
      #
      # @return [nil]
      def _clear_timeout
        timer = @_timeout
        if timer.is_a?(Thread) && timer.status == 'sleep'
          timer.kill
        end
        @_timeout = nil
      end

      # Joins URI fragments with single slashes.
      #
      # Every run of slashes is collapsed to one, then the double slash
      # after a leading `http:` / `https:` scheme is restored.
      #
      # @param args fragments to join
      # @return [String] the joined URI
      def uri_join(*args)
        collapsed = args.join('/').squeeze('/')
        collapsed.gsub(%r{^(https?:/)}i, '\1/')
      end

      # Builds a PubNub client from the given keys (each converted with to_s).
      #
      # NOTE(review): +ssl_on+ and +my_logger+ are accepted but not passed on
      # to PubNub. Confirm whether that is intentional.
      #
      # @param subscribe_key PubNub subscribe key
      # @param ssl_on currently unused
      # @param publish_key PubNub publish key
      # @param my_logger currently unused
      # @return the object returned by Pubnub.new
      def new_pubnub(subscribe_key = '', ssl_on = false, publish_key = '', my_logger = nil)
        options = {
          subscribe_key: subscribe_key.to_s,
          publish_key: publish_key.to_s
        }
        Pubnub.new(**options)
      end
    end
  end
end