bus.go

Summary

Maintainability
A
0 mins
Test Coverage
A
100%
package query

import (
    "runtime"
    "sync/atomic"
    "time"
)

const iteratorListenerTimeout = time.Second

// Bus is the only struct exported and required for the query bus usage.
// The Bus should be instantiated using the NewBus function.
type Bus struct {
    iteratorWorkerPoolSize int
    iteratorQueueBuffer    int
    iteratorResultBuffer   int
    initialized            *uint32
    shuttingDown           *uint32
    iteratorWorkers        *uint32
    handlers               []Handler
    iteratorHandlers       []IteratorHandler
    errorHandlers          []ErrorHandler
    cacheAdapters          []CacheAdapter
    iteratorQueryQueue     chan *pendingIteratorQuery
    closed                 chan bool
}

// NewBus instantiates the Bus struct.
// The Initialization of IteratorHandlers is performed separately (InitializeIteratorHandlers function) for dependency injection purposes.
func NewBus() *Bus {
    return &Bus{
        iteratorWorkerPoolSize: runtime.GOMAXPROCS(0),
        iteratorQueueBuffer:    100,
        iteratorResultBuffer:   0,
        initialized:            new(uint32),
        shuttingDown:           new(uint32),
        iteratorWorkers:        new(uint32),
        handlers:               make([]Handler, 0),
        iteratorHandlers:       make([]IteratorHandler, 0),
        errorHandlers:          make([]ErrorHandler, 0),
        cacheAdapters:          []CacheAdapter{NewMemoryCacheAdapter()},
        closed:                 make(chan bool),
    }
}

// Handlers for the regular queries.
func (bus *Bus) Handlers(hdls ...Handler) {
    bus.handlers = hdls
}

// ErrorHandlers may optionally be provided.
// They will receive any error thrown during the querying process.
func (bus *Bus) ErrorHandlers(hdls ...ErrorHandler) {
    bus.errorHandlers = hdls
}

// CacheAdapters may optionally be provided.
// They will be used instead of the default MemoryCacheAdapter.
func (bus *Bus) CacheAdapters(adps ...CacheAdapter) {
    for _, adp := range bus.cacheAdapters {
        adp.Shutdown()
    }
    bus.cacheAdapters = adps
}

// IteratorWorkerPoolSize may optionally be provided to tweak the iteratorWorker pool size for iterator query queue.
// It can only be adjusted *before* the bus is initialized.
// It defaults to the value returned by runtime.GOMAXPROCS(0).
func (bus *Bus) IteratorWorkerPoolSize(workerPoolSize int) {
    if !bus.isInitialized() {
        bus.iteratorWorkerPoolSize = workerPoolSize
    }
}

// IteratorQueueBuffer may optionally be provided to tweak the buffer size of the iterator query queue.
// This value may have high impact on performance depending on the use case.
// It can only be adjusted *before* the bus is initialized.
// It defaults to 100.
func (bus *Bus) IteratorQueueBuffer(buf int) {
    if !bus.isInitialized() {
        bus.iteratorQueueBuffer = buf
    }
}

// IteratorResultBuffer may optionally be provided to tweak the buffer size of the results channel for iterator queries.
// This value may have high impact on performance depending on the use case.
// It defaults to 1.
func (bus *Bus) IteratorResultBuffer(buf int) {
    bus.iteratorResultBuffer = buf
}

// InitializeIteratorHandlers initializes the query bus to support iterator queries.
func (bus *Bus) InitializeIteratorHandlers(hdls ...IteratorHandler) {
    if bus.initialize() {
        bus.iteratorHandlers = hdls
        bus.iteratorQueryQueue = make(chan *pendingIteratorQuery, bus.iteratorQueueBuffer)
        for i := 0; i < bus.iteratorWorkerPoolSize; i++ {
            bus.iteratorWorkerUp()
            go bus.iteratorWorker(bus.iteratorQueryQueue, bus.closed)
        }
    }
}

// Query for a single result or a pre-populated collection.
func (bus *Bus) Query(qry Query) (*Result, error) {
    if err := bus.isValid(qry); err != nil {
        return nil, err
    }

    res, cached := bus.result(qry)
    if cached {
        return res, nil
    }

    return res, bus.query(qry, res)
}

// IteratorQuery uses a channel to iterate the results while they are being populated.
// *Iterator queries are not cached*.
func (bus *Bus) IteratorQuery(qry Query) (*IteratorResult, error) {
    if err := bus.isIteratorValid(qry); err != nil {
        return nil, err
    }

    res := newIteratorResult(bus.iteratorResultBuffer)
    bus.enqueueIteratorQuery(qry, res)
    return res, nil
}

