aliciawyy/dmining

View on GitHub
rx_tutorial/twitter_credentials.py

Summary

Maintainability
A
0 mins
Test Coverage
import os
import pprint
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream
import json
import time
from rx import Observable

TWITTER_CREDENTIAL_FILE = os.path.expanduser("~/.config/twitter/credentials")
"""
twitter credential file example

consumer_key:xxx
consumer_secret:xxx
access_token:xxx
access_token_secret:xxx
"""


def get_twitter_credentials():
    credentials = {}
    with open(TWITTER_CREDENTIAL_FILE) as f:
        for x in f.readlines():
            k, v = x.strip().split(':')
            credentials[k] = v
    return credentials


def tweets_for(topics):

    def observe_tweets(observer):
        class TweetListener(StreamListener):
            def on_data(self, raw_data):
                observer.on_next(raw_data)
                return True

            def on_error(self, status_code):
                observer.on_error(status_code)

        l = TweetListener()
        keys = get_twitter_credentials()
        auth = OAuthHandler(keys["consumer_key"], keys["consumer_secret"])
        auth.set_access_token(keys["access_token"],
                              keys["access_token_secret"])
        stream = Stream(auth, l)
        stream.filter(track=topics)

    return Observable.create(observe_tweets).share()

topics0 = ["SP500", "Trump"]

disposable = tweets_for(topics0).map(lambda d: json.loads(d)) \
    .filter(lambda m: "text" in m) \
    .map(lambda m: m["text"].strip()) \
    .subscribe(pprint.pprint)

time.sleep(5)
disposable.dispose()