// provider/leveldbstore/leveldb_store.go
// Package leveldbstore provides a LevelDB-backed implementation of the
// rangedb store interface.
package leveldbstore

import (
	"bytes"
	"context"
	"encoding/binary"
	"fmt"
	"io"
	"log"
	"sync"
	"time"

	"github.com/syndtr/goleveldb/leveldb"
	"github.com/syndtr/goleveldb/leveldb/iterator"
	"github.com/syndtr/goleveldb/leveldb/opt"
	"github.com/syndtr/goleveldb/leveldb/util"

	"github.com/inklabs/rangedb"
	"github.com/inklabs/rangedb/pkg/broadcast"
	"github.com/inklabs/rangedb/pkg/clock"
	"github.com/inklabs/rangedb/pkg/clock/provider/systemclock"
	"github.com/inklabs/rangedb/pkg/rangedberror"
	"github.com/inklabs/rangedb/pkg/recordsubscriber"
	"github.com/inklabs/rangedb/pkg/shortuuid"
	"github.com/inklabs/rangedb/provider/jsonrecordserializer"
)

const (
	broadcastRecordBuffSize = 100
	separator               = "!"
	allEventsPrefix         = "$all$"
	streamPrefix            = "$s$"
	aggregateTypePrefix     = "$at$"
	globalSequenceNumberKey = "$gsn$"
)

// levelDbStore is a LevelDB-backed event store. Records are written under a
// per-stream key; two lookup indexes ($all$ and per-aggregate-type) map a
// global sequence number back to the stream key.
type levelDbStore struct {
	clock         clock.Clock
	uuidGenerator shortuuid.Generator
	serializer    rangedb.RecordSerializer
	broadcaster   broadcast.Broadcaster
	logger        *log.Logger

	mux sync.RWMutex // guards db access; writers take Lock, readers RLock
	db  *leveldb.DB
}

// Option defines functional option parameters for levelDbStore.
type Option func(*levelDbStore)

// WithClock is a functional option to inject a Clock.
func WithClock(clock clock.Clock) Option {
	return func(store *levelDbStore) {
		store.clock = clock
	}
}

// WithSerializer is a functional option to inject a RecordSerializer.
func WithSerializer(serializer rangedb.RecordSerializer) Option {
	return func(store *levelDbStore) {
		store.serializer = serializer
	}
}

// WithLogger is a functional option to inject a Logger.
func WithLogger(logger *log.Logger) Option {
	return func(store *levelDbStore) {
		store.logger = logger
	}
}

// WithUUIDGenerator is a functional option to inject a shortuuid.Generator.
func WithUUIDGenerator(uuidGenerator shortuuid.Generator) Option {
	return func(store *levelDbStore) {
		store.uuidGenerator = uuidGenerator
	}
}

// New constructs a levelDbStore backed by the LevelDB database at dbFilePath,
// with defaults (system clock, UUID generator, JSON serializer, discarded
// logging) that can be overridden via functional options.
func New(dbFilePath string, options ...Option) (*levelDbStore, error) {
	db, err := leveldb.OpenFile(dbFilePath, nil)
	if err != nil {
		return nil, fmt.Errorf("failed opening db: %v", err)
	}

	s := &levelDbStore{
		clock:         systemclock.New(),
		uuidGenerator: shortuuid.NewUUIDGenerator(),
		serializer:    jsonrecordserializer.New(),
		logger:        log.New(io.Discard, "", 0),
		broadcaster:   broadcast.New(broadcastRecordBuffSize, broadcast.DefaultTimeout),
		db:            db,
	}

	for _, option := range options {
		option(s)
	}

	return s, nil
}

// Stop closes the underlying LevelDB database.
func (s *levelDbStore) Stop() error {
	s.mux.Lock()
	defer s.mux.Unlock()

	return s.db.Close()
}

// Bind registers event types with the serializer.
func (s *levelDbStore) Bind(events ...rangedb.Event) {
	s.serializer.Bind(events...)
}

// Events returns an iterator over all records starting at globalSequenceNumber.
func (s *levelDbStore) Events(ctx context.Context, globalSequenceNumber uint64) rangedb.RecordIterator {
	return s.getEventsByLookup(ctx, allEventsPrefix, globalSequenceNumber)
}

