# lib/ringcentral_sdk/rest/subscription.rb
require 'base64'
require 'logger'
require 'multi_json'
require 'observer'
require 'openssl'
require 'pubnub'
module RingCentralSdk
module REST
# Subscription is an observable class that represents a single
# RingCentral subscription, delivered over the PubNub transport
# and managed through the Subscription API.
class Subscription
include Observable
RENEW_HANDICAP = 60
attr_reader :event_filters
# Builds a subscription bound to the given REST client.
#
# @param client the SDK client; other methods of this class use its
#   `http` connection and its `config.logger`
def initialize(client)
  @_logger_prefix = " -- #{self.class.name}: "
  @client = client
  @event_filters = []
  @_pubnub = nil
  @_timeout = nil
  # Start from the blank template so hash lookups on @_subscription work
  # before any server response has been stored.
  @_subscription = nil_subscription
end
# Returns a new, blank subscription record: every field is present but
# holds an empty value, with PubNub as the transport and encryption off.
#
# @return [Hash] a freshly built hash on every call
def nil_subscription
  blank_delivery_mode = {
    'transportType' => 'PubNub',
    'encryption' => false,
    'address' => '',
    'subscriberKey' => '',
    'secretKey' => ''
  }

  {
    'eventFilters' => [],
    'expirationTime' => '', # e.g. 2014-03-12T19:54:35.613Z
    'expiresIn' => 0,
    'deliveryMode' => blank_delivery_mode,
    'id' => '',
    'creationTime' => '', # e.g. 2014-03-12T19:54:35.613Z
    'status' => '', # e.g. Active
    'uri' => ''
  }
end
# Returns the PubNub client stored in @_pubnub. It is nil after
# construction and is assigned by _subscribe_at_pubnub.
def pubnub
@_pubnub
end
# Renews the subscription when it is already alive, otherwise creates it.
#
# @param events [Array, nil] optional event filters, forwarded unchanged
# @return whatever #renew or #subscribe returns
def register(events = nil)
  return renew(events) if alive?

  subscribe(events)
end
# Appends event filters to the current filter list.
#
# Each element of +events+ is added individually, so the list stays a flat
# array of filters (previously the whole array was pushed as one nested
# element).
#
# @param events [Array] filters to append
# @return [Array, nil] the updated filter list, or nil when +events+ is empty
# @raise [RuntimeError] when +events+ is not an Array
def add_events(events)
  raise 'Events is not an array.' unless events.is_a? Array
  @event_filters.concat(events) unless events.empty?
end
# Replaces the current event filter list.
#
# A copy of +events+ is stored, so later in-place changes to the filter
# list do not modify the caller's array.
#
# @param events [Array] the new filters
# @return [Array] the stored filter list
# @raise [RuntimeError] when +events+ is not an Array
def set_events(events)
  raise 'Events is not an array.' unless events.is_a? Array
  @event_filters = events.dup
end
# Creates the subscription on the server and starts listening on PubNub.
#
# Posts the current event filters to the `subscription` endpoint, stores the
# response body as the subscription record, connects to PubNub, and notifies
# observers with the response. On any StandardError the local state is
# reset, observers are notified with the error, and a RuntimeError is raised.
#
# The response body is written to the client logger at debug level rather
# than printed to stdout.
#
# @param events [Array, nil] when an Array, replaces the event filters first
# @return the HTTP response returned by the client
# @raise [RuntimeError] 'Events are undefined' when there are no filters, or
#   'Subscribe HTTP Request Error: ...' when the request or setup fails
def subscribe(events = nil)
  set_events(events) if events.is_a? Array
  raise 'Events are undefined' unless @event_filters.is_a?(Array) && !@event_filters.empty?
  begin
    response = @client.http.post do |req|
      req.url 'subscription'
      req.headers['Content-Type'] = 'application/json'
      req.body = {
        eventFilters: @event_filters,
        deliveryMode: {
          transportType: 'PubNub',
          # NOTE(review): sent as the string 'true', not a boolean - confirm
          # this is what the API expects.
          encryption: 'true'
        }
      }
    end
    @client.config.logger.debug "RingCentralSdk::REST::Subscription: SUBSCRIBE_RESPONSE #{response.body}"
    set_subscription response.body
    _subscribe_at_pubnub
    changed
    notify_observers response
    response
  rescue StandardError => e
    reset
    changed
    notify_observers(e)
    raise "Subscribe HTTP Request Error: #{e}"
  end
