Mirantis/virtlet

View on GitHub
pkg/tapmanager/tapfdsource.go

Summary

Maintainability
A
0 mins
Test Coverage
/*
Copyright 2017 Mirantis

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package tapmanager

import (
    "encoding/json"
    "fmt"
    "net"
    "strings"
    "sync"
    "time"

    "github.com/containernetworking/cni/pkg/ns"
    cnitypes "github.com/containernetworking/cni/pkg/types"
    cnicurrent "github.com/containernetworking/cni/pkg/types/current"
    "github.com/davecgh/go-spew/spew"
    "github.com/golang/glog"
    "github.com/vishvananda/netlink"

    "github.com/Mirantis/virtlet/pkg/cni"
    "github.com/Mirantis/virtlet/pkg/dhcp"
    "github.com/Mirantis/virtlet/pkg/nettools"
    "github.com/Mirantis/virtlet/pkg/network"
    "github.com/Mirantis/virtlet/pkg/utils"
)

const (
    calicoDefaultSubnet = 24
    calicoSubnetVar     = "VIRTLET_CALICO_SUBNET"
)

// InterfaceDescription contains interface type with additional data
// needed to identify it
type InterfaceDescription struct {
    Type         network.InterfaceType `json:"type"`
    HardwareAddr net.HardwareAddr      `json:"mac"`
    FdIndex      int                   `json:"fdIndex"`
    PCIAddress   string                `json:"pciAddress"`
}

// PodNetworkDesc contains the data that are required by TapFDSource
// to set up a tap device for a VM
type PodNetworkDesc struct {
    // PodID specifies the id of the pod
    PodID string `json:"podId"`
    // PodNs specifies the namespace of the pod
    PodNs string `json:"podNs"`
    // PodName specifies the name of the pod
    PodName string `json:"podName"`
    // DNS specifies DNS settings for the pod
    DNS *cnitypes.DNS
}

// GetFDPayload contains the data that are required by TapFDSource
// to prepare container side network configuration
type GetFDPayload struct {
    // Description contains the pod information and DNS settings for the pod
    Description *PodNetworkDesc `json:"podNetworkDesc"`
}

// RecoverPayload contains the data that are required by TapFDSource
// to recover a network configuration in a pod
type RecoverPayload struct {
    // Description contains the pod information and DNS settings for the pod
    Description *PodNetworkDesc `json:"podNetworkDesc"`
    // ContainerSideNetwork specifies configuration used to configure retaken
    // environment
    ContainerSideNetwork *network.ContainerSideNetwork `json:"csn"`
    // HaveRunningContainers is true if any domains are currently running
    // for this pod. VF reconfiguration is to be skipped if that's the case.
    HaveRunningContainers bool
}

type podNetwork struct {
    pnd        PodNetworkDesc
    csn        *network.ContainerSideNetwork
    dhcpServer *dhcp.Server
    doneCh     chan error
}

// TapFDSource sets up and tears down Virtlet VM network.
// It implements FDSource interface
type TapFDSource struct {
    sync.Mutex

    cniClient          cni.Client
    dummyNetwork       *cnicurrent.Result
    dummyNetworkNsPath string
    fdMap              map[string]*podNetwork
    enableSriov        bool
    calicoSubnetSize   int
}

var _ FDSource = &TapFDSource{}

// NewTapFDSource returns a TapFDSource for the specified CNI plugin &
// config dir
func NewTapFDSource(cniClient cni.Client, enableSriov bool, calicoSubnetSize int) (*TapFDSource, error) {
    s := &TapFDSource{
        cniClient:        cniClient,
        fdMap:            make(map[string]*podNetwork),
        calicoSubnetSize: calicoSubnetSize,
        enableSriov:      enableSriov,
    }

    return s, nil
}

func (s *TapFDSource) getDummyNetwork() (*cnicurrent.Result, string, error) {
    if s.dummyNetwork == nil {
        var err error
        s.dummyNetwork, s.dummyNetworkNsPath, err = s.cniClient.GetDummyNetwork()
        if err != nil {
            return nil, "", err
        }
        // s.dummyGateway = dummyResult.IPs[0].Address.IP

    }
    return s.dummyNetwork, s.dummyNetworkNsPath, nil
}

// GetFDs implements GetFDs method of FDSource interface
func (s *TapFDSource) GetFDs(key string, data []byte) ([]int, []byte, error) {
    var payload GetFDPayload
    if err := json.Unmarshal(data, &payload); err != nil {
        return nil, nil, fmt.Errorf("error unmarshalling GetFD payload: %v", err)
    }
    pnd := payload.Description
    if err := cni.CreateNetNS(pnd.PodID); err != nil {
        return nil, nil, fmt.Errorf("error creating new netns for pod %s (%s): %v", pnd.PodName, pnd.PodID, err)
    }

    gotError := false
    podAddedToNetwork := false
    defer func() {
        if gotError {
            if podAddedToNetwork {
                if err := s.cniClient.RemoveSandboxFromNetwork(pnd.PodID, pnd.PodName, pnd.PodNs); err != nil {
                    glog.Errorf("Error removing a pod from the pod network after failed network setup: %v", err)
                }
            }
            if err := cni.DestroyNetNS(pnd.PodID); err != nil {
                glog.Errorf("Error removing netns after failed network setup: %v", err)
            }
        }
    }()

    netConfig, err := s.cniClient.AddSandboxToNetwork(pnd.PodID, pnd.PodName, pnd.PodNs)
    if err != nil {
        gotError = true
        return nil, nil, fmt.Errorf("error adding pod %s (%s) to CNI network: %v", pnd.PodName, pnd.PodID, err)
    }
    podAddedToNetwork = true
    glog.V(3).Infof("CNI configuration for pod %s (%s): %s", pnd.PodName, pnd.PodID, spew.Sdump(netConfig))

    if netConfig == nil {
        netConfig = &cnicurrent.Result{}
    }

    if payload.Description.DNS != nil {
        netConfig.DNS.Nameservers = pnd.DNS.Nameservers
        netConfig.DNS.Search = pnd.DNS.Search
        netConfig.DNS.Options = pnd.DNS.Options
    }

    var fds []int
    var respData []byte
    var csn *network.ContainerSideNetwork
    if err := s.setupNetNS(key, pnd, func(netNSPath string, allLinks []netlink.Link, hostNS ns.NetNS) (*network.ContainerSideNetwork, error) {
        if netConfig, err = nettools.ValidateAndFixCNIResult(netConfig, netNSPath, allLinks); err != nil {
            gotError = true
            return nil, fmt.Errorf("error fixing cni configuration: %v", err)
        }
        if err := nettools.FixCalicoNetworking(netConfig, s.calicoSubnetSize, s.getDummyNetwork); err != nil {
            // don't fail in this case because there may be even no Calico
            glog.Warningf("Calico detection/fix didn't work: %v", err)
        }
        glog.V(3).Infof("CNI Result after fix:\n%s", spew.Sdump(netConfig))

        var err error
        if csn, err = nettools.SetupContainerSideNetwork(netConfig, netNSPath, allLinks, s.enableSriov, hostNS); err != nil {
            return nil, err
        }

        if respData, err = json.Marshal(csn); err != nil {
            return nil, fmt.Errorf("error marshalling net config: %v", err)
        }

        for _, i := range csn.Interfaces {
            fds = append(fds, int(i.Fo.Fd()))
        }
        return csn, nil
    }); err != nil {
        gotError = true
        return nil, nil, err
    }

    return fds, respData, nil
}

// Release implements Release method of FDSource interface
func (s *TapFDSource) Release(key string) error {
    s.Lock()
    defer s.Unlock()
    pn, found := s.fdMap[key]
    if !found {
        return fmt.Errorf("bad fd key: %q", key)
    }

    netNSPath := cni.PodNetNSPath(pn.pnd.PodID)

    vmNS, err := ns.GetNS(netNSPath)
    if err != nil {
        return fmt.Errorf("failed to open network namespace at %q: %v", netNSPath, err)
    }

    // Try to keep this function idempotent even if there are errors during the following calls.
    // This can cause some resource leaks in multiple CNI case but makes it possible
    // to call `RunPodSandbox` again after a failed attempt. Failing to do so would cause
    // the next `RunPodSandbox` call to fail due to the netns already being present.
    defer func() {
        if err := cni.DestroyNetNS(pn.pnd.PodID); err != nil {
            glog.Errorf("Error when removing network namespace for pod sandbox %q: %v", pn.pnd.PodID, err)
        }
    }()

    if err := nettools.ReconstructVFs(pn.csn, vmNS, false); err != nil {
        return fmt.Errorf("failed to reconstruct SR-IOV devices: %v", err)
    }

    if err := vmNS.Do(func(ns.NetNS) error {
        if err := pn.dhcpServer.Close(); err != nil {
            return fmt.Errorf("failed to stop dhcp server: %v", err)
        }
        <-pn.doneCh
        return nettools.Teardown(pn.csn)
    }); err != nil {
        return err
    }

    if err := s.cniClient.RemoveSandboxFromNetwork(pn.pnd.PodID, pn.pnd.PodName, pn.pnd.PodNs); err != nil {
        return fmt.Errorf("error removing pod sandbox %q from CNI network: %v", pn.pnd.PodID, err)
    }

    delete(s.fdMap, key)
    return nil
}

// GetInfo implements GetInfo method of FDSource interface
func (s *TapFDSource) GetInfo(key string) ([]byte, error) {
    s.Lock()
    defer s.Unlock()
    pn, found := s.fdMap[key]
    if !found {
        return nil, fmt.Errorf("bad fd key: %q", key)
    }
    var descriptions []InterfaceDescription
    for i, iface := range pn.csn.Interfaces {
        descriptions = append(descriptions, InterfaceDescription{
            FdIndex:      i,
            HardwareAddr: iface.HardwareAddr,
            Type:         iface.Type,
            PCIAddress:   iface.PCIAddress,
        })
    }
    data, err := json.Marshal(descriptions)
    if err != nil {
        return nil, fmt.Errorf("interface descriptions marshaling error: %v", err)
    }
    return data, nil
}

// Stop stops any running DHCP servers associated with TapFDSource
// and closes tap fds without releasing any other resources.
func (s *TapFDSource) Stop() error {
    s.Lock()
    defer s.Unlock()
    var errors []string
    for _, pn := range s.fdMap {
        if err := pn.dhcpServer.Close(); err != nil {
            errors = append(errors, fmt.Sprintf("error stopping dhcp server: %v", err.Error()))
        } else {
            <-pn.doneCh
        }
        for _, i := range pn.csn.Interfaces {
            if err := i.Fo.Close(); err != nil {
                errors = append(errors, fmt.Sprintf("error closing tap fd: %v", err))
            }
        }
    }
    s.fdMap = make(map[string]*podNetwork)
    if errors != nil {
        return fmt.Errorf("Errors while stopping TapFDSource:\n%s", strings.Join(errors, "\n"))
    }
    return nil
}

// Recover recovers the state for the netns after Virtlet restart
func (s *TapFDSource) Recover(key string, data []byte) error {
    var payload RecoverPayload
    if err := json.Unmarshal(data, &payload); err != nil {
        return fmt.Errorf("error unmarshalling GetFD payload: %v", err)
    }
    pnd := payload.Description
    csn := payload.ContainerSideNetwork
    if csn == nil {
        return fmt.Errorf("ContainerSideNetwork not passed to Recover()")
    }
    if csn.Result == nil {
        csn.Result = &cnicurrent.Result{}
    }
    netNSPath := cni.PodNetNSPath(pnd.PodID)
    vmNS, err := ns.GetNS(netNSPath)
    if err != nil {
        return fmt.Errorf("failed to open network namespace at %q: %v", netNSPath, err)
    }
    if !payload.HaveRunningContainers {
        if err := nettools.ReconstructVFs(csn, vmNS, true); err != nil {
            return err
        }
    }
    return s.setupNetNS(key, pnd, func(netNSPath string, allLinks []netlink.Link, hostNS ns.NetNS) (*network.ContainerSideNetwork, error) {
        if err := nettools.RecoverContainerSideNetwork(csn, netNSPath, allLinks, hostNS); err != nil {
            return nil, err
        }
        return csn, nil
    })
}

// RetrieveFDs retrieves the FDs.
// It's only used in case if VM exited but Recover() didn't populate the FDs
func (s *TapFDSource) RetrieveFDs(key string) ([]int, error) {
    var podNet *podNetwork
    var fds []int
    func() {
        s.Lock()
        defer s.Unlock()
        podNet = s.fdMap[key]
    }()
    if podNet == nil {
        return nil, fmt.Errorf("bad key %q to retrieve FDs", key)
    }

    netNSPath := cni.PodNetNSPath(podNet.pnd.PodID)
    vmNS, err := ns.GetNS(netNSPath)
    if err != nil {
        return nil, fmt.Errorf("failed to open network namespace at %q: %v", netNSPath, err)
    }

    if err := utils.CallInNetNSWithSysfsRemounted(vmNS, func(hostNS ns.NetNS) error {
        allLinks, err := netlink.LinkList()
        if err != nil {
            return fmt.Errorf("error listing the links: %v", err)
        }

        return nettools.RecoverContainerSideNetwork(podNet.csn, netNSPath, allLinks, hostNS)
    }); err != nil {
        return nil, err
    }

    for _, ifDesc := range podNet.csn.Interfaces {
        // Fail if not all succeeded
        if ifDesc.Fo == nil {
            return nil, fmt.Errorf("failed to open tap interface %q", ifDesc.Name)
        }
        fds = append(fds, int(ifDesc.Fo.Fd()))
    }
    return fds, nil
}

func (s *TapFDSource) setupNetNS(key string, pnd *PodNetworkDesc, initNet func(netNSPath string, allLinks []netlink.Link, hostNS ns.NetNS) (*network.ContainerSideNetwork, error)) error {
    netNSPath := cni.PodNetNSPath(pnd.PodID)
    vmNS, err := ns.GetNS(netNSPath)
    if err != nil {
        return fmt.Errorf("failed to open network namespace at %q: %v", netNSPath, err)
    }

    var csn *network.ContainerSideNetwork
    var dhcpServer *dhcp.Server
    doneCh := make(chan error)
    if err := utils.CallInNetNSWithSysfsRemounted(vmNS, func(hostNS ns.NetNS) error {
        allLinks, err := netlink.LinkList()
        if err != nil {
            return fmt.Errorf("error listing the links: %v", err)
        }

        if csn, err = initNet(netNSPath, allLinks, hostNS); err != nil {
            return err
        }

        dhcpServer = dhcp.NewServer(csn)
        if err := dhcpServer.SetupListener("0.0.0.0"); err != nil {
            return fmt.Errorf("Failed to set up dhcp listener: %v", err)
        }
        go func() {
            doneCh <- vmNS.Do(func(ns.NetNS) error {
                err := dhcpServer.Serve()
                if err != nil {
                    glog.Errorf("dhcp server error: %v", err)
                }
                return err
            })
        }()

        // FIXME: there's some very small possibility for a race here
        // (happens if the VM makes DHCP request before DHCP server is ready)
        // For now, let's make the probability of such problem even smaller
        time.Sleep(500 * time.Millisecond)
        return nil
    }); err != nil {
        return err
    }

    s.Lock()
    defer s.Unlock()
    s.fdMap[key] = &podNetwork{
        pnd:        *pnd,
        csn:        csn,
        dhcpServer: dhcpServer,
        doneCh:     doneCh,
    }
    return nil
}