Verkehrsministerium/kraftfahrstrasse

View on GitHub
src/generic/Connection.ts

Summary

Maintainability
D
1 day
Test Coverage

import { CallOptions, ECallKillMode } from '../types/messages/CallMessage';
import { HelloMessageDetails, WampHelloMessage } from '../types/messages/HelloMessage';
import { EWampMessageID, WampDict, WampID, WampList, WampURI } from '../types/messages/MessageTypes';
import { PublishOptions } from '../types/messages/PublishMessage';
import { RegisterOptions } from '../types/messages/RegisterMessage';
import { SubscribeOptions } from '../types/messages/SubscribeMessage';
import { WampAbortMessage, WampChallengeMessage, WampMessage } from '../types/Protocol';

import { Logger } from '../logging/Logger';
import {
  CallHandler,
  CallResult,
  ConnectionCloseError,
  ConnectionCloseInfo,
  ConnectionOpenError,
  ConnectionOptions,
  EventHandler,
  IConnection,
  IPublication,
  IRegistration,
  ISubscription,
  LogLevel,
} from '../types/Connection';
import { WampWelcomeMessage, WelcomeDetails } from '../types/messages/WelcomeMessage';
import { ETransportEventType, ITransport, TransportEvent } from '../types/Transport';
import { Deferred } from '../util/deferred';
import { GlobalIDGenerator, SessionIDGenerator } from '../util/id';

import { Callee } from './Callee';
import { Caller } from './Caller';
import { ConnectionStateMachine, EConnectionState, EMessageDirection } from './ConnectionStateMachine';
import { IDGen, IMessageProcessorFactory } from './MessageProcessor';
import { Publisher } from './Publisher';
import { Subscriber } from './Subscriber';

const createIdGens = () => {
  return {
    global: new GlobalIDGenerator(),
    session: new SessionIDGenerator(),
  };
};

export class Connection implements IConnection {

  public sessionId: number | null = null;

  private transport: ITransport | null = null;
  private onOpen: Deferred<WelcomeDetails> | null = null;
  private onClose: Deferred<ConnectionCloseInfo> | null = null;

  // The type of subHandlers has to match the order of the Factories in subFactories
  private subHandlers: [Publisher, Subscriber, Caller, Callee] | null = null;
  private subFactories: IMessageProcessorFactory[] = [Publisher, Subscriber, Caller, Callee];

  private readonly logger: Logger;

  private idGen: IDGen;
  private state: ConnectionStateMachine;
  constructor(private connectionOptions: ConnectionOptions) {
    // TODO: Improve logging...
    // tslint:disable-next-line
    this.connectionOptions.transportOptions = this.connectionOptions.transportOptions || {};
    this.state = new ConnectionStateMachine();
    this.idGen = createIdGens();
    this.logger = new Logger('Connection.ts', connectionOptions.logFunction);
  }

  public Open(): Promise<WelcomeDetails> {
    if (!!this.transport) {
      return Promise.reject('Transport already opened or opening');
    }
    this.transport = new this.connectionOptions.transport(
      this.connectionOptions.serializer,
      this.connectionOptions.transportOptions,
    );
    this.state = new ConnectionStateMachine();
    this.onOpen = new Deferred();
    this.transport.Open(this.connectionOptions.endpoint, this.handleTransportEvent.bind(this));

    this.logger.log(
      LogLevel.DEBUG,
      `Opened Connection with ${this.connectionOptions.serializer.ProtocolID()} and ${this.transport.name}`,
    );
    return this.onOpen.promise;
  }

  public OnClose(): Promise<ConnectionCloseInfo> {
    if (!this.onClose) {
      this.onClose = new Deferred();
    }
    return this.onClose.promise;
  }

  public Close(): Promise<ConnectionCloseInfo> {
    if (!this.transport) {
      return Promise.reject('transport is not open');
    }
    this.transport.Send([
      EWampMessageID.GOODBYE,
      { message: 'client shutdown' },
      'wamp.close.normal',
    ]);

    this.logger.log(LogLevel.DEBUG, 'Closing Connection');
    this.state.update([EMessageDirection.SENT, EWampMessageID.GOODBYE]);
    return this.OnClose();
  }

  public CancelCall(callid: WampID, mode?: ECallKillMode): void {
    if (!this.subHandlers) {
      throw new Error('invalid session state');
    }
    this.subHandlers[2].CancelCall(callid, mode);
  }

