gfcp_sess.go
// Copyright © 2021 Jeffrey H. Johnson <trnsz@pobox.com>.
// Copyright © 2015 Daniel Fu <daniel820313@gmail.com>.
// Copyright © 2019 Loki 'l0k18' Verloren <stalker.loki@protonmail.ch>.
// Copyright © 2021 Gridfinity, LLC. <admin@gridfinity.com>.
//
// All rights reserved.
//
// All use of this code is governed by the MIT license.
// The complete license is available in the LICENSE file.
package gfcp
import (
"crypto/rand"
"encoding/binary"
"fmt"
"net"
"sync"
"sync/atomic"
"time"
"github.com/pkg/errors"
"golang.org/x/net/ipv4"
"golang.org/x/net/ipv6"
)
// errTimeout is the error value handed back when a deadline expires.
// The embedded error is left nil by every constructor visible in this file;
// Error is overridden below, so the message is always the same fixed string.
type errTimeout struct{ error }

// Timeout always reports true.
func (errTimeout) Timeout() bool { return true }

// Temporary always reports true.
func (errTimeout) Temporary() bool { return true }

// Error returns the fixed timeout message.
func (errTimeout) Error() string { return "i/o timeout" }
// GFcpMtuLimit is the largest MTU SetMtu accepts; it is also the size of the
// session receive buffer and of the buffers produced by KxmitBuf.
const GFcpMtuLimit = 9000

// rxFECMulti is the multiplier applied to (dataShards + parityShards) to form
// the first argument passed to NewFECDecoder.
const rxFECMulti = 3

// acceptBacklog is the capacity of a Listener's queue of accepted sessions.
const acceptBacklog = 1024
// errBrokenPipe is the message of the error returned when a closed session or
// listener is used.
const errBrokenPipe = "broken pipe"

