vorteil/direktiv

View on GitHub
ui/src/api/logs/query/logs.ts

Summary

Maintainability
A
0 mins
Test Coverage
import {
  InfiniteData,
  QueryFunctionContext,
  useQueryClient,
} from "@tanstack/react-query";
import {
  LogEntrySchema,
  LogEntryType,
  LogsSchema,
  LogsSchemaType,
} from "../schema";

import { apiFactory } from "~/api/apiFactory";
import { buildSearchParamsString } from "~/api/utils";
import { logKeys } from "..";
import { useApiKey } from "~/util/store/apiKey";
import useInfiniteQueryWithPermissions from "~/api/useInfiniteQueryWithPermissions";
import { useNamespace } from "~/util/store/namespace";
import { useStreaming } from "~/api/streaming";

/**
 * example of a InfiniteData<LogsSchemaType> object. All of these
 * data share one cache key. The pages and pageParams properties
 * are part of useInfiniteQuery hook.
  {
    // the result of every page request is stored here
    "pages": [
      {
        "meta": {
          "previousPage": "FIRST_TIMESTAMP",
          "startingFrom": "..."
        },
        "data": []
      },
      {
        "meta": {
          "previousPage": "SECOND_TIMESTAMP",
          "startingFrom": "..."
        },
        "data": []
      },
      {
        "meta": {
          "previousPage": null, // last page
          "startingFrom": "..."
        },
        "data": []
      }
    ]
    // all page pointers that were found in the page request results are stored here
    "pageParams": [
      "FIRST_TIMESTAMP",
      "SECOND_TIMESTAMP",
      null
    ]
  }
*/
type LogsCache = InfiniteData<LogsSchemaType>;

const updateCache = (
  oldData: LogsCache | undefined,
  newLogEntry: LogEntryType
): LogsCache | undefined => {
  if (oldData === undefined) return undefined;

  const pages = oldData.pages;
  const olderPages = pages.slice(0, -1);
  const newestPage = pages[0];
  if (newestPage === undefined) return undefined;

  const newestPageData = newestPage.data ?? [];

  // skip cache if the log entry is already in the cache
  if (newestPageData.some((logEntry) => logEntry.id === newLogEntry.id)) {
    return oldData;
  }

  return {
    ...oldData,
    pages: [
      ...olderPages,
      {
        ...newestPage,
        data: [...newestPageData, newLogEntry],
      },
    ],
  };
};

export type LogsQueryParams = {
  instance?: string;
  route?: string;
  activity?: string;
  before?: string;
  trace?: string;
};

type LogsParams = {
  baseUrl?: string;
  namespace: string;
  useStreaming?: boolean;
} & LogsQueryParams;

const getUrl = (params: LogsParams) => {
  const { baseUrl, namespace, useStreaming, ...queryParams } = params;

  let urlPath = `/api/v2/namespaces/${namespace}/logs`;

  if (useStreaming) {
    urlPath = `${urlPath}/subscribe`;
  }

  const queryParamsString = buildSearchParamsString({
    ...queryParams,
  });

  return `${baseUrl ?? ""}${urlPath}${queryParamsString}`;
};

const getLogs = apiFactory({
  url: getUrl,
  method: "GET",
  schema: LogsSchema,
});

const fetchLogs = async ({
  pageParam,
  queryKey: [{ apiKey, namespace, instance, route, activity, trace }],
}: QueryFunctionContext<
  ReturnType<(typeof logKeys)["detail"]>,
  LogsQueryParams["before"]
>) =>
  getLogs({
    apiKey,
    urlParams: {
      namespace,
      instance,
      route,
      activity,
      before: pageParam,
      trace,
    },
  });

export type UseLogsStreamParams = LogsQueryParams & { enabled?: boolean };

export const useLogsStream = ({ enabled, ...params }: UseLogsStreamParams) => {
  const apiKey = useApiKey();
  const namespace = useNamespace();
  const queryClient = useQueryClient();

  if (!namespace) {
    throw new Error("namespace is undefined");
  }

  return useStreaming({
    url: getUrl({
      useStreaming: true,
      namespace,
      ...params,
    }),
    apiKey: apiKey ?? undefined,
    schema: LogEntrySchema,
    enabled,
    onMessage: (msg) => {
      queryClient.setQueryData<LogsCache>(
        logKeys.detail(namespace, {
          apiKey: apiKey ?? undefined,
          activity: params.activity,
          instance: params.instance,
          route: params.route,
          trace: params.trace,
        }),
        (oldData) => updateCache(oldData, msg)
      );
    },
  });
};

export type UseLogsParams = LogsQueryParams & { enabled?: boolean };

export const useLogs = ({
  instance,
  route,
  activity,
  trace,
  enabled = true,
}: UseLogsParams = {}) => {
  const apiKey = useApiKey();
  const namespace = useNamespace();

  if (!namespace) {
    throw new Error("namespace is undefined");
  }

  /**
   * The API returns data as an infinite list, which means it returns a cursor in form of a timestamp
   * to the next page of data. The end of the list is not known until the last page is reached and
   * the cursor is null.
   *
   * The API only returns navigation into one direction, which means we always have to start with querying
   * the most recent logs and then navigate to older ones. It is not possible to start at a specific time
   * and then move to more recent logs.
   */
  const queryReturn = useInfiniteQueryWithPermissions({
    queryKey: logKeys.detail(namespace, {
      apiKey: apiKey ?? undefined,
      instance,
      route,
      activity,
      trace,
    }),
    queryFn: fetchLogs,
    getNextPageParam: (firstPage) => firstPage.meta?.previousPage,
    enabled: !!namespace && enabled,
    initialPageParam: undefined,
    refetchOnWindowFocus: false,
  });

  /**
   * expose a simpler data structure to the consumer of the hook by stripping
   * out the pages and flattening the data into a single array
   */
  let logData: LogEntryType[] | undefined = undefined;
  if (queryReturn.data) {
    const pagesReversed = [...queryReturn.data.pages].reverse();
    const pages = pagesReversed.map((page) => page.data ?? []) ?? [];
    logData = pages.flat();
  }

  return {
    ...queryReturn,
    data: logData,
  };
};