end
# Renews an alive subscription by posting to its `<uri>/renew` endpoint.
#
# On success the response body becomes the new subscription record and
# observers receive the response. On any StandardError a warning is logged,
# local state is reset, observers receive the error, and a RuntimeError is
# raised. The request shown here carries no body, so +events+ only updates
# the locally stored filter list.
#
# @param events [Array, nil] when an Array, replaces the event filters first
# @return the HTTP response returned by the client
# @raise [RuntimeError] when the subscription is not alive, when there are
#   no event filters, or when the renewal fails
def renew(events = nil)
  set_events(events) if events.is_a? Array
  raise 'Subscription is not alive' unless alive?
  raise 'Events are undefined' if @event_filters.empty?

  # Stop the pending auto-renew timer before issuing the request.
  _clear_timeout

  begin
    renewal = @client.http.post do |request|
      request.url uri_join(@_subscription['uri'], 'renew')
      request.headers['Content-Type'] = 'application/json'
    end
    set_subscription(renewal.body)
    changed
    notify_observers(renewal)
    renewal
  rescue StandardError => failure
    @client.config.logger.warn "RingCentralSdk::REST::Subscription: RENEW_ERROR #{failure}"
    reset
    changed
    notify_observers(failure)
    raise 'Renew HTTP Request Error'
  end
end
# Deletes the subscription on the server and clears local state.
#
# On success observers are notified with the response body and the response
# is returned. If a StandardError occurs it is not re-raised: local state is
# still reset and observers are notified with the error instead.
#
# @return the HTTP response on success; on failure, whatever
#   notify_observers returns
# @raise [RuntimeError] when the subscription is not alive
def remove
  raise 'Subscription is not alive' unless alive?

  begin
    deletion = @client.http.delete do |request|
      request.url "subscription/#{@_subscription['id']}"
    end
    reset
    changed
    notify_observers(deletion.body)
    deletion
  rescue StandardError => failure
    reset
    changed
    notify_observers(failure)
  end
end
# Reports whether the stored subscription record is usable for PubNub.
#
# The record is alive when its 'deliveryMode' hash carries a non-empty
# 'subscriberKey' and a non-empty 'address'. Empty strings count as missing
# (they are truthy in Ruby, so they must be tested explicitly), and a
# missing or non-Hash record is simply not alive rather than an error.
#
# @return [Boolean]
def alive?
  return false unless @_subscription.is_a?(Hash)

  delivery_mode = @_subscription['deliveryMode']
  return false unless delivery_mode.is_a?(Hash)

  %w[subscriberKey address].all? do |field|
    value = delivery_mode[field]
    value && !value.to_s.empty?
  end
end
# Returns the current subscription record held in @_subscription: the blank
# template from nil_subscription until set_subscription stores other data.
def subscription
@_subscription
end
# Stores +data+ as the current subscription record and restarts the
# auto-renew timer. Callers in this class pass an HTTP response body.
def set_subscription(data)
# Stop the timer that was armed for the previous record.
_clear_timeout
@_subscription = data
# Arm a new timer based on the record that was just stored.
_set_timeout
end
# Clears local subscription state: stops the auto-renew timer, unsubscribes
# from PubNub, and restores the blank subscription record.
def reset
_clear_timeout
# Must run before the record is blanked: _unsubscribe_at_pubnub reads the
# channel address from @_subscription and only acts while alive? is true.
_unsubscribe_at_pubnub
@_subscription = nil_subscription
end
# Tears the subscription down locally; delegates entirely to reset.
def destroy
reset
end
# Creates a PubNub client for the current subscription and starts listening
# on the channel named by the subscription's delivery address.
#
# Three listener callbacks are registered under the name :ringcentral:
# incoming messages are forwarded to _notify, presence events are logged,
# and status events are logged (errors and the initial connection, signalled
# by a last_timetoken of 0, get an extra log line).
#
# @raise [RuntimeError] when the subscription is not alive
def _subscribe_at_pubnub
  raise 'Subscription is not alive' unless alive?

  s_key = @_subscription['deliveryMode']['subscriberKey']
  @_pubnub = new_pubnub(s_key, false, '')

  on_message = lambda do |envelope|
    @client.config.logger.debug "MESSAGE: #{envelope.result[:data]}"
    _notify envelope.result[:data][:message]
    changed
  end

  on_presence = lambda do |envelope|
    @client.config.logger.info "PRESENCE: #{envelope.result[:data]}"
  end

  on_status = lambda do |envelope|
    @client.config.logger.info "\n\n\n#{envelope.status}\n\n\n"
    if envelope.error?
      @client.config.logger.info "ERROR! #{envelope.status[:category]}"
    elsif envelope.status[:last_timetoken] == 0 # Connected!
      @client.config.logger.info('CONNECTED!')
    end
  end

  listener = Pubnub::SubscribeCallback.new(
    message: on_message,
    presence: on_presence,
    status: on_status
  )

  @_pubnub.add_listener callback: listener, name: :ringcentral
  @_pubnub.subscribe(channels: @_subscription['deliveryMode']['address'])
  @client.config.logger.debug('SUBSCRIBED')
