johnsonjh/jleveldb

View on GitHub
manualtest/dbstress/main.go

Summary

Maintainability
F
5 days
Test Coverage
package main

import (
    crand "crypto/rand"
    "encoding/binary"
    "flag"
    "fmt"
    "log"
    rand "math/rand"
    "net/http"
    _ "net/http/pprof"
    "os"
    "os/signal"
    "path"
    "runtime"
    "strconv"
    "strings"
    "sync"
    "sync/atomic"
    "syscall"
    "time"

    "github.com/johnsonjh/jleveldb/leveldb"
    "github.com/johnsonjh/jleveldb/leveldb/errors"
    "github.com/johnsonjh/jleveldb/leveldb/filter"
    "github.com/johnsonjh/jleveldb/leveldb/opt"
    "github.com/johnsonjh/jleveldb/leveldb/storage"
    "github.com/johnsonjh/jleveldb/leveldb/table"
    "github.com/johnsonjh/jleveldb/leveldb/util"
)

// Package-level configuration and shared state for the stress run. The values
// in the first group are defaults that init binds to command-line flags.
var (
    // dbPath is the database directory; main removes it before opening.
    // NOTE(review): path.Join always joins with forward slashes; filepath.Join
    // would be the OS-aware choice for a filesystem path — confirm before changing.
    dbPath                 = path.Join(os.TempDir(), "goleveldb-testdb")
    // openFilesCacheCapacity is passed to opt.Options; main rewrites 0 to -1.
    openFilesCacheCapacity = 500
    // keyLen and valueLen are the byte lengths handed to randomData for keys
    // and values (randomData panics for lengths below 24).
    keyLen                 = 63
    valueLen               = 256
    // numKeys holds one entry per namespace; main starts a writer and a
    // scanner for every entry, using the entry as that namespace's key count.
    numKeys                = arrayInt{100000, 1332, 531, 1234, 9553, 1024, 35743}
    // httpProf is the pprof HTTP listen address; an empty string disables it.
    httpProf               = "127.0.0.1:5454"
    // transactionProb is the probability that a batch is written through a
    // transaction instead of db.Write.
    transactionProb        = 0.5
    enableBlockCache       = false
    enableCompression      = false
    enableBufferPool       = false

    // wg tracks the writer, reader and scanner goroutines that main waits on.
    wg         = new(sync.WaitGroup)
    // done and fail are flags accessed with sync/atomic: done stops the worker
    // loops, fail additionally makes testingStorage.Remove keep files on disk.
    done, fail uint32

    // bpool is handed to table.NewReader in scanTable; it stays nil unless
    // enableBufferPool is set.
    bpool *util.BufferPool
)

// arrayInt is a list of ints usable as a command-line flag value: together,
// String and Set give *arrayInt the method set that flag.Var expects.
type arrayInt []int

// String renders the list as comma-separated decimal numbers without spaces,
// e.g. "1,2,3". A nil or empty list yields the empty string.
func (a arrayInt) String() string {
    // A Builder avoids re-copying the accumulated string on every element.
    var sb strings.Builder
    for i, n := range a {
        if i > 0 {
            sb.WriteByte(',')
        }
        sb.WriteString(strconv.Itoa(n))
    }
    return sb.String()
}

// Set replaces the list with the integers parsed from the comma-separated
// str. Whitespace around each field is trimmed and empty fields are skipped.
// On a parse error the strconv error is returned and the receiver is left
// unchanged.
func (a *arrayInt) Set(str string) error {
    var parsed arrayInt
    for _, field := range strings.Split(str, ",") {
        field = strings.TrimSpace(field)
        if field == "" {
            continue
        }
        n, err := strconv.Atoi(field)
        if err != nil {
            return err
        }
        parsed = append(parsed, n)
    }
    *a = parsed
    return nil
}

