rancher/opni-monitoring

View on GitHub
pkg/agent/rules.go

Summary

Maintainability
C
1 day
Test Coverage
package agent

import (
    "context"
    "fmt"
    "net/http"
    "time"

    "github.com/prometheus/prometheus/model/rulefmt"
    "github.com/rancher/opni-monitoring/pkg/rules"
    "github.com/rancher/opni-monitoring/pkg/sdk/api"
    "github.com/rancher/opni-monitoring/pkg/util"
    "go.uber.org/zap"
    "gopkg.in/yaml.v3"
)

// configureRuleFinder builds the rule finder described by the agent's rule
// discovery configuration. Only PrometheusRules discovery is handled here;
// when the agent has no rules section, or that section has no
// PrometheusRules entry, a "missing configuration" error is returned.
func (a *Agent) configureRuleFinder() (rules.RuleFinder, error) {
    // Guard clause: nothing to build without a PrometheusRules discovery entry.
    if a.Rules == nil || a.Rules.Discovery.PrometheusRules == nil {
        return nil, fmt.Errorf("missing configuration")
    }
    promRules := a.Rules.Discovery.PrometheusRules

    k8sClient, err := util.NewK8sClient(util.ClientOptions{
        Kubeconfig: promRules.Kubeconfig,
        Scheme:     api.NewScheme(),
    })
    if err != nil {
        return nil, fmt.Errorf("failed to create k8s client: %w", err)
    }

    return rules.NewPrometheusRuleFinder(
        k8sClient,
        rules.WithLogger(a.logger),
        rules.WithNamespaces(promRules.SearchNamespaces...),
    ), nil
}

// streamRuleGroupUpdates starts periodic rule discovery and returns a channel
// on which every discovered set of rule groups is delivered as a slice of
// YAML documents (one document per rule group).
//
// The discovery interval defaults to 15 minutes and can be overridden by the
// Rules.Discovery.Interval setting, which must be parseable by
// time.ParseDuration.
//
// The returned channel is closed when the underlying notifier channel is
// closed, or when ctx is cancelled while an update is waiting to be
// delivered. An error is returned if the rule finder cannot be configured or
// the interval cannot be parsed.
func (a *Agent) streamRuleGroupUpdates(ctx context.Context) (<-chan [][]byte, error) {
    finder, err := a.configureRuleFinder()
    if err != nil {
        return nil, fmt.Errorf("failed to configure rule discovery: %w", err)
    }
    searchInterval := time.Minute * 15
    if interval := a.Rules.Discovery.Interval; interval != "" {
        duration, err := time.ParseDuration(interval)
        if err != nil {
            return nil, fmt.Errorf("failed to parse discovery interval: %w", err)
        }
        searchInterval = duration
    }
    notifier := rules.NewPeriodicUpdateNotifier(ctx, finder, searchInterval)

    notifierC := notifier.NotifyC(ctx)
    a.logger.Debug("starting rule group update notifier")
    groupYamlDocs := make(chan [][]byte, cap(notifierC))
    go func() {
        // This goroutine is the only sender on groupYamlDocs, so it is safe
        // for it to close the channel on exit.
        defer close(groupYamlDocs)
        for ruleGroups := range notifierC {
            a.logger.Debug("received updated rule groups from discovery")
            // Send inline rather than from a detached goroutine: a detached
            // sender could still be blocked when the channel is closed
            // (panic), could deliver updates out of order, and would leak if
            // the consumer went away.
            select {
            case groupYamlDocs <- a.marshalRuleGroups(ruleGroups):
            case <-ctx.Done():
                return
            }
        }
        a.logger.Debug("rule discovery channel closed")
    }()
    return groupYamlDocs, nil
}

// marshalRuleGroups encodes each rule group as its own YAML document and
// returns the documents in input order. A group that fails to marshal is
// logged (with its name and the error) and left out of the result.
func (a *Agent) marshalRuleGroups(ruleGroups []rulefmt.RuleGroup) [][]byte {
    encoded := make([][]byte, 0, len(ruleGroups))
    for i := range ruleGroups {
        group := ruleGroups[i]
        out, marshalErr := yaml.Marshal(group)
        if marshalErr == nil {
            encoded = append(encoded, out)
            continue
        }
        // Skip the group that could not be encoded, but keep going with the rest.
        a.logger.With(
            zap.Error(marshalErr),
            zap.String("group", group.Name),
        ).Error("failed to marshal rule group")
    }
    return encoded
}

// streamRulesToGateway forwards discovered rule groups to the gateway until
// ctx is cancelled or the discovery stream is closed.
//
// Each update is a batch of YAML documents. Every document is POSTed to
// /api/agent/sync_rules with a 2 second timeout. If a request fails or the
// gateway answers with anything other than 202 Accepted, the whole batch is
// retried after 5 seconds, or immediately (with the newer batch) if another
// update arrives during the backoff.
//
// It returns an error only if rule discovery cannot be started; otherwise it
// blocks and returns nil once ctx is done or the discovery stream closes.
func (a *Agent) streamRulesToGateway(ctx context.Context) error {
    lg := a.logger
    updateC, err := a.streamRuleGroupUpdates(ctx)
    if err != nil {
        a.logger.With(
            zap.Error(err),
        ).Error("failed to configure rule discovery")
        return err
    }
    // pending hands batches to the sender goroutine below. It is never
    // closed: the only sender is the loop at the bottom of this function, and
    // the goroutine's lifetime is bound to ctx, so closing it from the
    // receiving side could only cause a send-on-closed-channel panic.
    pending := make(chan [][]byte, 1)
    go func() {
        for {
            var docs [][]byte
            select {
            case docs = <-pending:
            case <-ctx.Done():
                lg.Error(ctx.Err())
                return
            }
        RETRY:
            lg.Debug("sending alert rules to gateway")
            for _, doc := range docs {
                reqCtx, ca := context.WithTimeout(ctx, time.Second*2)
                code, _, err := a.gatewayClient.Post(reqCtx, "/api/agent/sync_rules").
                    Set("Content-Type", "application/yaml").
                    Body(doc).
                    Do()
                // Release the timeout's resources as soon as the request is
                // finished. A defer here would not run until this long-lived
                // goroutine exits, so every request would leak a timer.
                ca()
                if err == nil && code == http.StatusAccepted {
                    continue
                }
                // retry, unless another update is received from the channel
                lg.With(
                    zap.Error(err),
                    zap.Int("code", code),
                ).Error("failed to send alert rules to gateway (retry in 5 seconds)")
                select {
                case docs = <-pending:
                    lg.Debug("updated rules were received during backoff, retrying immediately")
                    goto RETRY
                case <-time.After(5 * time.Second):
                    goto RETRY
                case <-ctx.Done():
                    lg.Error(ctx.Err())
                    return
                }
            }
            lg.Infof("successfully sent %d alert rules to gateway", len(docs))
        }
    }()
    for {
        select {
        case <-ctx.Done():
            return nil
        case yamlDocs, ok := <-updateC:
            if !ok {
                lg.Debug("rule discovery stream closed")
                return nil
            }
            // The sender goroutine stops receiving once ctx is done, so the
            // hand-off must also watch ctx to avoid blocking forever.
            select {
            case pending <- yamlDocs:
            case <-ctx.Done():
                return nil
            }
        }
    }
}