the-deep/server

View on GitHub
apps/export/entries/excel_exporter.py

Summary

Maintainability
F
4 days
Test Coverage
F
8%
import logging
from django.core.files.base import ContentFile
from django.db import models
 
from deep.permalinks import Permalink
from utils.common import (
excel_column_name,
get_valid_xml_string as xstr,
deep_date_parse,
)
from export.formats.xlsx import WorkBook, RowsBuilder
 
from analysis_framework.models import Widget
from entry.models import Entry, ExportData, ProjectEntryLabel, LeadEntryGroup
from lead.models import Lead
from export.models import Export
 
from gallery.utils import get_private_file_url
from gallery.enums import PrivateFileModuleType
logger = logging.getLogger(__name__)
 
 
def get_hyperlink(url, text):
clean_text = xstr(text.replace('"', '""'))
return f'=HYPERLINK("{url}", "{clean_text}")'
 
 
class ExcelExporter:
class ColumnsData:
TITLES = {
**{
key: label
for key, label in Export.StaticColumn.choices
},
# Override labels here.
Export.StaticColumn.ENTRY_EXCERPT: lambda self: [
'Modified Excerpt', 'Original Excerpt'
] if self.modified_excerpt_exists else ['Excerpt'],
}
 
def __init__(
self,
export_object,
entries,
project,
date_format,
columns=None,
decoupled=True,
is_preview=False,
):
self.project = project
self.export_object = export_object
self.is_preview = is_preview
self.wb = WorkBook()
XXX found
# XXX: Limit memory usage? (Or use redis?)
self.geoarea_data_cache = {}
 
# Date Format
self.date_renderer = Export.get_date_renderer(date_format)
 
# Create worksheets(Main, Grouped, Entry Groups, Bibliography)
if decoupled:
self.split = self.wb.get_active_sheet()\
.set_title('Split Entries')
self.group = self.wb.create_sheet('Grouped Entries')
else:
self.split = None
self.group = self.wb.get_active_sheet().set_title('Entries')
 
self.entry_groups_sheet = self.wb.create_sheet('Entry Groups')
self.decoupled = decoupled
self.columns = columns
self.bibliography_sheet = self.wb.create_sheet('Bibliography')
 
self.modified_excerpt_exists = entries.filter(excerpt_modified=True).exists()
 
project_entry_labels = ProjectEntryLabel.objects.filter(
project=self.project,
).order_by('order')
 
self.label_id_title_map = {
_id: title for _id, title in project_entry_labels.values_list('id', 'title')
}
 
lead_groups = LeadEntryGroup.objects.filter(lead__project=self.project).order_by('order')
self.group_id_title_map = {x.id: x.title for x in lead_groups}
# Create matrix of labels and groups
 
self.group_label_matrix = {
(group.lead_id, group.id): {
_id: None for _id in self.label_id_title_map.keys()
}
for group in lead_groups
}
 
self.lead_id_titles_map = {
_id: title
for _id, title in Lead.objects.filter(
project=self.project,
id__in=[_id for _id, _ in self.group_label_matrix.keys()]
).values_list('id', 'title')
}
 
self.entry_group_titles = [
'Lead',
'Group',
*self.label_id_title_map.values(),
]
self.entry_groups_sheet.append([self.entry_group_titles])
 
self.col_types = {
0: 'date',
2: 'date',
}
 
# Keep track of sheet data present
'''
tabular_sheets = {
'leadtitle-sheettitle': {
'field1_title': col_num_in_sheet,
'field2_title': col_num_in_sheet,
}
}
'''
self.tabular_sheets = {}
 
# Keep track of tabular fields
self.tabular_fields = {}
 
self.region_data = {}
# mapping of original name vs truncated name
self._sheets = {}
 
def log_error(self, message, **kwargs):
logger.error(f'[EXPORT:{self.export_object.id}] {message}', **kwargs)
 
Function `load_exportable_titles` has a Cognitive Complexity of 16 (exceeds 12 allowed). Consider refactoring.
def load_exportable_titles(self, data, regions):
export_type = data.get('type')
col_type = data.get('col_type')
exportable_titles = []
 
if export_type == 'geo' and regions:
self.region_data = {}
 