  public Call<
    A extends WampList,
    K extends WampDict,
    RA extends WampList,
    RK extends WampDict
    >(uri: WampURI, args?: A, kwargs?: K, opts?: CallOptions): [Promise<CallResult<RA, RK>>, WampID] {
    if (!this.subHandlers) {
      return [Promise.reject('invalid session state'), -1];
    }
    return this.subHandlers[2].Call(uri, args, kwargs, opts);
  }

  public Register<
    A extends WampList,
    K extends WampDict,
    RA extends WampList,
    RK extends WampDict
    >(uri: WampURI, handler: CallHandler<A, K, RA, RK>, opts?: RegisterOptions): Promise<IRegistration> {
    if (!this.subHandlers) {
      return Promise.reject('invalid session state');
    }
    return this.subHandlers[3].Register(uri, handler, opts);
  }
  public Subscribe<
    A extends WampList,
    K extends WampDict
    >(uri: WampURI, handler: EventHandler<A, K>, opts?: SubscribeOptions): Promise<ISubscription> {
    if (!this.subHandlers) {
      return Promise.reject('invalid session state');
    }
    return this.subHandlers[1].Subscribe(uri, handler, opts);
  }
  public Publish<
    A extends WampList,
    K extends WampDict
    >(uri: WampURI, args?: A, kwargs?: K, opts?: PublishOptions): Promise<IPublication> {
    if (!this.subHandlers) {
      return Promise.reject('invalid session state');
    }
    return this.subHandlers[0].Publish(uri, args, kwargs, opts);
  }

  private handleTransportEvent(event: TransportEvent): void {
    switch (event.type) {
      case ETransportEventType.OPEN: {
        this.sendHello();
        break;
      }
      case ETransportEventType.MESSAGE: {
        if (this.state.getState() === EConnectionState.ESTABLISHED) {
          this.processMessage(event.message);
        } else {
          this.processSessionMessage(event.message);
        }
        break;
      }
      case ETransportEventType.ERROR: {
        this.logger.log(LogLevel.DEBUG, `ConnError: ${event.error}`);
        if (this.state.getState() !== EConnectionState.ESTABLISHED) {
          this.handleOnOpen(new ConnectionOpenError(event.error));
        }
        break;
      }
      case ETransportEventType.CLOSE: {
        this.logger.log(LogLevel.DEBUG, `ConnClose: ${event.wasClean} ${event.code} ${event.reason}`)
        this.transport = null;
        const state = this.state.getState();
        this.state = new ConnectionStateMachine();
        if (!!this.subHandlers) {
          this.subHandlers.forEach(h => h.Close());
          this.subHandlers = null;
        }
        if (!this.handleOnOpen(new ConnectionOpenError(event.reason))) {
          this.handleOnClose(event.wasClean ? {
            code: event.code,
            reason: event.reason,
            wasClean: event.wasClean,
          } : new ConnectionCloseError(event.reason, event.code));
        }
        break;
      }
    }
  }

  private sendHello(): void {
    const details: HelloMessageDetails = {
      roles: Object.assign({}, ...this.subFactories.map(j => j.GetFeatures())),
      agent: 'kraftfahrstrasse pre-alpha',
    };

    if (!!this.connectionOptions.authProvider) {
      details.authid = this.connectionOptions.authProvider.AuthID();
      details.authmethods = [this.connectionOptions.authProvider.AuthMethod()];
    }

    const msg: WampHelloMessage = [
      EWampMessageID.HELLO,
      this.connectionOptions.realm,
      details,
    ];
    this.transport!.Send(msg).then(() => {
      this.state.update([EMessageDirection.SENT, EWampMessageID.HELLO]);
    }, err => {
      this.handleProtocolViolation(`Transport error: ${err}`);
    });
  }

