airbnb/caravel

View on GitHub
superset-frontend/src/middleware/asyncEvent.ts

Summary

Maintainability
B
4 hrs
Test Coverage
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
import {
  ensureIsArray,
  isFeatureEnabled,
  FeatureFlag,
  makeApi,
  SupersetClient,
  logging,
  getClientErrorObject,
  parseErrorJson,
  SupersetError,
} from '@superset-ui/core';
import getBootstrapData from 'src/utils/getBootstrapData';

/**
 * A single async-query event as handled by this module, whether it was
 * returned by the polling endpoint or parsed from a WebSocket message.
 */
type AsyncEvent = {
  // Event identifier; setLastId stores it as the "last received" cursor.
  id?: string | null;
  channel_id: string;
  // Key under which the listener for this job is registered.
  job_id: string;
  user_id?: string;
  // Compared against the JOB_STATUS values when the event is handled.
  status: string;
  errors?: SupersetError[];
  // Endpoint requested by fetchCachedData to retrieve the job's result.
  result_url: string | null;
};

/**
 * Outcome of fetchCachedData: `status` is 'success' or 'error'; `data` is the
 * fetched payload or the client error object respectively.
 */
type CachedDataResponse = {
  status: string;
  data: any;
};
// Application config; this module reads its GLOBAL_ASYNC_QUERIES_* keys.
type AppConfig = Record<string, any>;
// Callback invoked with an event whose job_id matches its registration.
type ListenerFn = (asyncEvent: AsyncEvent) => Promise<any>;

// Values of config.GLOBAL_ASYNC_QUERIES_TRANSPORT recognised by init().
const TRANSPORT_POLLING = 'polling';
const TRANSPORT_WS = 'ws';
// Event status values; waitForAsyncData settles its promise on DONE and ERROR.
const JOB_STATUS = {
  PENDING: 'pending',
  RUNNING: 'running',
  ERROR: 'error',
  DONE: 'done',
};
// localStorage key under which the last received event id is persisted.
const LOCALSTORAGE_KEY = 'last_async_event_id';
// Endpoint queried for events when polling.
const POLLING_URL = '/api/v1/async_event/';
// An event with no registered listener is re-delivered up to MAX_RETRIES
// times, waiting RETRY_DELAY ms multiplied by the attempt number each time.
const MAX_RETRIES = 6;
const RETRY_DELAY = 100;

// Module-level state. init() (re)assigns config, transport, pollingDelayMs,
// both lookup tables and the event cursor.
let config: AppConfig;
// Active transport: TRANSPORT_POLLING or TRANSPORT_WS.
let transport: string;
// Delay between polling requests, in milliseconds.
let pollingDelayMs: number;
// Handle of the pending polling timer, cleared when init() runs again.
let pollingTimeoutId: number;
// Registered listeners, keyed by job id.
let listenersByJobId: Record<string, ListenerFn>;
// Delivery attempts made so far for events that had no listener, by job id.
let retriesByJobId: Record<string, number>;
// Id of the most recently received event; sent as `last_id` when set.
let lastReceivedEventId: string | null | undefined;

/**
 * Registers `fn` as the listener for job `id`, replacing any listener that
 * was previously stored under the same id.
 */
function addListener(id: string, fn: any): void {
  listenersByJobId[id] = fn;
}

/**
 * Unregisters the listener stored for job `id`. Does nothing when no
 * (truthy) listener is stored under that id.
 */
function removeListener(id: string): void {
  if (listenersByJobId[id]) {
    delete listenersByJobId[id];
  }
}

/**
 * Requests the event's `result_url` and reports the outcome instead of
 * throwing on a failed request.
 *
 * @param asyncEvent - event whose `result_url` is requested
 * @returns `{ status: 'success', data }` where `data` is the response's
 *   `result` property when present, otherwise the whole JSON body; or
 *   `{ status: 'error', data }` where `data` is the client error object
 *   built from whatever the request threw
 */
const fetchCachedData = async (
  asyncEvent: AsyncEvent,
): Promise<CachedDataResponse> => {
  try {
    const response = await SupersetClient.get({
      endpoint: String(asyncEvent.result_url),
    });
    const body = response.json;
    return { status: 'success', data: 'result' in body ? body.result : body };
  } catch (response) {
    return { status: 'error', data: await getClientErrorObject(response) };
  }
};

/**
 * Returns a promise that settles once a terminal event for the given job is
 * delivered to the listener registered here.
 *
 * - `done`: the cached result is fetched via the event's `result_url` and
 *   passed through `ensureIsArray`; the promise resolves with it when the
 *   fetch succeeded and rejects with it otherwise.
 * - `error`: the promise rejects with `parseErrorJson(event)`.
 * - any other status: a warning is logged and the listener stays registered
 *   so that a later terminal event can still settle the promise.
 *
 * The listener is unregistered only after a terminal event has been handled.
 *
 * @param asyncResponse - job metadata; only `job_id` is read
 * @returns promise for the job's result data
 */
