
View on GitHub


0 mins
Test Coverage
package amqp

import (
    streadway ""

// sharedRelaySync holds all sync objects required for a Channel and it's event relays to
// stay in sync.
// Relays need to be carefully controlled so that they do not miss a short-lived channel
// while working on events from a previous channel. The flow must go:
// 1. A new relay can be setup while a channel is serving requests.
// 2. After a relay sets up, it begins serving events from the current underlying
//    channel.
// 3. When a channel disconnects, the transportManager must:
//        - Wait for all currently running relays to finish their work.
//        - Get the new channel.
//        - Allow the relays to setup on the channel BEFORE callers can access channel methods.
//        - Once setup is Restart the relays.
type sharedRelaySync struct {
    // ctx should be cancelled by the relay on an abort, or by the channel on
    // an unresponsive relay.
    ctx    context.Context
    cancel context.CancelFunc

    // beginLeg signals to the eventRelay that it should setup for this leg on Channel.
    // Relays can once on to running once they are done setting up and te manager has
    // received this signal.
    beginLeg chan *streadway.Channel

    // setup complete signals the transport manager that the relay is done setting up
    // and ready for the channel to be released back to the method callers.
    setupComplete chan struct{}

    // legComplete signals to the transportManager that this relay has finished running.
    legComplete chan struct{}

// newSharedSync creates a new shared sync object.
func newSharedSync(transportCtx context.Context) sharedRelaySync {
    ctx, cancel := context.WithCancel(transportCtx)

    return sharedRelaySync{
        ctx:           ctx,
        cancel:        cancel,
        beginLeg:      make(chan *streadway.Channel),
        setupComplete: make(chan struct{}),
        legComplete:   make(chan struct{}),

// managerRelaySync exposes methods for the transportManager to block on / signal
// for it's half of the sharedRelaySync.
type managerRelaySync struct {
    shared     []sharedRelaySync
    sharedLock *sync.Mutex

    // legCompleteWaited tracks whether we have waited for the leg to complete already.
    // WaitOnLegComplete is called every time a reconnect *attempt* occurs, so we need
    // to track whether we have to wait again, or if we've waited once already since
    // the last disconnect.
    legCompleteWaited bool

// AddRelay adds a relay to the manager.
func (managerSync *managerRelaySync) AddRelay(shared sharedRelaySync) {
    defer managerSync.sharedLock.Unlock()
    managerSync.shared = append(managerSync.shared, shared)

// ReleaseRelaysForLeg signals all relays to begin their next leg.
func (managerSync *managerRelaySync) ReleaseRelaysForLeg(newChan *streadway.Channel) {
    defer managerSync.sharedLock.Unlock()
    managerSync.legCompleteWaited = false

    timer := time.NewTimer(30 * time.Second)
    defer timer.Stop()

    // We want to remove any relays that o not signal.
    signaled := make([]sharedRelaySync, 0, len(managerSync.shared))

    for _, thisSync := range managerSync.shared {
        select {
        // Signal to a relay that it can start relaying events.
        case thisSync.beginLeg <- newChan:
        case <-thisSync.ctx.Done():
        case <-timer.C:
            // On a timeout, cancel the relay.
        signaled = append(signaled, thisSync)

    // replace our list of shared signals with only the active relays.
    managerSync.shared = signaled

// WaitOnSetupComplete will block until all relays have reported that they are done
// setting up.
// The transportManger should block on this before reconnecting to a nwe channel.
func (managerSync *managerRelaySync) WaitOnSetupComplete() {
    defer managerSync.sharedLock.Unlock()

    timer := time.NewTimer(30 * time.Second)
    defer timer.Stop()

    signaled := make([]sharedRelaySync, 0, len(managerSync.shared))

    for _, thisSync := range managerSync.shared {
        select {
        // Wait fot the ree.
        case <-thisSync.setupComplete:
        case <-thisSync.ctx.Done():
        case <-timer.C:
            // On a timeout, cancel the relay.
        signaled = append(signaled, thisSync)

    managerSync.shared = signaled

// WaitOnLegComplete will block until all relays have reported that all events from the
// previous underlying streadway/amqp.Channel have been relayed.
// The transportManger should block on this before reconnecting to a nwe channel.
func (managerSync *managerRelaySync) WaitOnLegComplete() {
    defer managerSync.sharedLock.Unlock()

    // If we've already waited on this and have not yet started the setup of the relays,
    // return right away. We don't need to wait again.
    if managerSync.legCompleteWaited {

    timer := time.NewTimer(30 * time.Second)
    defer timer.Stop()

    signaled := make([]sharedRelaySync, 0, len(managerSync.shared))

    for _, thisSync := range managerSync.shared {
        select {
        // Wait fot the ree.
        case <-thisSync.legComplete:
        case <-thisSync.ctx.Done():
        case <-timer.C:
            // On a timeout, cancel the relay.
        signaled = append(signaled, thisSync)

    managerSync.shared = signaled
    managerSync.legCompleteWaited = true

// relaySync exposes methods for single eventRelay runner to block on / signal for it's
// half of the sharedRelaySync.
// relaySync is not concurrency-safe, and each eventRelay runner should get it's own
// relaySync object.
type relaySync struct {
    // sharedRelaySync object
    shared sharedRelaySync

// SetDone signals that this relay is done serving messages.
func (relaySync relaySync) SetDone() {

// IsDone returns whether the relay is done, either from a context cancellation or the
// relay has reported it is finished.
func (relaySync relaySync) IsDone() bool {
    return relaySync.shared.ctx.Err() != nil

// SignalSetupComplete should be called by the eventRelay runner after
// setup has been completed for the relay but before the relay moves on to processing
// messages.
func (relaySync relaySync) SignalSetupComplete() {
    select {
    // Wait fot the ree.
    case relaySync.shared.setupComplete <- struct{}{}:
    case <-relaySync.shared.ctx.Done():

// SignalLegComplete should be called by the eventRelay runner after
// eventRelay.RunRelayLeg() returns before blocking on WaitToSetup.
func (relaySync relaySync) SignalLegComplete() {
    select {
    // Wait fot the ree.
    case relaySync.shared.legComplete <- struct{}{}:
    case <-relaySync.shared.ctx.Done():

// WaitForNextLeg blocks until the transportManager signals that relay setup should
// begin.
// The eventRelay runner should then call eventRelay.SetupForRelayLeg().
func (relaySync relaySync) WaitForNextLeg() *streadway.Channel {
    select {
    // Wait fot the manager to signal the run of the next leg or to be cancelled.
    case newChan := <-relaySync.shared.beginLeg:
        return newChan
    case <-relaySync.shared.ctx.Done():
        return nil

// newRelaySync created a new relaySync for an eventRelay runner from an underlying
// sharedRelaySync object being tracked by the transportManager.
func newRelaySync(transportCtx context.Context) relaySync {
    return relaySync{shared: newSharedSync(transportCtx)}