docker/docker
builder/builder-next/builder.go

package buildkit

import (
    "context"
    "fmt"
    "io"
    "net"
    "strconv"
    "strings"
    "sync"
    "time"

    "github.com/containerd/containerd/platforms"
    "github.com/containerd/containerd/remotes/docker"
    "github.com/docker/docker/api/types"
    "github.com/docker/docker/api/types/backend"
    timetypes "github.com/docker/docker/api/types/time"
    "github.com/docker/docker/builder"
    "github.com/docker/docker/builder/builder-next/exporter"
    "github.com/docker/docker/builder/builder-next/exporter/mobyexporter"
    "github.com/docker/docker/builder/builder-next/exporter/overrides"
    "github.com/docker/docker/daemon/config"
    "github.com/docker/docker/daemon/images"
    "github.com/docker/docker/libnetwork"
    "github.com/docker/docker/opts"
    "github.com/docker/docker/pkg/idtools"
    "github.com/docker/docker/pkg/streamformatter"
    "github.com/docker/go-units"
    controlapi "github.com/moby/buildkit/api/services/control"
    "github.com/moby/buildkit/client"
    "github.com/moby/buildkit/control"
    "github.com/moby/buildkit/identity"
    "github.com/moby/buildkit/session"
    "github.com/moby/buildkit/util/entitlements"
    "github.com/moby/buildkit/util/tracing"
    "github.com/pkg/errors"
    "golang.org/x/sync/errgroup"
    "google.golang.org/grpc"
    grpcmetadata "google.golang.org/grpc/metadata"
)

// errMultipleFilterValues is returned when a single-value filter is given more than one value.
type errMultipleFilterValues struct{}

func (errMultipleFilterValues) Error() string { return "filters expect only one value" }

// InvalidParameter marks the error as an invalid-parameter (HTTP 400) error for the API.
func (errMultipleFilterValues) InvalidParameter() {}

type errConflictFilter struct {
    a, b string
}

func (e errConflictFilter) Error() string {
    return fmt.Sprintf("conflicting filters: %q and %q", e.a, e.b)
}

func (errConflictFilter) InvalidParameter() {}

type errInvalidFilterValue struct {
    error
}

func (errInvalidFilterValue) InvalidParameter() {}

// cacheFields lists the build-cache record fields that are accepted in prune filters.
var cacheFields = map[string]bool{
    "id":          true,
    "parent":      true,
    "type":        true,
    "description": true,
    "inuse":       true,
    "shared":      true,
    "private":     true,
    // fields from buildkit that are not exposed
    "mutable":   false,
    "immutable": false,
}

// Opt is the option struct required for creating the builder
type Opt struct {
    SessionManager      *session.Manager
    Root                string
    EngineID            string
    Dist                images.DistributionServices
    ImageTagger         mobyexporter.ImageTagger
    NetworkController   *libnetwork.Controller
    DefaultCgroupParent string
    RegistryHosts       docker.RegistryHosts
    BuilderConfig       config.BuilderConfig
    Rootless            bool
    IdentityMapping     idtools.IdentityMapping
    DNSConfig           config.DNSConfig
    ApparmorProfile     string
    UseSnapshotter      bool
    Snapshotter         string
    ContainerdAddress   string
    ContainerdNamespace string
}

// Builder can build using the BuildKit backend
type Builder struct {
    controller     *control.Controller
    dnsconfig      config.DNSConfig
    reqBodyHandler *reqBodyHandler

    mu             sync.Mutex
    jobs           map[string]*buildJob
    useSnapshotter bool
}

// New creates a new builder
func New(ctx context.Context, opt Opt) (*Builder, error) {
    reqHandler := newReqBodyHandler(tracing.DefaultTransport)

    c, err := newController(ctx, reqHandler, opt)
    if err != nil {
        return nil, err
    }
    b := &Builder{
        controller:     c,
        dnsconfig:      opt.DNSConfig,
        reqBodyHandler: reqHandler,
        jobs:           map[string]*buildJob{},
        useSnapshotter: opt.UseSnapshotter,
    }
    return b, nil
}

// Close shuts down the builder and releases the underlying BuildKit controller.
func (b *Builder) Close() error {
    return b.controller.Close()
}

// RegisterGRPC registers the controller with the gRPC server.
func (b *Builder) RegisterGRPC(s *grpc.Server) {
    b.controller.Register(s)
}

// Cancel cancels the build with the given ID; an unknown ID is a no-op.
func (b *Builder) Cancel(ctx context.Context, id string) error {
    b.mu.Lock()
    if j, ok := b.jobs[id]; ok && j.cancel != nil {
        j.cancel()
    }
    b.mu.Unlock()
    return nil
}

// DiskUsage returns a report of the space used by the build cache
func (b *Builder) DiskUsage(ctx context.Context) ([]*types.BuildCache, error) {
    duResp, err := b.controller.DiskUsage(ctx, &controlapi.DiskUsageRequest{})
    if err != nil {
        return nil, err
    }

    var items []*types.BuildCache
    for _, r := range duResp.Record {
        items = append(items, &types.BuildCache{
            ID:          r.ID,
            Parent:      r.Parent, //nolint:staticcheck // ignore SA1019 (Parent field is deprecated)
            Parents:     r.Parents,
            Type:        r.RecordType,
            Description: r.Description,
            InUse:       r.InUse,
            Shared:      r.Shared,
            Size:        r.Size_,
            CreatedAt:   r.CreatedAt,
            LastUsedAt:  r.LastUsedAt,
            UsageCount:  int(r.UsageCount),
        })
    }
    return items, nil
}

// Prune clears all reclaimable build cache
func (b *Builder) Prune(ctx context.Context, opts types.BuildCachePruneOptions) (int64, []string, error) {
    ch := make(chan *controlapi.UsageRecord)
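    // One goroutine streams prune records from the controller onto ch; a second
    // tallies the reclaimed size and collects the pruned cache IDs.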

    eg, ctx := errgroup.WithContext(ctx)

    validFilters := make(map[string]bool, 1+len(cacheFields))
    validFilters["unused-for"] = true
    validFilters["until"] = true
    validFilters["label"] = true  // TODO(tiborvass): handle label
    validFilters["label!"] = true // TODO(tiborvass): handle label!
    for k, v := range cacheFields {
        validFilters[k] = v
    }
    if err := opts.Filters.Validate(validFilters); err != nil {
        return 0, nil, err
    }

    pi, err := toBuildkitPruneInfo(opts)
    if err != nil {
        return 0, nil, err
    }

    eg.Go(func() error {
        defer close(ch)
        return b.controller.Prune(&controlapi.PruneRequest{
            All:          pi.All,
            KeepDuration: int64(pi.KeepDuration),
            KeepBytes:    pi.KeepBytes,
            Filter:       pi.Filter,
        }, &pruneProxy{
            streamProxy: streamProxy{ctx: ctx},
            ch:          ch,
        })
    })

    var size int64
    var cacheIDs []string
    eg.Go(func() error {
        for r := range ch {
            size += r.Size_
            cacheIDs = append(cacheIDs, r.ID)
        }
        return nil
    })

    if err := eg.Wait(); err != nil {
        return 0, nil, err
    }

    return size, cacheIDs, nil
}

// Build executes a build request
func (b *Builder) Build(ctx context.Context, opt backend.BuildConfig) (*builder.Result, error) {
    if len(opt.Options.Outputs) > 1 {
        return nil, errors.Errorf("multiple outputs not supported")
    }

    rc := opt.Source
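    // A build may arrive as two linked requests sharing a BuildID: an upload
    // request (BuildID prefixed with "upload-request:") that only carries the
    // build-context body, and the build request proper, whose RemoteContext is
    // set to "upload-request" and which consumes that body. The buildJob below
    // is the rendezvous point between the two.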
    if buildID := opt.Options.BuildID; buildID != "" {
        b.mu.Lock()

        upload := false
        if strings.HasPrefix(buildID, "upload-request:") {
            upload = true
            buildID = strings.TrimPrefix(buildID, "upload-request:")
        }

        if _, ok := b.jobs[buildID]; !ok {
            b.jobs[buildID] = newBuildJob()
        }
        j := b.jobs[buildID]
        var cancel func()
        ctx, cancel = context.WithCancel(ctx)
        j.cancel = cancel
        b.mu.Unlock()

        if upload {
            ctx2, cancel := context.WithTimeout(ctx, 5*time.Second)
            defer cancel()
            err := j.SetUpload(ctx2, rc)
            return nil, err
        }

        if remoteContext := opt.Options.RemoteContext; remoteContext == "upload-request" {
            ctx2, cancel := context.WithTimeout(ctx, 5*time.Second)
            defer cancel()
            var err error
            rc, err = j.WaitUpload(ctx2)
            if err != nil {
                return nil, err
            }
            opt.Options.RemoteContext = ""
        }

        defer func() {
            b.mu.Lock()
            delete(b.jobs, buildID)
            b.mu.Unlock()
        }()
    }

    var out builder.Result

    id := identity.NewID()

    frontendAttrs := map[string]string{}

    if opt.Options.Target != "" {
        frontendAttrs["target"] = opt.Options.Target
    }

    if opt.Options.Dockerfile != "" && opt.Options.Dockerfile != "." {
        frontendAttrs["filename"] = opt.Options.Dockerfile
    }

    if opt.Options.RemoteContext != "" {
        if opt.Options.RemoteContext != "client-session" {
            frontendAttrs["context"] = opt.Options.RemoteContext
        }
    } else {
        url, cancel := b.reqBodyHandler.newRequest(rc)
        defer cancel()
        frontendAttrs["context"] = url
    }

    cacheFrom := append([]string{}, opt.Options.CacheFrom...)

    frontendAttrs["cache-from"] = strings.Join(cacheFrom, ",")

    for k, v := range opt.Options.BuildArgs {
        if v == nil {
            continue
        }
        frontendAttrs["build-arg:"+k] = *v
    }

    for k, v := range opt.Options.Labels {
        frontendAttrs["label:"+k] = v
    }

    if opt.Options.NoCache {
        frontendAttrs["no-cache"] = ""
    }

    if opt.Options.PullParent {
        frontendAttrs["image-resolve-mode"] = "pull"
    } else {
        frontendAttrs["image-resolve-mode"] = "default"
    }

    if opt.Options.Platform != "" {
        // same as in newBuilder in builder/dockerfile/builder.go
        // TODO: remove once opt.Options.Platform is of type specs.Platform
        _, err := platforms.Parse(opt.Options.Platform)
        if err != nil {
            return nil, err
        }
        frontendAttrs["platform"] = opt.Options.Platform
    }

    switch opt.Options.NetworkMode {
    case "host", "none":
        frontendAttrs["force-network-mode"] = opt.Options.NetworkMode
    case "", "default":
    default:
        return nil, errors.Errorf("network mode %q not supported by buildkit", opt.Options.NetworkMode)
    }

    extraHosts, err := toBuildkitExtraHosts(opt.Options.ExtraHosts, b.dnsconfig.HostGatewayIP)
    if err != nil {
        return nil, err
    }
    frontendAttrs["add-hosts"] = extraHosts

    if opt.Options.ShmSize > 0 {
        frontendAttrs["shm-size"] = strconv.FormatInt(opt.Options.ShmSize, 10)
    }

    ulimits, err := toBuildkitUlimits(opt.Options.Ulimits)
    if err != nil {
        return nil, err
    } else if len(ulimits) > 0 {
        frontendAttrs["ulimit"] = ulimits
    }

    exporterName := ""
    exporterAttrs := map[string]string{}
    if len(opt.Options.Outputs) == 0 {
        exporterName = exporter.Moby
    } else {
        // "cacheonly" is a special output type that skips all exporters.
        if opt.Options.Outputs[0].Type != "cacheonly" {
            exporterName = opt.Options.Outputs[0].Type
            exporterAttrs = opt.Options.Outputs[0].Attrs
        }
    }

    // When exporting an image, pass the sanitized repo:tag names to the exporter
    // through its "name" attribute.
    if (exporterName == client.ExporterImage || exporterName == exporter.Moby) && len(opt.Options.Tags) > 0 {
        nameAttr, err := overrides.SanitizeRepoAndTags(opt.Options.Tags)
        if err != nil {
            return nil, err
        }
        if exporterAttrs == nil {
            exporterAttrs = make(map[string]string)
        }
        exporterAttrs["name"] = strings.Join(nameAttr, ",")
    }

    cache := controlapi.CacheOptions{}
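    // Setting the BUILDKIT_INLINE_CACHE build-arg to a true value (e.g.
    // --build-arg BUILDKIT_INLINE_CACHE=1) enables the inline cache exporter,
    // which embeds cache metadata into the resulting image so it can later be
    // used as a --cache-from source.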
    if inlineCache := opt.Options.BuildArgs["BUILDKIT_INLINE_CACHE"]; inlineCache != nil {
        if b, err := strconv.ParseBool(*inlineCache); err == nil && b {
            cache.Exports = append(cache.Exports, &controlapi.CacheOptionsEntry{
                Type: "inline",
            })
        }
    }

    req := &controlapi.SolveRequest{
        Ref: id,
        Exporters: []*controlapi.Exporter{
            {Type: exporterName, Attrs: exporterAttrs},
        },
        Frontend:      "dockerfile.v0",
        FrontendAttrs: frontendAttrs,
        Session:       opt.Options.SessionID,
        Cache:         cache,
    }

    if opt.Options.NetworkMode == "host" {
        req.Entitlements = append(req.Entitlements, entitlements.EntitlementNetworkHost)
    }

    aux := streamformatter.AuxFormatter{Writer: opt.ProgressWriter.Output}

    eg, ctx := errgroup.WithContext(ctx)

    eg.Go(func() error {
        resp, err := b.controller.Solve(ctx, req)
        if err != nil {
            return err
        }
        if exporterName != exporter.Moby && exporterName != client.ExporterImage {
            return nil
        }
        id, ok := resp.ExporterResponse["containerimage.digest"]
        if !ok {
            return errors.Errorf("missing image id")
        }
        out.ImageID = id
        return aux.Emit("moby.image.id", types.BuildResult{ID: id})
    })

    ch := make(chan *controlapi.StatusResponse)
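    // One goroutine proxies BuildKit status updates from the controller onto ch;
    // a second marshals each update and forwards it to the client as a
    // "moby.buildkit.trace" aux message for progress rendering.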

    eg.Go(func() error {
        defer close(ch)
        // streamProxy.ctx is deliberately not set to ctx: when the request is
        // cancelled, only the build should be cancelled, not the status stream
        // reporting its progress.
        stream := &statusProxy{streamProxy: streamProxy{ctx: context.TODO()}, ch: ch}
        return b.controller.Status(&controlapi.StatusRequest{Ref: id}, stream)
    })

    eg.Go(func() error {
        for sr := range ch {
            dt, err := sr.Marshal()
            if err != nil {
                return err
            }
            if err := aux.Emit("moby.buildkit.trace", dt); err != nil {
                return err
            }
        }
        return nil
    })

    if err := eg.Wait(); err != nil {
        return nil, err
    }

    return &out, nil
}

// streamProxy implements the grpc.ServerStream plumbing needed to call the
// in-process controller's streaming RPCs (Prune, Status) without a gRPC
// connection; statusProxy and pruneProxy add the concrete Send methods.
type streamProxy struct {
    ctx context.Context
}

func (sp *streamProxy) SetHeader(_ grpcmetadata.MD) error {
    return nil
}

func (sp *streamProxy) SendHeader(_ grpcmetadata.MD) error {
    return nil
}

func (sp *streamProxy) SetTrailer(_ grpcmetadata.MD) {
}

func (sp *streamProxy) Context() context.Context {
    return sp.ctx
}

func (sp *streamProxy) RecvMsg(m interface{}) error {
    return io.EOF
}

// statusProxy adapts the controller's Status stream, forwarding each
// StatusResponse onto ch.
type statusProxy struct {
    streamProxy
    ch chan *controlapi.StatusResponse
}

func (sp *statusProxy) Send(resp *controlapi.StatusResponse) error {
    return sp.SendMsg(resp)
}

func (sp *statusProxy) SendMsg(m interface{}) error {
    if sr, ok := m.(*controlapi.StatusResponse); ok {
        sp.ch <- sr
    }
    return nil
}

// pruneProxy adapts the controller's Prune stream, forwarding each UsageRecord
// onto ch.
type pruneProxy struct {
    streamProxy
    ch chan *controlapi.UsageRecord
}

func (sp *pruneProxy) Send(resp *controlapi.UsageRecord) error {
    return sp.SendMsg(resp)
}

func (sp *pruneProxy) SendMsg(m interface{}) error {
    if sr, ok := m.(*controlapi.UsageRecord); ok {
        sp.ch <- sr
    }
    return nil
}

// wrapRC wraps an io.ReadCloser and signals, via waitCh, once the reader has
// been fully consumed or closed, recording any error other than io.EOF.
type wrapRC struct {
    io.ReadCloser
    once   sync.Once
    err    error
    waitCh chan struct{}
}

func (w *wrapRC) Read(b []byte) (int, error) {
    n, err := w.ReadCloser.Read(b)
    if err != nil {
        e := err
        if e == io.EOF {
            e = nil
        }
        w.close(e)
    }
    return n, err
}

func (w *wrapRC) Close() error {
    err := w.ReadCloser.Close()
    w.close(err)
    return err
}

func (w *wrapRC) close(err error) {
    w.once.Do(func() {
        w.err = err
        close(w.waitCh)
    })
}

func (w *wrapRC) wait() error {
    <-w.waitCh
    return w.err
}

// buildJob tracks a single in-flight build and acts as the rendezvous point
// between an upload request carrying the build context and the build request
// that consumes it.
type buildJob struct {
    cancel func()
    waitCh chan func(io.ReadCloser) error
}

func newBuildJob() *buildJob {
    return &buildJob{waitCh: make(chan func(io.ReadCloser) error)}
}

// WaitUpload blocks until the paired upload request provides the build context
// and returns a reader for it.
func (j *buildJob) WaitUpload(ctx context.Context) (io.ReadCloser, error) {
    done := make(chan struct{})

    var upload io.ReadCloser
    fn := func(rc io.ReadCloser) error {
        w := &wrapRC{ReadCloser: rc, waitCh: make(chan struct{})}
        upload = w
        close(done)
        return w.wait()
    }

    select {
    case <-ctx.Done():
        return nil, ctx.Err()
    case j.waitCh <- fn:
        <-done
        return upload, nil
    }
}

// SetUpload hands the uploaded build context to the waiting build request and
// blocks until it has been fully read or closed.
func (j *buildJob) SetUpload(ctx context.Context, rc io.ReadCloser) error {
    select {
    case <-ctx.Done():
        return ctx.Err()
    case fn := <-j.waitCh:
        return fn(rc)
    }
}

// toBuildkitExtraHosts converts hosts from Docker's "host:ip" format to
// BuildKit's comma-separated "host=ip" format.
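// For example, the (hypothetical) entries ["db:10.0.0.2", "metrics:host-gateway"]
// become "db=10.0.0.2,metrics=<daemon host-gateway IP>".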
func toBuildkitExtraHosts(inp []string, hostGatewayIP net.IP) (string, error) {
    if len(inp) == 0 {
        return "", nil
    }
    hosts := make([]string, 0, len(inp))
    for _, h := range inp {
        host, ip, ok := strings.Cut(h, ":")
        if !ok || host == "" || ip == "" {
            return "", errors.Errorf("invalid host %s", h)
        }
        // If the address is the special value "host-gateway", replace it with the
        // IP address configured at the daemon level (HostGatewayIP).
        if ip == opts.HostGatewayName {
            // A nil net.IP stringifies to "<nil>", not "", so validate the IP
            // itself rather than its string form.
            if len(hostGatewayIP) == 0 {
                return "", fmt.Errorf("unable to derive the IP value for host-gateway")
            }
            ip = hostGatewayIP.String()
        } else if net.ParseIP(ip) == nil {
            return "", fmt.Errorf("invalid host %s", h)
        }
        hosts = append(hosts, host+"="+ip)
    }
    return strings.Join(hosts, ","), nil
}

// toBuildkitUlimits converts ulimits from Docker's name=soft:hard format to
// BuildKit's comma-separated format.
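// For example, a single &units.Ulimit{Name: "nofile", Soft: 1024, Hard: 2048}
// is rendered as "nofile=1024:2048".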
func toBuildkitUlimits(inp []*units.Ulimit) (string, error) {
    if len(inp) == 0 {
        return "", nil
    }
    ulimits := make([]string, 0, len(inp))
    for _, ulimit := range inp {
        ulimits = append(ulimits, ulimit.String())
    }
    return strings.Join(ulimits, ","), nil
}

// toBuildkitPruneInfo translates Docker build-cache prune options and filters
// into a BuildKit client.PruneInfo.
func toBuildkitPruneInfo(opts types.BuildCachePruneOptions) (client.PruneInfo, error) {
    var until time.Duration
    untilValues := opts.Filters.Get("until")          // canonical
    unusedForValues := opts.Filters.Get("unused-for") // deprecated synonym for "until" filter

    if len(untilValues) > 0 && len(unusedForValues) > 0 {
        return client.PruneInfo{}, errConflictFilter{"until", "unused-for"}
    }
    filterKey := "until"
    if len(unusedForValues) > 0 {
        filterKey = "unused-for"
    }
    untilValues = append(untilValues, unusedForValues...)

    switch len(untilValues) {
    case 0:
        // nothing to do
    case 1:
        ts, err := timetypes.GetTimestamp(untilValues[0], time.Now())
        if err != nil {
            return client.PruneInfo{}, errInvalidFilterValue{
                errors.Wrapf(err, "%q filter expects a duration (e.g., '24h') or a timestamp", filterKey),
            }
        }
        seconds, nanoseconds, err := timetypes.ParseTimestamps(ts, 0)
        if err != nil {
            return client.PruneInfo{}, errInvalidFilterValue{
                errors.Wrapf(err, "failed to parse timestamp %q", ts),
            }
        }

        until = time.Since(time.Unix(seconds, nanoseconds))
    default:
        return client.PruneInfo{}, errMultipleFilterValues{}
    }

    bkFilter := make([]string, 0, opts.Filters.Len())
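    // Translate the cache-field filters into BuildKit's filter syntax: "id" uses
    // the "~=" (partial/regex) match so a truncated ID still matches, while the
    // other fields use an exact "==" match.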
    for cacheField := range cacheFields {
        if opts.Filters.Contains(cacheField) {
            values := opts.Filters.Get(cacheField)
            switch len(values) {
            case 0:
                bkFilter = append(bkFilter, cacheField)
            case 1:
                if cacheField == "id" {
                    bkFilter = append(bkFilter, cacheField+"~="+values[0])
                } else {
                    bkFilter = append(bkFilter, cacheField+"=="+values[0])
                }
            default:
                return client.PruneInfo{}, errMultipleFilterValues{}
            }
        }
    }
    return client.PruneInfo{
        All:          opts.All,
        KeepDuration: until,
        KeepBytes:    opts.KeepStorage,
        Filter:       []string{strings.Join(bkFilter, ",")},
    }, nil
}