packages/core/src/connections/Connection.ts
import { URL } from 'url';
import { type Configuration, type ConnectionOptions, type DynamicPassword, Utils } from '../utils';
import type { LogContext, Logger } from '../logging';
import type { MetadataStorage } from '../metadata';
import type { ConnectionType, Dictionary, MaybePromise, Primary } from '../typings';
import type { Platform } from '../platforms/Platform';
import type { TransactionEventBroadcaster } from '../events/TransactionEventBroadcaster';
import type { IsolationLevel } from '../enums';
export abstract class Connection {
protected metadata!: MetadataStorage;
protected platform!: Platform;
protected readonly options: ConnectionOptions;
protected readonly logger: Logger;
protected connected = false;
constructor(protected readonly config: Configuration,
options?: ConnectionOptions,
protected readonly type: ConnectionType = 'write') {
this.logger = this.config.getLogger();
if (options) {
this.options = Utils.copy(options);
} else {
const props = ['dbName', 'clientUrl', 'host', 'port', 'user', 'password', 'multipleStatements', 'pool', 'schema', 'driverOptions'] as const;
this.options = props.reduce((o, i) => {
(o[i] as any) = this.config.get(i);
return o;
}, {} as ConnectionOptions);
}
}
/**
* Establishes connection to database
*/
abstract connect(): void | Promise<void>;
/**
* Are we connected to the database
*/
abstract isConnected(): Promise<boolean>;
/**
* Are we connected to the database
*/
abstract checkConnection(): Promise<{ ok: boolean; reason?: string; error?: Error }>;
/**
* Closes the database connection (aka disconnect)
*/
async close(force?: boolean): Promise<void> {
Object.keys(this.options)
.filter(k => k !== 'name')
.forEach(k => delete this.options[k as keyof ConnectionOptions]);
}
/**
* Ensure the connection exists, this is used to support lazy connect when using `MikroORM.initSync()`
*/
async ensureConnection(): Promise<void> {
if (!this.connected) {
await this.connect();
}
}
/**
* Returns default client url for given driver (e.g. mongodb://127.0.0.1:27017 for mongodb)
*/
abstract getDefaultClientUrl(): string;
async transactional<T>(cb: (trx: Transaction) => Promise<T>, options?: { isolationLevel?: IsolationLevel; readOnly?: boolean; ctx?: Transaction; eventBroadcaster?: TransactionEventBroadcaster }): Promise<T> {
throw new Error(`Transactions are not supported by current driver`);
}
async begin(options?: { isolationLevel?: IsolationLevel; readOnly?: boolean; ctx?: Transaction; eventBroadcaster?: TransactionEventBroadcaster }): Promise<Transaction> {
throw new Error(`Transactions are not supported by current driver`);
}
async commit(ctx: Transaction, eventBroadcaster?: TransactionEventBroadcaster): Promise<void> {
throw new Error(`Transactions are not supported by current driver`);
}
async rollback(ctx: Transaction, eventBroadcaster?: TransactionEventBroadcaster): Promise<void> {
throw new Error(`Transactions are not supported by current driver`);
}
abstract execute<T>(query: string, params?: any[], method?: 'all' | 'get' | 'run', ctx?: Transaction): Promise<QueryResult<T> | any | any[]>;
getConnectionOptions(): ConnectionConfig {
const ret: ConnectionConfig = {};
if (this.options.clientUrl) {
const url = new URL(this.options.clientUrl);
this.options.host = ret.host = this.options.host ?? decodeURIComponent(url.hostname);
this.options.port = ret.port = this.options.port ?? +url.port;
this.options.user = ret.user = this.options.user ?? decodeURIComponent(url.username);
this.options.password = ret.password = this.options.password ?? decodeURIComponent(url.password);
this.options.dbName = ret.database = this.options.dbName ?? decodeURIComponent(url.pathname).replace(/^\//, '');
if (this.options.schema || url.searchParams.has('schema')) {
this.options.schema = ret.schema = this.options.schema ?? decodeURIComponent(url.searchParams.get('schema')!);
this.config.set('schema', ret.schema);
}
} else {
const url = new URL(this.config.getClientUrl());
this.options.host = ret.host = this.options.host ?? this.config.get('host', decodeURIComponent(url.hostname));
this.options.port = ret.port = this.options.port ?? this.config.get('port', +url.port);
this.options.user = ret.user = this.options.user ?? this.config.get('user', decodeURIComponent(url.username));
this.options.password = ret.password = this.options.password ?? this.config.get('password', decodeURIComponent(url.password));
this.options.dbName = ret.database = this.options.dbName ?? this.config.get('dbName', decodeURIComponent(url.pathname).replace(/^\//, ''));
}
return ret;
}
getClientUrl(): string {
const options = this.getConnectionOptions();
const url = new URL(this.config.getClientUrl(true));
const password = options.password ? ':*****' : '';
const schema = options.schema && options.schema !== this.platform.getDefaultSchemaName()
? `?schema=${options.schema}`
: '';
return `${url.protocol}//${options.user}${password}@${options.host}:${options.port}${schema}`;
}
setMetadata(metadata: MetadataStorage): void {
this.metadata = metadata;
}
setPlatform(platform: Platform): void {
this.platform = platform;
}
getPlatform(): Platform {
return this.platform;
}
protected async executeQuery<T>(query: string, cb: () => Promise<T>, context?: LogContext): Promise<T> {
const now = Date.now();
try {
const res = await cb();
this.logQuery(query, {
...context,
took: Date.now() - now,
results: Array.isArray(res) ? res.length : undefined,
affected: Utils.isPlainObject<QueryResult>(res) ? res.affectedRows : undefined,
});
return res;
} catch (e) {
this.logQuery(query, { ...context, took: Date.now() - now, level: 'error' });
throw e;
}
}
protected logQuery(query: string, context: LogContext = {}): void {
this.logger.logQuery({
level: 'info',
connection: {
type: this.type,
name: this.options.name || this.config.get('name') || this.options.host,
},
...context,
query,
});
}
}
export interface QueryResult<T = { id: number }> {
affectedRows: number;
insertId: Primary<T>;
row?: Dictionary;
rows?: Dictionary[];
insertedIds?: Primary<T>[];
}
export interface ConnectionConfig {
host?: string;
port?: number;
user?: string;
password?: string | (() => MaybePromise<string> | MaybePromise<DynamicPassword>);
database?: string;
schema?: string;
}
export type Transaction<T = any> = T;