core/partner.go
package core
import (
"encoding/json"
"errors"
"fmt"
"math"
"net/http"
"sort"
"sync"
"time"
"bitbucket.org/enroute-mobi/ara/cache"
e "bitbucket.org/enroute-mobi/ara/core/apierrs"
s "bitbucket.org/enroute-mobi/ara/core/settings"
"bitbucket.org/enroute-mobi/ara/logger"
"bitbucket.org/enroute-mobi/ara/model"
"bitbucket.org/enroute-mobi/ara/remote"
"bitbucket.org/enroute-mobi/ara/state"
"bitbucket.org/enroute-mobi/ara/uuid"
"golang.org/x/time/rate"
)
// OperationnalStatus is the string label used to report the operational
// state of a partner.
type OperationnalStatus string
// Values an OperationnalStatus takes in this file.
const (
// OPERATIONNAL_STATUS_UNKNOWN is the status assigned to newly built partners
// and restored by Partner.SetDefinition.
OPERATIONNAL_STATUS_UNKNOWN OperationnalStatus = "unknown"
// OPERATIONNAL_STATUS_UP is reported by the push status check when a push was
// received within the last five minutes.
OPERATIONNAL_STATUS_UP OperationnalStatus = "up"
// OPERATIONNAL_STATUS_DOWN is reported by the push and GTFS status checks when
// the last recorded activity is older than five minutes.
OPERATIONNAL_STATUS_DOWN OperationnalStatus = "down"
)
// PartnerId is the identifier under which a Partner is stored in its manager.
type PartnerId string
// PartnerSlug is the textual handle of a Partner, searchable through
// Partners.FindBySlug.
type PartnerSlug string
// Partners is the contract of a Partner registry. PartnerManager, defined in
// this file, provides these methods.
type Partners interface {
uuid.UUIDInterface
state.Startable
state.Stopable
// New builds a Partner bound to this registry; it is not stored until Save.
New(PartnerSlug) *Partner
// Lookup methods.
Find(PartnerId) *Partner
FindBySlug(PartnerSlug) (*Partner, bool)
FindByCredential(string) (*Partner, bool)
FindAllByCollectPriority() []*Partner
FindAllWithConnector([]string) []*Partner
FindAll() []*Partner
// In-memory storage.
Save(partner *Partner) bool
Delete(partner *Partner) bool
Model() model.Model
Referential() *Referential
UniqCredentials(PartnerId, string) bool
IsEmpty() bool
CancelSubscriptions()
// Database persistence.
Load() error
SaveToDatabase() (int, error)
}
// PartnerStatus is the result of a partner status check (see
// Partner.CheckStatus) and is also stored on the Partner itself.
type PartnerStatus struct {
OperationnalStatus OperationnalStatus
RetryCount int
ServiceStartedAt time.Time
}
// PartnerStatusCheck records the last activity observed through an
// alternative status source (push notifications or GTFS collect).
type PartnerStatusCheck struct {
// LastCheck is set to the current clock time by Partner.Pushed and
// Partner.GtfsStatus.
LastCheck time.Time
// Status is the value given to Partner.GtfsStatus.
Status OperationnalStatus
}
// Partner holds the settings, connectors and runtime state of one partner
// of a Referential.
type Partner struct {
uuid.UUIDConsumer
*s.PartnerSettings
// mutex guards discoveredStopAreas, discoveredLines and limiters in the
// methods of this file that touch them.
mutex *sync.RWMutex
manager Partners
subscriptionManager Subscriptions
// id and slug are unexported; MarshalJSON exposes them as "Id" and "Slug".
id PartnerId
slug PartnerSlug
Name string `json:",omitempty"`
PartnerStatus PartnerStatus
// ConnectorTypes lists the connector types wanted for this partner;
// cleanConnectors sorts it in place.
ConnectorTypes []string
// connectors holds the connector instances, keyed by connector type.
connectors map[string]Connector
// Sets of identifiers filled by RegisterDiscoveredStopAreas and
// RegisterDiscoveredLines.
discoveredStopAreas map[string]struct{}
discoveredLines map[string]struct{}
startedAt time.Time
lastDiscovery time.Time
// alternativeStatusCheck is written by Pushed and GtfsStatus and read by
// checkPushStatus and checkGtfsStatus.
alternativeStatusCheck PartnerStatusCheck
// httpClient is created on first call to HTTPClient and dropped by Stop.
httpClient *remote.HTTPClient
gtfsCache *cache.CacheTable
// limiters holds one rate limiter per IP, created on demand by Allow.
limiters map[string]*rate.Limiter
}
// ByPriority implements sort.Interface over a slice of partners, placing the
// partners with the highest CollectPriority first.
type ByPriority []*Partner

// Len returns the number of partners in the slice.
func (ps ByPriority) Len() int {
	return len(ps)
}

// Swap exchanges the partners at positions i and j.
func (ps ByPriority) Swap(i, j int) {
	ps[j], ps[i] = ps[i], ps[j]
}

