// Copyright © 2012, Suryandaru Triandana <syndtr@gmail.com>
// Copyright © 2021, Jeffrey H. Johnson <trnsz@pobox.com>
//
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

// Package cache provides the interface and implementations of cache algorithms.
package cache

import (
    "sync"
    "sync/atomic"
    "unsafe"

    "github.com/johnsonjh/jleveldb/leveldb/util"
)

// Cacher provides an interface for implementing caching functionality.
// An implementation must be safe for concurrent use.
type Cacher interface {
    // Capacity returns cache capacity.
    Capacity() int

    // SetCapacity sets cache capacity.
    SetCapacity(capacity int)

    // Promote promotes the 'cache node'.
    Promote(n *Node)

    // Ban evicts the 'cache node' and prevents subsequent 'promote' calls.
    Ban(n *Node)

    // Evict evicts the 'cache node'.
    Evict(n *Node)

    // EvictNS evicts all 'cache nodes' within the given namespace.
    EvictNS(ns uint64)

    // EvictAll evicts all 'cache nodes'.
    EvictAll()

    // Close closes the 'cache tree'.
    Close() error
}

// Value is a 'cacheable object'. It may implement util.Releaser; if so,
// the Release method will be called once the object is released.
type Value interface{}

// NamespaceGetter provides a convenient wrapper for a namespace.
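//
// A minimal usage sketch, assuming c is an existing *Cache (the namespace
// and key values are illustrative):
//
//	g := &NamespaceGetter{Cache: c, NS: 1}
//	h := g.Get(42, func() (size int, value Value) { return 1, "v" })
//	if h != nil {
//		_ = h.Value() // "v"
//		h.Release()
//	}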
type NamespaceGetter struct {
    Cache *Cache
    NS    uint64
}

// Get simply calls the Cache.Get method.
func (g *NamespaceGetter) Get(key uint64, setFunc func() (size int, value Value)) *Handle {
    return g.Cache.Get(g.NS, key, setFunc)
}

// The hash tables implementation is based on:
// "Dynamic-Sized Nonblocking Hash Tables", by Yujie Liu,
// Kunlong Zhang, and Michael Spear.
// ACM Symposium on Principles of Distributed Computing, Jul 2014.
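//
// The table grows and shrinks by atomically swapping in a new head mNode
// whose buckets are filled lazily from the frozen buckets of its
// predecessor: a grow splits each old bucket across two new ones, and a
// shrink merges two old buckets into one (see initBucket below).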

const (
    mInitialSize           = 1 << 4
    mOverflowThreshold     = 1 << 5
    mOverflowGrowThreshold = 1 << 7
)
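
// With mInitialSize = 16 and mOverflowThreshold = 32, the initial table
// grows once it holds 16*32 = 512 nodes, or earlier once inserts into
// already-overflowing buckets (more than 32 nodes) have occurred
// mOverflowGrowThreshold = 128 times.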

type mBucket struct {
    mu     sync.Mutex
    node   []*Node
    frozen bool
}

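// freeze marks the bucket as frozen, so that subsequent get and delete
// calls on it fail and retry against the new table, and returns its
// node slice.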
func (b *mBucket) freeze() []*Node {
    b.mu.Lock()
    defer b.mu.Unlock()
    b.frozen = true
    return b.node
}

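// get returns the node for (ns, key) in this bucket, creating and inserting
// a new one when absent unless noset is true. done reports false when the
// bucket is frozen by a resize, in which case the caller must retry on the
// current head table; added reports whether a new node was created.
// Inserting may trigger a grow of the whole table.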
func (b *mBucket) get(r *Cache, h *mNode, hash uint32, ns, key uint64, noset bool) (done, added bool, n *Node) {
    b.mu.Lock()

    if b.frozen {
        b.mu.Unlock()
        return
    }

    // Scan the node.
    for _, n := range b.node {
        if n.hash == hash && n.ns == ns && n.key == key {
            atomic.AddInt32(&n.ref, 1)
            b.mu.Unlock()
            return true, false, n
        }
    }

    // Get only.
    if noset {
        b.mu.Unlock()
        return true, false, nil
    }

    // Create node.
    n = &Node{
        r:    r,
        hash: hash,
        ns:   ns,
        key:  key,
        ref:  1,
    }
    // Add node to bucket.
    b.node = append(b.node, n)
    bLen := len(b.node)
    b.mu.Unlock()

    // Update counter.
    grow := atomic.AddInt32(&r.nodes, 1) >= h.growThreshold
    if bLen > mOverflowThreshold {
        grow = grow || atomic.AddInt32(&h.overflow, 1) >= mOverflowGrowThreshold
    }

    // Grow.
    if grow && atomic.CompareAndSwapInt32(&h.resizeInProgress, 0, 1) {
        nhLen := len(h.buckets) << 1
        nh := &mNode{
            buckets:         make([]unsafe.Pointer, nhLen),
            mask:            uint32(nhLen) - 1,
            pred:            unsafe.Pointer(h),
            growThreshold:   int32(nhLen * mOverflowThreshold),
            shrinkThreshold: int32(nhLen >> 1),
        }
        ok := atomic.CompareAndSwapPointer(&r.mHead, unsafe.Pointer(h), unsafe.Pointer(nh))
        if !ok {
            panic("BUG: failed swapping head")
        }
        go nh.initBuckets()
    }

    return true, true, n
}

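// delete removes the node for (ns, key) from this bucket if its ref count
// has dropped to zero, releasing its value and running its onDel callbacks.
// done reports false when the bucket is frozen by a resize, in which case
// the caller must retry. Deleting may trigger a shrink of the whole table.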
func (b *mBucket) delete(r *Cache, h *mNode, hash uint32, ns, key uint64) (done, deleted bool) {
    b.mu.Lock()

    if b.frozen {
        b.mu.Unlock()
        return
    }

    // Scan the node.
    var (
        n    *Node
        bLen int
    )
    for i := range b.node {
        n = b.node[i]
        if n.ns == ns && n.key == key {
            if atomic.LoadInt32(&n.ref) == 0 {
                deleted = true

                // Call releaser.
                if n.value != nil {
                    if r, ok := n.value.(util.Releaser); ok {
                        r.Release()
                    }
                    n.value = nil
                }

                // Remove node from bucket.
                b.node = append(b.node[:i], b.node[i+1:]...)
                bLen = len(b.node)
            }
            break
        }
    }
    b.mu.Unlock()

    if deleted {
        // Call OnDel.
        for _, f := range n.onDel {
            f()
        }

        // Update counter.
        atomic.AddInt32(&r.size, -int32(n.size))
        shrink := atomic.AddInt32(&r.nodes, -1) < h.shrinkThreshold
        if bLen >= mOverflowThreshold {
            atomic.AddInt32(&h.overflow, -1)
        }

        // Shrink.
        if shrink && len(h.buckets) > mInitialSize && atomic.CompareAndSwapInt32(&h.resizeInProgress, 0, 1) {
            nhLen := len(h.buckets) >> 1
            nh := &mNode{
                buckets:         make([]unsafe.Pointer, nhLen),
                mask:            uint32(nhLen) - 1,
                pred:            unsafe.Pointer(h),
                growThreshold:   int32(nhLen * mOverflowThreshold),
                shrinkThreshold: int32(nhLen >> 1),
            }
            ok := atomic.CompareAndSwapPointer(&r.mHead, unsafe.Pointer(h), unsafe.Pointer(nh))
            if !ok {
                panic("BUG: failed swapping head")
            }
            go nh.initBuckets()
        }
    }

    return true, deleted
}

type mNode struct {
    buckets          []unsafe.Pointer // []*mBucket
    mask             uint32
    pred             unsafe.Pointer // *mNode
    resizeInProgress int32

    overflow        int32
    growThreshold   int32
    shrinkThreshold int32
}

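// initBucket lazily initializes bucket i from the frozen buckets of the
// predecessor table: when growing, it takes the nodes of one old bucket
// whose hashes map to i; when shrinking, it merges the two old buckets
// that collapse onto i.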
func (n *mNode) initBucket(i uint32) *mBucket {
    if b := (*mBucket)(atomic.LoadPointer(&n.buckets[i])); b != nil {
        return b
    }

    p := (*mNode)(atomic.LoadPointer(&n.pred))
    if p != nil {
        var node []*Node
        if n.mask > p.mask {
            // Grow.
            pb := (*mBucket)(atomic.LoadPointer(&p.buckets[i&p.mask]))
            if pb == nil {
                pb = p.initBucket(i & p.mask)
            }
            m := pb.freeze()
            // Split nodes.
            for _, x := range m {
                if x.hash&n.mask == i {
                    node = append(node, x)
                }
            }
        } else {
            // Shrink.
            pb0 := (*mBucket)(atomic.LoadPointer(&p.buckets[i]))
            if pb0 == nil {
                pb0 = p.initBucket(i)
            }
            pb1 := (*mBucket)(atomic.LoadPointer(&p.buckets[i+uint32(len(n.buckets))]))
            if pb1 == nil {
                pb1 = p.initBucket(i + uint32(len(n.buckets)))
            }
            m0 := pb0.freeze()
            m1 := pb1.freeze()
            // Merge nodes.
            node = make([]*Node, 0, len(m0)+len(m1))
            node = append(node, m0...)
            node = append(node, m1...)
        }
        b := &mBucket{node: node}
        if atomic.CompareAndSwapPointer(&n.buckets[i], nil, unsafe.Pointer(b)) {
            if len(node) > mOverflowThreshold {
                atomic.AddInt32(&n.overflow, int32(len(node)-mOverflowThreshold))
            }
            return b
        }
    }

    return (*mBucket)(atomic.LoadPointer(&n.buckets[i]))
}

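// initBuckets initializes all buckets of this table, then clears the
// predecessor pointer so the old table can be garbage collected.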
func (n *mNode) initBuckets() {
    for i := range n.buckets {
        n.initBucket(uint32(i))
    }
    atomic.StorePointer(&n.pred, nil)
}

// Cache is a 'cache map'.
type Cache struct {
    mu     sync.RWMutex
    mHead  unsafe.Pointer // *mNode
    nodes  int32
    size   int32
    cacher Cacher
    closed bool
}

// NewCache creates a new 'cache map'. The cacher is optional and
// may be nil.
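//
// A minimal usage sketch:
//
//	c := NewCache(nil)
//	defer c.Close()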
func NewCache(cacher Cacher) *Cache {
    h := &mNode{
        buckets:         make([]unsafe.Pointer, mInitialSize),
        mask:            mInitialSize - 1,
        growThreshold:   int32(mInitialSize * mOverflowThreshold),
        shrinkThreshold: 0,
    }
    for i := range h.buckets {
        h.buckets[i] = unsafe.Pointer(&mBucket{})
    }
    r := &Cache{
        mHead:  unsafe.Pointer(h),
        cacher: cacher,
    }
    return r
}

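// getBucket returns the current head table and the (initialized) bucket
// for the given hash.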
func (r *Cache) getBucket(hash uint32) (*mNode, *mBucket) {
    h := (*mNode)(atomic.LoadPointer(&r.mHead))
    i := hash & h.mask
    b := (*mBucket)(atomic.LoadPointer(&h.buckets[i]))
    if b == nil {
        b = h.initBucket(i)
    }
    return h, b
}

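// delete removes the node from its bucket, retrying whenever the bucket
// was frozen by a concurrent resize.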
func (r *Cache) delete(n *Node) bool {
    for {
        h, b := r.getBucket(n.hash)
        done, deleted := b.delete(r, h, n.hash, n.ns, n.key)
        if done {
            return deleted
        }
    }
}

// Nodes returns the number of 'cache nodes' in the map.
func (r *Cache) Nodes() int {
    return int(atomic.LoadInt32(&r.nodes))
}

// Size returns the sum of 'cache node' sizes in the map.
func (r *Cache) Size() int {
    return int(atomic.LoadInt32(&r.size))
}

// Capacity returns cache capacity.
func (r *Cache) Capacity() int {
    if r.cacher == nil {
        return 0
    }
    return r.cacher.Capacity()
}

// SetCapacity sets cache capacity.
func (r *Cache) SetCapacity(capacity int) {
    if r.cacher != nil {
        r.cacher.SetCapacity(capacity)
    }
}

// Get gets the 'cache node' with the given namespace and key.
// If the 'cache node' is not found and setFunc is not nil, Get will atomically
// create the 'cache node' by calling setFunc. Otherwise Get returns nil.
//
// The returned 'cache handle' should be released after use by calling Release
// method.
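//
// A minimal sketch, assuming c is an existing *Cache (namespace, key, size,
// and value are illustrative):
//
//	h := c.Get(0, 1, func() (size int, value Value) {
//		return 1, "expensive result"
//	})
//	if h != nil {
//		_ = h.Value() // "expensive result"
//		h.Release()
//	}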
func (r *Cache) Get(ns, key uint64, setFunc func() (size int, value Value)) *Handle {
    r.mu.RLock()
    defer r.mu.RUnlock()
    if r.closed {
        return nil
    }

    hash := murmur32(ns, key, 0xf00)
    for {
        h, b := r.getBucket(hash)
        done, _, n := b.get(r, h, hash, ns, key, setFunc == nil)
        if done {
            if n != nil {
                n.mu.Lock()
                if n.value == nil {
                    if setFunc == nil {
                        n.mu.Unlock()
                        n.unref()
                        return nil
                    }

                    n.size, n.value = setFunc()
                    if n.value == nil {
                        n.size = 0
                        n.mu.Unlock()
                        n.unref()
                        return nil
                    }
                    atomic.AddInt32(&r.size, int32(n.size))
                }
                n.mu.Unlock()
                if r.cacher != nil {
                    r.cacher.Promote(n)
                }
                return &Handle{unsafe.Pointer(n)}
            }

            break
        }
    }
    return nil
}

// Delete removes and bans the 'cache node' with the given namespace and key.
// A banned 'cache node' will never be inserted into the 'cache tree'. The ban
// only applies to the particular 'cache node', so when a 'cache node'
// is recreated it will not be banned.
//
// If onDel is not nil, then it will be executed if such a 'cache node'
// doesn't exist or once the 'cache node' is released.
//
// Delete returns true if such a 'cache node' exists.
func (r *Cache) Delete(ns, key uint64, onDel func()) bool {
    r.mu.RLock()
    defer r.mu.RUnlock()
    if r.closed {
        return false
    }

    hash := murmur32(ns, key, 0xf00)
    for {
        h, b := r.getBucket(hash)
        done, _, n := b.get(r, h, hash, ns, key, true)
        if done {
            if n != nil {
                if onDel != nil {
                    n.mu.Lock()
                    n.onDel = append(n.onDel, onDel)
                    n.mu.Unlock()
                }
                if r.cacher != nil {
                    r.cacher.Ban(n)
                }
                n.unref()
                return true
            }

            break
        }
    }

    if onDel != nil {
        onDel()
    }

    return false
}

// Evict evicts the 'cache node' with the given namespace and key. This will
// simply call Cacher.Evict.
//
// Evict returns true if such a 'cache node' exists.
func (r *Cache) Evict(ns, key uint64) bool {
    r.mu.RLock()
    defer r.mu.RUnlock()
    if r.closed {
        return false
    }

    hash := murmur32(ns, key, 0xf00)
    for {
        h, b := r.getBucket(hash)
        done, _, n := b.get(r, h, hash, ns, key, true)
        if done {
            if n != nil {
                if r.cacher != nil {
                    r.cacher.Evict(n)
                }
                n.unref()
                return true
            }

            break
        }
    }

    return false
}

// EvictNS evicts all 'cache nodes' within the given namespace. This will
// simply call Cacher.EvictNS.
func (r *Cache) EvictNS(ns uint64) {
    r.mu.RLock()
    defer r.mu.RUnlock()
    if r.closed {
        return
    }

    if r.cacher != nil {
        r.cacher.EvictNS(ns)
    }
}

// EvictAll evicts all 'cache nodes'. This will simply call Cacher.EvictAll.
func (r *Cache) EvictAll() {
    r.mu.RLock()
    defer r.mu.RUnlock()
    if r.closed {
        return
    }

    if r.cacher != nil {
        r.cacher.EvictAll()
    }
}

// Close closes the 'cache map' and forcefully releases all 'cache nodes'.
func (r *Cache) Close() error {
    r.mu.Lock()
    if !r.closed {
        r.closed = true

        h := (*mNode)(r.mHead)
        h.initBuckets()

        for i := range h.buckets {
            b := (*mBucket)(h.buckets[i])
            for _, n := range b.node {
                // Call releaser.
                if n.value != nil {
                    if r, ok := n.value.(util.Releaser); ok {
                        r.Release()
                    }
                    n.value = nil
                }

                // Call OnDel.
                for _, f := range n.onDel {
                    f()
                }
                n.onDel = nil
            }
        }
    }
    r.mu.Unlock()

    // Avoid deadlock.
    if r.cacher != nil {
        if err := r.cacher.Close(); err != nil {
            return err
        }
    }
    return nil
}

// CloseWeak closes the 'cache map' and evicts all 'cache nodes' from the
// cacher, but unlike Close it doesn't forcefully release the 'cache nodes'.
func (r *Cache) CloseWeak() error {
    r.mu.Lock()
    if !r.closed {
        r.closed = true
    }
    r.mu.Unlock()

    // Avoid deadlock.
    if r.cacher != nil {
        r.cacher.EvictAll()
        if err := r.cacher.Close(); err != nil {
            return err
        }
    }
    return nil
}

// Node is a 'cache node'.
type Node struct {
    r *Cache

    hash    uint32
    ns, key uint64

    mu    sync.Mutex
    size  int
    value Value

    ref   int32
    onDel []func()

    CacheData unsafe.Pointer
}

// NS returns the namespace of this 'cache node'.
func (n *Node) NS() uint64 {
    return n.ns
}

// Key returns the key of this 'cache node'.
func (n *Node) Key() uint64 {
    return n.key
}

// Size returns the size of this 'cache node'.
func (n *Node) Size() int {
    return n.size
}

// Value returns the value of this 'cache node'.
func (n *Node) Value() Value {
    return n.value
}

// Ref returns the ref counter of this 'cache node'.
func (n *Node) Ref() int32 {
    return atomic.LoadInt32(&n.ref)
}

// GetHandle returns a handle for this 'cache node'.
func (n *Node) GetHandle() *Handle {
    if atomic.AddInt32(&n.ref, 1) <= 1 {
        panic("BUG: Node.GetHandle on zero ref")
    }
    return &Handle{unsafe.Pointer(n)}
}

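// unref decrements the ref counter and removes the node from the map once
// the counter reaches zero.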
func (n *Node) unref() {
    if atomic.AddInt32(&n.ref, -1) == 0 {
        n.r.delete(n)
    }
}

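// unrefLocked is like unref, but takes the cache's read lock and skips the
// removal if the map has already been closed.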
func (n *Node) unrefLocked() {
    if atomic.AddInt32(&n.ref, -1) == 0 {
        n.r.mu.RLock()
        if !n.r.closed {
            n.r.delete(n)
        }
        n.r.mu.RUnlock()
    }
}

// Handle is a 'cache handle' of a 'cache node'.
type Handle struct {
    n unsafe.Pointer // *Node
}

// Value returns the value of the 'cache node'.
func (h *Handle) Value() Value {
    n := (*Node)(atomic.LoadPointer(&h.n))
    if n != nil {
        return n.value
    }
    return nil
}

// Release releases this 'cache handle'.
// It is safe to call Release multiple times.
func (h *Handle) Release() {
    nPtr := atomic.LoadPointer(&h.n)
    if nPtr != nil && atomic.CompareAndSwapPointer(&h.n, nPtr, nil) {
        n := (*Node)(nPtr)
        n.unrefLocked()
    }
}

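// murmur32 mixes the namespace and key into a 32-bit bucket hash using the
// MurmurHash2 mixing constants (m = 0x5bd1e995, r = 24).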
func murmur32(ns, key uint64, seed uint32) uint32 {
    const (
        m = uint32(0x5bd1e995)
        r = 24
    )

    k1 := uint32(ns >> 32)
    k2 := uint32(ns)
    k3 := uint32(key >> 32)
    k4 := uint32(key)

    k1 *= m
    k1 ^= k1 >> r
    k1 *= m

    k2 *= m
    k2 ^= k2 >> r
    k2 *= m

    k3 *= m
    k3 ^= k3 >> r
    k3 *= m

    k4 *= m
    k4 ^= k4 >> r
    k4 *= m

    h := seed

    h *= m
    h ^= k1
    h *= m
    h ^= k2
    h *= m
    h ^= k3
    h *= m
    h ^= k4

    h ^= h >> 13
    h *= m
    h ^= h >> 15

    return h
}