// src/providers/gce.js
const Promise = require('bluebird');
const AbstractFileTransfer = require('ms-files-transport');
const { get, merge, defaults } = require('lodash');
const bl = require('bl');
const assert = require('assert');
const os = require('os');
const { Storage } = require('@google-cloud/storage');
const { fetch } = require('undici');
const { HttpStatusError } = require('common-errors');
// google-cloud client methods resolve with an array of [payload, ...meta];
// this helper extracts just the payload for cleaner promise chains
const unwrap = (response) => {
  const [payload] = response;
  return payload;
};
// default signed URL expiration time: 3 hours, expressed in milliseconds
const THREE_HOURS = 3 * 60 * 60 * 1000;
/**
* Main transport class
*/
/**
 * Main transport class: file operations on top of Google Cloud Storage,
 * with optional PubSub-based object-change notifications.
 */
class GCETransport extends AbstractFileTransfer {
  /**
   * @param {Object} [opts] - options deep-merged over GCETransport.defaultOpts
   */
  constructor(opts = {}) {
    super();
    this._config = merge({}, GCETransport.defaultOpts, opts);
    this._logger = this._config.logger;
    this.setupGCE();
  }

  /**
   * Returns logger or noop
   * @return {Bunyan}
   */
  get log() {
    return this._logger;
  }

  /**
   * Returns base configuration
   */
  get config() {
    return this._config;
  }

  /**
   * Creates authenticated instance of the storage client and, when the
   * optional @google-cloud/pubsub dependency is installed, a PubSub client.
   * PubSub load failures are logged and ignored; subscribe() asserts later.
   */
  setupGCE() {
    this._gcs = new Storage({ ...this._config.gce });
    try {
      // optional dependency - only required for subscribe()
      // eslint-disable-next-line global-require
      const { PubSub } = require('@google-cloud/pubsub');
      this._pubsub = new PubSub({ ...this._config.gce });
      this._pubsub._subscriptions = [];
    } catch (e) {
      this._logger.warn('failed to load @google-cloud/pubsub', e);
    }
  }

  /**
   * Creates object-change notification channel on the bucket.
   * @param {String} [resourceId] - resourceId of a previously created channel,
   *  used to re-attach when the channel id already exists
   * @returns {Promise<String>|null} resolves with the channel's resourceId,
   *  or null synchronously when channel configuration is incomplete
   */
  setupChannel(resourceId) {
    const bucket = this._bucket;
    const gcs = this._gcs;
    const { id, config } = this._config.bucket.channel;

    if (!id || !config.address) {
      this.log.warn('incomplete configuration for notification channel');
      return null;
    }

    // NOTE: wrap in bluebird so the predicate-based .catch({ code: N }, fn)
    // overloads below actually fire - the storage client returns a native
    // Promise whose .catch() treats the predicate object as the handler and
    // would silently skip both error branches
    return Promise.resolve(bucket.createChannel(id, config))
      .then(unwrap)
      .then((channel) => {
        this.log.debug('created channel %s', id);
        return channel;
      })
      .catch({ code: 400 }, (err) => {
        // channel id already taken: re-attach to it when we know its resourceId
        if (get(err, 'errors[0].reason') === 'channelIdNotUnique' && resourceId) {
          this.log.debug('found existing channel %s - %s', id, resourceId);
          return gcs.channel(id, resourceId);
        }
        throw err;
      })
      .catch({ code: 401 }, (err) => {
        this.log.error('no rights to create channel', err);
        throw err;
      })
      .then((channel) => {
        this._channel = channel;
        return channel.metadata.resourceId;
      });
  }

  /**
   * Stops the active notification channel, if one was created.
   * @returns {Promise}
   */
  stopChannel() {
    const channel = this._channel;
    this.log.debug('destroying channel %j', (channel && channel.metadata) || '<noop>');
    return Promise.resolve(channel && channel.stop());
  }

