netdata/netdata

View on GitHub
src/go/collectors/go.d.plugin/agent/discovery/file/watch.go

Summary

Maintainability
A
0 mins
Test Coverage
// SPDX-License-Identifier: GPL-3.0-or-later

package file

import (
    "context"
    "fmt"
    "os"
    "path/filepath"
    "strings"
    "time"

    "github.com/netdata/netdata/go/go.d.plugin/agent/confgroup"
    "github.com/netdata/netdata/go/go.d.plugin/logger"

    "github.com/fsnotify/fsnotify"
)

type (
    Watcher struct {
        *logger.Logger

        paths        []string
        reg          confgroup.Registry
        watcher      *fsnotify.Watcher
        cache        cache
        refreshEvery time.Duration
    }
    cache map[string]time.Time
)

func (c cache) lookup(path string) (time.Time, bool) { v, ok := c[path]; return v, ok }
func (c cache) has(path string) bool                 { _, ok := c.lookup(path); return ok }
func (c cache) remove(path string)                   { delete(c, path) }
func (c cache) put(path string, modTime time.Time)   { c[path] = modTime }

func NewWatcher(reg confgroup.Registry, paths []string) *Watcher {
    d := &Watcher{
        Logger:       log,
        paths:        paths,
        reg:          reg,
        watcher:      nil,
        cache:        make(cache),
        refreshEvery: time.Minute,
    }
    return d
}

func (w *Watcher) String() string {
    return w.Name()
}

func (w *Watcher) Name() string {
    return "file watcher"
}

func (w *Watcher) Run(ctx context.Context, in chan<- []*confgroup.Group) {
    w.Info("instance is started")
    defer func() { w.Info("instance is stopped") }()

    watcher, err := fsnotify.NewWatcher()
    if err != nil {
        w.Errorf("fsnotify watcher initialization: %v", err)
        return
    }

    w.watcher = watcher
    defer w.stop()
    w.refresh(ctx, in)

    tk := time.NewTicker(w.refreshEvery)
    defer tk.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-tk.C:
            w.refresh(ctx, in)
        case event := <-w.watcher.Events:
            // TODO: check if event.Has will do
            if event.Name == "" || isChmodOnly(event) || !w.fileMatches(event.Name) {
                break
            }
            if event.Has(fsnotify.Create) && w.cache.has(event.Name) {
                // vim "backupcopy=no" case, already collected after Rename event.
                break
            }
            if event.Has(fsnotify.Rename) {
                // It is common to modify files using vim.
                // When writing to a file a backup is made. "backupcopy" option tells how it's done.
                // Default is "no": rename the file and write a new one.
                // This is cheap attempt to not send empty group for the old file.
                time.Sleep(time.Millisecond * 100)
            }
            w.refresh(ctx, in)
        case err := <-w.watcher.Errors:
            if err != nil {
                w.Warningf("watch: %v", err)
            }
        }
    }
}

func (w *Watcher) fileMatches(file string) bool {
    for _, pattern := range w.paths {
        if ok, _ := filepath.Match(pattern, file); ok {
            return true
        }
    }
    return false
}

func (w *Watcher) listFiles() (files []string) {
    for _, pattern := range w.paths {
        if matches, err := filepath.Glob(pattern); err == nil {
            files = append(files, matches...)
        }
    }
    return files
}

func (w *Watcher) refresh(ctx context.Context, in chan<- []*confgroup.Group) {
    select {
    case <-ctx.Done():
        return
    default:
    }
    var groups []*confgroup.Group
    seen := make(map[string]bool)

    for _, file := range w.listFiles() {
        fi, err := os.Lstat(file)
        if err != nil {
            w.Warningf("lstat '%s': %v", file, err)
            continue
        }

        if !fi.Mode().IsRegular() {
            continue
        }

        seen[file] = true
        if v, ok := w.cache.lookup(file); ok && v.Equal(fi.ModTime()) {
            continue
        }
        w.cache.put(file, fi.ModTime())

        if group, err := parse(w.reg, file); err != nil {
            w.Warningf("parse '%s': %v", file, err)
        } else if group == nil {
            groups = append(groups, &confgroup.Group{Source: file})
        } else {
            for _, cfg := range group.Configs {
                cfg.SetProvider("file watcher")
                cfg.SetSourceType(configSourceType(file))
                cfg.SetSource(fmt.Sprintf("discoverer=file_watcher,file=%s", file))
            }
            groups = append(groups, group)
        }
    }

    for name := range w.cache {
        if seen[name] {
            continue
        }
        w.cache.remove(name)
        groups = append(groups, &confgroup.Group{Source: name})
    }

    send(ctx, in, groups)

    w.watchDirs()
}

func (w *Watcher) watchDirs() {
    for _, path := range w.paths {
        if idx := strings.LastIndex(path, "/"); idx > -1 {
            path = path[:idx]
        } else {
            path = "./"
        }
        if err := w.watcher.Add(path); err != nil {
            w.Errorf("start watching '%s': %v", path, err)
        }
    }
}

func (w *Watcher) stop() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // closing the watcher deadlocks unless all events and errors are drained.
    go func() {
        for {
            select {
            case <-w.watcher.Errors:
            case <-w.watcher.Events:
            case <-ctx.Done():
                return
            }
        }
    }()

    _ = w.watcher.Close()
}

func isChmodOnly(event fsnotify.Event) bool {
    return event.Op^fsnotify.Chmod == 0
}

func send(ctx context.Context, in chan<- []*confgroup.Group, groups []*confgroup.Group) {
    if len(groups) == 0 {
        return
    }
    select {
    case <-ctx.Done():
    case in <- groups:
    }
}