export const waitForAsyncData = async (asyncResponse: AsyncEvent) =>
  new Promise((resolve, reject) => {
    const jobId = asyncResponse.job_id;
    const listener = async (asyncEvent: AsyncEvent) => {
      switch (asyncEvent.status) {
        case JOB_STATUS.DONE: {
          try {
            const { data, status } = await fetchCachedData(asyncEvent);
            const payload = ensureIsArray(data);
            if (status === 'success') {
              resolve(payload);
            } else {
              reject(payload);
            }
          } catch (err) {
            // Never leave the caller waiting if fetching the result throws.
            reject(err);
          } finally {
            removeListener(jobId);
          }
          break;
        }
        case JOB_STATUS.ERROR: {
          try {
            reject(parseErrorJson(asyncEvent));
          } catch (err) {
            reject(err);
          } finally {
            removeListener(jobId);
          }
          break;
        }
        default: {
          // Non-terminal status: keep listening, otherwise the promise could
          // never be settled by the terminal event that follows.
          logging.warn('received event with status', asyncEvent.status);
        }
      }
    };
    addListener(jobId, listener);
  });

// Typed GET request against POLLING_URL. Accepts an optional `last_id` and
// yields `{ result: AsyncEvent[] }`.
// NOTE(review): presumably the server returns only events newer than
// `last_id` — confirm against the API.
const fetchEvents = makeApi<
  { last_id?: string | null },
  { result: AsyncEvent[] }
>({
  method: 'GET',
  endpoint: POLLING_URL,
});

/**
 * Records the id of the given event as the last received one, both in memory
 * and in localStorage under LOCALSTORAGE_KEY.
 *
 * Events without an id (undefined or null) are ignored: they carry no cursor
 * information, and persisting them would store the literal string
 * "undefined"/"null" and discard the previously known id.
 *
 * A failure to write to localStorage is logged and otherwise ignored.
 *
 * @param asyncEvent - event whose `id` should be remembered
 */
const setLastId = (asyncEvent: AsyncEvent) => {
  const { id } = asyncEvent;
  if (id == null) return;

  lastReceivedEventId = id;
  try {
    localStorage.setItem(LOCALSTORAGE_KEY, id);
  } catch (err) {
    logging.warn('Error saving event Id to localStorage', err);
  }
};

/**
 * Hands one event to the listener registered for its job.
 *
 * When no listener is registered yet (the event can arrive before the
 * listener is added), delivery is retried up to MAX_RETRIES times, waiting
 * RETRY_DELAY ms multiplied by the attempt number. After the last attempt a
 * warning is logged and the event is dropped.
 */
const deliverWithRetry = (asyncEvent: AsyncEvent): void => {
  const jobId = asyncEvent.job_id;
  const listener = listenersByJobId[jobId];
  if (listener) {
    listener(asyncEvent);
    delete retriesByJobId[jobId];
    return;
  }

  const attempt = (retriesByJobId[jobId] || 0) + 1;
  if (attempt > MAX_RETRIES) {
    delete retriesByJobId[jobId];
    logging.warn('listener not found for job_id', asyncEvent.job_id);
    return;
  }

  retriesByJobId[jobId] = attempt;
  setTimeout(() => {
    deliverWithRetry(asyncEvent);
  }, RETRY_DELAY * attempt);
};

/**
 * Dispatches each event to its job's listener and records its id as the last
 * received one.
 *
 * The id is recorded exactly once per event, when the event is first
 * received. Delayed re-delivery attempts do not record it again, so a retry
 * of an older event cannot move the cursor back behind newer events.
 *
 * Listeners are invoked without being awaited.
 *
 * @param events - events in the order they were received
 */
export const processEvents = async (events: AsyncEvent[]) => {
  events.forEach((asyncEvent: AsyncEvent) => {
    deliverWithRetry(asyncEvent);
    setLastId(asyncEvent);
  });
};

/**
 * Performs one polling cycle.
 *
 * When at least one listener is registered, events are requested (passing
 * the last received id as `last_id` when one is known) and any returned
 * events are processed; failures are logged, not thrown. While the active
 * transport is polling, the next cycle is then scheduled after
 * `pollingDelayMs`.
 */
