e-mental-health/data-processing

View on GitHub
tactus-anonymize.py

Summary

Maintainability
A
2 hrs
Test Coverage
#!/usr/bin/env python
"""
    tactus-anonymize.py: anonymize tactus file
    usage: python3 tactus-anonymize.py [-l lang] file1.xml [file2.xml ...]
    20181015 erikt(at)xs4all.nl
"""

import html
import os
import re
import subprocess
import sys
import xml.etree.ElementTree as ET

# Script name as invoked; popping it leaves only the arguments in sys.argv.
COMMAND = sys.argv.pop(0)
# Directory holding the external anonymization program.
BINDIR = "/home/erikt/projects/e-mental-health/data-processing"
# Directory holding the counselor id file.
DATADIR = "/home/erikt/projects/e-mental-health/usb/tmp"
# External program that is run on the collected mail texts.
ANONYMIZEPRG = "anonymize-dut.sh"
# File mapping counselor ids to counselor names, one pair per line.
COUNSELORFILE = "counselors.txt"
# Tags whose text is passed through the external anonymization program.
ANONYMIZETAGS = ["Body","Subject", "Notes"]
# Tag whose content is replaced by a counselor id.
COUNSELORTAG = "AssignedCounselor"
# Child tags of the counselor tag that hold the name parts.
FIRSTNAMETAG = "FirstName"
LASTNAMETAG = "LastName"
# Tags that clearTexts leaves untouched; all others lose text and children.
KEEPTAGS = ["Dossier","AssignedCounselor","Messages","Message","DateSent","Subject","Body"]
# Tags whose text is replaced by a stable token instead of being blanked.
KEEPTAGSTEXTTRACE = ["Sender","Recipients"]
# Prefix of the replacement tokens (token is CLEARTOKEN-<number>).
CLEARTOKEN = "REMOVED"
# Initial mapping from lower-cased original strings to replacement tokens.
CLEAREDSTRINGIDS = {"client":"CLIENT"}
# Suffix inserted before the extension of the output file name.
OUTFILESUFFIX = "-an"
# Name of the temporary file handed to the anonymization program.
TMPFILENAME = "tactus-anonymize.py."+str(os.getpid())
# Marker line separating the individual texts sent to the program.
BOUNDARY = "tactus-anonymize-py-mail-text-boundary"

# Working copy: a plain assignment would alias the constant above, so every
# token added at run time would also end up in CLEAREDSTRINGIDS.
clearedStringIds = dict(CLEAREDSTRINGIDS)
# Maps counselor name -> counselor id.
counselorIds = {}

def getRootOfFile(inFileName):
    """Parse the XML source inFileName and return the parsed tree.

    Despite the name, the value returned is the tree object produced by
    ET.parse, not its root element; callers call getroot() on it.
    """
    parsedTree = ET.parse(inFileName)
    return parsedTree

def normalizeWhiteSpace(string):
    """Return string without leading/trailing white space and with every
    internal run of white space collapsed to a single space.

    Raises TypeError when string is not a str (e.g. None).
    """
    # raw strings: "\s" in a plain literal is an invalid escape sequence
    string = re.sub(r"^\s+|\s+$","",string)
    return re.sub(r"\s+"," ",string)

def readCounselors():
    """Load the counselor file into the module-level counselorIds mapping.

    Every non-blank line is split on white space: the first field is the id
    and the remaining fields, joined by single spaces, form the counselor
    name. The name becomes the key and the id (kept as the string read from
    the file) the value.

    Exits the program with an error message when the file cannot be read.
    """
    path = DATADIR+"/"+COUNSELORFILE
    try:
        # the with-block closes the file even when reading fails half-way
        with open(path,"r") as inFile:
            for line in inFile:
                fields = line.split()
                # a blank line carries no id; skip it instead of aborting
                if not fields:
                    continue
                counselorIds[" ".join(fields[1:])] = fields[0]
    except (OSError, UnicodeDecodeError):
        sys.exit("error reading counselor file: "+path)

def removeTagText(tag,tagNamesKeepTextTrace):
    """Blank the text of tag, or replace it by a traceable token.

    Elements without text are left alone. For a tag name listed in
    tagNamesKeepTextTrace the white-space-normalized text is replaced by the
    token registered for its lower-cased form in clearedStringIds (a new
    CLEARTOKEN-<n> token is registered on first sight); text that is empty
    after normalization stays empty. For any other tag the text becomes "".
    """
    global clearedStringIds

    if tag.text is None:
        return
    if tag.tag not in tagNamesKeepTextTrace:
        tag.text = ""
        return
    tag.text = normalizeWhiteSpace(tag.text)
    if tag.text == "":
        return
    key = tag.text.lower()
    if key not in clearedStringIds:
        # number the token by the current size of the mapping
        clearedStringIds[key] = CLEARTOKEN+"-"+str(len(clearedStringIds))
    tag.text = clearedStringIds[key]