// init binds the package-level tunables to command-line flags. Each flag's
// default is the variable's current value, so the declarations remain the
// single source of defaults. Parsing itself happens in main.
func init() {
    flag.StringVar(&dbPath, "db", dbPath, "testdb path")
    flag.IntVar(&openFilesCacheCapacity, "openfilescachecap", openFilesCacheCapacity, "open files cache capacity")
    flag.IntVar(&keyLen, "keylen", keyLen, "key length")
    flag.IntVar(&valueLen, "valuelen", valueLen, "value length")
    // numKeys is a comma-separated list, parsed by arrayInt.Set.
    flag.Var(&numKeys, "numkeys", "num keys")
    flag.StringVar(&httpProf, "httpprof", httpProf, "http pprof listen addr")
    flag.Float64Var(&transactionProb, "transactionprob", transactionProb, "probability of writes using transaction")
    flag.BoolVar(&enableBufferPool, "enablebufferpool", enableBufferPool, "enable buffer pool")
    flag.BoolVar(&enableBlockCache, "enableblockcache", enableBlockCache, "enable block cache")
    flag.BoolVar(&enableCompression, "enablecompression", enableCompression, "enable block compression")
}

// randomData fills and returns a self-verifying record of dataLen bytes,
// reusing dst's backing array when its capacity is sufficient.
//
// Layout, with half = (dataLen-4)/2:
//
//	[0]              ns
//	[1]              prefix
//	[2 : half-8]     random bytes
//	[half-8 : half-4] i, little-endian
//	[half-4 : half]  CRC of bytes [0 : half-4]
//	[half : 2*half]  copy of the first half
//	[2*half : dataLen-4] random padding (present only when dataLen-4 is odd)
//	[dataLen-4 :]    CRC of everything before it
//
// It panics if dataLen is below the 24-byte minimum implied by that layout,
// or if the system random source fails.
func randomData(dst []byte, ns, prefix byte, i uint32, dataLen int) []byte {
    // Two halves of (2 header + 4 index + 4 CRC) bytes, plus the final CRC.
    if dataLen < (2+4+4)*2+4 {
        panic("dataLen is too small")
    }
    if cap(dst) < dataLen {
        dst = make([]byte, dataLen)
    } else {
        dst = dst[:dataLen]
    }
    half := (dataLen - 4) / 2
    // crand.Read fills the whole slice or reports an error; a bare
    // Reader.Read is allowed to return fewer bytes than requested.
    if _, err := crand.Read(dst[2 : half-8]); err != nil {
        panic(err)
    }
    dst[0] = ns
    dst[1] = prefix
    binary.LittleEndian.PutUint32(dst[half-8:], i)
    binary.LittleEndian.PutUint32(dst[half-4:], util.NewCRC(dst[:half-4]).Value())
    full := half * 2
    copy(dst[half:full], dst[:half])
    if full < dataLen-4 {
        if _, err := crand.Read(dst[full : dataLen-4]); err != nil {
            panic(err)
        }
    }
    binary.LittleEndian.PutUint32(dst[dataLen-4:], util.NewCRC(dst[:dataLen-4]).Value())
    return dst
}

// dataSplit returns the two consecutive, equal-length halves of data that
// precede its last 4 bytes (plus one padding byte when len(data)-4 is odd).
// Both results alias data. It panics if data is shorter than 4 bytes.
func dataSplit(data []byte) (data0, data1 []byte) {
    half := (len(data) - 4) / 2
    data0 = data[:half]
    data1 = data[half : 2*half]
    return data0, data1
}

// dataNS returns the namespace byte, which is the first byte of data.
// It panics if data is empty.
func dataNS(data []byte) byte {
    const nsOffset = 0
    return data[nsOffset]
}

// dataPrefix returns the prefix byte, which is the second byte of data.
// It panics if data has fewer than two bytes.
func dataPrefix(data []byte) byte {
    const prefixOffset = 1
    return data[prefixOffset]
}

// dataI decodes the little-endian uint32 stored 8 bytes before the midpoint
// (len(data)-4)/2 of data, which is where randomData writes its i argument.
func dataI(data []byte) uint32 {
    half := (len(data) - 4) / 2
    field := data[half-8:]
    return binary.LittleEndian.Uint32(field)
}