end
# Decrypts an incoming PubNub message and hands it to all observers.
#
# @param message the raw message payload received from PubNub
# @return whatever notify_observers returns
def _notify(message)
  @client.config.logger.debug(
    "RingCentralSdk::REST::Subscription NOTIFYING '#{count_observers}' observers"
  )
  decrypted = _decrypt(message)
  # Observable only delivers notifications after `changed` has been called.
  changed
  notify_observers(decrypted)
end
# Returns the usable form of an incoming message.
#
# When the subscription is encrypted, the message is Base64-decoded,
# decrypted with AES-128-ECB using the Base64-decoded 'encryptionKey' from
# the delivery mode, and parsed as JSON with string keys. Otherwise the
# message is returned untouched.
#
# @param message the payload as received (a Base64 string when encrypted)
# @return the decoded JSON structure, or +message+ itself when not encrypted
# @raise [RuntimeError] when the subscription is not alive
def _decrypt(message)
  raise 'Subscription is not alive' unless alive?
  return message unless _encrypted?

  encoded_key = @_subscription['deliveryMode']['encryptionKey'].to_s
  decipher = OpenSSL::Cipher::AES.new(128, :ECB)
  decipher.decrypt
  decipher.key = Base64.decode64(encoded_key)

  encrypted_bytes = Base64.decode64(message)
  decrypted_json = decipher.update(encrypted_bytes) + decipher.final
  MultiJson.decode(decrypted_json, symbolize_keys: false)
end
# Tells whether incoming messages need decrypting: the delivery mode must
# have a truthy 'encryption' entry and a truthy 'encryptionKey' entry.
#
# The result is truthy/falsy rather than a strict Boolean: when both
# entries are set, the 'encryptionKey' value itself is returned.
def _encrypted?
  mode = @_subscription['deliveryMode']

  encryption_flag = mode.key?('encryption') && mode['encryption']
  return encryption_flag unless encryption_flag

  mode.key?('encryptionKey') && mode['encryptionKey']
end
# Unsubscribes from the PubNub channel of the current subscription.
#
# Does nothing unless a PubNub client exists and the subscription is alive.
# The status reported by PubNub goes to the client logger instead of being
# printed to stdout.
#
# @return the result of the PubNub unsubscribe call, or nil when skipped
def _unsubscribe_at_pubnub
  return unless @_pubnub && alive?

  @_pubnub.unsubscribe(channel: @_subscription['deliveryMode']['address']) do |envelope|
    @client.config.logger.info "RingCentralSdk::REST::Subscription: UNSUBSCRIBED #{envelope.status}"
  end
end
# Arms the auto-renew timer for the current subscription.
#
# A background thread sleeps until RENEW_HANDICAP seconds before the
# subscription's 'expiresIn' elapses and then calls renew. No timer is
# started when the record has no numeric 'expiresIn' (previously this led to
# `sleep nil`), and the delay is clamped at zero so a lifetime shorter than
# the handicap renews immediately instead of raising on a negative sleep.
#
# @return [Thread, nil] the timer thread, or nil when no timer was started
def _set_timeout
  _clear_timeout

  expires_in = @_subscription.is_a?(Hash) ? @_subscription['expiresIn'] : nil
  return unless expires_in.is_a?(Numeric)

  delay = [expires_in - RENEW_HANDICAP, 0].max
  @_timeout = Thread.new do
    sleep delay
    renew
  end
end
# Cancels the pending auto-renew timer, if any, and forgets it.
#
# The timer thread is only terminated while its status is 'sleep'; a thread
# in any other state is left running and merely dereferenced.
def _clear_timeout
  timer = @_timeout
  if timer.is_a?(Thread) && timer.status == 'sleep'
    timer.exit
  end
  @_timeout = nil
end
# Joins path segments with '/' and collapses every run of slashes into one,
# then restores the double slash after a leading http:/ or https:/ scheme.
#
# @param args segments to join
# @return [String] the joined URL or path
def uri_join(*args)
  collapsed = args.join('/').squeeze('/')
  collapsed.gsub(%r{^(https?:/)}i, '\1/')
end
# Builds a PubNub client from the given keys (each converted with to_s).
#
# +ssl_on+ and +my_logger+ are accepted but not used by this method.
#
# @param subscribe_key the PubNub subscribe key
# @param ssl_on unused
# @param publish_key the PubNub publish key
# @param my_logger unused
# @return the object returned by Pubnub.new
def new_pubnub(subscribe_key = '', ssl_on = false, publish_key = '', my_logger = nil)
  credentials = {
    subscribe_key: subscribe_key.to_s,
    publish_key: publish_key.to_s
  }
  Pubnub.new(**credentials)
end
end
end
end