johnsonjh/jleveldb

View on GitHub
leveldb/db.go

Summary

Maintainability
F
6 days
Test Coverage
// Copyright © 2012, Suryandaru Triandana <syndtr@gmail.com>
// Copyright © 2021, Jeffrey H. Johnson <trnsz@pobox.com>
//
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

package leveldb

import (
    "container/list"
    "fmt"
    "io"
    "os"
    "runtime"
    "strings"
    "sync"
    "sync/atomic"
    "time"

    "github.com/johnsonjh/jleveldb/leveldb/errors"
    "github.com/johnsonjh/jleveldb/leveldb/iterator"
    "github.com/johnsonjh/jleveldb/leveldb/journal"
    "github.com/johnsonjh/jleveldb/leveldb/memdb"
    "github.com/johnsonjh/jleveldb/leveldb/opt"
    "github.com/johnsonjh/jleveldb/leveldb/storage"
    "github.com/johnsonjh/jleveldb/leveldb/table"
    "github.com/johnsonjh/jleveldb/leveldb/util"
)

// DB is a LevelDB database.
type DB struct {
    // Need 64-bit alignment.
    seq uint64

    // Stats. Need 64-bit alignment.
    cWriteDelay            int64 // The cumulative duration of write delays
    cWriteDelayN           int32 // The cumulative number of write delays
    inWritePaused          int32 // The indicator whether write operation is paused by compaction
    aliveSnaps, aliveIters int32

    // Compaction statistic
    memComp       uint32 // The cumulative number of memory compaction
    level0Comp    uint32 // The cumulative number of level0 compaction
    nonLevel0Comp uint32 // The cumulative number of non-level0 compaction
    seekComp      uint32 // The cumulative number of seek compaction

    // Session.
    s *session

    // MemDB.
    memMu           sync.RWMutex
    memPool         chan *memdb.DB
    mem, frozenMem  *memDB
    journal         *journal.Writer
    journalWriter   storage.Writer
    journalFd       storage.FileDesc
    frozenJournalFd storage.FileDesc
    frozenSeq       uint64

    // Snapshot.
    snapsMu   sync.Mutex
    snapsList *list.List

    // Write.
    batchPool    sync.Pool
    writeMergeC  chan writeMerge
    writeMergedC chan bool
    writeLockC   chan struct{}
    writeAckC    chan error
    writeDelay   time.Duration
    writeDelayN  int
    tr           *Transaction

    // Compaction.
    compCommitLk     sync.Mutex
    tcompCmdC        chan cCmd
    tcompPauseC      chan chan<- struct{}
    mcompCmdC        chan cCmd
    compErrC         chan error
    compPerErrC      chan error
    compErrSetC      chan error
    compWriteLocking bool
    compStats        cStats
    memdbMaxLevel    int // For testing.
    compActiveLk     sync.RWMutex
    memCompActive    bool
    tableCompActive  bool

    // Close.
    closeW sync.WaitGroup
    closeC chan struct{}
    closed uint32
    closer io.Closer
}

func openDB(s *session) (*DB, error) {
    s.log("db@open opening")
    start := time.Now()
    db := &DB{
        s: s,
        // Initial sequence
        seq: s.stSeqNum,
        // MemDB
        memPool: make(chan *memdb.DB, 1),
        // Snapshot
        snapsList: list.New(),
        // Write
        batchPool:    sync.Pool{New: newBatch},
        writeMergeC:  make(chan writeMerge),
        writeMergedC: make(chan bool),
        writeLockC:   make(chan struct{}, 1),
        writeAckC:    make(chan error),
        // Compaction
        tcompCmdC:   make(chan cCmd),
        tcompPauseC: make(chan chan<- struct{}),
        mcompCmdC:   make(chan cCmd),
        compErrC:    make(chan error),
        compPerErrC: make(chan error),
        compErrSetC: make(chan error),
        // Close
        closeC: make(chan struct{}),
    }

    // Read-only mode.
    readOnly := s.o.GetReadOnly()

    if readOnly {
        // Recover journals (read-only mode).
        if err := db.recoverJournalRO(); err != nil {
            return nil, err
        }
    } else {
        // Recover journals.
        if err := db.recoverJournal(); err != nil {
            return nil, err
        }

        // Remove any obsolete files.
        if err := db.checkAndCleanFiles(); err != nil {
            // Close journal.
            if db.journal != nil {
                db.journal.Close()
                db.journalWriter.Close()
            }
            return nil, err
        }

    }

    // Doesn't need to be included in the wait group.
    go db.compactionError()
    go db.mpoolDrain()

    if readOnly {
        db.SetReadOnly()
    } else {
        db.closeW.Add(2)
        go db.tCompaction()
        go db.mCompaction()
        // go db.jWriter()
    }

    s.logf("db@open done T·%v", time.Since(start))

    runtime.SetFinalizer(db, (*DB).Close)
    return db, nil
}

