// johnsonjh/jleveldb — leveldb/db_compaction.go
// (View on GitHub; Code Climate summary: Maintainability F, 4 days; Test Coverage —)
// Copyright © 2012, Suryandaru Triandana <syndtr@gmail.com>
// Copyright © 2021, Jeffrey H. Johnson <trnsz@pobox.com>
//
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

package leveldb

import (
    "sync"
    "sync/atomic"
    "time"

    "github.com/johnsonjh/jleveldb/leveldb/errors"
    "github.com/johnsonjh/jleveldb/leveldb/opt"
    "github.com/johnsonjh/jleveldb/leveldb/storage"
)

var errCompactionTransactExiting = errors.New("leveldb: compaction transact exiting")

// cStat holds cumulative compaction statistics for a single level.
type cStat struct {
    duration time.Duration // total time spent compacting
    read     int64         // total bytes read
    write    int64         // total bytes written
}

// add folds the staged statistics n into the running totals.
func (p *cStat) add(n *cStatStaging) {
    p.duration, p.read, p.write = p.duration+n.duration, p.read+n.read, p.write+n.write
}

// get reports the accumulated duration, bytes read, and bytes written.
func (p *cStat) get() (duration time.Duration, read, write int64) {
    duration, read, write = p.duration, p.read, p.write
    return
}

// cStatStaging accumulates statistics for a single compaction run
// before they are committed to a cStats via addStat.
type cStatStaging struct {
    start    time.Time     // when the current timing interval began
    duration time.Duration // accumulated elapsed time across intervals
    on       bool          // whether a timing interval is in progress
    read     int64         // bytes read so far
    write    int64         // bytes written so far
}

// startTimer begins a timing interval unless one is already running.
func (p *cStatStaging) startTimer() {
    if p.on {
        return
    }
    p.on = true
    p.start = time.Now()
}

// stopTimer ends the current timing interval, if any, and adds its
// elapsed time to the accumulated duration.
func (p *cStatStaging) stopTimer() {
    if !p.on {
        return
    }
    p.on = false
    p.duration += time.Since(p.start)
}

// cStats holds per-level compaction statistics, guarded by lk.
// The stats slice grows on demand as higher levels are touched.
type cStats struct {
    lk    sync.Mutex
    stats []cStat
}

// addStat merges the staged statistics n into the totals for level,
// growing the per-level slice if the level was not seen before.
func (p *cStats) addStat(level int, n *cStatStaging) {
    p.lk.Lock()
    defer p.lk.Unlock()
    if need := level + 1; need > len(p.stats) {
        grown := make([]cStat, need)
        copy(grown, p.stats)
        p.stats = grown
    }
    p.stats[level].add(n)
}

// getStat returns the accumulated statistics for level; zeros if the
// level has never been recorded.
func (p *cStats) getStat(level int) (duration time.Duration, read, write int64) {
    p.lk.Lock()
    defer p.lk.Unlock()
    if level >= len(p.stats) {
        return 0, 0, 0
    }
    return p.stats[level].get()
}

// compactionError is a long-running goroutine implementing a three-state
// machine over the compaction error status. It receives updates on
// compErrSetC and, depending on the state, serves the current error to
// readers of compErrC (any error) and compPerErrC (persistent errors
// only). A persistent error (ErrReadOnly or corruption) additionally
// acquires the write lock so writes stop passing through. The goroutine
// exits when closeC is closed.
func (db *DB) compactionError() {
    var err error
noerr:
    // No error.
    for {
        select {
        case err = <-db.compErrSetC:
            switch {
            case err == nil:
                // Remain in the no-error state.
            case err == ErrReadOnly, errors.IsCorrupted(err):
                goto hasperr
            default:
                goto haserr
            }
        case <-db.closeC:
            return
        }
    }
haserr:
    // Transient error: serve it on compErrC until cleared or upgraded.
    for {
        select {
        case db.compErrC <- err:
        case err = <-db.compErrSetC:
            switch {
            case err == nil:
                goto noerr
            case err == ErrReadOnly, errors.IsCorrupted(err):
                goto hasperr
            default:
                // Still a transient error; keep serving it.
            }
        case <-db.closeC:
            return
        }
    }
hasperr:
    // Persistent error: terminal state; serve on both error channels.
    for {
        select {
        case db.compErrC <- err:
        case db.compPerErrC <- err:
        case db.writeLockC <- struct{}{}:
            // Hold write lock, so that write won't pass-through.
            db.compWriteLocking = true
        case <-db.closeC:
            if db.compWriteLocking {
                // We should release the lock or Close will hang.
                <-db.writeLockC
            }
            return
        }
    }
}

