inklabs/rangedb

provider/inmemorystore/inmemory_store.go

Summary

Maintainability: A (3 hrs)
Test Coverage: A (97%)
package inmemorystore
 
import (
	"context"
	"fmt"
	"io/ioutil"
	"log"
	"sync"
	"time"

	"github.com/inklabs/rangedb"
	"github.com/inklabs/rangedb/pkg/broadcast"
	"github.com/inklabs/rangedb/pkg/clock"
	"github.com/inklabs/rangedb/pkg/clock/provider/systemclock"
	"github.com/inklabs/rangedb/pkg/rangedberror"
	"github.com/inklabs/rangedb/pkg/recordsubscriber"
	"github.com/inklabs/rangedb/pkg/shortuuid"
	"github.com/inklabs/rangedb/provider/jsonrecordserializer"
)

const broadcastRecordBuffSize = 100

type inMemoryStore struct {
	clock         clock.Clock
	uuidGenerator shortuuid.Generator
	serializer    rangedb.RecordSerializer
	logger        *log.Logger
	broadcaster   broadcast.Broadcaster

	mux                  sync.RWMutex
	globalSequenceNumber uint64
	records              []uint64            // global sequence numbers in insertion order
	streams              map[string][]uint64 // stream name -> global sequence numbers
	aggregateTypes       map[string][]uint64 // aggregate type -> global sequence numbers
	recordData           map[uint64][]byte   // global sequence number -> serialized record
}
 
// Option defines functional option parameters for inMemoryStore.
type Option func(*inMemoryStore)
 
// WithClock is a functional option to inject a clock.Clock.
func WithClock(clock clock.Clock) Option {
	return func(store *inMemoryStore) {
		store.clock = clock
	}
}

// WithSerializer is a functional option to inject a RecordSerializer.
func WithSerializer(serializer rangedb.RecordSerializer) Option {
	return func(store *inMemoryStore) {
		store.serializer = serializer
	}
}

// WithLogger is a functional option to inject a Logger.
func WithLogger(logger *log.Logger) Option {
	return func(store *inMemoryStore) {
		store.logger = logger
	}
}

// WithUUIDGenerator is a functional option to inject a shortuuid.Generator.
func WithUUIDGenerator(uuidGenerator shortuuid.Generator) Option {
	return func(store *inMemoryStore) {
		store.uuidGenerator = uuidGenerator
	}
}
 
// New constructs an inMemoryStore.
func New(options ...Option) *inMemoryStore {
	s := &inMemoryStore{
		clock:          systemclock.New(),
		uuidGenerator:  shortuuid.NewUUIDGenerator(),
		serializer:     jsonrecordserializer.New(),
		logger:         log.New(ioutil.Discard, "", 0),
		broadcaster:    broadcast.New(broadcastRecordBuffSize, broadcast.DefaultTimeout),
		recordData:     make(map[uint64][]byte),
		streams:        make(map[string][]uint64),
		aggregateTypes: make(map[string][]uint64),
	}

	for _, option := range options {
		option(s)
	}

	return s
}
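New applies its defaults first and then each functional option in order, so anything passed by the caller wins. A minimal construction sketch using only the options shown above (the main wrapper and the stderr logger are illustrative, not part of this package):

package main

import (
	"log"
	"os"

	"github.com/inklabs/rangedb/provider/inmemorystore"
	"github.com/inklabs/rangedb/provider/jsonrecordserializer"
)

func main() {
	// Unset options keep their defaults: system clock, JSON serializer,
	// short UUID generator, and a logger that discards output.
	store := inmemorystore.New(
		inmemorystore.WithLogger(log.New(os.Stderr, "inmemorystore: ", log.LstdFlags)),
		inmemorystore.WithSerializer(jsonrecordserializer.New()),
	)
	_ = store
}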
 
func (s *inMemoryStore) Bind(events ...rangedb.Event) {
	s.serializer.Bind(events...)
}

func (s *inMemoryStore) Events(ctx context.Context, globalSequenceNumber uint64) rangedb.RecordIterator {
	s.mux.RLock()

	return s.recordsByIDs(ctx, s.records, func(record *rangedb.Record) bool {
		return record.GlobalSequenceNumber >= globalSequenceNumber
	})
}

func (s *inMemoryStore) EventsByAggregateTypes(ctx context.Context, globalSequenceNumber uint64, aggregateTypes ...string) rangedb.RecordIterator {
	if len(aggregateTypes) == 1 {
		s.mux.RLock()
		return s.recordsByIDs(ctx, s.aggregateTypes[aggregateTypes[0]], compareByGlobalSequenceNumber(globalSequenceNumber))
	}

	var recordIterators []rangedb.RecordIterator
	for _, aggregateType := range aggregateTypes {
		s.mux.RLock()
		recordIterators = append(recordIterators, s.recordsByIDs(ctx, s.aggregateTypes[aggregateType], compareByGlobalSequenceNumber(globalSequenceNumber)))
	}

	return rangedb.MergeRecordIteratorsInOrder(recordIterators)
}

func compareByGlobalSequenceNumber(globalSequenceNumber uint64) func(record *rangedb.Record) bool {
	return func(record *rangedb.Record) bool {
		return record.GlobalSequenceNumber >= globalSequenceNumber
	}
}

func (s *inMemoryStore) EventsByStream(ctx context.Context, streamSequenceNumber uint64, streamName string) rangedb.RecordIterator {
	s.mux.RLock()

	if _, ok := s.streams[streamName]; !ok || len(s.streams[streamName]) == 0 {
		s.mux.RUnlock()
		return rangedb.NewRecordIteratorWithError(rangedb.ErrStreamNotFound)
	}

	return s.recordsByIDs(ctx, s.streams[streamName], func(record *rangedb.Record) bool {
		return record.StreamSequenceNumber >= streamSequenceNumber
	})
}
 
func (s *inMemoryStore) recordsByIDs(ctx context.Context, ids []uint64, compare func(record *rangedb.Record) bool) rangedb.RecordIterator {
	resultRecords := make(chan rangedb.ResultRecord)

	go func() {
		// The read lock was acquired by the caller; it is released here once
		// this goroutine has finished streaming (or bailed out early).
		defer s.mux.RUnlock()
		defer close(resultRecords)

		for _, id := range ids {
			recordData, exists := s.recordData[id]
			if !exists {
				resultRecords <- rangedb.ResultRecord{Err: fmt.Errorf("record missing")}
				return
			}

			record, err := s.serializer.Deserialize(recordData)
			if err != nil {
				deserializeErr := fmt.Errorf("failed to deserialize record: %v", err)
				s.logger.Print(deserializeErr)
				resultRecords <- rangedb.ResultRecord{Err: deserializeErr}
				return
			}

			if compare(record) {
				if !rangedb.PublishRecordOrCancel(ctx, resultRecords, record, time.Second) {
					return
				}
			}
		}
	}()

	return rangedb.NewRecordIterator(resultRecords)
}
 
func (s *inMemoryStore) OptimisticDeleteStream(ctx context.Context, expectedStreamSequenceNumber uint64, streamName string) error {
	select {
	case <-ctx.Done():
		return context.Canceled

	default:
	}

	s.mux.Lock()
	defer s.mux.Unlock()

	if _, ok := s.streams[streamName]; !ok {
		return rangedb.ErrStreamNotFound
	}

	currentStreamSequenceNumber := s.getStreamSequenceNumber(streamName)
	if expectedStreamSequenceNumber != currentStreamSequenceNumber {
		return &rangedberror.UnexpectedSequenceNumber{
			Expected:             expectedStreamSequenceNumber,
			ActualSequenceNumber: currentStreamSequenceNumber,
		}
	}

	for _, globalSequenceNumberInStream := range s.streams[streamName] {
		for i, globalSequenceNumber := range s.records {
			if globalSequenceNumber == globalSequenceNumberInStream {
				s.records = RemoveIndex(s.records, i)
				delete(s.recordData, globalSequenceNumber)
			}
		}

		for aggregateType, globalSequenceNumbers := range s.aggregateTypes {
			for i, globalSequenceNumber := range globalSequenceNumbers {
				if globalSequenceNumber == globalSequenceNumberInStream {
					s.aggregateTypes[aggregateType] = RemoveIndex(s.aggregateTypes[aggregateType], i)
				}
			}
		}
	}

	delete(s.streams, streamName)

	return nil
}

// RemoveIndex removes the element at index by shifting the tail left; it
// reuses (and therefore mutates) the backing array of the input slice.
func RemoveIndex(s []uint64, index int) []uint64 {
	return append(s[:index], s[index+1:]...)
}
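Because RemoveIndex shifts elements within the original backing array, callers should treat the input slice as consumed; OptimisticDeleteStream always reassigns the result, which is why this is safe here. A small standalone illustration of that aliasing:

package main

import (
	"fmt"

	"github.com/inklabs/rangedb/provider/inmemorystore"
)

func main() {
	original := []uint64{10, 20, 30, 40}
	trimmed := inmemorystore.RemoveIndex(original, 1)

	fmt.Println(trimmed)  // [10 30 40]
	fmt.Println(original) // [10 30 40 40] - the tail was shifted in place
}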
 
func (s *inMemoryStore) OptimisticSave(ctx context.Context, expectedStreamSequenceNumber uint64, streamName string, eventRecords ...*rangedb.EventRecord) (uint64, error) {
	return s.saveEvents(ctx, &expectedStreamSequenceNumber, streamName, eventRecords...)
}

func (s *inMemoryStore) Save(ctx context.Context, streamName string, eventRecords ...*rangedb.EventRecord) (uint64, error) {
	return s.saveEvents(ctx, nil, streamName, eventRecords...)
}
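Save appends at whatever the stream's current sequence number happens to be, while OptimisticSave enforces the caller's expectation and surfaces a rangedberror.UnexpectedSequenceNumber when that expectation is stale. A hypothetical usage sketch; accountOpened and appendFirstEvent are made-up names, and the sketch assumes rangedb.Event is satisfied by the three methods this store actually calls (AggregateType, AggregateID, EventType):

package example

import (
	"context"
	"errors"

	"github.com/inklabs/rangedb"
	"github.com/inklabs/rangedb/pkg/rangedberror"
)

// accountOpened is a hypothetical event type for illustration only.
type accountOpened struct {
	AccountID string `json:"accountID"`
}

func (e accountOpened) AggregateType() string { return "account" }
func (e accountOpened) AggregateID() string   { return e.AccountID }
func (e accountOpened) EventType() string     { return "AccountOpened" }

// appendFirstEvent writes the first event of a stream, expecting sequence 0.
func appendFirstEvent(ctx context.Context, store interface {
	OptimisticSave(ctx context.Context, expectedStreamSequenceNumber uint64, streamName string, eventRecords ...*rangedb.EventRecord) (uint64, error)
}, streamName string) error {
	_, err := store.OptimisticSave(ctx, 0, streamName,
		&rangedb.EventRecord{Event: accountOpened{AccountID: "abc123"}},
	)

	var unexpected *rangedberror.UnexpectedSequenceNumber
	if errors.As(err, &unexpected) {
		// Another writer got there first: reload from the stream and retry
		// using unexpected.ActualSequenceNumber as the new expectation.
	}
	return err
}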
 
// saveEvents persists one or more events inside a locked mutex, and notifies subscribers.
Method `inMemoryStore.saveEvents` has 59 lines of code (exceeds 50 allowed). Consider refactoring.
Method `inMemoryStore.saveEvents` has 6 return statements (exceeds 4 allowed).
func (s *inMemoryStore) saveEvents(ctx context.Context, expectedStreamSequenceNumber *uint64, streamName string, eventRecords ...*rangedb.EventRecord) (uint64, error) {
	if len(eventRecords) < 1 {
		return 0, fmt.Errorf("missing events")
	}

	var pendingEventsData [][]byte
	var totalSavedEvents int
	var aggregateType, aggregateID string
	var lastStreamSequenceNumber uint64
	var createdGlobalSequenceNumbers []uint64

	s.mux.Lock()
	for _, eventRecord := range eventRecords {
		if aggregateType != "" && aggregateType != eventRecord.Event.AggregateType() {
			s.mux.Unlock()
			return 0, fmt.Errorf("unmatched aggregate type")
		}

		if aggregateID != "" && aggregateID != eventRecord.Event.AggregateID() {
			s.mux.Unlock()
			return 0, fmt.Errorf("unmatched aggregate ID")
		}

		aggregateType = eventRecord.Event.AggregateType()
		aggregateID = eventRecord.Event.AggregateID()

		select {
		case <-ctx.Done():
			s.mux.Unlock()
			return 0, context.Canceled

		default:
		}

		data, record, err := s.saveEvent(
			streamName,
			aggregateType,
			aggregateID,
			eventRecord.Event.EventType(),
			s.uuidGenerator.New(),
			expectedStreamSequenceNumber,
			eventRecord.Event,
			eventRecord.Metadata,
		)
		if err != nil {
			s.removeRecentEvents(createdGlobalSequenceNumbers, eventRecord.Event.AggregateType(), eventRecord.Event.AggregateID())
			s.mux.Unlock()
			return 0, err
		}

		createdGlobalSequenceNumbers = append(createdGlobalSequenceNumbers, record.GlobalSequenceNumber)
		lastStreamSequenceNumber = record.StreamSequenceNumber

		totalSavedEvents++
		pendingEventsData = append(pendingEventsData, data)

		if expectedStreamSequenceNumber != nil {
			*expectedStreamSequenceNumber++
		}
	}
	s.mux.Unlock()

	for _, data := range pendingEventsData {
		deSerializedRecord, err := s.serializer.Deserialize(data)
		if err == nil {
			s.broadcaster.Accept(deSerializedRecord)
		} else {
			s.logger.Print(err)
		}
	}

	return lastStreamSequenceNumber, nil
}
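The maintainability notes above flag saveEvents for its length and return count; most of that comes from interleaving batch validation with persistence. One possible extraction, sketched against the code in this file (validateAggregate is a hypothetical helper that would live alongside saveEvents in this package):

// validateAggregate checks that every event in the batch shares a single
// aggregate type and ID, returning them for use by saveEvents.
func validateAggregate(eventRecords ...*rangedb.EventRecord) (aggregateType, aggregateID string, err error) {
	for _, eventRecord := range eventRecords {
		if aggregateType != "" && aggregateType != eventRecord.Event.AggregateType() {
			return "", "", fmt.Errorf("unmatched aggregate type")
		}
		if aggregateID != "" && aggregateID != eventRecord.Event.AggregateID() {
			return "", "", fmt.Errorf("unmatched aggregate ID")
		}
		aggregateType = eventRecord.Event.AggregateType()
		aggregateID = eventRecord.Event.AggregateID()
	}
	return aggregateType, aggregateID, nil
}

Calling this once before taking the lock would drop two return statements from saveEvents, and it would also stop a mismatched record part-way through a batch from leaving the earlier records saved: the current mismatch path unlocks and returns without the removeRecentEvents rollback that the saveEvent error path performs.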
 
// saveEvent persists a single event without locking the mutex or notifying subscribers.
func (s *inMemoryStore) saveEvent(
	streamName, aggregateType, aggregateID, eventType, eventID string,
	expectedStreamSequenceNumber *uint64,
	event interface{}, metadata interface{}) ([]byte, *rangedb.Record, error) {

	streamSequenceNumber := s.getStreamSequenceNumber(streamName)

	if expectedStreamSequenceNumber != nil && *expectedStreamSequenceNumber != streamSequenceNumber {
		return nil, nil, &rangedberror.UnexpectedSequenceNumber{
			Expected:             *expectedStreamSequenceNumber,
			ActualSequenceNumber: streamSequenceNumber,
		}
	}

	s.globalSequenceNumber++
	globalSequenceNumber := s.globalSequenceNumber

	record := &rangedb.Record{
		StreamName:           streamName,
		AggregateType:        aggregateType,
		AggregateID:          aggregateID,
		GlobalSequenceNumber: globalSequenceNumber,
		StreamSequenceNumber: streamSequenceNumber + 1,
		EventType:            eventType,
		EventID:              eventID,
		InsertTimestamp:      uint64(s.clock.Now().Unix()),
		Data:                 event,
		Metadata:             metadata,
	}

	data, err := s.serializer.Serialize(record)
	if err != nil {
		return nil, nil, err
	}

	s.records = append(s.records, globalSequenceNumber)
	s.recordData[globalSequenceNumber] = data
	s.streams[streamName] = append(s.streams[streamName], globalSequenceNumber)
	s.aggregateTypes[aggregateType] = append(s.aggregateTypes[aggregateType], globalSequenceNumber)

	return data, record, nil
}

func (s *inMemoryStore) removeRecentEvents(globalSequenceNumbers []uint64, aggregateType, aggregateID string) {
	stream := rangedb.GetStream(aggregateType, aggregateID)

	for _, globalSequenceNumber := range globalSequenceNumbers {
		delete(s.recordData, globalSequenceNumber)
	}

	total := len(globalSequenceNumbers)

	s.globalSequenceNumber -= uint64(total)
	s.records = rTrimFromUint64Slice(s.records, total)
	s.streams[stream] = rTrimFromUint64Slice(s.streams[stream], total)
	s.aggregateTypes[aggregateType] = rTrimFromUint64Slice(s.aggregateTypes[aggregateType], total)
}

func rTrimFromUint64Slice(slice []uint64, total int) []uint64 {
	return slice[:len(slice)-total]
}

Similar blocks of code found in 5 locations. Consider refactoring.
func (s *inMemoryStore) AllEventsSubscription(ctx context.Context, bufferSize int, subscriber rangedb.RecordSubscriber) rangedb.RecordSubscription {
	return recordsubscriber.New(
		recordsubscriber.AllEventsConfig(ctx, s, s.broadcaster, bufferSize,
			func(record *rangedb.Record) error {
				subscriber.Accept(record)
				return nil
			},
		))
}

Similar blocks of code found in 5 locations. Consider refactoring.
func (s *inMemoryStore) AggregateTypesSubscription(ctx context.Context, bufferSize int, subscriber rangedb.RecordSubscriber, aggregateTypes ...string) rangedb.RecordSubscription {
	return recordsubscriber.New(
		recordsubscriber.AggregateTypesConfig(ctx, s, s.broadcaster, bufferSize,
			aggregateTypes,
			func(record *rangedb.Record) error {
				subscriber.Accept(record)
				return nil
			},
		))
}
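Per the duplication notes above, both subscription constructors (and the similar blocks elsewhere in the codebase) wrap subscriber.Accept in an identical callback. A hypothetical shared adapter would reduce each to a single expression, for example:

// acceptInto adapts a rangedb.RecordSubscriber to the func(*rangedb.Record) error
// callback expected by the recordsubscriber configs. Hypothetical helper, not in this file.
func acceptInto(subscriber rangedb.RecordSubscriber) func(*rangedb.Record) error {
	return func(record *rangedb.Record) error {
		subscriber.Accept(record)
		return nil
	}
}

func (s *inMemoryStore) AllEventsSubscription(ctx context.Context, bufferSize int, subscriber rangedb.RecordSubscriber) rangedb.RecordSubscription {
	return recordsubscriber.New(
		recordsubscriber.AllEventsConfig(ctx, s, s.broadcaster, bufferSize, acceptInto(subscriber)),
	)
}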
 
func (s *inMemoryStore) TotalEventsInStream(ctx context.Context, streamName string) (uint64, error) {
	select {
	case <-ctx.Done():
		return 0, context.Canceled

	default:
	}

	s.mux.RLock()
	defer s.mux.RUnlock()
	return uint64(len(s.streams[streamName])), nil
}

func (s *inMemoryStore) getStreamSequenceNumber(stream string) uint64 {
	return uint64(len(s.streams[stream]))
}

func (s *inMemoryStore) getGlobalSequenceNumber() uint64 {
	s.mux.RLock()
	defer s.mux.RUnlock()
	return s.globalSequenceNumber
}