// Open opens or creates a DB for the given storage.
// The DB will be created if not exist, unless ErrorIfMissing is true.
// Also, if ErrorIfExist is true and the DB exist Open will returns
// os.ErrExist error.
//
// Open will return an error with type of ErrCorrupted if corruption
// detected in the DB. Use errors.IsCorrupted to test whether an error is
// due to corruption. Corrupted DB can be recovered with Recover function.
//
// The returned DB instance is safe for concurrent use.
// The DB must be closed after use, by calling Close method.
func Open(stor storage.Storage, o *opt.Options) (db *DB, err error) {
    s, err := newSession(stor, o)
    if err != nil {
        return
    }
    defer func() {
        if err != nil {
            s.close()
            s.release()
        }
    }()

    err = s.recover()
    if err != nil {
        if !os.IsNotExist(err) || s.o.GetErrorIfMissing() || s.o.GetReadOnly() {
            return
        }
        err = s.create()
        if err != nil {
            return
        }
    } else if s.o.GetErrorIfExist() {
        err = os.ErrExist
        return
    }

    return openDB(s)
}

// OpenFile opens or creates a DB for the given path.
// The DB will be created if not exist, unless ErrorIfMissing is true.
// Also, if ErrorIfExist is true and the DB exist OpenFile will returns
// os.ErrExist error.
//
// OpenFile uses standard file-system backed storage implementation as
// described in the leveldb/storage package.
//
// OpenFile will return an error with type of ErrCorrupted if corruption
// detected in the DB. Use errors.IsCorrupted to test whether an error is
// due to corruption. Corrupted DB can be recovered with Recover function.
//
// The returned DB instance is safe for concurrent use.
// The DB must be closed after use, by calling Close method.
func OpenFile(path string, o *opt.Options) (db *DB, err error) {
    stor, err := storage.OpenFile(path, o.GetReadOnly())
    if err != nil {
        return
    }
    db, err = Open(stor, o)
    if err != nil {
        stor.Close()
    } else {
        db.closer = stor
    }
    return
}

// Recover recovers and opens a DB with missing or corrupted manifest files
// for the given storage. It will ignore any manifest files, valid or not.
// The DB must already exist or it will returns an error.
// Also, Recover will ignore ErrorIfMissing and ErrorIfExist options.
//
// The returned DB instance is safe for concurrent use.
// The DB must be closed after use, by calling Close method.
func Recover(stor storage.Storage, o *opt.Options) (db *DB, err error) {
    s, err := newSession(stor, o)
    if err != nil {
        return
    }
    defer func() {
        if err != nil {
            s.close()
            s.release()
        }
    }()

    err = recoverTable(s, o)
    if err != nil {
        return
    }
    return openDB(s)
}

// RecoverFile recovers and opens a DB with missing or corrupted manifest files
// for the given path. It will ignore any manifest files, valid or not.
// The DB must already exist or it will returns an error.
// Also, Recover will ignore ErrorIfMissing and ErrorIfExist options.
//
// RecoverFile uses standard file-system backed storage implementation as described
// in the leveldb/storage package.
//
// The returned DB instance is safe for concurrent use.
// The DB must be closed after use, by calling Close method.
func RecoverFile(path string, o *opt.Options) (db *DB, err error) {
    stor, err := storage.OpenFile(path, false)
    if err != nil {
        return
    }
    db, err = Recover(stor, o)
    if err != nil {
        stor.Close()
    } else {
        db.closer = stor
    }
    return
}