// compactionTransactCounter counts units of progress made by a
// compaction transaction; compactionTransact uses it to reset the
// retry backoff when a run is advancing.
type compactionTransactCounter int

// incr advances the counter by one.
func (cnt *compactionTransactCounter) incr() {
    *cnt++
}

// compactionTransactInterface is a retryable unit of compaction work.
// run performs one attempt (reporting progress via cnt); revert undoes
// partial work when the transaction is aborted.
type compactionTransactInterface interface {
    run(cnt *compactionTransactCounter) error
    revert() error
}

// compactionTransact runs transaction t until it succeeds, retrying
// transient errors with exponential backoff (1s doubling up to 8s,
// reset whenever the transaction reports progress via its counter).
// Each attempt's error is published through compErrSetC. The function
// aborts — by panicking with errCompactionTransactExiting — when the DB
// is closed, a persistent error is pending, or corruption is detected;
// the deferred recovery handler then calls t.revert before re-panicking.
func (db *DB) compactionTransact(name string, t compactionTransactInterface) {
    defer func() {
        if x := recover(); x != nil {
            if x == errCompactionTransactExiting {
                if err := t.revert(); err != nil {
                    db.logf("%s revert error %q", name, err)
                }
            }
            panic(x)
        }
    }()

    const (
        backoffMin = 1 * time.Second
        backoffMax = 8 * time.Second
        // backoffMul is a dimensionless multiplier. It was previously
        // declared as 2*time.Second, which made `backoff *= backoffMul`
        // multiply the duration by 2e9 and overshoot backoffMax on the
        // very first retry, skipping the intended 2s/4s steps.
        backoffMul = 2
    )
    var (
        backoff  = backoffMin
        backoffT *time.Timer
        lastCnt  = compactionTransactCounter(0)

        disableBackoff = db.s.o.GetDisableCompactionBackoff()
    )
    // Stop the reused timer on exit so its resources are released.
    defer func() {
        if backoffT != nil {
            backoffT.Stop()
        }
    }()
    for n := 0; ; n++ {
        // Check whether the DB is closed.
        if db.isClosed() {
            db.logf("%s exiting", name)
            db.compactionExitTransact()
        } else if n > 0 {
            db.logf("%s retrying N·%d", name, n)
        }

        // Execute one attempt.
        cnt := compactionTransactCounter(0)
        err := t.run(&cnt)
        if err != nil {
            db.logf("%s error I·%d %q", name, cnt, err)
        }

        // Set compaction error status; bail out on persistent error or close.
        select {
        case db.compErrSetC <- err:
        case perr := <-db.compPerErrC:
            if err != nil {
                db.logf("%s exiting (persistent error %q)", name, perr)
                db.compactionExitTransact()
            }
        case <-db.closeC:
            db.logf("%s exiting", name)
            db.compactionExitTransact()
        }
        if err == nil {
            return
        }
        if errors.IsCorrupted(err) {
            db.logf("%s exiting (corruption detected)", name)
            db.compactionExitTransact()
        }

        if !disableBackoff {
            // Reset backoff duration if counter is advancing.
            if cnt > lastCnt {
                backoff = backoffMin
                lastCnt = cnt
            }

            // Reuse a single timer across retries instead of time.After.
            if backoffT == nil {
                backoffT = time.NewTimer(backoff)
            } else {
                backoffT.Reset(backoff)
            }
            if backoff < backoffMax {
                backoff *= backoffMul
                if backoff > backoffMax {
                    backoff = backoffMax
                }
            }
            select {
            case <-backoffT.C:
            case <-db.closeC:
                db.logf("%s exiting", name)
                db.compactionExitTransact()
            }
        }
    }
}

