PyDrocsid/cogs

View on GitHub
integrations/run_code/api.py

Summary

Maintainability
A
0 mins
Test Coverage
from aiohttp import ClientSession


class PistonException(BaseException):
    """Raised when a request to the Piston API does not succeed.

    The first positional argument, when present, is normally the decoded JSON
    error body of the failed response (a mapping with a ``"message"`` entry),
    but the exception may also be raised without any argument.
    """

    # NOTE(review): user-defined exceptions conventionally derive from
    # ``Exception``. The ``BaseException`` base is kept so that ``except
    # Exception`` handlers elsewhere keep behaving exactly as they do today.

    @property
    def error(self) -> str:
        """Return a human-readable description of the failure.

        Returns:
            The ``"message"`` entry of the payload when the exception carries
            a mapping that has one; the string form of the payload when it
            carries something else; ``"unknown error"`` when it carries no
            payload at all.
        """
        # The exception can be raised bare, so ``args`` may be empty.
        payload = self.args[0] if self.args else None
        if isinstance(payload, dict) and "message" in payload:
            return payload["message"]
        if payload is None:
            return "unknown error"
        return str(payload)


class PistonAPI:
    ENVIRONMENTS_URL = "https://emkc.org/api/v2/piston/runtimes"
    EXECUTE_URL = "https://emkc.org/api/v2/piston/execute"

    def __init__(self):
        self.environments: dict[str, str] = {}
        self.aliases: dict[str, str] = {}

    def get_language(self, language: str) -> str | None:
        if language in self.environments:
            return language

        return self.aliases.get(language)

    async def load_environments(self):
        async with ClientSession() as session, session.get(self.ENVIRONMENTS_URL) as response:
            if response.status != 200:
                raise PistonException

            environments = await response.json()

        self.environments = {env["language"]: env["version"] for env in environments}
        self.aliases = {alias: env["language"] for env in environments for alias in env["aliases"]}

    async def run_code(self, language: str, source: str, stdin: str = "") -> dict:
        async with ClientSession() as session, session.post(
            PistonAPI.EXECUTE_URL,
            json={
                "language": language,
                "version": self.environments[language],
                "stdin": stdin,
                "files": [{"content": source}],
            },
        ) as response:
            if response.status != 200:
                raise PistonException(await response.json())

            return await response.json()