BLKSerene/Wordless

View on GitHub
wordless/wl_concordancer_parallel.py

Summary

Maintainability
A
2 hrs
Test Coverage
# ----------------------------------------------------------------------
# Wordless: Parallel Concordancer
# Copyright (C) 2018-2024  Ye Lei (叶磊)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
# ----------------------------------------------------------------------

# pylint: disable=broad-exception-caught

import bisect
import copy
import traceback

from PyQt5.QtCore import pyqtSignal, QCoreApplication, Qt
from PyQt5.QtWidgets import QGroupBox

from wordless.wl_checks import wl_checks_work_area
from wordless.wl_dialogs import wl_dialogs_misc, wl_msg_boxes
from wordless.wl_nlp import (
    wl_matching,
    wl_nlp_utils,
    wl_texts,
    wl_token_processing
)
from wordless.wl_utils import wl_misc, wl_threading
from wordless.wl_widgets import (
    wl_labels,
    wl_layouts,
    wl_tables,
    wl_widgets
)

_tr = QCoreApplication.translate

class Wrapper_Concordancer_Parallel(wl_layouts.Wl_Wrapper):
    """Work area of the parallel concordancer: the results table plus its settings panels.

    The state of the widgets is written into
    self.main.settings_custom['concordancer_parallel'] by the *_settings_changed slots.
    """

    def __init__(self, main):
        """Build the table area and the token/search/table settings, then load the saved settings."""
        super().__init__(main)

        table = Wl_Table_Concordancer_Parallel(self)
        self.table_concordancer_parallel = table

        # Bar above the table: number of results and the button for searching in results
        layout_results = wl_layouts.Wl_Layout()
        layout_results.addWidget(table.label_num_results, 0, 0)
        layout_results.addWidget(table.button_results_search, 0, 4)

        layout_results.setColumnStretch(1, 1)

        layout_table = self.wrapper_table.layout()
        layout_table.addLayout(layout_results, 0, 0, 1, 4)
        layout_table.addWidget(table, 1, 0, 1, 4)

        # Action buttons sit side by side in the row below the table
        buttons_table = (
            table.button_generate_table,
            table.button_exp_selected_cells,
            table.button_exp_all_cells,
            table.button_clr_table
        )

        for col, button in enumerate(buttons_table):
            layout_table.addWidget(button, 2, col)

        # Token Settings
        self.group_box_token_settings = QGroupBox(self.tr('Token Settings'), self)

        (
            self.checkbox_punc_marks,

            self.checkbox_assign_pos_tags,
            self.checkbox_ignore_tags,
            self.checkbox_use_tags
        ) = wl_widgets.wl_widgets_token_settings_concordancer(self)

        for checkbox in (
            self.checkbox_punc_marks,
            self.checkbox_assign_pos_tags,
            self.checkbox_ignore_tags,
            self.checkbox_use_tags
        ):
            checkbox.stateChanged.connect(self.token_settings_changed)

        layout_token_settings = wl_layouts.Wl_Layout()
        self.group_box_token_settings.setLayout(layout_token_settings)

        layout_token_settings.addWidget(self.checkbox_punc_marks, 0, 0, 1, 2)

        layout_token_settings.addWidget(wl_layouts.Wl_Separator(self), 1, 0, 1, 2)

        layout_token_settings.addWidget(self.checkbox_assign_pos_tags, 2, 0, 1, 2)
        layout_token_settings.addWidget(self.checkbox_ignore_tags, 3, 0)
        layout_token_settings.addWidget(self.checkbox_use_tags, 3, 1)

        # Search Settings
        self.group_box_search_settings = QGroupBox(self.tr('Search Settings'), self)

        (
            self.label_search_term,
            self.checkbox_multi_search_mode,

            self.stacked_widget_search_term,
            self.line_edit_search_term,
            self.list_search_terms,
            self.label_delimiter,

            self.checkbox_match_case,
            self.checkbox_match_whole_words,
            self.checkbox_match_inflected_forms,
            self.checkbox_use_regex,
            self.checkbox_match_without_tags,
            self.checkbox_match_tags
        ) = wl_widgets.wl_widgets_search_settings(
            self,
            tab = 'concordancer_parallel'
        )

        (
            self.label_context_settings,
            self.button_context_settings
        ) = wl_widgets.wl_widgets_context_settings(
            self,
            tab = 'concordancer_parallel'
        )

        checkboxes_matching = (
            self.checkbox_match_case,
            self.checkbox_match_whole_words,
            self.checkbox_match_inflected_forms,
            self.checkbox_use_regex,
            self.checkbox_match_without_tags,
            self.checkbox_match_tags
        )

        self.checkbox_multi_search_mode.stateChanged.connect(self.search_settings_changed)
        self.line_edit_search_term.textChanged.connect(self.search_settings_changed)
        # Pressing Enter in the search box has the same effect as clicking the generate button
        self.line_edit_search_term.returnPressed.connect(table.button_generate_table.click)
        self.list_search_terms.model().dataChanged.connect(self.search_settings_changed)

        for checkbox in checkboxes_matching:
            checkbox.stateChanged.connect(self.search_settings_changed)

        layout_context_settings = wl_layouts.Wl_Layout()
        layout_context_settings.addWidget(self.label_context_settings, 0, 0)
        layout_context_settings.addWidget(self.button_context_settings, 0, 1)

        layout_context_settings.setColumnStretch(1, 1)

        layout_search_settings = wl_layouts.Wl_Layout()
        self.group_box_search_settings.setLayout(layout_search_settings)

        layout_search_settings.addWidget(self.label_search_term, 0, 0)
        layout_search_settings.addWidget(self.checkbox_multi_search_mode, 0, 1, Qt.AlignRight)
        layout_search_settings.addWidget(self.stacked_widget_search_term, 1, 0, 1, 2)
        layout_search_settings.addWidget(self.label_delimiter, 2, 0, 1, 2)

        # Matching options are stacked one per row, starting right below the delimiter hint
        for row, checkbox in enumerate(checkboxes_matching, start = 3):
            layout_search_settings.addWidget(checkbox, row, 0, 1, 2)

        layout_search_settings.addWidget(wl_layouts.Wl_Separator(self), 9, 0, 1, 2)

        layout_search_settings.addLayout(layout_context_settings, 10, 0, 1, 2)

        # Table Settings
        self.group_box_table_settings = QGroupBox(self.tr('Table Settings'), self)

        (
            self.checkbox_show_pct_data,
            self.checkbox_show_cum_data,
            self.checkbox_show_breakdown_file
        ) = wl_widgets.wl_widgets_table_settings(
            self,
            tables = [table]
        )

        # Only the percentage option is offered in this tab
        self.checkbox_show_cum_data.hide()
        self.checkbox_show_breakdown_file.hide()

        self.checkbox_show_pct_data.stateChanged.connect(self.table_settings_changed)

        layout_table_settings = wl_layouts.Wl_Layout()
        self.group_box_table_settings.setLayout(layout_table_settings)

        layout_table_settings.addWidget(self.checkbox_show_pct_data, 0, 0)

        # Stack the three settings panels vertically
        layout_settings = self.wrapper_settings.layout()

        for row, group_box in enumerate((
            self.group_box_token_settings,
            self.group_box_search_settings,
            self.group_box_table_settings
        )):
            layout_settings.addWidget(group_box, row, 0)

        self.load_settings()

    def load_settings(self, defaults = False):
        """Apply the default settings (defaults = True) or the saved custom settings to the widgets.

        The search term(s) are only restored from the custom settings and are left
        untouched when defaults are loaded.
        """
        source = self.main.settings_default if defaults else self.main.settings_custom
        # Work on a copy since changing the widgets below rewrites the custom settings via the slots
        settings = copy.deepcopy(source['concordancer_parallel'])

        token_settings = settings['token_settings']
        search_settings = settings['search_settings']

        # Token Settings
        for checkbox, key in (
            (self.checkbox_punc_marks, 'punc_marks'),
            (self.checkbox_assign_pos_tags, 'assign_pos_tags'),
            (self.checkbox_ignore_tags, 'ignore_tags'),
            (self.checkbox_use_tags, 'use_tags')
        ):
            checkbox.setChecked(token_settings[key])

        # Search Settings
        self.checkbox_multi_search_mode.setChecked(search_settings['multi_search_mode'])

        if not defaults:
            self.line_edit_search_term.setText(search_settings['search_term'])
            self.list_search_terms.load_items(search_settings['search_terms'])

        for checkbox, key in (
            (self.checkbox_match_case, 'match_case'),
            (self.checkbox_match_whole_words, 'match_whole_words'),
            (self.checkbox_match_inflected_forms, 'match_inflected_forms'),
            (self.checkbox_use_regex, 'use_regex'),
            (self.checkbox_match_without_tags, 'match_without_tags'),
            (self.checkbox_match_tags, 'match_tags')
        ):
            checkbox.setChecked(search_settings[key])

        # Context Settings
        # NOTE(review): this resets "wl_context_settings_concordancer" although the widgets of this tab
        # are created with tab = 'concordancer_parallel' - confirm that this is the intended dialog
        if defaults:
            self.main.wl_context_settings_concordancer.load_settings(defaults = True)

        # Table Settings
        self.checkbox_show_pct_data.setChecked(settings['table_settings']['show_pct_data'])

        # Write the resulting widget states back even if no change signal was emitted
        self.token_settings_changed()
        self.search_settings_changed()
        self.table_settings_changed()

    def token_settings_changed(self):
        """Save the token settings shown in the widgets and notify the "match tags" checkbox."""
        settings = self.main.settings_custom['concordancer_parallel']['token_settings']

        for key, checkbox in (
            ('punc_marks', self.checkbox_punc_marks),
            ('assign_pos_tags', self.checkbox_assign_pos_tags),
            ('ignore_tags', self.checkbox_ignore_tags),
            ('use_tags', self.checkbox_use_tags)
        ):
            settings[key] = checkbox.isChecked()

        self.checkbox_match_tags.token_settings_changed()

    def search_settings_changed(self):
        """Save the search settings shown in the widgets."""
        settings = self.main.settings_custom['concordancer_parallel']['search_settings']

        settings['multi_search_mode'] = self.checkbox_multi_search_mode.isChecked()
        settings['search_term'] = self.line_edit_search_term.text()
        settings['search_terms'] = self.list_search_terms.model().stringList()

        for key, checkbox in (
            ('match_case', self.checkbox_match_case),
            ('match_whole_words', self.checkbox_match_whole_words),
            ('match_inflected_forms', self.checkbox_match_inflected_forms),
            ('use_regex', self.checkbox_use_regex),
            ('match_without_tags', self.checkbox_match_without_tags),
            ('match_tags', self.checkbox_match_tags)
        ):
            settings[key] = checkbox.isChecked()

    def table_settings_changed(self):
        """Save the table settings shown in the widgets."""
        settings = self.main.settings_custom['concordancer_parallel']['table_settings']

        settings['show_pct_data'] = self.checkbox_show_pct_data.isChecked()