def removeTagChildren(tag):
    """Detach all direct child elements of tag; its own text is untouched."""
    # iterate over a snapshot, the element itself shrinks with every removal
    for childElement in list(tag):
        tag.remove(childElement)

def clearTexts(tree,tagNamesKeep,tagNamesKeepTextTrace):
    """Strip every element of tree whose tag name is not in tagNamesKeep.

    Such an element has its text handled by removeTagText (blanked, or
    replaced by a token for names in tagNamesKeepTextTrace) and all of its
    children removed. Elements named in tagNamesKeep are left as they are.
    """
    for element in tree.getroot().iter():
        if element.tag in tagNamesKeep:
            continue
        removeTagText(element,tagNamesKeepTextTrace)
        # children are dropped while the walk is in progress
        removeTagChildren(element)

def getChildText(tag,childName):
    """Return the texts of the direct children of tag named childName.

    Each child text is white-space-normalized; children without text, or
    with text that is empty after normalization, are skipped. The remaining
    texts are joined by single spaces. Returns "" when nothing is found.
    """
    texts = []
    for child in tag.findall("./"+childName):
        # explicit test instead of relying on an exception for missing text
        if child.text is None:
            continue
        text = normalizeWhiteSpace(child.text)
        if text != "":
            texts.append(text)
    return " ".join(texts)

def getCounselorId(firstName,lastName):
    """Return the id of the counselor with the given name parts.

    Returns "" when both name parts are empty. An unknown name receives the
    next number (1 + number of known counselors), which is stored in
    counselorIds and appended to the counselor file. Ids assigned here are
    ints; ids that readCounselors loaded from the file are strings.

    Exits the program with an error message when the file cannot be written.
    """
    if firstName == "" and lastName == "":
        return ""
    name = normalizeWhiteSpace(firstName+" "+lastName)
    if name not in counselorIds:
        counselorIds[name] = 1+len(counselorIds)
        path = DATADIR+"/"+COUNSELORFILE
        try:
            # the with-block closes the file even when the write fails
            with open(path,"a") as outFile:
                print(counselorIds[name],name,file=outFile)
        except OSError:
            sys.exit("error writing counselor file: "+path)
    return counselorIds[name]

def anonymizeCounselor(tree,tagNamesKeepTextTrace):
    """Replace the content of every COUNSELORTAG element by a counselor id.

    The first and last name are read from the element's children before the
    element is emptied; the id obtained for that name is then stored, as a
    string, as the element's text.
    """
    for counselorTag in tree.getroot().findall(".//"+COUNSELORTAG):
        # the names must be collected before the children are removed
        nameParts = (getChildText(counselorTag,FIRSTNAMETAG),
                     getChildText(counselorTag,LASTNAMETAG))
        removeTagText(counselorTag,tagNamesKeepTextTrace)
        removeTagChildren(counselorTag)
        counselorTag.text = str(getCounselorId(*nameParts))

def getTextFromXmlText(text):
    """Parse text as an XML fragment and return the plain text inside it.

    The fragment is wrapped in a <container> element for parsing. The result
    is the container's own leading text followed by the text of every
    descendant element, each preceded by one space. Only the text attribute
    of the elements is read; tail text after a nested element is not
    collected.

    Exits the program with an error message when the fragment cannot be
    parsed.
    """
    try:
        container = ET.fromstring("<container>"+text+"</container>")
    except Exception as e:
        sys.exit("Error processing text "+text+": "+str(e))
    pieces = ["" if container.text is None else container.text]
    for element in container.findall(".//"):
        if element.text is not None:
            pieces.append(" "+element.text)
    return "".join(pieces)

def runAnonymizeProcess(inText):
    """Run the external anonymization program on inText and return its output.

    The text is written to the temporary file TMPFILENAME, whose name is
    passed to the program as its only argument. The program's standard
    output, decoded as UTF-8, is returned. The exit status of the program is
    not checked.
    """
    text = getTextFromXmlText(html.escape(inText))
    try:
        # written as UTF-8 to match the decoding of the program's output
        with open(TMPFILENAME,"w",encoding="utf-8") as tmpFile:
            print(text,file=tmpFile)
        anonymizeProcess = subprocess.run([BINDIR+"/"+ANONYMIZEPRG,TMPFILENAME],stdout=subprocess.PIPE)
    finally:
        # the file holds text that is not anonymized yet: never leave it
        # behind, also not when the program could not be started
        if os.path.exists(TMPFILENAME):
            os.remove(TMPFILENAME)
    return anonymizeProcess.stdout.decode("utf8")

