the-deep/server

View on GitHub
apps/deep_migration/management/commands/migrate_entry.py

Summary

Maintainability
A
3 hrs
Test Coverage
import json
 
from deep_migration.utils import (
MigrationCommand,
get_source_url,
request_with_auth,
)
 
from deep_migration.models import (
AnalysisFrameworkMigration,
ProjectMigration,
LeadMigration,
UserMigration,
CountryMigration,
)
from analysis_framework.models import (
Filter,
Exportable,
)
from entry.models import (
Entry,
Attribute,
FilterData,
ExportData,
)
from geo.models import Region, GeoArea
 
from datetime import datetime
 
import reversion
 
 
ONE_DAY = 24 * 60 * 60 * 1000
 
 
def get_user(old_user_id):
    """Return the user migrated from ``old_user_id``.

    Looks up the first ``UserMigration`` whose ``old_id`` matches and returns
    its ``user``; returns ``None`` when no such migration record exists.
    """
    record = UserMigration.objects.filter(old_id=old_user_id).first()
    if not record:
        return record
    return record.user
 
 
def get_project(project_id):
    """Return the project migrated from the old ``project_id``.

    Uses the first ``ProjectMigration`` with a matching ``old_id``; the result
    is ``None`` when nothing matches.
    """
    record = ProjectMigration.objects.filter(old_id=project_id).first()
    return record.project if record else record
 
 
def get_lead(lead_id):
    """Return the lead migrated from the old ``lead_id``, or ``None``.

    ``None`` is returned when no ``LeadMigration`` has that ``old_id``.
    """
    matches = LeadMigration.objects.filter(old_id=lead_id)
    record = matches.first()
    if record:
        return record.lead
    return record
 
 
def get_analysis_framework(lead_id):
    """Return the analysis framework migrated from an old id, or ``None``.

    The parameter is named ``lead_id`` but is matched against
    ``AnalysisFrameworkMigration.old_id``; the caller in this file passes the
    old entry's ``template`` value.
    """
    matches = AnalysisFrameworkMigration.objects.filter(old_id=lead_id)
    record = matches.first()
    if not record:
        return record
    return record.analysis_framework
 
 
def get_region(code):
    """Return the region of the first ``CountryMigration`` with this ``code``.

    Returns ``None`` when no migration record carries the given code.
    """
    record = CountryMigration.objects.filter(code=code).first()
    return record.region if record else record
 
 
# Code Climate: `Command` has 27 functions (exceeds 20 allowed). Consider refactoring.
class Command(MigrationCommand):
def run(self):
    """Load the old entries and migrate each of them.

    Entries are read from the JSON file named by ``self.kwargs['data_file']``
    when that option is set; otherwise they are requested from the
    ``entries`` source URL through ``request_with_auth``. Every entry is
    imported inside a single ``reversion`` revision.

    When no entries are available a message is printed and the command
    stops without opening a revision.
    """
    data_file = self.kwargs.get('data_file')
    if data_file:
        with open(data_file) as f:
            entries = json.load(f)
    else:
        entries = request_with_auth(
            get_source_url('entries', query='template=1', version='v1')
        )

    if not entries:
        print('Couldn\'t find entries data')
        # Stop here: a falsy response (e.g. None) cannot be iterated below.
        return

    with reversion.create_revision():
        for entry in entries:
            self.import_entry(entry)
 
def import_entry(self, data):
    """Migrate one old entry: re-create all its informations on the lead.

    ``data`` is the old entry dict; this method reads its ``lead``,
    ``event`` (old project id), ``template`` and ``informations`` keys.

    Steps:
    * resolve the migrated lead, giving up if it is not migrated yet;
    * delete every entry currently attached to that lead;
    * take the framework and regions from the migrated project when there
      is one, otherwise fall back to the framework migrated from
      ``template``;
    * import each item of ``informations`` via ``import_information``.

    ``self.regions`` is set as a side effect; ``get_geo_area`` reads it.
    """
    print('------------')
    print('Migrating entries')

    lead_id = data['lead']
    print('For lead - {}'.format(lead_id))

    lead = get_lead(lead_id)
    if not lead:
        print('Lead not migrated yet')
        return

    # Entries of this lead are rebuilt from scratch on every run.
    lead.entry_set.all().delete()

    framework = None
    regions = Region.objects.none()
    project_id = data['event']
    if project_id:
        project = get_project(project_id)
        if project:
            framework = project.analysis_framework
            regions = project.regions.all()
        else:
            # get_project() returns None for projects without a migration
            # record; continue with no regions and the template fallback
            # instead of failing on ``None.analysis_framework``.
            print('Project not migrated yet')

    self.regions = regions

    if not framework:
        template_id = data['template']
        if not template_id:
            print('Not an entry with analysis framework')
            return

        framework = get_analysis_framework(template_id)
        if not framework:
            print('Analysis framework not migrated yet')
            return

    print('Lead title: {}'.format(lead.title))
    for information in data['informations']:
        self.import_information(data, lead, framework, information)
 
