p0d.go
package p0d
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"github.com/axiomhq/variance"
"github.com/google/uuid"
"github.com/gosuri/uilive"
"github.com/hako/durafmt"
. "github.com/logrusorgru/aurora"
"io"
"io/ioutil"
"math"
"math/rand"
"mime/multipart"
"net"
"net/http"
"net/http/httputil"
"net/url"
"os"
"os/signal"
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"
)
// Version is the semantic version of this p0d build.
const Version string = "v0.4.0"

const (
	// ua is the canonical User-Agent header name.
	ua = "User-Agent"
	// N is the empty string, used as a "not set" sentinel throughout.
	N = ""
	// ct is the canonical Content-Type header name.
	ct = "Content-Type"
	applicationJson               = "application/json"
	multipartFormdata             = "multipart/form-data"
	applicationXWWWFormUrlEncoded = "application/x-www-form-urlencoded"
	// AT prefixes form-data keys that reference file uploads.
	AT = "@"
)

// vs is the User-Agent value sent with every request.
var vs = fmt.Sprintf("p0d %s", Version)

// bodyTypes lists the HTTP methods that carry a request body.
var bodyTypes = []string{"POST", "PUT", "PATCH"}
// P0d is the root object of a single load test run: configuration,
// runtime statistics, OS observations and the live terminal machinery.
type P0d struct {
	// ID uniquely identifies this run (see createRunId).
	ID     string
	Time   Time
	Config Config
	OS     OS
	// ReqStats aggregates statistics over all request attempts.
	ReqStats *ReqStats
	// Output is the path of the JSON output file; empty disables output.
	Output string
	// Interrupted is set once the user aborts the run with a signal.
	Interrupted bool
	// client maps worker thread index to its dedicated http client.
	client map[int]*http.Client
	// sampleConn is a probe connection used to report the remote address
	// — presumably captured by a dial hook in the client scaffolding;
	// consumed by detectRemoteConnSettings.
	sampleConn net.Conn
	outFile    *os.File
	// liveWriters holds one terminal writer per live log line.
	liveWriters []io.Writer
	bar         *ProgressBar
	// interrupt receives OS shutdown signals.
	interrupt chan os.Signal
	// stopLiveWriters signals the live log refresh goroutine to exit.
	stopLiveWriters chan struct{}
	// stopThreads has one buffered stop channel per worker thread.
	stopThreads []chan struct{}
}
// Time tracks the wall clock boundaries and the current lifecycle
// phase of a run.
type Time struct {
	Start time.Time
	Stop  time.Time
	Phase TimerPhase
}
// OS holds per-process operating system observations: open connection
// samples, resource limits and the results of the optional inet speed
// test. The unexported flag/channel pairs coordinate the three inet
// test stages (download, upload, latency) with the live preamble UI.
type OS struct {
	PID int
	// OpenConns is the series of open connection samples; a new entry
	// is only appended when the count changes (see doOSOpenConns).
	OpenConns []OSOpenConns
	// MaxOpenConns is the high-water mark of open connections.
	MaxOpenConns   int
	LimitOpenFiles int64
	LimitRAMBytes  uint64
	InetLatencyNs  time.Duration
	InetUlSpeedMBits float64
	InetDlSpeedMBits float64
	InetTestAborted  bool
	// *DoneFlag fields record stage completion; the matching channels
	// signal the UI loop in initLog when each stage finishes.
	inetUlSpeedDoneFlag bool
	inetDlSpeedDoneFlag bool
	inetLatencyDoneFlag bool
	inetUlSpeedDone     chan struct{}
	inetDlSpeedDone     chan struct{}
	inetLatencyDone     chan struct{}
	inetTestError       chan struct{}
	// updateLock guards OpenConns/MaxOpenConns updates.
	updateLock sync.Mutex
}
// isInetTestDone reports whether all three inet test stages
// (download, upload and latency) have completed.
func (o *OS) isInetTestDone() bool {
	return o.inetLatencyDoneFlag &&
		o.inetDlSpeedDoneFlag &&
		o.inetUlSpeedDoneFlag
}
// TimerPhase models the lifecycle of a run as ordered bit flags.
// Phases only ever advance (see setTimerPhase).
type TimerPhase int

