// core/siri_vehicle_monitoring_subscription_broadcaster.go
package core
import (
"fmt"
"strings"
"sync"
"bitbucket.org/enroute-mobi/ara/audit"
"bitbucket.org/enroute-mobi/ara/core/ls"
"bitbucket.org/enroute-mobi/ara/logger"
"bitbucket.org/enroute-mobi/ara/model"
"bitbucket.org/enroute-mobi/ara/siri/siri"
"bitbucket.org/enroute-mobi/ara/siri/sxml"
)
// SIRIVehicleMonitoringSubscriptionBroadcaster is the connector that handles
// SIRI VehicleMonitoring subscription requests and records, per subscription,
// the vehicles that have to be broadcast.
type SIRIVehicleMonitoringSubscriptionBroadcaster struct {
connector
// vjRemoteCodeSpaces is filled from
// partner.VehicleJourneyRemoteCodeSpaceWithFallback when the connector is built.
vjRemoteCodeSpaces []string
// vehicleMonitoringBroadcaster is started and stopped together with this connector.
vehicleMonitoringBroadcaster VehicleMonitoringBroadcaster
// toBroadcast lists, for each subscription, the vehicles queued by addVehicle.
// NOTE(review): presumably drained by vehicleMonitoringBroadcaster — confirm.
toBroadcast map[SubscriptionId][]model.VehicleId
mutex *sync.Mutex // guards toBroadcast
}
// SIRIVehicleMonitoringSubscriptionBroadcasterFactory creates
// SIRIVehicleMonitoringSubscriptionBroadcaster connectors and validates the
// partner settings they need.
type SIRIVehicleMonitoringSubscriptionBroadcasterFactory struct{}
// CreateConnector returns a new SIRIVehicleMonitoringSubscriptionBroadcaster
// for partner. When the partner has no SIRI_SUBSCRIPTION_REQUEST_DISPATCHER
// connector, partner.CreateSubscriptionRequestDispatcher is called first.
func (factory *SIRIVehicleMonitoringSubscriptionBroadcasterFactory) CreateConnector(partner *Partner) Connector {
	_, hasDispatcher := partner.Connector(SIRI_SUBSCRIPTION_REQUEST_DISPATCHER)
	if !hasDispatcher {
		partner.CreateSubscriptionRequestDispatcher()
	}

	return newSIRIVehicleMonitoringSubscriptionBroadcaster(partner)
}
// Validate runs the presence checks this connector relies on: a remote code
// space, remote credentials and local credentials.
// NOTE(review): how a failed check is reported depends on the APIPartner
// ValidatePresenceOf* helpers, which are not visible here.
func (factory *SIRIVehicleMonitoringSubscriptionBroadcasterFactory) Validate(apiPartner *APIPartner) {
apiPartner.ValidatePresenceOfRemoteCodeSpace()
apiPartner.ValidatePresenceOfRemoteCredentials()
apiPartner.ValidatePresenceOfLocalCredentials()
}
// newSIRIVehicleMonitoringSubscriptionBroadcaster builds the connector for
// partner: it asks the partner for the remote code spaces, allocates the
// pending-broadcast map together with its mutex, and finally attaches a SIRI
// vehicle monitoring broadcaster created from the connector itself.
func newSIRIVehicleMonitoringSubscriptionBroadcaster(partner *Partner) *SIRIVehicleMonitoringSubscriptionBroadcaster {
	remoteCodeSpace := partner.RemoteCodeSpace(SIRI_VEHICLE_MONITORING_SUBSCRIPTION_BROADCASTER)
	vjCodeSpaces := partner.VehicleJourneyRemoteCodeSpaceWithFallback(SIRI_VEHICLE_MONITORING_SUBSCRIPTION_BROADCASTER)

	broadcaster := &SIRIVehicleMonitoringSubscriptionBroadcaster{
		vjRemoteCodeSpaces: vjCodeSpaces,
		mutex:              &sync.Mutex{},
		toBroadcast:        make(map[SubscriptionId][]model.VehicleId),
	}
	// Fields promoted from the embedded connector are assigned after the literal.
	broadcaster.remoteCodeSpace = remoteCodeSpace
	broadcaster.partner = partner

	// The inner broadcaster receives the connector, so it is created last,
	// once every other field is in place.
	broadcaster.vehicleMonitoringBroadcaster = NewSIRIVehicleMonitoringBroadcaster(broadcaster)

	return broadcaster
}
// HandleSubscriptionRequest processes every VehicleMonitoring entry of a
// subscription request and returns one response status per entry, in request
// order.
//
// An entry is rejected when its subscription identifier already belongs to a
// subscription of another kind, or when at least one requested line is
// unknown. An existing VehicleMonitoring broadcast subscription with the same
// identifier is deleted before the lines are checked and is recreated only
// when the entry is accepted.
//
// The audit message receives the request type, the identifiers of the accepted
// subscriptions and every requested line id; its Status is set to "Error" as
// soon as one entry is rejected.
func (connector *SIRIVehicleMonitoringSubscriptionBroadcaster) HandleSubscriptionRequest(request *sxml.XMLSubscriptionRequest, message *audit.BigQueryMessage) (resps []siri.SIRIResponseStatus) {
	var requestedLines, acceptedSubscriptions []string

	// reject records a failed entry in the response and flags the audit message.
	reject := func(status siri.SIRIResponseStatus) {
		resps = append(resps, status)
		message.Status = "Error"
	}

	for _, entry := range request.XMLSubscriptionVMEntries() {
		subscriptionRef := entry.SubscriptionIdentifier()
		status := siri.SIRIResponseStatus{
			RequestMessageRef: entry.MessageIdentifier(),
			SubscriberRef:     entry.SubscriberRef(),
			SubscriptionRef:   subscriptionRef,
			ResponseTimestamp: connector.Clock().Now(),
		}

		// for logging
		requestedLines = append(requestedLines, entry.Lines()...)

		existing, found := connector.Partner().Subscriptions().FindByExternalId(subscriptionRef)
		if found && existing.Kind() != VehicleMonitoringBroadcast {
			// The identifier is already used by a subscription of another kind.
			logger.Log.Debugf("VehicleMonitoring subscription request with a duplicated Id: %v", subscriptionRef)
			status.ErrorType = "OtherError"
			status.ErrorNumber = 2
			status.ErrorText = fmt.Sprintf("[BAD_REQUEST] Subscription Id %v already exists", subscriptionRef)
			reject(status)
			continue
		}
		if found {
			// Same kind: the previous subscription is replaced by this request.
			existing.Delete()
		}

		resources, unknownLines := connector.checkLines(entry)
		if len(unknownLines) != 0 {
			unknown := strings.Join(unknownLines, ",")
			logger.Log.Debugf("VehicleMonitoring subscription request Could not find line(s) with id : %v", unknown)
			status.ErrorType = "InvalidDataReferencesError"
			status.ErrorText = fmt.Sprintf("Unknown Line(s) %v", unknown)
			reject(status)
			continue
		}

		status.Status = true
		status.ValidUntil = entry.InitialTerminationTime()
		resps = append(resps, status)
		acceptedSubscriptions = append(acceptedSubscriptions, subscriptionRef)

		sub := connector.Partner().Subscriptions().New(VehicleMonitoringBroadcast)
		sub.SubscriberRef = entry.SubscriberRef()
		sub.SetExternalId(subscriptionRef)
		connector.fillOptions(sub, request)

		for _, resource := range resources {
			line, known := connector.Partner().Model().Lines().FindByCode(*resource.Reference.Code)
			if !known {
				continue
			}
			// Init Vehicles LastChange
			connector.addLineVehicles(sub, resource, line.Id())
			sub.AddNewResource(resource)
		}
		sub.Save()
	}

	message.Type = audit.VEHICLE_MONITORING_SUBSCRIPTION_REQUEST
	message.SubscriptionIdentifiers = acceptedSubscriptions
	message.Lines = requestedLines

	return resps
}
// addLineVehicles walks the vehicles found for lineId: for each of them it
// stores an initial VehicleMonitoring last-change state on res and queues the
// vehicle for broadcast on sub.
func (connector *SIRIVehicleMonitoringSubscriptionBroadcaster) addLineVehicles(sub *Subscription, res *SubscribedResource, lineId model.LineId) {
	for _, vehicle := range connector.partner.Model().Vehicles().FindByLineId(lineId) {
		vehicleId := vehicle.Id()

		// Init Vehicle LastChange
		res.SetLastState(string(vehicleId), ls.NewVehicleMonitoringLastChange(vehicle, sub))
		connector.addVehicle(sub.Id(), vehicleId)
	}
}
// checkLines turns the lines requested by a subscription entry into resources.
//
// When the entry names no line, every line of the model that has a code in the
// connector's remote code space yields a resource and lineIds stays empty.
// Otherwise each requested id is looked up in the remote code space: ids
// without a matching line are returned in lineIds and yield no resource.
//
// For every resource, Subscribed is called with the current clock time and
// SubscribedUntil is set to the entry's initial termination time.
func (connector *SIRIVehicleMonitoringSubscriptionBroadcaster) checkLines(vm *sxml.XMLVehicleMonitoringSubscriptionRequestEntry) (resources []*SubscribedResource, lineIds []string) {
	// subscribe builds the resource for a line reference, stamps its
	// subscription window and appends it to the result.
	subscribe := func(ref model.Reference) {
		resource := NewResource(ref)
		resource.Subscribed(connector.Clock().Now())
		resource.SubscribedUntil = vm.InitialTerminationTime()
		resources = append(resources, resource)
	}

	requested := vm.Lines()

	// check for subscription to all lines
	if len(requested) == 0 {
		// find all lines corresponding to the remoteCodeSpace
		for _, line := range connector.Partner().Model().Lines().FindAll() {
			known, ok := line.Code(connector.remoteCodeSpace)
			if !ok {
				continue
			}
			lineCode := model.NewCode(connector.remoteCodeSpace, known.Value())
			subscribe(model.Reference{Code: &lineCode, Type: "Line"})
		}
		return resources, lineIds
	}

	for _, lineId := range requested {
		lineCode := model.NewCode(connector.remoteCodeSpace, lineId)
		if _, ok := connector.Partner().Model().Lines().FindByCode(lineCode); !ok {
			// Unknown line: report it, the caller rejects the entry.
			lineIds = append(lineIds, lineId)
			continue
		}
		subscribe(model.Reference{Code: &lineCode, Type: "Line"})
	}

	return resources, lineIds
}
// Stop stops the underlying vehicle monitoring broadcaster.
func (connector *SIRIVehicleMonitoringSubscriptionBroadcaster) Stop() {
connector.vehicleMonitoringBroadcaster.Stop()
}
// Start starts the underlying vehicle monitoring broadcaster.
func (connector *SIRIVehicleMonitoringSubscriptionBroadcaster) Start() {
connector.vehicleMonitoringBroadcaster.Start()
}
// fillOptions copies the request-level options onto the subscription s:
// "ChangeBeforeUpdates", which defaults to "PT1M" (one minute in ISO 8601
// duration notation) when the request does not provide a value, and
// "MessageIdentifier".
//
// The receiver is named connector like every other method of this type.
func (connector *SIRIVehicleMonitoringSubscriptionBroadcaster) fillOptions(s *Subscription, request *sxml.XMLSubscriptionRequest) {
	changeBeforeUpdates := request.ChangeBeforeUpdates()
	if changeBeforeUpdates == "" {
		// No value in the request: fall back to the default.
		changeBeforeUpdates = "PT1M"
	}

	s.SetSubscriptionOption("ChangeBeforeUpdates", changeBeforeUpdates)
	s.SetSubscriptionOption("MessageIdentifier", request.MessageIdentifier())
}
// HandleBroadcastEvent handles a vehicle broadcast event: an event whose
// ModelType is "Vehicle" is checked against the subscriptions, any other event
// is ignored.
func (connector *SIRIVehicleMonitoringSubscriptionBroadcaster) HandleBroadcastEvent(event *model.VehicleBroadcastEvent) {
	if event.ModelType != "Vehicle" {
		return
	}

	connector.checkEvent(model.VehicleId(event.ModelId))
}
// checkEvent queues the vehicle vId for broadcast on every VehicleMonitoring
// broadcast subscription that covers its line.
//
// The vehicle is resolved to its vehicle journey, then to its line, then to
// the line's code in the connector's remote code space; the event is dropped
// as soon as one of these lookups fails. For each matching subscription whose
// resource has not expired, the vehicle is queued when no last state is
// recorded for it yet (one is then initialized) or when the recorded state
// reports a change.
func (connector *SIRIVehicleMonitoringSubscriptionBroadcaster) checkEvent(vId model.VehicleId) {
	vehicle, found := connector.Partner().Model().Vehicles().Find(vId)
	if !found {
		return
	}

	journey, found := connector.Partner().Model().VehicleJourneys().Find(vehicle.VehicleJourneyId)
	if !found {
		return
	}

	line, found := connector.Partner().Model().Lines().Find(journey.LineId)
	if !found {
		return
	}

	lineCode, found := line.Code(connector.remoteCodeSpace)
	if !found {
		return
	}

	for _, sub := range connector.Partner().Subscriptions().FindByResourceId(lineCode.String(), VehicleMonitoringBroadcast) {
		resource := sub.Resource(lineCode)
		if resource == nil {
			continue
		}
		if resource.SubscribedUntil.Before(connector.Clock().Now()) {
			// The subscription to this line has expired.
			continue
		}

		state, known := resource.LastState(string(vId))
		switch {
		case !known:
			// First time this vehicle is seen for the resource.
			resource.SetLastState(string(vehicle.Id()), ls.NewVehicleMonitoringLastChange(vehicle, sub))
		// NOTE(review): the assertion below panics if another kind of last
		// state was stored under this key.
		case !state.(*ls.VehicleMonitoringLastChange).HasChanged(vehicle):
			// Nothing new to broadcast for this subscription.
			continue
		}

		connector.addVehicle(sub.Id(), vehicle.Id())
	}
}
// addVehicle appends vId to the vehicles pending broadcast for subscription
// subId. The map is updated while holding the connector's mutex.
func (connector *SIRIVehicleMonitoringSubscriptionBroadcaster) addVehicle(subId SubscriptionId, vId model.VehicleId) {
	connector.mutex.Lock()
	defer connector.mutex.Unlock()

	pending := connector.toBroadcast[subId]
	connector.toBroadcast[subId] = append(pending, vId)
}
// START TEST
// TestSIRIVMSubscriptionBroadcasterFactory is a test factory: it creates
// TestVMSubscriptionBroadcaster connectors and accepts any partner
// configuration.
type TestSIRIVMSubscriptionBroadcasterFactory struct{}
// TestVMSubscriptionBroadcaster is a test connector that records the broadcast
// events it receives instead of processing them.
type TestVMSubscriptionBroadcaster struct {
connector
// events holds the received events, in arrival order.
events []*model.VehicleBroadcastEvent
}
// NewTestVMSubscriptionBroadcaster returns an empty test connector with no
// recorded event.
func NewTestVMSubscriptionBroadcaster() *TestVMSubscriptionBroadcaster {
	return &TestVMSubscriptionBroadcaster{}
}
// HandleBroadcastEvent records event so that tests can inspect it afterwards.
func (connector *TestVMSubscriptionBroadcaster) HandleBroadcastEvent(event *model.VehicleBroadcastEvent) {
connector.events = append(connector.events, event)
}
// Validate performs no check: the test connector has no required setting.
func (factory *TestSIRIVMSubscriptionBroadcasterFactory) Validate(apiPartner *APIPartner) {} // Always valid
// CreateConnector returns a fresh TestVMSubscriptionBroadcaster; the partner
// argument is not used.
func (factory *TestSIRIVMSubscriptionBroadcasterFactory) CreateConnector(partner *Partner) Connector {
return NewTestVMSubscriptionBroadcaster()
}