peake100/rogerRabbit-go

View on GitHub
pkg/amqp/channelEventRelays.go

Summary

Maintainability
A
0 mins
Test Coverage
package amqp

import (
    "context"
    "github.com/peake100/rogerRabbit-go/pkg/amqp/amqpmiddleware"
    streadway "github.com/streadway/amqp"
)

// createEventMetadata creates the amqpmiddleware.EventMetadata for an event
func createEventMetadata(legNum int, eventNum int64) amqpmiddleware.EventMetadata {
    return map[amqpmiddleware.MetadataKey]interface{}{
        "LegNum":   legNum,
        "EventNum": eventNum,
    }
}

// eventRelay is a common interface for relaying events from the underlying channels to
// the client without interruption. The boilerplate of handling all the synchronization
// locks will be handled for any relay passed to Channel.eventRelaySetupAndLaunch()
type eventRelay interface {
    // SetupForRelayLeg runs the setup for a new relay leg.
    SetupForRelayLeg(newChannel *streadway.Channel) error

    // RunRelayLeg runs the relay until all events from the streadway/amqp.Channel
    // passed to SetupForRelayLeg() are exhausted,
    //
    // legNum is the leg number starting from 0 and incrementing each time this relay
    // is called.
    RunRelayLeg(legNum int) (done bool)

    // Shutdown is called to exit the relay. This will usually involve closing the
    // client-facing channel.
    Shutdown() error
}

// shutdownRelay handles all the boilerplate of calling eventRelay.Shutdown.
func shutdownRelay(relay eventRelay, relaySync relaySync) {
    // Release any outstanding WaitGroups we are holding on exit
    defer relaySync.SetDone()

    // Invoke the shutdown method of the relay.
    _ = relay.Shutdown()
}

// eventRelaySetupAndLaunch sets up a new relay and launches a goroutine to run it
func (channel *Channel) eventRelaySetupAndLaunch(relay eventRelay) {
    // Create a signal chan to tell us when the initial setup has been completed. This
    // means it is safe to return to the user as our listener channel has been
    // registered with the underlying channel. If we were to return immediately, the
    // user might start taking actions that SHOULD generate events before the even
    // channel is correctly registered, causing those events to go "missing".
    setupComplete := make(chan struct{})

    // Launch the relay.
    thisSync := newRelaySync(channel.ctx)
    go channel.runEventRelay(relay, thisSync, setupComplete)

    // Wait for the signal that our setup is complete.
    <-setupComplete
}

// runEventRelay should be launched as goroutine to run an event relay after it's
// initial setup.
func (channel *Channel) runEventRelay(relay eventRelay, relaySync relaySync, setupComplete chan struct{}) {
    // Shutdown our relay on exit.
    defer shutdownRelay(relay, relaySync)

    firstLegComplete := make(chan struct{})
    channel.eventRelayInitialSetup(relay, relaySync, setupComplete, firstLegComplete)

    // Wait for ou first leg to complete, then fall into a rhythm with the transport
    // manager
    <-firstLegComplete
    relayLeg := 1

    // Start running each leg.
    for {
        channel.runEventRelayCycle(relay, relaySync, relayLeg)
        if relaySync.IsDone() {
            return
        }
        relayLeg++
    }
}

// eventRelayInitialSetup does the initial setup of an event relay. We need to make sure
// that setup is complete before we return to the user, otherwise they may start taking
// actions that SHOULD trigger events before the event listener has been registered with
// the underlying streadway/amqp.Channel.
func (channel *Channel) eventRelayInitialSetup(
    relay eventRelay,
    relaySync relaySync,
    setupComplete chan struct{},
    firstLegComplete chan struct{},
) {
    // Signal this leg in an op so we can make sure we grab the right channel.
    _ = channel.transportManager.retryOperationOnClosed(
        channel.ctx,
        func(ctx context.Context) error {
            // Register the relay with the channel.
            channel.relaySync.AddRelay(relaySync.shared)

            // Run the fist leg with the current channel. We need to launch it in a
            // routine so we can signal leg complete (the manager needs to grab a write
            // lock to the transport before it checks the relays)
            go func(currentChannel *streadway.Channel) {
                defer close(firstLegComplete)
                defer relaySync.SignalLegComplete()

                // Run the relay setup then signal that initial setup is complete.
                var err error
                func() {
                    defer close(setupComplete)
                    err = relay.SetupForRelayLeg(currentChannel)
                }()
                if err != nil {
                    return
                }

                // Run the first relay leg.
                if done := relay.RunRelayLeg(0); done {
                    relaySync.SetDone()
                }
            }(channel.underlyingChannel)

            // Wait for the relay setup to complete before we return or release the read
            // lock to the user. Otherwise the user may think we are receiving events
            // and do something that creates them before we've actually set that up.
            <-setupComplete

            return nil
        },
        true,
    )
}

// runEventRelayCycle runs a single, full cycle of setting up and running a relay leg.
func (channel *Channel) runEventRelayCycle(
    relay eventRelay, relaySync relaySync, legNum int,
) {
    if relaySync.IsDone() {
        return
    }

    newChan := relaySync.WaitForNextLeg()
    if newChan == nil {
        return
    }

    // Whether or not we run the leg, reset our sync to mark the run as complete.
    defer relaySync.SignalLegComplete()

    err := relay.SetupForRelayLeg(newChan)
    relaySync.SignalSetupComplete()
    if err != nil {
        return
    }

    if done := relay.RunRelayLeg(legNum); done {
        relaySync.SetDone()
    }
}