core/siri_stop_monitoring_subscription_broadcaster.go

Summary

Maintainability
A
2 hrs
Test Coverage
package core

import (
    "fmt"
    "slices"
    "strconv"
    "strings"
    "sync"

    "bitbucket.org/enroute-mobi/ara/audit"
    "bitbucket.org/enroute-mobi/ara/core/ls"
    "bitbucket.org/enroute-mobi/ara/logger"
    "bitbucket.org/enroute-mobi/ara/model"
    "bitbucket.org/enroute-mobi/ara/siri/siri"
    "bitbucket.org/enroute-mobi/ara/siri/sxml"
)

type SIRIStopMonitoringSubscriptionBroadcaster struct {
    connector

    stopMonitoringBroadcaster SIRIStopMonitoringBroadcaster
    toBroadcast               map[SubscriptionId][]model.StopVisitId
    notMonitored              map[SubscriptionId]map[string]struct{}

    mutex *sync.Mutex //protect the map
}

type SIRIStopMonitoringSubscriptionBroadcasterFactory struct{}

func (factory *SIRIStopMonitoringSubscriptionBroadcasterFactory) CreateConnector(partner *Partner) Connector {
    if _, ok := partner.Connector(SIRI_SUBSCRIPTION_REQUEST_DISPATCHER); !ok {
        partner.CreateSubscriptionRequestDispatcher()
    }
    return newSIRIStopMonitoringSubscriptionBroadcaster(partner)
}

func (factory *SIRIStopMonitoringSubscriptionBroadcasterFactory) Validate(apiPartner *APIPartner) {
    apiPartner.ValidatePresenceOfRemoteCodeSpace()
    apiPartner.ValidatePresenceOfRemoteCredentials()
    apiPartner.ValidatePresenceOfLocalCredentials()
}

func newSIRIStopMonitoringSubscriptionBroadcaster(partner *Partner) *SIRIStopMonitoringSubscriptionBroadcaster {
    connector := &SIRIStopMonitoringSubscriptionBroadcaster{}
    connector.remoteCodeSpace = partner.RemoteCodeSpace(SIRI_STOP_MONITORING_SUBSCRIPTION_BROADCASTER)
    connector.partner = partner
    connector.mutex = &sync.Mutex{}
    connector.toBroadcast = make(map[SubscriptionId][]model.StopVisitId)
    connector.notMonitored = make(map[SubscriptionId]map[string]struct{})

    connector.stopMonitoringBroadcaster = NewSIRIStopMonitoringBroadcaster(connector)

    return connector
}

func (connector *SIRIStopMonitoringSubscriptionBroadcaster) Stop() {
    connector.stopMonitoringBroadcaster.Stop()
}

func (connector *SIRIStopMonitoringSubscriptionBroadcaster) Start() {
    connector.stopMonitoringBroadcaster.Start()
}

func (connector *SIRIStopMonitoringSubscriptionBroadcaster) HandleStopMonitoringBroadcastEvent(event *model.StopMonitoringBroadcastEvent) {
    switch event.ModelType {
    case "StopVisit":
        sv, ok := connector.partner.Model().StopVisits().Find(model.StopVisitId(event.ModelId))
        if ok {
            subsIds := connector.checkEvent(sv)
            if len(subsIds) != 0 {
                connector.addStopVisit(subsIds, sv.Id())
            }
        }
    case "VehicleJourney":
        svs := connector.partner.Model().StopVisits().FindFollowingByVehicleJourneyId(model.VehicleJourneyId(event.ModelId))
        for i := range svs {
            subsIds := connector.checkEvent(svs[i])
            if len(subsIds) != 0 {
                connector.addStopVisit(subsIds, svs[i].Id())
            }
        }
    case "StopArea":
        sa, ok := connector.partner.Model().StopAreas().Find(model.StopAreaId(event.ModelId))
        if ok {
            connector.checkStopAreaEvent(sa)
        }
    default:
        return
    }
}

func (connector *SIRIStopMonitoringSubscriptionBroadcaster) addStopVisit(subsIds []SubscriptionId, svId model.StopVisitId) {
    connector.mutex.Lock()
    for _, subId := range subsIds {
        connector.toBroadcast[SubscriptionId(subId)] = append(connector.toBroadcast[SubscriptionId(subId)], svId)
    }
    connector.mutex.Unlock()
}

