src/actions/list.js
const { ActionTransport } = require('@microfleet/plugin-router');
const Promise = require('bluebird');
const fsort = require('redis-filtered-sort');
const perf = require('ms-perf');
const { HttpStatusError, NotImplementedError } = require('common-errors');
const handlePipeline = require('../utils/pipeline-error');
const fetchData = require('../utils/fetch-data').batch;
const {
FILES_DATA,
FILES_INDEX,
FILES_INDEX_PUBLIC,
FILES_INDEX_TAGS,
FILES_INDEX_TEMP,
FILES_LIST,
FILES_USER_INDEX_KEY,
FILES_USER_INDEX_PUBLIC_KEY,
FILES_INDEX_UAT,
FILES_INDEX_UAT_PUBLIC,
FILES_USER_INDEX_UAT_KEY,
FILES_USER_INDEX_UAT_PUBLIC_KEY,
FILES_OWNER_FIELD,
FILES_PUBLIC_FIELD,
FILES_DIRECT_ONLY_FIELD,
FILES_UNLISTED_FIELD,
FILES_TAGS_FIELD,
FILES_ID_FIELD,
FILES_HAS_NFT,
FILES_TEMP_FIELD,
FILES_NFT_OWNER_FIELD,
FILES_HAS_NFT_OWNER_FIELD,
FILES_HAS_REFERENCES_FIELD,
FILES_IS_REFERENCED_FIELD,
FILES_PARENT_FIELD,
FILES_NFT_TOKEN_FIELD,
FILES_NFT_COLLECTION_FIELD,
FILES_IS_CLONE_FIELD,
FILES_LIST_SEARCH,
FILES_CATEGORIES_FIELD,
} = require('../constant');
const { normalizeForSearch } = require('../utils/normalize-name');
const k404Error = new Error('ELIST404');
/**
* Internal functions
*/
const numberOrInf = (a, b, infRange) => {
if (typeof a === 'number') {
return a;
}
if (typeof b === 'number') {
return b;
}
return `${infRange}inf`;
};
const numericQueryRange = (actionTypeOrValue) => {
// eslint-disable-next-line no-nested-ternary
const lowerRange = actionTypeOrValue.gte || actionTypeOrValue.gt
? (
actionTypeOrValue.gte
? `${actionTypeOrValue.gte}`
: `(${actionTypeOrValue.gt}`
)
: '-inf';
// eslint-disable-next-line no-nested-ternary
const upperRange = actionTypeOrValue.lte || actionTypeOrValue.lt
? (
actionTypeOrValue.lte
? `${actionTypeOrValue.lte}`
: `(${actionTypeOrValue.lt}`
)
: '+inf';
return { upperRange, lowerRange };
};
async function interstore(ctx) {
const { isPublic, temp, tags, redis, hasTags, uploadedAt, order, maxInterval, username, modelType } = ctx;
if (modelType === 'nft') {
throw new NotImplementedError('nft filter is unavailable');
}
// choose which set to use
let filesIndex;
if (isPublic && username) {
filesIndex = FILES_USER_INDEX_PUBLIC_KEY(username);
} else if (username) {
filesIndex = FILES_USER_INDEX_KEY(username);
} else if (isPublic) {
filesIndex = FILES_INDEX_PUBLIC;
} else if (temp) {
filesIndex = FILES_INDEX_TEMP;
} else {
filesIndex = FILES_INDEX;
}
let uploadedAtIndex;
let uploadedAtIndexInterstore;
let uploadedAtRequest;
if (uploadedAt) {
if (isPublic && username) {
uploadedAtIndex = FILES_USER_INDEX_UAT_PUBLIC_KEY(username);
} else if (username) {
uploadedAtIndex = FILES_USER_INDEX_UAT_KEY(username);
} else if (isPublic) {
uploadedAtIndex = FILES_INDEX_UAT_PUBLIC;
} else {
uploadedAtIndex = FILES_INDEX_UAT;
}
const lte = numberOrInf(uploadedAt.lte, uploadedAt.lt, '+');
const gte = numberOrInf(uploadedAt.gte, uploadedAt.gt, '-');
const now = Date.now();
// validation section to ensure we have interval that arent too large
if (lte === '+inf' && gte === '-inf') {
throw new HttpStatusError(400, `uploadedAt.{lte|lt} & uploadedAt.{gte|gt} need to have a specific interval no more than ${maxInterval} ms`);
} else if (gte >= now) {
throw new HttpStatusError(400, 'uploadedAt.{gte|gt} needs to be in the past');
} else if (lte === '+inf' && now - gte > maxInterval) {
throw new HttpStatusError(400, `uploadedAt.{gte|gt} needs to be greater than Now() - ${maxInterval} ms`);
} else if (gte === '-inf') {
throw new HttpStatusError(400, 'do not use unbound uploadedAt.{lte|lt}');
} else if (lte - gte > maxInterval) {
throw new HttpStatusError(400, `uploadedAt.{lte|lt} - uploadedAt.{gte|gt} must be less than ${maxInterval}`);
}
const { upperRange, lowerRange } = numericQueryRange(uploadedAt);
uploadedAtRequest = order === 'DESC'
? ['zrevrangebyscore', uploadedAtIndex, upperRange, lowerRange]
: ['zrangebyscore', uploadedAtIndex, lowerRange, upperRange];
uploadedAtIndexInterstore = `${uploadedAtIndex}!${order}!${upperRange}!${lowerRange}`;
}
if (!hasTags && !uploadedAtIndex) {
return filesIndex;
}
let interstoreKey = `${FILES_LIST}:${filesIndex}`;
const tagKeys = [];
if (hasTags) {
for (const tag of tags.sort().values()) {
const tagKey = `${FILES_INDEX_TAGS}:${tag}`;
tagKeys.push(tagKey);
interstoreKey = `${interstoreKey}:${tagKey}`;
}
}
if (uploadedAtIndex) {
interstoreKey = `${interstoreKey}:${uploadedAtIndexInterstore}`;
tagKeys.push(uploadedAtIndexInterstore);
}
const result = await redis.pttl(interstoreKey);
if (result > ctx.interstoreKeyMinTimeleft) {
return interstoreKey;
}
// convert zset -> set
if (uploadedAtIndexInterstore) {
await ctx.dlock.manager.fanout(uploadedAtIndexInterstore, async () => {
const ttl = await redis.pttl(uploadedAtIndexInterstore);
if (ttl > ctx.interstoreKeyMinTimeleft) {
return uploadedAtIndexInterstore;
}
const [cmd, ...args] = uploadedAtRequest;
const ids = await redis[cmd](...args);
if (ids.length === 0) {
throw k404Error;
}
const res = await redis
.pipeline()
.sadd(uploadedAtIndexInterstore, ids)
.expire(uploadedAtIndexInterstore, ctx.interstoreKeyTTL)
.exec();
handlePipeline(res);
return uploadedAtIndexInterstore;
});
}
// ensure we only do 1 operation concurrently
await ctx.dlock.manager.fanout(interstoreKey, async () => {
const res = await redis.pipeline()
.sinterstore(interstoreKey, filesIndex, tagKeys)
.expire(interstoreKey, ctx.interstoreKeyTTL)
.exec();
handlePipeline(res);
});
return interstoreKey;
}
/**
* Perform fetch from redis
*/
async function fetchFromRedis(ctx, filesIndex) {
const {
redis,
criteria,
order,
strFilter,
offset,
limit,
expiration,
hasTags,
avoidTagCache,
} = ctx;
// split op in 2 operations to reduce lock of redis
const now = Date.now();
const meta = `${FILES_DATA}:*`;
// caches for 1 second when there are tags
if (hasTags && avoidTagCache) {
return redis.fsort(filesIndex, meta, criteria, order, strFilter, now, offset, limit, 1000); // cache for 1 second
}
// this splits lengthy scripts into 2 phases - getting a sorted set and then filtering
const response = await redis.fsort(filesIndex, meta, criteria, order, '{}', now, offset, limit, expiration);
if (strFilter === '{}') {
return response;
}
return redis.fsort(filesIndex, meta, criteria, order, strFilter, now, offset, limit, expiration);
}
/**
* Reports missing file error
*/
function reportMissingError(err, filename) {
this.log.fatal({ err }, 'failed to fetch data for %s', filename);
return false;
}
/**
* Omits errors & reports missing files
*/
function omitErrors(result, promise, idx) {
if (promise.isFulfilled()) {
result.push(promise.value());
} else {
const error = promise.reason();
if (error.statusCode === 404) {
reportMissingError.call(this, error, this.filenames[idx]);
} else {
throw error;
}
}
return result;
}
/**
* Prepares filenames
*/
const prepareFilenames = (filename) => `${FILES_DATA}:${filename}`;
const omitPrefix = (prefix) => (filename) => filename.slice(prefix.length);
/**
* Fetch extra data from redis based on IDS
*/
function fetchExtraData(ctx, filenames, { total, redisSearchEnabled }) {
if (total === 0 || filenames.length === 0) {
return {
filenames,
props: [],
length: total,
};
}
const transform = redisSearchEnabled
? omitPrefix(ctx.service.config.redis.options.keyPrefix)
: prepareFilenames;
const mapped = filenames.map(transform);
const pipeline = Promise
.bind(ctx, [mapped, { omit: ctx.without }])
.spread(fetchData)
.bind({ log: ctx.log, filenames: mapped })
.reduce(omitErrors, []);
return Promise.props({ filenames, props: pipeline, length: total });
}
/**
* Filters out non-truthy array elements
*/
function truthy(_, idx) {
return !!this[idx];
}
/**
* Prepares response
*/
async function prepareResponse(ctx, data) {
const { service, timer, offset, limit } = ctx;
const { filenames, props, length } = data;
const filteredFilenames = filenames.filter(truthy, props);
const files = await Promise.map(filteredFilenames, async (filename, idx) => {
const fileData = props[idx];
fileData.id = filename;
await service.hook('files:info:post', fileData);
return fileData;
});
timer('files:info:post');
return {
files,
cursor: offset + limit,
page: Math.floor(offset / limit) + 1,
pages: Math.ceil(length / limit),
time: timer(),
};
}
const punctuation = /[,.<>{}[\]"':;!@#$%^&*()\-+=~]+/g;
const tokenization = /[\s,.<>{}[\]"':;!@#$%^&*()\-+=~]+/g;
const tagProps = [
FILES_OWNER_FIELD, FILES_NFT_OWNER_FIELD, FILES_PARENT_FIELD, FILES_NFT_TOKEN_FIELD, FILES_NFT_COLLECTION_FIELD,
];
const numericProps = [FILES_HAS_REFERENCES_FIELD, FILES_IS_REFERENCED_FIELD, FILES_IS_CLONE_FIELD];
/**
* Performs search using redis search extension
*/
async function redisSearch(ctx) {
// 1. build query
const indexName = `${ctx.service.config.redis.options.keyPrefix}:${FILES_LIST_SEARCH}`;
const args = ['FT.SEARCH', indexName];
const query = [];
const params = [];
if (ctx.username && ctx.modelType !== 'nft') {
query.push(`@${FILES_OWNER_FIELD}:{ $username }`);
params.push('username', ctx.username);
}
if (ctx.isPublic) {
query.push(`@${FILES_PUBLIC_FIELD}:{1}`);
query.push(`-@${FILES_DIRECT_ONLY_FIELD}:[1 1]`);
}
if (ctx[FILES_CATEGORIES_FIELD]) {
query.push(`@${FILES_CATEGORIES_FIELD}:{ ${ctx[FILES_CATEGORIES_FIELD].join(' | ')} }`);
}
if (ctx.hasTags) {
for (const [idx, tag] of ctx.tags.sort().entries()) {
// multi-word or tags containing punctuation will be broken into pieces
const normalizedTags = tag.replace(punctuation, ' ').split(/\s/);
for (const [idx2, subTag] of normalizedTags.entries()) {
const varName = `tag_${idx}_${idx2}`;
query.push(`@${FILES_TAGS_FIELD}:$${varName}`);
params.push(varName, subTag);
}
}
}
if (ctx.modelType) {
query.push(`${ctx.modelType === '3d' ? '-' : ''}@${FILES_HAS_NFT}:[1 1]`);
if (ctx.modelType === 'nft') {
let ownerMatch = '';
let nftOwnerMatch = '';
if (ctx.username) {
// owner and empty wallet
ownerMatch = `(@${FILES_OWNER_FIELD}:{ $username } -@${FILES_HAS_NFT_OWNER_FIELD}:[1 1])`;
params.push('username', ctx.username);
}
if (ctx.nftOwner) {
// only wallet match
nftOwnerMatch = `(@${FILES_NFT_OWNER_FIELD}:{ $nftOwner }) @${FILES_HAS_NFT_OWNER_FIELD}:[1 1]`;
params.push('nftOwner', ctx.nftOwner);
}
if (ctx.username || ctx.nftOwner) {
query.push(`(${ownerMatch}${ownerMatch && nftOwnerMatch ? '|' : ''}${nftOwnerMatch})`);
}
}
}
const { filter } = ctx;
const rebuiltFilter = ctx.uploadedAt ? { ...filter, uploadedAt: ctx.uploadedAt } : filter;
for (const [_propName, actionTypeOrValue] of Object.entries(rebuiltFilter)) {
let propName = _propName;
if (propName === '#') {
propName = FILES_ID_FIELD;
} else if (propName === '#multi') {
propName = actionTypeOrValue.fields.join('|');
} else if (propName === 'alias') {
propName = 'alias_tag';
}
if (actionTypeOrValue === undefined || propName === 'nft') {
// skip empty attributes
// or nft cause it uses special index
} else if (typeof actionTypeOrValue === 'string') {
if (tagProps.includes(propName) || Number.isNaN(+actionTypeOrValue)) {
query.push(`@${propName}:{ $f_${propName} }`);
params.push(`f_${propName}`, actionTypeOrValue);
} else {
query.push(`@${propName}:[${actionTypeOrValue} ${actionTypeOrValue}]`);
}
} else if (actionTypeOrValue.gte || actionTypeOrValue.lte || actionTypeOrValue.gt || actionTypeOrValue.lt) {
const { lowerRange, upperRange } = numericQueryRange(actionTypeOrValue);
query.push(`@${propName}:[${lowerRange} ${upperRange}]`);
} else if (actionTypeOrValue.exists !== undefined) {
if (numericProps.includes(propName)) {
query.push(`@${propName}:[-inf +inf]`);
} else {
query.push(`-@${propName}:""`);
}
} else if (actionTypeOrValue.isempty !== undefined) {
if (numericProps.includes(propName)) {
query.push(`-@${propName}:[-inf +inf]`);
} else {
query.push(`@${propName}:""`);
}
} else if (actionTypeOrValue.eq) {
query.push(`@${propName}:{ $f_${propName}_eq }`);
params.push(`f_${propName}_eq`, actionTypeOrValue.eq);
} else if (actionTypeOrValue.ne) {
query.push(`-@${propName}:{ $f_${propName}_ne }`);
params.push(`f_${propName}_ne`, actionTypeOrValue.ne);
} else if (actionTypeOrValue.match) {
const varName = `f_${propName.replace(/\|/g, '_')}_m`;
const words = actionTypeOrValue.match.split(tokenization);
const queryVars = [];
words.forEach((word, index) => {
if (word.trim().length === 0) {
return;
}
const wordVarName = `${varName}_${index}`;
queryVars.push(`$${wordVarName}`);
params.push(wordVarName, normalizeForSearch(word));
});
if (queryVars.length > 0) {
query.push(`@${propName}:(${queryVars.join(' ')}*)`);
}
}
}
if (!ctx.temp) {
query.push(`-@${FILES_TEMP_FIELD}:[1 1]`);
}
// skip unlisted files
// NOTE: there is a bug if this appear as first in the query all models are returned instead
query.push(`-@${FILES_UNLISTED_FIELD}:[1 1]`);
if (query.length > 0) {
args.push(query.join(' '));
} else {
args.push('*');
}
if (params.length > 0) {
args.push('PARAMS', params.length.toString(), ...params);
args.push('DIALECT', '2');
}
// sort the response
if (ctx.criteria) {
args.push('SORTBY', ctx.criteria, ctx.order);
} else {
args.push('SORTBY', FILES_ID_FIELD, ctx.order);
}
// limits
args.push('LIMIT', ctx.offset, ctx.limit);
// we'll fetch the data later
args.push('NOCONTENT');
// [total, [ids]]
ctx.service.log.info({ search: args }, 'search query');
const [total, ...ids] = await ctx.redis.call(...args);
return { total, ids };
}
/**
* List files
* @return {Promise}
*/
async function listFiles({ params }) {
const timer = perf('list', { thunk: false });
const {
redis,
dlock,
log,
config,
} = this;
const {
interstoreKeyTTL,
interstoreKeyMinTimeleft,
avoidTagCache,
} = config;
let {
filter = Object.create(null),
} = params;
const {
without,
owner,
public: isPublic,
offset,
limit,
order,
criteria,
tags,
modelType,
nftOwner,
temp,
categories,
expiration = 30000,
} = params;
const nonStringFilter = typeof filter === 'string' ? JSON.parse(filter) : filter;
const { uploadedAt } = nonStringFilter;
if (uploadedAt && !temp) {
filter = { ...nonStringFilter, uploadedAt: undefined };
}
const nftFilters = {
'3d': { nft: { isempty: '1' } },
};
const nftFilter = nftFilters[modelType];
if (nftFilter) {
filter = { ...filter, ...nftFilter };
}
const strFilter = typeof filter === 'string'
? filter
: fsort.filter(filter);
const ctx = {
// context
redis,
dlock,
log,
interstoreKeyTTL,
interstoreKeyMinTimeleft,
timer,
service: this,
maxInterval: config.listMaxInterval,
// our params
without,
owner,
filter,
isPublic,
offset,
limit,
order,
criteria,
tags,
temp,
expiration,
strFilter,
uploadedAt: temp ? null : uploadedAt,
hasTags: Array.isArray(tags) && tags.length > 0,
avoidTagCache,
username: '',
nftOwner,
modelType,
categories,
};
try {
const [username] = await this.hook('files:info:pre', owner);
timer('files:info:pre');
ctx.username = username;
let ids;
let total;
if (config.redisSearch.enabled) {
({ total, ids } = await redisSearch(ctx));
} else {
const stored = await interstore(ctx);
timer('interstore');
ids = await fetchFromRedis(ctx, stored);
total = +ids.pop();
timer('fsort');
}
const data = await fetchExtraData(ctx, ids, { total, redisSearchEnabled: config.redisSearch.enabled });
timer('fetch');
return await prepareResponse(ctx, data);
} catch (e) {
if (e.message === k404Error.message) {
return {
files: [],
cursor: 0,
page: 0,
pages: 0,
time: timer(),
};
}
this.log.warn({ timer: timer(), err: e }, 'list search failed');
throw e;
}
}
listFiles.transports = [ActionTransport.amqp, ActionTransport.internal];
module.exports = listFiles;