for region in regions:
admin_levels = region.adminlevel_set.all()
admin_level_data = []
 
exportable_titles.append(f'{region.title} Polygons')
for admin_level in admin_levels:
exportable_titles.append(admin_level.title)
exportable_titles.append('{} (code)'.format(admin_level.title))
 
# Collect geo area names for each admin level
admin_level_data.append({
'id': admin_level.id,
'geo_area_titles': admin_level.get_geo_area_titles(),
})
 
self.region_data[region.id] = admin_level_data
 
elif export_type == 'multiple':
index = len(exportable_titles)
exportable_titles.extend(data.get('titles'))
if col_type:
for i in range(index, len(exportable_titles)):
self.col_types[i] = col_type[i - index]
 
elif data.get('title'):
index = len(exportable_titles)
exportable_titles.append(data.get('title'))
if col_type:
self.col_types[index] = col_type
return exportable_titles
 
Function `load_exportables` has a Cognitive Complexity of 24 (exceeds 12 allowed). Consider refactoring.
def load_exportables(self, exportables, regions=None):
# Take all exportables that contains excel info
widget_exportables = {
exportable.widget_key: exportable
for exportable in exportables.filter(
data__excel__isnull=False,
)
}
if self.columns is not None:
_exportables = []
for column in self.columns:
if not column['is_widget']:
_exportables.append(column['static_column'])
continue
widget_key = column['widget_key']
exportable = widget_exportables.get(widget_key)
if exportable:
_exportables.append(exportable)
else:
self.log_error(f'Non-existing widget key is passed <{widget_key}>')
else:
_exportables = [
*self.ColumnsData.TITLES.keys(),
*widget_exportables.values(),
]
self.exportables = _exportables
 
column_titles = []
 
# information_date_index = 1
for exportable in self.exportables:
if isinstance(exportable, str):
titles = self.ColumnsData.TITLES.get(exportable, [])
if callable(titles):
_titles = titles(self)
else:
_titles = titles
if type(_titles) not in [list, tuple]:
_titles = [_titles]
column_titles.extend(_titles)
else:
# For each exportable, create titles according to type
# and data
data = exportable.data.get('excel')
column_titles.extend(
self.load_exportable_titles(data, regions)
)
 
if self.decoupled and self.split:
self.split.append([column_titles])
self.group.append([column_titles])
 
if self.decoupled and self.split:
self.split.auto_fit_cells_in_row(1)
self.group.auto_fit_cells_in_row(1)
 
self.regions = regions
return self
 
Function `add_entries_from_excel_data_for_static_column` has a Cognitive Complexity of 25 (exceeds 12 allowed). Consider refactoring.
def add_entries_from_excel_data_for_static_column(
self,
exportable,
entry,
lead,
assignee,
):
if exportable == Export.StaticColumn.LEAD_PUBLISHED_ON:
return self.date_renderer(lead.published_on)
if exportable == Export.StaticColumn.ENTRY_CREATED_BY:
return entry.created_by and entry.created_by.profile.get_display_name()
elif exportable == Export.StaticColumn.ENTRY_CREATED_AT:
return self.date_renderer(entry.created_at)
elif exportable == Export.StaticColumn.ENTRY_CONTROL_STATUS:
return 'Controlled' if entry.controlled else 'Uncontrolled'
elif exportable == Export.StaticColumn.LEAD_ID:
return f'{lead.id}'
elif exportable == Export.StaticColumn.LEAD_TITLE:
Avoid too many `return` statements within this function.
return lead.title
elif exportable == Export.StaticColumn.LEAD_URL:
Avoid too many `return` statements within this function.
return lead.url or Permalink.lead_share_view(lead.uuid)
elif exportable == Export.StaticColumn.LEAD_PAGE_COUNT:
Avoid too many `return` statements within this function.
return lead.page_count # Annotated through Prefetch on entries_qs (export.tasks_entries)
elif exportable == Export.StaticColumn.LEAD_ORGANIZATION_TYPE_AUTHOR:
Avoid too many `return` statements within this function.
return lead.get_authoring_organizations_type_display()
elif exportable == Export.StaticColumn.LEAD_ORGANIZATION_AUTHOR:
Avoid too many `return` statements within this function.
return lead.get_authors_display()
elif exportable == Export.StaticColumn.LEAD_ORGANIZATION_SOURCE:
Avoid too many `return` statements within this function.
return lead.get_source_display()
elif exportable == Export.StaticColumn.LEAD_PRIORITY:
Avoid too many `return` statements within this function.
return lead.get_priority_display()
elif exportable == Export.StaticColumn.LEAD_ASSIGNEE:
Avoid too many `return` statements within this function.
return assignee and assignee.profile.get_display_name()
elif exportable == Export.StaticColumn.ENTRY_ID:
Avoid too many `return` statements within this function.
return f'{entry.id}'
elif exportable == Export.StaticColumn.LEAD_ENTRY_ID:
Avoid too many `return` statements within this function.
return f'{lead.id}-{entry.id}'
elif exportable == Export.StaticColumn.ENTRY_EXCERPT:
entry_excerpt = self.get_entry_data(entry)
if self.modified_excerpt_exists:
Avoid too many `return` statements within this function.
return [entry_excerpt, entry.dropped_excerpt]
Avoid too many `return` statements within this function.
return entry_excerpt
elif exportable == Export.StaticColumn.LEAD_ENTRY_ENTRY_ATTACHMENT_FILE_PREVIEW:
if entry.entry_attachment:
Avoid too many `return` statements within this function.
return get_private_file_url(
PrivateFileModuleType.ENTRY_ATTACHMENT,
entry.id,
entry.entry_attachment.file.name
)
Avoid too many `return` statements within this function.
return None
 
