ARM-software/golang-utils

View on GitHub
utils/filesystem/lockfile.go

Summary

Maintainability
A
2 hrs
Test Coverage
/*
 * Copyright (C) 2020-2022 Arm Limited or its affiliates and Contributors. All rights reserved.
 * SPDX-License-Identifier: Apache-2.0
 */

// Distributed lock using lock files https://fileinfo.com/extension/lock
package filesystem

import (
    "context"
    "fmt"
    "path/filepath"
    "strings"
    "time"

    retry "github.com/avast/retry-go"

    "github.com/ARM-software/golang-utils/utils/collection"
    "github.com/ARM-software/golang-utils/utils/commonerrors"
    "github.com/ARM-software/golang-utils/utils/parallelisation"
)

// LockFilePrefix is the prefix used by NewGenericRemoteLockFile for the name of the lock directory
// (the directory is named `<prefix>-<lock id>`).
const LockFilePrefix = "lockfile"

// RemoteLockFile describes a distributed lock using only the file system.
// The locking mechanism is performed using directories and the atomic function `mkdir`.
// A major issue of distributed locks is the presence of stale locks due to many factors such as the loss of the holder of a lock for various reasons.
// To mitigate this problem, a "heartbeat" file is modified regularly by the lock holder in order to signal that the holder is still alive and the lock still valid.
type RemoteLockFile struct {
    // id identifies the lock; it is used to name both the lock directory and the heart beat file.
    id                   string
    // prefix is prepended to the id to form the name of the lock directory.
    prefix               string
    // path is the directory in which the lock directory is created.
    path                 string
    // timeBetweenLockTries is how long Lock waits between two acquisition attempts.
    timeBetweenLockTries time.Duration
    // fs is the file system on which the lock lives.
    fs                   *VFS
    // lockHeartBeatPeriod is the period at which the heart beat file is refreshed;
    // a lock is considered stale once its last beat is older than twice this period.
    lockHeartBeatPeriod  time.Duration
    // cancelStore collects the cancel functions of the heart beat contexts created by TryLock;
    // it is cancelled by Unlock and MakeStale.
    cancelStore          *parallelisation.CancelFunctionStore
    // overrideStaleLock specifies whether TryLock should release a stale lock and take it over
    // rather than reporting commonerrors.ErrStaleLock.
    overrideStaleLock    bool
}

// NewGenericRemoteLockFile creates a new remote lock using the file system.
// fs is the file system holding the lock.
// lockID is the identifier of the lock.
// dirPath is the directory in which the lock directory gets created.
// overrideStaleLock specifies whether a stale lock should be taken over when trying to lock.
func NewGenericRemoteLockFile(fs *VFS, lockID string, dirPath string, overrideStaleLock bool) ILock {
    const (
        // delay between two acquisition attempts when blocking on the lock.
        retryDelay = 10 * time.Millisecond
        // period at which the holder of the lock refreshes its heart beat.
        beatPeriod = 50 * time.Millisecond
    )
    lock := new(RemoteLockFile)
    lock.id = lockID
    lock.prefix = LockFilePrefix
    lock.path = dirPath
    lock.timeBetweenLockTries = retryDelay
    lock.fs = fs
    lock.lockHeartBeatPeriod = beatPeriod
    lock.cancelStore = parallelisation.NewCancelFunctionsStore()
    lock.overrideStaleLock = overrideStaleLock
    return lock
}

// NewRemoteLockFile creates a new remote lock using the file system. Stale locks are not overridden.
// lockID is the identifier of the lock.
// dirPath is the path where the lock should be applied to.
func NewRemoteLockFile(fs *VFS, lockID string, dirPath string) ILock {
    const overrideStaleLock = false
    return NewGenericRemoteLockFile(fs, lockID, dirPath, overrideStaleLock)
}

// heartBeat signals that the holder of a lock is alive by rewriting the heart beat file `filepath`
// over and over, pausing for slightly less than `period` between two beats.
// It only returns once the context reports an error (e.g. after cancellation).
// Failures to write the file or to set its timestamps are ignored; the next beat simply tries again.
func heartBeat(ctx context.Context, fs FS, period time.Duration, filepath string) {
    pause := period - time.Millisecond
    for parallelisation.DetermineContextError(ctx) == nil {
        beat := time.Now()
        content := []byte(fmt.Sprintf("alive @ %v", beat))
        _ = fs.WriteFile(filepath, content, 0775)
        // FIXME: this is to overcome the problem found with different filesystems which do not update modTime on file change.
        // e.g. https://github.com/spf13/afero/issues/297
        _ = fs.Chtimes(filepath, beat, beat)
        // sleeping until next heart beat
        parallelisation.SleepWithContext(ctx, pause)
    }
}
// lockPath returns the location of the directory materialising the lock: a directory named
// `<prefix>-<id>` (both trimmed of surrounding white space) inside the lock's parent path.
func (l *RemoteLockFile) lockPath() string {
    prefix := strings.TrimSpace(l.prefix)
    id := strings.TrimSpace(l.id)
    dirName := fmt.Sprintf("%v-%v", prefix, id)
    return filepath.Join(l.path, dirName)
}