def getMailTexts(tree,tagNames):
    """Collect the texts of all elements of tree whose tag is in tagNames.

    Elements without text are skipped. Each text is HTML-escaped and then
    run through getTextFromXmlText. The texts are returned as a list in
    document order.
    """
    return [getTextFromXmlText(html.escape(element.text))
            for element in tree.getroot().iter()
            if element.text is not None and element.tag in tagNames]

def processedTextToList(processedText):
    """Split processedText into the chunks separated by boundary lines.

    A line starting with BOUNDARY ends the current chunk and is itself
    dropped. Every other line is added to the current chunk followed by a
    newline. The text after the last boundary forms the final chunk, so the
    list always holds one more item than there are boundary lines.
    """
    chunks = []
    pendingLines = []
    for line in processedText.split("\n"):
        if re.search("^"+BOUNDARY,line):
            chunks.append("".join(pendingLines))
            pendingLines = []
        else:
            pendingLines.append(line+"\n")
    chunks.append("".join(pendingLines))
    return chunks

def updateMailTexts(tree,tagNames,processedList):
    """Store the items of processedList as the texts of the selected elements.

    The elements are selected exactly as in the collection step: tag name in
    tagNames and text present, in document order. The n-th selected element
    receives the n-th item, HTML-unescaped and with trailing newlines (plus
    any white space following them) removed.

    Raises IndexError when processedList has fewer items than there are
    selected elements.
    """
    position = 0
    for element in tree.getroot().iter():
        if element.text is None or element.tag not in tagNames:
            continue
        replacement = html.unescape(processedList[position])
        position += 1
        element.text = re.sub(r"\n+\s*$","",replacement)

def anonymizeTexts(tree,tagNames):
    """Anonymize the texts of all elements of tree whose tag is in tagNames.

    The texts are collected, joined into one document with a BOUNDARY line
    between them, sent through the external anonymization program in a
    single run, split again at the boundary lines and written back to the
    elements they came from. Nothing happens when there are no texts.
    """
    textList = getMailTexts(tree,tagNames)
    if not textList:
        return
    # The splitter only recognizes the boundary at the start of a line, so
    # force a newline in front of it: a text that does not end in a newline
    # would otherwise swallow the marker. The extra newline ends up at the
    # end of a chunk, where updateMailTexts strips it.
    text = ("\n"+BOUNDARY+"\n").join(textList)
    processedText = runAnonymizeProcess(text)
    processedList = processedTextToList(processedText)
    updateMailTexts(tree,tagNames,processedList)

def writeFile(tree,outFileName):
    """Write tree to outFileName as UTF-8 encoded XML."""
    outputEncoding = "utf-8"
    tree.write(outFileName,encoding=outputEncoding)

def makeOutFileName(fileName):
    """Return fileName with OUTFILESUFFIX inserted before its extension.

    "dir/file.xml" becomes "dir/file-an.xml" with the suffix "-an". A name
    without an extension gets the suffix appended. Only the last path
    component is examined, so dots in directory names are left alone.
    """
    # splitext replaces splitting the whole path on ".", which failed on
    # names without a dot and misplaced the suffix for dotted directories
    base, extension = os.path.splitext(fileName)
    return base+OUTFILESUFFIX+extension

def main(argv):
    """Anonymize every file named in argv and return 0.

    argv holds input file names only; the script name has already been
    removed from sys.argv at module level. For each file the counselor is
    replaced by an id, the text of all tags outside KEEPTAGS is cleared, the
    texts of the ANONYMIZETAGS are run through the external anonymization
    program, and the result is written to the name produced by
    makeOutFileName.
    """
    global clearedStringIds

    readCounselors()
    for inFileName in argv:
        # Start every file from a fresh copy of the initial mapping. Binding
        # the constant itself would reset nothing: the tokens of earlier
        # files would still be in it.
        clearedStringIds = dict(CLEAREDSTRINGIDS)
        tree = getRootOfFile(inFileName)
        anonymizeCounselor(tree,KEEPTAGSTEXTTRACE)
        clearTexts(tree,KEEPTAGS,KEEPTAGSTEXTTRACE)
        anonymizeTexts(tree,ANONYMIZETAGS)
        writeFile(tree,makeOutFileName(inFileName))
    return(0)

if __name__ == "__main__":
    sys.exit(main(sys.argv))