def import_information(self, entry_data, lead, framework, data):
    """Create one new ``Entry`` from an old information dict and return it.

    ``data`` is the information item (``id``, optional ``excerpt`` or
    ``image``, and ``elements``); ``entry_data`` is the enclosing old entry,
    from which ``created_by`` and ``created_at`` are taken. Each element is
    handed to ``migrate_attribute`` after the entry has been saved.
    """
    print('Entry info - {}'.format(data['id']))

    entry = Entry(lead=lead, analysis_framework=framework)

    excerpt = data.get('excerpt')
    image = data.get('image')
    if excerpt:
        entry.excerpt = excerpt
        entry.entry_type = Entry.TagType.EXCERPT
    elif image:
        entry.image_raw = image
        entry.entry_type = Entry.TagType.IMAGE

    author = get_user(entry_data['created_by'])
    entry.created_by = author
    entry.modified_by = author
    entry.project = entry.lead.project
    entry.save()

    # created_at is written with a queryset update after the save.
    Entry.objects.filter(id=entry.id).update(
        created_at=entry_data['created_at'],
    )

    # Start migrating the attributes
    # TODO migrate excerpt and image widget
    for element in data['elements']:
        self.migrate_attribute(entry, framework, element)

    return entry
 
def migrate_attribute(self, entry, framework, element):
    """Dispatch one old element to the migrator for its widget type.

    The widget is looked up in ``framework.widget_set`` by ``element['id']``.
    Nothing happens when the widget is missing (a message is printed) or
    when its ``widget_id`` has no migrator in the table below.
    """
    element_key = element['id']
    print('Migrating element {}'.format(element_key))

    widget = framework.widget_set.filter(key=element_key).first()
    if not widget:
        print('Widget not migrated yet')
        return

    handlers = dict(
        numberWidget=self.migrate_number,
        dateWidget=self.migrate_date,
        scaleWidget=self.migrate_scale,
        multiselectWidget=self.migrate_multiselect,
        organigramWidget=self.migrate_organigram,
        geoWidget=self.migrate_geo,
        matrix1dWidget=self.migrate_matrix1d,
        matrix2dWidget=self.migrate_matrix2d,
        numberMatrixWidget=self.migrate_number_matrix,
    )

    handler = handlers.get(widget.widget_id)
    if not handler:
        return
    handler(entry, widget, element)
 
def migrate_attribute_data(self, entry, widget, data):
    """Get or create the ``Attribute`` for this entry/widget pair.

    ``data`` is passed only as a creation default through ``get_or_create``.
    """
    creation_defaults = {'data': data}
    Attribute.objects.get_or_create(
        entry=entry,
        widget=widget,
        defaults=creation_defaults,
    )
 
def migrate_filter_data(self, entry, widget, index=0,
                        number=None, values=None):
    """Get or create the ``FilterData`` linking ``entry`` to a widget filter.

    The filter is the ``index``-th ``Filter`` sharing the widget's key and
    analysis framework; ``number`` and ``values`` are creation defaults.
    """
    # NOTE(review): the queryset is indexed without an explicit order_by
    # here; confirm Filter defines a default ordering, since callers rely
    # on index 0 and 1 naming different filters.
    widget_filters = Filter.objects.filter(
        widget_key=widget.key,
        analysis_framework=widget.analysis_framework,
    )
    selected_filter = widget_filters[index]
    creation_defaults = {
        'number': number,
        'values': values,
    }
    FilterData.objects.get_or_create(
        entry=entry,
        filter=selected_filter,
        defaults=creation_defaults,
    )
 