Function `add_entries_from_excel_data` has a Cognitive Complexity of 114 (exceeds 12 allowed). Consider refactoring.
def add_entries_from_excel_data(self, rows, data, export_data):
export_type = data.get('type')
 
if export_type == 'nested':
children = data.get('children')
for i, child in enumerate(children):
if export_data is None or i >= len(export_data):
_export_data = None
else:
_export_data = export_data[i]
self.add_entries_from_excel_data(
rows,
child,
_export_data,
)
 
elif export_type == 'multiple':
col_span = len(data.get('titles'))
if export_data:
if export_data.get('type') == 'lists':
export_data_values = export_data.get('values')
rows_of_value_lists = []
for export_data_value in export_data_values:
# Handle for Matrix2D subsectors
# eg: ['dimension', 'subdimension', 'sector', ['sub-sector1', 'sub-sector2']]
# -> ['dimension', 'subdimension', 'sector', 'sub-sector1']
# -> ['dimension', 'subdimension', 'sector', 'sub-sector2']
Avoid deeply nested control flow statements.
if len(export_data_value) == 4 and isinstance(export_data_value[3], list):
if len(export_data_value[3]) > 0:
for subsector in export_data_value[3]:
rows_of_value_lists.append(export_data_value[:3] + [subsector])
else:
rows_of_value_lists.append(export_data_value[:3] + [''])
elif len(export_data_value) != len(data.get('titles')):
titles_len = len(data.get('titles'))
values_len = len(export_data_value)
if titles_len > values_len:
# Add additional empty cells
rows_of_value_lists.append(export_data_value + [''] * (titles_len - values_len))
else:
# Remove extra cells
rows_of_value_lists.append(export_data_value[:titles_len])
else:
rows_of_value_lists.append(export_data_value)
rows.add_rows_of_value_lists(
# Filter if all values are None
[
x for x in rows_of_value_lists
if x is not None and not all(y is None for y in x)
],
col_span,
)
else:
export_data_values = export_data.get('values')
if export_data.get('widget_key') == Widget.WidgetType.DATE_RANGE.value:
Avoid deeply nested control flow statements.
if len(export_data_values) == 2 and any(export_data_values):
rows.add_value_list([
self.date_renderer(deep_date_parse(export_data_values[0], raise_exception=False)),
self.date_renderer(deep_date_parse(export_data_values[1], raise_exception=False)),
])
else:
rows.add_value_list(export_data_values)
else:
rows.add_value_list([''] * col_span)
 
