CastagnaIT/plugin.video.netflix

View on GitHub
resources/lib/utils/website.py

Summary

Maintainability
A
35 mins
Test Coverage
# -*- coding: utf-8 -*-
"""
    Copyright (C) 2017 Sebastian Golasch (plugin.video.netflix)
    Copyright (C) 2018 Caphm (original implementation module)
    Parsing of Netflix Website

    SPDX-License-Identifier: MIT
    See LICENSES/MIT.md for more information.
"""
import json
from re import search, compile as recompile, DOTALL, sub

import xbmc

import resources.lib.common as common
from resources.lib.database.db_utils import TABLE_SESSION
from resources.lib.globals import G
from resources.lib.common.exceptions import (InvalidProfilesError, InvalidAuthURLError, MbrStatusError,
                                             WebsiteParsingError, LoginValidateError, MbrStatusAnonymousError,
                                             MbrStatusNeverMemberError, MbrStatusFormerMemberError, DBProfilesMissing)
from .api_paths import jgraph_get, jgraph_get_list, jgraph_get_path
from .esn import get_website_esn, set_website_esn
from .logging import LOG, measure_exec_time_decorator


PAGE_ITEMS_INFO = [
    'models/userInfo/data/name',
    'models/userInfo/data/guid',            # Main profile guid
    'models/userInfo/data/userGuid',        # Current profile guid
    'models/userInfo/data/countryOfSignup',
    'models/userInfo/data/membershipStatus',
    'models/userInfo/data/isTestAccount',
    'models/userInfo/data/deviceTypeId',
    'models/userInfo/data/isAdultVerified',
    'models/userInfo/data/isKids',
    'models/userInfo/data/pinEnabled',
    'models/serverDefs/data/BUILD_IDENTIFIER',
    'models/esnGeneratorModel/data/esn',
    'models/memberContext/data/geo/preferredLocale',
    'models/truths/data/isAdsPlan'
]

PAGE_ITEMS_API_URL = {
    'auth_url': 'models/userInfo/data/authURL',
    'api_endpoint_root_url': 'models/serverDefs/data/API_ROOT',
    'api_endpoint_url': 'models/services/data/memberapi',
    # 'api_endpoint_url': 'models/playerModel/data/config/ui/initParams/apiUrl',  # old endpoint address path
    'request_id': 'models/serverDefs/data/requestId',
    'asset_core': 'models/playerModel/data/config/core/assets/core',
    'ui_version': 'models/playerModel/data/config/ui/initParams/uiVersion',
    'browser_info_version': 'models/playerModel/data/config/core/initParams/browserInfo/version',
    'browser_info_os_name': 'models/playerModel/data/config/core/initParams/browserInfo/os/name',
    'browser_info_os_version': 'models/playerModel/data/config/core/initParams/browserInfo/os/version',
}

PAGE_ITEM_ERROR_CODE = 'models/flow/data/fields/errorCode/value'
PAGE_ITEM_ERROR_CODE_LIST = 'models\\i18nStrings\\data\\login/login'

JSON_REGEX = r'netflix\.{}\s*=\s*(.*?);\s*</script>'
AVATAR_SUBPATH = ['images', 'byWidth', '320']

PROFILE_DEBUG_INFO = ['isAccountOwner', 'isActive', 'isKids', 'maturityLevel', 'language']