def migrate_export_data(self, entry, widget, data):
    """Get or create the ``ExportData`` linking ``entry`` to the exportable.

    The exportable is fetched with ``Exportable.objects.get`` using the
    widget's key and analysis framework; ``data`` is a creation default.
    """
    widget_exportable = Exportable.objects.get(
        widget_key=widget.key,
        analysis_framework=widget.analysis_framework,
    )
    creation_defaults = {'data': data}
    ExportData.objects.get_or_create(
        entry=entry,
        exportable=widget_exportable,
        defaults=creation_defaults,
    )
 
def migrate_number(self, entry, widget, element):
    """Migrate a number widget value into attribute, filter and export data.

    ``element['value']`` is converted with ``int``. A missing, ``None`` or
    empty-string value is stored as ``None`` and exported as an empty
    string, rather than the literal text ``'None'``.

    Raises:
        ValueError: if a non-empty value cannot be parsed as an integer.
    """
    raw = element.get('value')
    if raw is None or raw == '':
        value = None
    else:
        value = int(raw)

    self.migrate_attribute_data(entry, widget, {
        'value': value,
    })
    self.migrate_filter_data(entry, widget, number=value)
    self.migrate_export_data(entry, widget, {
        'excel': {
            # Empty cell for a missing number, matching the scale export.
            'value': '' if value is None else str(value),
        }
    })
 
def migrate_date(self, entry, widget, element):
    """Migrate a date widget value into attribute, filter and export data.

    The old value is parsed as ``YYYY-MM-DD`` first and ``DD-MM-YYYY``
    second. The attribute stores ``YYYY-MM-DD``, the export stores
    ``DD-MM-YYYY`` and the filter number is the count of whole days derived
    from ``datetime.timestamp()``.

    A missing or empty value is migrated as ``None`` in all three places.

    Raises:
        ValueError: if a non-empty value matches neither date format.
    """
    # datetime.timestamp() is expressed in seconds, so the day length used
    # for the filter number must be in seconds too.
    seconds_per_day = 24 * 60 * 60

    value = element.get('value')
    date = None
    if value:
        try:
            date = datetime.strptime(value, '%Y-%m-%d')
        except ValueError:
            date = datetime.strptime(value, '%d-%m-%Y')

    self.migrate_attribute_data(entry, widget, {
        'value': date.strftime('%Y-%m-%d') if date else None,
    })
    number = int(date.timestamp() / seconds_per_day) if date else None
    self.migrate_filter_data(entry, widget, number=number)
    self.migrate_export_data(entry, widget, {
        'excel': {
            'value': date.strftime('%d-%m-%Y') if date else None,
        }
    })
 
def migrate_scale(self, entry, widget, element):
    """Migrate a scale widget selection.

    The selected key is stored as ``selectedScale`` and as the single filter
    value. The export carries the title of the scale unit whose ``key``
    matches, or an empty string when none does.
    """
    selected = element.get('value')

    self.migrate_attribute_data(entry, widget, {'selectedScale': selected})
    self.migrate_filter_data(entry, widget, values=[selected])

    units = widget.properties['data']['scale_units']
    matching_unit = None
    for unit in units:
        if unit['key'] == selected:
            matching_unit = unit
            break

    if matching_unit:
        title = matching_unit['title']
    else:
        title = ''
    self.migrate_export_data(entry, widget, {'excel': {'value': title}})
 
def migrate_multiselect(self, entry, widget, element):
    """Migrate a multiselect widget selection.

    The selected keys are stored unchanged as the attribute value and the
    filter values. The export lists the labels of the selected options, in
    selection order; keys with no matching option in the widget are left
    out of the label list instead of raising.
    """
    value = element.get('value') or []
    self.migrate_attribute_data(entry, widget, {
        'value': value,
    })
    self.migrate_filter_data(entry, widget, values=value)

    widget_data = widget.properties['data']
    options = widget_data['options']

    label_list = []
    for item in value:
        option = next((
            o for o in options
            if o['key'] == item
        ), None)
        # A selected key may no longer exist among the widget's options.
        if option is not None:
            label_list.append(option['label'])

    self.migrate_export_data(entry, widget, {
        'excel': {
            'type': 'list',
            'value': label_list,
        }
    })
 