// compactionTransactFunc adapts a pair of functions to the
// compactionTransactInterface; revertFunc may be nil for transactions
// with nothing to undo.
type compactionTransactFunc struct {
    runFunc    func(cnt *compactionTransactCounter) error
    revertFunc func() error
}

// run delegates to the wrapped run function.
func (t *compactionTransactFunc) run(cnt *compactionTransactCounter) error {
    f := t.runFunc
    return f(cnt)
}

// revert delegates to the wrapped revert function; a nil revertFunc
// means there is nothing to undo.
func (t *compactionTransactFunc) revert() error {
    if f := t.revertFunc; f != nil {
        return f()
    }
    return nil
}

// compactionTransactFunc is a convenience wrapper around
// compactionTransact that builds the transaction from bare functions;
// revert may be nil.
func (db *DB) compactionTransactFunc(name string, run func(cnt *compactionTransactCounter) error, revert func() error) {
    db.compactionTransact(name, &compactionTransactFunc{run, revert})
}

// compactionExitTransact aborts the current compaction transaction by
// panicking with errCompactionTransactExiting; compactionTransact's
// deferred handler recovers it, reverts the transaction, and re-panics
// so the compaction goroutine unwinds.
func (db *DB) compactionExitTransact() {
    panic(errCompactionTransactExiting)
}

// compactionCommit commits the session record rec under the commit
// lock, retrying via the compaction-transact machinery. There is no
// revert: commit failures are retried, not undone.
func (db *DB) compactionCommit(name string, rec *sessionRecord) {
    db.compCommitLk.Lock()
    defer db.compCommitLk.Unlock() // Defer is necessary.
    db.compactionTransactFunc(name+"@commit", func(cnt *compactionTransactCounter) error {
        return db.s.commit(rec, true)
    }, nil)
}

// memCompaction flushes the frozen memdb to a table file. It pauses
// table compaction for the duration of the flush, commits the new
// tables to the session, records stats, drops the frozen memdb, and
// finally triggers a table compaction. Returns immediately if there is
// no frozen memdb.
func (db *DB) memCompaction() {
    mdb := db.getFrozenMem()
    if mdb == nil {
        return
    }
    defer mdb.decref()

    db.logf("memdb@flush N·%d S·%s", mdb.Len(), shortenb(mdb.Size()))

    // Don't compact empty memdb.
    if mdb.Len() == 0 {
        db.logf("memdb@flush skipping")
        // drop frozen memdb
        db.dropFrozenMem()
        return
    }

    // Pause table compaction. If a persistent compaction error is
    // pending, skip pausing (resumeC set to nil marks that case).
    resumeC := make(chan struct{})
    select {
    case db.tcompPauseC <- (chan<- struct{})(resumeC):
    case <-db.compPerErrC:
        close(resumeC)
        resumeC = nil
    case <-db.closeC:
        db.compactionExitTransact()
    }

    var (
        rec        = &sessionRecord{}
        stats      = &cStatStaging{}
        flushLevel int
    )

    // Generate tables; on revert, remove any table files already added.
    db.compactionTransactFunc("memdb@flush", func(cnt *compactionTransactCounter) (err error) {
        stats.startTimer()
        flushLevel, err = db.s.flushMemdb(rec, mdb.DB, db.memdbMaxLevel)
        stats.stopTimer()
        return
    }, func() error {
        for _, r := range rec.addedTables {
            db.logf("memdb@flush revert @%d", r.num)
            if err := db.s.stor.Remove(storage.FileDesc{Type: storage.TypeTable, Num: r.num}); err != nil {
                return err
            }
        }
        return nil
    })

    rec.setJournalNum(db.journalFd.Num)
    rec.setSeqNum(db.frozenSeq)

    // Commit.
    stats.startTimer()
    db.compactionCommit("memdb", rec)
    stats.stopTimer()

    db.logf("memdb@flush committed F·%d T·%v", len(rec.addedTables), stats.duration)

    // Save compaction stats
    for _, r := range rec.addedTables {
        stats.write += r.size
    }
    db.compStats.addStat(flushLevel, stats)
    atomic.AddUint32(&db.memComp, 1)

    // Drop frozen memdb.
    db.dropFrozenMem()

    // Resume table compaction. The paused goroutine sends on resumeC;
    // receive that signal, then close the channel.
    if resumeC != nil {
        select {
        case <-resumeC:
            close(resumeC)
        case <-db.closeC:
            db.compactionExitTransact()
        }
    }

    // Trigger table compaction.
    db.compTrigger(db.tcompCmdC)
}

