core/subscription_manager.go

Summary

Maintainability
A
1 hr
Test Coverage
package core

import (
    "encoding/json"
    "fmt"
    "strings"
    "sync"
    "time"

    "bitbucket.org/enroute-mobi/ara/core/idgen"
    "bitbucket.org/enroute-mobi/ara/logger"
    "bitbucket.org/enroute-mobi/ara/uuid"
)

type Subscriptions interface {
    uuid.UUIDInterface

    New(string) *Subscription
    Find(SubscriptionId) (*Subscription, bool)
    FindAll() []*Subscription
    FindOrCreateByKind(string) *Subscription
    FindByKind(string) (*Subscription, bool)
    FindSubscriptionsByKind(string) []*Subscription
    FindBroadcastSubscriptions() []*Subscription
    Index(*Subscription)
    Save(*Subscription) bool
    Delete(*Subscription) bool
    DeleteById(SubscriptionId)
    CancelSubscriptions()
    CancelSubscriptionsResourcesBefore(time.Time)
    CancelBroadcastSubscriptions()
    CancelCollectSubscriptions()
    FindByResourceId(id, kind string) []*Subscription
    FindByExternalId(string) (*Subscription, bool)
}

type MemorySubscriptions struct {
    uuid.UUIDConsumer

    mutex   *sync.RWMutex
    partner *Partner

    byIdentifier        map[SubscriptionId]*Subscription
    byKindAndResourceId map[string][]SubscriptionId
}

func (manager *MemorySubscriptions) MarshalJSON() ([]byte, error) {
    subscriptions := make([]*Subscription, 0)
    for _, subscription := range manager.byIdentifier {
        subscriptions = append(subscriptions, subscription)
    }

    return json.Marshal(subscriptions)
}

func NewMemorySubscriptions(partner *Partner) *MemorySubscriptions {
    return &MemorySubscriptions{
        mutex:               &sync.RWMutex{},
        byIdentifier:        make(map[SubscriptionId]*Subscription),
        byKindAndResourceId: make(map[string][]SubscriptionId),
        partner:             partner,
    }
}

func (manager *MemorySubscriptions) New(kind string) *Subscription {
    logger.Log.Debugf("Creating subscription with kind %v", kind)
    subscription := &Subscription{
        kind:                kind,
        manager:             manager,
        resourcesByCode:     make(map[string]*SubscribedResource),
        subscriptionOptions: make(map[string]string),
    }
    subscription.Save()
    return subscription
}

func (manager *MemorySubscriptions) FindByKind(kind string) (*Subscription, bool) {
    manager.mutex.RLock()
    defer manager.mutex.RUnlock()

    for _, subscription := range manager.byIdentifier {
        if subscription.Kind() == kind {
            return subscription, true
        }
    }
    return nil, false
}

func (manager *MemorySubscriptions) FindSubscriptionsByKind(kind string) (subscriptions []*Subscription) {
    manager.mutex.RLock()
    defer manager.mutex.RUnlock()

    for _, subscription := range manager.byIdentifier {
        if subscription.Kind() == kind {
            subscriptions = append(subscriptions, subscription)
        }
    }
    return subscriptions
}

func (manager *MemorySubscriptions) FindBroadcastSubscriptions() (subscriptions []*Subscription) {
    manager.mutex.RLock()
    defer manager.mutex.RUnlock()

    for _, subscription := range manager.byIdentifier {
        if strings.HasSuffix(subscription.Kind(), "Broadcast") {
            subscriptions = append(subscriptions, subscription)
        }
    }
    return subscriptions
}

func (manager *MemorySubscriptions) FindByExternalId(externalId string) (*Subscription, bool) {
    manager.mutex.RLock()
    defer manager.mutex.RUnlock()

    for _, subscription := range manager.byIdentifier {
        if subscription.ExternalId() == externalId {
            return subscription, true
        }
    }
    return nil, false
}

func (manager *MemorySubscriptions) FindByResourceId(id, kind string) []*Subscription {
    manager.mutex.RLock()
    defer manager.mutex.RUnlock()

    kindAndResourceId := fmt.Sprintf("%s-%s", kind, id)
    subscriptionIdentifiers, found := manager.byKindAndResourceId[kindAndResourceId]

    subscriptions := []*Subscription{}

    if found {
        for _, subscriptionId := range subscriptionIdentifiers {
            subscription, ok := manager.byIdentifier[subscriptionId]
            if !ok {
                // Subscription no longer exists
                continue
            }

            if subscription.kind != kind {
                // Subscription wrong kind ? changed kind ?
                continue
            }

            subscription.RLock()
            _, ok = subscription.resourcesByCode[id]
            subscription.RUnlock()

            if !ok {
                // Resource no longer presents in subscription
                continue
            }

            subscriptions = append(subscriptions, subscription)
        }
    }

    if len(subscriptions) > 0 {
        if len(subscriptions) != len(subscriptionIdentifiers) {
            // Some of subscriptions are no longer associated to this kindAndResourceId
            subscriptionIds := []SubscriptionId{}
            for _, subscription := range subscriptions {
                subscriptionIds = append(subscriptionIds, subscription.Id())
            }

            // "Update" lock to RW one
            manager.mutex.RUnlock()
            manager.mutex.Lock()

            manager.byKindAndResourceId[kindAndResourceId] = subscriptionIds

            manager.mutex.Unlock()
            manager.mutex.RLock()
        }
    } else {
        // No subscription found for this kindAndResourceId

        // "Update" lock to RW one
        manager.mutex.RUnlock()
        manager.mutex.Lock()

        delete(manager.byKindAndResourceId, kindAndResourceId)

        manager.mutex.Unlock()
        manager.mutex.RLock()
    }

    return subscriptions
}

