pkg/cleaning/git_history_based_cleanup/scanner.go

Summary

Maintainability
C
1 day
Test Coverage
B
88%
package git_history_based_cleanup

import (
    "context"
    "fmt"
    "reflect"
    "sort"
    "time"

    "github.com/go-git/go-git/v5"
    "github.com/go-git/go-git/v5/plumbing"
    "github.com/go-git/go-git/v5/plumbing/object"

    "github.com/werf/logboek"
    "github.com/werf/werf/v2/pkg/config"
    "github.com/werf/werf/v2/pkg/util"
)

// ScanReferencesHistory scans the history of every reference in refs, starting
// from the last element and moving towards the first, looking for the commits
// listed in expectedStageIDCommitList (stage ID -> candidate commits).
//
// Stop commits produced while scanning one reference are passed on to the
// references scanned after it. The function returns the union of stage IDs
// reached across all references and, for each of them, the deduplicated list
// of commits they were attributed to. The first scan failure aborts the whole
// operation and is returned wrapped.
func ScanReferencesHistory(ctx context.Context, gitRepository *git.Repository, refs []*ReferenceToScan, expectedStageIDCommitList map[string][]string) ([]string, map[string][]string, error) {
    var reachedStageIDs, stopCommitList []string
    stageIDHitCommitList := map[string][]string{}

    for idx := len(refs) - 1; idx >= 0; idx-- {
        ref := refs[idx]

        prefix := "Reference "
        if ref.Reference.Name().IsTag() {
            prefix = "Tag "
        }
        logProcessMessage := prefix + ref.String()

        scanErr := logboek.Context(ctx).Info().LogProcess(logProcessMessage).DoError(func() error {
            refReachedStageIDs, refStopCommitList, refStageIDHitCommitList, err := scanReferenceHistory(ctx, gitRepository, ref, expectedStageIDCommitList, stopCommitList)
            if err != nil {
                return fmt.Errorf("scan reference history failed: %w", err)
            }

            stopCommitList = util.AddNewStringsToStringArray(stopCommitList, refStopCommitList...)
            reachedStageIDs = util.AddNewStringsToStringArray(reachedStageIDs, refReachedStageIDs...)

            // Merge this reference's hits into the accumulated ones.
            for refStageID, refCommitList := range refStageIDHitCommitList {
                if hitCommitList, ok := stageIDHitCommitList[refStageID]; ok {
                    stageIDHitCommitList[refStageID] = util.AddNewStringsToStringArray(hitCommitList, refCommitList...)
                } else {
                    stageIDHitCommitList[refStageID] = refCommitList
                }
            }

            return nil
        })
        if scanErr != nil {
            return nil, nil, scanErr
        }
    }

    return reachedStageIDs, stageIDHitCommitList, nil
}

// applyImagesCleanupInPolicy restricts stageIDCommitList to commits whose
// committer date lies within the keep-policy window `in`.
//
// When in is nil, stageIDCommitList is returned as is. Otherwise a new map is
// built that keeps, for each stage ID, only the commits committed after
// now-in. Stage IDs left without any commit are dropped.
//
// The signature has no error return, so a commit hash that cannot be resolved
// in gitRepository is reported by panicking (with the underlying error).
func applyImagesCleanupInPolicy(gitRepository *git.Repository, stageIDCommitList map[string][]string, in *time.Duration) map[string][]string {
    if in == nil {
        return stageIDCommitList
    }

    // Compute the cutoff once so every commit is judged against the same instant.
    cutoff := time.Now().Add(-*in)

    // Several stage IDs may reference the same commit; resolve each hash only once.
    isCommitInWindow := map[string]bool{}

    policyStageIDCommitList := map[string][]string{}
    for stageID, commitList := range stageIDCommitList {
        var resultCommitList []string
        for _, commit := range commitList {
            inWindow, ok := isCommitInWindow[commit]
            if !ok {
                c, err := gitRepository.CommitObject(plumbing.NewHash(commit))
                if err != nil {
                    panic(fmt.Sprintf("unexpected condition: commit %s resolve failed: %s", commit, err))
                }

                inWindow = c.Committer.When.After(cutoff)
                isCommitInWindow[commit] = inWindow
            }

            if inWindow {
                resultCommitList = append(resultCommitList, commit)
            }
        }

        if len(resultCommitList) != 0 {
            policyStageIDCommitList[stageID] = resultCommitList
        }
    }

    return policyStageIDCommitList
}

