pkg/storage/manager/storage_manager.go

Summary

Maintainability
F
5 days
Test Coverage
F
38%
package manager

import (
    "context"
    "errors"
    "fmt"
    "strings"
    "sync"
    "time"

    "gopkg.in/yaml.v2"

    "github.com/werf/lockgate"
    "github.com/werf/logboek"
    "github.com/werf/logboek/pkg/style"
    "github.com/werf/logboek/pkg/types"
    "github.com/werf/werf/pkg/build/stage"
    "github.com/werf/werf/pkg/container_backend"
    "github.com/werf/werf/pkg/docker_registry"
    "github.com/werf/werf/pkg/image"
    "github.com/werf/werf/pkg/storage"
    "github.com/werf/werf/pkg/storage/lrumeta"
    "github.com/werf/werf/pkg/util/parallel"
    "github.com/werf/werf/pkg/werf"
)

var (
    ErrUnexpectedStagesStorageState = errors.New("unexpected stages storage state")
    ErrStageNotFound                = errors.New("stage not found")
)

func IsErrUnexpectedStagesStorageState(err error) bool {
    if err != nil {
        return strings.HasSuffix(err.Error(), ErrUnexpectedStagesStorageState.Error())
    }
    return false
}

func IsErrStageNotFound(err error) bool {
    if err != nil {
        return strings.HasSuffix(err.Error(), ErrStageNotFound.Error())
    }
    return false
}

type ForEachDeleteStageOptions struct {
    storage.DeleteImageOptions
    storage.FilterStagesAndProcessRelatedDataOptions
}

type StorageOptions struct {
    ContainerBackend container_backend.ContainerBackend
    DockerRegistry   docker_registry.GenericApiInterface
}

type StorageManagerInterface interface {
    InitCache(ctx context.Context) error

    GetStagesStorage() storage.PrimaryStagesStorage
    GetFinalStagesStorage() storage.StagesStorage
    GetSecondaryStagesStorageList() []storage.StagesStorage
    GetCacheStagesStorageList() []storage.StagesStorage

    GetImageInfoGetter(imageName string, desc *image.StageDescription, opts image.InfoGetterOptions) *image.InfoGetter

    EnableParallel(parallelTasksLimit int)
    MaxNumberOfWorkers() int
    GenerateStageUniqueID(digest string, stages []*image.StageDescription) (string, int64)

    GetImageInfo(ctx context.Context, ref string, opts StorageOptions) (*image.Info, error)

    LockStageImage(ctx context.Context, imageName string) error
    GetStagesByDigest(ctx context.Context, stageName, stageDigest string) ([]*image.StageDescription, error)
    GetStagesByDigestWithCache(ctx context.Context, stageName, stageDigest string) ([]*image.StageDescription, error)
    GetStagesByDigestFromStagesStorage(ctx context.Context, stageName, stageDigest string, stagesStorage storage.StagesStorage) ([]*image.StageDescription, error)
    GetStagesByDigestFromStagesStorageWithCache(ctx context.Context, stageName, stageDigest string, stagesStorage storage.StagesStorage) ([]*image.StageDescription, error)
    GetStageDescriptionList(ctx context.Context) ([]*image.StageDescription, error)
    GetStageDescriptionListWithCache(ctx context.Context) ([]*image.StageDescription, error)
    GetFinalStageDescriptionList(ctx context.Context) ([]*image.StageDescription, error)

    FetchStage(ctx context.Context, containerBackend container_backend.ContainerBackend, stg stage.Interface) error
    SelectSuitableStage(ctx context.Context, c stage.Conveyor, stg stage.Interface, stages []*image.StageDescription) (*image.StageDescription, error)
    CopySuitableByDigestStage(ctx context.Context, stageDesc *image.StageDescription, sourceStagesStorage, destinationStagesStorage storage.StagesStorage, containerBackend container_backend.ContainerBackend, targetPlatform string) (*image.StageDescription, error)
    CopyStageIntoCacheStorages(ctx context.Context, stageID image.StageID, cacheStagesStorages []storage.StagesStorage, opts CopyStageIntoStorageOptions) error
    CopyStageIntoFinalStorage(ctx context.Context, stageID image.StageID, finalStagesStorage storage.StagesStorage, opts CopyStageIntoStorageOptions) (*image.StageDescription, error)

    ForEachDeleteStage(ctx context.Context, options ForEachDeleteStageOptions, stagesDescriptions []*image.StageDescription, f func(ctx context.Context, stageDesc *image.StageDescription, err error) error) error
    ForEachDeleteFinalStage(ctx context.Context, options ForEachDeleteStageOptions, stagesDescriptions []*image.StageDescription, f func(ctx context.Context, stageDesc *image.StageDescription, err error) error) error
    ForEachRmImageMetadata(ctx context.Context, projectName, imageNameOrID string, stageIDCommitList map[string][]string, f func(ctx context.Context, commit, stageID string, err error) error) error
    ForEachRmManagedImage(ctx context.Context, projectName string, managedImages []string, f func(ctx context.Context, managedImage string, err error) error) error
    ForEachGetImportMetadata(ctx context.Context, projectName string, ids []string, f func(ctx context.Context, metadataID string, metadata *storage.ImportMetadata, err error) error) error
    ForEachRmImportMetadata(ctx context.Context, projectName string, ids []string, f func(ctx context.Context, id string, err error) error) error
    ForEachGetStageCustomTagMetadata(ctx context.Context, ids []string, f func(ctx context.Context, metadataID string, metadata *storage.CustomTagMetadata, err error) error) error
    ForEachDeleteStageCustomTag(ctx context.Context, ids []string, f func(ctx context.Context, tag string, err error) error) error
}

// NOTE: FetchStage is a legacy option, which could in theory be removed.
// NOTE: FetchStage and ContainerBackend options used for a case of copying between local and remote storage.
// NOTE: Remote->Local copy does not need FetchStage and ContainerBackend options.
type CopyStageIntoStorageOptions struct {
    FetchStage           stage.Interface
    ContainerBackend     container_backend.ContainerBackend
    ShouldBeBuiltMode    bool
    IsMultiplatformImage bool
    LogDetailedName      string
}

