elnappo/project-novis

View on GitHub
project_novis/callsign/management/commands/import_official.py

Summary

Maintainability
C
7 hrs
Test Coverage
import collections
import re
import sys
from io import BytesIO

import PyPDF2
import requests

from . import ImportCommand


class Command(ImportCommand):
    help = 'Import user data from Federal Network Agencies'
    task = 'callsign_import_official'

    def germany(self):
        # update v parameter?
        url: str = "https://www.bundesnetzagentur.de/SharedDocs/Downloads/DE/Sachgebiete/Telekommunikation/Unternehmen_Institutionen/Frequenzen/Amateurfunk/Rufzeichenliste/Rufzeichenliste_AFU.pdf?__blob=publicationFile&v=51"
        regex = re.compile(r"D[A-Z][0-9][A-Z0-9]+")
        callsigns = list()

        self._write("Import german callsigns")
        with requests.get(url, stream=False) as r:
            if r.ok:
                data = BytesIO(r.content)
                pdf_reader = PyPDF2.PdfFileReader(data)
                number_of_pages = pdf_reader.getNumPages()
                c = collections.Counter(range(number_of_pages))

                for i in c:
                    try:
                        page = pdf_reader.getPage(i)
                        page_content = page.extractText()
                        a = regex.findall(page_content)
                        callsigns.extend(a)
                    except KeyError:
                        continue

                unique_callsigns = set(callsigns)

                for callsign in unique_callsigns:
                    callsign_instance, _ = self._handle_callsign(callsign, official=True, source="germany_official")
                    if not callsign_instance._official_validated:
                        callsign_instance._official_validated = True
                        callsign_instance.save()

    def finland(self):
        # Based on https://github.com/OH6AD/koolit
        url: str = "https://eservices.viestintavirasto.fi/Licensesservices/Forms/AmateurLicenses.aspx"
        payload: dict = {'__EVENTTARGET': '', 'ButtonDownload': ''}
        callsigns = list()

        self._write("Import finnish callsigns")
        with requests.post(url, data=payload, stream=False) as r:
            if r.ok:
                for i in r.iter_lines(decode_unicode=True):
                    if not i:
                        continue
                    try:
                        callsign, status = i.split("\t", 1)
                    except ValueError:
                        # Handle invalid data in callsign list e.g.:
                        # OG73WR
                        #           VARAUS
                        self._write(f"Invalid data: {i}")
                    if status == "VOIMAS\t":
                        callsigns.append(callsign)

                    for callsign in set(callsigns):
                        callsign_instance, _ = self._handle_callsign(callsign, official=True, source="finland_official")
                        if not callsign_instance._official_validated:
                            callsign_instance._official_validated = True
                            callsign_instance.save()

    def usa(self):
        pass

    def austria(self):
        pass

    def handle(self, *args, **options):
        try:
            self.germany()
            self.finland()
            self.austria()
            self.usa()
            self._finish()
        except:
            self._finish(failed=True, error_message=sys.exc_info())
            raise