netdata/netdata

View on GitHub
src/go/collectors/go.d.plugin/modules/k8s_state/discover_pod.go

Summary

Maintainability
B
4 hrs
Test Coverage
// SPDX-License-Identifier: GPL-3.0-or-later

package k8s_state

import (
    "context"

    "github.com/netdata/netdata/go/go.d.plugin/logger"

    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/util/workqueue"
)

func newPodDiscoverer(si cache.SharedInformer, l *logger.Logger) *podDiscoverer {
    if si == nil {
        panic("nil pod shared informer")
    }

    queue := workqueue.NewWithConfig(workqueue.QueueConfig{Name: "pod"})
    _, _ = si.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    func(obj interface{}) { enqueue(queue, obj) },
        UpdateFunc: func(_, obj interface{}) { enqueue(queue, obj) },
        DeleteFunc: func(obj interface{}) { enqueue(queue, obj) },
    })

    return &podDiscoverer{
        Logger:   l,
        informer: si,
        queue:    queue,
        readyCh:  make(chan struct{}),
        stopCh:   make(chan struct{}),
    }
}

type podResource struct {
    src string
    val interface{}
}

func (r podResource) source() string         { return r.src }
func (r podResource) kind() kubeResourceKind { return kubeResourcePod }
func (r podResource) value() interface{}     { return r.val }

type podDiscoverer struct {
    *logger.Logger
    informer cache.SharedInformer
    queue    *workqueue.Type
    readyCh  chan struct{}
    stopCh   chan struct{}
}

func (d *podDiscoverer) run(ctx context.Context, in chan<- resource) {
    d.Info("pod_discoverer is started")
    defer func() { close(d.stopCh); d.Info("pod_discoverer is stopped") }()

    defer d.queue.ShutDown()

    go d.informer.Run(ctx.Done())

    if !cache.WaitForCacheSync(ctx.Done(), d.informer.HasSynced) {
        return
    }

    go d.runDiscover(ctx, in)
    close(d.readyCh)

    <-ctx.Done()
}

func (d *podDiscoverer) ready() bool   { return isChanClosed(d.readyCh) }
func (d *podDiscoverer) stopped() bool { return isChanClosed(d.stopCh) }

func (d *podDiscoverer) runDiscover(ctx context.Context, in chan<- resource) {
    for {
        item, shutdown := d.queue.Get()
        if shutdown {
            return
        }

        func() {
            defer d.queue.Done(item)

            key := item.(string)
            ns, name, err := cache.SplitMetaNamespaceKey(key)
            if err != nil {
                return
            }

            item, exists, err := d.informer.GetStore().GetByKey(key)
            if err != nil {
                return
            }

            r := &podResource{src: podSource(ns, name)}
            if exists {
                r.val = item
            }
            send(ctx, in, r)
        }()
    }
}

func podSource(namespace, name string) string {
    return "k8s/pod/" + namespace + "/" + name
}