// src/core/probe.ts
import { createSocket, RemoteInfo } from 'dgram'
import { Strings, Numbers, IProbeConfig, DEFAULT_PROBE_CONFIG } from './interfaces'
import { Observable, Observer, fromEvent, timer, Subject } from 'rxjs'
import { shareReplay, map, distinctUntilChanged, mapTo, takeUntil, scan } from 'rxjs/operators'
/** Payload of a dgram `'message'` event as a tuple: the received datagram and the sender's info. */
type IMessage = readonly [Buffer, RemoteInfo]
/** Immutable list of timestamped messages. */
type TimestampMessages = readonly TimestampedMessage[]
/** Read-only dictionary with string keys and string values. */
type StringDictionary = { readonly [key: string]: string }
/** A message string paired with a numeric timestamp; `timestamp` in this file fills `ts` from `Date.now()`. */
interface TimestampedMessage { readonly msg: string, readonly ts: number }
/** One datagram to send: the payload plus the destination port and address handed to `socket.send` in `probe`. */
interface BufferPort { readonly buffer: Buffer, readonly port: number, readonly address: string }
/** Encodes a string into a `Buffer` using UTF-8. */
const mapStringToBuffer = (str: string) => {
  const encoded = Buffer.from(str, 'utf8')
  return encoded
}
/**
 * Collapses whitespace around markup delimiters: first removes any whitespace
 * that directly follows a `>`, then any whitespace that directly precedes a `<`.
 * Whitespace elsewhere in the string is left untouched.
 */
const flattenXml = (str: string) => {
  const withoutSpaceAfterTags = str.replace(/>\s*/g, '>')
  return withoutSpaceAfterTags.replace(/\s*</g, '<')
}
/**
 * Operator that turns each emitted dictionary into an array of its values,
 * ordered the same way `Object.keys` enumerates the dictionary's keys.
 */
const toArrayOfValues = <T extends StringDictionary>(source: Observable<T>) => {
  const valuesOf = (dictionary: T) => Object.keys(dictionary).map(key => dictionary[key])
  return source.pipe(map(valuesOf))
}
/** Operator that applies `flattenXml` to every string of each emitted list. */
const flattenDocumentStrings = (source: Observable<Strings>) => {
  const flattenEach = map((documents: Strings) => documents.map(flattenXml))
  return source.pipe(flattenEach)
}
/**
 * Operator that converts each emitted buffer into a `TimestampedMessage`:
 * `msg` is the buffer's `toString()` output and `ts` is `Date.now()` at the
 * moment the buffer passes through.
 */
const timestamp = (source: Observable<Buffer>) => {
  const stamp = (buffer: Buffer): TimestampedMessage => ({ msg: buffer.toString(), ts: Date.now() })
  return source.pipe(map(stamp))
}
/**
 * Operator that suppresses an emission when it has the same set of keys as
 * the previous one. Only keys (as reported by `Object.keys`) are compared;
 * the values stored under those keys are not inspected.
 */
const distinctUntilObjectChanged = <T>(source: Observable<T>) => {
  const hasSameKeys = (previous: T, current: T) => {
    const previousKeys = Object.keys(previous)
    const currentKeys = Object.keys(current)
    if (previousKeys.length !== currentKeys.length) return false
    // Equal lengths plus "every previous key is present" means the key sets match.
    return previousKeys.every(key => currentKeys.includes(key))
  }
  return source.pipe(distinctUntilChanged(hasSameKeys))
}
/**
 * Creates an operator that keeps a running list of received messages and, on
 * every new message, drops the entries whose `ts` is not greater than
 * `Date.now() - falloutTime`. The pruned list is emitted for each source value.
 *
 * @param falloutTime Age threshold subtracted from `Date.now()`; same unit as `ts`.
 */
const accumulateFreshMessages =
  (falloutTime: number) =>
    (source: Observable<TimestampedMessage>) => {
      // The clock is read per element, exactly when each entry is checked.
      const isFresh = (message: TimestampedMessage) => message.ts > Date.now() - falloutTime
      const appendAndPrune = (kept: TimestampMessages, incoming: TimestampedMessage): TimestampMessages =>
        [...kept, incoming].filter(isFresh)
      return source.pipe(scan(appendAndPrune, [] as TimestampMessages))
    }
