core/siri_estimated_timetable_subscription_broadcaster.go

Summary

Maintainability
D
2 days
Test Coverage
package core

import (
    "fmt"
    "strings"
    "sync"

    "bitbucket.org/enroute-mobi/ara/audit"
    "bitbucket.org/enroute-mobi/ara/core/ls"
    "bitbucket.org/enroute-mobi/ara/logger"
    "bitbucket.org/enroute-mobi/ara/model"
    "bitbucket.org/enroute-mobi/ara/siri/siri"
    "bitbucket.org/enroute-mobi/ara/siri/sxml"
)

type SIRIEstimatedTimetableSubscriptionBroadcaster struct {
    connector

    vjRemoteCodeSpaces            []string
    estimatedTimetableBroadcaster EstimatedTimetableBroadcaster
    toBroadcast                   map[SubscriptionId][]model.StopVisitId
    notMonitored                  map[SubscriptionId]map[string]struct{}

    mutex *sync.Mutex //protect the map
}

type SIRIEstimatedTimetableSubscriptionBroadcasterFactory struct{}

func (factory *SIRIEstimatedTimetableSubscriptionBroadcasterFactory) CreateConnector(partner *Partner) Connector {
    if _, ok := partner.Connector(SIRI_SUBSCRIPTION_REQUEST_DISPATCHER); !ok {
        partner.CreateSubscriptionRequestDispatcher()
    }
    return newSIRIEstimatedTimetableSubscriptionBroadcaster(partner)
}

func (factory *SIRIEstimatedTimetableSubscriptionBroadcasterFactory) Validate(apiPartner *APIPartner) {
    apiPartner.ValidatePresenceOfRemoteCodeSpace()
    apiPartner.ValidatePresenceOfRemoteCredentials()
    apiPartner.ValidatePresenceOfLocalCredentials()
}

func newSIRIEstimatedTimetableSubscriptionBroadcaster(partner *Partner) *SIRIEstimatedTimetableSubscriptionBroadcaster {
    connector := &SIRIEstimatedTimetableSubscriptionBroadcaster{}
    connector.remoteCodeSpace = partner.RemoteCodeSpace(SIRI_ESTIMATED_TIMETABLE_SUBSCRIPTION_BROADCASTER)
    connector.vjRemoteCodeSpaces = partner.VehicleJourneyRemoteCodeSpaceWithFallback(SIRI_ESTIMATED_TIMETABLE_SUBSCRIPTION_BROADCASTER)
    connector.partner = partner
    connector.mutex = &sync.Mutex{}
    connector.toBroadcast = make(map[SubscriptionId][]model.StopVisitId)

    connector.estimatedTimetableBroadcaster = NewSIRIEstimatedTimetableBroadcaster(connector)
    return connector
}

func (connector *SIRIEstimatedTimetableSubscriptionBroadcaster) HandleSubscriptionRequest(request *sxml.XMLSubscriptionRequest, message *audit.BigQueryMessage) (resps []siri.SIRIResponseStatus) {
    var lineIds, subIds []string

    for _, ett := range request.XMLSubscriptionETTEntries() {
        rs := siri.SIRIResponseStatus{
            RequestMessageRef: ett.MessageIdentifier(),
            SubscriberRef:     ett.SubscriberRef(),
            SubscriptionRef:   ett.SubscriptionIdentifier(),
            ResponseTimestamp: connector.Clock().Now(),
        }

        // for logging
        lineIds = append(lineIds, ett.Lines()...)

        sub, ok := connector.Partner().Subscriptions().FindByExternalId(ett.SubscriptionIdentifier())
        if ok {
            if sub.Kind() != EstimatedTimetableBroadcast {
                logger.Log.Debugf("EstimatedTimetable subscription request with a duplicated Id: %v", ett.SubscriptionIdentifier())
                rs.ErrorType = "OtherError"
                rs.ErrorNumber = 2
                rs.ErrorText = fmt.Sprintf("[BAD_REQUEST] Subscription Id %v already exists", ett.SubscriptionIdentifier())

                resps = append(resps, rs)
                message.Status = "Error"
                continue
            }

            sub.Delete()
        }

        resources, unknownLineIds := connector.checkLines(ett)
        if len(unknownLineIds) != 0 {
            logger.Log.Debugf("EstimatedTimetable subscription request Could not find line(s) with id : %v", strings.Join(unknownLineIds, ","))
            rs.ErrorType = "InvalidDataReferencesError"
            rs.ErrorText = fmt.Sprintf("Unknown Line(s) %v", strings.Join(unknownLineIds, ","))

            resps = append(resps, rs)
            message.Status = "Error"
            continue
        }

        rs.Status = true
        rs.ValidUntil = ett.InitialTerminationTime()
        resps = append(resps, rs)

        subIds = append(subIds, ett.SubscriptionIdentifier())

        sub = connector.Partner().Subscriptions().New(EstimatedTimetableBroadcast)
        sub.SubscriberRef = ett.SubscriberRef()
        sub.SetExternalId(ett.SubscriptionIdentifier())
        connector.fillOptions(sub, request)

        for _, r := range resources {
            line, ok := connector.Partner().Model().Lines().FindByCode(*r.Reference.Code)
            if !ok {
                continue
            }

            // Init StopVisits LastChange
            connector.addLineStopVisits(sub, r, line.Id())

            sub.AddNewResource(r)
        }
        sub.Save()
    }
    message.Type = audit.ESTIMATED_TIMETABLE_SUBSCRIPTION_REQUEST
    message.SubscriptionIdentifiers = subIds
    message.Lines = lineIds

    return resps
}

