Mirantis/virtlet

View on GitHub
pkg/manager/runtime.go

Summary

Maintainability
A
0 mins
Test Coverage
/*
Copyright 2016-2018 Mirantis

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package manager

import (
    "errors"
    "fmt"
    "time"

    cnitypes "github.com/containernetworking/cni/pkg/types"
    cnicurrent "github.com/containernetworking/cni/pkg/types/current"
    "github.com/golang/glog"
    "github.com/jonboulle/clockwork"
    "golang.org/x/net/context"
    kubeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"

    "github.com/Mirantis/virtlet/pkg/cni"
    "github.com/Mirantis/virtlet/pkg/libvirttools"
    "github.com/Mirantis/virtlet/pkg/metadata"
    "github.com/Mirantis/virtlet/pkg/metadata/types"
    "github.com/Mirantis/virtlet/pkg/tapmanager"
)

const (
    runtimeAPIVersion = "0.1.0"
    runtimeName       = "virtlet"
    runtimeVersion    = "0.1.0"
)

// StreamServer denotes a server that handles Attach and PortForward requests.
type StreamServer interface {
    GetAttach(req *kubeapi.AttachRequest) (*kubeapi.AttachResponse, error)
    GetPortForward(req *kubeapi.PortForwardRequest) (*kubeapi.PortForwardResponse, error)
}

// GCHandler performs GC when a container is deleted.
type GCHandler interface {
    GC() error
}

// VirtletRuntimeService handles CRI runtime service calls.
type VirtletRuntimeService struct {
    virtTool      *libvirttools.VirtualizationTool
    metadataStore metadata.Store
    fdManager     tapmanager.FDManager
    streamServer  StreamServer
    gcHandler     GCHandler
    clock         clockwork.Clock
}

// NewVirtletRuntimeService returns a new instance of VirtletRuntimeService.
func NewVirtletRuntimeService(
    virtTool *libvirttools.VirtualizationTool,
    metadataStore metadata.Store,
    fdManager tapmanager.FDManager,
    streamServer StreamServer,
    gcHandler GCHandler,
    clock clockwork.Clock) *VirtletRuntimeService {
    if clock == nil {
        clock = clockwork.NewRealClock()
    }
    return &VirtletRuntimeService{
        virtTool:      virtTool,
        metadataStore: metadataStore,
        fdManager:     fdManager,
        streamServer:  streamServer,
        gcHandler:     gcHandler,
        clock:         clock,
    }
}

// Version implements Version method of CRI.
func (v *VirtletRuntimeService) Version(ctx context.Context, in *kubeapi.VersionRequest) (*kubeapi.VersionResponse, error) {
    vRuntimeAPIVersion := runtimeAPIVersion
    vRuntimeName := runtimeName
    vRuntimeVersion := runtimeVersion
    return &kubeapi.VersionResponse{
        Version:           vRuntimeAPIVersion,
        RuntimeName:       vRuntimeName,
        RuntimeVersion:    vRuntimeVersion,
        RuntimeApiVersion: vRuntimeVersion,
    }, nil
}

//
// Sandboxes
//

// RunPodSandbox implements RunPodSandbox method of CRI.
func (v *VirtletRuntimeService) RunPodSandbox(ctx context.Context, in *kubeapi.RunPodSandboxRequest) (response *kubeapi.RunPodSandboxResponse, retErr error) {
    config := in.GetConfig()
    if config == nil {
        return nil, errors.New("no pod sandbox config passed to RunPodSandbox")
    }
    podName := "<no metadata>"
    if config.Metadata != nil {
        podName = config.Metadata.Name
    }
    if err := validatePodSandboxConfig(config); err != nil {
        return nil, err
    }
    podID := config.Metadata.Uid
    podNs := config.Metadata.Namespace

    // Check if sandbox already exists, it may happen when virtlet restarts and kubelet "thinks" that sandbox disappered
    sandbox := v.metadataStore.PodSandbox(podID)
    sandboxInfo, err := sandbox.Retrieve()
    if err == nil && sandboxInfo != nil {
        if sandboxInfo.State == types.PodSandboxState_SANDBOX_READY {
            return &kubeapi.RunPodSandboxResponse{
                PodSandboxId: podID,
            }, nil
        }
    }

    state := kubeapi.PodSandboxState_SANDBOX_READY
    pnd := &tapmanager.PodNetworkDesc{
        PodID:   podID,
        PodNs:   podNs,
        PodName: podName,
    }
    // Mimic kubelet's method of handling nameservers.
    // As of k8s 1.5.2, kubelet doesn't use any nameserver information from CNI.
    // (TODO: recheck this for 1.6)
    // CNI is used just to configure the network namespace and CNI DNS
    // info is ignored. Instead of this, DnsConfig from PodSandboxConfig
    // is used to configure container's resolv.conf.
    if config.DnsConfig != nil {
        pnd.DNS = &cnitypes.DNS{
            Nameservers: config.DnsConfig.Servers,
            Search:      config.DnsConfig.Searches,
            Options:     config.DnsConfig.Options,
        }
    }

    fdPayload := &tapmanager.GetFDPayload{Description: pnd}
    csnBytes, err := v.fdManager.AddFDs(podID, fdPayload)
    // The reason for defer here is that it is also necessary to ReleaseFDs if AddFDs fail
    // Try to clean up CNI netns (this may be necessary e.g. in case of multiple CNI plugins with CNI Genie)
    defer func() {
        if retErr != nil {
            // Try to clean up CNI netns if we couldn't add the pod to the metadata store or if AddFDs call wasn't
            // successful to avoid leaking resources
            if fdErr := v.fdManager.ReleaseFDs(podID); fdErr != nil {
                glog.Errorf("Error removing pod %s (%s) from CNI network: %v", podName, podID, fdErr)
            }
        }
    }()
    if err != nil {
        return nil, fmt.Errorf("Error adding pod %s (%s) to CNI network: %v", podName, podID, err)
    }

    psi, err := metadata.NewPodSandboxInfo(
        CRIPodSandboxConfigToPodSandboxConfig(config),
        csnBytes, types.PodSandboxState(state), v.clock)
    if err != nil {
        return nil, err
    }

    sandbox = v.metadataStore.PodSandbox(config.Metadata.Uid)
    if err := sandbox.Save(
        func(c *types.PodSandboxInfo) (*types.PodSandboxInfo, error) {
            return psi, nil
        },
    ); err != nil {
        return nil, err
    }

    return &kubeapi.RunPodSandboxResponse{
        PodSandboxId: podID,
    }, nil
}

// StopPodSandbox implements StopPodSandbox method of CRI.
func (v *VirtletRuntimeService) StopPodSandbox(ctx context.Context, in *kubeapi.StopPodSandboxRequest) (*kubeapi.StopPodSandboxResponse, error) {
    sandbox := v.metadataStore.PodSandbox(in.PodSandboxId)
    switch sandboxInfo, err := sandbox.Retrieve(); {
    case err != nil:
        return nil, err
    case sandboxInfo == nil:
        return nil, fmt.Errorf("sandbox %q not found in Virtlet metadata store", in.PodSandboxId)
    // check if the sandbox is already stopped
    case sandboxInfo.State != types.PodSandboxState_SANDBOX_NOTREADY:
        if err := sandbox.Save(
            func(c *types.PodSandboxInfo) (*types.PodSandboxInfo, error) {
                // make sure the pod is not removed during the call
                if c != nil {
                    c.State = types.PodSandboxState_SANDBOX_NOTREADY
                }
                return c, nil
            },
        ); err != nil {
            return nil, err
        }

        if err := v.fdManager.ReleaseFDs(in.PodSandboxId); err != nil {
            glog.Errorf("Error releasing tap fd for the pod %q: %v", in.PodSandboxId, err)
        }
    }

    response := &kubeapi.StopPodSandboxResponse{}
    return response, nil
}

// RemovePodSandbox method implements RemovePodSandbox from CRI.
func (v *VirtletRuntimeService) RemovePodSandbox(ctx context.Context, in *kubeapi.RemovePodSandboxRequest) (*kubeapi.RemovePodSandboxResponse, error) {
    podSandboxID := in.PodSandboxId

    if err := v.metadataStore.PodSandbox(podSandboxID).Save(
        func(c *types.PodSandboxInfo) (*types.PodSandboxInfo, error) {
            return nil, nil
        },
    ); err != nil {
        return nil, err
    }

    response := &kubeapi.RemovePodSandboxResponse{}
    return response, nil
}

// PodSandboxStatus method implements PodSandboxStatus from CRI.
func (v *VirtletRuntimeService) PodSandboxStatus(ctx context.Context, in *kubeapi.PodSandboxStatusRequest) (*kubeapi.PodSandboxStatusResponse, error) {
    podSandboxID := in.PodSandboxId

    sandbox := v.metadataStore.PodSandbox(podSandboxID)
    sandboxInfo, err := sandbox.Retrieve()
    if err != nil {
        return nil, err
    }
    if sandboxInfo == nil {
        return nil, fmt.Errorf("sandbox %q not found in Virtlet metadata store", podSandboxID)
    }
    status := PodSandboxInfoToCRIPodSandboxStatus(sandboxInfo)

    var cniResult *cnicurrent.Result
    if sandboxInfo.ContainerSideNetwork != nil {
        cniResult = sandboxInfo.ContainerSideNetwork.Result
    }

    ip := cni.GetPodIP(cniResult)
    if ip != "" {
        status.Network = &kubeapi.PodSandboxNetworkStatus{Ip: ip}
    }

    response := &kubeapi.PodSandboxStatusResponse{Status: status}
    return response, nil
}

// ListPodSandbox method implements ListPodSandbox from CRI.
func (v *VirtletRuntimeService) ListPodSandbox(ctx context.Context, in *kubeapi.ListPodSandboxRequest) (*kubeapi.ListPodSandboxResponse, error) {
    filter := CRIPodSandboxFilterToPodSandboxFilter(in.GetFilter())
    sandboxes, err := v.metadataStore.ListPodSandboxes(filter)
    if err != nil {
        return nil, err
    }
    var podSandboxList []*kubeapi.PodSandbox
    for _, sandbox := range sandboxes {
        sandboxInfo, err := sandbox.Retrieve()
        if err != nil {
            glog.Errorf("Error retrieving pod sandbox %q", sandbox.GetID())
        }
        if sandboxInfo != nil {
            podSandboxList = append(podSandboxList, PodSandboxInfoToCRIPodSandbox(sandboxInfo))
        }
    }
    response := &kubeapi.ListPodSandboxResponse{Items: podSandboxList}
    return response, nil
}

//
// Containers
//

// CreateContainer method implements CreateContainer from CRI.
func (v *VirtletRuntimeService) CreateContainer(ctx context.Context, in *kubeapi.CreateContainerRequest) (*kubeapi.CreateContainerResponse, error) {
    config := in.GetConfig()
    podSandboxID := in.PodSandboxId
    name := config.GetMetadata().Name

    sandboxInfo, err := v.metadataStore.PodSandbox(podSandboxID).Retrieve()
    if err != nil {
        return nil, err
    }
    if sandboxInfo == nil {
        return nil, fmt.Errorf("sandbox %q not in Virtlet metadata store", podSandboxID)
    }

    // Was a container already started in this sandbox?
    // NOTE: there is no distinction between lack of key and other types of
    // errors when accessing boltdb. This will be changed when we switch to
    // storing whole marshaled sandbox metadata as json.
    curContainers, err := v.metadataStore.ListPodContainers(podSandboxID)
    if err != nil {
        glog.V(3).Infof("Error retrieving pod %q containers", podSandboxID)
    } else {
        for _, container := range curContainers {
            // TODO: check container name; if it's the same, update the network config
            glog.V(3).Infof("CreateContainer: there's already a container in the sandbox (id: %s)", container.GetID())
            //err := v.updateContainer(sandboxInfo, container.GetID())
            err := v.virtTool.UpdateContainerNetwork(container.GetID(), sandboxInfo.ContainerSideNetwork)
            if err != nil {
                return nil, err
            }
            response := &kubeapi.CreateContainerResponse{ContainerId: container.GetID()}
            return response, nil
        }
    }

    fdKey := podSandboxID
    vmConfig, err := GetVMConfig(in, sandboxInfo.ContainerSideNetwork)
    if err != nil {
        return nil, err
    }
    if sandboxInfo.ContainerSideNetwork == nil || sandboxInfo.ContainerSideNetwork.Result == nil {
        fdKey = ""
    }

    uuid, err := v.virtTool.CreateContainer(vmConfig, fdKey)
    if err != nil {
        glog.Errorf("Error creating container %s: %v", name, err)
        return nil, err
    }

    response := &kubeapi.CreateContainerResponse{ContainerId: uuid}
    return response, nil
}

// StartContainer method implements StartContainer from CRI.
func (v *VirtletRuntimeService) StartContainer(ctx context.Context, in *kubeapi.StartContainerRequest) (*kubeapi.StartContainerResponse, error) {
    info, err := v.virtTool.ContainerInfo(in.ContainerId)
    if err == nil && info != nil && info.State == types.ContainerState_CONTAINER_RUNNING {
        glog.V(2).Infof("StartContainer: Container %s is already running", in.ContainerId)
        response := &kubeapi.StartContainerResponse{}
        return response, nil
    }

    if err := v.virtTool.StartContainer(in.ContainerId); err != nil {
        return nil, err
    }
    response := &kubeapi.StartContainerResponse{}
    return response, nil
}

// StopContainer method implements StopContainer from CRI.
func (v *VirtletRuntimeService) StopContainer(ctx context.Context, in *kubeapi.StopContainerRequest) (*kubeapi.StopContainerResponse, error) {
    if err := v.virtTool.StopContainer(in.ContainerId, time.Duration(in.Timeout)*time.Second); err != nil {
        return nil, err
    }
    response := &kubeapi.StopContainerResponse{}
    return response, nil
}

// RemoveContainer method implements RemoveContainer from CRI.
func (v *VirtletRuntimeService) RemoveContainer(ctx context.Context, in *kubeapi.RemoveContainerRequest) (*kubeapi.RemoveContainerResponse, error) {
    if err := v.virtTool.RemoveContainer(in.ContainerId); err != nil {
        return nil, err
    }

    if err := v.gcHandler.GC(); err != nil {
        return nil, fmt.Errorf("GC error: %v", err)
    }

    response := &kubeapi.RemoveContainerResponse{}
    return response, nil
}

// ListContainers method implements ListContainers from CRI.
func (v *VirtletRuntimeService) ListContainers(ctx context.Context, in *kubeapi.ListContainersRequest) (*kubeapi.ListContainersResponse, error) {
    filter := CRIContainerFilterToContainerFilter(in.GetFilter())
    containers, err := v.virtTool.ListContainers(filter)
    if err != nil {
        return nil, err
    }
    var r []*kubeapi.Container
    for _, c := range containers {
        r = append(r, ContainerInfoToCRIContainer(c))
    }
    response := &kubeapi.ListContainersResponse{Containers: r}
    return response, nil
}

// ContainerStatus method implements ContainerStatus from CRI.
func (v *VirtletRuntimeService) ContainerStatus(ctx context.Context, in *kubeapi.ContainerStatusRequest) (*kubeapi.ContainerStatusResponse, error) {
    info, err := v.virtTool.ContainerInfo(in.ContainerId)
    if err != nil {
        return nil, err
    }

    response := &kubeapi.ContainerStatusResponse{Status: ContainerInfoToCRIContainerStatus(info)}
    return response, nil
}

// ExecSync is a placeholder for an unimplemented CRI method.
func (v *VirtletRuntimeService) ExecSync(context.Context, *kubeapi.ExecSyncRequest) (*kubeapi.ExecSyncResponse, error) {
    return nil, errors.New("not implemented")
}

// Exec is a placeholder for an unimplemented CRI method.
func (v *VirtletRuntimeService) Exec(context.Context, *kubeapi.ExecRequest) (*kubeapi.ExecResponse, error) {
    return nil, errors.New("not implemented")
}

// Attach calls streamer server to implement Attach functionality from CRI.
func (v *VirtletRuntimeService) Attach(ctx context.Context, req *kubeapi.AttachRequest) (*kubeapi.AttachResponse, error) {
    if !req.Stdout && !req.Stderr {
        // Support k8s 1.8 or earlier.
        // We don't care about Stderr because it's not used
        // by the Virtlet stream server.
        req.Stdout = true
    }
    return v.streamServer.GetAttach(req)
}

// PortForward calls streamer server to implement PortForward functionality from CRI.
func (v *VirtletRuntimeService) PortForward(ctx context.Context, req *kubeapi.PortForwardRequest) (*kubeapi.PortForwardResponse, error) {
    return v.streamServer.GetPortForward(req)
}

// UpdateRuntimeConfig is a placeholder for an unimplemented CRI method.
func (v *VirtletRuntimeService) UpdateRuntimeConfig(context.Context, *kubeapi.UpdateRuntimeConfigRequest) (*kubeapi.UpdateRuntimeConfigResponse, error) {
    // we don't need to do anything here for now
    return &kubeapi.UpdateRuntimeConfigResponse{}, nil
}

// UpdateContainerResources stores in domain on libvirt info about Cpuset
// for container then looks for running emulator and tries to adjust its
// current settings through cgroups
func (v *VirtletRuntimeService) UpdateContainerResources(ctx context.Context, req *kubeapi.UpdateContainerResourcesRequest) (*kubeapi.UpdateContainerResourcesResponse, error) {
    setByCgroup, err := v.virtTool.UpdateCpusetsForEmulatorProcess(req.GetContainerId(), req.GetLinux().CpusetCpus)
    if err != nil {
        return nil, err
    }
    if !setByCgroup {
        if err = v.virtTool.UpdateCpusetsInContainerDefinition(req.GetContainerId(), req.GetLinux().CpusetCpus); err != nil {
            return nil, err
        }
    }
    return &kubeapi.UpdateContainerResourcesResponse{}, nil
}

// Status method implements Status from CRI for both types of service, Image and Runtime.
func (v *VirtletRuntimeService) Status(context.Context, *kubeapi.StatusRequest) (*kubeapi.StatusResponse, error) {
    ready := true
    runtimeReadyStr := kubeapi.RuntimeReady
    networkReadyStr := kubeapi.NetworkReady
    return &kubeapi.StatusResponse{
        Status: &kubeapi.RuntimeStatus{
            Conditions: []*kubeapi.RuntimeCondition{
                {
                    Type:   runtimeReadyStr,
                    Status: ready,
                },
                {
                    Type:   networkReadyStr,
                    Status: ready,
                },
            },
        },
    }, nil
}

// ContainerStats returns cpu/memory/disk usage for particular container id
func (v *VirtletRuntimeService) ContainerStats(ctx context.Context, in *kubeapi.ContainerStatsRequest) (*kubeapi.ContainerStatsResponse, error) {
    info, err := v.virtTool.ContainerInfo(in.ContainerId)
    if err != nil {
        return nil, err
    }
    vs, err := v.virtTool.VMStats(info.Id, info.Name)
    if err != nil {
        return nil, err
    }
    fsstats, err := v.virtTool.ImageManager().FilesystemStats()
    if err != nil {
        return nil, err
    }
    return &kubeapi.ContainerStatsResponse{
        Stats: VMStatsToCRIContainerStats(*vs, fsstats.Mountpoint),
    }, nil
}

// ListContainerStats returns stats (same as ContainerStats) for containers
// selected by filter
func (v *VirtletRuntimeService) ListContainerStats(ctx context.Context, in *kubeapi.ListContainerStatsRequest) (*kubeapi.ListContainerStatsResponse, error) {
    filter := CRIContainerStatsFilterToVMStatsFilter(in.GetFilter())
    vmstatsList, err := v.virtTool.ListVMStats(filter)
    if err != nil {
        return nil, err
    }
    fsstats, err := v.virtTool.ImageManager().FilesystemStats()
    if err != nil {
        return nil, err
    }
    var stats []*kubeapi.ContainerStats
    for _, vs := range vmstatsList {
        stats = append(stats, VMStatsToCRIContainerStats(vs, fsstats.Mountpoint))
    }

    return &kubeapi.ListContainerStatsResponse{
        Stats: stats,
    }, nil
}

// VMStatsToCRIContainerStats converts internal representation of vm/container stats
// to corresponding kubeapi type object
func VMStatsToCRIContainerStats(vs types.VMStats, mountpoint string) *kubeapi.ContainerStats {
    return &kubeapi.ContainerStats{
        Attributes: &kubeapi.ContainerAttributes{
            Id: vs.ContainerID,
            Metadata: &kubeapi.ContainerMetadata{
                Name: vs.ContainerID,
            },
        },
        Cpu: &kubeapi.CpuUsage{
            Timestamp:            vs.Timestamp,
            UsageCoreNanoSeconds: &kubeapi.UInt64Value{Value: vs.CpuUsage},
        },
        Memory: &kubeapi.MemoryUsage{
            Timestamp:       vs.Timestamp,
            WorkingSetBytes: &kubeapi.UInt64Value{Value: vs.MemoryUsage},
        },
        WritableLayer: &kubeapi.FilesystemUsage{
            Timestamp: vs.Timestamp,
            FsId: &kubeapi.FilesystemIdentifier{
                Mountpoint: mountpoint,
            },
            UsedBytes:  &kubeapi.UInt64Value{Value: vs.FsBytes},
            InodesUsed: &kubeapi.UInt64Value{Value: 1},
        },
    }
}

// ReopenContainerLog is a placeholder for an unimplemented CRI method.
func (v *VirtletRuntimeService) ReopenContainerLog(ctx context.Context, in *kubeapi.ReopenContainerLogRequest) (*kubeapi.ReopenContainerLogResponse, error) {
    return &kubeapi.ReopenContainerLogResponse{}, nil
}

func validatePodSandboxConfig(config *kubeapi.PodSandboxConfig) error {
    if config.GetMetadata() == nil {
        return errors.New("sandbox config is missing Metadata attribute")
    }

    return nil
}