tweetstream/tweetstream

View on GitHub
lib/tweetstream/client.rb

Summary

Maintainability
D
1 day
Test Coverage
require 'em-twitter'
require 'eventmachine'
require 'multi_json'
require 'twitter'
require 'forwardable'

require 'tweetstream/arguments'

module TweetStream
  # Provides simple access to the Twitter Streaming API (https://dev.twitter.com/docs/streaming-api)
  # for Ruby scripts that need to create a long connection to
  # Twitter for tracking and other purposes.
  #
  # Basic usage of the library is to call one of the provided
  # methods and provide a block that will perform actions on
  # a yielded Twitter::Tweet. For example:
  #
  #     TweetStream::Client.new.track('fail') do |status|
  #       puts "[#{status.user.screen_name}] #{status.text}"
  #     end
  #
  # For information about a daemonized TweetStream client,
  # view the TweetStream::Daemon class.
  class Client # rubocop:disable ClassLength
    extend Forwardable

    # Option keys that #connection_options treats as callbacks when they are
    # passed inside an options hash. Passing callbacks this way is deprecated
    # (see #warn_if_callbacks). The +unless defined?+ guard skips the
    # assignment when the constant already exists.
    OPTION_CALLBACKS = [:close,
                        :delete,
                        :scrub_geo,
                        :limit,
                        :error,
                        :enhance_your_calm,
                        :unauthorized,
                        :reconnect,
                        :inited,
                        :direct_message,
                        :timeline_status,
                        :anything,
                        :no_data_received,
                        :status_withheld,
                        :user_withheld].freeze unless defined?(OPTION_CALLBACKS)

    # @private
    attr_accessor(*Configuration::VALID_OPTIONS_KEYS)
    # The options hash given to #initialize, stored as passed.
    attr_accessor :options
    # control_uri and control are nil until #respond_to handles a control
    # message; stream is assigned by #connect.
    attr_reader :control_uri, :control, :stream

    # These calls are forwarded to @control, which is nil until a control
    # message has been handled by #respond_to.
    def_delegators :@control, :add_user, :remove_user, :info, :friends_ids

    # Creates a new API
    #
    # options - per-client settings. They are stored as-is in #options and
    #           merged over TweetStream.options; every key listed in
    #           Configuration::VALID_OPTIONS_KEYS is then assigned through
    #           its writer from the merged result.
    def initialize(options = {})
      self.options = options
      settings = TweetStream.options.merge(options)
      Configuration::VALID_OPTIONS_KEYS.each { |key| send("#{key}=", settings[key]) }

      # No site stream control channel and no callbacks until they are set up.
      @control_uri = @control = nil
      @callbacks = {}
    end

    # Returns all public statuses. The Firehose is not a generally
    # available resource. Few applications require this level of access.
    # Creative use of a combination of other resources and various access
    # levels can satisfy nearly every application use case.
    #
    # Delegates to #start with the firehose path; query_parameters and the
    # block are forwarded unchanged.
    def firehose(query_parameters = {}, &block)
      start('/1.1/statuses/firehose.json', query_parameters, &block)
    end

    # Returns all statuses containing http: and https:. The links stream is
    # not a generally available resource. Few applications require this level
    # of access. Creative use of a combination of other resources and various
    # access levels can satisfy nearly every application use case.
    #
    # Delegates to #start with the links path; query_parameters and the
    # block are forwarded unchanged.
    def links(query_parameters = {}, &block)
      start('/1.1/statuses/links.json', query_parameters, &block)
    end

    # Returns all retweets. The retweet stream is not a generally available
    # resource. Few applications require this level of access. Creative
    # use of a combination of other resources and various access levels
    # can satisfy nearly every application use case. As of 9/11/2009,
    # the site-wide retweet feature has not yet launched,
    # so there are currently few, if any, retweets on this stream.
    #
    # Delegates to #start with the retweet path; query_parameters and the
    # block are forwarded unchanged.
    def retweet(query_parameters = {}, &block)
      start('/1.1/statuses/retweet.json', query_parameters, &block)
    end

    # Returns a random sample of all public statuses. The default access level
    # provides a small proportion of the Firehose. The "Gardenhose" access
    # level provides a proportion more suitable for data mining and
    # research applications that desire a larger proportion to be statistically
    # significant sample.
    #
    # Delegates to #start with the sample path; query_parameters and the
    # block are forwarded unchanged.
    def sample(query_parameters = {}, &block)
      start('/1.1/statuses/sample.json', query_parameters, &block)
    end

    # Specify keywords to track. Queries are subject to Track Limitations,
    # described in Track Limiting and subject to access roles, described in
    # the statuses/filter method. Track keywords are case-insensitive logical
    # ORs. Terms are exact-matched, and also exact-matched ignoring
    # punctuation. Phrases, keywords with spaces, are not supported.
    # Keywords containing punctuation will only exact match tokens.
    # Query parameters may be passed as the last argument.
    #
    # The keywords are wrapped in TweetStream::Arguments and handed to #filter
    # as :track, merged over the options that wrapper exposes.
    def track(*keywords, &block)
      arguments = TweetStream::Arguments.new(keywords)
      filter_params = arguments.options.merge(:track => arguments)
      filter(filter_params, &block)
    end

    # Returns public statuses from or in reply to a set of users. Mentions
    # ("Hello @user!") and implicit replies ("@user Hello!" created without
    # pressing the reply "swoosh") are not matched. Requires integer user
    # IDs, not screen names. Query parameters may be passed as the last argument.
    #
    # The ids are wrapped in TweetStream::Arguments and handed to #filter
    # as :follow, merged over the options that wrapper exposes.
    def follow(*user_ids, &block)
      arguments = TweetStream::Arguments.new(user_ids)
      filter_params = arguments.options.merge(:follow => arguments)
      filter(filter_params, &block)
    end

    # Specifies a set of bounding boxes to track. Only tweets that are both created
    # using the Geotagging API and are placed from within a tracked bounding box will
    # be included in the stream -- the user's location field is not used to filter tweets
    # (e.g. if a user has their location set to "San Francisco", but the tweet was not created
    # using the Geotagging API and has no geo element, it will not be included in the stream).
    # Bounding boxes are specified as a comma-separated list of longitude/latitude pairs, with
    # the first pair denoting the southwest corner of the box and the second pair
    # denoting the northeast corner.
    #
    # The coordinates are wrapped in TweetStream::Arguments and handed to
    # #filter as :locations, merged over the options that wrapper exposes.
    def locations(*locations_map, &block)
      arguments = TweetStream::Arguments.new(locations_map)
      filter_params = arguments.options.merge(:locations => arguments)
      filter(filter_params, &block)
    end

    # Make a call to the statuses/filter method of the Streaming API,
    # you may provide <tt>:follow</tt>, <tt>:track</tt> or both as options
    # to follow the tweets of specified users or track keywords. This
    # method is provided separately for cases when it would conserve the
    # number of HTTP connections to combine track and follow.
    #
    # query_params - filter parameters. A copy with :method => :post merged in
    #                is passed to #start; the given hash is not modified here.
    def filter(query_params = {}, &block)
      post_params = query_params.merge(:method => :post)
      start('/1.1/statuses/filter.json', post_params, &block)
    end

    # Make a call to the userstream api for currently authenticated user
    #
    # query_params - query parameters for the request. The caller's hash is
    #                left untouched; the stream host is added to a merged copy
    #                under :extra_stream_parameters before calling #start.
    def userstream(query_params = {}, &block)
      stream_params = {:host => 'userstream.twitter.com'}
      # Non-destructive merge: do not leak :extra_stream_parameters into the
      # hash owned by the caller.
      params = query_params.merge(:extra_stream_parameters => stream_params)
      start('/1.1/user.json', params, &block)
    end

    # Make a call to the sitestream api for the given user ids
    #
    # user_ids     - ids sent as the :follow parameter.
    # query_params - extra query parameters. A truthy :followings entry is
    #                translated into :with => 'followings'. The caller's hash
    #                is left untouched; all changes are made on a merged copy.
    def sitestream(user_ids = [], query_params = {}, &block)
      stream_params = {:host => 'sitestream.twitter.com'}
      # Non-destructive merge so the caller's hash keeps its original keys.
      params = query_params.merge(
        :method                  => :post,
        :follow                  => user_ids,
        :extra_stream_parameters => stream_params
      )
      # :followings is a convenience flag only; it is always removed.
      params[:with] = 'followings' if params.delete(:followings)
      start('/1.1/site.json', params, &block)
    end

    # Set a Proc to be run when Twitter connection is closed. Tied to the
    # "close_callback" and "on_close" method in the em-twitter gem's client.rb,
    # plus to "unbind" method in its connection.rb. For example:
    #
    #     @client = TweetStream::Client.new
    #     @client.on_close do
    #       puts 'Connection closed'
    #     end
    #
    # If no block is given, it will return the currently set
    # close proc. When a block is given, the TweetStream::Client
    # object is returned to allow for chaining.
    #
    # Delegates to #on under the 'close' key. #connect invokes the stored
    # proc with no arguments from the stream's on_close hook.
    def on_close(&block)
      on('close', &block)
    end

    # Set a Proc to be run when a deletion notice is received
    # from the Twitter stream. For example:
    #
    #     @client = TweetStream::Client.new
    #     @client.on_delete do |status_id, user_id|
    #       Tweet.delete(status_id)
    #     end
    #
    # Block must take two arguments: the status id and the user id.
    # If no block is given, it will return the currently set
    # deletion proc. When a block is given, the TweetStream::Client
    # object is returned to allow for chaining.
    #
    # Delegates to #on under the 'delete' key. #respond_to invokes the proc
    # with the :id and :user_id found under the notice's :delete/:status entry.
    def on_delete(&block)
      on('delete', &block)
    end

    # Set a Proc to be run when a scrub_geo notice is received
    # from the Twitter stream. For example:
    #
    #     @client = TweetStream::Client.new
    #     @client.on_scrub_geo do |up_to_status_id, user_id|
    #       Tweet.where('status_id <= ?', up_to_status_id)
    #     end
    #
    # Block must take two arguments: the upper status id and the user id.
    # If no block is given, it will return the currently set
    # scrub_geo proc. When a block is given, the TweetStream::Client
    # object is returned to allow for chaining.
    #
    # Delegates to #on under the 'scrub_geo' key. #respond_to invokes the proc
    # with the notice's :up_to_status_id and :user_id.
    def on_scrub_geo(&block)
      on('scrub_geo', &block)
    end

    # Set a Proc to be run when a rate limit notice is received
    # from the Twitter stream. For example:
    #
    #     @client = TweetStream::Client.new
    #     @client.on_limit do |discarded_count|
    #       # Make note of discarded count
    #     end
    #
    # Block must take one argument: the number of discarded tweets.
    # If no block is given, it will return the currently set
    # limit proc. When a block is given, the TweetStream::Client
    # object is returned to allow for chaining.
    #
    # Delegates to #on under the 'limit' key. #respond_to invokes the proc
    # with the :track value of the limit notice.
    def on_limit(&block)
      on('limit', &block)
    end

    # Set a Proc to be run when an HTTP error is encountered in the
    # processing of the stream. Note that TweetStream will automatically
    # try to reconnect, this is for reference only. Don't panic!
    #
    #     @client = TweetStream::Client.new
    #     @client.on_error do |message|
    #       # Make note of error message
    #     end
    #
    # Block must take one argument: the error message.
    # If no block is given, it will return the currently set
    # error proc. When a block is given, the TweetStream::Client
    # object is returned to allow for chaining.
    #
    # Delegates to #on under the 'error' key. #connect invokes the proc with
    # the message from the stream's on_error hook, and also with a description
    # when a stream item cannot be decoded into a JSON object.
    def on_error(&block)
      on('error', &block)
    end

    # Set a Proc to be run when an HTTP status 401 is encountered while
    # connecting to Twitter. This could happen when system clock drift
    # has occurred.
    #
    # If no block is given, it will return the currently set
    # unauthorized proc. When a block is given, the TweetStream::Client
    # object is returned to allow for chaining.
    #
    # Delegates to #on under the 'unauthorized' key. #connect invokes the
    # proc with no arguments from the stream's on_unauthorized hook.
    def on_unauthorized(&block)
      on('unauthorized', &block)
    end

    # Set a Proc to be run when a direct message is encountered in the
    # processing of the stream.
    #
    #     @client = TweetStream::Client.new
    #     @client.on_direct_message do |direct_message|
    #       # do something with the direct message
    #     end
    #
    # Block must take one argument: the direct message.
    # If no block is given, it will return the currently set
    # direct message proc. When a block is given, the TweetStream::Client
    # object is returned to allow for chaining.
    #
    # Delegates to #on under the 'direct_message' key. The proc receives a
    # Twitter::DirectMessage through #yield_message_to, which only calls
    # procs of arity 1 or 2 (arity 2 also receives the client).
    def on_direct_message(&block)
      on('direct_message', &block)
    end

    # Set a Proc to be run whenever anything is encountered in the
    # processing of the stream.
    #
    #     @client = TweetStream::Client.new
    #     @client.on_anything do |status|
    #       # do something with the status
    #     end
    #
    # Block can take one or two arguments. |status (, client)|
    # If no block is given, it will return the currently set
    # anything proc. When a block is given, the TweetStream::Client
    # object is returned to allow for chaining.
    #
    # Delegates to #on under the 'anything' key. #connect passes every decoded
    # Hash to the proc, after the specific handler has run, through
    # #yield_message_to (only procs of arity 1 or 2 are called).
    def on_anything(&block)
      on('anything', &block)
    end

    # Set a Proc to be run when a regular timeline message is encountered in the
    # processing of the stream.
    #
    #     @client = TweetStream::Client.new
    #     @client.on_timeline_status do |status|
    #       # do something with the status
    #     end
    #
    # Block can take one or two arguments. |status (, client)|
    # If no block is given, it will return the currently set
    # timeline status proc. When a block is given, the TweetStream::Client
    # object is returned to allow for chaining.
    #
    # Delegates to #on under the 'timeline_status' key. The proc receives a
    # Twitter::Tweet through #yield_message_to, which only calls procs of
    # arity 1 or 2 (arity 2 also receives the client).
    def on_timeline_status(&block)
      on('timeline_status', &block)
    end

    # Set a Proc to be run on reconnect.
    #
    #     @client = TweetStream::Client.new
    #     @client.on_reconnect do |timeout, retries|
    #       # Make note of the reconnection
    #     end
    #
    # Delegates to #on under the 'reconnect' key. #connect invokes the proc
    # with the timeout and retries reported by the stream's on_reconnect hook.
    def on_reconnect(&block)
      on('reconnect', &block)
    end

    # Set a Proc to be run when connection established.
    # Called in EventMachine::Connection#post_init
    #
    #     @client = TweetStream::Client.new
    #     @client.on_inited do
    #       puts 'Connected...'
    #     end
    #
    # Delegates to #on under the 'inited' key. #connection_options hands the
    # stored proc to the stream as its :on_inited parameter.
    def on_inited(&block)
      on('inited', &block)
    end

    # Set a Proc to be run when no data is received from the server
    # and a stall occurs.  Twitter defines this to be 90 seconds.
    #
    #     @client = TweetStream::Client.new
    #     @client.on_no_data_received do
    #       # Make note of no data, possibly restart the stream
    #     end
    #
    # Delegates to #on under the 'no_data_received' key. #connect invokes the
    # proc with no arguments from the stream's on_no_data_received hook.
    def on_no_data_received(&block)
      on('no_data_received', &block)
    end

    # Set a Proc to be run when enhance_your_calm signal is received.
    #
    #     @client = TweetStream::Client.new
    #     @client.on_enhance_your_calm do
    #       # do something, your account has been blocked
    #     end
    #
    # Delegates to #on under the 'enhance_your_calm' key. #connect invokes the
    # proc with no arguments from the stream's on_enhance_your_calm hook.
    def on_enhance_your_calm(&block)
      on('enhance_your_calm', &block)
    end

    # Set a Proc to be run when a status_withheld message is received.
    #
    #     @client = TweetStream::Client.new
    #     @client.on_status_withheld do |status|
    #       # do something with the status
    #     end
    #
    # Delegates to #on under the 'status_withheld' key. #respond_to invokes
    # the proc with the message's :status_withheld value.
    def on_status_withheld(&block)
      on('status_withheld', &block)
    end

    # Set a Proc to be run when a user_withheld message is received.
    #
    #     @client = TweetStream::Client.new
    #     @client.on_user_withheld do |user_withheld|
    #       # do something with the withheld user notice
    #     end
    #
    # Delegates to #on under the 'user_withheld' key. #respond_to invokes
    # the proc with the message's :user_withheld value.
    def on_user_withheld(&block)
      on('user_withheld', &block)
    end

    # Set a Proc to be run when a Site Stream friends list is received.
    #
    #     @client = TweetStream::Client.new
    #     @client.on_friends do |friends|
    #       # do something with the friends list
    #     end
    #
    # Delegates to #on under the 'friends' key. #respond_to invokes the proc
    # with the message's :friends value.
    def on_friends(&block)
      on('friends', &block)
    end

    # Set a Proc to be run when a stall warning is received.
    #
    #     @client = TweetStream::Client.new
    #     @client.on_stall_warning do |warning|
    #       # do something with the warning
    #     end
    #
    # Delegates to #on under the 'stall_warning' key. #respond_to invokes the
    # proc with the message's :warning value.
    def on_stall_warning(&block)
      on('stall_warning', &block)
    end

    # Set a Proc to be run on userstream events
    #
    #     @client = TweetStream::Client.new
    #     @client.on_event(:favorite) do |event|
    #       # do something with the status
    #     end
    #
    # Delegates to #on, which stores the proc under event.to_s. #respond_to
    # looks the proc up by the message's :event value and invokes it with the
    # whole decoded Hash.
    def on_event(event, &block)
      on(event, &block)
    end

    # Set a Proc to be run when sitestream control is received
    #
    #     @client = TweetStream::Client.new
    #     @client.on_control do
    #       # do something now that the control client is available
    #     end
    #
    # Delegates to #on under the 'control' key. #respond_to invokes the proc
    # with no arguments after it has created the site stream control client.
    def on_control(&block)
      on('control', &block)
    end

    # Registers or looks up the callback for +event+.
    #
    # event - callback name; it is converted with #to_s, so symbols and
    #         strings address the same slot.
    # block - when given, stored as the callback for the event.
    #
    # Returns self when a block was given (to allow chaining), otherwise the
    # stored callback or nil.
    def on(event, &block)
      key = event.to_s
      return @callbacks[key] unless block

      @callbacks[key] = block
      self
    end

    # connect to twitter while starting a new EventMachine run loop
    #
    # When a reactor is already running, this only calls #connect and returns
    # its result. Otherwise it selects epoll or kqueue if EventMachine reports
    # support for one of them (warning when neither is available) and runs
    # #connect inside a new EventMachine.run loop.
    def start(path, query_parameters = {}, &block)
      return connect(path, query_parameters, &block) if EventMachine.reactor_running?

      if EventMachine.epoll?
        EventMachine.epoll
      elsif EventMachine.kqueue?
        EventMachine.kqueue
      else
        Kernel.warn('Your OS does not support epoll or kqueue.')
      end

      EventMachine.run { connect(path, query_parameters, &block) }
    end

    # connect to twitter without starting a new EventMachine run loop
    #
    # path    - endpoint path, forwarded to #connection_options.
    # options - query parameters, plus the :method, :extra_stream_parameters
    #           and callback keys consumed by #connection_options.
    # block   - optional handler that #respond_to yields timeline statuses
    #           (and :for_user envelopes) to.
    #
    # Returns the EM::Twitter client, which is also stored in @stream.
    def connect(path, options = {}, &block)
      stream_parameters, callbacks = connection_options(path, options)

      @stream = EM::Twitter::Client.connect(stream_parameters)
      @stream.each do |item|
        # Decode one stream item; undecodable items are reported to the
        # 'error' callback and skipped.
        begin
          hash = MultiJson.decode(item, :symbolize_keys => true)
        rescue MultiJson::DecodeError
          invoke_callback(callbacks['error'], "MultiJson::DecodeError occured in stream: #{item}")
          next
        end

        # Anything that did not decode to a Hash is reported and skipped too.
        unless hash.is_a?(::Hash)
          invoke_callback(callbacks['error'], "Unexpected JSON object in stream: #{item}")
          next
        end

        # Dispatch to the specific handler first ...
        respond_to(hash, callbacks, &block)

        # ... then hand the raw decoded hash to the catch-all callback.
        yield_message_to(callbacks['anything'], hash)
      end

      # Bridge the stream's hooks to the callbacks collected above.
      @stream.on_close { invoke_callback(callbacks['close']) }

      @stream.on_error do |message|
        invoke_callback(callbacks['error'], message)
      end

      @stream.on_unauthorized { invoke_callback(callbacks['unauthorized']) }

      @stream.on_enhance_your_calm { invoke_callback(callbacks['enhance_your_calm']) }

      @stream.on_reconnect do |timeout, retries|
        invoke_callback(callbacks['reconnect'], timeout, retries)
      end

      # Unlike the other hooks, this one raises instead of invoking a callback.
      @stream.on_max_reconnects do |timeout, retries|
        fail TweetStream::ReconnectError.new(timeout, retries)
      end

      @stream.on_no_data_received { invoke_callback(callbacks['no_data_received']) }

      @stream
    end

    # Terminate the currently running TweetStream and close EventMachine loop
    #
    # Returns @last_status, the most recent Twitter::Tweet built by
    # #respond_to (nil when none has been built).
    def stop
      EventMachine.stop_event_loop
      @last_status
    end

    # Close the connection to twitter without closing the eventmachine loop
    #
    # Does nothing and returns nil when no stream has been assigned yet;
    # otherwise returns whatever the stream's close_connection returns.
    def close_connection
      return unless @stream

      @stream.close_connection
    end

    # Calls #stop on the current stream, if there is one.
    #
    # Returns nil when no stream has been assigned yet; otherwise returns
    # whatever the stream's stop returns.
    def stop_stream
      return unless @stream

      @stream.stop
    end

    # Reports whether a site stream control client has been set up, i.e.
    # whether @control is currently truthy.
    #
    # Returns true or false.
    def controllable?
      @control ? true : false
    end

  protected

    # Dispatches one decoded stream message to the matching callback.
    #
    # hash      - the decoded message, with symbol keys.
    # callbacks - Hash of callback name (String) => callable, as built by
    #             #connection_options.
    # block     - optional handler passed to the streaming method.
    #
    # The branches are checked in order and only the first match runs.
    # Messages that match no branch are ignored here.
    def respond_to(hash, callbacks, &block) # rubocop:disable CyclomaticComplexity, PerceivedComplexity
      if hash[:control] && hash[:control][:control_uri]
        # Site stream control message: remember the URI and build the control
        # client. The file is required here, on first use.
        @control_uri = hash[:control][:control_uri]
        require 'tweetstream/site_stream_client'
        @control = TweetStream::SiteStreamClient.new(@control_uri, options)
        @control.on_error(&callbacks['error'])
        invoke_callback(callbacks['control'])
      elsif hash[:warning]
        invoke_callback(callbacks['stall_warning'], hash[:warning])
      elsif hash[:delete] && hash[:delete][:status]
        invoke_callback(callbacks['delete'], hash[:delete][:status][:id], hash[:delete][:status][:user_id])
      elsif hash[:scrub_geo] && hash[:scrub_geo][:up_to_status_id]
        invoke_callback(callbacks['scrub_geo'], hash[:scrub_geo][:up_to_status_id], hash[:scrub_geo][:user_id])
      elsif hash[:limit] && hash[:limit][:track]
        invoke_callback(callbacks['limit'], hash[:limit][:track])
      elsif hash[:direct_message]
        yield_message_to(callbacks['direct_message'], Twitter::DirectMessage.new(hash[:direct_message]))
      elsif hash[:status_withheld]
        invoke_callback(callbacks['status_withheld'], hash[:status_withheld])
      elsif hash[:user_withheld]
        invoke_callback(callbacks['user_withheld'], hash[:user_withheld])
      elsif hash[:event]
        # Event callbacks are registered under the event's own name (see
        # #on_event) and receive the whole message.
        invoke_callback(callbacks[hash[:event].to_s], hash)
      elsif hash[:friends]
        invoke_callback(callbacks['friends'], hash[:friends])
      elsif hash[:text] && hash[:user]
        # A regular status: remember it (returned by #stop), then notify the
        # 'timeline_status' callback and the block, in that order.
        @last_status = Twitter::Tweet.new(hash)
        yield_message_to(callbacks['timeline_status'], @last_status)

        yield_message_to(block, @last_status) if block_given?
      elsif hash[:for_user]
        # Envelope carrying :for_user: the raw hash is given to the block.
        yield_message_to(block, hash) if block_given?
      end
    end

    # Converts the :follow, :track and :locations entries into strings.
    #
    # Arrays are flattened and joined with commas; any other truthy value is
    # converted with #to_s. Missing, nil or false entries and all other keys
    # are left alone.
    #
    # query_parameters - Hash of parameters; it is modified in place.
    #
    # Returns the same Hash that was passed in.
    def normalize_filter_parameters(query_parameters = {})
      [:follow, :track, :locations].each do |name|
        value = query_parameters[name]
        next unless value

        query_parameters[name] = value.is_a?(Array) ? value.flatten.map(&:to_s).join(',') : value.to_s
      end
      query_parameters
    end

    # Builds the authentication part of the stream parameters.
    #
    # Returns a Hash with a single :basic entry (username and password) when
    # auth_method, converted to a String, equals 'basic'. Otherwise returns a
    # single :oauth entry with the consumer key/secret and the token/secret.
    def auth_params
      unless auth_method.to_s == 'basic'
        oauth_credentials = {
          :consumer_key    => consumer_key,
          :consumer_secret => consumer_secret,
          :token           => oauth_token,
          :token_secret    => oauth_token_secret,
        }
        return {:oauth => oauth_credentials}
      end

      {:basic => {:username => username, :password => password}}
    end

    # A utility method used to invoke callback methods against the Client
    #
    # callback - a callable, or nil/false when nothing is registered.
    # args     - arguments passed on to the callable.
    #
    # Returns the callable's result, or nil when there is no callback.
    def invoke_callback(callback, *args)
      return unless callback

      callback.call(*args)
    end

    # Passes +message+ to +procedure+, letting the proc's arity decide whether
    # it also receives the client.
    #
    # procedure - callable or nil. Arity 1 receives the message; arity 2
    #             receives the message and this client. Procs of any other
    #             arity are not called.
    # message   - the object to deliver.
    #
    # Returns the proc's result, or nil when nothing was called.
    def yield_message_to(procedure, message)
      return if procedure.nil?

      arity = procedure.arity
      if arity == 1
        invoke_callback(procedure, message)
      elsif arity == 2
        invoke_callback(procedure, message, self)
      end
    end

    # Splits +options+ into the parameters handed to the stream client and
    # the callback table used while the stream is consumed.
    #
    # path    - request path of the streaming endpoint.
    # options - query parameters. May also carry :method,
    #           :extra_stream_parameters and (deprecated) any
    #           OPTION_CALLBACKS key mapped to a callable. The hash is not
    #           modified; all work happens on a shallow copy.
    #
    # Callbacks passed through +options+ take precedence over the ones
    # registered with the on_* methods. The :on_inited stream parameter is
    # taken from the merged table.
    #
    # Returns a two-element Array: [stream_params, callbacks].
    def connection_options(path, options)
      warn_if_callbacks(options)

      # Work on a copy so the deletes below never reach the caller's hash.
      options   = options.dup
      callbacks = @callbacks.dup
      OPTION_CALLBACKS.each do |callback|
        next unless options[callback]

        # Assign directly: a non-destructive merge would discard the result.
        callbacks[callback.to_s] = options.delete(callback)
      end

      extra_stream_parameters = options.delete(:extra_stream_parameters) || {}

      stream_params = {
        :path       => path,
        :method     => (options.delete(:method) || 'get').to_s.upcase,
        :user_agent => user_agent,
        # :inited has already been moved out of options by the loop above.
        :on_inited  => callbacks['inited'],
        :params     => normalize_filter_parameters(options),
        :proxy      => proxy,
      }.merge(extra_stream_parameters).merge(auth_params)

      [stream_params, callbacks]
    end

    # Emits a deprecation warning when +options+ carries a truthy value under
    # any of the OPTION_CALLBACKS keys.
    #
    # Returns nil.
    def warn_if_callbacks(options = {})
      return unless OPTION_CALLBACKS.any? { |name| options[name] }

      Kernel.warn('Passing callbacks via the options hash is deprecated and will be removed in TweetStream 3.0')
    end
  end
end