def migrate_organigram(self, entry, widget, element):
    """Migrate an organigram widget selection.

    The widget's ``data`` property is treated as the root organ. Selected
    nodes become the attribute ``values``; their names form the export
    list; the filter values come from ``get_organigram_filter_data``.
    """
    selected_keys = element.get('value') or []
    roots = [widget.properties['data']]
    selected_nodes = self.get_organigram_nodes(roots, selected_keys)

    self.migrate_attribute_data(entry, widget, {'values': selected_nodes})

    filter_keys = self.get_organigram_filter_data(roots, selected_keys)
    self.migrate_filter_data(entry, widget, values=filter_keys)

    names = []
    for node in selected_nodes:
        names.append(node['name'])
    self.migrate_export_data(entry, widget, {
        'excel': {
            'type': 'list',
            'value': names,
        }
    })
 
def get_organigram_nodes(self, organs, keys):
    """Collect ``{'id', 'name'}`` dicts for every organ whose key is in ``keys``.

    The tree is walked depth-first through each organ's ``organs`` list; a
    selected organ is listed before anything found beneath it.
    """
    collected = []
    for organ in organs:
        organ_key = organ['key']
        if organ_key in keys:
            collected.append({'id': organ_key, 'name': organ['title']})
        collected.extend(self.get_organigram_nodes(organ['organs'], keys))
    return collected
 
def get_organigram_filter_data(self, organs, keys):
    """Return keys of selected organs together with all their ancestors.

    An organ's key is included when it is in ``keys`` or when anything
    beneath it was included; it is listed before its descendants.
    """
    collected = []
    for organ in organs:
        below = self.get_organigram_filter_data(organ['organs'], keys)
        if below or organ['key'] in keys:
            collected.append(organ['key'])
        collected.extend(below)
    return collected
 
def migrate_geo(self, entry, widget, element):
    """Migrate a geo widget selection.

    Each old value is resolved with ``get_geo_area``; values that resolve
    to no area are dropped. The attribute stores key/label dicts for the
    resolved areas, while the filter values and the export store the area
    ids as strings.

    A missing or ``None`` ``value`` is treated as an empty selection, as in
    the other list-valued migrators.
    """
    old_values = element.get('value') or []
    areas = [self.get_geo_area(v) for v in old_values]
    values = [
        {
            'key': str(area.id),
            'short_label': area.get_label(),
            'label': area.get_label(),
        } for area in areas if area
    ]
    keys = [v['key'] for v in values]

    self.migrate_attribute_data(entry, widget, {
        'values': values,
    })
    self.migrate_filter_data(
        entry,
        widget,
        values=keys,
    )

    self.migrate_export_data(entry, widget, {
        'excel': {
            'values': keys,
        }
    })
 
def get_geo_area(self, value):
    """Resolve an old geo value to a ``GeoArea``, or ``None``.

    ``value`` is split on ``:`` into region code, admin level, area title
    and an optional area code. ``None`` is returned when:

    * the value is empty or has fewer than three parts;
    * the region is not migrated or is not among ``self.regions``;
    * no area matches the region, level, title (and code, when given).
    """
    if not value:
        return None

    splits = value.split(':')
    if len(splits) < 3:
        # Malformed value: not enough parts to identify an area.
        return None

    region_code, admin_level, area_title = splits[:3]
    area_code = splits[3] if len(splits) > 3 else None

    region = get_region(region_code)
    if not region or region not in self.regions:
        return None

    areas = GeoArea.objects.filter(
        admin_level__region=region,
        admin_level__level=admin_level,
        title=area_title,
    )
    if area_code:
        areas = areas.filter(code=area_code)

    return areas.first()
 
def migrate_number_matrix(self, entry, widget, element):
    """Migrate a number-matrix widget value.

    The old ``numbers`` list is turned into a nested row/column mapping for
    the attribute, and flattened against the widget's row and column
    headers for the export. No filter data is written here.
    """
    matrix = self.get_number_matrix(element.get('numbers') or [])
    self.migrate_attribute_data(entry, widget, matrix)

    headers = widget.properties['data']
    row_headers = headers['row_headers']
    column_headers = headers['column_headers']

    export = self.get_number_matrix_export_data(
        matrix, row_headers, column_headers,
    )
    self.migrate_export_data(entry, widget, export)
 
