asciidisco/plugin.video.netflix

View on GitHub
resources/lib/MSL.py

Summary

Maintainability
F
1 wk
Test Coverage
File `MSL.py` has 585 lines of code (exceeds 250 allowed). Consider refactoring.
# pylint: skip-file
# -*- coding: utf-8 -*-
# Author: trummerjo
# Module: MSLHttpRequestHandler
# Created on: 26.01.2017
# License: MIT https://goo.gl/5bMj3H
 
import re
import sys
import zlib
import gzip
import json
import time
import base64
import random
import uuid
from StringIO import StringIO
from datetime import datetime
import requests
import xml.etree.ElementTree as ET
 
import xbmcaddon
 
Block comment should start with '# '
# Check if we are on Android by asking `getprop` for the SDK level.
# Any failure to run the binary or to parse its output yields 0.
import subprocess
try:
    sdkversion = int(subprocess.check_output(
        ['/system/bin/getprop', 'ro.build.version.sdk']))
except Exception:
    # `Exception` instead of a bare `except:` so that KeyboardInterrupt and
    # SystemExit are not swallowed while the module is being imported.
    sdkversion = 0

# Pick the crypto backend: the MediaDrm based handler for SDK level >= 18,
# the MSLCrypto implementation otherwise.
if sdkversion >= 18:
    from MSLMediaDrm import MSLMediaDrmCrypto as MSLHandler
else:
    from MSLCrypto import MSLCrypto as MSLHandler
 
Expected 2 blank lines, found 1
class MSL(object):
    """Client for the Netflix MSL endpoints (manifest and license)."""

    # Is a handshake already performed and the keys loaded
    handshake_performed = False

    # Context ids taken from the last manifest; sent with license requests
    last_playback_context = ''
    last_drm_context = ''

    # Message id of the most recently generated MSL header
    current_message_id = 0

    # Source of the random message ids
    rndm = random.SystemRandom()
    # HTTP session used for all MSL requests (defined on the class)
    session = requests.session()
    tokens = []

    base_url = 'http://www.netflix.com/api/msl/NFCDCH-LX/cadmium/'
    endpoints = dict(
        manifest=base_url + 'manifest',
        license=base_url + 'license')
 
def __init__(self, nx_common):
    """
    Sets up the crypto handler and makes sure key material is available.

    If `msl_data.json` exists in the data path the stored data is loaded,
    otherwise the crypto handler is reset and a key handshake is performed.

    :param nx_common: Helper object used in this file for file access,
                      logging, the ESN and the credentials
    """
    self.nx_common = nx_common
    self.crypto = MSLHandler(nx_common)

    has_stored_data = self.nx_common.file_exists(
        self.nx_common.data_path, 'msl_data.json')
    if not has_stored_data:
        self.crypto.fromDict(None)
        self.__perform_key_handshake()
        return
    self.init_msl_data()
 
