CastagnaIT/plugin.video.netflix

View on GitHub
resources/lib/utils/api_requests.py

Summary

Maintainability
A
1 hr
Test Coverage
# -*- coding: utf-8 -*-
"""
    Copyright (C) 2017 Sebastian Golasch (plugin.video.netflix)
    Copyright (C) 2018 Caphm (original implementation module)
    Methods to execute requests to Netflix API

    SPDX-License-Identifier: MIT
    See LICENSES/MIT.md for more information.
"""
import resources.lib.common as common
import resources.lib.kodi.ui as ui
from resources.lib.common import cache_utils
from resources.lib.globals import G
from resources.lib.common.exceptions import (LoginError, MissingCredentialsError, CacheMiss, HttpError401, APIError,
                                             ErrorMsg)
from .api_paths import EPISODES_PARTIAL_PATHS, ART_PARTIAL_PATHS, build_paths
from .logging import LOG, measure_exec_time_decorator
from ..database.db_utils import TABLE_SESSION


def logout():
    """Sign out of the current account by issuing the 'logout' call"""
    call_name = 'logout'
    common.make_call(call_name)


def login(ask_credentials=True):
    """
    Run the interactive login procedure
    :param ask_credentials: if True, ask the user which login method to use
    :return: True if the login has been completed, False otherwise
    :raise MissingCredentialsError: re-raised, after showing a notification, when it occurs during the procedure
                                    (e.g. no data is returned by the authentication key procedure)
    """
    try:
        # The 'is_ads_plan' database value is stored after the login, so before the first login it is None;
        # reading it up front avoids showing the ADS notice again when more login attempts are done over time
        ads_notice_pending = G.LOCAL_DB.get_value('is_ads_plan', None, table=TABLE_SESSION) is None
        credentials = None
        use_credentials = False
        if ask_credentials:
            use_credentials = ui.show_yesno_dialog('Login', common.get_local_string(30340),
                                                   yeslabel=common.get_local_string(30341),
                                                   nolabel=common.get_local_string(30342))
            # if use_credentials:
            #     credentials = {'credentials': ui.ask_credentials()}
        if use_credentials:
            # The login page has changed, part of the HTML seems to be protected by reCaptcha:
            # the reactContext data is added to the HTML page only after the reCaptcha checks, so at the moment
            # it is not accessible by requesting the login page through a python script,
            # and this prevents us from getting the authURL code needed to perform the login request
            ui.show_ok_dialog('Login',
                              'Due to new website protections at moment the login with credentials is not available.')
            # Fall back to the login with the authentication key
            use_credentials = False

        if use_credentials:
            logged_in = bool(common.make_call('login', credentials))
        else:
            auth_data = common.run_nf_authentication_key()
            if not auth_data:
                raise MissingCredentialsError
            password = ui.ask_for_password()
            logged_in = bool(password and common.make_call('login_auth_data',
                                                           {'data': auth_data, 'password': password}))
        if not logged_in:
            return False
        if ads_notice_pending and G.LOCAL_DB.get_value('is_ads_plan', False, table=TABLE_SESSION):
            ui.show_ok_dialog('Netflix - ADS plan',
                              'ADS PLAN support is EXPERIMENTAL! You may experience of '
                              'malfunctions of add-on features (e.g. language selection).\n'
                              'ADS will be displayed at the beginning of the videos to allow the add-on '
                              'to work properly. Press OK to agree the terms.')
        return True
    except MissingCredentialsError:
        # Aborted by the user, or a field has been left empty
        ui.show_notification(common.get_local_string(30112))
        raise
    except LoginError as exc:
        # Login not valid: show the reason to the user
        ui.show_ok_dialog(common.get_local_string(30008), str(exc))
        return False


