connection.go
package amqprpc
import (
"errors"
amqp "github.com/rabbitmq/amqp091-go"
)
// ErrUnexpectedConnClosed is returned by ListenAndServe() if the server
// shuts down without calling Stop() and if AMQP does not give an error
// when said shutdown happens.
var ErrUnexpectedConnClosed = errors.New("unexpected connection close without specific error")
// OnStartedFunc can be registered at Server.OnStarted(f) and
// Client.OnStarted(f). This is used when you want to do more setup on the
// connections and/or channels from amqp, for example setting Qos,
// NotifyPublish etc.
type OnStartedFunc func(inputConn, outputConn *amqp.Connection, inputChannel, outputChannel *amqp.Channel)
// ExchangeDeclareSettings is the settings that will be used when a handler
// is mapped to a fanout exchange and an exchange is declared.
type ExchangeDeclareSettings struct {
// Durable sets the durable flag. Durable exchanges survives server restart.
Durable bool
// AutoDelete sets the auto-delete flag, this ensures the exchange is
// deleted when it isn't bound to any more.
AutoDelete bool
// Args sets the arguments table used.
Args amqp.Table
}
// QueueDeclareSettings is the settings that will be used when the response
// any kind of queue is declared. Se documentation for amqp.QueueDeclare
// for more information about these settings.
type QueueDeclareSettings struct {
// DeleteWhenUnused sets the auto-delete flag. It's recommended to have this
// set to false so that amqp-rpc can reconnect and use the same queue while
// keeping any messages in the queue.
DeleteWhenUnused bool
// Durable sets the durable flag. It's recommended to have this set to false
// and instead use ha-mode for queues and messages.
Durable bool
// Exclusive sets the exclusive flag when declaring queues. This flag has
// no effect on Clients reply-to queues which are never exclusive so it
// can support reconnects properly.
Exclusive bool
// Args sets the arguments table used.
Args amqp.Table
}
// ConsumeSettings is the settings that will be used when the consumption
// on a specified queue is started.
type ConsumeSettings struct {
// Consumer sets the consumer tag used when consuming.
Consumer string
// AutoAck sets the auto-ack flag. When this is set to false, you must
// manually ack any deliveries. This is always true for the Client when
// consuming replies.
AutoAck bool
// Exclusive sets the exclusive flag. When this is set to true, no other
// instances can consume from a given queue. This has no affect on the
// Client when consuming replies where it's always set to true so that no
// two clients can consume from the same reply-to queue.
Exclusive bool
// QoSPrefetchCount sets the prefetch-count. Set this to a value to ensure
// that amqp-rpc won't prefetch all messages in the queue. This has no
// effect on the Client which will always try to fetch everything.
QoSPrefetchCount int
// QoSPrefetchSize sets the prefetch-size. Set this to a value to ensure
// that amqp-rpc won't prefetch all messages in the queue.
QoSPrefetchSize int
// Args sets the arguments table used.
Args amqp.Table
}
// PublishSettings is the settings that will be used when a message is about
// to be published to the message bus. These settings are only used by the
// Client and never by the Server. For the server, Mandatory or Immediate
// can be set on the ResponseWriter instead.
type PublishSettings struct {
// Mandatory sets the mandatory flag. When this is true a Publish call will
// be returned if it's not routable by the exchange.
Mandatory bool
// Immediate sets the immediate flag. When this is true a Publish call will
// be returned if a consumer isn't directly available.
Immediate bool
// ConfirmMode puts the channel that messages are published over in
// confirm mode. This makes sending requests more reliable at the cost
// of some performance. Each publishing must be confirmed by the server.
// See https://www.rabbitmq.com/confirms.html#publisher-confirms
ConfirmMode bool
}
func monitorAndWait(restartChan, stopChan chan struct{}, amqpErrs ...chan *amqp.Error) (bool, error) {
result := make(chan error, len(amqpErrs))
// Setup monitoring for connections and channels, can be several connections and several channels.
// The first one closed will yield the error.
for _, errCh := range amqpErrs {
go func(c chan *amqp.Error) {
err, ok := <-c
if !ok {
result <- ErrUnexpectedConnClosed
return
}
result <- err
}(errCh)
}
select {
case err := <-result:
return true, err
case <-restartChan:
return true, nil
case <-stopChan:
return false, nil
}
}
func createConnections(url string, config amqp.Config) (conn1, conn2 *amqp.Connection, err error) {
conn1, err = amqp.DialConfig(url, config)
if err != nil {
return nil, nil, err
}
conn2, err = amqp.DialConfig(url, config)
if err != nil {
return nil, nil, err
}
return conn1, conn2, nil
}
func createChannels(inputConn, outputConn *amqp.Connection) (inputCh, outputCh *amqp.Channel, err error) {
inputCh, err = inputConn.Channel()
if err != nil {
return nil, nil, err
}
outputCh, err = outputConn.Channel()
if err != nil {
return nil, nil, err
}
return inputCh, outputCh, nil
}