uktrade/directory-api

View on GitHub
company/management/commands/update_investment_services_directory.py

Summary

Maintainability
C
1 day
Test Coverage
"""
Management utility to update Investment Services Directory.
"""

import ast
import csv
from datetime import datetime

import requests
from django.core.management.base import BaseCommand, CommandError

from company.models import Company, CompanyCaseStudy, CompanyUser
from core.helpers import get_companies_house_profile
from investment_supplier_data.stage_ids import stage_ids


class Command(BaseCommand):
    help = "Management utility to update Investment Services Directory"
    requires_migrations_checks = True
    stealth_options = ("stdin",)

    def add_arguments(self, parser):
        parser.add_argument('env', type=str)

    def validate_source_file(self, source_file):
        try:
            source_file_type = source_file.split(".")[-1]
            if source_file_type.lower() != "csv":
                raise CommandError(f"Source file must be 'csv': '{source_file}'")
            return source_file
        except IndexError:
            raise CommandError(f"Please provide the source_file path (csv only): '{source_file}'")

    def is_update(self, name: str) -> bool:
        if "update" in name:
            return True
        return False

    def model(self, name: str):
        if "company_" in name:
            return Company
        elif "casestudy_" in name:
            return CompanyCaseStudy
        return CompanyUser

    def switch_env(self, row: dict, env: str) -> dict:
        if env == 'stage':
            try:
                row["id"] = stage_ids[row['number']]
            except KeyError:
                pass
        return row

    def get_app_number(self, row: dict) -> str:
        return row['app_number']

    def clean_values(self, row: dict) -> dict:
        for key, value in row.items():
            if type(value) is str:
                # convert bool
                if value == "TRUE":
                    row[key] = True
                elif value == "FALSE":
                    row[key] = False
                # strip and truncate to 1000 (max field length)
                else:
                    striped_value = value.strip()
                    truncated_value = (striped_value[:1000]) if len(striped_value) > 1000 else striped_value
                    row[key] = truncated_value
        return row

    def removed_unwanted_data_columns(self, row: dict) -> dict:
        try:
            row.pop("app_number")
        except KeyError:
            pass
        try:
            row.pop("??")
        except KeyError:
            pass
        return row

    def adjust_field_types(self, row: dict) -> dict:
        json_fields = [
            "expertise_products_services",
        ]
        lists = ["expertise_industries", "expertise_regions", "expertise_languages"]
        for key, value in row.items():
            if key in json_fields:
                test = ast.literal_eval(value)
                row[key] = test
            if key in lists:
                try:
                    row[key] = [v.strip() for v in ast.literal_eval(value)]
                except SyntaxError:
                    pass
        return row

    def fetch_companies_house_data(self, row: dict) -> dict:
        number = row["number"]
        try:
            profile = get_companies_house_profile(number)
            if profile.get('date_of_creation'):
                row["date_of_creation"] = datetime.strptime(profile['date_of_creation'], '%Y-%m-%d').date()
            if profile.get('registered_office_address'):
                address = profile['registered_office_address']
                row["address_line_1"] = address.get('address_line_1', '')
                row["address_line_2"] = address.get('address_line_2', '')
                row["locality"] = address.get('locality', '')
                row["po_box"] = address.get('po_box', '')
                row["postal_code"] = address.get('postal_code', '')
            if profile.get('sic_codes'):
                row["companies_house_sic_codes"] = profile.get('sic_codes', [])
            if profile.get('company_status'):
                row["companies_house_company_status"] = profile.get('company_status', '')
        except requests.exceptions.HTTPError:
            pass
        if "UNKNOWN" in number:
            row["number"] = None
        return row

    def handle(self, *args, **options):
        given_env = options.get("env", None)
        env = 'stage' if given_env else 'prod'

        # Company supplied data
        source_files = {
            "company_update": "investment_supplier_data/company_update.csv",
            "companycasestudy_update": "investment_supplier_data/companycasestudy_update.csv",
            "companyuser_update": "investment_supplier_data/companyuser_update.csv",
            "company_add": "investment_supplier_data/company_add.csv",
            "companycasestudy_add": "investment_supplier_data/companycasestudy_add.csv",
            "companyuser_add": "investment_supplier_data/companyuser_add.csv",
        }

        # Validate each source file
        for key, source_file in source_files.items():
            self.validate_source_file(source_file)

        # Deactivate all case studies
        for company in Company.objects.filter(is_published_investment_support_directory=True):
            company.supplier_case_studies.all().update(is_published_case_study=False)

        # Variables to help process data in loop
        company_ids = []
        processed_companies = {}

        for key, source_file in source_files.items():
            _model = self.model(key)
            _is_update = self.is_update(key)
            try:
                with open(source_file, "r", encoding='utf-8') as csv_file:
                    csv_reader = csv.DictReader(
                        csv_file, quotechar='"', delimiter=',', quoting=csv.QUOTE_ALL, skipinitialspace=True
                    )
                    line_count = 0
                    for origin_row in csv_reader:
                        if line_count == 0:
                            line_count += 1
                        cleaned_row_values = self.clean_values(origin_row)
                        app_id = self.get_app_number(cleaned_row_values)
                        cleaned_row = self.removed_unwanted_data_columns(cleaned_row_values)

                        try:
                            # Update Company Data
                            if _is_update and _model == Company:
                                switch_envs = self.switch_env(cleaned_row, env)
                                id = switch_envs['id']
                                investment_company = Company.objects.filter(id=id)
                                if investment_company.exists():
                                    # This will update the object
                                    address_added = self.fetch_companies_house_data(switch_envs)
                                    adjust_types = self.adjust_field_types(address_added)
                                    investment_company.update(**adjust_types)
                                    investment_company = investment_company.first()
                                    processed_companies[app_id] = {
                                        "company_obj": investment_company,
                                        "name": investment_company.name,
                                        "url": investment_company.public_profile_url,
                                        "email": investment_company.email_address,
                                    }
                                    company_ids.append(id)
                                else:
                                    # This will create a new object
                                    address_added = self.fetch_companies_house_data(cleaned_row)
                                    adjust_types = self.adjust_field_types(address_added)
                                    investment_company = Company.objects.create(**adjust_types)
                                    processed_companies[app_id] = {
                                        "company_obj": investment_company,
                                        "name": investment_company.name,
                                        "url": investment_company.public_profile_url,
                                        "email": investment_company.email_address,
                                    }
                                    company_ids.append(investment_company.id)

                            # Create new Company
                            if not _is_update and _model == Company:
                                # Sanity check - does the company exist?
                                number = cleaned_row['number']
                                investment_company = Company.objects.filter(number=number)
                                if investment_company.exists():
                                    # This will update the object
                                    address_added = self.fetch_companies_house_data(cleaned_row)
                                    adjust_types = self.adjust_field_types(address_added)
                                    investment_company.update(**adjust_types)
                                    investment_company = investment_company.first()
                                    processed_companies[app_id] = {
                                        "company_obj": investment_company,
                                        "name": investment_company.name,
                                        "url": investment_company.public_profile_url,
                                        "email": investment_company.email_address,
                                    }
                                    company_ids.append(investment_company.id)
                                else:
                                    # This will create a new object
                                    address_added = self.fetch_companies_house_data(cleaned_row)
                                    adjust_types = self.adjust_field_types(address_added)
                                    investment_company = Company.objects.create(**adjust_types)
                                    processed_companies[app_id] = {
                                        "company_obj": investment_company,
                                        "name": investment_company.name,
                                        "url": investment_company.public_profile_url,
                                        "email": investment_company.email_address,
                                    }
                                    company_ids.append(investment_company.id)

                            # Create new CompanyCaseStudy
                            if _is_update and _model == CompanyCaseStudy:
                                cleaned_row.update(
                                    {
                                        "title": f"A Case Study By {processed_companies[app_id]['name']}",
                                        "company_id": processed_companies[app_id]["company_obj"].id,
                                    }
                                )
                                CompanyCaseStudy.objects.create(**cleaned_row)

                            # Create new CompanyCaseStudy for new Company
                            if not _is_update and _model == CompanyCaseStudy:
                                cleaned_row.update(
                                    {
                                        "title": f"A Case Study By {processed_companies[app_id]['name']}",
                                        "company_id": processed_companies[app_id]["company_obj"].id,
                                    }
                                )
                                CompanyCaseStudy.objects.create(**cleaned_row)

                            # Update CompanyUser
                            if _is_update and _model == CompanyUser:
                                company_id = processed_companies[app_id]["company_obj"].id
                                company_email = cleaned_row['company_email']
                                user = CompanyUser.objects.filter(company_id=company_id, company_email=company_email)
                                if user.exists():
                                    user.update(**cleaned_row)
                                    company = user.first().company
                                    company.mobile_number = cleaned_row["mobile_number"]
                                    company.save()

                            # Create CompanyUser
                            if not _is_update and _model == CompanyUser:
                                pass

                        except KeyError:
                            pass
            except FileNotFoundError:
                raise CommandError(f"No such file or directory: '{source_file}'")

        # Remove all other companies from directory
        Company.objects.exclude(id__in=company_ids).update(is_published_investment_support_directory=False)

        # Add processed companies to directory
        Company.objects.filter(id__in=company_ids).update(is_published_investment_support_directory=True)

        # Remove company objects from dict
        for key, value in processed_companies.items():
            value.pop("company_obj")

        print(processed_companies)
        return ">>>Finished"