  /**
   * Handles pubsub for object change notifications.
   * @param {Function} handler - invoked with every pubsub Message
   * @return {Subscription}
   */
  async subscribe(handler) {
    assert(this._pubsub, '@google-cloud/pubsub not initialized');

    // extract & validate config
    const { pubsub } = this._config.bucket.channel;
    assert(pubsub, 'to subscribe must specify pubsub topic via `pubsub.topic`');
    assert(pubsub.topic, 'to subscribe must specify pubsub topic via `pubsub.topic`');
    assert(pubsub.name, 'must contain a name for proper round-robin distribution');

    const Pubsub = this._pubsub;
    const { name } = pubsub;
    const config = defaults(pubsub.config || {}, {
      terminate: false,
      gaxOpts: {
        autoCreate: true,
      },
    });

    this.log.info('subscribing to %s on %s', pubsub.topic, os.hostname());

    // first find if we have an existing subscription
    // NOTE: will throw if not created before
    const [topic] = await Pubsub.topic(pubsub.topic).get();
    this.log.info({ topic: pubsub.topic }, 'retrieved topic');

    // prepare Subscription object
    const Subscription = topic.subscription(name, {
      flowControl: { ...config.flowControl },
      maxConnections: config.maxConnections || 5,
    });

    const [subscription] = await Subscription.get(config.gaxOpts);
    this.log.info({ subscription: name }, 'prepared subscription');

    subscription.on('error', (err) => this.log.error({ err }, 'failed to subscribe'));
    // NOTE(review): assumes Subscription#open() exists in this pubsub client
    // version to auto-reconnect on close - confirm against the installed version
    subscription.on('close', () => subscription.open());
    subscription.on('message', handler);

    this._pubsub._subscriptions.push(subscription);

    // for internal cleanup - close() deletes terminated subscriptions
    if (config.terminate) subscription._terminate = true;

    return subscription;
  }

  /**
   * Creates bucket if it doesn't exist, otherwise returns an existing one.
   * @return {Promise<Bucket>}
   */
  async createBucket() {
    let bucket;
    const gcs = this._gcs;
    const needle = this._config.bucket.name;
    const gceBucket = gcs.bucket(needle);
    const [exists] = await gceBucket.exists();

    if (exists) {
      // retrieves bucket gce metadata
      [bucket] = await gceBucket.get();
    } else {
      this.log.debug('initiating createBucket: %s', needle);
      [bucket] = await gcs.createBucket(needle, this._config.bucket.metadata);
    }

    return bucket;
  }

  /**
   * Ensures that we have rights to write to the specified bucket
   * and caches the Bucket instance on `this._bucket`.
   * @returns {Promise<Bucket>}
   */
  connect() {
    return Promise
      .bind(this)
      .then(this.createBucket)
      .then((bucket) => {
        this._bucket = bucket;
        return bucket;
      });
  }

  /**
   * Disconnects pubsub handlers if they are alive.
   * @returns {Promise<Void>}
   */
  close() {
    const subscriptions = this._pubsub && this._pubsub._subscriptions;
    if (subscriptions && subscriptions.length > 0) {
      return Promise.map(subscriptions, (subscription) => {
        // remove listeners first so the 'close' handler can't re-open
        subscription.removeAllListeners('close');
        subscription.removeAllListeners('message');
        subscription.removeAllListeners('error');
        // terminate (delete) if flagged during subscribe()
        if (subscription._terminate) return subscription.delete();
        // otherwise just close the streaming connection
        return subscription.close();
      });
    }
    return Promise.resolve();
  }

