coursera/api.py
# vim: set fileencoding=utf8 :
"""
This module contains implementations of different APIs that are used by the
downloader.
"""
import os
import re
import json
import base64
import logging
import time
import requests
import urllib
from collections import namedtuple, OrderedDict
from six import iterkeys, iteritems
from six.moves.urllib_parse import quote_plus
import attr
from .utils import (BeautifulSoup, make_coursera_absolute_url,
extend_supplement_links, clean_url, clean_filename,
is_debug_run, unescape_html)
from .network import get_reply, get_page, post_page_and_reply
from .define import (OPENCOURSE_SUPPLEMENT_URL,
OPENCOURSE_PROGRAMMING_ASSIGNMENTS_URL,
OPENCOURSE_ASSET_URL,
OPENCOURSE_ASSETS_URL,
OPENCOURSE_API_ASSETS_V1_URL,
OPENCOURSE_ONDEMAND_COURSE_MATERIALS,
OPENCOURSE_ONDEMAND_COURSE_MATERIALS_V2,
OPENCOURSE_ONDEMAND_COURSES_V1,
OPENCOURSE_ONDEMAND_LECTURE_VIDEOS_URL,
OPENCOURSE_ONDEMAND_LECTURE_ASSETS_URL,
OPENCOURSE_ONDEMAND_SPECIALIZATIONS_V1,
OPENCOURSE_MEMBERSHIPS,
OPENCOURSE_REFERENCES_POLL_URL,
OPENCOURSE_REFERENCE_ITEM_URL,
OPENCOURSE_PROGRAMMING_IMMEDIATE_INSTRUCTIOINS_URL,
OPENCOURSE_PEER_ASSIGNMENT_INSTRUCTIONS,
# New feature, Notebook (Python Jupyter)
OPENCOURSE_NOTEBOOK_DESCRIPTIONS,
OPENCOURSE_NOTEBOOK_LAUNCHES,
OPENCOURSE_NOTEBOOK_TREE,
OPENCOURSE_NOTEBOOK_DOWNLOAD,
POST_OPENCOURSE_API_QUIZ_SESSION,
POST_OPENCOURSE_API_QUIZ_SESSION_GET_STATE,
POST_OPENCOURSE_ONDEMAND_EXAM_SESSIONS,
POST_OPENCOURSE_ONDEMAND_EXAM_SESSIONS_GET_STATE,
INSTRUCTIONS_HTML_INJECTION_PRE,
INSTRUCTIONS_HTML_MATHJAX_URL,
INSTRUCTIONS_HTML_INJECTION_AFTER,
IN_MEMORY_EXTENSION,
IN_MEMORY_MARKER)
from .cookies import prepare_auth_headers
class QuizExamToMarkupConverter(object):
"""
Converts quiz/exam JSON into semi HTML (Coursera Markup) for local viewing.
The output needs to be further processed by MarkupToHTMLConverter.
"""
KNOWN_QUESTION_TYPES = ('mcq',
'mcqReflect',
'checkbox',
'singleNumeric',
'textExactMatch',
'mathExpression',
'regex',
'reflect')
# TODO: support live MathJAX preview rendering for mathExpression
# and regex question types
KNOWN_INPUT_TYPES = ('textExactMatch',
'singleNumeric',
'mathExpression',
'regex',
'reflect')
def __init__(self, session):
self._session = session
def __call__(self, quiz_or_exam_json):
result = []
for question_index, question_json in enumerate(quiz_or_exam_json['questions']):
question_type = question_json['question']['type']
if question_type not in self.KNOWN_QUESTION_TYPES:
logging.info('Unknown question type: %s', question_type)
logging.info('Question json: %s', question_json)
logging.info('Please report class name, quiz name and the data'
' above to coursera-dl authors')
prompt = question_json['variant']['definition']['prompt']
options = question_json['variant']['definition'].get('options', [])
# Question number
result.append('<h3>Question %d</h3>' % (question_index + 1))
# Question text
question_text = unescape_html(prompt['definition']['value'])
result.append(question_text)
# Input for answer
if question_type in self.KNOWN_INPUT_TYPES:
result.extend(self._generate_input_field())
# Convert input_type from JSON reply to HTML input type
input_type = {
'mcq': 'radio',
'mcqReflect': 'radio',
'checkbox': 'checkbox'
}.get(question_type, '')
# Convert options, they are either checkboxes or radio buttons
result.extend(self._convert_options(
question_index, options, input_type))
result.append('<hr>')
return '\n'.join(result)
def _convert_options(self, question_index, options, input_type):
if not options:
return []
result = ['<form>']
for option in options:
option_text = unescape_html(
option['display']['definition']['value'])
# We need to replace <text> with <span> so that answer text
# stays on the same line with checkbox/radio button
option_text = self._replace_tag(option_text, 'text', 'span')
result.append('<label><input type="%s" name="%s">'
'%s<br></label>' % (
input_type, question_index, option_text))
result.append('</form>')
return result
def _replace_tag(self, text, initial_tag, target_tag):
soup = BeautifulSoup(text)
while soup.find(initial_tag):
soup.find(initial_tag).name = target_tag
return soup.prettify()
def _generate_input_field(self):
return ['<form><label>Enter answer here:<input type="text" '
'name=""><br></label></form>']
class MarkupToHTMLConverter(object):
def __init__(self, session, mathjax_cdn_url=None):
self._session = session
self._asset_retriever = AssetRetriever(session)
if not mathjax_cdn_url:
mathjax_cdn_url = INSTRUCTIONS_HTML_MATHJAX_URL
self._mathjax_cdn_url = mathjax_cdn_url
def __call__(self, markup):
"""
Convert instructions markup to make it more suitable for
offline reading.
@param markup: HTML (kinda) markup to prettify.
@type markup: str
@return: Prettified HTML with several markup tags replaced with HTML
equivalents.
@rtype: str
"""
soup = BeautifulSoup(markup)
self._convert_markup_basic(soup)
self._convert_markup_images(soup)
self._convert_markup_audios(soup)
return soup.prettify()
def _convert_markup_basic(self, soup):
"""
Perform basic conversion of instructions markup. This includes
replacement of several textual markup tags with their HTML equivalents.
@param soup: BeautifulSoup instance.
@type soup: BeautifulSoup
"""
# Inject meta charset tag
meta = soup.new_tag('meta', charset='UTF-8')
soup.insert(0, meta)
# 1. Inject basic CSS style
css = "".join([
INSTRUCTIONS_HTML_INJECTION_PRE,
self._mathjax_cdn_url,
INSTRUCTIONS_HTML_INJECTION_AFTER])
css_soup = BeautifulSoup(css)
soup.append(css_soup)
# 2. Replace <text> with <p>
while soup.find('text'):
soup.find('text').name = 'p'
# 3. Replace <heading level="1"> with <h1>
while soup.find('heading'):
heading = soup.find('heading')
heading.name = 'h%s' % heading.attrs.get('level', '1')
# 4. Replace <code> with <pre>
while soup.find('code'):
soup.find('code').name = 'pre'
# 5. Replace <list> with <ol> or <ul>
while soup.find('list'):
list_ = soup.find('list')
type_ = list_.attrs.get('bullettype', 'numbers')
list_.name = 'ol' if type_ == 'numbers' else 'ul'
def _convert_markup_images(self, soup):
"""
Convert images of instructions markup. Images are downloaded,
base64-encoded and inserted into <img> tags.
@param soup: BeautifulSoup instance.
@type soup: BeautifulSoup
"""
# 6. Replace <img> assets with actual image contents
images = [image for image in soup.find_all('img')
if image.attrs.get('assetid') is not None]
if not images:
return
# Get assetid attribute from all images
asset_ids = [image.attrs.get('assetid') for image in images]
self._asset_retriever(asset_ids)
for image in images:
# Encode each image using base64
asset = self._asset_retriever[image['assetid']]
if asset.data is not None:
encoded64 = base64.b64encode(asset.data).decode()
image['src'] = 'data:%s;base64,%s' % (
asset.content_type, encoded64)
def _convert_markup_audios(self, soup):
"""
Convert audios of instructions markup. Audios are downloaded,
base64-encoded and inserted as <audio controls> <source> tag.
@param soup: BeautifulSoup instance.
@type soup: BeautifulSoup
"""
# 7. Replace <asset> audio assets with actual audio contents
audios = [audio for audio in soup.find_all('asset')
if audio.attrs.get('id') is not None
and audio.attrs.get('assettype') == 'audio']
if not audios:
return
# Get assetid attribute from all audios
asset_ids = [audio.attrs.get('id') for audio in audios]
self._asset_retriever(asset_ids)
for audio in audios:
# Encode each audio using base64
asset = self._asset_retriever[audio['id']]
if asset.data is not None:
encoded64 = base64.b64encode(asset.data).decode()
data_string = 'data:%s;base64,%s' % (
asset.content_type, encoded64)
source_tag = soup.new_tag(
'source', src=data_string, type=asset.content_type)
controls_tag = soup.new_tag('audio', controls="")
controls_tag.string = 'Your browser does not support the audio element.'
controls_tag.append(source_tag)
audio.insert_after(controls_tag)
class OnDemandCourseMaterialItemsV1(object):
"""
Helper class that allows accessing lecture JSONs by lesson IDs.
"""
def __init__(self, items):
"""
Initialization. Build a map from lessonId to Lecture (item)
@param items: linked.OnDemandCourseMaterialItems key of
OPENCOURSE_ONDEMAND_COURSE_MATERIALS response.
@type items: dict
"""
# Build a map of lessonId => Item
self._items = dict((item['lessonId'], item) for item in items)
@staticmethod
def create(session, course_name):
"""
Create an instance using a session and a course_name.
@param session: Requests session.
@type session: requests.Session
@param course_name: Course name (slug) from course json.
@type course_name: str
@return: Instance of OnDemandCourseMaterialItems
@rtype: OnDemandCourseMaterialItems
"""
dom = get_page(session, OPENCOURSE_ONDEMAND_COURSE_MATERIALS,
json=True,
class_name=course_name)
return OnDemandCourseMaterialItemsV1(
dom['linked']['onDemandCourseMaterialItems.v1'])
def get(self, lesson_id):
"""
Return lecture by lesson ID.
@param lesson_id: Lesson ID.
@type lesson_id: str
@return: Lesson JSON.
@rtype: dict
Example:
{
"id": "AUd0k",
"moduleId": "0MGvs",
"lessonId": "QgCuM",
"name": "Programming Assignment 1: Decomposition of Graphs",
"slug": "programming-assignment-1-decomposition-of-graphs",
"timeCommitment": 10800000,
"content": {
"typeName": "gradedProgramming",
"definition": {
"programmingAssignmentId": "zHzR5yhHEeaE0BKOcl4zJQ@2",
"gradingWeight": 20
}
},
"isLocked": true,
"itemLockedReasonCode": "PREMIUM",
"trackId": "core"
},
"""
return self._items.get(lesson_id)
class Asset(namedtuple('Asset', 'id name type_name url content_type data')):
"""
This class contains information about an asset.
"""
__slots__ = ()
def __repr__(self):
return 'Asset(id="%s", name="%s", type_name="%s", url="%s", content_type="%s", data="<...>")' % (
self.id, self.name, self.type_name, self.url, self.content_type)
class AssetRetriever(object):
"""
This class helps download assets by their ID.
"""
def __init__(self, session):
self._session = session
self._asset_mapping = {}
def __getitem__(self, asset_id):
return self._asset_mapping[asset_id]
def __call__(self, asset_ids, download=True):
result = []
# Download information about assets (by IDs)
asset_list = get_page(self._session, OPENCOURSE_API_ASSETS_V1_URL,
json=True,
id=','.join(asset_ids))
# Create a map "asset_id => asset" for easier access
asset_map = dict((asset['id'], asset)
for asset in asset_list['elements'])
for asset_id in asset_ids:
# Download each asset
asset_dict = asset_map[asset_id]
url = asset_dict['url']['url'].strip()
data, content_type = None, None
if download:
reply = get_reply(self._session, url)
if reply.status_code == 200:
data = reply.content
content_type = reply.headers.get('Content-Type')
asset = Asset(id=asset_dict['id'].strip(),
name=asset_dict['name'].strip(),
type_name=asset_dict['typeName'].strip(),
url=url,
content_type=content_type,
data=data)
self._asset_mapping[asset.id] = asset
result.append(asset)
return result
@attr.s
class ModuleV1(object):
name = attr.ib()
id = attr.ib()
slug = attr.ib()
child_ids = attr.ib()
def children(self, all_children):
return [all_children[child] for child in self.child_ids]
@attr.s
class ModulesV1(object):
children = attr.ib()
@staticmethod
def from_json(data):
return ModulesV1(OrderedDict(
(item['id'],
ModuleV1(item['name'],
item['id'],
item['slug'],
item['lessonIds']))
for item in data
))
def __getitem__(self, key):
return self.children[key]
def __iter__(self):
return iter(self.children.values())
@attr.s
class LessonV1(object):
name = attr.ib()
id = attr.ib()
slug = attr.ib()
child_ids = attr.ib()
def children(self, all_children):
return [all_children[child] for child in self.child_ids]
@attr.s
class LessonsV1(object):
children = attr.ib()
@staticmethod
def from_json(data):
return LessonsV1(OrderedDict(
(item['id'],
LessonV1(item['name'],
item['id'],
item['slug'],
item['itemIds']))
for item in data
))
def __getitem__(self, key):
return self.children[key]
@attr.s
class ItemV2(object):
name = attr.ib()
id = attr.ib()
slug = attr.ib()
type_name = attr.ib()
lesson_id = attr.ib()
module_id = attr.ib()
@attr.s
class ItemsV2(object):
children = attr.ib()
@staticmethod
def from_json(data):
return ItemsV2(OrderedDict(
(item['id'],
ItemV2(item['name'],
item['id'],
item['slug'],
item['contentSummary']['typeName'],
item['lessonId'],
item['moduleId']))
for item in data
))
def __getitem__(self, key):
return self.children[key]
@attr.s
class VideoV1(object):
resolution = attr.ib()
mp4_video_url = attr.ib()
@attr.s
class VideosV1(object):
children = attr.ib()
@staticmethod
def from_json(data):
videos = [VideoV1(resolution, links['mp4VideoUrl'])
for resolution, links
in data['sources']['byResolution'].items()]
videos.sort(key=lambda video: video.resolution, reverse=True)
videos = OrderedDict(
(video.resolution, video)
for video in videos
)
return VideosV1(videos)
def __contains__(self, key):
return key in self.children
def __getitem__(self, key):
return self.children[key]
def get_best(self):
return next(iter(self.children.values()))
def expand_specializations(session, class_names):
"""
Checks whether any given name is not a class but a specialization.
If it's a specialization, expand the list of class names with the child
class names.
"""
result = []
for class_name in class_names:
specialization = SpecializationV1.create(session, class_name)
if specialization is None:
result.append(class_name)
else:
result.extend(specialization.children)
logging.info('Expanded specialization "%s" into the following'
' classes: %s',
class_name, ' '.join(specialization.children))
return result
@attr.s
class SpecializationV1(object):
children = attr.ib()
@staticmethod
def create(session, class_name):
try:
dom = get_page(session, OPENCOURSE_ONDEMAND_SPECIALIZATIONS_V1,
json=True, quiet=True,
class_name=class_name)
except requests.exceptions.HTTPError as e:
logging.debug('Could not expand %s: %s', class_name, e)
return None
return SpecializationV1(
[course['slug'] for course in dom['linked']['courses.v1']])
class CourseraOnDemand(object):
"""
This is a class that provides a friendly interface to extract certain
parts of on-demand courses. On-demand class is a new format that Coursera
is using, they contain `/learn/' in their URLs. This class does not support
old-style Coursera classes. This API is by no means complete.
"""
def __init__(self, session, course_id, course_name,
unrestricted_filenames=False,
mathjax_cdn_url=None):
"""
Initialize Coursera OnDemand API.
@param session: Current session that holds cookies and so on.
@type session: requests.Session
@param course_id: Course ID from course json.
@type course_id: str
@param unrestricted_filenames: Flag that indicates whether grabbed
file names should endure stricter character filtering. @see
`clean_filename` for the details.
@type unrestricted_filenames: bool
"""
self._session = session
self._notebook_cookies = None
self._course_id = course_id
self._course_name = course_name
self._unrestricted_filenames = unrestricted_filenames
self._user_id = None
self._quiz_to_markup = QuizExamToMarkupConverter(session)
self._markup_to_html = MarkupToHTMLConverter(
session, mathjax_cdn_url=mathjax_cdn_url)
self._asset_retriever = AssetRetriever(session)
def obtain_user_id(self):
reply = get_page(self._session, OPENCOURSE_MEMBERSHIPS, json=True)
elements = reply['elements']
user_id = elements[0]['userId'] if elements else None
self._user_id = user_id
def list_courses(self):
"""
List enrolled courses.
@return: List of enrolled courses.
@rtype: [str]
"""
reply = get_page(self._session, OPENCOURSE_MEMBERSHIPS, json=True)
course_list = reply['linked']['courses.v1']
slugs = [element['slug'] for element in course_list]
return slugs
def extract_links_from_exam(self, exam_id):
try:
session_id = self._get_exam_session_id(exam_id)
exam_json = self._get_exam_json(exam_id, session_id)
return self._convert_quiz_json_to_links(exam_json, 'exam')
except requests.exceptions.HTTPError as exception:
logging.error('Could not download exam %s: %s', exam_id, exception)
if is_debug_run():
logging.exception(
'Could not download exam %s: %s', exam_id, exception)
return None
def _get_notebook_folder(self, url, jupyterId, **kwargs):
supplement_links = {}
url = url.format(**kwargs)
reply = get_page(self._session, url, json=True)
for content in reply['content']:
if content['type'] == 'directory':
a = self._get_notebook_folder(
OPENCOURSE_NOTEBOOK_TREE, jupyterId, jupId=jupyterId,
path=content['path'], timestamp=int(time.time()))
supplement_links.update(a)
elif content['type'] == 'file':
tmp_url = OPENCOURSE_NOTEBOOK_DOWNLOAD.format(
path=content['path'], jupId=jupyterId,
timestamp=int(time.time()))
filename, extension = os.path.splitext(clean_url(tmp_url))
head, tail = os.path.split(content['path'])
# '/' in the following line is for a reason:
# @noureddin says: "I split head using split('/') not
# os.path.split() because it's seems to me that it comes from a
# web page, so the separator will always be /, so using the
# native path splitting function is not the most portable
# way to do it."
# Original pull request:
# https://github.com/coursera-dl/coursera-dl/pull/654
head = '/'.join([clean_filename(dir, minimal_change=True)
for dir in head.split('/')])
tail = clean_filename(tail, minimal_change=True)
if not os.path.isdir(self._course_name + "/notebook/" + head + "/"):
logging.info('Creating [%s] directories...', head)
os.makedirs(self._course_name + "/notebook/" + head + "/")
r = requests.get(tmp_url.replace(" ", "%20"),
cookies=self._session.cookies)
if not os.path.exists(self._course_name + "/notebook/" + head + "/" + tail):
logging.info('Downloading %s into %s', tail, head)
with open(self._course_name + "/notebook/" + head + "/" + tail, 'wb+') as f:
f.write(r.content)
else:
logging.info('Skipping %s... (file exists)', tail)
if str(extension[1:]) not in supplement_links:
supplement_links[str(extension[1:])] = []
supplement_links[str(extension[1:])].append(
(tmp_url.replace(" ", "%20"), filename))
elif content['type'] == 'notebook':
tmp_url = OPENCOURSE_NOTEBOOK_DOWNLOAD.format(
path=content['path'], jupId=jupyterId, timestamp=int(time.time()))
filename, extension = os.path.splitext(clean_url(tmp_url))
head, tail = os.path.split(content['path'])
if not os.path.isdir(self._course_name + "/notebook/" + head + "/"):
logging.info('Creating [%s] directories...', head)
os.makedirs(self._course_name + "/notebook/" + head + "/")
r = requests.get(tmp_url.replace(" ", "%20"),
cookies=self._session.cookies)
if not os.path.exists(self._course_name + "/notebook/" + head + "/" + tail):
logging.info(
'Downloading Jupyter %s into %s', tail, head)
with open(self._course_name + "/notebook/" + head + "/" + tail, 'wb+') as f:
f.write(r.content)
else:
logging.info('Skipping %s... (file exists)', tail)
if "ipynb" not in supplement_links:
supplement_links["ipynb"] = []
supplement_links["ipynb"].append(
(tmp_url.replace(" ", "%20"), filename))
else:
logging.info(
'Unsupported typename %s in notebook', content['type'])
return supplement_links
def _get_notebook_json(self, notebook_id, authorizationId):
headers = self._auth_headers_with_json()
reply = get_page(
self._session,
OPENCOURSE_NOTEBOOK_DESCRIPTIONS,
json=False,
authId=authorizationId,
headers=headers
)
jupyted_id = re.findall(r"\"\/user\/(.*)\/tree\"", reply)
if len(jupyted_id) == 0:
logging.error('Could not download notebook %s', notebook_id)
return None
jupyted_id = jupyted_id[0]
newReq = requests.Session()
req = newReq.get(OPENCOURSE_NOTEBOOK_TREE.format(
jupId=jupyted_id, path="/", timestamp=int(time.time())),
headers=headers)
return self._get_notebook_folder(
OPENCOURSE_NOTEBOOK_TREE, jupyted_id, jupId=jupyted_id,
path="/", timestamp=int(time.time()))
def extract_links_from_notebook(self, notebook_id):
try:
authorizationId = self._extract_notebook_text(notebook_id)
ret = self._get_notebook_json(notebook_id, authorizationId)
return ret
except requests.exceptions.HTTPError as exception:
logging.error('Could not download notebook %s: %s',
notebook_id, exception)
if is_debug_run():
logging.exception(
'Could not download notebook %s: %s', notebook_id, exception)
return None
def extract_links_from_quiz(self, quiz_id):
try:
session_id = self._get_quiz_session_id(quiz_id)
quiz_json = self._get_quiz_json(quiz_id, session_id)
return self._convert_quiz_json_to_links(quiz_json, 'quiz')
except requests.exceptions.HTTPError as exception:
logging.error('Could not download quiz %s: %s', quiz_id, exception)
if is_debug_run():
logging.exception(
'Could not download quiz %s: %s', quiz_id, exception)
return None
def _convert_quiz_json_to_links(self, quiz_json, filename_suffix):
markup = self._quiz_to_markup(quiz_json)
html = self._markup_to_html(markup)
supplement_links = {}
instructions = (IN_MEMORY_MARKER + html, filename_suffix)
extend_supplement_links(
supplement_links, {IN_MEMORY_EXTENSION: [instructions]})
return supplement_links
def _get_exam_json(self, exam_id, session_id):
headers = self._auth_headers_with_json()
data = {"name": "getState", "argument": []}
reply = get_page(self._session,
POST_OPENCOURSE_ONDEMAND_EXAM_SESSIONS_GET_STATE,
json=True,
post=True,
data=json.dumps(data),
headers=headers,
session_id=session_id)
return reply['elements'][0]['result']
def _get_exam_session_id(self, exam_id):
headers = self._auth_headers_with_json()
data = {'courseId': self._course_id, 'itemId': exam_id}
_body, reply = post_page_and_reply(self._session,
POST_OPENCOURSE_ONDEMAND_EXAM_SESSIONS,
data=json.dumps(data),
headers=headers)
return reply.headers.get('X-Coursera-Id')
def _get_quiz_json(self, quiz_id, session_id):
headers = self._auth_headers_with_json()
data = {"contentRequestBody": {"argument": []}}
reply = get_page(self._session,
POST_OPENCOURSE_API_QUIZ_SESSION_GET_STATE,
json=True,
post=True,
data=json.dumps(data),
headers=headers,
user_id=self._user_id,
class_name=self._course_name,
quiz_id=quiz_id,
session_id=session_id)
return reply['contentResponseBody']['return']
def _get_quiz_session_id(self, quiz_id):
headers = self._auth_headers_with_json()
data = {"contentRequestBody": []}
reply = get_page(self._session,
POST_OPENCOURSE_API_QUIZ_SESSION,
json=True,
post=True,
data=json.dumps(data),
headers=headers,
user_id=self._user_id,
class_name=self._course_name,
quiz_id=quiz_id)
return reply['contentResponseBody']['session']['id']
def _auth_headers_with_json(self):
headers = prepare_auth_headers(self._session, include_cauth=True)
headers.update({
'Content-Type': 'application/json; charset=UTF-8'
})
return headers
def extract_links_from_lecture(self, course_id,
video_id, subtitle_language='en',
resolution='540p'):
"""
Return the download URLs of on-demand course video.
@param video_id: Video ID.
@type video_id: str
@param subtitle_language: Subtitle language.
@type subtitle_language: str
@param resolution: Preferred video resolution.
@type resolution: str
@return: @see CourseraOnDemand._extract_links_from_text
"""
try:
links = self._extract_videos_and_subtitles_from_lecture(
course_id, video_id, subtitle_language, resolution)
assets = self._get_lecture_asset_ids(course_id, video_id)
assets = self._normalize_assets(assets)
extend_supplement_links(
links, self._extract_links_from_lecture_assets(assets))
return links
except requests.exceptions.HTTPError as exception:
logging.error('Could not download lecture %s: %s',
video_id, exception)
if is_debug_run():
logging.exception(
'Could not download lecture %s: %s', video_id, exception)
return None
def _get_lecture_asset_ids(self, course_id, video_id):
"""
Obtain a list of asset ids from a lecture.
"""
dom = get_page(self._session, OPENCOURSE_ONDEMAND_LECTURE_ASSETS_URL,
json=True, course_id=course_id, video_id=video_id)
# Note that we extract here "id", not definition -> assetId, as it
# be extracted later.
return [asset['id']
for asset in dom['linked']['openCourseAssets.v1']]
def _normalize_assets(self, assets):
"""
Perform asset normalization. For some reason, assets that are sometimes
present in lectures, have "@1" at the end of their id. Such "uncut"
asset id when fed to OPENCOURSE_ASSETS_URL results in error that says:
"Routing error: 'get-all' not implemented". To avoid that, the last
two characters from asset id are cut off and after that that method
works fine. It looks like, Web UI is doing the same.
@param assets: List of asset ids.
@type assets: [str]
@return: Normalized list of asset ids (without trailing "@1")
@rtype: [str]
"""
new_assets = []
for asset in assets:
# For example: giAxucdaEeWJTQ5WTi8YJQ@1
if len(asset) == 24:
# Turn it into: giAxucdaEeWJTQ5WTi8YJQ
asset = asset[:-2]
new_assets.append(asset)
return new_assets
def _extract_links_from_lecture_assets(self, asset_ids):
"""
Extract links to files of the asset ids.
@param asset_ids: List of asset ids.
@type asset_ids: [str]
@return: @see CourseraOnDemand._extract_links_from_text
"""
links = {}
def _add_asset(name, url, destination):
filename, extension = os.path.splitext(clean_url(name))
if extension is '':
return
extension = clean_filename(
extension.lower().strip('.').strip(),
self._unrestricted_filenames)
basename = clean_filename(
os.path.basename(filename),
self._unrestricted_filenames)
url = url.strip()
if extension not in destination:
destination[extension] = []
destination[extension].append((url, basename))
for asset_id in asset_ids:
for asset in self._get_asset_urls(asset_id):
_add_asset(asset['name'], asset['url'], links)
return links
def _get_asset_urls(self, asset_id):
"""
Get list of asset urls and file names. This method may internally
use AssetRetriever to extract `asset` element types.
@param asset_id: Asset ID.
@type asset_id: str
@return List of dictionaries with asset file names and urls.
@rtype [{
'name': '<filename.ext>'
'url': '<url>'
}]
"""
dom = get_page(self._session, OPENCOURSE_ASSETS_URL,
json=True, id=asset_id)
logging.debug('Parsing JSON for asset_id <%s>.', asset_id)
urls = []
for element in dom['elements']:
typeName = element['typeName']
definition = element['definition']
# Elements of `asset` types look as follows:
#
# {'elements': [{'definition': {'assetId': 'gtSfvscoEeW7RxKvROGwrw',
# 'name': 'Презентация к лекции'},
# 'id': 'phxNlMcoEeWXCQ4nGuQJXw',
# 'typeName': 'asset'}],
# 'linked': None,
# 'paging': None}
#
if typeName == 'asset':
open_course_asset_id = definition['assetId']
for asset in self._asset_retriever([open_course_asset_id],
download=False):
urls.append({'name': asset.name, 'url': asset.url})
# Elements of `url` types look as follows:
#
# {'elements': [{'definition': {'name': 'What motivates you.pptx',
# 'url': 'https://d396qusza40orc.cloudfront.net/learning/Powerpoints/2-4A_What_motivates_you.pptx'},
# 'id': '0hixqpWJEeWQkg5xdHApow',
# 'typeName': 'url'}],
# 'linked': None,
# 'paging': None}
#
elif typeName == 'url':
urls.append({'name': definition['name'].strip(),
'url': definition['url'].strip()})
else:
logging.warning(
'Unknown asset typeName: %s\ndom: %s\n'
'If you think the downloader missed some '
'files, please report the issue here:\n'
'https://github.com/coursera-dl/coursera-dl/issues/new',
typeName, json.dumps(dom, indent=4))
return urls
def _extract_videos_and_subtitles_from_lecture(self,
course_id,
video_id,
subtitle_language='en',
resolution='540p'):
logging.debug('Parsing JSON for video_id <%s>.', video_id)
dom = get_page(self._session, OPENCOURSE_ONDEMAND_LECTURE_VIDEOS_URL,
json=True,
course_id=course_id,
video_id=video_id)
dom = dom['linked']['onDemandVideos.v1'][0]
videos = VideosV1.from_json(dom)
video_content = {}
if resolution in videos:
source = videos[resolution]
logging.debug('Proceeding with download of resolution %s of <%s>.',
resolution, video_id)
else:
source = videos.get_best()
logging.warning(
'Requested resolution %s not available for <%s>. '
'Downloading highest resolution (%s) available instead.',
resolution, video_id, source.resolution)
video_content['mp4'] = source.mp4_video_url
subtitle_link = self._extract_subtitles_from_video_dom(
dom, subtitle_language, video_id)
for key, value in iteritems(subtitle_link):
video_content[key] = value
lecture_video_content = {}
for key, value in iteritems(video_content):
lecture_video_content[key] = [(value, '')]
return lecture_video_content
def _extract_subtitles_from_video_dom(self, video_dom,
subtitle_language, video_id):
# subtitles and transcripts
subtitle_nodes = [
('subtitles', 'srt', 'subtitle'),
('subtitlesTxt', 'txt', 'transcript'),
]
subtitle_set_download = set()
subtitle_set_nonexist = set()
subtitle_links = {}
for (subtitle_node, subtitle_extension, subtitle_description) \
in subtitle_nodes:
logging.debug('Gathering %s URLs for video_id <%s>.',
subtitle_description, video_id)
subtitles = video_dom.get(subtitle_node)
download_all_subtitle = False
if subtitles is not None:
subtitles_set = set(subtitles)
requested_subtitle_list = [s.strip() for s in
subtitle_language.split(",")]
for language_with_alts in requested_subtitle_list:
if download_all_subtitle:
break
grouped_language_list = [l.strip() for l in
language_with_alts.split("|")]
for language in grouped_language_list:
if language == "all":
download_all_subtitle = True
break
elif language in subtitles_set:
subtitle_set_download.update([language])
break
else:
subtitle_set_nonexist.update([language])
if download_all_subtitle and subtitles is not None:
subtitle_set_download = set(subtitles)
if not download_all_subtitle and subtitle_set_nonexist:
logging.warning("%s unavailable in '%s' language for video "
"with video id: [%s],"
"%s", subtitle_description.capitalize(),
", ".join(subtitle_set_nonexist), video_id,
subtitle_description)
if not subtitle_set_download:
logging.warning("%s all requested subtitles are unavailable,"
"with video id: [%s], falling back to 'en' "
"%s", subtitle_description.capitalize(),
video_id,
subtitle_description)
subtitle_set_download = set(['en'])
for current_subtitle_language in subtitle_set_download:
subtitle_url = subtitles.get(current_subtitle_language)
if subtitle_url is not None:
# some subtitle urls are relative!
subtitle_links[
"%s.%s" % (current_subtitle_language,
subtitle_extension)
] = make_coursera_absolute_url(subtitle_url)
return subtitle_links
def extract_links_from_programming_immediate_instructions(self, element_id):
"""
Return a dictionary with links to supplement files (pdf, csv, zip,
ipynb, html and so on) extracted from graded programming assignment.
@param element_id: Element ID to extract files from.
@type element_id: str
@return: @see CourseraOnDemand._extract_links_from_text
"""
logging.debug('Extracting links from programming immediate '
'instructions for element_id <%s>.', element_id)
try:
# Assignment text (instructions) contains asset tags which describe
# supplementary files.
text = ''.join(
self._extract_programming_immediate_instructions_text(element_id))
if not text:
return {}
supplement_links = self._extract_links_from_text(text)
instructions = (IN_MEMORY_MARKER + self._markup_to_html(text),
'instructions')
extend_supplement_links(
supplement_links, {IN_MEMORY_EXTENSION: [instructions]})
return supplement_links
except requests.exceptions.HTTPError as exception:
logging.error('Could not download programming assignment %s: %s',
element_id, exception)
if is_debug_run():
logging.exception('Could not download programming assignment %s: %s',
element_id, exception)
return None
def extract_links_from_programming(self, element_id):
"""
Return a dictionary with links to supplement files (pdf, csv, zip,
ipynb, html and so on) extracted from graded programming assignment.
@param element_id: Element ID to extract files from.
@type element_id: str
@return: @see CourseraOnDemand._extract_links_from_text
"""
logging.debug(
'Gathering supplement URLs for element_id <%s>.', element_id)
try:
# Assignment text (instructions) contains asset tags which describe
# supplementary files.
text = ''.join(self._extract_assignment_text(element_id))
if not text:
return {}
supplement_links = self._extract_links_from_text(text)
instructions = (IN_MEMORY_MARKER + self._markup_to_html(text),
'instructions')
extend_supplement_links(
supplement_links, {IN_MEMORY_EXTENSION: [instructions]})
return supplement_links
except requests.exceptions.HTTPError as exception:
logging.error('Could not download programming assignment %s: %s',
element_id, exception)
if is_debug_run():
logging.exception('Could not download programming assignment %s: %s',
element_id, exception)
return None
def extract_links_from_peer_assignment(self, element_id):
"""
Return a dictionary with links to supplement files (pdf, csv, zip,
ipynb, html and so on) extracted from peer assignment.
@param element_id: Element ID to extract files from.
@type element_id: str
@return: @see CourseraOnDemand._extract_links_from_text
"""
logging.debug(
'Gathering supplement URLs for element_id <%s>.', element_id)
try:
# Assignment text (instructions) contains asset tags which describe
# supplementary files.
text = ''.join(self._extract_peer_assignment_text(element_id))
if not text:
return {}
supplement_links = self._extract_links_from_text(text)
instructions = (IN_MEMORY_MARKER + self._markup_to_html(text),
'peer_assignment_instructions')
extend_supplement_links(
supplement_links, {IN_MEMORY_EXTENSION: [instructions]})
return supplement_links
except requests.exceptions.HTTPError as exception:
logging.error('Could not download peer assignment %s: %s',
element_id, exception)
if is_debug_run():
logging.exception('Could not download peer assignment %s: %s',
element_id, exception)
return None
def extract_links_from_supplement(self, element_id):
"""
Return a dictionary with supplement files (pdf, csv, zip, ipynb, html
and so on) extracted from supplement page.
@return: @see CourseraOnDemand._extract_links_from_text
"""
logging.debug(
'Gathering supplement URLs for element_id <%s>.', element_id)
try:
dom = get_page(self._session, OPENCOURSE_SUPPLEMENT_URL,
json=True,
course_id=self._course_id,
element_id=element_id)
supplement_content = {}
# Supplement content has structure as follows:
# 'linked' {
# 'openCourseAssets.v1' [ {
# 'definition' {
# 'value'
for asset in dom['linked']['openCourseAssets.v1']:
value = asset['definition']['value']
# Supplement lecture types are known to contain both <asset> tags
# and <a href> tags (depending on the course), so we extract
# both of them.
extend_supplement_links(
supplement_content, self._extract_links_from_text(value))
instructions = (IN_MEMORY_MARKER + self._markup_to_html(value),
'instructions')
extend_supplement_links(
supplement_content, {IN_MEMORY_EXTENSION: [instructions]})
return supplement_content
except requests.exceptions.HTTPError as exception:
logging.error('Could not download supplement %s: %s',
element_id, exception)
if is_debug_run():
logging.exception('Could not download supplement %s: %s',
element_id, exception)
return None
def _extract_asset_tags(self, text):
"""
Extract asset tags from text into a convenient form.
@param text: Text to extract asset tags from. This text contains HTML
code that is parsed by BeautifulSoup.
@type text: str
@return: Asset map.
@rtype: {
'<id>': {
'name': '<name>',
'extension': '<extension>'
},
...
}
"""
soup = BeautifulSoup(text)
asset_tags_map = {}
for asset in soup.find_all('asset'):
asset_tags_map[asset['id']] = {'name': asset['name'],
'extension': asset['extension']}
return asset_tags_map
def _extract_asset_urls(self, asset_ids):
"""
Extract asset URLs along with asset ids.
@param asset_ids: List of ids to get URLs for.
@type assertn: [str]
@return: List of dictionaries with asset URLs and ids.
@rtype: [{
'id': '<id>',
'url': '<url>'
}]
"""
dom = get_page(self._session, OPENCOURSE_ASSET_URL,
json=True,
ids=quote_plus(','.join(asset_ids)))
return [{'id': element['id'],
'url': element['url'].strip()}
for element in dom['elements']]
def extract_references_poll(self):
try:
dom = get_page(self._session,
OPENCOURSE_REFERENCES_POLL_URL.format(
course_id=self._course_id),
json=True
)
logging.info('Downloaded resource poll (%d bytes)', len(dom))
return dom['elements']
except requests.exceptions.HTTPError as exception:
logging.error('Could not download resource section: %s',
exception)
if is_debug_run():
logging.exception('Could not download resource section: %s',
exception)
return None
def extract_links_from_reference(self, short_id):
"""
Return a dictionary with supplement files (pdf, csv, zip, ipynb, html
and so on) extracted from supplement page.
@return: @see CourseraOnDemand._extract_links_from_text
"""
logging.debug('Gathering resource URLs for short_id <%s>.', short_id)
try:
dom = get_page(self._session, OPENCOURSE_REFERENCE_ITEM_URL,
json=True,
course_id=self._course_id,
short_id=short_id)
resource_content = {}
# Supplement content has structure as follows:
# 'linked' {
# 'openCourseAssets.v1' [ {
# 'definition' {
# 'value'
for asset in dom['linked']['openCourseAssets.v1']:
value = asset['definition']['value']
# Supplement lecture types are known to contain both <asset> tags
# and <a href> tags (depending on the course), so we extract
# both of them.
extend_supplement_links(
resource_content, self._extract_links_from_text(value))
instructions = (IN_MEMORY_MARKER + self._markup_to_html(value),
'resources')
extend_supplement_links(
resource_content, {IN_MEMORY_EXTENSION: [instructions]})
return resource_content
except requests.exceptions.HTTPError as exception:
logging.error('Could not download supplement %s: %s',
short_id, exception)
if is_debug_run():
logging.exception('Could not download supplement %s: %s',
short_id, exception)
return None
def _extract_programming_immediate_instructions_text(self, element_id):
"""
Extract assignment text (instructions).
@param element_id: Element id to extract assignment instructions from.
@type element_id: str
@return: List of assignment text (instructions).
@rtype: [str]
"""
dom = get_page(self._session, OPENCOURSE_PROGRAMMING_IMMEDIATE_INSTRUCTIOINS_URL,
json=True,
course_id=self._course_id,
element_id=element_id)
return [element['assignmentInstructions']['definition']['value']
for element in dom['elements']]
def _extract_notebook_text(self, element_id):
"""
Extract notebook text (instructions).
@param element_id: Element id to extract notebook links.
@type element_id: str
@return: Notebook URL.
@rtype: [str]
"""
headers = self._auth_headers_with_json()
data = {'courseId': self._course_id,
'learnerId': self._user_id, 'itemId': element_id}
dom = get_page(self._session, OPENCOURSE_NOTEBOOK_LAUNCHES,
post=True,
json=True,
user_id=self._user_id,
course_id=self._course_id,
headers=headers,
element_id=element_id,
data=json.dumps(data)
)
# Return authorization id. This id changes on each request
return dom['elements'][0]['authorizationId']
def _extract_assignment_text(self, element_id):
"""
Extract assignment text (instructions).
@param element_id: Element id to extract assignment instructions from.
@type element_id: str
@return: List of assignment text (instructions).
@rtype: [str]
"""
dom = get_page(self._session, OPENCOURSE_PROGRAMMING_ASSIGNMENTS_URL,
json=True,
course_id=self._course_id,
element_id=element_id)
return [element['submissionLearnerSchema']['definition']
['assignmentInstructions']['definition']['value']
for element in dom['elements']]
def _extract_peer_assignment_text(self, element_id):
"""
Extract peer assignment text (instructions).
@param element_id: Element id to extract peer assignment instructions from.
@type element_id: str
@return: List of peer assignment text (instructions).
@rtype: [str]
"""
dom = get_page(self._session, OPENCOURSE_PEER_ASSIGNMENT_INSTRUCTIONS,
json=True,
user_id=self._user_id,
course_id=self._course_id,
element_id=element_id)
result = []
for element in dom['elements']:
# There is only one section with Instructions
if 'introduction' in element['instructions']:
result.append(element['instructions']
['introduction']['definition']['value'])
# But there may be multiple sections in Sections
for section in element['instructions'].get('sections', []):
section_value = section['content']['definition']['value']
section_title = section.get('title')
if section_title is not None:
# If section title is present, put it in the beginning of
# section value as if it was there.
section_value = ('<heading level="3">%s</heading>' %
section_title) + section_value
result.append(section_value)
return result
def _extract_links_from_text(self, text):
"""
Extract supplement links from the html text. Links may be provided
in two ways:
1. <a> tags with href attribute
2. <asset> tags with id attribute (requires additional request
to get the direct URL to the asset file)
@param text: HTML text.
@type text: str
@return: Dictionary with supplement links grouped by extension.
@rtype: {
'<extension1>': [
('<link1>', '<title1>'),
('<link2>', '<title2')
],
'extension2': [
('<link3>', '<title3>'),
('<link4>', '<title4>')
],
...
}
"""
supplement_links = self._extract_links_from_a_tags_in_text(text)
extend_supplement_links(
supplement_links,
self._extract_links_from_asset_tags_in_text(text))
return supplement_links
def _extract_links_from_asset_tags_in_text(self, text):
"""
Scan the text and extract asset tags and links to corresponding
files.
@param text: Page text.
@type text: str
@return: @see CourseraOnDemand._extract_links_from_text
"""
# Extract asset tags from instructions text
asset_tags_map = self._extract_asset_tags(text)
ids = list(iterkeys(asset_tags_map))
if not ids:
return {}
# asset tags contain asset names and ids. We need to make another
# HTTP request to get asset URL.
asset_urls = self._extract_asset_urls(ids)
supplement_links = {}
# Build supplement links, providing nice titles along the way
for asset in asset_urls:
title = clean_filename(
asset_tags_map[asset['id']]['name'],
self._unrestricted_filenames)
extension = clean_filename(
asset_tags_map[asset['id']]['extension'].strip(),
self._unrestricted_filenames)
url = asset['url'].strip()
if extension not in supplement_links:
supplement_links[extension] = []
supplement_links[extension].append((url, title))
return supplement_links
def _extract_links_from_a_tags_in_text(self, text):
"""
Extract supplement links from the html text that contains <a> tags
with href attribute.
@param text: HTML text.
@type text: str
@return: Dictionary with supplement links grouped by extension.
@rtype: {
'<extension1>': [
('<link1>', '<title1>'),
('<link2>', '<title2')
],
'extension2': [
('<link3>', '<title3>'),
('<link4>', '<title4>')
]
}
"""
soup = BeautifulSoup(text)
links = [item['href'].strip()
for item in soup.find_all('a') if 'href' in item.attrs]
links = sorted(list(set(links)))
supplement_links = {}
for link in links:
filename, extension = os.path.splitext(clean_url(link))
# Some courses put links to sites in supplement section, e.g.:
# http://pandas.pydata.org/
if extension is '':
continue
# Make lowercase and cut the leading/trailing dot
extension = clean_filename(
extension.lower().strip('.').strip(),
self._unrestricted_filenames)
basename = clean_filename(
os.path.basename(filename),
self._unrestricted_filenames)
if extension not in supplement_links:
supplement_links[extension] = []
# Putting basename into the second slot of the tuple is important
# because that will allow to download many supplements within a
# single lecture, e.g.:
# 01_slides-presented-in-this-module.pdf
# 01_slides-presented-in-this-module_Dalal-cvpr05.pdf
# 01_slides-presented-in-this-module_LM-3dtexton.pdf
supplement_links[extension].append((link, basename))
return supplement_links