package j8a

import (
	"fmt"
	"github.com/hako/durafmt"
	"github.com/rs/zerolog/log"
	"github.com/shirou/gopsutil/process"
	"github.com/simonmittag/procspy"
	"net"
	"os"
	"runtime/pprof"
	"strconv"
	"strings"
	"sync"
	"syscall"
	"time"
)
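
// sample is a point-in-time snapshot of process resource usage, OS thread
// count, file descriptor limit, and open TCP connection counts for both
// downstream and upstream traffic.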
type sample struct {
	pid                int32
	cpuPc              float64
	mPc                float32
	rssBytes           uint64
	vmsBytes           uint64
	swapBytes          uint64
	time               time.Time
	dwnOpenTcpConns    uint64
	dwnMaxOpenTcpConns uint64
	upOpenTcpConns     uint64
	upMaxOpenTcpConns  uint64
	threads            int
	ulimit             uint64
}
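
// growthRate holds the ratio of a sample's RSS to the oldest sample's RSS
// in the history window; high is set when the ratio reaches
// growthRateThreshold.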
type growthRate struct {
	v    float64
	high bool
}
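
// samples is a sliding window of at most historyMaxSamples entries.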
type samples []sample

const cpuSampleMilliSeconds = 2000
const logSamplerSleepSeconds = 60
const historySamplerSleepSeconds = 3600
const historyMaxSamples = 24
const growthRateThreshold float64 = 2.0

var procStatsLock sync.Mutex
var procHistory samples

const pid = "pid"
const pidCPUCorePct = "pidCpuCorePct"
const pidMemPct = "pidMemPct"
const pidRssBytes = "pidMemRssBytes"
const pidVmsBytes = "pidMemVmsBytes"
const pidSwapBytes = "pidMemSwapBytes"
const pidDwnOpenTcpConns = "pidDwnOpenTcpConns"
const pidDwnMaxOpenTcpConns = "pidDwnMaxOpenTcpConns"
const pidUpOpenTcpConns = "pidUpOpenTcpConns"
const pidUpMaxOpenTcpConns = "pidUpMaxOpenTcpConns"
const pidOSThreads = "pidOSThreads"
const pidOSUlimit = "pidOSUlimit"

const serverPerformance = "server performance"

// pcd2f renders percentages with two decimal places.
const pcd2f = "%.2f"

const rssMemIncrease = "RSS memory increase over the previous %s with a growth factor >=%s, monitor actively."
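
// log writes the sample as a structured INFO event.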
func (s sample) log() {
	log.Info().
		Int32(pid, s.pid).
		Str(pidCPUCorePct, fmt.Sprintf(pcd2f, s.cpuPc)).
		Str(pidMemPct, fmt.Sprintf(pcd2f, s.mPc)).
		Uint64(pidDwnOpenTcpConns, s.dwnOpenTcpConns).
		Uint64(pidDwnMaxOpenTcpConns, s.dwnMaxOpenTcpConns).
		Uint64(pidUpOpenTcpConns, s.upOpenTcpConns).
		Uint64(pidUpMaxOpenTcpConns, s.upMaxOpenTcpConns).
		Uint64(pidRssBytes, s.rssBytes).
		Uint64(pidVmsBytes, s.vmsBytes).
		Uint64(pidSwapBytes, s.swapBytes).
		Int(pidOSThreads, s.threads).
		Uint64(pidOSUlimit, s.ulimit).
		Msg(serverPerformance)
}
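
// append adds s to the window, evicting the oldest sample once the window
// holds historyMaxSamples entries.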
func (samples *samples) append(s sample) {
	l := len(*samples)
	if l >= historyMaxSamples {
		// zero the oldest entry before re-slicing so the backing array does
		// not retain it, then drop it from the window.
		(*samples)[0] = sample{}
		*samples = (*samples)[1:]
	}
	*samples = append(*samples, s)
}
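
// log computes the RSS growth rate of every sample relative to the oldest
// sample in the window and emits a single INFO event if any rate reaches
// growthRateThreshold. It returns the computed rates.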
func (samples *samples) log() []growthRate {
	var growthRates = make([]growthRate, len(*samples))
	for l := len(*samples) - 1; l >= 0; l-- {
		if (*samples)[l].pid == 0 {
			// a zero-valued sample means the window is not yet fully
			// populated, so there is insufficient data to log.
			return growthRates
		}
	}
	for l := len(*samples) - 1; l >= 0; l-- {
		growthRates[l].v = float64((*samples)[l].rssBytes) / float64((*samples)[0].rssBytes)
		if growthRates[l].v >= growthRateThreshold {
			growthRates[l].high = true
		}
	}
	for m := len(*samples) - 1; m >= 0; m-- {
		if growthRates[m].high {
			log.Info().
				Msgf(rssMemIncrease,
					durafmt.Parse(time.Second*historySamplerSleepSeconds*historyMaxSamples).LimitFirstN(1).String(),
					fmt.Sprintf("%.1f", growthRateThreshold))
			break
		}
	}
	return growthRates
}
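
// getSample collects one sample for proc. It blocks for
// cpuSampleMilliSeconds while measuring CPU usage and serializes concurrent
// samplers via procStatsLock.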
func (rt *Runtime) getSample(proc *process.Process) sample {
	procStatsLock.Lock()
	threadProfile := pprof.Lookup("threadcreate")
	cpuPc, _ := proc.Percent(time.Millisecond * cpuSampleMilliSeconds)
	mPc, _ := proc.MemoryPercent()
	mInfo, me := proc.MemoryInfo()
	if me != nil {
		// fall back to zero values so the rest of the sample is still usable.
		mInfo = &process.MemoryInfoStat{}
	}

	var rLimit syscall.Rlimit
	var ulimit uint64
	if e := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rLimit); e == nil {
		ulimit = rLimit.Cur
	}

	// if FindUpConns fails, skip upstream connection counting and continue
	// with the rest of the sample; the counters keep their previous values.
	if cs, e := rt.FindUpConns(); e == nil {
		d := rt.CountUpConns(proc, cs, rt.LookUpResourceIps())
		rt.ConnectionWatcher.SetUp(uint64(d))
		rt.ConnectionWatcher.UpdateMaxUp(uint64(d))
	}
	procStatsLock.Unlock()

	return sample{
		pid:                proc.Pid,
		cpuPc:              cpuPc,
		mPc:                mPc,
		rssBytes:           mInfo.RSS,
		vmsBytes:           mInfo.VMS,
		swapBytes:          mInfo.Swap,
		time:               time.Now(),
		dwnOpenTcpConns:    rt.ConnectionWatcher.DwnCount(),
		dwnMaxOpenTcpConns: rt.ConnectionWatcher.DwnMaxCount(),
		upOpenTcpConns:     rt.ConnectionWatcher.UpCount(),
		upMaxOpenTcpConns:  rt.ConnectionWatcher.UpMaxCount(),
		threads:            threadProfile.Count(),
		ulimit:             ulimit,
	}
}
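
// FindUpConns returns an iterator over the host's TCP connections with
// owning-process information, so callers can filter by PID.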
func (rt *Runtime) FindUpConns() (procspy.ConnIter, error) {
	return procspy.Connections(true)
}
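
// CountUpConns counts proc's connections whose remote address and port
// match a configured upstream resource.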
func (rt *Runtime) CountUpConns(proc *process.Process, cs procspy.ConnIter, ips map[string][]net.IP) int {
	d := 0
UpConn:
	for c := cs.Next(); c != nil; c = cs.Next() {
		if c.PID == uint(proc.Pid) {
			for _, v := range rt.Config.Resources {
				for _, r := range v {
					// a non-numeric port parses to 0 and will not match.
					rup, _ := strconv.Atoi(r.URL.Port)
					if c.RemotePort == uint16(rup) {
						for _, ip := range ips[r.URL.Host] {
							if ip.Equal(c.RemoteAddress) {
								d++
								continue UpConn
							}
						}
					}
				}
			}
		}
	}
	return d
}
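
// LookUpResourceIps maps every configured resource host to its IP
// addresses, parsing IP literals directly and resolving hostnames via DNS.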
func (rt *Runtime) LookUpResourceIps() map[string][]net.IP {
	var ips = make(map[string][]net.IP)
	for _, v := range rt.Resources {
		for _, r := range v {
			is := make([]net.IP, 1)
			// strip brackets from IPv6 literals, i.e. "[::1]" becomes "::1".
			h := strings.TrimLeft(r.URL.Host, "[")
			h = strings.TrimRight(h, "]")
			is[0] = net.ParseIP(h)
			if is[0] == nil {
				// not an IP literal, so resolve the hostname via DNS instead.
				is, _ = net.LookupIP(r.URL.Host)
			}
			ips[r.URL.Host] = is
		}
	}
	return ips
}

// logRuntimeStats starts three background goroutines that run for the
// lifetime of the process: one logs a sample every logSamplerSleepSeconds,
// one appends a sample to procHistory every historySamplerSleepSeconds,
// and one periodically checks the history for high RSS growth.
func (rt *Runtime) logRuntimeStats(proc *process.Process) {
	go func() {
		for {
			rt.getSample(proc).log()
			time.Sleep(time.Second * logSamplerSleepSeconds)
		}
	}()

	go func() {
		// prefill the history window with the first sample so growth rates
		// are computed against real data from the start.
		procHistory = make(samples, historyMaxSamples)
		lazy := rt.getSample(proc)
		for k := 0; k < historyMaxSamples; k++ {
			procHistory[k] = lazy
		}
		for {
			procHistory.append(rt.getSample(proc))
			time.Sleep(time.Second * historySamplerSleepSeconds)
		}
	}()

	go func() {
		for {
			time.Sleep(time.Second * historySamplerSleepSeconds)
			procHistory.log()
		}
	}()
}

const uptimeMicros = "uptimeMicros"
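
// logUptime logs the server's uptime once per day, skipping the report
// while the server has been up for less than ten seconds.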
func (rt *Runtime) logUptime() {
	go func() {
		for {
			upNanos := time.Since(rt.Start)
			if upNanos > time.Second*10 {
				uptime := durafmt.Parse(upNanos).LimitFirstN(1).String()
				log.Info().
					Int(pid, os.Getpid()).
					Int64(uptimeMicros, upNanos.Microseconds()).
					Msgf("server uptime is %s", uptime)
			}
			time.Sleep(time.Hour * 24)
		}
	}()
}