  /**
   * Creates signed URL via the storage client (v4 signing).
   *
   * Canonicalized_Resource and extension-header construction rules are
   * documented at https://cloud.google.com/storage/docs/access-control/signed-urls
   *
   * @param {Object} opts
   * @param {String} opts.action - one of "read", "write", "delete"
   * @param {String} [opts.type] - Content-Type; do not supply for downloads
   * @param {String} opts.resource - `/path/to/objectname/without/bucket`
   * @param {String} [opts.md5] - base64 MD5 digest of content; when supplied the
   *  client must send the same value in its Content-MD5 request header
   * @param {Number} opts.expires - unix-epoch timestamp in milliseconds at which
   *  the signature expires
   * @param {Object} [opts.extensionHeaders] - canonical `x-goog-*` headers;
   *  values must have no whitespace around the colon or signing will 403
   * @returns {Promise<String>} the signed URL
   */
  createSignedURL(opts) {
    this.log.debug('initiating signing of URL for %s', opts.resource);

    // NOTE(review): `cname`/`rename` are read off the instance but are not
    // assigned anywhere in this file - presumably set by the base class or
    // config; confirm
    const { cname, rename } = this;
    const { action, md5, type, expires, extensionHeaders, resource, ...props } = opts;

    /**
     * @type {import('@google-cloud/storage').File}
     */
    const file = this.bucket.file(resource);

    /**
     * @type {import('@google-cloud/storage').GetSignedUrlConfig}
     */
    const settings = {
      version: 'v4',
      ...props,
      action,
      expires,
      cname: rename ? cname : false,
      contentMd5: md5,
      contentType: type,
      extensionHeaders,
    };

    return file.getSignedUrl(settings).then(unwrap);
  }

  // @todo interface
  getDownloadUrlSigned(filename, downloadName) {
    return this.createSignedURL({
      action: 'read',
      // defaults to 3 hours
      // NOTE(review): `this.expire` is never assigned in this file - verify it
      // is provided elsewhere, otherwise THREE_HOURS always applies
      expires: Date.now() + (this.expire || THREE_HOURS),
      resource: filename,
      promptSaveAs: downloadName,
    });
  }

  // @todo interface
  getBucketName() {
    return this._config.bucket.name;
  }

  /**
   * Initializes resumable upload
   * @param {Object} opts
   * @param {String} opts.filename
   * @param {Object} opts.metadata
   * @param {String} opts.metadata.contentLength
   * @param {String} opts.metadata.contentType - must be included
   * @param {String} opts.metadata.md5Hash - must be included
   * @param {String} [opts.metadata.contentEncoding] - optional, can be set to gzip
   * @return {Promise<string>} resumable upload session URI
   */
  async initResumableUpload(opts) {
    const {
      filename, metadata, generation, ...props
    } = opts;

    this.log.debug('initiating resumable upload of %s', filename);

    const [uri] = await this.bucket.file(filename, { generation }).createResumableUpload({
      ...props,
      metadata,
    });

    return uri;
  }

  /**
   * Perform resumable uploads
   * https://cloud.google.com/storage/docs/performing-resumable-uploads#initiate-session
   *
   * @param {String} url - signed "write" URL to POST the session-init request to
   * @returns {Promise<String>} upload session location
   * @throws {HttpStatusError} 422 when the response is not ok or lacks a Location header
   */
  // eslint-disable-next-line class-methods-use-this
  async initResumableUploadFromURL(url, { origin, md5Hash, contentType, headers = {} }) {
    const response = await fetch(url, {
      method: 'POST',
      headers: {
        origin,
        'Content-MD5': md5Hash,
        'Content-Type': contentType,
        'X-Goog-Resumable': 'start',
        ...headers,
      },
    });

    const body = await response.text();
    const location = response.headers.get('location');

    if (!response.ok) {
      throw new HttpStatusError(422, `could not init resumable upload: ${body}`);
    }

    if (!location) {
      throw new HttpStatusError(422, `could not init resumable upload, empty location: ${body}`);
    }

    return location;
  }

  /**
   * Download file/stream
   * @param {String} filename - what do we want to download
   * @param {Object} opts
   * @param {Number} [opts.start]
   * @param {Number} [opts.end]
   * @param {Function} [opts.onError] - returns error if it happens
   * @param {Function} [opts.onResponse] - returns response headers and status
   * @param {Function} [opts.onData] - returns data chunks
   * @param {Function} [opts.onEnd] - fired when transfer is completed
   * @returns {Stream}
   */
  readFileStream(filename, opts = {}) {
    this.log.debug('initiating read of %s', filename);

    const {
      onError,
      onResponse,
      onData,
      onEnd,
      ..._opts
    } = opts;

    const file = this.bucket.file(filename);
    const stream = file.createReadStream(_opts);

    // attach event handlers if they are present
    const handlers = { error: onError, response: onResponse, data: onData, end: onEnd };
    for (const [event, handler] of Object.entries(handlers)) {
      if (typeof handler === 'function') {
        stream.on(event, handler);
      }
    }

    return stream;
  }

