// app/api/common.v2/database/MongoTransactionManager.ts
import { MongoClient, ClientSession } from 'mongodb';
import { Logger } from 'api/log.v2/contracts/Logger';
import { TransactionManager } from '../contracts/TransactionManager';
/**
 * Runs a unit of work inside a MongoDB transaction on a session obtained from
 * the injected client.
 *
 * An instance is single-use: `run` may be called once. A second call rejects
 * with 'Transaction already finished.' when the first one committed, or with
 * 'Transaction already in progress.' otherwise.
 *
 * Handlers registered through `onCommitted` / `runHandlingOnCommitted` are
 * invoked after the transaction has been committed, before the session is
 * ended.
 */
export class MongoTransactionManager implements TransactionManager {
  private readonly mongoClient: MongoClient;

  private readonly logger: Logger;

  private session?: ClientSession;

  private readonly onCommitHandlers: ((returnValue: any) => Promise<void>)[];

  private finished = false;

  /**
   * @param mongoClient - client used to start the session for the transaction.
   * @param logger - receives (at debug level) the errors that trigger a retry
   * and any failure to abort a transaction.
   */
  constructor(mongoClient: MongoClient, logger: Logger) {
    this.onCommitHandlers = [];
    this.mongoClient = mongoClient;
    this.logger = logger;
  }

  /**
   * Tells whether a caught value carries the given error label.
   *
   * Anything can be thrown in JavaScript, so the value is inspected
   * defensively: `null`, `undefined` and values without a callable
   * `hasErrorLabel` simply do not carry the label.
   */
  private static hasErrorLabel(error: unknown, label: string): boolean {
    if (error === null || error === undefined) {
      return false;
    }
    const candidate = (error as { hasErrorLabel?: unknown }).hasErrorLabel;
    if (typeof candidate !== 'function') {
      return false;
    }
    return Boolean(candidate.call(error, label));
  }

  /**
   * Invokes every registered on-commit handler with `returnValue`.
   * Handlers are started in registration order and awaited together, so the
   * returned promise rejects as soon as any handler rejects.
   *
   * @param returnValue - value handed to each handler.
   */
  async executeOnCommitHandlers(returnValue: unknown) {
    return Promise.all(this.onCommitHandlers.map(async handler => handler(returnValue)));
  }

  /**
   * Guards against reusing the manager.
   *
   * @throws Error when a session has already been created by a previous `run`.
   */
  private validateState() {
    if (this.session) {
      if (this.finished) {
        throw new Error('Transaction already finished.');
      }
      throw new Error('Transaction already in progress.');
    }
  }

  /**
   * Commits the transaction, committing again for as long as the failure is
   * labelled 'UnknownTransactionCommitResult'. Any other failure is rethrown.
   * Marks the manager as finished once the commit succeeds.
   */
  private async commitWithRetry(session: ClientSession): Promise<void> {
    try {
      await session.commitTransaction();
      this.finished = true;
    } catch (error) {
      if (!MongoTransactionManager.hasErrorLabel(error, 'UnknownTransactionCommitResult')) {
        throw error;
      }
      this.logger.debug(error);
      await this.commitWithRetry(session);
    }
  }

  /**
   * Aborts the transaction without letting an abort failure escape: the
   * failure is logged so that the caller can rethrow the error that caused
   * the abort (and its retry labels) instead of losing it.
   */
  private async abortTransaction(session: ClientSession): Promise<void> {
    try {
      await session.abortTransaction();
    } catch (abortError) {
      this.logger.debug(abortError);
    }
  }

  /**
   * Performs one attempt: starts a transaction, runs the callback and commits.
   * On failure the transaction is aborted if it is still open and the
   * original error is rethrown.
   */
  private async runInTransaction<T>(
    session: ClientSession,
    callback: () => Promise<T>
  ): Promise<T> {
    session.startTransaction();
    try {
      const returnValue = await callback();
      await this.commitWithRetry(session);
      return returnValue;
    } catch (error) {
      if (session.inTransaction()) {
        await this.abortTransaction(session);
      }
      throw error;
    }
  }

  /**
   * Runs the whole attempt again while the failure is labelled
   * 'TransientTransactionError' and retries remain.
   *
   * @param retries - number of additional attempts allowed after the first.
   */
  private async runWithRetry<T>(
    session: ClientSession,
    callback: () => Promise<T>,
    retries = 3
  ): Promise<T> {
    try {
      return await this.runInTransaction(session, callback);
    } catch (error) {
      const isTransient = MongoTransactionManager.hasErrorLabel(
        error,
        'TransientTransactionError'
      );
      if (retries > 0 && isTransient) {
        this.logger.debug(error);
        return this.runWithRetry(session, callback, retries - 1);
      }
      throw error;
    }
  }

  /**
   * Executes `callback` inside a transaction and, once committed, runs the
   * on-commit handlers with the callback's result. The session is always
   * ended, whether the run succeeds or fails.
   *
   * @param callback - the work to perform; it may be invoked more than once
   * when an attempt fails with a transient transaction error.
   * @returns the value returned by the committed `callback` invocation.
   * @throws Error when the manager has already been used, or whatever the
   * callback, the commit or an on-commit handler rejected with.
   */
  async run<T>(callback: () => Promise<T>): Promise<T> {
    this.validateState();
    const session = this.mongoClient.startSession();
    this.session = session;
    try {
      const returnValue = await this.runWithRetry(session, callback);
      await this.executeOnCommitHandlers(returnValue);
      return returnValue;
    } finally {
      await session.endSession();
    }
  }

  /**
   * Fluent variant of `run`: the returned object's `onCommitted` registers a
   * handler that receives the callback's result and then starts the run.
   *
   * @param callback - the work to perform inside the transaction.
   */
  runHandlingOnCommitted<T>(callback: () => Promise<T>) {
    return {
      onCommitted: async (handler: (returnValue: T) => Promise<void>) => {
        this.onCommitHandlers.push(handler);
        return this.run(callback);
      },
    };
  }

  /**
   * @returns the session created by `run`, or `undefined` if `run` has not
   * been called yet. The reference is kept after the session has been ended.
   */
  getSession() {
    return this.session;
  }

  /**
   * Registers a handler to be invoked after the transaction commits.
   *
   * @returns this manager, to allow chaining.
   */
  onCommitted(handler: () => Promise<void>) {
    this.onCommitHandlers.push(handler);
    return this;
  }
}