CastagnaIT/plugin.video.netflix

View on GitHub
resources/lib/services/nfsession/msl/android_crypto.py

Summary

Maintainability
A
0 mins
Test Coverage
# -*- coding: utf-8 -*-
"""
    Copyright (C) 2017 Sebastian Golasch (plugin.video.netflix)
    Copyright (C) 2018 Caphm (original implementation module)
    Crypto handler for Android platforms

    SPDX-License-Identifier: MIT
    See LICENSES/MIT.md for more information.
"""
import base64
import json

import xbmcdrm

from resources.lib.common.exceptions import MSLError
from resources.lib.database.db_utils import TABLE_SESSION
from resources.lib.globals import G
from resources.lib.utils.esn import WidevineForceSecLev
from resources.lib.utils.logging import LOG
from .base_crypto import MSLBaseCrypto


class AndroidMSLCrypto(MSLBaseCrypto):
    """Crypto handler for Android platforms"""
    def __init__(self):
        super().__init__()
        self.crypto_session = None
        self.keyset_id = None
        self.key_id = None
        self.hmac_key_id = None
        try:
            self.crypto_session = xbmcdrm.CryptoSession(
                'edef8ba9-79d6-4ace-a3c8-27dcd51d21ed', 'AES/CBC/NoPadding', 'HmacSHA256')
            LOG.debug('Widevine CryptoSession successful constructed')
        except Exception as exc:  # pylint: disable=broad-except
            import traceback
            LOG.error(traceback.format_exc())
            raise MSLError('Failed to construct Widevine CryptoSession') from exc

        drm_info = {
            'version': self.crypto_session.GetPropertyString('version'),
            'system_id': self.crypto_session.GetPropertyString('systemId'),
            #  'device_unique_id': self.crypto_session.GetPropertyByteArray('deviceUniqueId')
            'hdcp_level': self.crypto_session.GetPropertyString('hdcpLevel'),
            'hdcp_level_max': self.crypto_session.GetPropertyString('maxHdcpLevel'),
            'security_level': self.crypto_session.GetPropertyString('securityLevel')
        }

        if not drm_info['version']:
            # Possible cases where no data is obtained:
            # - Device with custom ROM or without Widevine support
            # - Using Kodi debug build with a InputStream Adaptive release build (yes users do it)
            raise MSLError('It was not possible to get the data from Widevine CryptoSession.\r\n'
                           'Your system is not Widevine certified or you have a wrong Kodi version installed.')

        G.LOCAL_DB.set_value('drm_system_id', drm_info['system_id'], TABLE_SESSION)
        G.LOCAL_DB.set_value('drm_security_level', drm_info['security_level'], TABLE_SESSION)
        G.LOCAL_DB.set_value('drm_hdcp_level', drm_info['hdcp_level'], TABLE_SESSION)

        LOG.debug('Widevine version: {}', drm_info['version'])
        if drm_info['system_id']:
            LOG.debug('Widevine CryptoSession system id: {}', drm_info['system_id'])
        else:
            LOG.warn('Widevine CryptoSession system id not obtained!')
        LOG.debug('Widevine CryptoSession security level: {}', drm_info['security_level'])
        wv_force_sec_lev = G.LOCAL_DB.get_value('widevine_force_seclev',
                                                WidevineForceSecLev.DISABLED,
                                                table=TABLE_SESSION)
        if wv_force_sec_lev != WidevineForceSecLev.DISABLED:
            LOG.warn('Widevine security level is forced to {} by user settings!', wv_force_sec_lev)
        LOG.debug('Widevine CryptoSession current hdcp level: {}', drm_info['hdcp_level'])
        LOG.debug('Widevine CryptoSession max hdcp level supported: {}', drm_info['hdcp_level_max'])
        LOG.debug('Widevine CryptoSession algorithms: {}', self.crypto_session.GetPropertyString('algorithms'))

    def load_crypto_session(self, msl_data=None):
        if not msl_data:
            return
        self.keyset_id = base64.standard_b64decode(msl_data['key_set_id']).decode('utf-8')
        self.key_id = base64.standard_b64decode(msl_data['key_id'])
        self.hmac_key_id = base64.standard_b64decode(msl_data['hmac_key_id'])
        self.crypto_session.RestoreKeys(self.keyset_id)

    def __del__(self):
        self.crypto_session = None

    def key_request_data(self):
        """Return a key request dict"""
        # No key update supported -> remove existing keys
        self.crypto_session.RemoveKeys()
        _key_request = self.crypto_session.GetKeyRequest(  # pylint: disable=assignment-from-none
            bytes([10, 122, 0, 108, 56, 43]), 'application/xml', True, {})
        if not _key_request:
            raise MSLError('Widevine CryptoSession getKeyRequest failed!')
        LOG.debug('Widevine CryptoSession getKeyRequest successful. Size: {}', len(_key_request))
        _key_request = base64.standard_b64encode(_key_request).decode('utf-8')
        return [{
            'scheme': 'WIDEVINE',
            'keydata': {
                'keyrequest': _key_request
            }
        }]

    def _provide_key_response(self, data):
        if not data:
            raise MSLError('Missing key response data')
        self.keyset_id = self.crypto_session.ProvideKeyResponse(data)  # pylint: disable=assignment-from-none
        if not self.keyset_id:
            raise MSLError('Widevine CryptoSession provideKeyResponse failed')
        LOG.debug('Widevine CryptoSession provideKeyResponse successful')
        LOG.debug('keySetId: {}', self.keyset_id)

    def encrypt(self, plaintext, esn):  # pylint: disable=unused-argument
        """
        Encrypt the given Plaintext with the encryption key
        :param plaintext:
        :return: Serialized JSON String of the encryption Envelope
        """
        from secrets import token_bytes
        init_vector = token_bytes(16)

        # Add PKCS5Padding
        pad = 16 - len(plaintext) % 16
        padded_data = plaintext + ''.join([chr(pad)] * pad)

        encrypted_data = self.crypto_session.Encrypt(self.key_id,
                                                     padded_data.encode('utf-8'),
                                                     init_vector)

        if not encrypted_data:
            raise MSLError('Widevine CryptoSession encrypt failed!')

        return json.dumps({
            'version': 1,
            'ciphertext': base64.standard_b64encode(encrypted_data).decode('utf-8'),
            'sha256': 'AA==',
            'keyid': base64.standard_b64encode(self.key_id).decode('utf-8'),
            # 'cipherspec' : 'AES/CBC/PKCS5Padding',
            'iv': base64.standard_b64encode(init_vector).decode('utf-8')
        })

    def decrypt(self, init_vector, ciphertext):
        """Decrypt a ciphertext"""
        decrypted_data = self.crypto_session.Decrypt(self.key_id, ciphertext, init_vector)
        if not decrypted_data:
            raise MSLError('Widevine CryptoSession decrypt failed!')

        # remove PKCS5Padding
        pad = decrypted_data[len(decrypted_data) - 1]
        return decrypted_data[:-pad].decode('utf-8')

    def sign(self, message):
        """Sign a message"""
        signature = self.crypto_session.Sign(self.hmac_key_id, message.encode('utf-8'))
        if not signature:
            raise MSLError('Widevine CryptoSession sign failed!')
        return base64.standard_b64encode(signature).decode('utf-8')

    def verify(self, message, signature):
        """Verify a message's signature"""
        return self.crypto_session.Verify(self.hmac_key_id, message, signature)

    def _init_keys(self, key_response_data):
        key_response = base64.standard_b64decode(
            key_response_data['keydata']['cdmkeyresponse'])
        self._provide_key_response(key_response)
        self.key_id = base64.standard_b64decode(
            key_response_data['keydata']['encryptionkeyid'])
        self.hmac_key_id = base64.standard_b64decode(
            key_response_data['keydata']['hmackeyid'])

    def _export_keys(self):
        return {
            'key_set_id': base64.standard_b64encode(self.keyset_id.encode('utf-8')).decode('utf-8'),
            'key_id': base64.standard_b64encode(self.key_id).decode('utf-8'),
            'hmac_key_id': base64.standard_b64encode(self.hmac_key_id).decode('utf-8')
        }