giganticode/codeprep

View on GitHub
codeprep/pipeline/dataset.py

Summary

Maintainability
C
1 day
Test Coverage
# SPDX-FileCopyrightText: 2020 Hlib Babii <hlibbabii@gmail.com>
#
# SPDX-License-Identifier: Apache-2.0

import ast
import logging
import os
from typing import Type, Optional, Generator, List

from codeprep.bpepkg.bpe_config import BpeConfig
from codeprep.config import DEFAULT_PARSED_DATASETS_DIR, DEFAULT_PREP_DATASETS_DIR, USER_BPE_DIR, DEFAULT_FILE_LIST_DIR, \
    USER_VOCAB_DIR, DEFAULT_CORPUS_SIZES_DIR
from codeprep.util.dir import walk_and_save, get_timestamp
from codeprep.pipeline.bperegistry import get_codes_id_by_bpe_path, create_new_id_from, write_bpe_codes_id, \
    CustomBpeConfig
from codeprep.pipeline.vocab import VOCAB_FILENAME
from codeprep.prepconfig import PrepConfig

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# NOTE(review): the two names below are not used in the visible part of this module; presumably they name
# metadata files stored next to preprocessed data - confirm against the callers.
PP_PARAMS_FILENAME = 'params.json'
PREPROCESSING_TYPES_FILENAME = 'preprocessing_types.json'
# Names of the files (inside Dataset.path_to_file_list_folder) in which Dataset.get_all_files() caches
# the listing of regular files and of directories respectively.
FILE_LIST_FILENAME = "filelist"
DIR_LIST_FILENAME = "dirlist"

# Suffixes passed to the SubDataset objects that represent parsed and preprocessed files.
PARSED_EXTENSION = ".parsed"
PREPROCESSED_EXTENSION = ".prep"
# NOTE(review): not used in the visible part of this module; presumably marks partially written files - confirm.
NOT_FINISHED_EXTENSION = ".part"
# Used by archive_path() to build the '<name>.archived.<timestamp>' names of archived entries.
ARCHIVED_EXT = "archived"

# Name of the file that Dataset.path_to_nonbpe_vocab_file points to.
NONBPE_VOCAB_FILENAME = 'nonbpe_vocab'


