
View on GitHub


0 mins
Test Coverage
package amqp

import (
    streadway ""

// reconnect establishes a new underlying connection and sets up a listener for its
// closure.
//
// ctx and retry are passed straight through to reconnectRedial. If that call fails,
// its error is returned as-is. Otherwise a goroutine running reconnectListenForClose
// is started on the returned close channel and nil is returned.
//
// NOTE(review): this listing looks truncated — closing braces and the statements
// that belong under some comments are not visible. Confirm against the repository.
func (manager *transportManager) reconnect(ctx context.Context, retry bool) error {
    // This may be called directly by Dial methods. It's okay NOT to use the lock here
    // since the caller won't be handed back the Connection or Channel until the initial
    // one is established.
    // Once the first connection is established, reconnectListenForClose will grab
    // the lock immediately on a disconnect.

    // Redial the broker. Whether a failed dial is retried or reported is decided by
    // reconnectRedial based on the retry flag.
    closeChan, err := manager.reconnectRedial(ctx, retry)
    if err != nil {
        return err
    // NOTE(review): the closing brace of the error check is not visible here.

    // Broadcast that we have made a successful reconnection to any one-time listeners.
    // NOTE(review): no broadcast statement is visible under this comment — TODO confirm.

    // Launch a goroutine that waits for this connection to close and then starts
    // the reconnection process again.
    go manager.reconnectListenForClose(closeChan)

    return nil

// reconnectListenForClose listens for a close event from the underlying transport,
// and starts the reconnection process.
//
// closeChan is the close-notification channel for the current connection. The
// function blocks in receiveDisconnectEventAndLockChannel until a disconnect is
// reported, defers the release of manager.transportLock, and then calls reconnect
// with the manager's own context and retry enabled.
//
// NOTE(review): this listing looks truncated — closing braces and the statements
// that belong under some comments are not visible. Confirm against the repository.
func (manager *transportManager) reconnectListenForClose(closeChan <-chan *streadway.Error) {
    // Wait for a disconnection event.
    disconnectEvent := manager.receiveDisconnectEventAndLockChannel(closeChan)

    // receiveDisconnectEventAndLockChannel is expected to have already grabbed our
    // transport lock for write, so we just need to release the lock when we exit.
    defer manager.transportLock.Unlock()

    // Send a disconnect event to all interested subscribers.
    // NOTE(review): no statement is visible under this comment, which leaves
    // disconnectEvent unused in this listing — TODO confirm.

    // Exit if our context has been cancelled.
    if manager.ctx.Err() != nil {
    // NOTE(review): the body of this check (presumably a return) and its closing
    // brace are not visible here.

    // Now that we have an initial connection, we use our internal context and retry
    // on failure. The returned error is deliberately discarded: this runs in its own
    // goroutine, so there is no caller to hand it to.
    _ = manager.reconnect(manager.ctx, true)

// receiveDisconnectEventAndLockChannel blocks until the current connection is
// reported as closed, either by a value arriving on notifyClose or by an error
// arriving on manager.opErrorEncountered, and returns the error describing the
// disconnect.
//
// The result is the streadway event when a non-nil one is received. Otherwise it is
// the error received from manager.opErrorEncountered, or nil when the only signal
// was a nil value on notifyClose.
//
// NOTE(review): the name and the comments below say the transport lock is taken in
// both select branches, but no Lock call is visible in this listing, and closing
// braces are missing as well. Confirm against the repository.
func (manager *transportManager) receiveDisconnectEventAndLockChannel(notifyClose <-chan *streadway.Error) error {
    // Wait for the current connection to close
    var disconnectEvent error

    select {
    // Get an event from the current underlying channel. Both of these paths will
    // immediately lock the channel so we don't waste any time in blocking methods
    // and reduce returned errors to the caller.
    case streadwayEvent := <-notifyClose:
        // Lock access to the connection and don't unlock until we have reconnected.

        // Only copy the event into our error when the pointer is non-nil. Assigning
        // a nil *streadway.Error to the error interface would produce a non-nil
        // interface value, so callers comparing the result to nil would be misled.
        if streadwayEvent != nil {
            disconnectEvent = streadwayEvent
    // Or get a report from an operation that there was an error in the event that
    // streadway fails to signal us.
    case disconnectEvent = <-manager.opErrorEncountered:
        // Lock access to the connection and don't unlock until we have reconnected.

        // Give streadway an extra 50ms to signal. We prefer to report its error.
        timer := time.NewTimer(50 * time.Millisecond)
        defer timer.Stop()

        select {
        case streadwayEvent := <-notifyClose:
            // If it does, use this as the disconnection event. A nil event leaves
            // the operation-reported error in place.
            if streadwayEvent != nil {
                disconnectEvent = streadwayEvent
        case <-timer.C:
            // Grace period elapsed; keep the error reported by the operation.

    return disconnectEvent

// reconnectRedial tries to reconnect the transport until successful or ctx is
// cancelled.
//
// Each pass of the loop first checks ctx and returns (nil, ctx.Err()) if it has been
// cancelled, then makes one attempt through reconnectRedialOnce. A successful
// attempt returns the close-notification channel. A failed attempt returns its
// error immediately when retry is false; when retry is true a wait duration, capped
// at maxWait, is computed before the next attempt.
//
// NOTE(review): this listing looks truncated — closing braces, the increment of
// attempt and the actual wait are not visible. Confirm against the repository.
func (manager *transportManager) reconnectRedial(ctx context.Context, retry bool) (chan *streadway.Error, error) {
    // Endlessly redial the broker
    attempt := 0

    for {
        // Check to see if our context has been cancelled, and exit if so.
        if ctx.Err() != nil {
            return nil, ctx.Err()

        closeNotifications, err := manager.reconnectRedialOnce(ctx, attempt)
        // If no error OR there is an error and retry is false return.
        // (The condition is equivalent to: err == nil || !retry.)
        if err == nil || (err != nil && !retry) {
            return closeNotifications, err

        // We don't want to saturate the connection with retries if we are having
        // a hard time reconnecting.
        // We'll give one immediate retry, but after that start increasing how long
        // we need to wait before re-attempting.
        // The wait grows by half a second per attempt. With attempt == 0 the
        // expression is negative; presumably attempt is incremented before this
        // line in the full source — TODO confirm.
        waitDur := time.Second / 2 * time.Duration(attempt-1)
        if waitDur > maxWait {
            waitDur = maxWait

// reconnectRedialOnce attempts to reconnect the transport a single time.
//
// attempt is stored in the operation context under the "opAttempt" metadata key and
// is also added to the current value of manager.reconnectCount to form the second
// argument of tryReconnect. On failure the error is returned with a nil channel.
// On success manager.reconnectCount is atomically incremented and the
// close-notification channel from tryReconnect is returned.
//
// NOTE(review): this listing looks truncated — the closing parenthesis of the
// tryReconnect call, closing braces and the dial-event notification are not
// visible. Confirm against the repository.
func (manager *transportManager) reconnectRedialOnce(ctx context.Context, attempt int) (chan *streadway.Error, error) {
    // Make the attempt number available to whatever reads this context downstream.
    opCtx := context.WithValue(ctx, amqpmiddleware.MetadataKey("opAttempt"), attempt)

    // Replace the operation error alert. It might have another error in from the
    // middleware of the last attempt. The channel has a buffer of one.
    manager.opErrorEncountered = make(chan error, 1)

    // Make the connection.
    closeNotifications, err := manager.transport.tryReconnect(
        opCtx, atomic.LoadUint64(manager.reconnectCount)+uint64(attempt),
    // Send a notification to all listeners subscribed to dial events.
    // NOTE(review): no notification statement is visible under this comment — TODO confirm.
    if err != nil {
        // The attempt failed: return the error (the caller may try again).
        return nil, err

    // Increment our reconnection count tracker.
    atomic.AddUint64(manager.reconnectCount, 1)

    // Success: hand the close-notification channel back to the caller.
    return closeNotifications, nil