src/deep-resource/lib/Resource/Request.js
/**
* Created by AlexanderC on 6/10/15.
*/
'use strict';
import {SuperagentResponse} from './SuperagentResponse';
import {LambdaResponse} from './LambdaResponse';
import {Response} from './Response';
import {Exception} from '../Exception/Exception';
import {Action} from './Action';
import {RetryManager} from './RetryManager';
import Http from 'superagent';
import AWS from 'aws-sdk';
import {MissingCacheImplementationException} from './Exception/MissingCacheImplementationException';
import {CachedRequestException} from './Exception/CachedRequestException';
import aws4 from 'aws4';
import urlParse from 'url-parse';
import qs from 'qs';
import Core from 'deep-core';
import {MissingSecurityServiceException} from './Exception/MissingSecurityServiceException';
import {AsyncCallNotAvailableException} from './Exception/AsyncCallNotAvailableException';
import {LambdaParamsCompatibilityException} from './Exception/LambdaParamsCompatibilityException';
import {LoadCredentialsException} from './Exception/LoadCredentialsException';
import {SourceNotAvailableException} from './Exception/SourceNotAvailableException';
import crypto from 'crypto';
import util from 'util';
/**
* Action request instance
*/
export class Request {
/**
* @param {Action} action
* @param {Object} payload
* @param {String} method
*/
constructor(action, payload, method) {
this._action = action;
this._payload = payload;
this._method = method;
this._lambda = new AWS.Lambda();
this._cacheImpl = null;
this._cacheTtl = Request.TTL_FOREVER;
this._cached = false;
this._publicCached = false;
this._async = false;
this._xhrAsync = true; // it's passed to httpOptions.xhrAsync option of AWS Service
this._native = false;
this._validationSchemaName = null;
this._customId = null;
this._returnLogs = false;
this._withUserCredentials = true;
this._authScope = this._buildAuthScope();
this._apiKey = null;
this._baseUrl = null;
this._retryManager = new RetryManager(['internal-error']);
}
/**
* @param {String} url
* @returns {LocalRequest}
*/
baseUrl(url) {
this._baseUrl = url;
return this;
}
/**
* @returns {String|null}
*/
get baseUrl() {
if (!this._baseUrl) {
this._baseUrl = this._action.baseUrl;
}
return this._baseUrl;
}
/**
* @returns {Request}
*/
skipLoadingCredentials() {
this._withUserCredentials = false;
return this;
}
/**
* @returns {Boolean}
*/
get withUserCredentials() {
if (this.action.scope === 'private') {
this._withUserCredentials = false;
}
return this._withUserCredentials;
}
/**
* @returns {Request}
*/
skipPreValidation() {
this._validationSchemaName = null;
return this;
}
/**
* @returns {String}
*/
get validationSchemaName() {
return this._validationSchemaName;
}
/**
* @param {String} validationSchemaName
*/
set validationSchemaName(validationSchemaName) {
this._validationSchemaName = validationSchemaName;
}
/**
* @returns {Boolean}
*/
get async() {
return this._async;
}
/**
* @returns {Boolean}
*/
get xhrAsync() {
return this._xhrAsync;
}
/**
* @returns {Boolean}
*
* @todo: remove this?
*/
get isLambda() {
return this._action.type === Action.LAMBDA;
}
/**
* @returns {String}
*/
get customId() {
if (!this._customId) {
this._customId = Request._md5(this._buildCacheKey() + new Date().getTime());
}
return this._customId;
}
/**
* @returns {Request}
*/
invokeAsync() {
if (!this.isLambda) {
throw new AsyncCallNotAvailableException(this._action.type);
}
if (this._returnLogs) {
throw new LambdaParamsCompatibilityException({
InvocationType: 'Event',
LogType: 'Tail',
});
}
this._native = true;
this._async = true;
return this;
}
/**
* @param {Boolean} flag
*
* @returns {Request}
*/
httpXhrAsync(flag) {
if (!this.isLambda) {
throw new Exception('XHR sync requests are supported only for calls to lambda functions.', 400);
}
this._xhrAsync = !!flag;
return this;
}
/**
* @returns {Boolean}
*/
get native() {
return this._native;
}
/**
* @param {Boolean} returnLogs
* @returns {Request}
*/
useDirectCall(returnLogs = false) {
if (this._async && returnLogs) {
throw new LambdaParamsCompatibilityException({
InvocationType: 'Event',
LogType: 'Tail',
});
}
this._native = true;
this._returnLogs = returnLogs;
return this;
}
/**
* @returns {Request}
*/
usePublicCache() {
this._publicCached = true;
return this;
}
/**
* @returns {Boolean}
*/
get isCached() {
return this._cacheImpl && this._cached;
}
/**
* @returns {Boolean}
*/
get isPublicCached() {
return this._publicCached && this._cacheImpl && this._cacheImpl.shared;
}
/**
* @returns {Request}
*/
enableCache() {
this._cached = true;
return this;
}
/**
* @returns {Request}
*/
disableCache() {
this._cached = false;
return this;
}
/**
* @param {Number} ttl
* @returns {Request}
*/
cache(ttl = Request.TTL_FOREVER) {
if (!this._cacheImpl) {
throw new MissingCacheImplementationException();
}
this._cacheTtl = ttl;
this.enableCache();
return this;
}
/**
* @param {Number} count
* @returns {Request}
*/
retry(count) {
this._retryManager.count = count;
return this;
}
/**
* @param {Function|String} strategy
* @param {Object[]} args
* @returns {Request}
*/
addRetryStrategy(strategy, ...args) {
this._retryManager.addStrategy(strategy, ...args);
return this;
}
/**
* @returns {Number}
*/
get cacheTtl() {
return this._cacheTtl;
}
/**
* @param {Number} ttl
*/
set cacheTtl(ttl) {
this._cacheTtl = ttl;
}
/**
* @returns {Cache}
*/
get cacheImpl() {
return this._cacheImpl;
}
/**
* @param {Cache} cache
*/
set cacheImpl(cache) {
this._cacheImpl = cache;
// @todo: do we really have to force it?
this.cache(Request.TTL_DEFAULT);
}
/**
* @param {String|null} authScope
* @returns {Request}
*/
authScope(authScope) {
this._authScope = authScope;
return this;
}
/**
* @param {String} key
* @returns {Request}
*/
apiKey(key) {
this._apiKey = key;
return this;
}
/**
* @returns {String}
* @private
*/
_buildCacheKey() {
let payload = Request._md5(JSON.stringify(this.payload));
let endpoint = this.native ? this._action.source.original : this._action.source.api;
return `${this._method}:${this._action.type}:${endpoint}#${payload}`;
}
/**
* @param {String} str
* @returns {String}
*/
static _md5(str) {
var md5sum = crypto.createHash('md5');
md5sum.update(str);
return md5sum.digest('hex');
}
/**
* @param {Response} response
* @returns {String}
* @private
*/
static _stringifyResponse(response) {
let objToStr = {
_class: response.constructor.name,
data: response.rawData,
error: response.rawError,
headers: response.headers,
};
if (response.constructor.name === SuperagentResponse.name) {
objToStr.data = {
body: response.rawData.body,
status: response.rawData.status,
headers: response.rawData.headers
};
}
return JSON.stringify(objToStr);
}
/**
* @param {String|Object} rawData
* @returns {Response}
* @private
*/
_rebuildResponse(rawData) {
let response = typeof rawData === 'string' ? JSON.parse(rawData) : rawData;
if (!response) {
throw new CachedRequestException(`Unable to unpack cached JSON object from ${rawData}`);
}
if (response._class) {
let ResponseImpl = Request._chooseResponseImpl(response._class);
if (!ResponseImpl) {
throw new Exception(`Unknown Response implementation ${response._class}`);
}
return new ResponseImpl(this, response.data, response.error);
}
return new SuperagentResponse(this, response, null);
}
/**
* @param {String} className
* @returns {*}
* @private
*/
static _chooseResponseImpl(className) {
let implMap = {};
implMap[Response.name] = Response;
implMap[LambdaResponse.name] = LambdaResponse;
implMap[SuperagentResponse.name] = SuperagentResponse;
return implMap[className];
}
/**
*
* @param {Function} callback
* @returns {Request}
*/
invalidateCache(callback = () => {}) {
if (!this.isCached) {
callback(true);
return this;
}
let cache = this._cacheImpl;
let cacheKey = this._buildCacheKey();
cache.has(cacheKey, (error, result) => {
if (error) {
throw new CachedRequestException(error);
}
if (result) {
cache.invalidate(cacheKey, 0, (error, result) => {
if (error) {
throw new CachedRequestException(error);
}
callback(result);
});
return;
}
callback(true);
});
return this;
}
/**
* @param {Function} callback
* @returns {Request}
*/
send(callback = () => {}) {
let cache = this.cacheImpl;
let cacheKey = this._buildCacheKey();
if (!this.isCached || this._async || (this.cacheTtl === Request.TTL_INVALIDATE)) {
return this._send(callback);
}
this._loadResponseFromCache(cache, cacheKey, (error, response) => {
if (!error) {
callback(response);
return;
}
if (this.isPublicCached) {
let publicCache = cache.shared;
let publicCacheKey = publicCache.buildKeyFromRequest(this);
this._loadResponseFromCache(publicCache, publicCacheKey, (error, response) => {
if (!error) {
callback(response);
return;
}
this._send(callback);
});
} else {
this._send(callback);
}
});
return this;
}
/**
* @param {Function} callback
* @returns {Request}
*/
_send(callback = () => {}) {
let logService = this.action.resource.log;
let requestEvent = {
service: 'deep-resource',
resourceType: 'Browser',
resourceId: this.native ? this.action.source.original : this.action.source.api,
eventName: this.method,
eventId: this.customId,
requestId: this.customId,
time: Date.now(),
};
let decoratedCallback = (response) => {
if (this._retryManager.isRetryable(response)) {
return this._send(callback);
}
if (this.method.toUpperCase() === 'GET') {
this._saveResponseToCache(response);
}
requestEvent.requestId = requestEvent.mainRequestId = response.requestId;
let responseEvent = util._extend({}, requestEvent);
responseEvent.payload = response;
responseEvent.time = Date.now();
logService.rumLog(requestEvent);
logService.rumLog(responseEvent);
callback(response);
};
if (this.validationSchemaName) {
let result = this._validate();
if (result.error) {
return decoratedCallback(this._createValidationErrorResponse(result.error));
}
}
this._fillPayloadWithSystemData();
if (!this._native) {
return this._sendThroughApi(decoratedCallback);
}
switch (this._action.type) {
case Action.LAMBDA:
if (!this._action.isOriginalSourceInvokable) {
throw new SourceNotAvailableException(Action.LAMBDA, this._action);
}
this._sendLambda(decoratedCallback);
break;
case Action.EXTERNAL:
if (!this._action.isApiSourceInvokable) {
throw new SourceNotAvailableException(Action.EXTERNAL, this._action);
}
this._sendExternal(decoratedCallback);
break;
default: throw new Exception(`Request of type ${this._action.type} is not implemented`);
}
return this;
}
/**
* @returns {Request}
* @private
*/
_fillPayloadWithSystemData() {
let resource = this.action.resource;
if (!resource.isBackend || !resource.log.isRumEnabled()) {
return this;
}
let runtimeContext = resource.contextProvider.context;
this._payload.lambdaDepthLevel = (runtimeContext.getDeepFrameworkOption('lambdaDepthLevel') || 0) + 1;
this._payload.mainRequestId = runtimeContext.getDeepFrameworkOption('mainRequestId') ||
runtimeContext.awsRequestId;
return this;
}
/**
* @param {Object} response
* @param {Function} callback
* @private
*/
_saveResponseToCache(response, callback = () => {}) {
if (!this.isCached || this.async || (this.cacheTtl === Request.TTL_INVALIDATE) || response.isError) {
callback(null, response);
return;
}
let cacheKey = this._buildCacheKey();
let logService = this.action.resource.log;
let event = {
service: 'deep-cache',
resourceType: this.cacheImpl.type(),
resourceId: cacheKey,
eventName: 'set',
requestId: response.requestId,
};
logService.rumLog(event);
this.cacheImpl.set(cacheKey, Request._stringifyResponse(response), this.cacheTtl, (error, result) => {
event = util._extend({}, event);
event.payload = {error, result,};
logService.rumLog(event);
if (!error && !result) {
error = `Unable to persist request cache under key ${cacheKey}.`;
}
if (error) {
error = new CachedRequestException(error);
}
callback(error, result);
return;
});
}
/**
* @param {Object} driver
* @param {String|Key} key
* @param {Function} callback
*/
_loadResponseFromCache(driver, key, callback) {
driver.has(key, (err, has) => {
if(err) {
callback(new CachedRequestException(`Error to check if has in cache key ${key}`));
return;
}
if (has) {
let logService = this.action.resource.log;
let event = {
service: 'deep-cache',
resourceType: driver.type(),
resourceId: key,
eventName: 'get',
requestId: this.customId,
};
logService.rumLog(event);
driver.get(key, (err, data) => {
event = util._extend({}, event);
event.payload = {err, data,};
logService.rumLog(event);
if (err) {
callback(err, null);
return;
}
callback(null, this._rebuildResponse(data));
});
return;
}
callback(new CachedRequestException(`Missing key ${key}`), null);
});
}
/**
* @returns {Object}
* @private
*/
_validate() {
if (!this.validationSchemaName) {
throw new Exception('Error on validating request. Validation schema is not defined.');
}
return this.action.resource.validation.validate(
this.validationSchemaName, this.payload, true
);
}
/**
* @param {Error} validationError
*
* @returns {LambdaResponse}
* @private
*/
_createValidationErrorResponse(validationError) {
return new LambdaResponse(this, {
errorMessage: JSON.stringify({
errorType: validationError.name,
errorMessage: validationError.annotate(),
errorStack: validationError.stack || (new Error(validationError.message)).stack,
validationErrors: validationError.details,
})
}, null);
}
/**
* @param {Function} callback
* @returns {Request}
* @private
*/
_sendThroughApi(callback = () => {}) {
let endpoint = this.action.source.api;
let headers = {};
let sendRequestFunc = (request) => {
request.end((error, response) => {
callback(new SuperagentResponse(this, response, error));
});
};
// make sure apiKey is set for endpoints that requires it
if (this.action.apiKeyRequired) {
if (!this._apiKey) {
callback(new SuperagentResponse(
this, null, new Error('Missing required api key parameter.')
));
return this;
}
headers = {'x-api-key': this._apiKey};
}
if (this.action.apiAuthType === Action.API_AWS_IAM_AUTH) {
this._createAws4SignedRequest(endpoint, this.method, this.payload, headers, (error, signedRequest) => {
if (error) {
callback(new SuperagentResponse(this, null, error));
return this;
}
sendRequestFunc(signedRequest);
});
} else {
sendRequestFunc(
this._createBasicHttpRequest(endpoint, this.method, this.payload, headers)
)
}
return this;
}
/**
* @param {Function} callback
* @returns {Request}
* @private
*/
_sendLambda(callback = () => {}) {
// @todo: set retries in a smarter way...
AWS.config.maxRetries = 3;
let options = {
region: this._action.region,
httpOptions: {
xhrAsync: this.xhrAsync
},
};
let invocationParameters = {
FunctionName: this._action.source.original,
Payload: JSON.stringify(this.payload),
InvocationType: this._async ? 'Event' : 'RequestResponse',
LogType: this._returnLogs ? 'Tail' : 'None',
};
if (!this.withUserCredentials) {
this._invokeLambda(invocationParameters, callback);
} else {
this._loadSecurityCredentials((error, credentials) => {
// use cognito identity credentials if present
// if not, fallback to lambda execution role permissions
if (!error && credentials) {
options.credentials = credentials;
}
this._lambda = new AWS.Lambda(options);
this._invokeLambda(invocationParameters, callback);
});
}
return this;
}
/**
* @param {Object} invocationParameters
* @param {Function} callback
* @private
*/
_invokeLambda(invocationParameters, callback) {
let _this = this;
// @note - don't replace this callback function with an arrow one
// (we need injected context to access AWS.Response)
this._lambda.invoke(invocationParameters, function(error, data) {
let lambdaResponse = new LambdaResponse(_this, data, error);
lambdaResponse.originalResponse = this; // this is an instance of AWS.Response
callback(lambdaResponse);
});
}
/**
* @param {String} url
* @param {String} method
* @param {*} payload
* @param {*} headers
* @returns {*}
* @private
*/
_createBasicHttpRequest(url, method = this.method, payload = this.payload, headers = {}) {
let request = Http[Request._httpRealMethod(method)](url, payload);
for (let headerName in headers) {
if (!headers.hasOwnProperty(headerName)) {
continue;
}
request.set(headerName, headers[headerName]);
}
return request;
}
/**
* @param {Function} callback
* @returns {Request}
* @private
*/
_sendExternal(callback = () => {}) {
this._createBasicHttpRequest(this._action.source.original)
.send()
.end((error, response) => {
callback(new SuperagentResponse(this, response, error));
});
return this;
}
/**
* @param {String} url
* @param {String} httpMethod
* @param {Object} payload
* @param {Object} headers
* @param {Function} callback
* @private
*/
_createAws4SignedRequest(url, httpMethod, payload, headers, callback) {
let parsedUrl = urlParse(url, qs);
let apiHost = parsedUrl.hostname;
let apiPath = parsedUrl.pathname ? parsedUrl.pathname : '/';
headers = util._extend(headers, {
'Content-Type': 'application/json; charset=UTF-8',
});
let opsToSign = {
service: Core.AWS.Service.API_GATEWAY_EXECUTE,
region: this.getEndpointHostRegion(apiHost),
host: apiHost,
method: httpMethod,
path: apiPath,
headers: util._extend({}, headers), // clone headers object not to be changed by aws4.sign method
};
httpMethod = httpMethod.toLowerCase();
switch (httpMethod) {
case 'get':
case 'delete':
if (parsedUrl.query || payload) {
//assure parsedUrl.query is a valid object
if (parsedUrl.query === null || typeof parsedUrl.query !== 'object') {
parsedUrl.query = {};
}
let mergedPayload = util._extend(parsedUrl.query, payload);
if (this.action.apiCacheEnabled) {
mergedPayload[Action.DEEP_CACHE_QS_PARAM] = Request._md5(qs.stringify(mergedPayload));
}
opsToSign.path += `?${qs.stringify(mergedPayload)}`;
// pass payload as query string
parsedUrl.set('query', mergedPayload, qs.parse);
url = parsedUrl.toString(qs.stringify);
payload = null; // reset it coz superagent overrides url query string
}
break;
case 'post':
case 'put':
case 'patch':
payload = JSON.stringify(payload);
opsToSign.body = payload;
break;
}
this._loadSecurityCredentials((error, credentials) => {
if (error) {
callback(error);
return;
}
let signature = aws4.sign(opsToSign, credentials);
let request = this._createBasicHttpRequest(url, httpMethod, payload, headers);
// Adding aws4 required headers
['X-Amz-Date', 'X-Amz-Security-Token', 'Authorization'].forEach(header => {
request.set(header, signature.headers[header]);
});
if (this.action.resource.isBackend && signature.headers.hasOwnProperty('Content-Length')) {
request.set('Content-Length', signature.headers['Content-Length']);
}
callback(null, request);
});
}
/**
* @param {String} httpMethod
* @returns {String}
* @private
*/
static _httpRealMethod(httpMethod) {
let method = httpMethod.toLowerCase();
// @see https://visionmedia.github.io/superagent/
if (method === 'delete') {
method = 'del';
}
return method;
}
/**
* @param {Function} callback
* @returns {Request}
* @private
*/
_loadSecurityCredentials(callback) {
let securityService = this._action.resource.security;
if (!securityService) {
callback(new MissingSecurityServiceException(), null);
return this;
}
let loadCredentialsFunc = (securityToken) => {
securityToken.loadCredentials((error, credentials) => {
if (error) {
callback(new LoadCredentialsException(error), null);
return;
}
callback(null, credentials);
}, this._authScope);
};
if (securityService.token) {
loadCredentialsFunc(securityService.token);
} else {
loadCredentialsFunc(securityService.anonymousLogin(() => {}));
}
return this;
}
/**
* @param {String} endpointHost
* @returns {String}
*/
getEndpointHostRegion(endpointHost) {
let regionParts = endpointHost.match(/\.([^\.]+)\.amazonaws\.com$/i);
// @todo - expose API region into config provision section
return regionParts ? regionParts[1] : this._action.region; // use action region as fallback
}
/**
* @return {String}
*/
_buildAuthScope() {
let action = this._action;
let resource = action.resource;
let microservice = resource.microservice;
return `${microservice.identifier}:${resource.name}:${action.name}`;
}
/**
* @returns {Action}
*/
get action() {
return this._action;
}
/**
* @returns {Object}
*/
get payload() {
return this._payload;
}
/**
* @returns {String}
*/
get method() {
return this._method;
}
/**
* @returns {Number}
* @constructor
*/
static get TTL_DEFAULT() {
return 10;
}
/**
* @returns {Number}
*/
static get TTL_INVALIDATE() {
return -1;
}
/**
* @returns {Number}
*/
static get TTL_FOREVER() {
return 0;
}
}