haraka/haraka-plugin-limit

View on GitHub
index.js

Summary

Maintainability
A
2 hrs
Test Coverage
const constants = require('haraka-constants')
const ipaddr = require('ipaddr.js')

exports.register = function () {
  this.inherits('haraka-plugin-redis')

  this.load_limit_ini()
  let needs_redis = 0

  if (this.cfg.concurrency.enabled) {
    needs_redis++
    this.register_hook('connect_init', 'conn_concur_incr')
    this.register_hook('connect', 'check_concurrency')
    this.register_hook('disconnect', 'conn_concur_decr')
  }

  if (this.cfg.errors.enabled) {
    for (const hook of ['helo', 'ehlo', 'mail', 'rcpt', 'data']) {
      this.register_hook(hook, 'max_errors')
    }
  }

  if (this.cfg.recipients.enabled) {
    this.register_hook('rcpt', 'max_recipients')
  }

  if (this.cfg.unrecognized_commands.enabled) {
    this.register_hook('unrecognized_command', 'max_unrecognized_commands')
  }

  if (this.cfg.rate_conn.enabled) {
    needs_redis++
    this.register_hook('connect_init', 'rate_conn_incr')
    this.register_hook('connect', 'rate_conn_enforce')
  }
  if (this.cfg.rate_rcpt_host.enabled) {
    needs_redis++
    this.register_hook('connect', 'rate_rcpt_host_enforce')
    this.register_hook('rcpt', 'rate_rcpt_host_incr')
  }
  if (this.cfg.rate_rcpt_sender.enabled) {
    needs_redis++
    this.register_hook('rcpt', 'rate_rcpt_sender')
  }
  if (this.cfg.rate_rcpt_null.enabled) {
    needs_redis++
    this.register_hook('rcpt', 'rate_rcpt_null')
  }
  if (this.cfg.rate_rcpt.enabled) {
    needs_redis++
    this.register_hook('rcpt', 'rate_rcpt')
  }

  if (this.cfg.outbound.enabled) {
    needs_redis++
    this.register_hook('send_email', 'outbound_increment')
    this.register_hook('delivered', 'outbound_decrement')
    this.register_hook('deferred', 'outbound_decrement')
    this.register_hook('bounce', 'outbound_decrement')
  }

  if (needs_redis) {
    this.register_hook('init_master', 'init_redis_plugin')
    this.register_hook('init_child', 'init_redis_plugin')
  }
}

exports.load_limit_ini = function () {
  this.cfg = this.config.get(
    'limit.ini',
    {
      booleans: [
        '-outbound.enabled',
        '-recipients.enabled',
        '-unrecognized_commands.enabled',
        '-errors.enabled',
        '-rate_conn.enabled',
        '-rate_rcpt.enabled',
        '-rate_rcpt_host.enabled',
        '-rate_rcpt_sender.enabled',
        '-rate_rcpt_null.enabled',
        '-concurrency_history.enabled',
        '-recipients_history.enabled',
        '-rate_conn_history.enabled',
      ],
    },
    () => {
      this.load_limit_ini()
    },
  )

  if (!this.cfg.concurrency) {
    // no config file
    this.cfg.concurrency = {}
  }

  this.merge_redis_ini()
}

exports.shutdown = function () {
  if (this.db) this.db.quit()
}

exports.max_unrecognized_commands = function (next, connection, cmd) {
  if (!this.cfg.unrecognized_commands) return next()

  connection.results.push(this, { unrec_cmds: cmd, emit: true })

  const max = parseFloat(this.cfg.unrecognized_commands.max)
  if (!max || isNaN(max)) return next()

  const uc = connection.results.get(this).unrec_cmds
  if (!uc || !uc.length) return next()

  if (uc.length <= max) return next()

  connection.results.add(this, { fail: 'unrec_cmds.max' })
  this.penalize(connection, true, 'Too many unrecognized commands', next)
}

exports.max_errors = function (next, connection) {
  if (!this.cfg.errors) return next() // disabled in config

  const max = parseFloat(this.cfg.errors.max)
  if (!max || isNaN(max)) return next()

  if (connection.errors <= max) return next()

  connection.results.add(this, { fail: 'errors.max' })
  this.penalize(connection, true, 'Too many errors', next)
}

exports.max_recipients = function (next, connection, params) {
  if (!this.cfg.recipients) return next() // disabled in config

  const max = this.get_limit('recipients', connection)
  if (!max || isNaN(max)) return next()

  const c = connection.rcpt_count
  const count = c.accept + c.tempfail + c.reject + 1
  if (count <= max) return next()

  connection.results.add(this, { fail: 'recipients.max' })
  this.penalize(connection, false, 'Too many recipient attempts', next)
}

