core/siri_general_message_subscriber.go

Summary

Maintainability
C
1 day
Test Coverage
package core

import (
    "fmt"
    "time"

    "bitbucket.org/enroute-mobi/ara/audit"
    "bitbucket.org/enroute-mobi/ara/clock"
    "bitbucket.org/enroute-mobi/ara/logger"
    "bitbucket.org/enroute-mobi/ara/model"
    "bitbucket.org/enroute-mobi/ara/siri/siri"
    "bitbucket.org/enroute-mobi/ara/state"
)

// SIRIGeneralMessageSubscriber is the lifecycle contract shared by the
// general message subscribers of this file: anything that can be started and
// stopped.
type SIRIGeneralMessageSubscriber interface {
    state.Stopable
    state.Startable
}

// GMSubscriber holds what the real and the fake general message subscribers
// have in common: a clock and the subscription collector connector used to
// reach the partner. The subscription request logic is implemented on this
// type so both subscribers share it through embedding.
type GMSubscriber struct {
    clock.ClockConsumer

    // connector gives access to the partner (subscriptions, settings, SIRI client).
    connector *SIRIGeneralMessageSubscriptionCollector
}

// GeneralMessageSubscriber is the background subscriber: once started it
// periodically sends subscription requests from its own goroutine until it is
// stopped.
type GeneralMessageSubscriber struct {
    GMSubscriber

    // stop is created by Start and closed by Stop to end the run loop.
    stop chan struct{}
}

// FakeGeneralMessageSubscriber is a synchronous variant of the subscriber: it
// has no goroutine and no stop channel, and performs a single subscription
// pass each time Start is called.
type FakeGeneralMessageSubscriber struct {
    GMSubscriber
}

// resourceToRequest identifies one subscription resource included in a
// pending subscription request, so the matching response status can be
// applied to it afterwards.
type resourceToRequest struct {
    subId SubscriptionId // id of the subscription owning the resource
    code  model.Code     // code of the requested resource
    kind  string         // reference type of the resource; "Line" and "StopArea" are handled when building entries
}

// NewFakeGeneralMessageSubscriber builds a FakeGeneralMessageSubscriber bound
// to the given connector and returns it as a SIRIGeneralMessageSubscriber.
func NewFakeGeneralMessageSubscriber(connector *SIRIGeneralMessageSubscriptionCollector) SIRIGeneralMessageSubscriber {
    return &FakeGeneralMessageSubscriber{
        GMSubscriber: GMSubscriber{connector: connector},
    }
}

// Start performs a single subscription pass synchronously, in the caller's
// goroutine.
func (subscriber *FakeGeneralMessageSubscriber) Start() {
    subscriber.prepareSIRIGeneralMessageSubscriptionRequest()
}

// Stop does nothing: the fake subscriber never starts a goroutine.
func (subscriber *FakeGeneralMessageSubscriber) Stop() {}

// NewSIRIGeneralMessageSubscriber builds a GeneralMessageSubscriber bound to
// the given connector and returns it as a SIRIGeneralMessageSubscriber. The
// subscriber does nothing until Start is called.
func NewSIRIGeneralMessageSubscriber(connector *SIRIGeneralMessageSubscriptionCollector) SIRIGeneralMessageSubscriber {
    return &GeneralMessageSubscriber{
        GMSubscriber: GMSubscriber{connector: connector},
    }
}

// Start creates a fresh stop channel and launches the run loop in a new
// goroutine.
func (subscriber *GeneralMessageSubscriber) Start() {
    logger.Log.Debugf("Start GeneralMessageSubscriber")

    // The channel must exist before the goroutine starts selecting on it.
    subscriber.stop = make(chan struct{})
    go subscriber.run()
}

// run is the subscriber loop: every five seconds (measured with the
// subscriber's clock, from the end of the previous visit) it performs a
// subscription pass. It returns as soon as the stop channel is closed.
func (subscriber *GeneralMessageSubscriber) run() {
    const visitInterval = 5 * time.Second

    nextVisit := subscriber.Clock().After(visitInterval)
    for {
        select {
        case <-subscriber.stop:
            return
        case <-nextVisit:
        }

        logger.Log.Debugf("SIRIGeneralMessageSubscriber visit")
        subscriber.prepareSIRIGeneralMessageSubscriptionRequest()

        // Re-arm only after the visit is done so visits never overlap.
        nextVisit = subscriber.Clock().After(visitInterval)
    }
}

