dotcloud/docker

View on GitHub
daemon/events/events.go

Summary

Maintainability
A
0 mins
Test Coverage
package events // import "github.com/docker/docker/daemon/events"

import (
    "sync"
    "time"

    eventtypes "github.com/docker/docker/api/types/events"
    "github.com/moby/pubsub"
)

const (
    // eventsLimit is the capacity of the in-memory event history allocated
    // in New. PublishMessage drops the oldest entry once the history holds
    // this many events.
    eventsLimit = 256
    // bufferSize is handed to pubsub.NewPublisher as its second argument.
    // NOTE(review): presumably the per-subscriber channel buffer size;
    // confirm against the pubsub package.
    bufferSize  = 1024
)

// Events is pubsub channel for events generated by the engine.
// It keeps a bounded history of recent messages next to the publisher so
// new subscribers can be handed past events as well as a live channel.
type Events struct {
    // mu is held while events is read or modified (see Subscribe,
    // SubscribeTopic and PublishMessage).
    mu     sync.Mutex
    // events is the history of recently published messages, oldest first.
    events []eventtypes.Message
    // pub delivers published messages to the subscribed channels.
    pub    *pubsub.Publisher
}

// New creates an *Events with an empty history that has room for
// eventsLimit messages, and a publisher configured with a 100 millisecond
// timeout and a buffer of bufferSize.
func New() *Events {
    history := make([]eventtypes.Message, 0, eventsLimit)
    publisher := pubsub.NewPublisher(100*time.Millisecond, bufferSize)

    ev := new(Events)
    ev.events = history
    ev.pub = publisher
    return ev
}

// Subscribe registers a new listener for every event. It returns three
// values:
//
//   - a copy of the stored history (up to 256 of the most recent events),
//   - a channel on which new events arrive as interface{} values, so the
//     receiver needs a type assertion,
//   - a function that evicts the listener and thereby stops the stream.
func (e *Events) Subscribe() ([]eventtypes.Message, chan interface{}, func()) {
    eventSubscribers.Inc()

    // The history is copied and the listener registered under the same
    // lock that PublishMessage takes to modify the history.
    e.mu.Lock()
    snapshot := append(make([]eventtypes.Message, 0, len(e.events)), e.events...)
    listener := e.pub.Subscribe()
    e.mu.Unlock()

    return snapshot, listener, func() { e.Evict(listener) }
}

// SubscribeTopic registers a new listener and returns the stored events
// selected by loadBufferedEvents for the since/until window, together with a
// channel on which new events arrive (as interface{} values, so the receiver
// needs a type assertion).
//
// When ef is non-nil and contains at least one filter, both the returned
// history and the channel are restricted to events that ef.Include accepts;
// otherwise the listener receives every event. No cancel function is
// returned here; Evict accepts the returned channel.
func (e *Events) SubscribeTopic(since, until time.Time, ef *Filter) ([]eventtypes.Message, chan interface{}) {
    eventSubscribers.Inc()

    // A nil predicate means "no filtering".
    var match func(m interface{}) bool
    if ef != nil && ef.filter.Len() > 0 {
        match = func(m interface{}) bool { return ef.Include(m.(eventtypes.Message)) }
    }

    // Read the history and register the listener under one lock so that
    // PublishMessage cannot modify the history in between.
    e.mu.Lock()
    history := e.loadBufferedEvents(since, until, match)

    var stream chan interface{}
    if match == nil {
        // Subscribe to all events if there are no filters
        stream = e.pub.Subscribe()
    } else {
        stream = e.pub.SubscribeTopic(match)
    }
    e.mu.Unlock()

    return history, stream
}

// Evict evicts listener from pubsub.
// It decrements the eventSubscribers metric and then removes the channel l
// from the publisher. l is expected to be a channel previously returned by
// Subscribe or SubscribeTopic.
func (e *Events) Evict(l chan interface{}) {
    eventSubscribers.Dec()
    e.pub.Evict(l)
}

// Log builds a message with scope "local", stamped with the current time,
// from the given action, event type and actor, and hands it to
// PublishMessage.
func (e *Events) Log(action eventtypes.Action, eventType eventtypes.Type, actor eventtypes.Actor) {
    stamp := time.Now().UTC()

    msg := eventtypes.Message{
        Action:   action,
        Type:     eventType,
        Actor:    actor,
        Scope:    "local",
        Time:     stamp.Unix(),
        TimeNano: stamp.UnixNano(),
    }

    // Container and image events additionally carry the deprecated
    // ID/Status fields; container events also carry From.
    switch eventType {
    case eventtypes.ContainerEventType, eventtypes.ImageEventType:
        msg.ID = actor.ID
        msg.Status = string(action)
        if eventType == eventtypes.ContainerEventType {
            msg.From = actor.Attributes["image"]
        }
    }

    e.PublishMessage(msg)
}

// PublishMessage records jm in the history and broadcasts it to listeners.
// Each listener has 100 milliseconds to receive the event or it will be
// skipped.
//
// The history is updated while holding e.mu; the broadcast itself happens
// after the lock has been released.
func (e *Events) PublishMessage(jm eventtypes.Message) {
    eventsCounter.Inc()

    e.mu.Lock()
    if n := len(e.events); n < cap(e.events) {
        // Still room: just grow the history.
        e.events = append(e.events, jm)
    } else {
        // History is full: shift everything one slot towards the front,
        // dropping the oldest event, and store the new one last.
        copy(e.events, e.events[1:])
        e.events[n-1] = jm
    }
    e.mu.Unlock()

    e.pub.Publish(jm)
}

// SubscribersCount returns number of event listeners, as reported by the
// underlying publisher's Len method.
func (e *Events) SubscribersCount() int {
    return e.pub.Len()
}

// loadBufferedEvents returns the cached events that fall inside the
// [since, until] window, oldest first.
//
//   - If both since and until are the zero time, no buffered events are
//     returned.
//   - A zero since places no lower bound on the window; a zero until places
//     no upper bound on it.
//   - If topic is non-nil, only events for which topic returns true are kept;
//     otherwise every event in the window is kept.
//
// The lower bound is found by walking back from the newest event until one
// older than since is seen, so anything stored before that event is not
// considered.
//
// The method reads e.events without taking e.mu itself; SubscribeTopic calls
// it with the lock held.
func (e *Events) loadBufferedEvents(since, until time.Time, topic func(interface{}) bool) []eventtypes.Message {
    var buffered []eventtypes.Message
    if since.IsZero() && until.IsZero() {
        return buffered
    }

    var sinceNanoUnix int64
    if !since.IsZero() {
        sinceNanoUnix = since.UnixNano()
    }

    var untilNanoUnix int64
    if !until.IsZero() {
        untilNanoUnix = until.UnixNano()
    }

    // Locate the start of the window by scanning back from the newest event
    // and stopping at the first one that is older than since.
    start := len(e.events)
    for start > 0 && e.events[start-1].TimeNano >= sinceNanoUnix {
        start--
    }

    // Collect matches in chronological order. Appending while walking
    // forward avoids re-copying the whole result for every match, which is
    // what prepending to the slice would do.
    for _, ev := range e.events[start:] {
        if untilNanoUnix > 0 && ev.TimeNano > untilNanoUnix {
            continue
        }

        if topic == nil || topic(ev) {
            buffered = append(buffered, ev)
        }
    }
    return buffered
}