inklabs/rangedb

View on GitHub
pkg/rangedbapi/api.go

Summary

Maintainability
A
35 mins
Test Coverage
A
98%
package rangedbapi
 
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"strconv"
"strings"
 
"github.com/gorilla/handlers"
"github.com/gorilla/mux"
 
"github.com/inklabs/rangedb"
"github.com/inklabs/rangedb/pkg/projection"
"github.com/inklabs/rangedb/pkg/rangedberror"
"github.com/inklabs/rangedb/provider/inmemorystore"
"github.com/inklabs/rangedb/provider/jsonrecordiostream"
"github.com/inklabs/rangedb/provider/msgpackrecordiostream"
"github.com/inklabs/rangedb/provider/ndjsonrecordiostream"
)
 
type api struct {
jsonRecordIoStream rangedb.RecordIoStream
ndJSONRecordIoStream rangedb.RecordIoStream
msgpackRecordIoStream rangedb.RecordIoStream
store rangedb.Store
snapshotStore projection.SnapshotStore
handler http.Handler
logger *log.Logger
baseUri string
projections struct {
aggregateTypeStats *projection.AggregateTypeStats
}
}
 
// Option defines functional option parameters for api.
type Option func(*api)
 
// WithBaseUri is a functional option to inject the base URI for use in API links.
func WithBaseUri(baseUri string) Option {
return func(api *api) {
api.baseUri = baseUri
}
}
 
// WithLogger is a functional option to inject a Logger.
func WithLogger(logger *log.Logger) Option {
return func(api *api) {
api.logger = logger
}
}
 
// WithSnapshotStore is a functional option to inject a SnapshotStore.
func WithSnapshotStore(snapshotStore projection.SnapshotStore) Option {
return func(api *api) {
api.snapshotStore = snapshotStore
}
}
 
// WithStore is a functional option to inject a Store.
func WithStore(store rangedb.Store) Option {
return func(api *api) {
api.store = store
}
}
 
// New constructs an api.
func New(options ...Option) (*api, error) {
api := &api{
jsonRecordIoStream: jsonrecordiostream.New(),
ndJSONRecordIoStream: ndjsonrecordiostream.New(),
msgpackRecordIoStream: msgpackrecordiostream.New(),
store: inmemorystore.New(),
logger: log.New(ioutil.Discard, "", 0),
baseUri: "http://127.0.0.1",
}
 
for _, option := range options {
option(api)
}
 
api.initRoutes()
err := api.initProjections()
if err != nil {
return nil, err
}
 
return api, nil
}
 
func (a *api) initRoutes() {
const streamName = "{streamName}"
const extension = ".{extension:json|ndjson|msgpack}"
router := mux.NewRouter().StrictSlash(true)
router.HandleFunc("/health-check", a.healthCheck)
router.HandleFunc("/delete-stream/"+streamName, a.deleteStream)
router.HandleFunc("/save-events/"+streamName, a.saveEvents)
router.HandleFunc("/all-events"+extension, a.allEvents)
router.HandleFunc("/events-by-stream/"+streamName+extension, a.eventsByStream)
router.HandleFunc("/events-by-aggregate-type/{aggregateType:[a-zA-Z-,]+}"+extension, a.eventsByAggregateType)
router.HandleFunc("/list-aggregate-types", a.listAggregateTypes)
a.handler = handlers.CompressHandler(router)
}
 
func (a *api) initProjections() error {
a.projections.aggregateTypeStats = projection.NewAggregateTypeStats()
globalSequenceNumber := uint64(0)
 
if a.snapshotStore != nil {
err := a.snapshotStore.Load(a.projections.aggregateTypeStats)
if err != nil {
a.logger.Printf("unable to load from snapshot store: %v", err)
}
 
if a.projections.aggregateTypeStats.TotalEvents() > 0 {
globalSequenceNumber = a.projections.aggregateTypeStats.LatestGlobalSequenceNumber() + 1
}
}
 
ctx := context.Background()
const bufferSize = 10
subscription := a.store.AllEventsSubscription(ctx, bufferSize, a.projections.aggregateTypeStats)
err := subscription.StartFrom(globalSequenceNumber)
if err != nil {
return err
}
 
if a.snapshotStore != nil {
err := a.snapshotStore.Save(a.projections.aggregateTypeStats)
if err != nil {
a.logger.Print(err)
}
}
 
return nil
}
 
