CastagnaIT/plugin.video.netflix

View on GitHub
resources/lib/services/nfsession/msl/msl_request_builder.py

Summary

Maintainability
A
0 mins
Test Coverage
# -*- coding: utf-8 -*-
"""
    Copyright (C) 2017 Sebastian Golasch (plugin.video.netflix)
    Copyright (C) 2018 Caphm (original implementation module)
    MSL request building

    SPDX-License-Identifier: MIT
    See LICENSES/MIT.md for more information.
"""
import json
import base64
import random
import time
from typing import TYPE_CHECKING

from resources.lib.common.exceptions import MSLError
from resources.lib.globals import G
import resources.lib.common as common
from resources.lib.services.nfsession.msl.msl_utils import (MSL_AUTH_USER_ID_TOKEN, MSL_AUTH_EMAIL_PASSWORD,
                                                            MSL_AUTH_NETFLIXID)
from resources.lib.utils.logging import measure_exec_time_decorator

if TYPE_CHECKING:  # This variable/imports are used only by the editor, so not at runtime
    from resources.lib.services.nfsession.nfsession_ops import NFSessionOperations


class MSLRequestBuilder:
    """Provides mechanisms to create MSL requests"""

    def __init__(self, nfsession):
        """
        Initialize the builder state and instantiate the crypto handler of the running platform
        :param nfsession: The object providing the NFSession operations
        """
        self.nfsession: 'NFSessionOperations' = nfsession
        self.current_message_id = None
        self.rndm = random.SystemRandom()
        # The crypto handler is imported here, so that only the module chosen for the running platform is imported
        is_android = common.get_system_platform() == 'android'
        if not is_android:
            from .default_crypto import DefaultMSLCrypto as crypto_handler
        else:
            from .android_crypto import AndroidMSLCrypto as crypto_handler
        self.crypto = crypto_handler()

    @staticmethod
    def build_request_data(url, params=None, echo=''):
        """
        Assemble the dict of a standard request data (version 2)
        :param url: The value set to the 'url' field
        :param params: The value set to the 'params' field
        :param echo: The value set to the 'echo' field
        :return: The request data as dict
        """
        return {
            'version': 2,
            'url': url,
            # The request id is derived from the current epoch time
            'id': int(time.time() * 100000000),
            'languages': [G.LOCAL_DB.get_profile_config('language')],
            'params': params,
            'echo': echo
        }

    @measure_exec_time_decorator(is_immediate=True)
    def msl_request(self, data, esn, auth_data):
        """
        Build an encrypted MSL request
        :return: The signed header JSON followed by the encrypted payload chunk JSON, as a single string
        """
        # The header must be built first, its generation sets the message id that is reused by the payload chunk
        header_json = json.dumps(self._signed_header(esn, auth_data))
        chunk_json = json.dumps(self._encrypted_chunk(data, esn))
        return header_json + chunk_json

    @measure_exec_time_decorator(is_immediate=True)
    def handshake_request(self, esn):
        """
        Build the request used to perform the key handshake
        :return: The header JSON followed by the payload chunk JSON, as a single string
        """
        # The header data is generated before the payload chunk, because it sets the message id used by the chunk
        raw_header_data = self._headerdata(auth_data={}, is_handshake=True)
        header_dict = {
            'entityauthdata': {
                'scheme': 'NONE',
                'authdata': {'identity': esn}},
            # Here the header data is only base64 encoded and the signature is left empty
            'headerdata': base64.standard_b64encode(raw_header_data.encode('utf-8')).decode('utf-8'),
            'signature': ''
        }
        header_json = json.dumps(header_dict, sort_keys=True)
        # The payload chunk is built without encryption envelope
        chunk_json = json.dumps(self._encrypted_chunk(envelope_payload=False))
        return header_json + chunk_json

    def _signed_header(self, esn, auth_data):
        encryption_envelope = self.crypto.encrypt(self._headerdata(auth_data=auth_data, esn=esn), esn)
        return {
            'headerdata': base64.standard_b64encode(
                encryption_envelope.encode('utf-8')).decode('utf-8'),
            'signature': self.crypto.sign(encryption_envelope),
            'mastertoken': self.crypto.mastertoken,
        }

    def _headerdata(self, auth_data, esn=None, compression=None, is_handshake=False):
        """
        Generate the MSL header data, at each call a new random message id is generated and stored
        :param auth_data: The authentication data dict, used only when it is not a handshake
        :param esn: The value set as 'sender', used only when it is not a handshake
        :param compression: The compression algorithm to declare, if any
        :param is_handshake: When True the key request data is added in place of sender and authentication info
        :return: The header JSON data as string
        """
        self.current_message_id = self.rndm.randint(0, 2 ** 52)
        compression_algos = [compression] if compression else []  # GZIP, LZW, Empty
        header_data = {
            'messageid': self.current_message_id,
            'renewable': True,
            'capabilities': {
                'languages': [G.LOCAL_DB.get_value('locale_id')],
                'compressionalgos': compression_algos
            }
        }

        if not is_handshake:
            header_data['sender'] = esn
            self._add_auth_info(header_data, auth_data)
        else:
            header_data['keyrequestdata'] = self.crypto.key_request_data()

        return json.dumps(header_data)

    def _encrypted_chunk(self, data='', esn=None, envelope_payload=True):
        if data:
            data = base64.standard_b64encode(json.dumps(data).encode('utf-8')).decode('utf-8')
        payload = json.dumps({
            'messageid': self.current_message_id,
            'data': data,
            'sequencenumber': 1,
            'endofmsg': True
        })
        if envelope_payload:
            payload = self.crypto.encrypt(payload, esn)
        return {
            'payload': base64.standard_b64encode(payload.encode('utf-8')).decode('utf-8'),
            'signature': self.crypto.sign(payload) if envelope_payload else '',
        }

    def decrypt_header_data(self, data, enveloped=True):
        """Decrypt a message header"""
        header_data = json.loads(base64.standard_b64decode(data))
        if enveloped:
            init_vector = base64.standard_b64decode(header_data['iv'])
            cipher_text = base64.standard_b64decode(header_data['ciphertext'])
            return json.loads(self.crypto.decrypt(init_vector, cipher_text))
        return header_data

    def _add_auth_info(self, header_data, auth_data):
        """
        Add to the header data the info that identifies the application user associated with the message
        :param header_data: The header data dict, it is modified in place
        :param auth_data: The dict with the authentication scheme to use ('auth_scheme') and the values it requires
        :raises MSLError: When the authentication scheme is not supported
        """
        scheme = auth_data['auth_scheme']

        if scheme == MSL_AUTH_USER_ID_TOKEN:
            # Authentication scheme with: User ID token
            # By default the requests are made with the main netflix profile. Since the user id token contains also
            # the identity of the netflix profile, to send data to the right profile (e.g. for continue watching)
            # the MSL profile must be switched by using the SWITCH_PROFILE scheme.
            if not auth_data['use_switch_profile']:
                header_data['useridtoken'] = auth_data['user_id_token']
                return
            # SWITCH_PROFILE is a custom Netflix MSL user authentication scheme, needed to switch the profile on
            # MSL side. It works only combined with the user id token and can not be used with all endpoints,
            # after its use the response will contain the user id token of the requested profile.
            header_data['userauthdata'] = {
                'scheme': 'SWITCH_PROFILE',
                'authdata': {
                    'useridtoken': auth_data['user_id_token'],
                    'profileguid': G.LOCAL_DB.get_active_profile_guid()
                }
            }
            return

        if scheme == MSL_AUTH_EMAIL_PASSWORD:
            # Authentication scheme with: Email password
            # The requests are made with the main netflix profile only
            # (continue watching can not be updated to other profiles)
            credentials = common.get_credentials()
            header_data['userauthdata'] = {
                'scheme': 'EMAIL_PASSWORD',
                'authdata': {
                    'email': credentials['email'],
                    'password': credentials['password']
                }
            }
            return

        if scheme == MSL_AUTH_NETFLIXID:
            # Authentication scheme with: Netflix ID cookies
            # The requests are made with the same netflix profile used/set on nfsession (website)
            cookies = self.nfsession.session.cookies
            header_data['userauthdata'] = {
                'scheme': 'NETFLIXID',
                'authdata': {
                    'netflixid': cookies['NetflixId'],
                    'securenetflixid': cookies['SecureNetflixId']
                }
            }
            return

        raise MSLError(f'Authentication scheme "{auth_data["auth_scheme"]}" is not supported.')