peake100/rogerRabbit-go

View on GitHub
pkg/amqp/connectionDial.go

Summary

Maintainability
A
0 mins
Test Coverage
package amqp

import (
    "context"
    "crypto/tls"
    "fmt"
)

// Dial opens a Connection to the broker addressed by url, which must be in the
// AMQP URI format. It is shorthand for DialConfig(url, DefaultConfig()).
//
// All connection settings come from DefaultConfig. Upstream streadway/amqp
// documents those defaults as a 10 second server heartbeat and a 30 second
// handshake deadline that is cleared once the handshake completes
// (NOTE(review): confirm against DefaultConfig, which is not defined here).
//
// No TLS configuration is supplied by this function. Assuming DefaultConfig
// leaves TLSClientConfig nil, calling Dial is equivalent to calling
// DialTLS(url, nil).
func Dial(url string) (*Connection, error) {
    // The default config carries the same defaults as streadway/amqp; hand it
    // straight to DialConfig to make the initial connection.
    return DialConfig(url, DefaultConfig())
}

// DialConfig builds a Connection for the broker addressed by url (AMQP URI
// format) using the supplied config, then performs the initial connection
// attempt before returning.
//
// Upstream streadway/amqp documents a default 10 second server heartbeat and
// a 30 second initial read deadline (NOTE(review): confirm how config fields
// map onto these, as Config is not defined here).
//
// Errors:
//   - If the connection value cannot be built, the returned error wraps the
//     underlying cause, so callers can inspect it with errors.Is / errors.As.
//   - If the initial connection attempt fails, that error is returned as-is.
//
// On any error the returned *Connection is nil.
func DialConfig(url string, config Config) (*Connection, error) {
    // Create the robust connection object.
    conn, err := newConnection(url, config)
    if err != nil {
        // Wrap rather than replace the error so the root cause is not lost.
        return nil, fmt.Errorf("error building connection value: %w", err)
    }

    // Make our initial connection using the connection's own context. The
    // false flag presumably requests a single attempt with no retry (compare
    // DialConfigCtx, which passes true) — verify against reconnect.
    if err = conn.reconnect(conn.ctx, false); err != nil {
        return nil, err
    }
    return conn, nil
}

// DialTLS opens a Connection to the broker addressed by url (AMQP URI format),
// starting from DefaultConfig and overriding its TLSClientConfig with amqps.
// The resulting configuration is passed to DialConfig.
//
// Per the upstream streadway/amqp contract, amqps is the tls.Config used when
// the URL has an amqps:// scheme, and the defaults are a 10 second server
// heartbeat and a 30 second initial read deadline (NOTE(review): confirm
// against DefaultConfig, which is not defined here).
func DialTLS(url string, amqps *tls.Config) (*Connection, error) {
    // Start from the package defaults and swap in the caller's TLS settings.
    tlsConfig := DefaultConfig()
    tlsConfig.TLSClientConfig = amqps
    return DialConfig(url, tlsConfig)
}

// DialConfigCtx is as DialConfig, but endlessly redials the connection until
// ctx is cancelled. Once returned, cancelling ctx does not affect the
// connection.
//
// Errors:
//   - If the connection value cannot be built, the returned error wraps the
//     underlying cause, so callers can inspect it with errors.Is / errors.As.
//   - If the dial attempt ends with an error, that error is returned as-is.
//
// On any error the returned *Connection is nil.
func DialConfigCtx(
    ctx context.Context, url string, config Config,
) (*Connection, error) {
    conn, err := newConnection(url, config)
    if err != nil {
        // Wrap rather than replace the error so the root cause is not lost.
        return nil, fmt.Errorf("error building connection value: %w", err)
    }

    // Dial under the caller's ctx. The true flag presumably enables retrying
    // (compare DialConfig, which passes false) — verify against reconnect.
    if err = conn.reconnect(ctx, true); err != nil {
        return nil, err
    }
    return conn, nil
}

// DialCtx is the context-aware counterpart of Dial: it dials url with the
// settings from DefaultConfig by delegating to DialConfigCtx, which keeps
// redialing until ctx is cancelled. Once DialCtx has returned, cancelling ctx
// does not affect the connection.
func DialCtx(ctx context.Context, url string) (*Connection, error) {
    // Same default configuration as streadway/amqp, dialed under ctx.
    return DialConfigCtx(ctx, url, DefaultConfig())
}

// DialTLSCtx is the context-aware counterpart of DialTLS: it starts from
// DefaultConfig, overrides TLSClientConfig with amqps, and delegates to
// DialConfigCtx, which keeps redialing until ctx is cancelled. Once DialTLSCtx
// has returned, cancelling ctx does not affect the connection.
func DialTLSCtx(
    ctx context.Context, url string, amqps *tls.Config,
) (*Connection, error) {
    // Start from the package defaults and swap in the caller's TLS settings.
    tlsConfig := DefaultConfig()
    tlsConfig.TLSClientConfig = amqps
    return DialConfigCtx(ctx, url, tlsConfig)
}