func (a *api) ServeHTTP(w http.ResponseWriter, r *http.Request) {
a.handler.ServeHTTP(w, r)
}
 
func (a *api) healthCheck(w http.ResponseWriter, _ *http.Request) {
w.Header().Set(`Content-Type`, `application/json`)
_, _ = fmt.Fprintf(w, `{"status":"OK"}`)
}
 
func (a *api) allEvents(w http.ResponseWriter, r *http.Request) {
extension := mux.Vars(r)["extension"]
 
recordIterator := a.store.Events(r.Context(), 0)
a.writeEvents(w, recordIterator, extension)
}
 
func (a *api) eventsByStream(w http.ResponseWriter, r *http.Request) {
streamName := mux.Vars(r)["streamName"]
extension := mux.Vars(r)["extension"]
 
events := a.store.EventsByStream(r.Context(), 0, streamName)
a.writeEvents(w, events, extension)
}
 
func (a *api) eventsByAggregateType(w http.ResponseWriter, r *http.Request) {
aggregateTypeInput := mux.Vars(r)["aggregateType"]
aggregateTypes := strings.Split(aggregateTypeInput, ",")
extension := mux.Vars(r)["extension"]
 
events := a.store.EventsByAggregateTypes(r.Context(), 0, aggregateTypes...)
 
a.writeEvents(w, events, extension)
}
 
func (a *api) deleteStream(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
 
streamName := mux.Vars(r)["streamName"]
 
expectedStreamSequenceNumber, err := expectedStreamSequenceNumberFromRequest(r)
if err != nil {
writeFailedResponse(w, "invalid ExpectedStreamSequenceNumber", http.StatusBadRequest)
return
}
 
deleteErr := a.store.OptimisticDeleteStream(r.Context(), expectedStreamSequenceNumber, streamName)
eventsDeleted := expectedStreamSequenceNumber
 
if deleteErr != nil {
if unexpectedErr, ok := deleteErr.(*rangedberror.UnexpectedSequenceNumber); ok {
writeFailedResponse(w, unexpectedErr.Error(), http.StatusConflict)
return
}
 
if deleteErr == rangedb.ErrStreamNotFound {
writeFailedResponse(w, deleteErr.Error(), http.StatusNotFound)
return
}
 
a.logger.Printf("unable to delete: %v", deleteErr)
writeFailedResponse(w, "internal server error", http.StatusInternalServerError)
return
}
 
w.WriteHeader(http.StatusOK)
_, _ = fmt.Fprintf(w, `{"status":"OK","eventsDeleted":%d}`, eventsDeleted)
}
 
Method `api.saveEvents` has 5 return statements (exceeds 4 allowed).
func (a *api) saveEvents(w http.ResponseWriter, r *http.Request) {
if r.Header.Get("Content-Type") != "application/json" {
http.Error(w, "invalid content type", http.StatusBadRequest)
return
}
 
w.Header().Set("Content-Type", "application/json")
 
eventRecords, err := getEventRecords(r)
if err != nil {
writeFailedResponse(w, "invalid json request body", http.StatusBadRequest)
return
}
 
streamName := mux.Vars(r)["streamName"]
 
var lastStreamSequenceNumber uint64
var saveErr error
expectedStreamSequenceNumberInput := r.Header.Get("ExpectedStreamSequenceNumber")
if expectedStreamSequenceNumberInput != "" {
expectedStreamSequenceNumber, err := strconv.ParseUint(expectedStreamSequenceNumberInput, 10, 64)
if err != nil {
writeFailedResponse(w, "invalid ExpectedStreamSequenceNumber", http.StatusBadRequest)
return
}
lastStreamSequenceNumber, saveErr = a.store.OptimisticSave(r.Context(), expectedStreamSequenceNumber, streamName, eventRecords...)
} else {
lastStreamSequenceNumber, saveErr = a.store.Save(r.Context(), streamName, eventRecords...)
}
 
if saveErr != nil {
if unexpectedErr, ok := saveErr.(*rangedberror.UnexpectedSequenceNumber); ok {
writeFailedResponse(w, unexpectedErr.Error(), http.StatusConflict)
return
}
 
a.logger.Printf("unable to save: %v", saveErr)
writeFailedResponse(w, "internal server error", http.StatusInternalServerError)
return
}
 
w.WriteHeader(http.StatusCreated)
_, _ = fmt.Fprintf(w, `{"status":"OK","streamSequenceNumber":%d}`, lastStreamSequenceNumber)
}
 