// Less reports whether the partner at i has a strictly higher collect
// priority than the partner at j.
func (ps ByPriority) Less(i, j int) bool {
	left, right := ps[i].CollectPriority(), ps[j].CollectPriority()
	return left > right
}
// PartnerManager is the in-memory registry of the partners of one
// Referential.
type PartnerManager struct {
uuid.UUIDConsumer
// mutex is held by the Find*, Save and Delete methods while they access
// byId and localCredentialsIndex.
mutex *sync.RWMutex
byId map[PartnerId]*Partner
localCredentialsIndex *LocalCredentialsIndex
// guardian is started and stopped together with the manager.
guardian *PartnersGuardian
referential *Referential
}
// NewPartner builds a Partner with empty collections, an unknown status and
// empty settings. No manager is attached to the returned value.
//
// Test method.
func NewPartner() *Partner {
	p := new(Partner)
	p.mutex = &sync.RWMutex{}
	p.ConnectorTypes = []string{}
	p.connectors = make(map[string]Connector)
	p.discoveredStopAreas = make(map[string]struct{})
	p.discoveredLines = make(map[string]struct{})
	p.PartnerStatus = PartnerStatus{OperationnalStatus: OPERATIONNAL_STATUS_UNKNOWN}
	p.gtfsCache = cache.NewCacheTable()
	p.limiters = make(map[string]*rate.Limiter)

	// Settings and subscriptions need the partner itself, so they are wired
	// once the value exists.
	p.PartnerSettings = s.NewEmptyPartnerSettings(p.UUIDGenerator)
	p.subscriptionManager = NewMemorySubscriptions(p)

	return p
}
// Referential returns the Referential of the manager this partner belongs to.
func (partner *Partner) Referential() *Referential {
	owner := partner.manager
	return owner.Referential()
}
// Subscriptions returns the subscription manager attached to this partner.
func (partner *Partner) Subscriptions() Subscriptions {
	subscriptions := partner.subscriptionManager
	return subscriptions
}
// StartedAt returns the clock time recorded by the last call to Start.
func (partner *Partner) StartedAt() time.Time {
	started := partner.startedAt
	return started
}
// Id returns the identifier of the partner.
func (partner *Partner) Id() PartnerId {
	id := partner.id
	return id
}
// Slug returns the slug of the partner.
func (partner *Partner) Slug() PartnerSlug {
	slug := partner.slug
	return slug
}
// SetSlug replaces the slug of the partner with s.
func (partner *Partner) SetSlug(s PartnerSlug) {
	(*partner).slug = s
}
// GtfsCache returns the cache table owned by this partner.
func (partner *Partner) GtfsCache() *cache.CacheTable {
	table := partner.gtfsCache
	return table
}
// HTTPClient returns the HTTP client of the partner. The client is created
// from the current HTTP client options on first use; on later calls its URLs
// are updated when they differ from the current options.
func (partner *Partner) HTTPClient() *remote.HTTPClient {
	options := partner.HTTPClientOptions()

	switch {
	case partner.httpClient == nil:
		logger.Log.Debugf("Create a new http client in partner %s to %s", partner.Name, options.Urls.Url)
		partner.httpClient = remote.NewHTTPClient(options)
	case partner.httpClient.HTTPClientUrls != options.Urls:
		partner.httpClient.SetURLs(options.Urls)
	}

	return partner.httpClient
}
// SIRIClient returns the SIRI client provided by the partner's HTTP client.
func (partner *Partner) SIRIClient() *remote.SIRIClient {
	client := partner.HTTPClient()
	return client.SIRIClient()
}
// SIRILiteClient returns the SIRI Lite client provided by the partner's HTTP
// client.
func (partner *Partner) SIRILiteClient() *remote.SIRILiteClient {
	client := partner.HTTPClient()
	return client.SIRILiteClient()
}
// OperationnalStatus returns the operational status stored in PartnerStatus.
func (partner *Partner) OperationnalStatus() OperationnalStatus {
	current := partner.PartnerStatus
	return current.OperationnalStatus
}
// Save stores the partner in its manager and returns the manager's result.
func (partner *Partner) Save() (ok bool) {
	ok = partner.manager.Save(partner)
	return ok
}
// MarshalJSON encodes the partner with its exported fields plus "Id", "Slug"
// and "Settings" (taken from SettingsDefinition).
func (partner *Partner) MarshalJSON() ([]byte, error) {
	// Alias has the fields of Partner but none of its methods, so encoding it
	// does not call back into this MarshalJSON.
	type Alias Partner

	payload := struct {
		Id   PartnerId
		Slug PartnerSlug
		*Alias
		Settings map[string]string
	}{
		Id:       partner.id,
		Slug:     partner.slug,
		Settings: partner.SettingsDefinition(),
		Alias:    (*Alias)(partner),
	}

	return json.Marshal(&payload)
}
// Definition returns a new APIPartner filled from the partner's id, slug,
// name, settings definition and connector types, with an empty factory map
// and a fresh error collection.
func (partner *Partner) Definition() *APIPartner {
	definition := &APIPartner{}

	definition.Id = partner.id
	definition.Slug = partner.slug
	definition.Name = partner.Name
	definition.Settings = partner.SettingsDefinition()
	definition.ConnectorTypes = partner.ConnectorTypes
	definition.factories = make(map[string]ConnectorFactory)
	definition.Errors = e.NewErrors()
	definition.manager = partner.manager

	return definition
}
// Stop stops every connector that implements state.Stopable, cancels the
// partner's subscriptions, clears the GTFS cache, drops the HTTP client and
// discards all rate limiters.
func (partner *Partner) Stop() {
	for _, connector := range partner.connectors {
		if stoppable, ok := connector.(state.Stopable); ok {
			stoppable.Stop()
		}
	}

	partner.CancelSubscriptions()
	partner.gtfsCache.Clear()
	partner.httpClient = nil

	// Allow reads and writes the limiters map under partner.mutex, so the map
	// must be replaced under the same lock to avoid a data race.
	partner.mutex.Lock()
	partner.limiters = make(map[string]*rate.Limiter)
	partner.mutex.Unlock()
}
// Start records the start time, resets the discovery and alternative status
// state, starts every connector that implements state.Startable and
// registers the GTFS cache entries with the configured timeout.
func (partner *Partner) Start() {
	partner.startedAt = partner.manager.Referential().Clock().Now()
	partner.lastDiscovery = time.Time{}
	partner.alternativeStatusCheck = PartnerStatusCheck{
		LastCheck: time.Time{},
		Status:    OPERATIONNAL_STATUS_UNKNOWN,
	}

	for _, connector := range partner.connectors {
		if startable, ok := connector.(state.Startable); ok {
			startable.Start()
		}
	}

	// NOTE(review): the combined key spells "vehicle-position" (singular)
	// while the standalone key is "vehicle-positions" — confirm against the
	// code that reads this cache before changing either.
	cacheKeys := []string{
		"trip-updates",
		"vehicle-positions",
		"service-alerts",
		"trip-updates,vehicle-position,service-alerts",
	}
	timeout := partner.GtfsCacheTimeout()
	for _, key := range cacheKeys {
		partner.gtfsCache.Add(key, timeout, nil)
	}
}
// Allow reports whether a request coming from ip may proceed under the
// partner's rate limit. A rate limit of 0 disables limiting. Otherwise each
// IP gets its own limiter, created on first use, allowing RateLimit events
// per minute with a burst of at most 10.
func (partner *Partner) Allow(ip string) bool {
	limit := partner.RateLimit()
	if limit == 0 {
		return true
	}

	// Fast path: the limiter already exists.
	partner.mutex.RLock()
	limiter, ok := partner.limiters[ip]
	partner.mutex.RUnlock()
	if ok {
		return limiter.Allow()
	}

	partner.mutex.Lock()
	// Look again under the write lock: another goroutine may have created the
	// limiter in between, and replacing it would reset its token bucket.
	limiter, ok = partner.limiters[ip]
	if !ok {
		logger.Log.Debugf("Create a new limiter for ip %v", ip)

		tick := time.Duration((1.0 / limit) * float64(time.Minute))
		burst := int(math.Min(limit, 10))
		if burst < 1 {
			// A limiter with a zero burst never allows any event, which would
			// block every request when the limit is below one per minute.
			burst = 1
		}

		limiter = rate.NewLimiter(rate.Every(tick), burst)
		partner.limiters[ip] = limiter
	}
	partner.mutex.Unlock()

	return limiter.Allow()
}
// CanCollect reports whether the partner may collect the given stop. The
// stop is checked first; the lines are consulted only when the stop check is
// inconclusive. An inconclusive final result counts as allowed.
func (partner *Partner) CanCollect(stopId string, lineIds map[string]struct{}) bool {
	status := partner.CanCollectStop(stopId)
	if status == s.COLLECT_UNKNOWN {
		status = partner.CanCollectLines(lineIds)
	}

	if status == s.CAN_COLLECT {
		return true
	}
	return status == s.COLLECT_UNKNOWN
}
// CanCollectStop returns the collect status of stopId. The collect settings
// decide first; when they are inconclusive and UseDiscoveredSA is set, the
// answer depends on whether the stop was registered as discovered.
func (partner *Partner) CanCollectStop(stopId string) s.CollectStatus {
	if status := partner.CollectSettings().CanCollectStop(stopId); status != s.COLLECT_UNKNOWN {
		return status
	}

	switch {
	case !partner.CollectSettings().UseDiscoveredSA:
		return s.COLLECT_UNKNOWN
	case partner.checkDiscoveredStopArea(stopId):
		return s.CAN_COLLECT
	default:
		return s.CANNOT_COLLECT
	}
}
// checkDiscoveredStopArea reports, under the read lock, whether stopId is in
// the set of discovered stop areas.
func (partner *Partner) checkDiscoveredStopArea(stopId string) (ok bool) {
	partner.mutex.RLock()
	_, ok = partner.discoveredStopAreas[stopId]
	partner.mutex.RUnlock()

	return ok
}
// CanCollectLines returns the collect status for a set of line ids. An empty
// set is inconclusive. Otherwise the collect settings decide first; when
// they are inconclusive and UseDiscoveredLines is set, the answer depends on
// whether at least one of the lines was registered as discovered.
func (partner *Partner) CanCollectLines(lineIds map[string]struct{}) s.CollectStatus {
	if len(lineIds) == 0 {
		return s.COLLECT_UNKNOWN
	}

	if status := partner.CollectSettings().CanCollectLines(lineIds); status != s.COLLECT_UNKNOWN {
		return status
	}

	switch {
	case !partner.CollectSettings().UseDiscoveredLines:
		return s.COLLECT_UNKNOWN
	case partner.checkDiscoveredLines(lineIds):
		return s.CAN_COLLECT
	default:
		return s.CANNOT_COLLECT
	}
}
// CanCollectLine reports whether the partner may collect lineId. Used for
// vehicle collect. The collect settings decide first; when they are
// inconclusive, the line is allowed unless UseDiscoveredLines is set, in
// which case it must have been registered as discovered.
func (partner *Partner) CanCollectLine(lineId string) bool {
	status := partner.CollectSettings().CanCollectLine(lineId)
	if status == s.COLLECT_UNKNOWN {
		if !partner.CollectSettings().UseDiscoveredLines {
			return true
		}
		return partner.checkDiscoveredLine(lineId)
	}

	return status == s.CAN_COLLECT
}
// checkDiscoveredLines reports, under the read lock, whether at least one of
// lineIds is in the set of discovered lines.
func (partner *Partner) checkDiscoveredLines(lineIds map[string]struct{}) (ok bool) {
	partner.mutex.RLock()
	defer partner.mutex.RUnlock()

	for id := range lineIds {
		if _, known := partner.discoveredLines[id]; known {
			return true
		}
	}

	return false
}
// checkDiscoveredLine reports, under the read lock, whether lineId is in the
// set of discovered lines.
func (partner *Partner) checkDiscoveredLine(lineId string) (ok bool) {
	partner.mutex.RLock()
	_, ok = partner.discoveredLines[lineId]
	partner.mutex.RUnlock()

	return ok
}
// SetDefinition copies id, slug, name, settings and connector types from
// apiPartner, resets the operational status to unknown, creates the
// connectors that are missing and removes those no longer listed.
//
// APIPartner.Validate should be called beforehand so that the APIPartner
// factories are set.
func (partner *Partner) SetDefinition(apiPartner *APIPartner) {
	partner.id = apiPartner.Id
	partner.slug = apiPartner.Slug
	partner.Name = apiPartner.Name
	partner.PartnerSettings = s.NewPartnerSettings(partner.UUIDGenerator, apiPartner.Settings)
	partner.ConnectorTypes = apiPartner.ConnectorTypes
	partner.PartnerStatus.OperationnalStatus = OPERATIONNAL_STATUS_UNKNOWN

	for connectorType, factory := range apiPartner.factories {
		// Existing connectors are kept as they are.
		if _, exists := partner.connectors[connectorType]; exists {
			continue
		}
		logger.Log.Debugf("Create connector %v for partner %v", connectorType, partner.slug)
		partner.connectors[connectorType] = factory.CreateConnector(partner)
	}

	partner.cleanConnectors()
}
// RefreshConnectors rebuilds the connector instances from ConnectorTypes
// without any validation: missing connectors are created when a factory
// exists for their type, and connectors no longer listed are removed.
//
// Test method.
func (partner *Partner) RefreshConnectors() {
	logger.Log.Debugf("Initialize Connectors %#v for %s", partner.ConnectorTypes, partner.slug)

	for _, wanted := range partner.ConnectorTypes {
		if _, exists := partner.connectors[wanted]; exists {
			continue
		}

		// Types without a factory are skipped.
		factory := NewConnectorFactory(wanted)
		if factory != nil {
			partner.connectors[wanted] = factory.CreateConnector(partner)
		}
	}

	partner.cleanConnectors()
}
// cleanConnectors deletes from partner.connectors the connectors whose type
// is not in partner.ConnectorTypes. The subscription request dispatcher is
// kept as long as hasSubscribers reports true. ConnectorTypes is sorted in
// place as a side effect.
func (partner *Partner) cleanConnectors() {
	// Sorting makes the binary search below valid.
	sort.Strings(partner.ConnectorTypes)
	wanted := partner.ConnectorTypes

	for name := range partner.connectors {
		if name == SIRI_SUBSCRIPTION_REQUEST_DISPATCHER && partner.hasSubscribers() {
			continue
		}

		position := sort.SearchStrings(wanted, name)
		if position < len(wanted) && wanted[position] == name {
			continue
		}

		delete(partner.connectors, name)
	}
}
// hasSubscribers reports whether the partner has at least one of the SIRI
// subscription broadcaster connectors listed below.
func (partner *Partner) hasSubscribers() bool {
	// NOTE(review): the previous implementation tested
	// SIRI_PRODUCTION_TIMETABLE_SUBSCRIPTION_BROADCASTER twice; the second
	// test may have been meant for another broadcaster type — confirm.
	broadcasters := []string{
		SIRI_STOP_MONITORING_SUBSCRIPTION_BROADCASTER,
		SIRI_GENERAL_MESSAGE_SUBSCRIPTION_BROADCASTER,
		SIRI_PRODUCTION_TIMETABLE_SUBSCRIPTION_BROADCASTER,
		SIRI_ESTIMATED_TIMETABLE_SUBSCRIPTION_BROADCASTER,
		SIRI_VEHICLE_MONITORING_SUBSCRIPTION_BROADCASTER,
		SIRI_SITUATION_EXCHANGE_SUBSCRIPTION_BROADCASTER,
	}

	for _, broadcaster := range broadcasters {
		if _, ok := partner.connectors[broadcaster]; ok {
			return true
		}
	}

	return false
}
// Connector returns the connector registered under connectorType and whether
// such a connector exists.
func (partner *Partner) Connector(connectorType string) (Connector, bool) {
	c, found := partner.connectors[connectorType]
	return c, found
}
// HaveAtLeastOneConnector reports whether the partner has a connector for at
// least one of connectorTypes.
func (partner *Partner) HaveAtLeastOneConnector(connectorTypes []string) bool {
	for _, connectorType := range connectorTypes {
		_, found := partner.connectors[connectorType]
		if found {
			return true
		}
	}

	return false
}
// CreateSubscriptionRequestDispatcher registers a new SIRI subscription
// request dispatcher connector, replacing any existing one.
func (partner *Partner) CreateSubscriptionRequestDispatcher() {
	dispatcher := NewSIRISubscriptionRequestDispatcher(partner)
	partner.connectors[SIRI_SUBSCRIPTION_REQUEST_DISPATCHER] = dispatcher
}
// CheckStatusClient returns the SIRI check status client connector, falling
// back to the test check status client. It returns nil when neither exists
// or when the connector found is not a CheckStatusClient.
func (partner *Partner) CheckStatusClient() (csc CheckStatusClient) {
	connector, found := partner.connectors[SIRI_CHECK_STATUS_CLIENT_TYPE]
	if !found {
		connector = partner.connectors[TEST_CHECK_STATUS_CLIENT_TYPE]
	}

	if client, isClient := connector.(CheckStatusClient); isClient {
		return client
	}
	return nil
}
// GtfsConnectors returns the GTFS-RT broadcaster connectors of the partner
// (trip updates, vehicle positions, service alerts, in that order) and
// whether at least one of them exists.
func (partner *Partner) GtfsConnectors() (connectors []GtfsConnector, ok bool) {
	broadcasters := []string{
		GTFS_RT_TRIP_UPDATES_BROADCASTER,
		GTFS_RT_VEHICLE_POSITIONS_BROADCASTER,
		GTFS_RT_SERVICE_ALERTS_BROADCASTER,
	}

	for _, name := range broadcasters {
		c, found := partner.connectors[name]
		if !found {
			continue
		}
		connectors = append(connectors, c.(GtfsConnector))
		ok = true
	}

	return connectors, ok
}
// SituationExchangeRequestCollector returns the situation exchange request
// collector connector, or failing that the general message request collector
// connector, or nil when the partner has neither.
func (partner *Partner) SituationExchangeRequestCollector() SituationExchangeRequestCollector {
	candidates := []string{
		SIRI_SITUATION_EXCHANGE_REQUEST_COLLECTOR,
		SIRI_GENERAL_MESSAGE_REQUEST_COLLECTOR,
	}

	for _, name := range candidates {
		if c, found := partner.connectors[name]; found {
			return c.(SituationExchangeRequestCollector)
		}
	}

	return nil
}
// SituationExchangeSubscriptionCollector returns the situation exchange
// subscription collector connector, or nil when the partner has none.
func (partner *Partner) SituationExchangeSubscriptionCollector() SituationExchangeSubscriptionCollector {
	c, found := partner.connectors[SIRI_SITUATION_EXCHANGE_SUBSCRIPTION_COLLECTOR]
	if !found {
		return nil
	}
	return c.(SituationExchangeSubscriptionCollector)
}
// GeneralMessageSubscriptionCollector returns the general message
// subscription collector connector, or nil when the partner has none.
func (partner *Partner) GeneralMessageSubscriptionCollector() GeneralMessageSubscriptionCollector {
	c, found := partner.connectors[SIRI_GENERAL_MESSAGE_SUBSCRIPTION_COLLECTOR]
	if !found {
		return nil
	}
	return c.(GeneralMessageSubscriptionCollector)
}
// StopMonitoringRequestCollector returns the SIRI stop monitoring request
// collector connector, falling back to the test collector. It returns nil
// when neither exists or when the connector found is not a
// StopMonitoringRequestCollector.
func (partner *Partner) StopMonitoringRequestCollector() (smrc StopMonitoringRequestCollector) {
	connector, found := partner.connectors[SIRI_STOP_MONITORING_REQUEST_COLLECTOR]
	if !found {
		connector = partner.connectors[TEST_STOP_MONITORING_REQUEST_COLLECTOR]
	}

	if collector, isCollector := connector.(StopMonitoringRequestCollector); isCollector {
		return collector
	}
	return nil
}
// LiteStopMonitoringRequestCollector returns the SIRI Lite stop monitoring
// request collector connector, or nil when the partner has none.
func (partner *Partner) LiteStopMonitoringRequestCollector() (lsmrc LiteStopMonitoringRequestCollector) {
	connector, found := partner.connectors[SIRI_LITE_STOP_MONITORING_REQUEST_COLLECTOR]
	if !found {
		return nil
	}
	return connector.(LiteStopMonitoringRequestCollector)
}
// StopMonitoringSubscriptionCollector returns the stop monitoring
// subscription collector connector, or nil when the partner has none.
func (partner *Partner) StopMonitoringSubscriptionCollector() StopMonitoringSubscriptionCollector {
	c, found := partner.connectors[SIRI_STOP_MONITORING_SUBSCRIPTION_COLLECTOR]
	if !found {
		return nil
	}
	return c.(StopMonitoringSubscriptionCollector)
}
// func (partner *Partner) EstimatedTimetableRequestCollector() (smrc EstimatedTimetableRequestCollector) {
// client, ok := partner.connectors[SIRI_ESTIMATED_TIMETABLE_SUBSCRIPTION_COLLECTOR]
// if ok {
// return client.(EstimatedTimetableRequestCollector)
// }
// return nil
// }
// EstimatedTimetableSubscriptionCollector returns the estimated timetable
// subscription collector connector, or nil when the partner has none.
func (partner *Partner) EstimatedTimetableSubscriptionCollector() EstimatedTimetableSubscriptionCollector {
	c, found := partner.connectors[SIRI_ESTIMATED_TIMETABLE_SUBSCRIPTION_COLLECTOR]
	if !found {
		return nil
	}
	return c.(EstimatedTimetableSubscriptionCollector)
}
// VehicleMonitoringRequestCollector returns the vehicle monitoring request
// collector connector, or nil when the partner has none.
func (partner *Partner) VehicleMonitoringRequestCollector() VehicleMonitoringRequestCollector {
	c, found := partner.connectors[SIRI_VEHICLE_MONITORING_REQUEST_COLLECTOR]
	if !found {
		return nil
	}
	return c.(VehicleMonitoringRequestCollector)
}
// VehicleMonitoringSubscriptionCollector returns the vehicle monitoring
// subscription collector connector, or nil when the partner has none.
func (partner *Partner) VehicleMonitoringSubscriptionCollector() VehicleMonitoringSubscriptionCollector {
	c, found := partner.connectors[SIRI_VEHICLE_MONITORING_SUBSCRIPTION_COLLECTOR]
	if !found {
		return nil
	}
	return c.(VehicleMonitoringSubscriptionCollector)
}
// hasPushCollector reports whether the partner has a push collector
// connector.
func (partner *Partner) hasPushCollector() (ok bool) {
	if _, found := partner.connectors[PUSH_COLLECTOR]; found {
		return true
	}
	return false
}
// hasGtfsCollector reports whether the partner has a GTFS-RT request
// collector connector.
func (partner *Partner) hasGtfsCollector() (ok bool) {
	if _, found := partner.connectors[GTFS_RT_REQUEST_COLLECTOR]; found {
		return true
	}
	return false
}
// alternativeStatusConnector names the connector usable to derive a status
// when there is no check status client: "push" when a push collector
// exists, otherwise "gtfs" when a GTFS collector exists, otherwise "none".
func (partner *Partner) alternativeStatusConnector() string {
	switch {
	case partner.hasPushCollector():
		return "push"
	case partner.hasGtfsCollector():
		return "gtfs"
	default:
		return "none"
	}
}
// CheckStatus returns the current status of the partner. With a check status
// client, the status comes from that client; an error from the client is
// logged and not returned. Without one, the status is derived from push or
// GTFS activity. When none of these is available, the status is unknown and
// an error is returned.
func (partner *Partner) CheckStatus() (PartnerStatus, error) {
	logger.Log.Debugf("Check '%s' partner status", partner.slug)

	if partner.CheckStatusClient() != nil {
		status, err := partner.CheckStatusClient().Status()
		if err != nil {
			logger.Log.Printf("Error while checking %s partner status: %v", partner.Slug(), err)
		}
		logger.Log.Debugf("Partner %v status is %v", partner.slug, status.OperationnalStatus)
		return status, nil
	}

	switch partner.alternativeStatusConnector() {
	case "push":
		return partner.checkPushStatus()
	case "gtfs":
		return partner.checkGtfsStatus()
	default:
		logger.Log.Debugf("Can't define Status for partner %v", partner.slug)
		unknown := PartnerStatus{OperationnalStatus: OPERATIONNAL_STATUS_UNKNOWN}
		return unknown, errors.New("no way to define status")
	}
}
// checkPushStatus derives the status from push activity: down when the last
// recorded push is more than five minutes old, up otherwise. The returned
// error is always nil.
func (partner *Partner) checkPushStatus() (partnerStatus PartnerStatus, _ error) {
	logger.Log.Debugf("Checking %v partner status with PushNotifications", partner.slug)

	threshold := partner.manager.Referential().Clock().Now().Add(-5 * time.Minute)
	partnerStatus.OperationnalStatus = OPERATIONNAL_STATUS_UP
	if partner.alternativeStatusCheck.LastCheck.Before(threshold) {
		partnerStatus.OperationnalStatus = OPERATIONNAL_STATUS_DOWN
	}

	logger.Log.Debugf("Partner %v status is %v", partner.slug, partnerStatus.OperationnalStatus)
	return partnerStatus, nil
}
// checkGtfsStatus derives the status from GTFS collect activity: down when
// the last recorded status is more than five minutes old, otherwise the
// status last given to GtfsStatus. The returned error is always nil.
func (partner *Partner) checkGtfsStatus() (partnerStatus PartnerStatus, _ error) {
	logger.Log.Debugf("Checking %v partner status with Gtfs collect", partner.slug)

	threshold := partner.manager.Referential().Clock().Now().Add(-5 * time.Minute)
	partnerStatus.OperationnalStatus = partner.alternativeStatusCheck.Status
	if partner.alternativeStatusCheck.LastCheck.Before(threshold) {
		partnerStatus.OperationnalStatus = OPERATIONNAL_STATUS_DOWN
	}

	logger.Log.Debugf("Partner %v status is %v", partner.slug, partnerStatus.OperationnalStatus)
	return partnerStatus, nil
}
// Model returns the model exposed by the partner's manager.
func (partner *Partner) Model() model.Model {
	owner := partner.manager
	return owner.Model()
}
// CancelSubscriptions asks the partner's subscription manager to cancel its
// subscriptions.
func (partner *Partner) CancelSubscriptions() {
	subscriptions := partner.subscriptionManager
	subscriptions.CancelSubscriptions()
}
// CancelBroadcastSubscriptions asks the partner's subscription manager to
// cancel its broadcast subscriptions.
func (partner *Partner) CancelBroadcastSubscriptions() {
	subscriptions := partner.subscriptionManager
	subscriptions.CancelBroadcastSubscriptions()
}
// LastDiscovery returns the clock time recorded by the last call to
// Discover; it is the zero time after Start.
func (partner *Partner) LastDiscovery() time.Time {
	last := partner.lastDiscovery
	return last
}
// Discover records the current clock time as the last discovery, then runs
// the line discovery followed by the stop discovery.
func (partner *Partner) Discover() {
	now := partner.manager.Referential().Clock().Now()
	partner.lastDiscovery = now

	partner.lineDiscovery()
	partner.stopDiscovery()
}
// stopDiscovery requests the stop points through the stop points discovery
// request collector connector, when the partner has one.
func (partner *Partner) stopDiscovery() {
	logger.Log.Debugf("StopDiscovery for partner '%s'", partner.slug)

	if collector, found := partner.connectors[SIRI_STOP_POINTS_DISCOVERY_REQUEST_COLLECTOR]; found {
		collector.(StopPointsDiscoveryRequestCollector).RequestStopPoints()
		return
	}

	logger.Log.Debugf("No SiriStopPointsDiscoveryRequestCollector found for partner '%s'", partner.slug)
}
// lineDiscovery requests the lines through the lines discovery request
// collector connector, when the partner has one.
func (partner *Partner) lineDiscovery() {
	logger.Log.Debugf("LineDiscovery for partner '%s'", partner.slug)

	if collector, found := partner.connectors[SIRI_LINES_DISCOVERY_REQUEST_COLLECTOR]; found {
		collector.(LinesDiscoveryRequestCollector).RequestLines()
		return
	}

	logger.Log.Debugf("No SiriLinesDiscoveryRequestCollector found for partner '%s'", partner.slug)
}
// Pushed records the current clock time as the last alternative status
// check; checkPushStatus compares against it.
func (partner *Partner) Pushed() {
	now := partner.manager.Referential().Clock().Now()
	partner.alternativeStatusCheck.LastCheck = now
}
// GtfsStatus records s as the alternative status together with the current
// clock time; checkGtfsStatus reads both.
func (partner *Partner) GtfsStatus(s OperationnalStatus) {
	now := partner.manager.Referential().Clock().Now()

	check := &partner.alternativeStatusCheck
	check.LastCheck = now
	check.Status = s
}
// RegisterDiscoveredStopAreas adds stops to the set of discovered stop
// areas. It does nothing unless the UseDiscoveredSA collect setting is set.
func (partner *Partner) RegisterDiscoveredStopAreas(stops []string) {
	if !partner.CollectSettings().UseDiscoveredSA {
		return
	}

	partner.mutex.Lock()
	defer partner.mutex.Unlock()

	for _, stop := range stops {
		partner.discoveredStopAreas[stop] = struct{}{}
	}
}
// RegisterDiscoveredLines adds lines to the set of discovered lines. It does
// nothing unless the UseDiscoveredLines collect setting is set.
func (partner *Partner) RegisterDiscoveredLines(lines []string) {
	if !partner.CollectSettings().UseDiscoveredLines {
		return
	}

	partner.mutex.Lock()
	defer partner.mutex.Unlock()

	for _, line := range lines {
		partner.discoveredLines[line] = struct{}{}
	}
}
// NewPartnerManager builds an empty PartnerManager for referential, with its
// own credentials index and partners guardian.
func NewPartnerManager(referential *Referential) *PartnerManager {
	return &PartnerManager{
		mutex:                 &sync.RWMutex{},
		byId:                  make(map[PartnerId]*Partner),
		localCredentialsIndex: NewLocalCredentialsIndex(),
		referential:           referential,
		guardian:              NewPartnersGuardian(referential),
	}
}
// Guardian returns the partners guardian owned by the manager.
func (manager *PartnerManager) Guardian() *PartnersGuardian {
	guardian := manager.guardian
	return guardian
}
// UniqCredentials forwards to the credentials index and returns its answer
// for modelId and localCredentials.
func (manager *PartnerManager) UniqCredentials(modelId PartnerId, localCredentials string) bool {
	index := manager.localCredentialsIndex
	return index.UniqCredentials(modelId, localCredentials)
}
// Start starts the guardian, then every partner held by the manager.
func (manager *PartnerManager) Start() {
	manager.guardian.Start()

	for id := range manager.byId {
		manager.byId[id].Start()
	}
}
// Stop stops the guardian, then every partner held by the manager.
func (manager *PartnerManager) Stop() {
	manager.guardian.Stop()

	for id := range manager.byId {
		manager.byId[id].Stop()
	}
}
// New builds a Partner with the given slug, bound to this manager and using
// the manager's UUID generator. The partner is not stored in the manager;
// call Save for that.
func (manager *PartnerManager) New(slug PartnerSlug) *Partner {
	p := new(Partner)
	p.mutex = &sync.RWMutex{}
	p.slug = slug
	p.manager = manager
	p.connectors = make(map[string]Connector)
	p.discoveredStopAreas = make(map[string]struct{})
	p.discoveredLines = make(map[string]struct{})
	p.PartnerStatus = PartnerStatus{OperationnalStatus: OPERATIONNAL_STATUS_UNKNOWN}
	p.ConnectorTypes = []string{}
	p.gtfsCache = cache.NewCacheTable()
	p.limiters = make(map[string]*rate.Limiter)

	// The UUID generator is installed before the settings, which receive the
	// partner's generator accessor.
	p.SetUUIDGenerator(manager.UUIDGenerator())
	p.PartnerSettings = s.NewEmptyPartnerSettings(p.UUIDGenerator)
	p.subscriptionManager = NewMemorySubscriptions(p)

	return p
}
// MarshalJSON encodes the manager as the JSON array of the ids of its
// partners, in map iteration order. An empty manager encodes as "[]".
func (manager *PartnerManager) MarshalJSON() ([]byte, error) {
	// byId is written under manager.mutex by Save and Delete, so it is read
	// under the read lock here, like the Find* methods do.
	manager.mutex.RLock()
	partnersId := make([]PartnerId, 0, len(manager.byId))
	for id := range manager.byId {
		partnersId = append(partnersId, id)
	}
	manager.mutex.RUnlock()

	return json.Marshal(partnersId)
}
// Find returns the partner stored under id, or nil when there is none.
func (manager *PartnerManager) Find(id PartnerId) *Partner {
	manager.mutex.RLock()
	defer manager.mutex.RUnlock()

	return manager.byId[id]
}
// FindBySlug returns a partner whose slug equals slug and true, or nil and
// false when no stored partner has that slug.
func (manager *PartnerManager) FindBySlug(slug PartnerSlug) (*Partner, bool) {
	manager.mutex.RLock()
	defer manager.mutex.RUnlock()

	for _, candidate := range manager.byId {
		if candidate.slug == slug {
			return candidate, true
		}
	}

	return nil, false
}
// FindByCredential resolves credential c to a partner id through the
// credentials index and returns the partner stored under that id. The
// boolean is false when the credential or the partner is unknown.
func (manager *PartnerManager) FindByCredential(c string) (*Partner, bool) {
	manager.mutex.RLock()
	defer manager.mutex.RUnlock()

	if id, known := manager.localCredentialsIndex.Find(c); known {
		found, exists := manager.byId[id]
		return found, exists
	}

	return nil, false
}
// FindAll returns every stored partner, in map iteration order. The result
// is nil when the manager holds no partner.
func (manager *PartnerManager) FindAll() (partners []*Partner) {
	manager.mutex.RLock()
	defer manager.mutex.RUnlock()

	for id := range manager.byId {
		partners = append(partners, manager.byId[id])
	}

	return partners
}
// FindAllByCollectPriority returns every stored partner sorted with
// ByPriority, i.e. highest collect priority first.
func (manager *PartnerManager) FindAllByCollectPriority() []*Partner {
	ordered := ByPriority(manager.FindAll())
	sort.Sort(ordered)

	return ordered
}
// FindAllWithConnector returns the stored partners that have a connector for
// at least one of connectorTypes. The result is nil when none matches.
func (manager *PartnerManager) FindAllWithConnector(connectorTypes []string) (partners []*Partner) {
	manager.mutex.RLock()
	defer manager.mutex.RUnlock()

	for _, candidate := range manager.byId {
		if !candidate.HaveAtLeastOneConnector(connectorTypes) {
			continue
		}
		partners = append(partners, candidate)
	}

	return partners
}
// Save stores partner in the manager and indexes its credentials. A partner
// without an id receives a new UUID first, and the partner is bound to this
// manager. It always returns true.
func (manager *PartnerManager) Save(partner *Partner) bool {
	if partner.id == "" {
		partner.id = PartnerId(manager.NewUUID())
	}
	partner.manager = manager

	manager.mutex.Lock()
	defer manager.mutex.Unlock()

	manager.byId[partner.id] = partner
	manager.localCredentialsIndex.Index(partner.id, partner.Credentials())

	return true
}
// Delete removes partner from the credentials index and from the manager.
// It always returns true.
func (manager *PartnerManager) Delete(partner *Partner) bool {
	manager.mutex.Lock()
	defer manager.mutex.Unlock()

	manager.localCredentialsIndex.Delete(partner.id)
	delete(manager.byId, partner.id)

	return true
}
// Model returns the model of the manager's referential.
func (manager *PartnerManager) Model() model.Model {
	referential := manager.referential
	return referential.Model()
}
// IsEmpty reports whether the manager holds no partner.
func (manager *PartnerManager) IsEmpty() bool {
	// byId is written under manager.mutex by Save and Delete, so its length
	// is read under the read lock, like the Find* methods do.
	manager.mutex.RLock()
	defer manager.mutex.RUnlock()

	return len(manager.byId) == 0
}
// Referential returns the referential this manager was created for.
func (manager *PartnerManager) Referential() *Referential {
	referential := manager.referential
	return referential
}
// CancelSubscriptions cancels the subscriptions of every stored partner
// while holding the manager's write lock.
func (manager *PartnerManager) CancelSubscriptions() {
	manager.mutex.Lock()
	defer manager.mutex.Unlock()

	for id := range manager.byId {
		manager.byId[id].CancelSubscriptions()
	}
}
// Load reads the partners of the manager's referential from the database
// and saves each of them into the manager. For every row it restores the id,
// name, settings and connector types, then rebuilds the connectors. It stops
// at, and returns, the first database or JSON decoding error.
func (manager *PartnerManager) Load() error {
	query := fmt.Sprintf("select * from partners where referential_id = '%s'", manager.referential.Id())

	rows := []model.SelectPartner{}
	if _, err := model.Database.Select(&rows, query); err != nil {
		return err
	}

	for _, row := range rows {
		partner := manager.New(PartnerSlug(row.Slug))
		partner.id = PartnerId(row.Id)

		if row.Name.Valid {
			partner.Name = row.Name.String
		}

		// Settings are stored as a JSON object of strings.
		if row.Settings.Valid && row.Settings.String != "" {
			settings := make(map[string]string)
			if err := json.Unmarshal([]byte(row.Settings.String), &settings); err != nil {
				return err
			}
			partner.PartnerSettings = s.NewPartnerSettings(partner.UUIDGenerator, settings)
		}

		// Connector types are stored as a JSON array of strings.
		if row.ConnectorTypes.Valid && row.ConnectorTypes.String != "" {
			if err := json.Unmarshal([]byte(row.ConnectorTypes.String), &partner.ConnectorTypes); err != nil {
				return err
			}
		}

		partner.RefreshConnectors()
		manager.Save(partner)
	}

	return nil
}
// SaveToDatabase replaces, in one transaction, the database partners of the
// manager's referential with the partners currently held in memory. It
// returns an HTTP status code describing the outcome: StatusOK on success,
// StatusNotAcceptable when the referential has no row in the database, and
// StatusInternalServerError together with the error for any other failure.
func (manager *PartnerManager) SaveToDatabase() (int, error) {
// Check that the Referential exists in the database before touching partners
selectReferentials := []model.SelectReferential{}
sqlQuery := fmt.Sprintf("select * from referentials where referential_id = '%s'", manager.referential.Id())
_, err := model.Database.Select(&selectReferentials, sqlQuery)
if err != nil {
return http.StatusInternalServerError, fmt.Errorf("database error: %v", err)
}
if len(selectReferentials) == 0 {
return http.StatusNotAcceptable, errors.New("can't save Partners without Referential in Database")
}
// Begin transaction
tx, err := model.Database.Begin()
if err != nil {
return http.StatusInternalServerError, fmt.Errorf("database error: %v", err)
}
// Delete every partner row of this referential; they are re-inserted below
sqlQuery = fmt.Sprintf("delete from partners where referential_id = '%s';", manager.referential.Id())
_, err = tx.Exec(sqlQuery)
if err != nil {
// The result of Rollback is not checked
tx.Rollback()
return http.StatusInternalServerError, fmt.Errorf("database error: %v", err)
}
// Insert partners
// The read lock is held until the function returns, commit included
manager.mutex.RLock()
defer manager.mutex.RUnlock()
for _, partner := range manager.byId {
dbPartner, err := manager.newDbPartner(partner)
if err != nil {
tx.Rollback()
return http.StatusInternalServerError, fmt.Errorf("internal error: %v", err)
}
err = tx.Insert(dbPartner)
if err != nil {
tx.Rollback()
return http.StatusInternalServerError, fmt.Errorf("internal error: %v", err)
}
}
// Commit transaction
err = tx.Commit()
if err != nil {
return http.StatusInternalServerError, fmt.Errorf("database error: %v", err)
}
return http.StatusOK, nil
}
// newDbPartner converts partner into its database row. The settings
// definition and the connector types are stored as JSON text. It returns the
// JSON encoding error, if any.
func (manager *PartnerManager) newDbPartner(partner *Partner) (*model.DatabasePartner, error) {
	settingsJSON, err := json.Marshal(partner.PartnerSettings.SettingsDefinition())
	if err != nil {
		return nil, err
	}

	connectorsJSON, err := json.Marshal(partner.ConnectorTypes)
	if err != nil {
		return nil, err
	}

	row := &model.DatabasePartner{
		Id:             string(partner.id),
		ReferentialId:  string(manager.referential.id),
		Slug:           string(partner.slug),
		Name:           partner.Name,
		Settings:       string(settingsJSON),
		ConnectorTypes: string(connectorsJSON),
	}
	return row, nil
}