simonmittag/p0d

View on GitHub
stats.go

Summary

Maintainability
A
0 mins
Test Coverage
B
84%
package p0d

import (
    "encoding/json"
    "github.com/axiomhq/variance"
    "github.com/showwin/speedtest-go/speedtest"
    "github.com/simonmittag/procspy"
    "github.com/spenczar/tdigest"
    "math"
    "net/http"
    "sync/atomic"
    "time"
)

type Sample struct {
    Server      string
    HTTPVersion string
    TLSVersion  string
    IPVersion   string
    RemoteAddr  string
}

const emptySampleMsg = "not detected"

func NewSample() Sample {
    return Sample{
        Server:      emptySampleMsg,
        HTTPVersion: emptySampleMsg,
        TLSVersion:  emptySampleMsg,
        IPVersion:   emptySampleMsg,
        RemoteAddr:  emptySampleMsg,
    }
}

type ReqStats struct {
    Start                        time.Time
    ElpsdNs                      time.Duration
    ReqAtmpts                    int64
    CurReqAtmptsPSec             int64
    MeanReqAtmptsPSec            int64
    MaxReqAtmptsPSec             int64
    CurBytesReadPSec             int64
    SumBytesRead                 int64
    MeanBytesReadPSec            int64
    MaxBytesReadPSec             int64
    CurBytesWrittenPSec          int64
    SumBytesWritten              int64
    MeanBytesWrittenPSec         int64
    MaxBytesWrittenPSec          int64
    ElpsdAtmptLatencyNsQuantiles *Quantile
    ElpsdAtmptLatencyNs          *Welford
    SumMatchingResponseCodes     int
    PctMatchingResponseCodes     float32
    Sample                       Sample
    SumErrors                    int
    PctErrors                    float32
    ErrorTypes                   map[string]int
}

type Welford struct {
    s *variance.Stats
}

func NewWelford() *Welford {
    return &Welford{s: variance.New()}
}

func (w *Welford) Add(val float64) {
    w.s.Add(val)
}

func (w *Welford) Mean() float64 {
    return w.s.Mean()
}

func (w *Welford) Stddev() float64 {
    return w.s.StandardDeviation()
}

func (w *Welford) StddevPop() float64 {
    return w.s.StandardDeviationPopulation()
}

func (w *Welford) Var() float64 {
    return w.s.Variance()
}

func (w *Welford) VarPop() float64 {
    return w.s.VariancePopulation()
}

func (w *Welford) Cv() float64 {
    return w.s.StandardDeviation() / w.s.Mean()
}

func (w *Welford) Stderr() float64 {
    return w.s.StandardDeviation() / math.Sqrt(float64(w.s.NumDataValues()))
}

func (w *Welford) MarshalJSON() ([]byte, error) {
    m := make(map[string]float64)
    m["mean"] = w.Mean()
    m["stddev"] = w.Stddev()
    m["cv"] = w.Cv()
    m["stderr"] = w.Stderr()
    return json.Marshal(m)
}

type Quantile struct {
    t *tdigest.TDigest
}

func NewQuantile() *Quantile {
    return &Quantile{
        t: tdigest.New(),
    }
}

func NewQuantileWithCompression(compression float64) *Quantile {
    return &Quantile{
        t: tdigest.NewWithCompression(compression),
    }
}

func (q *Quantile) Add(val float64, weight int) *Quantile {
    q.t.Add(val, weight)
    return q
}

func (q *Quantile) Quantile(v float64) float64 {
    return q.t.Quantile(v)
}

func (q *Quantile) MarshalJSON() ([]byte, error) {
    m := make(map[string]int64)
    m["min"] = int64(math.Ceil(q.t.Quantile(0)))
    m["p10"] = int64(math.Ceil(q.t.Quantile(0.1)))
    m["p16"] = int64(math.Ceil(q.t.Quantile(0.16)))
    m["p25"] = int64(math.Ceil(q.t.Quantile(0.25)))
    m["p50"] = int64(math.Ceil(q.t.Quantile(0.50)))
    m["p75"] = int64(math.Ceil(q.t.Quantile(0.75)))
    m["p84"] = int64(math.Ceil(q.t.Quantile(0.84)))
    m["p90"] = int64(math.Ceil(q.t.Quantile(0.90)))
    m["p99"] = int64(math.Ceil(q.t.Quantile(0.99)))
    m["max"] = int64(math.Ceil(q.t.Quantile(1)))
    return json.Marshal(m)
}