func writeFailedResponse(w http.ResponseWriter, message string, statusCode int) {
w.WriteHeader(statusCode)
_, _ = fmt.Fprintf(w, `{"status":"Failed","message":"%s"}`, message)
}
 
func getEventRecords(r *http.Request) ([]*rangedb.EventRecord, error) {
var events []struct {
AggregateType string `json:"aggregateType"`
AggregateID string `json:"aggregateID"`
EventType string `json:"eventType"`
Data interface{} `json:"data"`
Metadata interface{} `json:"metadata"`
}
err := json.NewDecoder(r.Body).Decode(&events)
if err != nil {
return nil, fmt.Errorf("invalid json request body: %v", err)
}
 
var eventRecords []*rangedb.EventRecord
for _, event := range events {
eventRecords = append(eventRecords, &rangedb.EventRecord{
Event: rangedb.NewRawEvent(event.AggregateType, event.AggregateID, event.EventType, event.Data),
Metadata: event.Metadata,
})
}
 
return eventRecords, nil
}
 
func (a *api) AggregateTypeStatsProjection() *projection.AggregateTypeStats {
return a.projections.aggregateTypeStats
}
 
func (a *api) listAggregateTypes(w http.ResponseWriter, _ *http.Request) {
var data []map[string]interface{}
for _, aggregateType := range a.projections.aggregateTypeStats.SortedAggregateTypes() {
data = append(data, map[string]interface{}{
"name": aggregateType,
"totalEvents": a.projections.aggregateTypeStats.TotalEventsByAggregateType(aggregateType),
"links": map[string]interface{}{
"self": fmt.Sprintf("%s/events-by-aggregate-type/%s.json", a.baseUri, aggregateType),
},
})
}
 
listResponse := struct {
Data interface{} `json:"data"`
TotalEvents uint64 `json:"totalEvents"`
Links map[string]string `json:"links"`
}{
Data: data,
TotalEvents: a.projections.aggregateTypeStats.TotalEvents(),
Links: map[string]string{
"allEvents": fmt.Sprintf("%s/all-events.json", a.baseUri),
"self": fmt.Sprintf("%s/list-aggregate-types", a.baseUri),
},
}
 
w.Header().Set(`Content-Type`, `application/json`)
encoder := json.NewEncoder(w)
_ = encoder.Encode(listResponse)
}
 
func (a *api) writeEvents(w http.ResponseWriter, recordIterator rangedb.RecordIterator, extension string) {
switch extension {
case "json":
w.Header().Set(`Content-Type`, `application/json`)
errors := a.jsonRecordIoStream.Write(w, recordIterator)
<-errors
 
case "ndjson":
w.Header().Set(`Content-Type`, `application/json; boundary=LF`)
errors := a.ndJSONRecordIoStream.Write(w, recordIterator)
<-errors
 
case "msgpack":
w.Header().Set(`Content-Type`, `application/msgpack`)
base64Writer := base64.NewEncoder(base64.RawStdEncoding, w)
errors := a.msgpackRecordIoStream.Write(base64Writer, recordIterator)
<-errors
_ = base64Writer.Close()
 
}
}
 
type invalidInput struct {
err error
}
 
func newInvalidInput(err error) *invalidInput {
return &invalidInput{err: err}
}
 
func (i invalidInput) Error() string {
return fmt.Sprintf("invalid input: %v", i.err)
}
 
func expectedStreamSequenceNumberFromRequest(r *http.Request) (uint64, error) {
expectedStreamSequenceNumberInput := r.Header.Get("ExpectedStreamSequenceNumber")
if expectedStreamSequenceNumberInput != "" {
expectedStreamSequenceNumber, err := strconv.ParseUint(expectedStreamSequenceNumberInput, 10, 64)
if err != nil {
return 0, fmt.Errorf("invalid ExpectedStreamSequenceNumber")
}
 
return expectedStreamSequenceNumber, nil
}
 
return 0, nil
}