pkg/fwdservice/fwdservice.go
package fwdservice
import (
"context"
"fmt"
"strconv"
"sync"
"time"
log "github.com/sirupsen/logrus"
"github.com/txn2/kubefwd/pkg/fwdIp"
"github.com/txn2/kubefwd/pkg/fwdnet"
"github.com/txn2/kubefwd/pkg/fwdport"
"github.com/txn2/kubefwd/pkg/fwdpub"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/plugin/pkg/client/auth"
restclient "k8s.io/client-go/rest"
)
// ServiceFWD Single service to forward, with a reference to
// all the pods being forwarded for it
type ServiceFWD struct {
ClientSet kubernetes.Clientset
ListOptions metav1.ListOptions
Hostfile *fwdport.HostFileWithLock
ClientConfig restclient.Config
RESTClient restclient.RESTClient
// Context is a unique key (string) in kubectl config representing
// a user/cluster combination. Kubefwd uses context as the
// cluster name when forwarding to more than one cluster.
Context string
// Namespace is the current Kubernetes Namespace to locate services
// and the pods that back them for port-forwarding
Namespace string
// ClusterN is the ordinal index of the cluster (from configuration)
// cluster 0 is considered local while > 0 is remote
ClusterN int
// NamespaceN is the ordinal index of the namespace from the
// perspective of the user. Namespace 0 is considered local
// while > 0 is an external namespace
NamespaceN int
// FwdInc the forward increment for ip
FwdInc *int
// Domain is specified by the user and used in place of .local
Domain string
PodLabelSelector string // The label selector to query for matching pods.
NamespaceServiceLock *sync.Mutex //
Svc *v1.Service // Reference to the k8s service.
// Headless service will forward all of the pods,
// while normally only a single pod is forwarded.
Headless bool
LastSyncedAt time.Time // When was the set of pods last synced
PortMap *[]PortMap // port map array.
// Use debouncer for listing pods so we don't hammer the k8s when a bunch of changes happen at once
SyncDebouncer func(f func())
// A mapping of all the pods currently being forwarded.
// key = podName
PortForwards map[string]*fwdport.PortForwardOpts
DoneChannel chan struct{} // After shutdown is complete, this channel will be closed
ForwardConfigurationPath string // file path to IP reservation configuration
ForwardIPReservations []string // cli passed IP reservations
}
/**
add port map
@url https://github.com/txn2/kubefwd/issues/121
*/
type PortMap struct {
SourcePort string
TargetPort string
}
// String representation of a ServiceFWD returns a unique name
// in the form SERVICE_NAME.NAMESPACE.CONTEXT
func (svcFwd *ServiceFWD) String() string {
return svcFwd.Svc.Name + "." + svcFwd.Namespace + "." + svcFwd.Context
}
// GetPodsForService queries k8s and returns all pods backing this service
// which are eligible for port-forwarding; exclude some pods which are in final/failure state.
func (svcFwd *ServiceFWD) GetPodsForService() []v1.Pod {
listOpts := metav1.ListOptions{LabelSelector: svcFwd.PodLabelSelector}
pods, err := svcFwd.ClientSet.CoreV1().Pods(svcFwd.Svc.Namespace).List(context.TODO(), listOpts)
if err != nil {
if errors.IsNotFound(err) {
log.Warnf("WARNING: No Pods found for service %s: %s\n", svcFwd, err.Error())
} else {
log.Warnf("WARNING: Error in List pods for %s: %s\n", svcFwd, err.Error())
}
return nil
}
podsEligible := make([]v1.Pod, 0, len(pods.Items))
for _, pod := range pods.Items {
if pod.Status.Phase == v1.PodPending || pod.Status.Phase == v1.PodRunning {
podsEligible = append(podsEligible, pod)
}
}
return podsEligible
}
// SyncPodForwards selects one or all pods behind a service, and invokes
// the forwarding setup for that or those pod(s). It will remove pods in-mem
// that are no longer returned by k8s, should these not be correctly deleted.
func (svcFwd *ServiceFWD) SyncPodForwards(force bool) {
sync := func() {
defer func() { svcFwd.LastSyncedAt = time.Now() }()
k8sPods := svcFwd.GetPodsForService()
// If no pods are found currently. Will try again next re-sync period.
if len(k8sPods) == 0 {
log.Warnf("WARNING: No Running Pods returned for service %s", svcFwd)
return
}
// Check if the pods currently being forwarded still exist in k8s and if
// they are not in a (pre-)running state, if not: remove them
for _, podName := range svcFwd.ListServicePodNames() {
keep := false
for _, pod := range k8sPods {
if podName == pod.Name && (pod.Status.Phase == v1.PodPending || pod.Status.Phase == v1.PodRunning) {
keep = true
break
}
}
if !keep {
svcFwd.RemoveServicePod(podName)
}
}
// Set up port-forwarding for one or all of these pods normal service
// port-forward the first pod as service name. headless service not only
// forward first Pod as service name, but also port-forward all pods.
if len(k8sPods) != 0 {
// if this is a headless service forward the first pod from the
// service name, then subsequent pods from their pod name
if svcFwd.Headless {
svcFwd.LoopPodsToForward([]v1.Pod{k8sPods[0]}, false)
svcFwd.LoopPodsToForward(k8sPods, true)
return
}
// Check if currently we are forwarding a pod which is good to keep using
podNameToKeep := ""
for _, podName := range svcFwd.ListServicePodNames() {
if podNameToKeep != "" {
break
}
for _, pod := range k8sPods {
if podName == pod.Name && (pod.Status.Phase == v1.PodPending || pod.Status.Phase == v1.PodRunning) {
podNameToKeep = pod.Name
break
}
}
}
// Stop forwarding others, should there be. In case none of the currently
// forwarded pods are good to keep, podNameToKeep will be the empty string,
// and the comparison will mean we will remove all pods, which is the desired behaviour.
for _, podName := range svcFwd.ListServicePodNames() {
if podName != podNameToKeep {
svcFwd.RemoveServicePod(podName)
}
}
// If no good pod was being forwarded already, start one
if podNameToKeep == "" {
svcFwd.LoopPodsToForward([]v1.Pod{k8sPods[0]}, false)
}
}
}
// When a whole set of pods gets deleted at once, they all will trigger a SyncPodForwards() call.
// This would hammer k8s with load needlessly. We therefore use a debouncer to only update pods
// if things have been stable for at least a few seconds. However, if things never stabilize we
// will still reload this information at least once every 5 minutes.
if force || time.Since(svcFwd.LastSyncedAt) > 5*time.Minute {
// Replace current debounced function with no-op
svcFwd.SyncDebouncer(func() {})
// Do the syncing work
sync()
} else {
// Queue sync
svcFwd.SyncDebouncer(sync)
}
}
// LoopPodsToForward starts the port-forwarding for each
// pod in the given list
func (svcFwd *ServiceFWD) LoopPodsToForward(pods []v1.Pod, includePodNameInHost bool) {
publisher := &fwdpub.Publisher{
PublisherName: "Services",
Output: false,
}
// Ip address handout is a critical section for synchronization,
// use a lock which synchronizes inside each namespace.
svcFwd.NamespaceServiceLock.Lock()
defer svcFwd.NamespaceServiceLock.Unlock()
for _, pod := range pods {
// If pod is already configured to be forwarded, skip it
if _, found := svcFwd.PortForwards[pod.Name]; found {
continue
}
podPort := ""
serviceHostName := svcFwd.Svc.Name
svcName := svcFwd.Svc.Name
if includePodNameInHost {
serviceHostName = pod.Name + "." + svcFwd.Svc.Name
svcName = pod.Name + "." + svcFwd.Svc.Name
}
opts := fwdIp.ForwardIPOpts{
ServiceName: svcName,
PodName: pod.Name,
Context: svcFwd.Context,
ClusterN: svcFwd.ClusterN,
NamespaceN: svcFwd.NamespaceN,
Namespace: svcFwd.Namespace,
Port: podPort,
ForwardConfigurationPath: svcFwd.ForwardConfigurationPath,
ForwardIPReservations: svcFwd.ForwardIPReservations,
}
localIp, err := fwdnet.ReadyInterface(opts)
if err != nil {
log.Warnf("WARNING: error readying interface: %s\n", err)
}
// if this is not the first namespace on the
// first cluster then append the namespace
if svcFwd.NamespaceN > 0 {
serviceHostName = serviceHostName + "." + pod.Namespace
}
// if this is not the first cluster append the full
// host name
if svcFwd.ClusterN > 0 {
serviceHostName = serviceHostName + "." + svcFwd.Context
}
for _, port := range svcFwd.Svc.Spec.Ports {
// Skip if pod port protocol is UDP - not supported in k8s port forwarding yet, see https://github.com/kubernetes/kubernetes/issues/47862
if port.Protocol == v1.ProtocolUDP {
log.Warnf("WARNING: Skipped Port-Forward for %s:%d to pod %s:%s - k8s port-forwarding doesn't support UDP protocol\n",
serviceHostName,
port.Port,
pod.Name,
port.TargetPort.String(),
)
continue
}
podPort = port.TargetPort.String()
localPort := svcFwd.getPortMap(port.Port)
p, err := strconv.ParseInt(localPort, 10, 32)
if err != nil {
log.Fatal(err)
}
port.Port = int32(p)
if _, err := strconv.Atoi(podPort); err != nil {
// search a pods containers for the named port
if namedPodPort, ok := portSearch(podPort, pod.Spec.Containers); ok {
podPort = namedPodPort
}
}
log.Debugf("Resolving: %s to %s (%s)\n",
serviceHostName,
localIp.String(),
svcName,
)
log.Printf("Port-Forward: %16s %s:%d to pod %s:%s\n",
localIp.String(),
serviceHostName,
port.Port,
pod.Name,
podPort,
)
pfo := &fwdport.PortForwardOpts{
Out: publisher,
Config: svcFwd.ClientConfig,
ClientSet: svcFwd.ClientSet,
RESTClient: svcFwd.RESTClient,
Context: svcFwd.Context,
Namespace: pod.Namespace,
Service: svcName,
ServiceFwd: svcFwd,
PodName: pod.Name,
PodPort: podPort,
LocalIp: localIp,
LocalPort: localPort,
HostFile: svcFwd.Hostfile,
ClusterN: svcFwd.ClusterN,
NamespaceN: svcFwd.NamespaceN,
Domain: svcFwd.Domain,
ManualStopChan: make(chan struct{}),
DoneChan: make(chan struct{}),
}
// Fire and forget. The stopping is done in the service.Shutdown() method.
go func() {
svcFwd.AddServicePod(pfo)
if err := pfo.PortForward(); err != nil {
select {
case <-pfo.ManualStopChan: // if shutdown was given, we don't bother with the error.
default:
log.Errorf("PortForward error on %s: %s", pfo.PodName, err.Error())
}
} else {
select {
case <-pfo.ManualStopChan: // if shutdown was given, don't log a warning as it's an intented stopping.
default:
log.Warnf("Stopped forwarding pod %s for %s", pfo.PodName, svcFwd)
}
}
}()
}
}
}
// AddServicePod
func (svcFwd *ServiceFWD) AddServicePod(pfo *fwdport.PortForwardOpts) {
svcFwd.NamespaceServiceLock.Lock()
ServicePod := pfo.Service + "." + pfo.PodName
if _, found := svcFwd.PortForwards[ServicePod]; !found {
svcFwd.PortForwards[ServicePod] = pfo
}
svcFwd.NamespaceServiceLock.Unlock()
}
// ListServicePodNames
func (svcFwd *ServiceFWD) ListServicePodNames() []string {
svcFwd.NamespaceServiceLock.Lock()
currentPodNames := make([]string, 0, len(svcFwd.PortForwards))
for podName := range svcFwd.PortForwards {
currentPodNames = append(currentPodNames, podName)
}
svcFwd.NamespaceServiceLock.Unlock()
return currentPodNames
}
func (svcFwd *ServiceFWD) RemoveServicePod(servicePodName string) {
if pod, found := svcFwd.PortForwards[servicePodName]; found {
pod.Stop()
<-pod.DoneChan
svcFwd.NamespaceServiceLock.Lock()
delete(svcFwd.PortForwards, servicePodName)
svcFwd.NamespaceServiceLock.Unlock()
}
}
func portSearch(portName string, containers []v1.Container) (string, bool) {
for _, container := range containers {
for _, cp := range container.Ports {
if cp.Name == portName {
return fmt.Sprint(cp.ContainerPort), true
}
}
}
return "", false
}
// port exist port map return
func (svcFwd *ServiceFWD) getPortMap(port int32) string {
p := strconv.Itoa(int(port))
if svcFwd.PortMap != nil {
for _, portMapInfo := range *svcFwd.PortMap {
if p == portMapInfo.SourcePort {
//use map port
return portMapInfo.TargetPort
}
}
}
return p
}