vorteil/vorteil

View on GitHub
pkg/vconvert/vconvert.go

Summary

Maintainability
B
4 hrs
Test Coverage
F
0%
package vconvert

/**
 * SPDX-License-Identifier: Apache-2.0
 * Copyright 2020 vorteil.io Pty Ltd
 */

import (
    "archive/tar"
    "context"
    "fmt"
    "io"
    "io/ioutil"
    "os"
    "regexp"
    "strings"

    "github.com/sirupsen/logrus"
    "github.com/vorteil/vorteil/pkg/elog"
    "github.com/vorteil/vorteil/pkg/vcfg"

    "github.com/containerd/containerd/archive"
    "github.com/containerd/containerd/archive/compression"
    v1 "github.com/google/go-containerregistry/pkg/v1"

    clog "github.com/containerd/containerd/log"
    "github.com/heroku/docker-registry-client/registry"
    parser "github.com/novln/docker-parser"
)

const (
    configRepo    = "repositories"
    configURL     = "url"
    workers       = 5
    tarExpression = "%s/%s.tar"

    defaultDiskSize = "+256 MB"
    defaultRAMSize  = "256 MB"

    dockerRuntime     = "docker"
    containerdRuntime = "containerd"

    localIdentifier = "local."
)

// RegistryType defines if it is a image repository server or a local
// container runtime instance
type RegistryType string

// RegistryType values where local is e.g. docker and remote docker hub
var (
    DockerRegistry     RegistryType = "docker"
    ContainerdRegistry RegistryType = "containerd"
    RemoteRegistry     RegistryType = "remote"
    NullRegistry       RegistryType = ""
)

type job struct {
    layer  *layer
    dir    string
    number int

    // return value
    name string
    err  error
}

// compat struct for layers from local runtimes and remote image repos
type layer struct {
    layer interface{}
    size  int64
    hash  string

    file string
}

// ContainerConverter is the base object. Create a client with NewContainerConverter.
type ContainerConverter struct {
    imageRef *parser.Reference

    imageConfig  v1.Config
    registry     *registry.Registry
    registryType RegistryType

    layers      []*layer
    fetchReader func(string, *layer, *registry.Registry) (io.ReadCloser, error)
    jobsCh      chan *job
    jobsDoneCh  chan *job
    tmpLocalTar *os.File

    logger elog.View
}

// NewContainerConverter returns a ContainerConverter for the given image
// It parses and validates the application name., Logger elog.View
func NewContainerConverter(app, config string, log elog.View) (*ContainerConverter, error) {

    if log == nil {
        log = &elog.CLI{}
    }

    initConfig(config, log)

    // get the ref first
    ref, err := parser.Parse(app)
    if err != nil {
        return nil, err
    }

    log.Printf("convert image: %s", ref.Name())

    cc := &ContainerConverter{
        imageRef: ref,
    }

    if strings.HasPrefix(ref.Registry(), localIdentifier) {
        // we only support docker and containerd
        // we can assume %s.%s here
        s := strings.SplitN(ref.Registry(), ".", 2)

        switch s[1] {
        case dockerRuntime:
            cc.fetchReader = localGetReader
            cc.registryType = DockerRegistry
        case containerdRuntime:
            cc.fetchReader = localGetReader
            cc.registryType = ContainerdRegistry
        default:
            return nil, fmt.Errorf("unknown local container runtime")
        }

    } else {
        cc.registryType = RemoteRegistry
        cc.fetchReader = remoteGetReader
    }

    cc.logger = log
    cc.jobsDoneCh = make(chan *job, workers)

    return cc, nil

}

// ConvertToProject exports a container image as a vorteil.io VM into the dst directory
func (cc *ContainerConverter) ConvertToProject(dst, user, pwd string) error {

    // check if folder exists
    err := checkDirectory(dst)
    if err != nil {
        return err
    }

    var (
        url string
    )

    if cc.RegistryType() == RemoteRegistry {
        reg, err := fetchRepoConfig(cc.RegistryName())
        if err != nil {
            return err
        }

        url = reg["url"].(string)
        if url == "" {
            return fmt.Errorf("url not available for registry %s", cc.RegistryName())
        }

        cc.logger.Printf("registry %s, url %s", cc.RegistryName(), url)

    } else {
        cc.logger.Printf("registry %s", cc.RegistryType())
    }

    err = cc.downloadImageInformation(&registryConfig{
        url:  url,
        user: user,
        pwd:  pwd,
    })

    if err != nil {
        return err
    }

    dir, _ := ioutil.TempDir("", "vtest")
    defer os.RemoveAll(dir)

    err = cc.downloadBlobs(dir)
    if err != nil {
        return err
    }

    err = cc.untarLayers(dst)
    if err != nil {
        return err
    }

    err = cc.createVCFG(cc.imageConfig, dst)
    if err != nil {
        return err
    }

    return nil
}