// errInvalidOperation is the message of the error returned when a socket
// option cannot be applied to the underlying connection.
const errInvalidOperation = "invalid operation"
// KxmitBuf is a pool of byte slices, each GFcpMtuLimit bytes long when
// freshly allocated. GFcpInput returns FEC-recovered buffers to it.
var KxmitBuf = sync.Pool{
	New: func() interface{} {
		return make([]byte, GFcpMtuLimit)
	},
}
type (
// UDPSession is one GFCP conversation carried over a net.PacketConn.
// newUDPSession builds it either for a dialing client (l == nil) or for a
// peer accepted by a Listener (l != nil). The exported methods take mu
// before touching session state.
UDPSession struct {
updaterIdx int // record slice index in updater
conn net.PacketConn // the underlying packet connection
GFcp *GFCP // GFCP ARQ protocol
l *Listener // pointing to the Listener object if it's been accepted by a Listener
recvbuf []byte // scratch buffer Read uses when the caller's slice is smaller than the next message
bufptr []byte // unread tail of recvbuf, served first by the next Read
// FecDecoder is consulted by GFcpInput for every incoming packet; when it
// is nil, packets are passed to GFcp.Input unmodified.
FecDecoder *FecDecoder
// FecEncoder is used by output to produce extra packets for each outgoing
// one; when it is non-nil, headerSize includes fecHeaderSizePlus2.
FecEncoder *FecEncoder
remote net.Addr // remote peer address
rd time.Time // read deadline
wd time.Time // write deadline
headerSize int // the header size additional to a GFCP frame
ackNoDelay bool // send ack immediately for each incoming packet(testing purpose)
writeDelay bool // delay GFcp.flush() for Write() for bulk transfer
dup int // duplicate udp packets(testing purpose)
die chan struct{} // notify current session has Closed
chReadEvent chan struct{} // notify Read() can be called without blocking
chWriteEvent chan struct{} // notify Write() can be called without blocking
chReadError chan error // notify PacketConn.Read() have an error
chWriteError chan error // notify PacketConn.Write() have an error
nonce Entropy // initialised in newUDPSession; not read anywhere in the code visible here
isClosed bool // flag the session has Closed
mu sync.Mutex // held by the session methods while they read or modify the fields above
}
// setReadBuffer is satisfied by connections whose receive buffer size can be
// changed; SetReadBuffer type-asserts the underlying conn against it.
setReadBuffer interface {
SetReadBuffer(
bytes int,
) error
}
// setWriteBuffer is satisfied by connections whose send buffer size can be
// changed; SetWriteBuffer type-asserts the underlying conn against it.
setWriteBuffer interface {
SetWriteBuffer(
bytes int,
) error
}
)
// newUDPSession creates a new UDP session (client or server).
//
// conv is the conversation id given to NewGFCP. dataShards and parityShards
// configure the FEC encoder and decoder. l is the owning Listener, or nil for
// a dialed session; conn and remote are the socket and the peer address.
//
// When l is nil the session starts its own readLoop goroutine and counts as
// an active open; otherwise it counts as a passive open. The session is
// registered with the package updater before it is returned.
func newUDPSession(
	conv uint32,
	dataShards,
	parityShards int,
	l *Listener,
	conn net.PacketConn,
	remote net.Addr,
) *UDPSession {
	sess := new(UDPSession)
	sess.die = make(chan struct{})
	sess.nonce = new(Nonce)
	sess.nonce.Init()
	sess.chReadEvent = make(chan struct{}, 1)
	sess.chWriteEvent = make(chan struct{}, 1)
	sess.chReadError = make(chan error, 1)
	sess.chWriteError = make(chan error, 1)
	sess.remote = remote
	sess.conn = conn
	sess.l = l
	sess.recvbuf = make([]byte, GFcpMtuLimit)

	sess.FecDecoder = NewFECDecoder(
		rxFECMulti*(dataShards+parityShards),
		dataShards,
		parityShards,
	)
	sess.FecEncoder = NewFECEncoder(dataShards, parityShards, 0)
	if sess.FecEncoder != nil {
		// Room for the FEC header in front of every GFCP frame.
		sess.headerSize += fecHeaderSizePlus2
	}

	sess.GFcp = NewGFCP(conv, func(buf []byte, size int) {
		// Anything shorter than the headers cannot be a valid frame.
		if size >= GfcpOverhead+sess.headerSize {
			sess.output(buf[:size])
		}
	})
	sess.GFcp.ReserveBytes(sess.headerSize)

	updater.addSession(sess)

	if sess.l == nil {
		go sess.readLoop()
		atomic.AddUint64(&DefaultSnsi.GFcpActiveOpen, 1)
	} else {
		atomic.AddUint64(&DefaultSnsi.GFcpPassiveOpen, 1)
	}

	established := atomic.AddUint64(&DefaultSnsi.GFcpNowEstablished, 1)
	// Raise the recorded peak. The compare-and-swap is retried so that a
	// concurrent update by another session cannot make this one's higher
	// value get lost.
	for {
		peak := atomic.LoadUint64(&DefaultSnsi.GFcpMaxConn)
		if established <= peak {
			break
		}
		if atomic.CompareAndSwapUint64(&DefaultSnsi.GFcpMaxConn, peak, established) {
			break
		}
	}
	return sess
}
// Read implements net.Conn.
//
// Bytes left over from a previous, partially consumed message are served
// first. Otherwise the next message is taken from GFcp: directly into b when
// it fits, or through s.recvbuf when it does not, in which case the remainder
// is kept for later calls. With nothing available, Read blocks until data
// arrives, the read deadline passes (errTimeout), the session is closed
// ("broken pipe"), or a read error is delivered on chReadError.
//
// Session state is only touched while s.mu is held.
func (s *UDPSession) Read(b []byte) (n int, err error) {
	for {
		s.mu.Lock()

		// Leftover from an earlier oversized message.
		if len(s.bufptr) > 0 {
			n = copy(b, s.bufptr)
			s.bufptr = s.bufptr[n:]
			s.mu.Unlock()
			atomic.AddUint64(&DefaultSnsi.GFcpBytesReceived, uint64(n))
			return n, nil
		}

		if s.isClosed {
			s.mu.Unlock()
			return 0, errors.New(errBrokenPipe)
		}

		if pending := s.GFcp.PeekSize(); pending > 0 {
			if pending <= len(b) {
				// The whole message fits into the caller's slice.
				s.GFcp.Recv(b)
				s.mu.Unlock()
				atomic.AddUint64(&DefaultSnsi.GFcpBytesReceived, uint64(pending))
				return pending, nil
			}

			// Stage the message in recvbuf and hand out what fits.
			if cap(s.recvbuf) < pending {
				s.recvbuf = make([]byte, pending)
			}
			s.recvbuf = s.recvbuf[:pending]
			s.GFcp.Recv(s.recvbuf)
			n = copy(b, s.recvbuf)
			s.bufptr = s.recvbuf[n:]
			s.mu.Unlock()
			atomic.AddUint64(&DefaultSnsi.GFcpBytesReceived, uint64(n))
			return n, nil
		}

		// Nothing to read yet: arm the deadline timer, if a deadline is set.
		var (
			timer   *time.Timer
			expired <-chan time.Time
		)
		if !s.rd.IsZero() {
			if time.Now().After(s.rd) {
				s.mu.Unlock()
				return 0, errTimeout{}
			}
			timer = time.NewTimer(time.Until(s.rd))
			expired = timer.C
		}
		s.mu.Unlock()

		var (
			readErr error
			failed  bool
		)
		select {
		case <-s.chReadEvent:
		case <-expired:
		case <-s.die:
		case readErr = <-s.chReadError:
			failed = true
		}
		if timer != nil {
			timer.Stop()
		}
		if failed {
			return n, readErr
		}
		// Otherwise loop: the checks above decide what the wake-up meant.
	}
}
// Write implements net.Conn by passing b to WriteBuffers as a single buffer.
func (s *UDPSession) Write(b []byte) (n int, err error) {
	single := [][]byte{b}
	return s.WriteBuffers(single)
}
// WriteBuffers queues every buffer in v for transmission and returns the
// total number of bytes queued.
//
// Each buffer is handed to GFcp.Send in pieces of at most GFcp.mss bytes.
// Once queued, the data is flushed immediately unless write delay is enabled
// and the send window still has room. While the send window is full the call
// blocks until space frees up, the write deadline passes (errTimeout), the
// session is closed ("broken pipe"), or a write error is delivered on
// chWriteError.
func (s *UDPSession) WriteBuffers(v [][]byte) (n int, err error) {
	for {
		s.mu.Lock()
		if s.isClosed {
			s.mu.Unlock()
			return 0, errors.New(errBrokenPipe)
		}

		if s.GFcp.WaitSnd() < int(s.GFcp.sndWnd) {
			for _, chunk := range v {
				n += len(chunk)
				// Full-size segments first, then whatever is left.
				for len(chunk) > int(s.GFcp.mss) {
					s.GFcp.Send(chunk[:s.GFcp.mss])
					chunk = chunk[s.GFcp.mss:]
				}
				s.GFcp.Send(chunk)
			}

			windowFull := s.GFcp.WaitSnd() >= int(s.GFcp.sndWnd)
			if windowFull || !s.writeDelay {
				s.GFcp.Flush(false)
			}
			s.mu.Unlock()
			atomic.AddUint64(&DefaultSnsi.GFcpBytesSent, uint64(n))
			return n, nil
		}

		// Send window is full: arm the deadline timer, if a deadline is set.
		var (
			timer   *time.Timer
			expired <-chan time.Time
		)
		if !s.wd.IsZero() {
			if time.Now().After(s.wd) {
				s.mu.Unlock()
				return 0, errTimeout{}
			}
			timer = time.NewTimer(time.Until(s.wd))
			expired = timer.C
		}
		s.mu.Unlock()

		var (
			writeErr error
			failed   bool
		)
		select {
		case <-s.chWriteEvent:
		case <-expired:
		case <-s.die:
		case writeErr = <-s.chWriteError:
			failed = true
		}
		if timer != nil {
			timer.Stop()
		}
		if failed {
			return n, writeErr
		}
	}
}
// Close shuts the session down.
//
// The session is first removed from the package updater and, if it was
// accepted by a Listener, from that Listener's session table. It is then
// marked closed, s.die is closed to wake blocked readers and writers, and the
// established-connections counter is decremented. A dialed session (no
// Listener) also closes its underlying connection and returns that result.
// Calling Close on an already closed session returns a "broken pipe" error.
func (s *UDPSession) Close() error {
	updater.removeSession(s)
	if owner := s.l; owner != nil {
		owner.CloseSession(s.remote)
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	if s.isClosed {
		return errors.New(errBrokenPipe)
	}
	close(s.die)
	s.isClosed = true

	// Adding the all-ones value subtracts one from the unsigned counter.
	atomic.AddUint64(&DefaultSnsi.GFcpNowEstablished, ^uint64(0))

	if s.l != nil {
		// The socket belongs to the Listener and stays open.
		return nil
	}
	return s.conn.Close()
}
// LocalAddr returns the local address of the underlying packet connection.
// The returned value is shared between callers and must not be modified.
func (s *UDPSession) LocalAddr() net.Addr {
	local := s.conn.LocalAddr()
	return local
}
// RemoteAddr returns the peer address the session was created with.
// The returned value is shared between callers and must not be modified.
func (s *UDPSession) RemoteAddr() net.Addr {
	peer := s.remote
	return peer
}
// SetDeadline sets both the read and the write deadline of the session.
// A zero time value disables the deadlines. Blocked Read and Write calls are
// signalled so that they re-check against the new value.
// It always returns nil.
func (s *UDPSession) SetDeadline(t time.Time) error {
	s.mu.Lock()
	s.rd, s.wd = t, t
	s.notifyReadEvent()
	s.notifyWriteEvent()
	s.mu.Unlock()
	return nil
}
// SetReadDeadline implements the Conn SetReadDeadline method.
// It stores t as the read deadline and signals a blocked Read so that it
// re-checks against the new value. It always returns nil.
func (s *UDPSession) SetReadDeadline(t time.Time) error {
	s.mu.Lock()
	s.rd = t
	s.notifyReadEvent()
	s.mu.Unlock()
	return nil
}
// SetWriteDeadline implements the Conn SetWriteDeadline method.
// It stores t as the write deadline and signals a blocked Write so that it
// re-checks against the new value. It always returns nil.
func (s *UDPSession) SetWriteDeadline(t time.Time) error {
	s.mu.Lock()
	s.wd = t
	s.notifyWriteEvent()
	s.mu.Unlock()
	return nil
}
// SetWriteDelay controls whether WriteBuffers flushes right after queuing.
// With delay set, a write is flushed immediately only when it fills the send
// window; otherwise the data waits for a later flush.
func (s *UDPSession) SetWriteDelay(delay bool) {
	s.mu.Lock()
	s.writeDelay = delay
	s.mu.Unlock()
}
// SetWindowSize passes the send and receive window sizes to GFcp.WndSize
// while holding the session lock.
func (s *UDPSession) SetWindowSize(sndwnd, rcvwnd int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.GFcp.WndSize(sndwnd, rcvwnd)
}
// SetMtu sets the maximum transmission unit, not counting the UDP header.
// Values above GFcpMtuLimit are rejected: the function returns false and
// leaves the session untouched. Otherwise mtu is passed to GFcp.SetMtu and
// true is returned.
func (s *UDPSession) SetMtu(mtu int) bool {
	accepted := mtu <= GFcpMtuLimit
	if accepted {
		s.mu.Lock()
		defer s.mu.Unlock()
		s.GFcp.SetMtu(mtu)
	}
	return accepted
}
// SetStreamMode switches streaming mode on or off by setting GFcp.stream to
// 1 (enabled) or 0 (disabled) under the session lock.
func (s *UDPSession) SetStreamMode(enable bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	switch enable {
	case true:
		s.GFcp.stream = 1
	default:
		s.GFcp.stream = 0
	}
}
// SetACKNoDelay changes the ACK flushing option.
// The flag is handed to GFcp.Input with every incoming packet; when true,
// ACKs are flushed immediately.
func (s *UDPSession) SetACKNoDelay(nodelay bool) {
	s.mu.Lock()
	s.ackNoDelay = nodelay
	s.mu.Unlock()
}
// SetDUP sets how many extra copies of each outgoing packet output sends.
// Intended for testing rather than normal use.
func (s *UDPSession) SetDUP(dup int) {
	s.mu.Lock()
	s.dup = dup
	s.mu.Unlock()
}
// SetNoDelay passes the four tuning values (nodelay, interval, resend, nc)
// to GFcp.NoDelay while holding the session lock.
func (s *UDPSession) SetNoDelay(nodelay, interval, resend, nc int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.GFcp.NoDelay(nodelay, interval, resend, nc)
}
// SetDSCP sets the 6-bit DSCP field of the IP header on the session's socket.
//
// It only works on dialed sessions: for a session accepted by a Listener, or
// when the underlying connection is not a net.Conn, an "invalid operation"
// error is returned (use Listener.SetDSCP instead). A failure to parse the
// local address is returned wrapped; otherwise the result of the socket
// option call is returned.
func (s *UDPSession) SetDSCP(dscp int) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.l != nil {
		return errors.New(errInvalidOperation)
	}
	nc, ok := s.conn.(net.Conn)
	if !ok {
		return errors.New(errInvalidOperation)
	}

	// The address family decides which socket option applies. The error
	// must be checked: on failure addr is nil and would be dereferenced.
	addr, err := net.ResolveUDPAddr("udp", nc.LocalAddr().String())
	if err != nil {
		return errors.Wrap(err, "net.ResolveUDPAddr")
	}
	if addr.IP.To4() != nil {
		// DSCP occupies the upper six bits of the TOS byte.
		return ipv4.NewConn(nc).SetTOS(dscp << 2)
	}
	// NOTE(review): the IPv6 traffic class is set to dscp without the
	// two-bit shift used for IPv4 - confirm this is intended.
	return ipv6.NewConn(nc).SetTrafficClass(dscp)
}
// SetReadBuffer sets the socket read buffer size of a dialed session.
// For a session accepted by a Listener, or when the underlying connection
// has no SetReadBuffer method, it returns an "invalid operation" error.
func (s *UDPSession) SetReadBuffer(bytes int) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.l != nil {
		return errors.New(errInvalidOperation)
	}
	sizer, ok := s.conn.(setReadBuffer)
	if !ok {
		return errors.New(errInvalidOperation)
	}
	return sizer.SetReadBuffer(bytes)
}
// SetWriteBuffer sets the socket write buffer size of a dialed session.
// For a session accepted by a Listener, or when the underlying connection
// has no SetWriteBuffer method, it returns an "invalid operation" error.
func (s *UDPSession) SetWriteBuffer(bytes int) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.l != nil {
		return errors.New(errInvalidOperation)
	}
	sizer, ok := s.conn.(setWriteBuffer)
	if !ok {
		return errors.New(errInvalidOperation)
	}
	return sizer.SetWriteBuffer(bytes)
}
// output writes one GFCP frame to the remote peer.
//
// When an FEC encoder is configured, buf is first passed through it and any
// packets it returns are sent after the frame itself. The frame is written
// s.dup+1 times. Failed writes are reported through notifyWriteError;
// successful ones are added to the output packet and byte counters.
func (s *UDPSession) output(buf []byte) {
	var parity [][]byte
	if s.FecEncoder != nil {
		parity = s.FecEncoder.Encode(buf)
	}

	var sentBytes, sentPkts int

	// The frame itself, plus the configured number of duplicates.
	for copyNo := 0; copyNo < s.dup+1; copyNo++ {
		written, err := s.conn.WriteTo(buf, s.remote)
		if err != nil {
			s.notifyWriteError(err)
			continue
		}
		sentBytes += written
		sentPkts++
	}

	// Packets produced by the FEC encoder, if any.
	for _, shard := range parity {
		written, err := s.conn.WriteTo(shard, s.remote)
		if err != nil {
			s.notifyWriteError(err)
			continue
		}
		sentBytes += written
		sentPkts++
	}

	atomic.AddUint64(&DefaultSnsi.GFcpOutputPackets, uint64(sentPkts))
	atomic.AddUint64(&DefaultSnsi.GFcpOutputBytes, uint64(sentBytes))
}
// update flushes the GFCP state machine and returns the value reported by
// GFcp.Flush, interpreted as milliseconds. If the flush reduced the number
// of segments waiting to be sent, a blocked writer is signalled.
func (s *UDPSession) update() (interval time.Duration) {
	s.mu.Lock()
	defer s.mu.Unlock()

	queuedBefore := s.GFcp.WaitSnd()
	nextMs := s.GFcp.Flush(false)
	interval = time.Duration(nextMs) * time.Millisecond
	if s.GFcp.WaitSnd() < queuedBefore {
		s.notifyWriteEvent()
	}
	return interval
}
// GetConv returns the conversation id stored in the session's GFCP state.
func (s *UDPSession) GetConv() uint32 {
	conv := s.GFcp.conv
	return conv
}
// notifyReadEvent signals chReadEvent without blocking; if a signal is
// already pending, this one is dropped.
func (s *UDPSession) notifyReadEvent() {
	select {
	case s.chReadEvent <- struct{}{}:
	default:
	}
}
// notifyWriteEvent signals chWriteEvent without blocking; if a signal is
// already pending, this one is dropped.
func (s *UDPSession) notifyWriteEvent() {
	select {
	case s.chWriteEvent <- struct{}{}:
	default:
	}
}
// notifyWriteError delivers err on chWriteError without blocking; if an
// error is already pending there, err is dropped.
func (s *UDPSession) notifyWriteError(err error) {
	select {
	case s.chWriteError <- err:
	default:
	}
}
// packetInput hands one received datagram to GFcpInput.
func (s *UDPSession) packetInput(data []byte) {
	s.GFcpInput(data)
}
// GFcpInput feeds one received datagram into the session.
//
// Without an FEC decoder the datagram goes straight to GFcp.Input. With one,
// the datagram must be longer than fecHeaderSize and carry the data or
// parity flag: it is passed to the decoder, data packets are additionally
// fed to GFcp.Input with the FEC header stripped, and every packet the
// decoder recovers is fed in as well before its buffer is returned to
// KxmitBuf. Readers and writers are signalled when data became readable or
// the send queue shrank. The global input counters are updated at the end.
//
// A data packet too short to hold the full FEC header is counted as an
// input error instead of being sliced, which would panic.
func (s *UDPSession) GFcpInput(data []byte) {
	var GFcpInErrors,
		fecErrs,
		fecRecovered,
		fecParityShards uint64

	if s.FecDecoder != nil {
		if len(data) > fecHeaderSize {
			f := FecPacket(data)
			if f.flag() == KTypeData || f.flag() == KTypeParity {
				if f.flag() == KTypeParity {
					fecParityShards++
				}
				s.mu.Lock()
				recovers := s.FecDecoder.Decode(f)
				waitsnd := s.GFcp.WaitSnd()
				if f.flag() == KTypeData {
					if len(data) < fecHeaderSizePlus2 {
						// Truncated header: nothing to hand to GFCP.
						GFcpInErrors++
					} else if ret := s.GFcp.Input(
						data[fecHeaderSizePlus2:],
						true,
						s.ackNoDelay,
					); ret != 0 {
						GFcpInErrors++
					}
				}
				for _, r := range recovers {
					if len(r) >= 2 {
						// The first two bytes hold the length, which
						// includes those two bytes themselves.
						sz := binary.LittleEndian.Uint16(r)
						if int(sz) <= len(r) && sz >= 2 {
							if ret := s.GFcp.Input(
								r[2:sz],
								false,
								s.ackNoDelay,
							); ret == 0 {
								fecRecovered++
							} else {
								GFcpInErrors++
							}
						} else {
							fecErrs++
						}
					} else {
						fecErrs++
					}
					// TODO(jhj): Switch to pointer to avoid allocation.
					KxmitBuf.Put(r)
				}
				if n := s.GFcp.PeekSize(); n > 0 {
					s.notifyReadEvent()
				}
				if s.GFcp.WaitSnd() < waitsnd {
					s.notifyWriteEvent()
				}
				s.mu.Unlock()
			} else {
				atomic.AddUint64(&DefaultSnsi.GFcpPreInputErrors, 1)
			}
		} else {
			atomic.AddUint64(&DefaultSnsi.GFcpInputErrors, 1)
		}
	} else {
		s.mu.Lock()
		waitsnd := s.GFcp.WaitSnd()
		if ret := s.GFcp.Input(data, true, s.ackNoDelay); ret != 0 {
			GFcpInErrors++
		}
		if n := s.GFcp.PeekSize(); n > 0 {
			s.notifyReadEvent()
		}
		if s.GFcp.WaitSnd() < waitsnd {
			s.notifyWriteEvent()
		}
		s.mu.Unlock()
	}

	atomic.AddUint64(&DefaultSnsi.GFcpInputPackets, 1)
	atomic.AddUint64(&DefaultSnsi.GFcpInputBytes, uint64(len(data)))
	if fecParityShards > 0 {
		atomic.AddUint64(&DefaultSnsi.GFcpFECParityShards, fecParityShards)
	}
	if GFcpInErrors > 0 {
		atomic.AddUint64(&DefaultSnsi.GFcpInputErrors, GFcpInErrors)
	}
	if fecErrs > 0 {
		atomic.AddUint64(&DefaultSnsi.GFcpFailures, fecErrs)
	}
	if fecRecovered > 0 {
		atomic.AddUint64(&DefaultSnsi.GFcpFECRecovered, fecRecovered)
	}
}
type (
// Listener accepts GFCP sessions arriving on one shared packet connection.
// ServeConn builds it; incoming datagrams are routed to sessions by the
// string form of their source address.
Listener struct {
dataShards int // FEC data shard
parityShards int // FEC parity shard
// FecDecoder is created in ServeConn from the shard counts. In the code
// visible here it is only tested against nil, to decide whether incoming
// packets carry an FEC header.
FecDecoder *FecDecoder // FEC mock initialization
conn net.PacketConn // the underlying packet connection
sessions map[string]*UDPSession // all sessions accepted by this Listener
sessionLock sync.Mutex // guards sessions
chAccepts chan *UDPSession // Listen() backlog
chSessionClosed chan net.Addr // session close queue
headerSize int // additional header for a GFcp frame
die chan struct{} // notify when the Listener has closed
rd atomic.Value // read deadline for Accept()
wd atomic.Value // write deadline; stored by SetWriteDeadline, not read in the code visible here
}
)
// packetInput routes one received datagram.
//
// A datagram from a known address goes to that session. One from an unknown
// address opens a new session, provided the accept queue has room and a
// conversation id can be read from the packet: from the start of the packet
// without FEC, or from behind the FEC header of a data packet with FEC. The
// new session receives the datagram, is recorded under the sender's address
// and is queued for Accept.
//
// Datagrams too short to contain the fields read here come from the network
// and must not crash the listener; they are dropped and counted as input
// errors.
func (l *Listener) packetInput(data []byte, addr net.Addr) {
	key := addr.String()

	l.sessionLock.Lock()
	s, ok := l.sessions[key]
	l.sessionLock.Unlock()
	if ok {
		s.GFcpInput(data)
		return
	}

	// Unknown peer: only proceed while the accept queue has room.
	if len(l.chAccepts) >= cap(l.chAccepts) {
		return
	}

	var conv uint32
	if l.FecDecoder != nil {
		// The FEC flag is the 16-bit value at offset 4.
		if len(data) < 6 {
			atomic.AddUint64(&DefaultSnsi.GFcpInputErrors, 1)
			return
		}
		if binary.LittleEndian.Uint16(data[4:]) != KTypeData {
			// Only data packets carry a conversation id.
			return
		}
		if len(data) < fecHeaderSizePlus2+4 {
			atomic.AddUint64(&DefaultSnsi.GFcpInputErrors, 1)
			return
		}
		conv = binary.LittleEndian.Uint32(data[fecHeaderSizePlus2:])
	} else {
		if len(data) < 4 {
			atomic.AddUint64(&DefaultSnsi.GFcpInputErrors, 1)
			return
		}
		conv = binary.LittleEndian.Uint32(data)
	}

	sess := newUDPSession(conv, l.dataShards, l.parityShards, l, l.conn, addr)
	sess.GFcpInput(data)

	l.sessionLock.Lock()
	l.sessions[key] = sess
	l.sessionLock.Unlock()

	l.chAccepts <- sess
}
// SetReadBuffer sets the read buffer size of the Listener's socket.
// It returns an "invalid operation" error when the underlying connection
// has no SetReadBuffer method.
func (l *Listener) SetReadBuffer(bytes int) error {
	sizer, ok := l.conn.(setReadBuffer)
	if !ok {
		return errors.New(errInvalidOperation)
	}
	return sizer.SetReadBuffer(bytes)
}
// SetWriteBuffer sets the write buffer size of the Listener's socket.
// It returns an "invalid operation" error when the underlying connection
// has no SetWriteBuffer method.
func (l *Listener) SetWriteBuffer(bytes int) error {
	sizer, ok := l.conn.(setWriteBuffer)
	if !ok {
		return errors.New(errInvalidOperation)
	}
	return sizer.SetWriteBuffer(bytes)
}
// SetDSCP sets the 6-bit DSCP field of the IP header on the Listener's
// socket.
//
// It returns an "invalid operation" error when the underlying connection is
// not a net.Conn, a wrapped error when the local address cannot be parsed,
// and otherwise the result of the socket option call.
func (l *Listener) SetDSCP(dscp int) error {
	nc, ok := l.conn.(net.Conn)
	if !ok {
		return errors.New(errInvalidOperation)
	}

	// The address family decides which socket option applies. The error
	// must be checked: on failure addr is nil and would be dereferenced.
	addr, err := net.ResolveUDPAddr("udp", nc.LocalAddr().String())
	if err != nil {
		return errors.Wrap(err, "net.ResolveUDPAddr")
	}
	if addr.IP.To4() != nil {
		// DSCP occupies the upper six bits of the TOS byte.
		return ipv4.NewConn(nc).SetTOS(dscp << 2)
	}
	// NOTE(review): the IPv6 traffic class is set to dscp without the
	// two-bit shift used for IPv4 - confirm this is intended.
	return ipv6.NewConn(nc).SetTrafficClass(dscp)
}
// Accept implements the Accept method of the net.Listener interface.
// It blocks like AcceptGFCP and returns the accepted session as a net.Conn.
//
// On failure the returned net.Conn is a true nil interface. Forwarding
// AcceptGFCP's result directly would wrap a nil *UDPSession in a non-nil
// interface value, which compares unequal to nil.
func (l *Listener) Accept() (net.Conn, error) {
	sess, err := l.AcceptGFCP()
	if err != nil {
		return nil, err
	}
	return sess, nil
}
// AcceptGFCP waits for the next queued session and returns it.
//
// If a non-zero read deadline is set, the call gives up with a timeout error
// once that deadline passes. It returns a "broken pipe" error when the
// Listener has been closed.
func (l *Listener) AcceptGFCP() (*UDPSession, error) {
	var timeout <-chan time.Time
	if tdeadline, ok := l.rd.Load().(time.Time); ok && !tdeadline.IsZero() {
		// time.Until gives the time remaining; time.Since would be negative
		// for a future deadline and make the timer fire immediately.
		timer := time.NewTimer(time.Until(tdeadline))
		defer timer.Stop()
		timeout = timer.C
	}

	select {
	case <-timeout:
		return nil, &errTimeout{}
	case c := <-l.chAccepts:
		return c, nil
	case <-l.die:
		return nil, errors.New(errBrokenPipe)
	}
}
// SetDeadline sets both the read and the write deadline of the Listener.
// A zero value disables the deadlines.
//
// An error from either setter is returned to the caller with context,
// rather than crashing the process with a panic.
func (l *Listener) SetDeadline(t time.Time) error {
	if err := l.SetReadDeadline(t); err != nil {
		return fmt.Errorf("SetReadDeadline: %w", err)
	}
	if err := l.SetWriteDeadline(t); err != nil {
		return fmt.Errorf("SetWriteDeadline: %w", err)
	}
	return nil
}
// SetReadDeadline stores t as the deadline consulted by AcceptGFCP.
// It always returns nil.
func (l *Listener) SetReadDeadline(t time.Time) error {
	l.rd.Store(t)
	return nil
}
// SetWriteDeadline stores t as the Listener's write deadline.
// It always returns nil.
func (l *Listener) SetWriteDeadline(t time.Time) error {
	l.wd.Store(t)
	return nil
}
// Close stops listening on the UDP address.
// Any already accepted connections will not be closed.
//
// The first call closes l.die and the underlying connection and returns the
// connection's Close result. Later calls return a "broken pipe" error
// instead of panicking on a second close of l.die.
func (l *Listener) Close() error {
	// sessionLock serialises the check-then-close so that concurrent
	// callers cannot both reach close(l.die).
	l.sessionLock.Lock()
	select {
	case <-l.die:
		l.sessionLock.Unlock()
		return errors.New(errBrokenPipe)
	default:
		close(l.die)
	}
	l.sessionLock.Unlock()

	return l.conn.Close()
}
// CloseSession removes the session registered under remote from the
// Listener's session table. It reports whether such a session existed.
func (l *Listener) CloseSession(remote net.Addr) (ret bool) {
	key := remote.String()

	l.sessionLock.Lock()
	defer l.sessionLock.Unlock()

	if _, ret = l.sessions[key]; ret {
		delete(l.sessions, key)
	}
	return ret
}
// Addr returns the local address of the Listener's packet connection.
// The returned value is shared between callers and must not be modified.
func (l *Listener) Addr() net.Addr {
	local := l.conn.LocalAddr()
	return local
}
// Listen listens for incoming GFcp packets addressed to the local address
// laddr via "udp". It is ListenWithOptions with both shard counts set to 0.
//
// On failure the returned net.Listener is a true nil interface rather than
// one wrapping a nil *Listener.
func Listen(laddr string) (net.Listener, error) {
	l, err := ListenWithOptions(laddr, 0, 0)
	if err != nil {
		return nil, err
	}
	return l, nil
}
// ListenWithOptions listens for incoming GFcp packets addressed to the local
// address laddr via "udp". dataShards and parityShards are passed on to
// ServeConn as the FEC shard counts.
//
// Address resolution and socket creation failures are returned wrapped with
// the name of the failing call.
func ListenWithOptions(
	laddr string,
	dataShards,
	parityShards int,
) (*Listener, error) {
	resolved, err := net.ResolveUDPAddr("udp", laddr)
	if err != nil {
		return nil, errors.Wrap(err, "net.ResolveUDPAddr")
	}

	sock, err := net.ListenUDP("udp", resolved)
	if err != nil {
		return nil, errors.Wrap(err, "net.ListenUDP")
	}

	return ServeConn(dataShards, parityShards, sock)
}
// ServeConn builds a Listener on top of an existing packet connection and
// starts its monitor goroutine.
//
// dataShards and parityShards are stored on the Listener and used to create
// its FEC decoder; when that decoder is non-nil the Listener's header size
// grows by fecHeaderSizePlus2. The returned error is always nil.
func ServeConn(
	dataShards,
	parityShards int,
	conn net.PacketConn,
) (*Listener, error) {
	l := &Listener{
		conn:            conn,
		sessions:        make(map[string]*UDPSession),
		chAccepts:       make(chan *UDPSession, acceptBacklog),
		chSessionClosed: make(chan net.Addr),
		die:             make(chan struct{}),
		dataShards:      dataShards,
		parityShards:    parityShards,
		FecDecoder: NewFECDecoder(
			rxFECMulti*(dataShards+parityShards),
			dataShards,
			parityShards,
		),
	}
	if l.FecDecoder != nil {
		l.headerSize += fecHeaderSizePlus2
	}

	go l.monitor()
	return l, nil
}
// Dial connects to the remote address raddr via "udp".
// It is DialWithOptions with both shard counts set to 0.
//
// On failure the returned net.Conn is a true nil interface rather than one
// wrapping a nil *UDPSession.
func Dial(raddr string) (net.Conn, error) {
	sess, err := DialWithOptions(raddr, 0, 0)
	if err != nil {
		return nil, err
	}
	return sess, nil
}
// DialWithOptions connects to the remote address raddr via "udp".
// dataShards and parityShards are the FEC shard counts for the new session.
//
// A local UDP socket is opened on an unspecified address ("udp4" when raddr
// resolves to an IPv4 address, "udp" otherwise) and handed to NewConn. If
// NewConn fails, the socket is closed again so that it does not leak.
func DialWithOptions(
	raddr string,
	dataShards,
	parityShards int,
) (*UDPSession, error) {
	udpaddr, err := net.ResolveUDPAddr("udp", raddr)
	if err != nil {
		return nil, errors.Wrap(err, "net.ResolveUDPAddr")
	}

	network := "udp4"
	if udpaddr.IP.To4() == nil {
		network = "udp"
	}

	conn, err := net.ListenUDP(network, nil)
	if err != nil {
		// The context names the call that actually failed.
		return nil, errors.Wrap(err, "net.ListenUDP")
	}

	sess, err := NewConn(raddr, dataShards, parityShards, conn)
	if err != nil {
		// Best-effort cleanup; the NewConn error is the one worth reporting.
		_ = conn.Close()
		return nil, err
	}
	return sess, nil
}
// NewConn establishes a session, talking GFcp over a packet connection.
//
// raddr is resolved as a UDP address and becomes the session's peer. The
// conversation id is drawn from crypto/rand. dataShards and parityShards are
// the FEC shard counts. The caller keeps ownership of conn if an error is
// returned.
//
// Both a resolution failure and a failure to read random bytes are returned
// as wrapped errors.
func NewConn(
	raddr string,
	dataShards,
	parityShards int,
	conn net.PacketConn,
) (*UDPSession, error) {
	udpaddr, err := net.ResolveUDPAddr("udp", raddr)
	if err != nil {
		return nil, errors.Wrap(err, "net.ResolveUDPAddr")
	}

	var convid uint32
	if err := binary.Read(rand.Reader, binary.LittleEndian, &convid); err != nil {
		// Report the failure through the error return, not a panic.
		return nil, errors.Wrap(err, "binary.Read")
	}

	return newUDPSession(
		convid,
		dataShards,
		parityShards,
		nil,
		conn,
		udpaddr,
	), nil
}
// refTime is the instant captured at package initialisation; CurrentMs
// measures from it.
var refTime = time.Now()

// CurrentMs returns the number of whole milliseconds elapsed since refTime,
// truncated to 32 bits.
func CurrentMs() uint32 {
	elapsed := time.Since(refTime)
	return uint32(elapsed / time.Millisecond)
}