core/gtfs_rt_request_collector.go
package core
import (
"bitbucket.org/enroute-mobi/ara/audit"
"bitbucket.org/enroute-mobi/ara/gtfs"
"bitbucket.org/enroute-mobi/ara/logger"
"bitbucket.org/enroute-mobi/ara/model"
"bitbucket.org/enroute-mobi/ara/model/schedules"
"bitbucket.org/enroute-mobi/ara/remote"
"crypto/sha1"
"encoding/binary"
"encoding/json"
"fmt"
"time"
"golang.org/x/exp/maps"
)
type GtfsRequestCollectorFactory struct{}
func (factory *GtfsRequestCollectorFactory) Validate(apiPartner *APIPartner) {
apiPartner.ValidatePresenceOfRemoteCodeSpace()
apiPartner.ValidatePresenceOfLightRemoteCredentials()
}
func (factory *GtfsRequestCollectorFactory) CreateConnector(partner *Partner) Connector {
return NewGtfsRequestCollector(partner)
}
type GtfsRequestCollector struct {
connector
remoteCodeSpace string
origin string
ttl time.Duration
subscriber UpdateSubscriber
stop chan struct{}
}
func NewGtfsRequestCollector(partner *Partner) *GtfsRequestCollector {
connector := &GtfsRequestCollector{}
connector.partner = partner
manager := partner.Referential().CollectManager()
connector.subscriber = manager.BroadcastUpdateEvent
return connector
}
func (connector *GtfsRequestCollector) Start() {
logger.Log.Debugf("Start GtfsRequestCollector")
connector.ttl = connector.Partner().GtfsTTL()
connector.remoteCodeSpace = connector.Partner().RemoteCodeSpace()
connector.origin = string(connector.Partner().Slug())
connector.stop = make(chan struct{})
go connector.run()
}
func (connector *GtfsRequestCollector) run() {
c := connector.Clock().After(5 * time.Second)
for {
select {
case <-connector.stop:
logger.Log.Debugf("gtfs collector routine stop")
return
case <-c:
logger.Log.Debugf("gtfs collector routine routine")
connector.requestGtfs()
c = connector.Clock().After(connector.ttl)
}
}
}
func (connector *GtfsRequestCollector) Stop() {
if connector.stop != nil {
close(connector.stop)
}
}
func (connector *GtfsRequestCollector) requestGtfs() {
message := connector.newBQEvent()
defer audit.CurrentBigQuery(string(connector.Partner().Referential().Slug())).WriteEvent(message)
startTime := connector.Clock().Now()
feed, err := connector.Partner().HTTPClient().GTFSRequest()
message.ProcessingTime = connector.Clock().Since(startTime).Seconds()
if err != nil {
s := operationnalStatusFromError(err)
connector.Partner().GtfsStatus(s)
message.Status = "Error"
message.ErrorDetails = fmt.Sprintf("Error while making a GTFS Request: %v", err)
return
}
updateEvents := NewCollectUpdateEvents()
for _, entity := range feed.GetEntity() {
if entity.GetTripUpdate() != nil {
connector.handleTripUpdate(updateEvents, entity.GetTripUpdate())
} else if entity.GetVehicle() != nil {
connector.handleVehicle(updateEvents, entity.GetVehicle())
} else if entity.GetAlert() != nil {
connector.handleAlert(updateEvents, entity.GetAlert(), entity.GetId(), feed.GetHeader().GetTimestamp())
}
}
// logging
message.Lines = GetModelReferenceSlice(updateEvents.LineRefs)
message.StopAreas = GetModelReferenceSlice(updateEvents.MonitoringRefs)
// Broadcast all events
connector.broadcastUpdateEvents(updateEvents)
connector.Partner().GtfsStatus(OPERATIONNAL_STATUS_UP)
}
func (connector *GtfsRequestCollector) handleAlert(events *CollectUpdateEvents, a *gtfs.Alert, id string, timestamp uint64) {
entities := a.GetInformedEntity()
if len(entities) == 0 {
logger.Log.Debugf("%d affects for this Alert, skipping message", len(entities))
return
}
event := &model.SituationUpdateEvent{
RecordedAt: connector.Clock().Now(),
VersionedAt: time.Unix(int64(timestamp), 0),
Origin: string(connector.Partner().Slug()),
Progress: model.SituationProgressPublished,
}
// Affects
for _, entity := range entities {
affect, collectedRefs, err := model.AffectFromProto(entity,
connector.remoteCodeSpace,
connector.Partner().Model(),
)
if err != nil {
logger.Log.Debugf("cannot convert Proto entity: %v", err)
continue
}
maps.Copy(events.MonitoringRefs, collectedRefs.MonitoringRefs)
maps.Copy(events.LineRefs, collectedRefs.LineRefs)
event.Affects = append(event.Affects, affect)
}
if len(event.Affects) == 0 {
logger.Log.Debugf("%d affected line/stopArea found for this Alert, skipping message", len(event.Affects))
return
}
// Version
alert, err := json.Marshal(a)
if err != nil {
logger.Log.Debugf("Cannot Marshal gtfs Alert: %v", err)
return
}
hasher := sha1.New()
hasher.Write(alert)
data := binary.BigEndian.Uint64(hasher.Sum(nil))
version := int(data)
if version < 0 {
version = -version
}
event.Version = version
// Code
code := model.NewCode(connector.remoteCodeSpace, id)
event.SituationCode = code
// Internal tags
event.InternalTags = append(event.InternalTags, connector.Partner().CollectSituationsInternalTags()...)
// ValidityPeriods
var validityPeriods []*model.TimeRange
periods := a.GetActivePeriod()
for _, period := range periods {
var timePeriod model.TimeRange
if err := timePeriod.FromProto(period); err != nil {
logger.Log.Debugf("cannot convert Proto TimeRange: %v", err)
continue
}
validityPeriods = append(validityPeriods, &timePeriod)
}
event.ValidityPeriods = validityPeriods
// Summary
headerTexts := a.GetHeaderText().GetTranslation()
s := model.NewTranslatedStringFromProto(headerTexts)
event.Summary = s
// Description
descriptionTexts := a.GetDescriptionText().GetTranslation()
d := model.NewTranslatedStringFromProto(descriptionTexts)
event.Description = d
// AlertCause
var alertCause model.SituationAlertCause
if err := alertCause.FromProto(a.GetCause()); err != nil {
logger.Log.Debugf("error in alert cause: %v", err)
} else {
event.AlertCause = alertCause
}
// Severity
var severity model.SituationSeverity
if err := severity.FromProto(a.GetSeverityLevel()); err != nil {
logger.Log.Debugf("error in severity: %v", err)
} else {
event.Severity = severity
}
// Condition
var condition model.SituationCondition
if err := condition.FromProto(a.GetEffect()); err != nil {
logger.Log.Debugf("error in condition: %v", err)
} else {
consequence := &model.Consequence{
Condition: condition,
}
event.Consequences = append(event.Consequences, consequence)
}
events.Situations = append(events.Situations, event)
}
func (connector *GtfsRequestCollector) handleTripUpdate(events *CollectUpdateEvents, t *gtfs.TripUpdate) {
trip := t.GetTrip()
if trip == nil {
return
}
vjCode := connector.handleTrip(events, trip) // returns the vj code
for _, stu := range t.GetStopTimeUpdate() {
sid := stu.GetStopId()
svid := fmt.Sprintf("%v-%v", vjCode.Value(), connector.handleStopSequence(stu))
stopAreaCode := model.NewCode(connector.remoteCodeSpace, sid)
if sid != "" {
_, ok := events.StopAreas[sid]
if !ok {
// CollectedAlways is false by default
event := &model.StopAreaUpdateEvent{
Origin: connector.origin,
Code: stopAreaCode,
}
events.StopAreas[sid] = event
}
}
_, ok := events.StopVisits[sid][svid]
if !ok {
stopVisitCode := model.NewCode(connector.remoteCodeSpace, svid)
svEvent := &model.StopVisitUpdateEvent{
Origin: connector.origin,
Code: stopVisitCode,
StopAreaCode: stopAreaCode,
VehicleJourneyCode: vjCode,
PassageOrder: connector.handleStopSequence(stu),
Monitored: true,
RecordedAt: connector.Clock().Now(),
Schedules: schedules.NewStopVisitSchedules(),
}
svEvent.Schedules.SetSchedule(
schedules.Expected,
time.Unix(stu.GetDeparture().GetTime(), 0),
time.Unix(stu.GetArrival().GetTime(), 0))
if connector.hasSkippedScheduleRelationship(stu) {
svEvent.DepartureStatus = model.STOP_VISIT_DEPARTURE_CANCELLED
svEvent.ArrivalStatus = model.STOP_VISIT_ARRIVAL_CANCELLED
}
if events.StopVisits[sid] == nil {
events.StopVisits[sid] = make(map[string]*model.StopVisitUpdateEvent)
}
events.StopVisits[sid][svid] = svEvent
}
}
}
func (connector *GtfsRequestCollector) hasSkippedScheduleRelationship(stu *gtfs.TripUpdate_StopTimeUpdate) bool {
return stu.GetScheduleRelationship() == gtfs.TripUpdate_StopTimeUpdate_SKIPPED
}
func (connector *GtfsRequestCollector) handleStopSequence(st *gtfs.TripUpdate_StopTimeUpdate) int {
return int(st.GetStopSequence() + uint32(1))
}
func (connector *GtfsRequestCollector) handleVehicle(events *CollectUpdateEvents, v *gtfs.VehiclePosition) {
trip := v.GetTrip()
if trip == nil || v.GetVehicle() == nil {
return
}
occupancy := v.OccupancyStatus
vjCode := connector.handleTrip(events, trip, occupancy) // returns the vj code
vid := v.GetVehicle().GetId()
_, ok := events.Vehicles[vid]
if !ok {
vCode := model.NewCode(connector.remoteCodeSpace, vid)
p := v.GetPosition()
event := &model.VehicleUpdateEvent{
Code: vCode,
StopAreaCode: model.NewCode(connector.remoteCodeSpace, v.GetStopId()),
VehicleJourneyCode: vjCode,
Longitude: float64(p.GetLongitude()),
Latitude: float64(p.GetLatitude()),
Bearing: float64(p.GetBearing()),
Occupancy: occupancyName(occupancy),
}
events.Vehicles[vid] = event
}
}
// returns the vj code
func (connector *GtfsRequestCollector) handleTrip(events *CollectUpdateEvents, trip *gtfs.TripDescriptor, occupancy ...*gtfs.VehiclePosition_OccupancyStatus) model.Code {
rid := trip.GetRouteId()
tid := trip.GetTripId()
lineCode := model.NewCode(connector.remoteCodeSpace, rid)
vjCode := model.NewCode(connector.remoteCodeSpace, tid)
_, ok := events.Lines[rid]
if !ok {
// CollectedAlways is false by default
lineEvent := &model.LineUpdateEvent{
Origin: connector.origin,
Code: lineCode,
}
events.Lines[rid] = lineEvent
}
_, ok = events.VehicleJourneys[tid]
if !ok {
vjEvent := &model.VehicleJourneyUpdateEvent{
Origin: connector.origin,
Code: vjCode,
LineCode: lineCode,
Monitored: true,
}
if len(occupancy) != 0 {
vjEvent.Occupancy = occupancyName(occupancy[0])
}
events.VehicleJourneys[tid] = vjEvent
}
return vjCode
}
func (connector *GtfsRequestCollector) SetSubscriber(s UpdateSubscriber) {
connector.subscriber = s
}
func (connector *GtfsRequestCollector) broadcastUpdateEvents(events *CollectUpdateEvents) {
if connector.subscriber == nil {
return
}
for _, e := range events.StopAreas {
connector.subscriber(e)
}
for _, e := range events.Lines {
connector.subscriber(e)
}
for _, e := range events.VehicleJourneys {
connector.subscriber(e)
}
for _, es := range events.StopVisits { // Stopvisits are map[MonitoringRef]map[ItemIdentifier]event
for _, e := range es {
connector.subscriber(e)
}
}
for _, e := range events.Vehicles {
connector.subscriber(e)
}
for _, e := range events.Situations {
connector.subscriber(e)
}
}
func operationnalStatusFromError(err error) OperationnalStatus {
if _, ok := err.(remote.GtfsError); ok {
return OPERATIONNAL_STATUS_DOWN
}
return OPERATIONNAL_STATUS_UNKNOWN
}
func occupancyName(occupancy *gtfs.VehiclePosition_OccupancyStatus) string {
if occupancy == nil {
return model.Undefined
}
switch *occupancy {
case gtfs.VehiclePosition_NO_DATA_AVAILABLE:
return model.Undefined
case gtfs.VehiclePosition_EMPTY:
return model.Empty
case gtfs.VehiclePosition_MANY_SEATS_AVAILABLE:
return model.ManySeatsAvailable
case gtfs.VehiclePosition_FEW_SEATS_AVAILABLE:
return model.FewSeatsAvailable
case gtfs.VehiclePosition_STANDING_ROOM_ONLY:
return model.StandingRoomOnly
case gtfs.VehiclePosition_CRUSHED_STANDING_ROOM_ONLY:
return model.CrushedStandingRoomOnly
case gtfs.VehiclePosition_FULL:
return model.Full
case gtfs.VehiclePosition_NOT_ACCEPTING_PASSENGERS:
return model.NotAcceptingPassengers
// case gtfs.VehiclePosition_NOT_BOARDABLE:
// return model.Unknown
default:
return model.Unknown
}
}
func (connector *GtfsRequestCollector) newBQEvent() *audit.BigQueryMessage {
return &audit.BigQueryMessage{
Type: audit.GTFS_REQUEST,
Protocol: "gtfs",
Direction: "sent",
Partner: string(connector.Partner().Slug()),
Status: "OK",
}
}