webservices/legal_docs/load_legal_docs.py

#!/usr/bin/env python

import itertools
import re
from zipfile import ZipFile
from tempfile import NamedTemporaryFile
from xml.etree import ElementTree as ET
from datetime import datetime
from os.path import getsize
import csv
from multiprocessing import Pool
import logging
from urllib.parse import urlencode

import requests

from webservices.rest import db
from webservices.env import env
from webservices import utils
from webservices.tasks.utils import get_bucket

from . import reclassify_statutory_citation

# Workaround: slate depends on a module named `utils`, which collides with an
# identically named module used by this project. Temporarily prepend slate's
# install directory to sys.path so slate resolves its own copy, then restore
# the original path.
import importlib
importlib.invalidate_caches()
import sys
from distutils.sysconfig import get_python_lib
sys.path = [get_python_lib() + '/slate'] + sys.path
import slate
sys.path = sys.path[1:]

logger = logging.getLogger('manager')

def get_sections(reg):
    sections = {}
    for subpart in reg['children']:
        for node in subpart['children']:
            sections[tuple(node['label'])] = {'text': get_text(node),
                                              'title': node['title']}
    return sections


def get_text(node):
    text = ''
    if "text" in node:
        text = node["text"]
    for child in node["children"]:
        text += ' ' + get_text(child)
    return text
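
# Note: get_sections() and get_text() assume the eRegs JSON shape implied by the
# key lookups above -- roughly (illustrative sketch, not the authoritative eRegs
# schema):
#     {'children': [                      # regulation -> subparts
#         {'children': [                  # subpart -> section nodes
#             {'label': ['110', '1'],
#              'title': '110.1 <section name>',
#              'text': '...',
#              'children': [...]}]}]}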


def index_regulations():
    """
        Indexes the regulations relevant to the FEC in Elasticsearch.
        The regulations are accessed from FEC_EREGS_API.
    """
    eregs_api = env.get_credential('FEC_EREGS_API', '')
    if not eregs_api:
        logger.error("Regs could not be indexed, environment variable FEC_EREGS_API not set.")
        return

    logger.info("Indexing regulations")
    reg_versions = requests.get(eregs_api + 'regulation').json()['versions']
    es = utils.get_elasticsearch_connection()
    reg_count = 0
    for reg in reg_versions:
        url = '%sregulation/%s/%s' % (eregs_api, reg['regulation'],
                                      reg['version'])
        regulation = requests.get(url).json()
        sections = get_sections(regulation)

        logger.debug("Loading part %s", reg['regulation'])
        for section_label, section in sections.items():
            doc_id = '%s_%s' % (section_label[0], section_label[1])
            section_formatted = '%s-%s' % (section_label[0], section_label[1])
            reg_url = '/regulations/{0}/{1}#{0}'.format(section_formatted,
                                                        reg['version'])
            no = '%s.%s' % (section_label[0], section_label[1])
            name = section['title'].split(no)[1].strip()
            doc = {
                "doc_id": doc_id,
                "name": name,
                "text": section['text'],
                "url": reg_url,
                "no": no,
                "sort1": int(section_label[0]),
                "sort2": int(section_label[1])
            }

            es.index('docs_index', 'regulations', doc, id=doc['doc_id'])
        reg_count += 1
    logger.info("%d regulation parts indexed", reg_count)

def get_ao_citations():
    logger.info("getting citations...")
    ao_names_results = db.engine.execute("""SELECT ao_no, name FROM aouser.document
                                  INNER JOIN aouser.ao ON ao.ao_id = document.ao_id""")
    ao_names = {}
    for row in ao_names_results:
        ao_names[row['ao_no']] = row['name']

    documents = db.engine.execute("""SELECT ao_no, category, ocrtext FROM aouser.document
                                INNER JOIN aouser.ao ON ao.ao_id = document.ao_id""")
    citations = {}
    cited_by = {}
    for row in documents:
        logger.info("Getting citations for %s", row['ao_no'])
        citations_in_doc = set()
        ocrtext = row['ocrtext'] or ''
        # AO citations look like "1995-5" or "2014-01": a four-digit year,
        # a dash, and one to three digits.
        for citation in re.findall(r'[12][789012][0-9][0-9]-[0-9][0-9]?[0-9]?', ocrtext):
            year, no = citation.split('-')
            citation_txt = "{0}-{1:02d}".format(year, int(no))
            if citation_txt != row['ao_no'] and citation_txt in ao_names:
                citations_in_doc.add(citation_txt)

        citations[(row['ao_no'], row['category'])] = sorted(
            [{'no': citation, 'name': ao_names[citation]} for citation in citations_in_doc],
            key=lambda d: d['no'])

        if row['category'] == 'Final Opinion':
            for citation in citations_in_doc:
                if citation not in cited_by:
                    cited_by[citation] = set([row['ao_no']])
                else:
                    cited_by[citation].add(row['ao_no'])

    for citation, cited_by_set in cited_by.items():
        cited_by_with_name = sorted([{'no': c, 'name': ao_names[c]}
            for c in cited_by_set], key=lambda d: d['no'])
        cited_by[citation] = cited_by_with_name

    return citations, cited_by

def get_xml_tree_from_url(url):
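    """
    Downloads a zip archive from `url`, writes it to a temporary file, extracts
    the archive's first member into a second temporary file, and returns that
    member parsed as an xml.etree.ElementTree.
    """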
    r = requests.get(url, stream=True)

    with NamedTemporaryFile('wb+') as f:
        for chunk in r:
            f.write(chunk)
        f.seek(0)
        with ZipFile(f.name) as zip_file, \
                zip_file.open(zip_file.namelist()[0]) as title, \
                NamedTemporaryFile('wb+') as title_xml:
            title_xml.write(title.read())
            title_xml.seek(0)
            return ET.parse(title_xml.name)

def get_title_52_statutes():
    es = utils.get_elasticsearch_connection()

    title_parsed = get_xml_tree_from_url('http://uscode.house.gov/download/' +
                    'releasepoints/us/pl/114/219/xml_usc52@114-219.zip')
    # ElementTree iter()/find() need namespace-qualified tag names, e.g.
    # tag_name.format('section') -> '{http://xml.house.gov/schemas/uslm/1.0}section'
    tag_name = '{{http://xml.house.gov/schemas/uslm/1.0}}{0}'
    section_count = 0
    for subtitle in title_parsed.iter(tag_name.format('subtitle')):
        if subtitle.attrib['identifier'] == '/us/usc/t52/stIII':
            for subchapter in subtitle.iter(tag_name.format('subchapter')):
                match = re.match("/us/usc/t52/stIII/ch([0-9]+)/sch([IVX]+)",
                                    subchapter.attrib['identifier'])
                chapter = match.group(1)
                subchapter_no = match.group(2)
                for section in subchapter.iter(tag_name.format('section')):
                    text = ''
                    for child in section.iter():
                        if child.text:
                            text += ' %s ' % child.text.strip()
                    heading = section.find(tag_name.format('heading')).text.strip()
                    section_no = re.match('/us/usc/t52/s([0-9]+)',
                             section.attrib['identifier']).group(1)
                    pdf_url = 'http://api.fdsys.gov/link?collection=uscode&' +\
                              'title=52&year=mostrecent&section=%s'\
                              % section_no
                    doc = {
                        "doc_id": section.attrib['identifier'],
                        "text": text,
                        "name": heading,
                        "no": section_no,
                        "title": "52",
                        "chapter": chapter,
                        "subchapter": subchapter_no,
                        "url": pdf_url,
                        "sort1": 52,
                        "sort2": int(section_no)
                    }
                    es.index('docs_index', 'statutes', doc, id=doc['doc_id'])
                    section_count += 1
    return section_count

def get_title_26_statutes():
    es = utils.get_elasticsearch_connection()

    title_parsed = get_xml_tree_from_url('http://uscode.house.gov/download/' +
                    'releasepoints/us/pl/114/219/xml_usc26@114-219.zip')
    tag_name = '{{http://xml.house.gov/schemas/uslm/1.0}}{0}'
    section_count = 0
    for subtitle in title_parsed.iter(tag_name.format('subtitle')):
        if subtitle.attrib['identifier'] == '/us/usc/t26/stH':
            for chapter in subtitle.iter(tag_name.format('chapter')):
                match = re.match("/us/usc/t26/stH/ch([0-9]+)",
                                    chapter.attrib['identifier'])
                chapter_no = match.group(1)
                for section in chapter.iter(tag_name.format('section')):
                    text = ''
                    for child in section.iter():
                        if child.text:
                            text += ' %s ' % child.text.strip()
                    heading = section.find(tag_name.format('heading')).text.strip()
                    section_no = re.match('/us/usc/t26/s([0-9]+)',
                             section.attrib['identifier']).group(1)
                    pdf_url = 'http://api.fdsys.gov/link?collection=uscode&' +\
                              'title=26&year=mostrecent&section=%s'\
                              % section_no
                    doc = {
                        "doc_id": section.attrib['identifier'],
                        "text": text,
                        "name": heading,
                        "no": section_no,
                        "title": "26",
                        "chapter": chapter_no,
                        "url": pdf_url,
                        "sort1": 26,
                        "sort2": int(section_no)
                    }
                    es.index('docs_index', 'statutes', doc, id=doc['doc_id'])
                    section_count += 1
    return section_count


def index_statutes():
    """
        Indexes statutes with titles 26 and 52 in Elasticsearch.
        The statutes are downloaded from http://uscode.house.gov.
    """
    logger.info("Indexing statutes")
    title_26_section_count = get_title_26_statutes()
    title_52_section_count = get_title_52_statutes()
    logger.info("%d statute sections indexed", title_26_section_count + title_52_section_count)


def process_mur_pdf(mur_no, pdf_key, bucket):
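    """
    Downloads the archived MUR PDF for `mur_no` from classic.fec.gov, extracts
    its text with slate, uploads the PDF to the S3 `bucket` under `pdf_key`, and
    returns a (text, size in bytes, page count) tuple.
    """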
    response = requests.get('http://classic.fec.gov/disclosure_data/mur/%s.pdf'
                            % mur_no, stream=True)

    with NamedTemporaryFile('wb+') as pdf:
        for chunk in response:
            pdf.write(chunk)

        pdf.seek(0)
        try:
            pdf_doc = slate.PDF(pdf)
        except Exception:
            logger.error('Could not parse pdf: %s', pdf_key)
            pdf_doc = []
        pdf_pages = len(pdf_doc)
        pdf_text = ' '.join(pdf_doc)
        pdf_size = getsize(pdf.name)

        pdf.seek(0)
        bucket.put_object(Key=pdf_key, Body=pdf,
                          ContentType='application/pdf', ACL='public-read')
        return pdf_text, pdf_size, pdf_pages

def get_subject_tree(html, tree=None):
    """This is a standard shift-reduce parser for extracting the tree of subject
    topics from the html. Using a html parser (eg., beautifulsoup4) would not have
    solved this problem, since the parse tree we want is _not_ represented in
    the hierarchy of html elements. Depending on the tag, the algorithm either shifts
    (adds the tag to a list) or reduces (collapses everything until the most recent ul
    into a child of the previous element). It stops when it encounters the empty tag."""
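    # Illustrative example (hypothetical input, shaped like the archived MUR
    # subject column: a top-level topic followed by a nested list of sub-topics):
    #
    #   get_subject_tree("Fundraising<br><ul class='no-top-margin'>"
    #                    "<li>Solicitation</li></ul>")
    #   => [{'text': 'Fundraising', 'children': [{'text': 'Solicitation'}]}]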

    if tree is None:
        tree = []
    # get next token
    root = re.match(r"([^<]+)(?:<br>)?(.*)", html, re.S)
    list_item = re.match(r"<li>(.*?)</li>(.*)", html, re.S)
    unordered_list = re.match(r"<ul\s+class='no-top-margin'>(.*)", html, re.S)
    end_list = re.match(r"</ul>(.*)", html, re.S)
    empty = re.match(r"\s*<br>\s*", html)

    if empty:
        pass
    elif end_list:
        # reduce
        sub_tree = []
        node = tree.pop()
        while node != 'ul_start':
            sub_tree.append(node)
            node = tree.pop()

        sub_tree.reverse()
        if tree[-1] != 'ul_start':
            tree[-1]['children'] = sub_tree
        else:
            tree.extend(sub_tree)
    elif unordered_list:
        # shift
        tree.append('ul_start')
    elif list_item or root:
        # shift
        subject = re.sub(r'\s+', ' ', (list_item or root).group(1))\
                    .strip().lower().capitalize()
        tree.append({'text': subject})
    else:
        logger.error("Could not parse subject HTML: %s", html)
        raise Exception("Could not parse next token.")

    if not empty:
        tail = (root or list_item or unordered_list or end_list)
        if tail:
            html = tail.groups()[-1].strip()
            if html:
                get_subject_tree(html, tree)
    return tree

def get_citations(citation_texts):
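    """
    Splits raw citation strings into U.S. Code and C.F.R. citations. The two
    patterns below expect strings shaped like "52 U.S.C. 30101" or
    "11 C.F.R. 110.1" (illustrative examples); anything that matches neither
    pattern is logged and raises an exception.
    """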
    us_codes = []
    regulations = []

    for citation_text in citation_texts:
        us_code_match = re.match(r"(?P<title>[0-9]+) U\.S\.C\. (?P<section>[0-9a-z-]+)(?P<paragraphs>.*)", citation_text)
        regulation_match = re.match(
            r"(?P<title>[0-9]+) C\.F\.R\. (?P<part>[0-9]+)(?:\.(?P<section>[0-9]+))?", citation_text)

        if us_code_match:
            title, section = reclassify_statutory_citation.reclassify_archived_mur_statutory_citation(
                us_code_match.group('title'), us_code_match.group('section'))
            citation_text = '%s U.S.C. %s%s' % (title, section, us_code_match.group('paragraphs'))
            url = 'http://api.fdsys.gov/link?' +\
                  urlencode([('collection', 'uscode'), ('link-type', 'html'), ('title', title),
                    ('year', 'mostrecent'), ('section', section)])
            us_codes.append({"text": citation_text, "url": url})
        elif regulation_match:
            url = utils.create_eregs_link(regulation_match.group('part'), regulation_match.group('section'))
            regulations.append({"text": citation_text, "url": url})

        else:
            logger.error("Could not parse citation: %s", citation_text)
            raise Exception("Could not parse citation")
    return {"us_code": us_codes, "regulations": regulations}

def delete_murs_from_s3():
    """
    Deletes all MUR documents from S3
    """
    bucket = get_bucket()
    for obj in bucket.objects.filter(Prefix="legal/murs"):
        obj.delete()

def delete_murs_from_es():
    """
    Deletes all MURs from Elasticsearch
    """
    delete_from_es('docs_index', 'murs')

def delete_advisory_opinions_from_es():
    """
    Deletes all advisory opinions from Elasticsearch
    """
    delete_from_es('docs_index', 'advisory_opinions')

def delete_from_es(index, doc_type):
    """
    Deletes all documents with the given `doc_type` from Elasticsearch
    """
    es = utils.get_elasticsearch_connection()
    es.delete_by_query(index=index, body={'query': {'match_all': {}}}, doc_type=doc_type)

def get_mur_names(mur_names={}):
    # The mutable default argument is deliberate: it caches the MUR name map
    # across calls so the CSV is only read once.
    if mur_names:
        return mur_names

    with open('data/archived_mur_names.csv') as csvfile:
        for row in csv.reader(csvfile):
            if row[0] == 'MUR':
                mur_names[row[1]] = row[2]
    return mur_names

def process_mur(mur):
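    """
    Processes a single row of the archived MUR table. `mur` is an
    (index, total, row_html) tuple produced by load_archived_murs(): the row's
    metadata, parties, subjects, and citations are extracted, the MUR's PDF is
    processed and uploaded, and the assembled document is indexed in
    Elasticsearch.
    """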
    es = utils.get_elasticsearch_connection()
    bucket = get_bucket()
    mur_names = get_mur_names()
    (mur_no_td, open_date_td, close_date_td, parties_td, subject_td, citations_td)\
        = re.findall("<td[^>]*>(.*?)</td>", mur[2], re.S)
    mur_no = re.search(r"/disclosure_data/mur/([0-9_A-Z]+)\.pdf", mur_no_td).group(1)
    logger.info("Loading archived MUR %s: %s of %s", mur_no, mur[0] + 1, mur[1])
    pdf_key = 'legal/murs/%s.pdf' % mur_no
    text, pdf_size, pdf_pages = process_mur_pdf(mur_no, pdf_key, bucket)
    pdf_url = '/files/' + pdf_key
    open_date, close_date = (None, None)
    if open_date_td:
        open_date = datetime.strptime(open_date_td, '%m/%d/%Y').isoformat()
    if close_date_td:
        close_date = datetime.strptime(close_date_td, '%m/%d/%Y').isoformat()
    parties = re.findall("(.*?)<br>", parties_td)
    complainants = []
    respondents = []
    for party in parties:
        match = re.match(r"\(([RC])\) - (.*)", party)
        name = match.group(2).strip().title()
        if match.group(1) == 'C':
            complainants.append(name)
        if match.group(1) == 'R':
            respondents.append(name)

    subject = get_subject_tree(subject_td)
    citations = get_citations(re.findall("(.*?)<br>", citations_td))

    mur_digits = re.match("([0-9]+)", mur_no).group(1)
    name = mur_names.get(mur_digits, '')
    doc = {
        'doc_id': 'mur_%s' % mur_no,
        'no': mur_no,
        'name': name,
        'text': text,
        'mur_type': 'archived',
        'pdf_size': pdf_size,
        'pdf_pages': pdf_pages,
        'open_date': open_date,
        'close_date': close_date,
        'complainants': complainants,
        'respondents': respondents,
        'subject': subject,
        'citations': citations,
        'url': pdf_url
    }
    es.index('archived_murs_index', 'murs', doc, id=doc['doc_id'])

def load_archived_murs(from_mur_no=None, specific_mur_no=None, num_processes=1, tasks_per_child=None):
    """
    Reads data for archived MURs from http://classic.fec.gov/MUR, assembles a JSON
    document for each MUR, and indexes it in Elasticsearch in the index
    `archived_murs_index` with a doc_type of `murs`. In addition, the MUR's source
    PDF is uploaded to an S3 bucket under the _directory_ `legal/murs/`.
    """
    logger.info("Loading archived MURs")
    table_text = requests.get('http://classic.fec.gov/MUR/MURData.do').text
    rows = re.findall("<tr [^>]*>(.*?)</tr>", table_text, re.S)[1:]
    if from_mur_no is not None:
        rows = list(itertools.dropwhile(
            lambda x: re.search(r'/disclosure_data/mur/([0-9_A-Z]+)\.pdf', x, re.M).group(1) != from_mur_no, rows))
    elif specific_mur_no is not None:
        rows = list(filter(
            lambda x: re.search(r'/disclosure_data/mur/([0-9_A-Z]+)\.pdf', x, re.M).group(1) == specific_mur_no, rows))
    # Pair each row with its index and the total row count so process_mur can log progress.
    murs = zip(range(len(rows)), [len(rows)] * len(rows), rows)
    processes = int(num_processes)
    maxtasksperchild = int(tasks_per_child) if tasks_per_child else None
    with Pool(processes=processes, maxtasksperchild=maxtasksperchild) as pool:
        pool.map(process_mur, murs, chunksize=1)
    logger.info("%d archived MURs loaded", len(rows))