func (connector *SIRIEstimatedTimetableSubscriptionBroadcaster) addLineStopVisits(sub *Subscription, res *SubscribedResource, lineId model.LineId) {
    lineIds := connector.partner.Model().Lines().FindFamily(lineId)
    for i := range lineIds {
        sas := connector.partner.Model().StopAreas().FindByLineId(lineIds[i])
        for i := range sas {
            // Init SA LastChange
            res.SetLastState(string(sas[i].Id()), ls.NewStopAreaLastChange(sas[i], sub))
            svs := connector.partner.Model().StopVisits().FindFollowingByStopAreaId(sas[i].Id())
            for i := range svs {
                connector.addStopVisit(sub.Id(), svs[i].Id())
            }
        }

    }
}

func (connector *SIRIEstimatedTimetableSubscriptionBroadcaster) checkLines(ett *sxml.XMLEstimatedTimetableSubscriptionRequestEntry) (resources []*SubscribedResource, lineIds []string) {
    // check for subscription to all lines
    if len(ett.Lines()) == 0 {
        var lv []string
        //find all lines corresponding to the remoteCodeSpace
        for _, line := range connector.Partner().Model().Lines().FindAll() {
            lineCode, ok := line.Code(connector.remoteCodeSpace)
            if ok {
                lv = append(lv, lineCode.Value())
                continue
            }
        }

        for _, lineValue := range lv {
            lineCode := model.NewCode(connector.remoteCodeSpace, lineValue)
            ref := model.Reference{
                Code: &lineCode,
                Type: "Line",
            }
            r := NewResource(ref)
            r.Subscribed(connector.Clock().Now())
            r.SubscribedUntil = ett.InitialTerminationTime()
            resources = append(resources, r)
        }
        return resources, lineIds
    }

    for _, lineId := range ett.Lines() {

        lineCode := model.NewCode(connector.remoteCodeSpace, lineId)
        _, ok := connector.Partner().Model().Lines().FindByCode(lineCode)

        if !ok {
            lineIds = append(lineIds, lineId)
            continue
        }

        ref := model.Reference{
            Code: &lineCode,
            Type: "Line",
        }

        r := NewResource(ref)
        r.Subscribed(connector.Clock().Now())
        r.SubscribedUntil = ett.InitialTerminationTime()
        resources = append(resources, r)
    }
    return resources, lineIds
}

func (connector *SIRIEstimatedTimetableSubscriptionBroadcaster) Stop() {
    connector.estimatedTimetableBroadcaster.Stop()
}

func (connector *SIRIEstimatedTimetableSubscriptionBroadcaster) Start() {
    connector.estimatedTimetableBroadcaster.Start()
}

func (ettb *SIRIEstimatedTimetableSubscriptionBroadcaster) fillOptions(s *Subscription, request *sxml.XMLSubscriptionRequest) {
    changeBeforeUpdates := request.ChangeBeforeUpdates()
    if changeBeforeUpdates == "" {
        changeBeforeUpdates = "PT1M"
    }
    s.SetSubscriptionOption("ChangeBeforeUpdates", changeBeforeUpdates)
    s.SetSubscriptionOption("MessageIdentifier", request.MessageIdentifier())
}

func (connector *SIRIEstimatedTimetableSubscriptionBroadcaster) HandleBroadcastEvent(event *model.StopMonitoringBroadcastEvent) {
    switch event.ModelType {
    case "StopVisit":
        connector.checkEvent(model.StopVisitId(event.ModelId))
    case "StopArea":
        sa, ok := connector.partner.Model().StopAreas().Find(model.StopAreaId(event.ModelId))
        if ok {
            connector.checkStopAreaEvent(sa)
        }
    default:
        return
    }
}