func (manager *MemorySubscriptions) FindOrCreateByKind(kind string) *Subscription {
    maxResource := manager.partner.SubscriptionMaximumResources()
    if maxResource == 1 {
        return manager.New(kind)
    }

    manager.mutex.RLock()
    for _, subscription := range manager.byIdentifier {
        if subscription.Kind() == kind && (maxResource < 1 || subscription.ResourcesLen() < maxResource) && !subscription.subscribed {
            manager.mutex.RUnlock()
            return subscription
        }
    }
    manager.mutex.RUnlock()

    return manager.New(kind)
}

func (manager *MemorySubscriptions) Find(id SubscriptionId) (*Subscription, bool) {
    manager.mutex.RLock()
    defer manager.mutex.RUnlock()

    subscription, ok := manager.byIdentifier[id]
    if ok {
        return subscription, true
    } else {
        return nil, false
    }
}

func (manager *MemorySubscriptions) FindAll() (subscriptions []*Subscription) {
    manager.mutex.RLock()
    defer manager.mutex.RUnlock()

    if len(manager.byIdentifier) == 0 {
        return []*Subscription{}
    }
    for _, subscription := range manager.byIdentifier {
        subscriptions = append(subscriptions, subscription)
    }
    return
}

func (manager *MemorySubscriptions) Save(subscription *Subscription) bool {
    manager.mutex.Lock()
    defer manager.mutex.Unlock()

    if subscription.Id() == "" {
        subscription.id = SubscriptionId(manager.partner.NewIdentifier(idgen.IdentifierAttributes{Type: "Subscription", Id: manager.NewUUID()}))
    }

    subscription.manager = manager
    manager.byIdentifier[subscription.Id()] = subscription

    manager.unsafeIndex(subscription)

    return true
}

// Index the subscription with all associated resources
func (manager *MemorySubscriptions) Index(subscription *Subscription) {
    manager.mutex.Lock()
    manager.unsafeIndex(subscription)
    manager.mutex.Unlock()
}

// Unsafe method, need to handle manager mutex before calling
func (manager *MemorySubscriptions) unsafeIndex(subscription *Subscription) {
    subscription.RLock()

    for resourceId := range subscription.resourcesByCode {
        kindAndResourceId := fmt.Sprintf("%s-%s", subscription.Kind(), resourceId)
        subscriptionIdentifiers, found := manager.byKindAndResourceId[kindAndResourceId]

        if !found {
            // No subscription associated to this kindAndResourceId, create a new SubscriptionId slice
            manager.byKindAndResourceId[kindAndResourceId] = []SubscriptionId{subscription.Id()}
            continue
        }

        // Is the Subscription already associated to this kindAndResourceId ?
        if newSubscriptionForKindAndResourceId(subscriptionIdentifiers, subscription.Id()) {
            // Associate this Subscription to the kindAndResourceId
            manager.byKindAndResourceId[kindAndResourceId] = append(subscriptionIdentifiers, subscription.Id())
        }
    }
    subscription.RUnlock()
}

// Check if a SubscriptionId is in a Slice of SubscriptionIds
func newSubscriptionForKindAndResourceId(s []SubscriptionId, id SubscriptionId) bool {
    for _, subscriptionId := range s {
        if subscriptionId == id {
            // The Subscription is already associated to this kindAndResourceId
            return false
        }
    }
    return true
}

func (manager *MemorySubscriptions) Delete(subscription *Subscription) bool {
    manager.mutex.Lock()
    defer manager.mutex.Unlock()

    delete(manager.byIdentifier, subscription.Id())
    return true
}

func (manager *MemorySubscriptions) DeleteById(id SubscriptionId) {
    manager.mutex.Lock()
    defer manager.mutex.Unlock()

    delete(manager.byIdentifier, id)
}

func (manager *MemorySubscriptions) CancelSubscriptionsResourcesBefore(time time.Time) {
    manager.mutex.Lock()
    defer manager.mutex.Unlock()

    for _, sub := range manager.byIdentifier {
        for key, resource := range sub.ResourcesByCodeCopy() {
            if resource.SubscribedAt().After(time) || resource.SubscribedAt().IsZero() {
                continue
            }
            sub.DeleteResource(key)
            logger.Log.Debugf("Deleting ressource %v from subscription with id %v after partner reload", key, sub.Id())

        }
        if sub.ResourcesLen() == 0 {
            delete(manager.byIdentifier, sub.Id())
        }
    }
}

func (manager *MemorySubscriptions) CancelSubscriptions() {
    manager.mutex.Lock()
    defer manager.mutex.Unlock()

    for id := range manager.byIdentifier {
        delete(manager.byIdentifier, id)
    }
}

func (manager *MemorySubscriptions) CancelBroadcastSubscriptions() {
    manager.mutex.Lock()
    defer manager.mutex.Unlock()

    for id, subscription := range manager.byIdentifier {
        if subscription.externalId != "" {
            delete(manager.byIdentifier, id)
        }
    }
}

func (manager *MemorySubscriptions) CancelCollectSubscriptions() {
    manager.mutex.Lock()
    defer manager.mutex.Unlock()

    for id, subscription := range manager.byIdentifier {
        if subscription.externalId == "" {
            delete(manager.byIdentifier, id)
        }
    }
}