core/siri_situation_exchange_request_broadcaster.go

Summary

Maintainability
F
4 days
Test Coverage
package core

import (
    "slices"
    "strings"

    "bitbucket.org/enroute-mobi/ara/audit"
    "bitbucket.org/enroute-mobi/ara/core/idgen"
    "bitbucket.org/enroute-mobi/ara/logger"
    "bitbucket.org/enroute-mobi/ara/model"
    "bitbucket.org/enroute-mobi/ara/siri/siri"
    "bitbucket.org/enroute-mobi/ara/siri/sxml"
    "bitbucket.org/enroute-mobi/ara/state"
)

// SituationExchangeRequestBroadcaster is implemented by connectors able to
// answer a SIRI GetSituationExchange request.
type SituationExchangeRequestBroadcaster interface {
    // Situations builds the SituationExchange response for the given request.
    // The audit message is handed over so the implementation can record
    // details of the exchange on it.
    Situations(*sxml.XMLGetSituationExchange, *audit.BigQueryMessage) *siri.SIRISituationExchangeResponse
}

// SIRISituationExchangeRequestBroadcaster is the connector answering SIRI
// GetSituationExchange requests from the Situations stored in its partner's
// model.
type SIRISituationExchangeRequestBroadcaster struct {
    state.Startable

    // connector provides the shared partner / remote code space plumbing.
    connector
}

// SIRISituationExchangeRequestBroadcasterFactory creates
// SIRISituationExchangeRequestBroadcaster connectors and validates the
// partner settings they rely on.
type SIRISituationExchangeRequestBroadcasterFactory struct{}

// NewSIRISituationExchangeRequestBroadcaster returns a broadcaster attached to
// the given partner. The remote code space is only resolved once Start is
// called.
func NewSIRISituationExchangeRequestBroadcaster(partner *Partner) *SIRISituationExchangeRequestBroadcaster {
    broadcaster := new(SIRISituationExchangeRequestBroadcaster)
    broadcaster.partner = partner

    return broadcaster
}

// Start stores on the connector the remote code space configured on the
// partner for the SituationExchange request broadcaster.
func (connector *SIRISituationExchangeRequestBroadcaster) Start() {
    owner := connector.partner
    connector.remoteCodeSpace = owner.RemoteCodeSpace(SIRI_SITUATION_EXCHANGE_REQUEST_BROADCASTER)
}

// Situations answers a GetSituationExchange request.
//
// It builds a response carrying one delivery filled with every Situation the
// partner is allowed to receive, and reports the referenced lines and stop
// areas on the audit message.
func (connector *SIRISituationExchangeRequestBroadcaster) Situations(request *sxml.XMLGetSituationExchange, message *audit.BigQueryMessage) *siri.SIRISituationExchangeResponse {
    // The response envelope is prepared first so identifiers are generated in
    // the same order as before the situations are built.
    var response siri.SIRISituationExchangeResponse
    partner := connector.Partner()
    response.Address = partner.Address()
    response.ProducerRef = partner.ProducerRef()
    response.ResponseMessageIdentifier = partner.NewResponseMessageIdentifier()

    delivery := siri.SIRISituationExchangeDelivery{
        RequestMessageRef: request.MessageIdentifier(),
        Status:            true,
        ResponseTimestamp: connector.Clock().Now(),
        LineRefs:          make(map[string]struct{}),
        MonitoringRefs:    make(map[string]struct{}),
    }
    connector.getSituationExchangeDelivery(&delivery, &request.XMLSituationExchangeRequest)

    // Audit: expose the references collected while building the delivery.
    message.Lines = GetModelReferenceSlice(delivery.LineRefs)
    message.StopAreas = GetModelReferenceSlice(delivery.MonitoringRefs)

    response.SIRISituationExchangeDelivery = delivery
    return &response
}