// Stop ends the run loop by closing the stop channel.
//
// It is a no-op when the subscriber was never started, and also when it has
// already been stopped: closing an already-closed channel panics, so the
// channel state is checked first. The check and the close are not atomic;
// Stop is safe to call repeatedly from one goroutine, not concurrently from
// several.
func (subscriber *GeneralMessageSubscriber) Stop() {
    if subscriber.stop == nil {
        return
    }

    select {
    case <-subscriber.stop:
        // Already closed by a previous Stop.
    default:
        close(subscriber.stop)
    }
}

// prepareSIRIGeneralMessageSubscriptionRequest sends one SIRI
// GeneralMessageSubscriptionRequest covering every resource of the partner's
// GeneralMessageCollect subscriptions that still has to be subscribed, then
// updates those resources from the response.
//
// A resource is requested when its SubscribedAt time is zero and its
// RetryCount is at most 10. Each requested resource gets its own request entry
// and message identifier; the response statuses are matched back to resources
// through that identifier. A resource whose status is true is marked
// subscribed and its RetryCount reset; a resource whose status is false, or
// that gets no status at all, or any resource when the request itself fails,
// has its RetryCount incremented.
//
// Nothing is sent when there is no GeneralMessageCollect subscription or no
// resource to request. Otherwise a BigQuery audit event describing the
// exchange is written when the function returns.
func (subscriber *GMSubscriber) prepareSIRIGeneralMessageSubscriptionRequest() {
    subscriptions := subscriber.connector.partner.Subscriptions().FindSubscriptionsByKind(GeneralMessageCollect)
    if len(subscriptions) == 0 {
        logger.Log.Debugf("GeneralMessageSubscriber visit without GeneralMessageCollect subscriptions")
        return
    }

    partner := subscriber.connector.Partner()

    lineRefList := []string{}
    stopPointRefList := []string{}

    // Pending resources, keyed by the message identifier of their request entry.
    resourcesToRequest := make(map[string]*resourceToRequest)
    for _, subscription := range subscriptions {
        for _, resource := range subscription.ResourcesByCodeCopy() {
            if !resource.SubscribedAt().IsZero() || resource.RetryCount > 10 {
                continue
            }
            if resource.Reference.Code == nil {
                // Dereferencing a nil code would panic; a resource without
                // code cannot be requested anyway.
                logger.Log.Debugf("Ignore resource without code in subscription with id : %v", subscription.id)
                continue
            }

            messageIdentifier := partner.NewMessageIdentifier()
            logger.Log.Debugf("send request for subscription with id : %v", subscription.id)
            resourcesToRequest[messageIdentifier] = &resourceToRequest{
                subId: subscription.id,
                code:  *(resource.Reference.Code),
                kind:  resource.Reference.Type,
            }
        }
    }

    if len(resourcesToRequest) == 0 {
        return
    }

    // message is a pointer: the fields filled in below are part of the event
    // written by the deferred call.
    message := subscriber.newBQEvent()
    defer audit.CurrentBigQuery(string(partner.Referential().Slug())).WriteEvent(message)

    gmRequest := &siri.SIRIGeneralMessageSubscriptionRequest{
        ConsumerAddress:   partner.Address(),
        MessageIdentifier: partner.NewMessageIdentifier(),
        RequestorRef:      partner.RequestorRef(),
        RequestTimestamp:  subscriber.Clock().Now(),
    }

    var subIDs []string
    for messageIdentifier, requestedResource := range resourcesToRequest {
        entry := &siri.SIRIGeneralMessageSubscriptionRequestEntry{
            SubscriberRef:          partner.RequestorRef(),
            SubscriptionIdentifier: string(requestedResource.subId),
            InitialTerminationTime: subscriber.Clock().Now().Add(48 * time.Hour),
        }
        entry.MessageIdentifier = messageIdentifier
        entry.RequestTimestamp = subscriber.Clock().Now()
        subIDs = append(subIDs, entry.SubscriptionIdentifier)

        switch requestedResource.kind {
        case "Line":
            entry.LineRef = []string{requestedResource.code.Value()}
            lineRefList = append(lineRefList, requestedResource.code.Value())
        case "StopArea":
            entry.StopPointRef = []string{requestedResource.code.Value()}
            stopPointRefList = append(stopPointRefList, requestedResource.code.Value())
        }

        if partner.GeneralMessageRequestVersion22() {
            entry.XsdInWsdl = true
        }

        gmRequest.Entries = append(gmRequest.Entries, entry)
    }

    // The raw XML is only recorded in the audit event; a build failure is
    // logged and does not prevent the request from being attempted.
    rawRequest, buildErr := gmRequest.BuildXML(partner.SIRIEnvelopeType())
    if buildErr != nil {
        logger.Log.Debugf("Error while building GeneralMessageSubscriptionRequest XML: %v", buildErr)
    }

    message.RequestIdentifier = gmRequest.MessageIdentifier
    message.RequestRawMessage = rawRequest
    message.RequestSize = int64(len(message.RequestRawMessage))
    message.StopAreas = stopPointRefList
    message.Lines = lineRefList
    message.SubscriptionIdentifiers = subIDs

    startTime := subscriber.Clock().Now()
    response, err := partner.SIRIClient().GeneralMessageSubscription(gmRequest)
    message.ProcessingTime = subscriber.Clock().Since(startTime).Seconds()
    if err != nil {
        logger.Log.Debugf("Error while subscribing: %v", err)
        e := fmt.Sprintf("Error during GeneralMessageSubscriptionRequest: %v", err)
        subscriber.incrementRetryCountFromMap(resourcesToRequest)

        message.Status = "Error"
        message.ErrorDetails = e
        return
    }

    message.ResponseRawMessage = response.RawXML()
    message.ResponseSize = int64(len(message.ResponseRawMessage))

    for _, responseStatus := range response.ResponseStatus() {
        requestedResource, ok := resourcesToRequest[responseStatus.RequestMessageRef()]
        if !ok {
            logger.Log.Debugf("ResponseStatus RequestMessageRef unknown: %v", responseStatus.RequestMessageRef())
            continue
        }
        // Answered resources are removed so that only the unanswered ones are
        // left for the retry increment after the loop. See #4691
        delete(resourcesToRequest, responseStatus.RequestMessageRef())

        subscription, ok := subscriber.connector.partner.Subscriptions().Find(requestedResource.subId)
        if !ok { // Should never happen
            logger.Log.Debugf("Response for unknown subscription %v", requestedResource.subId)
            continue
        }
        resource := subscription.Resource(requestedResource.code)
        if resource == nil { // Should never happen
            logger.Log.Debugf("Response for unknown subscription resource %v", requestedResource.code.String())
            continue
        }

        if !responseStatus.Status() {
            logger.Log.Debugf("Subscription status false for line %v: %v %v ", requestedResource.code.Value(), responseStatus.ErrorType(), responseStatus.ErrorText())
            resource.RetryCount++
            message.Status = "Error"
            continue
        }
        resource.Subscribed(subscriber.Clock().Now())
        resource.RetryCount = 0
    }

    // Resources left without any response status. Should not happen but see
    // #4691. This is a no-op when every resource was answered.
    subscriber.incrementRetryCountFromMap(resourcesToRequest)
}

// incrementRetryCountFromMap adds one to the RetryCount of every resource
// listed in resourcesToRequest. Entries whose subscription or resource can no
// longer be found are silently skipped.
func (subscriber *GMSubscriber) incrementRetryCountFromMap(resourcesToRequest map[string]*resourceToRequest) {
    for _, pending := range resourcesToRequest {
        // Neither lookup is expected to fail for a resource that was just requested.
        if subscription, found := subscriber.connector.partner.Subscriptions().Find(pending.subId); found {
            if resource := subscription.Resource(pending.code); resource != nil {
                resource.RetryCount++
            }
        }
    }
}

// newBQEvent returns a BigQuery audit message pre-filled for a sent SIRI
// GeneralMessageSubscriptionRequest of the connector's partner, with an
// initial "OK" status.
func (subscriber *GMSubscriber) newBQEvent() *audit.BigQueryMessage {
    event := new(audit.BigQueryMessage)

    event.Type = "GeneralMessageSubscriptionRequest"
    event.Protocol = "siri"
    event.Direction = "sent"
    event.Partner = string(subscriber.connector.partner.Slug())
    event.Status = "OK"

    return event
}