lib/dialects/postgres/connector-manager.js
var Query = require("./query")
, Utils = require("../../utils")
module.exports = (function() {
var ConnectorManager = function(sequelize, config) {
var pgModule = config.dialectModulePath || 'pg'
this.sequelize = sequelize
this.client = null
this.config = config || {}
this.config.port = this.config.port || 5432
this.pooling = (!!this.config.pool && (this.config.pool.maxConnections > 0))
this.pg = this.config.native ? require(pgModule).native : require(pgModule)
// Better support for BigInts
// https://github.com/brianc/node-postgres/issues/166#issuecomment-9514935
this.pg.types.setTypeParser(20, String);
this.disconnectTimeoutId = null
this.pendingQueries = 0
this.clientDrained = true
this.maxConcurrentQueries = (this.config.maxConcurrentQueries || 50)
this.ConnectionParameters = require(pgModule + '/lib/connection-parameters')
this.onProcessExit = function () {
this.disconnect()
}.bind(this);
process.on('exit', this.onProcessExit)
}
Utils._.extend(ConnectorManager.prototype, require("../connector-manager").prototype)
ConnectorManager.prototype.endQuery = function() {
var self = this
self.pendingQueries--
if (!self.pooling && self.pendingQueries === 0) {
setTimeout(function() {
self.pendingQueries === 0 && self.disconnect.call(self)
}, 100)
}
}
ConnectorManager.prototype.query = function(sql, callee, options) {
var self = this
self.pendingQueries++
self.clientDrained = false
return new Utils.CustomEventEmitter(function(emitter) {
self.connect()
.on('error', function(err) {
emitter.emit('error', err)
})
.on('success', function(done) {
var query = new Query(self.client, self.sequelize, callee, options || {})
return query.run(sql)
.complete(function(err) {
self.endQuery.call(self)
done && done(err) })
.proxy(emitter)
})
}).run()
}
ConnectorManager.prototype.afterTransactionSetup = function(callback) {
this.setTimezone(this.client, 'UTC', callback)
}
ConnectorManager.prototype.connect = function(callback) {
var self = this
var emitter = new (require('events').EventEmitter)()
// in case database is slow to connect, prevent orphaning the client
// TODO: We really need some sort of queue/flush/drain mechanism
if (this.isConnecting && !this.pooling && this.client === null) {
emitter.emit('success', null)
return emitter
}
this.isConnecting = true
this.isConnected = false
var uri = this.sequelize.getQueryInterface().QueryGenerator.databaseConnectionUri(this.config)
, config = new this.ConnectionParameters(uri)
// set pooling parameters if specified
if (this.pooling) {
config.poolSize = this.config.pool.maxConnections || 10
config.poolIdleTimeout = this.config.pool.maxIdleTime || 30000
config.reapIntervalMillis = this.config.pool.reapInterval || 1000
config.uuid = this.config.uuid
}
var connectCallback = function(err, client, done) {
self.isConnecting = false
if (!!err) {
// release the pool immediately, very important.
done && done(err)
self.client = null
if (err.code) {
switch(err.code) {
case 'ECONNREFUSED':
emitter.emit('error', new Error("Failed to authenticate for PostgresSQL. Please double check your settings."))
break
case 'ENOTFOUND':
case 'EHOSTUNREACH':
case 'EINVAL':
emitter.emit('error', new Error("Failed to find PostgresSQL server. Please double check your settings."))
break
default:
emitter.emit('error', err)
break
}
} else {
emitter.emit('error', new Error(err.message))
}
} else if (client) {
var timezoneCallback = function() {
self.isConnected = true
self.client = client
emitter.emit('success', done)
}
if (self.config.keepDefaultTimezone) {
Utils.tick(timezoneCallback)
} else {
self.setTimezone(client, 'UTC', timezoneCallback)
}
} else if (self.config.native) {
self.setTimezone(self.client, 'UTC', function() {
self.isConnected = true
emitter.emit('success', done)
})
} else {
done && done()
self.client = null
emitter.emit('success')
}
}
if (this.pooling) {
// acquire client from pool
this.pg.connect(config, connectCallback)
} else {
if (!!this.client) {
connectCallback(null, this.client)
} else {
//create one-off client
var responded = false
this.client = new this.pg.Client(config)
this.client.connect(function(err, client, done) {
responded = true
connectCallback(err, client || self.client, done)
})
// If we didn't ever hear from the client.connect() callback the connection timeout, node-postgres does not treat this as an error since no active query was ever emitted
this.client.on('end', function () {
if (!responded) {
connectCallback(new Error('Connection timed out'))
}
})
// Closes a client correctly even if we have backed up queries
// https://github.com/brianc/node-postgres/pull/346
this.client.on('drain', function() {
self.clientDrained = true
})
}
}
return emitter
}
ConnectorManager.prototype.setTimezone = function(client, timezone, callback) {
client.query("SET TIME ZONE '" + (timezone || "UTC") + "'").on('end', callback)
}
ConnectorManager.prototype.disconnect = function() {
if (this.client) {
if (this.clientDrained) {
this.client.end()
}
this.client = null
}
this.isConnecting = false
this.isConnected = false
}
return ConnectorManager
})()