A11yWatch/a11ywatch-core
src/event/crawl-tracking.ts

import { getHostName } from "@a11ywatch/website-source-builder";
import { performance } from "perf_hooks";
import { bindTaskQ, getCWLimit } from "../queues/crawl/handle";
import { asyncWorkerCrawlComplete } from "../queues/crawl";
import { crawlTrackingEmitter } from "./emitters/crawl";
import { domainName } from "../core/utils";
import type { ScanRpcCall } from "../proto/calls/scan-stream";

// per-website crawl tracking state
type CrawlSet = {
  total: number; // pages discovered so far (expected page count)
  current: number; // pages fully processed so far
  crawling: boolean; // the crawler is still dispatching pages
  shutdown: boolean; // a shutdown was requested for this crawl
  duration: number; // crawl start timestamp from performance.now()
  event: ReturnType<typeof bindTaskQ>; // task queue bound to this crawl
};

// track when a new website starts and determine page completion
export const crawlingSet = new Map<string, CrawlSet>();

// derive the hostname from a single target page when present, else the domain
const extractHostname = (domain?: string, pages?: string[]) =>
  domainName(getHostName(pages && pages.length === 1 ? pages[0] : domain));

// build the tracking key for a crawl from its domain and user id.
export const getKey = (domain?: string, pages?: string[], user_id?: number) =>
  `${extractHostname(domain, pages)}-${user_id || 0}`;
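// e.g. getKey("https://example.com", undefined, 5) -> "example.com-5"
// (assuming domainName/getHostName reduce the url to a bare hostname)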

// recompute the shared concurrency limit across every active crawl queue
const rebindConcurrency = async () => {
  const newLimit = getCWLimit(crawlingSet.size || 1);

  for (const item of crawlingSet.values()) {
    const itemEvent = item?.event;

    if (!itemEvent) {
      continue;
    }

    if (itemEvent.concurrency > newLimit) {
      // lowering the limit: drain first, then requeue the pending tasks
      const pending = itemEvent.getQueue();
      itemEvent.killAndDrain();
      await itemEvent.drained(); // wait until fully drained
      itemEvent.concurrency = newLimit;
      for (const task of pending) {
        await itemEvent.unshift(task);
      }
    } else if (itemEvent.concurrency !== newLimit) {
      itemEvent.concurrency = newLimit;
    }
  }
};
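/*
 * For context, a minimal sketch of the queue surface used above, assuming
 * bindTaskQ wraps a fastq.promise-style queue (getQueue, killAndDrain,
 * drained, unshift, and the mutable concurrency property match that API;
 * the worker below is hypothetical):
 *
 *   import fastq from "fastq";
 *
 *   const worker = async (page: string) => {
 *     // scan the page ...
 *   };
 *   const queue = fastq.promise(worker, 4);
 *   await queue.push("https://example.com"); // resolves when the worker finishes
 *
 * Resizing concurrency only affects future task pulls, which is why the code
 * above drains a queue before shrinking its limit and then requeues the
 * snapshot of pending tasks.
 */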

// tear down tracking for a finished crawl and notify listeners
const deInit = async (key: string, target, { duration, shutdown }: CrawlSet) => {
  crawlingSet.delete(key);
  crawlTrackingEmitter.emit(`crawl-complete-${key}`, target);

  const params = {
    userId: target.user_id,
    meta: {
      extra: {
        domain: extractHostname(target.domain),
        duration: performance.now() - duration, // elapsed ms since the crawl began
        shutdown,
      },
    },
  };

  await rebindConcurrency(); // resize the remaining queues now that a crawl is gone
  await asyncWorkerCrawlComplete(params);
};
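/*
 * Consumers can await a specific crawl via the scoped completion event
 * emitted above, e.g. (hypothetical listener):
 *
 *   crawlTrackingEmitter.once(`crawl-complete-${key}`, (target) => {
 *     // the crawl tracked under `key` has fully finished
 *   });
 */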

// start tracking a new crawl
const crawlStart = (target) => {
  setImmediate(() => {
    const key = getKey(target.domain, target.pages, target.user_id);
    // set the item for tracking
    if (!crawlingSet.has(key)) {
      crawlingSet.set(key, {
        total: 0,
        current: 0,
        crawling: true,
        shutdown: false,
        duration: performance.now(),
        event: bindTaskQ(crawlingSet.size + 1), // add 1 to include new item
      });
    }
  });
};

// the crawler finished dispatching pages; completion now depends on the processed counts
const crawlComplete = (target) => {
  setImmediate(() => {
    const key = getKey(target.domain, target.pages, target.user_id);

    if (crawlingSet.has(key)) {
      const item = crawlingSet.get(key);
      item.crawling = false;
    }
  });
};

// gRPC call: a page entered processing, so bump the expected page total
const crawlProcessing = (call: ScanRpcCall) => {
  setImmediate(async () => {
    const key = getKey(
      call.request.domain,
      call.request.pages,
      call.request.user_id
    );

    if (crawlingSet.has(key)) {
      const item = crawlingSet.get(key);

      item.total += 1;

      // a shutdown was requested: tell the crawler to stop and tear down tracking
      if (item.shutdown) {
        call.write({ message: "shutdown" });
        await deInit(key, call.request, item);
      }
    }

    call.end();
  });
};

// a page finished processing; bump the completed count and check for completion
const crawlProcessed = (target) => {
  setImmediate(async () => {
    const key = getKey(target.domain, target.pages, target.user_id);

    if (crawlingSet.has(key)) {
      const item = crawlingSet.get(key);

      item.current += 1;

      // a shutdown request stops the crawl and flags it for teardown
      if (target.shutdown) {
        item.shutdown = true;
        item.crawling = false;
      }

      // complete once the crawler stopped dispatching and every page is accounted for
      if (!item.crawling && item.current === item.total) {
        await deInit(key, target, item);
      }
    }
  });
};
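/*
 * Illustrative lifecycle for a three page crawl (one possible interleaving
 * of the events; the numbers are hypothetical):
 *
 *   crawl-start         -> { total: 0, current: 0, crawling: true }
 *   crawl-processing x3 -> total: 3
 *   crawl-processed  x2 -> current: 2
 *   crawl-complete      -> crawling: false
 *   crawl-processed  x1 -> current: 3 === total, so deInit runs
 */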

/*  Wire up the emitter events that track crawling progress.
 *  This mainly tracks, at a higher level, gRPC crawling progress across modules.
 */
export const establishCrawlTracking = () => {
  // track when a crawl has started
  crawlTrackingEmitter.on("crawl-start", crawlStart);
  // track when the crawler has finished dispatching pages.
  crawlTrackingEmitter.on("crawl-complete", crawlComplete);
  // track the total number of pages in a website via gRPC.
  crawlTrackingEmitter.on("crawl-processing", crawlProcessing);
  // track the number of pages processed and determine if the crawl is complete.
  crawlTrackingEmitter.on("crawl-processed", crawlProcessed);
};
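/*
 * Example wiring (a sketch; the payload shape mirrors the fields read by the
 * handlers above - domain, pages, user_id - and the values are hypothetical):
 *
 *   establishCrawlTracking();
 *
 *   crawlTrackingEmitter.emit("crawl-start", {
 *     domain: "https://example.com",
 *     pages: [],
 *     user_id: 1,
 *   });
 */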