"""Fonduer visual parser that parses visual information from PDF."""
import logging
import os
import re
import shutil
import subprocess
from builtins import range, zip
from collections import OrderedDict, defaultdict
from operator import attrgetter
from typing import DefaultDict, Dict, Iterable, Iterator, List, Optional, Tuple

import numpy as np
from bs4 import BeautifulSoup
from bs4.element import Tag
from editdistance import eval as editdist  # Alternative library: python-levenshtein

from fonduer.parser.models import Sentence
from fonduer.parser.visual_parser.visual_parser import VisualParser
from fonduer.utils.utils_visual import Bbox

logger = logging.getLogger(__name__)

# Define a type alias for readability
# PdfWordId is an ID for a word that is unique within a PDF file.
# The two elements represent (page_num, i), where page_num is the 1-based page
# number and i is a 0-based unique ID for a word within that page.
PdfWordId = Tuple[int, int]
# PdfWord is a type alias for (PdfWordId, str), where the second element is a string
# representation of the word.
PdfWord = Tuple[PdfWordId, str]
# Similarly, HtmlWordId is an ID for a word that is unique within an HTML file.
# The two elements represent (sentence.stable_id, i), where i is an ID within a
# Sentence.
HtmlWordId = Tuple[str, int]
# Similar to PdfWord, HtmlWord is a type alias for (HtmlWordId, str), where the second
# element is a string representation of the word.
HtmlWord = Tuple[HtmlWordId, str]
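# For example (illustrative values only): ((2, 5), "voltage") would be a PdfWord for
# the sixth word extracted from page 2 of a PDF, and ((sentence.stable_id, 0),
# "voltage") would be the HtmlWord for the first word of the corresponding Sentence.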


class PdfVisualParser(VisualParser):
    """Link visual information, extracted from PDF, with parsed sentences.

    This linker assumes the following conditions for expected results:

    - The PDF file exists in a directory specified by `pdf_path`.
    - The basename of the PDF file is the same as the *document name*
      and its extension is either ".pdf" or ".PDF".
    - A PDF has a text layer.
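
    A minimal usage sketch (``session`` and ``doc_preprocessor`` are assumed to be
    set up elsewhere, and passing this class to :class:`fonduer.parser.Parser` via
    the ``visual_parser`` keyword is assumed here)::

        from fonduer.parser import Parser
        from fonduer.parser.visual_parser import PdfVisualParser

        parser = Parser(
            session,
            structural=True,
            lingual=True,
            visual_parser=PdfVisualParser(pdf_path="path/to/pdf/"),
        )
        parser.apply(doc_preprocessor)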
    """

    def __init__(self, pdf_path: str, verbose: bool = False) -> None:
        """Initialize VisualParser.

        :param pdf_path: a path to directory that contains PDF files.
        :param verbose: whether to turn on verbose logging.
        """
        if not os.path.isdir(pdf_path):
            raise ValueError(f"No directory exists at {pdf_path}!")
        self.pdf_path = pdf_path
        self.pdf_file: Optional[str] = None
        self.verbose = verbose
        self.coordinate_map: Optional[Dict[PdfWordId, Bbox]] = None
        self.pdf_word_list: Optional[List[PdfWord]] = None
        self.html_word_list: Optional[List[HtmlWord]] = None
        self.links: Optional[OrderedDict[HtmlWordId, PdfWordId]] = None
        self.pdf_dim: Optional[Tuple[int, int]] = None
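        # Regex used to split pdftotext words on punctuation (parentheses, commas,
        # quotes, degree sign, etc.) so that PDF tokens line up more closely with
        # the tokens in the parsed HTML word list.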
        delimiters = (
            r"([\(\)\,\?\u2212\u201C\u201D\u2018\u2019\u00B0\*']|(?<!http):|\.$|\.\.\.)"
        )
        self.separators = re.compile(delimiters)

        # Check if poppler-utils is installed AND the version is 0.36.0 or above
        if shutil.which("pdfinfo") is None or shutil.which("pdftotext") is None:
            raise RuntimeError("poppler-utils is not installed or they are not in PATH")
        version = subprocess.check_output(
            "pdfinfo -v", shell=True, stderr=subprocess.STDOUT, universal_newlines=True
        )
        m = re.search(r"\d{1,2}\.\d{2}\.\d", version)
        if m is None:
            raise RuntimeError(
                f"Could not determine the poppler-utils version from: {version}"
            )
        if int(m.group(0).replace(".", "")) < 360:
            raise RuntimeError(
                f"Installed poppler-utils version is {m.group(0)}, "
                f"but 0.36.0 or above is required"
            )

    def parse(
        self, document_name: str, sentences: Iterable[Sentence]
    ) -> Iterator[Sentence]:
        """Link visual information with sentences.

        :param document_name: the document name.
        :param sentences: sentences to be linked with visual information.
        :return: A generator of ``Sentence``.
        """
        # sentences should be sorted as their order is not deterministic.
        self.sentences = sorted(sentences, key=attrgetter("position"))
        self.pdf_file = self._get_linked_pdf_path(document_name)
        try:
            self._extract_pdf_words()
        except RuntimeError as e:
            logger.exception(e)
            return
        self._extract_html_words()
        self._link_lists(search_max=200)
        for sentence in self._update_coordinates():
            yield sentence

    def _extract_pdf_words(self) -> None:
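        # Count pages with `pdfinfo`, then run `pdftotext -bbox-layout` one page at
        # a time to collect the word list and per-word bounding boxes.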
        logger.debug(f"pdfinfo '{self.pdf_file}' | grep -a ^Pages: | sed 's/[^0-9]*//'")
        num_pages = subprocess.check_output(
            f"pdfinfo '{self.pdf_file}' | grep -a ^Pages: | sed 's/[^0-9]*//'",
            shell=True,
        )
        pdf_word_list: List[PdfWord] = []
        coordinate_map: Dict[PdfWordId, Bbox] = {}
        for i in range(1, int(num_pages) + 1):
            logger.debug(f"pdftotext -f {i} -l {i} -bbox-layout '{self.pdf_file}' -")
            html_content = subprocess.check_output(
                f"pdftotext -f {i} -l {i} -bbox-layout '{self.pdf_file}' -", shell=True
            )
            soup = BeautifulSoup(html_content, "html.parser")
            pages = soup.find_all("page")
            pdf_word_list_i, coordinate_map_i = self._coordinates_from_HTML(pages[0], i)
            pdf_word_list += pdf_word_list_i
            # update coordinate map
            coordinate_map.update(coordinate_map_i)
        self.pdf_word_list = pdf_word_list
        self.coordinate_map = coordinate_map
        if len(self.pdf_word_list) == 0:
            raise RuntimeError(
                f"Words could not be extracted from PDF: {self.pdf_file}"
            )
        # take last page dimensions
        page_width, page_height = (
            int(float(pages[0].get("width"))),
            int(float(pages[0].get("height"))),
        )
        self.pdf_dim = (page_width, page_height)
        if self.verbose:
            logger.info(f"Extracted {len(self.pdf_word_list)} pdf words")

    def _get_linked_pdf_path(self, document_name: str) -> Optional[str]:
        """Get the PDF file path, or return None if it does not exist.

        :param document_name: a document name.
        """
        full_path = os.path.join(self.pdf_path, document_name + ".pdf")
        if os.path.isfile(full_path):
            return full_path
        full_path = os.path.join(self.pdf_path, document_name + ".PDF")
        if os.path.isfile(full_path):
            return full_path

        return None

    def is_parsable(self, document_name: str) -> bool:
        """Verify that a linkable PDF file exists for the document.

        :param document_name: the document name.
        """
        return self._get_linked_pdf_path(document_name) is not None

    def _coordinates_from_HTML(
        self, page: Tag, page_num: int
    ) -> Tuple[List[PdfWord], Dict[PdfWordId, Bbox]]:
        pdf_word_list: List[PdfWord] = []
        coordinate_map: Dict[PdfWordId, Bbox] = {}
        block_coordinates: Dict[PdfWordId, Tuple[int, int]] = {}
        blocks = page.find_all("block")
        i = 0  # counter for word_id in page_num
        for block in blocks:
            x_min_block = int(float(block.get("xmin")))
            y_min_block = int(float(block.get("ymin")))
            lines = block.find_all("line")
            for line in lines:
                y_min_line = int(float(line.get("ymin")))
                y_max_line = int(float(line.get("ymax")))
                words = line.find_all("word")
                for word in words:
                    xmin = int(float(word.get("xmin")))
                    xmax = int(float(word.get("xmax")))
                    for content in self.separators.split(word.getText()):
                        if len(content) > 0:  # Ignore empty characters
                            word_id: PdfWordId = (page_num, i)
                            pdf_word_list.append((word_id, content))
                            coordinate_map[word_id] = Bbox(
                                page_num,
                                y_min_line,
                                y_max_line,
                                xmin,
                                xmax,
                            )
                            block_coordinates[word_id] = (y_min_block, x_min_block)
                            i += 1
        # sort pdf_word_list by block top, then block left, then word top, then
        # word left
        pdf_word_list = sorted(
            pdf_word_list,
            key=lambda word_id__: block_coordinates[word_id__[0]]
            + (coordinate_map[word_id__[0]].top, coordinate_map[word_id__[0]].left),
        )
        return pdf_word_list, coordinate_map

    def _extract_html_words(self) -> None:
        html_word_list: List[HtmlWord] = []
        for sentence in self.sentences:
            for i, word in enumerate(sentence.words):
                html_word_list.append(((sentence.stable_id, i), word))
        self.html_word_list = html_word_list
        if self.verbose:
            logger.info(f"Extracted {len(self.html_word_list)} html words")

    def _link_lists(
        self, search_max: int = 100, edit_cost: int = 20, offset_cost: int = 1
    ) -> None:
        # NOTE: there are probably some inefficiencies here from rehashing words
        # multiple times, but we're not going to worry about that for now

        def link_exact(l: int, u: int) -> None:
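            # Within html_word_list[l:u] and the corresponding pdf_word_list[L:U],
            # link words that occur the same number of times in both windows,
            # pairing their occurrences in order.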
            l, u, L, U = get_anchors(l, u)
            # Inverted index that maps word to index(es) of html_word_list
            html_dict: DefaultDict[str, List[int]] = defaultdict(list)
            # Inverted index that maps word to index(es) of pdf_word_list
            pdf_dict: DefaultDict[str, List[int]] = defaultdict(list)
            for i, (_, word) in enumerate(self.html_word_list[l:u]):
                if html_to_pdf[l + i] is None:
                    html_dict[word].append(l + i)
            for j, (_, word) in enumerate(self.pdf_word_list[L:U]):
                if pdf_to_html[L + j] is None:
                    pdf_dict[word].append(L + j)
            for word, html_list in list(html_dict.items()):
                pdf_list = pdf_dict[word]
                if len(html_list) == len(pdf_list):
                    for k in range(len(html_list)):
                        html_to_pdf[html_list[k]] = pdf_list[k]
                        pdf_to_html[pdf_list[k]] = html_list[k]

        def link_fuzzy(i: int) -> None:
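            # Estimate where HTML word i should fall in the PDF word list by
            # interpolating between the nearest linked anchors, then search outward
            # from that offset. A prefix/suffix match is linked immediately;
            # otherwise the candidate with the lowest edit-distance-plus-offset
            # cost wins.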
            (_, word) = self.html_word_list[i]
            l = u = i
            l, u, L, U = get_anchors(l, u)
            offset = int(L + float(i - l) / (u - l) * (U - L))
            searchIndices = np.clip(offset + search_order, 0, M - 1)
            cost = [0] * search_max
            for j, k in enumerate(searchIndices):
                other = self.pdf_word_list[k][1]
                if (
                    word.startswith(other)
                    or word.endswith(other)
                    or other.startswith(word)
                    or other.endswith(word)
                ):
                    html_to_pdf[i] = k
                    return
                else:
                    cost[j] = int(editdist(word, other)) * edit_cost + j * offset_cost
            html_to_pdf[i] = searchIndices[np.argmin(cost)]
            return

        def get_anchors(l: int, u: int) -> Tuple[int, int, int, int]:
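            # Expand [l, u] outward to the nearest already-linked HTML indices and
            # return the corresponding PDF indices (L, U), falling back to the list
            # boundaries when no anchor exists on a side.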
            while l >= 0 and html_to_pdf[l] is None:
                l -= 1
            while u < N and html_to_pdf[u] is None:
                u += 1
            if l < 0:
                l = 0
                L = 0
            else:
                L = html_to_pdf[l]
            if u >= N:
                u = N
                U = M
            else:
                U = html_to_pdf[u]
            return l, u, L, U

        def display_match_counts() -> int:
            matches = sum(
                [
                    html_to_pdf[i] is not None
                    and self.html_word_list[i][1]
                    == self.pdf_word_list[html_to_pdf[i]][1]
                    for i in range(len(self.html_word_list))
                ]
            )
            total = len(self.html_word_list)
            logger.info(f"({matches}/{total}) = {matches / total:.2f}")
            return matches

        N = len(self.html_word_list)
        M = len(self.pdf_word_list)

        try:
            assert N > 0 and M > 0
        except Exception:
            logger.exception(f"N = {N} and M = {M} are invalid values.")

        html_to_pdf: List[Optional[int]] = [None] * N
        pdf_to_html: List[Optional[int]] = [None] * M
        search_radius = search_max // 2

        # first pass: global search for exact matches
        link_exact(0, N)
        if self.verbose:
            logger.debug("Global exact matching:")
            display_match_counts()

        # second pass: local search for exact matches
        for i in range(((N + 2) // search_radius) + 1):
            link_exact(
                max(0, i * search_radius - search_radius),
                min(N, i * search_radius + search_radius),
            )
        if self.verbose:
            logger.debug("Local exact matching:")
            display_match_counts()

        # third pass: local search for approximate matches
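        # Offsets alternate around the estimated index: 0, +1, -1, +2, -2, ...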
        search_order = np.array(
            [(-1) ** (i % 2) * (i // 2) for i in range(1, search_max + 1)]
        )
        for i in range(len(html_to_pdf)):
            if html_to_pdf[i] is None:
                link_fuzzy(i)
        if self.verbose:
            logger.debug("Local approximate matching:")
            display_match_counts()

        # convert list to dict
        matches = sum(
            [
                html_to_pdf[i] is not None
                and self.html_word_list[i][1] == self.pdf_word_list[html_to_pdf[i]][1]
                for i in range(len(self.html_word_list))
            ]
        )
        total = len(self.html_word_list)
        if self.verbose:
            logger.debug(
                f"Linked {matches}/{total} ({matches / total:.2f}) html words exactly"
            )
        self.links = OrderedDict(
            (self.html_word_list[i][0], self.pdf_word_list[html_to_pdf[i]][0])
            for i in range(len(self.html_word_list))
        )

    def _update_coordinates(self) -> Iterator[Sentence]:
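        # Attach per-word bounding boxes to each sentence by following the
        # HTML-word -> PDF-word links built in _link_lists.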
        for sentence in self.sentences:
            (page, top, bottom, left, right) = list(
                zip(
                    *[
                        self.coordinate_map[self.links[((sentence.stable_id), i)]]
                        for i in range(len(sentence.words))
                    ]
                )
            )
            sentence.page = list(page)
            sentence.top = list(top)
            sentence.left = list(left)
            sentence.bottom = list(bottom)
            sentence.right = list(right)
            yield sentence
        if self.verbose:
            logger.debug("Updated coordinates in database")