susantabiswas/FaceRecog

View on GitHub
face_recog/json_persistent_storage.py

Summary

Maintainability
A
35 mins
Test Coverage
# ---- coding: utf-8 ----
# ===================================================
# Author: Susanta Biswas
# ===================================================
"""Description: Class to handle persistent data storage.
This uses simple JSON file to save data.

Usage: python -m face_recog.json_persistent_storage
"""
# ===================================================
import json
import os
import sys
from typing import Dict, List

from face_recog.exceptions import DatabaseFileNotFound
from face_recog.logger import LoggerFactory
from face_recog.persistent_storage import PersistentStorage
from face_recog.validators import path_exists

# Load the custom logger
logger = None
try:
    logger_ob = LoggerFactory(logger_name=__name__)
    logger = logger_ob.get_logger()
    logger.info("{} loaded...".format(__name__))
    # set exception hook for uncaught exceptions
    sys.excepthook = logger_ob.uncaught_exception_hook
except Exception as exc:
    raise exc


class JSONStorage(PersistentStorage):
    """Class to handle persistent data storage.
    This uses simple JSON file to save data.

    Args:
        PersistentStorage ([type]): [description]
    """

    def __init__(self, db_loc: str = "./data/facial_data_db.json"):
        """Constructor

        Args:
            db_loc (str, optional): Path to save the persistent DB. Defaults to './data/facial_data_db.json'.
        """
        self.db_loc = db_loc

    def add_data(self, face_data: Dict):
        """Adds facial data to persistent DB.

        Args:
            face_data (Dict): [description]
        """
        data = []
        # check if the db exists, otherwise create one
        base_path, filename = os.path.split(self.db_loc)

        if not path_exists(base_path):
            logger.info("DB path doesn't exist! Attempting path creation...")
            os.makedirs(base_path)
        if os.path.exists(self.db_loc):
            # load the existing data
            data = self.get_all_data()
        try:
            # Add the new data and save to disk
            data.append(face_data)
            self.save_data(data=data)
            logger.info("Data saved to DB...")
        except Exception as exc:
            raise exc

    def get_all_data(self) -> List:
        """Returns a list of facial data of all the
        registered users from cache.

        Returns:
            List[Dict]: [description]
        """
        # Data load will fail incase the file doesn't exist
        if not path_exists(self.db_loc):
            raise DatabaseFileNotFound
        try:
            # load the existing data
            with open(self.db_loc, "r") as f:
                data = json.load(f)
                # convert the list to tuple to keep
                # consistency across
                return self.sanitize_data(data)
        except Exception as exc:
            raise exc

    def delete_data(self, face_id: str) -> bool:
        """Deletes the facial record that match the facial
        Id from persistent DB.

        Args:
            face_id (str): Identifier used for searching

        Returns:
            Deletion status: [description]
        """
        # load and search if face id exists and
        # save the data without that entry
        all_data = self.get_all_data()
        num_entries = len(all_data)
        for idx, face_data in enumerate(all_data):
            for key_pair in face_data.items():
                # Check if the face id exists in current data item
                if face_id in key_pair:
                    all_data.pop(idx)

        if num_entries != len(all_data):
            self.save_data(data=all_data)
            logger.info(
                ("{} face(s) deleted and updated" " data saved to DB...").format(
                    num_entries - len(all_data)
                )
            )
            return True
        return False

    def save_data(self, data: Dict = None):
        """Saves data in the persistent DB.

        Args:
            data (Dict, optional): Facial Data. Defaults to None.
        """
        if data is not None:
            with open(self.db_loc, "w") as f:
                json.dump(data, f)

    def sanitize_data(self, data: List[Dict]) -> List[Dict]:
        """Converts data types so that the data types
        are consistent across the system.
        Facial encodings are initially in numpy format,
        which is then changed to a tuple so that it can be saved to
        cache, this method ensures that the data that is loaded
        back from JSON file also has tuple for the encoding instead of a list.

        Args:
            data (Dict): Facial Data

        Returns:
            [type]: [description]
        """
        for face_data in data:
            face_data["encoding"] = tuple(face_data["encoding"])
        return data


if __name__ == "__main__":
    """ Sanity checks """

    # ob = JSONStorage(db_loc="data/test_facial_data.json")
    # # Save data
    # face_data1 = {"name": "test1", "encoding": (-3.4, 0.3, -0.823, 1)}
    # face_data2 = {"name": "test2", "encoding": (-3.4, 0.3, -0.823, 1)}
    # face_data3 = {"name": "test3", "encoding": (-3.4, 0.3, -0.823, 1)}

    # ob.add_data(face_data=face_data1)
    # print(ob.get_all_data())

    # ob.add_data(face_data=face_data2)
    # print(ob.get_all_data())

    # # remove data
    # ob.delete_data(face_id="test1")
    # print("Test1 data deleted")
    # print(ob.get_all_data())

    # # remove the test file
    # os.remove("data/test_facial_data.json")
    # print("File: {} deleted!".format("data/test_facial_data.json"))
    pass