bencevans/node-influx

View on GitHub
src/index.ts

Summary

Maintainability
F
1 wk
Test Coverage
/* eslint-disable @typescript-eslint/unified-signatures */
/* eslint-disable no-prototype-builtins */

import { RequestOptions } from "https";
import * as url from "url";

import * as b from "./builder";
import * as grammar from "./grammar";
import { IPingStats, IPoolOptions, Pool } from "./pool";
import { assertNoErrors, IResults, parse, parseSingle } from "./results";
import { coerceBadly, ISchemaOptions, Schema } from "./schema";

const defaultHost: IHostConfig = Object.freeze({
  host: "127.0.0.1",
  port: 8086,
  path: "",
  protocol: "http" as const,
});

const defaultOptions: IClusterConfig = Object.freeze({
  database: null,
  hosts: [],
  password: "root",
  schema: [],
  username: "root",
});

export * from "./builder";
export {
  INanoDate,
  FieldType,
  Precision,
  Raw,
  TimePrecision,
  escape,
  toNanoDate,
} from "./grammar";
export { ISchemaOptions } from "./schema";
export { IPingStats, IPoolOptions } from "./pool";
export { IResults, IResponse, ResultError } from "./results";

export interface IHostConfig {
  /**
   * Influx host to connect to, defaults to 127.0.0.1.
   */
  host?: string;
  /**
   * Influx port to connect to, defaults to 8086.
   */
  port?: number;
  /**
   * Path for Influx within the host, defaults to ''.
   * May be used if Influx is behind a reverse proxy or load balancer.
   */
  path?: string;
  /**
   * Protocol to connect over, defaults to 'http'.
   */
  protocol?: "http" | "https";

  /**
   * Optional request option overrides.
   */
  options?: RequestOptions;
}

export interface ISingleHostConfig extends IHostConfig {
  /**
   * Username for connecting to the database. Defaults to 'root'.
   */
  username?: string;

  /**
   * Password for connecting to the database. Defaults to 'root'.
   */
  password?: string;

  /**
   * Default database to write information to.
   */
  database?: string;

  /**
   * Settings for the connection pool.
   */
  pool?: IPoolOptions;

  /**
   * A list of schema for measurements in the database.
   */
  schema?: ISchemaOptions[];
}

export interface IClusterConfig {
  /**
   * Username for connecting to the database. Defaults to 'root'.
   */
  username?: string;

  /**
   * Password for connecting to the database. Defaults to 'root'.
   */
  password?: string;

  /**
   * Default database to write information to.
   */
  database?: string;

  /**
   * A list of cluster hosts to connect to.
   */
  hosts: IHostConfig[];

  /**
   * Settings for the connection pool.
   */
  pool?: IPoolOptions;

  /**
   * A list of schema for measurements in the database.
   */
  schema?: ISchemaOptions[];
}

export interface IPoint {
  /**
   * Measurement is the Influx measurement name.
   */
  measurement?: string;

  /**
   * Tags is the list of tag values to insert.
   */
  tags?: { [name: string]: string };

  /**
   * Fields is the list of field values to insert.
   */
  fields?: { [name: string]: any };

  /**
   * Timestamp tags this measurement with a date. This can be a Date object,
   * in which case we'll adjust it to the desired precision, or a numeric
   * string or number, in which case it gets passed directly to Influx.
   */
  timestamp?: Date | string | number;
}

export interface IParsedPoint extends IPoint {
  /**
   * Fields Pairs is the list of key/value pairs for each field on the point
   */
  fieldsPairs: Array<[string, string]>;
  /**
   * Tags Names is the list of tag names in the point
   */
  tagsNames: string[];
  /**
   * Casted Timestamp is the timestamp value after being casted to the
   * desired precision. Default 'n'
   */
  castedTimestamp?: string;
}

export interface IWriteOptions {
  /**
   * Precision at which the points are written, defaults to nanoseconds 'n'.
   */
  precision?: grammar.TimePrecision;

  /**
   * Retention policy to write the points under, defaults to the DEFAULT
   * database policy.
   */
  retentionPolicy?: string;

  /**
   * Database under which to write the points. This is required if a default
   * database is not provided in Influx.
   */
  database?: string;
}

export interface IQueryOptions {
  /**
   * Defines the precision at which to query points. When left blank, it will
   * query in nanosecond precision.
   */
  precision?: grammar.TimePrecision;

  /**
   * Retention policy to query from, defaults to the DEFAULT
   * database policy.
   */
  retentionPolicy?: string;

  /**
   * Database under which to query the points. This is required if a default
   * database is not provided in Influx.
   */
  database?: string;

  /**
   * Any placeholders used by the query. Using these is strongly recommended
   * to avoid injection attacks.
   */
  placeholders?: Record<string, string | number>;
}

export interface IParseOptions {
  /**
   * Precision at which the points are written, defaults to nanoseconds 'n'.
   */
  precision?: grammar.TimePrecision;
  /**
   * Database under which to write the points. This is required if a default
   * database is not provided in Influx.
   */
  database?: string;
}