Function `load_manifest` has a Cognitive Complexity of 15 (exceeds 5 allowed). Consider refactoring.
Function `load_manifest` has 6 arguments (exceeds 4 allowed). Consider refactoring.
def load_manifest(self, viewable_id, dolby, hevc, hdr, dolbyvision, vp9):
    """
    Loads the manifest for the given viewable_id and
    returns a mpd-XML-Manifest

    :param viewable_id: The id of the viewable
    :param dolby: Truthy to request the Dolby Digital Plus audio profiles
    :param hevc: True to request the HEVC profiles; exactly False
                 requests the VP9 profiles instead
    :param hdr: True to add the HDR profiles (only evaluated with hevc)
    :param dolbyvision: True to add the Dolby Vision profiles
                        (only evaluated with hevc)
    :param vp9: True to request the VP9 profiles
    :return: MPD XML Manifest or False if no success
    """
    profiles = [
        # Video
        'playready-h264bpl30-dash',
        'playready-h264mpl30-dash',
        'playready-h264mpl31-dash',
        'playready-h264mpl40-dash',

        # Audio
        'heaac-2-dash',

        # Subtitles are appended below, depending on inputstream.adaptive

        # Unknown
        'BIF240',
        'BIF320'
    ]
    manifest_request_data = {
        'method': 'manifest',
        'lookupType': 'PREPARE',
        'viewableIds': [viewable_id],
        'profiles': profiles,
        'drmSystem': 'widevine',
        'appId': '14673889385265',
        'sessionParams': {
            'pinCapableClient': False,
            'uiplaycontext': 'null'
        },
        'sessionId': '14673889385265',
        'trackId': 0,
        'flavor': 'PRE_FETCH',
        'secureUrls': False,
        'supportPreviewContent': True,
        'forceClearStreams': False,
        'languages': ['de-DE'],
        'clientVersion': '4.0004.899.011',
        'uiVersion': 'akira'
    }

    # subtitles: WebVTT when compare_versions accepts the installed
    # inputstream.adaptive version against 2.3.8, simplesdh otherwise
    addon = xbmcaddon.Addon('inputstream.adaptive')
    if addon and self.nx_common.compare_versions(
            list(map(int, addon.getAddonInfo('version').split('.'))),
            [2, 3, 8]):
        profiles.append('webvtt-lssdh-ios8')
    else:
        profiles.append('simplesdh')

    # add hevc profiles if setting is set
    if hevc is True:
        main = 'hevc-main-'
        main10 = 'hevc-main10-'
        prk = 'dash-cenc-prk'
        cenc = 'dash-cenc'
        ctl = 'dash-cenc-ctl'
        levels = ('L30-', 'L31-', 'L40-', 'L41-', 'L50-', 'L51-')
        ctl_levels = ('L30-L31-', 'L31-L40-', 'L40-L41-', 'L50-L51-')

        # Order and duplicates (main10 L41/L50/L51 appear twice) are kept
        # exactly as before so the request sent to the server is unchanged.
        profiles.extend(main10 + lvl + cenc for lvl in levels[3:])
        profiles.extend(main + lvl + cenc for lvl in levels)
        profiles.extend(main10 + lvl + cenc for lvl in levels)
        profiles.extend(main10 + lvl + prk for lvl in levels[:4])
        profiles.extend(main + lvl + ctl for lvl in ctl_levels)
        profiles.extend(main10 + lvl + ctl for lvl in ctl_levels)

        # NOTE(review): HDR and Dolby Vision reuse the `cenc` / `prk`
        # suffixes defined above, so they are evaluated inside the HEVC
        # branch - confirm against the upstream file's indentation.
        if hdr is True:
            # separate local name: do not overwrite the `hdr` parameter
            hdr_prefix = 'hevc-hdr-main10-'
            profiles.extend(hdr_prefix + lvl + cenc for lvl in levels)
            profiles.extend(hdr_prefix + lvl + prk for lvl in levels)

        if dolbyvision is True:
            dv = 'hevc-dv-main10-'
            dv5 = 'hevc-dv5-main10-'
            profiles.extend(dv + lvl + cenc for lvl in levels)
            profiles.extend(dv5 + lvl + prk for lvl in levels)

    if hevc is False or vp9 is True:
        profiles.append('vp9-profile0-L30-dash-cenc')
        profiles.append('vp9-profile0-L31-dash-cenc')

    # Check if dolby sound is enabled and add to profiles
    if dolby:
        profiles.append('ddplus-2.0-dash')
        profiles.append('ddplus-5.1-dash')

    request_data = self.__generate_msl_request_data(manifest_request_data)

    try:
        resp = self.session.post(self.endpoints['manifest'], request_data)
    except Exception:
        # `Exception` rather than a bare except: keep Ctrl-C / exit working
        resp = None
        exc = sys.exc_info()
        msg = '[MSL][POST] Error {} {}'
        self.nx_common.log(msg=msg.format(exc[0], exc[1]))

    if not resp:
        return False

    try:
        # if the json() does not fail we have an error because
        # the manifest response is a chunked json response
        resp.json()
    except ValueError:
        # json() failed so parse the chunked response
        parsed = self.__parse_chunked_msl_response(resp.text)
        data = self.__decrypt_payload_chunks(parsed['payloads'])
        return self.__tranform_to_dash(data)

    self.nx_common.log(msg='Error getting Manifest: ' + resp.text)
    return False
 