const (
	// Bootstrap is the initial phase before any requests are sent.
	Bootstrap TimerPhase = 1 << iota
	// RampUp is active while worker threads are being staggered in.
	RampUp
	// Main is the steady-state phase with all threads running.
	Main
	// RampDown is active while worker threads are being staggered out.
	RampDown
	// Draining waits for remaining open connections to close.
	Draining
	// Drained means all connections closed (or the drain timed out).
	Drained
	// Done is the terminal phase.
	Done
)
// ReqAtmpt records the outcome of a single request attempt: timing,
// request/response sizes, status code and a mapped error string when
// the transport failed.
type ReqAtmpt struct {
	Start time.Time
	Stop  time.Time
	// ElpsdNs is the roundtrip duration (Stop - Start).
	ElpsdNs time.Duration
	// ReqBytes is the size of the dumped outgoing request.
	ReqBytes int64
	ResCode  int
	// ResBytes is the size of the dumped incoming response.
	ResBytes int64
	// ResErr is empty on success, otherwise a (possibly mapped) error.
	ResErr string
}
// initStopThreads creates one stop-signal channel per worker thread.
// Each channel is buffered with capacity 2 so that a drain running
// after a stop can still send without blocking.
func initStopThreads(cfg Config) []chan struct{} {
	// pre-size: the final length is known to be cfg.Exec.Concurrency
	v := make([]chan struct{}, 0, cfg.Exec.Concurrency)
	for i := 0; i < cfg.Exec.Concurrency; i++ {
		//make sure this channel never blocks if drain runs after stop
		v = append(v, make(chan struct{}, 2))
	}
	return v
}
// interruptChannel returns a buffered channel that receives OS
// shutdown signals (SIGINT, SIGTERM, SIGQUIT). SIGKILL is deliberately
// not registered: per the os/signal documentation it cannot be caught,
// so notifying on it was a no-op.
func interruptChannel() chan os.Signal {
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
	return sigs
}
// NewP0dWithValues builds a P0d runner from individual command line
// values rather than a config file.
// c: concurrency, d: duration seconds, u: target URL, h: preferred
// http version, o: output file path, s: skip the inet speed test.
func NewP0dWithValues(c int, d int, u string, h string, o string, s bool) *P0d {
	//parse failure leaves httpVersion at 0; validate() normalises it
	httpVersion, _ := strconv.ParseFloat(h, 32)
	cfg := Config{
		Req: Req{
			Method:        "GET",
			Url:           u,
			FormData:      make([]map[string]string, 0),
			FormDataFiles: make(map[string][]byte, 0),
		},
		Exec: Exec{
			DurationSeconds: d,
			Concurrency:     c,
			HttpVersion:     float32(httpVersion),
			SkipInetTest:    s,
		},
	}
	cfg = *cfg.validate()
	_, openFileLimit := getUlimit()
	return NewP0d(cfg, openFileLimit, o, d, interruptChannel())
}
// NewP0dFromFile builds a P0d runner from the config file at path f,
// writing results to output file o (empty disables output).
func NewP0dFromFile(f string, o string) *P0d {
	cfg := loadConfigFromFile(f)
	//remember the provenance so initLog can report it
	cfg.File = f
	cfg = cfg.validate()
	_, openFileLimit := getUlimit()
	return NewP0d(*cfg, openFileLimit, o, cfg.Exec.DurationSeconds, interruptChannel())
}
// NewP0d assembles a ready-to-race P0d from a validated config.
// ulimit is the detected OS open file limit, outputFile may be empty
// to disable JSON output, durationSecs sizes the progress bar and
// interrupt delivers OS shutdown signals to the main loop.
func NewP0d(cfg Config, ulimit int64, outputFile string, durationSecs int, interrupt chan os.Signal) *P0d {
	return &P0d{
		ID: createRunId(),
		Time: Time{
			Phase: Bootstrap,
		},
		Config: cfg,
		OS: OS{
			OpenConns:       make([]OSOpenConns, 0),
			LimitOpenFiles:  ulimit,
			MaxOpenConns:    0,
			updateLock:      sync.Mutex{},
			InetTestAborted: false,
			//unbuffered: each stage send blocks until the preamble UI
			//receives it
			inetUlSpeedDone: make(chan struct{}),
			inetDlSpeedDone: make(chan struct{}),
			inetLatencyDone: make(chan struct{}),
			inetTestError:   make(chan struct{}),
		},
		ReqStats: &ReqStats{
			ErrorTypes: make(map[string]int),
			Sample:     NewSample(),
			//quantile estimator with compression 500 — trades memory for
			//accuracy; see NewQuantileWithCompression for semantics
			ElpsdAtmptLatencyNsQuantiles: NewQuantileWithCompression(500),
			ElpsdAtmptLatencyNs:          &Welford{s: variance.New()},
		},
		Output:      outputFile,
		Interrupted: false,
		client:      cfg.scaffoldHttpClients(),
		bar: &ProgressBar{
			maxSecs:    durationSecs,
			size:       30,
			chunkProps: make([]ChunkProps, 30),
		},
		interrupt:       interrupt,
		stopLiveWriters: make(chan struct{}),
		stopThreads:     initStopThreads(cfg),
	}
}
// StartTimeNow stamps the run start on both the overall timer and the
// request statistics, using a single clock reading so the two never
// disagree.
func (p *P0d) StartTimeNow() {
	start := time.Now()
	p.ReqStats.Start = start
	p.Time.Start = start
}
const backspace = "\x1b[%dD"
// Race runs the full load test lifecycle: it starts OS stat sampling,
// probes the target's connection settings, prints the preamble, then
// drives the main event loop until the configured duration elapses or
// the process is interrupted, finally draining open connections and
// writing the summary and output file.
func (p *P0d) Race() {
	//buffered so signalling completion never blocks (sent from two places)
	osStatsDone := make(chan struct{}, 2)
	p.initOSStats(osStatsDone)
	p.detectRemoteConnSettings()
	p.initLog()
	defer func() {
		if p.outFile != nil {
			p.outFile.Close()
		}
	}()
	p.initOutFile()
	p.StartTimeNow()
	p.bar.updateRampStateForTimerPhase(p.Time.Start, p)
	//init timer for rampdown trigger
	rampdown := make(chan struct{})
	time.AfterFunc(time.Duration(p.Config.Exec.DurationSeconds-p.Config.Exec.RampSeconds)*time.Second, func() {
		rampdown <- struct{}{}
	})
	//init timer for trigger end to totalruntime and start draining
	drainer := make(chan struct{})
	time.AfterFunc(time.Duration(p.Config.Exec.DurationSeconds)*time.Second, func() {
		drainer <- struct{}{}
	})
	//init req attempts loop; generous buffer so workers rarely block
	//while reporting finished attempts
	ras := make(chan ReqAtmpt, 65535)
	if !p.Interrupted {
		//this done channel is buffered because it may be too late to signal. we don't want to block
		initReqAtmptsDone := make(chan struct{}, 2)
		p.initReqAtmpts(initReqAtmptsDone, ras)
		p.initLiveWriterFastLoop(8)
		const prefix string = ""
		const indent string = "  "
		var comma = []byte(",\n")
		//drain stops the workers, waits for open conns to reach zero and
		//closes the live display
		drain := func() {
			//this one log event renders the progress bar at 0 seconds remaining
			initReqAtmptsDone <- struct{}{}
			p.doLogLive()
			p.Time.Stop = time.Now()
			p.setTimerPhase(Draining)
			//we still want to watch draining but much faster.
			p.stopReqAtmptsThreads(time.Millisecond * 1)
			p.stopLiveWriterFastLoop()
		Drain:
			//wait up to ~30s (300 * 100ms) for open conns to reach zero
			for i := 0; i < 300; i++ {
				if p.getOSOpenConns().OpenConns == 0 {
					osStatsDone <- struct{}{}
					break Drain
				}
				time.Sleep(time.Millisecond * 100)
				p.doLogLive()
			}
			p.setTimerPhase(Drained)
			//do this so no cur atmpts continue to be reported and all remaining decrease timers fire
			time.Sleep(time.Millisecond * 1010)
			atomic.StoreInt64(&p.ReqStats.CurReqAtmptsPSec, 0)
			p.closeLiveWritersAndSummarize()
		}
	Main:
		for {
			select {
			case <-p.interrupt:
				//because CTRL+C is crazy and messes up our live log by two spaces
				fmt.Fprintf(p.liveWriters[0], backspace, 2)
				p.Interrupted = true
				//in case of interrupt we signal the inet speed test to cancel if it's still running
				if !p.Config.Exec.SkipInetTest && !p.OS.isInetTestDone() {
					p.OS.inetTestError <- struct{}{}
				}
				drain()
				break Main
			case <-drainer:
				//configured duration elapsed
				drain()
				break Main
			case <-rampdown:
				p.setTimerPhase(RampDown)
				p.stopReqAtmptsThreads(p.staggerThreadsDuration())
			case ra := <-ras:
				//fold the finished attempt into running stats and maybe
				//sample it to the out file
				p.ReqStats.update(ra, ra.Stop, p.Config)
				p.outFileRequestAttempt(ra, prefix, indent, comma)
			}
		}
	}
	p.setTimerPhase(Done)
	osStatsDone <- struct{}{}
	//adjust time stop for aborts
	p.Time.Stop = time.Now()
	p.finalizeOutFile()
	log(Cyan("exiting").String())
}
const defMsg = "not detected"
// detectRemoteConnSettings fires a single probe request at the target
// so the preamble can report server software, HTTP version, TLS
// version and remote address before the real load starts. All fields
// keep their defaults when the probe fails.
func (p *P0d) detectRemoteConnSettings() {
	c := p.Config.scaffoldHttpClientWith(1, true, p)
	r := p.scaffoldHttpReq()
	rr, e := c.Do(r)
	if e == nil {
		//defer the close before any further work on the body
		defer rr.Body.Close()
		p.ReqStats.Sample.Server = rr.Header.Get("Server")
		//drain the body so the transport can reuse the connection
		io.Copy(ioutil.Discard, rr.Body)
		p.ReqStats.Sample.HTTPVersion = fmt.Sprintf("HTTP/%d.%d", rr.ProtoMajor, rr.ProtoMinor)
		if rr.TLS != nil {
			//switch replaces the former if/else chain; note VersionSSL30
			//is deprecated in crypto/tls but kept for reporting old servers
			switch rr.TLS.Version {
			case tls.VersionSSL30:
				p.ReqStats.Sample.TLSVersion = "SSL3.0"
			case tls.VersionTLS10:
				p.ReqStats.Sample.TLSVersion = "TLS1.0"
			case tls.VersionTLS11:
				p.ReqStats.Sample.TLSVersion = "TLS1.1"
			case tls.VersionTLS12:
				p.ReqStats.Sample.TLSVersion = "TLS1.2"
			case tls.VersionTLS13:
				p.ReqStats.Sample.TLSVersion = "TLS1.3"
			}
		}
		//sampleConn is presumably captured by a dial hook in the client
		//scaffolding; use it to report the resolved remote address and
		//IP version
		if p.sampleConn != nil {
			addr, _, _ := net.SplitHostPort(p.sampleConn.RemoteAddr().String())
			if net.ParseIP(addr).To4() != nil {
				p.ReqStats.Sample.IPVersion = "IPV4"
			} else {
				p.ReqStats.Sample.IPVersion = "IPV6"
			}
			p.ReqStats.Sample.RemoteAddr = addr
			p.sampleConn.Close()
		}
	}
	//the probe client is discarded; release its idle conns
	c.CloseIdleConnections()
}
// initReqAtmpts ramps up the worker goroutines, staggering their start
// so the ramp is visible in the live display, then flips the phase to
// Main once the OS reports the full set of open connections. It runs
// asynchronously because execution continues on to live updates.
func (p *P0d) initReqAtmpts(done chan struct{}, ras chan ReqAtmpt) {
	//don't block because execution continues on to live updates
	go func() {
		//bd records whether ramp up was aborted early via done
		bd := false
		p.setTimerPhase(RampUp)
	RampUp:
		for i := 0; i < p.Config.Exec.Concurrency; i++ {
			select {
			case <-done:
				bd = true
				break RampUp
			default:
				//stagger the initialisation so we can watch ramp up live.
				go p.doReqAtmpts(i, ras, p.stopThreads[i])
				//no sleep after the final thread
				if p.Config.Exec.Concurrency > 1 && i < p.Config.Exec.Concurrency-1 {
					time.Sleep(p.staggerThreadsDuration())
				}
			}
		}
		//we don't want to run this if we aborted above
		if !bd && p.Time.Phase < Main {
		MainUpdate:
			//poll until every configured conn is open, then enter Main
			for {
				if p.getOSOpenConns().OpenConns >= p.Config.Exec.Concurrency {
					p.setTimerPhase(Main)
					break MainUpdate
				}
				time.Sleep(time.Millisecond * 100)
			}
		}
	}()
}
// staggerThreadsDuration returns the delay between starting (or
// stopping) successive worker threads so the ramp spreads evenly over
// the configured ramp seconds.
func (p *P0d) staggerThreadsDuration() time.Duration {
	perThread := float64(p.Config.Exec.RampSeconds) / float64(p.Config.Exec.Concurrency)
	return time.Duration(perThread * float64(time.Second))
}
// doReqAtmpts is the per-thread request loop: worker i issues requests
// with its dedicated client until a signal arrives on done, reporting
// each finished attempt on ras.
func (p *P0d) doReqAtmpts(i int, ras chan<- ReqAtmpt, done <-chan struct{}) {
ReqAtmpt:
	for {
		//non-blocking stop check between attempts
		select {
		case <-done:
			break ReqAtmpt
		default:
		}
		//introduce artifical request latency
		if p.Config.Exec.SpacingMillis > 0 {
			time.Sleep(time.Duration(p.Config.Exec.SpacingMillis) * time.Millisecond)
		}
		ra := ReqAtmpt{
			Start: time.Now(),
		}
		p.bar.updateRampStateForTimerPhase(ra.Start, p)
		req := p.scaffoldHttpReq()
		//measure for size before sending. We don't set content length, go
		//does that internally. (the former `_ = bq` no-op was removed —
		//bq is already used on the next line)
		bq, _ := httputil.DumpRequest(req, true)
		ra.ReqBytes = int64(len(bq))
		//do the work and dump the response for size
		res, e := p.client[i].Do(req)
		if res != nil {
			ra.ResCode = res.StatusCode
			b, _ := httputil.DumpResponse(res, true)
			ra.ResBytes = int64(len(b))
			res.Body.Close()
		}
		ra.Stop = time.Now()
		ra.ElpsdNs = ra.Stop.Sub(ra.Start)
		//report on errors, mapping known low level error strings to
		//friendlier descriptions where possible
		if e != nil {
			em := N
		Mapping:
			for ek, ev := range errorMapping {
				if strings.Contains(e.Error(), ek) {
					em = ev
					break Mapping
				}
			}
			//no mapping matched: fall back to the raw error text
			if em == N {
				em = e.Error()
			}
			ra.ResErr = em
		}
		if len(ra.ResErr) > 0 {
			p.bar.markError(ra.Stop, p)
		}
		//null this aggressively
		req = nil
		//and report back
		ras <- ra
	}
}
// scaffoldHttpReq builds a fresh *http.Request from the configured
// method, URL, body and headers. The body encoding depends on the
// configured content type: url-encoded form, multipart form-data, or
// the raw configured body for everything else (including JSON).
func (p *P0d) scaffoldHttpReq() *http.Request {
	var body io.Reader
	//multipartwriter adds a boundary
	var mpContentType string
	//needs to decide between url encoded, multipart form data and everything else
	switch p.Config.Req.ContentType {
	case applicationXWWWFormUrlEncoded:
		data := url.Values{}
		for _, fd := range p.Config.Req.FormData {
			for k, v := range fd {
				data.Add(k, v)
			}
		}
		body = strings.NewReader(data.Encode())
	case multipartFormdata:
		var b bytes.Buffer
		mpw := multipart.NewWriter(&b)
		for _, fd := range p.Config.Req.FormData {
			for k, v := range fd {
				//keys prefixed with @ reference file uploads whose bytes
				//are preloaded in FormDataFiles under the same key
				if strings.HasPrefix(k, AT) {
					//NOTE(review): the CreateFormFile error is ignored here
					fw, _ := mpw.CreateFormFile(k, v)
					mpContentType = mpw.FormDataContentType()
					io.Copy(fw, bytes.NewReader(p.Config.Req.FormDataFiles[k]))
				} else {
					mpw.WriteField(k, v)
				}
			}
		}
		mpw.Close()
		body = bytes.NewReader(b.Bytes())
	case applicationJson:
		fallthrough
	default:
		body = strings.NewReader(p.Config.Req.Body)
	}
	//NOTE(review): the NewRequest error is ignored — presumably method
	//and url are guaranteed by cfg.validate(); confirm
	req, _ := http.NewRequest(p.Config.Req.Method,
		p.Config.Req.Url,
		body)
	//set headers from config
	if len(p.Config.Req.Headers) > 0 {
		for _, h := range p.Config.Req.Headers {
			for k, v := range h {
				//replace the configured multipart content type with the
				//writer's own value so the boundary matches the body
				if k == ct && v == multipartFormdata {
					req.Header.Set(k, mpContentType)
				} else {
					req.Header.Add(k, v)
				}
			}
		}
	}
	//set user agent
	req.Header.Set(ua, vs)
	return req
}
// stopReqAtmptsThreads signals every worker thread to stop, optionally
// pausing between threads so the ramp down is visible in the live log.
// Runs asynchronously so live updates continue while threads wind down.
func (p *P0d) stopReqAtmptsThreads(staggerThreadsDuration time.Duration) {
	go func() {
		for _, stop := range p.stopThreads {
			if stop == nil {
				continue
			}
			//stagger the off ramp between threads so we can watch it live.
			if staggerThreadsDuration > 0 {
				time.Sleep(staggerThreadsDuration)
			}
			stop <- struct{}{}
		}
	}()
}
// initLiveWriterFastLoop creates the stacked terminal line writers for
// the live display and starts a goroutine that re-renders them every
// 100ms until stopLiveWriters fires. n is the number of extra lines
// requested below the first.
func (p *P0d) initLiveWriterFastLoop(n int) {
	//start live logging
	l0 := uilive.New()
	//this prevents the writer from flushing inbetween lines. we flush manually after each iteration
	l0.RefreshInterval = time.Hour * 24 * 30
	l0.Start()
	live := make([]io.Writer, 0)
	live = append(live, l0)
	//NOTE(review): `<=` allocates n+1 newline writers (n+2 lines total).
	//Looks deliberate — doLogLive writes indices 0..8 with n=8 — but
	//confirm the extra line is intended.
	for i := 0; i <= n; i++ {
		live = append(live, live[0].(*uilive.Writer).Newline())
	}
	//do this before setting off goroutines
	p.liveWriters = live
	//now start live logging
	go func(done chan struct{}) {
	LiveWriters:
		for {
			select {
			case <-done:
				break LiveWriters
			default:
				p.doLogLive()
				time.Sleep(time.Millisecond * 100)
			}
		}
	}(p.stopLiveWriters)
}
// stopLiveWriterFastLoop signals the live log refresh goroutine to
// exit, then closes the channel so any later receive on it returns
// immediately instead of blocking.
func (p *P0d) stopLiveWriterFastLoop() {
	p.stopLiveWriters <- struct{}{}
	close(p.stopLiveWriters)
}
// closeLiveWritersAndSummarize renders one last live frame, stops the
// terminal writer and prints the error summary.
func (p *P0d) closeLiveWritersAndSummarize() {
	//call final log manually to prevent differences between summary and what's on screen in live log.
	p.doLogLive()
	p.liveWriters[0].(*uilive.Writer).Stop()
	p.logSummary()
}
// finalizeOutFile appends the full P0d state as indented JSON and
// closes the JSON array opened by initOutFile. No-op when no output
// file is configured.
func (p *P0d) finalizeOutFile() {
	if len(p.Output) == 0 {
		return
	}
	log("finalizing out file '%s'", Yellow(p.Output))
	j, je := json.MarshalIndent(p, "", "  ")
	p.outFileCheckWrite(je)
	_, we := p.outFile.Write(j)
	p.outFileCheckWrite(we)
	_, we = p.outFile.Write([]byte("]"))
	p.outFileCheckWrite(we)
}
// initLog prints the pre-race preamble: logo and version, config
// provenance, OS RAM and open file limit checks, the optional live
// inet speed test, the effective execution settings, and the remote
// connection settings found by the probe request. Fixes three no-op
// fmt.Sprintf calls that wrapped constant strings (staticcheck S1039).
func (p *P0d) initLog() {
	PrintLogo()
	PrintVersion()
	fmt.Printf("\n")
	if p.Config.File != "" {
		slog("config loaded from '%s'", Yellow(p.Config.File))
	}
	//assume roughly 128KiB of RAM per concurrent connection
	b128k := 2 << 16
	wantRamBytes := uint64(p.Config.Exec.Concurrency * b128k)
	ramUsagePct := (float32(wantRamBytes) / float32(p.OS.LimitRAMBytes)) * 100
	//more decimal places for tiny percentages so they don't render as 0.00
	var ramUsagePctPrec string
	if ramUsagePct < 0.01 {
		ramUsagePctPrec = "%.4f"
	} else {
		ramUsagePctPrec = "%.2f"
	}
	if p.OS.LimitRAMBytes == 0 {
		msg := Red("unable to detect OS RAM")
		slog("%v", msg)
	} else if p.OS.LimitRAMBytes < wantRamBytes {
		msg := fmt.Sprintf("detected low OS RAM %s, increase to %s or reduce concurrency from %s",
			Red(p.Config.byteCount(int64(p.OS.LimitRAMBytes))),
			Red(p.Config.byteCount(int64(wantRamBytes))),
			Red(FGroup(int64(p.Config.Exec.Concurrency))))
		slog(msg)
	} else {
		slog("detected OS RAM: %s predicted use max %s %s",
			Yellow(p.Config.byteCount(int64(p.OS.LimitRAMBytes))),
			Yellow(p.Config.byteCount(int64(wantRamBytes))),
			Yellow("("+fmt.Sprintf(ramUsagePctPrec, ramUsagePct)+"%)"))
	}
	if p.OS.LimitOpenFiles == 0 {
		msg := Red("unable to detect OS open file limit")
		slog("%v", msg)
	} else if p.OS.LimitOpenFiles <= int64(p.Config.Exec.Concurrency) {
		msg := fmt.Sprintf("detected low OS open file limit %s, reduce concurrency from %s",
			Red(FGroup(int64(p.OS.LimitOpenFiles))),
			Red(FGroup(int64(p.Config.Exec.Concurrency))))
		slog(msg)
	} else {
		ul, _ := getUlimit()
		slog("detected local OS open file ulimit: %s", ul)
	}
	if !p.Config.Exec.SkipInetTest {
		var unable = Red("unable to detect inet speed").String()
		//small pause between frames so terminal updates remain readable
		uidelay := func() {
			time.Sleep(time.Duration(100) * time.Millisecond)
		}
		if p.OS.InetTestAborted {
			msg := unable
			slog("%v", msg)
		} else {
			w := uilive.New()
			//flush manually, see initLiveWriterFastLoop
			w.RefreshInterval = time.Hour * 24 * 30
			w.Start()
			msg := "detecting inet ▼️ speed"
			b := NewSpinnerAnim()
		OSNet:
			//render the inet test progress live until it completes,
			//errors out or the user interrupts
			for {
				select {
				case <-p.interrupt:
					msg = unable
					fmt.Fprintf(w, backspace, 2)
					w.Write([]byte(timefmt(msg)))
					w.Flush()
					p.OS.InetTestAborted = true
					p.Interrupted = true
					break OSNet
				case <-p.OS.inetTestError:
					msg = unable
					w.Write([]byte(timefmt(msg)))
					uidelay()
					break OSNet
				case <-p.OS.inetLatencyDone:
					//latency is the last stage; report all three results
					msg = fmt.Sprintf("detected inet ▼️ speed %s%s, ▲ speed %s%s, latency %s",
						Yellow(fmt.Sprintf("%.2f", p.OS.InetDlSpeedMBits)),
						Yellow("MBit/s"),
						Yellow(fmt.Sprintf("%.2f", p.OS.InetUlSpeedMBits)),
						Yellow("MBit/s"),
						Yellow(durafmt.Parse(p.OS.InetLatencyNs).LimitFirstN(1).String()))
					w.Write([]byte(timefmt(msg)))
					uidelay()
					break OSNet
				case <-p.OS.inetUlSpeedDone:
					msg = fmt.Sprintf("detected inet ▼️ speed %s%s, ▲ speed %s%s, detecting latency",
						Yellow(fmt.Sprintf("%.2f", p.OS.InetDlSpeedMBits)),
						Yellow("MBit/s"),
						Yellow(fmt.Sprintf("%.2f", p.OS.InetUlSpeedMBits)),
						Yellow("MBit/s"))
					w.Write([]byte(timefmt(msg)))
					uidelay()
				case <-p.OS.inetDlSpeedDone:
					msg = fmt.Sprintf("detected inet ▼️ speed %s%s, detecting ▲ speed",
						Yellow(fmt.Sprintf("%.2f", p.OS.InetDlSpeedMBits)),
						Yellow("MBit/s"))
					w.Write([]byte(timefmt(msg)))
					uidelay()
				default:
					//idle frame: spin the animation
					w.Write([]byte(timefmt(msg + b.Next())))
				}
				w.Flush()
				uidelay()
			}
			w.Stop()
		}
	}
	slog("set test duration: %s",
		Yellow(durafmt.Parse(time.Duration(p.Config.Exec.DurationSeconds)*time.Second).LimitFirstN(2).String()))
	slog("set max concurrent TCP conn(s): %s", Yellow(FGroup(int64(p.Config.Exec.Concurrency))))
	slog("set network dial timeout (inc. TLS handshake): %s",
		Yellow(durafmt.Parse(time.Duration(p.Config.Exec.DialTimeoutSeconds)*time.Second).LimitFirstN(2).String()))
	if p.Config.Exec.SpacingMillis > 0 {
		slog("set request spacing: %s",
			Yellow(durafmt.Parse(time.Duration(p.Config.Exec.SpacingMillis)*time.Millisecond).LimitFirstN(2).String()))
	}
	if len(p.Output) > 0 {
		slog("set out file sampling rate: %s",
			Yellow(strconv.FormatFloat(float64(p.Config.Exec.LogSampling), 'f', -1, 64)))
	}
	slog("set preferred http version: %s ",
		Yellow(fmt.Sprintf("%.1f", p.Config.Exec.HttpVersion)),
	)
	fmt.Printf(timefmt("set URL %s (%s)"), Yellow(p.Config.Req.Url), Yellow(p.Config.Req.Method))
	//suppress "not detected" placeholders in the remote settings line
	tv := ""
	if p.ReqStats.Sample.TLSVersion == defMsg {
		tv = N
	} else {
		tv = p.ReqStats.Sample.TLSVersion
	}
	sv := ""
	if p.ReqStats.Sample.Server == defMsg {
		sv = N
	} else {
		sv = p.ReqStats.Sample.Server + " "
	}
	slog("detected remote conn settings: %s%s%s%s%s %s %s",
		Cyan(sv),
		Cyan(p.ReqStats.Sample.RemoteAddr),
		Cyan("["),
		Cyan(p.ReqStats.Sample.IPVersion),
		Cyan("]"),
		Cyan(p.ReqStats.Sample.HTTPVersion),
		Cyan(tv),
	)
	slog("starting engines: %v", Cyan(p.ID))
}
// logLiveLock serializes live log rendering between goroutines.
var logLiveLock = sync.Mutex{}