// dataChecksum returns, in this order, the checksum stored little-endian in
// the last 4 bytes of data and the CRC computed over all bytes before them.
// The record is intact when the two values are equal. It panics if data is
// shorter than 4 bytes.
func dataChecksum(data []byte) (uint32, uint32) {
    tail := len(data) - 4
    stored := binary.LittleEndian.Uint32(data[tail:])
    computed := util.NewCRC(data[:tail]).Value()
    return stored, computed
}

// dataPrefixSlice returns the util.BytesPrefix range for the two-byte key
// prefix {ns, prefix}.
func dataPrefixSlice(ns, prefix byte) *util.Range {
    keyPrefix := []byte{ns, prefix}
    return util.BytesPrefix(keyPrefix)
}

// dataNsSlice returns the util.BytesPrefix range for the one-byte key
// prefix {ns}.
func dataNsSlice(ns byte) *util.Range {
    nsPrefix := []byte{ns}
    return util.BytesPrefix(nsPrefix)
}

// testingStorage wraps a storage.Storage. The embedded value supplies every
// storage method except Remove, which this type overrides so that table files
// are scanned for corruption before they are deleted.
type testingStorage struct {
    storage.Storage
}

// scanTable reads every entry of the table file fd and verifies the embedded
// checksum of each key and, for value entries, each value (see dataChecksum).
//
// When checksum is true the table reader is opened with
// opt.StrictBlockChecksum|opt.StrictReader; otherwise with opt.NoStrict.
//
// It returns true if corruption was found. In that case the global fail and
// done flags are set and details are logged. Errors that are not corruption
// (open, seek, reader creation, other iterator errors) terminate the process
// through log.Fatal. Empty key or value data causes a panic.
//
// parseIkey and ktVal are not defined in this file; presumably they live in
// another file of this package.
//
// NOTE(review): os.SEEK_END is deprecated in favour of io.SeekEnd; switching
// requires adding an "io" import to the file.
func (ts *testingStorage) scanTable(fd storage.FileDesc, checksum bool) (corrupted bool) {
    r, err := ts.Open(fd)
    if err != nil {
        log.Fatal(err)
    }
    defer r.Close()

    // Seeking to the end yields the file size needed by the table reader.
    size, err := r.Seek(0, os.SEEK_END)
    if err != nil {
        log.Fatal(err)
    }

    o := &opt.Options{
        DisableLargeBatchTransaction: true,
        Strict:                       opt.NoStrict,
    }
    if checksum {
        o.Strict = opt.StrictBlockChecksum | opt.StrictReader
    }
    tr, err := table.NewReader(r, size, fd, nil, bpool, o)
    if err != nil {
        log.Fatal(err)
    }
    defer tr.Release()

    // flagCorruption publishes the failure to all workers and to the caller.
    flagCorruption := func() {
        atomic.StoreUint32(&fail, 1)
        atomic.StoreUint32(&done, 1)
        corrupted = true
    }

    // verify reports whether data (entry i, kind t) fails its checksum,
    // flagging and logging the corruption when it does.
    verify := func(i int, t string, data []byte) bool {
        if len(data) == 0 {
            panic(fmt.Sprintf("[%v] nil data: i=%d t=%s", fd, i, t))
        }

        stored, computed := dataChecksum(data)
        if stored == computed {
            return false
        }
        flagCorruption()

        // Check each half on its own so the log shows which copy is damaged.
        data0, data1 := dataSplit(data)
        data0c0, data0c1 := dataChecksum(data0)
        data1c0, data1c1 := dataChecksum(data1)
        log.Printf("FATAL: [%v] Corrupted data i=%d t=%s (%#x != %#x): %x(%v) vs %x(%v)",
            fd, i, t, stored, computed, data0, data0c0 == data0c1, data1, data1c0 == data1c1)
        return true
    }

    iter := tr.NewIterator(nil, nil)
    defer iter.Release()

    i := 0
    for iter.Next() {
        ukey, _, kt, kerr := parseIkey(iter.Key())
        if kerr != nil {
            flagCorruption()

            log.Printf("FATAL: [%v] Corrupted ikey i=%d: %v", fd, i, kerr)
            return corrupted
        }
        if verify(i, "key", ukey) {
            return corrupted
        }
        // Only value entries carry a payload to verify.
        if kt == ktVal && verify(i, "value", iter.Value()) {
            return corrupted
        }
        i++
    }

    switch iterErr := iter.Error(); {
    case iterErr == nil:
        // Clean scan.
    case errors.IsCorrupted(iterErr):
        flagCorruption()

        log.Printf("FATAL: [%v] Corruption detected: %v", fd, iterErr)
    default:
        log.Fatal(iterErr)
    }

    return corrupted
}

