src/go/plugin/go.d/modules/nvidia_smi/exec.go
// SPDX-License-Identifier: GPL-3.0-or-later
package nvidia_smi
import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"os/exec"
"strconv"
"sync"
"time"
"github.com/netdata/netdata/go/plugins/logger"
)
type nvidiaSmiBinary interface {
queryGPUInfo() ([]byte, error)
stop() error
}
func newNvidiaSmiBinary(path string, cfg Config, log *logger.Logger) (nvidiaSmiBinary, error) {
if !cfg.LoopMode {
return &nvidiaSmiExec{
Logger: log,
binPath: path,
timeout: cfg.Timeout.Duration(),
}, nil
}
smi := &nvidiaSmiLoopExec{
Logger: log,
binPath: path,
updateEvery: cfg.UpdateEvery,
firstSampleTimeout: time.Second * 3,
}
if err := smi.run(); err != nil {
return nil, err
}
return smi, nil
}
type nvidiaSmiExec struct {
*logger.Logger
binPath string
timeout time.Duration
}
func (e *nvidiaSmiExec) queryGPUInfo() ([]byte, error) {
ctx, cancel := context.WithTimeout(context.Background(), e.timeout)
defer cancel()
cmd := exec.CommandContext(ctx, e.binPath, "-q", "-x")
e.Debugf("executing '%s'", cmd)
bs, err := cmd.Output()
if err != nil {
return nil, fmt.Errorf("error on '%s': %v", cmd, err)
}
return bs, nil
}
func (e *nvidiaSmiExec) stop() error { return nil }
type nvidiaSmiLoopExec struct {
*logger.Logger
binPath string
updateEvery int
firstSampleTimeout time.Duration
cmd *exec.Cmd
done chan struct{}
mux sync.Mutex
lastSample string
}
func (e *nvidiaSmiLoopExec) queryGPUInfo() ([]byte, error) {
select {
case <-e.done:
return nil, errors.New("process has already exited")
default:
}
e.mux.Lock()
defer e.mux.Unlock()
return []byte(e.lastSample), nil
}
func (e *nvidiaSmiLoopExec) run() error {
secs := 5
if e.updateEvery < secs {
secs = e.updateEvery
}
cmd := exec.Command(e.binPath, "-q", "-x", "-l", strconv.Itoa(secs))
e.Debugf("executing '%s'", cmd)
r, err := cmd.StdoutPipe()
if err != nil {
return err
}
if err := cmd.Start(); err != nil {
return err
}
firstSample := make(chan struct{}, 1)
done := make(chan struct{})
e.cmd = cmd
e.done = done
go func() {
defer close(done)
var buf bytes.Buffer
var insideLog bool
var emptyRows int64
var outsideLogRows int64
const unexpectedRowsLimit = 500
sc := bufio.NewScanner(r)
for sc.Scan() {
line := sc.Text()
if !insideLog {
outsideLogRows++
} else {
outsideLogRows = 0
}
if line == "" {
emptyRows++
} else {
emptyRows = 0
}
if outsideLogRows >= unexpectedRowsLimit || emptyRows >= unexpectedRowsLimit {
e.Errorf("unexpected output from nvidia-smi loop: outside log rows %d, empty rows %d", outsideLogRows, emptyRows)
break
}
switch {
case line == "<nvidia_smi_log>":
insideLog = true
buf.Reset()
buf.WriteString(line)
buf.WriteByte('\n')
case line == "</nvidia_smi_log>":
insideLog = false
buf.WriteString(line)
e.mux.Lock()
e.lastSample = buf.String()
e.mux.Unlock()
buf.Reset()
select {
case firstSample <- struct{}{}:
default:
}
case insideLog:
buf.WriteString(line)
buf.WriteByte('\n')
default:
continue
}
}
}()
select {
case <-e.done:
_ = e.stop()
return errors.New("process exited before the first sample was collected")
case <-time.After(e.firstSampleTimeout):
_ = e.stop()
return errors.New("timed out waiting for first sample")
case <-firstSample:
return nil
}
}
func (e *nvidiaSmiLoopExec) stop() error {
if e.cmd == nil || e.cmd.Process == nil {
return nil
}
_ = e.cmd.Process.Kill()
_ = e.cmd.Wait()
e.cmd = nil
select {
case <-e.done:
return nil
case <-time.After(time.Second * 2):
return errors.New("timed out waiting for process to exit")
}
}