webservices/legal_docs/advisory_opinions.py

Summary

Maintainability
D
2 days
Test Coverage
from collections import defaultdict
import logging
import re

from webservices.env import env
from webservices.rest import db
from webservices.utils import get_elasticsearch_connection
from webservices.tasks.utils import get_bucket
from .reclassify_statutory_citation import reclassify_archived_mur_statutory_citation


logger = logging.getLogger(__name__)

ALL_AOS = """
    SELECT
        ao_parsed.ao_id,
        ao_parsed.ao_no,
        ao_parsed.name,
        ao_parsed.summary,
        ao_parsed.req_date,
        ao_parsed.issue_date,
        ao.stage
    FROM aouser.aos_with_parsed_numbers ao_parsed
    INNER JOIN aouser.ao ao
        ON ao_parsed.ao_id = ao.ao_id
    WHERE (
        (ao_parsed.ao_year = %s AND ao_parsed.ao_serial >= %s)
        OR
        (ao_parsed.ao_year > %s)
    )
    ORDER BY ao_parsed.ao_year, ao_parsed.ao_serial
"""

AO_ENTITIES = """
    SELECT
        e.name,
        et.description AS entity_type_description,
        r.description AS role_description
    FROM aouser.players p
    INNER JOIN aouser.entity e USING (entity_id)
    INNER JOIN aouser.entity_type et ON et.entity_type_id = e.type
    INNER JOIN aouser.role r USING (role_id)
    WHERE p.ao_id = %s
"""

AO_DOCUMENTS = """
    SELECT
        ao.ao_no,
        doc.document_id,
        doc.filename,
        doc.ocrtext,
        doc.fileimage,
        doc.description,
        doc.category,
        doc.document_date
    FROM aouser.document doc
    INNER JOIN aouser.ao ao
        ON ao.ao_id = doc.ao_id
    WHERE doc.ao_id = %s
"""

STATUTE_CITATION_REGEX = re.compile(
    r"(?P<title>\d+)\s+U\.?S\.?C\.?\s+§*\s*(?P<section>\d+).*\.?")

REGULATION_CITATION_REGEX = re.compile(
    r"(?P<title>\d+)\s+C\.?F\.?R\.?\s+§*\s*(?P<part>\d+)\.(?P<section>\d+)")

AO_CITATION_REGEX = re.compile(
    r"\b(?P<year>\d{4,4})-(?P<serial_no>\d+)\b")

AOS_WITH_CORRECTED_STAGE = {"2009-05": "Withdrawn"}


def load_advisory_opinions(from_ao_no=None):
    """
    Reads data for advisory opinions from a Postgres database,
    assembles a JSON document corresponding to the advisory opinion
    and indexes this document in Elasticsearch in the index `docs_index`
    with a doc_type of `advisory_opinions`.
    In addition, all documents attached to the advisory opinion
    are uploaded to an S3 bucket under the _directory_`legal/aos/`.
    """
    es = get_elasticsearch_connection()

    logger.info("Loading advisory opinions")
    ao_count = 0
    for ao in get_advisory_opinions(from_ao_no):
        logger.info("Loading AO: %s", ao['no'])
        es.index('docs_index', 'advisory_opinions', ao, id=ao['no'])
        ao_count += 1
    logger.info("%d advisory opinions loaded", ao_count)


def ao_stage_to_pending(stage):

    return stage == 0


def ao_stage_to_status(ao_no, stage):

    if ao_no in AOS_WITH_CORRECTED_STAGE:
        return AOS_WITH_CORRECTED_STAGE[ao_no]
    if stage == 2:
        return "Withdrawn"
    elif stage == 1:
        return "Final"
    else:
        return "Pending"


def get_advisory_opinions(from_ao_no):
    bucket = get_bucket()

    ao_names = get_ao_names()
    ao_no_to_component_map = {a: tuple(map(int, a.split('-'))) for a in ao_names}

    citations = get_citations(ao_names)

    if from_ao_no is None:
        start_ao_year, start_ao_serial = 0, 0
    else:
        start_ao_year, start_ao_serial = tuple(map(int, from_ao_no.split('-')))

    with db.engine.connect() as conn:
        rs = conn.execute(ALL_AOS, (start_ao_year, start_ao_serial, start_ao_year))
        for row in rs:
            ao_id = row["ao_id"]
            year, serial = ao_no_to_component_map[row["ao_no"]]
            ao = {
                "no": row["ao_no"],
                "name": row["name"],
                "summary": row["summary"],
                "request_date": row["req_date"],
                "issue_date": row["issue_date"],
                "is_pending": ao_stage_to_pending(row["stage"]),
                "status": ao_stage_to_status(row["ao_no"], row["stage"]),
                "ao_citations": citations[row["ao_no"]]["ao"],
                "aos_cited_by": citations[row["ao_no"]]["aos_cited_by"],
                "statutory_citations": citations[row["ao_no"]]["statutes"],
                "regulatory_citations": citations[row["ao_no"]]["regulations"],
                "sort1": -year,
                "sort2": -serial,
            }
            ao["documents"] = get_documents(ao_id, bucket)
            (ao["requestor_names"], ao["requestor_types"], ao["commenter_names"],
                    ao["representative_names"], ao["entities"]) = get_entities(ao_id)

            yield ao