// RegistryType returns the type of registry: local, remote or none
func (cc *ContainerConverter) RegistryType() RegistryType {
    return cc.registryType
}

// RegistryName returns the name of the image registry
// Returns empty string for local registries
func (cc *ContainerConverter) RegistryName() string {
    registry := ""

    if cc.registryType == RemoteRegistry {
        return cc.imageRef.Registry()
    }

    return registry
}

// downloadImageInformation downloads the information required to download the
// layers of an image. After downloading the layer information is stored in ContainerConverter.
// For remote registries the config needs to be provided with at least the url of the registry.
func (cc *ContainerConverter) downloadImageInformation(config *registryConfig) error {

    if cc.imageRef == nil {
        return fmt.Errorf("image reference missing")
    }

    var err error

    switch cc.registryType {
    case DockerRegistry:
        err = cc.downloadInformationDocker(cc.imageRef.ShortName(), cc.imageRef.Tag())
    case ContainerdRegistry:
        err = cc.downloadInformationContainerd(cc.imageRef.ShortName(), cc.imageRef.Tag())
    default:
        err = cc.downloadInformationRemote(config)
    }

    return err
}

func (cc *ContainerConverter) downloadBlobs(dir string) error {

    if dir == "" {
        return fmt.Errorf("directory not provided for downloads")
    }

    cc.jobsCh = make(chan *job, len(cc.layers))

    // start workers
    for i := 0; i < workers; i++ {
        go cc.blobDownloadWorker()
    }

    cc.logger.Printf("downloading %d layers", len(cc.layers))

    for i, layer := range cc.layers {

        job := &job{
            layer:  layer,
            number: i,
            dir:    dir,
        }
        cc.jobsCh <- job
    }

    cc.logger.Debugf("all %d jobs sent", len(cc.layers))
    r := 0
    for {

        j := <-cc.jobsDoneCh
        if j.err != nil {
            cc.logger.Errorf("error downloading layer: %s", j.err.Error())
        }
        r++
        cc.logger.Debugf("received %d from %d jobs finished", r, len(cc.layers))

        // we have received all responses
        if r == len(cc.layers) {
            break
        }
    }

    cc.logger.Printf("all layers downloaded")
    close(cc.jobsCh)

    if cc.tmpLocalTar != nil {
        os.Remove(cc.tmpLocalTar.Name())
        cc.tmpLocalTar = nil
    }

    return nil
}

func (cc *ContainerConverter) untarLayers(targetDir string) error {

    err := checkDirectory(targetDir)
    if err != nil {
        return err
    }

    for _, layer := range cc.layers {

        if layer.file == "" {
            return fmt.Errorf("no file associated with layer %s", layer.hash)
        }

        cc.logger.Printf("unpack layer %s into %s", layer.file, targetDir)

        fn, err := os.Open(layer.file)
        if err != nil {
            return err
        }

        // suppressing containerd logs
        clog.L.Logger.SetLevel(logrus.ErrorLevel)

        r, err := compression.DecompressStream(fn)
        if err != nil {
            return err
        }

        if _, err := archive.Apply(context.TODO(), targetDir, r, archive.WithFilter(func(hdr *tar.Header) (bool, error) {

            // fetching uid, gid
            hdr.Uid, hdr.Gid = fetchUIDandGID()

            // check if in our skip list
            for _, f := range folders {
                // check 3 different variations of the folder
                // /folder folder /folder/
                fmts := []string{"/%s", "/%s/", "%s"}
                for _, f1 := range fmts {
                    if strings.HasPrefix(hdr.Name, fmt.Sprintf(f1, f)) {
                        cc.logger.Debugf("skipping file/dir %s", hdr.Name)
                        return false, nil
                    }
                }
            }
            return true, nil

        })); err != nil {
            r.Close()
            return err
        }
        clog.L.Logger.SetLevel(logrus.InfoLevel)
        r.Close()
    }

    cc.logger.Printf("files created into %s", targetDir)

    return nil
}