  /**
   * Upload filestream
   * @param {String} filename
   * @param {Object} [opts] - createWriteStream options; may override `resumable`
   * @return {Stream}
   */
  writeStream(filename, opts) {
    this.log.debug('initiating upload of %s', filename);
    const file = this.bucket.file(filename);
    // createWriteStream accepts a single options object - previously
    // { resumable: false } was passed as a (silently ignored) second argument
    return file.createWriteStream({ resumable: false, ...opts });
  }

  /**
   * Makes file publicly accessible
   * @param {String} filename
   * @return {Promise}
   */
  makePublic(filename) {
    const file = this.bucket.file(filename);
    return file.makePublic();
  }

  /**
   * Makes file private
   * @param {String} filename
   * @param {Object} [options]
   * @return {Promise}
   */
  makePrivate(filename, options = {}) {
    const file = this.bucket.file(filename);
    return file.makePrivate(options);
  }

  /**
   * Download file fully into memory.
   * @param {String} filename - what do we want to download
   * @param {Object} opts - forwarded to readFileStream
   * @return {Promise<{ response: Object, contents: Buffer }>}
   */
  readFile(filename, opts) {
    return Promise.fromNode((next) => {
      let response = null;
      this.readFileStream(filename, opts)
        .on('response', (httpResponse) => {
          response = httpResponse;
        })
        .pipe(bl((err, contents) => {
          if (err) {
            return next(err);
          }
          return next(null, { response, contents });
        }));
    });
  }

  /**
   * Tells whether file exists or not
   * @param {String} filename
   * @return {Promise<Boolean>}
   */
  async exists(filename) {
    this.log.debug('initiating exists check of %s', filename);
    const file = this.bucket.file(filename);
    const [exists] = await file.exists();
    return exists;
  }

  /**
   * Removes file from bucket
   * @param {String} filename
   * @return {Promise}
   */
  remove(filename) {
    this.log.debug('removing file %s', filename);
    const file = this.bucket.file(filename);
    // make sure it is wrapped, so that later we can do .catch(predicate, action)
    return Promise.resolve(file).call('delete');
  }

  /**
   * Copies a file within the bucket; fails if the destination already
   * exists (ifGenerationMatch: 0 means "only create, never overwrite").
   * @param {String} sourceFilename
   * @param {String} destFilename
   * @return {Promise}
   */
  async copy(sourceFilename, destFilename) {
    this.log.debug('copying file %s to %s', sourceFilename, destFilename);
    const destination = this.bucket.file(destFilename);
    const source = this.bucket.file(sourceFilename);
    return source.copy(destination, {
      preconditionOpts: {
        ifGenerationMatch: 0,
      },
    });
  }
}
// static defaults - deep-merged with the opts passed to the constructor
GCETransport.defaultOpts = {
  name: 'gce',
  gce: {
    // specify authentication options
    // here (passed as-is to the Storage / PubSub constructors)
  },
  bucket: {
    // specify bucket
    name: 'must-be-a-valid-bucket-name',
    host: 'storage.cloud.google.com',
    channel: {
      // must be persistent in your app to identify the channel
      id: null,
      // pubsub settings consumed by subscribe(): requires { topic, name }, optional config
      pubsub: null,
      config: {
        // change to your webhook address
        address: 'https://localhost:443',
        // token: this is your `SECRET`, so make sure you set it to something unique for your application and
        // verify notification
        token: undefined,
      },
    },
    // metadata forwarded to createBucket when the bucket does not yet exist
    metadata: {},
  },
};

module.exports = GCETransport;