  private processSessionMessage(msg: WampMessage): void {
    if (!this.transport) {
      return;
    }
    this.state.update([EMessageDirection.RECEIVED, msg[0]]);
    switch (this.state.getState()) {
      case EConnectionState.CHALLENGING: {
        const challengeMsg = msg as WampChallengeMessage;
        this.connectionOptions.authProvider.ComputeChallenge(challengeMsg[2] || {}).then(signature => {
          if (!this.transport) {
            return;
          }
          return this.transport.Send([
            EWampMessageID.AUTHENTICATE,
            signature.signature,
            signature.details || {},
          ]);
        }).then(() => {
          this.state.update([EMessageDirection.SENT, EWampMessageID.AUTHENTICATE]);
        }).catch(error => {
          if (!this.transport) {
            return;
          }
          this.logger.log(
            LogLevel.WARNING,
            [
              'Failed to compute challenge or send for auth provider',
              this.connectionOptions.authProvider,
              error,
            ],
          );
          this.transport.Close(3000, 'Authentication failed');
        });
        break;
      }
      case EConnectionState.ESTABLISHED: {
        this.idGen = createIdGens();
        this.subHandlers = this.subFactories.map(handlerClass => {
          return new handlerClass(
            async msgToSend => {
              await this.transport!.Send(msgToSend);
            },
            reason => {
              this.handleProtocolViolation(reason);
            },
            this.idGen,
            this.logger,
          );
        }) as any; // this works.
        // this is, because map on tuples is not defined typesafe-ish.
        // Harr, Harr, Harr

        const estabishedMessage = msg as WampWelcomeMessage;

        const [, sessionId, welcomeDetails] = estabishedMessage;

        this.sessionId = sessionId;
        this.logger.log(
          LogLevel.DEBUG,
          `Opened Connection with ${this.connectionOptions.serializer.ProtocolID()} and ${this.transport.name}`,
        );
        this.handleOnOpen(welcomeDetails);
        break;
      }
      case EConnectionState.CLOSING: {
        // We received a GOODBYE message from the server, so reply with goodbye and shutdown the transport.
        this.transport.Send([
          EWampMessageID.GOODBYE,
          { message: 'clean close' },
          'wamp.close.goodbye_and_out',
        ]);
        this.state.update([EMessageDirection.SENT, EWampMessageID.GOODBYE]);
        this.transport.Close(1000, 'wamp.close.normal');
        break;
      }
      case EConnectionState.CLOSED: {
        // Clean close finished, actually close the transport, so onClose and close Callbacks will be created
        this.transport.Close(1000, 'wamp.close.normal');
        break;
      }
      case EConnectionState.ERROR: {
        // protocol violation, so close the transport not clean (i.e. code 3000)
        // and if we encountered the error, send an ABORT message to the server
        if (msg[0] !== EWampMessageID.ABORT) {
          this.handleProtocolViolation('protocol violation during session establish');
        } else {
          this.transport.Close(3000, msg[2]);
          this.handleOnOpen(new ConnectionOpenError(msg[2], msg[1]));
        }
        break;
      }
    }
  }

  private processMessage(msg: WampMessage): void {
    if (msg[0] === EWampMessageID.GOODBYE) {
      this.state.update([EMessageDirection.RECEIVED, msg[0]]);
      return;
    }
    let success = false;
    for (const subHandler of this.subHandlers!) {
      success = subHandler.ProcessMessage(msg);
      if (success) {
        break;
      }
    }
    if (!success) {
      this.logger.log(LogLevel.ERROR, `Unhandled message: ${JSON.stringify(msg)}`);
      this.handleProtocolViolation('no handler found for message');
    }
  }

  private handleProtocolViolation(reason: WampURI): void {
    if (!this.transport) {
      this.logger.log(LogLevel.ERROR, 'Failed to handle protocol violation: Already closed.');
      return;
    }
    const abortMessage: WampAbortMessage = [
      EWampMessageID.ABORT,
      { message: reason },
      'wamp.error.protocol_violation',
    ];

    this.logger.log(LogLevel.ERROR, `Protocol violation: ${reason }`);
    this.transport.Send(abortMessage);
    this.transport.Close(3000, 'protcol_violation');
    this.handleOnOpen(new ConnectionOpenError('protocol violation'));
  }

  private handleOnOpen(details: Error | WelcomeDetails): boolean {
    if (!this.onOpen) {
      return false;
    }
    if (details instanceof Error) {
      this.onOpen.reject(details);
      // Reset the transport property, so we can run .open() again.
      this.transport = null;
    } else {
      this.onOpen.resolve(details);
    }
    this.onOpen = null;
    return true;
  }

  private handleOnClose(details: Error | ConnectionCloseInfo): void {
    if (!this.onClose) {
      return;
    }
    if (details instanceof Error) {
      this.onClose.reject(details);
    } else {
      this.onClose.resolve(details);
    }
    this.onClose = null;
  }
}