exports.get_history_limit = function (type, connection) {
  const history_cfg = `${type}_history`
  if (!this.cfg[history_cfg] || !this.cfg[history_cfg].enabled) return

  const history_plugin = this.cfg[history_cfg].plugin
  if (!history_plugin) return

  const results = connection.results.get(history_plugin)
  if (!results) {
    connection.logerror(
      this,
      `no ${history_plugin} results, disabling history due to misconfiguration`,
    )
    delete this.cfg[history_cfg]
    return
  }

  if (results.history === undefined) {
    connection.logdebug(this, `no history from : ${history_plugin}`)
    return
  }

  const history = parseFloat(results.history)
  connection.logdebug(this, `history: ${history}`)
  if (isNaN(history)) return

  if (history > 0) return this.cfg[history_cfg].good
  if (history < 0) return this.cfg[history_cfg].bad
  return this.cfg[history_cfg].none
}

exports.get_limit = function (type, connection) {
  if (type === 'recipients') {
    if (connection.relaying && this.cfg.recipients.max_relaying) {
      return this.cfg.recipients.max_relaying
    }
  }

  if (this.cfg[`${type}_history`]) {
    const history = this.get_history_limit(type, connection)
    if (history) return history
  }

  return this.cfg[type].max || this.cfg[type].default
}

exports.conn_concur_incr = async function (next, connection) {
  if (!this.db) return next()
  if (!this.cfg.concurrency) return next()

  const dbkey = this.get_concurrency_key(connection)

  try {
    const count = await this.db.incr(dbkey)

    if (isNaN(count)) {
      connection.results.add(this, { err: 'conn_concur_incr got isNaN' })
      return next()
    }

    connection.results.add(this, { concurrent_count: count })

    // repair negative concurrency counters
    if (count < 1) {
      connection.results.add(this, {
        msg: `resetting concurrent ${count} to 1`,
      })
      this.db.set(dbkey, 1)
    }

    this.db.expire(dbkey, 3 * 60) // 3 minute lifetime
  } catch (err) {
    connection.results.add(this, { err: `conn_concur_incr:${err}` })
  }
  next()
}

exports.get_concurrency_key = function (connection) {
  return `concurrency|${connection.remote.ip}`
}

exports.check_concurrency = function (next, connection) {
  const max = this.get_limit('concurrency', connection)
  if (!max || isNaN(max)) {
    connection.results.add(this, { err: 'concurrency: no limit?!' })
    return next()
  }

  const count = parseInt(connection.results.get(this.name).concurrent_count)
  if (isNaN(count)) {
    connection.results.add(this, { err: 'concurrent.unset' })
    return next()
  }

  connection.results.add(this, { concurrent: `${count}/${max}` })

  if (count <= max) return next()

  connection.results.add(this, { fail: 'concurrency.max' })

  this.penalize(connection, true, 'Too many concurrent connections', next)
}

exports.penalize = function (connection, disconnect, msg, next) {
  const code = disconnect ? constants.DENYSOFTDISCONNECT : constants.DENYSOFT

  if (!this.cfg.main.tarpit_delay) return next(code, msg)

  const delay = this.cfg.main.tarpit_delay
  connection.loginfo(this, `tarpitting for ${delay}s`)

  setTimeout(() => {
    if (!connection) return
    next(code, msg)
  }, delay * 1000)
}

exports.conn_concur_decr = async function (next, connection) {
  if (!this.db) return next()
  if (!this.cfg.concurrency) return next()

  try {
    const dbkey = this.get_concurrency_key(connection)
    await this.db.incrBy(dbkey, -1)
  } catch (err) {
    connection.results.add(this, { err: `conn_concur_decr:${err}` })
  }
  next()
}

exports.get_host_key = function (type, connection) {
  if (!this.cfg[type]) {
    connection.results.add(this, { err: `${type}: not configured` })
    return
  }

  let ip
  try {
    ip = ipaddr.parse(connection.remote.ip)
    if (ip.kind === 'ipv6') {
      ip = ipaddr.toNormalizedString()
    } else {
      ip = ip.toString()
    }
  } catch (err) {
    connection.results.add(this, { err: `${type}: ${err.message}` })
    return
  }

  const ip_array = ip.kind === 'ipv6' ? ip.split(':') : ip.split('.')
  while (ip_array.length) {
    const part = ip.kind === 'ipv6' ? ip_array.join(':') : ip_array.join('.')
    if (this.cfg[type][part] || this.cfg[type][part] === 0) {
      return [part, this.cfg[type][part]]
    }
    ip_array.pop()
  }

  // rDNS
  if (connection.remote.host) {
    const rdns_array = connection.remote.host.toLowerCase().split('.')
    while (rdns_array.length) {
      const part2 = rdns_array.join('.')
      if (this.cfg[type][part2] || this.cfg[type][part2] === 0) {
        return [part2, this.cfg[type][part2]]
      }
      rdns_array.pop()
    }
  }

  if (this.cfg[`${type}_history`]) {
    const history = this.get_history_limit(type, connection)
    if (history) return [ip, history]
  }

  // Custom Default
  if (this.cfg[type].default) {
    return [ip, this.cfg[type].default]
  }

  // Default 0 = unlimited
  return [ip, 0]
}