class SubDataset(object):
    """
    One on-disk representation of a dataset (the original files, the parsed files or the preprocessed files).

    A sub-dataset is described by the dataset it belongs to, the path at which its files are located and
    the suffix that is appended to the name of each of its files.
    """
    def __init__(self, dataset: 'Dataset', path: str, suffix: str = ''):
        """
        :param dataset: the dataset this sub-dataset belongs to; it provides the list of files via get_all_files().
        :param path: location of this sub-dataset (a directory, or a single file).
        :param suffix: suffix appended to every file name of this sub-dataset ('' for no suffix).
        """
        self._dataset = dataset
        self._path = path
        self._suffix = suffix

    @property
    def dataset(self):
        """The dataset this sub-dataset belongs to."""
        return self._dataset

    @property
    def path(self) -> str:
        """Location of this sub-dataset in the file system."""
        return self._path

    def set_ready(self) -> None:
        """Mark this sub-dataset as ready (see set_path_ready)."""
        set_path_ready(self.path)

    def is_outdated(self) -> bool:
        """Tell whether the timestamp recorded for this sub-dataset differs from the current one (see is_path_outdated)."""
        return is_path_outdated(self.path)

    def file_iterator(self) -> Generator[bytes, None, None]:
        """
        Yield the (byte-encoded) path of every file of this sub-dataset.

        For each entry returned by the parent dataset's get_all_files(), the entry is joined with the path of
        this sub-dataset and the suffix is appended. If the path of this sub-dataset is itself a regular file,
        that path (plus the suffix) is yielded instead.
        """
        encoded_path = self.path.encode()
        encoded_suffix = self._suffix.encode()
        # Whether the sub-dataset is a single file does not depend on the file being yielded,
        # so the file system is queried once instead of once per file.
        path_is_single_file = os.path.isfile(encoded_path)
        for file in self._dataset.get_all_files():
            if path_is_single_file:
                yield encoded_path + encoded_suffix
            else:
                yield os.path.join(encoded_path, file + encoded_suffix)

    def get_new_file_name(self, file_path: bytes, new_subdataset: 'SubDataset') -> bytes:
        """
        Map a file of this sub-dataset to the corresponding file of `new_subdataset`.

        :param file_path: (byte-encoded) path to a file that belongs to this sub-dataset.
        :param new_subdataset: sub-dataset in which the corresponding file should be located.
        :return: path inside `new_subdataset` with this sub-dataset's suffix replaced by the suffix
            of `new_subdataset`.
        """
        encoded_suffix = self._suffix.encode()
        rel_path = os.path.relpath(file_path, self.path.encode())
        # The relative path is '.' when only one file is being preprocessed, i.e. when the path of
        # the sub-dataset is the file itself; the file name is used in this case.
        if rel_path == b'.':
            rel_path = os.path.basename(file_path)
        if encoded_suffix:
            # The file is assumed to end with this sub-dataset's suffix, which is cut off by its length.
            rel_path = rel_path[:-len(encoded_suffix)]
        return os.path.join(new_subdataset.path.encode(), rel_path + new_subdataset._suffix.encode())

    def ready(self) -> bool:
        """Tell whether this sub-dataset has been marked as ready (see is_path_ready)."""
        return is_path_ready(self.path)

    def archive(self) -> None:
        """Archive this sub-dataset (see archive_path)."""
        archive_path(self.path)

    def __eq__(self, o: object) -> bool:
        # Parent datasets are compared by path only: comparing them with '==' would recurse endlessly
        # because Dataset.__eq__ compares its sub-datasets.
        if isinstance(o, SubDataset):
            return self._dataset.path == o._dataset.path and self._path == o._path and self._suffix == o._suffix
        return False

    def __repr__(self) -> str:
        return f'{self.__class__.__name__} ({self._dataset}, {self._path}, {self._suffix}) '

    def __str__(self) -> str:
        return self._path


