peake100/rogerRabbit-go

View on GitHub
pkg/roger/rconsumer/opts.go

Summary

Maintainability
A
0 mins
Test Coverage
package rconsumer

import (
    "fmt"
    "github.com/peake100/rogerRabbit-go/pkg/amqp"
    "github.com/peake100/rogerRabbit-go/pkg/internal"
    "github.com/peake100/rogerRabbit-go/pkg/roger/rconsumer/middleware"
    "github.com/rs/zerolog"
)

// Opts holds options for running a consumer.
type Opts struct {
    // The maximum number of workers that can be active at one time.
    maxWorkers int

    // The middleware we should add to each DeliveryProcessor.
    middleware Middleware

    // noLoggingMiddleware is true if we should not add logging middleware.
    noLoggingMiddleware bool
    // logger to use in the default logging middleware.
    logger zerolog.Logger
    // logSuccessLevel is the level at which the default logger should log a
    // successfully processed delivery.
    logSuccessLevel zerolog.Level
    // logSuccessLevel is the level at which the default logger should log the full
    // delivery object.
    logDeliveryLevel zerolog.Level
}

// it's correct that we are modifying value-receivers here. Disable the revive check
// for this.

// revive:disable:modifies-value-receiver

// WithMaxWorkers sets the maximum number of workers that can be running at the same
// time. If 0 or less, no limit will be used.
//
// Default: 0.
func (opts Opts) WithMaxWorkers(max int) Opts {
    opts.maxWorkers = max
    return opts
}

// WithMiddleware sets the Middleware to use. Default: includes all default middleware
// in the consumer/middleware package.
//
// Default: Middleware{}
func (opts Opts) WithMiddleware(processorMiddleware Middleware) Opts {
    opts.middleware = processorMiddleware
    return opts
}

// WithDefaultLogging enables the default zerolog.Logger logging middleware. If false
// all other logging settings have no effect.
//
// Default: true
func (opts Opts) WithDefaultLogging(log bool) Opts {
    opts.noLoggingMiddleware = !log
    return opts
}

// WithLogger sets the zerolog.Logger for the default logging middleware to use
// If WithDefaultLogging is false, this setting has no effect.
//
// Default: lockless, pretty-printed logger set to Info level.
func (opts Opts) WithLogger(logger zerolog.Logger) Opts {
    opts.logger = logger
    return opts.WithLoggingLevel(logger.GetLevel())
}

// WithLoggingLevel sets the level of the logger passed to WithLogger.
func (opts Opts) WithLoggingLevel(level zerolog.Level) Opts {
    opts.logger = opts.logger.Level(level)
    return opts
}

// WithLogSuccessLevel is the minimum logging level to log a successful delivery at.
//
// Default: zerolog.DebugLevel.
func (opts Opts) WithLogSuccessLevel(level zerolog.Level) Opts {
    opts.logSuccessLevel = level
    return opts
}

// WithLogDeliveryLevel is the minimum logging level to log the full delivery object at.
//
// Default: zerolog.DebugLevel.
func (opts Opts) WithLogDeliveryLevel(level zerolog.Level) Opts {
    opts.logDeliveryLevel = level
    return opts
}

// revive:enable:modifies-value-receiver

// DefaultOpts returns new Opts object with default settings.
func DefaultOpts() Opts {
    opts := Opts{
        maxWorkers: 0,
        middleware: Middleware{
            providers: make(map[middleware.ProviderTypeID]struct{}),
        },
        noLoggingMiddleware: false,
        logger:              internal.CreateDefaultLogger(zerolog.InfoLevel),
        logSuccessLevel:     zerolog.DebugLevel,
        logDeliveryLevel:    zerolog.ErrorLevel,
    }

    return opts
}

// Middleware holds the middleware to register on a consumer.
type Middleware struct {
    // setupChannel is all SetupChannel middlewares to register on the processor.
    setupChannel []middleware.SetupChannel
    // delivery is all Delivery middlewares to register on the processor.
    delivery []middleware.Delivery
    // cleanupChannel is all CleanupChannel middlewares to register on the processor.
    cleanupChannel []middleware.CleanupChannel

    // providerFactories are middleware provider constructors to run for each processor
    // passed to a consumer.
    providerFactories []func() middleware.ProvidesMiddleware

    // providers tracks the type IDs of providers passed to this config.
    providers map[middleware.ProviderTypeID]struct{}
}

// AddSetupChannel adds a middleware.SetupChannel to be added to each
// DeliveryProcessor.SetupChannel passed to a Consumer.
func (config *Middleware) AddSetupChannel(processorMiddleware middleware.SetupChannel) {
    config.setupChannel = append(config.setupChannel, processorMiddleware)
}

// AddDelivery adds a middleware.Delivery to be added to each
// DeliveryProcessor.HandleDelivery passed to a Consumer.
func (config *Middleware) AddDelivery(processorMiddleware middleware.Delivery) {
    config.delivery = append(config.delivery, processorMiddleware)
}

// AddCleanupChannel adds a middleware.CleanupChannel to be added to each
// DeliveryProcessor.CleanupChannel passed to a Consumer.
func (config *Middleware) AddCleanupChannel(processorMiddleware middleware.CleanupChannel) {
    config.cleanupChannel = append(config.cleanupChannel, processorMiddleware)
}

// AddProvider adds consume middleware provided by methods of provider.
func (config *Middleware) AddProvider(provider middleware.ProvidesMiddleware) error {
    return config.addProviderMethods(provider)
}

// AddCleanupChannel adds a middleware.CleanupChannel to be added to each
// DeliveryProcessor.CleanupChannel passed to a Consumer.
func (config *Middleware) addProviderMethods(provider middleware.ProvidesMiddleware) error {
    // Check if this provider has already been registered.
    if _, ok := config.providers[provider.TypeID()]; ok {
        return amqp.ErrDuplicateProvider
    }

    // Register it's methods.
    methodsFound := false
    if hasMethod, ok := provider.(middleware.ProvidesSetupChannel); ok {
        config.AddSetupChannel(hasMethod.SetupChannel)
        methodsFound = true
    }

    if hasMethod, ok := provider.(middleware.ProvidesDelivery); ok {
        config.AddDelivery(hasMethod.Delivery)
        methodsFound = true
    }

    if hasMethod, ok := provider.(middleware.ProvidesCleanupChannel); ok {
        config.AddCleanupChannel(hasMethod.CleanupChannel)
        methodsFound = true
    }

    if !methodsFound {
        return amqp.ErrNoMiddlewareMethods
    }

    // Add the provider to our cache.
    config.providers[provider.TypeID()] = struct{}{}

    return nil
}

// buildProviderFactories creates new providers from all registered factories and
// registers their methods.
func (config *Middleware) buildProviderFactories() error {
    for _, thisFactory := range config.providerFactories {
        provider := thisFactory()
        err := config.addProviderMethods(provider)
        if err != nil {
            return fmt.Errorf(
                "could not reegister middleware provider '%v': %w", provider.TypeID(), err,
            )
        }
    }

    return nil
}