exports.get_mail_key = function (type, mail) {
  if (!this.cfg[type] || !mail) return

  // Full e-mail address (e.g. smf@fsl.com)
  const email = mail.address()
  if (this.cfg[type][email] || this.cfg[type][email] === 0) {
    return [email, this.cfg[type][email]]
  }

  // RHS parts e.g. host.sub.sub.domain.com
  if (mail.host) {
    const rhs_split = mail.host.toLowerCase().split('.')
    while (rhs_split.length) {
      const part = rhs_split.join('.')
      if (this.cfg[type][part] || this.cfg[type][part] === 0) {
        return [part, this.cfg[type][part]]
      }
      rhs_split.pop()
    }
  }

  // Custom Default
  if (this.cfg[type].default) {
    return [email, this.cfg[type].default]
  }

  // Default 0 = unlimited
  return [email, 0]
}

function getTTL(value) {
  const match = /^(\d+)(?:\/(\d+)(\S)?)?$/.exec(value)
  if (!match) return

  const qty = match[2]
  const units = match[3]

  let ttl = qty ? qty : 60 // Default 60s
  if (!units) return ttl

  // Unit
  switch (units.toLowerCase()) {
    case 's': // Default is seconds
      break
    case 'm':
      ttl *= 60 // minutes
      break
    case 'h':
      ttl *= 60 * 60 // hours
      break
    case 'd':
      ttl *= 60 * 60 * 24 // days
      break
    default:
      return ttl
  }
  return ttl
}

function getLimit(value) {
  const match = /^([\d]+)/.exec(value)
  if (!match) return 0
  return parseInt(match[1], 10)
}

exports.rate_limit = async function (connection, key, value) {
  if (value === 0) {
    // Limit disabled for this host
    connection.loginfo(this, `rate limit disabled for: ${key}`)
    return false
  }

  // CAUTION: !value would match that 0 value -^
  if (!key || !value) return
  if (!this.db) return

  const limit = getLimit(value)
  const ttl = getTTL(value)

  if (!limit || !ttl) {
    connection.results.add(this, {
      err: `syntax error: key=${key} value=${value}`,
    })
    return
  }

  connection.logdebug(this, `key=${key} limit=${limit} ttl=${ttl}`)

  try {
    const newval = await this.db.incr(key)
    if (newval === 1) this.db.expire(key, ttl)
    return parseInt(newval, 10) > limit // boolean
  } catch (err) {
    connection.results.add(this, { err: `${key}:${err}` })
  }
}

exports.rate_rcpt_host_incr = async function (next, connection) {
  if (!this.db) return next()

  const [key, value] = this.get_host_key('rate_rcpt_host', connection)
  if (!key || !value) return next()

  try {
    const newval = await this.db.incr(`rate_rcpt_host:${key}`)
    if (newval === 1)
      await this.db.expire(`rate_rcpt_host:${key}`, getTTL(value))
  } catch (err) {
    connection.results.add(this, { err })
  }
  next()
}

exports.rate_rcpt_host_enforce = async function (next, connection) {
  if (!this.db) return next()

  const [key, value] = this.get_host_key('rate_rcpt_host', connection)
  if (!key || !value) return next()

  const match = /^(\d+)/.exec(value)
  const limit = parseInt(match[0], 10)
  if (!limit) return next()

  try {
    const result = await this.db.get(`rate_rcpt_host:${key}`)

    if (!result) return next()
    connection.results.add(this, {
      rate_rcpt_host: `${key}:${result}:${value}`,
    })

    if (result <= limit) return next()

    connection.results.add(this, { fail: 'rate_rcpt_host' })
    this.penalize(connection, false, 'recipient rate limit exceeded', next)
  } catch (err) {
    connection.results.add(this, { err: `rate_rcpt_host:${err}` })
    next()
  }
}

exports.rate_conn_incr = async function (next, connection) {
  if (!this.db) return next()

  const [key, value] = this.get_host_key('rate_conn', connection)
  if (!key || !value) return next()

  try {
    await this.db.hIncrBy(`rate_conn:${key}`, (+new Date()).toString(), 1)
    // extend key expiration on every new connection
    await this.db.expire(`rate_conn:${key}`, getTTL(value) * 2)
  } catch (err) {
    connection.results.add(this, { err })
  }
  next()
}