// EventsByAggregateTypes returns an iterator over records for the supplied
// aggregate types, merged in global-sequence order when more than one type is
// requested.
func (s *levelDbStore) EventsByAggregateTypes(ctx context.Context, globalSequenceNumber uint64, aggregateTypes ...string) rangedb.RecordIterator {
	if len(aggregateTypes) == 1 {
		return s.getEventsByLookup(ctx, getAggregateTypeKeyPrefix(aggregateTypes[0]), globalSequenceNumber)
	}

	var recordIterators []rangedb.RecordIterator
	for _, aggregateType := range aggregateTypes {
		recordIterators = append(recordIterators, s.getEventsByLookup(ctx, getAggregateTypeKeyPrefix(aggregateType), globalSequenceNumber))
	}

	return rangedb.MergeRecordIteratorsInOrder(recordIterators)
}

// EventsByStream returns an iterator over a single stream starting at
// streamSequenceNumber.
func (s *levelDbStore) EventsByStream(ctx context.Context, streamSequenceNumber uint64, stream string) rangedb.RecordIterator {
	return s.getEventsByPrefix(ctx, getStreamKeyPrefix(stream), streamSequenceNumber)
}

// OptimisticDeleteStream deletes every record in streamName, along with its
// lookup-index entries, if and only if the stream currently holds exactly
// expectedStreamSequenceNumber records.
//
// NOTE: goleveldb permits only one open transaction at a time, so the
// transaction must be discarded on every error path; leaking it would block
// all subsequent OpenTransaction calls.
func (s *levelDbStore) OptimisticDeleteStream(ctx context.Context, expectedStreamSequenceNumber uint64, streamName string) error {
	select {
	case <-ctx.Done():
		return context.Canceled
	default:
	}

	s.mux.Lock()
	defer s.mux.Unlock()

	transaction, err := s.db.OpenTransaction()
	if err != nil {
		return err
	}

	iter := s.db.NewIterator(util.BytesPrefix([]byte(getStreamKeyPrefix(streamName))), nil)
	defer iter.Release()

	cnt := uint64(0)
	batch := new(leveldb.Batch)
	for iter.Next() {
		cnt++

		record, err := s.getRecordByValue(iter.Value())
		if err != nil {
			transaction.Discard() // was leaked here; see NOTE above
			return err
		}

		// Remove the record and both lookup-index entries pointing at it.
		streamKey := iter.Key()
		allAggregateTypeKey := getKeyWithNumber(getAggregateTypeKeyPrefix(record.AggregateType), record.GlobalSequenceNumber)
		allEventsKey := getKeyWithNumber(allEventsPrefix, record.GlobalSequenceNumber)
		batch.Delete(streamKey)
		batch.Delete(allAggregateTypeKey)
		batch.Delete(allEventsKey)
	}

	if cnt == 0 {
		transaction.Discard()
		return rangedb.ErrStreamNotFound
	}

	if cnt != expectedStreamSequenceNumber {
		transaction.Discard()
		return &rangedberror.UnexpectedSequenceNumber{
			Expected:             expectedStreamSequenceNumber,
			ActualSequenceNumber: cnt,
		}
	}

	_ = iter.Error()

	if err := transaction.Write(batch, nil); err != nil {
		transaction.Discard()
		return err
	}

	return transaction.Commit()
}

// OptimisticSave saves events, requiring the stream to currently be at
// expectedStreamSequenceNumber.
func (s *levelDbStore) OptimisticSave(ctx context.Context, expectedStreamSequenceNumber uint64, streamName string, eventRecords ...*rangedb.EventRecord) (uint64, error) {
	return s.saveEvents(ctx, &expectedStreamSequenceNumber, streamName, eventRecords...)
}