func recoverTable(s *session, o *opt.Options) error {
    o = dupOptions(o)
    // Mask StrictReader, lets StrictRecovery doing its job.
    o.Strict &= ^opt.StrictReader

    // Get all tables and sort it by file number.
    fds, err := s.stor.List(storage.TypeTable)
    if err != nil {
        return err
    }
    sortFds(fds)

    var (
        maxSeq                                                            uint64
        recoveredKey, goodKey, corruptedKey, corruptedBlock, droppedTable int

        // We will drop corrupted table.
        strict = o.GetStrict(opt.StrictRecovery)
        noSync = o.GetNoSync()

        rec   = &sessionRecord{}
        bpool = util.NewBufferPool(o.GetBlockSize() + 5)
    )
    buildTable := func(iter iterator.Iterator) (tmpFd storage.FileDesc, size int64, err error) {
        tmpFd = s.newTemp()
        writer, err := s.stor.Create(tmpFd)
        if err != nil {
            return
        }
        defer func() {
            writer.Close()
            if err != nil {
                s.stor.Remove(tmpFd)
                tmpFd = storage.FileDesc{}
            }
        }()

        // Copy entries.
        tw := table.NewWriter(writer, o)
        for iter.Next() {
            key := iter.Key()
            if validInternalKey(key) {
                err = tw.Append(key, iter.Value())
                if err != nil {
                    return
                }
            }
        }
        err = iter.Error()
        if err != nil && !errors.IsCorrupted(err) {
            return
        }
        err = tw.Close()
        if err != nil {
            return
        }
        if !noSync {
            err = writer.Sync()
            if err != nil {
                return
            }
        }
        size = int64(tw.BytesLen())
        return
    }
    recoverTable := func(fd storage.FileDesc) error {
        s.logf("table@recovery recovering @%d", fd.Num)
        reader, err := s.stor.Open(fd)
        if err != nil {
            return err
        }
        var closed bool
        defer func() {
            if !closed {
                reader.Close()
            }
        }()

        // Get file size.
        size, err := reader.Seek(0, 2)
        if err != nil {
            return err
        }

        var (
            tSeq                                     uint64
            tgoodKey, tcorruptedKey, tcorruptedBlock int
            imin, imax                               []byte
        )
        tr, err := table.NewReader(reader, size, fd, nil, bpool, o)
        if err != nil {
            return err
        }
        iter := tr.NewIterator(nil, nil)
        if itererr, ok := iter.(iterator.ErrorCallbackSetter); ok {
            itererr.SetErrorCallback(func(err error) {
                if errors.IsCorrupted(err) {
                    s.logf("table@recovery block corruption @%d %q", fd.Num, err)
                    tcorruptedBlock++
                }
            })
        }

        // Scan the table.
        for iter.Next() {
            key := iter.Key()
            _, seq, _, kerr := parseInternalKey(key)
            if kerr != nil {
                tcorruptedKey++
                continue
            }
            tgoodKey++
            if seq > tSeq {
                tSeq = seq
            }
            if imin == nil {
                imin = append([]byte{}, key...)
            }
            imax = append(imax[:0], key...)
        }
        if err := iter.Error(); err != nil && !errors.IsCorrupted(err) {
            iter.Release()
            return err
        }
        iter.Release()

        goodKey += tgoodKey
        corruptedKey += tcorruptedKey
        corruptedBlock += tcorruptedBlock

        if strict && (tcorruptedKey > 0 || tcorruptedBlock > 0) {
            droppedTable++
            s.logf("table@recovery dropped @%d Gk·%d Ck·%d Cb·%d S·%d Q·%d", fd.Num, tgoodKey, tcorruptedKey, tcorruptedBlock, size, tSeq)
            return nil
        }

        if tgoodKey > 0 {
            if tcorruptedKey > 0 || tcorruptedBlock > 0 {
                // Rebuild the table.
                s.logf("table@recovery rebuilding @%d", fd.Num)
                iter := tr.NewIterator(nil, nil)
                tmpFd, newSize, err := buildTable(iter)
                iter.Release()
                if err != nil {
                    return err
                }
                closed = true
                reader.Close()
                if err := s.stor.Rename(tmpFd, fd); err != nil {
                    return err
                }
                size = newSize
            }
            if tSeq > maxSeq {
                maxSeq = tSeq
            }
            recoveredKey += tgoodKey
            // Add table to level 0.
            rec.addTable(0, fd.Num, size, imin, imax)
            s.logf("table@recovery recovered @%d Gk·%d Ck·%d Cb·%d S·%d Q·%d", fd.Num, tgoodKey, tcorruptedKey, tcorruptedBlock, size, tSeq)
        } else {
            droppedTable++
            s.logf("table@recovery unrecoverable @%d Ck·%d Cb·%d S·%d", fd.Num, tcorruptedKey, tcorruptedBlock, size)
        }

        return nil
    }

    // Recover all tables.
    if len(fds) > 0 {
        s.logf("table@recovery F·%d", len(fds))

        // Mark file number as used.
        s.markFileNum(fds[len(fds)-1].Num)

        for _, fd := range fds {
            if err := recoverTable(fd); err != nil {
                return err
            }
        }

        s.logf("table@recovery recovered F·%d N·%d Gk·%d Ck·%d Q·%d", len(fds), recoveredKey, goodKey, corruptedKey, maxSeq)
    }

    // Set sequence number.
    rec.setSeqNum(maxSeq)

    // Create new manifest.
    if err := s.create(); err != nil {
        return err
    }

    // Commit.
    return s.commit(rec, false)
}

