johnsonjh/jleveldb
leveldb/db_write.go
// Copyright © 2012, Suryandaru Triandana <syndtr@gmail.com>
// Copyright © 2021, Jeffrey H. Johnson <trnsz@pobox.com>
//
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

package leveldb

import (
    "sync/atomic"
    "time"

    "github.com/johnsonjh/jleveldb/leveldb/memdb"
    "github.com/johnsonjh/jleveldb/leveldb/opt"
    "github.com/johnsonjh/jleveldb/leveldb/util"
)

// writeJournal writes the given batches, prefixed with a header carrying
// the starting sequence number, to the journal, flushes it, and optionally
// syncs the journal file to stable storage.
func (db *DB) writeJournal(batches []*Batch, seq uint64, sync bool) error {
    wr, err := db.journal.Next()
    if err != nil {
        return err
    }
    if err := writeBatchesWithHeader(wr, batches, seq); err != nil {
        return err
    }
    if err := db.journal.Flush(); err != nil {
        return err
    }
    if sync {
        return db.journalWriter.Sync()
    }
    return nil
}

// rotateMem freezes the current memdb, creates a fresh memdb and journal,
// and schedules (or, if wait is true, waits for) compaction of the frozen
// memdb.
func (db *DB) rotateMem(n int, wait bool) (mem *memDB, err error) {
    retryLimit := 3
retry:
    // Wait for pending memdb compaction.
    err = db.compTriggerWait(db.mcompCmdC)
    if err != nil {
        return
    }
    retryLimit--

    // Create new memdb and journal.
    mem, err = db.newMem(n)
    if err != nil {
        if err == errHasFrozenMem {
            if retryLimit <= 0 {
                panic("BUG: still has frozen memdb")
            }
            goto retry
        }
        return
    }

    // Schedule memdb compaction.
    if wait {
        err = db.compTriggerWait(db.mcompCmdC)
    } else {
        db.compTrigger(db.mcompCmdC)
    }
    return
}

// flush returns the effective memdb, throttling or pausing writes while
// level-0 compaction catches up, and rotating the memdb when it does not
// have n bytes free.
func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) {
    delayed := false
    slowdownTrigger := db.s.o.GetWriteL0SlowdownTrigger()
    pauseTrigger := db.s.o.GetWriteL0PauseTrigger()
    // An anonymous function is used here (rather than inlining the body in
    // the loop below) so that the deferred decref runs at the end of every
    // retry; a deferred call fires when its enclosing function returns, not
    // at the end of a loop iteration. See the sketch after this function.
    flush := func() (retry bool) {
        mdb = db.getEffectiveMem()
        if mdb == nil {
            err = ErrClosed
            return false
        }
        defer func() {
            if retry {
                mdb.decref()
                mdb = nil
            }
        }()
        tLen := db.s.tLen(0)
        mdbFree = mdb.Free()
        switch {
        case tLen >= slowdownTrigger && !delayed:
            delayed = true
            time.Sleep(time.Millisecond)
            return false
        case mdbFree >= n:
            return false
        case tLen >= pauseTrigger:
            delayed = true
            // Set the write paused flag explicitly.
            atomic.StoreInt32(&db.inWritePaused, 1)
            err = db.compTriggerWait(db.tcompCmdC)
            // Unset the write paused flag.
            atomic.StoreInt32(&db.inWritePaused, 0)
            if err != nil {
                return false
            }
        default:
            // Allow the memdb to grow if it has no entries.
            if mdb.Len() == 0 {
                mdbFree = n
            } else {
                mdb.decref()
                mdb, err = db.rotateMem(n, false)
                if err == nil {
                    mdbFree = mdb.Free()
                } else {
                    mdbFree = 0
                }
            }
            return false
        }
        return true
    }
    start := time.Now()
    for flush() {
    }
    if delayed {
        db.writeDelay += time.Since(start)
        db.writeDelayN++
    } else if db.writeDelayN > 0 {
        db.logf("db@write was delayed N·%d T·%v", db.writeDelayN, db.writeDelay)
        atomic.AddInt32(&db.cWriteDelayN, int32(db.writeDelayN))
        atomic.AddInt64(&db.cWriteDelay, int64(db.writeDelay))
        db.writeDelay = 0
        db.writeDelayN = 0
    }
    return
}
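
// Aside (illustrative sketch, not part of the original file): the anonymous
// flush function above matters because a deferred call fires when its
// enclosing function returns, not when a loop iteration ends; wrapping the
// loop body in a function makes the deferred cleanup run once per retry.
// The hypothetical deferPerCallSketch below shows the pattern in miniature.
func deferPerCallSketch(n int) {
    attempt := func(i int) (retry bool) {
        defer func() {
            // Runs at the end of every attempt() call, i.e. once per
            // iteration, mirroring how flush() decrefs mdb on each retry.
        }()
        return i < n
    }
    for i := 0; attempt(i); i++ {
    }
}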

type writeMerge struct {
    sync       bool
    batch      *Batch
    keyType    keyType
    key, value []byte
}

// unlockWrite sends the result to every merged writer and then either
// hands the write lock to the next writer (the one that failed to merge)
// or releases it.
func (db *DB) unlockWrite(overflow bool, merged int, err error) {
    for i := 0; i < merged; i++ {
        db.writeAckC <- err
    }
    if overflow {
        // Pass lock to the next write (that failed to merge).
        db.writeMergedC <- false
    } else {
        // Release lock.
        <-db.writeLockC
    }
}

// writeLocked performs the actual write while holding the write lock;
// ourBatch, if not nil, is a batch that we are allowed to modify.
func (db *DB) writeLocked(batch, ourBatch *Batch, merge, sync bool) error {
    // Try to flush the memdb. This will also throttle writes if they come
    // in too fast for compaction to catch up.
    mdb, mdbFree, err := db.flush(batch.internalLen)
    if err != nil {
        db.unlockWrite(false, 0, err)
        return err
    }
    defer mdb.decref()

    var (
        overflow bool
        merged   int
        batches  = []*Batch{batch}
    )

    if merge {
        // Merge limit.
        var mergeLimit int
        if batch.internalLen > 128<<10 {
            mergeLimit = (1 << 20) - batch.internalLen
        } else {
            mergeLimit = 128 << 10
        }
        mergeCap := mdbFree - batch.internalLen
        if mergeLimit > mergeCap {
            mergeLimit = mergeCap
        }
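
        // For example (illustrative numbers): a 4 KiB batch starts with a
        // 128 KiB merge limit, while a 256 KiB batch starts with
        // 1 MiB - 256 KiB = 768 KiB; either way the limit is then capped by
        // the memdb space left after the batch itself (mergeCap).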

    merge:
        for mergeLimit > 0 {
            select {
            case incoming := <-db.writeMergeC:
                if incoming.batch != nil {
                    // Merge batch.
                    if incoming.batch.internalLen > mergeLimit {
                        overflow = true
                        break merge
                    }
                    batches = append(batches, incoming.batch)
                    mergeLimit -= incoming.batch.internalLen
                } else {
                    // Merge put. The extra 8 bytes account for the
                    // internal-key trailer (sequence number and key type).
                    internalLen := len(incoming.key) + len(incoming.value) + 8
                    if internalLen > mergeLimit {
                        overflow = true
                        break merge
                    }
                    if ourBatch == nil {
                        ourBatch = db.batchPool.Get().(*Batch)
                        ourBatch.Reset()
                        batches = append(batches, ourBatch)
                    }
                    // We can use the same batch, since concurrent writes
                    // don't guarantee write order anyway.
                    ourBatch.appendRec(incoming.keyType, incoming.key, incoming.value)
                    mergeLimit -= internalLen
                }
                sync = sync || incoming.sync
                merged++
                db.writeMergedC <- true

            default:
                break merge
            }
        }
    }

    // Release ourBatch if any.
    if ourBatch != nil {
        defer db.batchPool.Put(ourBatch)
    }

    // Seq number.
    seq := db.seq + 1

    // Write journal.
    if err := db.writeJournal(batches, seq, sync); err != nil {
        db.unlockWrite(overflow, merged, err)
        return err
    }

    // Put batches.
    for _, batch := range batches {
        if err := batch.putMem(seq, mdb.DB); err != nil {
            panic(err)
        }
        seq += uint64(batch.Len())
    }

    // Increment the sequence number.
    db.addSeq(uint64(batchesLen(batches)))

    // Rotate the memdb if this write has used up its remaining free space.
    if batch.internalLen >= mdbFree {
        db.rotateMem(0, false)
    }

    db.unlockWrite(overflow, merged, nil)
    return nil
}

// Write applies the given batch to the DB. The batch records will be
// applied sequentially. Write may be used concurrently; when used
// concurrently and the batch is small enough, Write will try to merge the
// batches. Set the NoWriteMerge option to true to disable write merging.
//
// It is safe to modify the contents of the arguments after Write returns
// but not before. Write will not modify the content of the batch.
func (db *DB) Write(batch *Batch, wo *opt.WriteOptions) error {
    if err := db.ok(); err != nil || batch == nil || batch.Len() == 0 {
        return err
    }

    // If the batch size is larger than the write buffer, it may be
    // justified to write it using a transaction instead. With a
    // transaction, the batch will be written into tables directly,
    // skipping the journaling.
    if batch.internalLen > db.s.o.GetWriteBuffer() && !db.s.o.GetDisableLargeBatchTransaction() {
        tr, err := db.OpenTransaction()
        if err != nil {
            return err
        }
        if err := tr.Write(batch, wo); err != nil {
            tr.Discard()
            return err
        }
        return tr.Commit()
    }

    merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge()
    sync := wo.GetSync() && !db.s.o.GetNoSync()

    // Acquire the write lock, or hand the batch to the current lock holder
    // for merging via writeMergeC, whichever becomes ready first.
    if merge {
        select {
        case db.writeMergeC <- writeMerge{sync: sync, batch: batch}:
            if <-db.writeMergedC {
                // Write is merged.
                return <-db.writeAckC
            }
            // Write is not merged, the write lock is handed to us. Continue.
        case db.writeLockC <- struct{}{}:
            // Write lock acquired.
        case err := <-db.compPerErrC:
            // Compaction error.
            return err
        case <-db.closeC:
            // Closed
            return ErrClosed
        }
    } else {
        select {
        case db.writeLockC <- struct{}{}:
            // Write lock acquired.
        case err := <-db.compPerErrC:
            // Compaction error.
            return err
        case <-db.closeC:
            // Closed
            return ErrClosed
        }
    }

    return db.writeLocked(batch, nil, merge, sync)
}
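
// Usage sketch (illustrative, not part of the original file): a typical
// batched write against an open DB. The function name, keys, and values
// here are assumptions for demonstration; Sync asks for the journal to be
// synced to stable storage before Write returns.
func exampleBatchedWrite(db *DB) error {
    batch := new(Batch)
    batch.Put([]byte("alpha"), []byte("1"))
    batch.Put([]byte("beta"), []byte("2"))
    batch.Delete([]byte("gamma"))
    // All records in the batch are applied sequentially as one write.
    return db.Write(batch, &opt.WriteOptions{Sync: true})
}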

// putRec writes a single record with the given key type. Like Write, it
// may merge the record into a concurrent write instead of acquiring the
// write lock itself.
func (db *DB) putRec(kt keyType, key, value []byte, wo *opt.WriteOptions) error {
    if err := db.ok(); err != nil {
        return err
    }

    merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge()
    sync := wo.GetSync() && !db.s.o.GetNoSync()

    // Acquire write lock.
    if merge {
        select {
        case db.writeMergeC <- writeMerge{sync: sync, keyType: kt, key: key, value: value}:
            if <-db.writeMergedC {
                // Write is merged.
                return <-db.writeAckC
            }
            // Write is not merged, the write lock is handed to us. Continue.
        case db.writeLockC <- struct{}{}:
            // Write lock acquired.
        case err := <-db.compPerErrC:
            // Compaction error.
            return err
        case <-db.closeC:
            // Closed
            return ErrClosed
        }
    } else {
        select {
        case db.writeLockC <- struct{}{}:
            // Write lock acquired.
        case err := <-db.compPerErrC:
            // Compaction error.
            return err
        case <-db.closeC:
            // Closed
            return ErrClosed
        }
    }

    batch := db.batchPool.Get().(*Batch)
    batch.Reset()
    batch.appendRec(kt, key, value)
    return db.writeLocked(batch, batch, merge, sync)
}

// Put sets the value for the given key. It overwrites any previous value
// for that key; a DB is not a multi-map. Write merging also applies to
// Put; see Write.
//
// It is safe to modify the contents of the arguments after Put returns but not
// before.
func (db *DB) Put(key, value []byte, wo *opt.WriteOptions) error {
    return db.putRec(keyTypeVal, key, value, wo)
}

// Delete deletes the value for the given key. Delete will not return an
// error if the key doesn't exist. Write merging also applies to Delete;
// see Write.
//
// It is safe to modify the contents of the arguments after Delete returns but
// not before.
func (db *DB) Delete(key []byte, wo *opt.WriteOptions) error {
    return db.putRec(keyTypeDel, key, nil, wo)
}
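
// Usage sketch (illustrative, not part of the original file): single-record
// writes via Put and Delete. A nil *opt.WriteOptions is valid and selects
// the defaults (asynchronous, write merging enabled).
func examplePutDelete(db *DB) error {
    if err := db.Put([]byte("key"), []byte("value"), nil); err != nil {
        return err
    }
    // Deleting a key that does not exist is not an error.
    return db.Delete([]byte("key"), nil)
}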

// isMemOverlaps reports whether the memdb's key range overlaps the user-key
// range [min, max]; a nil bound is treated as unbounded on that side.
func isMemOverlaps(icmp *iComparer, mem *memdb.DB, min, max []byte) bool {
    iter := mem.NewIterator(nil)
    defer iter.Release()
    return (max == nil || (iter.First() && icmp.uCompare(max, internalKey(iter.Key()).ukey()) >= 0)) &&
        (min == nil || (iter.Last() && icmp.uCompare(min, internalKey(iter.Key()).ukey()) <= 0))
}

// CompactRange compacts the underlying DB for the given key range.
// In particular, deleted and overwritten versions are discarded,
// and the data is rearranged to reduce the cost of operations
// needed to access the data. This operation should typically only
// be invoked by users who understand the underlying implementation.
//
// A nil Range.Start is treated as a key before all keys in the DB, and a
// nil Range.Limit is treated as a key after all keys in the DB. Therefore,
// if both are nil, the entire DB will be compacted.
func (db *DB) CompactRange(r util.Range) error {
    if err := db.ok(); err != nil {
        return err
    }

    // Lock writer.
    select {
    case db.writeLockC <- struct{}{}:
    case err := <-db.compPerErrC:
        return err
    case <-db.closeC:
        return ErrClosed
    }

    // Check for overlaps in memdb.
    mdb := db.getEffectiveMem()
    if mdb == nil {
        return ErrClosed
    }
    defer mdb.decref()
    if isMemOverlaps(db.s.icmp, mdb.DB, r.Start, r.Limit) {
        // Memdb compaction.
        if _, err := db.rotateMem(0, false); err != nil {
            <-db.writeLockC
            return err
        }
        <-db.writeLockC
        if err := db.compTriggerWait(db.mcompCmdC); err != nil {
            return err
        }
    } else {
        <-db.writeLockC
    }

    // Table compaction.
    return db.compTriggerRange(db.tcompCmdC, -1, r.Start, r.Limit)
}
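
// Usage sketch (illustrative, not part of the original file): compacting
// the whole keyspace and then a bounded sub-range. A zero util.Range has
// nil Start and Limit, which covers the entire DB.
func exampleCompact(db *DB) error {
    // Compact everything.
    if err := db.CompactRange(util.Range{}); err != nil {
        return err
    }
    // Compact only keys in ["a", "m").
    return db.CompactRange(util.Range{Start: []byte("a"), Limit: []byte("m")})
}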

// SetReadOnly makes DB read-only. It will stay read-only until reopened.
func (db *DB) SetReadOnly() error {
    if err := db.ok(); err != nil {
        return err
    }

    // Lock writer.
    select {
    case db.writeLockC <- struct{}{}:
        db.compWriteLocking = true
    case err := <-db.compPerErrC:
        return err
    case <-db.closeC:
        return ErrClosed
    }

    // Set compaction read-only.
    select {
    case db.compErrSetC <- ErrReadOnly:
    case perr := <-db.compPerErrC:
        return perr
    case <-db.closeC:
        return ErrClosed
    }

    return nil
}
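
// Usage sketch (illustrative, not part of the original file): once
// SetReadOnly succeeds, later writes are expected to fail with ErrReadOnly
// until the DB is reopened.
func exampleReadOnly(db *DB) error {
    if err := db.SetReadOnly(); err != nil {
        return err
    }
    // This Put should now fail with ErrReadOnly.
    return db.Put([]byte("k"), []byte("v"), nil)
}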