@measure_exec_time_decorator(is_immediate=True)
def extract_session_data(content, validate=False, update_profiles=False):
    """
    Call all the parsers we need to extract all
    the session relevant data from the HTML page
    """
    LOG.debug('Extracting session data...')
    react_context = extract_json(content, 'reactContext')
    if validate:
        validate_login(react_context)

    user_data = extract_userdata(react_context)
    _check_membership_status(user_data.get('membershipStatus'))
    if user_data.get('isAdsPlan') is None:
        G.LOCAL_DB.delete_key('is_ads_plan', TABLE_SESSION)
    else:
        G.LOCAL_DB.set_value('is_ads_plan', user_data['isAdsPlan'], TABLE_SESSION)

    api_data = extract_api_data(react_context)
    # Note: Falcor cache does not exist if membershipStatus is not CURRENT_MEMBER
    falcor_cache = extract_json(content, 'falcorCache')
    if update_profiles:
        parse_profiles(falcor_cache)
    # Save only some info of the current profile from user data
    G.LOCAL_DB.set_value('build_identifier', user_data.get('BUILD_IDENTIFIER'), TABLE_SESSION)
    if not get_website_esn():
        set_website_esn(user_data['esn'])
    G.LOCAL_DB.set_value('locale_id', user_data.get('preferredLocale').get('id', 'en-US'))
    # Extract the client version from assets core
    result = search(r'-([0-9\.]+)\.js$', api_data.pop('asset_core'))
    if not result:
        LOG.error('It was not possible to extract the client version!')
        api_data['client_version'] = '6.0023.976.011'
    else:
        api_data['client_version'] = result.groups()[0]
    # Save api urls
    G.LOCAL_DB.set_values(api_data, TABLE_SESSION)
    return api_data


def _check_membership_status(status):
    if status == 'CURRENT_MEMBER':
        return
    if status == 'ANONYMOUS':
        # Possible known causes:
        # -Login password has been changed
        # -In the login request, 'Content-Type' specified is not compliant with data passed or no more supported
        # -Expired profiles cookies!? (not verified)
        # In these cases it is mandatory to login again
        raise MbrStatusAnonymousError('ANONYMOUS')
    if status == 'NEVER_MEMBER':
        # The account has not been confirmed
        raise MbrStatusNeverMemberError('NEVER_MEMBER')
    if status == 'FORMER_MEMBER':
        # The account has not been reactivated
        raise MbrStatusFormerMemberError('FORMER_MEMBER')
    LOG.error('Can not login, the Membership status is {}', status)
    raise MbrStatusError(status)


@measure_exec_time_decorator(is_immediate=True)
def parse_profiles(data):
    """Parse profile information from Netflix response"""
    profiles_list = jgraph_get_list('profilesList', data)
    try:
        if not profiles_list:
            raise InvalidProfilesError('It has not been possible to obtain the list of profiles.')
        sort_order = 0
        current_guids = []
        for index, profile_data in profiles_list.items():  # pylint: disable=unused-variable
            summary = jgraph_get('summary', profile_data)
            guid = summary['guid']
            current_guids.append(guid)
            LOG.debug('Parsing profile {}', summary['guid'])
            avatar_url = _get_avatar(profile_data, data, guid)
            is_active = summary.pop('isActive')
            G.LOCAL_DB.set_profile(guid, is_active, sort_order)
            G.SHARED_DB.set_profile(guid, sort_order)
            # Add profile language description translated from locale
            summary['language_desc'] = xbmc.convertLanguage(summary['language'][:2], xbmc.ENGLISH_NAME)
            if LOG.is_enabled:
                for key, value in summary.items():
                    if key in PROFILE_DEBUG_INFO:
                        LOG.debug('Profile info: {0: <15} = {1}', key, value)
            # Translate the profile name, is coded as HTML
            summary['profileName'] = parse_html(summary['profileName'])
            summary['avatar'] = avatar_url
            G.LOCAL_DB.insert_profile_configs(summary, guid)
            sort_order += 1
        _delete_non_existing_profiles(current_guids)
    except Exception as exc:  # pylint: disable=broad-except
        import traceback
        LOG.error(traceback.format_exc())
        LOG.error('Profile list data: {}', profiles_list)
        raise InvalidProfilesError from exc