Function `get_license` has a Cognitive Complexity of 8 (exceeds 5 allowed). Consider refactoring.
def get_license(self, challenge, sid):
    """
    Requests and returns a license for the given challenge and sid

    Uses the playback / DRM context ids stored on the instance by the
    last manifest transformation.

    :param challenge: The base64 encoded challenge
    :param sid: The sid paired to the challenge
    :return: Base64 representation of the license key or False
             if unsuccessful
    """
    # Sample the clock once so `clientTime` and `xid` always agree
    now = int(time.time())
    license_request_data = {
        'method': 'license',
        'licenseType': 'STANDARD',
        'clientVersion': '4.0004.899.011',
        'uiVersion': 'akira',
        'languages': ['de-DE'],
        'playbackContextId': self.last_playback_context,
        'drmContextIds': [self.last_drm_context],
        'challenges': [{
            'dataBase64': challenge,
            'sessionId': sid
        }],
        'clientTime': now,
        'xid': int((now + 0.1612) * 1000)
    }
    request_data = self.__generate_msl_request_data(license_request_data)

    try:
        resp = self.session.post(self.endpoints['license'], request_data)
    except Exception:
        # `Exception` rather than a bare except: keep Ctrl-C / exit working
        resp = None
        exc = sys.exc_info()
        self.nx_common.log(
            msg='[MSL][POST] Error {} {}'.format(exc[0], exc[1]))

    if not resp:
        return False

    try:
        # If it is valid json the request for the license failed
        resp.json()
    except ValueError:
        # json() failed so we have a chunked json response
        parsed = self.__parse_chunked_msl_response(resp.text)
        data = self.__decrypt_payload_chunks(parsed['payloads'])
        if data['success'] is True:
            return data['result']['licenses'][0]['data']
        self.nx_common.log(
            msg='Error getting license: ' + json.dumps(data))
        return False

    self.nx_common.log(msg='Error getting license: '+resp.text)
    return False
 
def __decrypt_payload_chunks(self, payloadchunks):
    """
    Decrypts the given payload chunks and returns the decoded result.

    Each chunk is a JSON string whose base64 `payload` holds an encryption
    envelope (`iv`, `ciphertext`). The decrypted pieces are concatenated,
    then the `data` of the second element's payload is base64 decoded
    and parsed as JSON.

    :param payloadchunks: Iterable of JSON encoded payload chunks
    :return: The decoded JSON object of the response
    """
    pieces = []
    for raw_chunk in payloadchunks:
        chunk = json.JSONDecoder().decode(raw_chunk)
        envelope_json = base64.standard_b64decode(chunk.get('payload'))
        envelope = json.JSONDecoder().decode(envelope_json)

        # Decrypt the text
        iv = base64.standard_b64decode(envelope['iv'])
        ciphertext = base64.standard_b64decode(envelope.get('ciphertext'))
        decrypted = json.JSONDecoder().decode(
            self.crypto.decrypt(iv, ciphertext))

        # The data is always base64 encoded; gunzip it if flagged as GZIP
        piece = base64.standard_b64decode(decrypted.get('data'))
        if decrypted.get('compressionalgo') == 'GZIP':
            piece = zlib.decompress(piece, 16 + zlib.MAX_WBITS)
        pieces.append(piece)

    joined = ''.join(pieces)
    inner = json.JSONDecoder().decode(joined)[1]['payload']['data']
    return json.JSONDecoder().decode(base64.standard_b64decode(inner))
 