// getSituationExchangeDelivery fills the delivery with every Situation of the
// partner's model. The request argument is currently unused: no filtering is
// applied from it.
func (connector *SIRISituationExchangeRequestBroadcaster) getSituationExchangeDelivery(delivery *siri.SIRISituationExchangeDelivery, _ *sxml.XMLSituationExchangeRequest) {
    for _, situation := range connector.partner.Model().Situations().FindAll() {
        connector.buildSituation(delivery, situation)
    }
}

// buildSituation converts a model Situation into a SIRI PtSituationElement and
// appends it to the delivery.
//
// Nothing is appended when canBroadcast rejects the Situation, or when the
// Situation has no Code in either the remote or the default code space.
func (connector *SIRISituationExchangeRequestBroadcaster) buildSituation(delivery *siri.SIRISituationExchangeDelivery, situation model.Situation) {
    if !connector.canBroadcast(situation) {
        return
    }

    // Prefer the Code known in the remote code space; otherwise generate an
    // identifier from the default Code.
    var situationNumber string
    if remoteCode, found := situation.Code(connector.remoteCodeSpace); found {
        situationNumber = remoteCode.Value()
    } else {
        defaultCode, found := situation.Code(model.Default)
        if !found {
            logger.Log.Debugf("Unknown Code for Situation %s", situation.Id())
            return
        }
        situationNumber = connector.partner.NewIdentifier(idgen.IdentifierAttributes{Type: "InfoMessage", Id: defaultCode.Value()})
    }

    element := &siri.SIRIPtSituationElement{
        SituationNumber:    situationNumber,
        CreationTime:       situation.RecordedAt,
        Version:            situation.Version,
        ValidityPeriods:    situation.ValidityPeriods,
        PublicationWindows: situation.PublicationWindows,
        Keywords:           strings.Join(situation.Keywords, " "),
        ReportType:         situation.ReportType,
        ParticipantRef:     situation.ParticipantRef,
        VersionedAtTime:    situation.VersionedAt,
        Progress:           situation.Progress,
        Reality:            situation.Reality,
        AlertCause:         situation.AlertCause,
        Severity:           situation.Severity,
    }

    // Optional translated texts.
    if situation.Description != nil {
        element.Description = &siri.SIRITranslatedString{
            Tag:              "Description",
            TranslatedString: *situation.Description,
        }
    }
    if situation.Summary != nil {
        element.Summary = &siri.SIRITranslatedString{
            Tag:              "Summary",
            TranslatedString: *situation.Summary,
        }
    }

    // Affects attached directly to the Situation.
    connector.buildAffects(situation.Affects, &element.SIRIAffects, delivery)
    if element.AffectedLines != nil || element.AffectedStopPoints != nil || element.AffectedAllLines {
        element.HasAffects = true
    }

    // Consequences, each one with its own Affects.
    for _, consequence := range situation.Consequences {
        siriConsequence := &siri.Consequence{
            Periods:   consequence.Periods,
            Severity:  consequence.Severity,
            Condition: consequence.Condition,
        }
        // Copying a nil Blocking leaves the fresh struct untouched.
        siriConsequence.Blocking = consequence.Blocking

        connector.buildAffects(consequence.Affects, &siriConsequence.SIRIAffects, delivery)
        if siriConsequence.AffectedLines != nil || siriConsequence.AffectedStopPoints != nil || siriConsequence.AffectedAllLines {
            siriConsequence.HasAffects = true
        }

        element.Consequences = append(element.Consequences, siriConsequence)
    }

    // Publishing actions: web, mobile, then display.
    for _, action := range situation.PublishToWebActions {
        webAction := &siri.PublishToWebAction{}
        webAction.Incidents = action.Incidents
        webAction.HomePage = action.HomePage
        webAction.SocialNetworks = append(webAction.SocialNetworks, action.SocialNetworks...)

        connector.buildActionCommon(action.ActionCommon, &webAction.SIRIPublishActionCommon, delivery)
        element.PublishToWebActions = append(element.PublishToWebActions, webAction)
    }

    for _, action := range situation.PublishToMobileActions {
        mobileAction := &siri.PublishToMobileAction{}
        mobileAction.Incidents = action.Incidents
        mobileAction.HomePage = action.HomePage

        connector.buildActionCommon(action.ActionCommon, &mobileAction.SIRIPublishActionCommon, delivery)
        element.PublishToMobileActions = append(element.PublishToMobileActions, mobileAction)
    }

    for _, action := range situation.PublishToDisplayActions {
        displayAction := &siri.PublishToDisplayAction{}
        displayAction.OnBoard = action.OnBoard
        displayAction.OnPlace = action.OnPlace

        connector.buildActionCommon(action.ActionCommon, &displayAction.SIRIPublishActionCommon, delivery)
        element.PublishToDisplayActions = append(element.PublishToDisplayActions, displayAction)
    }

    publishingActions := len(element.PublishToWebActions) +
        len(element.PublishToMobileActions) +
        len(element.PublishToDisplayActions)
    if publishingActions > 0 {
        element.HasPublishingActions = true
    }

    delivery.Situations = append(delivery.Situations, element)
}