@measure_exec_time_decorator()
def get_video_raw_data(videoids, custom_partial_path=None):  # Do not apply cache to this method
    """
    Request the raw data of the specified videos
    :param videoids: sequence of video id objects (their 'value' is converted to int)
    :param custom_partial_path: partial paths to request instead of the default ones
    :return: the response of the 'path_request' call
    """
    video_ids = [int(item.value) for item in videoids]
    LOG.debug('Requesting video raw data for {}', video_ids)
    videos_path = ['videos', video_ids]
    if custom_partial_path:
        paths = build_paths(videos_path, custom_partial_path)
    else:
        paths = build_paths(videos_path, EPISODES_PARTIAL_PATHS)
        first_videoid = videoids[0]
        if first_videoid.mediatype == common.VideoId.EPISODE:
            # For episodes request also the artwork and the title of the tv show they belong to
            tvshow_path = ['videos', int(first_videoid.tvshowid)]
            paths.extend(build_paths(tvshow_path, ART_PARTIAL_PATHS + [['title']]))
    return common.make_call('path_request', paths)


@measure_exec_time_decorator()
def rate(videoid, rating):
    """
    Send the rating of a video to Netflix and notify the user
    :param videoid: video id object of the video to rate
    :param rating: the rating value, clamped to the 0-10 range before being halved
    """
    LOG.debug('Rating {} as {}', videoid.value, rating)
    # The value is clamped to 0-10 and halved before sending, so Netflix receives a rating from 0 to 5
    netflix_rating = min(10, max(0, rating)) / 2
    request_data = {'titleId': int(videoid.value),
                    'rating': netflix_rating}
    common.make_call('post_safe', {'endpoint': 'set_video_rating', 'data': request_data})
    # The notification shows the value back in the 0-10 scale
    ui.show_notification(common.get_local_string(30127).format(netflix_rating * 2))


@measure_exec_time_decorator()
def rate_thumb(videoid, rating, track_id_jaw):
    """
    Send the thumb rating of a video to Netflix and notify the user
    :param videoid: video id object of the video to rate
    :param rating: the thumb rating value, used also as index to pick the notification message
    :param track_id_jaw: the value sent as 'trackId' in the request
    """
    LOG.debug('Thumb rating {} as {}', videoid.value, rating)
    request_data = {
        'eventUuid': common.get_random_uuid(),
        'titleId': int(videoid.value),
        'trackId': track_id_jaw,
        'rating': rating,
    }
    common.make_call('post_safe', {'endpoint': 'set_thumb_rating', 'data': request_data})
    # The localized string 30045 contains the messages separated by '|', one for each rating value
    messages = common.get_local_string(30045).split('|')
    ui.show_notification(messages[rating])


def update_remindme(operation, videoid, trackid):
    """
    Call the API to add / remove the "Remind Me" flag on a video not yet available
    :param operation: the operation to perform, sent as is in the request
    :param videoid: video id object of the video
    :param trackid: the track id (the string 'None' means that the track id was not found)
    :raise ErrorMsg: when the track id is not available
    """
    if trackid == 'None':
        raise ErrorMsg('Unable update remind me, trackid not found.')
    request_data = {
        'lolomoId': 'unknown',
        'operation': operation,
        'videoId': int(videoid.value),
        'trackId': int(trackid)
    }
    response = common.make_call('post_safe', {'endpoint': 'playlistop', 'data': request_data})
    LOG.debug('update_remindme response: {}', response)
    # 05/10/2022: The remove action does not work when using this new callpath
    # op = 'addToRemindMeList' if operation == 'add' else 'removeToRemindMeList'
    # call_args = {
    #     'callpaths': [['videos', int(videoid.value), op]],
    #     'params': [str(trackid)],
    #     'path': ['videos', int(videoid.value), 'inRemindMeList']
    # }
    # response = common.make_call('callpath_request', call_args)
    # if response['videos'][videoid.value]['inRemindMeList']['value'] != (operation == 'add'):
    #     LOG.debug('update_remindme response: {}', response)
    #     raise Exception('Unable update remind me, an error occurred in the request.')