// IsStale checks whether the lock is stale (i.e. no heart beat detected) or not.
// false is returned whenever the state of the lock cannot be read from the file system.
func (l *RemoteLockFile) IsStale() bool {
    lockDir := l.lockPath()
    entries, err := l.fs.Ls(lockDir)
    if err != nil {
        return false
    }
    if len(entries) > 0 {
        // heart beat files are present: the lock is as fresh as they are.
        return areHeartBeatFilesAllStale(l.fs, lockDir, entries, l.lockHeartBeatPeriod)
    }
    // The directory exists but is empty: it may just have been created and the heart beat file
    // may not have been written yet. The age of the directory itself is then what tells
    // whether the lock is stale.
    dirTimes, err := l.fs.StatTimes(lockDir)
    if err != nil {
        return false
    }
    return isStale(dirTimes, l.lockHeartBeatPeriod)
}

// areHeartBeatFilesAllStale evaluates the staleness of every heart beat file listed in `heartBeatFiles`
// (names relative to `lockPath`) and hands the individual results to collection.All.
// A file whose timestamps cannot be read is recorded as not stale.
func areHeartBeatFilesAllStale(fs *VFS, lockPath string, heartBeatFiles []string, lockHeartBeatPeriod time.Duration) bool {
    verdicts := make([]bool, 0, len(heartBeatFiles))
    for _, name := range heartBeatFiles {
        // there should only be one file in the directory
        beatPath := filepath.Join(lockPath, name)
        // the lock is alive if the heart beat was modified recently enough with regards to the beat period.
        times, err := fs.StatTimes(beatPath)
        verdicts = append(verdicts, err == nil && isStale(times, lockHeartBeatPeriod))
    }
    return collection.All(verdicts)
}

// isStale reports whether the modification time described by `filetime` is more than two beat periods old.
// Both durations are compared in whole milliseconds. Missing time information (nil) is never stale.
func isStale(filetime FileTimeInfo, beatPeriod time.Duration) bool {
    if filetime == nil {
        return false
    }
    ageInMs := time.Since(filetime.ModTime()).Milliseconds()
    thresholdInMs := 2 * beatPeriod.Milliseconds()
    return ageInMs > thresholdInMs
}

// ReleaseIfStale unlocks the lock if it is found to be stale and does nothing otherwise.
// The error, if any, is the one reported by Unlock.
func (l *RemoteLockFile) ReleaseIfStale(ctx context.Context) error {
    if !l.IsStale() {
        return nil
    }
    return l.Unlock(ctx)
}

// TryLock attempts to lock the lock straight away.
//
// The lock is taken by creating the lock directory with a single `mkdir`. When the directory already
// exists, the result depends on the state of the existing lock:
//   - commonerrors.ErrLocked is returned if the lock is not stale;
//   - commonerrors.ErrStaleLock is returned if the lock is stale and stale locks must not be overridden;
//   - otherwise the stale lock is released and the acquisition is attempted again. If the release fails
//     and leaves the lock stale, an error wrapping the release failure is returned instead of trying again.
//
// On success, a goroutine refreshing the heart beat file is started. Its context derives from `ctx`
// and its cancel function is registered in the lock's cancel store, which Unlock and MakeStale cancel.
func (l *RemoteLockFile) TryLock(ctx context.Context) (err error) {
    if err = parallelisation.DetermineContextError(ctx); err != nil {
        return err
    }

    lockPath := l.lockPath()
    // create directory as lock
    err = l.fs.vfs.Mkdir(lockPath, 0755)
    if commonerrors.Any(ConvertFileSystemError(err), commonerrors.ErrExists) {
        if !l.IsStale() {
            return commonerrors.ErrLocked
        }
        if !l.overrideStaleLock {
            return commonerrors.ErrStaleLock
        }
        if releaseErr := l.ReleaseIfStale(ctx); releaseErr != nil {
            // A failed release only matters when nothing else changed: if the context is done, the
            // attempt below reports it, and if the lock is no longer stale, the attempt below reports
            // ErrLocked. If the lock is still stale however, trying again could not make any
            // progress and would recurse for as long as the lock cannot be removed.
            if parallelisation.DetermineContextError(ctx) == nil && l.IsStale() {
                return fmt.Errorf("cannot override stale lock [%v]: %w", l.id, releaseErr)
            }
        }
        return l.TryLock(ctx)
    }
    if err != nil {
        return err
    }

    // FIXME: the following is to overcome the problem found with different filesystems which do not update modTime on directory creation.
    // e.g. https://github.com/spf13/afero/issues/297
    now := time.Now()
    _ = l.fs.Chtimes(lockPath, now, now)
    // create a heart beat file that will be updated whilst the lock is active
    // there will be a context for cancelling update status when unlock is called
    // the status file will update the file (modtime) until told to cancel through ctx
    heartBeatFilePath := l.heartBeatFile(lockPath)
    subctx, cancelFunc := context.WithCancel(ctx)
    l.cancelStore.RegisterCancelFunction(cancelFunc)
    go heartBeat(subctx, l.fs, l.lockHeartBeatPeriod, heartBeatFilePath)
    return nil
}