// commitHistoryScanner holds the state of a breadth-first walk over the commit
// history of a single reference (see scanCommitHistory and handleCommit).
type commitHistoryScanner struct {
    // gitRepository is used to resolve commit hashes into commit objects.
    gitRepository             *git.Repository
    // expectedStageIDCommitList maps each stage ID to the commits on which it
    // can be reached; handleCommit matches scanned commits against it.
    expectedStageIDCommitList map[string][]string
    // reachedStageIDCommitList maps each reached stage ID to the commits on
    // which it was actually found during the walk.
    reachedStageIDCommitList  map[string][]string
    // reachedCommitList lists, in scan order, the commits that reached at
    // least one stage ID; its last element may become a stop commit.
    reachedCommitList         []string
    // stopCommitList holds commits at which the walk does not continue; it is
    // passed in by the caller and may be extended by handleStopCommitList.
    stopCommitList            []string
    // isAlreadyScannedCommit marks commits whose parents were already queued,
    // so they are not processed again.
    isAlreadyScannedCommit    map[string]bool
    // scanDepth is the number of breadth-first levels entered so far; it is
    // compared against scanDepthLimit.
    scanDepth                 int

    // referenceScanOptions carries the per-reference options; the scanner
    // reads scanDepthLimit and imagesCleanupKeepPolicy from it.
    referenceScanOptions
}

// reachedStageIDList returns the stage IDs that were reached on at least one
// commit. The order follows map iteration and is therefore unspecified; nil is
// returned when nothing was reached.
func (s *commitHistoryScanner) reachedStageIDList() []string {
    var result []string
    for stageID := range s.reachedStageIDCommitList {
        if len(s.reachedStageIDCommitList[stageID]) == 0 {
            continue
        }
        result = append(result, stageID)
    }
    return result
}