// tableCompactionBuilder merges the source tables of a compaction into
// new tables at sourceLevel+1. It implements compactionTransactInterface;
// the snap* fields save iteration state so a retried run can resume
// where the previous attempt left off.
type tableCompactionBuilder struct {
    db           *DB
    s            *session
    c            *compaction
    rec          *sessionRecord // accumulates added/deleted table edits
    stat0, stat1 *cStatStaging  // NOTE(review): only stat1 is set by tableCompaction; stat0 appears unused here

    // Saved resume state from the last successful stash.
    snapHasLastUkey bool
    snapLastUkey    []byte
    snapLastSeq     uint64
    snapIter        int
    snapKerrCnt     int
    snapDropCnt     int

    kerrCnt int // keys with parse errors (kept, not dropped)
    dropCnt int // entries dropped as shadowed or obsolete

    minSeq    uint64 // entries at or below this sequence may be dropped
    strict    bool   // fail on corrupted keys instead of keeping them
    tableSize int    // flush threshold for output tables

    tw *tWriter // current output table writer, nil when none open

    // to complete the last flush
    complete func() error
}

// appendKV writes a key/value pair into the current output table,
// creating a new table writer first if none is open.
func (b *tableCompactionBuilder) appendKV(key, value []byte) error {
    if b.tw == nil {
        tw, err := b.s.tops.create()
        if err != nil {
            return err
        }
        b.tw = tw
    }
    return b.tw.append(key, value)
}

// needFlush reports whether the current output table has reached the
// configured table size and should be flushed. Caller must ensure
// b.tw is non-nil.
func (b *tableCompactionBuilder) needFlush() bool {
    return b.tw.tw.BytesLen() >= b.tableSize
}

// flush hands the current table writer to a background goroutine that
// finishes it (pending write/fsync), and installs a completion callback
// in b.complete that records the result and, on success, saves the
// provided stash (resume state). Any previously pending completion is
// run first, so at most one finish is in flight at a time.
func (b *tableCompactionBuilder) flush(stash stash) error {
    // complete the last flush
    if f := b.complete; f != nil {
        b.complete = nil
        if err := f(); err != nil {
            return err
        }
    }

    // Detach the writer; appendKV will open a fresh table on next write.
    tw := b.tw
    b.tw = nil

    c := make(chan interface{}, 1)
    // asynchronously finish the tWriter (pending write/fsync)
    go func() {
        defer close(c)
        if t, err := tw.finish(); err != nil {
            // drop on error
            tw.drop()
            c <- err
        } else {
            c <- t
        }
    }()

    // The callback blocks on c until the background finish completes.
    b.complete = func() error {
        switch r := (<-c).(type) {
        case error:
            return r
        case *tFile:
            b.rec.addTableFile(b.c.sourceLevel+1, r)
            b.stat1.write += r.size
            b.s.logf("table@build created L%d@%d N·%d S·%s %q:%q", b.c.sourceLevel+1, r.fd.Num, tw.tw.EntriesLen(), shortenb(int(r.size)), r.imin, r.imax)
            // Only now is it safe to persist the resume point.
            stash()
            return nil
        default:
            panic("unexpected")
        }
    }
    return nil
}

// cleanup drops the open output table writer, if any.
func (b *tableCompactionBuilder) cleanup() {
    if tw := b.tw; tw != nil {
        b.tw = nil
        tw.drop()
    }
}