// heartBeatFile returns the path of the heart beat file, named `<id>.lock`, inside the lock directory `lockPath`.
func (l *RemoteLockFile) heartBeatFile(lockPath string) string {
    fileName := fmt.Sprintf("%v.lock", l.id)
    return filepath.Join(lockPath, fileName)
}

// Lock locks the lock. This call will block until the lock is available.
// The acquisition is attempted with TryLock and, as long as the lock is reported as held by someone
// else (commonerrors.ErrLocked), attempted again after a pause of `timeBetweenLockTries`.
// Any other error (including an error of the context) is returned straight away.
func (l *RemoteLockFile) Lock(ctx context.Context) error {
    for {
        if err := parallelisation.DetermineContextError(ctx); err != nil {
            return err
        }
        err := l.TryLock(ctx)
        if err == nil {
            return nil
        }
        // The sentinel is matched with commonerrors.Any, the helper TryLock already uses for
        // ErrExists, rather than with `==`, so that the check does not depend on ErrLocked being
        // returned unwrapped.
        if !commonerrors.Any(err, commonerrors.ErrLocked) {
            return err
        }
        // wait for the next try; the wait ends early if the context is done.
        waitCtx, cancel := context.WithTimeout(ctx, l.timeBetweenLockTries)
        <-waitCtx.Done()
        cancel()
    }
}

// LockWithTimeout tries to lock the lock until the timeout expires.
// It fails immediately if the context already reports an error; otherwise Lock is run through
// parallelisation.RunActionWithTimeoutAndCancelStore with the lock's cancel store.
func (l *RemoteLockFile) LockWithTimeout(ctx context.Context, timeout time.Duration) error {
    ctxErr := parallelisation.DetermineContextError(ctx)
    if ctxErr != nil {
        return ctxErr
    }
    return parallelisation.RunActionWithTimeoutAndCancelStore(ctx, timeout, l.cancelStore, l.Lock)
}

// Unlock unlocks the lock.
// The cancel store is cancelled first (this is where the cancel function of the heart beat is registered),
// then the lock directory is removed. The removal is retried (10 attempts, random delay between them)
// until the directory is gone or the context is done.
func (l *RemoteLockFile) Unlock(ctx context.Context) error {
    l.cancelStore.Cancel()
    removeLock := func() error {
        lockDir := l.lockPath()
        if err := l.fs.Rm(lockDir); err != nil {
            return fmt.Errorf("cannot unlock lock [%v]: %w", l.id, err)
        }
        if !l.fs.Exists(lockDir) {
            return nil
        }
        // the directory is still there: the lock is considered as still held.
        return fmt.Errorf("cannot unlock lock [%v]: %w", l.id, commonerrors.ErrLocked)
    }
    return retry.Do(
        removeLock,
        retry.MaxJitter(25*time.Millisecond),
        retry.DelayType(retry.RandomDelay),
        retry.Attempts(10),
        retry.Context(ctx),
    )
}

// MakeStale is mostly useful for testing purposes and tries to mock locks going stale.
// It cancels the lock's cancel store (where the cancel function of the heart beat is registered), waits
// a little longer than a heart beat period and then backdates the modification time of the heart
// beat file (or of the lock directory when there is no heart beat file) so that the lock is
// detected as stale. Nothing is done if the lock directory does not exist.
// An error is returned if the lock still is not stale after all the attempts.
func (l *RemoteLockFile) MakeStale(ctx context.Context) error {
    l.cancelStore.Cancel()
    parallelisation.SleepWithContext(ctx, l.lockHeartBeatPeriod+time.Millisecond)
    lockPath := l.lockPath()
    filePath := l.heartBeatFile(lockPath)
    // A lock is only stale once its last beat is more than two heart beat periods old (see isStale).
    // The timestamp is therefore moved back by a bit more than that, so that the very first attempt
    // can succeed rather than relying on the time spent between retries.
    newTime := time.Now().Add(-1 * (2*l.lockHeartBeatPeriod + time.Millisecond))
    return retry.Do(
        func() error {
            if !l.fs.Exists(lockPath) {
                return nil
            }
            // errors are ignored as the staleness check below tells whether the change took effect.
            if l.fs.Exists(filePath) {
                _ = l.fs.Chtimes(filePath, newTime, newTime)
            } else {
                _ = l.fs.Chtimes(lockPath, newTime, newTime)
            }
            if !l.IsStale() {
                return fmt.Errorf("cannot make lock [%v] stale", l.id)
            }
            return nil
        },
        retry.MaxJitter(l.lockHeartBeatPeriod),
        retry.DelayType(retry.RandomDelay),
        retry.Attempts(10),
        retry.Context(ctx),
    )
}