def get_number_matrix(self, numbers):
    """Build a ``{row: {column: int}}`` mapping from the old numbers list.

    Each item supplies ``row``, ``column`` and ``value``; the value is
    converted with ``int``. A later item overwrites an earlier one that
    names the same cell.
    """
    matrix = {}
    for item in numbers:
        row_key = item['row']
        column_key = item['column']
        cells = matrix.setdefault(row_key, {})
        cells[column_key] = int(item['value'])
    return matrix
 
def get_number_matrix_export_data(self, attribute, rows, columns):
    """Flatten a number matrix into a list of strings for the Excel export.

    For every row, one string per column is emitted (empty when the cell has
    no value), followed by ``'True'`` or ``'False'``. The flag is ``'True'``
    when the row's present values are all equal or the row has none.
    """
    flattened = []
    for row in rows:
        present = []
        for column in columns:
            cell = attribute.get(row['key'], {}).get(column['key'], None)
            if cell is None:
                flattened.append('')
                continue
            flattened.append(str(cell))
            present.append(int(cell))

        all_equal = len(set(present)) <= 1
        flattened.append('True' if all_equal else 'False')

    return {'excel': {'values': flattened}}
 
def migrate_matrix1d(self, entry, widget, element):
    """Migrate a 1D matrix widget selection.

    The old ``selections`` list feeds three helpers: one builds the nested
    attribute, one the flat filter values, and one the export data, which
    is resolved against the widget's ``rows``.
    """
    picked = element.get('selections') or []

    self.migrate_attribute_data(
        entry, widget, self.get_matrix1d_attribute(picked),
    )

    widget_rows = widget.properties['data']['rows']

    filter_values = self.get_matrix1d_filter_values(picked)
    self.migrate_filter_data(entry, widget, values=filter_values)

    export = self.get_matrix1d_export_data(picked, widget_rows)
    self.migrate_export_data(entry, widget, export)
 
def get_matrix1d_attribute(self, selections):
    """Build ``{pillar: {subpillar: True}}`` from the old selections list."""
    by_pillar = {}
    for item in selections:
        pillar_key = item['pillar']
        subpillar_key = item['subpillar']
        by_pillar.setdefault(pillar_key, {})[subpillar_key] = True
    return by_pillar
 
def get_matrix1d_filter_values(self, selections):
    """Return a flat list of pillar and subpillar keys for filtering.

    A pillar is added the first time it is seen; every subpillar is
    appended, in selection order, without de-duplication.
    """
    flat = []
    for item in selections:
        pillar_key, subpillar_key = item['pillar'], item['subpillar']
        if pillar_key not in flat:
            flat.append(pillar_key)
        flat.append(subpillar_key)
    return flat
 
def get_matrix1d_export_data(self, selections, rows):
    """Build the export payload for a 1D matrix selection.

    Each selection is resolved to a row (by ``pillar``) and to one of that
    row's cells (by ``subpillar``); selections that resolve to neither are
    skipped. The Excel part lists ``[row title, cell value]`` pairs and the
    report part lists ``'<row key>-<cell key>'`` strings.
    """
    excel_pairs = []
    report_keys = []

    for item in selections:
        matched_row = None
        for candidate in rows:
            if candidate['key'] == item['pillar']:
                matched_row = candidate
                break
        if not matched_row:
            continue

        matched_cell = None
        for candidate in matched_row['cells']:
            if candidate['key'] == item['subpillar']:
                matched_cell = candidate
                break
        if not matched_cell:
            continue

        excel_pairs.append([matched_row['title'], matched_cell['value']])
        report_keys.append(
            '{}-{}'.format(matched_row['key'], matched_cell['key'])
        )

    excel = {'type': 'lists', 'values': excel_pairs}
    report = {'keys': report_keys}
    return {'excel': excel, 'report': report}
 
