CastagnaIT/plugin.video.netflix

View on GitHub
resources/lib/services/nfsession/msl/msl_requests.py

Summary

Maintainability
A
0 mins
Test Coverage
# -*- coding: utf-8 -*-
"""
    Copyright (C) 2017 Sebastian Golasch (plugin.video.netflix)
    Copyright (C) 2018 Caphm (original implementation module)
    Copyright (C) 2020 Stefano Gottardo
    MSL requests

    SPDX-License-Identifier: MIT
    See LICENSES/MIT.md for more information.
"""
import base64
import json
import time
import zlib

import requests.exceptions as req_exceptions

import resources.lib.common as common
from resources.lib.common import get_system_platform, is_device_l1_enabled
from resources.lib.common.exceptions import MSLError
from resources.lib.globals import G

from resources.lib.services.nfsession.msl.msl_request_builder import MSLRequestBuilder
from resources.lib.services.nfsession.msl.msl_utils import (ENDPOINTS, create_req_params, generate_logblobs_params,
                                                            MSL_AUTH_NETFLIXID, MSL_AUTH_USER_ID_TOKEN,
                                                            MSL_AUTH_EMAIL_PASSWORD)
from resources.lib.utils.esn import get_esn
from resources.lib.utils.logging import LOG, measure_exec_time_decorator


