wordless/wl_nlp/wl_nlp_utils.py
# ----------------------------------------------------------------------
# Wordless: NLP - NLP utilities
# Copyright (C) 2018-2024 Ye Lei (叶磊)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# ----------------------------------------------------------------------
import collections
import html
import importlib
import itertools
import os
import re
import shutil
import sys
import traceback
import zipfile
import botok
import bs4
import mecab
import nltk
import nltk.tokenize.nist
import packaging.version
import pymorphy3
import pyphen
from PyQt5.QtCore import pyqtSignal
import sacremoses
import spacy
import spacy_pkuseg
import stanza
import sudachipy
from wordless.wl_checks import wl_checks_work_area
from wordless.wl_dialogs import wl_dialogs_misc
from wordless.wl_nlp import wl_sentence_tokenization
from wordless.wl_utils import (
wl_conversion,
wl_misc,
wl_paths,
wl_threading
)
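# Languages whose writing systems do not use spaces to separate words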
LANGS_WITHOUT_SPACES = ['mya', 'zho_cn', 'zho_tw', 'khm', 'lao', 'jpn', 'tha', 'bod']
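# Convert between the human-readable names of language utilities and their internal
# codes via the mapping in main.settings_global['mapping_lang_utils']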
def to_lang_util_code(main, util_type, util_text):
return main.settings_global['mapping_lang_utils'][util_type][util_text]
def to_lang_util_codes(main, util_type, util_texts):
return (
main.settings_global['mapping_lang_utils'][util_type][util_text]
for util_text in util_texts
)
def _to_lang_util_text(main, util_type, util_code):
for text, code in main.settings_global['mapping_lang_utils'][util_type].items():
if code == util_code:
return text
return None
def to_lang_util_text(main, util_type, util_code):
return _to_lang_util_text(main, util_type, util_code)
def to_lang_util_texts(main, util_type, util_codes):
return (
_to_lang_util_text(main, util_type, util_code)
for util_code in util_codes
)
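# Names of spaCy's pretrained pipelines keyed by Wordless language code; 'other'
# falls back to the English pipeline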
LANGS_SPACY = {
'cat': 'ca_core_news_trf',
'zho': 'zh_core_web_trf',
'hrv': 'hr_core_news_lg',
'dan': 'da_core_news_trf',
'nld': 'nl_core_news_lg',
'eng': 'en_core_web_trf',
'fin': 'fi_core_news_lg',
'fra': 'fr_dep_news_trf',
'deu': 'de_dep_news_trf',
'ell': 'el_core_news_lg',
'ita': 'it_core_news_lg',
'jpn': 'ja_core_news_trf',
'kor': 'ko_core_news_lg',
'lit': 'lt_core_news_lg',
'mkd': 'mk_core_news_lg',
'nob': 'nb_core_news_lg',
'pol': 'pl_core_news_lg',
'por': 'pt_core_news_lg',
'ron': 'ro_core_news_lg',
'rus': 'ru_core_news_lg',
'slv': 'sl_core_news_trf',
'spa': 'es_dep_news_trf',
'swe': 'sv_core_news_lg',
'ukr': 'uk_core_news_trf',
'other': 'en_core_web_trf'
}
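# Collect languages for which at least one utility of the given type
# (e.g. 'word_tokenizers') is Stanza-based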
def get_langs_stanza(main, util_type):
langs_stanza = set()
for lang_code, lang_utils in main.settings_global[util_type].items():
if any(('stanza' in lang_util for lang_util in lang_utils)):
langs_stanza.add(lang_code)
return langs_stanza
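# Check that the spaCy / Stanza models required by the selected utilities of the
# given languages are installed, download any that are missing, and return whether
# all models are ready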
@wl_misc.log_time
def check_models(main, langs, lang_utils = None):
def update_gui_stanza(main, err_msg):
nonlocal models_ok
models_ok = wl_checks_work_area.check_results_download_model(main, err_msg)
models_ok = True
langs = list(langs)
# Check all language utilities if not specified
if lang_utils is None:
lang_utils = []
for lang in langs:
lang_utils.append([])
for settings in [
main.settings_custom['sentence_tokenization']['sentence_tokenizer_settings'],
main.settings_custom['word_tokenization']['word_tokenizer_settings'],
main.settings_custom['pos_tagging']['pos_tagger_settings']['pos_taggers'],
main.settings_custom['lemmatization']['lemmatizer_settings'],
main.settings_custom['dependency_parsing']['dependency_parser_settings'],
main.settings_custom['sentiment_analysis']['sentiment_analyzer_settings']
]:
if lang in settings:
lang_utils[-1].append(settings[lang])
elif settings in [
main.settings_custom['sentence_tokenization']['sentence_tokenizer_settings'],
main.settings_custom['word_tokenization']['word_tokenizer_settings']
]:
lang_utils[-1].append(settings['other'])
for lang, utils in zip(langs, lang_utils):
for i, util in enumerate(utils):
if util == 'default_sentence_tokenizer':
if lang in main.settings_custom['sentence_tokenization']['sentence_tokenizer_settings']:
utils[i] = main.settings_custom['sentence_tokenization']['sentence_tokenizer_settings'][lang]
else:
utils[i] = main.settings_custom['sentence_tokenization']['sentence_tokenizer_settings']['other']
elif util == 'default_word_tokenizer':
if lang in main.settings_custom['word_tokenization']['word_tokenizer_settings']:
utils[i] = main.settings_custom['word_tokenization']['word_tokenizer_settings'][lang]
else:
utils[i] = main.settings_custom['word_tokenization']['word_tokenizer_settings']['other']
elif util == 'default_pos_tagger':
if lang in main.settings_custom['pos_tagging']['pos_tagger_settings']['pos_taggers']:
utils[i] = main.settings_custom['pos_tagging']['pos_tagger_settings']['pos_taggers'][lang]
elif util == 'default_lemmatizer':
if lang in main.settings_custom['lemmatization']['lemmatizer_settings']:
utils[i] = main.settings_custom['lemmatization']['lemmatizer_settings'][lang]
elif util == 'default_dependency_parser':
if lang in main.settings_custom['dependency_parsing']['dependency_parser_settings']:
utils[i] = main.settings_custom['dependency_parsing']['dependency_parser_settings'][lang]
elif util == 'default_sentiment_analyzer':
if lang in main.settings_custom['sentiment_analysis']['sentiment_analyzer_settings']:
utils[i] = main.settings_custom['sentiment_analysis']['sentiment_analyzer_settings'][lang]
for lang, utils in zip(langs, lang_utils):
if any((util.startswith('spacy_') for util in utils)):
if lang == 'nno':
lang_spacy = 'nob'
else:
lang_spacy = wl_conversion.remove_lang_code_suffixes(main, lang)
if lang_spacy in LANGS_SPACY:
model_name = LANGS_SPACY[lang_spacy]
try:
importlib.import_module(model_name)
except ModuleNotFoundError:
worker_download_model = Wl_Worker_Download_Model_Spacy(
main,
dialog_progress = wl_dialogs_misc.Wl_Dialog_Progress_Download_Model(main),
update_gui = lambda err_msg, model_name = model_name: wl_checks_work_area.check_results_download_model(main, err_msg, model_name),
model_name = model_name
)
wl_threading.Wl_Thread(worker_download_model).start_worker()
try:
importlib.import_module(model_name)
except ModuleNotFoundError:
models_ok = False
if not models_ok:
break
if (
any((util.startswith('stanza_') for util in utils))
and lang in get_langs_stanza(main, util_type = 'word_tokenizers')
):
worker_download_model = Wl_Worker_Download_Model_Stanza(
main,
dialog_progress = wl_dialogs_misc.Wl_Dialog_Progress_Download_Model(main),
update_gui = lambda err_msg: update_gui_stanza(main, err_msg),
lang = lang
)
wl_threading.Wl_Thread(worker_download_model).start_worker()
if models_ok:
wl_checks_work_area.wl_status_bar_msg_success_download_model(main)
return models_ok
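# Download a spaCy model in a worker thread: the model wheel matching the installed
# spaCy version is fetched from spaCy's download URL, then either extracted into the
# application folder (frozen builds) or installed via pip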
class Wl_Worker_Download_Model_Spacy(wl_threading.Wl_Worker):
worker_done = pyqtSignal(str)
def __init__(self, main, dialog_progress, update_gui, model_name):
super().__init__(main, dialog_progress, update_gui, model_name = model_name)
self.err_msg = ''
def run(self):
try:
self.progress_updated.emit(self.tr('Fetching model information...'))
# Clean existing models
for file in os.listdir('.'):
if os.path.isdir(file) and file.startswith(self.model_name):
                    shutil.rmtree(file, ignore_errors = True)
spacy_ver = packaging.version.Version(spacy.about.__version__)
model_ver = f'{spacy_ver.major}.{spacy_ver.minor}'
r, err_msg = wl_misc.wl_download(self.main, spacy.about.__compatibility__)
if not err_msg:
model_ver = r.json()['spacy'][model_ver][self.model_name][0]
model_file = f'{self.model_name}-{model_ver}{spacy.cli._util.WHEEL_SUFFIX}'
model_url = f'{spacy.about.__download_url__}/{self.model_name}-{model_ver}/{model_file}'
# Get model size
file_size = wl_misc.wl_download_file_size(self.main, model_url)
if file_size:
self.progress_updated.emit(self.tr('Downloading model ({:.2f} MB)...').format(file_size))
else:
self.progress_updated.emit(self.tr('Downloading model...'))
if getattr(sys, '_MEIPASS', False):
r, err_msg = wl_misc.wl_download(self.main, model_url)
if not err_msg:
with open(model_file, 'wb') as f:
f.write(r.content)
with zipfile.ZipFile(model_file) as f:
f.extractall(wl_paths.get_path_file(''))
# Clear cache
os.remove(model_file)
else:
import pip # pylint: disable=import-outside-toplevel
pip.main(['install', '--no-deps', model_url])
# Clear cache
pip.main(['cache', 'purge'])
else:
self.err_msg = err_msg
except Exception: # pylint: disable=broad-exception-caught
self.err_msg = traceback.format_exc()
self.progress_updated.emit(self.tr('Download completed successfully.'))
self.worker_done.emit(self.err_msg)
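# Download Stanza models in a worker thread, requesting only the processors for which
# Stanza-based utilities exist for the language, and store them under a bundled
# 'stanza_resources' folder when the application is frozen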
class Wl_Worker_Download_Model_Stanza(wl_threading.Wl_Worker):
worker_done = pyqtSignal(str)
def __init__(self, main, dialog_progress, update_gui, lang):
super().__init__(main, dialog_progress, update_gui, lang = lang)
self.err_msg = ''
def run(self):
try:
self.progress_updated.emit(self.tr('Downloading model...'))
# Change the directory for Stanza's downloaded models when the application is frozen
if getattr(sys, '_MEIPASS', False):
model_dir = wl_paths.get_path_file('stanza_resources')
else:
model_dir = stanza.resources.common.DEFAULT_MODEL_DIR
processors = []
if self.lang in get_langs_stanza(self.main, util_type = 'word_tokenizers'):
processors.append('tokenize')
if self.lang in get_langs_stanza(self.main, util_type = 'pos_taggers'):
processors.append('pos')
if self.lang in get_langs_stanza(self.main, util_type = 'lemmatizers'):
processors.append('lemma')
if self.lang in get_langs_stanza(self.main, util_type = 'dependency_parsers'):
processors.append('depparse')
if self.lang in get_langs_stanza(self.main, util_type = 'sentiment_analyzers'):
processors.append('sentiment')
if self.lang == 'zho_cn':
lang_stanza = 'zh-hans'
elif self.lang == 'zho_tw':
lang_stanza = 'zh-hant'
elif self.lang == 'srp_latn':
lang_stanza = 'sr'
elif self.lang == 'other':
lang_stanza = 'en'
else:
lang_stanza = wl_conversion.to_iso_639_1(self.main, self.lang, no_suffix = True)
stanza.download(
lang = lang_stanza,
model_dir = model_dir,
package = 'default',
processors = processors,
proxies = wl_misc.wl_get_proxies(self.main),
download_json = False
)
except Exception: # pylint: disable=broad-exception-caught
self.err_msg = traceback.format_exc()
self.progress_updated.emit(self.tr('Download completed successfully.'))
self.worker_done.emit(self.err_msg)
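# Languages without pretrained spaCy pipelines for which a 'lemmatizer' pipe is still
# added to the blank pipeline in init_model_spacy()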
LANGS_SPACY_LEMMATIZERS = [
'ben', 'ces', 'grc', 'hun', 'ind', 'gle', 'ltz', 'fas', 'srp', 'tgl',
'tur', 'urd'
]
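# Lazily initialize spaCy pipelines and cache them on the main object as
# main.spacy_nlp_<lang>; languages without pretrained pipelines fall back to a blank
# pipeline with a sentencizer (and a lemmatizer where available)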
def init_model_spacy(main, lang, sentencizer_only = False):
sentencizer_config = {'punct_chars': list(wl_sentence_tokenization.SENTENCE_TERMINATORS)}
# Sentencizer
if sentencizer_only:
if 'spacy_nlp_sentencizer' not in main.__dict__:
main.__dict__['spacy_nlp_sentencizer'] = spacy.blank('en')
main.__dict__['spacy_nlp_sentencizer'].add_pipe('sentencizer', config = sentencizer_config)
else:
lang = wl_conversion.remove_lang_code_suffixes(main, lang)
if f'spacy_nlp_{lang}' not in main.__dict__:
# Languages with models
if lang in LANGS_SPACY:
model_name = LANGS_SPACY[lang]
model = importlib.import_module(model_name)
# Exclude NER to boost speed
main.__dict__[f'spacy_nlp_{lang}'] = model.load(exclude = ['ner'])
                # Transformer-based models do not have a sentence recognizer ('senter')
if not model_name.endswith('_trf'):
main.__dict__[f'spacy_nlp_{lang}'].enable_pipe('senter')
if lang == 'other':
main.__dict__[f'spacy_nlp_{lang}'].add_pipe('sentencizer', config = sentencizer_config)
# Languages without models
else:
main.__dict__[f'spacy_nlp_{lang}'] = spacy.blank(wl_conversion.to_iso_639_1(main, lang))
# Add sentencizer and lemmatizer
main.__dict__[f'spacy_nlp_{lang}'].add_pipe('sentencizer', config = sentencizer_config)
if lang in LANGS_SPACY_LEMMATIZERS:
main.__dict__[f'spacy_nlp_{lang}'].add_pipe('lemmatizer')
main.__dict__[f'spacy_nlp_{lang}'].initialize()
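# Lazily initialize Stanza pipelines and cache them as main.stanza_nlp_<lang>; the
# pipeline is rebuilt whenever the requested processors or the tokenize_pretokenized
# setting changes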
def init_model_stanza(main, lang, lang_util, tokenized = False):
if lang_util in ['sentence_tokenizer', 'word_tokenizer']:
processors = ['tokenize']
elif lang_util == 'pos_tagger':
processors = ['tokenize', 'pos']
elif lang_util == 'lemmatizer':
processors = ['tokenize', 'pos', 'lemma']
elif lang_util == 'dependency_parser':
processors = ['tokenize', 'pos', 'lemma', 'depparse']
elif lang_util == 'sentiment_analyzer':
processors = ['tokenize', 'sentiment']
if lang in get_langs_stanza(main, util_type = 'word_tokenizers'):
if lang not in ['zho_cn', 'zho_tw', 'srp_latn']:
lang = wl_conversion.remove_lang_code_suffixes(main, lang)
if (
f'stanza_nlp_{lang}' not in main.__dict__
# Some language models require 'mwt' by default
or set(processors) | {'mwt'} != set(main.__dict__[f'stanza_nlp_{lang}'].processors) | {'mwt'}
or tokenized != main.__dict__[f'stanza_nlp_{lang}'].kwargs.get('tokenize_pretokenized', False)
):
if lang == 'zho_cn':
lang_stanza = 'zh-hans'
elif lang == 'zho_tw':
lang_stanza = 'zh-hant'
elif lang == 'srp_latn':
lang_stanza = 'sr'
elif lang == 'other':
lang_stanza = 'en'
else:
lang_stanza = wl_conversion.to_iso_639_1(main, lang, no_suffix = True)
if getattr(sys, '_MEIPASS', False):
model_dir = wl_paths.get_path_file('stanza_resources')
else:
model_dir = stanza.resources.common.DEFAULT_MODEL_DIR
main.__dict__[f'stanza_nlp_{lang}'] = stanza.Pipeline(
lang = lang_stanza,
dir = model_dir,
package = 'default',
processors = processors,
download_method = None,
tokenize_pretokenized = tokenized
)
def init_sudachipy_word_tokenizer(main):
if 'sudachipy_word_tokenizer' not in main.__dict__:
main.sudachipy_word_tokenizer = sudachipy.Dictionary().create()
def init_sentence_tokenizers(main, lang, sentence_tokenizer):
# spaCy
if sentence_tokenizer.startswith('spacy_'):
if sentence_tokenizer == 'spacy_sentencizer':
init_model_spacy(main, lang, sentencizer_only = True)
else:
init_model_spacy(main, lang)
# Stanza
elif sentence_tokenizer.startswith('stanza_'):
init_model_stanza(main, lang, lang_util = 'sentence_tokenizer')
def init_word_tokenizers(main, lang, word_tokenizer = 'default'):
if lang not in main.settings_global['word_tokenizers']:
lang = 'other'
if word_tokenizer == 'default':
word_tokenizer = main.settings_custom['word_tokenization']['word_tokenizer_settings'][lang]
# NLTK
if word_tokenizer.startswith('nltk_'):
if word_tokenizer == 'nltk_nist':
if 'nltk_nist_tokenizer' not in main.__dict__:
main.nltk_nist_tokenizer = nltk.tokenize.nist.NISTTokenizer()
elif word_tokenizer == 'nltk_nltk':
if 'nltk_nltk_tokenizer' not in main.__dict__:
main.nltk_nltk_tokenizer = nltk.NLTKWordTokenizer()
elif word_tokenizer == 'nltk_penn_treebank':
if 'nltk_treebank_tokenizer' not in main.__dict__:
main.nltk_treebank_tokenizer = nltk.TreebankWordTokenizer()
elif word_tokenizer == 'nltk_regex':
if 'nltk_regex_tokenizer' not in main.__dict__:
main.nltk_regex_tokenizer = nltk.WordPunctTokenizer()
elif word_tokenizer == 'nltk_tok_tok':
if 'nltk_toktok_tokenizer' not in main.__dict__:
main.nltk_toktok_tokenizer = nltk.ToktokTokenizer()
elif word_tokenizer == 'nltk_twitter':
if 'nltk_tweet_tokenizer' not in main.__dict__:
main.nltk_tweet_tokenizer = nltk.TweetTokenizer()
# Sacremoses
elif word_tokenizer == 'sacremoses_moses':
lang_sacremoses = wl_conversion.remove_lang_code_suffixes(main, wl_conversion.to_iso_639_1(main, lang))
lang = wl_conversion.remove_lang_code_suffixes(main, lang)
if f'sacremoses_moses_tokenizer_{lang}' not in main.__dict__:
main.__dict__[f'sacremoses_moses_tokenizer_{lang}'] = sacremoses.MosesTokenizer(lang = lang_sacremoses)
# spaCy
elif word_tokenizer.startswith('spacy_'):
init_model_spacy(main, lang)
# Stanza
elif word_tokenizer.startswith('stanza_'):
init_model_stanza(main, lang, lang_util = 'word_tokenizer')
# Chinese
elif word_tokenizer == 'pkuseg_zho':
if 'pkuseg_word_tokenizer' not in main.__dict__:
main.pkuseg_word_tokenizer = spacy_pkuseg.pkuseg(model_name = 'mixed')
# Chinese & Japanese
elif word_tokenizer.startswith('wordless_'):
init_model_spacy(main, 'eng_us')
init_model_spacy(main, 'other')
# Japanese
elif word_tokenizer.startswith('sudachipy_jpn'):
init_sudachipy_word_tokenizer(main)
# Korean
elif word_tokenizer == 'python_mecab_ko_mecab':
if 'python_mecab_ko_mecab' not in main.__dict__:
main.__dict__['python_mecab_ko_mecab'] = mecab.MeCab()
# Tibetan
elif word_tokenizer == 'botok_bod':
if 'botok_word_tokenizer' not in main.__dict__:
main.botok_word_tokenizer = botok.WordTokenizer()
def init_syl_tokenizers(main, lang, syl_tokenizer):
# NLTK
if syl_tokenizer == 'nltk_legality':
if 'nltk_syl_tokenizer_legality' not in main.__dict__:
main.nltk_syl_tokenizer_legality = nltk.tokenize.LegalitySyllableTokenizer(nltk.corpus.words.words())
elif syl_tokenizer == 'nltk_sonority_sequencing':
if 'nltk_syl_tokenizer_sonority_sequencing' not in main.__dict__:
main.nltk_syl_tokenizer_sonority_sequencing = nltk.tokenize.SyllableTokenizer()
# Pyphen
elif syl_tokenizer.startswith('pyphen_'):
if f'pyphen_syl_tokenizer_{lang}' not in main.__dict__:
lang_pyphen = wl_conversion.to_iso_639_1(main, lang)
main.__dict__[f'pyphen_syl_tokenizer_{lang}'] = pyphen.Pyphen(lang = lang_pyphen)
def init_word_detokenizers(main, lang):
if lang not in ['zho_cn', 'zho_tw', 'jpn', 'tha', 'bod']:
# Sacremoses
lang_sacremoses = wl_conversion.remove_lang_code_suffixes(main, wl_conversion.to_iso_639_1(main, lang))
lang = wl_conversion.remove_lang_code_suffixes(main, lang)
if f'sacremoses_moses_detokenizer_{lang}' not in main.__dict__:
main.__dict__[f'sacremoses_moses_detokenizer_{lang}'] = sacremoses.MosesDetokenizer(lang = lang_sacremoses)
def init_pos_taggers(main, lang, pos_tagger, tokenized = False):
# spaCy
if pos_tagger.startswith('spacy_'):
init_model_spacy(main, lang)
# Stanza
elif pos_tagger.startswith('stanza_'):
init_model_stanza(main, lang, lang_util = 'pos_tagger', tokenized = tokenized)
# Japanese
elif pos_tagger == 'sudachipy_jpn':
init_sudachipy_word_tokenizer(main)
# Korean
elif pos_tagger == 'python_mecab_ko_mecab':
init_word_tokenizers(main, lang = 'kor', word_tokenizer = 'python_mecab_ko_mecab')
# Russian & Ukrainian
elif pos_tagger == 'pymorphy3_morphological_analyzer':
if lang == 'rus':
if 'pymorphy3_morphological_analyzer_rus' not in main.__dict__:
main.pymorphy3_morphological_analyzer_rus = pymorphy3.MorphAnalyzer(lang = 'ru')
elif lang == 'ukr':
if 'pymorphy3_morphological_analyzer_ukr' not in main.__dict__:
main.pymorphy3_morphological_analyzer_ukr = pymorphy3.MorphAnalyzer(lang = 'uk')
def init_lemmatizers(main, lang, lemmatizer, tokenized = False):
# spaCy
if lemmatizer.startswith('spacy_'):
init_model_spacy(main, lang)
# Stanza
elif lemmatizer.startswith('stanza_'):
init_model_stanza(main, lang, lang_util = 'lemmatizer', tokenized = tokenized)
# Japanese
elif lemmatizer == 'sudachipy_jpn':
init_sudachipy_word_tokenizer(main)
# Russian & Ukrainian
elif lemmatizer == 'pymorphy3_morphological_analyzer':
if lang == 'rus':
if 'pymorphy3_morphological_analyzer_rus' not in main.__dict__:
main.pymorphy3_morphological_analyzer_rus = pymorphy3.MorphAnalyzer(lang = 'ru')
elif lang == 'ukr':
if 'pymorphy3_morphological_analyzer_ukr' not in main.__dict__:
main.pymorphy3_morphological_analyzer_ukr = pymorphy3.MorphAnalyzer(lang = 'uk')
def init_dependency_parsers(main, lang, dependency_parser, tokenized = False):
# spaCy
if dependency_parser.startswith('spacy_'):
init_model_spacy(main, lang)
# Stanza
elif dependency_parser.startswith('stanza_'):
init_model_stanza(main, lang, lang_util = 'dependency_parser', tokenized = tokenized)
def init_sentiment_analyzers(main, lang, sentiment_analyzer, tokenized = False):
# Stanza
if sentiment_analyzer.startswith('stanza_'):
init_model_stanza(main, lang, lang_util = 'sentiment_analyzer', tokenized = tokenized)
# Make sure tokenization is not modified during NLP processing
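# Results produced on re-tokenized text are aligned back onto the original tokens by
# comparing cumulative character lengths: results are clipped when one raw token maps
# to several processed tokens and repeated when several raw tokens map to one, while
# prefer_raw = True keeps the original tokens for the mismatched spans (e.g. for lemmatization)
# e.g. align_tokens(['can', 'not'], ['cannot'], ['X']) returns ['X', 'X']
#      align_tokens(['cannot'], ['can', 'not'], ['X', 'Y']) returns ['X']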
def align_tokens(tokens_raw, tokens_processed, results, prefer_raw = False):
results_modified = []
tokens_raw = list(tokens_raw)
i_raw = 0
i_processed = 0
len_raw = len(tokens_raw)
len_processed = len(tokens_processed)
while i_raw < len_raw and i_processed < len_processed:
# Different token
if len(tokens_raw[i_raw]) != len(tokens_processed[i_processed]):
tokens_raw_temp = [tokens_raw[i_raw]]
tokens_processed_temp = [tokens_processed[i_processed]]
results_temp = [results[i_processed]]
while i_raw < len_raw - 1 or i_processed < len_processed - 1:
len_raw_temp = sum((len(token) for token in tokens_raw_temp))
len_processed_temp = sum((len(token) for token in tokens_processed_temp))
if len_raw_temp < len_processed_temp:
tokens_raw_temp.append(tokens_raw[i_raw + 1])
i_raw += 1
elif len_raw_temp > len_processed_temp:
tokens_processed_temp.append(tokens_processed[i_processed + 1])
results_temp.append(results[i_processed + 1])
i_processed += 1
elif len_raw_temp == len_processed_temp:
# eg. lemmatization
if prefer_raw:
# Always use original tokens
results_modified.extend(tokens_raw_temp)
# eg. POS tagging
else:
len_raw_temp_tokens = len(tokens_raw_temp)
len_processed_temp_tokens = len(tokens_processed_temp)
# Use results if one-to-one
if len_raw_temp_tokens == len_processed_temp_tokens:
results_modified.extend(results_temp)
# Clip results if one-to-many
elif len_raw_temp_tokens < len_processed_temp_tokens:
results_modified.extend(results_temp[:len_raw_temp_tokens])
# Extend results if many-to-one
elif len_raw_temp_tokens > len_processed_temp_tokens:
results_modified.extend(results_temp)
results_modified.extend([results_temp[-1]] * (len_raw_temp_tokens - len_processed_temp_tokens))
tokens_raw_temp.clear()
tokens_processed_temp.clear()
results_temp.clear()
break
            # Handle tokens left over when the end of either token list is reached
if tokens_raw_temp:
if prefer_raw:
results_modified.extend(tokens_raw_temp)
else:
len_raw_temp_tokens = len(tokens_raw_temp)
len_processed_temp_tokens = len(tokens_processed_temp)
if len_raw_temp_tokens == len_processed_temp_tokens:
results_modified.extend(results_temp)
elif len_raw_temp_tokens < len_processed_temp_tokens:
results_modified.extend(results_temp[:len_raw_temp_tokens])
elif len_raw_temp_tokens > len_processed_temp_tokens:
results_modified.extend(results_temp)
results_modified.extend([results_temp[-1]] * (len_raw_temp_tokens - len_processed_temp_tokens))
else:
results_modified.append(results[i_processed])
i_raw += 1
i_processed += 1
return results_modified
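# Split tokens into the given number of sections of nearly equal sizes, with earlier
# sections one token longer when the tokens cannot be divided evenly
# e.g. to_sections(list(range(10)), 3) returns [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]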
def to_sections(tokens, num_sections):
len_tokens = len(tokens)
if len_tokens >= num_sections:
sections = []
section_size, remainder = divmod(len_tokens, num_sections)
for i in range(num_sections):
if i < remainder:
section_start = i * section_size + i
else:
section_start = i * section_size + remainder
if i + 1 < remainder:
section_stop = (i + 1) * section_size + i + 1
else:
section_stop = (i + 1) * section_size + remainder
sections.append(tokens[section_start:section_stop])
else:
sections = [[token] for token in tokens]
return sections
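# Split tokens into sections of a fixed size, with the last section possibly shorter
# e.g. list(to_sections_unequal(range(5), 2)) returns [[0, 1], [2, 3], [4]]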
def to_sections_unequal(tokens, section_size):
tokens = list(tokens)
for i in range(0, len(tokens), section_size):
yield tokens[i : i + section_size]
# Read text in chunks to avoid memory error
def split_into_chunks_text(text, section_size):
# Split text into paragraphs excluding the last empty one
paras = text.splitlines(keepends = True)
for section in to_sections_unequal(paras, section_size):
yield ''.join(section)
# Split long list of tokens
def split_token_list(main, inputs, nlp_util):
section_size = main.settings_custom['files']['misc_settings']['read_files_in_chunks']
    # Split tokens into sub-lists as the input to SudachiPy cannot exceed 49149 bytes
    if nlp_util in ['spacy_jpn', 'sudachipy_jpn'] and sum((len(token) for token in inputs)) > 49149 // 4:
        # Around 6 characters per token and 4 bytes per character (49149 / 4 / 6 ≈ 2048)
texts = to_sections_unequal(inputs, section_size = 2000)
else:
texts = to_sections_unequal(inputs, section_size = section_size * 100)
return texts
# Serbian
SRP_CYRL_TO_LATN = {
# Uppercase
'А': 'A',
'Б': 'B',
'Ц': 'C',
'Ч': 'Č',
'Ћ': 'Ć',
'Д': 'D',
'Џ': 'Dž',
'Ђ': 'Đ',
'Е': 'E',
'Ф': 'F',
'Г': 'G',
'Х': 'H',
'И': 'I',
'Ј': 'J',
'К': 'K',
'Л': 'L',
'Љ': 'Lj',
'М': 'M',
'Н': 'N',
'Њ': 'Nj',
'О': 'O',
'П': 'P',
'Р': 'R',
'С': 'S',
'Ш': 'Š',
'Т': 'T',
'У': 'U',
'В': 'V',
'З': 'Z',
'Ж': 'Ž',
# Lowercase
'а': 'a',
'б': 'b',
'ц': 'c',
'ч': 'č',
'ћ': 'ć',
'д': 'd',
'џ': 'dž',
'ђ': 'đ',
'е': 'e',
'ф': 'f',
'г': 'g',
'х': 'h',
'и': 'i',
'ј': 'j',
'к': 'k',
'л': 'l',
'љ': 'lj',
'м': 'm',
'н': 'n',
'њ': 'nj',
'о': 'o',
'п': 'p',
'р': 'r',
'с': 's',
'ш': 'š',
'т': 't',
'у': 'u',
'в': 'v',
'з': 'z',
'ж': 'ž',
}
SRP_LATN_TO_CYRL = {
# Uppercase
'A': 'А',
'B': 'Б',
'C': 'Ц',
'Č': 'Ч',
'Ć': 'Ћ',
'D': 'Д',
'Dž': 'Џ',
'Đ': 'Ђ',
'E': 'Е',
'F': 'Ф',
'G': 'Г',
'H': 'Х',
'I': 'И',
'J': 'Ј',
'K': 'К',
'L': 'Л',
'Lj': 'Љ',
'M': 'М',
'N': 'Н',
'Nj': 'Њ',
'O': 'О',
'P': 'П',
'R': 'Р',
'S': 'С',
'Š': 'Ш',
'T': 'Т',
'U': 'У',
'V': 'В',
'Z': 'З',
'Ž': 'Ж',
# Lowercase
'a': 'а',
'b': 'б',
'c': 'ц',
'č': 'ч',
'ć': 'ћ',
'd': 'д',
'dž': 'џ',
'đ': 'ђ',
'e': 'е',
'f': 'ф',
'g': 'г',
'h': 'х',
'i': 'и',
'j': 'ј',
'k': 'к',
'l': 'л',
'lj': 'љ',
'm': 'м',
'n': 'н',
'nj': 'њ',
'o': 'о',
'p': 'п',
'r': 'р',
's': 'с',
'š': 'ш',
't': 'т',
'u': 'у',
'v': 'в',
'z': 'з',
'ž': 'ж'
}
SRP_LATN_TO_CYRL_DIGRAPHS = {
'Dž': 'Џ',
'Lj': 'Љ',
'Nj': 'Њ',
'dž': 'џ',
'lj': 'љ',
'nj': 'њ'
}
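# Transliterate Serbian Cyrillic into Latin character by character
# e.g. to_srp_latn(['Србија']) returns ['Srbija']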
def to_srp_latn(tokens):
tokens_latn = []
for token in tokens:
token_latn = ''
for char in token:
if char not in SRP_CYRL_TO_LATN:
token_latn += char
else:
token_latn += SRP_CYRL_TO_LATN[char]
tokens_latn.append(token_latn)
return tokens_latn
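# Transliterate Serbian Latin into Cyrillic, replacing the digraphs Dž/Lj/Nj first so
# that they map to single Cyrillic letters
# e.g. to_srp_cyrl(['Srbija']) returns ['Србија']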
def to_srp_cyrl(tokens):
tokens_cyrl = []
for token in tokens:
token_cyrl = ''
for char_latn, char_cyrl in SRP_LATN_TO_CYRL_DIGRAPHS.items():
token = token.replace(char_latn, char_cyrl)
for char in token:
if char not in SRP_LATN_TO_CYRL:
token_cyrl += char
else:
token_cyrl += SRP_LATN_TO_CYRL[char]
tokens_cyrl.append(token_cyrl)
return tokens_cyrl
# N-grams
# Reference: https://more-itertools.readthedocs.io/en/stable/_modules/more_itertools/recipes.html#sliding_window
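# e.g. list(ngrams(['a', 'b', 'c'], 2)) returns [('a', 'b'), ('b', 'c')]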
def ngrams(tokens, ngram_size):
if ngram_size == 1:
for token in tokens:
yield (token,)
else:
it = iter(tokens)
window = collections.deque(itertools.islice(it, ngram_size), maxlen = ngram_size)
if len(window) == ngram_size:
yield tuple(window)
for x in it:
window.append(x)
yield tuple(window)
# Reference: https://www.nltk.org/_modules/nltk/util.html#everygrams
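# Generate all n-grams with sizes from ngram_size_min to ngram_size_max
# e.g. list(everygrams(['a', 'b', 'c'], 1, 2)) returns
#      [('a',), ('a', 'b'), ('b',), ('b', 'c'), ('c',)]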
def everygrams(tokens, ngram_size_min, ngram_size_max):
if ngram_size_min == ngram_size_max:
yield from ngrams(tokens, ngram_size_min)
else:
# Pad token list to the right
SENTINEL = object()
tokens = itertools.chain(tokens, (SENTINEL,) * (ngram_size_max - 1))
for ngram in ngrams(tokens, ngram_size_max):
for i in range(ngram_size_min, ngram_size_max + 1):
if ngram[i - 1] is not SENTINEL:
yield ngram[:i]
# Reference: https://www.nltk.org/_modules/nltk/util.html#skipgrams
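# Generate n-grams that may skip over up to num_skipped_tokens intervening tokens in total
# e.g. list(skipgrams(['a', 'b', 'c', 'd'], 2, 1)) returns
#      [('a', 'b'), ('a', 'c'), ('b', 'c'), ('b', 'd'), ('c', 'd')]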
def skipgrams(tokens, ngram_size, num_skipped_tokens):
if ngram_size == 1:
yield from ngrams(tokens, ngram_size = 1)
else:
# Pad token list to the right
SENTINEL = object()
tokens = itertools.chain(tokens, (SENTINEL,) * (ngram_size - 1))
for ngram in ngrams(tokens, ngram_size + num_skipped_tokens):
head = ngram[:1]
tail = ngram[1:]
for skip_tail in itertools.combinations(tail, ngram_size - 1):
if skip_tail[-1] is not SENTINEL:
yield head + skip_tail
# HTML
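# Escape HTML special characters and strip surrounding whitespace
# e.g. escape_token(' <b> ') returns '&lt;b&gt;'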
def escape_token(token):
return html.escape(token).strip()
def escape_tokens(tokens):
return [html.escape(token).strip() for token in tokens]
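# e.g. html_to_text('<p>A&amp;B</p>') returns 'A&B'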
def html_to_text(text):
# Remove tags and unescape character entities
text = bs4.BeautifulSoup(text, features = 'lxml').get_text()
text = text.replace('\n', ' ')
text = re.sub(r'\s+', ' ', text)
return text.strip()