src/service/LocationBasedService.ts
import { AbsolutePosition, DataObject } from '../data';
import { Graph } from '../graph/Graph';
import { Model } from '../Model';
import { TimeUnit } from '../utils';
import { DataObjectService } from './DataObjectService';
import { Service } from './Service';
import { TimeService } from './TimeService';
import { Constructor } from '../data/decorators';
/**
* Location-Based Service
*
* ## Usage
*
* ### Creation
* ```typescript
* const service = new LocationBasedService<
* DataObject,
* GeographicalPosition
* >();
*
* ModelBuilder.create()
* .addService(service)
* .from()
* .to().build();
* ```
*
* ### Getting the current position
*
* ### Setting the current position
*
* ### Watching the position of an object
*/
export class LocationBasedService<
T extends DataObject = DataObject,
P extends AbsolutePosition = AbsolutePosition,
> extends Service {
protected options: LBSOptions;
model: Model;
protected service: DataObjectService<T>;
protected watchers: Map<number, Watcher> = new Map();
protected watchedObjects: Map<string, number[]> = new Map();
protected watchIndex = 1;
constructor(options?: LBSOptions) {
super();
this.options = options || {};
this.once('build', this._initLBS.bind(this));
this.once('destroy', this._destroy.bind(this));
}
private _initLBS(): void {
// Default options
this.options.pullNode = this.options.pullNode || (this.model as Graph<any, any>).internalSink.uid;
this.options.dataService = this.options.dataService || DataObject;
this.service = this.model.findDataService(this.options.dataService);
this.service.on('insert', (uid: string, storedObject: T) => {
const watchIds = this.watchedObjects.get(uid);
if (watchIds) {
const position = storedObject.position as P;
watchIds.forEach((watchId) => {
const watcher = this.watchers.get(watchId);
if (position) {
watcher.callback(position);
}
});
}
});
}
private _destroy(): void {
Array.from(this.watchers.keys()).forEach((watcher) => {
this.clearWatch(watcher);
});
}
/**
* Set the current position of an object
* @param {DataObject | string} object Data object to get the current position of or uid
* @param {AbsolutePosition} position Position to update
* @returns {Promise<void>} Promise of updating
*/
setCurrentPosition(object: T | string, position: P): Promise<void> {
return new Promise((resolve, reject) => {
const uid = object instanceof DataObject ? object.uid : object;
this.service
.findByUID(uid)
.then((storedObj) => {
storedObj.setPosition(position);
return this.service.insertObject(storedObj);
})
.then(() => {
resolve();
})
.catch(reject);
});
}
/**
* Get the current position of a specific data object.
* @param {DataObject | string} object Data object to get the current position of or uid
* @param {GeoOptions} [options] Current position options
* @returns {Promise<AbsolutePosition>} Promise of latest absolute position
*/
getCurrentPosition(object: T | string, options: GeoOptions = {}): Promise<P> {
return new Promise((resolve, reject) => {
const maximumAge = options.maximumAge || Infinity;
options.timeout = options.timeout || 10000;
const uid = object instanceof DataObject ? object.uid : object;
// Force update
if (options.forceUpdate) {
this.model.findNodeByUID(this.options.pullNode).pull({
requestedObjects: [uid],
});
const timeout = setTimeout(() => {
this.clearWatch(watchId);
reject(new Error('Timeout error for getting current position!'));
}, options.timeout);
const watchId = this.watchPosition(
object,
(pos, err) => {
this.clearWatch(watchId);
clearTimeout(timeout);
if (err) {
return reject(err);
}
resolve(pos);
},
{
...options,
interval: -1,
forceUpdate: false,
},
);
} else {
this.service
.findByUID(uid)
.then((storedObj) => {
const position = storedObj.position;
const time = TimeService.getUnit().convert(TimeService.now(), TimeUnit.MILLISECOND);
if (position && position.timestamp >= time - maximumAge) {
// Stored position satisfies maximum age
resolve(position as P);
} else {
return this.getCurrentPosition(object, {
...options,
forceUpdate: true,
});
}
})
.then(resolve)
.catch(reject);
}
});
}
/**
* Watch for position changes
* @param {DataObject | string} object Data object to watch for position changes for
* @param {(position: AbsolutePosition, err?: Error) => void} callback Callback function
* @param {GeoWatchOptions} [options] Watch options
* @returns {number} Watch number
*/
watchPosition(
object: T | string,
callback: (position: P, err?: Error) => void,
options: GeoWatchOptions = {},
): number {
const uid = object instanceof DataObject ? object.uid : object;
const watchId = this.watchIndex++;
const interval = options.interval ?? 1000;
const timer =
interval !== -1
? (setInterval(() => {
this.getCurrentPosition(object, options)
.then(callback)
.catch((ex) => {
callback(undefined, ex);
});
}, interval) as unknown as number)
: undefined;
this.watchers.set(watchId, {
timer,
uid,
callback,
});
this.watchObject(uid, watchId);
return watchId;
}
protected watchObject(uid: string, watchId: number): void {
const existingIds = this.watchedObjects.get(uid) ?? [];
existingIds.push(watchId);
this.watchedObjects.set(uid, existingIds);
}
protected unwatchObject(uid: string, watchId: number): void {
const existingIds = this.watchedObjects.get(uid) ?? [];
existingIds.splice(existingIds.indexOf(watchId), 1);
if (existingIds.length === 0) {
this.watchedObjects.delete(uid);
} else {
this.watchedObjects.set(uid, existingIds);
}
}
/**
* Clear a running position watch
* @param {number} watchId Watch identifier
*/
clearWatch(watchId: number): void {
const watcher = this.watchers.get(watchId);
if (watcher.timer !== undefined) {
clearInterval(watcher.timer);
}
this.watchers.delete(watchId);
this.unwatchObject(watcher.uid, watchId);
}
}
interface Watcher {
timer: number;
uid: string;
callback: (pos: AbsolutePosition, err?: Error) => void;
}
interface GeoOptions {
timeout?: number;
/**
* Maximum age in milliseconds for the position. If the stored position
* is older, a new position update will be attemped.
*/
maximumAge?: number;
/**
* Force update of the position
*/
forceUpdate?: boolean;
}
interface GeoWatchOptions extends GeoOptions {
/**
* Refresh interval in milliseconds
* @default 1000
*/
interval?: number;
}
export interface LBSOptions {
/**
* Node to pull for pull-based position updates
* @default internalOutput
*/
pullNode?: string;
/**
* Dataservice to fetch stored data objects
*/
dataService?: Constructor<DataObject>;
}