func (connector *SIRIEstimatedTimetableSubscriptionBroadcaster) checkEvent(svId model.StopVisitId) {
    sv, ok := connector.Partner().Model().StopVisits().Find(svId)
    if !ok {
        return
    }

    vj, ok := connector.Partner().Model().VehicleJourneys().Find(sv.VehicleJourneyId)
    if !ok {
        return
    }

    line, ok := connector.Partner().Model().Lines().Find(vj.LineId)
    if !ok {
        return
    }

    lineObj, ok := line.ReferentOrSelfCode(connector.remoteCodeSpace)
    if !ok {
        return
    }

    subs := connector.Partner().Subscriptions().FindByResourceId(lineObj.String(), EstimatedTimetableBroadcast)

    for _, sub := range subs {
        r := sub.Resource(lineObj)
        if r == nil || r.SubscribedUntil.Before(connector.Clock().Now()) {
            continue
        }

        lastState, ok := r.LastState(string(svId))
        if ok && !lastState.(*ls.EstimatedTimetableLastChange).Haschanged(sv) {
            continue
        }

        if !ok {
            r.SetLastState(string(sv.Id()), ls.NewEstimatedTimetableLastChange(sv, sub))
        }
        connector.addFilteredStopVisit(sub.Id(), sv)
    }
}

func (connector *SIRIEstimatedTimetableSubscriptionBroadcaster) addFilteredStopVisit(subId SubscriptionId, sv *model.StopVisit) {
    // ignore stopVist before the RecordedCallsDuration if any
    if connector.Partner().RecordedCallsDuration() != 0 &&
        sv.ReferenceTime().Before(connector.Clock().Now().Add(-connector.Partner().RecordedCallsDuration())) {
        return
    }

    connector.mutex.Lock()
    connector.toBroadcast[SubscriptionId(subId)] = append(connector.toBroadcast[SubscriptionId(subId)], sv.Id())
    connector.mutex.Unlock()
}

func (connector *SIRIEstimatedTimetableSubscriptionBroadcaster) addStopVisit(subId SubscriptionId, svId model.StopVisitId) {
    connector.mutex.Lock()
    defer connector.mutex.Unlock()

    connector.toBroadcast[SubscriptionId(subId)] = append(connector.toBroadcast[SubscriptionId(subId)], svId)
}

func (connector *SIRIEstimatedTimetableSubscriptionBroadcaster) checkStopAreaEvent(stopArea *model.StopArea) {
    obj, ok := stopArea.Code(connector.remoteCodeSpace)
    if !ok {
        return
    }

    connector.mutex.Lock()
    defer connector.mutex.Unlock()

    subs := connector.partner.Subscriptions().FindByResourceId(obj.String(), EstimatedTimetableBroadcast)
    for _, sub := range subs {
        resource := sub.Resource(obj)
        if resource == nil || resource.SubscribedUntil.Before(connector.Clock().Now()) {
            continue
        }

        lastState, ok := resource.LastState(string(stopArea.Id()))
        if ok {
            partners, ok := lastState.(*ls.StopAreaLastChange).Haschanged(stopArea)
            if ok {
                nm, ok := connector.notMonitored[sub.Id()]
                if !ok {
                    nm = make(map[string]struct{})
                    connector.notMonitored[sub.Id()] = nm
                }
                for _, partner := range partners {
                    nm[partner] = struct{}{}
                }
            }
            lastState.(*ls.StopAreaLastChange).UpdateState(stopArea)
        } else { // Should not happen
            resource.SetLastState(string(stopArea.Id()), ls.NewStopAreaLastChange(stopArea, sub))
        }
    }
}

// START TEST

type TestSIRIETTSubscriptionBroadcasterFactory struct{}

type TestETTSubscriptionBroadcaster struct {
    connector

    events []*model.StopMonitoringBroadcastEvent
}

func NewTestETTSubscriptionBroadcaster() *TestETTSubscriptionBroadcaster {
    connector := &TestETTSubscriptionBroadcaster{}
    return connector
}

func (connector *TestETTSubscriptionBroadcaster) HandleBroadcastEvent(event *model.StopMonitoringBroadcastEvent) {
    connector.events = append(connector.events, event)
}

func (factory *TestSIRIETTSubscriptionBroadcasterFactory) Validate(apiPartner *APIPartner) {} // Always valid

func (factory *TestSIRIETTSubscriptionBroadcasterFactory) CreateConnector(partner *Partner) Connector {
    return NewTestETTSubscriptionBroadcaster()
}