def get_entities(ao_id):
    requestor_names = []
    commenter_names = []
    representative_names = []
    requestor_types = set()
    entities = []
    with db.engine.connect() as conn:
        rs = conn.execute(AO_ENTITIES, ao_id)
        for row in rs:
            if row["role_description"] == "Requestor":
                requestor_names.append(row["name"])
                requestor_types.add(row["entity_type_description"])
            elif row["role_description"] == "Commenter":
                commenter_names.append(row["name"])
            elif row["role_description"] == "Counsel/Representative":
                representative_names.append(row["name"])
            entities.append({"role": row["role_description"],
                "name": row["name"],
                "type": row["entity_type_description"]})
    return requestor_names, list(requestor_types),\
            commenter_names, representative_names, entities


def get_documents(ao_id, bucket):
    documents = []
    with db.engine.connect() as conn:
        rs = conn.execute(AO_DOCUMENTS, ao_id)
        for row in rs:
            document = {
                "document_id": row["document_id"],
                "category": row["category"],
                "description": row["description"],
                "text": row["ocrtext"],
                "date": row["document_date"],
            }
            if not row['fileimage']:
                logger.error('Error uploading document ID {0} for AO no {1}: No file image'.format(row['document_id'], row['ao_no']))
            else:
                pdf_key = "legal/aos/{0}/{1}".format(row['ao_no'],
                    row["filename"].replace(' ', '-'))
                document["url"] = '/files/' + pdf_key
                logger.debug("S3: Uploading {}".format(pdf_key))
                bucket.put_object(Key=pdf_key, Body=bytes(row["fileimage"]),
                        ContentType="application/pdf", ACL="public-read")
                documents.append(document)

    return documents


def get_ao_names():
    ao_names_results = db.engine.execute("""SELECT ao_no, name FROM aouser.ao""")
    ao_names = {}
    for row in ao_names_results:
        ao_names[row["ao_no"]] = row["name"]

    return ao_names


def get_citations(ao_names):
    ao_component_to_name_map = {tuple(map(int, a.split('-'))): a for a in ao_names}

    logger.info("Getting citations...")

    rs = db.engine.execute("""SELECT ao_no, ocrtext FROM aouser.document
                                INNER JOIN aouser.ao USING (ao_id)
                              WHERE category = 'Final Opinion'""")

    all_regulatory_citations = set()
    all_statutory_citations = set()
    raw_citations = defaultdict(lambda: defaultdict(set))
    for row in rs:
        logger.debug("Getting citations for AO %s" % row["ao_no"])

        ao_citations_in_doc = parse_ao_citations(row["ocrtext"], ao_component_to_name_map)
        ao_citations_in_doc.discard(row["ao_no"])  # Remove self

        raw_citations[row["ao_no"]]["ao"].update(ao_citations_in_doc)

        for citation in ao_citations_in_doc:
            raw_citations[citation]["aos_cited_by"].add(row["ao_no"])

        statutory_citations = parse_statutory_citations(row["ocrtext"])
        regulatory_citations = parse_regulatory_citations(row["ocrtext"])
        all_statutory_citations.update(statutory_citations)
        all_regulatory_citations.update(regulatory_citations)
        raw_citations[row["ao_no"]]["statutes"].update(statutory_citations)
        raw_citations[row["ao_no"]]["regulations"].update(regulatory_citations)

    citations = defaultdict(lambda: defaultdict(list))
    for ao in raw_citations:
        citations[ao]["ao"] = sorted([
            {"no": c, "name": ao_names[c]}
            for c in raw_citations[ao]["ao"]], key=lambda d: d["no"])
        citations[ao]["aos_cited_by"] = sorted([
            {"no": c, "name": ao_names[c]}
            for c in raw_citations[ao]["aos_cited_by"]], key=lambda d: d["no"])
        citations[ao]["statutes"] = sorted([
            {"title": c[0], "section": c[1]}
            for c in raw_citations[ao]["statutes"]], key=lambda d: (d["title"], d["section"]))
        citations[ao]["regulations"] = sorted([
            {"title": c[0], "part": c[1], "section": c[2]}
            for c in raw_citations[ao]["regulations"]], key=lambda d: (d["title"], d["part"], d["section"]))

    es = get_elasticsearch_connection()

    for citation in all_regulatory_citations:
        entry = {'citation_text': '%d CFR §%d.%d'
                 % (citation[0], citation[1], citation[2]),'citation_type': 'regulation'}
        es.index('docs_index', 'citations', entry, id=entry['citation_text'])

    for citation in all_statutory_citations:
        entry = {'citation_text': '%d U.S.C. §%d'
                 % (citation[0], citation[1]), 'citation_type': 'statute'}
        es.index('docs_index', 'citations', entry, id=entry['citation_text'])

    logger.info("Citations loaded.")

    return citations


def parse_ao_citations(text, ao_component_to_name_map):
    matches = set()

    if text:
        for citation in AO_CITATION_REGEX.finditer(text):
            year, serial_no = int(citation.group('year')), int(citation.group('serial_no'))
            if (year, serial_no) in ao_component_to_name_map:
                matches.add(ao_component_to_name_map[(year, serial_no)])
    return matches


def parse_statutory_citations(text):
    matches = set()
    if text:
        for citation in STATUTE_CITATION_REGEX.finditer(text):
            new_title, new_section = reclassify_archived_mur_statutory_citation(
                citation.group('title'), citation.group('section'))
            matches.add((
                int(new_title),
                int(new_section)
            ))
    return matches


def parse_regulatory_citations(text):
    matches = set()
    if text:
        for citation in REGULATION_CITATION_REGEX.finditer(text):
            matches.add((
                int(citation.group('title')),
                int(citation.group('part')),
                int(citation.group('section'))
            ))
    return matches