examples/stream-consumer.js
// A sample stream consumer using the blocking variant of XREAD.
// https://redis.io/commands/xread/
// This consumes entries from a stream created by stream-producer.js
import { createClient, commandOptions } from 'redis';
const client = createClient();
await client.connect();
let currentId = '0-0'; // Start at lowest possible stream ID
while (true) {
try {
let response = await client.xRead(
commandOptions({
isolated: true
}), [
// XREAD can read from multiple streams, starting at a
// different ID for each...
{
key: 'mystream',
id: currentId
}
], {
// Read 1 entry at a time, block for 5 seconds if there are none.
COUNT: 1,
BLOCK: 5000
}
);
if (response) {
// Response is an array of streams, each containing an array of
// entries:
// [
// {
// "name": "mystream",
// "messages": [
// {
// "id": "1642088708425-0",
// "message": {
// "num": "999"
// }
// }
// ]
// }
// ]
console.log(JSON.stringify(response));
// Get the ID of the first (only) entry returned.
currentId = response[0].messages[0].id;
console.log(currentId);
} else {
// Response is null, we have read everything that is
// in the stream right now...
console.log('No new stream entries.');
}
} catch (err) {
console.error(err);
}
}