/**
 * IRetentionOptions are passed into passed into the {@link
 * InfluxDB#createRetentionPolicy} and {@link InfluxDB#alterRetentionPolicy}.
 * See the [Downsampling and Retention page](https://docs.influxdata.com/
 * influxdb/v1.0/guides/downsampling_and_retention/) on the Influx docs for
 * more information.
 */
export interface IRetentionOptions {
  database?: string;
  duration: string;
  replication: number;
  isDefault?: boolean;
}

/**
 * Parses the URL out into into a IClusterConfig object
 */
function parseOptionsUrl(addr: string): ISingleHostConfig {
  const parsed = url.parse(addr);
  const options: ISingleHostConfig = {
    host: parsed.hostname,
    port: Number(parsed.port),
    protocol: parsed.protocol.slice(0, -1) as "http" | "https",
  };

  if (parsed.auth) {
    [options.username, options.password] = parsed.auth.split(":");
  }

  if (parsed.pathname.length > 1) {
    options.database = parsed.pathname.slice(1);
  }

  return options;
}

/**
 * Works similarly to Object.assign, but only overwrites
 * properties that resolve to undefined.
 */
function defaults<T>(target: T, ...srcs: T[]): T {
  srcs.forEach((src) => {
    Object.keys(src).forEach((key: Extract<keyof T, string>) => {
      if (target[key] === undefined) {
        target[key] = src[key];
      }
    });
  });

  return target;
}

/**
 * InfluxDB is the public interface to run queries against your database.
 * This is a 'driver-level' module, not a a full-fleged ORM or ODM; you run
 * queries directly by calling methods on this class.
 *
 * Please check out some of [the tutorials](https://node-influx.github.io/manual/tutorial.html)
 * if you want help getting started!
 *
 * @example
 * const Influx = require('influx');
 * const influx = new Influx.InfluxDB({
 *  host: 'localhost',
 *  database: 'express_response_db',
 *  schema: [
 *    {
 *      measurement: 'response_times',
 *      fields: {
 *        path: Influx.FieldType.STRING,
 *        duration: Influx.FieldType.INTEGER
 *      },
 *      tags: [
 *        'host'
 *      ]
 *    }
 *  ]
 * })
 *
 * @example
 * // Connect over HTTPS
 * const Influx = require('influx');
 * const influx = new Influx.InfluxDB({
 *  host: 'myinfluxdbhost',
 *  port: 443,
 *  protocol: 'https'
 *  database: 'express_response_db',
 *  schema: [
 *    {
 *      measurement: 'response_times',
 *      fields: {
 *        path: Influx.FieldType.STRING,
 *        duration: Influx.FieldType.INTEGER
 *      },
 *      tags: [
 *        'host'
 *      ]
 *    }
 *  ]
 * })
 *
 * influx.writePoints([
 *   {
 *     measurement: 'response_times',
 *     tags: { host: os.hostname() },
 *     fields: { duration, path: req.path },
 *   }
 * ]).then(() => {
 *   return influx.query(`
 *     select * from response_times
 *     where host = $<host>
 *     order by time desc
 *     limit 10
 *   `, {
 *      placeholders: {
 *        host: os.hostname()
 *      }
 *   })
 * }).then(rows => {
 *   rows.forEach(row => console.log(`A request to ${row.path} took ${row.duration}ms`))
 * })
 */
export class InfluxDB {
  /**
   * Connect pool for making requests.
   * @private
   */
  private readonly _pool: Pool;

  /**
   * Config options for Influx.
   * @private
   */
  private readonly _options: IClusterConfig;

  /**
   * Map of Schema instances defining measurements in Influx.
   * @private
   */
  private _schema: {
    [db: string]: { [measurement: string]: Schema };
  } = Object.create(null);

  constructor(options: ISingleHostConfig);

  /**
   * Connect to an InfluxDB cluster by specifying a
   * set of connection options.
   */
  constructor(options: IClusterConfig);

  /**
   * Connect to an InfluxDB instance using a configuration URL.
   * @example
   * new InfluxDB('http://user:password@host:8086/database')
   */
  constructor(url: string);

  /**
   * Connects to a local, default Influx instance.
   */
  constructor();