// buildActionCommon copies the attributes shared by every publishing action
// from the model into its SIRI counterpart, builds the action's Affects and
// sets the HasAffects / HasPublishAtScope flags accordingly.
func (connector *SIRISituationExchangeRequestBroadcaster) buildActionCommon(actionCommon model.ActionCommon, siriActionCommon *siri.SIRIPublishActionCommon, delivery *siri.SIRISituationExchangeDelivery) {
    // Plain attribute copies.
    siriActionCommon.ActionType = actionCommon.ActionType
    siriActionCommon.ActionStatus = actionCommon.ActionStatus
    siriActionCommon.Name = actionCommon.Name
    siriActionCommon.Value = actionCommon.Value
    siriActionCommon.ScopeType = actionCommon.ScopeType
    siriActionCommon.PublicationWindows = actionCommon.PublicationWindows

    // Optional translated texts.
    if actionCommon.Prompt != nil {
        siriActionCommon.Prompt = &siri.SIRITranslatedString{
            Tag:              "Prompt",
            TranslatedString: *actionCommon.Prompt,
        }
    }
    if actionCommon.Description != nil {
        siriActionCommon.Description = &siri.SIRITranslatedString{
            Tag:              "Description",
            TranslatedString: *actionCommon.Description,
        }
    }

    connector.buildAffects(actionCommon.Affects, &siriActionCommon.SIRIAffects, delivery)

    if siriActionCommon.AffectedAllLines || siriActionCommon.AffectedLines != nil || siriActionCommon.AffectedStopPoints != nil {
        siriActionCommon.HasAffects = true
    }

    // A publish scope is only flagged when it has both a type and Affects.
    if siriActionCommon.HasAffects && siriActionCommon.ScopeType != "" {
        siriActionCommon.HasPublishAtScope = true
    }
}

// buildAffects translates model Affects into siriAffects, dispatching on the
// type of each Affect. Affects that cannot be built are skipped, and Affects
// of any other type are ignored.
func (connector *SIRISituationExchangeRequestBroadcaster) buildAffects(affects model.Affects, siriAffects *siri.SIRIAffects, delivery *siri.SIRISituationExchangeDelivery) {
    for _, affect := range affects {
        switch affect.GetType() {
        case model.SituationTypeAllLines:
            siriAffects.AffectedAllLines = true
        case model.SituationTypeLine:
            if line, built := connector.buildAffectedLine(affect, delivery); built {
                siriAffects.AffectedLines = append(siriAffects.AffectedLines, line)
            }
        case model.SituationTypeStopArea:
            if stopPoint, built := connector.buildAffectedStopArea(affect, delivery); built {
                siriAffects.AffectedStopPoints = append(siriAffects.AffectedStopPoints, stopPoint)
            }
        }
    }
}

