zincware/MDSuite

View on GitHub
mdsuite/file_io/lammps_trajectory_files.py

Summary

Maintainability
A
1 hr
Test Coverage
"""
MDSuite: A Zincwarecode package.

License
-------
This program and the accompanying materials are made available under the terms
of the Eclipse Public License v2.0 which accompanies this distribution, and is
available at https://www.eclipse.org/legal/epl-v20.html

SPDX-License-Identifier: EPL-2.0

Copyright Contributors to the Zincwarecode Project.

Contact Information
-------------------
email: zincwarecode@gmail.com
github: https://github.com/zincware
web: https://zincwarecode.com/

Citation
--------
If you use this module please cite us with:
"""

import pathlib
import typing

import numpy as np

import mdsuite.file_io.tabular_text_files
import mdsuite.utils.meta_functions
from mdsuite.database.mdsuite_properties import mdsuite_properties
from mdsuite.database.simulation_database import TrajectoryMetadata
from mdsuite.file_io.tabular_text_files import (
    get_species_list_from_tabular_text_reader_data,
)
from mdsuite.utils.meta_functions import sort_array_by_column

column_names = {
    mdsuite_properties.positions: ["x", "y", "z"],
    mdsuite_properties.scaled_positions: ["xs", "ys", "zs"],
    mdsuite_properties.unwrapped_positions: ["xu", "yu", "zu"],
    mdsuite_properties.scaled_unwrapped_positions: ["xsu", "ysu", "zsu"],
    mdsuite_properties.velocities: ["vx", "vy", "vz"],
    mdsuite_properties.forces: ["fx", "fy", "fz"],
    mdsuite_properties.box_images: ["ix", "iy", "iz"],
    mdsuite_properties.dipole_orientation_magnitude: ["mux", "muy", "muz"],
    mdsuite_properties.angular_velocity_spherical: ["omegax", "omegay", "omegaz"],
    mdsuite_properties.angular_velocity_non_spherical: [
        "angmomx",
        "angmomy",
        "angmomz",
    ],
    mdsuite_properties.torque: ["tqx", "tqy", "tqz"],
    mdsuite_properties.charge: ["q"],
    mdsuite_properties.kinetic_energy: ["c_KE"],
    mdsuite_properties.potential_energy: ["c_PE"],
    mdsuite_properties.stress: [
        "c_Stress[1]",
        "c_Stress[2]",
        "c_Stress[3]",
        "c_Stress[4]",
        "c_Stress[5]",
        "c_Stress[6]",
    ],
}


