DominicBreuker/pspy

View on GitHub
internal/fswatcher/fswatcher.go

Summary

Maintainability
A
0 mins
Test Coverage
package fswatcher

import (
    "fmt"

    "github.com/dominicbreuker/pspy/internal/fswatcher/inotify"
    "github.com/dominicbreuker/pspy/internal/fswatcher/walker"
)

// Inotify is the set of inotify operations FSWatcher depends on. NewFSWatcher
// wires in the implementation returned by inotify.NewInotify.
type Inotify interface {
    // Init is called once by FSWatcher.Init before any watcher is added.
    Init() error
    // Watch is called with every directory path produced by the Walker.
    Watch(dir string) error
    // NumWatchers is compared against FSWatcher's maxWatchers limit.
    NumWatchers() int
    // Read fills buf and reports how many bytes FSWatcher should copy out
    // of it.
    Read(buf []byte) (int, error)
    // ParseNextEvent decodes one event from the start of buf. FSWatcher
    // advances its read position by the returned uint32 before calling it
    // again.
    ParseNextEvent(buf []byte) (*inotify.Event, uint32, error)
    // Close is called by FSWatcher.Close.
    Close() error
}

// Walker enumerates directories for FSWatcher to put watchers on.
type Walker interface {
    // Walk starts a traversal of dir. FSWatcher reads directory paths from
    // the first channel until it is closed, reads traversal errors from the
    // second channel, and closes the third channel to stop the walk early.
    // FSWatcher passes depth -1 for its rdirs and 0 for its dirs; presumably
    // a negative depth means "no depth limit" — confirm against the walker
    // package.
    Walk(dir string, depth int) (chan string, chan error, chan struct{})
}

// FSWatcher installs inotify watchers on directories and reports the events
// it reads as formatted strings.
type FSWatcher struct {
    // i is the inotify backend.
    i           Inotify
    // w enumerates directories while watchers are being added.
    w           Walker
    // maxWatchers caps the number of watchers; values <= 0 disable the check.
    maxWatchers int
    // eventSize sizes the read buffer used by observe (5*eventSize bytes).
    eventSize   int
    // drain makes observe discard everything it reads while true.
    drain       bool
}

// NewFSWatcher returns an FSWatcher backed by the default inotify and walker
// implementations, with limits taken from the inotify package. The watcher
// starts in drain mode, so reads are discarded until Enable is called.
func NewFSWatcher() *FSWatcher {
    fs := new(FSWatcher)
    fs.i = inotify.NewInotify()
    fs.w = walker.NewWalker()
    fs.maxWatchers = inotify.MaxWatchers
    fs.eventSize = inotify.EventSize
    fs.drain = true
    return fs
}

// Enable clears the drain flag so that data read by observe is forwarded
// instead of discarded.
// NOTE(review): observe reads drain from its own goroutine without any
// synchronization — confirm whether this needs an atomic or a mutex.
func (fs *FSWatcher) Enable() {
    fs.drain = false
}

// Close closes the underlying inotify backend.
func (fs *FSWatcher) Close() {
    // Close has no error return, so the backend's error is deliberately
    // discarded here.
    _ = fs.i.Close()
}

// Init initializes the inotify backend and adds watchers in a background
// goroutine. rdirs and dirs are handed to addWatchers unchanged.
//
// It returns an error channel, on which setup problems are reported, and a
// done channel that is closed when the background work has finished. Both
// channels are unbuffered, so the caller must keep receiving from the error
// channel until the done channel is closed. The error channel itself is
// never closed.
func (fs *FSWatcher) Init(rdirs, dirs []string) (chan error, chan struct{}) {
    doneCh := make(chan struct{})
    errCh := make(chan error)

    setup := func() {
        defer close(doneCh)

        if initErr := fs.i.Init(); initErr != nil {
            errCh <- fmt.Errorf("setting up inotify: %v", initErr)
            return
        }

        fs.addWatchers(rdirs, dirs, errCh)
    }
    go setup()

    return errCh, doneCh
}

// addWatchers adds watchers for every entry of rdirs (walked with depth -1)
// and then for every entry of dirs (walked with depth 0). Problems are
// reported on errCh.
// NOTE(review): depth -1 presumably means "recurse without limit" — confirm
// against the Walker implementation.
func (fs *FSWatcher) addWatchers(rdirs, dirs []string, errCh chan error) {
    const (
        rdirDepth = -1
        dirDepth  = 0
    )

    for idx := range rdirs {
        fs.addWatchersToDir(rdirs[idx], rdirDepth, errCh)
    }
    for idx := range dirs {
        fs.addWatchersToDir(dirs[idx], dirDepth, errCh)
    }
}