const loadEventsFromApi = async () => {
  const hasListeners = Object.keys(listenersByJobId).length > 0;

  if (hasListeners) {
    const query = lastReceivedEventId ? { last_id: lastReceivedEventId } : {};
    try {
      const response = await fetchEvents(query);
      const events = response.result;
      if (events?.length) {
        await processEvents(events);
      }
    } catch (err) {
      logging.warn(err);
    }
  }

  if (transport !== TRANSPORT_POLLING) return;
  pollingTimeoutId = window.setTimeout(loadEventsFromApi, pollingDelayMs);
};

// Number of reconnect attempts made after the socket closes before falling
// back to the polling endpoint.
const wsConnectMaxRetries = 6;
// Delay, in milliseconds, between a socket closing and the next attempt.
const wsConnectErrorDelay = 2500;
// Reconnect attempts since the last successful 'open'.
let wsConnectRetries = 0;
// Handle of the pending reconnect timer.
let wsConnectTimeout: any;
// The most recently created socket.
let ws: WebSocket;

/**
 * Opens a WebSocket to `config.GLOBAL_ASYNC_QUERIES_WEBSOCKET_URL`, appending
 * the last received event id as `last_id` when one is known, and wires up
 * its handlers:
 *
 * - open: resets the reconnect counter and clears any pending reconnect.
 * - close: after `wsConnectErrorDelay` ms, reconnects; once
 *   `wsConnectMaxRetries` attempts have been used, switches the active
 *   transport to polling and starts the polling loop.
 * - error: closes the socket if it is still connecting or open, which in
 *   turn triggers the close handling.
 * - message: parses the payload as a single event and processes it; parse
 *   or processing failures are logged.
 */
const wsConnect = (): void => {
  let url = config.GLOBAL_ASYNC_QUERIES_WEBSOCKET_URL;
  if (lastReceivedEventId) url += `?last_id=${lastReceivedEventId}`;
  // Handlers close over `socket` rather than the module-level `ws`, which a
  // later reconnect reassigns.
  const socket = new WebSocket(url);
  ws = socket;

  socket.addEventListener('open', () => {
    logging.log('WebSocket connected');
    clearTimeout(wsConnectTimeout);
    wsConnectRetries = 0;
  });

  socket.addEventListener('close', () => {
    wsConnectTimeout = setTimeout(() => {
      wsConnectRetries += 1;
      if (wsConnectRetries <= wsConnectMaxRetries) {
        wsConnect();
      } else {
        logging.warn('WebSocket not available, falling back to async polling');
        // loadEventsFromApi only re-schedules itself while the active
        // transport is polling; without this switch the fallback would
        // issue a single request and then stop.
        transport = TRANSPORT_POLLING;
        loadEventsFromApi();
      }
    }, wsConnectErrorDelay);
  });

  socket.addEventListener('error', () => {
    // https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/readyState
    // 0 = CONNECTING, 1 = OPEN
    if (socket.readyState < 2) socket.close();
  });

  socket.addEventListener('message', async event => {
    let events: AsyncEvent[] = [];
    try {
      events = [JSON.parse(event.data)];
      await processEvents(events);
    } catch (err) {
      logging.warn(err);
    }
  });
};

/**
 * (Re)initialises the async-event machinery. Does nothing unless the
 * GlobalAsyncQueries feature flag is enabled.
 *
 * Cancels a pending polling timer, clears listeners, retry counters and the
 * event cursor, reads the settings from `appConfig` (or from the bootstrap
 * data when omitted), restores the last event id from localStorage, and
 * starts the configured transport.
 *
 * @param appConfig - optional config overriding the bootstrap config
 */
export const init = (appConfig?: AppConfig) => {
  const featureEnabled = isFeatureEnabled(FeatureFlag.GlobalAsyncQueries);
  if (!featureEnabled) return;

  if (pollingTimeoutId) {
    clearTimeout(pollingTimeoutId);
  }

  // Start from a clean slate.
  listenersByJobId = {};
  retriesByJobId = {};
  lastReceivedEventId = null;

  config = appConfig || getBootstrapData().common.conf;
  transport = config.GLOBAL_ASYNC_QUERIES_TRANSPORT || TRANSPORT_POLLING;
  pollingDelayMs = config.GLOBAL_ASYNC_QUERIES_POLLING_DELAY || 500;

  // localStorage access may throw; the cursor then stays null.
  try {
    lastReceivedEventId = localStorage.getItem(LOCALSTORAGE_KEY);
  } catch (err) {
    logging.warn('Failed to fetch last event Id from localStorage');
  }

  switch (transport) {
    case TRANSPORT_POLLING:
      loadEventsFromApi();
      break;
    case TRANSPORT_WS:
      wsConnect();
      break;
    default:
      break;
  }
};

// Initialise as a side effect of loading this module; no config is passed,
// so the bootstrap data's config is used.
init();