xmidt-org/caduceus
senderWrapper.go

// SPDX-FileCopyrightText: 2021 Comcast Cable Communications Management, LLC
// SPDX-License-Identifier: Apache-2.0
package main

import (
    "errors"
    "sync"
    "time"

    "github.com/go-kit/kit/metrics"
    "github.com/xmidt-org/ancla"
    "github.com/xmidt-org/wrp-go/v3"
    "go.uber.org/zap"
)

// SenderWrapperFactory configures the CaduceusSenderWrapper for creation
type SenderWrapperFactory struct {
    // The number of workers to assign to each OutboundSender created.
    NumWorkersPerSender int

    // The queue size to assign to each OutboundSender created.
    QueueSizePerSender int

    // The cut off time to assign to each OutboundSender created.
    CutOffPeriod time.Duration

    // Number of delivery retries before giving up
    DeliveryRetries int

    // Time in between delivery retries
    DeliveryInterval time.Duration

    // The amount of time to let expired OutboundSenders linger before
    // shutting them down and cleaning up the resources associated with them.
    Linger time.Duration

    // Metrics registry.
    MetricsRegistry CaduceusMetricsRegistry

    // The metrics counter for dropped messages due to invalid payloads
    DroppedMsgCounter metrics.Counter

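    // The metrics counter for incoming events, labeled by event type.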
    EventType metrics.Counter

    // The logger implementation to share with OutboundSenders.
    Logger *zap.Logger

    // The http client Do() function to share with OutboundSenders.
    Sender httpClient

    // CustomPIDs is a custom list of allowed PartnerIDs that will be used if a message
    // has no partner IDs.
    CustomPIDs []string

    // DisablePartnerIDs dictates whether or not to enforce the partner ID check.
    DisablePartnerIDs bool
}

type SenderWrapper interface {
    Update([]ancla.InternalWebhook)
    Queue(*wrp.Message)
    Shutdown(bool)
}

// CaduceusSenderWrapper implements SenderWrapper and exposes no external
// parameters; all configuration is supplied through SenderWrapperFactory.
type CaduceusSenderWrapper struct {
    sender              httpClient
    numWorkersPerSender int
    queueSizePerSender  int
    deliveryRetries     int
    deliveryInterval    time.Duration
    cutOffPeriod        time.Duration
    linger              time.Duration
    logger              *zap.Logger
    mutex               sync.RWMutex
    senders             map[string]OutboundSender
    metricsRegistry     CaduceusMetricsRegistry
    eventType           metrics.Counter
    queryLatency        metrics.Histogram
    wg                  sync.WaitGroup
    shutdown            chan struct{}
    customPIDs          []string
    disablePartnerIDs   bool
}

// New produces a new SenderWrapper implemented by CaduceusSenderWrapper
// based on the factory configuration.
func (swf SenderWrapperFactory) New() (sw SenderWrapper, err error) {
    caduceusSenderWrapper := &CaduceusSenderWrapper{
        sender:              swf.Sender,
        numWorkersPerSender: swf.NumWorkersPerSender,
        queueSizePerSender:  swf.QueueSizePerSender,
        deliveryRetries:     swf.DeliveryRetries,
        deliveryInterval:    swf.DeliveryInterval,
        cutOffPeriod:        swf.CutOffPeriod,
        linger:              swf.Linger,
        logger:              swf.Logger,
        metricsRegistry:     swf.MetricsRegistry,
        customPIDs:          swf.CustomPIDs,
        disablePartnerIDs:   swf.DisablePartnerIDs,
    }

    if swf.Linger <= 0 {
        err = errors.New("linger must be positive")
        sw = nil
        return
    }

    caduceusSenderWrapper.queryLatency = NewMetricWrapperMeasures(swf.MetricsRegistry)
    caduceusSenderWrapper.eventType = swf.MetricsRegistry.NewCounter(IncomingEventTypeCounter)

    caduceusSenderWrapper.senders = make(map[string]OutboundSender)
    caduceusSenderWrapper.shutdown = make(chan struct{})

    caduceusSenderWrapper.wg.Add(1)
    go undertaker(caduceusSenderWrapper)

    sw = caduceusSenderWrapper
    return
}
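
// newSenderWrapperForExample is a minimal, hypothetical sketch (not part of
// the original file) showing how SenderWrapperFactory is typically filled in
// and used.  The numeric values are illustrative, and the registry, client,
// and logger arguments are assumed to be constructed elsewhere.
func newSenderWrapperForExample(registry CaduceusMetricsRegistry, client httpClient, logger *zap.Logger) (SenderWrapper, error) {
    swf := SenderWrapperFactory{
        NumWorkersPerSender: 10,
        QueueSizePerSender:  100,
        CutOffPeriod:        30 * time.Second,
        DeliveryRetries:     3,
        DeliveryInterval:    10 * time.Second,
        Linger:              5 * time.Minute, // must be > 0 or New() returns an error
        MetricsRegistry:     registry,
        Logger:              logger,
        Sender:              client,
    }
    return swf.New()
}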

// Update is called when we get changes to our webhook listeners, either
// additions or updates.  This code takes care of building new OutboundSenders
// and maintaining the existing ones.
func (sw *CaduceusSenderWrapper) Update(list []ancla.InternalWebhook) {
    // We'll likely need this, so let's get one ready.
    osf := OutboundSenderFactory{
        Sender:            sw.sender,
        CutOffPeriod:      sw.cutOffPeriod,
        NumWorkers:        sw.numWorkersPerSender,
        QueueSize:         sw.queueSizePerSender,
        MetricsRegistry:   sw.metricsRegistry,
        DeliveryRetries:   sw.deliveryRetries,
        DeliveryInterval:  sw.deliveryInterval,
        Logger:            sw.logger,
        CustomPIDs:        sw.customPIDs,
        DisablePartnerIDs: sw.disablePartnerIDs,
        QueryLatency:      sw.queryLatency,
    }

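    // Build a parallel list of (listener, ID) pairs; the webhook's configured
    // URL serves as the unique key into sw.senders.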
    ids := make([]struct {
        Listener ancla.InternalWebhook
        ID       string
    }, len(list))

    for i, v := range list {
        ids[i].Listener = v
        ids[i].ID = v.Webhook.Config.URL
    }

    sw.mutex.Lock()
    defer sw.mutex.Unlock()

    for _, inValue := range ids {
        sender, ok := sw.senders[inValue.ID]
        if !ok {
            osf.Listener = inValue.Listener
            metricWrapper, err := newMetricWrapper(time.Now, osf.QueryLatency)

            if err != nil {
                continue
            }
            osf.ClientMiddleware = metricWrapper.roundTripper
            obs, err := osf.New()
            if err == nil {
                sw.senders[inValue.ID] = obs
            }
            continue
        }
        sender.Update(inValue.Listener)
    }
}

// Queue is used to send a request to all of the possible outbound senders.
// This function performs the fan-out and filtering to multiple possible
// endpoints.
func (sw *CaduceusSenderWrapper) Queue(msg *wrp.Message) {
    sw.mutex.RLock()
    defer sw.mutex.RUnlock()

    sw.eventType.With(eventLabel, msg.FindEventStringSubMatch()).Add(1)

    for _, v := range sw.senders {
        v.Queue(msg)
    }
}

// Shutdown closes down the delivery mechanisms and cleans up the underlying
// OutboundSenders, either gently (waiting for delivery queues to empty) or
// abruptly (dropping enqueued messages).
func (sw *CaduceusSenderWrapper) Shutdown(gentle bool) {
    sw.mutex.Lock()
    defer sw.mutex.Unlock()
    for k, v := range sw.senders {
        v.Shutdown(gentle)
        delete(sw.senders, k)
    }
    close(sw.shutdown)
}
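
// runSenderWrapperForExample is a minimal, hypothetical sketch (not part of
// the original file) of the wrapper's lifecycle: register listeners, fan a
// message out, then shut down gently.  The hooks and msg arguments are
// assumed to be built by the caller.
func runSenderWrapperForExample(sw SenderWrapper, hooks []ancla.InternalWebhook, msg *wrp.Message) {
    sw.Update(hooks)  // create or refresh one OutboundSender per webhook URL
    sw.Queue(msg)     // fan the message out to every active sender
    sw.Shutdown(true) // wait for delivery queues to drain before returning
}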

// undertaker looks at the OutboundSenders periodically and prunes the ones
// that have been retired for too long, freeing up resources.
func undertaker(sw *CaduceusSenderWrapper) {
    defer sw.wg.Done()
    // Collecting unused OutboundSenders isn't a huge priority, so do it
    // slowly.
    ticker := time.NewTicker(2 * sw.linger)
    for {
        select {
        case <-ticker.C:
            threshold := time.Now().Add(-1 * sw.linger)

            // Actually shutting these down could take longer than we want to
            // hold the mutex, so just remove them from the active list and
            // shut them down afterwards.
            deadList := createDeadlist(sw, threshold)

            // Shut them down
            for _, v := range deadList {
                v.Shutdown(false)
            }
        case <-sw.shutdown:
            ticker.Stop()
            return
        }
    }
}

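// createDeadlist removes from sw.senders every OutboundSender that was
// retired before the given threshold and returns the removed senders so the
// caller can shut them down outside the lock.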
func createDeadlist(sw *CaduceusSenderWrapper, threshold time.Time) map[string]OutboundSender {
    if sw == nil || threshold.IsZero() {
        return nil
    }

    deadList := make(map[string]OutboundSender)
    sw.mutex.Lock()
    defer sw.mutex.Unlock()
    for k, v := range sw.senders {
        retired := v.RetiredSince()
        if threshold.After(retired) {
            deadList[k] = v
            delete(sw.senders, k)
        }
    }
    return deadList
}