func (db *DB) recoverJournal() error {
    // Get all journals and sort it by file number.
    rawFds, err := db.s.stor.List(storage.TypeJournal)
    if err != nil {
        return err
    }
    sortFds(rawFds)

    // Journals that will be recovered.
    var fds []storage.FileDesc
    for _, fd := range rawFds {
        if fd.Num >= db.s.stJournalNum || fd.Num == db.s.stPrevJournalNum {
            fds = append(fds, fd)
        }
    }

    var (
        ofd storage.FileDesc // Obsolete file.
        rec = &sessionRecord{}
    )

    // Recover journals.
    if len(fds) > 0 {
        db.logf("journal@recovery F·%d", len(fds))

        // Mark file number as used.
        db.s.markFileNum(fds[len(fds)-1].Num)

        var (
            // Options.
            strict      = db.s.o.GetStrict(opt.StrictJournal)
            checksum    = db.s.o.GetStrict(opt.StrictJournalChecksum)
            writeBuffer = db.s.o.GetWriteBuffer()

            jr       *journal.Reader
            mdb      = memdb.New(db.s.icmp, writeBuffer)
            buf      = &util.Buffer{}
            batchSeq uint64
            batchLen int
        )

        for _, fd := range fds {
            db.logf("journal@recovery recovering @%d", fd.Num)

            fr, err := db.s.stor.Open(fd)
            if err != nil {
                return err
            }

            // Create or reset journal reader instance.
            if jr == nil {
                jr = journal.NewReader(fr, dropper{db.s, fd}, strict, checksum)
            } else {
                jr.Reset(fr, dropper{db.s, fd}, strict, checksum)
            }

            // Flush memdb and remove obsolete journal file.
            if !ofd.Zero() {
                if mdb.Len() > 0 {
                    if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
                        fr.Close()
                        return err
                    }
                }

                rec.setJournalNum(fd.Num)
                rec.setSeqNum(db.seq)
                if err := db.s.commit(rec, false); err != nil {
                    fr.Close()
                    return err
                }
                rec.resetAddedTables()

                db.s.stor.Remove(ofd)
                ofd = storage.FileDesc{}
            }

            // Replay journal to memdb.
            mdb.Reset()
            for {
                r, err := jr.Next()
                if err != nil {
                    if err == io.EOF {
                        break
                    }

                    fr.Close()
                    return errors.SetFd(err, fd)
                }

                buf.Reset()
                if _, err := buf.ReadFrom(r); err != nil {
                    if err == io.ErrUnexpectedEOF {
                        // This is error returned due to corruption, with strict == false.
                        continue
                    }

                    fr.Close()
                    return errors.SetFd(err, fd)
                }
                batchSeq, batchLen, err = decodeBatchToMem(buf.Bytes(), db.seq, mdb)
                if err != nil {
                    if !strict && errors.IsCorrupted(err) {
                        db.s.logf("journal error: %v (skipped)", err)
                        // We won't apply sequence number as it might be corrupted.
                        continue
                    }

                    fr.Close()
                    return errors.SetFd(err, fd)
                }

                // Save sequence number.
                db.seq = batchSeq + uint64(batchLen)

                // Flush it if large enough.
                if mdb.Size() >= writeBuffer {
                    if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
                        fr.Close()
                        return err
                    }

                    mdb.Reset()
                }
            }

            fr.Close()
            ofd = fd
        }

        // Flush the last memdb.
        if mdb.Len() > 0 {
            if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
                return err
            }
        }
    }

    // Create a new journal.
    if _, err := db.newMem(0); err != nil {
        return err
    }

    // Commit.
    rec.setJournalNum(db.journalFd.Num)
    rec.setSeqNum(db.seq)
    if err := db.s.commit(rec, false); err != nil {
        // Close journal on error.
        if db.journal != nil {
            db.journal.Close()
            db.journalWriter.Close()
        }
        return err
    }

    // Remove the last obsolete journal file.
    if !ofd.Zero() {
        db.s.stor.Remove(ofd)
    }

    return nil
}