class Wl_Table_Concordancer_Parallel(wl_tables.Wl_Table_Data_Search):
    """Results table of the parallel concordancer.

    The first two columns hold the number of the parallel unit and its percentage;
    one further column per selected file is inserted when results are displayed.
    """

    def __init__(self, parent):
        """Create the table with its two fixed headers."""
        header_no = _tr('Wl_Table_Concordancer_Parallel', 'Parallel Unit No.')
        header_no_pct = _tr('Wl_Table_Concordancer_Parallel', 'Parallel Unit No. %')

        super().__init__(
            parent,
            tab = 'concordancer_parallel',
            headers = [header_no, header_no_pct],
            headers_int = [header_no],
            headers_pct = [header_no_pct],
            generate_fig = False
        )

    @wl_misc.log_time
    def generate_table(self):
        """Start a worker which collects the concordance lines, after checking the preconditions.

        If no search terms are given, the user is asked whether additions and deletions
        should be searched for instead; declining only shows a status bar message.
        """
        proceed = wl_checks_work_area.check_search_terms(
            self.main,
            search_settings = self.main.settings_custom['concordancer_parallel']['search_settings'],
            show_warning = False
        )

        if not proceed:
            # Check whether the user has simply forgotten to enter search terms
            proceed = wl_msg_boxes.wl_msg_box_question(
                self.main,
                title = self.tr('Missing Search Terms'),
                text = self.tr('''
                    <div>You have not specified any search terms. Do you want to search for additions and deletions?</div>
                '''),
                default_to_yes = True
            )

        if not proceed:
            wl_checks_work_area.wl_status_bar_msg_missing_search_terms(self.main)

            return

        # NLP support only needs to be checked when POS tags are to be assigned
        if (
            self.main.settings_custom['concordancer_parallel']['token_settings']['assign_pos_tags']
            and not wl_checks_work_area.check_nlp_support(
                self.main,
                nlp_utils = ['pos_taggers']
            )
        ):
            return

        worker = Wl_Worker_Concordancer_Parallel_Table(
            self.main,
            dialog_progress = wl_dialogs_misc.Wl_Dialog_Progress_Process_Data(self.main),
            update_gui = self.update_gui_table
        )

        wl_threading.Wl_Thread(worker).start_worker()

    def update_gui_table(self, err_msg, concordance_lines):
        """Fill the table with the concordance lines delivered by the worker.

        Each concordance line is expected to be
        [[parallel unit no., total], [[raw tokens, search tokens], ... one pair per file]].
        Exceptions raised while filling the table are reported via check_err_table.
        """
        if not wl_checks_work_area.check_results(self.main, err_msg, concordance_lines):
            return

        try:
            # Keep a snapshot of the settings the results were generated with
            self.settings = copy.deepcopy(self.main.settings_custom)

            self.clr_table(0)

            # Append one column per selected file
            for file_name in self.main.wl_file_area.get_selected_file_names():
                self.ins_header_hor(
                    self.model().columnCount(),
                    file_name
                )

            self.model().setRowCount(len(concordance_lines))
            self.disable_updates()

            for row, concordance_line in enumerate(concordance_lines):
                parallel_unit_no, len_parallel_units = concordance_line[0]

                self.set_item_num(row, 0, parallel_unit_no)
                self.set_item_num(row, 1, parallel_unit_no, len_parallel_units)

                # Columns of the files start after the two fixed columns
                for col, (tokens_raw, tokens_search) in enumerate(concordance_line[1], start = 2):
                    label = wl_labels.Wl_Label_Html(' '.join(tokens_raw), self.main)
                    label.tokens_raw = tokens_raw
                    label.tokens_search = tokens_search

                    index = self.model().index(row, col)

                    self.setIndexWidget(index, label)
                    self.indexWidget(index).setAlignment(Qt.AlignHCenter | Qt.AlignVCenter)

            self.enable_updates()

            self.toggle_pct_data()
        except Exception:
            err_msg = traceback.format_exc()
        finally:
            wl_checks_work_area.check_err_table(self.main, err_msg)