func RetryOnUnexpectedStagesStorageState(_ context.Context, _ StorageManagerInterface, f func() error) error {
Retry:
    err := f()

    if IsErrUnexpectedStagesStorageState(err) {
        goto Retry
    }

    return err
}

func NewStorageManager(projectName string, stagesStorage storage.PrimaryStagesStorage, finalStagesStorage storage.StagesStorage, secondaryStagesStorageList, cacheStagesStorageList []storage.StagesStorage, storageLockManager storage.LockManager) *StorageManager {
    return &StorageManager{
        ProjectName:        projectName,
        StorageLockManager: storageLockManager,

        StagesStorage:              stagesStorage,
        FinalStagesStorage:         finalStagesStorage,
        CacheStagesStorageList:     cacheStagesStorageList,
        SecondaryStagesStorageList: secondaryStagesStorageList,
    }
}

type StagesList struct {
    Mux      sync.Mutex
    StageIDs []image.StageID
}

func NewStagesList(stageIDs []image.StageID) *StagesList {
    return &StagesList{
        StageIDs: stageIDs,
    }
}

func (stages *StagesList) GetStageIDs() []image.StageID {
    stages.Mux.Lock()
    defer stages.Mux.Unlock()

    var res []image.StageID

    for _, stg := range stages.StageIDs {
        res = append(res, stg)
    }

    return res
}

func (stages *StagesList) AddStageID(stageID image.StageID) {
    stages.Mux.Lock()
    defer stages.Mux.Unlock()

    for _, stg := range stages.StageIDs {
        if stg.IsEqual(stageID) {
            return
        }
    }

    stages.StageIDs = append(stages.StageIDs, stageID)
}

type StorageManager struct {
    parallel           bool
    parallelTasksLimit int

    ProjectName string

    StorageLockManager storage.LockManager

    StagesStorage              storage.PrimaryStagesStorage
    FinalStagesStorage         storage.StagesStorage
    CacheStagesStorageList     []storage.StagesStorage
    SecondaryStagesStorageList []storage.StagesStorage

    // These will be released automatically when current process exits
    SharedHostImagesLocks []lockgate.LockHandle

    FinalStagesListCacheMux sync.Mutex
    FinalStagesListCache    *StagesList
}

func (m *StorageManager) GetStagesStorage() storage.PrimaryStagesStorage {
    return m.StagesStorage
}

func (m *StorageManager) GetFinalStagesStorage() storage.StagesStorage {
    return m.FinalStagesStorage
}

func (m *StorageManager) GetSecondaryStagesStorageList() []storage.StagesStorage {
    return m.SecondaryStagesStorageList
}

func (m *StorageManager) GetCacheStagesStorageList() []storage.StagesStorage {
    return m.CacheStagesStorageList
}

func (m *StorageManager) GetServiceValuesRepo() string {
    if m.FinalStagesStorage != nil {
        return m.FinalStagesStorage.String()
    }
    return m.StagesStorage.String()
}

func (m *StorageManager) GetImageInfoGetter(imageName string, desc *image.StageDescription, opts image.InfoGetterOptions) *image.InfoGetter {
    if m.FinalStagesStorage != nil {
        finalImageName := m.FinalStagesStorage.ConstructStageImageName(m.ProjectName, desc.StageID.Digest, desc.StageID.UniqueID)
        return image.NewInfoGetter(imageName, finalImageName, opts)
    }

    return image.NewInfoGetter(imageName, desc.Info.Name, opts)
}

func (m *StorageManager) InitCache(ctx context.Context) error {
    logboek.Context(ctx).Info().LogF("Initializing storage manager cache\n")

    if m.FinalStagesStorage != nil {
        if _, err := m.getOrCreateFinalStagesListCache(ctx); err != nil {
            return fmt.Errorf("unable to get or create final stages list cache: %w", err)
        }
    }

    return nil
}

func (m *StorageManager) EnableParallel(parallelTasksLimit int) {
    m.parallel = true
    m.parallelTasksLimit = parallelTasksLimit
}

func (m *StorageManager) MaxNumberOfWorkers() int {
    if m.parallel && m.parallelTasksLimit > 0 {
        return m.parallelTasksLimit
    }

    return 1
}

func (m *StorageManager) GetStageDescriptionListWithCache(ctx context.Context) ([]*image.StageDescription, error) {
    return m.getStageDescriptionList(ctx, storage.WithCache())
}

func (m *StorageManager) GetStageDescriptionList(ctx context.Context) ([]*image.StageDescription, error) {
    return m.getStageDescriptionList(ctx)
}

func (m *StorageManager) getStageDescriptionList(ctx context.Context, opts ...storage.Option) ([]*image.StageDescription, error) {
    stageIDs, err := m.StagesStorage.GetStagesIDs(ctx, m.ProjectName, opts...)
    if err != nil {
        return nil, fmt.Errorf("error getting stages ids from %s: %w", m.StagesStorage, err)
    }

    var mutex sync.Mutex
    var stages []*image.StageDescription

    if err := parallel.DoTasks(ctx, len(stageIDs), parallel.DoTasksOptions{
        MaxNumberOfWorkers: m.MaxNumberOfWorkers(),
    }, func(ctx context.Context, taskId int) error {
        stageID := stageIDs[taskId]

        stageDesc, err := getStageDescription(ctx, m.ProjectName, stageID, m.StagesStorage, m.CacheStagesStorageList, getStageDescriptionOptions{WithLocalManifestCache: m.getWithLocalManifestCacheOption()})
        if err != nil {
            return fmt.Errorf("error getting stage %s description: %w", stageID.String(), err)
        }

        if stageDesc == nil {
            logboek.Context(ctx).Warn().LogF("Ignoring stage %s: cannot get stage description from %s\n", stageID.String(), m.StagesStorage.String())
            return nil
        }

        mutex.Lock()
        defer mutex.Unlock()
        stages = append(stages, stageDesc)

        return nil
    }); err != nil {
        return nil, err
    }

    return stages, nil
}

