haraka/haraka-plugin-redis

View on GitHub
index.js

Summary

Maintainability
A
1 hr
Test Coverage
'use strict'
/* global server */

const redis = require('redis')

exports.register = function () {
  this.load_redis_ini()

  // another plugin has called us with: inherits('haraka-plugin-redis')
  if (this.name !== 'redis') return

  // register when 'redis' is declared in config/plugins
  this.register_hook('init_master', 'init_redis_shared')
  this.register_hook('init_child', 'init_redis_shared')
}

const defaultOpts = { socket: { host: '127.0.0.1', port: '6379' } }
const socketOpts = [
  'host',
  'port',
  'path',
  'tls',
  'connectTimeout',
  'noDelay',
  'keepAlive',
  'reconnectStrategy',
]

exports.load_redis_ini = function () {
  const plugin = this

  // store redis cfg at redisCfg, to avoid conflicting with plugins that
  // inherit this plugin and have *their* config at plugin.cfg
  plugin.redisCfg = plugin.config.get('redis.ini', () => {
    plugin.load_redis_ini()
  })

  // backwards compat
  if (plugin.redisCfg?.server?.ip && !plugin.redisCfg?.server?.host) {
    plugin.redisCfg.server.host = plugin.redisCfg.server.ip
    delete plugin.redisCfg.server.ip
  }
  if (plugin.redisCfg.db && !plugin.redisCfg.database) {
    plugin.redisCfg.database = plugin.redisCfg.db
    delete plugin.redisCfg.db
  }

  plugin.redisCfg.server = 
    { ...defaultOpts, ...plugin.redisCfg.opts, ...plugin.redisCfg.server };
  plugin.redisCfg.pubsub = 
    { ...defaultOpts, ...plugin.redisCfg.opts, ...plugin.redisCfg.pubsub };

  // socket options. In redis < 4, the options like host and port were
  // top level, now they're in socket.*. Permit legacy configs to still work
  for (const s of ['server', 'pubsub']) {
    for (const o of socketOpts) {
      if (plugin.redisCfg[s][o])
        plugin.redisCfg[s].socket[o] = plugin.redisCfg[s][o]
      delete plugin.redisCfg[s][o]
    }
  }
}

exports.merge_redis_ini = function () {
  if (!this.cfg) this.cfg = {} // no <plugin>.ini loaded?
  if (!this.cfg.redis) this.cfg.redis = {} // no [redis] in <plugin>.ini file
  if (!this.redisCfg) this.load_redis_ini()

  this.cfg.redis = Object.assign({}, this.redisCfg.server, this.cfg.redis)

  // backwards compatibility
  for (const o of socketOpts) {
    if (this.cfg.redis[o] === undefined) continue
    this.cfg.redis.socket[o] = this.cfg.redis[o]
    delete this.cfg.redis[o]
  }
  if (this.cfg.redis.db && !this.cfg.redis.database) {
    this.cfg.redis.database = this.cfg.redis.db
    delete this.cfg.redis.db
  }
}

exports.init_redis_shared = function (next, server) {
  let calledNext = false
  function nextOnce(e) {
    if (e) this.logerror(`Redis error: ${e.message}`)
    if (calledNext) return
    calledNext = true
    next()
  }

  // this is the server-wide redis, shared by plugins that don't
  // specify a db ID.
  if (!server.notes.redis) {
    this.get_redis_client(this.redisCfg.server).then((client) => {
      server.notes.redis = client
      nextOnce()
    })
    return
  }

  server.notes.redis.ping((err) => {
    if (err) return nextOnce(err)

    this.loginfo('already connected')
    nextOnce() // connection is good
  })
}

