bokuweb/tsukiakari

View on GitHub
src/renderer/src/sagas/filter-stream.js

Summary

Maintainability
A
55 mins
Test Coverage
/* eslint-disable no-constant-condition */

import { eventChannel } from 'redux-saga';
import { fork, take, call, put, cancel, select } from 'redux-saga/effects';
import { recieveTweet } from '../actions/tweets';
import { ipcRenderer } from 'electron'; // eslint-disable-line import/no-unresolved
import log from '../lib/log';
import { normalize, Schema } from 'normalizr';
import { getFirstAccount, getFilterQueries } from './selector';
import type { Account } from '../../../types/account';
import type { Tweet } from '../../../types/tweet';
import type { Channel } from 'redux-saga/effects';
import type { EmitterFn } from 'redux-saga';

// normalizr schema for the 'tweets' entity collection; entities are keyed by
// each tweet's `id_str` attribute instead of the default id attribute.
const tweetSchema = new Schema('tweets', { idAttribute: 'id_str' });

const subscribe = (account: Account) => (
  eventChannel((emit: EmitterFn) => {
    ipcRenderer.on('filterstream-tweet', (_: any, tweet: Tweet, queries: Array<string>) => {
      log.debug('Recieve serach tweet.');
      window.requestIdleCallback(() => {
        log.debug(tweet);
        for (const q of queries) {
          if (tweet.text.includes(q)) {
            emit(recieveTweet({
              tweet: normalize(tweet, tweetSchema), account, type: 'Search', q,
            }));
          }
        }
      }, { timeout: 20000 });
    });
    const unsubscribe = () => {
      ipcRenderer.removeAllListeners('filterstream-tweet');
    };
    return unsubscribe;
  })
);

function* addChannel(channel: Channel) {
  while (true) {
    const action = yield take(channel);
    yield put(action);
  }
}

/**
 * Watch for 'DELETE_COLUMN' actions and, whenever the deleted column is of
 * type 'Search', ask the main process for a new filter stream connection
 * using the first account and the current filter queries from the store.
 */
function* watchDeleteColumnRequest() {
  while (true) {
    const deleteAction = yield take('DELETE_COLUMN');
    if (deleteAction.payload.type === 'Search') {
      const firstAccount = yield select(getFirstAccount);
      log.debug('Reconnect filter stream because search column deleted');
      const queries = yield select(getFilterQueries);
      log.debug('Filter Query', queries);
      ipcRenderer.send('request-filterstream-connection', firstAccount, { q: queries });
    }
  }
}

/**
 * Root saga for the filter stream.
 *
 * Forks the delete-column watcher once, then handles each
 * 'CONNECT_FILTER_STREAM' action by:
 *   1. cancelling the previously forked forwarding task, if any;
 *   2. sending 'request-filterstream-connection' with the first account and
 *      the current filter queries;
 *   3. subscribing to incoming tweets and forking a task that forwards them
 *      to the store.
 */
export default function* watchConnectFilterStream() {
  let forwardingTask;
  yield fork(watchDeleteColumnRequest);
  while (true) {
    yield take('CONNECT_FILTER_STREAM');
    // Only one forwarding task is kept alive at a time.
    if (forwardingTask) {
      yield cancel(forwardingTask);
    }
    const firstAccount = yield select(getFirstAccount);
    const queries = yield select(getFilterQueries);
    ipcRenderer.send('request-filterstream-connection', firstAccount, { q: queries });
    const tweetChannel = yield call(subscribe, firstAccount);
    forwardingTask = yield fork(addChannel, tweetChannel);
  }
}