func (cc *ContainerConverter) createVCFG(config v1.Config, targetDir string) error {

    if _, err := os.Stat(targetDir); err != nil {
        return fmt.Errorf("directory %s does not exist", targetDir)
    }

    vcfgFile := new(vcfg.VCFG)

    ds, _ := vcfg.ParseBytes(defaultDiskSize)
    vcfgFile.VM.DiskSize = ds

    ram, _ := vcfg.ParseBytes(defaultRAMSize)
    vcfgFile.VM.RAM = ram

    vcfgFile.Programs = make([]vcfg.Program, 1)
    vcfgFile.Networks = make([]vcfg.NetworkInterface, 1)

    var finalCmd []string

    if len(config.Entrypoint) > 0 {
        ss := []string(config.Entrypoint)
        finalCmd = append(finalCmd, ss...)
    }

    if len(config.Cmd) > 0 {
        finalCmd = append(finalCmd, config.Cmd...)
    }

    vcfgFile.Programs[0].Cwd = config.WorkingDir

    if len(finalCmd) == 0 {
        return fmt.Errorf("can not generate command: %s", finalCmd)
    }

    bin, err := findBinary(finalCmd[0], config.Env, config.WorkingDir, targetDir, cc.logger)
    if err != nil {
        return err
    }

    var args []string
    args = append(args, bin)

    for _, arg := range finalCmd[1:] {
        if len(arg) == 1 {
            continue
        }
        if strings.Index(arg, " ") > 0 {
            args = append(args, fmt.Sprintf("'%s'", arg))
        } else {
            args = append(args, arg)
        }
    }

    argsString := strings.Join(args, " ")
    space := regexp.MustCompile(`\s+`)
    s := space.ReplaceAllString(argsString, " ")
    vcfgFile.Programs[0].Args = s

    // environment variables
    vcfgFile.Programs[0].Env = config.Env

    var portTCP []string
    var portUDP []string
    for key := range config.ExposedPorts {
        p := strings.SplitN(string(key), "/", 2)
        if len(p) == 2 {
            if p[1] == "tcp" {
                portTCP = append(portTCP, p[0])
            } else {
                portUDP = append(portUDP, p[0])
            }
        } else {
            portTCP = append(portTCP, p[0])
        }
    }
    vcfgFile.Networks[0].TCP = portTCP
    vcfgFile.Networks[0].UDP = portUDP

    b, err := vcfgFile.Marshal()
    if err != nil {
        return err
    }

    // write default.vcfg and .projectfile
    err = ioutil.WriteFile(fmt.Sprintf("%s/.vorteilproject", targetDir), []byte(defaultProjectFile), 0644)
    if err != nil {
        return err
    }

    err = ioutil.WriteFile(fmt.Sprintf("%s/default.vcfg", targetDir), b, 0644)
    if err != nil {
        return err
    }

    cc.logger.Debugf("vcfg file:\n%v\n", string(b))

    return nil

}

func (cc *ContainerConverter) blobDownloadWorker() {

    var (
        err    error
        reader io.ReadCloser
        pr     io.ReadCloser
        p      elog.Progress
    )

    for {

        job, opened := <-cc.jobsCh
        if !opened {
            break
        }

        reader, err = cc.fetchReader(cc.imageRef.ShortName(), job.layer, cc.registry)
        if err != nil {
            goto cont
        }

        p = cc.logger.NewProgress(fmt.Sprintf("layer %s:", job.layer.hash), "KiB", job.layer.size)
        pr = p.ProxyReader(reader)

        job.name = fmt.Sprintf(tarExpression, job.dir, job.layer.hash)

        err = writeFile(job.name, pr)

    cont:
        job.err = err

        if err != nil {
            p.Finish(false)
        } else {
            p.Finish(true)
        }

        pr.Close()

        // set the file to the layer
        cc.layers[job.number].file = job.name
        cc.jobsDoneCh <- job

    }

}