class LAMMPSTrajectoryFile(mdsuite.file_io.tabular_text_files.TabularTextFileProcessor):
    """Reader for LAMMPS files."""

    def __init__(
        self,
        file_path: typing.Union[str, pathlib.Path],
        trajectory_is_sorted_by_ids=False,
        custom_data_map: dict = None,
    ):
        """

        Parameters
        ----------
        file_path:
            path to the LAMMPS trajectory file
        trajectory_is_sorted_by_ids:
            Flag to indicate if the particle order in the trajectory file is the same for
            each configuration
        custom_data_map
            If your file contains columns with data that is not part of the standard set
            of properties (see column_names in this file),
            you can map the column names to the corresponding property.
            example: custom_data_map = {"Reduced_Momentum": ["rp_x", "rp_y", "rp_z"]},
            if the file contains columns labelled as 'rp_{x,y,z}' for the three components
            of the reduced momentum vector
        """
        super(LAMMPSTrajectoryFile, self).__init__(
            file_path,
            file_format_column_names=column_names,
            custom_column_names=custom_data_map,
        )
        self.n_header_lines = 9
        self.trajectory_is_sorted_by_ids = trajectory_is_sorted_by_ids

    def _get_tabular_text_reader_mdata(
        self,
    ) -> mdsuite.file_io.tabular_text_files.TabularTextFileReaderMData:
        """Implement abstract parent method."""
        with open(self.file_path, "r") as file:
            header = mdsuite.file_io.tabular_text_files.read_n_lines(
                file, self.n_header_lines, start_at=0
            )

            # extract data that can be directly read off the header
            n_particles = int(header[3].split()[0])

            # extract properties from the column names
            header_property_names = header[8].split()[2:]
            id_column_idx = header_property_names.index("id")
            property_dict = extract_properties_from_header(
                header_property_names, self._column_name_dict
            )

            # get number of configs from file length
            file.seek(0)
            num_lines = sum(1 for _ in file)
            n_configs_float = num_lines / (n_particles + self.n_header_lines)
            n_configs = int(round(n_configs_float))
            assert abs(n_configs_float - n_configs) < 1e-10

            # get information on which particles with which id belong to which species
            # by analysing the first configuration
            file.seek(0)
            species_dict = self._get_species_information(
                file, header_property_names, n_particles
            )

        return mdsuite.file_io.tabular_text_files.TabularTextFileReaderMData(
            n_configs=n_configs,
            species_name_to_line_idx_dict=species_dict,
            property_to_column_idx_dict=property_dict,
            n_header_lines=self.n_header_lines,
            n_particles=n_particles,
            header_lines_for_each_config=True,
            sort_by_column_idx=(
                None if self.trajectory_is_sorted_by_ids else id_column_idx
            ),
        )

    def _get_metadata(self) -> TrajectoryMetadata:
        """
        Gets the metadata for database creation as an implementation of the parent class
        virtual function.
        """
        with open(self.file_path, "r") as file:
            header = mdsuite.file_io.tabular_text_files.read_n_lines(
                file, self.n_header_lines, start_at=0
            )
            header_boxl_lines = header[5:8]
            box_l = [
                float(line.split()[1]) - float(line.split()[0])
                for line in header_boxl_lines
            ]
            # extract sample step information from consecutive headers
            file.seek(0)
            sample_rate = self._get_sample_rate(
                file, self.tabular_text_reader_data.n_particles
            )

        species_list = get_species_list_from_tabular_text_reader_data(
            self.tabular_text_reader_data
        )

        mdata = TrajectoryMetadata(
            n_configurations=self.tabular_text_reader_data.n_configs,
            species_list=species_list,
            box_l=box_l,
            sample_rate=sample_rate,
        )

        return mdata

    def _get_species_information(
        self, file, header_property_names: list, n_particles: int
    ):
        """
        Get the information which species are present and which particle ids/ lines in
        the file belong to them.

        Parameters
        ----------
        file: file object
            the open file object to read from
        header_property_names: list
            list of the names of the columns in the file

        """
        header_id_index = header_property_names.index("id")
        #
        # Look for element keyword in trajectory.
        if "element" in header_property_names:
            header_species_index = header_property_names.index("element")
        # Look for type keyword if element is not present.
        elif "type" in header_property_names:
            header_species_index = header_property_names.index("type")
        # Raise an error if no identifying keywords are found.
        else:
            raise ValueError("Insufficient species or type identification available.")

        species_dict = {}
        # skip the header
        mdsuite.file_io.tabular_text_files.skip_n_lines(file, self.n_header_lines)
        # read one configuration
        traj_data = np.stack(
            [np.array(list(file.readline().split())) for _ in range(n_particles)]
        )
        # sort by particle id
        if not self.trajectory_is_sorted_by_ids:
            traj_data = sort_array_by_column(traj_data, header_id_index)
        # iterate over the first configuration, whenever a new species
        # (value at species_index) is encountered, add an entry
        for i, line in enumerate(traj_data):
            species_name = line[header_species_index]
            if species_name not in species_dict.keys():
                species_dict[species_name] = []
            species_dict[species_name].append(i)

        return species_dict

    def _get_sample_rate(self, file, n_particles: int) -> typing.Union[int, None]:
        first_header = mdsuite.file_io.tabular_text_files.read_n_lines(
            file, self.n_header_lines, start_at=0
        )
        time_step_0 = int(first_header[1])  # Time in first configuration
        # catch single snapshot trajectory (second_header == [])
        try:
            second_header = mdsuite.file_io.tabular_text_files.read_n_lines(
                file, self.n_header_lines, start_at=self.n_header_lines + n_particles
            )
        except StopIteration:
            return None

        time_step_1 = int(second_header[1])  # Time in second configuration
        return time_step_1 - time_step_0


def extract_properties_from_header(
    header_property_names: list, database_correspondence_dict: dict
) -> dict:
    """
    Takes the property names from a file header, sees if there is a corresponding
    mdsuite property in database_correspondence_dict.
    Returns a dict that links the mdsuite property names to the column indices at which
    they can be found in the file.

    Parameters
    ----------
    header_property_names
        The names of the columns in the data file
    database_correspondence_dict
        The translation between mdsuite properties and the column names of the
        respective file format.
        lammps example:
        {"Positions": ["x", "y", "z"],
         "Unwrapped_Positions": ["xu", "yu", "zu"],

    Returns
    -------
    trajectory_properties : dict
        A dict of the form
        {'MDSuite_Property_1': [column_indices], 'MDSuite_Property_2': ...}
        Example {'Unwrapped_Positions': [2,3,4], 'Velocities': [5,6,8]}
    """
    column_dict_properties = {
        variable: idx for idx, variable in enumerate(header_property_names)
    }
    # for each property label (position, velocity,etc) in the lammps definition
    for property_label, property_names in database_correspondence_dict.items():
        # for each coordinate for a given property label (position: x, y, z),
        # get idx and the name
        for idx, property_name in enumerate(property_names):
            if (
                property_name in column_dict_properties.keys()
            ):  # if this name (x) is in the input file properties
                # we change the lammps_properties_dict replacing the string of the
                # property name by the column name
                database_correspondence_dict[property_label][idx] = (
                    column_dict_properties[property_name]
                )

    # trajectory_properties only need the labels with the integer columns, then we
    # only copy those
    trajectory_properties = {}
    for property_label, properties_columns in database_correspondence_dict.items():
        if all(
            isinstance(property_column, int) for property_column in properties_columns
        ):
            trajectory_properties[property_label] = properties_columns

    return trajectory_properties