  /**
   * Connect to a single InfluxDB instance by specifying
   * a set of connection options.
   * @param [options='http://root:root@127.0.0.1:8086']
   *
   * @example
   * const Influx = require('influx')
   *
   * // Connect to a single host with a DSN:
   * const influx = new Influx.InfluxDB('http://user:password@host:8086/database')
   *
   * @example
   * const Influx = require('influx')
   *
   * // Connect to a single host with a full set of config details and
   * // a custom schema
   * const client = new Influx.InfluxDB({
   *   database: 'my_db',
   *   host: 'localhost',
   *   port: 8086,
   *   username: 'connor',
   *   password: 'pa$$w0rd',
   *   schema: [
   *     {
   *       measurement: 'perf',
   *       fields: {
   *         memory_usage: Influx.FieldType.INTEGER,
   *         cpu_usage: Influx.FieldType.FLOAT,
   *         is_online: Influx.FieldType.BOOLEAN
   *       }
   *       tags: [
   *         'hostname'
   *       ]
   *     }
   *   ]
   * })
   *
   * @example
   * const Influx = require('influx')
   *
   * // Use a pool of several host connections and balance queries across them:
   * const client = new Influx.InfluxDB({
   *   database: 'my_db',
   *   username: 'connor',
   *   password: 'pa$$w0rd',
   *   hosts: [
   *     { host: 'db1.example.com' },
   *     { host: 'db2.example.com' },
   *   ],
   *   schema: [
   *     {
   *       measurement: 'perf',
   *       fields: {
   *         memory_usage: Influx.FieldType.INTEGER,
   *         cpu_usage: Influx.FieldType.FLOAT,
   *         is_online: Influx.FieldType.BOOLEAN
   *       }
   *       tags: [
   *         'hostname'
   *       ]
   *     }
   *   ]
   * })
   *
   */
  constructor(options?: any) {
    // Figure out how to parse whatever we were passed in into a IClusterConfig.
    if (typeof options === "string") {
      // Plain URI => ISingleHostConfig
      options = parseOptionsUrl(options);
    } else if (!options) {
      options = defaultHost;
    }

    if (!options.hasOwnProperty("hosts")) {
      // ISingleHostConfig => IClusterConfig
      options = {
        database: options.database,
        hosts: [options],
        password: options.password,
        pool: options.pool,
        schema: options.schema,
        username: options.username,
      };
    }

    const resolved = options as IClusterConfig;
    resolved.hosts = resolved.hosts.map((host) => {
      return defaults(
        {
          host: host.host,
          port: host.port,
          path: host.path,
          protocol: host.protocol,
          options: host.options,
        },
        defaultHost
      );
    });

    this._pool = new Pool(resolved.pool);
    this._options = defaults(resolved, defaultOptions);

    resolved.hosts.forEach((host) => {
      this._pool.addHost(
        `${host.protocol}://${host.host}:${host.port}${host.path}`,
        host.options
      );
    });

    this._options.schema.forEach((schema) => this._createSchema(schema));
  }

  /**
   * Adds specified schema for better fields coercing.
   *
   * @param {ISchemaOptions} schema
   * @memberof InfluxDB
   */
  public addSchema(schema: ISchemaOptions): void {
    this._createSchema(schema);
  }

  /**
   * Creates a new database with the provided name.
   * @param databaseName
   * @return
   * @example
   * influx.createDatabase('mydb')
   */
  public createDatabase(databaseName: string): Promise<void> {
    return this._pool
      .json(
        this._getQueryOpts(
          {
            q: `create database ${grammar.escape.quoted(databaseName)}`,
          },
          "POST"
        )
      )
      .then(assertNoErrors)
      .then(() => undefined);
  }

  /**
   * Deletes a database with the provided name.
   * @param databaseName
   * @return
   * @example
   * influx.dropDatabase('mydb')
   */
  public dropDatabase(databaseName: string): Promise<void> {
    return this._pool
      .json(
        this._getQueryOpts(
          {
            q: `drop database ${grammar.escape.quoted(databaseName)}`,
          },
          "POST"
        )
      )
      .then(assertNoErrors)
      .then(() => undefined);
  }

  /**
   * Returns array of database names. Requires cluster admin privileges.
   * @returns a list of database names
   * @example
   * influx.getDatabaseNames().then(names =>
   *   console.log('My database names are: ' + names.join(', ')));
   */
  public getDatabaseNames(): Promise<string[]> {
    return this._pool
      .json(this._getQueryOpts({ q: "show databases" }))
      .then((res) => parseSingle<{ name: string }>(res).map((r) => r.name));
  }

  /**
   * Returns array of measurements.
   * @returns a list of measurement names
   * @param [database] the database the measurement lives in, optional
   *     if a default database is provided.
   * @example
   * influx.getMeasurements().then(names =>
   *   console.log('My measurement names are: ' + names.join(', ')));
   */
  public getMeasurements(
    database: string = this._defaultDB()
  ): Promise<string[]> {
    return this._pool
      .json(
        this._getQueryOpts({
          db: database,
          q: "show measurements",
        })
      )
      .then((res) => parseSingle<{ name: string }>(res).map((r) => r.name));
  }

  /**
   * Returns a list of all series within the target measurement, or from the
   * entire database if a measurement isn't provided.
   * @param [options]
   * @param [options.measurement] if provided, we'll only get series
   *     from within that measurement.
   * @param [options.database] the database the series lives in,
   *     optional if a default database is provided.
   * @returns a list of series names
   * @example
   * influx.getSeries().then(names => {
   *   console.log('My series names in my_measurement are: ' + names.join(', '))
   * })
   *
   * influx.getSeries({
   *   measurement: 'my_measurement',
   *   database: 'my_db'
   * }).then(names => {
   *   console.log('My series names in my_measurement are: ' + names.join(', '))
   * })
   */
  public getSeries(
    options: {
      measurement?: string;
      database?: string;
    } = {}
  ): Promise<string[]> {
    const { database = this._defaultDB(), measurement } = options;

    let query = "show series";
    if (measurement) {
      query += ` from ${grammar.escape.quoted(measurement)}`;
    }

    return this._pool
      .json(
        this._getQueryOpts({
          db: database,
          q: query,
        })
      )
      .then((res) => parseSingle<{ key: string }>(res).map((r) => r.key));
  }

