IBM/pytorchpipe

View on GitHub
ptp/components/publishers/stream_file_exporter.py

Summary

Maintainability
A
2 hrs
Test Coverage
# -*- coding: utf-8 -*-
#
# Copyright (C) tkornuta, IBM Corporation 2019
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

__author__ = "Tomasz Kornuta"

from os import path

from ptp.configuration.config_parsing import get_value_list_from_dictionary
from ptp.components.component import Component
from ptp.data_types.data_definition import DataDefinition


class StreamFileExporter(Component):
    """
    Utility for exporting contents of streams of a given batch to file.

    The output file is created in the constructor (inside the experiment log directory) and
    every call of the component appends one line per sample of the processed batch, where the
    values of the exported streams are joined with the configured separator.
    """

    def __init__(self, name, config):
        """
        Initializes the object, retrieves names of input streams and creates the output file in experiment directory.

        :param name: Name of the component.
        :type name: str

        :param config: Dictionary of parameters (read from the configuration ``.yaml`` file).
        :type config: :py:class:`ptp.configuration.ConfigInterface`

        """
        # Call constructors of parent classes.
        # NOTE(review): the attributes used below (stream_keys, config, app_state, logger)
        # are expected to be set by the Component base class - confirm against Component.
        Component.__init__(self, name, StreamFileExporter, config)

        # Get key mappings for indices.
        self.key_indices = self.stream_keys["indices"]

        # Load list of streams names (keys).
        self.input_stream_keys = get_value_list_from_dictionary("input_streams", self.config)

        # Get separator.
        self.separator = self.config["separator"]

        # Create file where we will write the results.
        filename = self.config["filename"]
        abs_filename = path.join(self.app_state.log_dir, filename)
        # The handle stays open for the lifetime of the component (it is written to in __call__).
        self.file = open(abs_filename, 'w')

        # Export additional line with separator.
        if self.config["export_separator_line_to_csv"]:
            self.file.write("sep={}\n".format(self.separator))

        # Export header - once, when we will process the first batch.
        self.export_header = self.config["export_header_to_csv"]

        self.logger.info("Writing values from {} streams to {}".format(self.input_stream_keys, abs_filename))

    def input_data_definitions(self):
        """
        Function returns a dictionary with definitions of input data that are required by the component.

        Only the stream with indices is declared as required; the exported streams are looked up
        at call time and the absent ones are skipped.

        :return: dictionary containing input data definitions (each of type :py:class:`ptp.data_types.DataDefinition`).
        """
        return {
            self.key_indices: DataDefinition([-1, 1], [list, int], "Batch of sample indices [BATCH_SIZE] x [1]"),
            }

    def output_data_definitions(self):
        """
        Function returns a dictionary with definitions of output data produced by the component.

        The component does not produce any output streams, hence the dictionary is empty.

        :return: dictionary containing output data definitions (each of type :py:class:`ptp.data_types.DataDefinition`).
        """
        return {
            }

    def __call__(self, data_streams):
        """
        Exports values from the indicated streams to file.

        Streams that are listed in the configuration but absent in ``data_streams`` are skipped
        and reported with a warning. The header (when enabled) is written only once, during the
        first call, and contains names of the streams present in that call.

        :param data_streams: :py:class:`ptp.utils.DataStreams` object containing "indices" and other streams that will be exported to file.
        """
        # Get batch size.
        indices = data_streams[self.key_indices]
        batch_size = len(indices)

        # Check present streams (the order of the configured list is preserved).
        available_keys = data_streams.keys()
        absent_streams = []
        present_streams = []
        for stream_key in self.input_stream_keys:
            if stream_key in available_keys:
                present_streams.append(stream_key)
            else:
                absent_streams.append(stream_key)

        # Export header - only once.
        if self.export_header:
            # Join instead of "append separator and strip the last character": the latter is
            # correct only for separators that are exactly one character long.
            self.file.write(self.separator.join(present_streams) + '\n')
            # Do it only once.
            self.export_header = False

        # Export values to file - one line per sample.
        columns = [data_streams[stream_key] for stream_key in present_streams]
        self.file.writelines(
            self.separator.join('{}'.format(column[i]) for column in columns) + '\n'
            for i in range(batch_size)
        )

        # The file is not closed anywhere in this class, so flush after every batch to make
        # sure that the exported rows are not left in the write buffer.
        self.file.flush()

        # Log values and inform about missing streams.
        if absent_streams:
            self.logger.warning("Could not export the following (absent) streams: {}".format(absent_streams))