hongbo-miao/hongbomiao.com

View on GitHub
data-orchestration/hm-prefect/workflows/calculate/src/main.py

Summary

Maintainability
A
0 mins
Test Coverage
import random
import time

from prefect import flow, get_run_logger, task
from pydantic import BaseModel


class Model(BaseModel):
    """Input payload accepted by the ``calculate`` flow."""

    # Passed to ``expand`` by ``calculate`` as the size of the first expansion.
    n: int


@task
def expand(n: int) -> list[int]:
    """Return the integers ``0`` through ``n - 1`` as a list.

    The task first sleeps for a random 0.5 to 5 seconds (presumably to
    simulate slow work). The list is empty when ``n`` is not positive.
    """
    pause_seconds = random.uniform(0.5, 5)
    time.sleep(pause_seconds)
    return list(range(n))


@task
def power(a: int, b: int) -> int:
    """Return ``a`` raised to the power ``b``.

    The task first sleeps for a random 0.5 to 2 seconds (presumably to
    simulate slow work).
    """
    pause_seconds = random.uniform(0.5, 2)
    time.sleep(pause_seconds)
    return pow(a, b)


@task
def multiply(a: int, b: int) -> int:
    """Return the product of ``a`` and ``b``.

    The task first sleeps for a random 0.5 to 2 seconds (presumably to
    simulate slow work).
    """
    pause_seconds = random.uniform(0.5, 2)
    time.sleep(pause_seconds)
    product = a * b
    return product


@task
def add(a: int, b: int) -> int:
    """Return the sum of ``a`` and ``b``.

    The task first sleeps for a random 0.5 to 2 seconds (presumably to
    simulate slow work).
    """
    pause_seconds = random.uniform(0.5, 2)
    time.sleep(pause_seconds)
    total = a + b
    return total


@task
def sum_up(nums: list[int]) -> int:
    """Return the sum of every value in ``nums`` (``0`` for an empty list).

    The task first sleeps for a random 0.5 to 2 seconds (presumably to
    simulate slow work).
    """
    pause_seconds = random.uniform(0.5, 2)
    time.sleep(pause_seconds)
    grand_total = sum(nums)
    return grand_total


@flow
def calculate(model: Model) -> None:
    """Run the calculation pipeline and log the final sum.

    ``model.n`` is expanded into a list and ``power`` is mapped over it with
    exponent 2. Each squared value is expanded again, and ``multiply`` (by
    100) then ``add`` (9) are mapped over that expansion. All mapped results
    are gathered into one list, handed to ``sum_up``, and the outcome is
    written to the run logger at INFO level.

    Args:
        model: Input whose ``n`` field seeds the first expansion.
    """
    run_logger = get_run_logger()

    # Stage 1: 0..n-1, each raised to the power 2.
    bases = expand(model.n)
    squares = power.map(bases, 2)

    # Stage 2: fan out once more per squared value.
    # NOTE(review): `.map` presumably yields Prefect futures that are resolved
    # when handed to downstream tasks - confirm for the Prefect version in use.
    collected = []
    for square in squares:
        members = expand(square)
        scaled = multiply.map(members, 100)
        shifted = add.map(scaled, 9)
        collected.extend(shifted)

    # Stage 3: reduce everything to a single number and report it.
    grand_total = sum_up(collected)
    run_logger.info(grand_total)


if __name__ == "__main__":
    # Direct-run entry point: execute the flow once with n=4.
    calculate(Model(n=4))