// stash captures the current compaction and builder state and returns a
// function that, when invoked, saves that snapshot as the resume point
// for a retried run. The user key is copied so later mutation of the
// caller's buffer cannot corrupt the snapshot.
func (b *tableCompactionBuilder) stash(hasLastUkey bool, lastUkey []byte, lastSeq uint64, iter int) stash {
    var (
        cstash  = b.c.stash()
        ukeyCpy = append([]byte(nil), lastUkey...)
        kerrCnt = b.kerrCnt
        dropCnt = b.dropCnt
    )
    return func() {
        // Persist compaction state first, then the builder's snapshot.
        cstash.save()
        b.snapHasLastUkey = hasLastUkey
        b.snapLastUkey = ukeyCpy
        b.snapLastSeq = lastSeq
        b.snapIter = iter
        b.snapKerrCnt = kerrCnt
        b.snapDropCnt = dropCnt
    }
}

// run executes one attempt of the table compaction. It restores the
// state saved by the last stash (so a retry resumes where the previous
// attempt left off), merges the source tables in key order, drops
// entries shadowed by newer versions and obsolete deletion markers
// (relative to minSeq), and flushes output tables as they reach
// tableSize. Corrupted keys fail the run in strict mode and are kept
// verbatim otherwise.
func (b *tableCompactionBuilder) run(cnt *compactionTransactCounter) (err error) {
    snapResumed := b.snapIter > 0
    hasLastUkey := b.snapHasLastUkey // The key might has zero length, so this is necessary.
    lastUkey := append([]byte{}, b.snapLastUkey...)
    lastSeq := b.snapLastSeq
    b.kerrCnt = b.snapKerrCnt
    b.dropCnt = b.snapDropCnt
    // Restore compaction state.
    b.c.restore()

    defer b.cleanup()

    // Drain any in-flight table finish before returning, preserving the
    // first error encountered.
    defer func() {
        if f := b.complete; f != nil {
            b.complete = nil
            if e := f(); e != nil && err == nil {
                err = e
            }
        }
    }()

    b.stat1.startTimer()
    defer b.stat1.stopTimer()

    iter := newAsyncIterator(b.c.newIterator(), b.s.o.GetBlockSize(), 16, b.s.tops.sbpool)
    defer iter.Release()
    for i := 0; iter.Next(); i++ {
        // Incr transact counter.
        cnt.incr()

        // Skip until last state.
        if i < b.snapIter {
            continue
        }

        // resumed is true only for the first unskipped entry of a
        // resumed run; it suppresses shouldStopBefore for that entry.
        resumed := false
        if snapResumed {
            resumed = true
            snapResumed = false
        }

        ikey := iter.Key()
        ukey, seq, kt, kerr := parseInternalKey(ikey)

        if kerr == nil {
            shouldStop := !resumed && b.c.shouldStopBefore(ikey)

            if !hasLastUkey || b.s.icmp.uCompare(lastUkey, ukey) != 0 {
                // First occurrence of this user key.

                // Only rotate tables if ukey doesn't hop across.
                if b.tw != nil && (shouldStop || b.needFlush()) {
                    if err := b.flush(b.stash(hasLastUkey, lastUkey, lastSeq, i)); err != nil {
                        return err
                    }
                }

                hasLastUkey = true
                lastUkey = append(lastUkey[:0], ukey...)
                lastSeq = keyMaxSeq
            }

            switch {
            case lastSeq <= b.minSeq:
                // Dropped because newer entry for same user key exist
                fallthrough // (A)
            case kt == keyTypeDel && seq <= b.minSeq && b.c.baseLevelForKey(lastUkey):
                // For this user key:
                // (1) there is no data in higher levels
                // (2) data in lower levels will have larger seq numbers
                // (3) data in layers that are being compacted here and have
                //     smaller seq numbers will be dropped in the next
                //     few iterations of this loop (by rule (A) above).
                // Therefore this deletion marker is obsolete and can be dropped.
                lastSeq = seq
                b.dropCnt++
                continue
            default:
                lastSeq = seq
            }
        } else {
            if b.strict {
                return kerr
            }

            // Don't drop corrupted keys.
            hasLastUkey = false
            lastUkey = lastUkey[:0]
            lastSeq = keyMaxSeq
            b.kerrCnt++
        }

        if err := b.appendKV(ikey, iter.Value()); err != nil {
            return err
        }
    }

    if err := iter.Error(); err != nil {
        return err
    }

    // Finish last table.
    if b.tw != nil && !b.tw.empty() {
        return b.flush(func() {})
    }
    return nil
}