func (db *DB) recoverJournalRO() error {
    // Get all journals and sort it by file number.
    rawFds, err := db.s.stor.List(storage.TypeJournal)
    if err != nil {
        return err
    }
    sortFds(rawFds)

    // Journals that will be recovered.
    var fds []storage.FileDesc
    for _, fd := range rawFds {
        if fd.Num >= db.s.stJournalNum || fd.Num == db.s.stPrevJournalNum {
            fds = append(fds, fd)
        }
    }

    var (
        // Options.
        strict      = db.s.o.GetStrict(opt.StrictJournal)
        checksum    = db.s.o.GetStrict(opt.StrictJournalChecksum)
        writeBuffer = db.s.o.GetWriteBuffer()

        mdb = memdb.New(db.s.icmp, writeBuffer)
    )

    // Recover journals.
    if len(fds) > 0 {
        db.logf("journal@recovery RO·Mode F·%d", len(fds))

        var (
            jr       *journal.Reader
            buf      = &util.Buffer{}
            batchSeq uint64
            batchLen int
        )

        for _, fd := range fds {
            db.logf("journal@recovery recovering @%d", fd.Num)

            fr, err := db.s.stor.Open(fd)
            if err != nil {
                return err
            }

            // Create or reset journal reader instance.
            if jr == nil {
                jr = journal.NewReader(fr, dropper{db.s, fd}, strict, checksum)
            } else {
                jr.Reset(fr, dropper{db.s, fd}, strict, checksum)
            }

            // Replay journal to memdb.
            for {
                r, err := jr.Next()
                if err != nil {
                    if err == io.EOF {
                        break
                    }

                    fr.Close()
                    return errors.SetFd(err, fd)
                }

                buf.Reset()
                if _, err := buf.ReadFrom(r); err != nil {
                    if err == io.ErrUnexpectedEOF {
                        // This is error returned due to corruption, with strict == false.
                        continue
                    }

                    fr.Close()
                    return errors.SetFd(err, fd)
                }
                batchSeq, batchLen, err = decodeBatchToMem(buf.Bytes(), db.seq, mdb)
                if err != nil {
                    if !strict && errors.IsCorrupted(err) {
                        db.s.logf("journal error: %v (skipped)", err)
                        // We won't apply sequence number as it might be corrupted.
                        continue
                    }

                    fr.Close()
                    return errors.SetFd(err, fd)
                }

                // Save sequence number.
                db.seq = batchSeq + uint64(batchLen)
            }

            fr.Close()
        }
    }

    // Set memDB.
    db.mem = &memDB{db: db, DB: mdb, ref: 1}

    return nil
}

func memGet(mdb *memdb.DB, ikey internalKey, icmp *iComparer) (ok bool, mv []byte, err error) {
    mk, mv, err := mdb.Find(ikey)
    if err == nil {
        ukey, _, kt, kerr := parseInternalKey(mk)
        if kerr != nil {
            // Shouldn't have had happen.
            panic(kerr)
        }
        if icmp.uCompare(ukey, ikey.ukey()) == 0 {
            if kt == keyTypeDel {
                return true, nil, ErrNotFound
            }
            return true, mv, nil

        }
    } else if err != ErrNotFound {
        return true, nil, err
    }
    return
}

func (db *DB) get(auxm *memdb.DB, auxt tFiles, key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, err error) {
    ikey := makeInternalKey(nil, key, seq, keyTypeSeek)

    if auxm != nil {
        if ok, mv, me := memGet(auxm, ikey, db.s.icmp); ok {
            return append([]byte{}, mv...), me
        }
    }

    em, fm := db.getMems()
    for _, m := range [...]*memDB{em, fm} {
        if m == nil {
            continue
        }
        defer m.decref()

        if ok, mv, me := memGet(m.DB, ikey, db.s.icmp); ok {
            return append([]byte{}, mv...), me
        }
    }

    v := db.s.version()
    value, cSched, err := v.get(auxt, ikey, ro, false)
    v.release()
    if cSched {
        // Trigger table compaction.
        db.compTrigger(db.tcompCmdC)
    }
    return
}

func nilIfNotFound(err error) error {
    if err == ErrNotFound {
        return nil
    }
    return err
}