  /**
   * Removes a measurement from the database.
   * @param measurement
   * @param [database] the database the measurement lives in, optional
   *     if a default database is provided.
   * @return
   * @example
   * influx.dropMeasurement('my_measurement')
   */
  public dropMeasurement(
    measurement: string,
    database: string = this._defaultDB()
  ): Promise<void> {
    return this._pool
      .json(
        this._getQueryOpts(
          {
            db: database,
            q: `drop measurement ${grammar.escape.quoted(measurement)}`,
          },
          "POST"
        )
      )
      .then(assertNoErrors)
      .then(() => undefined);
  }

  /**
   * Removes a one or more series from InfluxDB.
   *
   * @returns
   * @example
   * // The following pairs of queries are equivalent: you can chose either to
   * // use our builder or pass in string directly. The builder takes care
   * // of escaping and most syntax handling for you.
   *
   * influx.dropSeries({ where: e => e.tag('cpu').equals.value('cpu8') })
   * influx.dropSeries({ where: '"cpu" = \'cpu8\'' })
   * // DROP SERIES WHERE "cpu" = 'cpu8'
   *
   * influx.dropSeries({ measurement: m => m.name('cpu').policy('autogen') })
   * influx.dropSeries({ measurement: '"cpu"."autogen"' })
   * // DROP SERIES FROM "autogen"."cpu"
   *
   * influx.dropSeries({
   *   measurement: m => m.name('cpu').policy('autogen'),
   *   where: e => e.tag('cpu').equals.value('cpu8'),
   *   database: 'my_db'
   * })
   * // DROP SERIES FROM "autogen"."cpu" WHERE "cpu" = 'cpu8'
   */
  public dropSeries(
    options: b.measurement | b.where | { database: string }
  ): Promise<void> {
    const db =
      "database" in options ? (options as any).database : this._defaultDB();

    let q = "drop series";
    if ("measurement" in options) {
      q += " from " + b.parseMeasurement(options);
    }

    if ("where" in options) {
      q += " where " + b.parseWhere(options);
    }

    return this._pool
      .json(this._getQueryOpts({ db, q }, "POST"))
      .then(assertNoErrors)
      .then(() => undefined);
  }

  /**
   * Returns a list of users on the Influx database.
   * @return
   * @example
   * influx.getUsers().then(users => {
   *   users.forEach(user => {
   *     if (user.admin) {
   *       console.log(user.user, 'is an admin!')
   *     } else {
   *       console.log(user.user, 'is not an admin!')
   *     }
   *   })
   * })
   */
  public getUsers(): Promise<IResults<{ user: string; admin: boolean }>> {
    return this._pool
      .json(this._getQueryOpts({ q: "show users" }))
      .then((result) => parseSingle<{ user: string; admin: boolean }>(result));
  }

  /**
   * Creates a new InfluxDB user.
   * @param username
   * @param password
   * @param [admin=false] If true, the user will be given all
   *     privileges on all databases.
   * @return
   * @example
   * influx.createUser('connor', 'pa55w0rd', true) // make 'connor' an admin
   *
   * // make non-admins:
   * influx.createUser('not_admin', 'pa55w0rd')
   */
  public createUser(
    username: string,
    password: string,
    admin = false
  ): Promise<void> {
    return this._pool
      .json(
        this._getQueryOpts(
          {
            q:
              `create user ${grammar.escape.quoted(username)} with password ` +
              grammar.escape.stringLit(password) +
              (admin ? " with all privileges" : ""),
          },
          "POST"
        )
      )
      .then(assertNoErrors)
      .then(() => undefined);
  }

  /**
   * Sets a password for an Influx user.
   * @param username
   * @param password
   * @return
   * @example
   * influx.setPassword('connor', 'pa55w0rd')
   */
  public setPassword(username: string, password: string): Promise<void> {
    return this._pool
      .json(
        this._getQueryOpts(
          {
            q:
              `set password for ${grammar.escape.quoted(username)} = ` +
              grammar.escape.stringLit(password),
          },
          "POST"
        )
      )
      .then(assertNoErrors)
      .then(() => undefined);
  }

  /**
   * Grants a privilege to a specified user.
   * @param username
   * @param privilege Should be one of 'READ' or 'WRITE'
   * @param [database] If not provided, uses the default database.
   * @return
   * @example
   * influx.grantPrivilege('connor', 'READ', 'my_db') // grants read access on my_db to connor
   */
  public grantPrivilege(
    username: string,
    privilege: "READ" | "WRITE",
    database: string = this._defaultDB()
  ): Promise<void> {
    return this._pool
      .json(
        this._getQueryOpts(
          {
            q:
              `grant ${privilege} on ${grammar.escape.quoted(database)} ` +
              `to ${grammar.escape.quoted(username)}`,
          },
          "POST"
        )
      )
      .then(assertNoErrors)
      .then(() => undefined);
  }

