IlyaGusev/rulm

View on GitHub
data_processing/parse_zip_fb2.py

Summary

Maintainability
B
4 hrs
Test Coverage
import json
import os
from zipfile import ZipFile

from data_processing.parse_fb2 import FB2Parser
from data_processing.util import TextProcessor

import fire
from tqdm import tqdm



def main(input_dir, output_dir):
    parser = FB2Parser()
    processor = TextProcessor(
        min_chars=3,
        min_text_part=0.0,
        fix_punct=False,
        fix_spaces=True,
        fix_short_lines=True,
        check_code=False,
        check_pii=False,
        check_links=False,
        check_languages=False,
        check_email=False,
        check_text_part=False
    )
    existing_files = os.listdir(output_dir)
    for zip_name in os.listdir(input_dir):
        print(f"Parsing {zip_name}")
        if not zip_name.endswith(".zip"):
            continue
        start_id = int(zip_name.split(".")[0].split("-")[1])
        end_id = zip_name.split(".")[0].split("-")[2]
        is_lost = "lost" in end_id
        end_id = int(end_id.replace("_lost", ""))
        if end_id <= 500000:
            continue
        found = False
        for file_name in existing_files:
            if str(start_id) in file_name and str(end_id) in file_name:
                found = True
        if found:
            continue
        input_file = os.path.join(input_dir, zip_name)
        output_file = os.path.join(output_dir, zip_name.replace(".zip", ".jsonl"))
        with ZipFile(input_file, "r") as archive, open(output_file, "w") as w:
            names = [name for name in archive.namelist() if name.endswith(".fb2")]
            for name in tqdm(names):
                if not name.endswith(".fb2"):
                    continue
                content = archive.read(name)
                record = parser(content)
                if record is None:
                    print(f"{name} is ill-formed!")
                    continue
                record["file_name"] = name
                clean_sections = []
                for section in record["sections"]:
                    clean_section = processor(section)
                    if clean_section:
                        clean_sections.append(clean_section)
                record["sections"] = clean_sections
                w.write(json.dumps(record, ensure_ascii=False) + "\n")


if __name__ == "__main__":
    fire.Fire(main)