# webservices/legal_docs/current_murs.py
import logging
import re
from collections import defaultdict
from urllib.parse import urlencode

from webservices.env import env
from webservices.rest import db
from webservices.utils import create_eregs_link, get_elasticsearch_connection
from webservices.tasks.utils import get_bucket

from .reclassify_statutory_citation import reclassify_current_mur_statutory_citation

logger = logging.getLogger(__name__)

ALL_MURS = """
    SELECT
        case_id,
        case_no,
        name
    FROM fecmur.cases_with_parsed_case_serial_numbers
    WHERE case_type = 'MUR'
    AND case_serial >= %s
    ORDER BY case_serial
"""

MUR_SUBJECTS = """
    SELECT
        subject.description AS subj,
        relatedsubject.description AS rel
    FROM fecmur.case_subject
    JOIN fecmur.subject USING (subject_id)
    LEFT OUTER JOIN fecmur.relatedsubject USING (subject_id, relatedsubject_id)
    WHERE case_id = %s
"""

MUR_ELECTION_CYCLES = """
    SELECT election_cycle::INT
    FROM fecmur.electioncycle
    WHERE case_id = %s
"""

MUR_PARTICIPANTS = """
    SELECT
        entity_id,
        name,
        role.description AS role
    FROM fecmur.players
    JOIN fecmur.role USING (role_id)
    JOIN fecmur.entity USING (entity_id)
    WHERE case_id = %s
"""

MUR_DOCUMENTS = """
    SELECT
        doc.document_id,
        mur.case_no,
        doc.filename,
        doc.category,
        doc.description,
        doc.ocrtext,
        doc.fileimage,
        length(fileimage) AS length,
        doc.doc_order_id,
        doc.document_date
    FROM fecmur.document doc
    INNER JOIN fecmur.cases_with_parsed_case_serial_numbers mur
        ON mur.case_id = doc.case_id
    WHERE doc.case_id = %s
    ORDER BY doc.doc_order_id, doc.document_date DESC, doc.document_id DESC;
"""
# TODO: Check if document order matters

MUR_VIOLATIONS = """
    SELECT entity_id, stage, statutory_citation, regulatory_citation
    FROM fecmur.violations
    WHERE case_id = %s;
"""

OPEN_AND_CLOSE_DATES = """
    SELECT min(event_date), max(event_date)
    FROM fecmur.calendar
    WHERE case_id = %s;
"""

DISPOSITION_DATA = """
    SELECT fecmur.event.event_name,
        fecmur.settlement.final_amount, fecmur.entity.name,
        violations.statutory_citation, violations.regulatory_citation
    FROM fecmur.calendar
    INNER JOIN fecmur.event ON fecmur.calendar.event_id = fecmur.event.event_id
    INNER JOIN fecmur.entity ON fecmur.entity.entity_id = fecmur.calendar.entity_id
    LEFT JOIN (SELECT * FROM fecmur.relatedobjects WHERE relation_id = 1) AS relatedobjects
        ON relatedobjects.detail_key = fecmur.calendar.entity_id
    LEFT JOIN fecmur.settlement ON fecmur.settlement.settlement_id = relatedobjects.master_key
    LEFT JOIN (SELECT * FROM fecmur.violations WHERE stage = 'Closed' AND case_id = {0}) AS violations
        ON violations.entity_id = fecmur.calendar.entity_id
    WHERE fecmur.calendar.case_id = {0}
    AND event_name NOT IN ('Complaint/Referral', 'Disposition')
    ORDER BY fecmur.event.event_name ASC,
        fecmur.settlement.final_amount DESC NULLS LAST,
        event_date DESC;
"""

COMMISSION_VOTES = """
    SELECT vote_date, action
    FROM fecmur.commission
    WHERE case_id = %s
    ORDER BY vote_date DESC;
"""

STATUTE_REGEX = re.compile(r'(?<!\(|\d)(?P<section>\d+([a-z](-1)?)?)')
REGULATION_REGEX = re.compile(r'(?<!\()(?P<part>\d+)(\.(?P<section>\d+))?')
MUR_NO_REGEX = re.compile(r'(?P<serial>\d+)')
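
# Illustrative behavior of the citation regexes (hand-traced examples, not
# exhaustive):
#   list(STATUTE_REGEX.finditer("30120, 30104a")) matches the sections
#     "30120" and "30104a"; the lookbehind skips digits preceded by '(' or
#     another digit, so the subsection digits in "432(e)(1)" are not matched.
#   REGULATION_REGEX on "110.11, 104" matches part "110" with section "11",
#     then part "104" with no section.
#   MUR_NO_REGEX.match("7320").group('serial') returns "7320".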


def load_current_murs(from_mur_no=None):
    """
    Reads data for current MURs from a Postgres database, assembles a JSON
    document for each MUR, and indexes that document in Elasticsearch in the
    index `docs_index` with a doc_type of `murs`. In addition, all documents
    attached to a MUR are uploaded to an S3 bucket under the _directory_
    `legal/murs/<case number>/`.
    """
    es = get_elasticsearch_connection()
    logger.info("Loading current MURs")
    mur_count = 0
    for mur in get_murs(from_mur_no):
        logger.info("Loading current MUR: %s", mur['no'])
        es.index('docs_index', 'murs', mur, id=mur['doc_id'])
        mur_count += 1
    logger.info("%d current MURs loaded", mur_count)


def get_murs(from_mur_no):
    bucket = get_bucket()
    bucket_name = env.get_credential('bucket')
    if from_mur_no is None:
        start_mur_serial = 0
    else:
        start_mur_serial = int(MUR_NO_REGEX.match(from_mur_no).group('serial'))
    with db.engine.connect() as conn:
        rs = conn.execute(ALL_MURS, start_mur_serial)
        for row in rs:
            case_id = row['case_id']
            sort1, sort2 = get_sort_fields(row['case_no'])
            mur = {
                'doc_id': 'mur_%s' % row['case_no'],
                'no': row['case_no'],
                'name': row['name'],
                'mur_type': 'current',
                'sort1': sort1,
                'sort2': sort2,
            }
            mur['subjects'] = get_subjects(case_id)
            mur['election_cycles'] = get_election_cycles(case_id)
            participants = get_participants(case_id)
            # Populate each participant's citations from fecmur.violations;
            # without this call the `citations` defaultdicts built in
            # get_participants stay empty.
            assign_citations(participants, case_id)
            mur['participants'] = list(participants.values())
            mur['respondents'] = get_sorted_respondents(mur['participants'])
            mur['commission_votes'] = get_commission_votes(case_id)
            mur['dispositions'] = get_dispositions(case_id)
            mur['documents'] = get_documents(case_id, bucket, bucket_name)
            mur['open_date'], mur['close_date'] = get_open_and_close_dates(case_id)
            mur['url'] = '/legal/matter-under-review/%s/' % row['case_no']
            yield mur


def get_election_cycles(case_id):
    election_cycles = []
    with db.engine.connect() as conn:
        rs = conn.execute(MUR_ELECTION_CYCLES, case_id)
        for row in rs:
            election_cycles.append(row['election_cycle'])
    return election_cycles


def get_open_and_close_dates(case_id):
    with db.engine.connect() as conn:
        rs = conn.execute(OPEN_AND_CLOSE_DATES, case_id)
        open_date, close_date = rs.fetchone()
    return open_date, close_date


def get_dispositions(case_id):
    with db.engine.connect() as conn:
        # case_id appears twice in DISPOSITION_DATA, so it is interpolated
        # with str.format() rather than passed as a bound parameter; the value
        # comes from the ALL_MURS query, not from user input.
        rs = conn.execute(DISPOSITION_DATA.format(case_id))
        disposition_data = []
        for row in rs:
            citations = parse_statutory_citations(
                row['statutory_citation'], case_id, row['name'])
            citations.extend(parse_regulatory_citations(
                row['regulatory_citation'], case_id, row['name']))
            disposition_data.append({
                'disposition': row['event_name'],
                'penalty': row['final_amount'],
                'respondent': row['name'],
                'citations': citations,
            })
    return disposition_data
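
# Each returned entry has the shape (values symbolic, drawn from the query):
#   {'disposition': <event_name>, 'penalty': <final_amount, may be None>,
#    'respondent': <entity name>, 'citations': [<dicts from the parse_* helpers>]}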


def get_commission_votes(case_id):
    with db.engine.connect() as conn:
        rs = conn.execute(COMMISSION_VOTES, case_id)
        commission_votes = []
        for row in rs:
            commission_votes.append({'vote_date': row['vote_date'], 'action': row['action']})
    return commission_votes


def get_participants(case_id):
    participants = {}
    with db.engine.connect() as conn:
        rs = conn.execute(MUR_PARTICIPANTS, case_id)
        for row in rs:
            participants[row['entity_id']] = {
                'name': row['name'],
                'role': row['role'],
                'citations': defaultdict(list),
            }
    return participants
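
# Each participant entry is keyed by entity_id and has the shape:
#   {'name': <entity name>, 'role': <role description>, 'citations': defaultdict(list)}
# The citations mapping starts empty; assign_citations() fills it, keyed by
# violation stage.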


def get_sorted_respondents(participants):
    """
    Returns the respondents in a MUR sorted in the order of most important
    to least important.
    """
    SORTED_RESPONDENT_ROLES = ['Primary Respondent', 'Respondent', 'Previous Respondent']
    respondents = []
    for role in SORTED_RESPONDENT_ROLES:
        respondents.extend(sorted([p['name'] for p in participants if p['role'] == role]))
    return respondents
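
# Worked example (illustrative data): the participants
#   [{'name': 'Smith', 'role': 'Respondent'},
#    {'name': 'Adams', 'role': 'Primary Respondent'},
#    {'name': 'Baker', 'role': 'Respondent'}]
# yield ['Adams', 'Baker', 'Smith']: Primary Respondents first, then
# Respondents, then Previous Respondents, alphabetically within each role.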


def get_subjects(case_id):
    subjects = []
    with db.engine.connect() as conn:
        rs = conn.execute(MUR_SUBJECTS, case_id)
        for row in rs:
            if row['rel']:
                subject_str = row['subj'] + "-" + row['rel']
            else:
                subject_str = row['subj']
            subjects.append(subject_str)
    return subjects


def assign_citations(participants, case_id):
    with db.engine.connect() as conn:
        rs = conn.execute(MUR_VIOLATIONS, case_id)
        for row in rs:
            entity_id = row['entity_id']
            if entity_id not in participants:
                logger.warning(
                    "Entity %s from violations not found in participants for case %s",
                    entity_id, case_id)
                continue
            participants[entity_id]['citations'][row['stage']].extend(
                parse_statutory_citations(row['statutory_citation'], case_id, entity_id))
            participants[entity_id]['citations'][row['stage']].extend(
                parse_regulatory_citations(row['regulatory_citation'], case_id, entity_id))


def parse_statutory_citations(statutory_citation, case_id, entity_id):
    citations = []
    if statutory_citation:
        statutory_citation = remove_reclassification_notes(statutory_citation)
        matches = list(STATUTE_REGEX.finditer(statutory_citation))
        for index, match in enumerate(matches):
            section = match.group('section')
            orig_title, new_title, new_section = reclassify_current_mur_statutory_citation(section)
            url = 'https://api.fdsys.gov/link?' + urlencode([
                ('collection', 'uscode'),
                ('year', 'mostrecent'),
                ('link-type', 'html'),
                ('title', new_title),
                ('section', new_section),
            ])
            # Each citation's text runs from the start of this match to the
            # start of the next one (or to the end of the string), with
            # trailing separators stripped.
            if index == len(matches) - 1:
                match_text = statutory_citation[match.start():]
            else:
                match_text = statutory_citation[match.start():matches[index + 1].start()]
            text = match_text.rstrip(' ,;')
            citations.append({'text': text, 'type': 'statute', 'title': orig_title, 'url': url})
        if not citations:
            logger.warning("Cannot parse statutory citation %s for Entity %s in case %s",
                statutory_citation, entity_id, case_id)
    return citations
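
# Sketch of the result (titles and rewritten sections depend on
# reclassify_current_mur_statutory_citation, so they are left symbolic):
#   parse_statutory_citations("30120 (formerly 441d), 30104", case_id, entity_id)
#   -> [{'text': '30120', 'type': 'statute', 'title': <orig_title>, 'url': <fdsys link>},
#       {'text': '30104', 'type': 'statute', 'title': <orig_title>, 'url': <fdsys link>}]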


def parse_regulatory_citations(regulatory_citation, case_id, entity_id):
    citations = []
    if regulatory_citation:
        matches = list(REGULATION_REGEX.finditer(regulatory_citation))
        for index, match in enumerate(matches):
            part = match.group('part')
            section = match.group('section')
            url = create_eregs_link(part, section)
            if index == len(matches) - 1:
                match_text = regulatory_citation[match.start():]
            else:
                match_text = regulatory_citation[match.start():matches[index + 1].start()]
            text = match_text.rstrip(' ,;')
            citations.append({'text': text, 'type': 'regulation', 'title': '11', 'url': url})
        if not citations:
            logger.warning("Cannot parse regulatory citation %s for Entity %s in case %s",
                regulatory_citation, entity_id, case_id)
    return citations


def get_documents(case_id, bucket, bucket_name):
    documents = []
    with db.engine.connect() as conn:
        rs = conn.execute(MUR_DOCUMENTS, case_id)
        for row in rs:
            document = {
                'document_id': row['document_id'],
                'category': row['category'],
                'description': row['description'],
                'length': row['length'],
                'text': row['ocrtext'],
                'document_date': row['document_date'],
            }
            if not row['fileimage']:
                logger.error(
                    "Error uploading document ID %s for MUR Case %s: No file image",
                    row['document_id'], row['case_no'])
            else:
                pdf_key = 'legal/murs/{0}/{1}'.format(
                    row['case_no'], row['filename'].replace(' ', '-'))
                document['url'] = '/files/' + pdf_key
                logger.debug("S3: Uploading %s", pdf_key)
                bucket.put_object(Key=pdf_key, Body=bytes(row['fileimage']),
                    ContentType='application/pdf', ACL='public-read')
            documents.append(document)
    return documents
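
# For a hypothetical MUR 7320 document named "General Counsel Report.pdf",
# the PDF is uploaded to the S3 key 'legal/murs/7320/General-Counsel-Report.pdf'
# and exposed to clients at '/files/legal/murs/7320/General-Counsel-Report.pdf'.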


def remove_reclassification_notes(statutory_citation):
    """ Statutory citations include notes on reclassification of the form
    "30120 (formerly 441d)" and "30120 (formerly 432(e)(1))". These need to be
    removed as we explicitly perform the necessary reclassifications.
    """
    UNPARENTHESIZED_FORMERLY_REGEX = re.compile(r' formerly \S*')
    PARENTHESIZED_FORMERLY_REGEX = re.compile(r'\(formerly ')

    def remove_to_matching_parens(citation):
        """ In the case of reclassification notes of the form "(formerly ...",
        remove all characters up to the matching closing ')' allowing for nested
        parentheses pairs.
        """
        match = PARENTHESIZED_FORMERLY_REGEX.search(citation)
        pos = match.end()
        paren_count = 0
        while pos < len(citation):
            if citation[pos] == ')':
                if paren_count == 0:
                    return citation[:match.start()] + citation[pos + 1:]
                else:
                    paren_count -= 1
            elif citation[pos] == '(':
                paren_count += 1
            pos += 1
        return citation[:match.start()]  # Degenerate case - no matching ')'

    cleaned_citation = UNPARENTHESIZED_FORMERLY_REGEX.sub(' ', statutory_citation)
    while PARENTHESIZED_FORMERLY_REGEX.search(cleaned_citation):
        cleaned_citation = remove_to_matching_parens(cleaned_citation)
    return cleaned_citation
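
# Worked example (illustrative input):
#   remove_reclassification_notes("30120 (formerly 441d), 30104 formerly 434")
# first collapses " formerly 434" to a single space, then strips the
# parenthesized "(formerly 441d)" note, returning "30120 , 30104 ".
# Leftover separators and whitespace are rstrip'ed by the citation parsers.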


def get_sort_fields(case_no):
    # Negate the serial so that an ascending sort on `sort1` lists the most
    # recent (highest-numbered) MURs first; current MURs need no `sort2`.
    match = MUR_NO_REGEX.match(case_no)
    return -int(match.group("serial")), None