exports.rate_conn_enforce = async function (next, connection) {
  if (!this.db) return next()

  const [key, value] = this.get_host_key('rate_conn', connection)
  if (!key || !value) return next()

  const limit = getLimit(value)
  if (!limit) {
    connection.results.add(this, { err: `rate_conn:syntax:${value}` })
    return next()
  }

  try {
    const tstamps = await this.db.hGetAll(`rate_conn:${key}`)
    if (!tstamps) {
      connection.results.add(this, { err: 'rate_conn:no_tstamps' })
      return next()
    }

    const d = new Date()
    d.setMinutes(d.getMinutes() - getTTL(value) / 60)
    const periodStartTs = +d // date as integer

    let connections_in_ttl_period = 0
    for (const ts of Object.keys(tstamps)) {
      if (parseInt(ts, 10) < periodStartTs) {
        // older than ttl
        this.db.hDel(`rate_conn:${key}`, ts)
        continue
      }
      connections_in_ttl_period =
        connections_in_ttl_period + parseInt(tstamps[ts], 10)
    }
    connection.results.add(this, {
      rate_conn: `${connections_in_ttl_period}:${value}`,
    })

    if (connections_in_ttl_period <= limit) return next()

    connection.results.add(this, { fail: 'rate_conn' })

    this.penalize(connection, true, 'connection rate limit exceeded', next)
  } catch (err) {
    connection.results.add(this, { err: `rate_conn:${err}` })
    next()
  }
}

exports.rate_rcpt_sender = async function (next, connection, params) {
  const [key, value] = this.get_mail_key(
    'rate_rcpt_sender',
    connection.transaction.mail_from,
  )
  connection.results.add(this, { rate_rcpt_sender: value })

  const over = await this.rate_limit(
    connection,
    `rate_rcpt_sender:${key}`,
    value,
  )
  if (!over) return next()

  connection.results.add(this, { fail: 'rate_rcpt_sender' })
  this.penalize(connection, false, 'rcpt rate limit exceeded', next)
}

exports.rate_rcpt_null = async function (next, connection, params) {
  if (!params) return next()
  if (Array.isArray(params)) params = params[0]
  if (params.user) return next()

  // Message from the null sender
  const [key, value] = this.get_mail_key('rate_rcpt_null', params)
  connection.results.add(this, { rate_rcpt_null: value })

  const over = await this.rate_limit(connection, `rate_rcpt_null:${key}`, value)
  if (!over) return next()

  connection.results.add(this, { fail: 'rate_rcpt_null' })
  this.penalize(connection, false, 'null recip rate limit', next)
}

exports.rate_rcpt = async function (next, connection, params) {
  const plugin = this
  if (Array.isArray(params)) params = params[0]

  const [key, value] = plugin.get_mail_key('rate_rcpt', params)
  connection.results.add(plugin, { rate_rcpt: value })

  const over = await plugin.rate_limit(connection, `rate_rcpt:${key}`, value)
  if (!over) return next()

  connection.results.add(plugin, { fail: 'rate_rcpt' })
  plugin.penalize(connection, false, 'rate limit exceeded', next)
}

/*
 *        Outbound Rate Limits
 *
 */

function getOutDom(hmail) {
  // outbound isn't internally consistent using hmail.domain and hmail.todo.domain.
  // TODO: fix haraka/Haraka/outbound/HMailItem to be internally consistent.
  return hmail?.todo?.domain || hmail.domain
}

function getOutKey(domain) {
  return `outbound-rate:${domain}`
}

exports.outbound_increment = async function (next, hmail) {
  if (!this.db) return next()

  const outDom = getOutDom(hmail)
  const outKey = getOutKey(outDom)

  try {
    let count = await this.db.hIncrBy(outKey, 'TOTAL', 1)

    this.db.expire(outKey, 300) // 5 min expire

    if (!this.cfg.outbound[outDom]) return next()
    const limit = parseInt(this.cfg.outbound[outDom], 10)
    if (!limit) return next()

    count = parseInt(count, 10)
    if (count <= limit) return next()

    this.db.hIncrBy(outKey, 'TOTAL', -1) // undo the increment
    const delay = this.cfg.outbound.delay || 30
    next(constants.delay, delay)
  } catch (err) {
    this.logerror(`outbound_increment: ${err}`)
    next() // just deliver
  }
}

exports.outbound_decrement = function (next, hmail) {
  if (!this.db) return next()

  this.db.hIncrBy(getOutKey(getOutDom(hmail)), 'TOTAL', -1)
  next()
}