ucpr/mongo-streamer

View on GitHub
internal/mongo/change_stream.go

Summary

Maintainability
A
35 mins
Test Coverage
F
0%
package mongo
 
import (
"context"
"encoding/json"
"errors"
 
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
 
mmetric "github.com/ucpr/mongo-streamer/internal/metric/mongo"
"github.com/ucpr/mongo-streamer/internal/model"
"github.com/ucpr/mongo-streamer/internal/persistent"
"github.com/ucpr/mongo-streamer/pkg/log"
)
 
type (
// ChangeStream is a struct that represents a change stream.
ChangeStream struct {
cs *mongo.ChangeStream
handler ChangeStreamHandler
tokenManager persistent.StorageBuffer
db string
col string
}
 
// ChangeStreamOptions is a struct that represents options for change stream.
ChangeStreamOptions struct {
*options.ChangeStreamOptions
}
 
// ChangeStreamOption is a type of option function for change stream.
ChangeStreamOption func(opts *ChangeStreamOptions)
 
// ChangeStreamHandler is a type of handler function that handles ChangeStream.
ChangeStreamHandler func(ctx context.Context, event model.ChangeEvent) error
)
 
// WithBatchSize sets the batch size for ChangeStream.
func WithBatchSize(batchSize int32) ChangeStreamOption {
return func(o *ChangeStreamOptions) {
o.BatchSize = &batchSize
}
}
 
// ChangeStreamParams is a struct that represents parameters for creating a ChangeStream.
type ChangeStreamParams struct {
Client *Client
Handler ChangeStreamHandler
Storage persistent.StorageBuffer
Database string
Collection string
}
 
// NewChangeStream creates a new change stream instance.
Function `NewChangeStream` has 5 return statements (exceeds 4 allowed).
func NewChangeStream(ctx context.Context, params ChangeStreamParams, opts ...ChangeStreamOption) (*ChangeStream, error) {
chopts := &options.ChangeStreamOptions{}
for _, opt := range opts {
opt(&ChangeStreamOptions{chopts})
}
 
rt, err := params.Storage.Get()
if err != nil {
return nil, err
}
if rt != "" {
chopts.SetResumeAfter(rt)
}
 
// TODO: refactor
pipeline := mongo.Pipeline{}
db, col := params.Database, params.Collection
collection := params.Client.cli.Database(db).Collection(col)
changeStream, err := collection.Watch(ctx, pipeline, chopts)
if err != nil {
// if resume token is not found, reset resume token and retry
if errors.Is(err, mongo.ErrMissingResumeToken) {
log.Warn("Resume token is not found, reset resume token and retry", log.Fstring("db", db), log.Fstring("col", col))
chopts.SetResumeAfter(nil)
if err := params.Storage.Clear(); err != nil {
return nil, err
}
 
changeStream, err = collection.Watch(ctx, pipeline, chopts)
if err != nil {
return nil, err
}
} else {
return nil, err
}
}
 
cs := &ChangeStream{
cs: changeStream,
handler: params.Handler,
tokenManager: params.Storage,
db: db,
col: col,
}
return cs, nil
}
 
// Run starts watching change stream.
func (c *ChangeStream) Run(ctx context.Context) {
for c.cs.Next(ctx) {
mmetric.ReceiveChangeStream(c.db, c.col)
 
var streamObject model.ChangeEvent
if err := c.cs.Decode(&streamObject); err != nil {
mmetric.HandleChangeEventFailed(c.db, c.col)
log.Error("Failed to decode steream object", log.Ferror(err))
continue
}
 
// marshal stream object to json
jb, err := json.Marshal(streamObject)
if err != nil {
// skip for metrics retention use
log.Error("Failed to marshal stream object to json", log.Ferror(err))
}
mmetric.ReceiveBytes(c.db, c.col, len(jb))
 
if err := c.handler(context.Background(), streamObject); err != nil {
mmetric.HandleChangeEventFailed(c.db, c.col)
log.Error("Failed to handle change stream", log.Ferror(err))
// TODO: If handle fails, the process is repeated again
continue
}
mmetric.HandleChangeEventSuccess(c.db, c.col)
 
// save resume token
if err := c.tokenManager.Set(c.resumeToken()); err != nil {
log.Error("Failed to save resume token", log.Ferror(err))
continue
}
}
}
 
// resumeToken returns the resume token of the change stream.
func (c *ChangeStream) resumeToken() string {
return c.cs.ResumeToken().String()
}
 
// Close closes the change stream cursor.
func (c *ChangeStream) Close(ctx context.Context) error {
return c.cs.Close(ctx)
}