Function `__tranform_to_dash` has a Cognitive Complexity of 51 (exceeds 5 allowed). Consider refactoring.
def __tranform_to_dash(self, manifest):
    """
    Converts a decoded manifest response into a DASH MPD XML string.

    Side effects: saves the raw manifest as `manifest.json` and the
    generated MPD as `manifest.mpd` through nx_common.save_file, and
    stores the playback / DRM context ids of the first viewable on the
    instance.

    :param manifest: Decoded manifest response (dict)
    :return: The MPD document as XML string without line breaks
    """
    self.nx_common.save_file(
        data_path=self.nx_common.data_path,
        filename='manifest.json',
        content=json.dumps(manifest))
    manifest = manifest['result']['viewables'][0]

    self.last_playback_context = manifest['playbackContextId']
    self.last_drm_context = manifest['drmContextId']

    # Check for pssh
    pssh = ''
    keyid = None
    psshb64 = manifest.get('psshb64')
    if psshb64:
        pssh = psshb64[0]
        psshbytes = base64.standard_b64decode(pssh)
        # For a 52 byte pssh the bytes from offset 36 on are used as key id
        if len(psshbytes) == 52:
            keyid = psshbytes[36:]

    # `//` keeps the integer arithmetic the Python 2 `/` performed here
    seconds = manifest['runtime'] // 1000
    init_length = seconds // 2 * 12 + 20 * 1000
    duration = "PT" + str(seconds) + ".00S"

    root = ET.Element('MPD')
    root.attrib['xmlns'] = 'urn:mpeg:dash:schema:mpd:2011'
    root.attrib['xmlns:cenc'] = 'urn:mpeg:cenc:2013'
    root.attrib['mediaPresentationDuration'] = duration

    period = ET.SubElement(root, 'Period', start='PT0S', duration=duration)

    # parent and tag are passed positionally to every SubElement call:
    # the C implementation of ElementTree does not accept them as keywords.

    # One Adaption Set for Video
    for video_track in manifest['videoTracks']:
        video_adaption_set = ET.SubElement(
            period,
            'AdaptationSet',
            mimeType='video/mp4',
            contentType="video")

        # Content Protection
        if keyid:
            protection = ET.SubElement(
                video_adaption_set,
                'ContentProtection',
                value='cenc',
                schemeIdUri='urn:mpeg:dash:mp4protection:2011')
            protection.set('cenc:default_KID', str(uuid.UUID(bytes=keyid)))

        protection = ET.SubElement(
            video_adaption_set,
            'ContentProtection',
            schemeIdUri='urn:uuid:EDEF8BA9-79D6-4ACE-A3C8-27DCD51D21ED')

        ET.SubElement(
            protection,
            'widevine:license',
            robustness_level='HW_SECURE_CODECS_REQUIRED')

        # Truthiness instead of the identity check `is not ''`
        if pssh:
            ET.SubElement(protection, 'cenc:pssh').text = pssh

        for downloadable in video_track['downloadables']:
            profile = downloadable['contentProfile']
            codec = 'h264'
            if 'hevc' in profile:
                codec = 'hevc'
            elif profile == 'vp9-profile0-L30-dash-cenc':
                codec = 'vp9.0.30'
            elif profile == 'vp9-profile0-L31-dash-cenc':
                codec = 'vp9.0.31'

            # The last entry other than 'none' wins; 'any' maps to '1.0'
            hdcp_versions = '0.0'
            for hdcp in downloadable['hdcpVersions']:
                if hdcp != 'none':
                    hdcp_versions = hdcp if hdcp != 'any' else '1.0'

            rep = ET.SubElement(
                video_adaption_set,
                'Representation',
                width=str(downloadable['width']),
                height=str(downloadable['height']),
                bandwidth=str(downloadable['bitrate'] * 1024),
                hdcp=hdcp_versions,
                nflxContentProfile=str(profile),
                codecs=codec,
                mimeType='video/mp4')

            # BaseURL
            base_url = self.__get_base_url(downloadable['urls'])
            ET.SubElement(rep, 'BaseURL').text = base_url
            # Init an Segment block
            ET.SubElement(
                rep,
                'SegmentBase',
                indexRange='0-' + str(init_length),
                indexRangeExact='true')

    # Multiple Adaption Set for audio
    language = None
    for audio_track in manifest['audioTracks']:
        impaired = 'false'
        original = 'false'
        default = 'false'

        if audio_track.get('trackType') == 'ASSISTIVE':
            impaired = 'true'
        elif not language or language == audio_track.get('language'):
            # The first non-assistive language becomes the default one
            language = audio_track.get('language')
            default = 'true'
        # NOTE(review): evaluated for every track, not only inside the
        # branch above - confirm against the upstream file's indentation.
        if audio_track.get('language').find('[') > 0:
            original = 'true'

        audio_adaption_set = ET.SubElement(
            period,
            'AdaptationSet',
            lang=audio_track['bcp47'],
            contentType='audio',
            mimeType='audio/mp4',
            impaired=impaired,
            original=original,
            default=default)
        for downloadable in audio_track['downloadables']:
            codec = 'aac'
            if downloadable['contentProfile'] in (
                    'ddplus-2.0-dash', 'ddplus-5.1-dash'):
                codec = 'ec-3'
            rep = ET.SubElement(
                audio_adaption_set,
                'Representation',
                codecs=codec,
                bandwidth=str(downloadable['bitrate'] * 1024),
                mimeType='audio/mp4')

            # AudioChannel Config
            uri = 'urn:mpeg:dash:23003:3:audio_channel_configuration:2011'
            ET.SubElement(
                rep,
                'AudioChannelConfiguration',
                schemeIdUri=uri,
                value=str(audio_track.get('channelsCount')))

            # BaseURL
            base_url = self.__get_base_url(downloadable['urls'])
            ET.SubElement(rep, 'BaseURL').text = base_url
            # Index range
            ET.SubElement(
                rep,
                'SegmentBase',
                indexRange='0-' + str(init_length),
                indexRangeExact='true')

    # Multiple Adaption Sets for subtitles
    # (`or []`: a manifest without text tracks must not crash the loop)
    for text_track in manifest.get('textTracks') or []:
        downloadables = text_track.get('downloadables')
        # Skip tracks with missing, null or empty downloadables
        if not downloadables:
            continue
        # Only one subtitle representation per adaptationset
        downloadable = downloadables[0]
        is_webvtt = downloadable.get('contentProfile') == 'webvtt-lssdh-ios8'

        subtiles_adaption_set = ET.SubElement(
            period,
            'AdaptationSet',
            lang=text_track.get('bcp47'),
            codecs='wvtt' if is_webvtt else 'stpp',
            contentType='text',
            mimeType='text/vtt' if is_webvtt else 'application/ttml+xml')
        ET.SubElement(
            subtiles_adaption_set,
            'Role',
            schemeIdUri='urn:mpeg:dash:role:2011',
            value='forced' if text_track.get('isForced') is True else 'main')
        rep = ET.SubElement(
            subtiles_adaption_set,
            'Representation',
            nflxProfile=downloadable.get('contentProfile'))
        base_url = self.__get_base_url(downloadable['urls'])
        ET.SubElement(rep, 'BaseURL').text = base_url

    xml = ET.tostring(root, encoding='utf-8', method='xml')
    xml = xml.replace('\n', '').replace('\r', '')

    self.nx_common.save_file(
        data_path=self.nx_common.data_path,
        filename='manifest.mpd',
        content=xml)

    return xml
 
