inklabs/rangedb

View on GitHub
pkg/grpc/rangedbserver/server.go

Summary

Maintainability
C
1 day
Test Coverage
B
86%
package rangedbserver
 
import (
"context"
"encoding/json"
"fmt"
"log"
 
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
 
"github.com/inklabs/rangedb"
"github.com/inklabs/rangedb/pkg/broadcast"
"github.com/inklabs/rangedb/pkg/grpc/rangedbpb"
"github.com/inklabs/rangedb/pkg/recordsubscriber"
"github.com/inklabs/rangedb/provider/inmemorystore"
)
 
const (
broadcastRecordBuffSize = 100
subscriberRecordBuffSize = 20
)
 
type void struct{}
 
type streamSender interface {
Send(*rangedbpb.Record) error
}
 
type rangeDBServer struct {
rangedbpb.UnimplementedRangeDBServer
store rangedb.Store
broadcaster broadcast.Broadcaster
stopChan chan void
}
 
// Option defines functional option parameters for rangeDBServer.
type Option func(*rangeDBServer)
 
// WithStore is a functional option to inject a Store.
func WithStore(store rangedb.Store) Option {
return func(api *rangeDBServer) {
api.store = store
}
}
 
// New constructs a new rangeDBServer.
func New(options ...Option) (*rangeDBServer, error) {
server := &rangeDBServer{
store: inmemorystore.New(),
broadcaster: broadcast.New(broadcastRecordBuffSize, broadcast.DefaultTimeout),
stopChan: make(chan void),
}
 
for _, option := range options {
option(server)
}
 
err := server.initBroadcaster()
if err != nil {
return nil, err
}
 
return server, nil
}
 
func (s *rangeDBServer) initBroadcaster() error {
ctx := context.Background()
subscription := s.store.AllEventsSubscription(ctx,
subscriberRecordBuffSize,
s.broadcaster,
)
return subscription.Start()
}
 
func (s *rangeDBServer) Stop() error {
s.broadcaster.Close()
close(s.stopChan)
return nil
}
 
func (s *rangeDBServer) Events(req *rangedbpb.EventsRequest, stream rangedbpb.RangeDB_EventsServer) error {
recordIterator := s.store.Events(stream.Context(), req.GlobalSequenceNumber)
for recordIterator.Next() {
if recordIterator.Err() != nil {
return recordIterator.Err()
}
 
pbRecord, err := rangedbpb.ToPbRecord(recordIterator.Record())
if err != nil {
return err
}
 
err = stream.Send(pbRecord)
if err != nil {
return err
}
}
 
return nil
}
 
Method `rangeDBServer.EventsByStream` has 5 return statements (exceeds 4 allowed).
func (s *rangeDBServer) EventsByStream(req *rangedbpb.EventsByStreamRequest, stream rangedbpb.RangeDB_EventsByStreamServer) error {
recordIterator := s.store.EventsByStream(stream.Context(), req.StreamSequenceNumber, req.StreamName)
 
for recordIterator.Next() {
if recordIterator.Err() != nil {
return recordIterator.Err()
}
 
pbRecord, err := rangedbpb.ToPbRecord(recordIterator.Record())
if err != nil {
return err
}
 
err = stream.Send(pbRecord)
if err != nil {
return err
}
}
 
if recordIterator.Err() != nil {
return recordIterator.Err()
}
 
return nil
}
 
func (s *rangeDBServer) EventsByAggregateType(req *rangedbpb.EventsByAggregateTypeRequest, stream rangedbpb.RangeDB_EventsByAggregateTypeServer) error {
recordIterator := s.store.EventsByAggregateTypes(stream.Context(), req.GlobalSequenceNumber, req.AggregateTypes...)
for recordIterator.Next() {
if recordIterator.Err() != nil {
return recordIterator.Err()
}
 
pbRecord, err := rangedbpb.ToPbRecord(recordIterator.Record())
if err != nil {
return err
}
 
err = stream.Send(pbRecord)
if err != nil {
return err
}
}
 
return nil
}
 
func (s *rangeDBServer) OptimisticDeleteStream(ctx context.Context, req *rangedbpb.OptimisticDeleteStreamRequest) (*rangedbpb.OptimisticDeleteStreamResponse, error) {
err := s.store.OptimisticDeleteStream(ctx, req.ExpectedStreamSequenceNumber, req.StreamName)
if err != nil {
return nil, err
}
 
return &rangedbpb.OptimisticDeleteStreamResponse{
EventsDeleted: uint32(req.ExpectedStreamSequenceNumber),
}, nil
}
 
func (s *rangeDBServer) OptimisticSave(ctx context.Context, req *rangedbpb.OptimisticSaveRequest) (*rangedbpb.SaveResponse, error) {
var eventRecords []*rangedb.EventRecord
 
Identical blocks of code found in 2 locations. Consider refactoring.
for _, event := range req.Events {
var data interface{}
err := json.Unmarshal([]byte(event.Data), &data)
if err != nil {
message := fmt.Sprintf("unable to read event data: %v", err)
st := status.New(codes.InvalidArgument, message)
st, _ = st.WithDetails(&rangedbpb.SaveFailureResponse{
Message: message,
})
return nil, st.Err()
}
 
var metadata interface{}
if event.Metadata != "" {
err = json.Unmarshal([]byte(event.Metadata), &metadata)
if err != nil {
message := fmt.Sprintf("unable to read event metadata: %v", err)
st := status.New(codes.InvalidArgument, message)
st, _ = st.WithDetails(&rangedbpb.SaveFailureResponse{
Message: message,
})
return nil, st.Err()
}
}
 
eventRecords = append(eventRecords, &rangedb.EventRecord{
Event: rangedb.NewRawEvent(event.AggregateType, event.AggregateID, event.EventType, data),
Metadata: metadata,
})
}
 
lastStreamSequenceNumber, saveErr := s.store.OptimisticSave(ctx,
req.ExpectedStreamSequenceNumber,
req.StreamName,
eventRecords...,
)
 
if saveErr != nil {
message := fmt.Sprintf("unable to save to store: %v", saveErr)
st := status.New(codes.Internal, message)
st, _ = st.WithDetails(&rangedbpb.SaveFailureResponse{
Message: message,
})
return nil, st.Err()
}
 
return &rangedbpb.SaveResponse{
EventsSaved: uint32(len(eventRecords)),
LastStreamSequenceNumber: lastStreamSequenceNumber,
}, nil
}
 
