pkg/amqp/defaultmiddlewares/routeDeclaration.go
package defaultmiddlewares
import (
"context"
"errors"
"fmt"
"github.com/peake100/rogerRabbit-go/pkg/amqp/amqpmiddleware"
streadway "github.com/streadway/amqp"
"sync"
)
// RouteDeclarationMiddlewareID can be used to retrieve the running instance of
// RouteDeclarationMiddleware during testing.
const RouteDeclarationMiddlewareID amqpmiddleware.ProviderTypeID = "DefaultRouteDeclaration"
// RouteDeclarationMiddleware implements handlers for re-declaring queues, exchanges,
// and bindings upon reconnectMiddleware.
type RouteDeclarationMiddleware struct {
// declareQueues is a map queues that must be declared upon re-establishing the
// channel. We use a map so we can remove queues from this list on queue delete.
declareQueues *sync.Map
// declareExchanges us a map of exchanges that must be declared upon re-establishing
// the channel. We use a map so we can remove exchanges from this list on exchange
// delete.
declareExchanges *sync.Map
// bindQueues is a list of bindings to re-build on channel re-establishment.
bindQueues []amqpmiddleware.ArgsQueueBind
// bindQueuesLock must be acquired to alter bindQueues.
bindQueuesLock *sync.Mutex
// bindExchanges is a list of bindings to re-build on channel re-establishment.
bindExchanges []amqpmiddleware.ArgsExchangeBind
// bindExchangesLock must be acquired to alter bindQueues.
bindExchangesLock *sync.Mutex
}
// TypeID implements amqpmiddleware.ProvidesMiddleware and returns a static type ID for
// retrieving the active middleware value during testing.
func (middleware *RouteDeclarationMiddleware) TypeID() amqpmiddleware.ProviderTypeID {
return RouteDeclarationMiddlewareID
}
// removeQueue Removes a queue from the list of queues to be redeclared on
// reconnectMiddleware.
func (middleware *RouteDeclarationMiddleware) removeQueue(queueName string) {
// Remove the queue.
middleware.declareQueues.Delete(queueName)
// Remove all binding commands associated with this queue from the re-bind on
// reconnectMiddleware list.
middleware.removeQueueBindings(
queueName,
"",
"",
)
}
// removeQueueBindingOpts holds information about a queue to be removed.
type removeQueueBindingOpts struct {
// queueName is the name to remove bindings for.
queueName string
// exchangeName is an exchange name to remove bindings for
exchangeName string
// routingKey is the key on the exchange to remove bindings for
routingKey string
// removeQueueMatch: when true, allows the removal of a binding if queue name
// matches
removeQueueMatch bool
// removeExchangeMatch: when true, allows the removal of a binding if exchange
// name matches
removeExchangeMatch bool
// removeExchangeMatch: when true, allows the removal of a binding if route name
// matches
removeRouteMatch bool
}
// removeQueueBindingOk compares the original args a queue bind was made with and
// a removeQueueBindingOpts to see if a queue should be removed.
func removeQueueBindingOk(
binding amqpmiddleware.ArgsQueueBind, opts removeQueueBindingOpts,
) bool {
// If there is a routing Key to match, then the queue and exchange must match
// too (so we don't end up removing a binding with the same routing Key between
// a different queue-exchange pair).
if opts.removeRouteMatch &&
binding.Key == opts.routingKey &&
binding.Name == opts.queueName &&
binding.Exchange == opts.exchangeName {
// then:
return true
}
if opts.removeQueueMatch && binding.Name == opts.queueName {
return true
}
if opts.removeExchangeMatch && binding.Exchange == opts.exchangeName {
return true
}
return false
}
// removeQueueBindingsFromSlice iterates over all queue bindings, and removes any
// relevant to the detailed removeQueueBindingOpts.
func (middleware *RouteDeclarationMiddleware) removeQueueBindingsFromSlice(
opts removeQueueBindingOpts,
) {
// Rather than creating a new slice, we are going to filter out any matching
// bind declarations we find, then constrain the slice to the number if items
// we have left.
i := 0
for _, thisBinding := range middleware.bindQueues {
if removeQueueBindingOk(thisBinding, opts) {
continue
}
middleware.bindQueues[i] = thisBinding
i++
}
middleware.bindQueues = middleware.bindQueues[0:i]
}
// removeQueueBindings removes a re-connection queue binding when a queue, exchange, or
// binding is removed.
func (middleware *RouteDeclarationMiddleware) removeQueueBindings(
queueName string,
exchangeName string,
routingKey string,
) {
middleware.bindQueuesLock.Lock()
defer middleware.bindQueuesLock.Unlock()
middleware.removeQueueBindingsFromSlice(
removeQueueBindingOpts{
queueName: queueName,
exchangeName: exchangeName,
routingKey: routingKey,
// If the queue name is non-empty, remove on queue match (will remove all
// bindings for the queue).
removeQueueMatch: queueName != "",
// If the exchange name is non-empty, remove on exchange match (will remove
//all bindings for the exchange).
removeExchangeMatch: exchangeName != "",
// If the routeingKey is non-nil, only remove bindings where the routing
// key, queue, and exchange all match (that exact binding).
removeRouteMatch: routingKey != "",
},
)
}
// removeExchange removes an exchange from the re-declaration list, as well as all queue
// and inter-exchange bindings it was a part of.
func (middleware *RouteDeclarationMiddleware) removeExchange(exchangeName string) {
// Remove the exchange from the list of exchanges we need to re-declare
middleware.declareExchanges.Delete(exchangeName)
// Remove all bindings associated with this exchange from the list of bindings
// to re-declare on re-connections.
middleware.removeQueueBindings("", exchangeName, "")
middleware.removeExchangeBindings(
exchangeName, "", exchangeName, true,
)
}
// removeExchangeBindingOpts details the information for removing exchange bindings.
type removeExchangeBindingOpts struct {
// destination exchange requested for removal.
destination string
// key: routing key requested for removal.
key string
// source exchange requested for removal.
source string
// When a queue is deleted we need to remove any binding where the source or
// destination matches
destinationOrSource bool
// removeDestinationMatch: when true, remove any bindings were destination matches
// the binding.
removeDestinationMatch bool
// removeDestinationMatch: when true, remove any bindings were key matches
// the binding.
removeKeyMatch bool
// removeDestinationMatch: when true, remove any bindings were source matches
// the binding.
removeSourceMatch bool
}
// removeExchangeBindingOk returns true is binding made with
// amqpmiddleware.ArgsExchangeBind should be removed based on removeExchangeBindingOpts.
func removeExchangeBindingOk(
binding amqpmiddleware.ArgsExchangeBind, opts removeExchangeBindingOpts,
) bool {
// If there is a routing Key to match, then the source and destination exchanges
// must match too (so we don't end up removing a binding with the same routing
// Key between a different isSet of exchanges).
if opts.removeKeyMatch &&
binding.Key == opts.key &&
binding.Source == opts.source &&
binding.Destination == opts.destination {
// then:
return true
}
if opts.removeDestinationMatch && binding.Destination == opts.destination {
return true
}
if opts.removeSourceMatch && binding.Source == opts.source {
return true
}
return false
}
// removeExchangeBindingsFromSlice removes all exchange bindings relevant to
// removeExchangeBindingOpts.
func (middleware *RouteDeclarationMiddleware) removeExchangeBindingsFromSlice(
opts removeExchangeBindingOpts,
) {
// Rather than creating a new slice, we are going to filter out any matching
// bind declarations we find, then constrain the slice to the number if items
// we have left.
i := 0
for _, thisBinding := range middleware.bindExchanges {
// If we are to remove the binding, continue.
if removeExchangeBindingOk(thisBinding, opts) {
continue
}
middleware.bindExchanges[i] = thisBinding
i++
}
middleware.bindQueues = middleware.bindQueues[0:i]
}
// removeExchangeBindings removes a re-connection binding when a binding or exchange is
// removed.
func (middleware *RouteDeclarationMiddleware) removeExchangeBindings(
destination string,
key string,
source string,
// When a queue is deleted we need to remove any binding where the source or
// destination matches
destinationOrSource bool,
) {
middleware.bindQueuesLock.Lock()
defer middleware.bindQueuesLock.Unlock()
middleware.removeExchangeBindingsFromSlice(
removeExchangeBindingOpts{
destination: destination,
key: key,
source: source,
destinationOrSource: destinationOrSource,
removeDestinationMatch: destination != "" || destinationOrSource,
removeKeyMatch: key != "",
removeSourceMatch: source != "" || destinationOrSource,
},
)
}
// reconnectDeclareQueues re-declares all previously declared queues on a amqp.Channel
// reconnection.
func (middleware *RouteDeclarationMiddleware) reconnectDeclareQueues(
channel *streadway.Channel,
) error {
var err error
redeclareQueues := func(key, value interface{}) bool {
thisQueue := value.(amqpmiddleware.ArgsQueueDeclare)
// By default, we will passively declare a queue. This allows us to respect
// queue deletion by other producers or consumers.
method := channel.QueueDeclarePassive
// UNLESS it is an auto-delete queue. Such a queue may have been cleaned up
// by the broker and should be fully re-declared on reconnectMiddleware.
// TODO: add ability to configure whether or not a passive declare should be
// used.
if true {
method = channel.QueueDeclare
}
_, err = method(
thisQueue.Name,
thisQueue.Durable,
thisQueue.AutoDelete,
thisQueue.Exclusive,
// We need to wait and confirm this gets received before moving on
false,
thisQueue.Args,
)
if err != nil {
var streadwayErr *streadway.Error
if errors.As(err, &streadwayErr) &&
streadwayErr.Code == streadway.NotFound {
// THEN:
// If this is a passive declare, we can get a 404 NOT_FOUND error. If we
// do, then we should remove this queue from the list of queues that is
// to be re-declared, so that we don't get caught in an endless loop
// of reconnects.
middleware.removeQueue(thisQueue.Name)
}
err = fmt.Errorf(
"error re-declaring queue '%v': %w", thisQueue.Name, err,
)
return false
}
return true
}
// Redeclare all queues in the map.
middleware.declareQueues.Range(redeclareQueues)
return err
}
// reconnectDeclareExchanges re-declares all exchanges previously declared on am
// amqp.Channel during reconnection.
func (middleware *RouteDeclarationMiddleware) reconnectDeclareExchanges(
channel *streadway.Channel,
) error {
var err error
redeclareExchanges := func(key, value interface{}) bool {
thisExchange := value.(amqpmiddleware.ArgsExchangeDeclare)
// By default, we will passively declare a queue. This allows us to respect
// queue deletion by other producers or consumers.
method := channel.ExchangeDeclarePassive
// UNLESS it is an auto-delete queue. Such a queue may have been cleaned up
// by the broker and should be fully re-declared on reconnectMiddleware.
// TODO: add ability to configure whether or not a passive declare should be
// used.
if true {
method = channel.ExchangeDeclare
}
err = method(
thisExchange.Name,
thisExchange.Kind,
thisExchange.Durable,
thisExchange.AutoDelete,
thisExchange.Internal,
// we are going to wait so this is done synchronously.
false,
thisExchange.Args,
)
if err != nil {
var streadwayErr *streadway.Error
if errors.As(err, &streadwayErr) &&
streadwayErr.Code == streadway.NotFound {
// If this is a passive declare, we can get a 404 NOT_FOUND error. If we
// do, then we should remove this queue from the list of queues that is
// to be re-declared, so that we don't get caught in an endless loop
// of reconnects.
middleware.removeExchange(thisExchange.Name)
}
err = fmt.Errorf(
"error re-declaring exchange '%v': %w", thisExchange.Name, err,
)
return false
}
return true
}
// Redeclare all queues in the map.
middleware.declareExchanges.Range(redeclareExchanges)
return err
}
// reconnectBindQueues re-declares queue bindings previously made on an amqp.Channel
// during reconnection.
func (middleware *RouteDeclarationMiddleware) reconnectBindQueues(
channel *streadway.Channel,
) error {
// We shouldn't meed to lock this resource here, since this method will only be
// used when we have a write lock on the transport, and all methods that modify the
// binding list must first acquire the same lock for read, but we will put this here
// in case that changes in the future.
middleware.bindQueuesLock.Lock()
defer middleware.bindQueuesLock.Unlock()
for _, thisBinding := range middleware.bindQueues {
err := channel.QueueBind(
thisBinding.Name,
thisBinding.Key,
thisBinding.Exchange,
false,
thisBinding.Args,
)
if err != nil {
return fmt.Errorf(
"error re-binding queue '%v' to exchange '%v' with routing Key"+
" '%v': %w",
thisBinding.Name,
thisBinding.Exchange,
thisBinding.Key,
err,
)
}
}
return nil
}
// reconnectBindExchanges re-declares all exchange bindings previously made on an
// amqp.Channel during reconnection.
func (middleware *RouteDeclarationMiddleware) reconnectBindExchanges(
channel *streadway.Channel,
) error {
// We shouldn't meed to lock this resource here, since this method will only be
// used when we have a write lock on the transport, and all methods that modify the
// binding list must first acquire the same lock for read, but we will put this here
// in case that changes in the future.
middleware.bindExchangesLock.Lock()
defer middleware.bindExchangesLock.Unlock()
for _, thisBinding := range middleware.bindExchanges {
err := channel.ExchangeBind(
thisBinding.Destination,
thisBinding.Key,
thisBinding.Source,
false,
thisBinding.Args,
)
if err != nil {
return fmt.Errorf(
"error re-binding source exchange '%v' to destination exchange"+
" '%v' with routing Key '%v': %w",
thisBinding.Source,
thisBinding.Destination,
thisBinding.Key,
err,
)
}
}
return nil
}
// reconnectHandler re-establishes queue and exchange topologies on a channel
// reconnection event.
func (middleware *RouteDeclarationMiddleware) reconnectHandler(
ctx context.Context,
args amqpmiddleware.ArgsChannelReconnect,
next amqpmiddleware.HandlerChannelReconnect,
) (amqpmiddleware.ResultsChannelReconnect, error) {
results, err := next(ctx, args)
// If there was an error, pass it up the chain.
if err != nil {
return results, err
}
err = middleware.reconnectDeclareQueues(results.Channel)
if err != nil {
return results, err
}
err = middleware.reconnectDeclareExchanges(results.Channel)
if err != nil {
return results, err
}
err = middleware.reconnectBindExchanges(results.Channel)
if err != nil {
return results, err
}
err = middleware.reconnectBindQueues(results.Channel)
if err != nil {
return results, err
}
return results, nil
}
// ChannelReconnect is invoked on reconnection of the underlying amqp Channel, and makes
// sure our queue and exchange topology is re-configured to present a seamless
// experience to the caller.
func (middleware *RouteDeclarationMiddleware) ChannelReconnect(
next amqpmiddleware.HandlerChannelReconnect,
) (handler amqpmiddleware.HandlerChannelReconnect) {
handler = func(
ctx context.Context, args amqpmiddleware.ArgsChannelReconnect,
) (amqpmiddleware.ResultsChannelReconnect, error) {
return middleware.reconnectHandler(ctx, args, next)
}
return handler
}
// QueueDeclare captures amqp.Channel.QueueDeclare() calls and stores their arguments
// for re-declaring channels on disconnect.
func (middleware *RouteDeclarationMiddleware) QueueDeclare(
next amqpmiddleware.HandlerQueueDeclare,
) (handler amqpmiddleware.HandlerQueueDeclare) {
handler = func(
ctx context.Context, args amqpmiddleware.ArgsQueueDeclare,
) (amqpmiddleware.ResultsQueueDeclare, error) {
// If there is any sort of error, pass it on.
results, err := next(ctx, args)
if err != nil {
return results, err
}
// Store the results name so we can re-declare it
middleware.declareQueues.Store(args.Name, args)
return results, err
}
return handler
}
// QueueDelete captures amqp.Channel.QueueDelete() calls and removes all relevant
// saved queues and bindings so they are not re-declared on a channel reconnect.
func (middleware *RouteDeclarationMiddleware) QueueDelete(
next amqpmiddleware.HandlerQueueDelete,
) (handler amqpmiddleware.HandlerQueueDelete) {
// If there is any sort of error, pass it on.
handler = func(
ctx context.Context, args amqpmiddleware.ArgsQueueDelete,
) (amqpmiddleware.ResultsQueueDelete, error) {
results, err := next(ctx, args)
if err != nil {
return results, err
}
// Remove the queue from our list of queue to redeclare.
middleware.removeQueue(args.Name)
return results, err
}
return handler
}
// QueueBind captures amqp.Channel.QueueBind() saves queue bind arguments, so they can
// be re-declared on channel reconnection.
func (middleware *RouteDeclarationMiddleware) QueueBind(
next amqpmiddleware.HandlerQueueBind,
) (handler amqpmiddleware.HandlerQueueBind) {
// If there is any sort of error, pass it on.
handler = func(ctx context.Context, args amqpmiddleware.ArgsQueueBind) error {
err := next(ctx, args)
if err != nil {
return err
}
middleware.bindQueuesLock.Lock()
defer middleware.bindQueuesLock.Unlock()
middleware.bindQueues = append(middleware.bindQueues, args)
return nil
}
return handler
}
// QueueUnbind captures amqp.Channel.QueueUnbind() calls and removes all relevant saved
// bindings so they are not re-declared on a channel reconnect.
func (middleware *RouteDeclarationMiddleware) QueueUnbind(
next amqpmiddleware.HandlerQueueUnbind,
) (handler amqpmiddleware.HandlerQueueUnbind) {
handler = func(ctx context.Context, args amqpmiddleware.ArgsQueueUnbind) error {
// If there is any sort of error, pass it on.
err := next(ctx, args)
if err != nil {
return err
}
// Remove this binding from the list of bindings to re-create on reconnect.
middleware.removeQueueBindings(
args.Name,
args.Exchange,
args.Key,
)
return nil
}
return handler
}
// ExchangeDeclare captures amqp.Channel.ExchangeDeclare() saves passed arguments so
// exchanges can be re-declared on channel reconnection.
func (middleware *RouteDeclarationMiddleware) ExchangeDeclare(
next amqpmiddleware.HandlerExchangeDeclare,
) (handler amqpmiddleware.HandlerExchangeDeclare) {
handler = func(ctx context.Context, args amqpmiddleware.ArgsExchangeDeclare) error {
// If there is any sort of error, pass it on.
err := next(ctx, args)
if err != nil {
return err
}
middleware.declareExchanges.Store(args.Name, args)
return nil
}
return handler
}
// ExchangeDelete captures amqp.Channel.ExchangeDelete() calls and removes all relevant
// saved exchanges so they are not re-declared on a channel reconnect.
func (middleware *RouteDeclarationMiddleware) ExchangeDelete(
next amqpmiddleware.HandlerExchangeDelete,
) (handler amqpmiddleware.HandlerExchangeDelete) {
handler = func(ctx context.Context, args amqpmiddleware.ArgsExchangeDelete) error {
// If there is any sort of error, pass it on.
err := next(ctx, args)
if err != nil {
return err
}
// Remove the exchange from our re-declare on reconnect lists.
middleware.removeExchange(args.Name)
return nil
}
return handler
}
// ExchangeBind captures amqp.Channel.ExchangeBind() saves passed arguments, so exchange
// bindings can be re-declared on channel reconnection.
func (middleware *RouteDeclarationMiddleware) ExchangeBind(
next amqpmiddleware.HandlerExchangeBind,
) (handler amqpmiddleware.HandlerExchangeBind) {
// If there is any sort of error, pass it on.
handler = func(ctx context.Context, args amqpmiddleware.ArgsExchangeBind) error {
err := next(ctx, args)
if err != nil {
return err
}
// Store this binding so we can re-bind it if we lose and regain the connection.
middleware.bindExchangesLock.Lock()
defer middleware.bindExchangesLock.Unlock()
middleware.bindExchanges = append(middleware.bindExchanges, args)
return nil
}
return handler
}
// ExchangeUnbind captures amqp.Channel.ExchangeUnbind() calls and removes all relevant
// saved bindings so they are not re-declared on a channel reconnect.
func (middleware *RouteDeclarationMiddleware) ExchangeUnbind(
next amqpmiddleware.HandlerExchangeUnbind,
) (handler amqpmiddleware.HandlerExchangeUnbind) {
// If there is any sort of error, pass it on.
handler = func(ctx context.Context, args amqpmiddleware.ArgsExchangeUnbind) error {
err := next(ctx, args)
if err != nil {
return err
}
middleware.removeExchangeBindings(
args.Destination, args.Key, args.Source, false,
)
return nil
}
return handler
}
// NewRouteDeclarationMiddleware creates a new RouteDeclarationMiddleware.
func NewRouteDeclarationMiddleware() amqpmiddleware.ProvidesMiddleware {
middleware := &RouteDeclarationMiddleware{
declareQueues: new(sync.Map),
declareExchanges: new(sync.Map),
bindQueues: nil,
bindQueuesLock: new(sync.Mutex),
bindExchanges: nil,
bindExchangesLock: new(sync.Mutex),
}
return middleware
}