resources/lib/utils/cookies.py
# -*- coding: utf-8 -*-
"""
Copyright (C) 2017 Sebastian Golasch (plugin.video.netflix)
Copyright (C) 2018 Caphm (original implementation module)
Persistent cookie management
SPDX-License-Identifier: MIT
See LICENSES/MIT.md for more information.
"""
import pickle
from http.cookiejar import CookieJar
from threading import RLock
from time import time
import xbmcvfs
from resources.lib.common.exceptions import MissingCookiesError
from resources.lib.globals import G
from resources.lib.utils.logging import LOG
class PickleableCookieJar(CookieJar):
    """A CookieJar that can be pickled and looked up dict-style by cookie name.

    The ``_cookies_lock`` attribute holds an ``RLock`` which cannot be pickled,
    so it is dropped when the state is exported and recreated when it is restored.
    """
    # This code has been adapted from RequestsCookieJar of "Requests" module

    @classmethod
    def cast(cls, cookie_jar: CookieJar):
        """Make a kind of cast to convert the class from CookieJar to PickleableCookieJar

        The conversion is done in place by reassigning ``__class__``: the object
        returned is the very same object that was passed in.

        :param cookie_jar: the CookieJar instance to convert
        :return: the same instance, now of type PickleableCookieJar
        """
        assert isinstance(cookie_jar, CookieJar)
        cookie_jar.__class__ = cls
        assert isinstance(cookie_jar, PickleableCookieJar)
        return cookie_jar

    def __getstate__(self):
        """Unlike a normal CookieJar, this class is pickleable."""
        state = self.__dict__.copy()
        # remove the unpickleable RLock object; tolerate an instance that has no
        # lock attribute instead of failing with KeyError
        state.pop('_cookies_lock', None)
        return state

    def __setstate__(self, state):
        """Unlike a normal CookieJar, this class is pickleable."""
        self.__dict__.update(state)
        # The lock is never part of the pickled state, recreate it
        if '_cookies_lock' not in self.__dict__:
            self._cookies_lock = RLock()

    def __getitem__(self, name):
        """Dict-like __getitem__() for compatibility with client code. Throws
        exception if there are more than one cookie with name.

        .. warning:: operation is O(n), not O(1).

        :param name: a string containing name of cookie
        :raises KeyError: if cookie is not found
        :return: cookie.value
        """
        return self._find_no_duplicates(name)

    def _find_no_duplicates(self, name, domain=None, path=None):
        """Return the value of the single cookie matching the given criteria.

        A cookie whose value is falsy (for example an empty string) is still
        considered found and its value is returned as is.

        :param name: a string containing name of cookie
        :param domain: (optional) string containing domain of cookie
        :param path: (optional) string containing path of cookie
        :raises KeyError: if cookie is not found
        :raises Exception: if there are multiple cookies
            that match name and optionally domain and path
        :return: cookie.value
        """
        found = False
        value = None
        for cookie in self:
            if cookie.name != name:
                continue
            if domain is not None and cookie.domain != domain:
                continue
            if path is not None and cookie.path != path:
                continue
            if found:
                # if there are multiple cookies that meet passed in criteria
                raise Exception(f"There are multiple cookies with name, {name!r}")  # pylint: disable=broad-exception-raised
            # we will eventually return this as long as no cookie conflict
            found = True
            value = cookie.value
        if found:
            return value
        raise KeyError(f"name={name!r}, domain={domain!r}, path={path!r}")
def save(cookie_jar, log_output=True):
    """Save a cookie jar to the cookies file

    The jar is serialised before the destination file is opened for writing, so
    that a pickling failure is logged without touching the cookies file that is
    already on disk.

    :param cookie_jar: the CookieJar to store; its class is converted in place
        to PickleableCookieJar
    :param log_output: when True, print the cookies to the log before saving
    """
    if log_output:
        log_cookie(cookie_jar)
    try:
        # For compatibility, we convert CookieJar object to our PickleableCookieJar
        # to keep possibility to change in future Requests module with another one
        data = bytearray(pickle.dumps(PickleableCookieJar.cast(cookie_jar)))
    except Exception as exc:  # pylint: disable=broad-except
        LOG.error('Failed to save cookies to file: {exc}', exc=exc)
        return
    cookie_file = xbmcvfs.File(cookie_file_path(), 'wb')
    try:
        cookie_file.write(data)
    except Exception as exc:  # pylint: disable=broad-except
        LOG.error('Failed to save cookies to file: {exc}', exc=exc)
    finally:
        cookie_file.close()
def delete():
    """Remove the cookies file from the disk, a failure is only logged"""
    try:
        target_path = cookie_file_path()
        xbmcvfs.delete(target_path)
    except Exception as err:  # pylint: disable=broad-except
        LOG.error('Failed to delete cookies on disk: {exc}', exc=err)
def load():
    """Read the pickled cookie jar back from the cookies file

    :return: the unpickled cookie jar
    :raises MissingCookiesError: if the cookies file does not exist, or if
        reading, unpickling or logging its content fails
    """
    path = cookie_file_path()
    file_is_present = xbmcvfs.exists(path)
    if not file_is_present:
        LOG.debug('Cookies file does not exist')
        raise MissingCookiesError
    handle = xbmcvfs.File(path, 'rb')
    try:
        raw_data = handle.readBytes()
        jar = pickle.loads(raw_data)
        # NOTE: clearing the 'flwssn' cookie at this point (because of its early
        # expiration) has been tried and then disabled, it seemed to change nothing
        LOG.debug('Cookies loaded from file')
        log_cookie(jar)
        return jar
    except Exception as err:  # pylint: disable=broad-except
        import traceback
        LOG.error('Failed to load cookies from file: {exc}', exc=err)
        LOG.error(traceback.format_exc())
        raise MissingCookiesError from err
    finally:
        # Runs on both the success and the failure path
        handle.close()
def log_cookie(cookie_jar):
    """Write name, expiry timestamp and remaining TTL of each cookie to the debug log"""
    if not LOG.is_enabled:
        # Logging is off, skip building the text
        return
    lines = ['Current cookies:\n']
    for item in cookie_jar:
        if item.expires:
            seconds_left = int(item.expires - time())
        else:
            # No (or zero) expiry set on the cookie
            seconds_left = None
        lines.append(f'{item.name} (expires ts {item.expires} - remaining TTL {seconds_left} sec)\n')
    LOG.debug(''.join(lines))
def cookie_file_path():
    """Translate G.COOKIES_PATH into the file path used to store cookies"""
    translated_path = xbmcvfs.translatePath(G.COOKIES_PATH)
    return translated_path
def convert_chrome_cookie(cookie):
    """Build a CookieJar-style cookie dict out of a Chrome cookie dict

    :param cookie: dict with the keys 'name', 'value', 'domain', 'path',
        'secure', 'expires' and 'httpOnly'
    :return: dict with the keys 'name', 'value', 'domain', 'path', 'secure',
        'expires' (int, or None when the source value is -1) and 'rest'
    """
    # These fields are copied over unchanged
    converted = {key: cookie[key] for key in ('name', 'value', 'domain', 'path', 'secure')}
    expiry = cookie['expires']
    if expiry != -1:
        converted['expires'] = int(expiry)
    else:
        # -1 means there is no expiry to carry over
        converted['expires'] = None
    http_only = True if cookie['httpOnly'] else None
    converted['rest'] = {'HttpOnly': http_only}
    return converted