  /**
   * Removes a privilege from a specified user.
   * @param username
   * @param privilege Should be one of 'READ' or 'WRITE'
   * @param [database] If not provided, uses the default database.
   * @return
   * @example
   * influx.revokePrivilege('connor', 'READ', 'my_db') // removes read access on my_db from connor
   */
  public revokePrivilege(
    username: string,
    privilege: "READ" | "WRITE",
    database: string = this._defaultDB()
  ): Promise<void> {
    return this._pool
      .json(
        this._getQueryOpts(
          {
            q:
              `revoke ${privilege} on ${grammar.escape.quoted(
                database
              )} from ` + grammar.escape.quoted(username),
          },
          "POST"
        )
      )
      .then(assertNoErrors)
      .then(() => undefined);
  }

  /**
   * Grants admin privileges to a specified user.
   * @param username
   * @return
   * @example
   * influx.grantAdminPrivilege('connor')
   */
  public grantAdminPrivilege(username: string): Promise<void> {
    return this._pool
      .json(
        this._getQueryOpts(
          {
            q: `grant all to ${grammar.escape.quoted(username)}`,
          },
          "POST"
        )
      )
      .then(assertNoErrors)
      .then(() => undefined);
  }

  /**
   * Removes a admin privilege from a specified user.
   * @param username
   * @return
   * @example
   * influx.revokeAdminPrivilege('connor')
   */
  public revokeAdminPrivilege(username: string): Promise<void> {
    return this._pool
      .json(
        this._getQueryOpts(
          {
            q: `revoke all from ${grammar.escape.quoted(username)}`,
          },
          "POST"
        )
      )
      .then(assertNoErrors)
      .then(() => undefined);
  }

  /**
   * Removes a user from the database.
   * @param username
   * @return
   * @example
   * influx.dropUser('connor')
   */
  public dropUser(username: string): Promise<void> {
    return this._pool
      .json(
        this._getQueryOpts(
          {
            q: `drop user ${grammar.escape.quoted(username)}`,
          },
          "POST"
        )
      )
      .then(assertNoErrors)
      .then(() => undefined);
  }

  /**
   * Creates a continuous query in a database
   * @param name The query name, for later reference
   * @param query The body of the query to run
   * @param [database] If not provided, uses the default database.
   * @param [resample] If provided, adds resample policy
   * @return
   * @example
   * influx.createContinuousQuery('downsample_cpu_1h', `
   *   SELECT MEAN(cpu) INTO "7d"."perf"
   *   FROM "1d"."perf" GROUP BY time(1m)
   * `, undefined, 'RESAMPLE FOR 7m')
   */
  public createContinuousQuery(
    name: string,
    query: string,
    database: string = this._defaultDB(),
    resample = ""
  ): Promise<void> {
    return this._pool
      .json(
        this._getQueryOpts(
          {
            q:
              `create continuous query ${grammar.escape.quoted(name)}` +
              ` on ${grammar.escape.quoted(
                database
              )} ${resample} begin ${query} end`,
          },
          "POST"
        )
      )
      .then(assertNoErrors)
      .then(() => undefined);
  }

  /**
   * Returns a list of continous queries in the database.
   * @param [database] If not provided, uses the default database.
   * @return
   * @example
   * influx.showContinousQueries()
   */
  public showContinousQueries(database: string = this._defaultDB()): Promise<
    IResults<{
      name: string;
      query: string;
    }>
  > {
    return this._pool
      .json(
        this._getQueryOpts({
          db: database,
          q: "show continuous queries",
        })
      )
      .then((result) =>
        parseSingle<{
          name: string;
          query: string;
        }>(result)
      );
  }

  /**
   * Creates a continuous query in a database
   * @param name The query name
   * @param [database] If not provided, uses the default database.
   * @return
   * @example
   * influx.dropContinuousQuery('downsample_cpu_1h')
   */
  public dropContinuousQuery(
    name: string,
    database: string = this._defaultDB()
  ): Promise<void> {
    return this._pool
      .json(
        this._getQueryOpts(
          {
            q:
              `drop continuous query ${grammar.escape.quoted(name)}` +
              ` on ${grammar.escape.quoted(database)}`,
          },
          "POST"
        )
      )
      .then(assertNoErrors)
      .then(() => undefined);
  }

  /**
   * Creates a new retention policy on a database. You can read more about
   * [Downsampling and Retention](https://docs.influxdata.com/influxdb/v1.0/
   * guides/downsampling_and_retention/) on the InfluxDB website.
   *
   * @param name The retention policy name
   * @param options
   * @param [options.database] Database to create the policy on,
   *     uses the default database if not provided.
   * @param options.duration How long data in the retention policy
   *     should be stored for, should be in a format like `7d`. See details
   *     [here](https://docs.influxdata.com/influxdb/v1.0/query_language/spec/#durations)
   * @param options.replication How many servers data in the series
   *     should be replicated to.
   * @param [options.isDefault] Whether the retention policy should
   *     be the default policy on the database.
   * @return
   * @example
   * influx.createRetentionPolicy('7d', {
   *  duration: '7d',
   *  replication: 1
   * })
   */
  public createRetentionPolicy(
    name: string,
    options: IRetentionOptions
  ): Promise<void> {
    const q =
      `create retention policy ${grammar.escape.quoted(name)} on ` +
      grammar.escape.quoted(options.database || this._defaultDB()) +
      ` duration ${options.duration} replication ${options.replication}` +
      (options.isDefault ? " default" : "");

    return this._pool
      .json(this._getQueryOpts({ q }, "POST"))
      .then(assertNoErrors)
      .then(() => undefined);
  }

