johnsonjh/gfcp

gfcp_fec.go

// Copyright © 2021 Jeffrey H. Johnson <trnsz@pobox.com>.
// Copyright © 2015 Daniel Fu <daniel820313@gmail.com>.
// Copyright © 2019 Loki 'l0k18' Verloren <stalker.loki@protonmail.ch>.
// Copyright © 2021 Gridfinity, LLC. <admin@gridfinity.com>.
//
// All rights reserved.
//
// All use of this code is governed by the MIT license.
// The complete license is available in the LICENSE file.

package gfcp

import (
    "encoding/binary"
    "sync/atomic"

    "github.com/klauspost/reedsolomon"
)

const (
    fecHeaderSize      = 6
    fecHeaderSizePlus2 = fecHeaderSize + 2
    // KTypeData identifies a packet that carries application data.
    KTypeData = 0xf1
    // KTypeParity identifies a packet that carries Reed-Solomon parity data.
    KTypeParity = 0xf2
)
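
// On the wire each FEC packet starts with a 6-byte header: a little-endian
// uint32 sequence number followed by a little-endian uint16 type flag
// (KTypeData or KTypeParity). Data packets carry an additional 2-byte length
// field immediately after the header, which is what fecHeaderSizePlus2
// accounts for.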

// FecPacket is a raw FEC datagram: a 6-byte header followed by the payload.
type FecPacket []byte

// seqid returns the packet's little-endian sequence number.
func (
    bts FecPacket,
) seqid() uint32 {
    return binary.LittleEndian.Uint32(
        bts,
    )
}

// flag returns the packet type flag, KTypeData or KTypeParity.
func (
    bts FecPacket,
) flag() uint16 {
    return binary.LittleEndian.Uint16(
        bts[4:],
    )
}

// data returns the payload that follows the FEC header.
func (
    bts FecPacket,
) data() []byte {
    return bts[fecHeaderSize:]
}
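
// exampleFecPacketLayout is an illustrative sketch, a hypothetical helper
// that is not part of the original file, showing how a raw buffer maps onto
// the accessors above: little-endian uint32 seqid, uint16 flag, then payload.
func exampleFecPacketLayout(payload []byte) (uint32, uint16, []byte) {
    buf := make([]byte, fecHeaderSize+len(payload))
    binary.LittleEndian.PutUint32(buf, 42)            // sequence number
    binary.LittleEndian.PutUint16(buf[4:], KTypeData) // packet type flag
    copy(buf[fecHeaderSize:], payload)
    pkt := FecPacket(buf)
    return pkt.seqid(), pkt.flag(), pkt.data()
}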

// FecDecoder reorders incoming FEC packets and, once enough shards of a
// group have arrived, uses Reed-Solomon reconstruction to recover lost
// data shards.
type FecDecoder struct {
    rxlimit      int
    dataShards   int
    parityShards int
    shardSize    int
    rx           []FecPacket
    DecodeCache  [][]byte
    flagCache    []bool
    zeros        []byte
    codec        reedsolomon.Encoder
}

// NewFECDecoder returns a decoder that buffers at most rxlimit packets and
// works on groups of dataShards data shards plus parityShards parity shards.
// It returns nil if the parameters are invalid or the Reed-Solomon codec
// cannot be created.
func NewFECDecoder(
    rxlimit,
    dataShards,
    parityShards int,
) *FecDecoder {
    if dataShards <= 0 || parityShards <= 0 {
        return nil
    }
    if rxlimit < dataShards+parityShards {
        return nil
    }

    dec := new(
        FecDecoder,
    )
    dec.rxlimit = rxlimit
    dec.dataShards = dataShards
    dec.parityShards = parityShards
    dec.shardSize = dataShards + parityShards
    codec, err := reedsolomon.New(
        dataShards,
        parityShards,
    )
    if err != nil {
        return nil
    }
    dec.codec = codec
    dec.DecodeCache = make(
        [][]byte,
        dec.shardSize,
    )
    dec.flagCache = make(
        []bool,
        dec.shardSize,
    )
    dec.zeros = make(
        []byte,
        GFcpMtuLimit,
    )
    return dec
}

// Decode inserts an incoming FEC packet into the ordered receive queue and
// attempts reconstruction of its shard group. Any data shards recovered by
// the Reed-Solomon codec are returned; each recovered slice begins with the
// 2-byte length field that the encoder wrote ahead of the payload.
func (
    dec *FecDecoder,
) Decode(
    in FecPacket,
) (
    recovered [][]byte,
) {
    n := len(
        dec.rx,
    ) - 1
    insertIdx := 0
    for i := n; i >= 0; i-- {
        if in.seqid() == dec.rx[i].seqid() {
            return nil
        } else if _itimediff(
            in.seqid(),
            dec.rx[i].seqid(),
        ) > 0 {
            insertIdx = i + 1
            break
        }
    }

    // make a copy
    pkt := FecPacket(KxmitBuf.Get().([]byte)[:len(in)])
    copy(
        pkt,
        in,
    )

    if insertIdx == n+1 {
        dec.rx = append(
            dec.rx,
            pkt,
        )
    } else {
        dec.rx = append(
            dec.rx,
            FecPacket{},
        )
        copy(
            dec.rx[insertIdx+1:],
            dec.rx[insertIdx:],
        )
        dec.rx[insertIdx] = pkt
    }

    shardBegin := pkt.seqid() - pkt.seqid()%uint32(dec.shardSize)
    shardEnd := shardBegin + uint32(dec.shardSize) - 1

    searchBegin := insertIdx - int(pkt.seqid()%uint32(dec.shardSize))
    if searchBegin < 0 {
        searchBegin = 0
    }
    searchEnd := searchBegin + dec.shardSize - 1
    if searchEnd >= len(
        dec.rx,
    ) {
        searchEnd = len(
            dec.rx,
        ) - 1
    }

    if searchEnd-searchBegin+1 >= dec.dataShards {
        var numshard, numDataShard, first, maxlen int

        shards := dec.DecodeCache
        shardsflag := dec.flagCache
        for k := range dec.DecodeCache {
            shards[k] = nil
            shardsflag[k] = false
        }

        for i := searchBegin; i <= searchEnd; i++ {
            seqid := dec.rx[i].seqid()
            if _itimediff(
                seqid,
                shardEnd,
            ) > 0 {
                break
            } else if _itimediff(
                seqid,
                shardBegin,
            ) >= 0 {
                shards[seqid%uint32(
                    dec.shardSize,
                )] = dec.rx[i].data()
                shardsflag[seqid%uint32(
                    dec.shardSize,
                )] = true
                numshard++
                if dec.rx[i].flag() == KTypeData {
                    numDataShard++
                }
                if numshard == 1 {
                    first = i
                }
                if len(
                    dec.rx[i].data(),
                ) > maxlen {
                    maxlen = len(
                        dec.rx[i].data(),
                    )
                }
            }
        }

        if numDataShard == dec.dataShards {
            dec.rx = dec.freeRange(
                first,
                numshard,
                dec.rx,
            )
        } else if numshard >= dec.dataShards {
            for k := range shards {
                if shards[k] != nil {
                    dlen := len(
                        shards[k],
                    )
                    shards[k] = shards[k][:maxlen]
                    copy(shards[k][dlen:], dec.zeros)
                } else if k < dec.dataShards {
                    shards[k] = KxmitBuf.Get().([]byte)[:0]
                }
            }
            if err := dec.codec.ReconstructData(
                shards,
            ); err == nil {
                for k := range shards[:dec.dataShards] {
                    if !shardsflag[k] {
                        recovered = append(
                            recovered,
                            shards[k],
                        )
                    }
                }
            }
            dec.rx = dec.freeRange(
                first,
                numshard,
                dec.rx,
            )
        }
    }

    if len(dec.rx) > dec.rxlimit {
        if dec.rx[0].flag() == KTypeData {
            atomic.AddUint64(
                &DefaultSnsi.GFcpFECRuntShards,
                1,
            )
        }
        dec.rx = dec.freeRange(
            0,
            1,
            dec.rx,
        )
    }
    return
}

// freeRange returns the buffers of q[first:first+n] to KxmitBuf and removes
// those entries from the queue, re-slicing cheaply when trimming from the
// head and compacting in place otherwise.
func (
    dec *FecDecoder,
) freeRange(
    first,
    n int,
    q []FecPacket,
) []FecPacket {
    for i := first; i < first+n; i++ {
        // TODO(jhj): Switch to pointer to avoid allocation.
        KxmitBuf.Put(
            []byte(
                q[i],
            ),
        )
    }

    if first == 0 && n < (cap(q)/2) {
        return q[n:]
    }
    copy(
        q[first:],
        q[first+n:],
    )
    return q[:len(
        q,
    )-n]
}

type (
    // FecEncoder collects outgoing data shards and emits Reed-Solomon parity
    // shards once a full group of dataShards packets has been gathered.
    FecEncoder struct {
        dataShards    int
        parityShards  int
        shardSize     int
        paws          uint32 // Protect Against Wrapped Sequence numbers
        next          uint32 // next seqid
        shardCount    int    // count the number of datashards collected
        maxSize       int    // track maximum data length in datashard
        headerOffset  int    // FEC header offset
        payloadOffset int    // FEC payload offset
        shardCache    [][]byte
        EncodeCache   [][]byte
        zeros         []byte
        codec         reedsolomon.Encoder
    }
)

// NewFECEncoder returns an encoder that produces parityShards parity packets
// for every dataShards data packets, writing the FEC header at the given
// offset within each buffer. It returns nil if the parameters are invalid or
// the Reed-Solomon codec cannot be created.
func NewFECEncoder(
    dataShards,
    parityShards,
    offset int,
) *FecEncoder {
    if dataShards <= 0 || parityShards <= 0 {
        return nil
    }
    enc := new(
        FecEncoder,
    )
    enc.dataShards = dataShards
    enc.parityShards = parityShards
    enc.shardSize = dataShards + parityShards
    enc.paws = (0xFFFFFFFF/uint32(enc.shardSize) - 1) * uint32(enc.shardSize)
    enc.headerOffset = offset
    enc.payloadOffset = enc.headerOffset + fecHeaderSize
    codec, err := reedsolomon.New(
        dataShards,
        parityShards,
    )
    if err != nil {
        return nil
    }
    enc.codec = codec
    enc.EncodeCache = make(
        [][]byte,
        enc.shardSize,
    )
    enc.shardCache = make(
        [][]byte,
        enc.shardSize,
    )
    for k := range enc.shardCache {
        enc.shardCache[k] = make(
            []byte,
            GFcpMtuLimit,
        )
    }
    enc.zeros = make(
        []byte,
        GFcpMtuLimit,
    )
    return enc
}

// Encode stamps b with a data-shard header, caches its payload, and returns
// the group's parity shards once dataShards packets have been collected;
// otherwise it returns nil. Callers must reserve room for the FEC header and
// the 2-byte size field at the configured offset.
func (
    enc *FecEncoder,
) Encode(
    b []byte,
) (
    ps [][]byte,
) {
    enc.markData(
        b[enc.headerOffset:],
    )
    binary.LittleEndian.PutUint16(
        b[enc.payloadOffset:],
        uint16(
            len(
                b[enc.payloadOffset:],
            ),
        ),
    )
    sz := len(
        b,
    )
    enc.shardCache[enc.shardCount] = enc.shardCache[enc.shardCount][:sz]
    copy(
        enc.shardCache[enc.shardCount][enc.payloadOffset:],
        b[enc.payloadOffset:],
    )
    enc.shardCount++
    if sz > enc.maxSize {
        enc.maxSize = sz
    }
    if enc.shardCount == enc.dataShards {
        for i := 0; i < enc.dataShards; i++ {
            shard := enc.shardCache[i]
            slen := len(
                shard,
            )
            copy(
                shard[slen:enc.maxSize],
                enc.zeros,
            )
        }
        cache := enc.EncodeCache
        for k := range cache {
            cache[k] = enc.shardCache[k][enc.payloadOffset:enc.maxSize]
        }
        if err := enc.codec.Encode(
            cache,
        ); err == nil {
            ps = enc.shardCache[enc.dataShards:]
            for k := range ps {
                enc.markParity(
                    ps[k][enc.headerOffset:],
                )
                ps[k] = ps[k][:enc.maxSize]
            }
        }
        enc.shardCount = 0
        enc.maxSize = 0
    }
    return
}
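
// exampleFECRoundTrip is a hypothetical sketch, not part of the original
// file or the gfcp API, showing how FecEncoder and FecDecoder fit together.
// Assumptions: the FEC header sits at offset 0, every buffer reserves
// fecHeaderSizePlus2 bytes ahead of the payload for the header plus the
// 2-byte size field written by Encode, and payloads fit within GFcpMtuLimit.
// The first data packet of every group is "dropped" so the decoder has to
// rebuild it from the surviving data and parity shards.
func exampleFECRoundTrip(payloads [][]byte) (recovered [][]byte) {
    const dataShards, parityShards = 10, 3
    enc := NewFECEncoder(dataShards, parityShards, 0)
    dec := NewFECDecoder(3*(dataShards+parityShards), dataShards, parityShards)
    if enc == nil || dec == nil {
        return nil
    }
    for i, p := range payloads {
        buf := make([]byte, fecHeaderSizePlus2+len(p))
        copy(buf[fecHeaderSizePlus2:], p)
        parity := enc.Encode(buf) // stamps buf with a KTypeData header
        if i%dataShards != 0 {    // simulate losing one data packet per group
            recovered = append(recovered, dec.Decode(FecPacket(buf))...)
        }
        for _, ps := range parity {
            recovered = append(recovered, dec.Decode(FecPacket(ps))...)
        }
    }
    // Each recovered slice still starts with the 2-byte length field that
    // Encode wrote, followed by the original payload and zero padding.
    return recovered
}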

// markData stamps data with the next sequence number and the KTypeData flag.
func (
    enc *FecEncoder,
) markData(
    data []byte,
) {
    binary.LittleEndian.PutUint32(
        data,
        enc.next,
    )
    binary.LittleEndian.PutUint16(
        data[4:],
        KTypeData,
    )
    enc.next++
}

// markParity stamps data with the next sequence number and the KTypeParity
// flag, wrapping the counter at paws.
func (
    enc *FecEncoder,
) markParity(
    data []byte,
) {
    binary.LittleEndian.PutUint32(
        data,
        enc.next,
    )
    binary.LittleEndian.PutUint16(
        data[4:],
        KTypeParity,
    )
    enc.next = (enc.next + 1) % enc.paws
}