// phase decorations appended to the live connection count line.
var (
	rampingUp   = Cyan(" (ramping up)").String()
	rampingDown = Cyan(" (ramping down)").String()
	draining    = Cyan(" (draining) ").String()
	drained     = Cyan(" (drained)").String()
)

// format strings for the live display lines.
const (
	conMsg                   = "concurrent TCP conns: %s%s%s"
	httpReqSMsg              = "HTTP req: %s"
	roundtripThroughputMsg   = "roundtrip throughput: %s%s mean: %s%s max: %s%s"
	pctRoundTripLatency      = "roundtrip latency pct10: %s pct50: %s pct90: %s pct99: %s"
	readthroughputMsg        = "read throughput: %s%s mean: %s%s max: %s%s sum: %s"
	writeThroughputMsg       = "write throughput: %s%s mean: %s%s max: %s%s sum: %s"
	matchingResponseCodesMsg = "matching HTTP response codes: %v"
	transportErrorsMsg       = "transport errors: %v"
	maxMsg                   = " max: "
	perSecondMsg             = "/s"
)
// doLogLive renders one frame of the multi-line live terminal output:
// progress bar, connection counts, request totals, throughput, latency
// percentiles, matching response codes and transport errors. Guarded
// by logLiveLock because it is called from several goroutines.
func (p *P0d) doLogLive() {
	logLiveLock.Lock()
	elpsd := time.Now()
	lw := p.liveWriters
	//i indexes the live writer line being rendered
	i := 0
	fmt.Fprintf(lw[i], timefmt("%s"), p.bar.render(elpsd, p))
	i++
	oss := p.getOSOpenConns()
	//decorate the conn line with the current ramp phase
	connMsg := conMsg
	if p.isTimerPhase(RampUp) {
		connMsg += rampingUp
	} else if p.isTimerPhase(Main) {
		//nothing here
	} else if p.isTimerPhase(RampDown) {
		connMsg += rampingDown
	} else if p.isTimerPhase(Draining) {
		connMsg += draining
	} else if p.isTimerPhase(Drained) {
		connMsg += drained
	}
	connMsg += maxMsg
	connMsg += Magenta(FGroup(int64(p.OS.MaxOpenConns))).String()
	fmt.Fprintf(lw[i], timefmt(connMsg),
		Cyan(FGroup(int64(oss.OpenConns))),
		Cyan("/"),
		Cyan(FGroup(int64(p.Config.Exec.Concurrency))))
	i++
	fmt.Fprintf(lw[i], timefmt(httpReqSMsg),
		Cyan(FGroup(int64(p.ReqStats.ReqAtmpts))))
	i++
	fmt.Fprintf(lw[i], timefmt(roundtripThroughputMsg),
		Cyan(FGroup(int64(p.ReqStats.CurReqAtmptsPSec))),
		Cyan(perSecondMsg),
		Cyan(FGroup(int64(p.ReqStats.MeanReqAtmptsPSec))),
		Cyan(perSecondMsg),
		Magenta(FGroup(int64(p.ReqStats.MaxReqAtmptsPSec))),
		Magenta(perSecondMsg))
	i++
	//format a latency quantile in μs below 1ms, otherwise in ms; NaN
	//(no samples yet) renders as 0
	convertToMs := func(q *Quantile, v float64) string {
		qv := q.Quantile(v)
		if math.IsNaN(qv) {
			qv = 0
		}
		c := time.Duration(int64(qv))
		if c.Milliseconds() == 0 {
			return FGroup(c.Microseconds()) + "μs"
		} else {
			return FGroup(c.Milliseconds()) + "ms"
		}
	}
	fmt.Fprintf(lw[i], timefmt(pctRoundTripLatency),
		Cyan(convertToMs(p.ReqStats.ElpsdAtmptLatencyNsQuantiles, 0.1)),
		Cyan(convertToMs(p.ReqStats.ElpsdAtmptLatencyNsQuantiles, 0.5)),
		Cyan(convertToMs(p.ReqStats.ElpsdAtmptLatencyNsQuantiles, 0.9)),
		Cyan(convertToMs(p.ReqStats.ElpsdAtmptLatencyNsQuantiles, 0.99)),
	)
	i++
	fmt.Fprintf(lw[i], timefmt(readthroughputMsg),
		Cyan(p.Config.byteCount(int64(p.ReqStats.CurBytesReadPSec))),
		Cyan(perSecondMsg),
		Cyan(p.Config.byteCount(int64(p.ReqStats.MeanBytesReadPSec))),
		Cyan(perSecondMsg),
		Magenta(p.Config.byteCount(int64(p.ReqStats.MaxBytesReadPSec))),
		Magenta(perSecondMsg),
		Cyan(p.Config.byteCount(p.ReqStats.SumBytesRead)))
	i++
	fmt.Fprintf(lw[i], timefmt(writeThroughputMsg),
		Cyan(p.Config.byteCount(int64(p.ReqStats.CurBytesWrittenPSec))),
		Cyan(perSecondMsg),
		Cyan(p.Config.byteCount(int64(p.ReqStats.MeanBytesWrittenPSec))),
		Cyan(perSecondMsg),
		Magenta(p.Config.byteCount(int64(p.ReqStats.MaxBytesWrittenPSec))),
		Magenta(perSecondMsg),
		Cyan(p.Config.byteCount(p.ReqStats.SumBytesWritten)))
	i++
	//floor so the match percentage never over-reports
	mrc := Cyan(fmt.Sprintf("%s (%s%%)",
		FGroup(int64(p.ReqStats.SumMatchingResponseCodes)),
		fmt.Sprintf("%.2f", math.Floor(float64(p.ReqStats.PctMatchingResponseCodes*100))/100)))
	fmt.Fprintf(lw[i], timefmt(matchingResponseCodesMsg), mrc)
	i++
	//ceil so the error percentage never under-reports
	tte := fmt.Sprintf("%s (%s%%)",
		FGroup(int64(p.ReqStats.SumErrors)),
		fmt.Sprintf("%.2f", math.Ceil(float64(p.ReqStats.PctErrors*100))/100))
	if p.ReqStats.SumErrors > 0 {
		fmt.Fprintf(lw[i], timefmt(transportErrorsMsg), Red(tte))
	} else {
		fmt.Fprintf(lw[i], timefmt(transportErrorsMsg), Cyan(tte))
	}
	//need to flush manually here to keep stdout updated
	lw[0].(*uilive.Writer).Flush()
	logLiveLock.Unlock()
}
// logSummary prints one red line per transport error type with its
// absolute count and its percentage of all request attempts.
func (p *P0d) logSummary() {
	total := p.ReqStats.ReqAtmpts
	for errType, count := range p.ReqStats.ErrorTypes {
		pct := 100 * (float32(count) / float32(total))
		line := Red(fmt.Sprintf(" - error: [%s]: %s/%s (%s%%)", errType,
			FGroup(int64(count)),
			FGroup(int64(total)),
			fmt.Sprintf("%.2f", math.Ceil(float64(pct*100))/100)))
		logv(line)
	}
}
// initOutFile creates the output file (when configured) and writes the
// opening bracket of the JSON array of sampled request attempts.
func (p *P0d) initOutFile() {
	if len(p.Output) == 0 {
		return
	}
	f, oe := os.Create(p.Output)
	p.outFile = f
	p.outFileCheckWrite(oe)
	_, we := p.outFile.Write([]byte("["))
	p.outFileCheckWrite(we)
}
// outFileCheckWrite reports a fatal output file error, then asks the
// process to shut down gracefully by sending itself SIGINT, which the
// main Race loop handles like a user interrupt. No-op when e is nil.
func (p *P0d) outFileCheckWrite(e error) {
	if e == nil {
		return
	}
	fmt.Println(e)
	logv(Red(fmt.Sprintf("unable to write to output file %s", p.Output)))
	syscall.Kill(syscall.Getpid(), syscall.SIGINT)
}
// outFileSeedOnce seeds the global math/rand source exactly once.
// The original reseeded with UnixNano on every call, which is slow on
// the hot path and degrades randomness when concurrent calls land on
// the same nanosecond tick.
var outFileSeedOnce sync.Once