exports.init_redis_plugin = function (next, server) {
  const plugin = this

  // this function is called by plugins at init_*, to establish their
  // shared or unique redis db handle.

  let calledNext = false
  function nextOnce() {
    if (calledNext) return
    calledNext = true
    next()
  }

  // for tests that do not load a shared config
  if (!plugin.cfg) {
    plugin.cfg = { redis: {} }
    if (plugin.redisCfg)
      plugin.cfg.redis = JSON.parse(JSON.stringify(plugin.redisCfg))
  }
  if (!server) server = { notes: {} }

  const pidb = plugin.cfg.redis.database
  if (server.notes.redis) {
    // server-wide redis is available
    // and the DB not specified or is the same as server-wide
    if (pidb === undefined || pidb === plugin.redisCfg.db) {
      server.loginfo(plugin, 'using server.notes.redis')
      plugin.db = server.notes.redis
      nextOnce()
      return
    }
  }

  plugin.get_redis_client(plugin.cfg.redis).then((client) => {
    plugin.db = client
    nextOnce()
  })
}

exports.shutdown = function () {
  if (this.db) this.db.quit()

  if (server && server.notes && server.notes.redis) {
    server.notes.redis.quit()
  }
}

exports.redis_ping = async function () {
  this.redis_pings = false
  if (!this.db) throw new Error('redis not initialized')

  const r = await this.db.ping()
  if (r !== 'PONG') throw new Error('not PONG')
  this.redis_pings = true

  return true
}

function getUriStr(client, opts) {
  let msg = `redis://${opts?.socket?.host}:${opts?.socket?.port}`
  if (opts?.database) msg += `/${opts?.database}`
  if (client?.server_info?.redis_version) {
    msg += `\tv${client?.server_info?.redis_version}`
  }
  return msg
}

exports.get_redis_client = async function (opts) {
  const client = redis.createClient(opts)

  let urlStr

  client
    .on('error', (err) => {
      this.logerror(err.message)
    })
    .on('end', () => {
      this.loginfo(`Disconnected from ${urlStr}`)
    })

  try {
    await client.connect()

    if (opts?.database) client.dbid = opts?.database

    client.server_info = await client.info()
    urlStr = getUriStr(client, opts)
    this.loginfo(`connected to ${urlStr}`)

    return client
  } catch (e) {
    console.error(e)
    this.logerror(e)
  }
}

exports.get_redis_pub_channel = function (conn) {
  return `result-${conn.transaction ? conn.transaction.uuid : conn.uuid}`
}

exports.get_redis_sub_channel = function (conn) {
  return `result-${conn.uuid}*`
}

// formerly used by pi-watch
exports.redis_subscribe_pattern = async function (pattern, onMessage) {
  if (this.redis) return // already subscribed?

  this.redis = redis.createClient(this.redisCfg.pubsub)
  this.redis.on('error', (err) => {
    this.logerror(err.message)
  })
  await this.redis.connect()

  await this.redis.pSubscribe(pattern, onMessage)
  this.logdebug(this, `pSubscribed to ${pattern}`)
}

// the next two functions are use by pi-karma
exports.redis_subscribe = async function (connection, onMessage) {
  if (connection.notes.redis) {
    connection.logdebug(this, `redis already subscribed`)
    return // another plugin has already called this.
  }

  const timer = setTimeout(() => {
    connection.logerror('redis subscribe timed out')
  }, 3 * 1000)

  connection.notes.redis = redis.createClient(this.redisCfg.pubsub)
  connection.notes.redis.on('error', (err) => {
    this.logerror(err.message)
  })
  await connection.notes.redis.connect()

  clearTimeout(timer)

  const pattern = this.get_redis_sub_channel(connection)
  connection.notes.redis.pSubscribe(pattern, onMessage)
  connection.logdebug(this, `pSubscribed to ${pattern}`)
}

exports.redis_unsubscribe = async function (connection) {
  if (!connection.notes.redis) {
    connection.logerror(this, `redis_unsubscribe called when no redis`)
    return
  }

  const pattern = this.get_redis_sub_channel(connection)
  await connection.notes.redis.unsubscribe(pattern)
  connection.logdebug(this, `unsubsubscribed from ${pattern}`)
  connection.notes.redis.quit()
}