IlyaGusev/PoetryCorpus

View on GitHub
poetry/apps/corpus/scripts/unite.py

Summary

Maintainability
D
2 days
Test Coverage
import itertools
import os
import re
import xml.etree.ElementTree as etree

from dicttoxml import dicttoxml

from rupo.util.preprocess import to_cyrrilic, normilize_line
from poetry.settings import BASE_DIR


def clean_text(text):
    if text is None:
        return None
    new_text = []
    regexp = "[^а-яА-Яёa-zA-Z,.;-?!]"
    for line in text.split("\n"):
        if len(re.sub(regexp, "", line)) != 0:
            new_text.append(line.strip())
    return "\n".join(new_text)


def get_actual_line(text, index):
    regexp = "[^а-яА-Яёa-zA-Z\n]"
    text = to_cyrrilic(re.sub(regexp, "", text).strip().lower())
    lines = text.split("\n")
    if index > len(lines) - 1:
        return lines[len(lines)-1]
    return lines[index]


class Corpus:
    def __init__(self):
        self.data = []
        self.invalid_count = 0
        self.duplicates_count = 0
        self.concatenated_count = 0
        self.themes_not_assigned_count = 0

    def process_file(self, file_name, item_node_name, fields):
        tree = etree.parse(file_name)
        root = tree.getroot()
        for item in root.findall(item_node_name):
            element = dict()
            valid = True
            for name, properties in fields.items():
                element[name] = properties['handler'](item.find(properties['path']))
                if element[name] == "":
                    element[name] = None
                if properties['required'] and element[name] is None:
                    valid = False
                    self.invalid_count += 1
            if valid:
                self.data.append(element)

    def generate_id(self, element):
        if element['name'] is not None:
            name = normilize_line(element['name'])
        else:
            name = get_actual_line(element['text'], 0)
        return normilize_line(element['author']) + " " + name

    def clean_duplicates(self):
        data = dict()
        for element in self.data:
            id = self.generate_id(element)
            if data.get(id) is None:
                if element['text'] is None:
                    self.themes_not_assigned_count += 1
                    continue
                data[id] = element
                themes = element['themes']
                data[id]['themes'] = set()
                if themes is not None:
                    for theme in themes:
                        data[id]['themes'].add(theme)
            else:
                if element['text'] is None:
                    if element['themes'] is not None:
                        for theme in element['themes']:
                            data[id]['themes'].add(theme)
                elif data[id]['text'] is None:
                    element['themes'] = data[id]['themes']
                    data[id] = element
                else:
                    is_duplicate = False
                    for i in range(3):
                        for j in range(3):
                            flag1 = get_actual_line(element['text'], i) in get_actual_line(data[id]['text'], j)
                            flag2 = get_actual_line(data[id]['text'], i) in get_actual_line(element['text'], j)
                            is_duplicate = is_duplicate or flag1 or flag2
                    if not is_duplicate:
                        # TODO: handle this case
                        self.concatenated_count += 1
                    else:
                        for field, value in data[id].items():
                            if value is None:
                                data[id][field] = element[field]
                        if len(element['text']) > len(data[id]['text']):
                            data[id]['text'] = element['text']
                        self.duplicates_count += 1
        self.data = list(data.values())
        for i in range(len(self.data)):
            self.data[i]['themes'] = list(self.data[i]['themes'])

    def to_xml(self, filename):
        with open(filename, 'wb') as f:
            f.write(b'<?xml version="1.0" encoding="UTF-8"?><items>')
            for element in self.data:
                xml = dicttoxml(element, custom_root='item', attr_type=False)\
                    .replace(b'<?xml version="1.0" encoding="UTF-8" ?>', b'')
                f.write(xml)
            f.write(b'</items>')

    def to_sketch(self, filename, fields):
        with open(filename, 'w', encoding='utf-8') as f:
            content = '<?xml version="1.0" encoding="UTF-8"?><items>'
            for element in self.data:
                xml = '<doc '
                for field in fields.keys():
                    if element[field] is not None and field not in ['themes', 'text']:
                        xml += ' ' + field + '="{0}"'.format(element[field])
                xml += '>' + element['text'] + '</doc>'
                content += xml
            content += '</items>'
            f.write(content)

    def to_django(self, filename, themes_filename, fields):
        themes_to_pk = dict()
        with open(themes_filename, 'w', encoding='utf-8') as f:
            themes = list(set(list(itertools.chain.from_iterable([element['themes'] for element in self.data]))))
            pk = 1
            content = '['
            for theme in themes:
                themes_to_pk[theme] = pk
                content += '{"model": "corpus.Theme", "pk": ' + str(pk) + \
                           ', "fields": {"theme": ' + '"' + theme + '"}},'
                pk += 1
            content = content[:-1] + ']'
            f.write(content)

        with open(filename, 'w', encoding='utf-8') as f:
            pk = 1
            content = '['
            for element in self.data:
                content += '{"model": "corpus.Poem", "pk": ' + str(pk) + ', "fields": {'
                for field in fields.keys():
                    if element[field] is not None and field not in ['themes']:
                        escaped_element = str(element[field]).replace("\n", "\\n")\
                            .replace('"', '\\"')\
                            .replace("\t", "\\t")
                        content += '"' + str(field) + '": "' + escaped_element + '",'
                    elif field == 'themes':
                        if len(element['themes']) == 0:
                            theme_list = "[]"
                        else:
                            theme_list = "["
                            for theme in element['themes']:
                                theme_list += str(themes_to_pk[theme]) + ","
                            theme_list = theme_list[:-1] + "]"
                        content += '"' + str(field) + '": ' + theme_list + ','
                content = content[:-1] + '}},'
                pk += 1
            content = content[:-1] + ']'
            f.write(content)

    def print_statistics(self):
        print("Example:", self.data[0])
        print("Invalid: ", self.invalid_count)
        print("Duplicates: ", self.duplicates_count)
        print("Concatenated: ", self.concatenated_count)
        print("Total chars: ", sum([len(elem['text']) for elem in self.data]))
        print("Total words: ", sum([len(elem['text'].split()) for elem in self.data]))
        print("Total poems after deduplication: ", len(self.data))
        print("Total poems with themes: ", len([elem for elem in self.data if len(elem['themes']) != 0]))
        authors = set()
        for elem in self.data:
            authors.add(normilize_line(elem['author']))
        print("Total authors: ", len(authors))