// scanReferenceHistory scans the history of a single reference for the commits
// in expectedStageIDCommitList, applying the reference's images keep policy.
//
// It returns the stage IDs kept for the reference, the (possibly extended)
// stop commit list, and for each kept stage ID the commit it is attributed to.
// A stop commit is only added (via handleStopCommitList) when the scan searched
// for the full, unfiltered expected set and no `last` trimming happened.
func scanReferenceHistory(ctx context.Context, gitRepository *git.Repository, ref *ReferenceToScan, expectedStageIDCommitList map[string][]string, stopCommitList []string) ([]string, []string, map[string][]string, error) {
    filteredExpectedStageIDCommitList := applyImagesCleanupInPolicy(gitRepository, expectedStageIDCommitList, ref.imagesCleanupKeepPolicy.In)

    // The `in` window is applied to the expected set up front unless `last` is
    // combined with `in` through the Or operator, where a commit outside the
    // window may still be kept by `last`. A nil operator means And (the
    // default), the same interpretation used by handleCommit and
    // handleExtraStageIDsByLastWithIn.
    var refExpectedStageIDCommitList map[string][]string
    isImagesCleanupKeepPolicyOnlyInOrAndBoth := ref.imagesCleanupKeepPolicy.Last == nil ||
        ref.imagesCleanupKeepPolicy.Operator == nil ||
        *ref.imagesCleanupKeepPolicy.Operator == config.AndOperator
    if isImagesCleanupKeepPolicyOnlyInOrAndBoth {
        refExpectedStageIDCommitList = filteredExpectedStageIDCommitList
    } else {
        refExpectedStageIDCommitList = expectedStageIDCommitList
    }

    if len(refExpectedStageIDCommitList) == 0 {
        logboek.Context(ctx).Info().LogLn("Skip reference due to nothing to seek")
        return []string{}, stopCommitList, map[string][]string{}, nil
    }

    s := &commitHistoryScanner{
        gitRepository:             gitRepository,
        expectedStageIDCommitList: refExpectedStageIDCommitList,
        reachedStageIDCommitList:  map[string][]string{},
        stopCommitList:            stopCommitList,

        referenceScanOptions:   ref.referenceScanOptions,
        isAlreadyScannedCommit: map[string]bool{},
    }

    headCommit := ref.HeadCommit.Hash.String()
    if err := s.scanCommitHistory(ctx, headCommit); err != nil {
        return nil, nil, nil, fmt.Errorf("scan commit %s history failed: %w", headCommit, err)
    }

    // A `last` of -1 is treated as "no limit"; only a concrete limit can require trimming.
    if last := s.referenceScanOptions.imagesCleanupKeepPolicy.Last; last != nil && *last != -1 {
        reachedStageIDList := s.reachedStageIDList()
        switch {
        case len(reachedStageIDList) == *last:
            return reachedStageIDList, s.stopCommitList, s.stageIDHitCommitList(), nil
        case len(reachedStageIDList) > *last:
            logboek.Context(ctx).Info().LogF("Reached more tags than expected by last (%d/%d)\n", len(reachedStageIDList), *last)

            latestCommitStageIDs := s.latestCommitStageIDs()
            latestCommitList := make([]*object.Commit, 0, len(latestCommitStageIDs))
            for latestCommit := range latestCommitStageIDs {
                latestCommitList = append(latestCommitList, latestCommit)
            }

            // Newest commit first: `last` keeps a prefix of this list.
            sort.Slice(latestCommitList, func(i, j int) bool {
                return latestCommitList[i].Committer.When.After(latestCommitList[j].Committer.When)
            })

            if s.referenceScanOptions.imagesCleanupKeepPolicy.In == nil {
                return s.handleExtraStageIDsByLast(ctx, latestCommitStageIDs, latestCommitList)
            }
            return s.handleExtraStageIDsByLastWithIn(ctx, latestCommitStageIDs, latestCommitList)
        }
    }

    // The expected set was narrowed for this reference, so its results must not
    // produce stop commits that would hide history from other references.
    if !reflect.DeepEqual(expectedStageIDCommitList, refExpectedStageIDCommitList) {
        return s.reachedStageIDList(), s.stopCommitList, s.stageIDHitCommitList(), nil
    }

    return s.handleStopCommitList(ctx, ref)
}

// handleStopCommitList possibly appends a stop commit for the scanned reference
// and returns the scan results.
//
// With a depth limit, a stop commit (the last reached commit) is added only
// when every expected stage ID was reached. Without one, the last reached
// commit is used, or the reference head when nothing was reached.
func (s *commitHistoryScanner) handleStopCommitList(ctx context.Context, ref *ReferenceToScan) ([]string, []string, map[string][]string, error) {
    reached := s.reachedStageIDList()
    isDepthLimited := s.referenceScanOptions.scanDepthLimit != 0

    if isDepthLimited && len(reached) != len(s.expectedStageIDCommitList) {
        return reached, s.stopCommitList, s.stageIDHitCommitList(), nil
    }

    var stopCommit string
    if isDepthLimited || len(reached) != 0 {
        stopCommit = s.reachedCommitList[len(s.reachedCommitList)-1]
    } else {
        stopCommit = ref.HeadCommit.Hash.String()
    }

    s.stopCommitList = append(s.stopCommitList, stopCommit)
    logboek.Context(ctx).Debug().LogF("Stop commit %s added\n", stopCommit)

    return reached, s.stopCommitList, s.stageIDHitCommitList(), nil
}

