Enterprise-CMCS/macpro-appian-connector

View on GitHub
src/libs/connect-lib.js

Summary

Maintainability
A
0 mins
Test Coverage
import * as ecs from "./ecs-lib.js";
var http = require("http");
var _ = require("lodash");

//TODO:  The design of this mechanism assumes that there are N number of configuration items (distinct Java tasks, usually)
//that run against a single ECS task.  This is exampled by the fact that each IP lookup only references the first item in the array,
// which is a hard coded value.  It may be prudent to allow running multiple tasks each with a distinct set of configuration,
//and if that is determined to be the case, quite a bit of refactoring will be required in this library, and in how the configuration
//is passed to this library to facilitate that use case.

const resolver = (req, resolve) => {
  console.log("Finished");
  req.socket.destroy();
  resolve(req.statusCode);
};

export async function connectRestApiWithRetry(params) {
  return new Promise((resolve, reject) => {
    var retry = function (e) {
      console.log("Got error: " + e);
      setTimeout(async function () {
        return await connectRestApiWithRetry(params);
      }, 5000);
    };
    var options = {
      hostname: params.hostname,
      port: params.port || 8083,
      path: params.path || "",
      method: params.method || "GET",
      headers: params.headers || {
        "Content-Type": "application/json",
      },
    };
    const req = http.request(options, (res) => {
      res
        .on("data", (d) => {
          console.log(d.toString("utf-8"));
        })
        .on("error", (error) => {
          console.log("res.on error:", error);
          retry.call(`${error}`);
        })
        .on("end", (d) => {
          resolver(req, resolve);
        });
    });
    req.on("error", (error) => {
      console.log("req.on error:", error);
      reject(error);
    });
    if (params.body) {
      req.write(JSON.stringify(params.body));
    }
    req.end();
  });
}

export async function putConnectors(cluster, service, connectors) {
  const workerIp = await ecs.findIpForEcsService(cluster, service);
  await connectRestApiWithRetry({
    hostname: workerIp,
  });
  for (var i = 0; i < connectors.length; i++) {
    console.log(
      `Putting connector with config: ${JSON.stringify(connectors[i], null, 2)}`
    );
    //This won't account for multiple tasks with multiple interfaces
    await connectRestApiWithRetry({
      hostname: workerIp,
      path: `/connectors/${connectors[i].name}/config`,
      method: "PUT",
      body: connectors[i].config,
    });
  }
}

export async function restartConnectors(cluster, service, connectors) {
  const workerIp = await ecs.findIpForEcsService(cluster, service);
  for (var i = 0; i < connectors.length; i++) {
    let connector = _.omit(connectors[i], "config");
    connector.tasks = connectors[i].config["tasks.max"];
    console.log(`Restarting connector: ${JSON.stringify(connector, null, 2)}`);
    //This won't account for multiple tasks with multiple interfaces
    await connectRestApiWithRetry({
      hostname: workerIp,
      path: `/connectors/${connectors[i].name}/tasks/0/restart`,
      method: "POST",
    });
  }
}

export async function deleteConnector(ip, name) {
  return new Promise((resolve, reject) => {
    var retry = function (e) {
      console.log("Got error: " + e);
      setTimeout(async function () {
        return await deleteConnector(ip, name);
      }, 5000);
    };

    var options = {
      hostname: ip,
      port: 8083,
      path: `/connectors/${name}`,
      method: "DELETE",
      headers: {
        "Content-Type": "application/json",
      },
    };
    const req = http.request(options, (res) => {
      console.log(`statusCode: ${res.statusCode}`);
      res
        .on("data", (d) => {
          console.log(d.toString("utf-8"));
          if (JSON.parse(d).message != `Connector ${name} not found`) {
            return retry.call(d.toString("utf-8"));
          }
        })
        .on("error", (error) => {
          console.log("res.on error:", error);
          return retry.call(`${error}`);
        })
        .on("end", (d) => {
          resolver(req, resolve);
        });
    });
    req.on("error", (error) => {
      console.log("req.on error:", error);
      reject(error);
    });
    req.write(JSON.stringify({}));
    req.end();
  });
}

export async function deleteConnectors(cluster, service, connectors) {
  const workerIp = await ecs.findIpForEcsService(cluster, service);
  for (var i = 0; i < connectors.length; i++) {
    console.log(`Deleting connector: ${connectors[i]}`);
    //This won't account for multiple tasks with multiple interfaces
    await deleteConnector(workerIp, connectors[i]);
  }
}

export async function testConnector(ip, config) {
  return new Promise((resolve, reject) => {
    var options = {
      hostname: ip,
      port: 8083,
      path: `/connectors/${config.name}/status`,
      headers: {
        "Content-Type": "application/json",
      },
    };
    console.log("Test Kafka-connect service", options);
    const req = http.request(options, (res) => {
      console.log(`statusCode: ${res.statusCode}`);
      res
        .on("data", (d) => {
          console.log(d.toString("utf-8"));
          var data = JSON.parse(d);
          resolve(data);
        })
        .on("error", (error) => {
          console.log("res.on error:", error);
          reject(error);
        })
        .on("end", (d) => {
          resolver(req, resolve);
        });
    });
    req.on("error", (error) => {
      console.log("req.on error:", error);
      reject(error);
    });
    req.write(JSON.stringify({}));
    req.end();
  });
}

export async function testConnectors(cluster, service, connectors) {
  const workerIp = await ecs.findIpForEcsService(cluster, service);
  return await Promise.all(
    connectors.map((connector) => {
      console.log(`Testing connector: ${connector.name}`);
      return testConnector(workerIp, connector);
    })
  ).then((res) => {
    return res;
  });
}