netdata/netdata

View on GitHub
src/go/collectors/go.d.plugin/agent/discovery/sd/discoverer/kubernetes/service.go

Summary

Maintainability
B
4 hrs
Test Coverage
// SPDX-License-Identifier: GPL-3.0-or-later

package kubernetes

import (
    "context"
    "fmt"
    "net"
    "strconv"
    "strings"

    "github.com/netdata/netdata/go/go.d.plugin/agent/discovery/sd/model"
    "github.com/netdata/netdata/go/go.d.plugin/logger"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/util/workqueue"
)

// serviceTargetGroup is the group of targets built from a single Kubernetes Service.
type serviceTargetGroup struct {
    // targets holds one entry per service port; it is left nil when the group is empty.
    targets []model.Target
    // source identifies the service the group was built from (see serviceSource).
    source string
}

// Provider returns the fixed provider name of this discoverer.
func (g serviceTargetGroup) Provider() string {
    return "sd:k8s:service"
}

// Source returns the identifier of the service this group belongs to.
func (g serviceTargetGroup) Source() string {
    return g.source
}

// Targets returns the targets of the group; it may be nil.
func (g serviceTargetGroup) Targets() []model.Target {
    return g.targets
}

// ServiceTarget describes one port of a Kubernetes Service as a discovery target.
// buildTargets creates one ServiceTarget for every entry of the service's Spec.Ports.
type ServiceTarget struct {
    // NOTE(review): the `hash:"ignore"` tag presumably excludes the embedded Base
    // from the value computed by calcHash — confirm against calcHash.
    model.Base `hash:"ignore"`

    // hash is the value returned by Hash(); buildTargets sets it from calcHash.
    hash uint64
    // tuid is the value returned by TUID(); it is produced by serviceTUID.
    tuid string

    // Address is "<name>.<namespace>.svc" joined with Port via net.JoinHostPort.
    Address      string
    // Namespace and Name are copied from the Service object.
    Namespace    string
    Name         string
    // Annotations and Labels are the Service's annotations/labels converted by mapAny.
    Annotations  map[string]any
    Labels       map[string]any
    // Port is the service port number formatted as a decimal string.
    Port         string
    // PortName and PortProtocol come from the corresponding ServicePort entry.
    PortName     string
    PortProtocol string
    // ClusterIP, ExternalName and Type are copied from the Service spec.
    ClusterIP    string
    ExternalName string
    Type         string
}

// Hash returns the hash stored on the target when it was built.
func (t ServiceTarget) Hash() uint64 {
    return t.hash
}

// TUID returns the target's unique identifier stored when it was built.
func (t ServiceTarget) TUID() string {
    return t.tuid
}

// serviceDiscoverer turns Kubernetes Service objects into discovery target groups.
// Informer events are queued (see newServiceDiscoverer) and processed one by one
// by run/handleQueueItem.
type serviceDiscoverer struct {
    // Logger supplies the logging methods (Info, Error, ...) used by the discoverer.
    *logger.Logger
    // Base supplies Tags(); handleQueueItem merges them into every built target.
    model.Base

    // informer is the Service informer: it is run by Discover and its store is
    // read by handleQueueItem.
    informer cache.SharedInformer
    // queue receives the items enqueued by the informer event handlers and is
    // shut down when Discover returns.
    queue    *workqueue.Type
}

// newServiceDiscoverer creates a discoverer that watches Kubernetes Services through inf.
//
// Add, update and delete events reported by the informer are handed to enqueue
// together with the discoverer's work queue; the queue is drained later by run.
// It panics if inf is nil, because that can only be a programming error in the caller.
func newServiceDiscoverer(inf cache.SharedInformer) *serviceDiscoverer {
    if inf == nil {
        panic("nil service informer")
    }

    queue := workqueue.NewWithConfig(workqueue.QueueConfig{Name: "service"})

    handler := cache.ResourceEventHandlerFuncs{
        AddFunc:    func(obj any) { enqueue(queue, obj) },
        UpdateFunc: func(_, obj any) { enqueue(queue, obj) },
        DeleteFunc: func(obj any) { enqueue(queue, obj) },
    }
    // If registration fails no event will ever reach the queue. Report it rather
    // than silently returning a discoverer that can never discover anything.
    if _, err := inf.AddEventHandler(handler); err != nil {
        log.Errorf("k8s service: failed to register informer event handler: %v", err)
    }

    return &serviceDiscoverer{
        Logger:   log,
        informer: inf,
        queue:    queue,
    }
}

// String returns a short human-readable name of the discoverer.
func (s *serviceDiscoverer) String() string {
    const name = "k8s service"

    return name
}

// Discover starts the informer, waits for its cache to sync, launches the queue
// worker (run) and then blocks until ctx is cancelled.
//
// Target groups produced by the worker are sent to ch. On return the work queue
// is shut down, which is how run is told to stop (it returns once Get reports
// shutdown). If the cache cannot be synced, Discover logs an error and returns
// without starting the worker.
func (s *serviceDiscoverer) Discover(ctx context.Context, ch chan<- []model.TargetGroup) {
    s.Info("instance is started")
    // Deferred calls run in reverse order: the queue is shut down first,
    // then the stop message is logged.
    defer s.Info("instance is stopped")
    defer s.queue.ShutDown()

    done := ctx.Done()

    go s.informer.Run(done)

    synced := cache.WaitForCacheSync(done, s.informer.HasSynced)
    if !synced {
        s.Error("failed to sync caches")
        return
    }

    go s.run(ctx, ch)

    <-done
}

