status-im/status-go

View on GitHub
mailserver/mailserver_db_leveldb.go

Summary

Maintainability
A
0 mins
Test Coverage
C
73%
package mailserver

import (
    "fmt"
    "time"

    "github.com/syndtr/goleveldb/leveldb"
    "github.com/syndtr/goleveldb/leveldb/errors"
    "github.com/syndtr/goleveldb/leveldb/iterator"
    "github.com/syndtr/goleveldb/leveldb/util"

    "github.com/ethereum/go-ethereum/log"
    "github.com/ethereum/go-ethereum/rlp"

    "github.com/status-im/status-go/common"
    "github.com/status-im/status-go/eth-node/types"
    waku "github.com/status-im/status-go/waku/common"
)

type LevelDB struct {
    // We can't embed as there are some state problems with go-routines
    ldb  *leveldb.DB
    name string
    done chan struct{}
}

type LevelDBIterator struct {
    iterator.Iterator
}

func (i *LevelDBIterator) DBKey() (*DBKey, error) {
    return &DBKey{
        raw: i.Key(),
    }, nil
}

func (i *LevelDBIterator) GetEnvelopeByTopicsMap(topics map[types.TopicType]bool) ([]byte, error) {
    rawValue := make([]byte, len(i.Value()))
    copy(rawValue, i.Value())

    key, err := i.DBKey()
    if err != nil {
        return nil, err
    }

    if !topics[key.Topic()] {
        return nil, nil
    }

    return rawValue, nil
}

func (i *LevelDBIterator) GetEnvelopeByBloomFilter(bloom []byte) ([]byte, error) {
    var envelopeBloom []byte
    rawValue := make([]byte, len(i.Value()))
    copy(rawValue, i.Value())

    key, err := i.DBKey()
    if err != nil {
        return nil, err
    }

    if len(key.Bytes()) != DBKeyLength {
        var err error
        envelopeBloom, err = extractBloomFromEncodedEnvelope(rawValue)
        if err != nil {
            return nil, err
        }
    } else {
        envelopeBloom = types.TopicToBloom(key.Topic())
    }
    if !types.BloomFilterMatch(bloom, envelopeBloom) {
        return nil, nil
    }
    return rawValue, nil
}

func (i *LevelDBIterator) Release() error {
    i.Iterator.Release()
    return nil
}

func NewLevelDB(dataDir string) (*LevelDB, error) {
    // Open opens an existing leveldb database
    db, err := leveldb.OpenFile(dataDir, nil)
    if _, corrupted := err.(*errors.ErrCorrupted); corrupted {
        log.Info("database is corrupted trying to recover", "path", dataDir)
        db, err = leveldb.RecoverFile(dataDir, nil)
    }

    instance := LevelDB{
        ldb:  db,
        name: dataDir, // name is used for metrics labels
        done: make(chan struct{}),
    }

    // initialize the metric value
    instance.updateArchivedEnvelopesCount()
    // checking count on every insert is inefficient
    go func() {
        defer common.LogOnPanic()
        for {
            select {
            case <-instance.done:
                return
            case <-time.After(time.Second * envelopeCountCheckInterval):
                instance.updateArchivedEnvelopesCount()
            }
        }
    }()
    return &instance, err
}

// GetEnvelope get an envelope by its key
func (db *LevelDB) GetEnvelope(key *DBKey) ([]byte, error) {
    defer recoverLevelDBPanics("GetEnvelope")
    return db.ldb.Get(key.Bytes(), nil)
}

func (db *LevelDB) updateArchivedEnvelopesCount() {
    if count, err := db.envelopesCount(); err != nil {
        log.Warn("db query for envelopes count failed", "err", err)
    } else {
        archivedEnvelopesGauge.WithLabelValues(db.name).Set(float64(count))
    }
}

// Build iterator returns an iterator given a start/end and a cursor
func (db *LevelDB) BuildIterator(query CursorQuery) (Iterator, error) {
    defer recoverLevelDBPanics("BuildIterator")

    i := db.ldb.NewIterator(&util.Range{Start: query.start, Limit: query.end}, nil)

    envelopeQueriesCounter.WithLabelValues("unknown", "unknown").Inc()
    // seek to the end as we want to return envelopes in a descending order
    if len(query.cursor) == CursorLength {
        i.Seek(query.cursor)
    }
    return &LevelDBIterator{i}, nil
}

// Prune removes envelopes older than time
func (db *LevelDB) Prune(t time.Time, batchSize int) (int, error) {
    defer recoverLevelDBPanics("Prune")

    var zero types.Hash
    var emptyTopic types.TopicType
    kl := NewDBKey(0, emptyTopic, zero)
    ku := NewDBKey(uint32(t.Unix()), emptyTopic, zero)
    query := CursorQuery{
        start: kl.Bytes(),
        end:   ku.Bytes(),
    }
    i, err := db.BuildIterator(query)
    if err != nil {
        return 0, err
    }
    defer func() { _ = i.Release() }()

    batch := leveldb.Batch{}
    removed := 0

    for i.Next() {
        dbKey, err := i.DBKey()
        if err != nil {
            return 0, err
        }

        batch.Delete(dbKey.Bytes())

        if batch.Len() == batchSize {
            if err := db.ldb.Write(&batch, nil); err != nil {
                return removed, err
            }

            removed = removed + batch.Len()
            batch.Reset()
        }
    }

    if batch.Len() > 0 {
        if err := db.ldb.Write(&batch, nil); err != nil {
            return removed, err
        }

        removed = removed + batch.Len()
    }

    return removed, nil
}

func (db *LevelDB) envelopesCount() (int, error) {
    defer recoverLevelDBPanics("envelopesCount")
    iterator, err := db.BuildIterator(CursorQuery{})
    if err != nil {
        return 0, err
    }
    // LevelDB does not have API for getting a count
    var count int
    for iterator.Next() {
        count++
    }
    return count, nil
}

// SaveEnvelope stores an envelope in leveldb and increments the metrics
func (db *LevelDB) SaveEnvelope(env types.Envelope) error {
    defer recoverLevelDBPanics("SaveEnvelope")

    key := NewDBKey(env.Expiry()-env.TTL(), env.Topic(), env.Hash())
    rawEnvelope, err := rlp.EncodeToBytes(env.Unwrap())
    if err != nil {
        log.Error(fmt.Sprintf("rlp.EncodeToBytes failed: %s", err))
        archivedErrorsCounter.WithLabelValues(db.name).Inc()
        return err
    }

    if err = db.ldb.Put(key.Bytes(), rawEnvelope, nil); err != nil {
        log.Error(fmt.Sprintf("Writing to DB failed: %s", err))
        archivedErrorsCounter.WithLabelValues(db.name).Inc()
    }
    archivedEnvelopesGauge.WithLabelValues(db.name).Inc()
    archivedEnvelopeSizeMeter.WithLabelValues(db.name).Observe(
        float64(waku.EnvelopeHeaderLength + env.Size()))
    return err
}

func (db *LevelDB) Close() error {
    select {
    case <-db.done:
    default:
        close(db.done)
    }
    return db.ldb.Close()
}

func recoverLevelDBPanics(calleMethodName string) {
    // Recover from possible goleveldb panics
    if r := recover(); r != nil {
        if errString, ok := r.(string); ok {
            log.Error(fmt.Sprintf("recovered from panic in %s: %s", calleMethodName, errString))
        }
    }
}