// buildAffectedStopArea builds the SIRI AffectedStopPoint for a StopArea
// Affect.
//
// It returns (nil, false) when the Affect is not a *model.AffectedStopArea or
// when the StopArea cannot be resolved to a reference in the remote code
// space. Lines attached to the Affect that are unknown, or that have no code
// in the remote code space, are skipped. The resolved StopArea reference is
// recorded in delivery.MonitoringRefs.
func (connector *SIRISituationExchangeRequestBroadcaster) buildAffectedStopArea(affect model.Affect, delivery *siri.SIRISituationExchangeDelivery) (*siri.AffectedStopPoint, bool) {
    // Check the assertion instead of discarding its result: a mismatch would
    // otherwise leave a typed nil pointer behind and panic further down.
    affectedStopArea, ok := affect.(*model.AffectedStopArea)
    if !ok || affectedStopArea == nil {
        logger.Log.Debugf("Unexpected Affect type %T for StopArea", affect)
        return nil, false
    }

    affectedStopAreaRef, ok := connector.resolveStopAreaRef(model.StopAreaId(affectedStopArea.GetId()))
    if !ok {
        logger.Log.Debugf("Unknown StopArea %s", affectedStopArea.GetId())
        return nil, false
    }

    // Logging
    delivery.MonitoringRefs[affectedStopAreaRef] = struct{}{}

    affectedStopPoint := &siri.AffectedStopPoint{StopPointRef: affectedStopAreaRef}
    for _, lineId := range affectedStopArea.LineIds {
        line, ok := connector.partner.Model().Lines().Find(lineId)
        if !ok {
            // Report the missing line, not the stop area it is attached to.
            logger.Log.Debugf("Unknown Line %s", lineId)
            continue
        }
        lineCode, ok := line.ReferentOrSelfCode(connector.remoteCodeSpace)
        if !ok {
            logger.Log.Debugf("Unknown Line Code %s", connector.remoteCodeSpace)
            continue
        }
        affectedStopPoint.LineRefs = append(affectedStopPoint.LineRefs, lineCode.Value())
    }

    return affectedStopPoint, true
}

// buildAffectedLine builds the SIRI AffectedLine for a Line Affect.
//
// It returns (nil, false) when the Affect is not a *model.AffectedLine, when
// the Line is unknown, or when it has no code in the remote code space.
// Destinations, sections and route stops whose StopArea cannot be resolved
// are skipped. The line reference is recorded in delivery.LineRefs, and the
// destination and section stop references in delivery.MonitoringRefs.
func (connector *SIRISituationExchangeRequestBroadcaster) buildAffectedLine(affect model.Affect, delivery *siri.SIRISituationExchangeDelivery) (*siri.AffectedLine, bool) {
    // Check the assertion once instead of discarding its result and
    // re-asserting (unchecked) before every loop.
    modelAffectedLine, ok := affect.(*model.AffectedLine)
    if !ok || modelAffectedLine == nil {
        logger.Log.Debugf("Unexpected Affect type %T for Line", affect)
        return nil, false
    }

    line, ok := connector.partner.Model().Lines().Find(model.LineId(modelAffectedLine.GetId()))
    if !ok {
        logger.Log.Debugf("Unknown Line %s", modelAffectedLine.GetId())
        return nil, false
    }
    lineCode, ok := line.ReferentOrSelfCode(connector.remoteCodeSpace)
    if !ok {
        logger.Log.Debugf("Unknown Line Code %s", connector.remoteCodeSpace)
        return nil, false
    }

    lineRef := lineCode.Value()
    affectedLine := siri.AffectedLine{
        LineRef: lineRef,
    }
    delivery.LineRefs[lineRef] = struct{}{}

    for _, affectedDestination := range modelAffectedLine.AffectedDestinations {
        affectedDestinationRef, ok := connector.resolveStopAreaRef(model.StopAreaId(affectedDestination.StopAreaId))
        if !ok {
            logger.Log.Debugf("Cannot find destination %s", affectedDestination.StopAreaId)
            continue
        }
        delivery.MonitoringRefs[affectedDestinationRef] = struct{}{}
        affectedLine.Destinations = append(affectedLine.Destinations, siri.SIRIAffectedDestination{
            StopPlaceRef: affectedDestinationRef,
        })
    }

    for _, affectedSection := range modelAffectedLine.AffectedSections {
        // A section is only kept when both of its ends can be resolved.
        firstStopRef, ok := connector.resolveStopAreaRef(model.StopAreaId(affectedSection.FirstStop))
        if !ok {
            logger.Log.Debugf("Cannot find firstStop  %s", affectedSection.FirstStop)
            continue
        }
        lastStopRef, ok := connector.resolveStopAreaRef(model.StopAreaId(affectedSection.LastStop))
        if !ok {
            logger.Log.Debugf("Cannot find lastStop  %s", affectedSection.LastStop)
            continue
        }
        delivery.MonitoringRefs[firstStopRef] = struct{}{}
        delivery.MonitoringRefs[lastStopRef] = struct{}{}
        affectedLine.Sections = append(affectedLine.Sections, siri.SIRIAffectedSection{
            FirstStopPointRef: firstStopRef,
            LastStopPointRef:  lastStopRef,
        })
    }

    for _, affectedRoute := range modelAffectedLine.AffectedRoutes {
        route := siri.SIRIAffectedRoute{
            RouteRef: affectedRoute.RouteRef,
        }

        // NOTE(review): route stop references are not recorded in
        // delivery.MonitoringRefs, unlike destinations and sections —
        // confirm this is intended.
        for _, stopArea := range affectedRoute.StopAreaIds {
            stopAreaRef, ok := connector.resolveStopAreaRef(stopArea)
            if !ok {
                logger.Log.Debugf("Cannot find route stopArea %s", stopArea)
                continue
            }
            route.StopPointRefs = append(route.StopPointRefs, stopAreaRef)
        }
        affectedLine.Routes = append(affectedLine.Routes, route)
    }

    return &affectedLine, true
}