// handleExtraStageIDsByLastWithIn trims the reached stage IDs when the keep
// policy sets both `last` and `in` and more stage IDs were reached than `last`
// allows.
//
// latestCommitList is expected to hold the keys of latestCommitStageIDs ordered
// from newest to oldest (as prepared by scanReferenceHistory). The first `last`
// commits form the "last" set and the commits newer than the `in` window form
// the "in" set.
//   - And (also used when no operator is configured) keeps their intersection,
//     in "last" order.
//   - Or keeps their union: the "in" set followed by the remaining "last" commits.
//
// Stage IDs attached to the kept commits are returned together with the
// unchanged stop commit list and the commit each stage ID is attributed to;
// all other stage IDs are logged as skipped.
func (s *commitHistoryScanner) handleExtraStageIDsByLastWithIn(ctx context.Context, latestCommitStageIDs map[*object.Commit][]string, latestCommitList []*object.Commit) ([]string, []string, map[string][]string, error) {
    last := *s.referenceScanOptions.imagesCleanupKeepPolicy.Last
    // Judge every commit against one instant so the "in" set is consistent.
    inCutoff := time.Now().Add(-*s.referenceScanOptions.imagesCleanupKeepPolicy.In)

    var latestCommitListByLast []*object.Commit
    var latestCommitListByIn []*object.Commit
    isByIn := map[*object.Commit]bool{}
    for ind, latestCommit := range latestCommitList {
        if ind < last {
            latestCommitListByLast = append(latestCommitListByLast, latestCommit)
        }

        if latestCommit.Committer.When.After(inCutoff) {
            latestCommitListByIn = append(latestCommitListByIn, latestCommit)
            isByIn[latestCommit] = true
        }
    }

    var resultLatestCommitList []*object.Commit
    isImagesCleanupKeepPolicyOperatorAnd := s.referenceScanOptions.imagesCleanupKeepPolicy.Operator == nil || *s.referenceScanOptions.imagesCleanupKeepPolicy.Operator == config.AndOperator
    if isImagesCleanupKeepPolicyOperatorAnd {
        for _, commitByLast := range latestCommitListByLast {
            if isByIn[commitByLast] {
                resultLatestCommitList = append(resultLatestCommitList, commitByLast)
            }
        }
    } else {
        // Copy rather than alias latestCommitListByIn so the appends below never
        // write into its backing array.
        resultLatestCommitList = append([]*object.Commit(nil), latestCommitListByIn...)
        for _, commitByLast := range latestCommitListByLast {
            if !isByIn[commitByLast] {
                resultLatestCommitList = append(resultLatestCommitList, commitByLast)
            }
        }
    }

    var reachedStageIDList []string
    isReachedStageID := map[string]bool{}
    stageIDHitCommitList := map[string][]string{}
    for _, latestCommit := range resultLatestCommitList {
        stageIDs := latestCommitStageIDs[latestCommit]
        if len(stageIDs) > 1 {
            logboek.Context(ctx).Info().LogBlock("Counted tags as one due to identical related commit %s", latestCommit.Hash.String()).Do(func() {
                for _, stageID := range stageIDs {
                    logboek.Context(ctx).Info().LogLn(stageID)
                }
            })
        }

        for _, stageID := range stageIDs {
            reachedStageIDList = append(reachedStageIDList, stageID)
            isReachedStageID[stageID] = true
            stageIDHitCommitList[stageID] = []string{latestCommit.Hash.String()}
        }
    }

    var skippedStageIDList []string
    for _, stageIDs := range latestCommitStageIDs {
        for _, stageID := range stageIDs {
            if !isReachedStageID[stageID] {
                skippedStageIDList = append(skippedStageIDList, stageID)
            }
        }
    }

    if len(skippedStageIDList) != 0 {
        logboek.Context(ctx).Info().LogBlock(fmt.Sprintf("Skipped tags by keep policy (%s)", s.imagesCleanupKeepPolicy.String())).Do(func() {
            for _, stageID := range skippedStageIDList {
                logboek.Context(ctx).Info().LogLn(stageID)
            }
        })
    }

    return reachedStageIDList, s.stopCommitList, stageIDHitCommitList, nil
}