def __get_base_url(self, urls):
    """
    Returns the value stored under the first key that iterating `urls`
    yields, or None when `urls` is empty.

    :param urls: Mapping of url entries
    :return: The first url value or None
    """
    keys = iter(urls)
    try:
        first_key = next(keys)
    except StopIteration:
        return None
    return urls[first_key]
 
Identical blocks of code found in 2 locations. Consider refactoring.
def __parse_chunked_msl_response(self, message):
    """
    Splits a chunked MSL response into its header and payload chunks.

    The header is everything up to the first '}}'. The payload chunks are
    taken from the text between the first and the second '}}' (or the end
    of the message), cut at each signature field and closed with '}'.

    :param message: The raw response text
    :return: Dict with the `header` string and the list of `payloads`
    """
    sections = message.split('}}')
    head = sections[0] + '}}'
    pieces = re.split(',\"signature\":\"[0-9A-Za-z=/+]+\"}', sections[1])
    # What follows the last signature is not a payload, so it is dropped
    chunks = [piece + '}' for piece in pieces[:-1]]
    return dict(header=head, payloads=chunks)
 
def __generate_msl_request_data(self, data):
    """
    Builds the body of an MSL request for the given request data.

    The body is the JSON encoded header (encrypted header data, its
    signature and the master token) directly followed by one JSON encoded
    payload chunk carrying the compressed, encrypted request.

    :param data: The request data (dict) to send
    :return: The request body as string
    """
    header_encryption_envelope = self.__encrypt(
        plaintext=self.__generate_msl_header())
    headerdata = base64.standard_b64encode(header_encryption_envelope)
    header = {
        'headerdata': headerdata,
        'signature': self.__sign(header_encryption_envelope),
        'mastertoken': self.mastertoken,
    }

    # Serialize the given Data: the request JSON is embedded as a JSON
    # *string*. json.dumps() does the quoting, so backslash escapes in the
    # inner JSON survive; for input without backslashes the output is
    # identical to the former manual replace('"', '\\"').
    embedded_data = json.dumps(json.dumps(data))
    serialized_data = '[{},{"headers":{},"path":"/cbp/cadmium-13"'
    serialized_data += ',"payload":{"data":'
    serialized_data += embedded_data
    serialized_data += '},"query":""}]\n'

    compressed_data = self.__compress_data(serialized_data)

    # Create FIRST Payload Chunks
    # (current_message_id was set while generating the header above)
    first_payload = {
        'messageid': self.current_message_id,
        'data': compressed_data,
        'compressionalgo': 'GZIP',
        'sequencenumber': 1,
        'endofmsg': True
    }
    first_payload_encryption_envelope = self.__encrypt(
        plaintext=json.dumps(first_payload))
    payload = base64.standard_b64encode(first_payload_encryption_envelope)
    first_payload_chunk = {
        'payload': payload,
        'signature': self.__sign(first_payload_encryption_envelope),
    }
    return json.dumps(header) + json.dumps(first_payload_chunk)
 