@measure_exec_time_decorator()
def update_my_list(videoid, operation, params):
    """
    Call the API to add / remove a video to / from "My List", then update the cached list
    :param videoid: video id object of the video
    :param operation: the operation to perform, sent as is in the request
    :param params: dict that must contain 'trackid' (the string 'None' means that the track id was not found)
    :raise ErrorMsg: when the track id is not available
    :raise APIError: when the status of the response is not 'success'
    """
    track_id = params['trackid']
    if track_id == 'None':
        raise ErrorMsg('Unable update my list, trackid not found.')
    LOG.debug('My List: {} {}', operation, videoid)
    request_data = {
        'lolomoId': 'unknown',
        'operation': operation,
        'videoId': int(videoid.value),
        'trackId': int(track_id),
        'skipRootInvalidation': True,
    }
    response = common.make_call('post_safe', {'endpoint': 'playlistop', 'data': request_data})
    succeeded = response.get('status') == 'success'
    if not succeeded:
        LOG.debug('update_my_list response: {}', response)
        raise APIError('Unable update my list, an error occurred in the request.')
    # Keep the cache aligned with the operation just performed
    _update_mylist_cache(videoid, operation, params)


def _update_mylist_cache(videoid, operation, params):
    """
    Update the cached "My List" data after an add / remove operation, to speed up the page load

    This avoids making a new request to the server to get the entire updated list.
    The update is best-effort: this function is called after the operation has already been accepted
    by the server, so a missing cache entry, or a cached content not aligned with the operation
    (video to remove not found, video to add already present), is silently skipped instead of raising.
    :param videoid: video id object of the video added / removed
    :param operation: 'remove' to remove the video, any other value is handled as an add
    :param params: dict that can contain 'perpetual_range_start', used to identify the cached list
    """
    perpetual_range_start = params.get('perpetual_range_start')
    mylist_identifier = 'mylist'
    if perpetual_range_start and perpetual_range_start != 'None':
        mylist_identifier += f'_{perpetual_range_start}'
    is_remove = operation == 'remove'

    # Update the cached video list
    if is_remove:
        try:
            video_list_sorted_data = G.CACHE.get(cache_utils.CACHE_MYLIST, mylist_identifier)
            # The cached list could not contain the video (stale cache), in that case there is nothing to delete
            if videoid.value in video_list_sorted_data.videos:
                del video_list_sorted_data.videos[videoid.value]
            G.CACHE.add(cache_utils.CACHE_MYLIST, mylist_identifier, video_list_sorted_data)
        except CacheMiss:
            pass
    else:
        common.make_call('add_videoids_to_video_list_cache', {'cache_bucket': cache_utils.CACHE_MYLIST,
                                                              'cache_identifier': mylist_identifier,
                                                              'video_ids': [videoid.value]})

    # Update the cached list of the videoids
    try:
        my_list_videoids = G.CACHE.get(cache_utils.CACHE_MYLIST, 'my_list_items')
        already_listed = videoid in my_list_videoids
        if is_remove:
            if already_listed:  # list.remove raises ValueError on a missing item
                my_list_videoids.remove(videoid)
        elif not already_listed:  # Do not store the same videoid twice
            my_list_videoids.append(videoid)
        G.CACHE.add(cache_utils.CACHE_MYLIST, 'my_list_items', my_list_videoids)
    except CacheMiss:
        pass


@measure_exec_time_decorator()
def get_parental_control_data(guid, password):
    """
    Request the parental control data
    :param guid: the value sent as 'guid' in the call
    :param password: the value sent as 'password' in the call
    :return: the result of the 'parental_control_data' call
    """
    call_args = {'guid': guid, 'password': password}
    return common.make_call('parental_control_data', call_args)


@measure_exec_time_decorator()
def set_parental_control_data(data):
    """
    Send the updated parental control data
    :param data: dict that must contain the keys 'token', 'experience', 'guid' and 'maturity'
    """
    request_data = {'action': 'update',
                    'authURL': data['token'],
                    'experience': data['experience'],
                    'guid': data['guid'],
                    'maturity': data['maturity']}
    common.make_call('post_safe', {'endpoint': 'content_restrictions', 'data': request_data})


