deeplime-io/onecode

View on GitHub
onecode/cli/skeleton/main.py

Summary

Maintainability
A
1 hr
Test Coverage
# SPDX-FileCopyrightText: 2023-2024 DeepLime <contact@deeplime.io>
# SPDX-License-Identifier: MIT

#####################################
#    !! DO NOT EDIT THIS FILE !!    #
#####################################

import argparse
import json
import logging
import os
from importlib import import_module
from typing import Dict, List

from onecode import (
    ConfigOption,
    Env,
    Logger,
    Mode,
    Project,
    register_ext_module
)


def main(
    data: Dict = None,
    flow_name: str = None,
    logger: logging.Handler = None
):
    """
    Starts the OneCode project with the given options.

    The registered flows are read from the OneCode configuration file located next to this
    file. Each selected flow is imported from the `flows` package and its `run()` function is
    called, in the order in which the flows appear in the configuration file.

    Args:
        data: Initialize the Project with data before execution (typically
            `Mode.LOAD_THEN_EXECUTE`).
        flow_name: Execute only the flow specified by its ID (compared against the `file`
            entry of each registered flow). If None provided, all flows will run.
        logger: Add a logging handler to the OneCode Logger.

    Returns:
        The output manifest of the executed flow when exactly one flow was executed,
        otherwise the list of output manifests of all executed flows (an empty list when
        no flow was executed).

    Raises:
        FileNotFoundError: if the OneCode project configuration file is not found.

    """
    cur_dir = os.path.dirname(__file__)
    config_file = os.path.join(cur_dir, Env.ONECODE_CONFIG_FILE)

    if not os.path.exists(config_file):
        raise FileNotFoundError(
            """
            Missing OneCode config files, it looks like someone tampered with the wrong file...
            You may recreate the files according to the documentation specs.
            """
        )

    # register elements from OneCode inline extensions if any
    globals()['onecode_ext'] = register_ext_module()

    Project().data = data
    Logger().reset()
    Logger().add_handler(logger)

    # JSON is UTF-8 by specification: do not depend on the platform default encoding
    with open(config_file, encoding='utf-8') as f:
        workflows = json.load(f)

    all_manifests = []
    for wfl in workflows:
        flow_file = wfl['file']

        # a specific flow was requested and this is not the one
        if flow_name is not None and flow_name != flow_file:
            continue

        # a flow registered in the configuration without its Python file is skipped
        if not os.path.exists(os.path.join(cur_dir, 'flows', f'{flow_file}.py')):
            print(f"Registered flow {wfl['label']} ({flow_file}.py) doesn't exist => skipping")
            continue

        Project().current_flow = flow_file

        # clear any previous MANIFEST.txt output
        manifest = Project().get_output_manifest()
        if os.path.exists(manifest):
            os.remove(manifest)

        flow = import_module(f"flows.{flow_file}")
        flow.run()
        all_manifests.append(manifest)

    # a single executed flow yields its manifest directly rather than a one-element list
    return all_manifests[0] if len(all_manifests) == 1 else all_manifests


def _main(raw_args: List[str] = None):
    """
    Command line entry point: parses the arguments, configures the Project and runs the flows.

    When an input JSON file is given, its content is loaded and the Project mode is set to
    `Mode.LOAD_THEN_EXECUTE`; otherwise the Project mode is set to `Mode.EXECUTE`.

    Args:
        raw_args: Command line arguments to parse. If None, `argparse` falls back to the
            arguments of the current process (`sys.argv[1:]`).

    Raises:
        FileNotFoundError: if the given input parameters file does not exist.

    """
    parser = argparse.ArgumentParser(description='Use optional JSON parameters file')
    parser.add_argument('--flow', default=None, help='Specify the flow to run')
    parser.add_argument('--flush', action="store_true", help='Flush the logs immediately')
    parser.add_argument('file', nargs='?', help='Path to the input JSON file')

    args = parser.parse_args(raw_args)

    data = None
    if args.file is not None:
        if not os.path.exists(args.file):
            raise FileNotFoundError(f'Input parameters file {args.file} does not exist')

        print(f'Using provided parameter file: {args.file}')
        # JSON is UTF-8 by specification: do not depend on the platform default encoding
        with open(args.file, 'r', encoding='utf-8') as f:
            data = json.load(f)

        Project().mode = Mode.LOAD_THEN_EXECUTE
    else:
        Project().mode = Mode.EXECUTE

    if args.flush:
        Project().set_config(ConfigOption.FLUSH_STDOUT, True)

    main(data, args.flow)


# Only run the command line entry point when this file is executed as a script
if __name__ == "__main__":
    _main()