core/domain/exp_services.py
# coding: utf-8
#
# Copyright 2014 The Oppia Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS-IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Commands that can be used to operate on explorations.
All functions here should be agnostic of how ExplorationModel objects are
stored in the database. In particular, the various query methods should
delegate to the Exploration model class. This will enable the exploration
storage model to be changed without affecting this module and others above it.
"""
from __future__ import annotations
import collections
import datetime
import io
import logging
import math
import os
import pprint
import re
import zipfile
from core import android_validation_constants
from core import feconf
from core import utils
from core.constants import constants
from core.domain import activity_services
from core.domain import caching_services
from core.domain import change_domain
from core.domain import classifier_services
from core.domain import draft_upgrade_services
from core.domain import email_manager
from core.domain import email_subscription_services
from core.domain import exp_domain
from core.domain import exp_fetchers
from core.domain import feedback_services
from core.domain import fs_services
from core.domain import html_cleaner
from core.domain import html_validation_service
from core.domain import opportunity_services
from core.domain import param_domain
from core.domain import recommendations_services
from core.domain import rights_domain
from core.domain import rights_manager
from core.domain import search_services
from core.domain import state_domain
from core.domain import stats_domain
from core.domain import stats_services
from core.domain import suggestion_services
from core.domain import taskqueue_services
from core.domain import translation_services
from core.domain import user_domain
from core.domain import user_services
from core.domain import voiceover_services
from core.platform import models
from extensions import domain
import deepdiff
from typing import (
Dict, Final, List, Literal, Optional, Sequence, Tuple, Type, TypedDict,
Union, cast, overload
)
MYPY = False
if MYPY: # pragma: no cover
from mypy_imports import base_models
from mypy_imports import datastore_services
from mypy_imports import exp_models
from mypy_imports import stats_models
from mypy_imports import user_models
(base_models, exp_models, stats_models, user_models) = (
models.Registry.import_models([
models.Names.BASE_MODEL,
models.Names.EXPLORATION,
models.Names.STATISTICS,
models.Names.USER
])
)
datastore_services = models.Registry.import_datastore_services()
AcceptableActivityModelTypes = Union[
user_models.CompletedActivitiesModel,
user_models.IncompleteActivitiesModel
]
class UserExplorationDataDict(TypedDict):
"""Dictionary representing the user's specific exploration data."""
exploration_id: str
title: str
category: str
objective: str
language_code: str
tags: List[str]
init_state_name: str
states: Dict[str, state_domain.StateDict]
param_specs: Dict[str, param_domain.ParamSpecDict]
param_changes: List[param_domain.ParamChangeDict]
version: int
auto_tts_enabled: bool
edits_allowed: bool
draft_change_list_id: int
rights: rights_domain.ActivityRightsDict
show_state_editor_tutorial_on_load: bool
show_state_translation_tutorial_on_load: bool
is_version_of_draft_valid: Optional[bool]
draft_changes: Dict[str, str]
email_preferences: user_domain.UserExplorationPrefsDict
next_content_id_index: int
exploration_metadata: exp_domain.ExplorationMetadataDict
class SnapshotsMetadataDict(TypedDict):
"""Dictionary representing the snapshot metadata for exploration model."""
committer_id: str
commit_message: str
commit_cmds: List[Dict[str, change_domain.AcceptableChangeDictTypes]]
commit_type: str
version_number: int
created_on_ms: float
# Name for the exploration search index.
SEARCH_INDEX_EXPLORATIONS: Final = 'explorations'
# The maximum number of iterations allowed for populating the results of a
# search query.
MAX_ITERATIONS: Final = 10
# NOTE TO DEVELOPERS: The get_story_ids_linked_to_explorations function was
# removed in #13021 as part of the migration to Apache Beam. Please refer to
# that PR if you need to reinstate it.
def is_exp_summary_editable(
exp_summary: exp_domain.ExplorationSummary, user_id: str
) -> bool:
"""Checks if a given user has permissions to edit the exploration.
Args:
exp_summary: ExplorationSummary. An ExplorationSummary domain object.
user_id: str. The id of the user whose permissions are being checked.
Returns:
bool. Whether the user has permissions to edit the exploration.
"""
return user_id is not None and (
user_id in exp_summary.editor_ids
or user_id in exp_summary.owner_ids
or exp_summary.community_owned)
# Query methods.
def get_exploration_titles_and_categories(
exp_ids: List[str]
) -> Dict[str, Dict[str, str]]:
"""Returns exploration titles and categories for the given ids.
The result is a dict with exploration ids as keys. The corresponding values
are dicts with the keys 'title' and 'category'.
Any invalid exp_ids will not be included in the return dict. No error will
be raised.
Args:
exp_ids: list(str). A list of exploration ids of exploration domain
objects.
Returns:
dict. The keys are exploration ids and the corresponding values are
dicts with the keys 'title' and 'category'. Any invalid exploration
ids are excluded.
"""
explorations = [
(exp_fetchers.get_exploration_from_model(e) if e else None)
for e in exp_models.ExplorationModel.get_multi(
exp_ids, include_deleted=True)]
result = {}
for exploration in explorations:
if exploration is None:
logging.error(
'Could not find exploration corresponding to id')
else:
result[exploration.id] = {
'title': exploration.title,
'category': exploration.category,
}
return result
def get_exploration_ids_matching_query(
query_string: str,
categories: List[str],
language_codes: List[str],
offset: Optional[int] = None
) -> Tuple[List[str], Optional[int]]:
"""Returns a list with all exploration ids matching the given search query
string, as well as a search offset for future fetches.
This method returns exactly feconf.SEARCH_RESULTS_PAGE_SIZE results if
there are at least that many, otherwise it returns all remaining results.
(If this behaviour does not occur, an error will be logged.) The method
also returns a search offset.
Args:
query_string: str. A search query string.
categories: list(str). The list of categories to query for. If it is
empty, no category filter is applied to the results. If it is not
empty, then a result is considered valid if it matches at least one
of these categories.
language_codes: list(str). The list of language codes to query for. If
it is empty, no language code filter is applied to the results. If
it is not empty, then a result is considered valid if it matches at
least one of these language codes.
offset: int or None. Optional offset from which to start the search
query. If no offset is supplied, the first N results matching
the query are returned.
Returns:
2-tuple of (returned_exploration_ids, search_offset). Where:
returned_exploration_ids : list(str). A list with all
exploration ids matching the given search query string,
as well as a search offset for future fetches.
The list contains exactly feconf.SEARCH_RESULTS_PAGE_SIZE
results if there are at least that many, otherwise it
contains all remaining results. (If this behaviour does
not occur, an error will be logged.)
search_offset: int. Search offset for future fetches.
"""
returned_exploration_ids: List[str] = []
search_offset = offset
for _ in range(MAX_ITERATIONS):
remaining_to_fetch = feconf.SEARCH_RESULTS_PAGE_SIZE - len(
returned_exploration_ids)
exp_ids, search_offset = search_services.search_explorations(
query_string, categories, language_codes, remaining_to_fetch,
offset=search_offset)
invalid_exp_ids = []
for ind, model in enumerate(
exp_models.ExpSummaryModel.get_multi(exp_ids)):
if model is not None:
returned_exploration_ids.append(exp_ids[ind])
else:
invalid_exp_ids.append(exp_ids[ind])
if (len(returned_exploration_ids) == feconf.SEARCH_RESULTS_PAGE_SIZE
or search_offset is None):
break
logging.error(
'Search index contains stale exploration ids: %s' %
', '.join(invalid_exp_ids))
if (len(returned_exploration_ids) < feconf.SEARCH_RESULTS_PAGE_SIZE
and search_offset is not None):
logging.error(
'Could not fulfill search request for query string %s; at least '
'%s retries were needed.' % (query_string, MAX_ITERATIONS))
return (returned_exploration_ids, search_offset)
def get_non_private_exploration_summaries(
) -> Dict[str, exp_domain.ExplorationSummary]:
"""Returns a dict with all non-private exploration summary domain objects,
keyed by their id.
Returns:
dict. The keys are exploration ids and the values are corresponding
non-private ExplorationSummary domain objects.
"""
return exp_fetchers.get_exploration_summaries_from_models(
exp_models.ExpSummaryModel.get_non_private())
def get_top_rated_exploration_summaries(
limit: int
) -> Dict[str, exp_domain.ExplorationSummary]:
"""Returns a dict with top rated exploration summary model instances,
keyed by their id. At most 'limit' entries are returned.
Args:
limit: int. The maximum number of exploration summary model instances to
be returned.
Returns:
dict. The keys are exploration ids and the values are the corresponding
top rated ExplorationSummary domain model instances. At most limit
entries are returned.
"""
return exp_fetchers.get_exploration_summaries_from_models(
exp_models.ExpSummaryModel.get_top_rated(limit))
def get_recently_published_exp_summaries(
limit: int
) -> Dict[str, exp_domain.ExplorationSummary]:
"""Returns a dict with recently published ExplorationSummary model
instances, keyed by their exploration id. At most 'limit' entries are
returned.
Args:
limit: int. The maximum number of exploration summary model instances to
be returned.
Returns:
dict. The dict contains recently published ExplorationSummary model
instances as a value keyed by their exploration id. At most 'limit'
entries are returned.
"""
return exp_fetchers.get_exploration_summaries_from_models(
exp_models.ExpSummaryModel.get_recently_published(limit))
def get_story_id_linked_to_exploration(exp_id: str) -> Optional[str]:
"""Returns the ID of the story that the exploration is a part of, or None if
the exploration is not part of a story.
Args:
exp_id: str. The ID of the exploration.
Returns:
str|None. The ID of the story if the exploration is linked to some
story, otherwise None.
"""
exploration_context_model = exp_models.ExplorationContextModel.get(
exp_id, strict=False)
if exploration_context_model is None:
return None
# TODO(#15621): The explicit declaration of type for ndb properties
# should be removed. Currently, these ndb properties are annotated with
# Any return type. Once we have proper return type we can remove this.
story_id: str = exploration_context_model.story_id
return story_id
def get_all_exploration_summaries() -> Dict[str, exp_domain.ExplorationSummary]:
"""Returns a dict with all exploration summary domain objects,
keyed by their id.
Returns:
dict. A dict with all ExplorationSummary domain objects keyed by their
exploration id.
"""
return exp_fetchers.get_exploration_summaries_from_models(
exp_models.ExpSummaryModel.get_all().fetch())
# Methods for exporting states and explorations to other formats.
def export_to_zip_file(
exploration_id: str, version: Optional[int] = None
) -> io.BytesIO:
"""Returns a ZIP archive of the exploration.
Args:
exploration_id: str. The id of the exploration to export.
version: int or None. If provided, this indicates which version of
the exploration to export. Otherwise, the latest version of the
exploration is exported.
Returns:
BytesIO. The contents of the ZIP archive of the exploration
(which can be subsequently converted into a zip file via
zipfile.ZipFile()).
"""
exploration = exp_fetchers.get_exploration_by_id(
exploration_id, version=version)
yaml_repr = exploration.to_yaml()
temp_file = io.BytesIO()
with zipfile.ZipFile(
temp_file, mode='w', compression=zipfile.ZIP_DEFLATED) as zfile:
if not exploration.title:
zfile.writestr('Unpublished_exploration.yaml', yaml_repr)
else:
exploration_file_name = re.sub(
r'[^A-Za-z0-9_ -]+', '', exploration.title)
# Trim whitespace when checking to handle potential
# whitespace-only 'exploration_file_name'.
if not exploration_file_name.strip():
zfile.writestr('exploration.yaml', yaml_repr)
else:
zfile.writestr('%s.yaml' % exploration_file_name, yaml_repr)
fs = fs_services.GcsFileSystem(
feconf.ENTITY_TYPE_EXPLORATION, exploration_id)
html_string_list = exploration.get_all_html_content_strings()
image_filenames = (
html_cleaner.get_image_filenames_from_html_strings(
html_string_list))
for filename in image_filenames:
filepath = 'image/%s' % filename
file_contents = fs.get(filepath)
str_filepath = 'assets/%s' % filepath
logging.error(str_filepath)
zfile.writestr(str_filepath, file_contents)
return temp_file
def export_states_to_yaml(
exploration_id: str, version: Optional[int] = None, width: int = 80
) -> Dict[str, str]:
"""Returns a dictionary of the exploration, whose keys are state
names and values are yaml strings representing the state contents with
lines wrapped at 'width' characters.
Args:
exploration_id: str. The id of the exploration whose states should
be exported.
version: int or None. The version of the exploration to be returned.
If None, the latest version of the exploration is returned.
width: int. Width for the yaml representation, default value
is set to be of 80.
Returns:
dict. The keys are state names, and the values are YAML strings
representing the corresponding state's contents.
"""
exploration = exp_fetchers.get_exploration_by_id(
exploration_id, version=version)
exploration_dict = {}
for state in exploration.states:
exploration_dict[state] = utils.yaml_from_dict(
exploration.states[state].to_dict(),
width=width
)
return exploration_dict
# Repository SAVE and DELETE methods.
def apply_change_list(
exploration_id: str, change_list: Sequence[exp_domain.ExplorationChange]
) -> exp_domain.Exploration:
"""Applies a changelist to a pristine exploration and returns the result.
Each entry in change_list is a dict that represents an ExplorationChange
object.
Args:
exploration_id: str. The id of the exploration to which the change list
is to be applied.
change_list: list(ExplorationChange). The list of changes to apply.
Returns:
Exploration. The exploration domain object that results from applying
the given changelist to the existing version of the exploration.
Raises:
Exception. Any entries in the changelist are invalid.
Exception. Solution cannot exist with None interaction id.
"""
exploration = exp_fetchers.get_exploration_by_id(exploration_id)
try:
to_param_domain = param_domain.ParamChange.from_dict
for change in change_list:
if change.cmd == exp_domain.CMD_ADD_STATE:
# Here we use cast because we are narrowing down the type from
# ExplorationChange to a specific change command.
add_state_cmd = cast(
exp_domain.AddExplorationStateCmd,
change
)
exploration.add_state(
add_state_cmd.state_name,
add_state_cmd.content_id_for_state_content,
add_state_cmd.content_id_for_default_outcome
)
elif change.cmd == exp_domain.CMD_RENAME_STATE:
# Here we use cast because we are narrowing down the type from
# ExplorationChange to a specific change command.
rename_state_cmd = cast(
exp_domain.RenameExplorationStateCmd,
change
)
exploration.rename_state(
rename_state_cmd.old_state_name,
rename_state_cmd.new_state_name
)
elif change.cmd == exp_domain.CMD_DELETE_STATE:
# Here we use cast because we are narrowing down the type from
# ExplorationChange to a specific change command.
delete_state_cmd = cast(
exp_domain.DeleteExplorationStateCmd,
change
)
exploration.delete_state(delete_state_cmd.state_name)
elif change.cmd == exp_domain.CMD_EDIT_STATE_PROPERTY:
state: state_domain.State = exploration.states[
change.state_name]
if (change.property_name ==
exp_domain.STATE_PROPERTY_PARAM_CHANGES):
# Here we use cast because this 'if' condition forces
# change to have type EditExpStatePropertyParamChangesCmd.
edit_param_changes_cmd = cast(
exp_domain.EditExpStatePropertyParamChangesCmd,
change
)
state.update_param_changes(list(map(
to_param_domain, edit_param_changes_cmd.new_value
)))
elif change.property_name == exp_domain.STATE_PROPERTY_CONTENT:
# Here we use cast because this 'elif' condition forces
# change to have type EditExpStatePropertyContentCmd.
edit_content_cmd = cast(
exp_domain.EditExpStatePropertyContentCmd,
change
)
content = (
state_domain.SubtitledHtml.from_dict(
edit_content_cmd.new_value
)
)
content.validate()
state.update_content(content)
elif (change.property_name ==
exp_domain.STATE_PROPERTY_INTERACTION_ID):
state.update_interaction_id(change.new_value)
elif (change.property_name ==
exp_domain.STATE_PROPERTY_LINKED_SKILL_ID):
# Here we use cast because this 'elif'
# condition forces change to have type
# EditExpStatePropertyLinkedSkillIdCmd.
edit_linked_skill_id_cmd = cast(
exp_domain.EditExpStatePropertyLinkedSkillIdCmd,
change
)
state.update_linked_skill_id(
edit_linked_skill_id_cmd.new_value
)
elif (change.property_name ==
exp_domain.STATE_PROPERTY_INTERACTION_CUST_ARGS):
# Here we use cast because this 'elif'
# condition forces change to have type
# EditExpStatePropertyInteractionCustArgsCmd.
edit_interaction_cust_arg_cmd = cast(
exp_domain.EditExpStatePropertyInteractionCustArgsCmd,
change
)
state.update_interaction_customization_args(
edit_interaction_cust_arg_cmd.new_value)
elif (change.property_name ==
exp_domain.STATE_PROPERTY_INTERACTION_HANDLERS):
raise utils.InvalidInputException(
'Editing interaction handlers is no longer supported')
elif (change.property_name ==
exp_domain.STATE_PROPERTY_INTERACTION_ANSWER_GROUPS):
# Here we use cast because this 'elif'
# condition forces change to have type
# EditExpStatePropertyInteractionAnswerGroupsCmd.
edit_interaction_answer_group_cmd = cast(
exp_domain.EditExpStatePropertyInteractionAnswerGroupsCmd, # pylint: disable=line-too-long
change
)
answer_groups = (
edit_interaction_answer_group_cmd.new_value
)
new_answer_groups = [
state_domain.AnswerGroup.from_dict(answer_group)
for answer_group in answer_groups
]
state.update_interaction_answer_groups(new_answer_groups)
elif (change.property_name ==
exp_domain.STATE_PROPERTY_INTERACTION_DEFAULT_OUTCOME):
new_outcome = None
if change.new_value:
# Here we use cast because this 'elif'
# condition forces change to have type
# EditExpStatePropertyInteractionDefaultOutcomeCmd.
edit_interaction_default_outcome_cmd = cast(
exp_domain.EditExpStatePropertyInteractionDefaultOutcomeCmd, # pylint: disable=line-too-long
change
)
new_outcome = state_domain.Outcome.from_dict(
edit_interaction_default_outcome_cmd.new_value
)
state.update_interaction_default_outcome(new_outcome)
elif (change.property_name ==
exp_domain.STATE_PROPERTY_UNCLASSIFIED_ANSWERS):
# Here we use cast because this 'elif'
# condition forces change to have type
# EditExpStatePropertyUnclassifiedAnswersCmd.
edit_unclassified_answers_cmd = cast(
exp_domain.EditExpStatePropertyUnclassifiedAnswersCmd,
change
)
state.update_interaction_confirmed_unclassified_answers(
edit_unclassified_answers_cmd.new_value)
elif (change.property_name ==
exp_domain.STATE_PROPERTY_INTERACTION_HINTS):
# Here we use cast because this 'elif'
# condition forces change to have type
# EditExpStatePropertyInteractionHintsCmd.
edit_state_interaction_hints_cmd = cast(
exp_domain.EditExpStatePropertyInteractionHintsCmd,
change
)
hint_dicts = (
edit_state_interaction_hints_cmd.new_value
)
if not isinstance(hint_dicts, list):
raise Exception(
'Expected hints_list to be a list,'
' received %s' % hint_dicts)
new_hints_list = [
state_domain.Hint.from_dict(hint_dict)
for hint_dict in hint_dicts
]
state.update_interaction_hints(new_hints_list)
elif (change.property_name ==
exp_domain.STATE_PROPERTY_INTERACTION_SOLUTION):
new_solution = None
# Here we use cast because this 'elif'
# condition forces change to have type
# EditExpStatePropertyInteractionSolutionCmd.
edit_interaction_solution_cmd = cast(
exp_domain.EditExpStatePropertyInteractionSolutionCmd,
change
)
if edit_interaction_solution_cmd.new_value is not None:
if state.interaction.id is None:
raise Exception(
'solution cannot exist with None '
'interaction id.'
)
new_solution = state_domain.Solution.from_dict(
state.interaction.id,
edit_interaction_solution_cmd.new_value
)
state.update_interaction_solution(new_solution)
elif (change.property_name ==
exp_domain.STATE_PROPERTY_SOLICIT_ANSWER_DETAILS):
if not isinstance(change.new_value, bool):
raise Exception(
'Expected solicit_answer_details to be a ' +
'bool, received %s' % change.new_value)
# Here we use cast because this 'elif'
# condition forces change to have type
# EditExpStatePropertySolicitAnswerDetailsCmd.
edit_solicit_answer_details_cmd = cast(
exp_domain.EditExpStatePropertySolicitAnswerDetailsCmd,
change
)
state.update_solicit_answer_details(
edit_solicit_answer_details_cmd.new_value
)
elif (change.property_name ==
exp_domain.STATE_PROPERTY_CARD_IS_CHECKPOINT):
if not isinstance(change.new_value, bool):
raise Exception(
'Expected card_is_checkpoint to be a ' +
'bool, received %s' % change.new_value)
# Here we use cast because this 'elif'
# condition forces change to have type
# EditExpStatePropertyCardIsCheckpointCmd.
edit_card_is_checkpoint_cmd = cast(
exp_domain.EditExpStatePropertyCardIsCheckpointCmd,
change
)
state.update_card_is_checkpoint(
edit_card_is_checkpoint_cmd.new_value
)
elif (change.property_name ==
exp_domain.STATE_PROPERTY_RECORDED_VOICEOVERS):
if not isinstance(change.new_value, dict):
raise Exception(
'Expected recorded_voiceovers to be a dict, '
'received %s' % change.new_value)
# Explicitly convert the duration_secs value from
# int to float. Reason for this is the data from
# the frontend will be able to match the backend
# state model for Voiceover properly. Also js
# treats any number that can be float and int as
# int (no explicit types). For example,
# 10.000 is not 10.000 it is 10.
# Here we use cast because this 'elif'
# condition forces change to have type
# EditExpStatePropertyRecordedVoiceoversCmd.
edit_recorded_voiceovers_cmd = cast(
exp_domain.EditExpStatePropertyRecordedVoiceoversCmd,
change
)
new_voiceovers_mapping = (
edit_recorded_voiceovers_cmd.new_value[
'voiceovers_mapping'
]
)
language_codes_to_audio_metadata = (
new_voiceovers_mapping.values())
for language_codes in language_codes_to_audio_metadata:
for audio_metadata in language_codes.values():
audio_metadata['duration_secs'] = (
float(audio_metadata['duration_secs'])
)
recorded_voiceovers = (
state_domain.RecordedVoiceovers.from_dict(
change.new_value))
state.update_recorded_voiceovers(recorded_voiceovers)
elif change.cmd == exp_domain.CMD_EDIT_EXPLORATION_PROPERTY:
if change.property_name == 'title':
# Here we use cast because this 'if' condition forces
# change to have type EditExplorationPropertyTitleCmd.
edit_title_cmd = cast(
exp_domain.EditExplorationPropertyTitleCmd,
change
)
exploration.update_title(edit_title_cmd.new_value)
elif change.property_name == 'category':
# Here we use cast because this 'elif' condition forces
# change to have type EditExplorationPropertyCategoryCmd.
edit_category_cmd = cast(
exp_domain.EditExplorationPropertyCategoryCmd,
change
)
exploration.update_category(edit_category_cmd.new_value)
elif change.property_name == 'objective':
# Here we use cast because this 'elif' condition forces
# change to have type EditExplorationPropertyObjectiveCmd.
edit_objective_cmd = cast(
exp_domain.EditExplorationPropertyObjectiveCmd,
change
)
exploration.update_objective(edit_objective_cmd.new_value)
elif change.property_name == 'language_code':
# Here we use cast because this 'elif'
# condition forces change to have type
# EditExplorationPropertyLanguageCodeCmd.
edit_language_code_cmd = cast(
exp_domain.EditExplorationPropertyLanguageCodeCmd,
change
)
exploration.update_language_code(
edit_language_code_cmd.new_value
)
elif change.property_name == 'tags':
# Here we use cast because this 'elif' condition forces
# change to have type EditExplorationPropertyTagsCmd.
edit_tags_cmd = cast(
exp_domain.EditExplorationPropertyTagsCmd,
change
)
exploration.update_tags(edit_tags_cmd.new_value)
elif change.property_name == 'blurb':
# Here we use cast because this 'elif' condition forces
# change to have type EditExplorationPropertyBlurbCmd.
edit_blurb_cmd = cast(
exp_domain.EditExplorationPropertyBlurbCmd,
change
)
exploration.update_blurb(edit_blurb_cmd.new_value)
elif change.property_name == 'author_notes':
# Here we use cast because this 'elif' condition forces
# change to have type EditExplorationPropertyAuthorNotesCmd.
edit_author_notes_cmd = cast(
exp_domain.EditExplorationPropertyAuthorNotesCmd,
change
)
exploration.update_author_notes(
edit_author_notes_cmd.new_value
)
elif change.property_name == 'param_specs':
# Here we use cast because this 'elif' condition forces
# change to have type EditExplorationPropertyParamSpecsCmd.
edit_param_specs_cmd = cast(
exp_domain.EditExplorationPropertyParamSpecsCmd,
change
)
exploration.update_param_specs(
edit_param_specs_cmd.new_value
)
elif change.property_name == 'param_changes':
# Here we use cast because this 'elif'
# condition forces change to have type
# EditExplorationPropertyParamChangesCmd.
edit_exp_param_changes_cmd = cast(
exp_domain.EditExplorationPropertyParamChangesCmd,
change
)
exploration.update_param_changes(
list(
map(
to_param_domain,
edit_exp_param_changes_cmd.new_value
)
)
)
elif change.property_name == 'init_state_name':
# Here we use cast because this 'elif'
# condition forces change to have type
# EditExplorationPropertyInitStateNameCmd.
edit_init_state_name_cmd = cast(
exp_domain.EditExplorationPropertyInitStateNameCmd,
change
)
exploration.update_init_state_name(
edit_init_state_name_cmd.new_value
)
elif change.property_name == 'auto_tts_enabled':
# Here we use cast because this 'elif'
# condition forces change to have type
# EditExplorationPropertyAutoTtsEnabledCmd.
edit_auto_tts_enabled_cmd = cast(
exp_domain.EditExplorationPropertyAutoTtsEnabledCmd,
change
)
exploration.update_auto_tts_enabled(
edit_auto_tts_enabled_cmd.new_value
)
elif change.property_name == 'next_content_id_index':
# Here we use cast because this 'elif'
# condition forces change to have type
# EditExplorationPropertyNextContentIdIndexCmd.
cmd = cast(
exp_domain.EditExplorationPropertyNextContentIdIndexCmd,
change
)
next_content_id_index = max(
cmd.new_value, exploration.next_content_id_index)
exploration.update_next_content_id_index(
next_content_id_index)
elif (change.cmd ==
exp_domain.CMD_MIGRATE_STATES_SCHEMA_TO_LATEST_VERSION):
# Loading the exploration model from the datastore into an
# Exploration domain object automatically converts it to use
# the latest states schema version. As a result, simply
# resaving the exploration is sufficient to apply the states
# schema update. Thus, no action is needed here other than
# to make sure that the version that the user is trying to
# migrate to is the latest version.
# Here we use cast because we are narrowing down the type from
# ExplorationChange to a specific change command.
migrate_states_schema_cmd = cast(
exp_domain.MigrateStatesSchemaToLatestVersionCmd,
change
)
target_version_is_current_state_schema_version = (
migrate_states_schema_cmd.to_version ==
str(feconf.CURRENT_STATE_SCHEMA_VERSION)
)
if not target_version_is_current_state_schema_version:
raise Exception(
'Expected to migrate to the latest state schema '
'version %s, received %s' % (
feconf.CURRENT_STATE_SCHEMA_VERSION,
migrate_states_schema_cmd.to_version))
return exploration
except Exception as e:
logging.error(
'%s %s %s %s' % (
e.__class__.__name__, e, exploration_id,
pprint.pformat(change_list))
)
raise e
def populate_exp_model_fields(
exp_model: exp_models.ExplorationModel, exploration: exp_domain.Exploration
) -> exp_models.ExplorationModel:
"""Populate exploration model with the data from Exploration object.
Args:
exp_model: ExplorationModel. The model to populate.
exploration: Exploration. The exploration domain object which should be
used to populate the model.
Returns:
ExplorationModel. Populated model.
"""
exp_model.title = exploration.title
exp_model.category = exploration.category
exp_model.objective = exploration.objective
exp_model.language_code = exploration.language_code
exp_model.tags = exploration.tags
exp_model.blurb = exploration.blurb
exp_model.author_notes = exploration.author_notes
exp_model.states_schema_version = exploration.states_schema_version
exp_model.init_state_name = exploration.init_state_name
exp_model.states = {
state_name: state.to_dict()
for (state_name, state) in exploration.states.items()}
exp_model.param_specs = exploration.param_specs_dict
exp_model.param_changes = exploration.param_change_dicts
exp_model.auto_tts_enabled = exploration.auto_tts_enabled
exp_model.edits_allowed = exploration.edits_allowed
exp_model.next_content_id_index = exploration.next_content_id_index
return exp_model
def populate_exp_summary_model_fields(
exp_summary_model: Optional[exp_models.ExpSummaryModel],
exp_summary: exp_domain.ExplorationSummary
) -> exp_models.ExpSummaryModel:
"""Populate exploration summary model with the data from
ExplorationSummary object.
Args:
exp_summary_model: ExpSummaryModel|None. The model to populate.
If None, we create a new model instead.
exp_summary: ExplorationSummary. The exploration domain object which
should be used to populate the model.
Returns:
ExpSummaryModel. Populated model.
"""
exp_summary_dict = {
'title': exp_summary.title,
'category': exp_summary.category,
'objective': exp_summary.objective,
'language_code': exp_summary.language_code,
'tags': exp_summary.tags,
'ratings': exp_summary.ratings,
'scaled_average_rating': exp_summary.scaled_average_rating,
'exploration_model_last_updated': (
exp_summary.exploration_model_last_updated),
'exploration_model_created_on': (
exp_summary.exploration_model_created_on),
'first_published_msec': exp_summary.first_published_msec,
'status': exp_summary.status,
'community_owned': exp_summary.community_owned,
'owner_ids': exp_summary.owner_ids,
'editor_ids': exp_summary.editor_ids,
'voice_artist_ids': exp_summary.voice_artist_ids,
'viewer_ids': exp_summary.viewer_ids,
'contributor_ids': list(exp_summary.contributors_summary.keys()),
'contributors_summary': exp_summary.contributors_summary,
'version': exp_summary.version
}
if exp_summary_model is not None:
exp_summary_model.populate(**exp_summary_dict)
else:
exp_summary_dict['id'] = exp_summary.id
exp_summary_model = exp_models.ExpSummaryModel(**exp_summary_dict)
return exp_summary_model
def update_states_version_history(
states_version_history: Dict[str, state_domain.StateVersionHistory],
change_list: Sequence[exp_domain.ExplorationChange],
old_states_dict: Dict[str, state_domain.StateDict],
new_states_dict: Dict[str, state_domain.StateDict],
current_version: int,
committer_id: str
) -> Dict[str, state_domain.StateVersionHistory]:
"""Updates the version history of each state at a particular version
of an exploration.
Args:
states_version_history: dict(str, StateVersionHistory). The version
history data of each state in the previous version of the
exploration.
change_list: list(ExplorationChange). A list of changes introduced in
this commit.
old_states_dict: dict(str, dict). The states in the previous version of
the exploration.
new_states_dict: dict(str, dict). The states in the current version of
the exploration.
current_version: int. The latest version of the exploration.
committer_id: str. The id of the user who made the commit.
Returns:
states_version_history: dict(str, StateVersionHistory). The updated
version history data of each state.
"""
exp_versions_diff = exp_domain.ExplorationVersionsDiff(change_list)
prev_version = current_version - 1
# Firstly, delete the states from the state version history which were
# deleted during this commit.
for state_name in exp_versions_diff.deleted_state_names:
del states_version_history[state_name]
# Now, handle the updation of version history of states which were renamed.
# Firstly, we need to clean up the exp_versions_diff.old_to_new_state_names
# dict from the state names which are not effectively changed. For example,
# if a state was renamed from state_1 to state_2 and then from state_2 to
# state_1 in the same commit, then there is no effective change in state
# name and we need to clear them from this dict.
effective_old_to_new_state_names = {}
for old_state_name, new_state_name in (
exp_versions_diff.old_to_new_state_names.items()
):
if old_state_name != new_state_name:
effective_old_to_new_state_names[old_state_name] = new_state_name
for old_state_name in effective_old_to_new_state_names:
del states_version_history[old_state_name]
for old_state_name, new_state_name in (
effective_old_to_new_state_names.items()
):
states_version_history[new_state_name] = (
state_domain.StateVersionHistory(
prev_version, old_state_name, committer_id))
# The following list includes states which exist in both the old states
# and new states and were not renamed.
states_which_were_not_renamed = []
for state_name in old_states_dict:
if (
state_name not in exp_versions_diff.deleted_state_names and
state_name not in effective_old_to_new_state_names
):
states_which_were_not_renamed.append(state_name)
# We have dealt with state additions, deletions and renames.
# Now we deal with states which were present in both versions and
# underwent changes only through the command EDIT_STATE_PROPERTY.
# The following dict stores whether the properties of states present
# in states_which_were_not_renamed were changed using EDIT_STATE_PROPERTY.
state_property_changed_data = {
state_name: False
for state_name in states_which_were_not_renamed
}
# The following ignore list contains those state properties which are
# related to voiceovers. Hence, they are ignored in order to avoid
# updating the version history in case of voiceover-only commits.
state_property_ignore_list = [
exp_domain.STATE_PROPERTY_RECORDED_VOICEOVERS
]
for change in change_list:
if (
change.cmd == exp_domain.CMD_EDIT_STATE_PROPERTY and
change.property_name not in state_property_ignore_list
):
state_name = change.state_name
if state_name in state_property_changed_data:
state_property_changed_data[state_name] = True
for state_name, state_property_changed in (
state_property_changed_data.items()):
if state_property_changed:
# The purpose of checking the diff_dict between the two state
# dicts ensure that we do not change the version history of that
# particular state if the overall changes (by EDIT_STATE_PROPERTY)
# get cancelled by each other and there is no 'net change'.
diff_dict = deepdiff.DeepDiff(
old_states_dict[state_name], new_states_dict[state_name])
if diff_dict:
states_version_history[state_name] = (
state_domain.StateVersionHistory(
prev_version, state_name, committer_id
))
# Finally, add the states which were newly added during this commit. The
# version history of these states are initialized as None because they
# were newly added and have no 'previously edited version'.
for state_name in exp_versions_diff.added_state_names:
states_version_history[state_name] = (
state_domain.StateVersionHistory(None, None, committer_id))
return states_version_history
def update_metadata_version_history(
metadata_version_history: exp_domain.MetadataVersionHistory,
change_list: Sequence[exp_domain.ExplorationChange],
old_metadata_dict: exp_domain.ExplorationMetadataDict,
new_metadata_dict: exp_domain.ExplorationMetadataDict,
current_version: int,
committer_id: str
) -> exp_domain.MetadataVersionHistory:
"""Updates the version history of the exploration at a particular version
of an exploration.
Args:
metadata_version_history: MetadataVersionHistory. The metadata version
history at the previous version of the exploration.
change_list: list(ExplorationChange). A list of changes introduced in
this commit.
old_metadata_dict: dict. The exploration metadata at the
previous version of the exploration.
new_metadata_dict: dict. The exploration metadata at the
current version of the exploration.
current_version: int. The latest version of the exploration.
committer_id: str. The id of the user who made the commit.
Returns:
MetadataVersionHistory. The updated metadata version history.
"""
prev_version = current_version - 1
metadata_was_changed = any(
change.cmd == exp_domain.CMD_EDIT_EXPLORATION_PROPERTY
for change in change_list
)
if metadata_was_changed:
# The purpose of checking the diff_dict between the two metadata
# dicts ensure that we do not change the version history if the
# overall changes (by EDIT_EXPLORATION_PROPERTY) get cancelled by
# each other and there is no 'net change'.
diff_dict = deepdiff.DeepDiff(old_metadata_dict, new_metadata_dict)
if diff_dict:
metadata_version_history.last_edited_version_number = prev_version
metadata_version_history.last_edited_committer_id = committer_id
return metadata_version_history
def get_updated_committer_ids(
states_version_history: Dict[str, state_domain.StateVersionHistory],
metadata_last_edited_committer_id: str
) -> List[str]:
"""Extracts a list of user ids who made the 'previous commit' on each state
and the exploration metadata from the exploration states and metadata
version history data.
Args:
states_version_history: dict(str, StateVersionHistory). The version
history data of each state at a particular version of an
exploration.
metadata_last_edited_committer_id: str. User id of the user who
committed the last change in the exploration metadata.
Returns:
list[str]. A list of user ids who made the 'previous commit' on each
state and the exploration metadata.
"""
committer_ids = {
version_history.committer_id
for version_history in states_version_history.values()
}
committer_ids.add(metadata_last_edited_committer_id)
return list(committer_ids)
def get_updated_version_history_model(
exploration: exp_domain.Exploration,
change_list: Sequence[exp_domain.ExplorationChange],
committer_id: str,
old_states: Dict[str, state_domain.State],
old_metadata: exp_domain.ExplorationMetadata
) -> Optional[exp_models.ExplorationVersionHistoryModel]:
"""Returns an updated ExplorationVersionHistoryModel for the new version
of the exploration (after the commit).
Args:
exploration: Exploration. The explortion after the latest commit.
change_list: list(ExplorationChange). A list of changes introduced in
the latest commit.
committer_id: str. The id of the user who made the latest commit.
old_states: dict(str, State). The states in the previous version of
the exploration (before the latest commit).
old_metadata: ExplorationMetadata. The exploration metadata at the
previous version of the exploration (before the latest commit).
Returns:
ExplorationVersionHistoryModel. The updated version history model.
"""
version_history_model_id = (
exp_models.ExplorationVersionHistoryModel.get_instance_id(
exploration.id, exploration.version - 1))
version_history_model = exp_models.ExplorationVersionHistoryModel.get(
version_history_model_id, strict=False)
# TODO(#16433): Remove this check once version history models are generated
# for all exploration versions.
if version_history_model is not None:
old_states_dict = {
state_name: state.to_dict()
for state_name, state in old_states.items()
}
new_states_dict = {
state_name: state.to_dict()
for state_name, state in exploration.states.items()
}
old_metadata_dict = old_metadata.to_dict()
new_metadata_dict = exploration.get_metadata().to_dict()
states_version_history = {
state_name: state_domain.StateVersionHistory.from_dict(
state_version_history_dict)
for state_name, state_version_history_dict in (
version_history_model.state_version_history.items())
}
metadata_version_history = exp_domain.MetadataVersionHistory(
version_history_model.metadata_last_edited_version_number,
version_history_model.metadata_last_edited_committer_id)
updated_states_version_history = update_states_version_history(
states_version_history, change_list, old_states_dict,
new_states_dict, exploration.version, committer_id
)
updated_metadata_version_history = update_metadata_version_history(
metadata_version_history, change_list, old_metadata_dict,
new_metadata_dict, exploration.version, committer_id)
updated_committer_ids = get_updated_committer_ids(
updated_states_version_history,
updated_metadata_version_history.last_edited_committer_id)
updated_version_history_model_id = (
exp_models.ExplorationVersionHistoryModel.get_instance_id(
exploration.id, exploration.version))
updated_version_history_model = (
exp_models.ExplorationVersionHistoryModel(
id=updated_version_history_model_id,
exploration_id=exploration.id,
exploration_version=exploration.version,
state_version_history={
state_name: version_history.to_dict()
for state_name, version_history in (
updated_states_version_history.items())
},
metadata_last_edited_version_number=(
updated_metadata_version_history.last_edited_version_number
),
metadata_last_edited_committer_id=(
updated_metadata_version_history.last_edited_committer_id
),
committer_ids=updated_committer_ids
))
return updated_version_history_model
return None
def _compute_models_for_updating_exploration(
committer_id: str,
exploration: exp_domain.Exploration,
commit_message: Optional[str],
change_list: Sequence[exp_domain.ExplorationChange]
) -> List[base_models.BaseModel]:
"""Returns a list of updated models related to the exploration model to be
put to the datastore. The caller should ensure that the Exploration is
strictly valid before calling this function.
If successful, increments the version number of the incoming exploration
domain object by 1.
Args:
committer_id: str. The id of the user who made the commit.
exploration: Exploration. The exploration to be saved.
commit_message: str or None. A description of changes made to the state.
For published explorations, this must be present; for unpublished
explorations, it should be equal to None.
change_list: list(ExplorationChange). A list of changes introduced in
this commit.
Raises:
Exception. The versions of the given exploration and the currently
stored exploration model do not match.
Returns:
list(BaseModel). A list of models to be put to the datastore.
"""
models_to_put: List[base_models.BaseModel] = []
exploration_model = exp_models.ExplorationModel.get(exploration.id)
if exploration.version > exploration_model.version:
raise Exception(
'Unexpected error: trying to update version %s of exploration '
'from version %s. Please reload the page and try again.'
% (exploration_model.version, exploration.version))
if exploration.version < exploration_model.version:
raise Exception(
'Trying to update version %s of exploration from version %s, '
'which is too old. Please reload the page and try again.'
% (exploration_model.version, exploration.version))
old_states = exp_fetchers.get_exploration_from_model(
exploration_model).states
old_metadata = exp_fetchers.get_exploration_from_model(
exploration_model).get_metadata()
exploration_model = populate_exp_model_fields(
exploration_model, exploration)
change_list_dict = [change.to_dict() for change in change_list]
models_to_put.extend(
exploration_model.get_models_to_put_values(
committer_id,
commit_message,
change_list_dict,
)
)
caching_services.delete_multi(
caching_services.CACHE_NAMESPACE_EXPLORATION,
None,
[exploration.id]
)
exploration.version += 1
exp_versions_diff = exp_domain.ExplorationVersionsDiff(change_list)
# Update the version history data for each state and the exploration
# metadata in the new version of the exploration.
version_history_model = get_updated_version_history_model(
exploration,
change_list,
committer_id,
old_states,
old_metadata
)
if version_history_model is not None:
models_to_put.append(version_history_model)
# Trigger statistics model update.
new_exp_stats = stats_services.get_stats_for_new_exp_version(
exploration.id, exploration.version, list(exploration.states.keys()),
exp_versions_diff, revert_to_version=None)
new_state_stats_mapping = stats_services.get_state_stats_mapping(
new_exp_stats
)
new_exp_stats_instance_id = (
stats_models.ExplorationStatsModel.get_entity_id(
new_exp_stats.exp_id,
new_exp_stats.exp_version
)
)
models_to_put.append(
stats_models.ExplorationStatsModel(
id=new_exp_stats_instance_id,
exp_id=new_exp_stats.exp_id,
exp_version=new_exp_stats.exp_version,
num_starts_v1=new_exp_stats.num_starts_v1,
num_starts_v2=new_exp_stats.num_starts_v2,
num_actual_starts_v1=new_exp_stats.num_actual_starts_v1,
num_actual_starts_v2=new_exp_stats.num_actual_starts_v2,
num_completions_v1=new_exp_stats.num_completions_v1,
num_completions_v2=new_exp_stats.num_completions_v2,
state_stats_mapping=new_state_stats_mapping
)
)
if feconf.ENABLE_ML_CLASSIFIERS:
trainable_states_dict = exploration.get_trainable_states_dict(
old_states, exp_versions_diff)
state_names_with_changed_answer_groups = trainable_states_dict[
'state_names_with_changed_answer_groups']
state_names_with_unchanged_answer_groups = trainable_states_dict[
'state_names_with_unchanged_answer_groups']
state_names_to_train_classifier = state_names_with_changed_answer_groups
if state_names_with_unchanged_answer_groups:
(
state_names_without_classifier,
state_training_jobs_mapping_models_to_put
) = (
classifier_services
.get_new_job_models_for_non_trainable_states(
exploration, state_names_with_unchanged_answer_groups,
exp_versions_diff
)
)
state_names_to_train_classifier.extend(
state_names_without_classifier)
models_to_put.extend(state_training_jobs_mapping_models_to_put)
if state_names_to_train_classifier:
models_to_put.extend(
classifier_services.get_new_job_models_for_trainable_states(
exploration, state_names_to_train_classifier
)
)
# Trigger exploration issues model updation.
models_to_put.extend(
stats_services.get_updated_exp_issues_models_for_new_exp_version(
exploration,
exp_versions_diff,
None
)
)
return models_to_put
def _create_exploration(
committer_id: str,
exploration: exp_domain.Exploration,
commit_message: str,
commit_cmds: List[exp_domain.ExplorationChange]
) -> None:
"""Ensures that rights for a new exploration are saved first.
This is because _compute_models_for_updating_exploration()
depends on the rights object being present to tell it whether to do strict
validation or not.
Args:
committer_id: str. The id of the user who made the commit.
exploration: Exploration. The exploration domain object.
commit_message: str. The commit description message.
commit_cmds: list(ExplorationChange). A list of commands, describing
changes made in this model, which should give sufficient information
to reconstruct the commit.
"""
# This line is needed because otherwise a rights object will be created,
# but the creation of an exploration object will fail.
exploration.validate()
rights_manager.create_new_exploration_rights(exploration.id, committer_id)
model = exp_models.ExplorationModel(
id=exploration.id,
category=exploration.category,
title=exploration.title,
objective=exploration.objective,
language_code=exploration.language_code,
tags=exploration.tags,
blurb=exploration.blurb,
author_notes=exploration.author_notes,
states_schema_version=exploration.states_schema_version,
init_state_name=exploration.init_state_name,
states={
state_name: state.to_dict()
for (state_name, state) in exploration.states.items()},
param_specs=exploration.param_specs_dict,
param_changes=exploration.param_change_dicts,
auto_tts_enabled=exploration.auto_tts_enabled,
next_content_id_index=exploration.next_content_id_index
)
commit_cmds_dict = [commit_cmd.to_dict() for commit_cmd in commit_cmds]
model.commit(committer_id, commit_message, commit_cmds_dict)
exploration.version += 1
version_history_model = exp_models.ExplorationVersionHistoryModel(
id=exp_models.ExplorationVersionHistoryModel.get_instance_id(
exploration.id, exploration.version),
exploration_id=exploration.id,
exploration_version=exploration.version,
state_version_history={
state_name: state_domain.StateVersionHistory(
None, None, committer_id
).to_dict()
for state_name in exploration.states
},
metadata_last_edited_version_number=None,
metadata_last_edited_committer_id=committer_id,
committer_ids=[committer_id]
)
version_history_model.update_timestamps()
version_history_model.put()
# Trigger statistics model creation.
exploration_stats = stats_services.get_stats_for_new_exploration(
exploration.id, exploration.version, list(exploration.states.keys()))
stats_services.create_stats_model(exploration_stats)
if feconf.ENABLE_ML_CLASSIFIERS:
# Find out all states that need a classifier to be trained.
state_names_to_train = []
for state_name in exploration.states:
state = exploration.states[state_name]
if state.can_undergo_classification():
state_names_to_train.append(state_name)
if state_names_to_train:
datastore_services.put_multi(
classifier_services.get_new_job_models_for_trainable_states(
exploration,
state_names_to_train
)
)
# Trigger exploration issues model creation.
stats_services.create_exp_issues_for_new_exploration(
exploration.id, exploration.version)
regenerate_exploration_summary_with_new_contributor(
exploration.id, committer_id)
def save_new_exploration(
committer_id: str, exploration: exp_domain.Exploration
) -> None:
"""Saves a newly created exploration.
Args:
committer_id: str. The id of the user who made the commit.
exploration: Exploration. The exploration domain object to be saved.
"""
commit_message = (
('New exploration created with title \'%s\'.' % exploration.title)
if exploration.title else 'New exploration created.')
_create_exploration(
committer_id, exploration, commit_message, [
exp_domain.CreateNewExplorationCmd({
'cmd': exp_domain.CMD_CREATE_NEW,
'title': exploration.title,
'category': exploration.category,
})])
user_contributions = user_services.get_or_create_new_user_contributions(
committer_id
)
user_contributions.add_created_exploration_id(exploration.id)
user_contributions.add_edited_exploration_id(exploration.id)
user_services.save_user_contributions(user_contributions)
user_services.record_user_created_an_exploration(committer_id)
def delete_exploration(
committer_id: str,
exploration_id: str,
force_deletion: bool = False
) -> None:
"""Deletes the exploration with the given exploration_id.
IMPORTANT: Callers of this function should ensure that committer_id has
permissions to delete this exploration, prior to calling this function.
If force_deletion is True the exploration and its history are fully deleted
and are unrecoverable. Otherwise, the exploration and all its history are
marked as deleted, but the corresponding models are still retained in the
datastore. This last option is the preferred one.
Args:
committer_id: str. The id of the user who made the commit.
exploration_id: str. The id of the exploration to be deleted.
force_deletion: bool. If True, completely deletes the storage models
corresponding to the exploration. Otherwise, marks them as deleted
but keeps the corresponding models in the datastore.
"""
delete_explorations(
committer_id, [exploration_id], force_deletion=force_deletion)
def delete_explorations(
committer_id: str,
exploration_ids: List[str],
force_deletion: bool = False
) -> None:
"""Delete the explorations with the given exploration_ids.
IMPORTANT: Callers of this function should ensure that committer_id has
permissions to delete these explorations, prior to calling this function.
If force_deletion is True the explorations and its histories are fully
deleted and are unrecoverable. Otherwise, the explorations and all its
histories are marked as deleted, but the corresponding models are still
retained in the datastore. This last option is the preferred one.
Args:
committer_id: str. The id of the user who made the commit.
exploration_ids: list(str). The ids of the explorations to be deleted.
force_deletion: bool. If True, completely deletes the storage models
corresponding to the explorations. Otherwise, marks them as deleted
but keeps the corresponding models in the datastore.
"""
# TODO(sll): Delete the files too?
exp_models.ExplorationRightsModel.delete_multi(
exploration_ids, committer_id, '', force_deletion=force_deletion)
exp_models.ExplorationModel.delete_multi(
exploration_ids, committer_id,
feconf.COMMIT_MESSAGE_EXPLORATION_DELETED,
force_deletion=force_deletion)
caching_services.delete_multi(
caching_services.CACHE_NAMESPACE_EXPLORATION, None,
exploration_ids)
# Delete the explorations from search.
search_services.delete_explorations_from_search_index(exploration_ids)
# Delete the exploration summaries, recommendations and opportunities
# regardless of whether or not force_deletion is True.
delete_exploration_summaries(exploration_ids)
recommendations_services.delete_explorations_from_recommendations(
exploration_ids)
opportunity_services.delete_exploration_opportunities(exploration_ids)
feedback_services.delete_exploration_feedback_analytics(exploration_ids)
# Remove the explorations from the featured activity references, if
# necessary.
activity_services.remove_featured_activities(
constants.ACTIVITY_TYPE_EXPLORATION, exploration_ids)
feedback_services.delete_threads_for_multiple_entities(
feconf.ENTITY_TYPE_EXPLORATION, exploration_ids)
# Remove from subscribers.
taskqueue_services.defer(
taskqueue_services.FUNCTION_ID_DELETE_EXPS_FROM_USER_MODELS,
taskqueue_services.QUEUE_NAME_ONE_OFF_JOBS, exploration_ids)
# Remove from activities.
taskqueue_services.defer(
taskqueue_services.FUNCTION_ID_DELETE_EXPS_FROM_ACTIVITIES,
taskqueue_services.QUEUE_NAME_ONE_OFF_JOBS, exploration_ids)
def delete_explorations_from_user_models(exploration_ids: List[str]) -> None:
"""Remove explorations from all subscribers' exploration_ids.
Args:
exploration_ids: list(str). The ids of the explorations to delete.
"""
if not exploration_ids:
return
subscription_models: Sequence[
user_models.UserSubscriptionsModel
] = user_models.UserSubscriptionsModel.query(
user_models.UserSubscriptionsModel.exploration_ids.IN(exploration_ids)
).fetch()
for model in subscription_models:
model.exploration_ids = [
id_ for id_ in model.exploration_ids if id_ not in exploration_ids]
user_models.UserSubscriptionsModel.update_timestamps_multi(
list(subscription_models))
user_models.UserSubscriptionsModel.put_multi(list(subscription_models))
exp_user_data_models: Sequence[
user_models.ExplorationUserDataModel
] = (
user_models.ExplorationUserDataModel.get_all().filter(
user_models.ExplorationUserDataModel.exploration_id.IN(
exploration_ids
)
).fetch()
)
user_models.ExplorationUserDataModel.delete_multi(
list(exp_user_data_models)
)
user_contributions_models: Sequence[
user_models.UserContributionsModel
] = (
user_models.UserContributionsModel.get_all().filter(
datastore_services.any_of(
user_models.UserContributionsModel.created_exploration_ids.IN(
exploration_ids
),
user_models.UserContributionsModel.edited_exploration_ids.IN(
exploration_ids
)
)
).fetch()
)
for contribution_model in user_contributions_models:
contribution_model.created_exploration_ids = [
exp_id for exp_id in contribution_model.created_exploration_ids
if exp_id not in exploration_ids
]
contribution_model.edited_exploration_ids = [
exp_id for exp_id in contribution_model.edited_exploration_ids
if exp_id not in exploration_ids
]
user_models.UserContributionsModel.update_timestamps_multi(
list(user_contributions_models))
user_models.UserContributionsModel.put_multi(
list(user_contributions_models)
)
def delete_explorations_from_activities(exploration_ids: List[str]) -> None:
"""Remove explorations from exploration_ids field in completed and
incomplete activities models.
Args:
exploration_ids: list(str). The ids of the explorations to delete.
"""
if not exploration_ids:
return
model_classes: List[
Union[
Type[user_models.CompletedActivitiesModel],
Type[user_models.IncompleteActivitiesModel]
]
] = [
user_models.CompletedActivitiesModel,
user_models.IncompleteActivitiesModel,
]
all_entities: List[AcceptableActivityModelTypes] = []
for model_class in model_classes:
entities: Sequence[
AcceptableActivityModelTypes
] = model_class.query(
model_class.exploration_ids.IN(exploration_ids)
).fetch()
for model in entities:
model.exploration_ids = [
id_ for id_ in model.exploration_ids
if id_ not in exploration_ids
]
all_entities.extend(entities)
datastore_services.update_timestamps_multi(all_entities)
datastore_services.put_multi(all_entities)
# Operations on exploration snapshots.
def get_exploration_snapshots_metadata(
exploration_id: str, allow_deleted: bool = False
) -> List[SnapshotsMetadataDict]:
"""Returns the snapshots for this exploration, as dicts, up to and including
the latest version of the exploration.
Args:
exploration_id: str. The id of the exploration whose snapshots_metadata
is required.
allow_deleted: bool. Whether to allow retrieval of deleted snapshots.
Returns:
list(dict). List of dicts, each representing a recent snapshot. Each
dict has the following keys: committer_id, commit_message, commit_cmds,
commit_type, created_on_ms, version_number. The version numbers are
consecutive and in ascending order. There are exploration.version_number
items in the returned list.
"""
exploration = exp_fetchers.get_exploration_by_id(exploration_id)
current_version = exploration.version
version_nums = list(range(1, current_version + 1))
return exp_models.ExplorationModel.get_snapshots_metadata(
exploration_id, version_nums, allow_deleted=allow_deleted)
def get_last_updated_by_human_ms(exp_id: str) -> float:
"""Return the last time, in milliseconds, when the given exploration was
updated by a human.
Args:
exp_id: str. The id of the exploration.
Returns:
float. The last time in milliseconds when a given exploration was
updated by a human.
"""
# Iterate backwards through the exploration history metadata until we find
# the most recent snapshot that was committed by a human.
last_human_update_ms: float = 0
snapshots_metadata = get_exploration_snapshots_metadata(exp_id)
for snapshot_metadata in reversed(snapshots_metadata):
if snapshot_metadata['committer_id'] != feconf.MIGRATION_BOT_USER_ID:
last_human_update_ms = snapshot_metadata['created_on_ms']
break
return last_human_update_ms
def publish_exploration_and_update_user_profiles(
committer: user_domain.UserActionsInfo, exp_id: str
) -> None:
"""Publishes the exploration with publish_exploration() function in
rights_manager.py, as well as updates first_contribution_msec. Sends an
email to the subscribers of the committer informing them that an exploration
has been published.
It is the responsibility of the caller to check that the exploration is
valid prior to publication.
Args:
committer: UserActionsInfo. UserActionsInfo object for the user who
made the commit.
exp_id: str. The id of the exploration to be published.
Raises:
Exception. To publish explorations and update users\' profiles,
user must be logged in and have admin access.
"""
if committer.user_id is None:
raise Exception(
'To publish explorations and update users\' profiles, '
'user must be logged in and have admin access.'
)
rights_manager.publish_exploration(committer, exp_id)
exp_title = exp_fetchers.get_exploration_by_id(exp_id).title
email_subscription_services.inform_subscribers(
committer.user_id, exp_id, exp_title)
contribution_time_msec = utils.get_current_time_in_millisecs()
contributor_ids = exp_fetchers.get_exploration_summary_by_id(
exp_id).contributor_ids
for contributor in contributor_ids:
contributor_user_settings = user_services.get_user_settings(
contributor,
strict=False
)
if contributor_user_settings is not None:
contributor_user_settings.update_first_contribution_msec(
contribution_time_msec
)
user_services.save_user_settings(contributor_user_settings)
def validate_exploration_for_story(
exp: exp_domain.Exploration, strict: bool
) -> List[str]:
"""Validates an exploration with story validations.
Args:
exp: Exploration. Exploration object to be validated.
strict: bool. Whether to raise an Exception when a validation error
is encountered. If not, a list of the error messages are
returned. strict should be True when this is called before
saving the story and False when this function is called from the
frontend.
Returns:
list(str). The various validation error messages (if strict is
False).
Raises:
ValidationError. Invalid language found for exploration.
ValidationError. Non default category found for exploration.
ValidationError. Expected no exploration to have parameter values in it.
ValidationError. Invalid interaction in exploration.
ValidationError. RTE content in state of exploration with ID is not
supported on mobile.
ValidationError. Expected no exploration to have classifier models.
ValidationError. Expected no exploration to contain training data in
any answer group.
ValidationError. Expected no exploration to have parameter values in
the default outcome of any state interaction.
ValidationError. Expected no exploration to have video tags.
ValidationError. Expected no exploration to have link tags.
"""
validation_error_messages = []
if (
exp.language_code not in
android_validation_constants.SUPPORTED_LANGUAGES):
error_string = (
'Invalid language %s found for exploration '
'with ID %s. This language is not supported for explorations '
'in a story on the mobile app.' % (exp.language_code, exp.id))
if strict:
raise utils.ValidationError(error_string)
validation_error_messages.append(error_string)
if exp.param_specs or exp.param_changes:
error_string = (
'Expected no exploration in a story to have parameter '
'values in it. Invalid exploration: %s' % exp.id)
if strict:
raise utils.ValidationError(error_string)
validation_error_messages.append(error_string)
if exp.category not in constants.ALL_CATEGORIES:
error_string = (
'Expected all explorations in a story to '
'be of a default category. '
'Invalid exploration: %s' % exp.id)
if strict:
raise utils.ValidationError(error_string)
validation_error_messages.append(error_string)
for state_name in exp.states:
state = exp.states[state_name]
if not state.interaction.is_supported_on_android_app():
error_string = (
'Invalid interaction %s in exploration '
'with ID: %s. This interaction is not supported for '
'explorations in a story on the '
'mobile app.' % (state.interaction.id, exp.id))
if strict:
raise utils.ValidationError(error_string)
validation_error_messages.append(error_string)
if not state.is_rte_content_supported_on_android():
error_string = (
'RTE content in state %s of exploration '
'with ID %s is not supported on mobile for explorations '
'in a story.' % (state_name, exp.id))
if strict:
raise utils.ValidationError(error_string)
validation_error_messages.append(error_string)
if state.interaction.id == 'EndExploration':
# Here we use cast because we are narrowing down the type
# from various customization args value types to List[str]
# type, and this is done because here we are accessing
# 'recommendedExplorationIds' key from EndExploration
# customization arg whose value is always of List[str] type.
recommended_exploration_ids = cast(
List[str],
state.interaction.customization_args[
'recommendedExplorationIds'
].value
)
if len(recommended_exploration_ids) != 0:
error_string = (
'Explorations in a story are not expected to contain '
'exploration recommendations. Exploration with ID: '
'%s contains exploration recommendations in its '
'EndExploration interaction.' % (exp.id))
if strict:
raise utils.ValidationError(error_string)
validation_error_messages.append(error_string)
if state.interaction.id == 'MultipleChoiceInput':
# Here we use cast because we are narrowing down the type from
# various customization args value types to List[SubtitledHtml]
# type, and this is done because here we are accessing 'choices'
# key from MultipleChoiceInput customization arg whose value is
# always of List[SubtitledHtml] type.
choices = cast(
List[state_domain.SubtitledHtml],
state.interaction.customization_args['choices'].value
)
if len(choices) < 4:
error_string = (
'Exploration in a story having MultipleChoiceInput '
'interaction should have at least 4 choices present. '
'Exploration with ID %s and state name %s have fewer than '
'4 choices.' % (exp.id, state_name)
)
if strict:
raise utils.ValidationError(error_string)
validation_error_messages.append(error_string)
if state.classifier_model_id is not None:
error_string = (
'Explorations in a story are not expected to contain '
'classifier models. State %s of exploration with ID %s '
'contains classifier models.' % (state_name, exp.id))
if strict:
raise utils.ValidationError(error_string)
validation_error_messages.append(error_string)
for answer_group in state.interaction.answer_groups:
if len(answer_group.training_data) > 0:
error_string = (
'Explorations in a story are not expected to contain '
'training data for any answer group. State %s of '
'exploration with ID %s contains training data in one of '
'its answer groups.' % (state_name, exp.id)
)
if strict:
raise utils.ValidationError(error_string)
validation_error_messages.append(error_string)
break
if (
state.interaction.default_outcome is not None and
len(state.interaction.default_outcome.param_changes) > 0
):
error_string = (
'Explorations in a story are not expected to contain '
'parameter values. State %s of exploration with ID %s '
'contains parameter values in its default outcome.' % (
state_name, exp.id
)
)
if strict:
raise utils.ValidationError(error_string)
validation_error_messages.append(error_string)
return validation_error_messages
def update_exploration(
committer_id: str,
exploration_id: str,
change_list: Optional[Sequence[exp_domain.ExplorationChange]],
commit_message: Optional[str],
is_by_voice_artist: bool = False
) -> None:
"""Update an exploration. Commits changes.
Args:
committer_id: str. The id of the user who is performing the update
action.
exploration_id: str. The id of the exploration to be updated.
change_list: list(ExplorationChange) or None. A change list to be
applied to the given exploration. If None, it corresponds to an
empty list.
commit_message: str or None. A description of changes made to the state.
For published explorations, this must be present; for unpublished
explorations, it should be equal to None. For suggestions that are
being accepted, and only for such commits, it should start with
feconf.COMMIT_MESSAGE_ACCEPTED_SUGGESTION_PREFIX.
is_by_voice_artist: bool. Whether the changes are made by a
voice artist.
Raises:
ValueError. No commit message is supplied and the exploration is public.
ValueError. The update is due to a suggestion and the commit message is
invalid.
ValueError. The update is not due to a suggestion, and the commit
message starts with the same prefix as the commit message for
accepted suggestions.
"""
models_to_put = compute_models_to_put_when_saving_new_exp_version(
committer_id=committer_id,
exploration_id=exploration_id,
change_list=change_list,
commit_message=commit_message,
is_by_voice_artist=is_by_voice_artist
)
datastore_services.update_timestamps_multi(models_to_put)
datastore_services.put_multi(models_to_put)
index_explorations_given_ids([exploration_id])
# Explicitly clear the cache for explorations after putting the new
# version.
caching_services.delete_multi(
caching_services.CACHE_NAMESPACE_EXPLORATION, None,
[exploration_id]
)
def compute_models_to_put_when_saving_new_exp_version(
committer_id: str,
exploration_id: str,
change_list: Optional[Sequence[exp_domain.ExplorationChange]],
commit_message: Optional[str],
is_by_voice_artist: bool = False,
) -> List[base_models.BaseModel]:
"""Computes the exploration and other related models for putting to
the datastore. This method does not perform the put operation. The caller
of this method needs to perform the put operation.
Args:
committer_id: str. The id of the user who is performing the update
action.
exploration_id: str. The id of the exploration to be updated.
change_list: list(ExplorationChange) or None. A change list to be
applied to the given exploration. If None, it corresponds to an
empty list.
commit_message: str or None. A description of changes made to the state.
For published explorations, this must be present; for unpublished
explorations, it should be equal to None. For suggestions that are
being accepted, and only for such commits, it should start with
feconf.COMMIT_MESSAGE_ACCEPTED_SUGGESTION_PREFIX.
is_by_voice_artist: bool. Whether the changes are made by a
voice artist.
Raises:
ValueError. No commit message is supplied and the exploration is public.
Returns:
list(BaseModel). A list of the models that were updated.
"""
models_to_put: List[
base_models.BaseModel
] = []
if change_list is None:
change_list = []
if is_by_voice_artist and not is_voiceover_change_list(change_list):
raise utils.ValidationError(
'Voice artist does not have permission to make some '
'changes in the change list.')
is_public = rights_manager.is_exploration_public(exploration_id)
if is_public and not commit_message:
raise ValueError(
'Exploration is public so expected a commit message but '
'received none.')
caching_services.delete_multi(
caching_services.CACHE_NAMESPACE_EXPLORATION,
None,
[exploration_id]
)
old_exploration = exp_fetchers.get_exploration_by_id(exploration_id)
old_content_id_set = set(old_exploration.get_translatable_content_ids())
updated_exploration = apply_change_list(exploration_id, change_list)
if get_story_id_linked_to_exploration(exploration_id) is not None:
validate_exploration_for_story(updated_exploration, True)
updated_exploration.validate(strict=is_public)
models_to_put.extend(
_compute_models_for_updating_exploration(
committer_id,
updated_exploration,
commit_message,
change_list
)
)
voiceover_services.update_exploration_voice_artist_link_model(
committer_id, change_list, old_exploration, updated_exploration)
new_content_id_set = set(updated_exploration.get_translatable_content_ids())
content_ids_corresponding_translations_to_remove = (
old_content_id_set - new_content_id_set
)
translation_changes = []
for change in change_list:
if not change.cmd in [
exp_domain.CMD_EDIT_TRANSLATION,
exp_domain.CMD_REMOVE_TRANSLATIONS,
exp_domain.CMD_MARK_TRANSLATIONS_NEEDS_UPDATE
]:
continue
if change.cmd == exp_domain.CMD_REMOVE_TRANSLATIONS:
content_ids_corresponding_translations_to_remove.add(
change.content_id
)
translation_changes.append(change)
new_translation_models, translation_counts = (
translation_services.compute_translation_related_change(
updated_exploration,
translation_changes
)
)
models_to_put.extend(new_translation_models)
# Auto-reject any pending translation suggestions that are now obsolete due
# to the corresponding content being deleted. See issue #16022 for context.
# TODO(#16022): Refactor to compute the suggestion, suggestion stats, and
# feedback message models that are put to the datastore during the
# suggestion rejection flow instead of applying the datastore changes here.
if len(content_ids_corresponding_translations_to_remove) > 0:
(
suggestion_services
.auto_reject_translation_suggestions_for_content_ids(
exploration_id,
content_ids_corresponding_translations_to_remove))
exp_user_data_model_to_put = get_exp_user_data_model_with_draft_discarded(
exploration_id, committer_id
)
if exp_user_data_model_to_put:
models_to_put.append(exp_user_data_model_to_put)
if committer_id != feconf.MIGRATION_BOT_USER_ID:
user_contributions = user_services.get_or_create_new_user_contributions(
committer_id
)
user_contributions.add_edited_exploration_id(
exploration_id
)
models_to_put.append(
user_services.get_validated_user_contributions_model(
user_contributions
)
)
user_settings = user_services.get_user_settings(
committer_id,
strict=False
)
if user_settings is not None:
user_settings.record_user_edited_an_exploration()
if not rights_manager.is_exploration_private(exploration_id):
user_settings.update_first_contribution_msec(
utils.get_current_time_in_millisecs()
)
models_to_put.append(
user_services.convert_to_user_settings_model(
user_settings
)
)
if opportunity_services.is_exploration_available_for_contribution(
exploration_id
):
models_to_put.extend(
opportunity_services
.compute_opportunity_models_with_updated_exploration(
exploration_id,
updated_exploration.get_content_count(),
translation_counts
)
)
exp_rights = rights_manager.get_exploration_rights(exploration_id)
exp_summary_model = exp_models.ExpSummaryModel.get(exploration_id)
exp_summary = update_exploration_summary(
updated_exploration,
exp_rights,
exp_fetchers.get_exploration_summary_from_model(exp_summary_model),
skip_exploration_model_last_updated=True
)
exp_summary.add_contribution_by_user(committer_id)
exp_summary.version += 1
updated_exp_summary_model: exp_models.ExpSummaryModel = (
populate_exp_summary_model_fields(
exp_summary_model, exp_summary
)
)
models_to_put.append(updated_exp_summary_model)
return models_to_put
def regenerate_exploration_summary_with_new_contributor(
exploration_id: str, contributor_id: str
) -> None:
"""Regenerate a summary of the given exploration and add a new contributor
to the contributors summary. If the summary does not exist, this function
generates a new one.
Args:
exploration_id: str. The id of the exploration.
contributor_id: str. ID of the contributor to be added to
the exploration summary.
"""
exploration = exp_fetchers.get_exploration_by_id(
exploration_id, strict=False)
exp_summary = exp_fetchers.get_exploration_summary_by_id(
exploration_id, strict=False)
if exploration is not None:
exp_rights = rights_manager.get_exploration_rights(
exploration_id, strict=True)
if exp_summary is None:
updated_exp_summary = generate_new_exploration_summary(
exploration, exp_rights)
else:
updated_exp_summary = update_exploration_summary(
exploration, exp_rights, exp_summary)
updated_exp_summary.add_contribution_by_user(contributor_id)
save_exploration_summary(updated_exp_summary)
else:
logging.error('Could not find exploration with ID %s', exploration_id)
def regenerate_exploration_and_contributors_summaries(
exploration_id: str
) -> None:
"""Regenerate a summary of the given exploration and also regenerate
the contributors summary from the snapshots. If the summary does not exist,
this function generates a new one.
Args:
exploration_id: str. ID of the exploration.
"""
exploration = exp_fetchers.get_exploration_by_id(exploration_id)
exp_rights = rights_manager.get_exploration_rights(
exploration_id, strict=True)
exp_summary = exp_fetchers.get_exploration_summary_by_id(
exploration_id, strict=True)
updated_exp_summary = update_exploration_summary(
exploration, exp_rights, exp_summary)
updated_exp_summary.contributors_summary = (
compute_exploration_contributors_summary(updated_exp_summary.id))
save_exploration_summary(updated_exp_summary)
def update_exploration_summary(
exploration: exp_domain.Exploration,
exp_rights: rights_domain.ActivityRights,
exp_summary: exp_domain.ExplorationSummary,
skip_exploration_model_last_updated: bool = False
) -> exp_domain.ExplorationSummary:
"""Updates an exploration summary domain object from a given exploration
and its rights.
Args:
exploration: Exploration. The exploration whose summary is to be
computed.
exp_rights: ActivityRights. The exploration rights model, used
to compute summary.
exp_summary: ExplorationSummary. The exploration summary
model whose summary needs to be recomputed.
skip_exploration_model_last_updated: bool. Whether the update of
exploration_model_last_updated should be skipped.
The exploration_model_last_updated is computed from the last human
update of the exploration. The update for this value should
be skipped when we know that the current workflow isn't
due to a human-initiated update.
Returns:
ExplorationSummary. The resulting exploration summary domain object.
Raises:
Exception. No data available for when the exploration was created_on.
"""
scaled_average_rating = get_scaled_average_rating(exp_summary.ratings)
if skip_exploration_model_last_updated:
exploration_model_last_updated = (
exp_summary.exploration_model_last_updated)
else:
# TODO(#15895): Revisit this after we have validations for the model to
# see whether exploration_model_last_updated and
# ExplorationModel.last_updated are in sync or not.
exploration_model_last_updated = datetime.datetime.fromtimestamp(
get_last_updated_by_human_ms(exploration.id) / 1000.0)
contributor_ids = list(exp_summary.contributors_summary.keys())
if exploration.created_on is None:
raise Exception(
'No data available for when the exploration was created_on.'
)
return exp_domain.ExplorationSummary(
exploration.id, exploration.title, exploration.category,
exploration.objective, exploration.language_code, exploration.tags,
exp_summary.ratings, scaled_average_rating, exp_rights.status,
exp_rights.community_owned, exp_rights.owner_ids, exp_rights.editor_ids,
exp_rights.voice_artist_ids, exp_rights.viewer_ids, contributor_ids,
exp_summary.contributors_summary, exploration.version,
exploration.created_on, exploration_model_last_updated,
exp_rights.first_published_msec
)
def generate_new_exploration_summary(
exploration: exp_domain.Exploration,
exp_rights: rights_domain.ActivityRights
) -> exp_domain.ExplorationSummary:
"""Generates a new exploration summary domain object from a given
exploration and its rights.
Args:
exploration: Exploration. The exploration whose summary is to be
computed.
exp_rights: ActivityRights. The exploration rights model, used
to compute summary.
Returns:
ExplorationSummary. The resulting exploration summary domain object.
Raises:
Exception. No data available for when the exploration was created_on.
"""
ratings = feconf.get_empty_ratings()
scaled_average_rating = get_scaled_average_rating(ratings)
exploration_model_last_updated = datetime.datetime.fromtimestamp(
get_last_updated_by_human_ms(exploration.id) / 1000.0)
if exploration.created_on is None:
raise Exception(
'No data available for when the exploration was created_on.'
)
return exp_domain.ExplorationSummary(
exploration.id, exploration.title, exploration.category,
exploration.objective, exploration.language_code, exploration.tags,
ratings, scaled_average_rating, exp_rights.status,
exp_rights.community_owned, exp_rights.owner_ids, exp_rights.editor_ids,
exp_rights.voice_artist_ids, exp_rights.viewer_ids, [], {},
exploration.version, exploration.created_on,
exploration_model_last_updated, exp_rights.first_published_msec
)
def compute_exploration_contributors_summary(
exploration_id: str
) -> Dict[str, int]:
"""Returns a dict whose keys are user_ids and whose values are
the number of (non-revert) commits made to the given exploration
by that user_id. This does not count commits which have since been reverted.
Args:
exploration_id: str. The id of the exploration.
Returns:
dict. The keys are all user_ids who have made commits to the given
exploration. The corresponding values are the number of commits made by
each user. Commits that revert to an earlier version, or forward
commits which have since been reverted, are excluded.
"""
snapshots_metadata = get_exploration_snapshots_metadata(exploration_id)
current_version = len(snapshots_metadata)
contributors_summary: Dict[str, int] = collections.defaultdict(int)
while True:
snapshot_metadata = snapshots_metadata[current_version - 1]
committer_id = snapshot_metadata['committer_id']
is_revert = snapshot_metadata['commit_type'] == 'revert'
if not is_revert and committer_id not in constants.SYSTEM_USER_IDS:
contributors_summary[committer_id] += 1
if current_version == 1:
break
if is_revert:
version_number = snapshot_metadata['commit_cmds'][0][
'version_number']
# Ruling out the possibility of any other type for mypy
# type checking.
assert isinstance(version_number, int)
current_version = version_number
else:
current_version -= 1
contributor_ids = list(contributors_summary)
# Remove IDs that are deleted or do not exist.
users_settings = user_services.get_users_settings(contributor_ids)
for contributor_id, user_settings in zip(contributor_ids, users_settings):
if user_settings is None:
del contributors_summary[contributor_id]
return contributors_summary
def save_exploration_summary(
exp_summary: exp_domain.ExplorationSummary
) -> None:
"""Save an exploration summary domain object as an ExpSummaryModel entity
in the datastore.
Args:
exp_summary: ExplorationSummary. The exploration summary to save.
"""
existing_exp_summary_model = (
exp_models.ExpSummaryModel.get(exp_summary.id, strict=False))
exp_summary_model = populate_exp_summary_model_fields(
existing_exp_summary_model, exp_summary)
exp_summary_model.update_timestamps()
exp_summary_model.put()
# The index should be updated after saving the exploration
# summary instead of after saving the exploration since the
# index contains documents computed on basis of exploration
# summary.
index_explorations_given_ids([exp_summary.id])
def delete_exploration_summaries(exploration_ids: List[str]) -> None:
"""Delete multiple exploration summary models.
Args:
exploration_ids: list(str). The id of the exploration summaries to be
deleted.
"""
summary_models = exp_models.ExpSummaryModel.get_multi(exploration_ids)
existing_summary_models = [
summary_model for summary_model in summary_models
if summary_model is not None
]
exp_models.ExpSummaryModel.delete_multi(existing_summary_models)
def revert_version_history(
exploration_id: str, current_version: int, revert_to_version: int
) -> None:
"""Reverts the version history to the given version number. Puts the
reverted version history model into the datastore.
Args:
exploration_id: str. The id of the exploration for which the version
history is to be reverted to the current version.
current_version: int. The current version of the exploration.
revert_to_version: int. The version to which the version history
is to be reverted.
"""
version_history_model_id = (
exp_models.ExplorationVersionHistoryModel.get_instance_id(
exploration_id, revert_to_version))
version_history_model = exp_models.ExplorationVersionHistoryModel.get(
version_history_model_id, strict=False)
if version_history_model is not None:
new_version_history_model = exp_models.ExplorationVersionHistoryModel(
id=exp_models.ExplorationVersionHistoryModel.get_instance_id(
exploration_id, current_version + 1),
exploration_id=exploration_id,
exploration_version=current_version + 1,
state_version_history=version_history_model.state_version_history,
metadata_last_edited_version_number=(
version_history_model.metadata_last_edited_version_number),
metadata_last_edited_committer_id=(
version_history_model.metadata_last_edited_committer_id),
committer_ids=version_history_model.committer_ids
)
new_version_history_model.update_timestamps()
new_version_history_model.put()
def get_exploration_validation_error(
exploration_id: str, revert_to_version: int
) -> Optional[str]:
"""Tests whether an exploration can be reverted to the given version
number. Does not commit any changes.
Args:
exploration_id: str. The id of the exploration to be reverted to the
current version.
revert_to_version: int. The version to which the given exploration
is to be reverted.
Returns:
Optional[str]. None if the revert_to_version passes all backend
validation checks, or the error string otherwise.
"""
# Validate the previous version of the exploration.
exploration = exp_fetchers.get_exploration_by_id(
exploration_id, version=revert_to_version)
exploration_rights = rights_manager.get_exploration_rights(exploration.id)
try:
exploration.validate(
exploration_rights.status == rights_domain.ACTIVITY_STATUS_PUBLIC)
except Exception as ex:
return str(ex)
return None
def revert_exploration(
committer_id: str,
exploration_id: str,
current_version: int,
revert_to_version: int
) -> None:
"""Reverts an exploration to the given version number. Commits changes.
Args:
committer_id: str. The id of the user who made the commit.
exploration_id: str. The id of the exploration to be reverted to the
current version.
current_version: int. The current version of the exploration.
revert_to_version: int. The version to which the given exploration
is to be reverted.
Raises:
Exception. Version of exploration does not match the version of the
currently-stored exploration model.
"""
exploration_model = exp_models.ExplorationModel.get(
exploration_id, strict=True)
if current_version > exploration_model.version:
raise Exception(
'Unexpected error: trying to update version %s of exploration '
'from version %s. Please reload the page and try again.'
% (exploration_model.version, current_version))
if current_version < exploration_model.version:
raise Exception(
'Trying to update version %s of exploration from version %s, '
'which is too old. Please reload the page and try again.'
% (exploration_model.version, current_version))
# Validate the previous version of the exploration before committing the
# change.
exploration = exp_fetchers.get_exploration_by_id(
exploration_id, version=revert_to_version)
exploration_rights = rights_manager.get_exploration_rights(exploration.id)
exploration_is_public = (
exploration_rights.status != rights_domain.ACTIVITY_STATUS_PRIVATE
)
exploration.validate(strict=exploration_is_public)
exp_models.ExplorationModel.revert(
exploration_model, committer_id,
'Reverted exploration to version %s' % revert_to_version,
revert_to_version)
caching_services.delete_multi(
caching_services.CACHE_NAMESPACE_EXPLORATION, None,
[exploration.id])
revert_version_history(exploration_id, current_version, revert_to_version)
regenerate_exploration_and_contributors_summaries(exploration_id)
exploration_stats = stats_services.get_stats_for_new_exp_version(
exploration.id, current_version + 1, list(exploration.states.keys()),
None, revert_to_version)
stats_services.create_stats_model(exploration_stats)
current_exploration = exp_fetchers.get_exploration_by_id(
exploration_id, version=current_version)
exp_issues_models_to_put = (
stats_services.get_updated_exp_issues_models_for_new_exp_version(
current_exploration,
None,
revert_to_version
)
)
datastore_services.put_multi(exp_issues_models_to_put)
if feconf.ENABLE_ML_CLASSIFIERS:
exploration_to_revert_to = exp_fetchers.get_exploration_by_id(
exploration_id, version=revert_to_version)
classifier_services.create_classifier_training_job_for_reverted_exploration( # pylint: disable=line-too-long
current_exploration, exploration_to_revert_to)
# Creation and deletion methods.
def get_demo_exploration_components(
demo_path: str
) -> Tuple[str, List[Tuple[str, bytes]]]:
"""Gets the content of `demo_path` in the sample explorations folder.
Args:
demo_path: str. The file or folder path for the content of an
exploration in SAMPLE_EXPLORATIONS_DIR. E.g.: 'adventure.yaml' or
'tar/'.
Returns:
tuple. A 2-tuple, the first element of which is a yaml string, and the
second element of which is a list of (filepath, content) 2-tuples. The
filepath does not include the assets/ prefix.
Raises:
Exception. The path of the file is unrecognized or does not exist.
"""
demo_filepath = os.path.join(feconf.SAMPLE_EXPLORATIONS_DIR, demo_path)
if demo_filepath.endswith('yaml'):
file_contents = utils.get_file_contents(demo_filepath)
return file_contents, []
elif os.path.isdir(demo_filepath):
return utils.get_exploration_components_from_dir(demo_filepath)
else:
raise Exception('Unrecognized file path: %s' % demo_path)
def save_new_exploration_from_yaml_and_assets(
committer_id: str,
yaml_content: str,
exploration_id: str,
assets_list: List[Tuple[str, bytes]],
strip_voiceovers: bool = False
) -> None:
"""Saves a new exploration given its representation in YAML form and the
list of assets associated with it.
Args:
committer_id: str. The id of the user who made the commit.
yaml_content: str. The YAML representation of the exploration.
exploration_id: str. The id of the exploration.
assets_list: list(tuple(str, bytes)). A list of lists of assets, which
contains asset's filename and content.
strip_voiceovers: bool. Whether to strip away all audio voiceovers
from the imported exploration.
Raises:
Exception. The yaml file is invalid due to a missing schema version.
"""
yaml_dict = utils.dict_from_yaml(yaml_content)
if 'schema_version' not in yaml_dict:
raise Exception('Invalid YAML file: missing schema version')
# The assets are committed before the exploration is created because the
# migrating to state schema version 25 involves adding dimensions to
# images. So we need to have images in the datastore before we could
# perform the migration.
for (asset_filename, asset_content) in assets_list:
fs = fs_services.GcsFileSystem(
feconf.ENTITY_TYPE_EXPLORATION, exploration_id)
fs.commit(asset_filename, asset_content)
exploration = exp_domain.Exploration.from_yaml(exploration_id, yaml_content)
# Check whether audio translations should be stripped.
if strip_voiceovers:
for state in exploration.states.values():
state.recorded_voiceovers.strip_all_existing_voiceovers()
create_commit_message = (
'New exploration created from YAML file with title \'%s\'.'
% exploration.title)
_create_exploration(
committer_id, exploration, create_commit_message, [
exp_domain.CreateNewExplorationCmd({
'cmd': exp_domain.CMD_CREATE_NEW,
'title': exploration.title,
'category': exploration.category,
})])
def delete_demo(exploration_id: str) -> None:
"""Deletes a single demo exploration.
Args:
exploration_id: str. The id of the exploration to be deleted.
Raises:
Exception. The exploration id is invalid.
"""
if not exp_domain.Exploration.is_demo_exploration_id(exploration_id):
raise Exception('Invalid demo exploration id %s' % exploration_id)
exploration = exp_fetchers.get_exploration_by_id(
exploration_id, strict=False)
if not exploration:
logging.info(
'Exploration with id %s was not deleted, because it '
'does not exist.' % exploration_id)
else:
delete_exploration(
feconf.SYSTEM_COMMITTER_ID, exploration_id, force_deletion=True)
def load_demo(exploration_id: str) -> None:
"""Loads a demo exploration.
The resulting exploration will have two commits in its history (one for
its initial creation and one for its subsequent modification.)
Args:
exploration_id: str. The id of the demo exploration.
Raises:
Exception. The exploration id provided is invalid.
"""
if not exp_domain.Exploration.is_demo_exploration_id(exploration_id):
raise Exception('Invalid demo exploration id %s' % exploration_id)
delete_demo(exploration_id)
exp_filename = feconf.DEMO_EXPLORATIONS[exploration_id]
yaml_content, assets_list = get_demo_exploration_components(exp_filename)
save_new_exploration_from_yaml_and_assets(
feconf.SYSTEM_COMMITTER_ID, yaml_content, exploration_id, assets_list)
publish_exploration_and_update_user_profiles(
user_services.get_system_user(), exploration_id)
index_explorations_given_ids([exploration_id])
logging.info('Exploration with id %s was loaded.' % exploration_id)
def get_next_page_of_all_non_private_commits(
page_size: int = feconf.COMMIT_LIST_PAGE_SIZE,
urlsafe_start_cursor: Optional[str] = None,
max_age: Optional[datetime.timedelta] = None
) -> Tuple[List[exp_domain.ExplorationCommitLogEntry], Optional[str], bool]:
"""Returns a page of non-private commits in reverse time order. If max_age
is given, it should be a datetime.timedelta instance.
The return value is a tuple (results, cursor, more) as described in
fetch_page() at:
https://developers.google.com/appengine/docs/python/ndb/queryclass
Args:
page_size: int. Number of commits that are in the commit list page.
urlsafe_start_cursor: str. If this is not None, then the returned
commits start from cursor location. Otherwise they start from the
beginning of the list of commits.
max_age: datetime.timedelta. The maximum age to which all non private
commits are fetch from the ExplorationCommitLogEntry.
Returns:
tuple. A 3-tuple consisting of:
- list(ExplorationCommitLogEntry). A list containing
ExplorationCommitlogEntry domain objects.
- str. The postion of the cursor.
- bool. indicating whether there are (likely) more results after
this batch. If False, there are no more results; if True, there
are probably more results.
Raises:
ValueError. The argument max_age is not datetime.timedelta or None.
"""
if max_age is not None and not isinstance(max_age, datetime.timedelta):
raise ValueError(
'max_age must be a datetime.timedelta instance. or None.')
results, new_urlsafe_start_cursor, more = (
exp_models.ExplorationCommitLogEntryModel.get_all_non_private_commits(
page_size, urlsafe_start_cursor, max_age=max_age))
return ([exp_domain.ExplorationCommitLogEntry(
entry.created_on, entry.last_updated, entry.user_id,
entry.exploration_id, entry.commit_type, entry.commit_message,
entry.commit_cmds, entry.version, entry.post_commit_status,
entry.post_commit_community_owned, entry.post_commit_is_private
) for entry in results], new_urlsafe_start_cursor, more)
def get_image_filenames_from_exploration(
exploration: exp_domain.Exploration
) -> List[str]:
"""Get the image filenames from the exploration.
Args:
exploration: Exploration. The exploration to get the image filenames.
Returns:
list(str). List containing the name of the image files in exploration.
"""
filenames = []
for state in exploration.states.values():
if state.interaction.id == 'ImageClickInput':
# Here we use cast because we are narrowing down the type from
# various customization args value types to ImageAndRegionDict
# type, and this is done because here we are accessing
# 'imageAndRegions' key from ImageClickInput customization arg
# whose values is always of ImageAndRegionDict type.
image_paths = cast(
domain.ImageAndRegionDict,
state.interaction.customization_args['imageAndRegions'].value
)
filenames.append(image_paths['imagePath'])
html_list = exploration.get_all_html_content_strings()
filenames.extend(
html_cleaner.get_image_filenames_from_html_strings(html_list))
return filenames
def get_number_of_ratings(ratings: Dict[str, int]) -> int:
"""Gets the total number of ratings represented by the given ratings
object.
Args:
ratings: dict. A dict whose keys are '1', '2', '3', '4', '5' and whose
values are nonnegative integers representing frequency counts.
Returns:
int. The total number of ratings given.
"""
return sum(ratings.values()) if ratings else 0
def get_average_rating(ratings: Dict[str, int]) -> float:
"""Returns the average rating of the ratings as a float.
If there are no ratings, it will return 0.
Args:
ratings: dict. A dict whose keys are '1', '2', '3', '4', '5' and whose
values are nonnegative integers representing frequency counts.
Returns:
float. The average of the all the ratings given, or 0
if there are no rating.
"""
rating_weightings = {'1': 1, '2': 2, '3': 3, '4': 4, '5': 5}
if ratings:
rating_sum = 0.0
number_of_ratings = get_number_of_ratings(ratings)
if number_of_ratings == 0:
return 0
for rating_value, rating_count in ratings.items():
rating_sum += rating_weightings[rating_value] * rating_count
return rating_sum / number_of_ratings
return 0
def get_scaled_average_rating(ratings: Dict[str, int]) -> float:
"""Returns the lower bound wilson score of the ratings. If there are
no ratings, it will return 0. The confidence of this result is 95%.
Args:
ratings: dict. A dict whose keys are '1', '2', '3', '4', '5' and whose
values are nonnegative integers representing frequency counts.
Returns:
float. The lower bound wilson score of the ratings.
"""
# The following is the number of ratings.
n = get_number_of_ratings(ratings)
if n == 0:
return 0
average_rating = get_average_rating(ratings)
z = 1.9599639715843482
x = (average_rating - 1) / 4
# The following calculates the lower bound Wilson Score as documented
# http://www.goproblems.com/test/wilson/wilson.php?v1=0&v2=0&v3=0&v4=&v5=1
a = x + ((z**2) / (2 * n))
b = z * math.sqrt(((x * (1 - x)) / n) + ((z**2) / (4 * n**2)))
wilson_score_lower_bound = (a - b) / (1 + ((z**2) / n))
return 1 + 4 * wilson_score_lower_bound
def index_explorations_given_ids(exp_ids: List[str]) -> None:
"""Indexes the explorations corresponding to the given exploration ids.
Args:
exp_ids: list(str). List of ids of the explorations to be indexed.
"""
exploration_summaries = exp_fetchers.get_exploration_summaries_matching_ids(
exp_ids)
search_services.index_exploration_summaries([
exploration_summary for exploration_summary in exploration_summaries
if exploration_summary is not None])
def is_voiceover_change_list(
change_list: Sequence[exp_domain.ExplorationChange]
) -> bool:
"""Checks whether the change list contains only the changes which are
allowed for voice artist to do.
Args:
change_list: list(ExplorationChange). A list that contains the changes
to be made to the ExplorationUserDataModel object.
Returns:
bool. Whether the change_list contains only the changes which are
allowed for voice artist to do.
"""
for change in change_list:
if (change.property_name !=
exp_domain.STATE_PROPERTY_RECORDED_VOICEOVERS):
return False
return True
def get_composite_change_list(
exp_id: str, from_version: int, to_version: int
) -> List[exp_domain.ExplorationChange]:
"""Returns a list of ExplorationChange domain objects consisting of
changes from from_version to to_version in an exploration.
Args:
exp_id: str. The id of the exploration.
from_version: int. The version of the exploration from where we
want to start the change list.
to_version: int. The version of the exploration till which we
want are change list.
Returns:
list(ExplorationChange). List of ExplorationChange domain objects
consisting of changes from from_version to to_version.
Raises:
Exception. From version is higher than to version.
"""
if from_version > to_version:
raise Exception(
'Unexpected error: Trying to find change list from version %s '
'of exploration to version %s.'
% (from_version, to_version))
version_nums = list(range(from_version + 1, to_version + 1))
snapshots_metadata = exp_models.ExplorationModel.get_snapshots_metadata(
exp_id, version_nums, allow_deleted=False)
composite_change_list_dict = []
for snapshot in snapshots_metadata:
composite_change_list_dict += snapshot['commit_cmds']
composite_change_list = [
exp_domain.ExplorationChange(change)
for change in composite_change_list_dict]
return composite_change_list
def are_changes_mergeable(
exp_id: str,
change_list_version: int,
change_list: List[exp_domain.ExplorationChange]
) -> bool:
"""Checks whether the change list can be merged when the
intended exploration version of changes_list is not same as
the current exploration version.
Args:
exp_id: str. The id of the exploration where the change_list is to
be applied.
change_list_version: int. Version of an exploration on which the change
list was applied.
change_list: list(ExplorationChange). List of the changes made by the
user on the frontend, which needs to be checked for mergeability.
Returns:
boolean. Whether the changes are mergeable.
"""
current_exploration = exp_fetchers.get_exploration_by_id(exp_id)
if current_exploration.version == change_list_version:
return True
if current_exploration.version < change_list_version:
return False
# A complete list of changes from one version to another
# is composite_change_list.
composite_change_list = get_composite_change_list(
exp_id, change_list_version,
current_exploration.version)
exp_at_change_list_version = exp_fetchers.get_exploration_by_id(
exp_id, version=change_list_version)
changes_are_mergeable, send_email = (
exp_domain.ExplorationChangeMergeVerifier(
composite_change_list).is_change_list_mergeable(
change_list, exp_at_change_list_version,
current_exploration))
if send_email:
change_list_dict = [change.to_dict() for change in change_list]
email_manager.send_not_mergeable_change_list_to_admin_for_review(
exp_id, change_list_version, current_exploration.version,
change_list_dict)
return changes_are_mergeable
def is_version_of_draft_valid(exp_id: str, version: int) -> bool:
"""Checks if the draft version is the same as the latest version of the
exploration.
Args:
exp_id: str. The id of the exploration.
version: int. The draft version which is to be validate.
Returns:
bool. Whether the given version number is the same as the current
version number of the exploration in the datastore.
"""
return exp_fetchers.get_exploration_by_id(exp_id).version == version
def get_user_exploration_data(
user_id: str,
exploration_id: str,
apply_draft: bool = False,
version: Optional[int] = None
) -> UserExplorationDataDict:
"""Returns a description of the given exploration."""
exp_user_data = user_models.ExplorationUserDataModel.get(
user_id, exploration_id)
is_valid_draft_version = (
is_version_of_draft_valid(
exploration_id, exp_user_data.draft_change_list_exp_version)
if exp_user_data and exp_user_data.draft_change_list_exp_version
else None)
if apply_draft:
updated_exploration = (
get_exp_with_draft_applied(exploration_id, user_id))
if updated_exploration is None:
exploration = exp_fetchers.get_exploration_by_id(
exploration_id, version=version)
else:
exploration = updated_exploration
is_valid_draft_version = True
else:
exploration = exp_fetchers.get_exploration_by_id(
exploration_id, version=version)
states = {}
for state_name in exploration.states:
state_dict = exploration.states[state_name].to_dict()
states[state_name] = state_dict
draft_changes = (
exp_user_data.draft_change_list if exp_user_data
and exp_user_data.draft_change_list else None)
draft_change_list_id = (
exp_user_data.draft_change_list_id if exp_user_data else 0)
exploration_email_preferences = (
user_services.get_email_preferences_for_exploration(
user_id, exploration_id))
editor_dict: UserExplorationDataDict = {
'auto_tts_enabled': exploration.auto_tts_enabled,
'category': exploration.category,
'draft_change_list_id': draft_change_list_id,
'exploration_id': exploration_id,
'init_state_name': exploration.init_state_name,
'language_code': exploration.language_code,
'objective': exploration.objective,
'param_changes': exploration.param_change_dicts,
'param_specs': exploration.param_specs_dict,
'rights': rights_manager.get_exploration_rights(
exploration_id).to_dict(),
'show_state_editor_tutorial_on_load': False,
'show_state_translation_tutorial_on_load': False,
'states': states,
'tags': exploration.tags,
'title': exploration.title,
'version': exploration.version,
'is_version_of_draft_valid': is_valid_draft_version,
'draft_changes': draft_changes,
'email_preferences': exploration_email_preferences.to_dict(),
'next_content_id_index': exploration.next_content_id_index,
'edits_allowed': exploration.edits_allowed,
'exploration_metadata': exploration.get_metadata().to_dict()
}
return editor_dict
def create_or_update_draft(
exp_id: str,
user_id: str,
change_list: Sequence[exp_domain.ExplorationChange],
exp_version: int,
current_datetime: datetime.datetime,
is_by_voice_artist: bool = False
) -> None:
"""Create a draft with the given change list, or update the change list
of the draft if it already exists. A draft is updated only if the change
list timestamp of the new change list is greater than the change list
timestamp of the draft.
The method assumes that a ExplorationUserDataModel object exists for the
given user and exploration.
Args:
exp_id: str. The id of the exploration.
user_id: str. The id of the user.
change_list: list(ExplorationChange). A list that contains the changes
to be made to the ExplorationUserDataModel object.
exp_version: int. The current version of the exploration.
current_datetime: datetime.datetime. The current date and time.
is_by_voice_artist: bool. Whether the changes are made by a
voice artist.
"""
if is_by_voice_artist and not is_voiceover_change_list(change_list):
raise utils.ValidationError(
'Voice artist does not have permission to make some '
'changes in the change list.')
exp_user_data = user_models.ExplorationUserDataModel.get(user_id, exp_id)
if (exp_user_data and exp_user_data.draft_change_list and
exp_user_data.draft_change_list_last_updated > current_datetime):
return
updated_exploration = apply_change_list(exp_id, change_list)
updated_exploration.validate(strict=False)
if exp_user_data is None:
exp_user_data = user_models.ExplorationUserDataModel.create(
user_id, exp_id)
draft_change_list_id = exp_user_data.draft_change_list_id
draft_change_list_id += 1
change_list_dict = [change.to_dict() for change in change_list]
exp_user_data.draft_change_list = change_list_dict
exp_user_data.draft_change_list_last_updated = current_datetime
exp_user_data.draft_change_list_exp_version = exp_version
exp_user_data.draft_change_list_id = draft_change_list_id
exp_user_data.update_timestamps()
exp_user_data.put()
def get_exp_with_draft_applied(
exp_id: str, user_id: str
) -> Optional[exp_domain.Exploration]:
"""If a draft exists for the given user and exploration,
apply it to the exploration.
Args:
exp_id: str. The id of the exploration.
user_id: str. The id of the user whose draft is to be applied.
Returns:
Exploration or None. Returns the exploration domain object with draft
applied, or None if draft can not be applied.
"""
# TODO(#15075): Refactor this function.
exp_user_data = user_models.ExplorationUserDataModel.get(user_id, exp_id)
exploration = exp_fetchers.get_exploration_by_id(exp_id)
draft_change_list = []
if exp_user_data:
if exp_user_data.draft_change_list:
draft_change_list_exp_version = (
exp_user_data.draft_change_list_exp_version)
draft_change_list = [
exp_domain.ExplorationChange(change)
for change in exp_user_data.draft_change_list]
if (exploration.version >
exp_user_data.draft_change_list_exp_version):
logging.info(
'Exploration and draft versions out of sync, trying '
'to upgrade draft version to match exploration\'s.')
new_draft_change_list = (
draft_upgrade_services.try_upgrading_draft_to_exp_version(
draft_change_list,
exp_user_data.draft_change_list_exp_version,
exploration.version, exploration.id))
if new_draft_change_list is not None:
draft_change_list = new_draft_change_list
draft_change_list_exp_version = exploration.version
updated_exploration = None
if (exp_user_data and exp_user_data.draft_change_list and
are_changes_mergeable(
exp_id, draft_change_list_exp_version, draft_change_list)):
updated_exploration = apply_change_list(
exp_id, draft_change_list)
updated_exploration_has_no_invalid_math_tags = True
# verify that all the math-tags are valid before returning the
# updated exploration.
for state in updated_exploration.states.values():
html_string = ''.join(state.get_all_html_content_strings())
error_list = (
html_validation_service.
validate_math_tags_in_html_with_attribute_math_content(
html_string))
if len(error_list) > 0:
updated_exploration_has_no_invalid_math_tags = False
break
if not updated_exploration_has_no_invalid_math_tags:
updated_exploration = None
return updated_exploration
def discard_draft(exp_id: str, user_id: str) -> None:
"""Discard the draft for the given user and exploration.
Args:
exp_id: str. The id of the exploration.
user_id: str. The id of the user whose draft is to be discarded.
"""
exp_user_data = user_models.ExplorationUserDataModel.get(
user_id, exp_id)
if exp_user_data:
exp_user_data.draft_change_list = None
exp_user_data.draft_change_list_last_updated = None
exp_user_data.draft_change_list_exp_version = None
exp_user_data.update_timestamps()
exp_user_data.put()
def get_exp_user_data_model_with_draft_discarded(
exp_id: str,
user_id: str
) -> Optional[user_models.ExplorationUserDataModel]:
"""Clears change list related fields in the ExplorationUserDataModel and
returns it.
Args:
exp_id: str. The id of the exploration.
user_id: str. The id of the user whose draft is to be discarded.
Returns:
ExplorationUserDataModel|None. The ExplorationUserDataModel with
draft discarded if it exists, otherwise None.
"""
exp_user_data = user_models.ExplorationUserDataModel.get(
user_id, exp_id)
if exp_user_data:
exp_user_data.draft_change_list = None
exp_user_data.draft_change_list_last_updated = None
exp_user_data.draft_change_list_exp_version = None
return exp_user_data
return None
def get_interaction_id_for_state(exp_id: str, state_name: str) -> Optional[str]:
"""Returns the interaction id for the given state name.
Args:
exp_id: str. The ID of the exploration.
state_name: str. The name of the state.
Returns:
str|None. The ID of the interaction.
Raises:
Exception. If the state with the given state name does not exist in
the exploration.
"""
exploration = exp_fetchers.get_exploration_by_id(exp_id)
if exploration.has_state_name(state_name):
return exploration.get_interaction_id_by_state_name(state_name)
raise Exception(
'There exist no state in the exploration with the given state name.')
def regenerate_missing_stats_for_exploration(
exp_id: str
) -> Tuple[List[str], List[str], int, int]:
"""Regenerates missing ExplorationStats models and entries for all
corresponding states in an exploration.
Args:
exp_id: str. The ID of the exp.
Returns:
4-tuple(missing_exp_stats, missing_state_stats, num_valid_exp_stats,
num_valid_state_stats). where:
missing_exp_stats: list(str). List of missing exploration stats.
missing_state_stats: list(str). List of missing state stats.
num_valid_exp_stats: int. Number of valid exploration stats.
num_valid_state_stats: int. Number of valid state stats.
Raises:
Exception. Fetching exploration versions failed.
Exception. No ExplorationStatsModels found.
Exception. Exploration snapshots contain invalid commit_cmds.
Exception. Exploration does not have a given state.
"""
exploration = exp_fetchers.get_exploration_by_id(exp_id)
num_valid_state_stats = 0
num_valid_exp_stats = 0
exp_versions = list(range(1, exploration.version + 1))
missing_exp_stats_indices = []
exp_stats_list = stats_services.get_multiple_exploration_stats_by_version(
exp_id, exp_versions)
exp_list = (
exp_fetchers
.get_multiple_versioned_exp_interaction_ids_mapping_by_version(
exp_id, exp_versions))
if all(exp_stats is None for exp_stats in exp_stats_list):
for index, version in enumerate(exp_versions):
exp_stats_for_version = (
stats_services.get_stats_for_new_exploration(
exp_id, version,
list(exp_list[index].state_interaction_ids_dict.keys())))
stats_services.create_stats_model(exp_stats_for_version)
raise Exception('No ExplorationStatsModels found')
snapshots = exp_models.ExplorationModel.get_snapshots_metadata(
exp_id, exp_versions)
change_lists = []
for snapshot in snapshots:
change_list_for_snapshot = []
for commit_cmd in snapshot['commit_cmds']:
try:
change_list_for_snapshot.append(
exp_domain.ExplorationChange(commit_cmd)
)
except utils.ValidationError:
logging.error(
'Exploration(id=%r) snapshots contains invalid '
'commit_cmd: %r'
% (exp_id, commit_cmd)
)
continue
change_lists.append(change_list_for_snapshot)
missing_exp_stats = []
missing_state_stats = []
zipped_items = list(
zip(exp_stats_list, exp_list, change_lists))
revert_commit_cmd = exp_models.ExplorationModel.CMD_REVERT_COMMIT
for i, (exp_stats, exp, change_list) in enumerate(zipped_items):
revert_to_version = next(
(
int(change.version_number) for change in change_list
if change.cmd == revert_commit_cmd
), None)
new_exp_version = None
if revert_to_version is not None:
exp_versions_diff = None
# We subtract 2 from revert_to_version to get the index of the
# previous exploration version because exp_stats_list and
# prev_exp start with version 1 in the 0th index.
prev_exp_version_index = revert_to_version - 2
prev_exp_stats = exp_stats_list[prev_exp_version_index]
prev_exp = exp_list[prev_exp_version_index]
new_exp_version = revert_to_version
else:
exp_versions_diff = exp_domain.ExplorationVersionsDiff(
change_list)
# We subtract 2 from exp.version to get the index of the
# previous exploration version because exp_stats_list and
# prev_exp start with version 1 in the 0th index.
prev_exp_version_index = exp.version - 2
prev_exp_stats = exp_stats_list[prev_exp_version_index]
prev_exp = exp_list[prev_exp_version_index]
new_exp_version = exp.version
# Fill missing Exploration-level stats.
if exp_stats:
num_valid_exp_stats += 1
elif exp.version == 1:
new_exploration_stats = (
stats_services.get_stats_for_new_exploration(
exp_id, exp.version,
list(exp.state_interaction_ids_dict.keys())))
stats_services.create_stats_model(new_exploration_stats)
missing_exp_stats_indices.append(i)
missing_exp_stats.append(
'ExplorationStats(exp_id=%r, exp_version=%r)'
% (exp_id, exp.version))
num_valid_state_stats += len(
new_exploration_stats.state_stats_mapping)
continue
else:
exp_stats = prev_exp_stats and prev_exp_stats.clone()
if exp_stats is None:
new_exploration_stats = (
stats_services.get_stats_for_new_exploration(
exp_id, exp.version,
list(exp.state_interaction_ids_dict.keys())))
stats_services.create_stats_model(new_exploration_stats)
missing_exp_stats_indices.append(i)
missing_exp_stats.append(
'ExplorationStats(exp_id=%r, exp_version=%r)'
% (exp_id, exp.version))
num_valid_state_stats += len(
new_exploration_stats.state_stats_mapping)
continue
if exp_versions_diff:
exp_stats = stats_services.advance_version_of_exp_stats(
new_exp_version, exp_versions_diff, exp_stats, None,
None)
else:
exp_stats.exp_version = exp.version
stats_services.create_stats_model(exp_stats)
missing_exp_stats_indices.append(i)
missing_exp_stats.append(
'ExplorationStats(exp_id=%r, exp_version=%r)'
% (exp_id, exp.version))
# Fill missing State-level stats.
state_stats_mapping = exp_stats.state_stats_mapping
for state_name in exp.state_interaction_ids_dict.keys():
if state_name in state_stats_mapping:
num_valid_state_stats += 1
continue
if exp_versions_diff:
prev_state_name = (
exp_versions_diff.new_to_old_state_names.get(
state_name, state_name))
else:
prev_state_name = state_name
try:
prev_interaction_id = (
prev_exp.state_interaction_ids_dict[prev_state_name]
if prev_state_name in prev_exp.state_interaction_ids_dict
else None)
current_interaction_id = (
exp.state_interaction_ids_dict[state_name])
exp_stats_list_item = exp_stats_list[i]
assert exp_stats_list_item is not None
# In early schema versions of ExplorationModel, the END
# card was a persistant, implicit state present in every
# exploration. The snapshots of these old explorations have
# since been migrated but they do not have corresponding state
# stats models for the END state. So for such versions, a
# default state stats model should be created.
if (
current_interaction_id != prev_interaction_id or
(
current_interaction_id == 'EndExploration' and
prev_state_name == 'END'
)
):
exp_stats_list_item.state_stats_mapping[state_name] = (
stats_domain.StateStats.create_default()
)
else:
assert prev_exp_stats is not None
exp_stats_list_item.state_stats_mapping[state_name] = (
prev_exp_stats.state_stats_mapping[
prev_state_name].clone()
)
missing_state_stats.append(
'StateStats(exp_id=%r, exp_version=%r, '
'state_name=%r)' % (exp_id, exp.version, state_name))
except Exception as e:
assert exp_versions_diff is not None
raise Exception(
'Exploration(id=%r, exp_version=%r) has no '
'State(name=%r): %r' % (
exp_id, exp_stats.exp_version, prev_state_name, {
'added_state_names': (
exp_versions_diff.added_state_names),
'deleted_state_names': (
exp_versions_diff.deleted_state_names),
'new_to_old_state_names': (
exp_versions_diff.new_to_old_state_names),
'old_to_new_state_names': (
exp_versions_diff.old_to_new_state_names),
'prev_exp.states': (
prev_exp.state_interaction_ids_dict.keys()),
'prev_exp_stats': prev_exp_stats
})) from e
for index, exp_stats in enumerate(exp_stats_list):
if index not in missing_exp_stats_indices:
assert exp_stats is not None
stats_services.save_stats_model(exp_stats)
return (
missing_exp_stats, missing_state_stats,
num_valid_exp_stats, num_valid_state_stats
)
def update_logged_out_user_progress(
exploration_id: str,
unique_progress_url_id: str,
state_name: str,
exp_version: int,
) -> None:
"""Updates the logged-out user's progress in the
associated TransientCheckpointUrlModel.
Args:
exploration_id: str. The ID of the exploration.
unique_progress_url_id: str. Unique 6-digit url to track a
logged-out user's progress.
state_name: str. State name of the most recently
reached checkpoint in the exploration.
exp_version: int. Exploration version in which a
checkpoint was most recently reached.
"""
# Fetch the model associated with the unique_progress_url_id.
checkpoint_url_model = exp_models.TransientCheckpointUrlModel.get(
unique_progress_url_id, strict=False)
# Create a model if it doesn't already exist.
if checkpoint_url_model is None:
checkpoint_url_model = exp_models.TransientCheckpointUrlModel.create(
exploration_id, unique_progress_url_id)
current_exploration = exp_fetchers.get_exploration_by_id(
exploration_id, strict=True, version=exp_version)
# If the exploration is being visited the first time.
if checkpoint_url_model.furthest_reached_checkpoint_state_name is None:
checkpoint_url_model.furthest_reached_checkpoint_exp_version = (
exp_version)
checkpoint_url_model.furthest_reached_checkpoint_state_name = (
state_name)
elif checkpoint_url_model.furthest_reached_checkpoint_exp_version <= exp_version: # pylint: disable=line-too-long
furthest_reached_checkpoint_exp = (
exp_fetchers.get_exploration_by_id(
exploration_id,
strict=True,
version=checkpoint_url_model.furthest_reached_checkpoint_exp_version # pylint: disable=line-too-long
)
)
checkpoints_in_current_exp = user_services.get_checkpoints_in_order(
current_exploration.init_state_name, current_exploration.states)
checkpoints_in_older_exp = user_services.get_checkpoints_in_order(
furthest_reached_checkpoint_exp.init_state_name,
furthest_reached_checkpoint_exp.states)
# Get the furthest reached checkpoint in current exploration.
furthest_reached_checkpoint_in_current_exp = (
user_services.
get_most_distant_reached_checkpoint_in_current_exploration(
checkpoints_in_current_exp,
checkpoints_in_older_exp,
checkpoint_url_model.furthest_reached_checkpoint_state_name
)
)
# If the furthest reached checkpoint doesn't exist in current
# exploration.
if furthest_reached_checkpoint_in_current_exp is None:
checkpoint_url_model.furthest_reached_checkpoint_exp_version = (
exp_version)
checkpoint_url_model.furthest_reached_checkpoint_state_name = (
state_name)
else:
# Index of the furthest reached checkpoint.
frc_index = checkpoints_in_current_exp.index(
furthest_reached_checkpoint_in_current_exp)
# If furthest reached checkpoint is behind most recently
# reached checkpoint.
if frc_index <= checkpoints_in_current_exp.index(state_name):
checkpoint_url_model.furthest_reached_checkpoint_exp_version = ( # pylint: disable=line-too-long
exp_version)
checkpoint_url_model.furthest_reached_checkpoint_state_name = (
state_name)
checkpoint_url_model.most_recently_reached_checkpoint_exp_version = (
exp_version)
checkpoint_url_model.most_recently_reached_checkpoint_state_name = (
state_name)
checkpoint_url_model.last_updated = datetime.datetime.utcnow()
checkpoint_url_model.update_timestamps()
checkpoint_url_model.put()
@overload
def sync_logged_out_learner_checkpoint_progress_with_current_exp_version(
exploration_id: str, unique_progress_url_id: str, *, strict: Literal[True]
) -> exp_domain.TransientCheckpointUrl: ...
@overload
def sync_logged_out_learner_checkpoint_progress_with_current_exp_version(
exploration_id: str, unique_progress_url_id: str,
) -> Optional[exp_domain.TransientCheckpointUrl]: ...
@overload
def sync_logged_out_learner_checkpoint_progress_with_current_exp_version(
exploration_id: str, unique_progress_url_id: str, *, strict: Literal[False]
) -> Optional[exp_domain.TransientCheckpointUrl]: ...
def sync_logged_out_learner_checkpoint_progress_with_current_exp_version(
exploration_id: str, unique_progress_url_id: str, strict: bool = False
) -> Optional[exp_domain.TransientCheckpointUrl]:
"""Synchronizes the most recently reached checkpoint and the furthest
reached checkpoint with the latest exploration.
Args:
exploration_id: str. The Id of the exploration.
unique_progress_url_id: str. Unique 6-digit url to track a
logged-out user's progress.
strict: bool. Whether to fail noisily if no TransientCheckpointUrlModel
with the given unique_progress_url_id exists in the datastore.
Returns:
TransientCheckpointUrl. The domain object corresponding to the
TransientCheckpointUrlModel.
"""
# Fetch the model associated with the unique_progress_url_id.
checkpoint_url_model = exp_models.TransientCheckpointUrlModel.get(
unique_progress_url_id, strict=strict)
if checkpoint_url_model is None:
return None
latest_exploration = exp_fetchers.get_exploration_by_id(exploration_id)
most_recently_interacted_exploration = (
exp_fetchers.get_exploration_by_id(
exploration_id,
strict=True,
version=(
checkpoint_url_model.most_recently_reached_checkpoint_exp_version
)
))
furthest_reached_exploration = (
exp_fetchers.get_exploration_by_id(
exploration_id,
strict=True,
version=checkpoint_url_model.furthest_reached_checkpoint_exp_version
))
most_recently_reached_checkpoint_in_current_exploration = (
user_services.
get_most_distant_reached_checkpoint_in_current_exploration(
user_services.get_checkpoints_in_order(
latest_exploration.init_state_name,
latest_exploration.states),
user_services.get_checkpoints_in_order(
most_recently_interacted_exploration.init_state_name,
most_recently_interacted_exploration.states),
checkpoint_url_model.most_recently_reached_checkpoint_state_name
)
)
furthest_reached_checkpoint_in_current_exploration = (
user_services.
get_most_distant_reached_checkpoint_in_current_exploration(
user_services.get_checkpoints_in_order(
latest_exploration.init_state_name,
latest_exploration.states),
user_services.get_checkpoints_in_order(
furthest_reached_exploration.init_state_name,
furthest_reached_exploration.states),
checkpoint_url_model.furthest_reached_checkpoint_state_name
)
)
# If the most recently reached checkpoint doesn't exist in current
# exploration.
if (
most_recently_reached_checkpoint_in_current_exploration !=
checkpoint_url_model.most_recently_reached_checkpoint_state_name
):
checkpoint_url_model.most_recently_reached_checkpoint_state_name = (
most_recently_reached_checkpoint_in_current_exploration)
checkpoint_url_model.most_recently_reached_checkpoint_exp_version = (
latest_exploration.version)
checkpoint_url_model.update_timestamps()
checkpoint_url_model.put()
# If the furthest reached checkpoint doesn't exist in current
# exploration.
if (
furthest_reached_checkpoint_in_current_exploration !=
checkpoint_url_model.furthest_reached_checkpoint_state_name
):
checkpoint_url_model.furthest_reached_checkpoint_state_name = (
furthest_reached_checkpoint_in_current_exploration)
checkpoint_url_model.furthest_reached_checkpoint_exp_version = (
latest_exploration.version)
checkpoint_url_model.update_timestamps()
checkpoint_url_model.put()
return exp_fetchers.get_logged_out_user_progress(unique_progress_url_id)
def sync_logged_out_learner_progress_with_logged_in_progress(
user_id: str, exploration_id: str, unique_progress_url_id: str
) -> None:
"""Syncs logged out and logged in learner's checkpoints progress."""
logged_out_user_data = (
exp_fetchers.get_logged_out_user_progress(unique_progress_url_id))
# If logged out progress has been cleared by the cron job.
if logged_out_user_data is None:
return
latest_exploration = exp_fetchers.get_exploration_by_id(exploration_id)
exp_user_data = exp_fetchers.get_exploration_user_data(
user_id,
exploration_id
)
logged_in_user_model = user_models.ExplorationUserDataModel.get(
user_id, exploration_id)
if logged_in_user_model is None or exp_user_data is None:
logged_in_user_model = user_models.ExplorationUserDataModel.create(
user_id, exploration_id)
logged_in_user_model.most_recently_reached_checkpoint_exp_version = (
logged_out_user_data.most_recently_reached_checkpoint_exp_version
)
logged_in_user_model.most_recently_reached_checkpoint_state_name = (
logged_out_user_data.most_recently_reached_checkpoint_state_name
)
logged_in_user_model.furthest_reached_checkpoint_exp_version = (
logged_out_user_data.furthest_reached_checkpoint_exp_version
)
logged_in_user_model.furthest_reached_checkpoint_state_name = (
logged_out_user_data.furthest_reached_checkpoint_state_name
)
logged_in_user_model.update_timestamps()
logged_in_user_model.put()
elif logged_in_user_model.most_recently_reached_checkpoint_exp_version == logged_out_user_data.most_recently_reached_checkpoint_exp_version: # pylint: disable=line-too-long
current_exploration = exp_fetchers.get_exploration_by_id(
exploration_id,
strict=True,
version=(
logged_out_user_data.most_recently_reached_checkpoint_exp_version
)
)
recent_checkpoint_state_name = (
exp_user_data.most_recently_reached_checkpoint_state_name
)
# Ruling out the possibility of None for mypy type checking.
assert recent_checkpoint_state_name is not None
most_recently_reached_checkpoint_index_in_logged_in_progress = (
user_services.get_checkpoints_in_order(
current_exploration.init_state_name,
current_exploration.states
).index(
recent_checkpoint_state_name
)
)
most_recently_reached_checkpoint_index_in_logged_out_progress = (
user_services.get_checkpoints_in_order(
current_exploration.init_state_name,
current_exploration.states
).index(
logged_out_user_data.most_recently_reached_checkpoint_state_name
)
)
if most_recently_reached_checkpoint_index_in_logged_in_progress < most_recently_reached_checkpoint_index_in_logged_out_progress: # pylint: disable=line-too-long
logged_in_user_model.most_recently_reached_checkpoint_exp_version = ( # pylint: disable=line-too-long
logged_out_user_data.most_recently_reached_checkpoint_exp_version # pylint: disable=line-too-long
)
logged_in_user_model.most_recently_reached_checkpoint_state_name = (
logged_out_user_data.most_recently_reached_checkpoint_state_name
)
logged_in_user_model.furthest_reached_checkpoint_exp_version = (
logged_out_user_data.furthest_reached_checkpoint_exp_version
)
logged_in_user_model.furthest_reached_checkpoint_state_name = (
logged_out_user_data.furthest_reached_checkpoint_state_name
)
logged_in_user_model.update_timestamps()
logged_in_user_model.put()
elif (
logged_in_user_model.most_recently_reached_checkpoint_exp_version <
logged_out_user_data.most_recently_reached_checkpoint_exp_version
):
most_recently_interacted_exploration = (
exp_fetchers.get_exploration_by_id(
exploration_id,
strict=True,
version=exp_user_data.most_recently_reached_checkpoint_exp_version # pylint: disable=line-too-long
)
)
furthest_reached_exploration = (
exp_fetchers.get_exploration_by_id(
exploration_id,
strict=True,
version=exp_user_data.furthest_reached_checkpoint_exp_version
)
)
recent_checkpoint_state_name = (
exp_user_data.most_recently_reached_checkpoint_state_name
)
# Ruling out the possibility of None for mypy type checking.
assert recent_checkpoint_state_name is not None
most_recently_reached_checkpoint_in_current_exploration = (
user_services.get_most_distant_reached_checkpoint_in_current_exploration( # pylint: disable=line-too-long
user_services.get_checkpoints_in_order(
latest_exploration.init_state_name,
latest_exploration.states),
user_services.get_checkpoints_in_order(
most_recently_interacted_exploration.init_state_name,
most_recently_interacted_exploration.states),
recent_checkpoint_state_name
)
)
furthest_checkpoint_state_name = (
exp_user_data.furthest_reached_checkpoint_state_name
)
# Ruling out the possibility of None for mypy type checking.
assert furthest_checkpoint_state_name is not None
furthest_reached_checkpoint_in_current_exploration = (
user_services.get_most_distant_reached_checkpoint_in_current_exploration( # pylint: disable=line-too-long
user_services.get_checkpoints_in_order(
latest_exploration.init_state_name,
latest_exploration.states),
user_services.get_checkpoints_in_order(
furthest_reached_exploration.init_state_name,
furthest_reached_exploration.states),
furthest_checkpoint_state_name
)
)
# If the most recently reached checkpoint doesn't exist in current
# exploration.
if (
most_recently_reached_checkpoint_in_current_exploration !=
exp_user_data.most_recently_reached_checkpoint_state_name
):
exp_user_data.most_recently_reached_checkpoint_state_name = (
most_recently_reached_checkpoint_in_current_exploration)
exp_user_data.most_recently_reached_checkpoint_exp_version = (
latest_exploration.version)
# If the furthest reached checkpoint doesn't exist in current
# exploration.
if (
furthest_reached_checkpoint_in_current_exploration !=
exp_user_data.furthest_reached_checkpoint_state_name
):
exp_user_data.furthest_reached_checkpoint_state_name = (
furthest_reached_checkpoint_in_current_exploration)
exp_user_data.furthest_reached_checkpoint_exp_version = (
latest_exploration.version)
recent_checkpoint_state_name = (
exp_user_data.most_recently_reached_checkpoint_state_name
)
# Ruling out the possibility of None for mypy type checking.
assert recent_checkpoint_state_name is not None
most_recently_reached_checkpoint_index_in_logged_in_progress = (
user_services.get_checkpoints_in_order(
latest_exploration.init_state_name,
latest_exploration.states
).index(
recent_checkpoint_state_name
)
)
most_recently_reached_checkpoint_index_in_logged_out_progress = (
user_services.get_checkpoints_in_order(
latest_exploration.init_state_name,
latest_exploration.states
).index(
logged_out_user_data.most_recently_reached_checkpoint_state_name
))
if most_recently_reached_checkpoint_index_in_logged_in_progress < most_recently_reached_checkpoint_index_in_logged_out_progress: # pylint: disable=line-too-long
logged_in_user_model.most_recently_reached_checkpoint_exp_version = ( # pylint: disable=line-too-long
logged_out_user_data.most_recently_reached_checkpoint_exp_version # pylint: disable=line-too-long
)
logged_in_user_model.most_recently_reached_checkpoint_state_name = (
logged_out_user_data.most_recently_reached_checkpoint_state_name
)
logged_in_user_model.furthest_reached_checkpoint_exp_version = (
logged_out_user_data.furthest_reached_checkpoint_exp_version
)
logged_in_user_model.furthest_reached_checkpoint_state_name = (
logged_out_user_data.furthest_reached_checkpoint_state_name
)
logged_in_user_model.update_timestamps()
logged_in_user_model.put()
def set_exploration_edits_allowed(exp_id: str, edits_are_allowed: bool) -> None:
"""Toggled edits allowed field in the exploration.
Args:
exp_id: str. The ID of the exp.
edits_are_allowed: boolean. Whether exploration edits are allowed.
"""
exploration_model = exp_models.ExplorationModel.get(exp_id)
exploration_model.edits_allowed = edits_are_allowed
# Updating the edits_allowed field in an exploration should not result in a
# version update. So put_multi is used instead of a commit.
base_models.BaseModel.update_timestamps_multi([exploration_model])
base_models.BaseModel.put_multi([exploration_model])
caching_services.delete_multi(
caching_services.CACHE_NAMESPACE_EXPLORATION, None, [exp_id])
def rollback_exploration_to_safe_state(exp_id: str) -> int:
"""Rolls back exploration to the latest state where related metadata
models are valid.
Args:
exp_id: str. The ID of the exp.
Returns:
int. The version of the exploration.
"""
exploration_model = exp_models.ExplorationModel.get(exp_id)
current_version_in_exp_model = exploration_model.version
last_known_safe_version: int = exploration_model.version
snapshot_content_model = None
snapshot_metadata_model = None
models_to_delete: List[Union[
exp_models.ExplorationSnapshotContentModel,
exp_models.ExplorationSnapshotMetadataModel
]] = []
for version in range(current_version_in_exp_model, 1, -1):
snapshot_content_model = (
exp_models.ExplorationSnapshotContentModel.get(
'%s-%s' % (exp_id, version), strict=False))
snapshot_metadata_model = (
exp_models.ExplorationSnapshotMetadataModel.get(
'%s-%s' % (exp_id, version), strict=False))
if snapshot_content_model is None and snapshot_metadata_model is None:
last_known_safe_version = version - 1
elif (
snapshot_content_model is None and
snapshot_metadata_model is not None
):
models_to_delete.append(snapshot_metadata_model)
last_known_safe_version = version - 1
elif (
snapshot_content_model is not None and
snapshot_metadata_model is None
):
models_to_delete.append(snapshot_content_model)
last_known_safe_version = version - 1
else:
break
if last_known_safe_version != current_version_in_exp_model:
exp_summary_model = exp_models.ExpSummaryModel.get(exp_id)
exp_summary_model.version = last_known_safe_version
safe_exp_model = exp_models.ExplorationModel.get(
exp_id, strict=True, version=last_known_safe_version)
safe_exp_model.version = last_known_safe_version
base_models.BaseModel.update_timestamps_multi(
[safe_exp_model, exp_summary_model])
base_models.BaseModel.put_multi([safe_exp_model, exp_summary_model])
base_models.BaseModel.delete_multi(models_to_delete)
caching_services.delete_multi(
caching_services.CACHE_NAMESPACE_EXPLORATION, None, [exp_id])
return last_known_safe_version