// handleExtraStageIDsByLast trims the reached stage IDs to those attached to
// the first `last` commits of latestCommitList (ordered newest first by the
// caller). The stage IDs of the remaining commits are logged as skipped.
//
// It returns the kept stage IDs, the unchanged stop commit list, and the
// commit each kept stage ID is attributed to.
func (s *commitHistoryScanner) handleExtraStageIDsByLast(ctx context.Context, latestCommitStageIDs map[*object.Commit][]string, latestCommitList []*object.Commit) ([]string, []string, map[string][]string, error) {
    var kept, skipped []string
    hits := map[string][]string{}

    for position, candidate := range latestCommitList {
        group := latestCommitStageIDs[candidate]

        if position >= *s.referenceScanOptions.imagesCleanupKeepPolicy.Last {
            skipped = append(skipped, group...)
            continue
        }

        // Several stage IDs sharing one commit only count once towards `last`.
        if len(group) > 1 {
            logboek.Context(ctx).Info().LogBlock("Counted tags as one due to identical related commit %s", candidate.Hash.String()).Do(func() {
                for _, stageID := range group {
                    logboek.Context(ctx).Info().LogLn(stageID)
                }
            })
        }

        for _, stageID := range group {
            kept = append(kept, stageID)
            hits[stageID] = []string{candidate.Hash.String()}
        }
    }

    if len(skipped) > 0 {
        logboek.Context(ctx).Info().LogBlock(fmt.Sprintf("Skipped tags by keep policy (%s)", s.imagesCleanupKeepPolicy.String())).Do(func() {
            for _, stageID := range skipped {
                logboek.Context(ctx).Info().LogLn(stageID)
            }
        })
    }

    return kept, s.stopCommitList, hits, nil
}

// scanCommitHistory walks the commit graph breadth-first starting at commit,
// one parent level per outer iteration, passing each commit to handleCommit.
//
// A branch of the walk ends at an already-scanned commit, at a stop commit, or
// at level scanDepthLimit (when non-zero). The whole walk ends when every
// expected stage ID has been reached, when no parents are left to visit, or
// when ctx is cancelled (ctx.Err() is returned). Errors from handleCommit are
// returned unchanged.
func (s *commitHistoryScanner) scanCommitHistory(ctx context.Context, commit string) error {
    currentIteration := []string{commit}
    for len(currentIteration) != 0 {
        // Histories can be long; stop promptly if the caller gave up.
        if err := ctx.Err(); err != nil {
            return err
        }

        s.scanDepth++

        var nextIteration []string
        for _, c := range currentIteration {
            if s.isAlreadyScannedCommit[c] {
                continue
            }

            if s.isStopCommit(c) {
                logboek.Context(ctx).Debug().LogF("Stop scanning commit history %s due to stop commit reached\n", c)
                continue
            }

            commitParents, err := s.handleCommit(ctx, c)
            if err != nil {
                return err
            }

            if s.scanDepth == s.referenceScanOptions.scanDepthLimit {
                logboek.Context(ctx).Debug().LogF("Stop scanning commit history %s due to scanDepthLimit (%d)\n", c, s.referenceScanOptions.scanDepthLimit)
                continue
            }

            // Return instead of break: a bare break only leaves this level, and
            // the outer loop would go on handling commits of the next level.
            if len(s.expectedStageIDCommitList) == len(s.reachedStageIDCommitList) {
                logboek.Context(ctx).Debug().LogLn("Stop scanning due to all expected tags reached")
                return nil
            }

            s.isAlreadyScannedCommit[c] = true
            nextIteration = append(nextIteration, commitParents...)
        }

        currentIteration = nextIteration
    }

    return nil
}

