johnsonjh/jleveldb

View on GitHub
leveldb/db_state.go

Summary

Maintainability
A
0 mins
Test Coverage
// Copyright © 2013, Suryandaru Triandana <syndtr@gmail.com>
// Copyright © 2021, Jeffrey H. Johnson <trnsz@pobox.com>
//
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

package leveldb

import (
    "errors"
    "sync/atomic"
    "time"

    "github.com/johnsonjh/jleveldb/leveldb/journal"
    "github.com/johnsonjh/jleveldb/leveldb/memdb"
    "github.com/johnsonjh/jleveldb/leveldb/storage"
)

var errHasFrozenMem = errors.New("has frozen mem")

type memDB struct {
    db *DB
    *memdb.DB
    ref int32
}

func (m *memDB) getref() int32 {
    return atomic.LoadInt32(&m.ref)
}

func (m *memDB) incref() {
    atomic.AddInt32(&m.ref, 1)
}

func (m *memDB) decref() {
    if ref := atomic.AddInt32(&m.ref, -1); ref == 0 {
        // Only put back memdb with std capacity.
        if m.Capacity() == m.db.s.o.GetWriteBuffer() {
            m.Reset()
            m.db.mpoolPut(m.DB)
        }
        m.db = nil
        m.DB = nil
    } else if ref < 0 {
        panic("negative memdb ref")
    }
}

// Get latest sequence number.
func (db *DB) getSeq() uint64 {
    return atomic.LoadUint64(&db.seq)
}

// Atomically adds delta to seq.
func (db *DB) addSeq(delta uint64) {
    atomic.AddUint64(&db.seq, delta)
}

func (db *DB) setSeq(seq uint64) {
    atomic.StoreUint64(&db.seq, seq)
}

func (db *DB) sampleSeek(ikey internalKey) {
    v := db.s.version()
    if v.sampleSeek(ikey) {
        // Trigger table compaction.
        db.compTrigger(db.tcompCmdC)
    }
    v.release()
}

func (db *DB) mpoolPut(mem *memdb.DB) {
    if !db.isClosed() {
        select {
        case db.memPool <- mem:
        default:
        }
    }
}

func (db *DB) mpoolGet(n int) *memDB {
    var mdb *memdb.DB
    select {
    case mdb = <-db.memPool:
    default:
    }
    if mdb == nil || mdb.Capacity() < n {
        mdb = memdb.New(db.s.icmp, maxInt(db.s.o.GetWriteBuffer(), n))
    }
    return &memDB{
        db: db,
        DB: mdb,
    }
}

func (db *DB) mpoolDrain() {
    ticker := time.NewTicker(30 * time.Second)
    for {
        select {
        case <-ticker.C:
            select {
            case <-db.memPool:
            default:
            }
        case <-db.closeC:
            ticker.Stop()
            // Make sure the pool is drained.
            select {
            case <-db.memPool:
            case <-time.After(time.Second):
            }
            close(db.memPool)
            return
        }
    }
}

// Create new memdb and froze the old one; need external synchronization.
// newMem only called synchronously by the writer.
func (db *DB) newMem(n int) (mem *memDB, err error) {
    fd := storage.FileDesc{Type: storage.TypeJournal, Num: db.s.allocFileNum()}
    w, err := db.s.stor.Create(fd)
    if err != nil {
        db.s.reuseFileNum(fd.Num)
        return
    }

    db.memMu.Lock()
    defer db.memMu.Unlock()

    if db.frozenMem != nil {
        return nil, errHasFrozenMem
    }

    if db.journal == nil {
        db.journal = journal.NewWriter(w)
    } else {
        db.journal.Reset(w)
        db.journalWriter.Close()
        db.frozenJournalFd = db.journalFd
    }
    db.journalWriter = w
    db.journalFd = fd
    db.frozenMem = db.mem
    mem = db.mpoolGet(n)
    mem.incref() // for self
    mem.incref() // for caller
    db.mem = mem
    // The seq only incremented by the writer. And whoever called newMem
    // should hold write lock, so no need additional synchronization here.
    db.frozenSeq = db.seq
    return
}

// Get all memdbs.
func (db *DB) getMems() (e, f *memDB) {
    db.memMu.RLock()
    defer db.memMu.RUnlock()
    if db.mem != nil {
        db.mem.incref()
    } else if !db.isClosed() {
        panic("nil effective mem")
    }
    if db.frozenMem != nil {
        db.frozenMem.incref()
    }
    return db.mem, db.frozenMem
}

// Get effective memdb.
func (db *DB) getEffectiveMem() *memDB {
    db.memMu.RLock()
    defer db.memMu.RUnlock()
    if db.mem != nil {
        db.mem.incref()
    } else if !db.isClosed() {
        panic("nil effective mem")
    }
    return db.mem
}

// Get frozen memdb.
func (db *DB) getFrozenMem() *memDB {
    db.memMu.RLock()
    defer db.memMu.RUnlock()
    if db.frozenMem != nil {
        db.frozenMem.incref()
    }
    return db.frozenMem
}

// Drop frozen memdb; assume that frozen memdb isn't nil.
func (db *DB) dropFrozenMem() {
    db.memMu.Lock()
    if err := db.s.stor.Remove(db.frozenJournalFd); err != nil {
        db.logf("journal@remove removing @%d %q", db.frozenJournalFd.Num, err)
    } else {
        db.logf("journal@remove removed @%d", db.frozenJournalFd.Num)
    }
    db.frozenJournalFd = storage.FileDesc{}
    db.frozenMem.decref()
    db.frozenMem = nil
    db.memMu.Unlock()
}

// Clear mems ptr; used by DB.Close().
func (db *DB) clearMems() {
    db.memMu.Lock()
    db.mem = nil
    db.frozenMem = nil
    db.memMu.Unlock()
}

// Set closed flag; return true if not already closed.
func (db *DB) setClosed() bool {
    return atomic.CompareAndSwapUint32(&db.closed, 0, 1)
}

// Check whether DB was closed.
func (db *DB) isClosed() bool {
    return atomic.LoadUint32(&db.closed) != 0
}

// Check read ok status.
func (db *DB) ok() error {
    if db.isClosed() {
        return ErrClosed
    }
    return nil
}