// run is the queue worker: it takes items from the work queue one at a time and
// passes each to handleQueueItem, until the queue reports that it was shut down.
func (s *serviceDiscoverer) run(ctx context.Context, in chan<- []model.TargetGroup) {
    for {
        next, stopped := s.queue.Get()
        if stopped {
            break
        }

        s.handleQueueItem(ctx, in, next)
    }
}

// handleQueueItem processes a single work-queue item and sends the resulting
// target group to in.
//
// item is expected to be a string key that cache.SplitMetaNamespaceKey can split
// into namespace and name. The key is looked up in the informer's store:
//   - if the object no longer exists, a group with the service's source and no
//     targets is sent (presumably interpreted downstream as removal — confirm
//     against the consumer of in);
//   - otherwise the group is built from the Service and the discoverer's tags
//     are merged into every target before sending.
//
// Any failure is logged and the item is dropped. The item is always marked as
// done on the queue.
func (s *serviceDiscoverer) handleQueueItem(ctx context.Context, in chan<- []model.TargetGroup, item any) {
    defer s.queue.Done(item)

    // Use the comma-ok form: a plain assertion would panic the worker goroutine
    // if anything other than a string key ever ended up in the queue.
    key, ok := item.(string)
    if !ok {
        s.Errorf("unexpected queue item type %T, expected a string key", item)
        return
    }

    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        s.Errorf("invalid queue key %q: %v", key, err)
        return
    }

    obj, exists, err := s.informer.GetStore().GetByKey(key)
    if err != nil {
        s.Errorf("getting %q from the informer store: %v", key, err)
        return
    }

    if !exists {
        tgg := &serviceTargetGroup{source: serviceSourceFromNsName(namespace, name)}
        send(ctx, in, tgg)
        return
    }

    svc, err := toService(obj)
    if err != nil {
        s.Errorf("handling %q: %v", key, err)
        return
    }

    tgg := s.buildTargetGroup(svc)

    for _, tgt := range tgg.Targets() {
        tgt.Tags().Merge(s.Tags())
    }

    send(ctx, in, tgg)
}

// buildTargetGroup builds the target group for svc.
//
// The group always carries the service's source. Targets are only built when the
// service has a non-empty ClusterIP and at least one port; otherwise the group
// is returned without targets.
func (s *serviceDiscoverer) buildTargetGroup(svc *corev1.Service) model.TargetGroup {
    group := &serviceTargetGroup{source: serviceSource(svc)}

    // TODO: headless service?
    hasClusterIP := svc.Spec.ClusterIP != ""
    hasPorts := len(svc.Spec.Ports) != 0
    if hasClusterIP && hasPorts {
        group.targets = s.buildTargets(svc)
    }

    return group
}

// buildTargets returns one ServiceTarget per port of svc.
//
// Each target is addressed as "<name>.<namespace>.svc" joined with the port
// number. A target whose hash cannot be calculated is left out of the result.
// The result is nil when no target was built.
func (s *serviceDiscoverer) buildTargets(svc *corev1.Service) (targets []model.Target) {
    host := svc.Name + "." + svc.Namespace + ".svc"

    for _, p := range svc.Spec.Ports {
        num := strconv.Itoa(int(p.Port))

        target := &ServiceTarget{
            tuid:         serviceTUID(svc, p),
            Address:      net.JoinHostPort(host, num),
            Namespace:    svc.Namespace,
            Name:         svc.Name,
            Annotations:  mapAny(svc.Annotations),
            Labels:       mapAny(svc.Labels),
            Port:         num,
            PortName:     p.Name,
            PortProtocol: string(p.Protocol),
            ClusterIP:    svc.Spec.ClusterIP,
            ExternalName: svc.Spec.ExternalName,
            Type:         string(svc.Spec.Type),
        }

        // The hash is calculated over the fully populated target and stored on it.
        if sum, err := calcHash(target); err == nil {
            target.hash = sum
            targets = append(targets, target)
        }
    }

    return targets
}

// serviceTUID returns the unique ID of a service port target in the form
// "<namespace>_<name>_<protocol>_<port>", with the protocol lower-cased.
func serviceTUID(svc *corev1.Service, port corev1.ServicePort) string {
    proto := strings.ToLower(string(port.Protocol))
    num := strconv.Itoa(int(port.Port))

    return fmt.Sprintf("%s_%s_%s_%s", svc.Namespace, svc.Name, proto, num)
}

// serviceSourceFromNsName returns the source identifier of the service with the
// given namespace and name.
func serviceSourceFromNsName(namespace, name string) string {
    const format = "discoverer=k8s,kind=service,namespace=%s,service_name=%s"

    return fmt.Sprintf(format, namespace, name)
}

// serviceSource returns the source identifier of svc, derived from its
// namespace and name.
func serviceSource(svc *corev1.Service) string {
    ns, name := svc.Namespace, svc.Name

    return serviceSourceFromNsName(ns, name)
}

// toService converts obj to *corev1.Service.
// It returns an error naming the actual type when obj is anything else.
func toService(obj any) (*corev1.Service, error) {
    if svc, ok := obj.(*corev1.Service); ok {
        return svc, nil
    }

    return nil, fmt.Errorf("received unexpected object type: %T", obj)
}