func (db *DB) has(auxm *memdb.DB, auxt tFiles, key []byte, seq uint64, ro *opt.ReadOptions) (ret bool, err error) {
    ikey := makeInternalKey(nil, key, seq, keyTypeSeek)

    if auxm != nil {
        if ok, _, me := memGet(auxm, ikey, db.s.icmp); ok {
            return me == nil, nilIfNotFound(me)
        }
    }

    em, fm := db.getMems()
    for _, m := range [...]*memDB{em, fm} {
        if m == nil {
            continue
        }
        defer m.decref()

        if ok, _, me := memGet(m.DB, ikey, db.s.icmp); ok {
            return me == nil, nilIfNotFound(me)
        }
    }

    v := db.s.version()
    _, cSched, err := v.get(auxt, ikey, ro, true)
    v.release()
    if cSched {
        // Trigger table compaction.
        db.compTrigger(db.tcompCmdC)
    }
    if err == nil {
        ret = true
    } else if err == ErrNotFound {
        err = nil
    }
    return
}

// Get gets the value for the given key. It returns ErrNotFound if the
// DB does not contains the key.
//
// The returned slice is its own copy, it is safe to modify the contents
// of the returned slice.
// It is safe to modify the contents of the argument after Get returns.
func (db *DB) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) {
    err = db.ok()
    if err != nil {
        return
    }

    se := db.acquireSnapshot()
    defer db.releaseSnapshot(se)
    return db.get(nil, nil, key, se.seq, ro)
}

// Has returns true if the DB does contains the given key.
//
// It is safe to modify the contents of the argument after Has returns.
func (db *DB) Has(key []byte, ro *opt.ReadOptions) (ret bool, err error) {
    err = db.ok()
    if err != nil {
        return
    }

    se := db.acquireSnapshot()
    defer db.releaseSnapshot(se)
    return db.has(nil, nil, key, se.seq, ro)
}

// NewIterator returns an iterator for the latest snapshot of the
// underlying DB.
// The returned iterator is not safe for concurrent use, but it is safe to use
// multiple iterators concurrently, with each in a dedicated goroutine.
// It is also safe to use an iterator concurrently with modifying its
// underlying DB. The resultant key/value pairs are guaranteed to be
// consistent.
//
// Slice allows slicing the iterator to only contains keys in the given
// range. A nil Range.Start is treated as a key before all keys in the
// DB. And a nil Range.Limit is treated as a key after all keys in
// the DB.
//
// WARNING: Any slice returned by interator (e.g. slice returned by calling
// Iterator.Key() or Iterator.Key() methods), its content should not be modified
// unless noted otherwise.
//
// The iterator must be released after use, by calling Release method.
//
// Also read Iterator documentation of the leveldb/iterator package.
func (db *DB) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
    if err := db.ok(); err != nil {
        return iterator.NewEmptyIterator(err)
    }

    se := db.acquireSnapshot()
    defer db.releaseSnapshot(se)
    // Iterator holds 'version' lock, 'version' is immutable so snapshot
    // can be released after iterator created.
    return db.newIterator(nil, nil, se.seq, slice, ro)
}

// GetSnapshot returns a latest snapshot of the underlying DB. A snapshot
// is a frozen snapshot of a DB state at a particular point in time. The
// content of snapshot are guaranteed to be consistent.
//
// The snapshot must be released after use, by calling Release method.
func (db *DB) GetSnapshot() (*Snapshot, error) {
    if err := db.ok(); err != nil {
        return nil, err
    }

    return db.newSnapshot(), nil
}