class MSLRequests(MSLRequestBuilder):
    """Provides methods to make MSL requests"""

    HTTP_HEADERS = {
        'User-Agent': common.get_user_agent(),
        'Content-Type': 'text/plain',
        'Accept': '*/*',
        'Host': 'www.netflix.com'
    }

    def __init__(self, msl_data, nfsession):
        super().__init__(nfsession)
        self._load_msl_data(msl_data)

    def _load_msl_data(self, msl_data):
        try:
            self.crypto.load_msl_data(msl_data)
            self.crypto.load_crypto_session(msl_data)
        except Exception:  # pylint: disable=broad-except
            import traceback
            LOG.error(traceback.format_exc())

    def perform_key_handshake(self):
        """Perform a key handshake and initialize crypto keys"""
        esn = get_esn()
        if not esn:
            LOG.error('Cannot perform key handshake, missing ESN')
            return False
        LOG.info('Performing key handshake with ESN: {}', common.censure(esn) if len(esn) > 50 else esn)
        try:
            header, _ = _process_json_response(self._post(ENDPOINTS['manifest'], self.handshake_request(esn)))
            header_data = self.decrypt_header_data(header['headerdata'], False)
            self.crypto.parse_key_response(header_data, esn, True)
        except MSLError as exc:
            if exc.err_number == 207006 and common.get_system_platform() == 'android':
                msg = ('Request failed validation during key exchange\r\n'
                       'To try to solve this problem read the Wiki FAQ on add-on GitHub.')
                raise MSLError(msg) from exc
            raise
        # Delete all the user id tokens (are correlated to the previous mastertoken)
        self.crypto.clear_user_id_tokens()
        LOG.debug('Key handshake successful')
        return True

    def _get_owner_user_id_token(self):
        """A way to get the user token id of owner profile"""
        # In order to get a user id token of another (non-owner) profile you must make a request with SWITCH_PROFILE
        # authentication scheme (a custom authentication for netflix), and this request can be directly included
        # in the MSL manifest request.
        # But in order to execute this switch profile, you need to have the user id token of the main (owner) profile.
        # The only way (found to now) to get it immediately, is send a logblob event request, and save the
        # user id token obtained in the response.
        LOG.debug('Requesting logblog')
        endpoint_url = ENDPOINTS['logblobs'] + create_req_params('bind')
        response = self.chunked_request(endpoint_url,
                                        self.build_request_data('/logblob', generate_logblobs_params()),
                                        get_esn(),
                                        msl_auth_scheme=MSL_AUTH_EMAIL_PASSWORD)
        LOG.debug('Response of logblob request: {}', response)

    def _mastertoken_checks(self):
        """Perform checks to the MasterToken and executes a new key handshake when necessary"""
        is_handshake_required = False
        if self.crypto.mastertoken:
            if self.crypto.is_current_mastertoken_expired():
                LOG.debug('Stored MSL MasterToken is expired, a new key handshake will be performed')
                is_handshake_required = True
            else:
                # Check if the current ESN is same of ESN bound to MasterToken
                if get_esn() != self.crypto.bound_esn:
                    LOG.debug('Stored MSL MasterToken is bound to a different ESN, '
                              'a new key handshake will be performed')
                    is_handshake_required = True
        else:
            LOG.debug('MSL MasterToken is not available, a new key handshake will be performed')
            is_handshake_required = True
        if is_handshake_required:
            self.perform_key_handshake()

    def _get_user_auth_data(self):
        """
        Get the user id token for the current profile GUID and return the auth data.

        :returns: The auth data, may override the current auth scheme.
        """
        # Warning: the user id token contains also contains the identity of the netflix profile
        # therefore it is necessary to use the right user id token for the request
        current_profile_guid = G.LOCAL_DB.get_active_profile_guid()
        owner_profile_guid = G.LOCAL_DB.get_guid_owner_profile()
        use_switch_profile = False

        # Get the UID token if it exists and is not expired, so we get 'None' in all other cases
        user_id_token = self.crypto.get_user_id_token(current_profile_guid)

        if not user_id_token:
            if current_profile_guid == owner_profile_guid:
                # The request need to be executed from the owner (main) profile,
                # but the current token does not exist, or it is expired
                # to avoid perform an additional request (_get_owner_user_id_token) we use MSL_AUTH_EMAIL_PASSWORD auth
                # that will provide us the owner token id in the response
                return {'auth_scheme': MSL_AUTH_EMAIL_PASSWORD}
            # The request need to be executed from a non-owner (main) profile,
            # but the current token does not exist, or it is expired
            # then we need to enable MSL profile switching
            use_switch_profile = True
            # Try to get the UID of owner (main) profile
            user_id_token = self.crypto.get_user_id_token(owner_profile_guid)
            if not user_id_token:
                # The token id does not exist, or it is expired, then we request it
                self._get_owner_user_id_token()
                user_id_token = self.crypto.get_user_id_token(owner_profile_guid)
                if user_id_token is None:
                    raise MSLError('Cannot get user token id of owner / main profile.')
        return {'use_switch_profile': use_switch_profile, 'user_id_token': user_id_token}

    @measure_exec_time_decorator(is_immediate=True)
    def chunked_request(self, endpoint, request_data, esn, msl_auth_scheme=None):
        """
        Do a POST request and process the chunked response

        :param endpoint: The endpoint composed by a key of ENDPOINTS dict and create_req_params method (msl_utils.py)
        :param request_data: The request data (need to be wrapped by using build_request_data method)
        :param esn: The current ESN
        :param msl_auth_scheme: Optionals; Force use a type of MSL auth scheme
        :returns: The response data
        """
        self._mastertoken_checks()

        # Define the default auth scheme
        if msl_auth_scheme:
            auth_scheme = msl_auth_scheme
        elif get_system_platform() == 'android':
            # On android, we have a different ESN from the login then use NETFLIXID auth may cause MSL errors,
            # (usually on L3 devices) because the identity do not match, so we need to use User id token auth
            # to switch MSL profile with current ESN when needed
            if is_device_l1_enabled():
                # 26/06/2023 replaced auth_scheme with MSL_AUTH_NETFLIXID,
                # this is a workaround since there are new website changes in to the DRM session challenge used to request
                # license that cause different problems, see discussion starting from comment in the following link:
                # https://github.com/CastagnaIT/plugin.video.netflix/issues/1585#issuecomment-1585557883
                auth_scheme = MSL_AUTH_NETFLIXID
            else:
                # 26/06/2023 despite the L1 workaround above, L3 devices can still use tokens to obtains low res videos,
                # but just make sure that 1080p workaround (esn auto generation, regen_esn) is disabled or will cause
                # invalid credentials errors
                auth_scheme = MSL_AUTH_USER_ID_TOKEN
        else:
            auth_scheme = MSL_AUTH_NETFLIXID

        # Define the user authentication scheme to be used on the MSL HTTP requests
        # https://github.com/Netflix/msl/wiki/User-Authentication-%28Configuration%29
        auth_data = {'auth_scheme': auth_scheme}

        if auth_scheme == MSL_AUTH_USER_ID_TOKEN:
            auth_data.update(self._get_user_auth_data())

        LOG.debug('Chunked request will be executed with auth data: {}', auth_data)

        chunked_response = self._process_chunked_response(
            self._post(endpoint, self.msl_request(request_data, esn, auth_data)),
            save_uid_token_to_owner=auth_data.get('user_id_token') is None)
        return chunked_response['result']

    def _post(self, endpoint, request_data):
        """Execute a post request"""
        is_attempts_enabled = 'reqAttempt=' in endpoint
        retry = 1
        while True:
            try:
                if is_attempts_enabled:
                    _endpoint = endpoint.replace('reqAttempt=', f'reqAttempt={retry}')
                else:
                    _endpoint = endpoint
                LOG.debug('Executing POST request to {}', _endpoint)
                start = time.perf_counter()
                response = self.nfsession.session.post(url=_endpoint,
                                                       data=request_data,
                                                       headers=self.HTTP_HEADERS,
                                                       timeout=4)
                LOG.debug('Request took {}s', time.perf_counter() - start)
                LOG.debug('Request returned response with status {}', response.status_code)
                break
            except req_exceptions.ConnectionError as exc:
                LOG.error('HTTP request error: {}', exc)
                if retry == 3:
                    raise
                retry += 1
                LOG.warn('Another attempt will be performed ({})', retry)
        response.raise_for_status()
        return response.text

    @measure_exec_time_decorator(is_immediate=True)
    def _process_chunked_response(self, response, save_uid_token_to_owner=False):
        """Parse and decrypt an encrypted chunked response. Raise an error if the response is plaintext json"""
        LOG.debug('Received encrypted chunked response')
        header, payloads = _process_json_response(response)

        # TODO: sending for the renewal request is not yet implemented
        # if self.crypto.get_current_mastertoken_validity()['is_renewable']:
        #     # Check if mastertoken is renewed
        #     self.request_builder.crypto.compare_mastertoken(header['mastertoken'])

        header_data = self.decrypt_header_data(header['headerdata'])
        if 'useridtoken' in header_data:
            # Save the user id token for the future msl requests
            profile_guid = G.LOCAL_DB.get_guid_owner_profile() if save_uid_token_to_owner else\
                G.LOCAL_DB.get_active_profile_guid()
            self.crypto.save_user_id_token(profile_guid, header_data['useridtoken'])
        # if 'keyresponsedata' in header_data:
        #     LOG.debug('Found key handshake in response data')
        #     # Update current mastertoken
        #     self.request_builder.crypto.parse_key_response(header_data, True)
        decrypted_response = _decrypt_chunks(payloads, self.crypto)
        return _raise_if_error(decrypted_response)


