tadashi-aikawa/jumeaux
jumeaux/executor.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import hashlib
import io
import os
import re
import sys
import urllib.parse as urlparser
from concurrent import futures
from typing import Any, List, Optional, Tuple

import requests
from deepdiff import DeepDiff
from owlmixin import TDict, TList, TOption
from requests.adapters import HTTPAdapter
from requests.exceptions import ConnectionError

from jumeaux import __version__
from jumeaux.addons import AddOnExecutor
from jumeaux.domain.config.service import (
    create_config,
    create_config_from_report,
    merge_args2config,
)
from jumeaux.domain.config.vo import Config, MergedArgs

# XXX: ...
from jumeaux.logger import Logger
from jumeaux.models import (
    ChallengeArg,
    Concurrency,
    DictOrList,
    DidChallengeAddOnPayload,
    DidChallengeAddOnReference,
    DiffKeys,
    DumpAddOnPayload,
    FinalAddOnPayload,
    FinalAddOnReference,
    HttpMethod,
    JudgementAddOnPayload,
    JudgementAddOnReference,
    Log2ReqsAddOnPayload,
    Proxy,
    QueryCustomization,
    Report,
    Reqs2ReqsAddOnPayload,
    Request,
    Res2DictAddOnPayload,
    Res2ResAddOnPayload,
    Response,
    Status,
    StoreCriterionAddOnPayload,
    StoreCriterionAddOnReference,
    Summary,
    Trial,
    to_json,
)
from jumeaux.utils import mill_seconds_until, now, parse_datetime_dsl, to_jumeaux_xpath

logger: Logger = Logger(__name__)
global_addon_executor: AddOnExecutor

START_JUMEAUX_AA = r"""
        ____  _             _         _
__/\__ / ___|| |_ __ _ _ __| |_      | |_   _ _ __ ___   ___  __ _ _   ___  __ __/\__
\    / \___ \| __/ _` | '__| __|  _  | | | | | '_ ` _ \ / _ \/ _` | | | \ \/ / \    /
/_  _\  ___) | || (_| | |  | |_  | |_| | |_| | | | | | |  __/ (_| | |_| |>  <  /_  _\
  \/   |____/ \__\__,_|_|   \__|  \___/ \__,_|_| |_| |_|\___|\__,_|\__,_/_/\_\   \/


                    ..JgHggggggggHm&..   ...gWggggggggggHa-..
                .(WgggggggggggggggggggNNNMgggggggggggggggggggHa,
             .(ggggggggggggggggggggMMMMMMMMNggggggggggggggggggggga,
           .dggggggggggggggggggggHMMMMMMMMMMMNgggggggggggggggggggggN,
        ..ggggggggggggggggggggg@NMMMMMMMMMMMMMNHgggggggggggggggggggggHa.   `
      .JgggggggggggggggggggggggNMMMMMMMMMMMMMMMMHgggggggggggggggggggggggn.
     .ggggggggggggggggggggggggNMMMMMMMMMMMMMMMMMMggggggggggggggggggggggggH,
   .dgggggggggggggggggggggggg@MMMMMMMMMMMMMMMMMMMNgggggggggggggggggggggggggh.
  .ggggggggggggggggggggggggggMMMMMMMMMMMMMMMMMMMMMggggggggggggggggggggggggggH,
 .gggggggggggggggggggggggggggMMMMMMMMMMMMMMMMMMMMMNg@ggggggggggggggggggggggggH,
 dggggggggggggggggggggggggg@gMMMMMMMMMMMMMMMMMMMMMNgggggggggggggggggggggggggggL
 WgggggggggggggggggggggggggggMMMMMMMMMMMMMMMMMMMMMMgggggggggggggggggggggggggggP
 (gggggggggggggggggggggggggggMMMMMMMMMMMMMMMMMMMMMMggggggggggggggggggggggggggH'
  ,UggggggggggggggggggggggggggMMMMMMMMMMMMMMMMMMMMgggggggggggggggggggggggggHY`
     ?"YHgHHBHHgggggggggggggggMMMMMMMMMMMMMMMMMMMNggggggggggggggHWYWHgHY""(
    JH.      _?!~-?"4HggggggggMMMMMMMMMMMMMMMMMMMHgggggggg#"=~`~!`       .gL
   .ggh               (HgggggggMMMMMMMMMMMMMMMMMNgggggggf`              .Hgg,
   JggH                ,HYJggggMMMMMMMMMMMMMMMMMMgggg]7g|               .ggg]
   dggF               ."` HggggHMMMMMMMMMMMMMMMMgggggb  7.               Ogg]
   JgK                   .ggggggMMMMMMMMMMMMMMMNgggggH                    Hg%
   ,gHJ..                .ggggggMMMMMMMMMMMMMMMNgggggH                 ..(gg:
    ggggg\                ggggggMMMMMMMMMMMMMMMMgggggK                (ggggK
    zgggH,                4@ggg@MMMMMMMMMMMMMMMNgggggF             Ta..gggg%
    .ggggH,               ,gggggMMMMMMMMMMMMMMMNggggg!               (ggggH
     jWggggL        ..,    WgggMMMMMMMMMMMMMMMMMNgggP    .J,       .dggggD%
     ./Hggggh.   ..WggH.   .HNMMMMMMMMMMMMMMMMMMMNH@    .gggH&, ` .Wgggg@,
      G(gggggg@ggggggggh..+udMMMMMMMMMMMMMMMMMMMMMMmZ>-.Hggggggggggggggg%]
      ,(ggggggggggggggggMggggHMMMMMMMMMMMMMMMMMMMMgggggMgggggggggggggggg({
      .wggggggggggggggggg,7HgggMMMMMMMMMMMMMMMMMMgggHY(gggggggggggggggggJ`
      .Wggggggggggggggggg]    ???jMMMMMMMMMMM#?777!   JgggggggggggggggggW<
     .XggggggggggggggggggP      _?""Y"'"HHBYY""!      Wggggggggggggggggggh
    .JgggggggggggggggggggP                            WggggggggggggggggggHJ.
...dggggggggggggggggggggg]                            q@ggggggggggggggggggggm-..
  _7""Y""!.gggggggggggggH`                            .gggggggggggggg;_""Y""'!
         .WggggggggggggH^                              (gggggggggggggh
        .dggggggggggggB!                                ,Hggggggggggggh
       .dggggggggggg#=                                    ?Hgggggggggggh.
      .gggggggggg#"!                                        -THgggggggggH,
    .dgggggHYY"!                                                ?"YWHgggggh,

"""

CONFIG_AA = r"""
         ____             __ _
__/\__  / ___|___  _ __  / _(_) __ _  __/\__
\    / | |   / _ \| '_ \| |_| |/ _` | \    /
/_  _\ | |__| (_) | | | |  _| | (_| | /_  _\
  \/    \____\___/|_| |_|_| |_|\__, |   \/
                               |___/
"""


def write_to_file(name, dir, body):
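    """Write binary `body` to the file `{dir}/{name}`."""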
    with open(f"{dir}/{name}", "wb") as f:
        f.write(body)


def make_dir(path):
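    """Create a directory (including parents) and chmod it to 0o777."""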
    os.makedirs(path)
    os.chmod(path, 0o777)


def http_get(args: Tuple[Any, str, TDict[str], TOption[Proxy]]):
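    """Fire one GET request and close the session when done.

    Arguments arrive as a single tuple so the whole call can be submitted
    to an executor's `map` as one unit.
    """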
    session, url, headers, proxies = args
    try:
        r = session.get(
            url, headers=headers, proxies=proxies.map(lambda x: x.to_dict()).get_or({})
        )
    finally:
        session.close()
    return r


def http_post(
    args: Tuple[
        Any, str, TOption[str], TOption[dict], TOption[dict], TDict[str], TOption[Proxy]
    ],
):
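    """Fire one POST request (raw body, form, or json) and close the session
    when done.

    As with `http_get`, arguments arrive as a single tuple for use with an
    executor's `map`.
    """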
    session, url, raw, form, json_, headers, proxies = args
    try:
        r = session.post(
            url,
            data=raw.get() or form.get(),
            json=json_.get(),
            headers=headers,
            proxies=proxies.map(lambda x: x.to_dict()).get_or({}),
        )
    finally:
        session.close()
    return r


def merge_headers(
    access_point_base: TDict[str], this_request: TDict[str]
) -> TDict[str]:
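    """Merge request headers, later sources winning: the default jumeaux
    User-Agent, then the access point's headers, then this request's headers.
    """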
    return (
        TDict({"User-Agent": f"jumeaux/{__version__}"})
        .assign(access_point_base)
        .assign(this_request)
    )


def are_same_headers(
    one_headers: dict, other_headers: dict, ignore_keys: List[str]
) -> bool:
    # Equality check after dropping the ignored keys from both sides.
    return TDict(one_headers).omit_by(lambda k, v: k in ignore_keys) == TDict(
        other_headers
    ).omit_by(lambda k, v: k in ignore_keys)


def concurrent_request(
    session,
    *,
    headers: TDict[str],
    method: HttpMethod,
    raw: TOption[str],
    form: TOption[dict],
    json_: TOption[dict],
    url_one: str,
    url_other: str,
    headers_one: TDict[str],
    headers_other: TDict[str],
    proxies_one: TOption[Proxy],
    proxies_other: TOption[Proxy],
):
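    """Send the same request to both access points in parallel (two worker
    threads) and return the pair of responses in (one, other) order.
    """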
    merged_header_one: TDict[str] = merge_headers(headers_one, headers)
    merged_header_other: TDict[str] = merge_headers(headers_other, headers)
    logger.debug(f"One   Request headers: {merged_header_one}")
    logger.debug(f"Other Request headers: {merged_header_other}")

    with futures.ThreadPoolExecutor(max_workers=2) as ex:
        if method is HttpMethod.GET:
            res_one, res_other = ex.map(
                http_get,
                (
                    (session, url_one, merged_header_one, proxies_one),
                    (session, url_other, merged_header_other, proxies_other),
                ),
            )
        elif method is HttpMethod.POST:
            res_one, res_other = ex.map(
                http_post,
                (
                    (
                        session,
                        url_one,
                        raw,
                        form,
                        json_,
                        merged_header_one,
                        proxies_one,
                    ),
                    (
                        session,
                        url_other,
                        raw,
                        form,
                        json_,
                        merged_header_other,
                        proxies_other,
                    ),
                ),
            )
        else:
            # Unreachable
            raise RuntimeError

    return res_one, res_other


def res2res(res: Response, req: Request) -> Res2ResAddOnPayload:
    return global_addon_executor.apply_res2res(
        Res2ResAddOnPayload.from_dict({"response": res, "req": req, "tags": []})
    )


def res2dict(res: Response) -> TOption[dict]:
    return global_addon_executor.apply_res2dict(
        Res2DictAddOnPayload.from_dict({"response": res, "result": None})
    ).result


def judgement(
    r_one: Response,
    r_other: Response,
    d_one: TOption[DictOrList],
    d_other: TOption[DictOrList],
    name: str,
    path: str,
    qs: TDict[TList[str]],
    headers: TDict[str],
    diffs_by_cognition: Optional[TDict[DiffKeys]],
    judge_response_header: bool,
    ignore_response_header_keys: TList[str],
) -> Tuple[Status, TOption[TDict[DiffKeys]]]:
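    """Ask judgement add-ons whether the two responses should be regarded as
    the same, and return the resulting status together with the (possibly
    re-classified) diffs by cognition.
    """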
    result: JudgementAddOnPayload = global_addon_executor.apply_judgement(
        JudgementAddOnPayload.from_dict(
            {
                "diffs_by_cognition": diffs_by_cognition
                and diffs_by_cognition.omit_by(lambda k, v: v.is_empty()),
                "regard_as_same_body": r_one.body == r_other.body
                if diffs_by_cognition is None
                else diffs_by_cognition["unknown"].is_empty(),
                "regard_as_same_header": has_different(
                    r_one.headers, r_other.headers, ignore_response_header_keys
                )
                if judge_response_header
                else True,
            }
        ),
        JudgementAddOnReference.from_dict(
            {
                "name": name,
                "path": path,
                "qs": qs,
                "headers": headers,
                "dict_one": d_one,
                "dict_other": d_other,
                "res_one": r_one,
                "res_other": r_other,
            }
        ),
    )

    status: Status = Status.SAME if result.regard_as_same else Status.DIFFERENT  # type: ignore # Workaround for an OwlEnum typing problem

    return status, result.diffs_by_cognition


def store_criterion(
    status: Status, name: str, req: Request, r_one: Response, r_other: Response
):
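    """Ask store_criterion add-ons whether the responses for this trial
    should be written to disk (defaults to not storing).
    """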
    return global_addon_executor.apply_store_criterion(
        StoreCriterionAddOnPayload.from_dict({"stored": False}),
        StoreCriterionAddOnReference.from_dict(
            {
                "status": status,
                "req": {
                    "name": name,
                    "path": req.path,
                    "qs": req.qs,
                    "headers": req.headers,
                },
                "res_one": r_one,
                "res_other": r_other,
            }
        ),
    ).stored


def dump(res: Response):
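    """Let dump add-ons transform the response body before it is written
    to a file.
    """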
    return global_addon_executor.apply_dump(
        DumpAddOnPayload.from_dict(
            {"response": res, "body": res.body, "encoding": res.encoding}
        )
    ).body


def to_sec(elapsed):
    # Round a timedelta to seconds with two decimal places.
    return round(elapsed.total_seconds(), 2)


def select_key_as_case_insensitive(
    target_key_pattern: str, qs: TDict[TList[str]]
) -> str:
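    """Resolve `target_key_pattern` to an actual key in `qs`.

    A trailing "/i" requests a case-insensitive match, e.g. the pattern
    "id/i" matches an existing key "ID". If nothing matches, the bare
    pattern itself is returned.
    """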
    case_insensitive: bool = target_key_pattern.endswith("/i")
    target_key = target_key_pattern[:-2] if case_insensitive else target_key_pattern

    def matcher(x):
        return x.lower() == target_key.lower() if case_insensitive else x == target_key

    return TList(qs.keys()).find(matcher).get_or(target_key)


def create_query_string(
    qs: TDict[TList[str]], cqs: TOption[QueryCustomization], encoding: str
) -> str:
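    """Encode `qs` as a query string, first applying the optional
    customizations: `overwrite` replaces values (expanding the datetime DSL)
    and `remove` drops keys, both resolved via the case-insensitive "/i"
    key-pattern convention.
    """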
    if cqs.is_none():
        return urlparser.urlencode(qs, doseq=True, encoding=encoding)

    overwritten = qs.assign(
        {
            select_key_as_case_insensitive(k, qs): [parse_datetime_dsl(v) for v in vs]
            for k, vs in cqs.get().overwrite.get_or(TDict()).to_dict().items()
        }
    )
    removed = {
        k: v
        for k, v in overwritten.items()
        if k
        not in [
            select_key_as_case_insensitive(x, qs)
            for x in cqs.get().remove.get_or(TList())
        ]
    }

    return urlparser.urlencode(removed, doseq=True, encoding=encoding)


def challenge(arg_dict: dict) -> dict:
    """
    [[[ WARNING !!!!! ]]]
    `arg_dict` is dict like `ChallengeArg` because HttpMethod(OwlEnum) can't be pickled.
    Return value is dict like `Trial` because Status(OwlEnum) can't be pickled.
    """
    arg: ChallengeArg = ChallengeArg.from_dict(arg_dict)

    name: str = arg.req.name.get_or(str(arg.seq))
    log_prefix = f"[{arg.seq} / {arg.number_of_request}]"

    logger.info_lv3(f"{log_prefix} {'-'*80}")
    logger.info_lv3(f"{log_prefix}  {arg.seq}. {arg.req.name.get_or(arg.req.path)}")
    logger.info_lv3(f"{log_prefix} {'-'*80}")

    path_str_one = arg.path_one.map(
        lambda x: re.sub(x.before, x.after, arg.req.path)
    ).get_or(arg.req.path)
    path_str_other = arg.path_other.map(
        lambda x: re.sub(x.before, x.after, arg.req.path)
    ).get_or(arg.req.path)
    qs_str_one = create_query_string(arg.req.qs, arg.query_one, arg.req.url_encoding)
    qs_str_other = create_query_string(
        arg.req.qs, arg.query_other, arg.req.url_encoding
    )
    url_one = f"{arg.host_one}{path_str_one}?{qs_str_one}"
    url_other = f"{arg.host_other}{path_str_other}?{qs_str_other}"

    # Get two responses
    req_time = now()
    try:
        logger.info_lv3(f"{log_prefix} One   URL:   {url_one}")
        logger.debug(
            f"{log_prefix} One   PROXY: {arg.proxy_one.map(lambda x: x.to_dict()).get()}"
        )

        logger.info_lv3(f"{log_prefix} Other URL:   {url_other}")
        logger.debug(
            f"{log_prefix} Other PROXY: {arg.proxy_other.map(lambda x: x.to_dict()).get()}"
        )

        if arg.req.headers:
            logger.info_lv3(f"{log_prefix} Additional headers:   {arg.req.headers}")
        if arg.req.raw.any():
            logger.info_lv3(f"{log_prefix} raw:   {arg.req.raw.get()}")
        if arg.req.form.any():
            logger.info_lv3(f"{log_prefix} form:   {arg.req.form.get()}")
        if arg.req.json.any():
            logger.info_lv3(f"{log_prefix} json:   {arg.req.json.get()}")

        r_one, r_other = concurrent_request(
            arg.session,
            headers=arg.req.headers,
            method=arg.req.method,
            raw=arg.req.raw,
            form=arg.req.form,
            json_=arg.req.json,
            url_one=url_one,
            url_other=url_other,
            headers_one=arg.headers_one,
            headers_other=arg.headers_other,
            proxies_one=arg.proxy_one,
            proxies_other=arg.proxy_other,
        )

        logger.info_lv3(
            f"{log_prefix} One:   {r_one.status_code} / {to_sec(r_one.elapsed)}s / {len(r_one.content)}b / {r_one.headers.get('content-type')}"  # noqa
        )
        logger.info_lv3(
            f"{log_prefix} Other: {r_other.status_code} / {to_sec(r_other.elapsed)}s / {len(r_other.content)}b / {r_other.headers.get('content-type')}"  # noqa
        )
    except ConnectionError:
        logger.info_lv1(f"{log_prefix} 💀 {name}")
        # TODO: Integrate logic into create_trial
        return Trial.from_dict(
            {
                "seq": arg.seq,
                "name": name,
                "tags": [],
                "request_time": req_time.isoformat(),
                "status": "failure",
                "method": arg.req.method,
                "path": arg.req.path,
                "queries": arg.req.qs,
                "raw": arg.req.raw,
                "form": arg.req.form,
                "json": arg.req.json,
                "headers": arg.req.headers,
                "one": {"url": url_one, "type": "unknown"},
                "other": {"url": url_other, "type": "unknown"},
            }
        ).to_dict()

    res2res_one_begin = now()
    res_one_payload: Res2ResAddOnPayload = res2res(
        Response.from_requests(r_one, arg.default_response_encoding_one), arg.req
    )
    res_one = res_one_payload.response
    logger.info_lv3(
        f"{log_prefix} ⏰ One   res2res:   {mill_seconds_until(res2res_one_begin)}ms"
    )

    res2res_other_begin = now()
    res_other_payload: Res2ResAddOnPayload = res2res(
        Response.from_requests(r_other, arg.default_response_encoding_other), arg.req
    )
    res_other = res_other_payload.response
    logger.info_lv3(
        f"{log_prefix} ⏰ Other   res2res:   {mill_seconds_until(res2res_other_begin)}ms"
    )

    res2dict_one_begin = now()
    dict_one: TOption[DictOrList] = res2dict(res_one)
    logger.info_lv3(
        f"{log_prefix} ⏰ One   res2dict:   {mill_seconds_until(res2dict_one_begin)}ms"
    )

    res2dict_other_begin = now()
    dict_other: TOption[DictOrList] = res2dict(res_other)
    logger.info_lv3(
        f"{log_prefix} ⏰ Other   res2dict:   {mill_seconds_until(res2dict_other_begin)}ms"
    )

    # Create diff
    # If either dict_one or dict_other is None, the response could not be analyzed, so ddiff stays None
    diff_diagnosis_begin = now()
    ddiff = (
        None
        if dict_one.is_none() or dict_other.is_none()
        else {}
        if res_one.body == res_other.body
        else DeepDiff(dict_one.get(), dict_other.get())
    )
    logger.info_lv3(
        f"{log_prefix} ⏰ Diff diagnosis:   {mill_seconds_until(diff_diagnosis_begin)}ms"
    )

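    # DeepDiff returns "type_changes"/"values_changed" as dicts keyed by path,
    # while added/removed items come back as path collections or dicts, hence
    # the mix of `.keys()` views and raw collections in the unions below.
    # Everything is filed under the "unknown" cognition until add-ons
    # (e.g. judgement) re-classify it.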
    initial_diffs_by_cognition: Optional[TDict[DiffKeys]] = (
        TDict(
            {
                "unknown": DiffKeys.from_dict(
                    {
                        "changed": TList(
                            ddiff.get("type_changes", {}).keys()
                            | ddiff.get("values_changed", {}).keys()
                        )
                        .map(to_jumeaux_xpath)
                        .order_by(lambda x: x),
                        "added": TList(
                            ddiff.get("dictionary_item_added", {})
                            | ddiff.get("iterable_item_added", {}).keys()
                        )
                        .map(to_jumeaux_xpath)
                        .order_by(lambda x: x),
                        "removed": TList(
                            ddiff.get("dictionary_item_removed", {})
                            | ddiff.get("iterable_item_removed", {}).keys()
                        )
                        .map(to_jumeaux_xpath)
                        .order_by(lambda x: x),
                    }
                )
            }
        )
        if ddiff is not None
        else None
    )

    # Judgement
    judgement_begin = now()
    status, diffs_by_cognition = judgement(
        res_one,
        res_other,
        dict_one,
        dict_other,
        name,
        arg.req.path,
        arg.req.qs,
        arg.req.headers,
        initial_diffs_by_cognition,
        arg.judge_response_header,
        arg.ignore_response_header_keys,
    )
    logger.info_lv3(
        f"{log_prefix} ⏰ Judgement:   {mill_seconds_until(judgement_begin)}ms"
    )

    status_symbol = "O" if status == Status.SAME else "X"
    log_msg = f"{log_prefix} {status_symbol} ({res_one.status_code} - {res_other.status_code}) <{res_one.elapsed_sec}s - {res_other.elapsed_sec}s> {{{arg.req.method}}} {arg.req.name.get_or(arg.req.path)}"  # noqa
    (logger.info_lv2 if status == Status.SAME else logger.info_lv1)(log_msg)

    # Store files
    store_criterion_begin = now()
    file_one: Optional[str] = None
    file_other: Optional[str] = None
    prop_file_one: Optional[str] = None
    prop_file_other: Optional[str] = None
    if store_criterion(status, name, arg.req, res_one, res_other):
        dir = f"{arg.res_dir}/{arg.key}"
        file_one = f"one/({arg.seq}){name}"
        file_other = f"other/({arg.seq}){name}"
        write_to_file(file_one, dir, dump(res_one))
        write_to_file(file_other, dir, dump(res_other))
        if not dict_one.is_none():
            prop_file_one = f"one-props/({arg.seq}){name}.json"
            write_to_file(
                prop_file_one,
                dir,
                to_json(dict_one.get()).encode("utf-8", errors="replace"),
            )
        if not dict_other.is_none():
            prop_file_other = f"other-props/({arg.seq}){name}.json"
            write_to_file(
                prop_file_other,
                dir,
                to_json(dict_other.get()).encode("utf-8", errors="replace"),
            )
    logger.info_lv3(
        f"{log_prefix} ⏰ Store criterion:   {mill_seconds_until(store_criterion_begin)}ms"
    )

    # Did challenge
    did_challenge_begin = now()
    payload = global_addon_executor.apply_did_challenge(
        DidChallengeAddOnPayload.from_dict(
            {
                "trial": Trial.from_dict(
                    {
                        "seq": arg.seq,
                        "name": name,
                        "tags": res_one_payload.tags.concat(
                            res_other_payload.tags
                        ).uniq(),  # TODO: tags created by reqs2reqs
                        "request_time": req_time.isoformat(),
                        "status": status,
                        "method": arg.req.method,
                        "path": arg.req.path,
                        "queries": arg.req.qs,
                        "raw": arg.req.raw,
                        "form": arg.req.form,
                        "json": arg.req.json,
                        "headers": arg.req.headers,
                        "diffs_by_cognition": diffs_by_cognition,
                        "one": {
                            "url": res_one.url,
                            "type": res_one.type,
                            "status_code": res_one.status_code,
                            "byte": res_one.byte,
                            "response_sec": res_one.elapsed_sec,
                            "content_type": res_one.content_type,
                            "mime_type": res_one.mime_type,
                            "encoding": res_one.encoding,
                            "file": file_one,
                            "prop_file": prop_file_one,
                            "response_header": dict(res_one.headers)
                            if arg.judge_response_header
                            else None,
                        },
                        "other": {
                            "url": res_other.url,
                            "type": res_other.type,
                            "status_code": res_other.status_code,
                            "byte": res_other.byte,
                            "response_sec": res_other.elapsed_sec,
                            "content_type": res_other.content_type,
                            "mime_type": res_other.mime_type,
                            "encoding": res_other.encoding,
                            "file": file_other,
                            "prop_file": prop_file_other,
                            "response_header": dict(res_other.headers)
                            if arg.judge_response_header
                            else None,
                        },
                    }
                )
            }
        ),
        DidChallengeAddOnReference.from_dict(
            {
                "res_one": res_one,
                "res_other": res_other,
                "res_one_props": dict_one,
                "res_other_props": dict_other,
            }
        ),
    )
    logger.info_lv3(
        f"{log_prefix} ⏰ Did challenge:   {mill_seconds_until(did_challenge_begin)}ms"
    )

    return payload.trial.to_dict()


def create_concurrent_executor(config: Config) -> Tuple[Any, Concurrency]:
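    """Choose the concurrency model: a process pool when `processes` is
    configured, otherwise a thread pool sized by `threads`.
    """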
    processes = config.processes.get()
    if processes:
        return (
            futures.ProcessPoolExecutor(max_workers=processes),
            Concurrency.from_dict({"processes": processes, "threads": 1}),
        )

    threads = config.threads
    return (
        futures.ThreadPoolExecutor(max_workers=threads),
        Concurrency.from_dict({"processes": 1, "threads": threads}),
    )


def exec(
    config: Config, reqs: TList[Request], key: str, retry_hash: Optional[str]
) -> Report:
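    """Run all challenges concurrently and assemble the Report: prepare the
    HTTP session and output directories, execute the trials, update the
    `latest` symlink, and summarize the results.
    """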
    # Provision
    s = requests.Session()
    s.mount("http://", HTTPAdapter(max_retries=config.max_retries))
    s.mount("https://", HTTPAdapter(max_retries=config.max_retries))

    make_dir(f"{config.output.response_dir}/{key}/one")
    make_dir(f"{config.output.response_dir}/{key}/other")
    make_dir(f"{config.output.response_dir}/{key}/one-props")
    make_dir(f"{config.output.response_dir}/{key}/other-props")

    # Parse inputs to args of multi-thread executor.
    ex_args = reqs.emap(
        lambda x, i: {
            "seq": i + 1,
            "number_of_request": len(reqs),
            "key": key,
            "session": s,
            "req": x,
            "host_one": config.one.host,
            "host_other": config.other.host,
            "proxy_one": Proxy.from_host(config.one.proxy),
            "proxy_other": Proxy.from_host(config.other.proxy),
            "path_one": config.one.path,
            "path_other": config.other.path,
            "query_one": config.one.query,
            "query_other": config.other.query,
            "headers_one": config.one.headers,
            "headers_other": config.other.headers,
            "default_response_encoding_one": config.one.default_response_encoding,
            "default_response_encoding_other": config.other.default_response_encoding,
            "res_dir": config.output.response_dir,
            "judge_response_header": config.judge_response_header,
            "ignore_response_header_keys": config.ignore_response_header_keys,
        }
    ).to_dicts()

    # Challenge
    title = config.title.get_or("No title")
    description = config.description.get()
    tags = config.tags.get_or([])
    executor, concurrency = create_concurrent_executor(config)

    logger.info_lv1(
        f"""
--------------------------------------------------------------------------------
| {title}
| ({key})
--------------------------------------------------------------------------------
| {description}
--------------------------------------------------------------------------------
| - {concurrency.processes} processes
| - {concurrency.threads} threads
--------------------------------------------------------------------------------
    """
    )

    start_time = now()
    with executor as ex:
        trials = TList([r for r in ex.map(challenge, ex_args)]).map(Trial.from_dict)
    end_time = now()

    latest = f"{config.output.response_dir}/latest"
    if os.path.lexists(latest):
        os.remove(latest)
    os.symlink(key, latest, target_is_directory=True)

    summary = Summary.from_dict(
        {
            "one": {
                "name": config.one.name,
                "host": config.one.host,
                "path": config.one.path,
                "query": config.one.query,
                "proxy": config.one.proxy,
                "headers": config.one.headers,
                "default_response_encoding": config.one.default_response_encoding,
            },
            "other": {
                "name": config.other.name,
                "host": config.other.host,
                "path": config.other.path,
                "query": config.other.query,
                "proxy": config.other.proxy,
                "headers": config.other.headers,
                "default_response_encoding": config.other.default_response_encoding,
            },
            "status": trials.group_by(lambda x: x.status.value)
            .map_values(len)
            .to_dict(),
            "tags": tags,
            "time": {
                "start": start_time.isoformat(),
                "end": end_time.isoformat(),
                "elapsed_sec": (end_time - start_time).seconds,
            },
            "output": config.output.to_dict(),
            "concurrency": concurrency,
        }
    )

    return Report.from_dict(
        {
            "version": __version__,
            "key": key,
            "title": title,
            "description": description,
            "notifiers": config.notifiers,
            "summary": summary.to_dict(),
            "trials": trials.to_dicts(),
            "addons": config.addons.to_dict(),
            "retry_hash": retry_hash,
        }
    )


def __run(
    config: Config,
    origin_reqs: TList[Request],
    addon_executor: AddOnExecutor,
    hash: str,
    retry_hash: Optional[str],
):
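    """Shared pipeline behind `run` and `retry`: apply reqs2reqs add-ons,
    execute the challenges, then apply final add-ons to the report.
    """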
    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding=config.output.encoding)
    sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding=config.output.encoding)

    logger.info_lv1(START_JUMEAUX_AA)
    logger.info_lv1(f"Version: {__version__}")

    if config.output.logger.get():
        logger.warning("`output.logger` no longer works.")
        logger.warning(
            "It will be removed soon! Remove this property, or Jumeaux will stop working!"
        )

    logger.info_lv2(CONFIG_AA)
    logger.info_lv2("Merged config from yaml files (or a report) and command-line args")
    logger.info_lv2("----")
    logger.info_lv2(config.to_yaml())

    # XXX: Please fix to not use global!
    global global_addon_executor
    global_addon_executor = addon_executor

    # Requests
    reqs: TList[Request] = addon_executor.apply_reqs2reqs(
        Reqs2ReqsAddOnPayload.from_dict({"requests": origin_reqs}), config
    ).requests

    # Execute
    report = exec(config, reqs, hash, retry_hash)

    # Finalize
    addon_executor.apply_final(
        FinalAddOnPayload.from_dict(
            {"report": report, "output_summary": config.output}
        ),
        FinalAddOnReference.from_dict({"notifiers": config.notifiers}),
    )


def hash_from_args(args_str: str) -> str:
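    """Derive a run key: SHA-256 of the current time concatenated with the
    serialized arguments.
    """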
    return hashlib.sha256((str(now()) + args_str).encode()).hexdigest()


def retry(*, args: MergedArgs, report: str):
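    """Re-run the trials recorded in a previous report, merged with the
    given command-line arguments.
    """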
    loaded: Report = Report.from_jsonf(report, force_cast=True)
    config: Config = merge_args2config(args, create_config_from_report(loaded))
    addon_executor = AddOnExecutor(config.addons)
    origin_reqs: TList[Request] = loaded.trials.map(
        lambda x: Request.from_dict(
            {
                "path": x.path,
                "qs": x.queries,
                "headers": x.headers,
                "name": x.name,
                "method": x.method,
                "form": x.form,
                "json": x.json,
            }
        )
    )
    __run(
        config, origin_reqs, addon_executor, hash_from_args(args.to_json()), loaded.key
    )


def run(
    *,
    args: MergedArgs,
    config_paths: TList[str],
    skip_addon_tag: TOption[TList[str]],
):
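    """Entry point for a fresh run: build the config, convert input files
    to requests via log2reqs add-ons, then execute.
    """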
    config: Config = merge_args2config(
        args,
        create_config(config_paths, skip_addon_tag),
    )

    addon_executor = AddOnExecutor(config.addons)
    origin_reqs: TList[Request] = config.input_files.get().flat_map(
        lambda f: addon_executor.apply_log2reqs(
            Log2ReqsAddOnPayload.from_dict({"file": f})
        )
    )
    __run(config, origin_reqs, addon_executor, hash_from_args(args.to_json()), None)