// GetProperty returns value of the given property name.
//
// Property names:
//    leveldb.num-files-at-level{n}
//        Returns the number of files at level 'n'.
//    leveldb.stats
//        Returns statistics of the underlying DB.
//    leveldb.iostats
//        Returns statistics of effective disk read and write.
//    leveldb.writedelay
//        Returns cumulative write delay caused by compaction.
//    leveldb.sstables
//        Returns sstables list for each level.
//    leveldb.blockpool
//        Returns block pool stats.
//    leveldb.cachedblock
//        Returns size of cached block.
//    leveldb.openedtables
//        Returns number of opened tables.
//    leveldb.alivesnaps
//        Returns number of alive snapshots.
//    leveldb.aliveiters
//        Returns number of alive iterators.
func (db *DB) GetProperty(name string) (value string, err error) {
    err = db.ok()
    if err != nil {
        return
    }

    const prefix = "leveldb."
    if !strings.HasPrefix(name, prefix) {
        return "", ErrNotFound
    }
    p := name[len(prefix):]

    v := db.s.version()
    defer v.release()

    numFilesPrefix := "num-files-at-level"
    switch {
    case strings.HasPrefix(p, numFilesPrefix):
        var level uint
        var rest string
        n, _ := fmt.Sscanf(p[len(numFilesPrefix):], "%d%s", &level, &rest)
        if n != 1 {
            err = ErrNotFound
        } else {
            value = fmt.Sprint(v.tLen(int(level)))
        }
    case p == "stats":
        value = "Compactions\n" +
            " Level |   Tables   |    Size(MB)   |    Time(sec)  |    Read(MB)   |   Write(MB)\n" +
            "-------+------------+---------------+---------------+---------------+---------------\n"
        var totalTables int
        var totalSize, totalRead, totalWrite int64
        var totalDuration time.Duration
        for level, tables := range v.levels {
            duration, read, write := db.compStats.getStat(level)
            if len(tables) == 0 && duration == 0 {
                continue
            }
            totalTables += len(tables)
            totalSize += tables.size()
            totalRead += read
            totalWrite += write
            totalDuration += duration
            value += fmt.Sprintf(" %3d   | %10d | %13.5f | %13.5f | %13.5f | %13.5f\n",
                level, len(tables), float64(tables.size())/1048576.0, duration.Seconds(),
                float64(read)/1048576.0, float64(write)/1048576.0)
        }
        value += "-------+------------+---------------+---------------+---------------+---------------\n"
        value += fmt.Sprintf(" Total | %10d | %13.5f | %13.5f | %13.5f | %13.5f\n",
            totalTables, float64(totalSize)/1048576.0, totalDuration.Seconds(),
            float64(totalRead)/1048576.0, float64(totalWrite)/1048576.0)
    case p == "compcount":
        value = fmt.Sprintf("MemComp:%d Level0Comp:%d NonLevel0Comp:%d SeekComp:%d", atomic.LoadUint32(&db.memComp), atomic.LoadUint32(&db.level0Comp), atomic.LoadUint32(&db.nonLevel0Comp), atomic.LoadUint32(&db.seekComp))
    case p == "iostats":
        value = fmt.Sprintf("Read(MB):%.5f Write(MB):%.5f",
            float64(db.s.stor.reads())/1048576.0,
            float64(db.s.stor.writes())/1048576.0)
    case p == "writedelay":
        writeDelayN, writeDelay := atomic.LoadInt32(&db.cWriteDelayN), time.Duration(atomic.LoadInt64(&db.cWriteDelay))
        paused := atomic.LoadInt32(&db.inWritePaused) == 1
        value = fmt.Sprintf("DelayN:%d Delay:%s Paused:%t", writeDelayN, writeDelay, paused)
    case p == "sstables":
        for level, tables := range v.levels {
            value += fmt.Sprintf("--- level %d ---\n", level)
            for _, t := range tables {
                value += fmt.Sprintf("%d:%d[%q .. %q]\n", t.fd.Num, t.size, t.imin, t.imax)
            }
        }
    case p == "blockpool":
        value = fmt.Sprintf("%v", db.s.tops.bpool)
    case p == "cachedblock":
        if db.s.tops.bcache != nil {
            value = fmt.Sprintf("%d", db.s.tops.bcache.Size())
        } else {
            value = "<nil>"
        }
    case p == "openedtables":
        value = fmt.Sprintf("%d", db.s.tops.cache.Size())
    case p == "alivesnaps":
        value = fmt.Sprintf("%d", atomic.LoadInt32(&db.aliveSnaps))
    case p == "aliveiters":
        value = fmt.Sprintf("%d", atomic.LoadInt32(&db.aliveIters))
    default:
        err = ErrNotFound
    }

    return
}

// DBStats is database statistics.
type DBStats struct {
    WriteDelayCount    int32
    WriteDelayDuration time.Duration
    WritePaused        bool

    AliveSnapshots int32
    AliveIterators int32

    IOWrite uint64
    IORead  uint64

    BlockCacheSize    int
    OpenedTablesCount int

    LevelSizes        Sizes
    LevelTablesCounts []int
    LevelRead         Sizes
    LevelWrite        Sizes
    LevelDurations    []time.Duration

    MemCompactionActive   bool
    TableCompactionActive bool

    MemComp       uint32
    Level0Comp    uint32
    NonLevel0Comp uint32
    SeekComp      uint32
}

