pkg/util/route.go
package util
import (
"context"
"fmt"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
v12 "k8s.io/client-go/kubernetes/typed/core/v1"
)
func GetNsForListPodAndSvc(ctx context.Context, clientset *kubernetes.Clientset, nsList []string) (podNs string, svcNs string, err error) {
for _, ns := range nsList {
log.Debugf("List namepsace %s pods", ns)
_, err = clientset.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{Limit: 1})
if apierrors.IsForbidden(err) {
continue
}
if err != nil {
return
}
podNs = ns
break
}
if err != nil {
err = errors.Wrap(err, "can not list pod to add it to route table")
return
}
for _, ns := range nsList {
log.Debugf("List namepsace %s services", ns)
_, err = clientset.CoreV1().Services(ns).List(ctx, metav1.ListOptions{Limit: 1})
if apierrors.IsForbidden(err) {
continue
}
if err != nil {
return
}
svcNs = ns
break
}
if err != nil {
err = errors.Wrap(err, "can not list service to add it to route table")
return
}
return
}
func ListService(ctx context.Context, lister v12.ServiceInterface, addRouteFunc func(resource string, ipStr string)) error {
opts := metav1.ListOptions{Limit: 100, Continue: ""}
for {
serviceList, err := lister.List(ctx, opts)
if err != nil {
return err
}
for _, service := range serviceList.Items {
addRouteFunc(service.Name, service.Spec.ClusterIP)
}
if serviceList.Continue == "" {
return nil
}
opts.Continue = serviceList.Continue
}
}
func WatchServiceToAddRoute(ctx context.Context, watcher v12.ServiceInterface, routeFunc func(resource string, ipStr string)) error {
defer func() {
if er := recover(); er != nil {
log.Error(er)
}
}()
w, err := watcher.Watch(ctx, metav1.ListOptions{Watch: true})
if err != nil {
return err
}
defer w.Stop()
for {
select {
case <-ctx.Done():
return nil
case e, ok := <-w.ResultChan():
if !ok {
return errors.New("watch service chan done")
}
var svc *v1.Service
svc, ok = e.Object.(*v1.Service)
if !ok {
continue
}
routeFunc(svc.Name, svc.Spec.ClusterIP)
}
}
}
func ListPod(ctx context.Context, lister v12.PodInterface, addRouteFunc func(resource string, ipStr string)) error {
opts := metav1.ListOptions{Limit: 100, Continue: ""}
for {
podList, err := lister.List(ctx, opts)
if err != nil {
return err
}
for _, pod := range podList.Items {
if pod.Spec.HostNetwork {
continue
}
addRouteFunc(pod.Name, pod.Status.PodIP)
}
if podList.Continue == "" {
return nil
}
opts.Continue = podList.Continue
}
}
func WatchPodToAddRoute(ctx context.Context, watcher v12.PodInterface, addRouteFunc func(resource string, ipStr string)) error {
defer func() {
if er := recover(); er != nil {
log.Errorln(er)
}
}()
w, err := watcher.Watch(ctx, metav1.ListOptions{Watch: true})
if err != nil {
return err
}
defer w.Stop()
for {
select {
case <-ctx.Done():
return nil
case e, ok := <-w.ResultChan():
if !ok {
return fmt.Errorf("watch pod chan done")
}
var pod *v1.Pod
pod, ok = e.Object.(*v1.Pod)
if !ok {
continue
}
if pod.Spec.HostNetwork {
continue
}
ip := pod.Status.PodIP
addRouteFunc(pod.Name, ip)
}
}
}