tadashi-aikawa/jumeaux

View on GitHub
jumeaux/domain/config/service.py

Summary

Maintainability
A
1 hr
Test Coverage
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
from typing import List

from owlmixin import TList, TOption
from owlmixin.util import load_yamlf

from jumeaux.domain.config.vo import Config, MergedArgs
from jumeaux.models import Report


def apply_include(addon: dict, config_path: str) -> dict:
    return (
        load_yamlf(os.path.join(os.path.dirname(config_path), addon["include"]), "utf8")
        if "include" in addon
        else addon
    )


def apply_include_addons(addons: dict, config_path: str) -> dict:
    def apply_includes(layer_name: str):
        return [apply_include(a, config_path) for a in addons.get(layer_name, [])]

    return {
        k: v
        for k, v in {
            "log2reqs": apply_include(addons["log2reqs"], config_path)
            if "log2reqs" in addons
            else None,
            "reqs2reqs": apply_includes("reqs2reqs"),
            "res2res": apply_includes("res2res"),
            "res2dict": apply_includes("res2dict"),
            "judgement": apply_includes("judgement"),
            "store_criterion": apply_includes("store_criterion"),
            "dump": apply_includes("dump"),
            "did_challenge": apply_includes("did_challenge"),
            "final": apply_includes("final"),
        }.items()
        if v
    }


def create_config(config_paths: TList[str], skip_tags: TOption[TList[str]]) -> Config:
    def filter_by_tags(addons: List[dict]) -> List[dict]:
        return [
            x
            for x in addons
            if skip_tags.map(lambda y: not y.intersection(x.get("tags", []))).get_or(
                True
            )
        ]

    def reducer(merged: dict, config_path: str) -> dict:
        d = load_yamlf(config_path, "utf8")
        if "addons" in d:
            addons_by_key: dict = d["addons"]
            d["addons"] = {
                k: v
                for k, v in {
                    "log2reqs": addons_by_key.get("log2reqs"),
                    "reqs2reqs": filter_by_tags(addons_by_key.get("reqs2reqs", [])),
                    "res2res": filter_by_tags(addons_by_key.get("res2res", [])),
                    "res2dict": filter_by_tags(addons_by_key.get("res2dict", [])),
                    "judgement": filter_by_tags(addons_by_key.get("judgement", [])),
                    "store_criterion": filter_by_tags(
                        addons_by_key.get("store_criterion", [])
                    ),
                    "dump": filter_by_tags(addons_by_key.get("dump", [])),
                    "did_challenge": filter_by_tags(
                        addons_by_key.get("did_challenge", [])
                    ),
                    "final": filter_by_tags(addons_by_key.get("final", [])),
                }.items()
                if v
            }
            if "addons" in merged:
                merged["addons"].update(d["addons"])
                del d["addons"]

        merged.update(d)

        if "addons" in merged:
            merged["addons"].update(apply_include_addons(merged["addons"], config_path))

        return merged

    return Config.from_dict(config_paths.reduce(reducer, {}))


def create_config_from_report(report: Report) -> Config:
    return Config.from_dict(
        {
            "one": report.summary.one.to_dict(),
            "other": report.summary.other.to_dict(),
            "output": report.summary.output.to_dict(),
            "threads": 1,
            "title": report.title,
            "description": report.description,
            "notifiers": report.notifiers,
            "addons": report.addons.get().to_dict(),
        }
    )


def merge_args2config(args: MergedArgs, config: Config) -> Config:
    return Config.from_dict(
        {
            "one": config.one,
            "other": config.other,
            "output": config.output,
            "threads": args.threads.get_or(config.threads),
            "processes": args.processes if args.processes.get() else config.processes,
            "max_retries": args.max_retries.get()
            if args.max_retries.get() is not None
            else config.max_retries,
            "title": args.title if args.title.get() else config.title,
            "description": args.description
            if args.description.get()
            else config.description,
            "tags": args.tag if args.tag.any() else config.tags,
            "input_files": args.files if args.files.any() else config.input_files,
            "notifiers": config.notifiers,
            "addons": config.addons,
            "judge_response_header": config.judge_response_header,
            "ignore_response_header_keys": config.ignore_response_header_keys,
        }
    )