xmidt-org/heimdall

View on GitHub
shuffle/shuffle.go

Summary

Maintainability
A
0 mins
Test Coverage
package shuffle

import (
    mapset "github.com/deckarep/golang-set"
    "github.com/go-kit/kit/metrics/provider"
    "github.com/xmidt-org/webpa-common/v2/semaphore"
)

type Interface interface {
    Add(interface{})
    Get() interface{}
}

type stream struct {
    set       mapset.Set
    measures  *Measures
    semaphore semaphore.Interface
}

func (s *stream) Add(key interface{}) {
    if s.set.Contains(key) {
        return
    }
    s.semaphore.Acquire()
    if ok := s.set.Add(key); ok {
        s.measures.DeviceSize.Add(1.0)
    }
}

func (s *stream) Get() interface{} {
    var value interface{}
    // keep popping til value is not nil
    for value == nil {
        value = s.set.Pop()
    }
    s.semaphore.Release()
    s.measures.DeviceSize.Add(-1.0)
    return value
}

func NewStreamShuffler(poolSize int, provider provider.Provider) Interface {

    shuffler := stream{
        semaphore: semaphore.New(poolSize),
        set:       mapset.NewSet(),
        measures:  NewMeasures(provider),
    }

    return &shuffler
}