func (s *ReqStats) update(atmpt ReqAtmpt, now time.Time, cfg Config) {
    s.ReqAtmpts++
    s.ElpsdNs = now.Sub(s.Start)

    s.MeanReqAtmptsPSec = int64(math.Floor(float64(s.ReqAtmpts) / s.ElpsdNs.Seconds()))

    crs := atomic.AddInt64(&s.CurReqAtmptsPSec, 1)
    if crs > s.MaxReqAtmptsPSec {
        s.MaxReqAtmptsPSec = crs
    }
    time.AfterFunc(time.Second*1, func() {
        atomic.AddInt64(&s.CurReqAtmptsPSec, -1)
    })

    crbs := atomic.AddInt64(&s.CurBytesReadPSec, atmpt.ResBytes)
    if crbs > s.MaxBytesReadPSec {
        s.MaxBytesReadPSec = crbs
    }
    time.AfterFunc(time.Second*1, func() {
        atomic.AddInt64(&s.CurBytesReadPSec, -atmpt.ResBytes)
    })

    s.SumBytesRead += atmpt.ResBytes
    s.MeanBytesReadPSec = int64(math.Floor(float64(s.SumBytesRead) / s.ElpsdNs.Seconds()))
    if s.MeanBytesReadPSec > s.MaxBytesReadPSec {
        s.MaxBytesReadPSec = s.MeanBytesReadPSec
    }

    cwbs := atomic.AddInt64(&s.CurBytesWrittenPSec, atmpt.ReqBytes)
    if cwbs > s.MaxBytesWrittenPSec {
        s.MaxBytesWrittenPSec = cwbs
    }
    time.AfterFunc(time.Second*1, func() {
        atomic.AddInt64(&s.CurBytesWrittenPSec, -atmpt.ReqBytes)
    })

    s.SumBytesWritten += atmpt.ReqBytes
    s.MeanBytesWrittenPSec = int64(math.Floor(float64(s.SumBytesWritten) / s.ElpsdNs.Seconds()))
    if s.MeanBytesWrittenPSec > s.MaxBytesWrittenPSec {
        s.MaxBytesWrittenPSec = s.MeanBytesWrittenPSec
    }
    s.ElpsdAtmptLatencyNs.Add(float64(atmpt.ElpsdNs))
    s.ElpsdAtmptLatencyNsQuantiles.Add(float64(atmpt.ElpsdNs.Nanoseconds()), 1)

    if atmpt.ResCode == cfg.Res.Code {
        s.SumMatchingResponseCodes++
    }
    s.PctMatchingResponseCodes = 100 * (float32(s.SumMatchingResponseCodes) / float32(s.ReqAtmpts))

    if atmpt.ResErr != "" {
        s.SumErrors++
        s.ErrorTypes[atmpt.ResErr]++
    }
    s.PctErrors = 100 * (float32(s.SumErrors) / float32(s.ReqAtmpts))
}

type OSOpenConns struct {
    Time      time.Time
    OpenConns int
    PID       int
}

func NewOSOpenConns(pid int) *OSOpenConns {
    return &OSOpenConns{
        Time:      time.Now(),
        OpenConns: 0,
        PID:       pid,
    }
}

func (oss *OSOpenConns) updateOpenConns(cfg Config) {
    cs, e := procspy.Connections(true)
    if e != nil {
        _ = e
    } else {
        d := 0
        for c := cs.Next(); c != nil; c = cs.Next() {
            // fixes bug where PID connections to other network infra are reported as false positive, see:
            // https://github.com/simonmittag/p0d/issues/31
            if c.PID == uint(oss.PID) {
                for _, ip := range cfg.Req.Ips {
                    if c.RemotePort == cfg.getRemotePort() &&
                        ip.Equal(c.RemoteAddress) {
                        d++
                    }
                }
            }
        }
        oss.OpenConns = d
    }
}

type OSNet struct {
    Target *speedtest.Server
    client *http.Client
}

func NewOSNet() (*OSNet, error) {
    closeIdler := &http.Client{}
    spdt := speedtest.New(speedtest.WithDoer(closeIdler))
    user, e1 := spdt.FetchUserInfo()
    if e1 != nil {
        return nil, e1
    }
    servers, e2 := spdt.FetchServers(user)
    if e2 != nil {
        return nil, e2
    }
    closeIdler.CloseIdleConnections()

    targets, e3 := servers.FindServer([]int{})
    if len(targets) == 0 || e3 != nil {
        return nil, e3
    }
    user = nil
    servers = nil

    return &OSNet{
        Target: targets[0],
        client: closeIdler,
    }, nil
}