func (connector *SIRIStopMonitoringSubscriptionBroadcaster) checkEvent(sv *model.StopVisit) (subscriptionIds []SubscriptionId) {
    if sv.Origin == string(connector.Partner().Slug()) {
        return
    }

    vj, _ := connector.partner.Model().VehicleJourneys().Find(sv.VehicleJourneyId)

    for _, stopAreaCode := range connector.partner.Model().StopAreas().FindAscendantsWithCodeSpace(sv.StopAreaId, connector.remoteCodeSpace) {
        subs := connector.partner.Subscriptions().FindByResourceId(stopAreaCode.String(), StopMonitoringBroadcast)

        for _, sub := range subs {
            resource := sub.Resource(stopAreaCode)
            if resource == nil || resource.SubscribedUntil.Before(connector.Clock().Now()) {
                continue
            }

            // Handle LineRef filter
            if ok := connector.checkLineRefFilter(sub, vj.LineId); !ok {
                continue
            }

            lastState, ok := resource.LastState(string(sv.Id()))
            if ok && !lastState.(*ls.StopMonitoringLastChange).Haschanged(sv) {
                continue
            }

            if !ok {
                resource.SetLastState(string(sv.Id()), ls.NewStopMonitoringLastChange(sv, sub))
            }

            subscriptionIds = append(subscriptionIds, sub.Id())
        }
    }

    return
}

func (connector *SIRIStopMonitoringSubscriptionBroadcaster) checkStopAreaEvent(stopArea *model.StopArea) {
    obj, ok := stopArea.Code(connector.remoteCodeSpace)
    if !ok {
        return
    }

    connector.mutex.Lock()

    subs := connector.partner.Subscriptions().FindByResourceId(obj.String(), StopMonitoringBroadcast)
    for _, sub := range subs {
        resource := sub.Resource(obj)
        if resource == nil || resource.SubscribedUntil.Before(connector.Clock().Now()) {
            continue
        }

        lastState, ok := resource.LastState(string(stopArea.Id()))
        if ok {
            partners, ok := lastState.(*ls.StopAreaLastChange).Haschanged(stopArea)
            if ok {
                nm, ok := connector.notMonitored[sub.Id()]
                if !ok {
                    nm = make(map[string]struct{})
                    connector.notMonitored[sub.Id()] = nm
                }
                for _, partner := range partners {
                    if partner != string(connector.partner.Slug()) {
                        nm[partner] = struct{}{}
                    }
                }
            }
            lastState.(*ls.StopAreaLastChange).UpdateState(stopArea)
        } else { // Should not happen
            resource.SetLastState(string(stopArea.Id()), ls.NewStopAreaLastChange(stopArea, sub))
        }
    }

    connector.mutex.Unlock()
}

func (connector *SIRIStopMonitoringSubscriptionBroadcaster) HandleSubscriptionRequest(request *sxml.XMLSubscriptionRequest, message *audit.BigQueryMessage) (resps []siri.SIRIResponseStatus) {
    var monitoringRefs, subIds []string

    for _, sm := range request.XMLSubscriptionSMEntries() {
        rs := siri.SIRIResponseStatus{
            RequestMessageRef: sm.MessageIdentifier(),
            SubscriberRef:     sm.SubscriberRef(),
            SubscriptionRef:   sm.SubscriptionIdentifier(),
            ResponseTimestamp: connector.Clock().Now(),
        }

        monitoringRefs = append(monitoringRefs, sm.MonitoringRef())

        code := model.NewCode(connector.remoteCodeSpace, sm.MonitoringRef())
        sa, ok := connector.partner.Model().StopAreas().FindByCode(code)
        if !ok {
            rs.ErrorType = "InvalidDataReferencesError"
            rs.ErrorText = fmt.Sprintf("StopArea not found: '%s'", code.Value())
            resps = append(resps, rs)

            message.Status = "Error"
            continue
        }

        subIds = append(subIds, sm.SubscriptionIdentifier())

        sub, ok := connector.Partner().Subscriptions().FindByExternalId(sm.SubscriptionIdentifier())
        if ok {
            if sub.Kind() != StopMonitoringBroadcast {
                logger.Log.Debugf("StopMonitoring subscription request with a duplicated Id: %v", sm.SubscriptionIdentifier())
                rs.ErrorType = "OtherError"
                rs.ErrorNumber = 2
                rs.ErrorText = fmt.Sprintf("[BAD_REQUEST] Subscription Id %v already exists", sm.SubscriptionIdentifier())
                resps = append(resps, rs)

                message.Status = "Error"
                continue
            }

            sub.Delete()
        }

        sub = connector.Partner().Subscriptions().New(StopMonitoringBroadcast)
        sub.SubscriberRef = sm.SubscriberRef()
        sub.SetExternalId(sm.SubscriptionIdentifier())

        ref := model.Reference{
            Code: &code,
            Type: "StopArea",
        }

        r := sub.CreateAndAddNewResource(ref)
        r.Subscribed(connector.Clock().Now())
        r.SubscribedUntil = sm.InitialTerminationTime()

        connector.fillOptions(sub, request, sm)
        if sm.LineRef() != "" {
            sub.SetSubscriptionOption("LineRef", fmt.Sprintf("%s:%s", connector.remoteCodeSpace, sm.LineRef()))
        }

        rs.Status = true
        rs.ValidUntil = sm.InitialTerminationTime()
        resps = append(resps, rs)

        // Init SA LastChange
        r.SetLastState(string(sa.Id()), ls.NewStopAreaLastChange(sa, sub))
        // Init StopVisits LastChange
        connector.addStopAreaStopVisits(sa, sub, r)

        sub.Save()
    }

    message.Type = audit.STOP_MONITORING_SUBSCRIPTION_REQUEST
    message.SubscriptionIdentifiers = subIds
    message.StopAreas = monitoringRefs

    return
}

