core/siri_subscription_request_dispatcher.go

Summary

Maintainability
A
3 hrs
Test Coverage
package core

import (
    "fmt"

    "bitbucket.org/enroute-mobi/ara/audit"
    "bitbucket.org/enroute-mobi/ara/logger"
    "bitbucket.org/enroute-mobi/ara/siri/siri"
    "bitbucket.org/enroute-mobi/ara/siri/sxml"
)

type SubscriptionRequestDispatcher interface {
    Dispatch(*sxml.XMLSubscriptionRequest, *audit.BigQueryMessage) (*siri.SIRISubscriptionResponse, error)
    CancelSubscription(*sxml.XMLDeleteSubscriptionRequest, *audit.BigQueryMessage) *siri.SIRIDeleteSubscriptionResponse
    HandleSubscriptionTerminatedNotification(*sxml.XMLSubscriptionTerminatedNotification)
    HandleNotifySubscriptionTerminated(*sxml.XMLNotifySubscriptionTerminated)
}

type SIRISubscriptionRequestDispatcherFactory struct{}

type SIRISubscriptionRequestDispatcher struct {
    connector
}

func (factory *SIRISubscriptionRequestDispatcherFactory) Validate(apiPartner *APIPartner) {
    apiPartner.ValidatePresenceOfRemoteCodeSpace()
    apiPartner.ValidatePresenceOfRemoteCredentials()
}

func (factory *SIRISubscriptionRequestDispatcherFactory) CreateConnector(partner *Partner) Connector {
    return NewSIRISubscriptionRequestDispatcher(partner)
}

func NewSIRISubscriptionRequestDispatcher(partner *Partner) *SIRISubscriptionRequestDispatcher {
    siriSubscriptionRequest := &SIRISubscriptionRequestDispatcher{}
    siriSubscriptionRequest.partner = partner

    return siriSubscriptionRequest
}

func (connector *SIRISubscriptionRequestDispatcher) Dispatch(request *sxml.XMLSubscriptionRequest, message *audit.BigQueryMessage) (*siri.SIRISubscriptionResponse, error) {
    response := siri.SIRISubscriptionResponse{
        Address:            connector.Partner().Address(),
        ResponderRef:       connector.Partner().RequestorRef(),
        ResponseTimestamp:  connector.Clock().Now(),
        RequestMessageRef:  request.MessageIdentifier(),
        ServiceStartedTime: connector.Partner().Referential().StartedAt(),
    }

    message.RequestIdentifier = request.MessageIdentifier()

    if len(request.XMLSubscriptionGMEntries()) > 0 {
        gmbc, ok := connector.Partner().Connector(SIRI_GENERAL_MESSAGE_SUBSCRIPTION_BROADCASTER)
        if !ok {
            return nil, fmt.Errorf("no GeneralMessageSubscriptionBroadcaster Connector")
        }

        response.ResponseStatus = gmbc.(*SIRIGeneralMessageSubscriptionBroadcaster).HandleSubscriptionRequest(request, message)

        return &response, nil
    }

    if len(request.XMLSubscriptionSXEntries()) > 0 {
        gmbc, ok := connector.Partner().Connector(SIRI_SITUATION_EXCHANGE_SUBSCRIPTION_BROADCASTER)
        if !ok {
            return nil, fmt.Errorf("no SituationExchangeSubscriptionBroadcaster Connector")
        }

        response.ResponseStatus = gmbc.(*SIRISituationExchangeSubscriptionBroadcaster).HandleSubscriptionRequest(request, message)

        return &response, nil
    }

    if len(request.XMLSubscriptionSMEntries()) > 0 {
        smbc, ok := connector.Partner().Connector(SIRI_STOP_MONITORING_SUBSCRIPTION_BROADCASTER)
        if !ok {
            return nil, fmt.Errorf("no StopMonitoringSubscriptionBroadcaster Connector")
        }

        response.ResponseStatus = smbc.(*SIRIStopMonitoringSubscriptionBroadcaster).HandleSubscriptionRequest(request, message)

        return &response, nil
    }

    if len(request.XMLSubscriptionETTEntries()) > 0 {
        smbc, ok := connector.Partner().Connector(SIRI_ESTIMATED_TIMETABLE_SUBSCRIPTION_BROADCASTER)
        if !ok {
            return nil, fmt.Errorf("no EstimatedTimetableSubscriptionBroadcaster Connector")
        }

        response.ResponseStatus = smbc.(*SIRIEstimatedTimetableSubscriptionBroadcaster).HandleSubscriptionRequest(request, message)

        return &response, nil
    }

    if len(request.XMLSubscriptionPTTEntries()) > 0 {
        ptbc, ok := connector.Partner().Connector(SIRI_PRODUCTION_TIMETABLE_SUBSCRIPTION_BROADCASTER)
        if !ok {
            return nil, fmt.Errorf("no ProductionTableSubscriptionBroadcaster Connector")
        }

        response.ResponseStatus = ptbc.(*SIRIProductionTimetableSubscriptionBroadcaster).HandleSubscriptionRequest(request, message)

        return &response, nil
    }

    if len(request.XMLSubscriptionVMEntries()) > 0 {
        vmbc, ok := connector.Partner().Connector(SIRI_VEHICLE_MONITORING_SUBSCRIPTION_BROADCASTER)
        if !ok {
            return nil, fmt.Errorf("no VehicleMonitoringSubscriptionBroadcaster Connector")
        }

        response.ResponseStatus = vmbc.(*SIRIVehicleMonitoringSubscriptionBroadcaster).HandleSubscriptionRequest(request, message)

        return &response, nil
    }

    return nil, fmt.Errorf("subscription not supported")
}

