NodeRedis/node-redis

View on GitHub
packages/client/lib/sentinel/index.spec.ts

Summary

Maintainability
F
2 wks
Test Coverage
import { strict as assert } from 'node:assert';
import { setTimeout } from 'node:timers/promises';
import { WatchError } from "../errors";
import { RedisSentinelConfig, SentinelFramework } from "./test-util";
import { RedisNode, RedisSentinelClientType, RedisSentinelEvent, RedisSentinelType } from "./types";
import { RedisSentinelFactory } from '.';
import { RedisClientType } from '../client';
import { RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping, NumberReply } from '../RESP/types';

import { promisify } from 'node:util';
import { exec } from 'node:child_process';
import { RESP_TYPES } from '../RESP/decoder';
import { defineScript } from '../lua-script';
import { MATH_FUNCTION } from '../commands/FUNCTION_LOAD.spec';
import RedisBloomModules from '@redis/bloom';
import { RedisTcpSocketOptions } from '../client/socket';
import { SQUARE_SCRIPT } from '../client/index.spec';

const execAsync = promisify(exec);

/* used to ensure test environment resets to normal state
   i.e. 
   - all redis nodes are active and are part of the topology
   before allowing things to continue.
*/

async function steadyState(frame: SentinelFramework) {
  let checkedMaster = false;
  let checkedReplicas = false;
  while (!checkedMaster || !checkedReplicas) {
    if (!checkedMaster) {
      const master = await frame.sentinelMaster();
      if (master?.flags === 'master') {
        checkedMaster = true;
      }
    }

    if (!checkedReplicas) {
      const replicas = (await frame.sentinelReplicas());
      checkedReplicas = true;
      for (const replica of replicas!) {
        checkedReplicas &&= (replica.flags === 'slave');
      }
    }
  }

  let nodeResolve, nodeReject;
  const nodePromise = new Promise((res, rej) => {
    nodeResolve = res;
    nodeReject = rej;
  })

  const seenNodes = new Set<number>();
  let sentinel: RedisSentinelType<RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping> | undefined;
  const tracer = [];

  try {
    sentinel = frame.getSentinelClient({ replicaPoolSize: 1, scanInterval: 2000 }, false)
      .on('topology-change', (event: RedisSentinelEvent) => {
        if (event.type == "MASTER_CHANGE" || event.type == "REPLICA_ADD") {
          seenNodes.add(event.node.port);
          if (seenNodes.size == frame.getAllNodesPort().length) {
            nodeResolve();
          }
        }
      }).on('error', err => { });
    sentinel.setTracer(tracer);
    await sentinel.connect();

    await nodePromise;
    
    await sentinel.flushAll();
  } finally {
    if (sentinel !== undefined) {
      sentinel.destroy();
    }
  }
}

