inklabs/rangedb

View on GitHub
store.go

Summary

Maintainability
A
0 mins
Test Coverage
A
95%
package rangedb
 
import (
"context"
"encoding/json"
"fmt"
"strings"
)
 
// Version is the RangeDB version string.
const Version = "0.13.0-dev"
 
// Record contains event data and metadata.
//
// Every field carries two struct tags: a single-letter msgpack key and a
// camelCase JSON key.
type Record struct {
// StreamName is the name of the stream this record belongs to.
// NOTE(review): presumably in the "aggregateType!aggregateID" form built by GetStream — confirm.
StreamName string `msgpack:"n" json:"streamName"`
// AggregateType is the aggregate type of the recorded event.
AggregateType string `msgpack:"a" json:"aggregateType"`
// AggregateID is the aggregate identifier of the recorded event.
AggregateID string `msgpack:"i" json:"aggregateID"`
// GlobalSequenceNumber is the record's sequence number across the whole store.
GlobalSequenceNumber uint64 `msgpack:"g" json:"globalSequenceNumber"`
// StreamSequenceNumber is the record's sequence number within its stream.
StreamSequenceNumber uint64 `msgpack:"s" json:"streamSequenceNumber"`
// InsertTimestamp is when the record was inserted.
// NOTE(review): the unit (e.g. Unix seconds) is not visible here — confirm.
InsertTimestamp uint64 `msgpack:"u" json:"insertTimestamp"`
// EventID is the identifier of the event.
EventID string `msgpack:"e" json:"eventID"`
// EventType is the type name of the event.
EventType string `msgpack:"t" json:"eventType"`
// Data is the event payload; its concrete type is not constrained here.
Data interface{} `msgpack:"d" json:"data"`
// Metadata is caller-supplied metadata; its concrete type is not constrained here.
Metadata interface{} `msgpack:"m" json:"metadata"`
}
 
// EventRecord stores the event and metadata to be used for persisting.
// Store.Save and Store.OptimisticSave accept values of this type.
type EventRecord struct {
// Event is the event to persist.
Event Event
// Metadata is arbitrary data stored alongside the event; its type is not constrained here.
Metadata interface{}
}
 
// EventBinder defines how to bind events for serialization.
type EventBinder interface {
// Bind accepts zero or more events to bind.
// NOTE(review): presumably registers each event's type so stored data can be
// decoded back into it — confirm against an implementation.
Bind(events ...Event)
}
 
// Store is the interface that stores and retrieves event records.
// It embeds EventBinder, so every Store can also bind events for serialization.
type Store interface {
EventBinder

// Events returns a RecordIterator containing all events in the store starting with globalSequenceNumber.
Events(ctx context.Context, globalSequenceNumber uint64) RecordIterator

// EventsByAggregateTypes returns a RecordIterator containing all events for each aggregateType(s) starting
// with globalSequenceNumber.
EventsByAggregateTypes(ctx context.Context, globalSequenceNumber uint64, aggregateTypes ...string) RecordIterator

// EventsByStream returns a RecordIterator containing all events in the stream starting with streamSequenceNumber.
EventsByStream(ctx context.Context, streamSequenceNumber uint64, streamName string) RecordIterator

// OptimisticDeleteStream removes an entire stream. If the expectedStreamSequenceNumber does not match the current
// stream sequence number, a rangedberror.UnexpectedSequenceNumber error is returned.
OptimisticDeleteStream(ctx context.Context, expectedStreamSequenceNumber uint64, streamName string) error

// OptimisticSave persists events to a single stream returning the new StreamSequenceNumber or an error. If
// the expectedStreamSequenceNumber does not match the current stream sequence number,
// a rangedberror.UnexpectedSequenceNumber error is returned.
OptimisticSave(ctx context.Context, expectedStreamSequenceNumber uint64, streamName string, eventRecords ...*EventRecord) (uint64, error)

// Save persists events to a single stream returning the new StreamSequenceNumber or an error.
Save(ctx context.Context, streamName string, eventRecords ...*EventRecord) (uint64, error)

// AllEventsSubscription returns a RecordSubscription that delivers records to subscriber.
// NOTE(review): bufferSize presumably sizes the subscription's internal record buffer — confirm.
AllEventsSubscription(ctx context.Context, bufferSize int, subscriber RecordSubscriber) RecordSubscription
// AggregateTypesSubscription returns a RecordSubscription that delivers records to subscriber.
// NOTE(review): presumably limited to records whose aggregate type is one of aggregateTypes — confirm.
AggregateTypesSubscription(ctx context.Context, bufferSize int, subscriber RecordSubscriber, aggregateTypes ...string) RecordSubscription
// TotalEventsInStream returns a count for the stream named streamName, or an error.
// NOTE(review): presumably the number of events currently stored in that stream — confirm.
TotalEventsInStream(ctx context.Context, streamName string) (uint64, error)
}
 
// RecordSubscription defines how a subscription starts and stops.
// Values of this type are returned by Store.AllEventsSubscription and
// Store.AggregateTypesSubscription.
type RecordSubscription interface {
// Start returns immediately after subscribing only to new events in a goroutine.
Start() error

// StartFrom blocks until all previous events have been processed, then
// returns after subscribing to new events in a goroutine.
// NOTE(review): "previous events" presumably means those starting at
// globalSequenceNumber — confirm.
StartFrom(globalSequenceNumber uint64) error

// Stop cancels the subscription and stops.
Stop()
}
 