/**
 * Creates an operator that projects each emitted message list through `mapFn`
 * to obtain a string dictionary.
 *
 * @param mapFn Function converting a list of timestamped messages into a dictionary.
 */
const mapStrToDictionary =
  (mapFn: (msg: TimestampMessages) => StringDictionary) =>
    (source: Observable<TimestampMessages>) => {
      const toDictionary = map(mapFn)
      return source.pipe(toDictionary)
    }
/**
 * Builds the list of every (buffer, port) combination for a single address.
 * Entries are grouped by port in the order of `ports`; within each port they
 * follow the order of `buffers`.
 *
 * @param ports Destination ports.
 * @param address Destination address attached to every entry.
 * @param buffers Payloads to pair with each port.
 * @returns One `{ buffer, port, address }` record per port/buffer pair.
 */
export const flattenBuffersWithInfo =
  (ports: Numbers) =>
    (address: string) =>
      (buffers: readonly Buffer[]): readonly BufferPort[] => {
        const combined: BufferPort[] = []
        for (const port of ports) {
          for (const buffer of buffers) {
            combined.push({ buffer, port, address })
          }
        }
        return combined
      }
/**
 * Creates a cold observable that repeatedly sends the given messages over a
 * UDP4 socket and emits the processed responses.
 *
 * On subscription a new socket is created. Every `PROBE_REQUEST_SAMPLE_RATE_MS`
 * (starting immediately) each message is sent to `MULTICAST_ADDRESS` on every
 * port in `PORTS`. Incoming datagrams are timestamped, accumulated while they
 * are newer than `PROBE_RESPONSE_FALLOUT_MS`, reduced to a dictionary by
 * `RESULT_DEDUPE_FN`, filtered so that only changes in the dictionary's key
 * set are passed on, and finally emitted as an array of whitespace-flattened
 * strings.
 *
 * The observable errors when the socket emits `'error'` or when the response
 * pipeline fails, and completes when the socket closes. Unsubscribing stops
 * the send timer and closes the socket.
 *
 * @param config Optional overrides merged on top of `DEFAULT_PROBE_CONFIG`.
 * @param messages Request payloads to send on every tick.
 * @returns Observable of the current list of response documents.
 */
export const probe =
  (config?: Partial<IProbeConfig>) =>
    (messages: Strings): Observable<Strings> =>
      new Observable<Strings>((obs: Observer<Strings>) => {
        const cfg = { ...DEFAULT_PROBE_CONFIG, ...(config || {}) }
        const socket = createSocket({ type: 'udp4' })
        const socketMessages$ = fromEvent<IMessage>(socket, 'message').pipe(map(a => a[0]), shareReplay(1))
        // Emits once on teardown to stop both internal subscriptions.
        const internalLimit = new Subject<void>()
        // Tracks whether the socket already closed, so teardown never closes it twice.
        let socketClosed = false

        // dgram sockets report failures through the 'error' event.
        socket.on('error', err => obs.error(err))
        socket.on('close', () => {
          socketClosed = true
          obs.complete()
        })

        // Request side: the outgoing datagram list is computed once and re-sent on every tick.
        timer(0, cfg.PROBE_REQUEST_SAMPLE_RATE_MS).pipe(
          mapTo(flattenBuffersWithInfo(cfg.PORTS)(cfg.MULTICAST_ADDRESS)(messages.map(mapStringToBuffer))),
          takeUntil(internalLimit))
          .subscribe(bfrPorts => {
            bfrPorts.forEach(mdl => socket.send(mdl.buffer, 0, mdl.buffer.length, mdl.port, mdl.address))
          })

        // Response side: failures are propagated as errors, not as regular values.
        socketMessages$.pipe(
          timestamp,
          accumulateFreshMessages(cfg.PROBE_RESPONSE_FALLOUT_MS),
          mapStrToDictionary(cfg.RESULT_DEDUPE_FN),
          distinctUntilObjectChanged,
          toArrayOfValues,
          flattenDocumentStrings,
          takeUntil(internalLimit)
        ).subscribe({
          next: msg => obs.next(msg),
          error: err => obs.error(err)
        })

        return function unsubscribe() {
          internalLimit.next()
          internalLimit.complete()
          if (!socketClosed) {
            socket.close()
          }
        }
      })