alibaba/kt-connect

View on GitHub
pkg/kt/service/cluster/service.go

Summary

Maintainability
B
5 hrs
Test Coverage
package cluster

import (
    "context"
    "github.com/alibaba/kt-connect/pkg/kt/util"
    "github.com/rs/zerolog/log"
    coreV1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    labelApi "k8s.io/apimachinery/pkg/labels"
    "k8s.io/apimachinery/pkg/types"
)

// SvcMetaAndSpec ...
type SvcMetaAndSpec struct {
    Meta      *ResourceMeta
    External  bool
    Ports     map[int]int
    Selectors map[string]string
}

// GetService get service
func (k *Kubernetes) GetService(name, namespace string) (*coreV1.Service, error) {
    return k.Clientset.CoreV1().Services(namespace).Get(context.TODO(), name, metav1.GetOptions{})
}

// GetServicesBySelector get services by selector
func (k *Kubernetes) GetServicesBySelector(matchLabels map[string]string, namespace string) ([]coreV1.Service, error) {
    var matchedSvcs []coreV1.Service
    svcList, err := k.GetAllServiceInNamespace(namespace)
    if err != nil {
        return nil, err
    }
    for _, svc := range svcList.Items {
        if util.MapContains(svc.Spec.Selector, matchLabels) {
            matchedSvcs = append(matchedSvcs, svc)
        }
    }
    return matchedSvcs, nil
}

// GetServicesByLabel get services by label
func (k *Kubernetes) GetServicesByLabel(labels map[string]string, namespace string) (svcs *coreV1.ServiceList, err error) {
    return k.Clientset.CoreV1().Services(namespace).List(context.TODO(), metav1.ListOptions{
        LabelSelector:  labelApi.SelectorFromSet(labels).String(),
        TimeoutSeconds: &apiTimeout,
    })
}

// GetAllServiceInNamespace get all services in specified namespace
func (k *Kubernetes) GetAllServiceInNamespace(namespace string) (*coreV1.ServiceList, error) {
    return k.Clientset.CoreV1().Services(namespace).List(context.TODO(), metav1.ListOptions{
        TimeoutSeconds: &apiTimeout,
    })
}

// CreateService create kubernetes service
func (k *Kubernetes) CreateService(metaAndSpec *SvcMetaAndSpec) (*coreV1.Service, error) {
    SetupHeartBeat(metaAndSpec.Meta.Name, metaAndSpec.Meta.Namespace, k.UpdateServiceHeartBeat)
    return k.Clientset.CoreV1().Services(metaAndSpec.Meta.Namespace).
        Create(context.TODO(), createService(metaAndSpec), metav1.CreateOptions{})
}

// UpdateService ...
func (k *Kubernetes) UpdateService(svc *coreV1.Service) (*coreV1.Service, error) {
    return k.Clientset.CoreV1().Services(svc.Namespace).Update(context.TODO(), svc, metav1.UpdateOptions{})
}

// RemoveService remove service
func (k *Kubernetes) RemoveService(name, namespace string) (err error) {
    deletePolicy := metav1.DeletePropagationBackground
    return k.Clientset.CoreV1().Services(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{
        PropagationPolicy: &deletePolicy,
    })
}

func (k *Kubernetes) UpdateServiceHeartBeat(name, namespace string) {
    key := "service_" + name
    if _, err := k.Clientset.CoreV1().Services(namespace).
        Patch(context.TODO(), name, types.JSONPatchType, []byte(resourceHeartbeatPatch()), metav1.PatchOptions{}); err != nil {
        if healthy, exists := LastHeartBeatStatus.Get(key); healthy || !exists {
            log.Warn().Err(err).Msgf("Failed to update heart beat of service %s", name)
        } else {
            log.Debug().Err(err).Msgf("Service %s heart beat interrupted", name)
        }
        LastHeartBeatStatus.Set(key, false)
    } else {
        log.Debug().Msgf("Heartbeat service %s ticked at %s", name, util.FormattedTime())
        LastHeartBeatStatus.Set(key, true)
    }
}

// WatchService ...
func (k *Kubernetes) WatchService(name, namespace string, fAdd, fDel, fMod func(*coreV1.Service)) {
    k.watchResource(name, namespace, string(coreV1.ResourceServices), &coreV1.Service{},
        func(obj any) {
            handleServiceEvent(obj, "added", fAdd)
        },
        func(obj any) {
            handleServiceEvent(obj, "deleted", fDel)
        },
        func(obj any) {
            handleServiceEvent(obj, "modified", fMod)
        },
    )
}

func handleServiceEvent(obj any, status string, f func(*coreV1.Service)) {
    switch obj.(type) {
    case *coreV1.Service:
        if f != nil {
            log.Debug().Msgf("Service %s %s", obj.(*coreV1.Service).Name, status)
            f(obj.(*coreV1.Service))
        }
    default:
        // ignore
    }
}