func (m *StorageManager) GetFinalStageDescriptionList(ctx context.Context) ([]*image.StageDescription, error) {
    existingStagesListCache, err := m.getOrCreateFinalStagesListCache(ctx)
    if err != nil {
        return nil, fmt.Errorf("error getting existing stages list of final repo %s: %w", m.FinalStagesStorage.String(), err)
    }

    logboek.Context(ctx).Debug().LogF("[%p] Got existing final stages list cache: %#v\n", m, existingStagesListCache.StageIDs)

    stageIDs := existingStagesListCache.GetStageIDs()

    var mutex sync.Mutex
    var stages []*image.StageDescription

    if err := parallel.DoTasks(ctx, len(stageIDs), parallel.DoTasksOptions{
        MaxNumberOfWorkers: m.MaxNumberOfWorkers(),
    }, func(ctx context.Context, taskId int) error {
        stageID := stageIDs[taskId]

        stageDesc, err := getStageDescription(ctx, m.ProjectName, stageID, m.FinalStagesStorage, nil, getStageDescriptionOptions{WithLocalManifestCache: true})
        if err != nil {
            return fmt.Errorf("error getting stage %s description from %s: %w", stageID.String(), m.FinalStagesStorage.String(), err)
        }

        if stageDesc == nil {
            logboek.Context(ctx).Warn().LogF("Ignoring stage %s: cannot get stage description from %s\n", stageID.String(), m.FinalStagesStorage.String())
            return nil
        }

        mutex.Lock()
        defer mutex.Unlock()
        stages = append(stages, stageDesc)

        return nil
    }); err != nil {
        return nil, err
    }

    return stages, nil
}

func (m *StorageManager) ForEachDeleteFinalStage(ctx context.Context, options ForEachDeleteStageOptions, stagesDescriptions []*image.StageDescription, f func(ctx context.Context, stageDesc *image.StageDescription, err error) error) error {
    return parallel.DoTasks(ctx, len(stagesDescriptions), parallel.DoTasksOptions{
        MaxNumberOfWorkers:         m.MaxNumberOfWorkers(),
        InitDockerCLIForEachWorker: true,
    }, func(ctx context.Context, taskId int) error {
        stageDescription := stagesDescriptions[taskId]

        err := m.FinalStagesStorage.DeleteStage(ctx, stageDescription, options.DeleteImageOptions)
        return f(ctx, stageDescription, err)
    })
}

func (m *StorageManager) ForEachDeleteStage(ctx context.Context, options ForEachDeleteStageOptions, stagesDescriptions []*image.StageDescription, f func(ctx context.Context, stageDesc *image.StageDescription, err error) error) error {
    if localStagesStorage, isLocal := m.StagesStorage.(*storage.LocalStagesStorage); isLocal {
        filteredStagesDescriptions, err := localStagesStorage.FilterStagesAndProcessRelatedData(ctx, stagesDescriptions, options.FilterStagesAndProcessRelatedDataOptions)
        if err != nil {
            return fmt.Errorf("error filtering local docker server stages: %w", err)
        }

        stagesDescriptions = filteredStagesDescriptions
    }

    return parallel.DoTasks(ctx, len(stagesDescriptions), parallel.DoTasksOptions{
        MaxNumberOfWorkers:         m.MaxNumberOfWorkers(),
        InitDockerCLIForEachWorker: true,
    }, func(ctx context.Context, taskId int) error {
        stageDescription := stagesDescriptions[taskId]

        for _, cacheStagesStorage := range m.CacheStagesStorageList {
            if err := cacheStagesStorage.DeleteStage(ctx, stageDescription, options.DeleteImageOptions); err != nil {
                logboek.Context(ctx).Warn().LogF("Unable to delete stage %s from the cache stages storage %s: %s\n", stageDescription.StageID.String(), cacheStagesStorage.String(), err)
            }
        }

        err := m.StagesStorage.DeleteStage(ctx, stageDescription, options.DeleteImageOptions)
        return f(ctx, stageDescription, err)
    })
}

func (m *StorageManager) GetImageInfo(ctx context.Context, ref string, opts StorageOptions) (*image.Info, error) {
    info, err := m.getImageInfoFromContainerBackend(ctx, ref, opts.ContainerBackend)
    if err != nil {
        return nil, err
    }
    if info != nil {
        return info, err
    }
    return m.getImageInfoFromRegistry(ctx, ref, opts.DockerRegistry)
}

func (m *StorageManager) getImageInfoFromContainerBackend(ctx context.Context, ref string, containerBackend container_backend.ContainerBackend) (*image.Info, error) {
    return containerBackend.GetImageInfo(ctx, ref, container_backend.GetImageInfoOpts{})
}

func (m *StorageManager) getImageInfoFromRegistry(ctx context.Context, ref string, dockerRegistry docker_registry.GenericApiInterface) (*image.Info, error) {
    cfg, err := dockerRegistry.GetRepoImageConfigFile(ctx, ref)
    if err != nil {
        return nil, err
    }
    return docker_registry.NewImageInfoFromRegistryConfig(ref, cfg), nil
}

func (m *StorageManager) LockStageImage(ctx context.Context, imageName string) error {
    imageLockName := container_backend.ImageLockName(imageName)

    _, lock, err := werf.AcquireHostLock(ctx, imageLockName, lockgate.AcquireOptions{Shared: true})
    if err != nil {
        return fmt.Errorf("error locking %q shared lock: %w", imageLockName, err)
    }

    m.SharedHostImagesLocks = append(m.SharedHostImagesLocks, lock)

    return nil
}

func doFetchStage(ctx context.Context, projectName string, stagesStorage storage.StagesStorage, stageID image.StageID, img container_backend.LegacyImageInterface) error {
    err := logboek.Context(ctx).Info().LogProcess("Check manifest availability").DoError(func() error {
        freshStageDescription, err := stagesStorage.GetStageDescription(ctx, projectName, stageID)
        if err != nil {
            return fmt.Errorf("unable to get stage description: %w", err)
        }

        if freshStageDescription == nil {
            return ErrStageNotFound
        }

        img.SetStageDescription(freshStageDescription)

        return nil
    })
    if err != nil {
        return err
    }

    return logboek.Context(ctx).Info().LogProcess("Fetch image").DoError(func() error {
        logboek.Context(ctx).Debug().LogF("Image name: %s\n", img.Name())

        if err := stagesStorage.FetchImage(ctx, img); err != nil {
            return fmt.Errorf("unable to fetch stage %s image %s: %w", stageID.String(), img.Name(), err)
        }
        return nil
    })
}