elif export_type == 'geo' and self.regions:
geo_id_values = []
region_geo_polygons = {}
if export_data:
geo_id_values = [str(v) for v in export_data.get('values') or []]
for geo_polygon in export_data.get('polygons') or []:
region_id = geo_polygon['region_id']
region_geo_polygons[region_id] = region_geo_polygons.get(region_id) or []
region_geo_polygons[region_id].append(geo_polygon['title'])
 
for region in self.regions:
admin_levels = self.region_data[region.id]
geo_polygons = region_geo_polygons.get(region.id, [])
max_levels = len(admin_levels)
rows_value = []
 
rows.add_rows_of_values(geo_polygons)
 
for rev_level, admin_level in enumerate(admin_levels[::-1]):
geo_area_titles = admin_level['geo_area_titles']
level = max_levels - rev_level
for geo_id in geo_id_values:
Avoid deeply nested control flow statements.
if geo_id not in geo_area_titles:
continue
Avoid deeply nested control flow statements.
if geo_id in self.geoarea_data_cache:
rows_value.append(self.geoarea_data_cache[geo_id])
continue
 
row_values = ['' for i in range(0, max_levels - level)] * 2
 
title = geo_area_titles[geo_id].get('title', '')
code = geo_area_titles[geo_id].get('code', '')
parent_id = geo_area_titles[geo_id].get('parent_id')
 
row_values.extend([code, title])
Avoid deeply nested control flow statements.
for _level in range(0, level - 1)[::-1]:
if parent_id:
_geo_area_titles = admin_levels[_level]['geo_area_titles']
_geo_area = _geo_area_titles.get(parent_id) or {}
_title = _geo_area.get('title', '')
_code = _geo_area.get('code', '')
parent_id = _geo_area.get('parent_id')
row_values.extend([_code, _title])
else:
row_values.extend(['', ''])
rows_value.append(row_values[::-1])
self.geoarea_data_cache[geo_id] = row_values[::-1]
 
if len(rows_value) > 0:
rows.add_rows_of_value_lists(rows_value)
else:
rows.add_rows_of_value_lists([['' for i in range(0, max_levels)] * 2])
 
else:
if export_data:
if export_data.get('type') == 'list':
row_values = [
# This is in hope of filtering out non-existent data from excel row
x for x in export_data.get('value', [])
if x is not None
]
rows.add_rows_of_values(row_values if row_values else [''])
else:
rows.add_value(export_data.get('value'))
else:
rows.add_value('')
 
def get_data_series(self, entry):
lead = entry.lead
field = entry.tabular_field
 
if field is None:
return ''
self.tabular_fields[field.id] = field
 
# Get Sheet title which is Lead title - Sheet title
# Worksheet title is limited to 31 as excel's tab length is capped to 31
worksheet_title = '{}-{}'.format(lead.title, field.sheet.title)
if not self._sheets.get(worksheet_title) and len(worksheet_title) > 31:
self._sheets[worksheet_title] = '{}-{}'.format(
worksheet_title[:28],
len(self.wb.wb.worksheets)
)
elif not self._sheets.get(worksheet_title):
self._sheets[worksheet_title] = worksheet_title
worksheet_title = self._sheets[worksheet_title]
 
if worksheet_title not in self.wb.wb.sheetnames:
tabular_sheet = self.wb.create_sheet(worksheet_title).ws
else:
tabular_sheet = self.wb.wb.get_sheet_by_name(worksheet_title)
 
# Get fields data
worksheet_data = self.tabular_sheets.get(worksheet_title, {})
 
col_number = worksheet_data.get(field.title)
if col_number is None:
# col_number None means we don't have the field in the work sheet
# So, we create one assigning next number to the field
cols_count = len(worksheet_data.keys())
col_number = cols_count + 1
worksheet_data[field.title] = col_number
 
# Now add data to the column
# excel_column_name converts number to excel column names: 1 -> A..
sheet_col_name = excel_column_name(col_number)
 
self.tabular_sheets[worksheet_title] = worksheet_data
 
# Insert field title to sheet in first row
tabular_sheet['{}1'.format(sheet_col_name)].value =\
field.title
 
# Add field values to corresponding column
for i, x in enumerate(field.actual_data):
tabular_sheet[
'{}{}'.format(sheet_col_name, 2 + i)
].value = x.get('processed_value') or x['value']
else:
sheet_col_name = excel_column_name(col_number)
 
