cyclejs/cycle-core

View on GitHub
time/src/delay.ts

Summary

Maintainability
A
0 mins
Test Coverage
import xs, {Stream, Listener} from 'xstream';
import {adapt} from '@cycle/run/lib/adapt';
import {OperatorArgs} from './types';

/**
 * Builds a listener that relays every event it receives to `schedule`,
 * tagged with the moment at which it should fire.
 *
 * The due time is `currentTime() + delayTime`, evaluated each time an event
 * arrives (not once at construction), so each event is offset from the time
 * it was observed.
 *
 * @param schedule Object exposing `next`, `error` and `complete` methods;
 *   each is called with the target listener, the due time and, for
 *   `next`/`error`, the payload.
 * @param currentTime Returns the current time as a number.
 * @param delayTime Offset added to `currentTime()` to obtain the due time.
 * @param listener Downstream listener handed through to `schedule` untouched.
 * @returns An object with `next`, `error` and `complete` handlers.
 */
function makeDelayListener<T>(
  schedule: any,
  currentTime: () => number,
  delayTime: number,
  listener: any
) {
  return {
    next: (value: T) => {
      schedule.next(listener, currentTime() + delayTime, value);
    },

    error: (error: Error) => {
      schedule.error(listener, currentTime() + delayTime, error);
    },

    complete: () => {
      schedule.complete(listener, currentTime() + delayTime);
    },
  };
}

/**
 * Creates the `delay` operator factory.
 *
 * `createOperator` is invoked once, up front, to obtain the `schedule` and
 * `currentTime` that every operator produced here shares.
 *
 * @param createOperator Supplies the operator arguments (`schedule`,
 *   `currentTime`).
 * @returns `delay(delayTime)`, which returns an operator taking an input
 *   stream and returning a new stream whose events are routed through
 *   `schedule` with a due time of `currentTime() + delayTime`.
 */
function makeDelay(createOperator: () => OperatorArgs<any>) {
  const operatorArgs = createOperator();
  const schedule = operatorArgs.schedule;
  const currentTime = operatorArgs.currentTime;

  function delay(delayTime: number) {
    function delayOperator<T>(inputStream: Stream<T>): Stream<T> {
      const source = xs.fromObservable(inputStream);
      // Listener attached to `source` while the output stream is running;
      // stays null until the producer is started for the first time.
      let activeListener: any = null;

      const delayed = xs.create<T>({
        start(downstream: Listener<T>) {
          activeListener = makeDelayListener<T>(
            schedule,
            currentTime,
            delayTime,
            downstream
          );

          source.addListener(activeListener);
        },

        stop() {
          // Nothing to detach if the producer was never started.
          if (!activeListener) {
            return;
          }

          source.removeListener(activeListener);
        },
      });

      // Convert to whichever stream library the running app is using.
      return adapt(delayed);
    }

    return delayOperator;
  }

  return delay;
}

export {makeDelay};