func (m *StorageManager) FetchStage(ctx context.Context, containerBackend container_backend.ContainerBackend, stg stage.Interface) error {
    logboek.Context(ctx).Debug().LogF("-- StagesManager.FetchStage %s\n", stg.LogDetailedName())

    if err := m.LockStageImage(ctx, stg.GetStageImage().Image.Name()); err != nil {
        return fmt.Errorf("error locking stage image %q: %w", stg.GetStageImage().Image.Name(), err)
    }

    shouldFetch, err := m.StagesStorage.ShouldFetchImage(ctx, stg.GetStageImage().Image)
    if err != nil {
        return fmt.Errorf("error checking should fetch image: %w", err)
    }
    if !shouldFetch {
        imageName := m.StagesStorage.ConstructStageImageName(m.ProjectName, stg.GetStageImage().Image.GetStageDescription().StageID.Digest, stg.GetStageImage().Image.GetStageDescription().StageID.UniqueID)

        logboek.Context(ctx).Info().LogF("Image %s exists, will not perform fetch\n", imageName)

        if err := lrumeta.CommonLRUImagesCache.AccessImage(ctx, imageName); err != nil {
            return fmt.Errorf("error accessing last recently used images cache for %s: %w", imageName, err)
        }

        return nil
    }

    var fetchedImg container_backend.LegacyImageInterface
    var cacheStagesStorageListToRefill []storage.StagesStorage

    fetchStageFromCache := func(stagesStorage storage.StagesStorage) (container_backend.LegacyImageInterface, error) {
        stageID := stg.GetStageImage().Image.GetStageDescription().StageID
        imageName := stagesStorage.ConstructStageImageName(m.ProjectName, stageID.Digest, stageID.UniqueID)
        stageImage := container_backend.NewLegacyStageImage(nil, imageName, containerBackend, stg.GetStageImage().Image.GetTargetPlatform())

        shouldFetch, err := stagesStorage.ShouldFetchImage(ctx, stageImage)
        if err != nil {
            return nil, fmt.Errorf("error checking should fetch image from cache repo %s: %w", stagesStorage.String(), err)
        }

        if shouldFetch {
            logboek.Context(ctx).Info().LogF("Cache repo image %s does not exist locally, will perform fetch\n", stageImage.Name())

            proc := logboek.Context(ctx).Default().LogProcess("Fetching stage %s from %s", stg.LogDetailedName(), stagesStorage.String())
            proc.Start()

            err := doFetchStage(ctx, m.ProjectName, stagesStorage, *stageID, stageImage)

            if IsErrStageNotFound(err) {
                logboek.Context(ctx).Default().LogF("Stage not found\n")
                proc.End()
                return nil, err
            }

            if err != nil {
                proc.Fail()
                return nil, err
            }

            proc.End()

            if err := storeStageDescriptionIntoLocalManifestCache(ctx, m.ProjectName, *stageID, stagesStorage, stageImage.GetStageDescription()); err != nil {
                return nil, fmt.Errorf("error storing stage %s description into local manifest cache: %w", imageName, err)
            }
        } else {
            logboek.Context(ctx).Info().LogF("Cache repo image %s exists locally, will not perform fetch\n", stageImage.Name())

            stageDesc, err := getStageDescription(ctx, m.ProjectName, *stageID, stagesStorage, nil, getStageDescriptionOptions{WithLocalManifestCache: true})
            if err != nil {
                return nil, fmt.Errorf("error getting stage %s description from %s: %w", stageID.String(), m.FinalStagesStorage.String(), err)
            }
            if stageDesc == nil {
                return nil, ErrStageNotFound
            }
            stageImage.SetStageDescription(stageDesc)
        }

        if err := lrumeta.CommonLRUImagesCache.AccessImage(ctx, stageImage.Name()); err != nil {
            return nil, fmt.Errorf("error accessing last recently used images cache for %s: %w", stageImage.Name(), err)
        }

        return stageImage, nil
    }

    prepareCacheStageAsPrimary := func(cacheImg container_backend.LegacyImageInterface, primaryStage stage.Interface) error {
        stageID := primaryStage.GetStageImage().Image.GetStageDescription().StageID
        primaryImageName := m.StagesStorage.ConstructStageImageName(m.ProjectName, stageID.Digest, stageID.UniqueID)

        // TODO(buildah): check no bugs introduced by removing of following calls
        // if err := containerBackend.RenameImage(ctx, cacheDockerImage, primaryImageName, false); err != nil {
        //     return fmt.Errorf("unable to rename image %s to %s: %w", fetchedDockerImage.Image.Name(), primaryImageName, err)
        // }

        // if err := containerBackend.RefreshImageObject(ctx, &container_backend.Image{Image: primaryStage.GetImage()}); err != nil {
        //     return fmt.Errorf("unable to refresh stage image %s: %w", primaryStage.GetImage().Name(), err)
        // }

        // TODO(buildah): check no bugs introduced by removing of following calls
        // if err := storeStageDescriptionIntoLocalManifestCache(ctx, m.ProjectName, *stageID, m.StagesStorage, ConvertStageDescriptionForStagesStorage(cacheDockerImage.Image.GetStageDescription(), m.StagesStorage)); err != nil {
        if err := storeStageDescriptionIntoLocalManifestCache(ctx, m.ProjectName, *stageID, m.StagesStorage, cacheImg.GetStageDescription()); err != nil {
            return fmt.Errorf("error storing stage %s description into local manifest cache: %w", primaryImageName, err)
        }

        if err := lrumeta.CommonLRUImagesCache.AccessImage(ctx, primaryImageName); err != nil {
            return fmt.Errorf("error accessing last recently used images cache for %s: %w", primaryImageName, err)
        }

        return nil
    }

    for _, cacheStagesStorage := range m.CacheStagesStorageList {
        cacheImg, err := fetchStageFromCache(cacheStagesStorage)
        if err != nil {
            if !IsErrStageNotFound(err) {
                logboek.Context(ctx).Warn().LogF("Unable to fetch stage %s from cache stages storage %s: %s\n", stg.GetStageImage().Image.GetStageDescription().StageID.String(), cacheStagesStorage.String(), err)
            }

            cacheStagesStorageListToRefill = append(cacheStagesStorageListToRefill, cacheStagesStorage)

            continue
        }

        if err := prepareCacheStageAsPrimary(cacheImg, stg); err != nil {
            logboek.Context(ctx).Warn().LogF("Unable to prepare stage %s fetched from cache stages storage %s as a primary: %s\n", cacheImg.Name(), cacheStagesStorage.String(), err)

            cacheStagesStorageListToRefill = append(cacheStagesStorageListToRefill, cacheStagesStorage)

            continue
        }

        fetchedImg = cacheImg
        break
    }

    if fetchedImg == nil {
        stageID := stg.GetStageImage().Image.GetStageDescription().StageID
        img := stg.GetStageImage()

        err := logboek.Context(ctx).Default().LogProcess("Fetching stage %s from %s", stg.LogDetailedName(), m.StagesStorage.String()).
            DoError(func() error {
                return doFetchStage(ctx, m.ProjectName, m.StagesStorage, *stageID, img.Image)
            })

        if IsErrStageNotFound(err) {
            logboek.Context(ctx).Error().LogF("Stage %s image %s is no longer available!\n", stg.LogDetailedName(), stg.GetStageImage().Image.Name())
            return ErrUnexpectedStagesStorageState
        }

        if storage.IsErrBrokenImage(err) {
            logboek.Context(ctx).Error().LogF("Broken stage %s image %s!\n", stg.LogDetailedName(), stg.GetStageImage().Image.Name())

            logboek.Context(ctx).Error().LogF("Will mark image %s as rejected in the stages storage %s\n", stg.GetStageImage().Image.Name(), m.StagesStorage.String())
            if err := m.StagesStorage.RejectStage(ctx, m.ProjectName, stageID.Digest, stageID.UniqueID); err != nil {
                return fmt.Errorf("unable to reject stage %s image %s in the stages storage %s: %w", stg.LogDetailedName(), stg.GetStageImage().Image.Name(), m.StagesStorage.String(), err)
            }

            return ErrUnexpectedStagesStorageState
        }

        if err != nil {
            return fmt.Errorf("unable to fetch stage %s from stages storage %s: %w", stageID.String(), m.StagesStorage.String(), err)
        }

        fetchedImg = img.Image
    }

    for _, cacheStagesStorage := range cacheStagesStorageListToRefill {
        stageID := stg.GetStageImage().Image.GetStageDescription().StageID

        err := logboek.Context(ctx).Default().LogProcess("Copy stage %s into cache %s", stg.LogDetailedName(), cacheStagesStorage.String()).
            DoError(func() error {
                if _, err := m.CopyStage(ctx, m.StagesStorage, cacheStagesStorage, *stageID, CopyStageOptions{
                    ContainerBackend: containerBackend,
                    LegacyImage:      fetchedImg,
                }); err != nil {
                    return fmt.Errorf("unable to copy stage %s into cache stages storage %s: %w", stageID.String(), cacheStagesStorage.String(), err)
                }
                return nil
            })
        if err != nil {
            logboek.Context(ctx).Warn().LogF("Warning %s\n", err)
        }
    }

    return nil
}

