FredericHeem/starhackit

View on GitHub
server/src/plugins/cloudDiagram/index.js

Summary

Maintainability
B
5 hrs
Test Coverage
const assert = require("assert");
const { pipe, tap, assign, get } = require("rubico");
const { defaultsDeep } = require("rubico/x");

const { DockerClient } = require("@grucloud/docker-axios");
const { GetAllApi } = require("./api/getAllApi");

const { OrgApi } = require("./api/orgApi");
const { GitCredentialApi } = require("./api/gitCredentialApi");
const { ProjectApi } = require("./api/projectApi");
const { WorkspaceApi } = require("./api/workspaceApi");
const { GitRepositoryApi } = require("./api/gitRepositoryApi");
const { CloudAuthenticationApi } = require("./api/cloudAuthenticationApi");
const { RunApi } = require("./api/runApi");
const { DockerGcRun } = require("./utils/rungc");

const configDefault = {
  containerImage: "grucloud/grucloud-cli",
  localOutputPath: "output",
  localInputPath: "input",
  docker: {
    baseURL: "http://localhost/v1.41",
    socketPath: "/var/run/docker.sock",
    timeout: 3000e3,
  },
};

const streamDockerLogs = async ({ dockerClient, container_id, ws }) => {
  assert(dockerClient);
  assert(container_id);
  assert(ws);

  console.log("streamDockerLogs ", container_id, "message", message.toString());

  // TODO: validate container_id
  ws.on("error", console.error);

  let container = await dockerClient.container.get({ id: container_id });
  if (!container) {
    console.error("streamDockerLogs container_id does not exist");
    ws.send(
      JSON.stringify({ error: "container does not exist", container_id })
    );
    ws.close();
    return;
  }
  const logParam = {
    name: container_id,
    options: {
      stdout: true,
      stderr: true,
      follow: true,
    },
  };
  const stream = await dockerClient.container.logs(logParam);

  ws.once("close", () => {
    console.log("ws ", container_id, "close");
    stream.removeAllListeners();
    stream.destroy();
  });

  stream.on("data", (info) => {
    // console.log(info.toString("utf-8"));
    ws.send(info.toString("utf-8"));
  });
  stream.on("end", (info) => {
    console.log("ws ", container_id, "end");
    ws.close();
  });
};

module.exports = (app) => {
  const { sql } = app.data;
  const dockerClient = pipe([
    () => app.config,
    get("infra.docker"),
    DockerClient,
  ])();

  const sqlAdaptor = require("utils/SqlAdapter")({ sql });
  const models = {
    org: sqlAdaptor(require("./sql/OrganisationSql")({ sql })),
    project: sqlAdaptor(require("./sql/ProjectSql")({ sql })),
    workspace: sqlAdaptor(require("./sql/WorkspaceSql")({ sql })),
    gitRepository: sqlAdaptor(require("./sql/GitRepositorySql")({ sql })),
    userOrg: sqlAdaptor(require("./sql/UserOrgSql")({ sql })),
    gitCredential: sqlAdaptor(require("./sql/GitCredentialSql")({ sql })),
    run: sqlAdaptor(require("./sql/RunSql")({ sql })),
    cloudAuthentication: sqlAdaptor(
      require("./sql/CloudAuthenticationSql")({ sql })
    ),
  };

  const roomMap = new Map();
  const producerMap = new Map();

  app.server.koa.ws.use((ctx) => {
    const ws = ctx.websocket;
    dockerGcRun = DockerGcRun({ app, models, ws });
    ws.on("close", () => {
      console.log("ws close");
      const room = producerMap.get();
      if (room) {
        roomMap.delete(room);
      }
      producerMap.delete(ws);
    });
    ws.on("message", async function (message) {
      assert(ctx.request);
      console.log("ws message", message.toString());
      try {
        const {
          command,
          options = {},
          data = {},
        } = JSON.parse(message.toString());
        switch (command) {
          case "join":
            console.log("join", options.room);
            const roomInfo = roomMap.get(options.room);
            if (roomInfo) {
              console.log("join", options.room, "room already exist");
              roomMap.set(options.room, [...roomInfo, ws]);
            } else {
              console.log("join", options.room, "create room");
              roomMap.set(options.room, [ws]);
            }
            producerMap.set(ws, options.room);
            break;
          case "list": {
            const { error } = data;
            const room = producerMap.get(ws);
            if (room) {
              const [org_id, project_id, workspace_id, run_id] =
                room.split("/");
              console.log(
                "list",
                org_id,
                project_id,
                workspace_id,
                run_id,
                ", error: ",
                error
              );
              models.run.update({
                where: { org_id, project_id, workspace_id, run_id },
                data: {
                  status: error ? "error" : "completed",
                  updated_at: new Date().toUTCString(),
                },
              });
            } else {
              console.error("no room for list command");
            }
          }
          // fall through
          case "logs":
            const room = producerMap.get(ws);
            if (room) {
              const clients = roomMap.get(room);
              if (clients) {
                console.error("#clients", clients.length);

                clients
                  .filter((c) => c != ws)
                  .forEach((client) => {
                    console.log("sending back");
                    client.send(message.toString());
                  });
              }
            } else {
              console.error("no room for log command");
            }
            break;
          case "DockerLogs":
            if (options.engine == "docker") {
              await streamDockerLogs({
                container_id: options.container_id,
                engine: options.engine,
                ws,
                dockerClient,
              });
            }
            break;
          case "Run":
            if (options.engine == "docker") {
              await dockerGcRun(options);
            }
            return;
        }
      } catch (error) {
        console.error("ws message", error);
        ws.close();
      }
    });
  });

  app.config = assign({
    infra: pipe([get("infra", {}), defaultsDeep(configDefault)]),
  })(app.config);

  app.dockerClient = dockerClient;

  [
    GetAllApi,
    OrgApi,
    GitCredentialApi,
    ProjectApi,
    WorkspaceApi,
    GitRepositoryApi,
    RunApi,
    CloudAuthenticationApi,
  ].forEach((api) => app.server.createRouter(api({ app, models })));

  return {
    models,
    start: pipe([
      () => ({ image: app.config.infra.containerImage }),
      tap(({ image }) => {
        assert(image);
      }),
      //TODO
      // dockerClient.image.pull,
      // () => fs.mkdir(app.config.infra.localInputPath, { recursive: true }),
    ]),
    stop: pipe([
      tap(() => {
        assert(true);
      }),
    ]),
  };
};