hongbo-miao/hongbomiao.com

View on GitHub
embedded/decode-can-data/src/main.py

Summary

Maintainability
A
0 mins
Test Coverage
import logging
import time
from decimal import Decimal
from pathlib import Path

import can
import cantools
import pandas as pd


def load_dbc_dict(dbc_path_dict: dict[str, Path]) -> dict[str, cantools.db.Database]:
    return {
        unit_type: cantools.db.load_file(dbc_path)
        for unit_type, dbc_path in dbc_path_dict.items()
    }


def decode_can_data(
    blf_path: Path,
    dbc_dict: dict[str, cantools.db.Database],
    unit_dict: dict[str, dict[str, str]],
) -> dict[str, list[dict[str, bool | float | int | str]]]:
    blf_size_bytes = blf_path.stat().st_size
    unit_type_and_unit_id_dict: dict[str, list[dict[str, str | int | float]]] = {}
    message_count = 0
    with can.BLFReader(blf_path) as blf_reader:
        for frame in blf_reader:
            current_bytes = blf_reader.file.tell()
            unit = unit_dict[str(frame.channel)]
            message_definition = dbc_dict[unit["type"]].get_message_by_frame_id(
                frame.arbitration_id
            )
            message = message_definition.decode(frame.data)
            # Convert cantools.database.can.signal.NamedSignalValue to str
            message = {
                signal_name: (
                    str(signal_value)
                    if isinstance(
                        signal_value,
                        cantools.database.can.signal.NamedSignalValue,
                    )
                    else signal_value
                )
                for signal_name, signal_value in message.items()
            }
            # Add message_definition.name as prefix
            message = {
                f"{message_definition.name}.{signal_name}": signal_value
                for signal_name, signal_value in message.items()
            }
            message.update(
                {
                    "arbitration_id": frame.arbitration_id,
                    "channel": frame.channel,
                    "dlc": frame.dlc,
                    "is_extended_id": frame.is_extended_id,
                    "timestamp": frame.timestamp,
                    # Avoid `int(frame.timestamp * 1e9` as it leads to time drift due to floating-point arithmetic
                    "_time": int(Decimal(str(frame.timestamp)) * Decimal("1e9")),
                    "_can_id": str(frame.arbitration_id),
                    "_can_logger_channel_id": str(frame.channel),
                    "_unit_id": unit["id"],
                }
            )
            unit_type_and_unit_id = f"{unit['type']}_{unit['id']}"
            unit_type_and_unit_id_dict.setdefault(unit_type_and_unit_id, []).append(
                message
            )
            if message_count % 1000000 == 0:
                logging.info(
                    f"Decoded: {round(current_bytes * 100.0 / blf_size_bytes)} %, {message_count = }"
                )
            message_count += 1
    logging.info(f"Decoded: 100 %, {message_count = }")
    return unit_type_and_unit_id_dict


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    data_dir_path = Path("data")
    blf_path = data_dir_path / Path("can.blf")
    dbc_path_dict = {
        "bms": data_dir_path / Path("bms.dbc"),
        "eec": data_dir_path / Path("eec.dbc"),
    }
    unit_dict = {
        "0": {"can_logger_channel_id": "0", "type": "bms", "id": "1"},
        "1": {"can_logger_channel_id": "1", "type": "bms", "id": "2"},
        "2": {"can_logger_channel_id": "2", "type": "eec", "id": "1"},
        "3": {"can_logger_channel_id": "3", "type": "eec", "id": "2"},
        "4": {"can_logger_channel_id": "4", "type": "eec", "id": "3"},
        "5": {"can_logger_channel_id": "5", "type": "eec", "id": "4"},
    }

    start_time = time.time()
    dbc_dict = load_dbc_dict(dbc_path_dict)
    unit_type_and_unit_id_dict = decode_can_data(blf_path, dbc_dict, unit_dict)
    logging.info(f"Decoding time: {time.time() - start_time} seconds")

    for unit_type_and_unit_id, data in unit_type_and_unit_id_dict.items():
        parquet_path = data_dir_path / Path(f"{unit_type_and_unit_id}.parquet")
        df = pd.DataFrame(data)
        df.to_parquet(parquet_path, engine="pyarrow")