// revert removes every table file this build has added to the session
// record, undoing the aborted compaction's partial output.
func (b *tableCompactionBuilder) revert() error {
    for _, at := range b.rec.addedTables {
        b.s.logf("table@build revert @%d", at.num)
        fd := storage.FileDesc{Type: storage.TypeTable, Num: at.num}
        if err := b.s.stor.Remove(fd); err != nil {
            return err
        }
    }
    return nil
}

// tableCompaction executes the table compaction c. When noTrivial is
// false and the compaction is trivial, the single source table is moved
// to the next level without rewriting. Otherwise the source tables are
// merged via tableCompactionBuilder, the result committed, and stats
// and compaction-type counters updated.
func (db *DB) tableCompaction(c *compaction, noTrivial bool) {
    defer c.release()

    rec := &sessionRecord{}
    rec.addCompPtr(c.sourceLevel, c.imax)

    if !noTrivial && c.trivial() {
        // Trivial case: just move the table one level down.
        t := c.levels[0][0]
        db.logf("table@move L%d@%d -> L%d", c.sourceLevel, t.fd.Num, c.sourceLevel+1)
        rec.delTable(c.sourceLevel, t.fd.Num)
        rec.addTableFile(c.sourceLevel+1, t)
        db.compactionCommit("table-move", rec)
        return
    }

    var stats [2]cStatStaging
    for i, tables := range c.levels {
        for _, t := range tables {
            stats[i].read += t.size
            // Insert deleted tables into record
            rec.delTable(c.sourceLevel+i, t.fd.Num)
        }
    }
    sourceSize := int(stats[0].read + stats[1].read)
    minSeq := db.minSeq()
    db.logf("table@compaction L%d·%d -> L%d·%d S·%s Q·%d", c.sourceLevel, len(c.levels[0]), c.sourceLevel+1, len(c.levels[1]), shortenb(sourceSize), minSeq)

    b := &tableCompactionBuilder{
        db:        db,
        s:         db.s,
        c:         c,
        rec:       rec,
        stat1:     &stats[1],
        minSeq:    minSeq,
        strict:    db.s.o.GetStrict(opt.StrictCompaction),
        tableSize: db.s.o.GetCompactionTableSize(c.sourceLevel + 1),
    }
    db.compactionTransact("table@build", b)

    // Commit.
    stats[1].startTimer()
    db.compactionCommit("table", rec)
    stats[1].stopTimer()

    resultSize := int(stats[1].write)
    db.logf("table@compaction committed F%s S%s Ke·%d D·%d T·%v", sint(len(rec.addedTables)-len(rec.deletedTables)), sshortenb(resultSize-sourceSize), b.kerrCnt, b.dropCnt, stats[1].duration)

    // Save compaction stats
    // NOTE(review): both source- and target-level stats are attributed
    // to c.sourceLevel+1 here — confirm this is intentional.
    for i := range stats {
        db.compStats.addStat(c.sourceLevel+1, &stats[i])
    }
    switch c.typ {
    case level0Compaction:
        atomic.AddUint32(&db.level0Comp, 1)
    case nonLevel0Compaction:
        atomic.AddUint32(&db.nonLevel0Comp, 1)
    case seekCompaction:
        atomic.AddUint32(&db.seekComp, 1)
    }
}