func (connector *SIRIStopMonitoringSubscriptionBroadcaster) addStopAreaStopVisits(sa *model.StopArea, sub *Subscription, res *SubscribedResource) {
    for _, saId := range connector.partner.Model().StopAreas().FindFamily(sa.Id()) {
        svs := connector.partner.Model().StopVisits().FindFollowingByStopAreaId(saId)
        for i := range svs {
            if _, ok := res.LastState(string(svs[i].Id())); ok {
                continue
            }

            // Handle LineRef filter
            vj, _ := connector.partner.Model().VehicleJourneys().Find(svs[i].VehicleJourneyId)
            if ok := connector.checkLineRefFilter(sub, vj.LineId); !ok {
                continue
            }

            res.SetLastState(string(svs[i].Id()), ls.NewStopMonitoringLastChange(svs[i], sub))
            connector.addStopVisit([]SubscriptionId{sub.Id()}, svs[i].Id())
        }
    }
}

// WIP Need to do something about this method Refs #6338
func (smsb *SIRIStopMonitoringSubscriptionBroadcaster) fillOptions(s *Subscription, request *sxml.XMLSubscriptionRequest, sm *sxml.XMLStopMonitoringSubscriptionRequestEntry) {
    changeBeforeUpdates := request.ChangeBeforeUpdates()
    if changeBeforeUpdates == "" {
        changeBeforeUpdates = "PT1M"
    }

    s.SetSubscriptionOption("StopVisitTypes", sm.StopVisitTypes())
    s.SetSubscriptionOption("IncrementalUpdates", request.IncrementalUpdates())
    s.SetSubscriptionOption("MaximumStopVisits", strconv.Itoa(sm.MaximumStopVisits()))
    s.SetSubscriptionOption("ChangeBeforeUpdates", changeBeforeUpdates)
    s.SetSubscriptionOption("MessageIdentifier", request.MessageIdentifier())
}

// Returns the LineId of the line defined in the LineRef subscription option
// If LineRef isn't defined or with an incorrect format, returns false
func (connector *SIRIStopMonitoringSubscriptionBroadcaster) checkLineRefFilter(sub *Subscription, lineId model.LineId) (ok bool) {
    lineRef := sub.SubscriptionOption("LineRef")
    if lineRef == "" {
        return true
    }
    kindValue := strings.SplitN(lineRef, ":", 2)
    if len(kindValue) != 2 { // Should not happen but we don't want an index out of range panic
        logger.Log.Debugf("The LineRef Setting hasn't been stored in the correct format: %v", lineRef)
        return
    }
    lineIds := connector.partner.Model().Lines().FindFamilyFromCode(model.NewCode(kindValue[0], kindValue[1]))

    ok = slices.Contains(lineIds, lineId)

    return
}

// START TEST

type TestSIRIStopMonitoringSubscriptionBroadcasterFactory struct{}

type TestStopMonitoringSubscriptionBroadcaster struct {
    connector

    events []*model.StopMonitoringBroadcastEvent
}

func NewTestStopMonitoringSubscriptionBroadcaster() *TestStopMonitoringSubscriptionBroadcaster {
    connector := &TestStopMonitoringSubscriptionBroadcaster{}
    return connector
}

func (connector *TestStopMonitoringSubscriptionBroadcaster) HandleStopMonitoringBroadcastEvent(event *model.StopMonitoringBroadcastEvent) {
    connector.events = append(connector.events, event)
}

func (factory *TestSIRIStopMonitoringSubscriptionBroadcasterFactory) Validate(apiPartner *APIPartner) {
} // Always valid

func (factory *TestSIRIStopMonitoringSubscriptionBroadcasterFactory) CreateConnector(partner *Partner) Connector {
    return NewTestStopMonitoringSubscriptionBroadcaster()
}

// END TEST