sferik/twitter

View on GitHub
lib/twitter/streaming/client.rb

Summary

Maintainability
A
0 mins
Test Coverage
require "http/request"
require "twitter/arguments"
require "twitter/client"
require "twitter/headers"
require "twitter/streaming/connection"
require "twitter/streaming/message_parser"
require "twitter/streaming/response"
require "twitter/utils"

module Twitter
  module Streaming
    class Client < Twitter::Client
      include Twitter::Utils
      attr_writer :connection

      # Initializes a new Client object
      #
      # @param options [Hash] A customizable set of options.
      # @option options [String] :tcp_socket_class A class that Connection will use to create a new TCP socket.
      # @option options [String] :ssl_socket_class A class that Connection will use to create a new SSL socket.
      # @return [Twitter::Streaming::Client]
      def initialize(options = {})
        super
        @connection = Streaming::Connection.new(options)
      end

      # Returns public statuses that match one or more filter predicates
      #
      # @see https://dev.twitter.com/streaming/reference/post/statuses/filter
      # @see https://dev.twitter.com/streaming/overview/request-parameters
      # @note At least one predicate parameter (follow, locations, or track) must be specified.
      # @param options [Hash] A customizable set of options.
      # @option options [String] :follow A comma separated list of user IDs, indicating the users to return statuses for in the stream.
      # @option options [String] :track Includes additional Tweets matching the specified keywords. Phrases of keywords are specified by a comma-separated list.
      # @option options [String] :locations Includes additional Tweets falling within the specified bounding boxes.
      # @yield [Twitter::Tweet, Twitter::Streaming::Event, Twitter::DirectMessage, Twitter::Streaming::FriendList, Twitter::Streaming::DeletedTweet, Twitter::Streaming::StallWarning] A stream of Twitter objects.
      def filter(options = {}, &)
        request(:post, "https://stream.twitter.com:443/1.1/statuses/filter.json", options, &)
      end

      # Returns all public statuses
      #
      # @see https://dev.twitter.com/streaming/reference/get/statuses/firehose
      # @see https://dev.twitter.com/streaming/overview/request-parameters
      # @note This endpoint requires special permission to access.
      # @param options [Hash] A customizable set of options.
      # @option options [Integer] :count The number of messages to backfill.
      # @yield [Twitter::Tweet, Twitter::Streaming::Event, Twitter::DirectMessage, Twitter::Streaming::FriendList, Twitter::Streaming::DeletedTweet, Twitter::Streaming::StallWarning] A stream of Twitter objects.
      def firehose(options = {}, &)
        request(:get, "https://stream.twitter.com:443/1.1/statuses/firehose.json", options, &)
      end

      # Returns a small random sample of all public statuses
      #
      # @see https://dev.twitter.com/streaming/reference/get/statuses/sample
      # @see https://dev.twitter.com/streaming/overview/request-parameters
      # @yield [Twitter::Tweet, Twitter::Streaming::Event, Twitter::DirectMessage, Twitter::Streaming::FriendList, Twitter::Streaming::DeletedTweet, Twitter::Streaming::StallWarning] A stream of Twitter objects.
      def sample(options = {}, &)
        request(:get, "https://stream.twitter.com:443/1.1/statuses/sample.json", options, &)
      end

      # Streams messages for a set of users
      #
      # @see https://dev.twitter.com/streaming/reference/get/site
      # @see https://dev.twitter.com/streaming/sitestreams
      # @see https://dev.twitter.com/streaming/overview/request-parameters
      # @note Site Streams is currently in a limited beta. Access is restricted to whitelisted accounts.
      # @overload site(*follow, options = {}, &block)
      #   @param follow [Enumerable<Integer, String, Twitter::User>] A list of user IDs, indicating the users to return statuses for in the stream.
      #   @param options [Hash] A customizable set of options.
      #   @option options [String] :with Specifies whether to return information for just the users specified in the follow parameter, or include messages from accounts they follow.
      #   @option options [String] :replies Specifies whether stall warnings should be delivered.
      #   @yield [Twitter::Tweet, Twitter::Streaming::Event, Twitter::DirectMessage, Twitter::Streaming::FriendList, Twitter::Streaming::DeletedTweet, Twitter::Streaming::StallWarning] A stream of Twitter objects.
      def site(*args, &)
        arguments = Arguments.new(args)
        user_ids = collect_user_ids(arguments)
        request(:get, "https://sitestream.twitter.com:443/1.1/site.json", arguments.options.merge(follow: user_ids.join(",")), &)
      end

      # Streams messages for a single user
      #
      # @see https://dev.twitter.com/streaming/reference/get/user
      # @see https://dev.twitter.com/streaming/userstreams
      # @see https://dev.twitter.com/streaming/overview/request-parameters
      # @param options [Hash] A customizable set of options.
      # @option options [String] :with Specifies whether to return information for just the users specified in the follow parameter, or include messages from accounts they follow.
      # @option options [String] :replies Specifies whether to return additional @replies.
      # @option options [String] :stall_warnings Specifies whether stall warnings should be delivered.
      # @option options [String] :track Includes additional Tweets matching the specified keywords. Phrases of keywords are specified by a comma-separated list.
      # @option options [String] :locations Includes additional Tweets falling within the specified bounding boxes.
      # @yield [Twitter::Tweet, Twitter::Streaming::Event, Twitter::DirectMessage, Twitter::Streaming::FriendList, Twitter::Streaming::DeletedTweet, Twitter::Streaming::StallWarning] A stream of Twitter objects.
      def user(options = {}, &)
        request(:get, "https://userstream.twitter.com:443/1.1/user.json", options, &)
      end

      # Set a Proc to be run when connection established.
      def before_request(&block)
        if block
          @before_request = block
          self
        elsif instance_variable_defined?(:@before_request)
          @before_request
        else
          proc {}
        end
      end

      def close
        @connection.close
      end

    private

      def request(method, uri, params)
        before_request.call
        headers = Twitter::Headers.new(self, method, uri, params).request_headers
        request = HTTP::Request.new(verb: method, uri: "#{uri}?#{to_url_params(params)}", headers:, proxy:)
        response = Streaming::Response.new do |data|
          if item = Streaming::MessageParser.parse(data) # rubocop:disable Lint/AssignmentInCondition
            yield(item)
          end
        end
        @connection.stream(request, response)
      end

      def to_url_params(params)
        uri = Addressable::URI.new
        uri.query_values = params
        uri.query
      end

      # Takes a mixed array of Integers and Twitter::User objects and returns a
      # consistent array of Twitter user IDs.
      #
      # @param users [Array]
      # @return [Array<Integer>]
      def collect_user_ids(users)
        users.filter_map do |user|
          case user
          when Integer       then user
          when Twitter::User then user.id
          end
        end
      end
    end
  end
end