func (s *rangeDBServer) Save(ctx context.Context, req *rangedbpb.SaveRequest) (*rangedbpb.SaveResponse, error) {
var eventRecords []*rangedb.EventRecord
 
Identical blocks of code found in 2 locations. Consider refactoring.
for _, event := range req.Events {
var data interface{}
err := json.Unmarshal([]byte(event.Data), &data)
if err != nil {
message := fmt.Sprintf("unable to read event data: %v", err)
st := status.New(codes.InvalidArgument, message)
st, _ = st.WithDetails(&rangedbpb.SaveFailureResponse{
Message: message,
})
return nil, st.Err()
}
 
var metadata interface{}
if event.Metadata != "" {
err = json.Unmarshal([]byte(event.Metadata), &metadata)
if err != nil {
message := fmt.Sprintf("unable to read event metadata: %v", err)
st := status.New(codes.InvalidArgument, message)
st, _ = st.WithDetails(&rangedbpb.SaveFailureResponse{
Message: message,
})
return nil, st.Err()
}
}
 
eventRecords = append(eventRecords, &rangedb.EventRecord{
Event: rangedb.NewRawEvent(event.AggregateType, event.AggregateID, event.EventType, data),
Metadata: metadata,
})
}
 
lastStreamSequenceNumber, saveErr := s.store.Save(ctx,
req.StreamName,
eventRecords...,
)
 
if saveErr != nil {
message := fmt.Sprintf("unable to save to store: %v", saveErr)
st := status.New(codes.Internal, message)
st, _ = st.WithDetails(&rangedbpb.SaveFailureResponse{
Message: message,
})
return nil, st.Err()
}
 
return &rangedbpb.SaveResponse{
EventsSaved: uint32(len(eventRecords)),
LastStreamSequenceNumber: lastStreamSequenceNumber,
}, nil
}
 
func (s *rangeDBServer) SubscribeToLiveEvents(_ *rangedbpb.SubscribeToLiveEventsRequest, stream rangedbpb.RangeDB_SubscribeToLiveEventsServer) error {
config := recordsubscriber.AllEventsConfig(stream.Context(),
s.store,
s.broadcaster,
subscriberRecordBuffSize,
func(record *rangedb.Record) error {
return s.broadcastRecord(stream, record)
},
)
subscriber := recordsubscriber.New(config)
err := subscriber.Start()
if err != nil {
return err
}
 
<-stream.Context().Done()
 
return nil
}
 
func (s *rangeDBServer) SubscribeToEvents(req *rangedbpb.SubscribeToEventsRequest, stream rangedbpb.RangeDB_SubscribeToEventsServer) error {
config := recordsubscriber.AllEventsConfig(
stream.Context(),
s.store,
s.broadcaster,
subscriberRecordBuffSize,
func(record *rangedb.Record) error {
return s.broadcastRecord(stream, record)
},
)
subscriber := recordsubscriber.New(config)
err := subscriber.StartFrom(req.GlobalSequenceNumber)
if err != nil {
return err
}
 
<-stream.Context().Done()
 
return nil
}
 
func (s *rangeDBServer) SubscribeToEventsByAggregateType(req *rangedbpb.SubscribeToEventsByAggregateTypeRequest, stream rangedbpb.RangeDB_SubscribeToEventsByAggregateTypeServer) error {
config := recordsubscriber.AggregateTypesConfig(
stream.Context(),
s.store,
s.broadcaster,
subscriberRecordBuffSize,
req.AggregateTypes,
func(record *rangedb.Record) error {
return s.broadcastRecord(stream, record)
},
)
subscriber := recordsubscriber.New(config)
err := subscriber.StartFrom(req.GlobalSequenceNumber)
if err != nil {
return err
}
 
<-stream.Context().Done()
 
return nil
}
 
func (s *rangeDBServer) TotalEventsInStream(ctx context.Context, request *rangedbpb.TotalEventsInStreamRequest) (*rangedbpb.TotalEventsInStreamResponse, error) {
totalEvents, err := s.store.TotalEventsInStream(ctx, request.StreamName)
if err != nil {
return nil, err
}
return &rangedbpb.TotalEventsInStreamResponse{
TotalEvents: totalEvents,
}, nil
}
 
func (s *rangeDBServer) broadcastRecord(stream streamSender, record *rangedb.Record) error {
pbRecord, err := rangedbpb.ToPbRecord(record)
if err != nil {
log.Printf("unable to marshal record: %v", err)
return err
}
 
err = stream.Send(pbRecord)
if err != nil {
log.Printf("unable to send record to gRPC client: %v", err)
return err
}
 
return nil
}
 
// PbRecordSender defines the interface for sending a protobuf record.
type PbRecordSender interface {
Send(*rangedbpb.Record) error
}