def _process_json_response(response):
    """Processes the response data by returning header and payloads in JSON format and check for possible MSL error"""
    comma_separated_response = response.replace('}{', '},{')
    try:
        data = json.loads(f'[{comma_separated_response}]')
        # On 'data' list the first dict is always the header or the error
        payloads = [msg_part for msg_part in data if 'payload' in msg_part]
        return _raise_if_error(data[0]), payloads
    except ValueError as exc:
        LOG.error('Unable to load json data {}', response)
        raise MSLError('Unable to load json data') from exc


def _raise_if_error(decoded_response):
    raise_error = False
    # Catch a manifest/chunk error
    if any(key in decoded_response for key in ['error', 'errordata']):
        raise_error = True
    # Catch a license error
    if 'result' in decoded_response and isinstance(decoded_response.get('result'), list):
        if 'error' in decoded_response['result'][0]:
            raise_error = True
    if raise_error:
        LOG.error('Full MSL error information:')
        LOG.error(json.dumps(decoded_response))
        err_message, err_number = _get_error_details(decoded_response)
        raise MSLError(err_message, err_number)
    return decoded_response


def _get_error_details(decoded_response):
    err_message = 'Unhandled error check log.'
    err_number = None
    # Catch a chunk error
    if 'errordata' in decoded_response:
        err_data = json.loads(base64.standard_b64decode(decoded_response['errordata']))
        err_message = err_data['errormsg']
        err_number = err_data['internalcode']
    # Catch a manifest error
    elif 'error' in decoded_response:
        if decoded_response['error'].get('errorDisplayMessage'):
            err_message = decoded_response['error']['errorDisplayMessage']
            err_number = decoded_response['error'].get('bladeRunnerCode')
    # Catch a license error
    elif 'result' in decoded_response and isinstance(decoded_response.get('result'), list):
        if 'error' in decoded_response['result'][0]:
            if decoded_response['result'][0]['error'].get('errorDisplayMessage'):
                err_message = decoded_response['result'][0]['error']['errorDisplayMessage']
                err_number = decoded_response['result'][0]['error'].get('bladeRunnerCode')
    return err_message, err_number


@measure_exec_time_decorator(is_immediate=True)
def _decrypt_chunks(chunks, crypto):
    decrypted_payload = ''
    for chunk in chunks:
        payload = chunk.get('payload')
        decoded_payload = base64.standard_b64decode(payload)
        encryption_envelope = json.loads(decoded_payload)
        # Decrypt the text
        plaintext = crypto.decrypt(
            base64.standard_b64decode(encryption_envelope['iv']),
            base64.standard_b64decode(encryption_envelope.get('ciphertext')))
        # unpad the plaintext
        plaintext = json.loads(plaintext)
        data = plaintext.get('data')
        # uncompress data if compressed
        if plaintext.get('compressionalgo') == 'GZIP':
            decoded_data = base64.standard_b64decode(data)
            data = zlib.decompress(decoded_data, 16 + zlib.MAX_WBITS).decode('utf-8')
        else:
            data = base64.standard_b64decode(data).decode('utf-8')
        decrypted_payload += data
    return json.loads(decrypted_payload)