def __compress_data(self, data):
    """
    GZIP compresses the given data and base64 encodes the result.

    :param data: The string to compress
    :return: Base64 encoded gzip stream
    """
    buf = StringIO()
    gzip_file = gzip.GzipFile(fileobj=buf, mode="w")
    try:
        gzip_file.write(data)
    finally:
        # Closing writes the gzip trailer into the buffer
        gzip_file.close()
    zipped = buf.getvalue()
    return base64.standard_b64encode(zipped)
 
Function `__generate_msl_header` has a Cognitive Complexity of 6 (exceeds 5 allowed). Consider refactoring.
def __generate_msl_header(
        self,
        is_handshake=False,
        is_key_request=False,
        compressionalgo='GZIP',
        encrypt=True):
    """
    Generates a MSL header and stores a fresh random message id
    in `current_message_id`.

    :param is_handshake: Value of the header's `handshake` field
    :param is_key_request: If set, key request data from the crypto
                           handler is added instead of user auth data
    :param compressionalgo: Compression algorithm to announce, '' for none
    :param encrypt: Not used by this method
    :return: The JSON encoded header
    """
    self.current_message_id = self.rndm.randint(0, 2 ** 52)
    esn = self.nx_common.get_esn()

    # Announce a compression algo only if one was given
    if compressionalgo == '':
        compression_algos = []
    else:
        compression_algos = [compressionalgo]

    header_data = {
        'sender': esn,
        'handshake': is_handshake,
        'nonreplayable': False,
        'capabilities': {
            'languages': ['en-US'],
            'compressionalgos': compression_algos
        },
        'recipient': 'Netflix',
        'renewable': True,
        'messageid': self.current_message_id,
        'timestamp': 1467733923
    }

    if is_key_request:
        # Key requests carry the key request data and no user auth data
        header_data['keyrequestdata'] = self.crypto.get_key_request()
    elif 'usertoken' not in self.tokens:
        # Auth via email and password
        account = self.nx_common.get_credentials()
        header_data['userauthdata'] = {
            'scheme': 'EMAIL_PASSWORD',
            'authdata': {
                'email': account['email'],
                'password': account['password']
            }
        }

    return json.dumps(header_data)
 
Identical blocks of code found in 2 locations. Consider refactoring.
def __encrypt(self, plaintext):
    """
    Encrypts the plaintext with the crypto handler, passing the ESN and
    the current sequence number along.

    :param plaintext: The text to encrypt
    :return: The JSON encoded result of the crypto handler
    """
    esn = self.nx_common.get_esn()
    envelope = self.crypto.encrypt(plaintext, esn, self.sequence_number)
    return json.dumps(envelope)
 
def __sign(self, text):
    """
    Signs the given text with the crypto handler and base64 encodes
    the signature.

    NOTE(review): the previous docstring described this as an HMAC with
    the current sign key and SHA256; that is up to the crypto handler
    and cannot be confirmed from this method.

    :param text: The text to sign
    :return: Base64 encoded signature
    """
    signature = self.crypto.sign(text)
    return base64.standard_b64encode(signature)
 
def perform_key_handshake(self):
    """
    Public wrapper that runs the key handshake.
    The result of the handshake is not returned.
    """
    self.__perform_key_handshake()
 
