christoph2/pyA2L

View on GitHub
pya2l/__init__.py

Summary

Maintainability
A
50 mins
Test Coverage
#!/usr/bin/env python
# -*- coding: utf-8 -*-

__copyright__ = """
   pySART - Simplified AUTOSAR-Toolkit for Python.

   (C) 2010-2022 by Christoph Schueler <cpu12.gems.googlemail.com>

   All Rights Reserved

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License along
   with this program; if not, write to the Free Software Foundation, Inc.,
   51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

   s. FLOSS-EXCEPTION.txt
"""
__author__ = "Christoph Schueler"
__version__ = "0.10.2"


import pickle
import pkgutil
import sys
from time import perf_counter
from pathlib import Path
import warnings


import pya2l.model as model
from pya2l.logger import Logger
from pya2l.templates import doTemplateFromText
from pya2l.utils import detect_encoding


class InvalidA2LDatabase(Exception):
    """"""

    pass


class DB(object):
    """"""

    A2L_TEMPLATE = pkgutil.get_data("pya2l.cgen.templates", "a2l.tmpl")

    def import_a2l(
        self,
        file_name,
        debug=False,
        in_memory=False,
        remove_existing=False,
        local=False,
        encoding=None,
        loglevel="INFO",
    ):
        """Import `.a2l` file to `.a2ldb` database.


        Parameters
        ----------
        file_name: str
            Name of the A2L to be imported. If you don't specify an extension ``.a2l`` is added.

        debug: bool
            Additional debugging output.

        in_memory: bool
            Create non-persistent in-memory database.

        remove_existing: bool
            ** DANGER ZONE **: Remove existing database.

        local: bool
            If `True` create A2LDB in current working directory else in A2L source directory.

        encoding: str
            File encoding like "latin-1" or "utf-8" or None to auto-detect.

        loglevel: str
            "INFO" | "WARN" | "DEBUG" | "ERROR" | "CRITICAL"

        Returns
        -------
        SQLAlchemy session object.

        Raises
        ------
        OSError
            If database already exists.

        Note
        ----
        ``AML`` and ``IF_DATA`` sections are currently not processed.
        """
        from os import unlink

        from pya2l.a2l_listener import A2LListener
        from pya2l import parsers
        from pya2l.preprocessor import Preprocessor
        from pya2l.aml.db import Importer

        start_time = perf_counter()
        self.logger = Logger(self.__class__.__name__, loglevel)
        self.in_memory = in_memory

        self._set_path_components(file_name, local)
        if not in_memory:
            if remove_existing:
                try:
                    unlink(str(self._dbfn))
                except Exception:
                    pass
            elif self._dbfn.exists():
                raise OSError("file '{}' already exists.".format(self._dbfn))
        prepro = Preprocessor(loglevel=loglevel)

        encoding = encoding or detect_encoding(file_name=self._a2lfn)
        prepro_result = prepro.process(self._a2lfn, encoding=encoding)
        a2l_parser = parsers.a2l(debug=debug, prepro_result=prepro_result)
        self.logger.info("Parsing pre-processed data ...")
        self.db, listener_result = a2l_parser.parseFromString(prepro_result.a2l_data, dbname=str(self._dbfn), encoding=encoding)
        self.session = self.db.session
        aml_section = prepro_result.aml_section
        if aml_section:
            self.logger.info("Parsing AML section ...")
            aml_parser = parsers.aml(prepro_result=prepro_result)
            aml_result = aml_parser.parseFromString(aml_section, encoding=encoding).listener_result
            aml_parsed = pickle.dumps(aml_result)
            self.session.add(model.AMLSection(text=aml_section, parsed=aml_parsed))
            for item in self.session.query(model.IfData).all():
                ip = parsers.if_data(aml_result)
                parsed_if_data = pickle.dumps(ip.parse(item.raw))
                item.parsed = parsed_if_data
                self.session.add(item)
            self.session.commit()
        self.logger.info("Done [elapsed time {:.2f}s].".format(perf_counter() - start_time))
        return self.session

    def export_a2l(self, file_name=sys.stdout, encoding="utf-8"):
        """"""
        namespace = dict(session=self.db.session, model=model)
        data = doTemplateFromText(self.A2L_TEMPLATE, namespace, formatExceptions=False, encoding=encoding)
        result = []
        for line in data.splitlines():
            line = line.rstrip()
            if not line:
                continue
            else:
                result.append(line)
        result = "\n".join(result)
        print(result)
        # with io.open("{}.render".format(file_name), "w", encoding = encoding, newline = "\r\n") as outf:
        #    outf.write(res)

    def open_create(self, file_name, local=False, encoding=None, loglevel="INFO"):
        """Open or create an A2LDB."""
        self.in_memory = False
        self._set_path_components(file_name)
        if not self._dbfn.exists():
            return self.import_a2l(self._a2lfn, local=local, encoding=encoding, loglevel=loglevel)
        else:
            return self.open_existing(self._dbfn)

    def open_existing(self, file_name):
        """Open an existing `.a2ldb` database.

        Parameters
        ----------
        file_name: str
            Name of your database file, resulting from :meth:`import_a2l`.
            Extension `.a2ldb` not needed.

        Returns
        -------
        SQLAlchemy session object.

        Raises
        ------
        OSError
            If database already exists.
        """
        self.in_memory = False
        self._set_path_components(file_name)
        if not self._dbfn.exists():
            raise OSError("file '{}' does not exists.".format(self._dbfn))
        else:
            self.db = model.A2LDatabase(str(self._dbfn))
            self.session = self.db.session
            res = self.session.query(model.MetaData).first()
            if res:
                return self.session
            else:
                raise InvalidA2LDatabase("Database seems to be corrupted. No meta-data found.")

    def close(self):
        self.db.close()

    def _set_path_components(self, file_name, local=False):
        """"""
        if hasattr(self, "_dbfn"):
            return
        file_path = Path(file_name)
        if self.in_memory:
            self._dbfn = ":memory:"
        else:
            if local:
                self._dbfn = Path(file_path.stem).with_suffix(".a2ldb")
            else:
                self._dbfn = (file_path.parent / file_path.stem).with_suffix(".a2ldb")
        if not file_path.suffix:
            self._a2lfn = (file_path.parent / file_path.stem).with_suffix(".a2l")
        else:
            self._a2lfn = (file_path.parent / file_path.stem).with_suffix(file_path.suffix)