pkg/amqp/eventRelayNotifyFlow.go
package amqp
import (
"context"
"github.com/peake100/rogerRabbit-go/pkg/amqp/amqpmiddleware"
streadway "github.com/streadway/amqp"
)
// notifyFlowRelay implements eventRelay for Channel.NotifyFlow.
type notifyFlowRelay struct {
// ChannelCtx is the context of the current channel
ChannelCtx context.Context
// CallerFlow is the channel we are relaying returns to from the broker
CallerFlow chan<- bool
// brokerFlow is the current broker channel we are pulling from.
brokerFlow <-chan bool
// setup is whether this relay has been setup before.
setup bool
// lastEvent is the value of the last event sent from the broker.
lastEvent bool
// handler is the event handler for the relay wrapped in all middleware.
handler amqpmiddleware.HandlerNotifyFlowEvents
}
func (relay *notifyFlowRelay) baseHandler() amqpmiddleware.HandlerNotifyFlowEvents {
return func(_ amqpmiddleware.EventMetadata, event amqpmiddleware.EventNotifyFlow) {
relay.CallerFlow <- event.FlowNotification
relay.lastEvent = event.FlowNotification
}
}
// SetupForRelayLeg implements eventRelay, and sets up a new source event channel from
// streadway/amqp.Channel.NotifyFlow.
func (relay *notifyFlowRelay) SetupForRelayLeg(newChannel *streadway.Channel) error {
// Check if this is our initial setup
if relay.setup {
// If we have already setup the relay once, that means we are opening a new
// channel, and should send a flow -> true to the caller as a fresh channel
// will not have flow turned off yet.
relay.handler(
createEventMetadata(-1, -1),
amqpmiddleware.EventNotifyFlow{FlowNotification: true},
)
} else {
relay.setup = true
}
// Set the last notification to true.
relay.lastEvent = true
brokerChannel := make(chan bool, cap(relay.CallerFlow))
relay.brokerFlow = brokerChannel
newChannel.NotifyFlow(brokerChannel)
return nil
}
// RunRelayLeg implements eventRelay, and relays streadway/amqp.Channel.NotifyFlow
// events to the original caller.
func (relay *notifyFlowRelay) RunRelayLeg(legNum int) (done bool) {
eventNum := int64(0)
for thisFlow := range relay.brokerFlow {
relay.handler(
createEventMetadata(legNum, eventNum),
amqpmiddleware.EventNotifyFlow{FlowNotification: thisFlow},
)
eventNum++
}
// Turn flow to false on broker disconnection if the roger channel has not been
// closed and the last notification sent was a ``true`` (we don'tb want to send two
// false values in a row).
if relay.ChannelCtx.Err() == nil && relay.lastEvent {
relay.handler(
createEventMetadata(-1, -1),
amqpmiddleware.EventNotifyFlow{FlowNotification: false},
)
}
return false
}
// Shutdown implements eventRelay, and closes the caller-facing event channel.
func (relay *notifyFlowRelay) Shutdown() error {
defer close(relay.CallerFlow)
return nil
}
// newNotifyReturnRelay creates a new notifyReturnRelay.
func newNotifyFlowRelay(
channelCtx context.Context,
callerFlowNotifications chan<- bool,
middleware []amqpmiddleware.NotifyFlowEvents,
) *notifyFlowRelay {
// Create the relay
relay := ¬ifyFlowRelay{
ChannelCtx: channelCtx,
CallerFlow: callerFlowNotifications,
}
// Apply all middleware to the handler
relay.handler = relay.baseHandler()
for _, thisMiddleware := range middleware {
relay.handler = thisMiddleware(relay.handler)
}
return relay
}