// resolveStopAreaRef returns the value of the StopArea's code (its own or its
// referent's) in the remote code space. The boolean is false when the
// StopArea is unknown or has no such code.
func (connector *SIRISituationExchangeRequestBroadcaster) resolveStopAreaRef(stopAreaId model.StopAreaId) (string, bool) {
    if stopArea, found := connector.partner.Model().StopAreas().Find(stopAreaId); found {
        if code, hasCode := stopArea.ReferentOrSelfCode(connector.remoteCodeSpace); hasCode {
            return code.Value(), true
        }
    }

    return "", false
}

// canBroadcast reports whether the Situation may be sent to the partner.
//
// A Situation is rejected when it originates from this partner, when its
// GMValidUntil time is set and already in the past, or when the partner
// restricts broadcasting to some internal tags and the Situation carries none
// of them.
func (connector *SIRISituationExchangeRequestBroadcaster) canBroadcast(situation model.Situation) bool {
    partner := connector.partner

    // Never send a Situation back to the partner it came from.
    if string(partner.Slug()) == situation.Origin {
        return false
    }

    validUntil := situation.GMValidUntil()
    if !validUntil.IsZero() && validUntil.Before(connector.Clock().Now()) {
        return false
    }

    // Without a tag restriction every remaining Situation is broadcast.
    tagsToBroadcast := partner.BroadcastSituationsInternalTags()
    if len(tagsToBroadcast) == 0 {
        return true
    }

    for _, tag := range situation.InternalTags {
        if slices.Contains(tagsToBroadcast, tag) {
            return true
        }
    }

    return false
}

// CreateConnector builds the SituationExchange request broadcaster for the
// given partner.
func (factory *SIRISituationExchangeRequestBroadcasterFactory) CreateConnector(partner *Partner) Connector {
    broadcaster := NewSIRISituationExchangeRequestBroadcaster(partner)

    return broadcaster
}

// Validate runs, in this order, the remote code space and local credentials
// presence validations on the API partner.
func (factory *SIRISituationExchangeRequestBroadcasterFactory) Validate(apiPartner *APIPartner) {
    apiPartner.ValidatePresenceOfRemoteCodeSpace()
    apiPartner.ValidatePresenceOfLocalCredentials()
}