package amqp

import (
    streadway ""

// livesOnce is a common interface between the *streadway.Connection and
// *streadway.Channel. We want o abstract away identical operations we need to implement
// call on these for reconnecting to a new underlying underlyingTransport.
type livesOnce interface {
    NotifyClose(receiver chan *streadway.Error) chan *streadway.Error
    Close() error

// reconnects is the interface we need to implement for a livesOnce capable of
// reconnecting itself.
type reconnects interface {
    transportType() amqpmiddleware.TransportType

    // livesOnce returns  the underlying livesOnce (streadway.Connection or
    // streadway.Channel)
    underlyingTransport() livesOnce

    // tryReconnect is needed for both Connection and Channel. This method
    // attempts to re-establish a connection for the underlying object exactly once.
    tryReconnect(ctx context.Context, attempt uint64) (chan *streadway.Error, error)

    // cleanup releases any resources. Called on final close AFTER the main context is
    // cancelled but before the current underlying connection is closed. NOTE:
    // re-connects will NOT occur once this method is called, so cleanup implementation
    // must be able to handle a disconnected underlying connection.
    cleanup() error

// transportManager handles lifetime of underlying livesOnce method, such as
// reconnections, closures, and connection status subscribers. To be embedded into the
// Connection and Channel types for free implementation of common methods.
type transportManager struct {
    // The master context of the robust connection. Cancellation of this context
    // should keep the connection from re-dialing and close the current connection.
    ctx context.Context
    // Cancel func that cancels our main context.
    cancelFunc context.CancelFunc

    // Our core transport object.
    transport reconnects
    // Lock to control the transport.
    transportLock *sync.RWMutex
    // This value is incremented every time we re-connect to the broker.
    reconnectCount *uint64
    // sync.Cond that broadcasts whenever a connection is successfully re-established
    reconnectCond *sync.Cond
    // opErrorEncountered should be signaled (but not blocked on) when an op hit's an
    // error.
    opErrorEncountered chan error

    // handlers are the underlying method and event handlers for this transport's
    // methods, in order to allow user-defined middleware on both channels and
    // connections.
    handlers transportManagerHandlers

    // notificationSubscribersDial is the list of channels to send a connection
    // established notification to.
    notificationSubscribersDial []chan error
    // notifyDialEventHandlers is the list of event handles used to send NotifyDial
    // events.
    notifyDialEventHandlers []amqpmiddleware.HandlerNotifyDialEvents

    // List of channels to send a connection lost notification to.
    notificationSubscriberDisconnect []chan error
    // notifyDisconnectEventHandlers is the list of event handles used to send
    // NotifyDisconnect events.
    notifyDisconnectEventHandlers []amqpmiddleware.HandlerNotifyDisconnectEvents

    // List of channels to send a connection close notification to.
    notificationSubscriberClose []chan *streadway.Error
    // notifyCloseEventHandlers is the list of event handles used to send NotifyClose
    // events.
    notifyCloseEventHandlers []amqpmiddleware.HandlerNotifyCloseEvents
    // Mutex for notification subscriber lists. Allows subscribers to be added during
    // an active redial loop.
    notificationSubscriberLock *sync.Mutex

// repeatErrCodes are errors that should cause an automatic reattempt of an operation.
// We want to automatically re-run any operations that fail
var repeatErrCodes = [3]int{

// isRepeatErr returns true if err is an error we should reattempt an operation on.
func isRepeatErr(err error) bool {
    // If there was an error, check and see if it is an error we should try again
    // on.

    // If this is an io.EOF error, there was probably an aborted connection during
    // broker startup. We can try again with a new channel.
    if errors.Is(err, io.EOF) {
        return true

    var streadwayErr *streadway.Error
    if errors.As(err, &streadwayErr) {
        for _, repeatCode := range repeatErrCodes {
            if streadwayErr.Code == repeatCode {
                return true

    return false

// revive:disable:context-as-argument - we have two contexts here, they can't both be
// first.

// retryOperationOnClosedSingle attempts a Connection or Channel channel method a single
// time.
func (manager *transportManager) retryOperationOnClosedSingle(
    opCtx context.Context,
    operation func(ctx context.Context) error,
) error {
    if opCtx.Err() != nil {
        return fmt.Errorf("operation context closed: %w", opCtx.Err())

    // Acquire the transportLock for read. This allow multiple operation to
    // occur at the same time, but blocks the connection from being switched
    // out until the operations resolve.
    // We don't need to worry about lock contention, as once the livesOnce
    // reconnection routine requests the lock, and new read acquisitions will
    // be blocked until the lock is acquired and released for write.
    defer manager.transportLock.RUnlock()

    err := operation(opCtx)
    // If there was an error, alert the redial routine in case streadway fails to.
    if err != nil {
        select {
        case manager.opErrorEncountered <- err:

    return err

// maxWait is the max amount of time a method should wait before trying again after
// multiple failures in a row.
const maxWait = time.Second * 5

// revive:enable:context-as-argument

// retryOperationOnClosed repeats operation / method call until a non-closed error is
// returned or ctx expires. This is a helper method for implementing methods like
// Channel.QueueBind, in which we want to retry the operation if our underlying
// livesOnce mechanism has connection issues.
func (manager *transportManager) retryOperationOnClosed(
    transportCtx context.Context,
    operation func(ctx context.Context) error,
    retry bool,
) error {
    attempt := 0
    for {
        opCtx := context.WithValue(
            transportCtx, amqpmiddleware.MetadataKey("opAttempt"), attempt,
        err := manager.retryOperationOnClosedSingle(opCtx, operation)

        // If there was no error, we are not trying, or this is not a repeat error,
        // or our context has cancelled: return.
        if err == nil || !retry || !isRepeatErr(err) || transportCtx.Err() != nil {
            return err

        // Otherwise retry the operation once the connection has been established.

        // We don't want to saturate the connection with retries if we are having
        // a hard time reconnecting.
        // We'll give one immediate retry, but after that start increasing how long
        // we need to wait before re-attempting.
        waitDur := time.Second / 2 * time.Duration(attempt-1)
        if waitDur > maxWait {
            waitDur = maxWait

// sendDialNotifications sends results of a dial attempt to all
// transportManager.NotifyDial subscribers.
func (manager *transportManager) sendDialNotifications(err error) {
    defer manager.notificationSubscriberLock.Unlock()

    // We are going to send the close error (if any) through the event handlers.
    event := amqpmiddleware.EventNotifyDial{
        TransportType: manager.transport.transportType(),
        Err:           err,

    // Notify all our close subscribers.
    for _, receiver := range manager.notifyDialEventHandlers {
        receiver(make(amqpmiddleware.EventMetadata), event)

// sendDisconnectNotifications sends the error from NotifyClose of the underlying
// connection when a disconnect occurs to all NotifyOnDisconnect subscribers.
func (manager *transportManager) sendDisconnectNotifications(streadwayErr error) {
    defer manager.notificationSubscriberLock.Unlock()

    // Even if the pointer is nil, setting an error field to a concrete nil pointer
    // results in a non-nil error interface, so we only want to set this if it's NOT
    // nil.
    var err error
    if streadwayErr != nil {
        err = streadwayErr

    // We need to send an explicit nil on a nil pointer as a nil pointer with a
    // concrete type is, weirdly, a non-nil error interface value.
    event := amqpmiddleware.EventNotifyDisconnect{
        TransportType: manager.transport.transportType(),
        Err:           err,

    // Notify all our close subscribers.
    for _, handler := range manager.notifyDisconnectEventHandlers {
        handler(make(amqpmiddleware.EventMetadata), event)

// sendCloseNotifications sends notification to all NotifyOnClose subscribers.
func (manager *transportManager) sendCloseNotifications(err *streadway.Error) {
    defer manager.notificationSubscriberLock.Unlock()

    // Notify all our close subscribers.
    event := amqpmiddleware.EventNotifyClose{
        TransportType: manager.transport.transportType(),
        Err:           err,
    for _, receiver := range manager.notifyCloseEventHandlers {
        receiver(make(amqpmiddleware.EventMetadata), event)

// we can get away with making methods like NotifyClose a non-pinter type because the
// method handlers have already captured a pointer to the manager.

// NotifyClose is as NotifyClose on streadway Connection/Channel.NotifyClose.
// Subscribers to Close events will not be notified when a reconnection occurs under
// the hood, only when the roger Connection or Channel is closed by calling the Close
// method. This mirrors the streadway implementation, where Close events are only sent
// once when the livesOnce object becomes unusable.
// For finer-grained connection status, see NotifyDial and NotifyDisconnect, which
// will both send individual events when the connection is lost or re-acquired.
func (manager *transportManager) NotifyClose(
    receiver chan *streadway.Error,
) chan *streadway.Error {
    args := amqpmiddleware.ArgsNotifyClose{
        Receiver:      receiver,
        TransportType: manager.transport.transportType(),
    return manager.handlers.notifyClose(manager.ctx, args).CallerChan

// NotifyDial is new for robust Roger transportType objects. NotifyDial will send all
// subscribers an event notification every time we try to re-acquire a connection. This
// will include both failure AND successes.
func (manager *transportManager) NotifyDial(
    receiver chan error,
) error {
    args := amqpmiddleware.ArgsNotifyDial{
        TransportType: manager.transport.transportType(),
        Receiver:      receiver,
    return manager.handlers.notifyDial(manager.ctx, args)

// NotifyDisconnect is new for robust Roger transportType objects. NotifyDisconnect will
// send all subscribers an event notification every time the underlying connection is
// lost.
func (manager *transportManager) NotifyDisconnect(
    receiver chan error,
) error {
    args := amqpmiddleware.ArgsNotifyDisconnect{
        TransportType: manager.transport.transportType(),
        Receiver:      receiver,
    return manager.handlers.notifyDisconnect(manager.ctx, args)

// cancelCtxCloseTransport cancels the main context and closes the underlying connection
// during shutdown.
func (manager *transportManager) cancelCtxCloseTransport() {
    // Grab the notification subscriber lock so new subscribers will not get added
    // without seeing the context cancel.

    // Cancel the context the tryReconnect this closure will cause exits.

    // Release the notification lock. Not doing so before we grab the livesOnce lock
    // can result in a deadlock if a redial is in process (since the redial needs to
    // grab the subscribers lock to notify them).

    // Take control of the connection lock to ensure all in-process operations have
    // completed.
    defer manager.transportLock.Unlock()

    // Close the current connection on exit
    defer manager.transport.underlyingTransport().Close()

// Close the robust connection. This both closes the current connection and keeps it
// from reconnecting.
func (manager *transportManager) Close() error {
    args := amqpmiddleware.ArgsClose{TransportType: manager.transport.transportType()}
    return manager.handlers.transportClose(manager.ctx, args)

// IsClosed returns true if the connection is marked as closed, otherwise false
// is returned.
// --
// ROGER NOTE: unlike streadway/amqp, which only implements IsClosed() on connection
// objects, rogerRabbit makes IsClosed() available on both connections and channels.
// IsClosed() will return false until the connection / channel is shut down, even if the
// underlying connection is currently disconnected and waiting to reconnectMiddleware.
func (manager *transportManager) IsClosed() bool {
    // If the context is cancelled, the livesOnce is closed.
    return manager.ctx.Err() != nil

// Test methods for the livesOnce
func (manager *transportManager) Test(t *testing.T) *TransportTesting {
    return &TransportTesting{
        tb:      t,
        manager: manager,

// newTransportManager returns a new transportManager for a given livesOnce.
func (manager *transportManager) setup(
    ctx context.Context,
    transport reconnects,
    middleware transportManagerMiddleware,
) {
    newCtx, cancelFunc := context.WithCancel(ctx)

    reconnectCount := uint64(0)

    manager.ctx = newCtx
    manager.cancelFunc = cancelFunc
    manager.transport = transport
    manager.transportLock = new(sync.RWMutex)
    manager.notificationSubscriberLock = new(sync.Mutex)
    manager.reconnectCond = sync.NewCond(manager.transportLock)
    manager.reconnectCount = &reconnectCount
    manager.opErrorEncountered = make(chan error, 1)

    // Create the base method handlers.
    manager.handlers = newTransportManagerHandlers(manager, middleware)

// TestReconnectSignaler allows us to block until a reconnection occurs during a test.
type TestReconnectSignaler struct {
    // The test we are using.
    tb testing.TB

    // reconnectSignal will close when a reconnection occurs.
    reconnectSignal chan struct{}

    original livesOnce
    manager  *transportManager

// WaitOnReconnect blocks until a reconnection of the underlying livesOnce occurs. Once
// the first reconnection event occurs, this object will no longer block and a new
// signaler will need to be created for the next re-connection.
// If no context is passed a context with 3-second timeout will be used.
func (signaler *TestReconnectSignaler) WaitOnReconnect(ctx context.Context) {
    if ctx == nil {
        var cancel context.CancelFunc
        ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second)
        defer cancel()

    for signaler.original == signaler.manager.transport.underlyingTransport() {
        select {
        case <-signaler.reconnectSignal:
        case <-ctx.Done():
                "context cancelled before reconnection occurred: %v", ctx.Err(),

// TransportTesting provides testing methods for testing Channel and Connection.
type TransportTesting struct {
    tb      testing.TB
    manager *transportManager
    // The number of times a connection has been blocked from being acquired.
    blocks *int32

// TransportLock is the underlying lock which controls access to the channel /
// connection. When held for read or write, a reconnection of the underlying livesOnce
// cannot occur.
func (tester *TransportTesting) TransportLock() *sync.RWMutex {
    return tester.manager.transportLock

// BlockReconnect blocks a livesOnce from reconnecting. If too few calls to
// UnblockReconnect() are made, the block will be removed at the end of the test.
func (tester *TransportTesting) BlockReconnect() {
    atomic.AddInt32(tester.blocks, 1)

// UnblockReconnect unblocks the livesOnce from reconnecting after calling
// BlockReconnect()
func (tester *TransportTesting) UnblockReconnect() {
    defer tester.manager.transportLock.RUnlock()
    atomic.AddInt32(tester.blocks, -1)

// cleanup calls UnblockReconnect the required number of times to unblock the channel
// from reconnecting during test cleanup.
func (tester *TransportTesting) cleanup() {
    blocks := *tester.blocks
    for i := int32(0); i < blocks; i++ {

// SignalOnReconnect returns a signaler that can be used to wait on the next
// reconnection event of the livesOnce.
func (tester *TransportTesting) SignalOnReconnect() *TestReconnectSignaler {
    // Signal that the connection has been re-established
    reconnected := make(chan struct{})

    // Grab the cond lock

    // Launch a routine to close the channel after the wait.
    go func() {
        defer tester.manager.reconnectCond.L.Unlock()
        defer close(reconnected)

    signaler := &TestReconnectSignaler{
        tb:              tester.tb,
        reconnectSignal: reconnected,
        original:        tester.manager.transport.underlyingTransport(),
        manager:         tester.manager,

    return signaler

// DisconnectTransport closes the underlying livesOnce to force a reconnection.
func (tester *TransportTesting) DisconnectTransport() {
    err := tester.manager.transport.underlyingTransport().Close()
    if !assert.NoError(tester.tb, err, "close underlying livesOnce") {

// ForceReconnect forces a disconnect of the channel or connection and waits for a
// reconnection to occur or ctx to cancel. If a nil context is passed, a context with
// a 3-second timeout will be used instead.
func (tester *TransportTesting) ForceReconnect(ctx context.Context) {
    if ctx == nil {
        var cancel context.CancelFunc
        ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second)
        defer cancel()

    // Get the current transport
    current := tester.manager.transport.underlyingTransport()
    // Wait until the underlying transport pointer has changed. The first time we signal
    // might be from the previous reconnect.

    for current == tester.manager.transport.underlyingTransport() {
        // Register a channel to be closed when a reconnection occurs
        reconnected := tester.SignalOnReconnect()

        // Disconnect the livesOnce

        // Wait for the connection to be re-connected.
        // If the context has cancelled, return.
        if ctx.Err() != nil {