// ResultRecord combines Record and error as a result struct for event queries.
type ResultRecord struct {
// Record is the record produced by the query.
// NOTE(review): presumably nil when Err is non-nil — confirm.
Record *Record
// Err is the error produced by the query, if any.
Err error
}
 
// RecordIterator is used to traverse a stream of record events.
//
// ReadNRecords shows the usage pattern in this file: loop while Next reports
// true, check Err, then read the current value with Record.
type RecordIterator interface {
// Next reports whether another result is available.
// NOTE(review): presumably advances the iterator and may block until a result arrives — confirm.
Next() bool
// NextContext is the context-accepting variant of Next.
// NOTE(review): presumably stops waiting when the context is done — confirm.
NextContext(context.Context) bool
// Record returns a record.
// NOTE(review): presumably the one made current by the latest Next/NextContext call — confirm.
Record() *Record
// Err returns an error, if any.
// NOTE(review): presumably the one associated with the current result — confirm.
Err() error
}
 
// Event is the interface that defines the required event methods.
// It embeds AggregateMessage, so an Event can be passed to GetEventStream.
type Event interface {
AggregateMessage
// EventType returns the event's type name.
EventType() string
}
 
// AggregateMessage is the interface that supports building an event stream name.
// GetEventStream combines the two values into "aggregateType!aggregateID".
type AggregateMessage interface {
// AggregateID returns the aggregate identifier, used as the second part of the stream name.
AggregateID() string
// AggregateType returns the aggregate type, used as the first part of the stream name.
AggregateType() string
}
 
// The RecordSubscriberFunc type is an adapter to allow the use of
// ordinary functions as record subscribers. If f is a function
// with the appropriate signature, RecordSubscriberFunc(f) is a
// RecordSubscriber that calls f.
type RecordSubscriberFunc func(*Record)

// Accept receives a record and passes it to f unchanged.
func (f RecordSubscriberFunc) Accept(record *Record) {
f(record)
}
 
// RecordSubscriber is the interface that defines how a projection receives Records.
// An ordinary func(*Record) can be adapted to this interface with RecordSubscriberFunc.
type RecordSubscriber interface {
// Accept receives a single record.
Accept(record *Record)
}
 
// GetEventStream returns the stream name for an event, built from the
// message's aggregate type and aggregate ID via GetStream.
func GetEventStream(message AggregateMessage) string {
	aggregateType := message.AggregateType()
	aggregateID := message.AggregateID()

	return GetStream(aggregateType, aggregateID)
}
 
// GetStream returns the stream name for an aggregateType and aggregateID.
// The two values are joined with a "!" separator: "aggregateType!aggregateID".
func GetStream(aggregateType, aggregateID string) string {
	const separator = "!"

	// fmt.Sprint inserts no spaces between operands when they are strings.
	return fmt.Sprint(aggregateType, separator, aggregateID)
}
 
// ParseStream returns the aggregateType and aggregateID for a stream name of
// the form "aggregateType!aggregateID", as produced by GetStream.
//
// Only the first "!" is treated as the separator, so an aggregateID that
// itself contains "!" is returned intact. If streamName contains no "!", the
// whole name is returned as aggregateType and aggregateID is empty.
func ParseStream(streamName string) (aggregateType, aggregateID string) {
	// SplitN with n=2 always yields at least one element and at most two.
	pieces := strings.SplitN(streamName, "!", 2)
	if len(pieces) < 2 {
		// No separator: avoid indexing pieces[1], which would panic.
		return pieces[0], ""
	}

	return pieces[0], pieces[1]
}
 
// ReadNRecords reads up to N records from the channel returned by f into a slice
func ReadNRecords(totalEvents uint64, f func() (RecordIterator, context.CancelFunc)) []*Record {
var records []*Record
cnt := uint64(0)
recordIterator, done := f()
 
for recordIterator.Next() {
if recordIterator.Err() != nil {
break
}
cnt++
if cnt > totalEvents {
break
}
 
records = append(records, recordIterator.Record())
}
 
done()
for recordIterator.Next() {
}
 
return records
}
 
// rawEvent is the Event implementation returned by NewRawEvent. It holds the
// aggregate/event identifiers as plain strings alongside an untyped payload.
type rawEvent struct {
aggregateType string
aggregateID string
eventType string
// data is the payload; MarshalJSON encodes only this field.
data interface{}
}
 
// NewRawEvent constructs a new raw event when an event struct is unavailable or unknown.
// The returned Event reports the given aggregateType, aggregateID and eventType,
// and marshals to JSON as data alone.
func NewRawEvent(aggregateType, aggregateID, eventType string, data interface{}) Event {
	event := new(rawEvent)
	event.aggregateType = aggregateType
	event.aggregateID = aggregateID
	event.eventType = eventType
	event.data = data

	return event
}
 
// AggregateID returns the aggregate ID the raw event was constructed with.
func (e rawEvent) AggregateID() string {
return e.aggregateID
}
 
// AggregateType returns the aggregate type the raw event was constructed with.
func (e rawEvent) AggregateType() string {
return e.aggregateType
}
 
// EventType returns the event type the raw event was constructed with.
func (e rawEvent) EventType() string {
return e.eventType
}
 
// MarshalJSON encodes only the event's data payload; the aggregate type,
// aggregate ID and event type are not part of the JSON output.
func (e rawEvent) MarshalJSON() ([]byte, error) {
return json.Marshal(e.data)
}
 
// ErrStreamNotFound is a sentinel error with the message "stream not found".
// NOTE(review): presumably returned by Store operations when the named stream
// does not exist — confirm against implementations.
var ErrStreamNotFound = fmt.Errorf("stream not found")