class Wl_Worker_Concordancer_Parallel_Table(wl_threading.Wl_Worker):
    """Worker which collects the concordance lines of the parallel concordancer.

    When finished, worker_done is emitted with the error message (empty if no exception
    was raised) and the list of concordance lines.
    """

    worker_done = pyqtSignal(str, list)

    def run(self):
        """Search all selected files and emit the resulting concordance lines.

        If search terms are specified, every parallel unit containing at least one
        match in any file is collected and the matches are highlighted. Otherwise,
        additions and deletions are searched for, i.e. units which are empty in a file
        or which lie beyond the end of a file shorter than the longest one.

        Each emitted concordance line has the form
        [[parallel unit no., max. number of parallel units], [[raw tokens, search tokens], ...]]
        with one pair of token lists per file. Any exception is caught and passed on as
        a formatted traceback in the error message.
        """
        err_msg = ''
        texts = []
        # Parallel unit no. (1-based) -> one entry per file
        # Entries hold the nodes found at first and are replaced by [raw tokens, search tokens] later
        parallel_units = {}
        offsets_paras_files = []
        concordance_lines = []

        try:
            settings = self.main.settings_custom['concordancer_parallel']

            files = list(self.main.wl_file_area.get_selected_files())
            len_files = len(files)

            # Parallel Unit No.
            for file in files:
                text = wl_token_processing.wl_process_tokens_concordancer(
                    self.main, file['text'],
                    token_settings = settings['token_settings'],
                    search_settings = settings['search_settings'],
                    preserve_blank_lines = True
                )

                offsets_paras_files.append(text.get_offsets()[0])
                # Save text
                texts.append(text)

            len_max_parallel_units = max((len(offsets) for offsets in offsets_paras_files))

            # The check receives the same arguments for every file, so it is evaluated only once
            search_terms_specified = wl_checks_work_area.check_search_terms(
                self.main, settings['search_settings'],
                show_warning = False
            )

            for i, (text, offsets_paras) in enumerate(zip(texts, offsets_paras_files)):
                tokens = text.get_tokens_flat()

                if search_terms_specified:
                    search_terms = wl_matching.match_search_terms_ngrams(
                        self.main, tokens,
                        lang = text.lang,
                        token_settings = settings['token_settings'],
                        search_settings = settings['search_settings']
                    )

                    (
                        search_terms_incl,
                        search_terms_excl
                    ) = wl_matching.match_search_terms_context(
                        self.main, tokens,
                        lang = text.lang,
                        token_settings = settings['token_settings'],
                        context_settings = settings['search_settings']['context_settings']
                    )

                    if search_terms:
                        len_search_term_min = min((len(search_term) for search_term in search_terms))
                        len_search_term_max = max((len(search_term) for search_term in search_terms))
                    else:
                        len_search_term_min = 0
                        len_search_term_max = 0

                    for len_search_term in range(len_search_term_min, len_search_term_max + 1):
                        for j, ngram in enumerate(wl_nlp_utils.ngrams(tokens, len_search_term)):
                            if (
                                ngram in search_terms
                                and wl_matching.check_context(
                                    j, tokens,
                                    context_settings = settings['search_settings']['context_settings'],
                                    search_terms_incl = search_terms_incl,
                                    search_terms_excl = search_terms_excl
                                )
                            ):
                                # Map the position of the token to the 1-based number of its parallel unit
                                parallel_unit_no = bisect.bisect(offsets_paras, j)

                                if parallel_unit_no not in parallel_units:
                                    # Save all nodes if multiple nodes are found in the same parallel unit
                                    parallel_units[parallel_unit_no] = [[] for _ in range(len_files)]

                                parallel_units[parallel_unit_no][i].append(ngram)
                # Search for additions & deletions
                else:
                    for j, para in enumerate(text.tokens_multilevel):
                        if para == []:
                            parallel_units[j + 1] = [[] for _ in range(len_files)]

                    # Empty lines at the end of files
                    if len(offsets_paras) < len_max_parallel_units:
                        for j in range(len(offsets_paras) + 1, len_max_parallel_units + 1):
                            parallel_units[j] = [[] for _ in range(len_files)]

            node_color = self.main.settings_custom['tables']['parallel_concordancer']['highlight_color_settings']['search_term_color']

            for i, (text, offsets_paras) in enumerate(zip(texts, offsets_paras_files)):
                len_parallel_units = len(offsets_paras)

                for parallel_unit_no, parallel_unit_nodes in parallel_units.items():
                    nodes = parallel_unit_nodes[i]

                    if parallel_unit_no <= len_parallel_units:
                        parallel_unit = list(wl_misc.flatten_list(text.tokens_multilevel[parallel_unit_no - 1]))

                        if settings['token_settings']['punc_marks']:
                            parallel_unit_tokens_search = copy.deepcopy(parallel_unit)
                        # Convert trailing punctuation marks, if any, to separate tokens for searching
                        else:
                            parallel_unit_tokens_search = []

                            for token in copy.deepcopy(parallel_unit):
                                parallel_unit_tokens_search.append(token)

                                if token.punc_mark:
                                    parallel_unit_tokens_search.append(wl_texts.Wl_Token(token.punc_mark, lang = token.lang))

                        parallel_unit_tokens_raw = wl_nlp_utils.escape_tokens(wl_texts.to_display_texts(
                            parallel_unit,
                            punc_mark = True
                        ))

                        # Highlight nodes if found
                        # A node is saved once per occurrence while a single pass below already marks all
                        # of its occurrences, so only distinct nodes are processed, otherwise each
                        # occurrence would be wrapped in as many nested <span> tags as there are occurrences
                        nodes_distinct = []

                        for node in nodes:
                            node = tuple(node)

                            if node not in nodes_distinct:
                                nodes_distinct.append(node)

                        for node in nodes_distinct:
                            len_node = len(node)

                            for j, ngram in enumerate(wl_nlp_utils.ngrams(parallel_unit, len_node)):
                                if ngram == node:
                                    # Open the tag before the first token and close it after the last token of the node
                                    parallel_unit_tokens_raw[j] = f'<span style="color: {node_color}; font-weight: bold;">{parallel_unit_tokens_raw[j]}'
                                    parallel_unit_tokens_raw[j + len_node - 1] += '</span>'
                    # The file has fewer parallel units than the longest file
                    else:
                        parallel_unit_tokens_raw = []
                        parallel_unit_tokens_search = []

                    # Replace the nodes of this file with the tokens to be displayed and searched
                    parallel_unit_nodes[i] = [parallel_unit_tokens_raw, parallel_unit_tokens_search]

            # Remove empty concordance lines
            for parallel_unit_no, parallel_units_files in parallel_units.copy().items():
                if not any(wl_misc.flatten_list(parallel_units_files)):
                    del parallel_units[parallel_unit_no]

            # Sort by Parallel Unit No.
            # The total number of parallel units is attached for calculating percentages
            concordance_lines = [
                [[parallel_unit_no, len_max_parallel_units], parallel_units_files]
                for parallel_unit_no, parallel_units_files in sorted(parallel_units.items())
            ]
        except Exception:
            err_msg = traceback.format_exc()

        self.progress_updated.emit(self.tr('Rendering table...'))
        self.worker_done.emit(err_msg, concordance_lines)