def _delete_non_existing_profiles(current_guids):
    list_guid = G.LOCAL_DB.get_guid_profiles()
    for guid in list_guid:
        if guid not in current_guids:
            LOG.debug('Deleting non-existing profile {}', guid)
            G.LOCAL_DB.delete_profile(guid)
            G.SHARED_DB.delete_profile(guid)
    # Ensures at least one active profile
    try:
        G.LOCAL_DB.get_active_profile_guid()
    except DBProfilesMissing:
        G.LOCAL_DB.switch_active_profile(G.LOCAL_DB.get_guid_owner_profile())
    # Verify if auto select profile exists
    autoselect_profile_guid = G.LOCAL_DB.get_value('autoselect_profile_guid', '')
    if autoselect_profile_guid and autoselect_profile_guid not in current_guids:
        LOG.warn('Auto-selection disabled, the GUID {} not more exists', autoselect_profile_guid)
        G.LOCAL_DB.set_value('autoselect_profile_guid', '')
    # Verify if profile for library auto-sync exists
    sync_mylist_profile_guid = G.SHARED_DB.get_value('sync_mylist_profile_guid')
    if sync_mylist_profile_guid and sync_mylist_profile_guid not in current_guids:
        LOG.warn('Library auto-sync disabled, the GUID {} not more exists', sync_mylist_profile_guid)
        with G.SETTINGS_MONITOR.ignore_events(1):
            G.ADDON.setSettingBool('lib_sync_mylist', False)
        G.SHARED_DB.delete_key('sync_mylist_profile_guid')
    # Verify if profile for library playback exists
    library_playback_profile_guid = G.LOCAL_DB.get_value('library_playback_profile_guid')
    if library_playback_profile_guid and library_playback_profile_guid not in current_guids:
        LOG.warn('Profile set for playback from library cleared, the GUID {} not more exists',
                 library_playback_profile_guid)
        G.LOCAL_DB.set_value('library_playback_profile_guid', '')


def _get_avatar(profile_data, data, guid):
    try:
        avatar = jgraph_get('avatar', profile_data, data)
        return jgraph_get_path(AVATAR_SUBPATH, avatar)
    except (KeyError, TypeError):
        LOG.warn('Cannot find avatar for profile {}', guid)
        LOG.debug('Profile list data: {}', profile_data)
        return G.ICON


@measure_exec_time_decorator(is_immediate=True)
def extract_userdata(react_context, debug_log=True):
    """Extract essential userdata from the reactContext of the webpage"""
    LOG.debug('Extracting userdata from webpage')
    user_data = {}

    for path in (path.split('/') for path in PAGE_ITEMS_INFO):
        try:
            prop_name = path[-1]
            prop_value = common.get_path(path, react_context)
            user_data[prop_name] = prop_value
            if debug_log and 'esn' not in path:
                LOG.debug('Extracted: {0: <19} = {1}', prop_name, prop_value)
        except (AttributeError, KeyError):
            LOG.error('Could not extract {}', path)
    return user_data


def extract_api_data(react_context, debug_log=True):
    """Extract api urls from the reactContext of the webpage"""
    LOG.debug('Extracting api urls from webpage')
    api_data = {}
    for key, value in list(PAGE_ITEMS_API_URL.items()):
        path = value.split('/')
        try:
            extracted_value = common.get_path(path, react_context)
            if key == 'api_endpoint_url' and isinstance(extracted_value, dict):
                addr = f'{extracted_value["protocol"]}://{extracted_value["hostname"]}{extracted_value["path"][0]}'
                extracted_value = addr
            api_data.update({key: extracted_value})
            if debug_log:
                LOG.debug('Extracted: {0: <34} = {1}', value, extracted_value)
        except (AttributeError, KeyError):
            LOG.warn('Could not extract {}', path)
    return assert_valid_auth_url(api_data)


def assert_valid_auth_url(user_data):
    """Raise an exception if user_data does not contain a valid authURL"""
    if len(user_data.get('auth_url', '')) != 42:
        raise InvalidAuthURLError('authURL is not valid')
    return user_data