class Dataset(object):
    """
    Abstraction that encapsulates the location of a dataset in the file system together with the locations
    of everything derived from it: the parsed and the preprocessed representation, the cached file list,
    the vocabulary and the BPE data.
    """
    def __init__(self, path: str, prep_config: PrepConfig, normalized_extension_list: Optional[List[str]],
                 custom_bpe_config: Optional[CustomBpeConfig],
                 bpe_config: Optional[BpeConfig],
                 overridden_path_to_prep_dataset, suppress_caching: bool):
        self._path = path
        self._prep_config = prep_config
        self._normalized_extension_list = normalized_extension_list
        self._custom_bpe_config = custom_bpe_config
        self._bpe_config = bpe_config
        self._suppress_caching = suppress_caching
        # The timestamp is part of the names of the derived directories, so it has to be known
        # before the sub-datasets are created.
        self._dataset_last_modified = get_timestamp(path)

        self._original = SubDataset(self, self.path)
        self._parsed = SubDataset(self, self._get_path_to_parsed_dataset(), suffix=PARSED_EXTENSION)
        self._preprocessed = SubDataset(self, self._get_path_to_prep_dataset(overridden_path_to_prep_dataset),
                                        suffix=PREPROCESSED_EXTENSION)

    def __eq__(self, o: object) -> bool:
        if not isinstance(o, Dataset):
            return False
        compared_attributes = (
            '_path',
            '_prep_config',
            '_normalized_extension_list',
            '_custom_bpe_config',
            '_bpe_config',
            '_dataset_last_modified',
            '_original',
            '_parsed',
            '_preprocessed',
            '_suppress_caching',
        )
        return all(getattr(self, attribute) == getattr(o, attribute) for attribute in compared_attributes)

    #####################################################

    @classmethod
    def create(cls: Type['Dataset'], path_to_dataset: str, prep_config: PrepConfig, extensions: Optional[str],
               custom_bpe_config: Optional[CustomBpeConfig],
               bpe_config: Optional[BpeConfig] = None,
               overriden_path_to_prep_dataset: Optional[str] = None, suppress_caching: bool = False) -> 'Dataset':
        """
        Build a dataset object for `path_to_dataset` and create the directories it relies on.

        :param extensions: '|'-separated extensions; normalized with normalize_extension_string().
        :raises ValueError: if `path_to_dataset` does not exist.
        """
        def create_if_missing(directory: str) -> None:
            if not os.path.exists(directory):
                os.makedirs(directory)

        if not os.path.exists(path_to_dataset):
            raise ValueError(f"Path {path_to_dataset} does not exist")

        dataset = cls(path_to_dataset, prep_config, normalize_extension_string(extensions), custom_bpe_config,
                      bpe_config, overriden_path_to_prep_dataset, suppress_caching=suppress_caching)

        create_if_missing(dataset.parsed.path)
        create_if_missing(dataset.preprocessed.path)
        create_if_missing(DEFAULT_FILE_LIST_DIR)
        create_if_missing(DEFAULT_CORPUS_SIZES_DIR)
        create_if_missing(dataset.bpe_path)

        return dataset

    #####################################################

    @property
    def path(self) -> str:
        """Path to the original dataset."""
        return self._path

    @property
    def name(self) -> str:
        """Last component of the path to the original dataset."""
        return os.path.basename(self.path)

    @property
    def dataset_last_modified(self) -> str:
        """Timestamp of the original dataset obtained when this object was constructed."""
        return self._dataset_last_modified

    @property
    def prep_config(self) -> PrepConfig:
        return self._prep_config

    @property
    def get_dataset_dir_name(self) -> str:
        """Name shared by all derived directories: '<name>_<timestamp>[_<ext>_<ext>...]'."""
        name_components = [f'{self.name}_{self.dataset_last_modified}']
        if self._normalized_extension_list:
            name_components.extend(self._normalized_extension_list)
        return "_".join(name_components)

    @property
    def parsed(self) -> SubDataset:
        """Sub-dataset with the parsed files."""
        return self._parsed

    @property
    def preprocessed(self) -> SubDataset:
        """Sub-dataset with the preprocessed files."""
        return self._preprocessed

    def _get_path_to_parsed_dataset(self) -> str:
        return os.path.join(DEFAULT_PARSED_DATASETS_DIR, self.get_dataset_dir_name)

    def _get_prep_suffix(self) -> str:
        """String form of the prep config, extended with the custom BPE settings when these are present."""
        prep_config_str = str(self.prep_config)
        custom_bpe = self._custom_bpe_config
        if not custom_bpe:
            return prep_config_str
        return prep_config_str + f'_{custom_bpe.merge_list_id}-{custom_bpe.n_merges}'

    def _get_path_to_prep_dataset(self, overriden_path_to_prep_dataset: Optional[str]) -> str:
        """
        Return the directory of the preprocessed data: inside `overriden_path_to_prep_dataset` when it is given
        (the directory name then gets an extra '_-_prep' ending), inside the default directory otherwise.
        """
        dir_name = f'{self.get_dataset_dir_name}_-_{self._get_prep_suffix()}'
        if overriden_path_to_prep_dataset:
            return os.path.join(overriden_path_to_prep_dataset, dir_name + '_-_prep')
        return os.path.join(DEFAULT_PREP_DATASETS_DIR, dir_name)

    @property
    def suppress_caching(self) -> bool:
        return self._suppress_caching

    @property
    def original(self) -> SubDataset:
        """Sub-dataset with the original files."""
        return self._original

    @property
    def bpe_path(self) -> str:
        """Directory with the BPE data of this dataset; its name includes the BPE config suffix if there is one."""
        components = [self.get_dataset_dir_name]
        config_suffix = self._bpe_config.to_suffix() if self._bpe_config else ''
        if config_suffix:
            components.append(config_suffix)
        return os.path.join(USER_BPE_DIR, "_-_".join(components))

    @property
    def base_bpe_vocab_path(self) -> str:
        # NOTE: reads self._bpe_config unconditionally, so a BPE config has to be set.
        vocab_dir_name = f'{self.get_dataset_dir_name}_-_{self._bpe_config.to_prep_config()}'
        return os.path.join(USER_VOCAB_DIR, vocab_dir_name)

    @property
    def vocab_path(self) -> str:
        vocab_dir_name = f'{self.get_dataset_dir_name}_-_{self._get_prep_suffix()}'
        return os.path.join(USER_VOCAB_DIR, vocab_dir_name)

    @property
    def path_to_vocab_file(self) -> str:
        return os.path.join(self.vocab_path, VOCAB_FILENAME)

    @property
    def path_to_bpe_vocab_file(self) -> str:
        return os.path.join(self.base_bpe_vocab_path, VOCAB_FILENAME)

    @property
    def path_to_nonbpe_vocab_file(self) -> str:
        """Location of the non-BPE vocab: in the base BPE vocab dir if a BPE config is set, in the vocab dir otherwise."""
        if self._bpe_config:
            containing_dir = self.base_bpe_vocab_path
        else:
            containing_dir = self.vocab_path
        return os.path.join(containing_dir, NONBPE_VOCAB_FILENAME)

    @property
    def bpe_codes_id(self) -> Optional[str]:
        return get_codes_id_by_bpe_path(self.bpe_path)

    def assign_bpe_codes_id(self, bpe_config: BpeConfig, predefined_bpe_codes_id: str) -> None:
        """Create a new BPE codes id for this dataset and write it to the BPE directory."""
        new_codes_id = create_new_id_from(self.path, bpe_config, predefined_bpe_codes_id)
        write_bpe_codes_id(self.bpe_path, new_codes_id)

    @property
    def path_to_file_list_folder(self) -> str:
        return os.path.join(DEFAULT_FILE_LIST_DIR, self.get_dataset_dir_name)

    @property
    def path_to_prep_corpus_size_file(self) -> str:
        return os.path.join(DEFAULT_CORPUS_SIZES_DIR, os.path.basename(self.preprocessed.path))

    def get_all_files(self, return_dirs_instead_of_regular_files: bool=False) -> Generator[bytes, None, None]:
        """
        Yield the files (or the directories, if `return_dirs_instead_of_regular_files` is set) of the dataset.

        If a usable cached listing exists, its lines are evaluated as Python literals and yielded. Otherwise
        the original dataset is walked with walk_and_save(), which is given the paths of the two listing files;
        once the walk has been fully consumed, the listing folder is marked as ready unless caching is suppressed.
        """
        list_folder = self.path_to_file_list_folder

        if not self.files_need_to_be_saved():
            if return_dirs_instead_of_regular_files:
                cached_list_name = DIR_LIST_FILENAME
            else:
                cached_list_name = FILE_LIST_FILENAME
            with open(os.path.join(list_folder, cached_list_name)) as cached_list:
                for line in cached_list:
                    yield ast.literal_eval(line)
            return

        if not os.path.exists(list_folder):
            os.makedirs(list_folder)
        dir_list_path = os.path.join(list_folder, DIR_LIST_FILENAME)
        file_list_path = os.path.join(list_folder, FILE_LIST_FILENAME)
        for filepath in walk_and_save(self.original.path, dir_list_path, file_list_path,
                                      return_dirs_instead_of_regular_files, self._normalized_extension_list):
            yield filepath
        if not self.suppress_caching:
            set_path_ready(list_folder)

    ###################################

    def to_summary(self) -> str:
        """Return a three-line description of where the original, parsed and preprocessed data are located."""
        return (f"Original dataset: {self.original.path}\n"
                f"Parsed dataset (internal): {self.parsed.path}\n"
                f"Preprocessed dataset: {self.preprocessed.path}")

    def __str__(self) -> str:
        return self.to_summary()

    def files_need_to_be_saved(self) -> bool:
        """Tell whether the cached file listing is missing or outdated and therefore has to be (re)created."""
        list_folder = self.path_to_file_list_folder
        if not is_path_ready(list_folder):
            return True
        return is_path_outdated(list_folder)


