CastagnaIT/plugin.video.netflix

View on GitHub
resources/lib/utils/cookies.py

Summary

Maintainability
A
2 hrs
Test Coverage
# -*- coding: utf-8 -*-
"""
    Copyright (C) 2017 Sebastian Golasch (plugin.video.netflix)
    Copyright (C) 2018 Caphm (original implementation module)
    Persistent cookie management

    SPDX-License-Identifier: MIT
    See LICENSES/MIT.md for more information.
"""
import pickle
from http.cookiejar import CookieJar
from threading import RLock
from time import time

import xbmcvfs

from resources.lib.common.exceptions import MissingCookiesError
from resources.lib.globals import G
from resources.lib.utils.logging import LOG


class PickleableCookieJar(CookieJar):
    """A pickleable CookieJar class"""
    # This code has been adapted from RequestsCookieJar of "Requests" module
    @classmethod
    def cast(cls, cookie_jar: CookieJar):
        """Make a kind of cast to convert the class from CookieJar to PickleableCookieJar"""
        assert isinstance(cookie_jar, CookieJar)
        cookie_jar.__class__ = cls
        assert isinstance(cookie_jar, PickleableCookieJar)
        return cookie_jar

    def __getstate__(self):
        """Unlike a normal CookieJar, this class is pickleable."""
        state = self.__dict__.copy()
        # remove the unpickleable RLock object
        state.pop('_cookies_lock')
        return state

    def __setstate__(self, state):
        """Unlike a normal CookieJar, this class is pickleable."""
        self.__dict__.update(state)
        if '_cookies_lock' not in self.__dict__:
            self._cookies_lock = RLock()

    def __getitem__(self, name):
        """Dict-like __getitem__() for compatibility with client code. Throws
        exception if there are more than one cookie with name. In that case,
        use the more explicit get() method instead.

        .. warning:: operation is O(n), not O(1).
        """
        return self._find_no_duplicates(name)

    def _find_no_duplicates(self, name, domain=None, path=None):
        """Both ``__get_item__`` and ``get`` call this function: it's never
        used elsewhere in Requests.

        :param name: a string containing name of cookie
        :param domain: (optional) string containing domain of cookie
        :param path: (optional) string containing path of cookie
        :raises KeyError: if cookie is not found
        :raises CookieConflictError: if there are multiple cookies
            that match name and optionally domain and path
        :return: cookie.value
        """
        to_return = None
        for cookie in iter(self):
            if cookie.name == name:
                if domain is None or cookie.domain == domain:
                    if path is None or cookie.path == path:
                        if to_return is not None:
                            # if there are multiple cookies that meet passed in criteria
                            raise Exception(f"There are multiple cookies with name, {name!r}")  # pylint: disable=broad-exception-raised
                        # we will eventually return this as long as no cookie conflict
                        to_return = cookie.value
        if to_return:
            return to_return
        raise KeyError(f"name={name!r}, domain={domain!r}, path={path!r}")

def save(cookie_jar, log_output=True):
    """Save a cookie jar to file and in-memory storage"""
    if log_output:
        log_cookie(cookie_jar)
    cookie_file = xbmcvfs.File(cookie_file_path(), 'wb')
    try:
        # For compatibility, we convert CookieJar object to our PickleableCookieJar
        # to keep possibility to change in future Requests module with another one
        cookie_file.write(bytearray(pickle.dumps(PickleableCookieJar.cast(cookie_jar))))
    except Exception as exc:  # pylint: disable=broad-except
        LOG.error('Failed to save cookies to file: {exc}', exc=exc)
    finally:
        cookie_file.close()


def delete():
    """Delete cookies for an account from the disk"""
    try:
        xbmcvfs.delete(cookie_file_path())
    except Exception as exc:  # pylint: disable=broad-except
        LOG.error('Failed to delete cookies on disk: {exc}', exc=exc)


def load():
    """Load cookies for a given account and check them for validity"""
    file_path = cookie_file_path()
    if not xbmcvfs.exists(file_path):
        LOG.debug('Cookies file does not exist')
        raise MissingCookiesError
    cookie_file = xbmcvfs.File(file_path, 'rb')
    try:
        cookie_jar = pickle.loads(cookie_file.readBytes())
        # Clear flwssn cookie if present, as it is trouble with early expiration
        # Commented: this seem not produce any change
        # for cookie in cookie_jar:
        #    if cookie.name == 'flwssn':
        #        cookie_jar.clear(cookie.domain, cookie.path, cookie.name)
        LOG.debug('Cookies loaded from file')
        log_cookie(cookie_jar)
        return cookie_jar
    except Exception as exc:  # pylint: disable=broad-except
        import traceback
        LOG.error('Failed to load cookies from file: {exc}', exc=exc)
        LOG.error(traceback.format_exc())
        raise MissingCookiesError from exc
    finally:
        cookie_file.close()


def log_cookie(cookie_jar):
    """Print cookie info to the log"""
    if not LOG.is_enabled:
        return
    debug_output = 'Current cookies:\n'
    for cookie in cookie_jar:
        remaining_ttl = int((cookie.expires or 0) - time()) if cookie.expires else None
        debug_output += f'{cookie.name} (expires ts {cookie.expires} - remaining TTL {remaining_ttl} sec)\n'
    LOG.debug(debug_output)


def cookie_file_path():
    """Return the file path to store cookies"""
    return xbmcvfs.translatePath(G.COOKIES_PATH)


def convert_chrome_cookie(cookie):
    """Convert a cookie from Chrome to a CookieJar format type"""
    return {
        'name': cookie['name'],
        'value': cookie['value'],
        'domain': cookie['domain'],
        'path': cookie['path'],
        'secure': cookie['secure'],
        'expires': int(cookie['expires']) if cookie['expires'] != -1 else None,
        'rest': {'HttpOnly': True if cookie['httpOnly'] else None}
    }