provider/eventstore/eventstore.go
File `eventstore.go` has 760 lines of code (exceeds 500 allowed). Consider refactoring.package eventstore import ( "bytes" "context" "encoding/json" "errors" "fmt" "io" "log" "net/http" "os" "strings" "sync" "time" "github.com/EventStore/EventStore-Client-Go/esdb" "github.com/gofrs/uuid" "github.com/inklabs/rangedb" "github.com/inklabs/rangedb/pkg/broadcast" "github.com/inklabs/rangedb/pkg/clock" "github.com/inklabs/rangedb/pkg/clock/provider/systemclock" "github.com/inklabs/rangedb/pkg/rangedberror" "github.com/inklabs/rangedb/pkg/recordsubscriber" "github.com/inklabs/rangedb/pkg/shortuuid" "github.com/inklabs/rangedb/provider/jsonrecordserializer") const ( rpcErrContextCanceled = "Canceled desc = context canceled" broadcastRecordBuffSize = 100) type RangeDBMetadata struct { StreamName string `json:"streamName"` AggregateType string `json:"aggregateType"` AggregateID string `json:"aggregateID"` InsertTimestamp uint64 `json:"insertTimestamp"` EventType string `json:"eventType"` EventID string `json:"eventID"`} type ESDBMetadata struct { RangeDBMetadata RangeDBMetadata `json:"rangeDBMetadata"` EventMetadata interface{} `json:"eventMetadata"`} type StreamPrefixer interface { WithPrefix(name string) string GetPrefix() string} type Config struct { IPAddr string Username string Password string} func NewConfigFromEnvironment() (Config, error) { ipAddr := os.Getenv("ESDB_IP_ADDR") username := os.Getenv("ESDB_USERNAME") password := os.Getenv("ESDB_PASSWORD") if ipAddr == "" || username == "" || password == "" { return Config{}, fmt.Errorf("EventStoreDB has not been configured via environment variables") } return Config{ IPAddr: ipAddr, Username: username, Password: password, }, nil} func (c Config) ConnectionString() string { return fmt.Sprintf("esdb://%s:%s@%s", c.Username, c.Password, c.IPAddr)} `eventStore` has 22 methods (exceeds 20 allowed). Consider refactoring.type eventStore struct { client *esdb.Client clock clock.Clock streamPrefixer StreamPrefixer uuidGenerator shortuuid.Generator broadcaster broadcast.Broadcaster eventTypeIdentifier rangedb.EventTypeIdentifier config Config recordDeletedStreams bool mu sync.RWMutex deletedStreams map[string]struct{}} // Option defines functional option parameters for eventStore.type Option func(*eventStore) // WithClock is a function option to inject a clock.Clockfunc WithClock(clock clock.Clock) Option { return func(store *eventStore) { store.clock = clock }} // WithUUIDGenerator is a functional option to inject a shortuuid.Generator.func WithUUIDGenerator(uuidGenerator shortuuid.Generator) Option { return func(store *eventStore) { store.uuidGenerator = uuidGenerator }} // WithStreamPrefix is a functional option to inject a stream prefix.func WithStreamPrefix(streamPrefixer StreamPrefixer) Option { return func(store *eventStore) { store.streamPrefixer = streamPrefixer }} // RecordDeletedStreams is a functional option to keep track of deleted streams used by OptimisticDeleteStream.// Warning: when using this option, clients must connect via a central RangeDB server and not directly to ESDB.func RecordDeletedStreams() Option { return func(store *eventStore) { store.recordDeletedStreams = true }} // New constructs an eventStore. Experimental: Use at your own risk!func New(config Config, options ...Option) (*eventStore, error) { s := &eventStore{ clock: systemclock.New(), uuidGenerator: shortuuid.NewUUIDGenerator(), broadcaster: broadcast.New(broadcastRecordBuffSize, broadcast.DefaultTimeout), eventTypeIdentifier: rangedb.NewEventIdentifier(), config: config, deletedStreams: make(map[string]struct{}), } for _, option := range options { option(s) } err := s.setupClient() if err != nil { return nil, err } go s.startSubscription() return s, nil} func (s *eventStore) Close() error { return s.client.Close()} func (s *eventStore) setupClient() error { config, err := esdb.ParseConnectionString(s.config.ConnectionString()) if err != nil { return fmt.Errorf("unexpected configuration error: %s", err.Error()) } config.DisableTLS = true config.SkipCertificateVerification = true client, err := esdb.NewClient(config) if err != nil { return fmt.Errorf("unable to create client: %s", err.Error()) } s.client = client return nil} func (s *eventStore) streamName(name string) string { if s.streamPrefixer == nil { return name } return s.streamPrefixer.WithPrefix(name)} func (s *eventStore) Bind(events ...rangedb.Event) { s.eventTypeIdentifier.Bind(events...)} Method `eventStore.Events` has a Cognitive Complexity of 46 (exceeds 20 allowed). Consider refactoring.
Method `eventStore.Events` has 85 lines of code (exceeds 50 allowed). Consider refactoring.
Method `eventStore.Events` has 11 return statements (exceeds 4 allowed).func (s *eventStore) Events(ctx context.Context, globalSequenceNumber uint64) rangedb.RecordIterator { resultRecords := make(chan rangedb.ResultRecord) go func() { defer close(resultRecords) readAllOptions := esdb.ReadAllOptions{ From: esdb.Position{Commit: globalSequenceNumber}, } readStream, err := s.client.ReadAll(ctx, readAllOptions, ^uint64(0)) if err != nil { if errors.Is(err, io.EOF) { return } if errors.Is(err, esdb.ErrStreamNotFound) { return } if strings.HasSuffix(err.Error(), rpcErrContextCanceled) { resultRecords <- rangedb.ResultRecord{ Record: nil, Err: context.Canceled, } return } resultRecords <- rangedb.ResultRecord{ Record: nil, Err: fmt.Errorf("unexpected failure ReadStreamEvents: %w", err), } return } for { resolvedEvent, err := readStream.Recv() if err != nil { if errors.Is(err, io.EOF) { return } if strings.HasSuffix(err.Error(), rpcErrContextCanceled) { resultRecords <- rangedb.ResultRecord{ Record: nil, Err: context.Canceled, } return } if errors.Is(err, esdb.ErrStreamNotFound) { return } resultRecords <- rangedb.ResultRecord{ Record: nil, Err: fmt.Errorf("unable to receive event: %w", err), } return } if resolvedEvent.Event == nil { continue } if s.isInternalESDBEvent(resolvedEvent) { continue } if !s.inCurrentVersion(resolvedEvent) { continue } if s.inDeletedStreams(resolvedEvent) { continue } record, err := s.recordFromLinkedEvent(resolvedEvent) if err != nil { resultRecords <- rangedb.ResultRecord{ Record: nil, Err: fmt.Errorf("unable to deserialize resolved event: %w", err), } return } select { case <-ctx.Done(): resultRecords <- rangedb.ResultRecord{ Record: nil, Err: context.Canceled, } return default: resultRecords <- rangedb.ResultRecord{ Record: record, Err: nil, } } } }() return rangedb.NewRecordIterator(resultRecords)} Method `eventStore.EventsByAggregateTypes` has a Cognitive Complexity of 43 (exceeds 20 allowed). Consider refactoring.
Method `eventStore.EventsByAggregateTypes` has 79 lines of code (exceeds 50 allowed). Consider refactoring.
Method `eventStore.EventsByAggregateTypes` has 8 return statements (exceeds 4 allowed).func (s *eventStore) EventsByAggregateTypes(ctx context.Context, globalSequenceNumber uint64, aggregateTypes ...string) rangedb.RecordIterator { resultRecords := make(chan rangedb.ResultRecord) go func() { defer close(resultRecords) aggregateTypesMap := make(map[string]struct{}) for _, aggregateType := range aggregateTypes { aggregateTypesMap[aggregateType] = struct{}{} } readAllOptions := esdb.ReadAllOptions{ From: esdb.Position{Commit: globalSequenceNumber}, } readStream, err := s.client.ReadAll(ctx, readAllOptions, ^uint64(0)) if err != nil { if strings.HasSuffix(err.Error(), rpcErrContextCanceled) { resultRecords <- rangedb.ResultRecord{ Record: nil, Err: context.Canceled, } return } resultRecords <- rangedb.ResultRecord{ Record: nil, Err: fmt.Errorf("unexpected failure ReadStreamEvents: %w", err), } return } for { resolvedEvent, err := readStream.Recv() if err != nil { if err == io.EOF { return } if errors.Is(err, esdb.ErrStreamNotFound) { return } resultRecords <- rangedb.ResultRecord{ Record: nil, Err: fmt.Errorf("unable to receive event: %w", err), } return } if s.isInternalESDBEvent(resolvedEvent) { continue } if !s.inCurrentVersion(resolvedEvent) { continue } if s.inDeletedStreams(resolvedEvent) { continue } record, err := s.recordFromLinkedEvent(resolvedEvent) if err != nil { resultRecords <- rangedb.ResultRecord{ Record: nil, Err: fmt.Errorf("unable to deserialize resolved event: %w", err), } return } select { case <-ctx.Done(): resultRecords <- rangedb.ResultRecord{ Record: nil, Err: context.Canceled, } return default: if _, ok := aggregateTypesMap[record.AggregateType]; !ok { continue } if record.GlobalSequenceNumber < globalSequenceNumber { continue } resultRecords <- rangedb.ResultRecord{ Record: record, Err: nil, } } } }() return rangedb.NewRecordIterator(resultRecords)} Method `eventStore.EventsByStream` has a Cognitive Complexity of 49 (exceeds 20 allowed). Consider refactoring.
Method `eventStore.EventsByStream` has 100 lines of code (exceeds 50 allowed). Consider refactoring.
Method `eventStore.EventsByStream` has 11 return statements (exceeds 4 allowed).func (s *eventStore) EventsByStream(ctx context.Context, streamSequenceNumber uint64, streamName string) rangedb.RecordIterator { resultRecords := make(chan rangedb.ResultRecord) go func() { defer close(resultRecords) readStreamOptions := esdb.ReadStreamOptions{ From: esdb.Revision(zeroBasedSequenceNumber(streamSequenceNumber)), ResolveLinkTos: true, } readStream, err := s.client.ReadStream(ctx, s.streamName(streamName), readStreamOptions, ^uint64(0)) if err != nil { if strings.HasSuffix(err.Error(), rpcErrContextCanceled) { resultRecords <- rangedb.ResultRecord{ Record: nil, Err: context.Canceled, } return } var streamDeletedError *esdb.StreamDeletedError if errors.Is(err, esdb.ErrStreamNotFound) || errors.As(err, &streamDeletedError) { resultRecords <- rangedb.ResultRecord{ Record: nil, Err: rangedb.ErrStreamNotFound, } return } resultRecords <- rangedb.ResultRecord{ Record: nil, Err: fmt.Errorf("unexpected failure ReadStream: %w", err), } return } totalRecords := 0 for { resolvedEvent, err := readStream.Recv() if err != nil { if errors.Is(err, io.EOF) { if totalRecords == 0 { resultRecords <- rangedb.ResultRecord{ Record: nil, Err: rangedb.ErrStreamNotFound, } return } return } if strings.HasSuffix(err.Error(), rpcErrContextCanceled) { resultRecords <- rangedb.ResultRecord{ Record: nil, Err: context.Canceled, } return } if errors.Is(err, esdb.ErrStreamNotFound) { resultRecords <- rangedb.ResultRecord{ Record: nil, Err: rangedb.ErrStreamNotFound, } return } resultRecords <- rangedb.ResultRecord{ Record: nil, Err: fmt.Errorf("unable to receive event: %w", err), } return } if s.isInternalESDBEvent(resolvedEvent) { continue } if !s.inCurrentVersion(resolvedEvent) { continue } record, err := s.recordFromLinkedEvent(resolvedEvent) if err != nil { resultRecords <- rangedb.ResultRecord{ Record: nil, Err: fmt.Errorf("unable to deserialize resolved event: %w", err), } return } if record.StreamName != streamName { continue } if record.StreamSequenceNumber < streamSequenceNumber { continue } select { case <-ctx.Done(): resultRecords <- rangedb.ResultRecord{ Record: nil, Err: context.Canceled, } return case resultRecords <- rangedb.ResultRecord{ Record: record, Err: nil, }: totalRecords++ } } }() return rangedb.NewRecordIterator(resultRecords)} func zeroBasedSequenceNumber(sequenceNumber uint64) uint64 { if sequenceNumber < 1 { return 0 } return sequenceNumber - 1} func oneBasedSequenceNumber(sequenceNumber uint64) uint64 { return sequenceNumber + 1} Method `eventStore.OptimisticDeleteStream` has 6 return statements (exceeds 4 allowed).func (s *eventStore) OptimisticDeleteStream(ctx context.Context, expectedStreamSequenceNumber uint64, streamName string) error { versionedStreamName := s.streamName(streamName) tombstoneStreamOptions := esdb.TombstoneStreamOptions{ ExpectedRevision: esdb.Revision(zeroBasedSequenceNumber(expectedStreamSequenceNumber)), Authenticated: nil, } _, err := s.client.TombstoneStream(ctx, versionedStreamName, tombstoneStreamOptions) if err != nil { var streamDeletedError *esdb.StreamDeletedError if errors.Is(err, esdb.ErrStreamNotFound) || errors.As(err, &streamDeletedError) { return rangedb.ErrStreamNotFound } if errors.Is(err, esdb.ErrWrongExpectedStreamRevision) { // We have to manually obtain the current stream sequence number // err does not contain "Actual version" and must be a bug in the EventStoreDB gRPC API. // re: https://github.com/EventStore/EventStore/issues/3226 log.Printf("### Actual version missing: %#v", err) actualSequenceNumber, err := s.getStreamSequenceNumber(ctx, streamName) if err != nil { return err } return &rangedberror.UnexpectedSequenceNumber{ Expected: expectedStreamSequenceNumber, ActualSequenceNumber: actualSequenceNumber, } } if strings.HasSuffix(err.Error(), rpcErrContextCanceled) { return context.Canceled } return err } if s.recordDeletedStreams { s.mu.Lock() s.deletedStreams[s.streamName(streamName)] = struct{}{} s.mu.Unlock() // s.waitForScavenge(ctx) } return nil} func (s *eventStore) OptimisticSave(ctx context.Context, expectedStreamSequenceNumber uint64, streamName string, eventRecords ...*rangedb.EventRecord) (uint64, error) { return s.saveEvents(ctx, &expectedStreamSequenceNumber, streamName, eventRecords...)} func (s *eventStore) Save(ctx context.Context, streamName string, eventRecords ...*rangedb.EventRecord) (uint64, error) { return s.saveEvents(ctx, nil, streamName, eventRecords...)} Similar blocks of code found in 5 locations. Consider refactoring.func (s *eventStore) AllEventsSubscription(ctx context.Context, bufferSize int, subscriber rangedb.RecordSubscriber) rangedb.RecordSubscription { return recordsubscriber.New( recordsubscriber.AllEventsConfig(ctx, s, s.broadcaster, bufferSize, func(record *rangedb.Record) error { subscriber.Accept(record) return nil }, ))} Similar blocks of code found in 5 locations. Consider refactoring.func (s *eventStore) AggregateTypesSubscription(ctx context.Context, bufferSize int, subscriber rangedb.RecordSubscriber, aggregateTypes ...string) rangedb.RecordSubscription { return recordsubscriber.New( recordsubscriber.AggregateTypesConfig(ctx, s, s.broadcaster, bufferSize, aggregateTypes, func(record *rangedb.Record) error { subscriber.Accept(record) return nil }, ))} func (s *eventStore) TotalEventsInStream(ctx context.Context, streamName string) (uint64, error) { select { case <-ctx.Done(): return 0, context.Canceled default: } iter := s.EventsByStream(ctx, 0, streamName) total := uint64(0) for iter.Next() { if iter.Err() != nil { break } total++ } return total, nil} Method `eventStore.saveEvents` has 77 lines of code (exceeds 50 allowed). Consider refactoring.
Method `eventStore.saveEvents` has 10 return statements (exceeds 4 allowed).
Method `eventStore.saveEvents` has a Cognitive Complexity of 22 (exceeds 20 allowed). Consider refactoring.func (s *eventStore) saveEvents(ctx context.Context, expectedStreamSequenceNumber *uint64, streamName string, eventRecords ...*rangedb.EventRecord) (uint64, error) { if len(eventRecords) < 1 { return 0, fmt.Errorf("missing events") } aggregateType := eventRecords[0].Event.AggregateType() aggregateID := eventRecords[0].Event.AggregateID() if expectedStreamSequenceNumber != nil { streamSequenceNumber, _ := s.getStreamSequenceNumber(ctx, streamName) if *expectedStreamSequenceNumber != streamSequenceNumber { return 0, &rangedberror.UnexpectedSequenceNumber{ Expected: *expectedStreamSequenceNumber, ActualSequenceNumber: streamSequenceNumber, } } } var proposedEvents []esdb.EventData for _, eventRecord := range eventRecords { if aggregateType != "" && aggregateType != eventRecord.Event.AggregateType() { return 0, fmt.Errorf("unmatched aggregate type") } if aggregateID != "" && aggregateID != eventRecord.Event.AggregateID() { return 0, fmt.Errorf("unmatched aggregate ID") } aggregateType = eventRecord.Event.AggregateType() aggregateID = eventRecord.Event.AggregateID() eventID := s.uuidGenerator.New() esDBMetadata := ESDBMetadata{ RangeDBMetadata: RangeDBMetadata{ StreamName: streamName, AggregateType: aggregateType, AggregateID: aggregateID, InsertTimestamp: uint64(s.clock.Now().Unix()), EventType: eventRecord.Event.EventType(), EventID: eventID, }, EventMetadata: eventRecord.Metadata, } var eventMetadata []byte eventMetadata, err := json.Marshal(esDBMetadata) if err != nil { return 0, err } eventData, err := json.Marshal(eventRecord.Event) if err != nil { return 0, err } eventUUID, _ := uuid.FromString(eventID) proposedEvent := esdb.EventData{ EventID: eventUUID, EventType: eventRecord.Event.EventType(), ContentType: esdb.JsonContentType, Data: eventData, Metadata: eventMetadata, } proposedEvents = append(proposedEvents, proposedEvent) } var streamRevision esdb.ExpectedRevision if expectedStreamSequenceNumber != nil && *expectedStreamSequenceNumber > 0 { streamRevision = esdb.Revision(zeroBasedSequenceNumber(*expectedStreamSequenceNumber)) } appendToStreamRevisionOptions := esdb.AppendToStreamOptions{ ExpectedRevision: streamRevision, } result, err := s.client.AppendToStream(ctx, s.streamName(streamName), appendToStreamRevisionOptions, proposedEvents...) if err != nil { if errors.Is(err, esdb.ErrWrongExpectedStreamRevision) { return 0, &rangedberror.UnexpectedSequenceNumber{ Expected: *expectedStreamSequenceNumber, ActualSequenceNumber: 0, } } if strings.HasSuffix(err.Error(), rpcErrContextCanceled) { return 0, context.Canceled } return 0, err } streamSequenceNumber := oneBasedSequenceNumber(result.NextExpectedVersion) return streamSequenceNumber, nil} func (s *eventStore) isInternalESDBEvent(event *esdb.ResolvedEvent) bool { return strings.HasPrefix(event.Event.EventType, "$")} func (s *eventStore) inCurrentVersion(event *esdb.ResolvedEvent) bool { if event.Event == nil { return false } if s.streamPrefixer == nil { return true } return strings.HasPrefix(event.Event.StreamID, s.streamPrefixer.GetPrefix())} func (s *eventStore) Ping() error { ctx, done := context.WithTimeout(context.Background(), 5*time.Second) defer done() iter := s.EventsByStream(ctx, 0, "no!stream") iter.Next() if iter.Err() != nil && strings.Contains(iter.Err().Error(), "connection refused") { return iter.Err() } return nil} func (s *eventStore) getStreamSequenceNumber(ctx context.Context, stream string) (uint64, error) { iter := s.EventsByStream(ctx, 0, stream) lastStreamSequenceNumber := uint64(0) for iter.Next() { if iter.Err() != nil { return 0, iter.Err() } lastStreamSequenceNumber = iter.Record().StreamSequenceNumber } if iter.Err() != nil { return 0, iter.Err() } return lastStreamSequenceNumber, nil} func (s *eventStore) recordFromLinkedEvent(resolvedEvent *esdb.ResolvedEvent) (*rangedb.Record, error) { if resolvedEvent.Event == nil { return nil, fmt.Errorf("not found") } var eventStoreDBMetadata ESDBMetadata err := json.Unmarshal(resolvedEvent.Event.UserMetadata, &eventStoreDBMetadata) if err != nil { return nil, fmt.Errorf("unable to unmarshal ESDBMetadata err: %w", err) } globalSequenceNumber := uint64(0) if resolvedEvent.Commit != nil { globalSequenceNumber = *resolvedEvent.Commit } record := &rangedb.Record{ StreamName: eventStoreDBMetadata.RangeDBMetadata.StreamName, AggregateType: eventStoreDBMetadata.RangeDBMetadata.AggregateType, AggregateID: eventStoreDBMetadata.RangeDBMetadata.AggregateID, GlobalSequenceNumber: globalSequenceNumber, StreamSequenceNumber: oneBasedSequenceNumber(resolvedEvent.Event.EventNumber), InsertTimestamp: eventStoreDBMetadata.RangeDBMetadata.InsertTimestamp, EventID: eventStoreDBMetadata.RangeDBMetadata.EventID, EventType: eventStoreDBMetadata.RangeDBMetadata.EventType, Metadata: eventStoreDBMetadata.EventMetadata, } event, err := jsonrecordserializer.DecodeJsonData(record.EventType, bytes.NewReader(resolvedEvent.Event.Data), s.eventTypeIdentifier) if err != nil { return nil, err } record.Data = event return record, nil} Method `eventStore.startSubscription` has a Cognitive Complexity of 26 (exceeds 20 allowed). Consider refactoring.
Method `eventStore.startSubscription` has 55 lines of code (exceeds 50 allowed). Consider refactoring.
Method `eventStore.startSubscription` has 8 return statements (exceeds 4 allowed).func (s *eventStore) startSubscription() { ctx := context.Background() opts := esdb.SubscribeToAllOptions{ From: esdb.Start{}, } subscription, err := s.client.SubscribeToAll(ctx, opts) if err != nil { return } defer func() { err := subscription.Close() if err != nil { log.Printf("subscription: unable to close") return } }() for { subscriptionEvent := subscription.Recv() if err != nil { if err == io.EOF { log.Printf("subscription: io.EOF") return } if strings.HasSuffix(err.Error(), rpcErrContextCanceled) { log.Printf("subscription: context canceled") return } if errors.Is(err, esdb.ErrStreamNotFound) { log.Printf("subscription: stream not found") return } log.Printf("subscription: unable to receive event: %v", err) return } if subscriptionEvent.EventAppeared == nil || subscriptionEvent.EventAppeared.Event == nil { continue } if s.isInternalESDBEvent(subscriptionEvent.EventAppeared) { continue } if !s.inCurrentVersion(subscriptionEvent.EventAppeared) { continue } record, err := s.recordFromLinkedEvent(subscriptionEvent.EventAppeared) if err != nil { log.Printf("subscription: unable to deserialize resolved event: %v", err) return } select { case <-ctx.Done(): log.Printf("subscription: context canceled") return default: s.broadcaster.Accept(record) } }} Method `eventStore.waitForScavenge` has 6 return statements (exceeds 4 allowed).func (s *eventStore) waitForScavenge(ctx context.Context) { log.Print("starting scavenge") uri := "http://0.0.0.0:2113/admin/scavenge" req, err := http.NewRequestWithContext(ctx, http.MethodPost, uri, nil) if err != nil { log.Print(err) return } req.SetBasicAuth(s.config.Username, s.config.Password) req.Header.Add("Accept", "application/json") response, err := http.DefaultClient.Do(req) if err != nil { log.Print(err) return } if response.StatusCode != 200 { log.Print(response) return } log.Printf("response: %#v", response) log.Printf("headers: %v", response.Header) type ScavengeResponse struct { ScavengeID string `json:"scavengeId"` } var scavengeResponse ScavengeResponse err = json.NewDecoder(response.Body).Decode(&scavengeResponse) if err != nil { log.Print(err) return } scavengeStream := fmt.Sprintf("$scavenges-%s", scavengeResponse.ScavengeID) stream, err := s.client.SubscribeToStream(ctx, scavengeStream, esdb.SubscribeToStreamOptions{}) if err != nil { log.Print(err) return } log.Print("Waiting for scavenge") for { log.Print(".") subscriptionEvent := stream.Recv() log.Print(subscriptionEvent.EventAppeared.Event) if subscriptionEvent.EventAppeared.Event.EventType == "$scavengeCompleted" { log.Print("Scavenge complete!") return } }} func (s *eventStore) inDeletedStreams(event *esdb.ResolvedEvent) bool { if !s.recordDeletedStreams { return false } s.mu.RLock() defer s.mu.RUnlock() if _, ok := s.deletedStreams[event.Event.StreamID]; ok { return true } return false}