Function `__perform_key_handshake` has a Cognitive Complexity of 7 (exceeds 5 allowed). Consider refactoring.
def __perform_key_handshake(self):
    """
    Performs the key exchange with the manifest endpoint.

    On success the master token is set, the key response is handed to the
    crypto handler and the MSL data is saved.

    :return: False if there is no ESN or the response contains error
             data, None otherwise (success or other failures, which
             are only logged)
    """
    esn = self.nx_common.get_esn()
    # `or ''`: a missing ESN must reach the check below, not crash the log
    self.nx_common.log(msg='perform_key_handshake: esn:' + (esn or ''))

    if not esn:
        return False

    header = self.__generate_msl_header(
        is_key_request=True,
        is_handshake=True,
        compressionalgo='',
        encrypt=False)

    request = {
        'entityauthdata': {
            'scheme': 'NONE',
            'authdata': {
                'identity': esn
            }
        },
        'headerdata': base64.standard_b64encode(header),
        'signature': '',
    }

    try:
        resp = self.session.post(
            url=self.endpoints['manifest'],
            data=json.dumps(request, sort_keys=True))
    except Exception:
        # `Exception` rather than a bare except: keep Ctrl-C / exit working
        resp = None
        exc = sys.exc_info()
        self.nx_common.log(
            msg='[MSL][POST] Error {} {}'.format(exc[0], exc[1]))

    if resp and resp.status_code == 200:
        resp = resp.json()
        if 'errordata' in resp:
            self.nx_common.log(msg='Key Exchange failed')
            self.nx_common.log(
                msg=base64.standard_b64decode(resp['errordata']))
            return False
        base_head = base64.standard_b64decode(resp['headerdata'])

        headerdata = json.JSONDecoder().decode(base_head)
        self.__set_master_token(headerdata['keyresponsedata']['mastertoken'])
        self.crypto.parse_key_response(headerdata)
        self.__save_msl_data()
    else:
        self.nx_common.log(msg='Key Exchange failed')
        # resp is None when the POST itself raised (logged above);
        # only a real response has a body to report
        if resp is not None:
            self.nx_common.log(msg=resp.text)
 
Identical blocks of code found in 2 locations. Consider refactoring.
def init_msl_data(self):
    """
    Loads the stored MSL data and marks the handshake as performed.
    """
    self.nx_common.log(msg='MSL Data exists. Use old Tokens.')
    # Loading may trigger a new key handshake, so the flag is set afterwards
    self.__load_msl_data()
    self.handshake_performed = True
 
def __load_msl_data(self):
    """
    Loads the stored MSL data from `msl_data.json`.

    A new key handshake is performed instead of reusing the stored master
    token when the crypto handler's fromDict() returns a truthy value or
    the token expires in less than 10 hours.
    """
    raw_msl_data = self.nx_common.load_file(
        data_path=self.nx_common.data_path,
        filename='msl_data.json')
    msl_data = json.JSONDecoder().decode(raw_msl_data)

    # Check expire date of the token
    raw_token = msl_data['tokens']['mastertoken']['tokendata']
    base_token = base64.standard_b64decode(raw_token)
    master_token = json.JSONDecoder().decode(base_token)
    exp = int(master_token['expiration'])
    valid_until = datetime.utcfromtimestamp(exp)
    # utcnow(), not now(): `valid_until` is a naive UTC time, comparing it
    # with local time skews the result by the local UTC offset
    present = datetime.utcnow()
    difference = valid_until - present

    self.nx_common.log(msg='Expiration time: Key:' + str(valid_until) + ', Now:' + str(present) + ', Diff:' + str(difference.total_seconds()))

    # If the token expires in less than 10 hours or is expired, renew it
    hours_left = difference.total_seconds() / 60 / 60
    if self.crypto.fromDict(msl_data) or hours_left < 10:
        self.__perform_key_handshake()
        return

    self.__set_master_token(msl_data['tokens']['mastertoken'])
 
def save_msl_data(self):
    """
    Public wrapper that saves the keys and tokens.
    """
    self.__save_msl_data()
 
Identical blocks of code found in 2 locations. Consider refactoring.
def __save_msl_data(self):
    """
    Saves the master token together with the crypto handler's data
    as `msl_data.json`.
    :return:
    """
    msl_data = dict(tokens=dict(mastertoken=self.mastertoken))
    # The crypto handler contributes its own entries
    msl_data.update(self.crypto.toDict())

    self.nx_common.save_file(
        data_path=self.nx_common.data_path,
        filename='msl_data.json',
        content=json.JSONEncoder().encode(msl_data))
 
def __set_master_token(self, master_token):
    """
    Stores the master token and the sequence number it contains.

    :param master_token: Master token dict; its `tokendata` is base64
                         encoded JSON
    """
    self.mastertoken = master_token
    token_json = base64.standard_b64decode(master_token['tokendata'])
    token_fields = json.JSONDecoder().decode(token_json)
    self.sequence_number = token_fields.get('sequencenumber')