fuzeman/trakt.py

View on GitHub
trakt/interfaces/oauth/device.py

Summary

Maintainability
A
1 hr
Test Coverage


from trakt.core.emitter import Emitter
from trakt.interfaces.base import Interface

from datetime import datetime, timedelta
from threading import Thread
import calendar
import logging
import requests
import time

log = logging.getLogger(__name__)


class DeviceOAuthInterface(Interface):
    path = 'oauth/device'

    def code(self, **kwargs):
        client_id = self.client.configuration['client.id']

        if not client_id:
            raise ValueError('"client.id" configuration parameter is required')

        response = self.http.post(
            'code',
            data={
                'client_id': client_id
            }
        )

        data = self.get_data(response, **kwargs)

        if isinstance(data, requests.Response):
            return data

        if not data:
            return None

        return data

    def poll(self, device_code, expires_in, interval, **kwargs):
        """Construct the device authentication poller.

        :param device_code: Device authentication code
        :type device_code: str

        :param expires_in: Device authentication code expiry (in seconds)
        :type         in: int

        :param interval: Device authentication poll interval
        :type interval: int

        :rtype: DeviceOAuthPoller
        """
        return DeviceOAuthPoller(self.client, device_code, expires_in, interval)

    def token(self, device_code, **kwargs):
        client_id = self.client.configuration['client.id']
        client_secret = self.client.configuration['client.secret']

        if not client_id:
            raise ValueError('"client.id" and "client.secret" configuration parameters are required')

        response = self.http.post(
            'token',
            data={
                'client_id': client_id,
                'client_secret': client_secret,

                'code': device_code
            }
        )

        data = self.get_data(response, **kwargs)

        if isinstance(data, requests.Response):
            return data

        if not data:
            return None

        return data


class DeviceOAuthPoller(Interface, Emitter):
    def __init__(self, client, device_code, expires_in, interval):
        super(DeviceOAuthPoller, self).__init__(client)

        self.device_code = device_code
        self.expires_in = expires_in
        self.interval = interval

        # Calculate code expiry date/time
        self.expires_at = datetime.utcnow() + timedelta(seconds=self.expires_in)

        # Private attributes
        self._abort = False
        self._active = False
        self._running = False
        self._thread = None

    @property
    def active(self):
        return self._active

    def has_expired(self):
        return datetime.utcnow() > self.expires_at

    def start(self, daemon=None):
        if self._active or self._thread:
            raise Exception('Poller already started')

        # Construct thread process wrapper
        def wrapper():
            try:
                self._process()
            except Exception as ex:
                log.warning('Exception raised in DeviceOAuthPoller: %s', ex, exc_info=True)
            finally:
                self._active = False
                self._running = False

                if self._abort:
                    self.emit('aborted')

        # Construct poller thread
        self._thread = Thread(
            target=wrapper,
            name='%s:%s' % (DeviceOAuthPoller.__module__, DeviceOAuthPoller.__name__)
        )

        # Set `daemon` state
        if daemon is not None:
            self._thread.daemon = daemon

        # Start polling
        self._abort = False
        self._active = True
        self._running = True
        self._thread.start()

    def stop(self):
        # Flag as thread abort
        self._abort = True

        # Flag thread to stop
        self._running = False

    def _process(self):
        while self._running:
            # Ensure code hasn't expired yet
            if self.has_expired():
                self.emit('expired')
                break

            # Trigger "poll" event, check if we should continue polling
            if not self._should_poll():
                self.stop()
                break

            # Poll for token
            response = self.client['oauth/device'].token(self.device_code, parse=False)

            if response:
                # Parse authorization
                data = self.get_data(response)

                if 'created_at' not in data:
                    data['created_at'] = calendar.timegm(datetime.utcnow().utctimetuple())

                # Authentication complete
                self.emit('authenticated', data)
                break

            # Sleep for defined interval
            time.sleep(self.interval)

    def _poll_callback(self, state=True):
        self._abort = not state

    def _should_poll(self):
        # Assume poller should abort if `callback` isn't fired
        self._abort = True

        # Trigger "poll" event
        self.emit('poll', self._poll_callback)

        # Continue polling if `abort` flag isn't set
        return not self._abort