def validate_login(react_context):
    path_code_list = PAGE_ITEM_ERROR_CODE_LIST.split('\\')
    path_error_code = PAGE_ITEM_ERROR_CODE.split('/')
    if common.check_path_exists(path_error_code, react_context):
        # If the path exists, a login error occurs
        try:
            error_code_list = common.get_path(path_code_list, react_context)
            error_code = common.get_path(path_error_code, react_context)
            LOG.error('Login not valid, error code {}', error_code)
            error_description = common.get_local_string(30102) + error_code
            if f'login_{error_code}' in error_code_list:
                error_description = error_code_list[f'login_{error_code}']
            elif f'email_{error_code}' in error_code_list:
                error_description = error_code_list[f'email_{error_code}']
            elif error_code in error_code_list:
                error_description = error_code_list[error_code]
            raise LoginValidateError(common.remove_html_tags(error_description))
        except (AttributeError, KeyError) as exc:
            import traceback
            LOG.error(traceback.format_exc())
            error_msg = (
                'Something is wrong in PAGE_ITEM_ERROR_CODE or PAGE_ITEM_ERROR_CODE_LIST paths.'
                'react_context data may have changed.')
            LOG.error(error_msg)
            raise WebsiteParsingError(error_msg) from exc


@measure_exec_time_decorator(is_immediate=True)
def extract_json(content, name):
    """Extract json from netflix content page"""
    LOG.debug('Extracting {} JSON', name)
    json_str = None
    try:
        json_array = recompile(JSON_REGEX.format(name), DOTALL).findall(content.decode('utf-8'))
        json_str = json_array[0]
        json_str_replace = json_str.replace(r'\"', r'\\"')  # Escape \"
        json_str_replace = json_str_replace.replace(r'\s', r'\\s')  # Escape whitespace
        json_str_replace = json_str_replace.replace(r'\r', r'\\r')  # Escape return
        json_str_replace = json_str_replace.replace(r'\n', r'\\n')  # Escape line feed
        json_str_replace = json_str_replace.replace(r'\t', r'\\t')  # Escape tab
        json_str_replace = json_str_replace.replace(r'\p', r'/p')  # Unicode property not supported, we change slash to avoid unescape it
        json_str_replace = json_str_replace.encode().decode('unicode_escape')  # Decode the string as unicode
        json_str_replace = sub(r'\\(?!["])', r'\\\\', json_str_replace)  # Escape backslash (only when is not followed by double quotation marks \")
        return json.loads(json_str_replace)
    except Exception as exc:  # pylint: disable=broad-except
        if json_str:
            # For testing purposes remember to add raw prefix to the string to test: json_str = r'string to test'
            LOG.error('JSON string trying to load: {}', json_str)
        import traceback
        LOG.error(traceback.format_exc())
        raise WebsiteParsingError(f'Unable to extract {name}') from exc


def extract_parental_control_data(content, current_maturity):
    """Extract the content of parental control data"""
    try:
        react_context = extract_json(content, 'reactContext')
        # Extract country max maturity value
        max_maturity = common.get_path(['models', 'parentalControls', 'data', 'accountProps', 'countryMaxMaturity'],
                                       react_context)
        # Extract rating levels
        rc_rating_levels = common.get_path(['models', 'memberContext', 'data', 'userInfo', 'ratingLevels'],
                                           react_context)
        rating_levels = []
        levels_count = len(rc_rating_levels) - 1
        current_level_index = levels_count
        for index, rating_level in enumerate(rc_rating_levels):
            if index == levels_count:
                # Last level must use the country max maturity level
                level_value = max_maturity
            else:
                level_value = int(rating_level['level'])
            rating_levels.append({'level': index,
                                  'value': level_value,
                                  'label': rating_level['labels'][0]['label'],
                                  'description': parse_html(rating_level['labels'][0]['description'])})
            if level_value == current_maturity:
                current_level_index = index
        if not rating_levels:
            raise WebsiteParsingError('Unable to get maturity rating levels')
        return {'rating_levels': rating_levels, 'current_level_index': current_level_index}
    except KeyError as exc:
        raise WebsiteParsingError('Unable to get path in to reactContext data') from exc


def parse_html(html_value):
    """Parse HTML entities"""
    try:  # Python >= 3.4
        from html import unescape
        return unescape(html_value)
    except ImportError:  # Python <= 3.3
        from html.parser import HTMLParser
        return HTMLParser().unescape(html_value)  # pylint: disable=no-member