// Remove deletes fd from the underlying storage, with two exceptions that
// return nil and leave the file in place: the global fail flag is already
// set, or fd is a table file and scanTable (with strict checksums) finds it
// corrupted.
func (ts *testingStorage) Remove(fd storage.FileDesc) error {
    switch {
    case atomic.LoadUint32(&fail) == 1:
        return nil
    case fd.Type == storage.TypeTable && ts.scanTable(fd, true):
        return nil
    }
    return ts.Storage.Remove(fd)
}

// latencyStats accumulates timing for a class of operations.
//
// mark is the start time of the measurement in progress (zero when none),
// dur the total measured time, num the number of operations measured, and
// min/max the smallest and largest per-operation latency seen. A min of 0
// means "no sample yet". The type does no locking of its own.
type latencyStats struct {
    mark          time.Time
    dur, min, max time.Duration
    num           int
}

// start begins a measurement; it must be paired with a later record call.
func (s *latencyStats) start() {
    s.mark = time.Now()
}

// record ends the measurement begun by start and attributes the elapsed time
// to n operations. It panics with "not started" if start was not called
// first. When n <= 0 there is nothing to attribute the time to, so the
// measurement is discarded instead of dividing by zero.
func (s *latencyStats) record(n int) {
    if s.mark.IsZero() {
        panic("not started")
    }
    elapsed := time.Since(s.mark)
    s.mark = time.Time{}
    if n <= 0 {
        return
    }
    perOp := elapsed / time.Duration(n)
    if perOp < s.min || s.min == 0 {
        s.min = perOp
    }
    if perOp > s.max {
        s.max = perOp
    }
    s.dur += elapsed
    s.num += n
}

// ratePerSec returns the number of operations per second of measured time.
// The full-precision duration is used, so sub-second totals are scaled up
// rather than truncated. With no measured time it returns num unchanged.
func (s *latencyStats) ratePerSec() int {
    if s.dur <= 0 {
        return s.num
    }
    return int(float64(s.num) / s.dur.Seconds())
}

// avg returns the mean latency per operation, or 0 when nothing was recorded.
func (s *latencyStats) avg() time.Duration {
    if s.num > 0 {
        return s.dur / time.Duration(s.num)
    }
    return 0
}

// add merges x into s. An unset (zero) x.min is ignored so that merging a
// stats object without samples does not reset the minimum already held by s.
func (s *latencyStats) add(x *latencyStats) {
    if x.min != 0 && (x.min < s.min || s.min == 0) {
        s.min = x.min
    }
    if x.max > s.max {
        s.max = x.max
    }
    s.dur += x.dur
    s.num += x.num
}

