devth/yetibot

View on GitHub
src/yetibot/models/twitter.clj

Summary

Maintainability
Test Coverage
(ns yetibot.models.twitter
  (:require
    [yetibot.db.twitter :as db]
    [yetibot.core.util.http :refer [html-decode]]
    [yetibot.core.config :refer [get-config]]
    [yetibot.core.chat :as chat]
    [clojure.spec.alpha :as s]
    [taoensso.timbre :refer [info warn error]]
    [clj-http.client :as client]
    [clojure.string :as string]
    [clojure.data.json :as json]
    [twitter.oauth :refer :all]
    [twitter.callbacks :refer :all]
    [twitter.callbacks.handlers :refer :all]
    [twitter.api.restful :refer :all]
    [twitter.api.streaming :refer :all])
  (:import
    (twitter.callbacks.protocols SyncSingleCallback)
    (twitter.callbacks.protocols AsyncStreamingCallback)))

;;;; config

(s/def ::key string?)

(s/def ::secret string?)

(s/def ::consumer (s/keys :req-un [::key ::secret]))

(s/def ::token string?)

(s/def ::lang string?)

(s/def ::search (s/keys :req-un [::lang]))

(s/def ::config (s/keys :req-un [::consumer ::token ::secret ::search]))

(def config (:value (get-config ::config [:twitter])))

(def creds (apply make-oauth-creds
                  ((juxt (comp :key :consumer)
                         (comp :secret :consumer)
                         :token :secret) config)))

;;;; helper

(defn format-url [user id] (format "https://twitter.com/%s/status/%s" user id))

(defn expand-url [url]
  (let [resp (client/get url {:follow-redirects true})]
    (if-let [redirs (:trace-redirects resp)]
      (last redirs)
      url)))

(defn expand-twitter-urls [text]
  (string/replace text #"https*://t.co/\S+" expand-url))

(defn format-screen-name [json]
  (:screen_name (:user json)))

(defn format-media-urls [json]
  ;; (info (:entities json))
  (->> (:entities json)
       :media
       (map :media_url)
       (string/join " ")))

(defn format-tweet-text [{:keys [extended_tweet] :as json}]
  ;; use extended_tweet if available; otherwise fallback to regular json entity:
  ;; extended_tweet support's Twitter's new 280 char and is only set if the Tweet
  ;; exceeds 140 chars
  (let [tweet (or extended_tweet json)]
    (str
      (or (:full_text tweet)
          (:text tweet))
      " " (format-media-urls tweet))))

(defn format-tweet [{:keys [extended_tweet] :as json}]
  #_(info "tweet json" (pr-str json))
  (let [screen-name (format-screen-name json)
        url (format-url screen-name (:id json))
        retweeted-status (:retweeted_status json)
        text (if retweeted-status
               (str "RT " (format-screen-name retweeted-status) ": "
                    (format-tweet-text retweeted-status))
               (format-tweet-text json))]
    (format "%s — @%s %s"
            (html-decode text)
            ; (-> (:text json) expand-twitter-urls html-decode)
            screen-name url)))

(defn send-tweet
  "Broadcast tweet to any channels that have broadcast: true, e.g.:
   channel set broadcast true"
  [json]
  #_(info "send-tweet" (pr-str json))
  (chat/broadcast (format-tweet json)))

;;;; streaming callback

(defn succ
  "Streaming callback success
   - response = the response that has the status and headers
   - baos = the ByteArrayOutputStream that contains a chunk of the stream"
  [response baos]
  (try
    (let [raw (str baos)
          json (if-not (empty? raw) (json/read-json raw))]
      (if (and json (:user json))
        (send-tweet json)))
    (catch Exception e)))

(def fail
  (comp
    (fn [error-response]
      (error "twitter streaming error" error-response))
    ;; response-return-everything
    ))

(defn exception [response exception]
  (error "twitter streaming exception"
         (pr-str response)
         (pr-str exception)))

(def streaming-callback (AsyncStreamingCallback. succ fail exception))

;;;; user stream

(defonce user-stream-resp
  (future
    (user-stream :oauth-creds creds :callbacks streaming-callback)))

;;;; search

(defn search [query]
  (info "twitter search for" query)
  (search-tweets
    :oauth-creds creds
    :params {:tweet_mode "extended"
             :count 20
             :q query
             :lang (:lang (:search config))}))


;;;; topic tracking

(defonce statuses-streaming-response (atom nil))

(defn reset-streaming-topics [ts]
  ; first cancel the streaming-response if it exists
  (when-let [s @statuses-streaming-response] ((:cancel (meta s))))
  ; now create a new streaming connection with the new topics
  (reset! statuses-streaming-response
          (statuses-filter :params {:track (string/join "," ts)}
                           :oauth-creds creds
                           :callbacks streaming-callback)))

(comment
  ;; reset the stream:
  (reload-topics)

  ;; inspect the current state of the streaming connection
  (meta @statuses-streaming-response)

  ((:cancelled? (meta @statuses-streaming-response)))

  ;; stop the stream
  ((:cancel (meta @statuses-streaming-response)))

  )

(defn reload-topics []
  (let [topics (db/find-all)]
    (info "reloading stream for topics" (pr-str topics))
    (reset-streaming-topics (map :topic topics))))

(defn add-topic [user-id topic]
  (let [result (db/create {:user-id user-id :topic topic})]
    (reload-topics)
    result))

(defn remove-topic [topic-id]
  (let [result (db/delete topic-id)]
    (reload-topics)
    result))

;; on startup, load the existing topics
(future (reload-topics))

;;;; follow / unfollow

(defn follow [screen-name]
  (friendships-create :oauth-creds creds
                      :params {:screen_name screen-name}))

(defn unfollow [screen-name]
  (friendships-destroy :oauth-creds creds
                       :params {:screen_name screen-name}))

(defn following []
  (loop [cursor -1
         users []
         iter 0]
    (let [body (:body (friends-list :oauth-creds creds
                                    :params {:skip-status true
                                             :include-user-entities false
                                             :cursor cursor}))
          current-users (into users (:users body))
          next-cursor (:next_cursor body)]
      (if (or (> iter 10) (zero? next-cursor)) ; limit to 10 pages
        current-users
        ; keep looping to fetch all pages until cursor is 0
        (recur next-cursor current-users (inc iter))))))

;;;; tweet

(defn tweet [status]
  (statuses-update :oauth-creds creds
                   :params {:status status}))

(defn retweet [id]
  (statuses-retweet-id :oauth-creds creds
                       :params {:id id}))

(defn reply [id status]
  (statuses-update :oauth-creds creds
                   :params {:in_reply_to_status_id id
                            :status status}))

;;;; users

(defn user-timeline [screen-name & tweet-count]
   (statuses-user-timeline :oauth-creds creds
                           :params {:screen-name screen-name
                                    :tweet_mode "extended"
                                    :count (if-not (nil? tweet-count)
                                             tweet-count
                                             3)}))
;;;; show tweet with id

(defn show [id]
  (statuses-show-id :oauth-creds creds
                    :params {:tweet_mode "extended"
                             :id id}))

;; db helpers

(defn find-by-topic [topic]
  (db/query {:where/map {:topic topic}}))

(defn find-all []
  (db/find-all))

;; scratch

(comment

  (->>
    (search "#rot13")
    :body
    :statuses
    first
    )


  (->>
    (search "#rot13")
    :body
    keys)

  (->>
    (search "#rot13")
    :body
    :search_metadata)

  )