  /**
   * Alters an existing retention policy on a database.
   *
   * @param name The retention policy name
   * @param options
   * @param [options.database] Database to create the policy on,
   *     uses the default database if not provided.
   * @param options.duration How long data in the retention policy
   *     should be stored for, should be in a format like `7d`. See details
   *     [here](https://docs.influxdata.com/influxdb/v1.0/query_language/spec/#durations)
   * @param options.replication How many servers data in the series
   *     should be replicated to.
   * @param [options.default] Whether the retention policy should
   *     be the default policy on the database.
   * @return
   * @example
   * influx.alterRetentionPolicy('7d', {
   *  duration: '7d',
   *  replication: 1,
   *  default: true
   * })
   */
  public alterRetentionPolicy(
    name: string,
    options: IRetentionOptions
  ): Promise<void> {
    const q =
      `alter retention policy ${grammar.escape.quoted(name)} on ` +
      grammar.escape.quoted(options.database || this._defaultDB()) +
      ` duration ${options.duration} replication ${options.replication}` +
      (options.isDefault ? " default" : "");

    return this._pool
      .json(this._getQueryOpts({ q }, "POST"))
      .then(assertNoErrors)
      .then(() => undefined);
  }

  /**
   * Deletes a retention policy and associated data. Note that the data will
   * not be immediately destroyed, and will hang around until Influx's
   * bi-hourly cron.
   *
   * @param name The retention policy name
   * @param [database] Database name that the policy lives in,
   *     uses the default database if not provided.
   * @return
   * @example
   * influx.dropRetentionPolicy('7d')
   */
  public dropRetentionPolicy(
    name: string,
    database: string = this._defaultDB()
  ): Promise<void> {
    return this._pool
      .json(
        this._getQueryOpts(
          {
            q:
              `drop retention policy ${grammar.escape.quoted(name)} ` +
              `on ${grammar.escape.quoted(database)}`,
          },
          "POST"
        )
      )
      .then(assertNoErrors)
      .then(() => undefined);
  }

  /**
   * Shows retention policies on the database
   *
   * @param [database] The database to list policies on, uses the
   *     default database if not provided.
   * @return
   * @example
   * influx.showRetentionPolicies().then(policies => {
   *   expect(policies.slice()).to.deep.equal([
   *     {
   *       name: 'autogen',
   *       duration: '0s',
   *       shardGroupDuration: '168h0m0s',
   *       replicaN: 1,
   *       default: true,
   *     },
   *     {
   *       name: '7d',
   *       duration: '168h0m0s',
   *       shardGroupDuration: '24h0m0s',
   *       replicaN: 1,
   *       default: false,
   *     },
   *   ])
   * })
   */
  public showRetentionPolicies(database: string = this._defaultDB()): Promise<
    IResults<{
      default: boolean;
      duration: string;
      name: string;
      replicaN: number;
      shardGroupDuration: string;
    }>
  > {
    return this._pool
      .json(
        this._getQueryOpts(
          {
            q: `show retention policies on ${grammar.escape.quoted(database)}`,
          },
          "GET"
        )
      )
      .then((result) =>
        parseSingle<{
          default: boolean; // Tslint:disable-line
          duration: string;
          name: string;
          replicaN: number;
          shardGroupDuration: string;
        }>(result)
      );
  }

  /**
   * Shows shards on the database
   *
   * @param [database] The database to list policies on, uses the
   *     default database if not provided.
   * @return
   * @example
   * influx.showShards().then(shards => {
   *   expect(shards.slice()).to.deep.equal([
   *     {
   *        id: 1
   *        database: 'database',
   *        retention_policy: 'autogen',
   *        shard_group: 1,
   *        start_time: '2019-05-06T00:00:00Z',
   *        end_time: '2019-05-13T00:00:00Z',
   *        expiry_time: '2019-05-13T00:00:00Z',
   *        owners: null,
   *     },
   *   ])
   * })
   */
  public showShards(database: string = this._defaultDB()): Promise<
    Array<{
      id: number;
      database: string;
      retention_policy: string;
      shard_group: number;
      start_time: string;
      end_time: string;
      expiry_time: string;
      owners: string;
    }>
  > {
    return this._pool
      .json(
        this._getQueryOpts(
          {
            q: "show shards ",
          },
          "GET"
        )
      )
      .then((result) =>
        parseSingle<{
          id: number;
          database: string;
          retention_policy: string;
          shard_group: number;
          start_time: string;
          end_time: string;
          expiry_time: string;
          owners: string;
        }>(result).filter(function (i) {
          return i.database === database;
        })
      );
  }

  /**
   * Drops a shard with the provided number.
   * @param shard_id
   * @return
   * @example
   * influx.dropShard(3)
   */
  public dropShard(shard_id: number): Promise<void> {
    return this._pool
      .json(
        this._getQueryOpts(
          {
            q: `drop shard ${shard_id}`,
          },
          "POST"
        )
      )
      .then(assertNoErrors)
      .then(() => undefined);
  }

