core/gtfs_rt_trip_updates_broadcaster.go
package core
import (
"fmt"
"sort"
"time"
"bitbucket.org/enroute-mobi/ara/cache"
"bitbucket.org/enroute-mobi/ara/gtfs"
"bitbucket.org/enroute-mobi/ara/logger"
"bitbucket.org/enroute-mobi/ara/model"
"bitbucket.org/enroute-mobi/ara/state"
)
const (
PAST_STOP_VISITS_MAX_TIME = -2 * time.Minute
)
type TripUpdatesBroadcaster struct {
state.Startable
connector
vjRemoteCodeSpaces []string
cache *cache.CachedItem
}
type TripUpdatesBroadcasterFactory struct{}
func (factory *TripUpdatesBroadcasterFactory) CreateConnector(partner *Partner) Connector {
return NewTripUpdatesBroadcaster(partner)
}
func (factory *TripUpdatesBroadcasterFactory) Validate(apiPartner *APIPartner) {
apiPartner.ValidatePresenceOfRemoteCodeSpace()
}
func NewTripUpdatesBroadcaster(partner *Partner) *TripUpdatesBroadcaster {
connector := &TripUpdatesBroadcaster{}
connector.partner = partner
return connector
}
func (connector *TripUpdatesBroadcaster) Start() {
connector.remoteCodeSpace = connector.partner.RemoteCodeSpace(GTFS_RT_TRIP_UPDATES_BROADCASTER)
connector.vjRemoteCodeSpaces = connector.partner.VehicleJourneyRemoteCodeSpaceWithFallback(GTFS_RT_TRIP_UPDATES_BROADCASTER)
connector.cache = cache.NewCachedItem("TripUpdates", connector.partner.CacheTimeout(GTFS_RT_TRIP_UPDATES_BROADCASTER), nil, func(...interface{}) (interface{}, error) { return connector.handleGtfs() })
}
func (connector *TripUpdatesBroadcaster) HandleGtfs(feed *gtfs.FeedMessage) {
entities, _ := connector.cache.Value()
feedEntities := entities.([]*gtfs.FeedEntity)
feed.Entity = append(feed.Entity, feedEntities...)
}
func (connector *TripUpdatesBroadcaster) handleGtfs() (entities []*gtfs.FeedEntity, err error) {
stopVisits := connector.partner.Model().StopVisits().FindAllAfter(connector.Clock().Now().Add(PAST_STOP_VISITS_MAX_TIME))
linesCode := make(map[model.VehicleJourneyId]model.Code)
feedEntities := make(map[model.VehicleJourneyId]*gtfs.FeedEntity)
for i := range stopVisits {
sa, ok := connector.partner.Model().StopAreas().Find(stopVisits[i].StopAreaId)
if !ok { // Should never happen
logger.Log.Debugf("Can't find StopArea %v of StopVisit %v", stopVisits[i].StopAreaId, stopVisits[i].Id())
continue
}
saId, ok := sa.Code(connector.remoteCodeSpace)
if !ok {
continue
}
feedEntity, ok := feedEntities[stopVisits[i].VehicleJourneyId]
// If we don't already have a tripUpdate with the VehicleJourney we create one
if !ok {
// Fetch all needed models and codes
vj, ok := connector.partner.Model().VehicleJourneys().Find(stopVisits[i].VehicleJourneyId)
if !ok {
continue
}
vjId, ok := vj.CodeWithFallback(connector.vjRemoteCodeSpaces)
if !ok {
continue
}
var routeId string
lineCode, ok := linesCode[vj.Id()]
if !ok {
l, ok := connector.partner.Model().Lines().Find(vj.LineId)
if !ok {
continue
}
lineCode, ok = l.Code(connector.remoteCodeSpace)
if !ok {
continue
}
linesCode[stopVisits[i].VehicleJourneyId] = lineCode
}
routeId = lineCode.Value()
tripId := vjId.Value()
// Fill the tripDescriptor
tripDescriptor := >fs.TripDescriptor{
TripId: &tripId,
RouteId: &routeId,
}
// Fill the FeedEntity
newId := fmt.Sprintf("trip:%v", vjId.Value())
feedEntity = >fs.FeedEntity{
Id: &newId,
TripUpdate: >fs.TripUpdate{Trip: tripDescriptor},
}
feedEntities[stopVisits[i].VehicleJourneyId] = feedEntity
}
stopId := saId.Value()
stopSequence := connector.gtfsStopSequence(stopVisits[i].PassageOrder)
arrival := >fs.TripUpdate_StopTimeEvent{}
departure := >fs.TripUpdate_StopTimeEvent{}
if a := stopVisits[i].ReferenceArrivalTime(); !a.IsZero() {
arrivalTime := int64(a.Unix())
arrival.Time = &arrivalTime
}
if d := stopVisits[i].ReferenceDepartureTime(); !d.IsZero() {
departureTime := int64(d.Unix())
departure.Time = &departureTime
}
stopTimeUpdate := >fs.TripUpdate_StopTimeUpdate{
StopSequence: &stopSequence,
StopId: &stopId,
Arrival: arrival,
Departure: departure,
}
if stopVisits[i].DepartureStatus == model.STOP_VISIT_DEPARTURE_CANCELLED {
skipped := gtfs.TripUpdate_StopTimeUpdate_SKIPPED
stopTimeUpdate.ScheduleRelationship = &skipped
}
feedEntity.TripUpdate.StopTimeUpdate = append(feedEntity.TripUpdate.StopTimeUpdate, stopTimeUpdate)
}
for _, entity := range feedEntities {
if len(entity.TripUpdate.StopTimeUpdate) == 0 {
continue
}
sort.Slice(entity.TripUpdate.StopTimeUpdate, func(i, j int) bool {
return *entity.TripUpdate.StopTimeUpdate[i].StopSequence < *entity.TripUpdate.StopTimeUpdate[j].StopSequence
})
// ARA-829
// if entity.TripUpdate.StopTimeUpdate[0].Departure.Time != nil {
// startTime := time.Unix(*entity.TripUpdate.StopTimeUpdate[0].Departure.Time, 0).Format("15:04:05")
// entity.TripUpdate.Trip.StartTime = &startTime
// }
entities = append(entities, entity)
}
return
}
func (connector *TripUpdatesBroadcaster) gtfsStopSequence(stopSequence int) uint32 {
return uint32(stopSequence - 1)
}