packages/js-sdk/src/datastore/sync.ts
import { Query } from '../query';
import { SyncError } from '../errors/sync';
import { NotFoundError } from '../errors/notFound';
import { NetworkStore } from './networkstore';
import { DataStoreCache, SyncCache, SyncEvent } from './cache';
import { getApiVersion } from '../kinvey';
const pushInProgress = new Map<string, boolean>();
function markPushStart(collectionName: string) {
pushInProgress.set(collectionName, true);
}
function markPushEnd(collectionName: string) {
pushInProgress.set(collectionName, false);;
}
export function queryToSyncQuery(query?: Query) {
if (query && query instanceof Query) {
const newFilter = Object.keys(query.filter)
.reduce((filter, field) => Object.assign({}, filter, { [`entity.${field}`]: query.filter[field] }), {});
const newSort = Object.keys(query.sort)
.reduce((sort, field) => Object.assign({}, sort, { [`entity.${field}`]: query.sort[field] }), {});
return new Query({
filter: newFilter,
sort: newSort,
skip: query.skip,
limit: query.limit
});
}
return undefined;
}
export class Sync {
public collectionName: string;
public tag?: string;
constructor(collectionName: string, tag?: string) {
this.collectionName = collectionName;
this.tag = tag;
}
isPushInProgress() {
return pushInProgress.get(this.collectionName) === true;
}
find(providedQuery?: Query) {
const syncCache = new SyncCache(this.tag);
const query = new Query(providedQuery).equalTo('collection', this.collectionName);
return syncCache.find(query);
}
findById(id: string) {
const syncCache = new SyncCache(this.tag);
return syncCache.findById(id);
}
count(providedQuery?: Query) {
const syncCache = new SyncCache(this.tag);
const query = new Query(providedQuery).equalTo('collection', this.collectionName);
return syncCache.count(query);
}
addCreateSyncEvent(docs: any) {
return this.addSyncEvent(SyncEvent.Create, docs);
}
addUpdateSyncEvent(docs: any) {
return this.addSyncEvent(SyncEvent.Update, docs);
}
addDeleteSyncEvent(docs: any) {
return this.addSyncEvent(SyncEvent.Delete, docs);
}
async addSyncEvent(event: SyncEvent, docs: any) {
const syncCache = new SyncCache(this.tag);
let singular = false;
let syncDocs: any = [];
let docsToSync = docs;
if (!Array.isArray(docs)) {
singular = true;
docsToSync = [docs];
}
if (docsToSync.length > 0) {
const docWithNoId = docsToSync.find((doc: { _id: any; }) => !doc._id);
if (docWithNoId) {
throw new SyncError('A doc is missing an _id. All docs must have an _id in order to be added to the sync collection.');
}
// Remove existing sync events that match the docs
const query = new Query().contains('entityId', docsToSync.map((doc: { _id: any; }) => doc._id));
await this.remove(query);
// Don't add delete events for docs that were created offline
if (event === SyncEvent.Delete) {
docsToSync = docsToSync.filter((doc: { _kmd: { local: boolean; }; }) => {
if (doc._kmd && doc._kmd.local === true) {
return false;
}
return true;
});
}
// Add sync events for the docs
syncDocs = await syncCache.save(docsToSync.map((doc: { _id: any; }) => {
return {
entityId: doc._id,
entity: doc,
collection: this.collectionName,
state: {
operation: event
}
};
}));
}
return singular ? syncDocs.shift() : syncDocs;
}
async push(providedQuery?: Query, options?: any) {
if (this.isPushInProgress()) {
throw new SyncError('Data is already being pushed to the backend. Please wait for it to complete before pushing new data to the backend.');
}
const apiVersion = getApiVersion();
const network = new NetworkStore(this.collectionName);
const cache = new DataStoreCache(this.collectionName, this.tag);
const syncCache = new SyncCache(this.tag);
const collectionQuery = new Query(providedQuery).equalTo('collection', this.collectionName);
const queryForInsert = new Query(collectionQuery).equalTo('state.operation', SyncEvent.Create);
const syncDocsForInsert = await syncCache.find(queryForInsert);
const batchCreateEnabled = apiVersion >= 5 && syncDocsForInsert.length > 1;
const totalPushResults = [];
const batchCreateEntities = async (): Promise<any> => {
const localIdsToRemove = [];
const entitiesForInsert = await Promise.all(
syncDocsForInsert.map(async (doc, index) => {
const entity = await cache.findById(doc.entityId);
if (entity._kmd && entity._kmd.local === true) {
localIdsToRemove[index] = doc.entityId;
delete entity._id;
delete entity._kmd.local;
}
return entity;
})
);
let multiInsertResult;
try {
multiInsertResult = await network.create(entitiesForInsert, options);
} catch (error) {
// In case of a general batch insert error, do not break the push operation and aggregate all errors in the result
if (options.catchGeneralErrors === true) {
return syncDocsForInsert.forEach((doc, index) => {
totalPushResults.push({
_id: doc.entityId,
operation: SyncEvent.Create,
entity: entitiesForInsert[index],
error
});
});
}
throw error;
}
// Process successful inserts
if (multiInsertResult.entities != null) {
const insertedEntities = [];
const idsToRemoveFromSyncCache = [];
const idsToRemoveFromCache = [];
multiInsertResult.entities.forEach((insertedEntity, index) => {
if (insertedEntity == null) {
return;
}
insertedEntities.push(insertedEntity);
idsToRemoveFromSyncCache.push(syncDocsForInsert[index]._id);
idsToRemoveFromCache.push(localIdsToRemove[index]);
// Add the inserted entity to the end result
totalPushResults.push({
_id: syncDocsForInsert[index].entityId,
operation: SyncEvent.Create,
entity: insertedEntity
});
});
await Promise.all([
syncCache.removeManyById(idsToRemoveFromSyncCache), // Remove the sync docs
cache.save(insertedEntities), // Save the docs to cache
]);
await cache.removeManyById(idsToRemoveFromCache) // Remove the original docs that were created
}
// Process insert errors
if (multiInsertResult.errors != null) {
multiInsertResult.errors.forEach((insertError) => {
// Add the error to the end result and keep the order relative to other inserts
totalPushResults.splice(insertError.index, 0, {
_id: syncDocsForInsert[insertError.index].entityId,
operation: SyncEvent.Create,
entity: entitiesForInsert[insertError.index],
error: insertError
});
});
}
};
const createEntity = async (syncDocId, entityId): Promise<any> => {
let doc: any = await cache.findById(entityId);
let local = false;
try {
// Save the doc to the backend
if (doc._kmd && doc._kmd.local === true) {
local = true;
// tslint:disable-next-line:no-delete
delete doc._id;
// tslint:disable-next-line:no-delete
delete doc._kmd.local;
}
doc = await network.create(doc, options);
// Remove the sync doc
await syncCache.removeById(syncDocId!);
// Save the doc to cache
await cache.save(doc);
// Remove the original doc that was created
if (local) {
await cache.removeById(entityId);
}
// Return a result
return {
_id: entityId,
operation: SyncEvent.Create,
entity: doc
};
} catch (error) {
// Return a result with the error
return {
_id: entityId,
operation: SyncEvent.Create,
entity: doc,
error
};
}
};
const updateEntity = async (syncDocId, entityId): Promise<any> => {
let doc: any = await cache.findById(entityId);
try {
// Save the doc to the backend
doc = await network.update(doc, options);
// Remove the sync doc
await syncCache.removeById(syncDocId!);
// Save the doc to cache
await cache.save(doc);
// Return a result
return {
_id: entityId,
operation: SyncEvent.Update,
entity: doc
};
} catch (error) {
// Return a result with the error
return {
_id: entityId,
operation: SyncEvent.Update,
entity: doc,
error
};
}
};
const deleteEntity = async (syncDocId, entityId): Promise<any> => {
try {
try {
// Remove the doc from the backend
await network.removeById(entityId, options);
} catch (error) {
// Rethrow the error if it is not a NotFoundError
if (!(error instanceof NotFoundError)) {
throw error;
}
}
// Remove the sync doc
await syncCache.removeById(syncDocId!);
// Return a result
return {
_id: entityId,
operation: SyncEvent.Delete
};
} catch (error) {
// Return a result with the error
return {
_id: entityId,
operation: SyncEvent.Delete,
error
};
}
};
const pushEntity = async (syncDoc): Promise<any> => {
const { _id, entityId, state = { operation: undefined } } = syncDoc;
switch (state.operation) {
case SyncEvent.Create: {
if (batchCreateEnabled) {
return null; // Inserts must have already been batched
}
return createEntity(_id, entityId);
}
case SyncEvent.Update: {
return updateEntity(_id, entityId);
}
case SyncEvent.Delete: {
return deleteEntity(_id, entityId);
}
default: {
return {
_id,
operation: state.operation,
error: new Error('Unable to push item in sync table because the event was not recognized.')
};
}
}
};
// First try inserting new entities at once
if (batchCreateEnabled) {
try {
markPushStart(this.collectionName);
await batchCreateEntities();
} finally {
markPushEnd(this.collectionName);
}
}
// Push other entities one by one in batches of 100 parallel requests
const syncDocs = await syncCache.find(collectionQuery);
if (syncDocs.length > 0) {
const batchSize = 100;
let i = 0;
const batchPush = async (): Promise<any> => {
if (i >= syncDocs.length) {
return;
}
const batch = syncDocs.slice(i, i + batchSize);
i += batchSize;
try {
markPushStart(this.collectionName);
await Promise.all(batch.map((syncDoc) => pushEntity(syncDoc)
.then((pushResult) => {
if (pushResult != null) {
totalPushResults.push(pushResult);
}
})
));
} finally {
markPushEnd(this.collectionName);
}
// Push remaining docs
return batchPush();
};
await batchPush();
}
return totalPushResults;
}
async remove(providedQuery?: Query) {
const syncCache = new SyncCache(this.tag);
const query = new Query(providedQuery).equalTo('collection', this.collectionName);
return syncCache.remove(query);
}
async removeById(id: string) {
const syncCache = new SyncCache(this.tag);
return syncCache.removeById(id);
}
async clear() {
const syncCache = new SyncCache(this.tag);
const query = new Query().equalTo('collection', this.collectionName);
return syncCache.remove(query);
}
}