func (m *StorageManager) CopyStageIntoCacheStorages(ctx context.Context, stageID image.StageID, cacheStagesStorageList []storage.StagesStorage, opts CopyStageIntoStorageOptions) error {
    for _, cache := range cacheStagesStorageList {
        err := logboek.Context(ctx).Default().LogProcess("Copy stage %s into cache %s", opts.LogDetailedName, cache.String()).
            DoError(func() error {
                copyOpts := CopyStageOptions{ContainerBackend: opts.ContainerBackend}
                if opts.FetchStage != nil {
                    copyOpts.FetchStage = opts.FetchStage
                    copyOpts.LegacyImage = opts.FetchStage.GetStageImage().Image
                }
                if _, err := m.CopyStage(ctx, m.StagesStorage, cache, stageID, copyOpts); err != nil {
                    return fmt.Errorf("unable to copy stage %s into cache stages storage %s: %w", stageID.String(), cache.String(), err)
                }
                return nil
            })
        if err != nil {
            logboek.Context(ctx).Warn().LogF("Warning: %s\n", err)
        }
    }
    return nil
}

func (m *StorageManager) getOrCreateFinalStagesListCache(ctx context.Context) (*StagesList, error) {
    m.FinalStagesListCacheMux.Lock()
    defer m.FinalStagesListCacheMux.Unlock()

    if m.FinalStagesListCache != nil {
        return m.FinalStagesListCache, nil
    }

    stageIDs, err := m.FinalStagesStorage.GetStagesIDs(ctx, m.ProjectName)
    if err != nil {
        return nil, fmt.Errorf("unable to get final repo stages list: %w", err)
    }
    m.FinalStagesListCache = NewStagesList(stageIDs)

    return m.FinalStagesListCache, nil
}

func (m *StorageManager) CopyStageIntoFinalStorage(ctx context.Context, stageID image.StageID, finalStagesStorage storage.StagesStorage, opts CopyStageIntoStorageOptions) (*image.StageDescription, error) {
    existingStagesListCache, err := m.getOrCreateFinalStagesListCache(ctx)
    if err != nil {
        return nil, fmt.Errorf("error getting existing stages list of final repo %s: %w", finalStagesStorage.String(), err)
    }

    logboek.Context(ctx).Debug().LogF("[%p] Got existing final stages list cache: %#v\n", m, existingStagesListCache.StageIDs)

    finalImageName := finalStagesStorage.ConstructStageImageName(m.ProjectName, stageID.Digest, stageID.UniqueID)

    for _, existingStg := range existingStagesListCache.GetStageIDs() {
        if existingStg.IsEqual(stageID) {
            desc, err := m.GetFinalStagesStorage().GetStageDescription(ctx, m.ProjectName, stageID)
            if err != nil {
                return nil, fmt.Errorf("unable to get stage %s descriptor from final repo %s: %w", stageID.String(), m.GetFinalStagesStorage().String(), err)
            }
            if desc != nil {
                logboek.Context(ctx).Info().LogF("Stage %s already exists in the final repo, skipping\n", stageID.String())

                logboek.Context(ctx).Default().LogFHighlight("Use previously built final image for %s\n", opts.LogDetailedName)
                container_backend.LogImageName(ctx, finalImageName)

                return desc, nil
            }
        }
    }

    if opts.ShouldBeBuiltMode {
        return nil, fmt.Errorf("%s with digest %s is not exist in the final repo", opts.LogDetailedName, stageID.Digest)
    }

    var desc *image.StageDescription

    err = logboek.Context(ctx).Default().LogProcess("Copy stage %s into the final repo", opts.LogDetailedName).
        Options(func(options types.LogProcessOptionsInterface) {
            options.Style(style.Highlight())
        }).
        DoError(func() error {
            copyOpts := CopyStageOptions{
                ContainerBackend:     opts.ContainerBackend,
                IsMultiplatformImage: opts.IsMultiplatformImage,
            }
            if opts.FetchStage != nil {
                copyOpts.FetchStage = opts.FetchStage
                copyOpts.LegacyImage = opts.FetchStage.GetStageImage().Image
            }
            desc, err = m.CopyStage(ctx, m.StagesStorage, finalStagesStorage, stageID, copyOpts)
            if err != nil {
                return fmt.Errorf("unable to copy stage %s into the final repo %s: %w", stageID.String(), finalStagesStorage.String(), err)
            }

            logboek.Context(ctx).Default().LogFDetails("  name: %s\n", finalImageName)

            return nil
        })
    if err != nil {
        return nil, err
    }

    existingStagesListCache.AddStageID(stageID)
    logboek.Context(ctx).Debug().LogF("Updated existing final stages list: %#v\n", m.FinalStagesListCache.StageIDs)

    return desc, nil
}