def migrate_matrix2d(self, entry, widget, element):
    """Migrate a 2D matrix widget selection.

    Writes the nested attribute, then two filter-data rows (filter index 0
    for the first list returned by ``get_matrix2d_filter_values`` and index
    1 for the second), then the export data resolved against the widget's
    ``data`` property.
    """
    picked = element.get('selections') or []

    self.migrate_attribute_data(
        entry, widget, self.get_matrix2d_attribute(picked),
    )

    widget_data = widget.properties['data']

    first_values, second_values = self.get_matrix2d_filter_values(picked)
    for position, filter_values in ((0, first_values), (1, second_values)):
        self.migrate_filter_data(
            entry, widget,
            index=position,
            values=filter_values,
        )

    export = self.get_matrix2d_export_data(picked, widget_data)
    self.migrate_export_data(entry, widget, export)
 
def get_matrix2d_attribute(self, selections):
    """Build ``{pillar: {subpillar: {sector: subsectors}}}`` from selections.

    Selections lacking a pillar, subpillar or sector are ignored; a missing
    ``subsectors`` entry becomes an empty list.
    """
    nested = {}
    for item in selections:
        pillar_key = item.get('pillar')
        subpillar_key = item.get('subpillar')
        sector_key = item.get('sector')
        if not (pillar_key and subpillar_key and sector_key):
            continue

        subsector_keys = item.get('subsectors') or []
        by_subpillar = nested.setdefault(pillar_key, {})
        by_sector = by_subpillar.setdefault(subpillar_key, {})
        by_sector[sector_key] = subsector_keys
    return nested
 
def get_matrix2d_filter_values(self, selections):
    """Return the two filter value lists for a 2D matrix selection.

    The first list holds pillar and subpillar keys without duplicates; the
    second holds each sector followed by its subsectors, duplicates kept.
    Selections lacking a pillar, subpillar or sector are ignored.
    """
    dimension_keys = []
    sector_keys = []

    for item in selections:
        pillar_key = item.get('pillar')
        subpillar_key = item.get('subpillar')
        sector_key = item.get('sector')
        if not (pillar_key and subpillar_key and sector_key):
            continue

        for key in (pillar_key, subpillar_key):
            if key not in dimension_keys:
                dimension_keys.append(key)

        sector_keys.append(sector_key)
        sector_keys.extend(item.get('subsectors') or [])

    return dimension_keys, sector_keys
 
# Code Climate: Function `get_matrix2d_export_data` has a Cognitive Complexity of 14 (exceeds 12 allowed). Consider refactoring.
def get_matrix2d_export_data(self, selections, data):
    """Build the export payload for a 2D matrix selection.

    ``data`` supplies the widget's ``dimensions`` and ``sectors``. Each
    selection is resolved to a dimension, one of its subdimensions and a
    sector, all by ``id``; selections that cannot be fully resolved are
    skipped. Unknown subsector ids are dropped from the joined names.

    The Excel part lists ``[dimension, subdimension, sector, subsectors]``
    titles (subsector titles joined with commas); the report part lists
    ``'<dimension id>-<subdimension id>-<sector id>'`` strings.
    """
    def first_with_id(items, wanted_id):
        # First item whose 'id' equals wanted_id, or None.
        for item in items:
            if item['id'] == wanted_id:
                return item
        return None

    excel_rows = []
    report_keys = []
    dimensions = data['dimensions']
    sectors = data['sectors']

    for selection in selections:
        pillar_id = selection.get('pillar')
        subpillar_id = selection.get('subpillar')
        sector_id = selection.get('sector')
        if not (pillar_id and subpillar_id and sector_id):
            continue

        dimension = first_with_id(dimensions, pillar_id)
        if not dimension:
            continue
        subdimension = first_with_id(dimension['subdimensions'], subpillar_id)
        if not subdimension:
            continue
        sector = first_with_id(sectors, sector_id)
        if not sector:
            continue

        subsector_titles = []
        for subsector_id in selection.get('subsectors') or []:
            subsector = first_with_id(sector['subsectors'], subsector_id)
            if subsector:
                subsector_titles.append(subsector['title'])

        excel_rows.append([
            dimension['title'],
            subdimension['title'],
            sector['title'],
            ','.join(subsector_titles),
        ])
        report_keys.append('{}-{}-{}'.format(
            dimension['id'], subdimension['id'], sector['id'],
        ))

    return {
        'excel': {
            'type': 'lists',
            'values': excel_rows,
        },
        'report': {
            'keys': report_keys,
        },
    }