// Shutdown the query bus gracefully.
// *Queries handled while shutting down will be disregarded*.
func (bus *Bus) Shutdown() {
    if atomic.CompareAndSwapUint32(bus.shuttingDown, 0, 1) {
        bus.shutdown()
    }
}

//-----Private Functions------//

func (bus *Bus) initialize() bool {
    return atomic.CompareAndSwapUint32(bus.initialized, 0, 1)
}

func (bus *Bus) isInitialized() bool {
    return atomic.LoadUint32(bus.initialized) == 1
}

func (bus *Bus) isShuttingDown() bool {
    return atomic.LoadUint32(bus.shuttingDown) == 1
}

func (bus *Bus) iteratorWorker(qryQ <-chan *pendingIteratorQuery, closed chan<- bool) {
    for penQry := range qryQ {
        // nil queries are used as signals to break out
        if penQry == nil {
            break
        }

        // wait for a listener
        if penQry.res.waitListener(iteratorListenerTimeout) {
            bus.iteratorQuery(penQry.qry, penQry.res)
            penQry.res.close()
            continue
        }

        bus.error(penQry.qry, NewErrorQueryTimedOut(penQry.qry))
    }
    closed <- true
}

func (bus *Bus) iteratorQuery(qry Query, res *IteratorResult) {
    for _, hdl := range bus.iteratorHandlers {
        if err := hdl.Handle(qry, res); err != nil {
            bus.error(qry, err)
            return
        }
        if res.propagationStopped() {
            return
        }
    }
    if !res.isHandled() {
        bus.error(qry, NewErrorNoQueryHandlersFound(qry))
    }
}

func (bus *Bus) enqueueIteratorQuery(qry Query, res *IteratorResult) {
    bus.iteratorQueryQueue <- &pendingIteratorQuery{
        qry: qry,
        res: res,
    }
}

func (bus *Bus) query(qry Query, res *Result) error {
    for _, hdl := range bus.handlers {
        if err := hdl.Handle(qry, res); err != nil {
            bus.error(qry, err)
            return err
        }
        if res.propagationStopped() {
            break
        }
    }

    if !res.isHandled() {
        err := NewErrorNoQueryHandlersFound(qry)
        bus.error(qry, err)
        return err
    }

    bus.handleCache(qry, res)
    return nil
}

func (bus *Bus) result(qry Query) (*Result, bool) {
    if qry, implements := qry.(Cacheable); implements {
        for _, adp := range bus.cacheAdapters {
            if res := adp.Get(qry); res != nil {
                res.loadedFromCache()
                return res, true
            }
        }
        return newCacheableResult(qry), false
    }
    return newResult(), false
}

func (bus *Bus) handleCache(qry Query, res *Result) {
    if qry, implements := qry.(Cacheable); implements && qry.CacheDuration() > 0 {
        at := time.Now()
        res.expires(at.Add(qry.CacheDuration()))
        cached := false
        for _, adp := range bus.cacheAdapters {
            cached = cached || adp.Set(qry, res)
        }
        if cached {
            res.cached(at)
        }
    }
}

func (bus *Bus) iteratorWorkerUp() {
    atomic.AddUint32(bus.iteratorWorkers, 1)
}

func (bus *Bus) iteratorWorkerDown() {
    atomic.AddUint32(bus.iteratorWorkers, ^uint32(0))
}

func (bus *Bus) shutdown() {
    for atomic.LoadUint32(bus.iteratorWorkers) > 0 {
        bus.iteratorQueryQueue <- nil
        <-bus.closed
        bus.iteratorWorkerDown()
    }
    for _, adp := range bus.cacheAdapters {
        adp.Shutdown()
    }
    atomic.CompareAndSwapUint32(bus.initialized, 1, 0)
    atomic.CompareAndSwapUint32(bus.shuttingDown, 1, 0)
}

func (bus *Bus) isValid(qry Query) error {
    var err error
    if qry == nil {
        err = InvalidQueryError
        bus.error(qry, err)
        return err
    }
    return nil
}

func (bus *Bus) isIteratorValid(qry Query) error {
    err := bus.isValid(qry)
    if err != nil {
        return err
    }
    if !bus.isInitialized() {
        err = BusNotInitializedError
        bus.error(qry, err)
        return err
    }
    if bus.isShuttingDown() {
        err = BusIsShuttingDownError
        bus.error(qry, err)
        return err
    }
    return nil
}

func (bus *Bus) error(qry Query, err error) {
    for _, errHdl := range bus.errorHandlers {
        errHdl.Handle(qry, err)
    }
}