NodeRedis/node-redis

View on GitHub
examples/stream-consumer.js

Summary

Maintainability
A
0 mins
Test Coverage
// A sample stream consumer using the blocking variant of XREAD.
// https://redis.io/commands/xread/
// This consumes entries from a stream created by stream-producer.js

import { createClient, commandOptions } from 'redis';

const client = createClient();

await client.connect();

let currentId = '0-0'; // Start at lowest possible stream ID

while (true) {
  try {
    let response = await client.xRead(
      commandOptions({
        isolated: true
      }), [
        // XREAD can read from multiple streams, starting at a
        // different ID for each...
        {
          key: 'mystream',
          id: currentId
        }
      ], {
        // Read 1 entry at a time, block for 5 seconds if there are none.
        COUNT: 1,
        BLOCK: 5000
      }
    );

    if (response) {
      // Response is an array of streams, each containing an array of
      // entries:
      // [
      //   {
      //     "name": "mystream",
      //     "messages": [
      //       {
      //         "id": "1642088708425-0",
      //         "message": {
      //           "num": "999"
      //         }
      //       }
      //     ]
      //   }
      // ]
      console.log(JSON.stringify(response));

      // Get the ID of the first (only) entry returned.
      currentId = response[0].messages[0].id;
      console.log(currentId);
    } else {
      // Response is null, we have read everything that is
      // in the stream right now...
      console.log('No new stream entries.');
    }
  } catch (err) {
    console.error(err);
  }
}