func (m *StorageManager) SelectSuitableStage(ctx context.Context, c stage.Conveyor, stg stage.Interface, stages []*image.StageDescription) (*image.StageDescription, error) {
    if len(stages) == 0 {
        return nil, nil
    }

    var stageDesc *image.StageDescription
    if err := logboek.Context(ctx).Info().LogProcess("Selecting suitable image for stage %s by digest %s", stg.Name(), stg.GetDigest()).
        DoError(func() error {
            var err error
            stageDesc, err = stg.SelectSuitableStage(ctx, c, stages)
            return err
        }); err != nil {
        return nil, err
    }
    if stageDesc == nil {
        return nil, nil
    }

    imgInfoData, err := yaml.Marshal(stageDesc)
    if err != nil {
        panic(err)
    }

    logboek.Context(ctx).Debug().LogBlock("Selected cache image").
        Options(func(options types.LogBlockOptionsInterface) {
            options.Style(style.Highlight())
        }).
        Do(func() {
            logboek.Context(ctx).Debug().LogF(string(imgInfoData))
        })

    return stageDesc, nil
}

func (m *StorageManager) GetStagesByDigestWithCache(ctx context.Context, stageName, stageDigest string) ([]*image.StageDescription, error) {
    return m.GetStagesByDigestFromStagesStorageWithCache(ctx, stageName, stageDigest, m.StagesStorage)
}

func (m *StorageManager) GetStagesByDigest(ctx context.Context, stageName, stageDigest string) ([]*image.StageDescription, error) {
    return m.GetStagesByDigestFromStagesStorage(ctx, stageName, stageDigest, m.StagesStorage)
}

func (m *StorageManager) GetStagesByDigestFromStagesStorageWithCache(ctx context.Context, stageName, stageDigest string, stagesStorage storage.StagesStorage) ([]*image.StageDescription, error) {
    cachedStageDescriptionList, err := m.getStagesByDigestFromStagesStorage(ctx, stageName, stageDigest, stagesStorage, storage.WithCache())
    if err != nil {
        return nil, err
    }

    if len(cachedStageDescriptionList) != 0 {
        return cachedStageDescriptionList, nil
    }

    return m.getStagesByDigestFromStagesStorage(ctx, stageName, stageDigest, stagesStorage)
}

func (m *StorageManager) GetStagesByDigestFromStagesStorage(ctx context.Context, stageName, stageDigest string, stagesStorage storage.StagesStorage) ([]*image.StageDescription, error) {
    return m.getStagesByDigestFromStagesStorage(ctx, stageName, stageDigest, stagesStorage)
}

func (m *StorageManager) getStagesByDigestFromStagesStorage(ctx context.Context, stageName, stageDigest string, stagesStorage storage.StagesStorage, opts ...storage.Option) ([]*image.StageDescription, error) {
    stageIDs, err := m.getStagesIDsByDigestFromStagesStorage(ctx, stageName, stageDigest, stagesStorage, opts...)
    if err != nil {
        return nil, fmt.Errorf("unable to get stages ids from %s by digest %s for stage %s: %w", stagesStorage.String(), stageDigest, stageName, err)
    }

    stages, err := m.getStagesDescriptions(ctx, stageIDs, stagesStorage, m.CacheStagesStorageList)
    if err != nil {
        return nil, fmt.Errorf("unable to get stage descriptions by ids from %s: %w", stagesStorage.String(), err)
    }

    return stages, nil
}

func (m *StorageManager) CopySuitableByDigestStage(ctx context.Context, stageDesc *image.StageDescription, sourceStagesStorage, destinationStagesStorage storage.StagesStorage, containerBackend container_backend.ContainerBackend, targetPlatform string) (*image.StageDescription, error) {
    img := container_backend.NewLegacyStageImage(nil, stageDesc.Info.Name, containerBackend, targetPlatform)

    logboek.Context(ctx).Info().LogF("Fetching %s\n", img.Name())
    if err := sourceStagesStorage.FetchImage(ctx, img); err != nil {
        return nil, fmt.Errorf("unable to fetch %s from %s: %w", stageDesc.Info.Name, sourceStagesStorage.String(), err)
    }

    newImageName := destinationStagesStorage.ConstructStageImageName(m.ProjectName, stageDesc.StageID.Digest, stageDesc.StageID.UniqueID)
    logboek.Context(ctx).Info().LogF("Renaming image %s to %s\n", img.Name(), newImageName)
    if err := containerBackend.RenameImage(ctx, img, newImageName, false); err != nil {
        return nil, err
    }

    logboek.Context(ctx).Info().LogF("Storing %s\n", newImageName)
    if err := destinationStagesStorage.StoreImage(ctx, img); err != nil {
        return nil, fmt.Errorf("unable to store %s to %s: %w", stageDesc.Info.Name, destinationStagesStorage.String(), err)
    }

    if destinationStageDesc, err := getStageDescription(ctx, m.ProjectName, *stageDesc.StageID, destinationStagesStorage, m.CacheStagesStorageList, getStageDescriptionOptions{WithLocalManifestCache: m.getWithLocalManifestCacheOption()}); err != nil {
        return nil, fmt.Errorf("unable to get stage %s description from %s: %w", stageDesc.StageID.String(), destinationStagesStorage.String(), err)
    } else {
        return destinationStageDesc, nil
    }
}