def _get_last_modif_file_path_for_dir(path: str) -> str:
    dirname, filename = os.path.split(path)
    return os.path.join(dirname, f'.{filename}.lastmodif')


def set_path_ready(path: str) -> None:
    """
    Mark `path` as ready by writing its current timestamp to the '.lastmodif' file located next to it.

    :param path: path whose timestamp (as returned by get_timestamp) is recorded.
    """
    # The timestamp is computed before the marker file is opened: opening with mode 'w' creates/truncates
    # the file right away, so a failure of get_timestamp() would otherwise leave an empty marker behind,
    # and is_path_ready() would then report the path as ready.
    timestamp = get_timestamp(path)
    modif_file = _get_last_modif_file_path_for_dir(path)
    with open(modif_file, 'w') as f:
        f.write(timestamp)


def is_path_outdated(path: str) -> bool:
    """
    Tell whether the timestamp recorded in the '.lastmodif' file of `path` differs from its current timestamp.

    :raises FileNotFoundError: if the '.lastmodif' file does not exist.
    """
    marker_path = _get_last_modif_file_path_for_dir(path)
    if not os.path.exists(marker_path):
        raise FileNotFoundError()
    with open(marker_path) as marker:
        recorded_timestamp = marker.read()
    current_timestamp = get_timestamp(path)
    return recorded_timestamp != current_timestamp