// main runs the stress test until a worker reports a failure or the process
// receives SIGINT/SIGTERM.
//
// After parsing flags it recreates the database at dbPath on top of a
// testingStorage and starts:
//   - one goroutine that serialises all batch writes (writeReq / writeAck /
//     writeAckAck), choosing per batch between a transaction and db.Write;
//   - one goroutine that logs latency statistics and DB properties every 3s;
//   - for each entry of numKeys (one namespace each), a writer goroutine, a
//     scanner goroutine, and one reader goroutine per written generation;
//   - one goroutine that turns a signal into the done flag.
//
// Workers report problems through fatalf, which sets fail and done and ends
// only the calling goroutine, so the others can wind down and main returns
// once wg drains.
func main() {
    flag.Parse()

    var src cryptoSource
    rnd := rand.New(src)

    if enableBufferPool {
        bpool = util.NewBufferPool(opt.DefaultBlockSize + 128)
    }

    log.Printf("Test DB stored at %q", dbPath)
    if httpProf != "" {
        log.Printf("HTTP pprof listening at %q", httpProf)
        runtime.SetBlockProfileRate(1)
        go func() {
            if err := http.ListenAndServe(httpProf, nil); err != nil {
                log.Fatalf("HTTPPROF: %v", err)
            }
        }()
    }

    runtime.GOMAXPROCS(runtime.NumCPU())

    // Start from a clean directory; the DB is opened with ErrorIfExist below.
    os.RemoveAll(dbPath)
    stor, err := storage.OpenFile(dbPath, false)
    if err != nil {
        log.Fatal(err)
    }
    tstor := &testingStorage{stor}
    defer tstor.Close()

    // fatalf records a failure, logs it and terminates the calling goroutine
    // (its deferred functions still run). If err is a corruption error that
    // names a table file, that table is scanned to locate the damaged entry.
    fatalf := func(err error, format string, v ...interface{}) {
        atomic.StoreUint32(&fail, 1)
        atomic.StoreUint32(&done, 1)
        log.Printf("FATAL: "+format, v...)
        if err != nil && errors.IsCorrupted(err) {
            // IsCorrupted may accept corruption error types other than
            // *errors.ErrCorrupted; the checked assertion keeps such an
            // error from turning the failure report into a panic.
            if cerr, ok := err.(*errors.ErrCorrupted); ok && cerr != nil {
                if !cerr.Fd.Zero() && cerr.Fd.Type == storage.TypeTable {
                    log.Print("FATAL: corruption detected, scanning...")
                    if !tstor.scanTable(storage.FileDesc{Type: storage.TypeTable, Num: cerr.Fd.Num}, false) {
                        log.Printf("FATAL: unable to find corrupted key/value pair in table %v", cerr.Fd)
                    }
                }
            }
        }
        runtime.Goexit()
    }

    if openFilesCacheCapacity == 0 {
        openFilesCacheCapacity = -1
    }
    o := &opt.Options{
        OpenFilesCacheCapacity: openFilesCacheCapacity,
        DisableBufferPool:      !enableBufferPool,
        DisableBlockCache:      !enableBlockCache,
        ErrorIfExist:           true,
        Compression:            opt.NoCompression,
        Filter:                 filter.NewBloomFilter(10),
    }
    if enableCompression {
        o.Compression = opt.DefaultCompression
    }

    db, err := leveldb.Open(tstor, o)
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    var (
        // mu guards the four global latencyStats below.
        mu              = &sync.Mutex{}
        gGetStat        = &latencyStats{}
        gIterStat       = &latencyStats{}
        gWriteStat      = &latencyStats{}
        gTrasactionStat = &latencyStats{}
        startTime       = time.Now()

        // Write handshake: a worker sends its batch on writeReq, receives the
        // result on writeAck, and releases the write goroutine by sending on
        // writeAckAck (writers take their snapshot before releasing it).
        writeReq    = make(chan *leveldb.Batch)
        writeAck    = make(chan error)
        writeAckAck = make(chan struct{})
    )

    // Write goroutine: performs every batch write, one at a time.
    go func() {
        for b := range writeReq {
            var err error
            if rnd.Float64() < transactionProb {
                log.Print("> Write using transaction")
                gTrasactionStat.start()
                var tr *leveldb.Transaction
                if tr, err = db.OpenTransaction(); err == nil {
                    if err = tr.Write(b, nil); err == nil {
                        if err = tr.Commit(); err == nil {
                            // The stats printer reads these fields under mu.
                            mu.Lock()
                            gTrasactionStat.record(b.Len())
                            mu.Unlock()
                        }
                    } else {
                        tr.Discard()
                    }
                }
            } else {
                gWriteStat.start()
                if err = db.Write(b, nil); err == nil {
                    // The stats printer reads these fields under mu.
                    mu.Lock()
                    gWriteStat.record(b.Len())
                    mu.Unlock()
                }
            }
            writeAck <- err
            <-writeAckAck
        }
    }()

    // Stats goroutine: periodic report of latencies and DB properties.
    go func() {
        for {
            time.Sleep(3 * time.Second)

            log.Print("------------------------")

            log.Printf("> Elapsed=%v", time.Since(startTime))
            mu.Lock()
            log.Printf("> GetLatencyMin=%v GetLatencyMax=%v GetLatencyAvg=%v GetRatePerSec=%d",
                gGetStat.min, gGetStat.max, gGetStat.avg(), gGetStat.ratePerSec())
            log.Printf("> IterLatencyMin=%v IterLatencyMax=%v IterLatencyAvg=%v IterRatePerSec=%d",
                gIterStat.min, gIterStat.max, gIterStat.avg(), gIterStat.ratePerSec())
            log.Printf("> WriteLatencyMin=%v WriteLatencyMax=%v WriteLatencyAvg=%v WriteRatePerSec=%d",
                gWriteStat.min, gWriteStat.max, gWriteStat.avg(), gWriteStat.ratePerSec())
            log.Printf("> TransactionLatencyMin=%v TransactionLatencyMax=%v TransactionLatencyAvg=%v TransactionRatePerSec=%d",
                gTrasactionStat.min, gTrasactionStat.max, gTrasactionStat.avg(), gTrasactionStat.ratePerSec())
            mu.Unlock()

            // Property lookup errors are ignored on purpose: this is a
            // best-effort report and a missing value simply prints as empty.
            cachedblock, _ := db.GetProperty("leveldb.cachedblock")
            openedtables, _ := db.GetProperty("leveldb.openedtables")
            alivesnaps, _ := db.GetProperty("leveldb.alivesnaps")
            aliveiters, _ := db.GetProperty("leveldb.aliveiters")
            blockpool, _ := db.GetProperty("leveldb.blockpool")
            writeDelay, _ := db.GetProperty("leveldb.writedelay")
            ioStats, _ := db.GetProperty("leveldb.iostats")
            compCount, _ := db.GetProperty("leveldb.compcount")
            log.Printf("> BlockCache=%s OpenedTables=%s AliveSnaps=%s AliveIter=%s BlockPool=%q WriteDelay=%q IOStats=%q CompCount=%q",
                cachedblock, openedtables, alivesnaps, aliveiters, blockpool, writeDelay, ioStats, compCount)
            log.Print("------------------------")
        }
    }()

    // One namespace per numKeys entry; the slice index is the namespace byte.
    for ns, numKey := range numKeys {
        func(ns, numKey int) {
            log.Printf("[%02d] STARTING: numKey=%d", ns, numKey)

            // Prefix-1 keys are fixed for the whole run; every generation
            // rewrites their values to point at that generation's prefix-2 key.
            keys := make([][]byte, numKey)
            for i := range keys {
                keys[i] = randomData(nil, byte(ns), 1, uint32(i), keyLen)
            }

            // Writer: each generation wi writes, per key, k2 -> v2 and k1 -> k2,
            // then spawns a reader that verifies a snapshot of that generation.
            wg.Add(1)
            go func() {
                var wi uint32
                defer func() {
                    log.Printf("[%02d] WRITER DONE #%d", ns, wi)
                    wg.Done()
                }()

                var (
                    b       = new(leveldb.Batch)
                    k2, v2  []byte
                    nReader int32
                )
                for atomic.LoadUint32(&done) == 0 {
                    log.Printf("[%02d] WRITER #%d", ns, wi)

                    b.Reset()
                    for _, k1 := range keys {
                        k2 = randomData(k2, byte(ns), 2, wi, keyLen)
                        v2 = randomData(v2, byte(ns), 3, wi, valueLen)
                        b.Put(k2, v2)
                        b.Put(k1, k2)
                    }
                    writeReq <- b
                    if err := <-writeAck; err != nil {
                        writeAckAck <- struct{}{}
                        fatalf(err, "[%02d] WRITER #%d db.Write: %v", ns, wi, err)
                    }

                    // The snapshot is taken before the write goroutine is
                    // released, so no other batch can slip in between.
                    snap, err := db.GetSnapshot()
                    if err != nil {
                        writeAckAck <- struct{}{}
                        fatalf(err, "[%02d] WRITER #%d db.GetSnapshot: %v", ns, wi, err)
                    }

                    writeAckAck <- struct{}{}

                    wg.Add(1)
                    atomic.AddInt32(&nReader, 1)
                    // Reader: re-verifies the snapshot at least 3 times and
                    // until the writer is 3 generations ahead (or done is set).
                    go func(snapwi uint32, snap *leveldb.Snapshot) {
                        var (
                            ri       int
                            iterStat = &latencyStats{}
                            getStat  = &latencyStats{}
                        )
                        defer func() {
                            mu.Lock()
                            gGetStat.add(getStat)
                            gIterStat.add(iterStat)
                            mu.Unlock()

                            atomic.AddInt32(&nReader, -1)
                            log.Printf("[%02d] READER #%d.%d DONE Snap=%v Alive=%d IterLatency=%v GetLatency=%v", ns, snapwi, ri, snap, atomic.LoadInt32(&nReader), iterStat.avg(), getStat.avg())
                            snap.Release()
                            wg.Done()
                        }()

                        stopi := snapwi + 3
                        for (ri < 3 || atomic.LoadUint32(&wi) < stopi) && atomic.LoadUint32(&done) == 0 {
                            var n int
                            iter := snap.NewIterator(dataPrefixSlice(byte(ns), 1), nil)
                            iterStat.start()
                            for iter.Next() {
                                k1 := iter.Key()
                                k2 := iter.Value()
                                iterStat.record(1)

                                if dataNS(k2) != byte(ns) {
                                    fatalf(nil, "[%02d] READER #%d.%d K%d invalid in-key NS: want=%d got=%d", ns, snapwi, ri, n, ns, dataNS(k2))
                                }

                                // Inside the snapshot every k1 must point at a
                                // k2 written by this reader's generation.
                                kwritei := dataI(k2)
                                if kwritei != snapwi {
                                    fatalf(nil, "[%02d] READER #%d.%d K%d invalid in-key iter num: %d", ns, snapwi, ri, n, kwritei)
                                }

                                getStat.start()
                                v2, err := snap.Get(k2, nil)
                                if err != nil {
                                    fatalf(err, "[%02d] READER #%d.%d K%d snap.Get: %v\nk1: %x\n -> k2: %x", ns, snapwi, ri, n, err, k1, k2)
                                }
                                getStat.record(1)

                                if checksum0, checksum1 := dataChecksum(v2); checksum0 != checksum1 {
                                    err := &errors.ErrCorrupted{Fd: storage.FileDesc{Type: 0xff, Num: 0}, Err: fmt.Errorf("v2: %x: checksum mismatch: %v vs %v", v2, checksum0, checksum1)}
                                    fatalf(err, "[%02d] READER #%d.%d K%d snap.Get: %v\nk1: %x\n -> k2: %x", ns, snapwi, ri, n, err, k1, k2)
                                }

                                n++
                                iterStat.start()
                            }
                            iter.Release()
                            if err := iter.Error(); err != nil {
                                fatalf(err, "[%02d] READER #%d.%d K%d iter.Error: %v", ns, snapwi, ri, numKey, err)
                            }
                            if n != numKey {
                                fatalf(nil, "[%02d] READER #%d.%d missing keys: want=%d got=%d", ns, snapwi, ri, numKey, n)
                            }

                            ri++
                        }
                    }(wi, snap)

                    atomic.AddUint32(&wi, 1)
                }
            }()

            // Scanner: walks the live namespace, verifies every key and value
            // checksum, and deletes all prefix-2 keys plus a random sample.
            delB := new(leveldb.Batch)
            wg.Add(1)
            go func() {
                var (
                    i        int
                    iterStat = &latencyStats{}
                )
                defer func() {
                    log.Printf("[%02d] SCANNER DONE #%d", ns, i)
                    wg.Done()
                }()

                time.Sleep(2 * time.Second)

                for atomic.LoadUint32(&done) == 0 {
                    var n int
                    delB.Reset()
                    iter := db.NewIterator(dataNsSlice(byte(ns)), nil)
                    iterStat.start()
                    for iter.Next() && atomic.LoadUint32(&done) == 0 {
                        k := iter.Key()
                        v := iter.Value()
                        iterStat.record(1)

                        for ci, x := range [...][]byte{k, v} {
                            checksum0, checksum1 := dataChecksum(x)
                            if checksum0 != checksum1 {
                                if ci == 0 {
                                    fatalf(nil, "[%02d] SCANNER %d.%d invalid key checksum: want %d, got %d\n%x -> %x", ns, i, n, checksum0, checksum1, k, v)
                                } else {
                                    fatalf(nil, "[%02d] SCANNER %d.%d invalid value checksum: want %d, got %d\n%x -> %x", ns, i, n, checksum0, checksum1, k, v)
                                }
                            }
                        }

                        if dataPrefix(k) == 2 || rnd.Int()%999 == 0 {
                            delB.Delete(k)
                        }

                        n++
                        iterStat.start()
                    }
                    iter.Release()
                    if err := iter.Error(); err != nil {
                        fatalf(err, "[%02d] SCANNER #%d.%d iter.Error: %v", ns, i, n, err)
                    }

                    if n > 0 {
                        log.Printf("[%02d] SCANNER #%d IterLatency=%v", ns, i, iterStat.avg())
                    }

                    if delB.Len() > 0 && atomic.LoadUint32(&done) == 0 {
                        t := time.Now()
                        writeReq <- delB
                        if err := <-writeAck; err != nil {
                            writeAckAck <- struct{}{}
                            fatalf(err, "[%02d] SCANNER #%d db.Write: %v", ns, i, err)
                        } else {
                            writeAckAck <- struct{}{}
                        }
                        log.Printf("[%02d] SCANNER #%d Deleted=%d Time=%v", ns, i, delB.Len(), time.Since(t))
                    }

                    i++
                }
            }()
        }(ns, numKey)
    }

    // Signal goroutine: the first SIGINT/SIGTERM asks all workers to stop.
    go func() {
        sig := make(chan os.Signal, 1)
        signal.Notify(sig, os.Interrupt, syscall.SIGTERM)
        log.Printf("Got signal: %v, exiting...", <-sig)
        atomic.StoreUint32(&done, 1)
    }()

    wg.Wait()
}

// cryptoSource is a stateless random source backed by crypto/rand. Its
// Seed, Int63 and Uint64 methods let main pass it to math/rand's rand.New.
type cryptoSource struct{}

// Seed does nothing; the source has no state to seed.
func (s cryptoSource) Seed(_ int64) {}

// Int63 returns a non-negative random int64: a Uint64 draw with the top bit
// cleared.
func (s cryptoSource) Int63() int64 {
    const low63 = 1<<63 - 1
    return int64(s.Uint64() & low63)
}

// Uint64 returns 8 bytes from the system random source decoded as a
// big-endian uint64. A failure to read them ends the process via log.Fatal.
func (s cryptoSource) Uint64() (v uint64) {
    var raw [8]byte
    if _, err := crand.Read(raw[:]); err != nil {
        log.Fatal(err)
    }
    v = binary.BigEndian.Uint64(raw[:])
    return v
}