func (m *StorageManager) getWithLocalManifestCacheOption() bool {
    return m.StagesStorage.Address() != storage.LocalStorageAddress
}

func (m *StorageManager) getStagesIDsByDigestFromStagesStorage(ctx context.Context, stageName, stageDigest string, stagesStorage storage.StagesStorage, opts ...storage.Option) ([]image.StageID, error) {
    var stageIDs []image.StageID
    if err := logboek.Context(ctx).Info().LogProcess("Get %s stages by digest %s from storage", stageName, stageDigest).
        DoError(func() error {
            var err error
            stageIDs, err = stagesStorage.GetStagesIDsByDigest(ctx, m.ProjectName, stageDigest, opts...)
            if err != nil {
                return fmt.Errorf("error getting project %s stage %s images from storage: %w", m.StagesStorage.String(), stageDigest, err)
            }

            logboek.Context(ctx).Debug().LogF("Stages ids: %#v\n", stageIDs)

            return nil
        }); err != nil {
        return nil, err
    }

    return stageIDs, nil
}

func (m *StorageManager) getStagesDescriptions(ctx context.Context, stageIDs []image.StageID, stagesStorage storage.StagesStorage, cacheStagesStorageList []storage.StagesStorage) ([]*image.StageDescription, error) {
    var stages []*image.StageDescription
    for _, stageID := range stageIDs {
        stageDesc, err := getStageDescription(ctx, m.ProjectName, stageID, stagesStorage, cacheStagesStorageList, getStageDescriptionOptions{WithLocalManifestCache: m.getWithLocalManifestCacheOption()})
        if err != nil {
            return nil, err
        }

        if stageDesc == nil {
            logboek.Context(ctx).Warn().LogF("Ignoring stage %s: cannot get stage description from %s\n", stageID.String(), m.StagesStorage.String())
            continue
        }

        stages = append(stages, stageDesc)
    }

    return stages, nil
}

type getStageDescriptionOptions struct {
    WithLocalManifestCache bool
}

func getStageDescriptionFromLocalManifestCache(ctx context.Context, projectName string, stageID image.StageID, stagesStorage storage.StagesStorage) (*image.StageDescription, error) {
    stageImageName := stagesStorage.ConstructStageImageName(projectName, stageID.Digest, stageID.UniqueID)

    logboek.Context(ctx).Debug().LogF("Getting image %s info from the manifest cache...\n", stageImageName)
    imgInfo, err := image.CommonManifestCache.GetImageInfo(ctx, stagesStorage.String(), stageImageName)
    if err != nil {
        return nil, fmt.Errorf("error getting image %s info: %w", stageImageName, err)
    }

    if imgInfo != nil {
        logboek.Context(ctx).Info().LogF("Got image %s info from the manifest cache (CACHE HIT)\n", stageImageName)

        return &image.StageDescription{
            StageID: image.NewStageID(stageID.Digest, stageID.UniqueID),
            Info:    imgInfo,
        }, nil
    } else {
        logboek.Context(ctx).Info().LogF("Not found %s image info in the manifest cache (CACHE MISS)\n", stageImageName)
    }

    return nil, nil
}

func ConvertStageDescriptionForStagesStorage(stageDesc *image.StageDescription, stagesStorage storage.StagesStorage) *image.StageDescription {
    return &image.StageDescription{
        StageID: image.NewStageID(stageDesc.StageID.Digest, stageDesc.StageID.UniqueID),
        Info: &image.Info{
            Name:              fmt.Sprintf("%s:%s-%d", stagesStorage.Address(), stageDesc.StageID.Digest, stageDesc.StageID.UniqueID),
            Repository:        stagesStorage.Address(),
            Tag:               stageDesc.Info.Tag,
            RepoDigest:        stageDesc.Info.RepoDigest,
            ID:                stageDesc.Info.ID,
            ParentID:          stageDesc.Info.ParentID,
            Labels:            stageDesc.Info.Labels,
            Size:              stageDesc.Info.Size,
            CreatedAtUnixNano: stageDesc.Info.CreatedAtUnixNano,
            OnBuild:           stageDesc.Info.OnBuild,
            Env:               stageDesc.Info.Env,
        },
    }
}