// tableRangeCompaction compacts all tables overlapping the user-key
// range [umin, umax]. A non-negative level compacts that level only;
// a negative level repeatedly compacts every level below the highest
// overlapping one until no further compaction is possible.
// Always returns nil.
func (db *DB) tableRangeCompaction(level int, umin, umax []byte) error {
    db.logf("table@compaction range L%d %q:%q", level, umin, umax)
    if level >= 0 {
        if c := db.s.getCompactionRange(level, umin, umax, true); c != nil {
            db.tableCompaction(c, true)
        }
    } else {
        // Retry until nothing to compact.
        for {
            compacted := false

            // Scan for maximum level with overlapped tables.
            v := db.s.version()
            m := 1
            for i := m; i < len(v.levels); i++ {
                tables := v.levels[i]
                if tables.overlaps(db.s.icmp, umin, umax, false) {
                    m = i
                }
            }
            v.release()

            // Compact each level below the maximum overlapping level.
            for level := 0; level < m; level++ {
                if c := db.s.getCompactionRange(level, umin, umax, false); c != nil {
                    db.tableCompaction(c, true)
                    compacted = true
                }
            }

            if !compacted {
                break
            }
        }
    }

    return nil
}

// tableAutoCompaction runs one automatically-picked table compaction,
// if the session currently has one to offer.
func (db *DB) tableAutoCompaction() {
    if c := db.s.pickCompaction(); c != nil {
        db.tableCompaction(c, false)
    }
}

// tableNeedCompaction reports whether the current version considers a
// table compaction necessary.
func (db *DB) tableNeedCompaction() bool {
    v := db.s.version()
    defer v.release()
    return v.needCompaction()
}

// resumeWrite returns an indicator whether we should resume write operation if enough level0 files are compacted.
// resumeWrite returns an indicator whether we should resume write operation if enough level0 files are compacted.
// It compares the current level-0 table count against the write-pause trigger.
func (db *DB) resumeWrite() bool {
    v := db.s.version()
    defer v.release()
    return v.tLen(0) < db.s.o.GetWriteL0PauseTrigger()
}

// pauseCompaction blocks until the pauser signals resumption by
// receiving from ch, or aborts the compaction transaction if the DB is
// closed first.
func (db *DB) pauseCompaction(ch chan<- struct{}) {
    select {
    case ch <- struct{}{}:
    case <-db.closeC:
        db.compactionExitTransact()
    }
}

// cCmd is a compaction command; ack reports the command's outcome to
// its requester, if it is waiting for one.
type cCmd interface {
    ack(err error)
}

// cAuto requests an automatic compaction.
type cAuto struct {
    // Note for table compaction, a non-empty ackC represents it's a compaction waiting command.
    ackC chan<- error
}

// ack delivers err to the requester, if one is waiting. The requester
// may have already closed ackC; the deferred recover swallows the
// resulting send panic.
func (r cAuto) ack(err error) {
    if r.ackC == nil {
        return
    }
    defer func() {
        recover()
    }()
    r.ackC <- err
}

// cRange requests compaction of the user-key range [min, max] at the
// given level (negative level means all levels).
type cRange struct {
    level    int
    min, max []byte
    ackC     chan<- error
}

// ack delivers err to the requester, if one is waiting. The requester
// may have already closed ackC; the deferred recover swallows the
// resulting send panic.
func (r cRange) ack(err error) {
    if r.ackC == nil {
        return
    }
    defer func() {
        recover()
    }()
    r.ackC <- err
}

// This will trigger auto compaction but will not wait for it.
// The send is non-blocking: if the compaction goroutine is busy, the
// trigger is simply dropped (a compaction is already in progress or
// queued).
func (db *DB) compTrigger(compC chan<- cCmd) {
    select {
    case compC <- cAuto{}:
    default:
    }
}

// This will trigger auto compaction and/or wait for all compaction to be done.
func (db *DB) compTriggerWait(compC chan<- cCmd) (err error) {
    ch := make(chan error)
    defer close(ch)
    // Send cmd.
    select {
    case compC <- cAuto{ch}:
    case err = <-db.compErrC:
        return
    case <-db.closeC:
        return ErrClosed
    }
    // Wait cmd.
    select {
    case err = <-ch:
    case err = <-db.compErrC:
    case <-db.closeC:
        return ErrClosed
    }
    return err
}

// Send range compaction request.
func (db *DB) compTriggerRange(compC chan<- cCmd, level int, min, max []byte) (err error) {
    ch := make(chan error)
    defer close(ch)
    // Send cmd.
    select {
    case compC <- cRange{level, min, max, ch}:
    case err := <-db.compErrC:
        return err
    case <-db.closeC:
        return ErrClosed
    }
    // Wait cmd.
    select {
    case err = <-ch:
    case err = <-db.compErrC:
    case <-db.closeC:
        return ErrClosed
    }
    return err
}