link = f'#\'{worksheet_title}\'!{sheet_col_name}1'
return get_hyperlink(link, field.title)
 
def get_entry_data(self, entry):
if entry.entry_type == Entry.TagType.EXCERPT:
return entry.excerpt
 
if entry.entry_type == Entry.TagType.IMAGE:
return entry.get_image_url()
 
if entry.entry_type == Entry.TagType.DATA_SERIES:
try:
return self.get_data_series(entry)
except Exception:
self.log_error(
'Data Series EXCEL Export Failed for entry',
exc_info=1,
extra={'data': {'entry_id': entry.pk}},
)
 
return ''
 
Function `add_entries` has a Cognitive Complexity of 25 (exceeds 12 allowed). Consider refactoring.
def add_entries(self, entries):
iterable_entries = entries[:Export.PREVIEW_ENTRY_SIZE] if self.is_preview else entries
for i, entry in enumerate(iterable_entries):
# Export each entry
# Start building rows and export data for each exportable
 
# ENTRY GROUP
# Add it to appropriate row/column in self.group_label_matrix
for group_label in entry.entrygrouplabel_set.all():
key = (group_label.group.lead_id, group_label.group_id)
entries_sheet_name = 'Grouped Entries' if self.decoupled else 'Entries'
link = f'#\'{entries_sheet_name}\'!A{i + 2}'
self.group_label_matrix[key][group_label.label_id] = get_hyperlink(link, entry.excerpt[:50])
 
lead = entry.lead
assignee = lead.get_assignee()
rows = RowsBuilder(self.split, self.group, self.decoupled)
 
for exportable in self.exportables:
if isinstance(exportable, str):
# Static columns
values = self.add_entries_from_excel_data_for_static_column(
exportable,
entry,
lead,
assignee,
)
if type(values) in [list, tuple]:
rows.add_value_list(values)
else:
rows.add_value(values)
else:
# Get export data for this entry corresponding to this
# exportable.
# And write some value based on type and data
# or empty strings if no data.
data = exportable.data.get('excel')
export_data = ExportData.objects.filter(
exportable=exportable,
entry=entry,
data__excel__isnull=False,
).first()
 
if export_data and type(export_data.data.get('excel', {})) == list:
export_data = export_data.data.get('excel', [])
else:
export_data = export_data and {
**export_data.data.get('common', {}),
**export_data.data.get('excel', {})
}
self.add_entries_from_excel_data(rows, data, export_data)
 
rows.apply()
 
# Now add data to entry group sheet
for (leadid, gid), labeldata in self.group_label_matrix.items():
row_data = [
self.lead_id_titles_map.get(leadid),
self.group_id_title_map.get(gid),
*labeldata.values(),
]
self.entry_groups_sheet.append([row_data])
return self
 
def add_bibliography_sheet(self, leads_qs):
self.bibliography_sheet.append([['Author', 'Source', 'Published Date', 'Title', 'Entries Count']])
qs = leads_qs
# This is annotated from LeadGQFilterSet.filter_queryset if not use total entries count
if 'filtered_entry_count' not in qs.query.annotations:
qs = qs.annotate(
filtered_entry_count=models.functions.Coalesce(
models.Subquery(
Entry.objects.filter(
project=self.project,
analysis_framework=self.project.analysis_framework_id,
lead=models.OuterRef('pk'),
).order_by().values('lead')
.annotate(count=models.Count('id'))
.values('count'),
output_field=models.IntegerField(),
), 0,
)
)
for lead in qs:
self.bibliography_sheet.append(
[[
lead.get_authors_display(),
lead.get_source_display(),
self.date_renderer(lead.published_on),
get_hyperlink(lead.url, lead.title) if lead.url else lead.title,
lead.filtered_entry_count,
]]
)
 
def export(self, leads_qs):
"""
Export and return export data
"""
self.group.set_col_types(self.col_types)
if self.split:
self.split.set_col_types(self.col_types)
 
# Add bibliography
self.add_bibliography_sheet(leads_qs)
 
buffer = self.wb.save()
return ContentFile(buffer)