def main():
    corpus = Corpus()
    text_files = [os.path.join(BASE_DIR, "datasets", "web", "rupoem.xml"), os.path.join(BASE_DIR, "datasets", "web", "strofa.xml"),
                  os.path.join(BASE_DIR, "datasets", "web", "klassika.xml"),]
    themes_files = [os.path.join(BASE_DIR, "datasets", "web", "themes.xml")]
    fields = {
        'name': {
            'path': './/name',
            'handler': lambda x: x if x is None else x.text,
            'required': False
        },
        'author': {
            'path': './/author',
            'handler': lambda x: x if x is None else x.text,
            'required': True
        },
        'text': {
            'path': './/text',
            'handler': lambda x: x if x is None else clean_text(x.text),
            'required': True
        },
        'date_from': {
            'path': './/date_from',
            'handler': lambda x: x if x is None else int(x.text),
            'required': False
        },
        'date_to': {
            'path': './/date_to',
            'handler': lambda x: x if x is None else int(x.text),
            'required': False
        },
        'themes': {
            'path': './/themes',
            'handler': lambda x: [] if x is None else [value.text for value in x.findall(".//value")],
            'required': False
        },
    }
    for filename in text_files:
        corpus.process_file(filename, './/item', fields)

    fields['text']['required'] = False
    fields['themes']['required'] = True
    for filename in themes_files:
        corpus.process_file(filename, './/item', fields)

    corpus.clean_duplicates()
    corpus.print_statistics()
    corpus.to_xml(os.path.join(BASE_DIR, "datasets", "corpus", "all.xml"))
    corpus.to_django(os.path.join(BASE_DIR, "datasets", "django", "all_django.json"),
                     os.path.join(BASE_DIR, "datasets", "django", "themes_django.json"), fields)

main()