// mCompaction is the memdb compaction goroutine. It services commands
// from mcompCmdC, running a memdb flush for each cAuto command, until
// the DB is closed. A command still held when the goroutine exits is
// acked with ErrClosed; errCompactionTransactExiting panics (raised by
// compactionExitTransact) are absorbed, any other panic is re-raised.
func (db *DB) mCompaction() {
    var x cCmd

    defer func() {
        if x := recover(); x != nil {
            if x != errCompactionTransactExiting {
                panic(x)
            }
        }
        if x != nil {
            x.ack(ErrClosed)
        }
        db.closeW.Done()
    }()

    for {
        // Mark memdb compaction as idle while waiting for a command.
        db.compActiveLk.Lock()
        db.memCompActive = false
        db.compActiveLk.Unlock()

        select {
        case x = <-db.mcompCmdC:
            switch x.(type) {
            case cAuto:
                db.compActiveLk.Lock()
                db.memCompActive = true
                db.compActiveLk.Unlock()

                db.memCompaction()
                x.ack(nil)
                x = nil
            default:
                panic("leveldb: unknown command")
            }
        case <-db.closeC:
            return
        }
    }
}

// tCompaction is the table compaction goroutine. It services commands
// from tcompCmdC and pause requests from tcompPauseC, runs automatic
// compactions while the version needs them, and parks waiting cAuto
// commands in waitQ until enough level-0 files are compacted for writes
// to resume. On exit, queued and in-flight commands are acked with
// ErrClosed; errCompactionTransactExiting panics are absorbed, any
// other panic is re-raised.
func (db *DB) tCompaction() {
    var (
        x     cCmd
        waitQ []cCmd
    )

    defer func() {
        if x := recover(); x != nil {
            if x != errCompactionTransactExiting {
                panic(x)
            }
        }
        for i := range waitQ {
            waitQ[i].ack(ErrClosed)
            waitQ[i] = nil
        }
        if x != nil {
            x.ack(ErrClosed)
        }
        db.closeW.Done()
    }()

    for {
        // Mark table compaction as idle while deciding what to do next.
        db.compActiveLk.Lock()
        db.tableCompActive = false
        db.compActiveLk.Unlock()

        if db.tableNeedCompaction() {
            // Compaction is needed: poll for commands without blocking
            // so the automatic compaction below still runs.
            select {
            case x = <-db.tcompCmdC:
            case ch := <-db.tcompPauseC:
                db.pauseCompaction(ch)
                continue
            case <-db.closeC:
                return
            default:
            }
            // Resume write operation as soon as possible.
            if len(waitQ) > 0 && db.resumeWrite() {
                for i := range waitQ {
                    waitQ[i].ack(nil)
                    waitQ[i] = nil
                }
                waitQ = waitQ[:0]
            }
        } else {
            // Nothing to compact: release all waiters, then block for
            // the next command, pause request, or close.
            for i := range waitQ {
                waitQ[i].ack(nil)
                waitQ[i] = nil
            }
            waitQ = waitQ[:0]
            select {
            case x = <-db.tcompCmdC:
            case ch := <-db.tcompPauseC:
                db.pauseCompaction(ch)
                continue
            case <-db.closeC:
                return
            }
        }
        db.compActiveLk.Lock()
        db.tableCompActive = true
        db.compActiveLk.Unlock()
        if x != nil {
            switch cmd := x.(type) {
            case cAuto:
                if cmd.ackC != nil {
                    // Check the write pause state before caching it.
                    if db.resumeWrite() {
                        x.ack(nil)
                    } else {
                        waitQ = append(waitQ, x)
                    }
                }
            case cRange:
                x.ack(db.tableRangeCompaction(cmd.level, cmd.min, cmd.max))
            default:
                panic("leveldb: unknown command")
            }
            x = nil
        }
        db.tableAutoCompaction()
    }
}