  /**
   * WritePoints sends a list of points together in a batch to InfluxDB. In
   * each point you must specify the measurement name to write into as well
   * as a list of tag and field values. Optionally, you can specify the
   * time to tag that point at, defaulting to the current time.
   *
   * If you defined a schema for the measurement in the options you passed
   * to `new Influx(options)`, we'll use that to make sure that types get
   * cast correctly and that there are no extraneous fields or columns.
   *
   * For best performance, it's recommended that you batch your data into
   * sets of a couple thousand records before writing it. In the future we'll
   * have some utilities within node-influx to make this easier.
   *
   * ---
   *
   * A note when using manually-specified times and precisions: by default
   * we write using the `ms` precision since that's what JavaScript gives us.
   * You can adjust this. However, there is some special behaviour if you
   * manually specify a timestamp in your points:
   *  - if you specify the timestamp as a Date object, we'll convert it to
   *    milliseconds and manipulate it as needed to get the right precision
   *  - if provide a INanoDate as returned from {@link toNanoTime} or the
   *    results from an Influx query, we'll be able to pull the precise
   *    nanosecond timestamp and manipulate it to get the right precision
   *  - if you provide a string or number as the timestamp, we'll pass it
   *    straight into Influx.
   *
   * Please see the IPoint and IWriteOptions types for a
   * full list of possible options.
   *
   * @param points
   * @param [options]
   * @return
   * @example
   * // write a point into the default database with
   * // the default retention policy.
   * influx.writePoints([
   *   {
   *     measurement: 'perf',
   *     tags: { host: 'box1.example.com' },
   *     fields: { cpu: getCpuUsage(), mem: getMemUsage() },
   *   }
   * ])
   *
   * // you can manually specify the database,
   * // retention policy, and time precision:
   * influx.writePoints([
   *   {
   *     measurement: 'perf',
   *     tags: { host: 'box1.example.com' },
   *     fields: { cpu: getCpuUsage(), mem: getMemUsage() },
   *     timestamp: getLastRecordedTime(),
   *   }
   * ], {
   *   database: 'my_db',
   *   retentionPolicy: '1d',
   *   precision: 's'
   * })
   */
  public writePoints(
    points: IPoint[],
    options: IWriteOptions = {}
  ): Promise<void> {
    const {
      database = this._defaultDB(),
      precision = "n" as grammar.TimePrecision,
      retentionPolicy,
    } = options;

    let payload = "";
    points.forEach((point) => {
      const { measurement, tags, fieldsPairs, tagsNames, castedTimestamp } =
        this.parsePoint(point, { database, precision });

      payload += (payload.length > 0 ? "\n" : "") + measurement;

      for (let tagsName of tagsNames) {
        payload +=
          "," +
          grammar.escape.tag(tagsName) +
          "=" +
          grammar.escape.tag(tags[tagsName]);
      }

      for (let i = 0; i < fieldsPairs.length; i += 1) {
        payload +=
          (i === 0 ? " " : ",") +
          grammar.escape.tag(fieldsPairs[i][0]) +
          "=" +
          fieldsPairs[i][1];
      }

      if (castedTimestamp !== undefined) {
        payload += " " + castedTimestamp;
      }
    });

    return this._pool.discard({
      body: payload,
      method: "POST",
      path: "/write",
      query: {
        db: database,
        p: this._options.password,
        precision,
        rp: retentionPolicy,
        u: this._options.username,
      },
    });
  }

  /**
   * ParsePoint will perform the coercions/schema checks and return the data
   * required for writing a point. This will throw an error if a schema check
   * or coercion fails. This can be useful for flagging or "throwing out" bad
   * points in a batch write to prevent the entire batch from getting aborted
   *
   * ---
   *
   * A note when using this function, {@link InfluxDB#writePoints} will still perform
   * the same checks, so any pre-processed data will be checked for validity twice which
   * has potential performance implications on large data sets
   *
   * @param point
   * @param [options]
   * @return
   * @example
   * // parse a point as if it is getting written to the default
   * // databse with the default time precision
   * influx.parsePoint({
   *     measurement: 'perf',
   *     tags: { host: 'box1.example.com' },
   *     fields: { cpu: getCpuUsage(), mem: getMemUsage() },
   * })
   *
   * // you can manually specify the database and time precision
   * influx.parsePoint({
   *     measurement: 'perf',
   *     tags: { host: 'box1.example.com' },
   *     fields: { cpu: getCpuUsage(), mem: getMemUsage() },
   * }, {
   *   precision: 's',
   *   database: 'my_db'
   * })
   *
   * // if an error occurs, you can catch the error with try...catch
   * try {
   *   influx.parsePoint({
   *     measurement: 'perf',
   *     tags: { host: 'box1.example.com', myExtraneousTag: 'value' },
   *     fields: { cpu: getCpuUsage(), mem: getMemUsage(), myExtraneousField: 'value' },
   *   })
   * } catch(err) {
   *   handleError(err);
   * }
   */
  parsePoint(point: IPoint, options: IParseOptions = {}): IParsedPoint {
    const { database = this._defaultDB(), precision = "n" } = options;

    const { fields = {}, tags = {}, measurement, timestamp } = point;

    const schema =
      this._schema[database] && this._schema[database][measurement];
    const fieldsPairs = schema
      ? schema.coerceFields(fields)
      : coerceBadly(fields);
    const tagsNames = schema ? schema.checkTags(tags) : Object.keys(tags);
    const castedTimestamp =
      timestamp && grammar.castTimestamp(timestamp, precision);
    return {
      fields,
      tags,
      measurement,
      timestamp,
      fieldsPairs,
      tagsNames,
      castedTimestamp,
    };
  }