// addWatchersToDir walks dir to the given depth and handles every result the
// walker produces until the walk is finished. Before each result it checks
// the watcher limit; once the limit is reached it closes the walker's stop
// channel and returns.
func (fs *FSWatcher) addWatchersToDir(dir string, depth int, errCh chan error) {
    dirCh, walkErrCh, stopCh := fs.w.Walk(dir, depth)

    for finished := false; !finished; {
        if fs.maximumWatchersExceeded() {
            // Tell the walker to stop; no further watchers can be added.
            close(stopCh)
            return
        }

        finished = fs.handleNextWalkerResult(dirCh, walkErrCh, errCh)
    }
}

// maximumWatchersExceeded reports whether the number of watchers has reached
// the configured limit. A limit of zero or less disables the check, in which
// case the backend is not queried at all.
func (fs *FSWatcher) maximumWatchersExceeded() bool {
    if fs.maxWatchers <= 0 {
        return false
    }
    return fs.i.NumWatchers() >= fs.maxWatchers
}

// handleNextWalkerResult waits for the next message from the walker and
// processes it. A directory gets a watcher; a walker error, or a failure to
// create the watcher, is forwarded on errCh. It returns true only when dirCh
// has been closed, which signals that the walk is over.
func (fs *FSWatcher) handleNextWalkerResult(dirCh chan string, walkErrCh chan error, errCh chan error) bool {
    select {
    case path, open := <-dirCh:
        if !open {
            return true
        }
        watchErr := fs.i.Watch(path)
        if watchErr == nil {
            return false
        }
        errCh <- fmt.Errorf("can't create watcher: %v", watchErr)
        return false
    case walkErr := <-walkErrCh:
        errCh <- fmt.Errorf("adding inotify watchers: %v", walkErr)
        return false
    }
}

// Run starts the two background goroutines of the watcher: observe, which
// reads raw data from inotify, and parseEvents, which turns that data into
// event strings.
//
// It returns a trigger channel that receives a value for every read that is
// not drained, a channel of formatted events, and a channel of errors. All
// channels are unbuffered, so the caller has to service every one of them.
func (fs *FSWatcher) Run() (chan struct{}, chan string, chan error) {
    var (
        triggerCh = make(chan struct{})
        dataCh    = make(chan []byte)
        eventCh   = make(chan string)
        errCh     = make(chan error)
    )

    go fs.observe(triggerCh, dataCh, errCh)
    go fs.parseEvents(dataCh, eventCh, errCh)

    return triggerCh, eventCh, errCh
}

// observe reads from the inotify backend in an endless loop. While the drain
// flag is set, whatever was read (including errors) is dropped. Otherwise
// every read first sends a value on triggerCh; a failed read is then reported
// on errCh, and a successful one sends a private copy of the bytes read on
// dataCh. The function never returns.
func (fs *FSWatcher) observe(triggerCh chan struct{}, dataCh chan []byte, errCh chan error) {
    scratch := make([]byte, 5*fs.eventSize)

    for {
        nRead, readErr := fs.i.Read(scratch)
        if !fs.drain {
            triggerCh <- struct{}{}
            if readErr == nil {
                // scratch is reused by the next Read, so hand out a copy.
                chunk := make([]byte, nRead)
                copy(chunk, scratch)
                dataCh <- chunk
            } else {
                errCh <- fmt.Errorf("reading inotify buffer: %v", readErr)
            }
        }
    }
}

// parseEvents passes every chunk received on dataCh to handleChunk and
// returns once dataCh has been closed.
func (fs *FSWatcher) parseEvents(dataCh chan []byte, eventCh chan string, errCh chan error) {
    for {
        chunk, open := <-dataCh
        if !open {
            return
        }
        fs.handleChunk(chunk, eventCh, errCh)
    }
}

// handleChunk splits buf into individual inotify events. Each successfully
// parsed event is sent on eventCh formatted as "<op> | <name>"; each parse
// failure is sent on errCh and parsing continues with the next event.
//
// The size reported by ParseNextEvent is used to advance through buf.
// Processing of the chunk stops when the parser reports a size of zero (no
// progress is possible, so looping on would never terminate) or a size that
// reaches or exceeds the remaining bytes (the chunk is exhausted; slicing
// past its end would panic).
func (fs *FSWatcher) handleChunk(buf []byte, eventCh chan string, errCh chan error) {
    rest := buf
    for len(rest) > 0 {
        event, size, err := fs.i.ParseNextEvent(rest)
        if err != nil {
            errCh <- fmt.Errorf("parsing events: %v", err)
        } else {
            eventCh <- fmt.Sprintf("%20s | %s", event.Op, event.Name)
        }

        // Compare in uint64 so neither operand can be truncated.
        if size == 0 || uint64(size) >= uint64(len(rest)) {
            return
        }
        rest = rest[size:]
    }
}