// Stats populates s with database statistics.
func (db *DB) Stats(s *DBStats) error {
    err := db.ok()
    if err != nil {
        return err
    }

    s.IORead = db.s.stor.reads()
    s.IOWrite = db.s.stor.writes()
    s.WriteDelayCount = atomic.LoadInt32(&db.cWriteDelayN)
    s.WriteDelayDuration = time.Duration(atomic.LoadInt64(&db.cWriteDelay))
    s.WritePaused = atomic.LoadInt32(&db.inWritePaused) == 1

    s.OpenedTablesCount = db.s.tops.cache.Size()
    if db.s.tops.bcache != nil {
        s.BlockCacheSize = db.s.tops.bcache.Size()
    } else {
        s.BlockCacheSize = 0
    }

    s.AliveIterators = atomic.LoadInt32(&db.aliveIters)
    s.AliveSnapshots = atomic.LoadInt32(&db.aliveSnaps)

    s.LevelDurations = s.LevelDurations[:0]
    s.LevelRead = s.LevelRead[:0]
    s.LevelWrite = s.LevelWrite[:0]
    s.LevelSizes = s.LevelSizes[:0]
    s.LevelTablesCounts = s.LevelTablesCounts[:0]

    func() {
        db.compActiveLk.RLock()
        defer db.compActiveLk.RUnlock()
        s.MemCompactionActive = db.memCompActive
        s.TableCompactionActive = db.tableCompActive
    }()

    v := db.s.version()
    defer v.release()

    for level, tables := range v.levels {
        duration, read, write := db.compStats.getStat(level)

        s.LevelDurations = append(s.LevelDurations, duration)
        s.LevelRead = append(s.LevelRead, read)
        s.LevelWrite = append(s.LevelWrite, write)
        s.LevelSizes = append(s.LevelSizes, tables.size())
        s.LevelTablesCounts = append(s.LevelTablesCounts, len(tables))
    }
    s.MemComp = atomic.LoadUint32(&db.memComp)
    s.Level0Comp = atomic.LoadUint32(&db.level0Comp)
    s.NonLevel0Comp = atomic.LoadUint32(&db.nonLevel0Comp)
    s.SeekComp = atomic.LoadUint32(&db.seekComp)
    return nil
}

// SizeOf calculates approximate sizes of the given key ranges.
// The length of the returned sizes are equal with the length of the given
// ranges. The returned sizes measure storage space usage, so if the user
// data compresses by a factor of ten, the returned sizes will be one-tenth
// the size of the corresponding user data size.
// The results may not include the sizes of recently written data.
func (db *DB) SizeOf(ranges []util.Range) (Sizes, error) {
    if err := db.ok(); err != nil {
        return nil, err
    }

    v := db.s.version()
    defer v.release()

    sizes := make(Sizes, 0, len(ranges))
    for _, r := range ranges {
        imin := makeInternalKey(nil, r.Start, keyMaxSeq, keyTypeSeek)
        imax := makeInternalKey(nil, r.Limit, keyMaxSeq, keyTypeSeek)
        start, err := v.offsetOf(imin)
        if err != nil {
            return nil, err
        }
        limit, err := v.offsetOf(imax)
        if err != nil {
            return nil, err
        }
        var size int64
        if limit >= start {
            size = limit - start
        }
        sizes = append(sizes, size)
    }

    return sizes, nil
}

// Close closes the DB. This will also releases any outstanding snapshot,
// abort any in-flight compaction and discard open transaction.
//
// It is not safe to close a DB until all outstanding iterators are released.
// It is valid to call Close multiple times. Other methods should not be
// called after the DB has been closed.
func (db *DB) Close() error {
    if !db.setClosed() {
        return ErrClosed
    }

    start := time.Now()
    db.log("db@close closing")

    // Clear the finalizer.
    runtime.SetFinalizer(db, nil)

    // Get compaction error.
    var err error
    select {
    case err = <-db.compErrC:
        if err == ErrReadOnly {
            err = nil
        }
    default:
    }

    // Signal all goroutines.
    close(db.closeC)

    // Discard open transaction.
    if db.tr != nil {
        db.tr.Discard()
    }

    // Acquire writer lock.
    db.writeLockC <- struct{}{}

    // Wait for all gorotines to exit.
    db.closeW.Wait()

    // Closes journal.
    if db.journal != nil {
        db.journal.Close()
        db.journalWriter.Close()
        db.journal = nil
        db.journalWriter = nil
    }

    if db.writeDelayN > 0 {
        db.logf("db@write was delayed N·%d T·%v", db.writeDelayN, db.writeDelay)
    }

    // Close session.
    db.s.close()
    db.logf("db@close done T·%v", time.Since(start))
    db.s.release()

    if db.closer != nil {
        if err1 := db.closer.Close(); err == nil {
            err = err1
        }
        db.closer = nil
    }

    // Clear memdbs.
    db.clearMems()

    return err
}