  /**
   * WriteMeasurement functions similarly to {@link InfluxDB#writePoints}, but
   * it automatically fills in the `measurement` value for all points for you.
   *
   * @param measurement
   * @param points
   * @param [options]
   * @return
   * @example
   * influx.writeMeasurement('perf', [
   *   {
   *     tags: { host: 'box1.example.com' },
   *     fields: { cpu: getCpuUsage(), mem: getMemUsage() },
   *   }
   * ])
   */
  public writeMeasurement(
    measurement: string,
    points: IPoint[],
    options: IWriteOptions = {}
  ): Promise<void> {
    points = points.map((p) => ({ measurement, ...p }));
    return this.writePoints(points, options);
  }

  public query<T>(
    query: string[],
    options?: IQueryOptions
  ): Promise<Array<IResults<T>>>;

  public query<T>(query: string, options?: IQueryOptions): Promise<IResults<T>>;

  /**
   * .query() runs a query (or list of queries), and returns the results in a
   * friendly format, {@link IResults}. If you run multiple queries, an array of results
   * will be returned, otherwise a single result (array of objects) will be returned.
   *
   * @param query
   * @param [options]
   * @return result(s)
   * @example
   * influx.query('select * from perf').then(results => {
   *   console.log(results)
   * })
   */
  public query<T>(
    query: string | string[],
    options: IQueryOptions = {}
  ): Promise<IResults<T> | Array<IResults<T>>> {
    if (Array.isArray(query)) {
      query = query.join(";");
    }

    // If the consumer asked explicitly for nanosecond precision parsing,
    // remove that to cause Influx to give us ISO dates that
    // we can parse correctly.
    if (options.precision === "n") {
      options = { ...options }; // Avoid mutating
      delete options.precision;
    }

    return this.queryRaw(query, options).then((res) =>
      parse<T>(res, options.precision)
    );
  }

  /**
   * QueryRaw functions similarly to .query() but it does no fancy
   * transformations on the returned data; it calls `JSON.parse` and returns
   * those results verbatim.
   *
   * @param query
   * @param [options]
   * @return
   * @example
   * influx.queryRaw('select * from perf').then(rawData => {
   *   console.log(rawData)
   * })
   */
  public queryRaw(
    query: string | string[],
    options: IQueryOptions = {}
  ): Promise<any> {
    const {
      database = this._defaultDB(),
      retentionPolicy,
      placeholders = {},
    } = options;

    if (query instanceof Array) {
      query = query.join(";");
    }

    return this._pool.json(
      this._getQueryOpts({
        db: database,
        epoch: options.precision,
        q: query,
        rp: retentionPolicy,
        params: JSON.stringify(placeholders),
      })
    );
  }

  /**
   * Pings all available hosts, collecting online status and version info.
   * @param timeout Given in milliseconds
   * @return
   * @example
   * influx.ping(5000).then(hosts => {
   *   hosts.forEach(host => {
   *     if (host.online) {
   *       console.log(`${host.url.host} responded in ${host.rtt}ms running ${host.version})`)
   *     } else {
   *       console.log(`${host.url.host} is offline :(`)
   *     }
   *   })
   * })
   */
  public ping(timeout: number): Promise<IPingStats[]> {
    let auth: string = undefined;

    if (typeof this._options.username === "string") {
      auth = `${this._options.username}:${this._options.password || ""}`;
    }

    return this._pool.ping(timeout, "/ping", auth);
  }

  /**
   * Returns the default database that queries operates on. It throws if called
   * when a default database isn't set.
   * @private
   */
  private _defaultDB(): string {
    if (!this._options.database) {
      throw new Error(
        "Attempted to run an influx query without a default" +
          " database specified or an explicit database provided."
      );
    }

    return this._options.database;
  }

  /**
   * Creates options to be passed into the pool to query databases.
   * @private
   */
  private _getQueryOpts(params: any, method = "GET"): any {
    return {
      method,
      path: "/query",
      query: {
        p: this._options.password,
        u: this._options.username,
        ...params,
      },
    };
  }

  /**
   * Creates specified measurement schema
   *
   * @private
   * @param {ISchemaOptions} schema
   * @memberof InfluxDB
   */
  private _createSchema(schema: ISchemaOptions): void {
    schema.database = schema.database || this._options.database;
    if (!schema.database) {
      throw new Error(
        `Schema ${schema.measurement} doesn't have a database specified,` +
          "and no default database is provided!"
      );
    }

    if (!this._schema[schema.database]) {
      this._schema[schema.database] = Object.create(null);
    }

    this._schema[schema.database][schema.measurement] = new Schema(schema);
  }
}