["redis-sentinel-test-password", undefined].forEach(function (password) {
  describe.skip(`Sentinel - password = ${password}`, () => {
    const config: RedisSentinelConfig = { sentinelName: "test", numberOfNodes: 3, password: password };
    const frame = new SentinelFramework(config);
    let tracer = new Array<string>();
    let stopMeasuringBlocking = false;
    let longestDelta = 0;
    let longestTestDelta = 0;
    let last: number;
  
    before(async function () {
      this.timeout(15000);
  
      last = Date.now();
  
      function deltaMeasurer() {
        const delta = Date.now() - last;
        if (delta > longestDelta) {
          longestDelta = delta;
        }
        if (delta > longestTestDelta) {
          longestTestDelta = delta;
        }
        if (!stopMeasuringBlocking) {
          last = Date.now();
          setImmediate(deltaMeasurer);
        }
      }

      setImmediate(deltaMeasurer);

      await frame.spawnRedisSentinel();
    });
  
    after(async function () {
      this.timeout(15000);
  
      stopMeasuringBlocking = true;
  
      await frame.cleanup();
    })
  
    describe('Sentinel Client', function () {
      let sentinel: RedisSentinelType<    RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping> | undefined;
  
      beforeEach(async function () {
        this.timeout(0);
  
        await frame.getAllRunning();

        await steadyState(frame);
        longestTestDelta = 0;
      })
  
      afterEach(async function () {
        this.timeout(30000);

        // avoid errors in afterEach that end testing
        if (sentinel !== undefined) {
          sentinel.on('error', () => { });
        }
 
        if (this!.currentTest!.state === 'failed') {          
          console.log(`longest event loop blocked delta: ${longestDelta}`);
          console.log(`longest event loop blocked in failing test: ${longestTestDelta}`);
          console.log("trace:");
          for (const line of tracer) {
            console.log(line);
          }
          console.log(`sentinel object state:`)
          console.log(`master: ${JSON.stringify(sentinel?.getMasterNode())}`)
          console.log(`replicas: ${JSON.stringify(sentinel?.getReplicaNodes().entries)}`)
          const results = await Promise.all([
            frame.sentinelSentinels(),
            frame.sentinelMaster(),
            frame.sentinelReplicas()
          ])
          console.log(`sentinel sentinels:\n${JSON.stringify(results[0], undefined, '\t')}`);
          console.log(`sentinel master:\n${JSON.stringify(results[1], undefined, '\t')}`);
          console.log(`sentinel replicas:\n${JSON.stringify(results[2], undefined, '\t')}`);
          const { stdout, stderr } = await execAsync("docker ps -a");
          console.log(`docker stdout:\n${stdout}`);

          const ids = frame.getAllDockerIds();
          console.log("docker logs");

          for (const [id, port] of ids) {
            console.log(`${id}/${port}\n`);
            const { stdout, stderr } = await execAsync(`docker logs ${id}`, {maxBuffer: 8192 * 8192 * 4});
            console.log(stdout);
          }
        }
        tracer.length = 0;
  
        if (sentinel !== undefined) {
          await sentinel.destroy();
          sentinel = undefined;
        }
      })
  
      it('basic bootstrap', async function () {
        sentinel = frame.getSentinelClient();
        await sentinel.connect();

        await assert.doesNotReject(sentinel.set('x', 1));

      });
  
      it('basic teardown worked', async function () {
        const nodePorts = frame.getAllNodesPort();
        const sentinelPorts = frame.getAllSentinelsPort();
  
        assert.notEqual(nodePorts.length, 0);
        assert.notEqual(sentinelPorts.length, 0);
  
        sentinel = frame.getSentinelClient();
        await sentinel.connect();
  
        await assert.doesNotReject(sentinel.get('x'));
      });
  
      it('try to connect multiple times', async function () {
        sentinel = frame.getSentinelClient();
        const connectPromise = sentinel.connect();
        await assert.rejects(sentinel.connect());
        await connectPromise;
      });
  
      it('with type mapping', async function () {
        const commandOptions = {
          typeMapping: {
            [RESP_TYPES.SIMPLE_STRING]: Buffer
          }
        }
        sentinel = frame.getSentinelClient({ commandOptions: commandOptions });
        await sentinel.connect();
  
        const resp = await sentinel.ping();
        assert.deepEqual(resp, Buffer.from('PONG'))
      })
  
      it('with a script', async function () {  
        const options = {
          scripts: {
            square: SQUARE_SCRIPT
          }
        }
  
        sentinel = frame.getSentinelClient(options);
        await sentinel.connect();
  
        const [, reply] = await Promise.all([
          sentinel.set('key', '2'),
          sentinel.square('key')
        ]);
  
        assert.equal(reply, 4);
      })

      it('multi with a script', async function () {
        const options = {
          scripts: {
            square: SQUARE_SCRIPT
          }
        }
  
        sentinel = frame.getSentinelClient(options);
        await sentinel.connect();

        const reply = await sentinel.multi().set('key', 2).square('key').exec();

        assert.deepEqual(reply, ['OK', 4]);
      })
  
      it('with a function', async function () {
        const options = {
          functions: {
            math: MATH_FUNCTION.library
          }
        }
        sentinel = frame.getSentinelClient(options);
        await sentinel.connect();
  
        await sentinel.functionLoad(
          MATH_FUNCTION.code,
          { REPLACE: true }
        );
  
        await sentinel.set('key', '2');
        const resp = await sentinel.math.square('key');
  
        assert.equal(resp, 4);
      })

      it('multi with a function', async function () {
        const options = {
          functions: {
            math: MATH_FUNCTION.library
          }
        }
        sentinel = frame.getSentinelClient(options);
        await sentinel.connect();
  
        await sentinel.functionLoad(
          MATH_FUNCTION.code,
          { REPLACE: true }
        );

        const reply = await sentinel.multi().set('key', 2).math.square('key').exec();
        assert.deepEqual(reply, ['OK', 4]);
      })
  
      it('with a module', async function () {
        const options = {
          modules: RedisBloomModules
        }
        sentinel = frame.getSentinelClient(options);
        await sentinel.connect();
  
        const resp = await sentinel.bf.add('key', 'item')
        assert.equal(resp, true);
      })

      it('multi with a module', async function () {
        const options = {
          modules: RedisBloomModules
        }
        sentinel = frame.getSentinelClient(options);
        await sentinel.connect();
  
        const resp = await sentinel.multi().bf.add('key', 'item').exec();
        assert.deepEqual(resp, [true]);
      })
  
      it('many readers', async function () {
        this.timeout(10000);
  
        sentinel = frame.getSentinelClient({ replicaPoolSize: 8 });
        await sentinel.connect();
  
        await sentinel.set("x", 1);
        for (let i = 0; i < 10; i++) {
          if (await sentinel.get("x") == "1") {
            break;
          }
          await setTimeout(1000);
        }
  
        const promises: Array<Promise<string | null>> = [];
        for (let i = 0; i < 500; i++) {
          promises.push(sentinel.get("x"));
        }
  
        const resp = await Promise.all(promises);
        assert.equal(resp.length, 500);
        for (let i = 0; i < 500; i++) {
          assert.equal(resp[i], "1", `failed on match at ${i}`);
        }
      });
  
      it('use', async function () {
        this.timeout(30000);
  
        sentinel = frame.getSentinelClient({ replicaPoolSize: 1 });
        sentinel.on("error", () => { });
        await sentinel.connect();
  
        await sentinel.use(
          async (client: RedisSentinelClientType<RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping>, ) => {
            const masterNode = sentinel!.getMasterNode();
            await frame.stopNode(masterNode!.port.toString());
            await assert.doesNotReject(client.get('x'));
          }
        );
      });
  
      it('use with script', async function () {
        this.timeout(10000);
  
        const options = {
          scripts: {
            square: SQUARE_SCRIPT
          }
        }
  
        sentinel = frame.getSentinelClient(options);
        await sentinel.connect();
  
        const reply = await sentinel.use(
          async (client: RedisSentinelClientType<RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping>) => {
            assert.equal(await client.set('key', '2'), 'OK');
            assert.equal(await client.get('key'), '2');
            return client.square('key')
          }
        );
  
        assert.equal(reply, 4);
      })
  
      it('use with a function', async function () {
        this.timeout(10000);
  
        const options = {
          functions: {
            math: MATH_FUNCTION.library
          }
        }
        sentinel = frame.getSentinelClient(options);
        await sentinel.connect();
  
        await sentinel.functionLoad(
          MATH_FUNCTION.code,
          { REPLACE: true }
        );
  
        const reply = await sentinel.use(
          async (client: RedisSentinelClientType<RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping>) => {
            await client.set('key', '2');
            return client.math.square('key');
          }
        );
  
        assert.equal(reply, 4);
      })
  
      it('use with a module', async function () {
        const options = {
          modules: RedisBloomModules
        }
        sentinel = frame.getSentinelClient(options);
        await sentinel.connect();
  
        const reply = await sentinel.use(
          async (client: RedisSentinelClientType<RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping>) => {
            return client.bf.add('key', 'item');
          }
        );
  
        assert.equal(reply, true);
      })
  
      it('block on pool', async function () {
        this.timeout(30000);
  
        sentinel = frame.getSentinelClient({ replicaPoolSize: 1 });
        sentinel.on("error", () => { });
        await sentinel.connect();

        const promise = sentinel.use(
          async client => {
            await setTimeout(1000);
            return await client.get("x");
          }
        )
        
        await sentinel.set("x", 1);
        assert.equal(await promise, null);
      });

      it('reserve client, takes a client out of pool', async function () {
        this.timeout(30000);
  
        sentinel = frame.getSentinelClient({ masterPoolSize: 2, reserveClient: true });
        await sentinel.connect();

        const promise1 = sentinel.use(
          async client => {
            const val = await client.get("x");
            await client.set("x", 2);
            return val;
          }
        )

        const promise2 = sentinel.use(
          async client => {
            return client.get("x");
          }
        )

        await sentinel.set("x", 1);
        assert.equal(await promise1, "1");
        assert.equal(await promise2, "2");
      })
  
      it('multiple clients', async function () {
        this.timeout(30000);
  
        sentinel = frame.getSentinelClient({ masterPoolSize: 2 });
        sentinel.on("error", () => { });
        await sentinel.connect();
  
        let set = false;
  
        const promise = sentinel.use(
          async client => {
            await sentinel!.set("x", 1);
            await client.get("x");
          }
        )
  
        await assert.doesNotReject(promise);
      });
  
      // by taking a lease, we know we will block on master as no clients are available, but as read occuring, means replica read occurs
      it('replica reads', async function () {
        this.timeout(30000);
  
        sentinel = frame.getSentinelClient({ replicaPoolSize: 1 });
        sentinel.on("error", () => { });
        await sentinel.connect();
  
        const clientLease = await sentinel.aquire();
        clientLease.set('x', 456);
  
        let matched = false;
        /* waits for replication */
        for (let i = 0; i < 15; i++) {
          try {
            assert.equal(await sentinel.get("x"), '456');
            matched = true;
            break;
          } catch (err) {
            await setTimeout(1000);
          }
        }
  
        clientLease.release();
  
        assert.equal(matched, true);
      });
  
      it('pipeline', async function () {
        this.timeout(30000);
  
        sentinel = frame.getSentinelClient({ replicaPoolSize: 1 });
        await sentinel.connect();
  
        const resp = await sentinel.multi().set('x', 1).get('x').execAsPipeline();
  
        assert.deepEqual(resp, ['OK', '1']);
      })
  
      it('use - watch - clean', async function () {
        this.timeout(30000);
  
        sentinel = frame.getSentinelClient({ masterPoolSize: 2 });
        await sentinel.connect();
  
        let promise = sentinel.use(async (client) => {
          await client.set("x", 1);
          await client.watch("x");
          return client.multi().get("x").exec();
        });
  
        assert.deepEqual(await promise, ['1']);
      });
  
      it('use - watch - dirty', async function () {
        this.timeout(30000);
  
        sentinel = frame.getSentinelClient({ masterPoolSize: 2 });
        await sentinel.connect();
  
        let promise = sentinel.use(async (client) => {
          await client.set('x', 1);
          await client.watch('x');
          await sentinel!.set('x', 2);
          return client.multi().get('x').exec();
        });
  
        await assert.rejects(promise, new WatchError());
      });
  
      it('lease - watch - clean', async function () {
        sentinel = frame.getSentinelClient({ masterPoolSize: 2 });
        await sentinel.connect();
  
        const leasedClient = await sentinel.aquire();
        await leasedClient.set('x', 1);
        await leasedClient.watch('x');
        assert.deepEqual(await leasedClient.multi().get('x').exec(), ['1'])
      });
  
      it('lease - watch - dirty', async function () {
        sentinel = frame.getSentinelClient({ masterPoolSize: 2 });
        await sentinel.connect();
  
        const leasedClient = await sentinel.aquire();
        await leasedClient.set('x', 1);
        await leasedClient.watch('x');
        await leasedClient.set('x', 2);
  
        await assert.rejects(leasedClient.multi().get('x').exec(), new WatchError());
      });
  
  
      it('watch does not carry through leases', async function () {
        this.timeout(10000);
        sentinel = frame.getSentinelClient();
        await sentinel.connect();
 
        // each of these commands is an independent lease
        assert.equal(await sentinel.use(client => client.watch("x")), 'OK')
        assert.equal(await sentinel.use(client => client.set('x', 1)), 'OK');
        assert.deepEqual(await sentinel.use(client => client.multi().get('x').exec()), ['1']);
      });
  
      // stops master to force sentinel to update 
      it('stop master', async function () {
        this.timeout(30000);
  
        sentinel = frame.getSentinelClient();
        sentinel.setTracer(tracer);
        sentinel.on("error", () => { });
        await sentinel.connect();
  
        tracer.push(`connected`);
  
        let masterChangeResolve;
        const masterChangePromise = new Promise((res) => {
          masterChangeResolve = res;
        })
  
        const masterNode = await sentinel.getMasterNode();
        sentinel.on('topology-change', (event: RedisSentinelEvent) => {
          tracer.push(`got topology-change event: ${JSON.stringify(event)}`);
          if (event.type === "MASTER_CHANGE" && event.node.port != masterNode!.port) {
            tracer.push(`got expected master change event`);
            masterChangeResolve(event.node);
          }
        });
  
        tracer.push(`stopping master node`);
        await frame.stopNode(masterNode!.port.toString());
        tracer.push(`stopped master node`);
  
        tracer.push(`waiting on master change promise`);
        const newMaster = await masterChangePromise as RedisNode;
        tracer.push(`got new master node of ${newMaster.port}`);
        assert.notEqual(masterNode!.port, newMaster.port);
      });
  
      // if master changes, client should make sure user knows watches are invalid
      it('watch across master change', async function () {
        this.timeout(30000);
  
        sentinel = frame.getSentinelClient({ masterPoolSize: 2 });
        sentinel.setTracer(tracer);
        sentinel.on("error", () => { });
        await sentinel.connect();
  
        tracer.push("connected");
  
        const client = await sentinel.aquire();
        tracer.push("aquired lease");
  
        await client.set("x", 1);
        await client.watch("x");
  
        tracer.push("did a watch on lease");
  
        let resolve;
        const promise = new Promise((res) => {
          resolve = res;
        })
  
        const masterNode = sentinel.getMasterNode();
        tracer.push(`got masterPort as ${masterNode!.port}`);
  
        sentinel.on('topology-change', (event: RedisSentinelEvent) => {
          tracer.push(`got topology-change event: ${JSON.stringify(event)}`);
          if (event.type === "MASTER_CHANGE" && event.node.port != masterNode!.port) {
            tracer.push("resolving promise");
            resolve(event.node);
          }
        });
  
        tracer.push("stopping master node");
        await frame.stopNode(masterNode!.port.toString());
        tracer.push("stopped master node and waiting on promise");
  
        const newMaster = await promise as RedisNode;
        tracer.push(`promise returned, newMaster = ${JSON.stringify(newMaster)}`);
        assert.notEqual(masterNode!.port, newMaster.port);
        tracer.push(`newMaster does not equal old master`);
  
        tracer.push(`waiting to assert that a multi/exec now fails`);
        await assert.rejects(async () => { await client.multi().get("x").exec() }, new Error("sentinel config changed in middle of a WATCH Transaction"));
        tracer.push(`asserted that a multi/exec now fails`);
      });
  
      // same as above, but set a watch before and after master change, shouldn't change the fact that watches are invalid
      it('watch before and after master change', async function () {
        this.timeout(30000);
  
        sentinel = frame.getSentinelClient({ masterPoolSize: 2 });
        sentinel.setTracer(tracer);
        sentinel.on("error", () => { });
        await sentinel.connect();
        tracer.push("connected");
  
        const client = await sentinel.aquire();
        tracer.push("got leased client");
        await client.set("x", 1);
        await client.watch("x");
  
        tracer.push("set and watched x");
  
        let resolve;
        const promise = new Promise((res) => {
          resolve = res;
        })
  
        const masterNode = sentinel.getMasterNode();
        tracer.push(`initial masterPort = ${masterNode!.port} `);
  
        sentinel.on('topology-change', (event: RedisSentinelEvent) => {
          tracer.push(`got topology-change event: ${JSON.stringify(event)}`);
          if (event.type === "MASTER_CHANGE" && event.node.port != masterNode!.port) {
            tracer.push("got a master change event that is not the same as before");
            resolve(event.node);
          }
        });
  
        tracer.push("stopping master");
        await frame.stopNode(masterNode!.port.toString());
        tracer.push("stopped master");
  
        tracer.push("waiting on master change promise");
        const newMaster = await promise as RedisNode;
        tracer.push(`got master change port as ${newMaster.port}`);
        assert.notEqual(masterNode!.port, newMaster.port);
  
        tracer.push("watching again, shouldn't matter");
        await client.watch("y");
  
        tracer.push("expecting multi to be rejected");
        await assert.rejects(async () => { await client.multi().get("x").exec() }, new Error("sentinel config changed in middle of a WATCH Transaction"));
        tracer.push("multi was rejected");
      });
  
      it('plain pubsub - channel', async function () {
        this.timeout(30000);
  
        sentinel = frame.getSentinelClient();
        sentinel.setTracer(tracer);
        await sentinel.connect();
        tracer.push(`connected`);
  
        let pubSubResolve;
        const pubSubPromise = new Promise((res) => {
          pubSubResolve = res;
        });
  
        let tester = false;
        await sentinel.subscribe('test', () => {
          tracer.push(`got pubsub message`);
          tester = true;
          pubSubResolve(1);
        })
  
        tracer.push(`publishing pubsub message`);
        await sentinel.publish('test', 'hello world');
        tracer.push(`waiting on pubsub promise`);
        await pubSubPromise;
        tracer.push(`got pubsub promise`);
        assert.equal(tester, true);
  
        // now unsubscribe
        tester = false
        tracer.push(`unsubscribing pubsub listener`);
        await sentinel.unsubscribe('test')
        tracer.push(`pubishing pubsub message`);
        await sentinel.publish('test', 'hello world');
        await setTimeout(1000);
  
        tracer.push(`ensuring pubsub was unsubscribed via an assert`);
        assert.equal(tester, false);
      });
  
      it('plain pubsub - pattern', async function () {
        this.timeout(30000);
  
        sentinel = frame.getSentinelClient();
        sentinel.setTracer(tracer);
        await sentinel.connect();
        tracer.push(`connected`);
  
        let pubSubResolve;
        const pubSubPromise = new Promise((res) => {
          pubSubResolve = res;
        });
  
        let tester = false;
        await sentinel.pSubscribe('test*', () => {
          tracer.push(`got pubsub message`);
          tester = true;
          pubSubResolve(1);
        })
  
        tracer.push(`publishing pubsub message`);
        await sentinel.publish('testy', 'hello world');
        tracer.push(`waiting on pubsub promise`);
        await pubSubPromise;
        tracer.push(`got pubsub promise`);
        assert.equal(tester, true);
  
        // now unsubscribe
        tester = false
        tracer.push(`unsubscribing pubsub listener`);
        await sentinel.pUnsubscribe('test*');
        tracer.push(`pubishing pubsub message`);
        await sentinel.publish('testy', 'hello world');
        await setTimeout(1000);
  
        tracer.push(`ensuring pubsub was unsubscribed via an assert`);
        assert.equal(tester, false);
      });
  
      // pubsub continues to work, even with a master change
      it('pubsub - channel - with master change', async function () {
        this.timeout(30000);
  
        sentinel = frame.getSentinelClient();
        sentinel.setTracer(tracer);
        sentinel.on("error", () => { });
        await sentinel.connect();
        tracer.push(`connected`);
  
        let pubSubResolve;
        const pubSubPromise = new Promise((res) => {
          pubSubResolve = res;
        })
  
        let tester = false;
        await sentinel.subscribe('test', () => {
          tracer.push(`got pubsub message`);
          tester = true;
          pubSubResolve(1);
        })
  
        let masterChangeResolve;
        const masterChangePromise = new Promise((res) => {
          masterChangeResolve = res;
        })
  
        const masterNode = sentinel.getMasterNode();
        tracer.push(`got masterPort as ${masterNode!.port}`);
  
        sentinel.on('topology-change', (event: RedisSentinelEvent) => {
          tracer.push(`got topology-change event: ${JSON.stringify(event)}`);
          if (event.type === "MASTER_CHANGE" && event.node.port != masterNode!.port) {
            tracer.push("got a master change event that is not the same as before");
            masterChangeResolve(event.node);
          }
        });
  
        tracer.push("stopping master");
        await frame.stopNode(masterNode!.port.toString());
        tracer.push("stopped master and waiting on change promise");
  
        const newMaster = await masterChangePromise as RedisNode;
        tracer.push(`got master change port as ${newMaster.port}`);
        assert.notEqual(masterNode!.port, newMaster.port);
  
        tracer.push(`publishing pubsub message`);
        await sentinel.publish('test', 'hello world');
        tracer.push(`published pubsub message and waiting pn pubsub promise`);
        await pubSubPromise;
        tracer.push(`got pubsub promise`);
  
        assert.equal(tester, true);
  
        // now unsubscribe
        tester = false
        await sentinel.unsubscribe('test')
        await sentinel.publish('test', 'hello world');
        await setTimeout(1000);
  
        assert.equal(tester, false);
      });
  
      it('pubsub - pattern - with master change', async function () {
        this.timeout(30000);
  
        sentinel = frame.getSentinelClient();
        sentinel.setTracer(tracer);
        sentinel.on("error", () => { });
        await sentinel.connect();
        tracer.push(`connected`);
  
        let pubSubResolve;
        const pubSubPromise = new Promise((res) => {
          pubSubResolve = res;
        })
  
        let tester = false;
        await sentinel.pSubscribe('test*', () => {
          tracer.push(`got pubsub message`);
          tester = true;
          pubSubResolve(1);
        })
  
        let masterChangeResolve;
        const masterChangePromise = new Promise((res) => {
          masterChangeResolve = res;
        })
  
        const masterNode = sentinel.getMasterNode();
        tracer.push(`got masterPort as ${masterNode!.port}`);
  
        sentinel.on('topology-change', (event: RedisSentinelEvent) => {
          tracer.push(`got topology-change event: ${JSON.stringify(event)}`);
          if (event.type === "MASTER_CHANGE" && event.node.port != masterNode!.port) {
            tracer.push("got a master change event that is not the same as before");
            masterChangeResolve(event.node);
          }
        });
  
        tracer.push("stopping master");
        await frame.stopNode(masterNode!.port.toString());
        tracer.push("stopped master and waiting on master change promise");
  
        const newMaster = await masterChangePromise as RedisNode;
        tracer.push(`got master change port as ${newMaster.port}`);
        assert.notEqual(masterNode!.port, newMaster.port);
  
        tracer.push(`publishing pubsub message`);
        await sentinel.publish('testy', 'hello world');
        tracer.push(`published pubsub message and waiting on pubsub promise`);
        await pubSubPromise;
        tracer.push(`got pubsub promise`);
        assert.equal(tester, true);
  
        // now unsubscribe
        tester = false
        await sentinel.pUnsubscribe('test*');
        await sentinel.publish('testy', 'hello world');
        await setTimeout(1000);
  
        assert.equal(tester, false);
      });
  
      // if we stop a node, the comand should "retry" until we reconfigure topology and execute on new topology
      it('command immeaditely after stopping master', async function () {
        this.timeout(30000);
  
        sentinel = frame.getSentinelClient();
        sentinel.setTracer(tracer);
        sentinel.on("error", () => { });
        await sentinel.connect();
  
        tracer.push("connected");
  
        let masterChangeResolve;
        const masterChangePromise = new Promise((res) => {
          masterChangeResolve = res;
        })
  
        const masterNode = sentinel.getMasterNode();
        tracer.push(`original master port = ${masterNode!.port}`);
  
        let changeCount = 0;
        sentinel.on('topology-change', (event: RedisSentinelEvent) => {
          tracer.push(`got topology-change event: ${JSON.stringify(event)}`);
          if (event.type === "MASTER_CHANGE" && event.node.port != masterNode!.port) {
            changeCount++;
            tracer.push(`got topology-change event we expected`);
            masterChangeResolve(event.node);
          }
        });
  
        tracer.push(`stopping masterNode`);
        await frame.stopNode(masterNode!.port.toString());
        tracer.push(`stopped masterNode`);
        assert.equal(await sentinel.set('x', 123), 'OK');
        tracer.push(`did the set operation`);
        const presumamblyNewMaster = sentinel.getMasterNode();
        tracer.push(`new master node seems to be ${presumamblyNewMaster?.port} and waiting on master change promise`);
  
        const newMaster = await masterChangePromise as RedisNode;
        tracer.push(`got new masternode event saying master is at ${newMaster.port}`);
        assert.notEqual(masterNode!.port, newMaster.port);
  
        tracer.push(`doing the get`);
        const val = await sentinel.get('x');
        tracer.push(`did the get and got ${val}`);
        const newestMaster = sentinel.getMasterNode()
        tracer.push(`after get, we see master as ${newestMaster?.port}`);
  
        switch (changeCount) {
          case 1:
            // if we only changed masters once, we should have the proper value
            assert.equal(val, '123');
            break;
          case 2:
            // we changed masters twice quickly, so probably didn't replicate
            // therefore, this is soewhat flakey, but the above is the common case
            assert(val == '123' || val == null);
            break;
          default:
            assert(false, "unexpected case");
        }
      });
  
      it('shutdown sentinel node', async function () {
        this.timeout(30000);
  
        sentinel = frame.getSentinelClient();
        sentinel.setTracer(tracer);
        sentinel.on("error", () => { });
        await sentinel.connect();
        tracer.push("connected");
  
        let sentinelChangeResolve;
        const sentinelChangePromise = new Promise((res) => {
          sentinelChangeResolve = res;
        })
  
        const sentinelNode = sentinel.getSentinelNode();
        tracer.push(`sentinelNode = ${sentinelNode?.port}`)
  
        sentinel.on('topology-change', (event: RedisSentinelEvent) => {
          tracer.push(`got topology-change event: ${JSON.stringify(event)}`);
          if (event.type === "SENTINEL_CHANGE") {
            tracer.push("got sentinel change event");
            sentinelChangeResolve(event.node);
          }
        });
  
        tracer.push("Stopping sentinel node");
        await frame.stopSentinel(sentinelNode!.port.toString());
        tracer.push("Stopped sentinel node and waiting on sentinel change promise");
        const newSentinel = await sentinelChangePromise as RedisNode;
        tracer.push("got sentinel change promise");
        assert.notEqual(sentinelNode!.port, newSentinel.port);
      });
  
      it('timer works, and updates sentinel list', async function () {
        this.timeout(30000);
  
        sentinel = frame.getSentinelClient({ scanInterval: 1000 });
        sentinel.setTracer(tracer);
        await sentinel.connect();
        tracer.push("connected");
  
        let sentinelChangeResolve;
        const sentinelChangePromise = new Promise((res) => {
          sentinelChangeResolve = res;
        })
  
        sentinel.on('topology-change', (event: RedisSentinelEvent) => {
          tracer.push(`got topology-change event: ${JSON.stringify(event)}`);
          if (event.type === "SENTINE_LIST_CHANGE" && event.size == 4) {
            tracer.push(`got sentinel list change event with right size`);
            sentinelChangeResolve(event.size);
          }
        });
  
        tracer.push(`adding sentinel`);
        await frame.addSentinel();
        tracer.push(`added sentinel and waiting on sentinel change promise`);
        const newSentinelSize = await sentinelChangePromise as number;
  
        assert.equal(newSentinelSize, 4);
      });
  
      it('stop replica, bring back replica', async function () {
        this.timeout(30000);
  
        sentinel = frame.getSentinelClient({ replicaPoolSize: 1 });
        sentinel.setTracer(tracer);
        sentinel.on('error', err => { });
        await sentinel.connect();
        tracer.push("connected");
  
        let sentinelRemoveResolve;
        const sentinelRemovePromise = new Promise((res) => {
          sentinelRemoveResolve = res;
        })
  
        const replicaPort = await frame.getRandonNonMasterNode();

        sentinel.on('topology-change', (event: RedisSentinelEvent) => {
          tracer.push(`got topology-change event: ${JSON.stringify(event)}`);
          if (event.type === "REPLICA_REMOVE") {
            if (event.node.port.toString() == replicaPort) {
              tracer.push("got expected replica removed event");
              sentinelRemoveResolve(event.node);
            } else {
              tracer.push(`got replica removed event for a different node: ${event.node.port}`);
            }
          }
        });
  
        tracer.push(`replicaPort = ${replicaPort} and stopping it`);
        await frame.stopNode(replicaPort);
        tracer.push("stopped replica and waiting on sentinel removed promise");
        const stoppedNode = await sentinelRemovePromise as RedisNode;
        tracer.push("got removed promise");
        assert.equal(stoppedNode.port, Number(replicaPort));
  
        let sentinelRestartedResolve;
        const sentinelRestartedPromise = new Promise((res) => {
          sentinelRestartedResolve = res;
        })
  
        sentinel.on('topology-change', (event: RedisSentinelEvent) => {
          tracer.push(`got topology-change event: ${JSON.stringify(event)}`);
          if (event.type === "REPLICA_ADD") {
            tracer.push("got replica added event");
            sentinelRestartedResolve(event.node);
          }
        });
  
        tracer.push("restarting replica");
        await frame.restartNode(replicaPort);
        tracer.push("restarted replica and waiting on restart promise");
        const restartedNode = await sentinelRestartedPromise as RedisNode;
        tracer.push("got restarted promise");
        assert.equal(restartedNode.port, Number(replicaPort));
      })
  
      it('add a node / new replica', async function () {
        this.timeout(30000);
  
        sentinel = frame.getSentinelClient({ scanInterval: 2000, replicaPoolSize: 1 });
        sentinel.setTracer(tracer);
        // need to handle errors, as the spawning a new docker node can cause existing connections to time out
        sentinel.on('error', err => { });
        await sentinel.connect();
        tracer.push("connected");
  
        let nodeAddedResolve: (value: RedisNode) => void;
        const nodeAddedPromise = new Promise((res) => {
          nodeAddedResolve = res as (value: RedisNode) => void;
        });
  
        const portSet = new Set<number>();
        for (const port of frame.getAllNodesPort()) {
          portSet.add(port);
        }
  
        // "on" and not "once" as due to connection timeouts, can happen multiple times, and want right one
        sentinel.on('topology-change', (event: RedisSentinelEvent) => {
          tracer.push(`got topology-change event: ${JSON.stringify(event)}`);
          if (event.type === "REPLICA_ADD") {
            if (!portSet.has(event.node.port)) {
              tracer.push("got expected replica added event");
              nodeAddedResolve(event.node);
            }
          }
        });
  
        tracer.push("adding node");
        await frame.addNode();
        tracer.push("added node and waiting on added promise");
        await nodeAddedPromise; 
      })
    })
  
    describe('Sentinel Factory', function () {
      let master: RedisClientType<RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping> | undefined;
      let replica: RedisClientType<RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping> | undefined;
  
      beforeEach(async function () {
        this.timeout(0);

        await frame.getAllRunning();
    
        await steadyState(frame);
        longestTestDelta = 0;
      })
  
      afterEach(async function () {
        if (this!.currentTest!.state === 'failed') {
          console.log(`longest event loop blocked delta: ${longestDelta}`);
          console.log(`longest event loop blocked in failing test: ${longestTestDelta}`);
          console.log("trace:");
          for (const line of tracer) {
            console.log(line);
          }
          const results = await Promise.all([
            frame.sentinelSentinels(),
            frame.sentinelMaster(),
            frame.sentinelReplicas()
          ])
          console.log(`sentinel sentinels:\n${JSON.stringify(results[0], undefined, '\t')}`);
          console.log(`sentinel master:\n${JSON.stringify(results[1], undefined, '\t')}`);
          console.log(`sentinel replicas:\n${JSON.stringify(results[2], undefined, '\t')}`);
          const { stdout, stderr } = await execAsync("docker ps -a");
          console.log(`docker stdout:\n${stdout}`);
          console.log(`docker stderr:\n${stderr}`);
        }
        tracer.length = 0;
  
        if (master !== undefined) {
          if (master.isOpen) {
            master.destroy();
          }
          master = undefined;
        }
  
        if (replica !== undefined) {
          if (replica.isOpen) {
            replica.destroy();
          }
          replica = undefined;
        }
      })
  
      it('sentinel factory - master', async function () {
        const sentinelPorts = frame.getAllSentinelsPort();
        const sentinels: Array<RedisNode> = [];
        for (const port of sentinelPorts) {
          sentinels.push({ host: "localhost", port: port });
        }
  
        const factory = new RedisSentinelFactory({ name: frame.config.sentinelName, sentinelRootNodes: sentinels, sentinelClientOptions: {password: password}, nodeClientOptions: {password: password} })
        await factory.updateSentinelRootNodes();
  
        master = await factory.getMasterClient();
        await master.connect();
  
        assert.equal(await master.set("x", 1), 'OK');
      })
  
      it('sentinel factory - replica', async function () {
        const sentinelPorts = frame.getAllSentinelsPort();
        const sentinels: Array<RedisNode> = [];
  
        for (const port of sentinelPorts) {
          sentinels.push({ host: "localhost", port: port });
        }
  
        const factory = new RedisSentinelFactory({ name: frame.config.sentinelName, sentinelRootNodes: sentinels, sentinelClientOptions: {password: password}, nodeClientOptions: {password: password} })
        await factory.updateSentinelRootNodes();
  
        const masterNode = await factory.getMasterNode();
        replica = await factory.getReplicaClient();
        const replicaSocketOptions = replica.options?.socket as unknown as RedisTcpSocketOptions | undefined;
        assert.notEqual(masterNode.port, replicaSocketOptions?.port)
      })
  
      it('sentinel factory - bad node', async function () {
        const factory = new RedisSentinelFactory({ name: frame.config.sentinelName, sentinelRootNodes: [{ host: "locahost", port: 1 }] });
        await assert.rejects(factory.updateSentinelRootNodes(), new Error("Couldn't connect to any sentinel node"));
      })
  
      it('sentinel factory - invalid db name', async function () {
        this.timeout(15000);
  
        const sentinelPorts = frame.getAllSentinelsPort();
        const sentinels: Array<RedisNode> = [];
  
        for (const port of sentinelPorts) {
          sentinels.push({ host: "localhost", port: port });
        }
  
        const factory = new RedisSentinelFactory({ name: "invalid-name", sentinelRootNodes: sentinels, sentinelClientOptions: {password: password}, nodeClientOptions: {password: password} })
        await assert.rejects(factory.updateSentinelRootNodes(), new Error("ERR No such master with that name"));
      })
  
      it('sentinel factory - no available nodes', async function () {
        this.timeout(15000);
  
        const sentinelPorts = frame.getAllSentinelsPort();
        const sentinels: Array<RedisNode> = [];
  
        for (const port of sentinelPorts) {
          sentinels.push({ host: "localhost", port: port });
        }
  
        const factory = new RedisSentinelFactory({ name: frame.config.sentinelName, sentinelRootNodes: sentinels, sentinelClientOptions: {password: password}, nodeClientOptions: {password: password} })
  
        for (const node of frame.getAllNodesPort()) {
          await frame.stopNode(node.toString());
        }
  
        await setTimeout(1000);
  
        await assert.rejects(factory.getMasterNode(), new Error("Master Node Not Enumerated"));
      })
    })
  })
});