gfcp.go
// Package gfcp - A Fast and Reliable ARQ Protocol
//
// Copyright © 2021 Jeffrey H. Johnson <trnsz@pobox.com>.
// Copyright © 2015 Daniel Fu <daniel820313@gmail.com>.
// Copyright © 2019 Loki 'l0k18' Verloren <stalker.loki@protonmail.ch>.
// Copyright © 2021 Gridfinity, LLC. <admin@gridfinity.com>.
//
// All rights reserved.
//
// All use of this code is governed by the MIT license.
// The complete license is available in the LICENSE file.
package gfcp
import (
"encoding/binary"
"math"
"runtime/debug"
"sync/atomic"
gfcpLegal "go4.org/legal"
)
// Gfcp protocol constants
const (
GfcpRtoNdl = 20 // GfcpRtoNdl: NoDelay min RTO
GfcpRtoMin = 120 // GfcpRtoMin: Regular min RTO
GfcpRtoDef = 340
GfcpRtoMax = 60000
GfcpCmdPush = 81 // GfcpCmdPush: Push data
GfcpCmdAck = 82 // GfcpCmdAck: Ack
GfcpCmdWask = 83 // GfcpCmdWask: Get Window Size
GfcpCmdWins = 84 // GfcpCmdWins: Set window Size
GfcpAskSend = 1 // GfcpAskSend: Need to send GfcpCmdWask
GfcpAskTell = 2 // GfcpAskTell: Need to send GfcpCmdWins
GfcpWndSnd = 32
GfcpWndRcv = 32
GfcpMtuDef = 1480
GfcpAckFast = 3
GfcpInterval = 100
GfcpOverhead = 24
GfcpDeadLink = 20
GfcpThreshInit = 2
GfcpThreshMin = 2
GfcpProbeInit = 7000 // 7s initial probe window
GfcpProbeLimit = 102000 // 120s hard probe timeout
)
type outputCallback func(
buf []byte,
size int,
)
func gfcpEncode8u(
p []byte,
c byte,
) []byte {
p[0] = c
return p[1:]
}
func gfcpDecode8u(
p []byte,
c *byte,
) []byte {
*c = p[0]
return p[1:]
}
func gfcpEncode16u(
p []byte,
w uint16,
) []byte {
binary.LittleEndian.PutUint16(
p,
w,
)
return p[2:]
}
func gfcpDecode16u(
p []byte,
w *uint16,
) []byte {
*w = binary.LittleEndian.Uint16(
p,
)
return p[2:]
}
func gfcpEncode32u(
p []byte,
l uint32,
) []byte {
binary.LittleEndian.PutUint32(
p,
l,
)
return p[4:]
}
func gfcpDecode32u(
p []byte,
l *uint32,
) []byte {
*l = binary.LittleEndian.Uint32(
p,
)
return p[4:]
}
func _imin(
a,
b uint32,
) uint32 {
if a <= b {
return a
}
return b
}
func _imax(
a,
b uint32,
) uint32 {
if a >= b {
return a
}
return b
}
func _ibound(
lower,
middle,
upper uint32,
) uint32 {
return _imin(
_imax(
lower,
middle,
),
upper,
)
}
func _itimediff(
later,
earlier uint32,
) int32 {
return (int32)(later - earlier)
}
// Segment structure
type Segment struct {
conv uint32
cmd uint8
frg uint8
wnd uint16
ts uint32
sn uint32
una uint32
rto uint32
Kxmit uint32
GFcpResendTs uint32
fastack uint32
acked uint32
data []byte
}
func (
GFcpSeg *Segment,
) encode(
ptr []byte,
) []byte {
ptr = gfcpEncode32u(
ptr,
GFcpSeg.conv,
)
ptr = gfcpEncode8u(
ptr,
GFcpSeg.cmd,
)
ptr = gfcpEncode8u(
ptr,
GFcpSeg.frg,
)
ptr = gfcpEncode16u(
ptr,
GFcpSeg.wnd,
)
ptr = gfcpEncode32u(
ptr,
GFcpSeg.ts,
)
ptr = gfcpEncode32u(
ptr,
GFcpSeg.sn,
)
ptr = gfcpEncode32u(
ptr,
GFcpSeg.una,
)
ptr = gfcpEncode32u(
ptr, uint32(len(
GFcpSeg.data,
)))
atomic.AddUint64(
&DefaultSnsi.GFcpOutputSegments,
1,
)
return ptr
}
// GFCP primary structure
type GFCP struct {
conv, mtu, mss, state uint32
sndUna, sndNxt, rcvNxt uint32
ssthresh uint32
rxRttVar, rxSrtt int32
rxRto, rxMinRto uint32
sndWnd, rcvWnd, rmtWnd, cwnd, probe uint32
interval, tsFlush uint32
nodelay, updated uint32
tsProbe, probeWait uint32
deadLink, incr uint32
fastresend int32
nocwnd, stream int32
sndQueue []Segment
rcvQueue []Segment
SndBuf []Segment
rcvBuf []Segment
acklist []ackItem
buffer []byte
reserved int
output outputCallback
}
type ackItem struct {
sn uint32
ts uint32
}
// NewGFCP creates a new GFcp control object.
func NewGFCP(
conv uint32,
output outputCallback,
) *GFCP {
GFcp := new(
GFCP,
)
GFcp.conv = conv
GFcp.sndWnd = GfcpWndSnd
GFcp.rcvWnd = GfcpWndRcv
GFcp.rmtWnd = GfcpWndRcv
GFcp.mtu = GfcpMtuDef
GFcp.mss = GFcp.mtu - GfcpOverhead
GFcp.buffer = make(
[]byte,
GFcp.mtu,
)
GFcp.rxRto = GfcpRtoDef
GFcp.rxMinRto = GfcpRtoMin
GFcp.interval = GfcpInterval
GFcp.tsFlush = GfcpInterval
GFcp.ssthresh = GfcpThreshInit
GFcp.deadLink = GfcpDeadLink
GFcp.output = output
return GFcp
}
func (
GFcp *GFCP,
) newSegment(
size int,
) (
GFcpSeg Segment,
) {
GFcpSeg.data = KxmitBuf.Get().([]byte)[:size]
return
}
func (
GFcp *GFCP,
) delSegment(
GFcpSeg *Segment,
) {
if GFcpSeg.data != nil {
KxmitBuf.Put(
// TODO(jhj): Switch to pointer to avoid allocation
GFcpSeg.data,
)
GFcpSeg.data = nil
}
}
// ReserveBytes keeps 'n' bytes from the beginning of buffering.
// Output callbacks use this to return 'false' if 'n' >= 'mss'.
func (
GFcp *GFCP,
) ReserveBytes(
n int,
) bool {
if n >= int(
GFcp.mtu-GfcpOverhead,
) || n < 0 {
return false
}
GFcp.reserved = n
GFcp.mss = GFcp.mtu - GfcpOverhead - uint32(
n,
)
return true
}
// PeekSize checks the size of next message in the receive queue.
func (
GFcp *GFCP,
) PeekSize() (
length int,
) {
if len(
GFcp.rcvQueue,
) == 0 {
return -1
}
GFcpSeg := &GFcp.rcvQueue[0]
if GFcpSeg.frg == 0 {
return len(
GFcpSeg.data,
)
}
if len(
GFcp.rcvQueue,
) < int(
GFcpSeg.frg+1,
) {
return -1
}
for k := range GFcp.rcvQueue {
GFcpSeg := &GFcp.rcvQueue[k]
length += len(
GFcpSeg.data,
)
if GFcpSeg.frg == 0 {
break
}
}
return
}
// Recv is upper level recviver; returns size or EAGAIN on error.
func (
GFcp *GFCP,
) Recv(
buffer []byte,
) (
n int,
) {
if len(
GFcp.rcvQueue,
) == 0 {
return -1
}
peeksize := GFcp.PeekSize()
if peeksize < 0 {
return -2
}
if peeksize > len(
buffer,
) {
return -3
}
var fastRecovery bool
if len(
GFcp.rcvQueue,
) >= int(
GFcp.rcvWnd,
) {
fastRecovery = true
}
count := 0
for k := range GFcp.rcvQueue {
GFcpSeg := &GFcp.rcvQueue[k]
copy(
buffer,
GFcpSeg.data,
)
buffer = buffer[len(
GFcpSeg.data,
):]
n += len(
GFcpSeg.data,
)
count++
GFcp.delSegment(
GFcpSeg,
)
if GFcpSeg.frg == 0 {
break
}
}
if count > 0 {
GFcp.rcvQueue = GFcp.removeFront(
GFcp.rcvQueue,
count,
)
}
count = 0
for k := range GFcp.rcvBuf {
GFcpSeg := &GFcp.rcvBuf[k]
if GFcpSeg.sn == GFcp.rcvNxt && len(
GFcp.rcvQueue,
) < int(
GFcp.rcvWnd,
) {
GFcp.rcvNxt++
count++
} else {
break
}
}
if count > 0 {
GFcp.rcvQueue = append(
GFcp.rcvQueue,
GFcp.rcvBuf[:count]...,
)
GFcp.rcvBuf = GFcp.removeFront(
GFcp.rcvBuf,
count,
)
}
if len(
GFcp.rcvQueue,
) < int(
GFcp.rcvWnd,
) && fastRecovery {
GFcp.probe |= GfcpAskTell
}
return
}
// Send is upper level sender, returns <0 on error.
func (
GFcp *GFCP,
) Send(
buffer []byte,
) int {
var count int
if len(
buffer,
) == 0 {
return -1
}
if GFcp.stream != 0 {
n := len(
GFcp.sndQueue,
)
if n > 0 {
GFcpSeg := &GFcp.sndQueue[n-1]
if len(
GFcpSeg.data,
) < int(
GFcp.mss,
) {
capacity := int(
GFcp.mss,
) - len(
GFcpSeg.data,
)
extend := capacity
if len(
buffer,
) < capacity {
extend = len(
buffer,
)
}
oldlen := len(
GFcpSeg.data,
)
GFcpSeg.data = GFcpSeg.data[:oldlen+extend]
copy(
GFcpSeg.data[oldlen:],
buffer,
)
buffer = buffer[extend:]
}
}
if len(
buffer,
) == 0 {
return 0
}
}
if len(
buffer,
) <= int(
GFcp.mss,
) {
count = 1
} else {
count = (len(
buffer,
) + int(
GFcp.mss,
) - 1) / int(
GFcp.mss,
)
}
if count > 255 {
return -2
}
if count == 0 {
count = 1
}
for i := 0; i < count; i++ {
var size int
if len(
buffer,
) > int(
GFcp.mss,
) {
size = int(
GFcp.mss,
)
} else {
size = len(
buffer,
)
}
GFcpSeg := GFcp.newSegment(
size,
)
copy(
GFcpSeg.data,
buffer[:size],
)
if GFcp.stream == 0 {
GFcpSeg.frg = uint8(
count - i - 1,
)
} else {
GFcpSeg.frg = 0
}
GFcp.sndQueue = append(
GFcp.sndQueue,
GFcpSeg,
)
buffer = buffer[size:]
}
return 0
}
func (
GFcp *GFCP,
) updateAck(
rtt int32,
) {
var rto uint32
if GFcp.rxSrtt == 0 {
GFcp.rxSrtt = rtt
GFcp.rxRttVar = rtt >> 1
} else {
delta := rtt - GFcp.rxSrtt
GFcp.rxSrtt += delta >> 3
if delta < 0 {
delta = -delta
}
if rtt < GFcp.rxSrtt-GFcp.rxRttVar {
GFcp.rxRttVar += (delta - GFcp.rxRttVar) >> 5
} else {
GFcp.rxRttVar += (delta - GFcp.rxRttVar) >> 2
}
}
rto = uint32(
GFcp.rxSrtt,
) + _imax(
GFcp.interval,
uint32(
GFcp.rxRttVar,
)<<2)
GFcp.rxRto = _ibound(
GFcp.rxMinRto,
rto,
GfcpRtoMax,
)
}
func (
GFcp *GFCP,
) shrinkBuf() {
if len(
GFcp.SndBuf,
) > 0 {
GFcpSeg := &GFcp.SndBuf[0]
GFcp.sndUna = GFcpSeg.sn
} else {
GFcp.sndUna = GFcp.sndNxt
}
}
func (
GFcp *GFCP,
) parseAck(
sn uint32,
) {
if _itimediff(
sn,
GFcp.sndUna,
) < 0 || _itimediff(
sn,
GFcp.sndNxt,
) >= 0 {
return
}
for k := range GFcp.SndBuf {
GFcpSeg := &GFcp.SndBuf[k]
if sn == GFcpSeg.sn {
GFcpSeg.acked = 1
GFcp.delSegment(
GFcpSeg,
)
break
}
if _itimediff(
sn,
GFcpSeg.sn,
) < 0 {
break
}
}
}
func (
GFcp *GFCP,
) parseFastack(
sn, ts uint32,
) {
if _itimediff(
sn,
GFcp.sndUna,
) < 0 || _itimediff(
sn,
GFcp.sndNxt,
) >= 0 {
return
}
for k := range GFcp.SndBuf {
GFcpSeg := &GFcp.SndBuf[k]
if _itimediff(
sn,
GFcpSeg.sn,
) < 0 {
break
} else if sn != GFcpSeg.sn && _itimediff(
GFcpSeg.ts,
ts,
) <= 0 {
GFcpSeg.fastack++
}
}
}
func (
GFcp *GFCP,
) parseUna(
una uint32,
) {
count := 0
for k := range GFcp.SndBuf {
GFcpSeg := &GFcp.SndBuf[k]
if _itimediff(
una,
GFcpSeg.sn,
) > 0 {
GFcp.delSegment(
GFcpSeg,
)
count++
} else {
break
}
}
if count > 0 {
GFcp.SndBuf = GFcp.removeFront(
GFcp.SndBuf,
count,
)
}
}
func (
GFcp *GFCP,
) ackPush(
sn,
ts uint32,
) {
GFcp.acklist = append(
GFcp.acklist,
ackItem{
sn,
ts,
})
}
func (
GFcp *GFCP,
) parseData(
newGFcpSeg Segment,
) bool {
sn := newGFcpSeg.sn
if _itimediff(
sn,
GFcp.rcvNxt+GFcp.rcvWnd,
) >= 0 ||
_itimediff(
sn,
GFcp.rcvNxt,
) < 0 {
return true
}
n := len(
GFcp.rcvBuf,
) - 1
insertIdx := 0
repeat := false
for i := n; i >= 0; i-- {
GFcpSeg := &GFcp.rcvBuf[i]
if GFcpSeg.sn == sn {
repeat = true
break
}
if _itimediff(
sn,
GFcpSeg.sn,
) > 0 {
insertIdx = i + 1
break
}
}
if !repeat {
dataCopy := KxmitBuf.Get().([]byte)[:len(newGFcpSeg.data)]
copy(
dataCopy,
newGFcpSeg.data,
)
newGFcpSeg.data = dataCopy
if insertIdx == n+1 {
GFcp.rcvBuf = append(
GFcp.rcvBuf,
newGFcpSeg,
)
} else {
GFcp.rcvBuf = append(
GFcp.rcvBuf,
Segment{},
)
copy(
GFcp.rcvBuf[insertIdx+1:],
GFcp.rcvBuf[insertIdx:],
)
GFcp.rcvBuf[insertIdx] = newGFcpSeg
}
}
count := 0
for k := range GFcp.rcvBuf {
GFcpSeg := &GFcp.rcvBuf[k]
if GFcpSeg.sn == GFcp.rcvNxt && len(
GFcp.rcvQueue,
) < int(
GFcp.rcvWnd,
) {
GFcp.rcvNxt++
count++
} else {
break
}
}
if count > 0 {
GFcp.rcvQueue = append(
GFcp.rcvQueue,
GFcp.rcvBuf[:count]...,
)
GFcp.rcvBuf = GFcp.removeFront(
GFcp.rcvBuf,
count,
)
}
return repeat
}
// Input receives a (low-level) UDP packet, and determinines if
// a full packet has been processsed (not by the FEC algorithm)
func (
GFcp *GFCP,
) Input(
data []byte,
regular,
ackNoDelay bool,
) int {
sndUna := GFcp.sndUna
if len(
data,
) < GfcpOverhead {
return -1
}
var latest uint32
var flag int
var inSegs uint64
for {
var ts,
sn,
length,
una,
conv uint32
var wnd uint16
var cmd,
frg uint8
if len(
data,
) < int(
GfcpOverhead,
) {
break
}
data = gfcpDecode32u(
data,
&conv,
)
if conv != GFcp.conv {
return -1
}
data = gfcpDecode8u(
data,
&cmd,
)
data = gfcpDecode8u(
data,
&frg,
)
data = gfcpDecode16u(
data,
&wnd,
)
data = gfcpDecode32u(
data,
&ts,
)
data = gfcpDecode32u(
data,
&sn,
)
data = gfcpDecode32u(
data,
&una,
)
data = gfcpDecode32u(
data,
&length,
)
if len(
data,
) < int(
length,
) {
return -2
}
if cmd != GfcpCmdPush && cmd != GfcpCmdAck &&
cmd != GfcpCmdWask && cmd != GfcpCmdWins {
return -3
}
if regular {
GFcp.rmtWnd = uint32(
wnd,
)
}
GFcp.parseUna(
una,
)
GFcp.shrinkBuf()
if cmd == GfcpCmdAck {
GFcp.parseAck(
sn,
)
GFcp.parseFastack(
sn,
ts,
)
flag |= 1
latest = ts
} else if cmd == GfcpCmdPush {
repeat := true
if _itimediff(
sn,
GFcp.rcvNxt+GFcp.rcvWnd,
) < 0 {
GFcp.ackPush(
sn,
ts,
)
if _itimediff(
sn,
GFcp.rcvNxt,
) >= 0 {
var GFcpSeg Segment
GFcpSeg.conv = conv
GFcpSeg.cmd = cmd
GFcpSeg.frg = frg
GFcpSeg.wnd = wnd
GFcpSeg.ts = ts
GFcpSeg.sn = sn
GFcpSeg.una = una
GFcpSeg.data = data[:length]
repeat = GFcp.parseData(
GFcpSeg,
)
}
}
if regular && repeat {
atomic.AddUint64(
&DefaultSnsi.GFcpDupSegments,
1,
)
}
} else if cmd == GfcpCmdWask {
GFcp.probe |= GfcpAskTell
//} else if cmd == GfcpCmdWins {
// XXX(jhj) ??? FUCK YOU CHINKS
} else {
return -3
}
inSegs++
data = data[length:]
}
atomic.AddUint64(
&DefaultSnsi.GFcpInputSegments,
inSegs,
)
if flag != 0 && regular {
current := CurrentMs()
if _itimediff(
current,
latest,
) >= 0 {
GFcp.updateAck(
_itimediff(
current,
latest,
),
)
}
}
if GFcp.nocwnd == 0 {
if _itimediff(
GFcp.sndUna,
sndUna,
) > 0 {
if GFcp.cwnd < GFcp.rmtWnd {
mss := GFcp.mss
if GFcp.cwnd < GFcp.ssthresh {
GFcp.cwnd++
GFcp.incr += mss
} else {
if GFcp.incr < mss {
GFcp.incr = mss
}
GFcp.incr += (mss*mss)/GFcp.incr + (mss / 16)
if (GFcp.cwnd+1)*mss <= GFcp.incr {
GFcp.cwnd++
}
}
if GFcp.cwnd > GFcp.rmtWnd {
GFcp.cwnd = GFcp.rmtWnd
GFcp.incr = GFcp.rmtWnd * mss
}
}
}
}
if ackNoDelay && len(
GFcp.acklist,
) > 0 {
GFcp.Flush(
true,
)
}
return 0
}
func (
GFcp *GFCP,
) wndUnused() uint16 {
if len(
GFcp.rcvQueue,
) < int(GFcp.rcvWnd) {
return uint16(
int(
GFcp.rcvWnd,
) - len(
GFcp.rcvQueue,
),
)
}
return 0
}
// Flush ...
func (
GFcp *GFCP,
) Flush(
ackOnly bool,
) uint32 {
var GFcpSeg Segment
GFcpSeg.conv = GFcp.conv
GFcpSeg.cmd = GfcpCmdAck
GFcpSeg.wnd = GFcp.wndUnused()
GFcpSeg.una = GFcp.rcvNxt
buffer := GFcp.buffer
ptr := buffer[GFcp.reserved:]
makeSpace := func(
space int,
) {
size := len(
buffer,
) - len(
ptr,
)
if size+space > int(
GFcp.mtu,
) {
GFcp.output(
buffer,
size,
)
ptr = buffer[GFcp.reserved:]
}
}
FlushBuffer := func() {
size := len(
buffer,
) - len(
ptr,
)
if size > GFcp.reserved {
GFcp.output(
buffer,
size,
)
}
}
for i, ack := range GFcp.acklist {
makeSpace(
GfcpOverhead,
)
if ack.sn >= GFcp.rcvNxt || len(
GFcp.acklist,
)-1 == i {
GFcpSeg.sn,
GFcpSeg.ts = ack.sn,
ack.ts
ptr = GFcpSeg.encode(
ptr,
)
}
}
GFcp.acklist = GFcp.acklist[0:0]
if ackOnly {
FlushBuffer()
return GFcp.interval
}
if GFcp.rmtWnd == 0 {
current := CurrentMs()
if GFcp.probeWait == 0 {
GFcp.probeWait = GfcpProbeInit
GFcp.tsProbe = current + GFcp.probeWait
} else if _itimediff(
current,
GFcp.tsProbe,
) >= 0 {
if GFcp.probeWait < GfcpProbeInit {
GFcp.probeWait = GfcpProbeInit
}
GFcp.probeWait += GFcp.probeWait / 2
if GFcp.probeWait > GfcpProbeLimit {
GFcp.probeWait = GfcpProbeLimit
}
GFcp.tsProbe = current + GFcp.probeWait
GFcp.probe |= GfcpAskSend
}
}
GFcp.tsProbe = 0
GFcp.probeWait = 0
if (GFcp.probe & GfcpAskSend) != 0 {
GFcpSeg.cmd = GfcpCmdWask
makeSpace(
GfcpOverhead,
)
ptr = GFcpSeg.encode(
ptr,
)
}
if (GFcp.probe & GfcpAskTell) != 0 {
GFcpSeg.cmd = GfcpCmdWins
makeSpace(
GfcpOverhead,
)
ptr = GFcpSeg.encode(
ptr,
)
}
GFcp.probe = 0
cwnd := _imin(
GFcp.sndWnd,
GFcp.rmtWnd,
)
if GFcp.nocwnd == 0 {
cwnd = _imin(
GFcp.cwnd,
cwnd,
)
}
newSegsCount := 0
for k := range GFcp.sndQueue {
if _itimediff(
GFcp.sndNxt,
GFcp.sndUna+cwnd,
) >= 0 {
break
}
newGFcpSeg := GFcp.sndQueue[k]
newGFcpSeg.conv = GFcp.conv
newGFcpSeg.cmd = GfcpCmdPush
newGFcpSeg.sn = GFcp.sndNxt
GFcp.SndBuf = append(
GFcp.SndBuf,
newGFcpSeg,
)
GFcp.sndNxt++
newSegsCount++
}
if newSegsCount > 0 {
GFcp.sndQueue = GFcp.removeFront(
GFcp.sndQueue,
newSegsCount,
)
}
resent := uint32(
GFcp.fastresend,
)
if GFcp.fastresend <= 0 {
resent = 0xFFFFFFFF
}
current := CurrentMs()
var change,
lostSegs,
fastGFcpRestransmittedSegments,
earlyGFcpRestransmittedSegments uint64
minrto := int32(
GFcp.interval,
)
ref := GFcp.SndBuf[:len(
GFcp.SndBuf,
)]
for k := range ref {
Segment := &ref[k]
needsend := false
if Segment.acked == 1 {
continue
}
if Segment.Kxmit == 0 {
needsend = true
Segment.rto = GFcp.rxRto
Segment.GFcpResendTs = current + Segment.rto
} else if _itimediff(
current,
Segment.GFcpResendTs,
) >= 0 {
needsend = true
if GFcp.nodelay == 0 {
Segment.rto += GFcp.rxRto
} else {
Segment.rto += GFcp.rxRto / 2
}
Segment.GFcpResendTs = current + Segment.rto
lostSegs++
} else if Segment.fastack >= resent {
needsend = true
Segment.fastack = 0
Segment.rto = GFcp.rxRto
Segment.GFcpResendTs = current + Segment.rto
change++
fastGFcpRestransmittedSegments++
} else if Segment.fastack > 0 && newSegsCount == 0 {
needsend = true
Segment.fastack = 0
Segment.rto = GFcp.rxRto
Segment.GFcpResendTs = current + Segment.rto
change++
earlyGFcpRestransmittedSegments++
}
if needsend {
current = CurrentMs()
Segment.Kxmit++
Segment.ts = current
Segment.wnd = GFcpSeg.wnd
Segment.una = GFcpSeg.una
need := GfcpOverhead + len(
Segment.data,
)
makeSpace(
need,
)
ptr = Segment.encode(
ptr,
)
copy(
ptr,
Segment.data,
)
ptr = ptr[len(
Segment.data,
):]
if Segment.Kxmit >= GFcp.deadLink {
GFcp.state = 0xFFFFFFFF
}
}
if rto := _itimediff(
Segment.GFcpResendTs,
current,
); rto > 0 && rto < minrto {
minrto = rto
}
}
FlushBuffer()
sum := lostSegs
if lostSegs > 0 {
atomic.AddUint64(
&DefaultSnsi.GFcpLostSegments,
lostSegs,
)
}
if fastGFcpRestransmittedSegments > 0 {
atomic.AddUint64(
&DefaultSnsi.FastGFcpRestransmittedSegments,
fastGFcpRestransmittedSegments,
)
sum += fastGFcpRestransmittedSegments
}
if earlyGFcpRestransmittedSegments > 0 {
atomic.AddUint64(
&DefaultSnsi.EarlyGFcpRestransmittedSegments,
earlyGFcpRestransmittedSegments,
)
sum += earlyGFcpRestransmittedSegments
}
if sum > 0 {
atomic.AddUint64(
&DefaultSnsi.GFcpRestransmittedSegments,
sum,
)
}
if GFcp.nocwnd == 0 {
if change > 0 {
inflight := GFcp.sndNxt - GFcp.sndUna
GFcp.ssthresh = inflight / 2
if GFcp.ssthresh < GfcpThreshMin {
GFcp.ssthresh = GfcpThreshMin
}
GFcp.cwnd = GFcp.ssthresh + resent
GFcp.incr = GFcp.cwnd * GFcp.mss
}
if lostSegs > 0 {
GFcp.ssthresh = cwnd / 2
if GFcp.ssthresh < GfcpThreshMin {
GFcp.ssthresh = GfcpThreshMin
}
GFcp.cwnd = 1
GFcp.incr = GFcp.mss
}
if GFcp.cwnd < 1 {
GFcp.cwnd = 1
GFcp.incr = GFcp.mss
}
}
return uint32(
minrto,
)
}
// Update is called repeatedly, 10ms to 100ms, queried via gfcp_check
// without gfcp_input or _send executing, returning timestamp in ms.
func (
GFcp *GFCP,
) Update() {
var slap int32
current := CurrentMs()
if GFcp.updated == 0 {
GFcp.updated = 1
GFcp.tsFlush = current
}
slap = _itimediff(
current,
GFcp.tsFlush,
)
if slap >= 10000 || slap < -10000 {
GFcp.tsFlush = current
slap = 0
}
if slap >= 0 {
GFcp.tsFlush += GFcp.interval
if _itimediff(
current,
GFcp.tsFlush,
) >= 0 {
GFcp.tsFlush = current + GFcp.interval
}
GFcp.Flush(
false,
)
}
}
// Check function helps determine when to invoke an gfcp_update.
// It returns when you should invoke gfcp_update, in milliseconds,
// if there is no gfcp_input or _send calling. You may repeatdly
// call gfcp_update instead of update, to reduce most unnacessary
// gfcp_update invocations. This function may be used to schedule
// gfcp_updates, when implementing an epoll-like mechanism, or for
// optimizing an gfcp_update loop handling massive GFcp connections.
func (
GFcp *GFCP,
) Check() uint32 {
current := CurrentMs()
tsFlush := GFcp.tsFlush
tmFlush := int32(
math.MaxInt32,
)
tmPacket := int32(
math.MaxInt32,
)
minimal := uint32(
0,
)
if GFcp.updated == 0 {
return current
}
if _itimediff(
current,
tsFlush,
) >= 10000 ||
_itimediff(
current,
tsFlush,
) < -10000 {
tsFlush = current
}
if _itimediff(
current,
tsFlush,
) >= 0 {
return current
}
tmFlush = _itimediff(
tsFlush,
current,
)
for k := range GFcp.SndBuf {
GFcpSeg := &GFcp.SndBuf[k]
diff := _itimediff(
GFcpSeg.GFcpResendTs,
current,
)
if diff <= 0 {
return current
}
if diff < tmPacket {
tmPacket = diff
}
}
minimal = uint32(
tmPacket,
)
if tmPacket >= tmFlush {
minimal = uint32(
tmFlush,
)
}
if minimal >= GFcp.interval {
minimal = GFcp.interval
}
return current + minimal
}
// SetMtu changes MTU size.
func (
GFcp *GFCP,
) SetMtu(
mtu int,
) int {
if mtu < 50 || mtu < GfcpOverhead {
return -1
}
if GFcp.reserved >= int(
GFcp.mtu-GfcpOverhead,
) || GFcp.reserved < 0 {
return -1
}
buffer := make(
[]byte,
mtu,
)
/*if buffer == nil {
return -2
}*/ // XXX(jhj): buffer can't be nil?
GFcp.mtu = uint32(
mtu,
)
GFcp.mss = GFcp.mtu - GfcpOverhead - uint32(
GFcp.reserved,
)
GFcp.buffer = buffer
return 0
}
// NoDelay options:
// * fastest: gfcp_nodelay(GFcp, 1, 20, 2, 1)
// * nodelay: 0: disable (default), 1: enable
// * interval: internal update timer interval in milliseconds, defaults to 100ms
// * resend: 0: disable fast resends (default), 1: enable fast resends
// * nc: 0: normal congestion control (default), 1: disable congestion control
func (
GFcp *GFCP,
) NoDelay(
nodelay,
interval,
resend,
nc int,
) int {
if nodelay >= 0 {
GFcp.nodelay = uint32(
nodelay,
)
if nodelay != 0 {
GFcp.rxMinRto = GfcpRtoNdl
} else {
GFcp.rxMinRto = GfcpRtoMin
}
}
if interval >= 0 {
if interval > 5000 {
interval = 5000
} else if interval < 10 {
interval = 10
}
GFcp.interval = uint32(
interval,
)
}
if resend >= 0 {
GFcp.fastresend = int32(
resend,
)
}
if nc >= 0 {
GFcp.nocwnd = int32(
nc,
)
}
return 0
}
// WndSize sets maximum window size (efaults: sndwnd=32 and rcvwnd=32)
func (
GFcp *GFCP,
) WndSize(
sndwnd,
rcvwnd int,
) int {
if sndwnd > 0 {
GFcp.sndWnd = uint32(
sndwnd,
)
}
if rcvwnd > 0 {
GFcp.rcvWnd = uint32(
rcvwnd,
)
}
return 0
}
// WaitSnd shows how many packets are queued to be sent
func (
GFcp *GFCP,
) WaitSnd() int {
return len(
GFcp.SndBuf,
) + len(
GFcp.sndQueue,
)
}
func (
GFcp *GFCP,
) removeFront(
q []Segment,
n int,
) []Segment {
if n > cap(
q,
)/2 {
newn := copy(
q,
q[n:],
)
return q[:newn]
}
return q[n:]
}
func init() {
debug.SetGCPercent(
180,
)
gfcpLegal.RegisterLicense(
"\nThe MIT License (MIT)\n\nCopyright © 2021 Jeffrey H. Johnson <trnsz@pobox.com>.\nCopyright © 2015 Daniel Fu <daniel820313@gmail.com>.\nCopyright © 2019 Loki 'l0k18' Verloren <stalker.loki@protonmail.ch>.\nCopyright © 2021 Gridfinity, LLC. <admin@gridfinity.com>.\n\nPermission is hereby granted, free of charge, to any person obtaining a copy\nof this software and associated documentation files (the \"Software\"), to deal\nin the Software without restriction, including, without limitation, the rights\nto use, copy, modify, merge, publish, distribute, sub-license, and/or sell\ncopies of the Software, and to permit persons to whom the Software is\nfurnished to do so, subject to the following conditions:\n\nThe above copyright notice, and this permission notice, shall be\nincluded in all copies, or substantial portions, of the Software.\n\nTHE SOFTWARE IS PROVIDED \"AS IS\", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\nIMPLIED, INCLUDING, BUT NOT LIMITED TO, THE WARRANTIES OF MERCHANTABILITY,\nFITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT. IN NO EVENT SHALL THE\nAUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES, OR OTHER\nLIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,\nOUT OF, OR IN CONNECTION WITH THE SOFTWARE, OR THE USE OR OTHER DEALINGS IN\nTHE SOFTWARE.\n",
)
}