@measure_exec_time_decorator()
def verify_profile_lock(guid, pin):
    """
    Send the PIN of a profile to Netflix to verify it
    :param guid: the value sent as 'guid' in the request
    :param pin: the PIN to verify
    :return: True only if the response reports 'success' as True, False also when the PIN is wrong
    """
    call_args = {'endpoint': 'profile_lock',
                 'data': {'pin': pin,
                          'action': 'verify',
                          'guid': guid}}
    try:
        response = common.make_call('post_safe', call_args)
    except HttpError401:  # Wrong PIN
        return False
    return response.get('success') is True


def _get_genre_languages(genre_id):
    """
    Request the subgenres of a "languages" genre and collect the languages they describe
    :param genre_id: (int) the ID of the genre to request
    :return: dict of the language names, keyed by language ID
    """
    call_args = {
        'paths': [['genres', genre_id, 'subgenres', {'to': 50}, ['id', 'name', 'languageCode']]]
    }
    response = common.make_call('path_request', call_args)
    # The genre is requested with an integer ID, but in the response it is keyed by the ID as string
    data_list = response.get('genres', {}).get(str(genre_id), {}).get('subgenres', {})
    lang_list = {}
    for lang_dict in data_list.values():
        lang_id = lang_dict['id'].get('value')
        if lang_id is None:  # If none the list is ended
            break
        lang_list[lang_id] = lang_dict['name']['value']
    return lang_list


def get_available_audio_languages():
    """
    Get the list of available audio languages of videos
    :return: dict of the language names, keyed by language ID
    """
    # originalAudioLanguages genres: 81555714
    return _get_genre_languages(81555714)

# todo: dubbingLanguages genres: 81586478

def get_available_subtitles_languages():
    """
    Get the list of available subtitles languages of videos
    :return: dict of the language names, keyed by language ID
    """
    # subtitleLanguages genres: 81586477
    return _get_genre_languages(81586477)


def get_genre_title(genre_id):
    """
    Get the title of a genre list of given ID
    :param genre_id: the ID of the genre, as integer or as numeric string
    :return: the title of the genre, or None if the ID does not exist
    """
    numeric_id = int(genre_id)
    call_args = {
        'paths': [['genres', numeric_id, ['name']]]
    }
    response = common.make_call('path_request', call_args)
    # The response is keyed by the ID as string, so the lookup must not depend on the type of the argument;
    # a response without 'genres' is handled like a not existing ID, as documented
    return response.get('genres', {}).get(str(numeric_id), {}).get('name', {}).get('value')


def remove_watched_status(videoid):
    """
    Request to delete the watched status of a video (the item is deleted also from the "continue watching" list)
    :param videoid: video id object of the video
    :raise APIError: when the response does not report the success of the operation
    """
    call_name = 'removeVideosFromContinueWatching'
    call_args = {
        'callpaths': [[call_name, [int(videoid.value)]]]
    }
    response = common.make_call('callpath_request', call_args)
    # The result of the operation is keyed by the video id value
    result = response[call_name][videoid.value]['value']
    if not result.get('success'):
        LOG.debug('remove_watched_status response: {}', response)
        raise APIError('Unable to remove watched status, an error occurred in the request.')
    # Old API way
    # try:
    #     data = common.make_call(
    #         'post_safe',
    #         {'endpoint': 'viewing_activity',
    #          'data': {'movieID': videoid.value,
    #                   'seriesAll': videoid.mediatype == common.VideoId.SHOW,
    #                   'guid': G.LOCAL_DB.get_active_profile_guid()}}
    #     )
    #     return data.get('status', False)
    # except Exception as exc:  # pylint: disable=broad-except
    #     LOG.error('remove_watched_status raised this error: {}', exc)
    #     return False


def get_metadata(videoid, refresh=False):
    """
    Request the metadata of a video through the 'get_metadata' call
    :param videoid: the value sent as 'videoid' in the call
    :param refresh: the value sent as 'refresh' in the call
    :return: the result of the call
    """
    call_args = {'videoid': videoid,
                 'refresh': refresh}
    return common.make_call('get_metadata', call_args)


def get_mylist_videoids_profile_switch():
    """Return the result of the 'get_mylist_videoids_profile_switch' call"""
    result = common.make_call('get_mylist_videoids_profile_switch')
    return result