// handleCommit checks whether commit is one of the expected commits of any
// stage ID and, if so, records it in reachedStageIDCommitList and
// reachedCommitList. It returns the hashes of the commit's parents so the
// caller can continue the walk.
//
// When the keep policy enforces the `in` window during the scan, a commit older
// than the window is not credited to any stage ID. The window is enforced when
// only `in` is set, or when `last` is combined with it through And or through
// the default nil operator.
//
// An error is returned, before any state is modified, if commit cannot be
// resolved in the repository.
func (s *commitHistoryScanner) handleCommit(ctx context.Context, commit string) ([]string, error) {
    commitObject, err := s.gitRepository.CommitObject(plumbing.NewHash(commit))
    if err != nil {
        return nil, fmt.Errorf("commit hash %s resolve failed: %w", commit, err)
    }

    isOutsideInWindow := false
    if in := s.imagesCleanupKeepPolicy.In; in != nil {
        isInWindowEnforced := s.imagesCleanupKeepPolicy.Last == nil ||
            s.imagesCleanupKeepPolicy.Operator == nil ||
            *s.imagesCleanupKeepPolicy.Operator == config.AndOperator
        isOutsideInWindow = isInWindowEnforced && commitObject.Committer.When.Before(time.Now().Add(-*in))
    }

    // The age check depends only on the commit, so an out-of-window commit
    // cannot reach any stage ID and matching is skipped entirely.
    if !isOutsideInWindow {
        isReachedCommit := false
        for stageID, commitList := range s.expectedStageIDCommitList {
            for _, c := range commitList {
                if c != commit {
                    continue
                }

                isReachedCommit = true
                s.reachedStageIDCommitList[stageID] = append(s.reachedStageIDCommitList[stageID], c)

                logboek.Context(ctx).Info().LogF(
                    "Expected content digest %s was reached on commit %s\n",
                    stageID,
                    commit,
                )

                break
            }
        }

        if isReachedCommit {
            s.reachedCommitList = append(s.reachedCommitList, commit)
        }
    }

    parentHashes := make([]string, 0, len(commitObject.ParentHashes))
    for _, parentHash := range commitObject.ParentHashes {
        parentHashes = append(parentHashes, parentHash.String())
    }

    return parentHashes, nil
}

// isStopCommit reports whether commit is present in the scanner's stop commit
// list, i.e. whether the history walk must not proceed past it.
func (s *commitHistoryScanner) isStopCommit(commit string) bool {
    found := false
    for i := 0; i < len(s.stopCommitList) && !found; i++ {
        found = s.stopCommitList[i] == commit
    }
    return found
}

// stageIDHitCommitList maps every reached stage ID to a one-element slice
// holding the hash of the latest (by committer date) commit on which it was
// reached.
func (s *commitHistoryScanner) stageIDHitCommitList() map[string][]string {
    hits := make(map[string][]string)
    for latestCommit, stageIDs := range s.latestCommitStageIDs() {
        hash := latestCommit.Hash.String()
        for _, stageID := range stageIDs {
            hits[stageID] = []string{hash}
        }
    }
    return hits
}

// latestCommitStageIDs groups the reached stage IDs by the latest (by committer
// date) commit on which each of them was reached. Commit objects are resolved
// once per hash, so stage IDs sharing a commit share the same map key.
// It panics if a reached commit hash cannot be resolved.
func (s *commitHistoryScanner) latestCommitStageIDs() map[*object.Commit][]string {
    resolved := map[string]*object.Commit{}
    resolve := func(hash string) *object.Commit {
        if co, ok := resolved[hash]; ok {
            return co
        }

        co, err := s.gitRepository.CommitObject(plumbing.NewHash(hash))
        if err != nil {
            panic("unexpected condition")
        }

        resolved[hash] = co
        return co
    }

    grouped := map[*object.Commit][]string{}
    for stageID, commitList := range s.reachedStageIDCommitList {
        var newest *object.Commit
        for _, hash := range commitList {
            co := resolve(hash)
            // Ties keep the first commit seen.
            if newest != nil && !co.Committer.When.After(newest.Committer.When) {
                continue
            }
            newest = co
        }

        if newest == nil {
            continue
        }
        grouped[newest] = append(grouped[newest], stageID)
    }

    return grouped
}