func (connector *SIRISubscriptionRequestDispatcher) CancelSubscription(r *sxml.XMLDeleteSubscriptionRequest, message *audit.BigQueryMessage) *siri.SIRIDeleteSubscriptionResponse {
    message.RequestIdentifier = r.MessageIdentifier()

    currentTime := connector.Clock().Now()
    resp := &siri.SIRIDeleteSubscriptionResponse{
        ResponderRef:      connector.Partner().RequestorRef(),
        RequestMessageRef: r.MessageIdentifier(),
        ResponseTimestamp: currentTime,
    }

    ignoreTerminate := connector.Partner().IgnoreTerminateSubscriptionsRequest()

    if r.CancelAll() {
        for _, subscription := range connector.Partner().Subscriptions().FindBroadcastSubscriptions() {
            responseStatus := &siri.SIRITerminationResponseStatus{
                SubscriberRef:     r.RequestorRef(),
                SubscriptionRef:   subscription.ExternalId(),
                ResponseTimestamp: currentTime,
                Status:            true,
            }

            if ignoreTerminate {
                responseStatus.ErrorType = "CapabilityNotSupportedError"
                responseStatus.ErrorText = "Subscription Termination is disabled for this Subscriber"
                responseStatus.Status = false
            }

            resp.ResponseStatus = append(resp.ResponseStatus, responseStatus)
        }

        if ignoreTerminate {
            message.Status = "Error"
            message.ErrorDetails = "Subscription Termination is disabled for this Subscriber"
        } else {
            connector.Partner().CancelBroadcastSubscriptions()
        }

        return resp
    }

    responseStatus := &siri.SIRITerminationResponseStatus{
        SubscriberRef:     r.RequestorRef(),
        SubscriptionRef:   r.SubscriptionRef(),
        ResponseTimestamp: currentTime,
    }

    message.SubscriptionIdentifiers = []string{r.SubscriptionRef()}

    resp.ResponseStatus = append(resp.ResponseStatus, responseStatus)

    sub, ok := connector.Partner().Subscriptions().FindByExternalId(r.SubscriptionRef())
    if !ok {
        logger.Log.Debugf("Could not Unsubscribe to unknow subscription %v", r.SubscriptionRef())

        responseStatus.ErrorType = "UnknownSubscriptionError"
        responseStatus.ErrorText = fmt.Sprintf("Subscription not found: '%s'", r.SubscriptionRef())

        message.Status = "Error"
        message.ErrorDetails = responseStatus.ErrorText

        return resp
    }

    if ignoreTerminate {
        logger.Log.Debugf("Subscription Termination is disabled for partner %s", connector.Partner().Id())

        responseStatus.ErrorType = "CapabilityNotSupportedError"
        responseStatus.ErrorText = "Subscription Termination is disabled for this Subscriber"

        message.Status = "Error"
        message.ErrorDetails = responseStatus.ErrorText

        return resp
    }

    responseStatus.Status = true

    sub.Delete()
    return resp
}

func (connector *SIRISubscriptionRequestDispatcher) HandleSubscriptionTerminatedNotification(r *sxml.XMLSubscriptionTerminatedNotification) {
    connector.partner.Subscriptions().DeleteById(SubscriptionId(r.SubscriptionRef()))
}

func (connector *SIRISubscriptionRequestDispatcher) HandleNotifySubscriptionTerminated(r *sxml.XMLNotifySubscriptionTerminated) {
    connector.partner.Subscriptions().DeleteById(SubscriptionId(r.SubscriptionRef()))
}