func getStageDescription(ctx context.Context, projectName string, stageID image.StageID, stagesStorage storage.StagesStorage, cacheStagesStorageList []storage.StagesStorage, opts getStageDescriptionOptions) (*image.StageDescription, error) {
    if opts.WithLocalManifestCache {
        stageDesc, err := getStageDescriptionFromLocalManifestCache(ctx, projectName, stageID, stagesStorage)
        if err != nil {
            return nil, fmt.Errorf("error getting stage %s description from %s: %w", stageID.String(), stagesStorage.String(), err)
        }
        if stageDesc != nil {
            return stageDesc, nil
        }
    }

    for _, cacheStagesStorage := range cacheStagesStorageList {
        if opts.WithLocalManifestCache {
            stageDesc, err := getStageDescriptionFromLocalManifestCache(ctx, projectName, stageID, cacheStagesStorage)
            if err != nil {
                return nil, fmt.Errorf("error getting stage %s description from the local manifest cache: %w", stageID.String(), err)
            }
            if stageDesc != nil {
                return ConvertStageDescriptionForStagesStorage(stageDesc, stagesStorage), nil
            }
        }

        var stageDesc *image.StageDescription
        err := logboek.Context(ctx).Info().LogProcess("Get stage %s description from cache stages storage %s", stageID.String(), cacheStagesStorage.String()).
            DoError(func() error {
                var err error
                stageDesc, err = cacheStagesStorage.GetStageDescription(ctx, projectName, stageID)

                logboek.Context(ctx).Debug().LogF("Got stage description: %#v\n", stageDesc)
                return err
            })
        if err != nil {
            logboek.Context(ctx).Warn().LogF("Unable to get stage description from cache stages storage %s: %s\n", cacheStagesStorage.String(), err)
            continue
        }

        if stageDesc != nil {
            if opts.WithLocalManifestCache {
                if err := storeStageDescriptionIntoLocalManifestCache(ctx, projectName, stageID, cacheStagesStorage, stageDesc); err != nil {
                    return nil, fmt.Errorf("error storing stage %s description into local manifest cache: %w", stageID.String(), err)
                }
            }

            return ConvertStageDescriptionForStagesStorage(stageDesc, stagesStorage), nil
        }
    }

    logboek.Context(ctx).Debug().LogF("Getting digest %q uniqueID %d stage info from %s...\n", stageID.Digest, stageID.UniqueID, stagesStorage.String())
    stageDesc, err := stagesStorage.GetStageDescription(ctx, projectName, stageID)
    switch {
    case storage.IsErrBrokenImage(err):
        return nil, nil
    case err != nil:
        return nil, fmt.Errorf("error getting digest %q uniqueID %d stage info from %s: %w", stageID.Digest, stageID.UniqueID, stagesStorage.String(), err)
    case stageDesc != nil:
        if opts.WithLocalManifestCache {
            if err := storeStageDescriptionIntoLocalManifestCache(ctx, projectName, stageID, stagesStorage, stageDesc); err != nil {
                return nil, fmt.Errorf("error storing stage %s description into local manifest cache: %w", stageID.String(), err)
            }
        }
        return stageDesc, nil
    default:
        return nil, nil
    }
}

func (m *StorageManager) GenerateStageUniqueID(digest string, stages []*image.StageDescription) (string, int64) {
    var imageName string

    for {
        timeNow := time.Now().UTC()
        uniqueID := timeNow.Unix()*1000 + int64(timeNow.Nanosecond()/1000000)
        imageName = m.StagesStorage.ConstructStageImageName(m.ProjectName, digest, uniqueID)

        for _, stageDesc := range stages {
            if stageDesc.Info.Name == imageName {
                continue
            }
        }
        return imageName, uniqueID
    }
}

type rmImageMetadataTask struct {
    commit  string
    stageID string
}

func (m *StorageManager) ForEachRmImageMetadata(ctx context.Context, projectName, imageNameOrID string, stageIDCommitList map[string][]string, f func(ctx context.Context, commit, stageID string, err error) error) error {
    var tasks []rmImageMetadataTask
    for stageID, commitList := range stageIDCommitList {
        for _, commit := range commitList {
            tasks = append(tasks, rmImageMetadataTask{
                commit:  commit,
                stageID: stageID,
            })
        }
    }

    return parallel.DoTasks(ctx, len(tasks), parallel.DoTasksOptions{
        MaxNumberOfWorkers: m.MaxNumberOfWorkers(),
    }, func(ctx context.Context, taskId int) error {
        task := tasks[taskId]
        err := m.StagesStorage.RmImageMetadata(ctx, projectName, imageNameOrID, task.commit, task.stageID)
        return f(ctx, task.commit, task.stageID, err)
    })
}

func (m *StorageManager) ForEachRmManagedImage(ctx context.Context, projectName string, managedImages []string, f func(ctx context.Context, managedImage string, err error) error) error {
    return parallel.DoTasks(ctx, len(managedImages), parallel.DoTasksOptions{
        MaxNumberOfWorkers: m.MaxNumberOfWorkers(),
    }, func(ctx context.Context, taskId int) error {
        managedImage := managedImages[taskId]
        err := m.StagesStorage.RmManagedImage(ctx, projectName, managedImage)
        return f(ctx, managedImage, err)
    })
}

func (m *StorageManager) ForEachGetImportMetadata(ctx context.Context, projectName string, ids []string, f func(ctx context.Context, metadataID string, metadata *storage.ImportMetadata, err error) error) error {
    return parallel.DoTasks(ctx, len(ids), parallel.DoTasksOptions{
        MaxNumberOfWorkers: m.MaxNumberOfWorkers(),
    }, func(ctx context.Context, taskId int) error {
        id := ids[taskId]
        metadata, err := m.StagesStorage.GetImportMetadata(ctx, projectName, id)
        return f(ctx, id, metadata, err)
    })
}

func (m *StorageManager) ForEachRmImportMetadata(ctx context.Context, projectName string, ids []string, f func(ctx context.Context, id string, err error) error) error {
    return parallel.DoTasks(ctx, len(ids), parallel.DoTasksOptions{
        MaxNumberOfWorkers: m.MaxNumberOfWorkers(),
    }, func(ctx context.Context, taskId int) error {
        id := ids[taskId]
        err := m.StagesStorage.RmImportMetadata(ctx, projectName, id)
        return f(ctx, id, err)
    })
}

func (m *StorageManager) ForEachDeleteStageCustomTag(ctx context.Context, ids []string, f func(ctx context.Context, tag string, err error) error) error {
    return parallel.DoTasks(ctx, len(ids), parallel.DoTasksOptions{
        MaxNumberOfWorkers: m.MaxNumberOfWorkers(),
    }, func(ctx context.Context, taskId int) error {
        id := ids[taskId]

        if err := m.StagesStorage.DeleteStageCustomTag(ctx, id); err != nil {
            return f(ctx, id, fmt.Errorf("unable to delete stage custom tag: %w", err))
        }
        if err := m.StagesStorage.UnregisterStageCustomTag(ctx, id); err != nil {
            return f(ctx, id, fmt.Errorf("unable to unregister stage custom tag: %w", err))
        }

        return f(ctx, id, nil)
    })
}

func (m *StorageManager) ForEachGetStageCustomTagMetadata(ctx context.Context, ids []string, f func(ctx context.Context, metadataID string, metadata *storage.CustomTagMetadata, err error) error) error {
    return parallel.DoTasks(ctx, len(ids), parallel.DoTasksOptions{
        MaxNumberOfWorkers: m.MaxNumberOfWorkers(),
    }, func(ctx context.Context, taskId int) error {
        id := ids[taskId]
        metadata, err := m.StagesStorage.GetStageCustomTagMetadata(ctx, id)
        return f(ctx, id, metadata, err)
    })
}