johnsonjh/jleveldb

View on GitHub
leveldb/util/buffer_pool.go

Summary

Maintainability
C
1 day
Test Coverage
// Copyright © 2014, Suryandaru Triandana <syndtr@gmail.com>
// Copyright © 2021, Jeffrey H. Johnson <trnsz@pobox.com>
//
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

package util

import (
    "fmt"
    "runtime"
    "sync"
    "sync/atomic"
    "time"
)

// BufferPool is a 'buffer pool'.
type BufferPool struct {
    pool      []chan []byte
    size      [5]uint32
    sizeMiss  [5]uint32
    sizeHalf  [5]uint32
    baseline  [4]int
    baseline0 int

    get     uint32
    put     uint32
    half    uint32
    less    uint32
    equal   uint32
    greater uint32
    miss    uint32
    mu      sync.RWMutex
}

func (p *BufferPool) poolNum(n int) int {
    if n <= p.baseline0 && n > p.baseline0/2 {
        return 0
    }
    for i, x := range p.baseline {
        if n <= x {
            return i + 1
        }
    }
    return len(p.baseline) + 1
}

// Get returns buffer with length of n.
func (p *BufferPool) Get(n int) []byte {
    if p == nil {
        return make([]byte, n)
    }

    atomic.AddUint32(&p.get, 1)

    poolNum := p.poolNum(n)
    pool := p.pool[poolNum]
    if poolNum == 0 {
        // Fast path.
        select {
        case b := <-pool:
            switch {
            case cap(b) > n:
                if cap(b)-n >= n {
                    atomic.AddUint32(&p.half, 1)
                    select {
                    case pool <- b:
                    default:
                    }
                    return make([]byte, n)
                } else {
                    atomic.AddUint32(&p.less, 1)
                    return b[:n]
                }
            case cap(b) == n:
                atomic.AddUint32(&p.equal, 1)
                return b[:n]
            default:
                atomic.AddUint32(&p.greater, 1)
            }
        default:
            atomic.AddUint32(&p.miss, 1)
        }

        return make([]byte, n, p.baseline0)
    } else {
        sizePtr := &p.size[poolNum-1]

        select {
        case b := <-pool:
            switch {
            case cap(b) > n:
                if cap(b)-n >= n {
                    atomic.AddUint32(&p.half, 1)
                    sizeHalfPtr := &p.sizeHalf[poolNum-1]
                    if atomic.AddUint32(sizeHalfPtr, 1) == 20 {
                        atomic.StoreUint32(sizePtr, uint32(cap(b)/2))
                        atomic.StoreUint32(sizeHalfPtr, 0)
                    } else {
                        select {
                        case pool <- b:
                        default:
                        }
                    }
                    return make([]byte, n)
                } else {
                    atomic.AddUint32(&p.less, 1)
                    return b[:n]
                }
            case cap(b) == n:
                atomic.AddUint32(&p.equal, 1)
                return b[:n]
            default:
                atomic.AddUint32(&p.greater, 1)
                if uint32(cap(b)) >= atomic.LoadUint32(sizePtr) {
                    select {
                    case pool <- b:
                    default:
                    }
                }
            }
        default:
            atomic.AddUint32(&p.miss, 1)
        }

        if size := atomic.LoadUint32(sizePtr); uint32(n) > size {
            if size == 0 {
                atomic.CompareAndSwapUint32(sizePtr, 0, uint32(n))
            } else {
                sizeMissPtr := &p.sizeMiss[poolNum-1]
                if atomic.AddUint32(sizeMissPtr, 1) == 20 {
                    atomic.StoreUint32(sizePtr, uint32(n))
                    atomic.StoreUint32(sizeMissPtr, 0)
                }
            }
            return make([]byte, n)
        } else {
            return make([]byte, n, size)
        }
    }
}

// Put adds given buffer to the pool.
func (p *BufferPool) Put(b []byte) {
    if p == nil {
        return
    }

    atomic.AddUint32(&p.put, 1)

    pool := p.pool[p.poolNum(cap(b))]
    select {
    case pool <- b:
    default:
    }
}

func (p *BufferPool) String() string {
    if p == nil {
        return "<nil>"
    }

    p.mu.Lock()
    defer p.mu.Unlock()

    return fmt.Sprintf("BufferPool{B·%d Z·%v Zm·%v Zh·%v G·%d P·%d H·%d <·%d =·%d >·%d M·%d}",
        p.baseline0, p.size, p.sizeMiss, p.sizeHalf, p.get, p.put, p.half, p.less, p.equal, p.greater, p.miss)
}

func drain(pool []chan []byte, closeC <-chan struct{}) {
    ticker := time.NewTicker(2 * time.Second)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            for _, ch := range pool {
                select {
                case <-ch:
                default:
                }
            }
        case <-closeC:
            for _, ch := range pool {
                close(ch)
            }
            return
        }
    }
}

// NewBufferPool creates a new initialized 'buffer pool'.
func NewBufferPool(baseline int) *BufferPool {
    if baseline <= 0 {
        panic("baseline can't be <= 0")
    }
    p := &BufferPool{
        pool:      make([]chan []byte, 6),
        baseline0: baseline,
        baseline:  [...]int{baseline / 4, baseline / 2, baseline * 2, baseline * 4},
    }
    closeC := make(chan struct{}, 1)
    for i, cap := range []int{2, 2, 4, 4, 2, 1} {
        p.pool[i] = make(chan []byte, cap)
    }
    runtime.SetFinalizer(p, func(*BufferPool) { close(closeC) })
    go drain(p.pool, closeC)
    return p
}