// Save saves events without a sequence-number precondition.
func (s *levelDbStore) Save(ctx context.Context, streamName string, eventRecords ...*rangedb.EventRecord) (uint64, error) {
	return s.saveEvents(ctx, nil, streamName, eventRecords...)
}
Method `levelDbStore.saveEvents` has 7 return statements (exceeds 4 allowed).func (s *levelDbStore) saveEvents(ctx context.Context, expectedStreamSequenceNumber *uint64, streamName string, eventRecords ...*rangedb.EventRecord) (uint64, error) { if len(eventRecords) < 1 { return 0, fmt.Errorf("missing events") } var pendingEventsData [][]byte var aggregateType, aggregateID string var lastStreamSequenceNumber uint64 s.mux.Lock() defer s.mux.Unlock() transaction, err := s.db.OpenTransaction() if err != nil { return 0, err } for _, eventRecord := range eventRecords { if aggregateType != "" && aggregateType != eventRecord.Event.AggregateType() { transaction.Discard() return 0, fmt.Errorf("unmatched aggregate type") } if aggregateID != "" && aggregateID != eventRecord.Event.AggregateID() { transaction.Discard() return 0, fmt.Errorf("unmatched aggregate ID") } aggregateType = eventRecord.Event.AggregateType() aggregateID = eventRecord.Event.AggregateID() var data []byte var err error data, lastStreamSequenceNumber, err = s.saveEvent( ctx, transaction, streamName, aggregateType, aggregateID, eventRecord.Event.EventType(), s.uuidGenerator.New(), expectedStreamSequenceNumber, eventRecord.Event, eventRecord.Metadata, ) if err != nil { transaction.Discard() return 0, err } pendingEventsData = append(pendingEventsData, data) if expectedStreamSequenceNumber != nil { *expectedStreamSequenceNumber++ } } err = transaction.Commit() if err != nil { return 0, err } for _, data := range pendingEventsData { deSerializedRecord, err := s.serializer.Deserialize(data) if err == nil { s.broadcaster.Accept(deSerializedRecord) } else { s.logger.Print(err) } } return lastStreamSequenceNumber, nil} // saveEvent persists a single event without locking the mutex, or notifying subscribers.Method `levelDbStore.saveEvent` has 5 arguments (exceeds 4 allowed). Consider refactoring.
Method `levelDbStore.saveEvent` has 5 return statements (exceeds 4 allowed).func (s *levelDbStore) saveEvent(ctx context.Context, transaction *leveldb.Transaction, streamName, aggregateType, aggregateID, eventType, eventID string, expectedStreamSequenceNumber *uint64, event, metadata interface{}) ([]byte, uint64, error) { select { case <-ctx.Done(): return nil, 0, context.Canceled default: } streamSequenceNumber := s.getStreamSequenceNumber(transaction, streamName) if expectedStreamSequenceNumber != nil && *expectedStreamSequenceNumber != streamSequenceNumber { return nil, 0, &rangedberror.UnexpectedSequenceNumber{ Expected: *expectedStreamSequenceNumber, ActualSequenceNumber: streamSequenceNumber, } } globalSequenceNumber := s.getGlobalSequenceNumber(transaction) + 1 record := &rangedb.Record{ StreamName: streamName, AggregateType: aggregateType, AggregateID: aggregateID, GlobalSequenceNumber: globalSequenceNumber, StreamSequenceNumber: streamSequenceNumber + 1, EventType: eventType, EventID: eventID, InsertTimestamp: uint64(s.clock.Now().Unix()), Data: event, Metadata: metadata, } data, err := s.serializer.Serialize(record) if err != nil { return nil, 0, err } batch := new(leveldb.Batch) batch.Put([]byte(globalSequenceNumberKey), uint64ToBytes(globalSequenceNumber)) streamKey := getKeyWithNumber(getStreamKeyPrefix(streamName), record.StreamSequenceNumber) batch.Put(streamKey, data) allAggregateTypeKey := getKeyWithNumber(getAggregateTypeKeyPrefix(aggregateType), record.GlobalSequenceNumber) batch.Put(allAggregateTypeKey, streamKey) allEventsKey := getKeyWithNumber(allEventsPrefix, record.GlobalSequenceNumber) batch.Put(allEventsKey, streamKey) err = transaction.Write(batch, nil) if err != nil { return nil, 0, err } return data, record.StreamSequenceNumber, nil} Similar blocks of code found in 5 locations. 
Consider refactoring.func (s *levelDbStore) AllEventsSubscription(ctx context.Context, bufferSize int, subscriber rangedb.RecordSubscriber) rangedb.RecordSubscription { return recordsubscriber.New( recordsubscriber.AllEventsConfig(ctx, s, s.broadcaster, bufferSize, func(record *rangedb.Record) error { subscriber.Accept(record) return nil }, ))} Similar blocks of code found in 5 locations. Consider refactoring.func (s *levelDbStore) AggregateTypesSubscription(ctx context.Context, bufferSize int, subscriber rangedb.RecordSubscriber, aggregateTypes ...string) rangedb.RecordSubscription { return recordsubscriber.New( recordsubscriber.AggregateTypesConfig(ctx, s, s.broadcaster, bufferSize, aggregateTypes, func(record *rangedb.Record) error { subscriber.Accept(record) return nil }, ))} func (s *levelDbStore) TotalEventsInStream(ctx context.Context, streamName string) (uint64, error) { select { case <-ctx.Done(): return 0, context.Canceled default: } return s.getStreamSequenceNumber(s.db, streamName), nil} func (s *levelDbStore) getEventsByPrefix(ctx context.Context, prefix string, streamSequenceNumber uint64) rangedb.RecordIterator { resultRecords := make(chan rangedb.ResultRecord) s.mux.RLock() go func() { defer s.mux.RUnlock() defer close(resultRecords) iter := s.db.NewIterator(util.BytesPrefix([]byte(prefix)), nil) defer iter.Release() cnt := 0 for iter.Next() { cnt++ record, err := s.getRecordByValue(iter.Value()) if err != nil { resultRecords <- rangedb.ResultRecord{Err: err} return } if record.StreamSequenceNumber < streamSequenceNumber { continue } if !rangedb.PublishRecordOrCancel(ctx, resultRecords, record, time.Second) { return } } if cnt == 0 { resultRecords <- rangedb.ResultRecord{Err: rangedb.ErrStreamNotFound} return } _ = iter.Error() }() return rangedb.NewRecordIterator(resultRecords)} func (s *levelDbStore) getEventsByLookup(ctx context.Context, key string, globalSequenceNumber uint64) rangedb.RecordIterator { resultRecords := make(chan 
rangedb.ResultRecord) s.mux.RLock() go func() { defer s.mux.RUnlock() defer close(resultRecords) iter := s.db.NewIterator(util.BytesPrefix([]byte(key)), nil) defer iter.Release() if globalSequenceNumber > 0 { seekKey := getKeyWithNumber(key, globalSequenceNumber) found := iter.Seek(seekKey) if found { iter.Prev() } } for iter.Next() { targetKey := iter.Value() record, err := s.getRecordByLookup(targetKey, iter) if err != nil { resultRecords <- rangedb.ResultRecord{Err: err} return } if record.GlobalSequenceNumber < globalSequenceNumber { continue } if !rangedb.PublishRecordOrCancel(ctx, resultRecords, record, time.Second) { return } } }() return rangedb.NewRecordIterator(resultRecords)} func (s *levelDbStore) getRecordByLookup(targetKey []byte, iter iterator.Iterator) (*rangedb.Record, error) { data, err := s.db.Get(targetKey, nil) if err != nil { s.logger.Printf("unable to find lookup record %s for %s: %v", targetKey, iter.Key(), err) return nil, err } return s.getRecordByValue(data)} func (s *levelDbStore) getRecordByValue(value []byte) (*rangedb.Record, error) { record, err := s.serializer.Deserialize(value) if err != nil { s.logger.Printf("failed to deserialize record: %v", err) return nil, err } return record, nil} func getStreamKeyPrefix(stream string) string { return fmt.Sprintf("%s%s$%s", streamPrefix, stream, separator)} func getAggregateTypeKeyPrefix(aggregateType string) string { return fmt.Sprintf("%s%s$%s", aggregateTypePrefix, aggregateType, separator)} func getKeyWithNumber(inputKey string, number uint64) []byte { return append([]byte(inputKey), uint64ToBytes(number)...)} func uint64ToBytes(number uint64) []byte { var buf bytes.Buffer _ = binary.Write(&buf, binary.BigEndian, number) return buf.Bytes()} func bytesToUint64(input []byte) uint64 { var number uint64 _ = binary.Read(bytes.NewReader(input), binary.BigEndian, &number) return number} type dbNewIterable interface { NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator} func (s 
*levelDbStore) getGlobalSequenceNumber(transaction *leveldb.Transaction) uint64 { var globalSequenceNumber uint64 globalSequenceNumberBytes, err := transaction.Get([]byte(globalSequenceNumberKey), nil) if err == nil { globalSequenceNumber = bytesToUint64(globalSequenceNumberBytes) } return globalSequenceNumber} func (s *levelDbStore) getStreamSequenceNumber(iterable dbNewIterable, stream string) uint64 { return s.getSequenceNumber(iterable, getStreamKeyPrefix(stream))} func (s *levelDbStore) getSequenceNumber(iterable dbNewIterable, key string) uint64 { iter := iterable.NewIterator(util.BytesPrefix([]byte(key)), nil) iter.Last() keySize := len(key) if len(iter.Key()) > keySize { lastSequenceNumber := bytesToUint64(iter.Key()[keySize:]) return lastSequenceNumber } return 0}