def is_path_ready(path: str) -> bool:
    """Tell whether both `path` and the '.lastmodif' file that belongs to it exist."""
    return os.path.exists(path) and os.path.exists(_get_last_modif_file_path_for_dir(path))


def archive_path(path: str) -> None:
    """
    Move `path` and its '.lastmodif' file to the default directory of preprocessed datasets (created if
    missing), appending '.archived.<timestamp>' to both names.
    """
    marker_path = _get_last_modif_file_path_for_dir(path)
    archive_ending = f'.{ARCHIVED_EXT}.{get_timestamp(path)}'
    if not os.path.exists(DEFAULT_PREP_DATASETS_DIR):
        os.makedirs(DEFAULT_PREP_DATASETS_DIR)
    # The data is moved first, then the marker file that belongs to it.
    for source in (path, marker_path):
        destination = os.path.join(DEFAULT_PREP_DATASETS_DIR, os.path.basename(source) + archive_ending)
        os.rename(source, destination)


def normalize_extension_string(extensions: Optional[str]) -> Optional[List[str]]:
    """
    Convert a '|'-separated string of extensions to a sorted list of unique extensions.

    :param extensions: extensions separated by '|', or None/empty string if no extensions are given.
    :return: sorted list of unique extensions, or None if no extensions are given.

    >>> normalize_extension_string("java|c|python")
    ['c', 'java', 'python']

    >>> normalize_extension_string("java")
    ['java']

    >>> normalize_extension_string("java|c|java")
    ['c', 'java']

    >>> res = normalize_extension_string(None)
    >>> str(res)
    'None'

    >>> res = normalize_extension_string("")
    >>> str(res)
    'None'
    """
    if not extensions:
        # An empty string used to be returned as is, which violated the declared return type;
        # "no extensions" is now always represented by None.
        return None

    return sorted(set(extensions.split("|")))