// outFileRequestAttempt appends one sampled request attempt to the
// output file as indented JSON followed by a comma. Sampling (per the
// configured LogSampling rate) keeps the file small under load.
func (p *P0d) outFileRequestAttempt(ra ReqAtmpt, prefix string, indent string, comma []byte) {
	if len(p.Output) > 0 {
		outFileSeedOnce.Do(func() {
			rand.Seed(time.Now().UnixNano())
		})
		//only sample a subset of requests
		if rand.Float64() < p.Config.Exec.LogSampling {
			j, je := json.MarshalIndent(ra, prefix, indent)
			p.outFileCheckWrite(je)
			_, we := p.outFile.Write(j)
			p.outFileCheckWrite(we)
			_, we = p.outFile.Write(comma)
			p.outFileCheckWrite(we)
		}
	}
}
// initOSStats records the PID, kicks off the inet speed test (unless
// skipped), captures OS limits, and starts a background sampler that
// polls the open connection count every 100ms until done is signalled.
func (p *P0d) initOSStats(done chan struct{}) {
	p.OS.PID = os.Getpid()
	if !p.Config.Exec.SkipInetTest {
		go p.getOSINetSpeed(30)
	}
	_, p.OS.LimitOpenFiles = getUlimit()
	p.OS.LimitRAMBytes = getRAMBytes()
	go func() {
		for {
			select {
			case <-done:
				return
			default:
				p.doOSOpenConns()
				time.Sleep(time.Millisecond * 100)
			}
		}
	}()
}
// getOSINetSpeed runs the three-stage inet test (download, upload,
// latency) against a speedtest target, signalling each completion on
// the matching OS channel. The test is aborted — and inetTestError
// signalled — when any stage fails or maxWaitSeconds elapses.
func (p *P0d) getOSINetSpeed(maxWaitSeconds int) {
	//abort cancels the in-flight test, unless it already finished
	abort := func(target *OSNet, contextCancel func()) {
		if !(p.OS.isInetTestDone()) {
			if contextCancel != nil {
				contextCancel()
			}
			p.OS.InetTestAborted = true
			//NOTE(review): unbuffered send — blocks until the select in
			//initLog (or Race) receives it; confirm a receiver is always
			//present on this path
			p.OS.inetTestError <- struct{}{}
			if target.client != nil {
				target.client.CloseIdleConnections()
			}
		}
	}
	osn, e := NewOSNet()
	if e == nil {
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()
		//hard deadline for the entire three-stage test
		time.AfterFunc(time.Duration(maxWaitSeconds)*time.Second, func() {
			abort(osn, cancel)
		})
		e2 := osn.Target.DownloadTestContext(ctx, true)
		if e2 == nil {
			p.OS.InetDlSpeedMBits = osn.Target.DLSpeed
			p.OS.inetDlSpeedDoneFlag = true
			p.OS.inetDlSpeedDone <- struct{}{}
		} else {
			abort(osn, cancel)
		}
		e3 := osn.Target.UploadTestContext(ctx, true)
		if e3 == nil {
			p.OS.InetUlSpeedMBits = osn.Target.ULSpeed
			p.OS.inetUlSpeedDoneFlag = true
			p.OS.inetUlSpeedDone <- struct{}{}
		} else {
			abort(osn, cancel)
		}
		e1 := osn.Target.PingTestContext(ctx)
		if e1 == nil {
			p.OS.InetLatencyNs = osn.Target.Latency
			p.OS.inetLatencyDoneFlag = true
			p.OS.inetLatencyDone <- struct{}{}
		} else {
			abort(osn, cancel)
		}
		osn.client.CloseIdleConnections()
	} else {
		abort(osn, nil)
	}
}
// doOSOpenConns samples the current number of open OS connections for
// this process, tracking the high-water mark. A sample is only
// appended when the count changed since the previous one, keeping the
// series compact.
func (p *P0d) doOSOpenConns() {
	p.OS.updateLock.Lock()
	defer p.OS.updateLock.Unlock()
	oss := NewOSOpenConns(p.OS.PID)
	oss.updateOpenConns(p.Config)
	if oss.OpenConns == p.getOSOpenConns().OpenConns {
		return
	}
	p.OS.OpenConns = append(p.OS.OpenConns, *oss)
	if oss.OpenConns > p.OS.MaxOpenConns {
		p.OS.MaxOpenConns = oss.OpenConns
	}
}
// getOSOpenConns returns the most recent open-conns sample, or a
// fresh zeroed sample when none has been recorded yet.
func (p *P0d) getOSOpenConns() OSOpenConns {
	if n := len(p.OS.OpenConns); n > 0 {
		return p.OS.OpenConns[n-1]
	}
	//first time this runs the OS stats may not have been initialized
	oss := *NewOSOpenConns(os.Getpid())
	oss.OpenConns = 0
	return oss
}
// setTimerPhase advances the run to the given phase. Phases are
// ordered and monotonic: a request to move to an earlier (or equal)
// phase is ignored.
func (p *P0d) setTimerPhase(phase TimerPhase) {
	if phase <= p.Time.Phase {
		return
	}
	p.Time.Phase = phase
}
// isTimerPhase reports whether the run is currently in exactly the
// given phase.
func (p *P0d) isTimerPhase(phase TimerPhase) bool {
	return phase == p.Time.Phase
}
// createRunId returns a unique identifier for this run, combining the
// p0d version with a random UUID.
func createRunId() string {
	uid, _ := uuid.NewRandom()
	return "p0d-" + Version + "-race-" + uid.String()
}