go_sdk.go
// Package streamdal is a library for running Streamdal data pipelines against data.
// It is designed to be included in Go message bus libraries. The main public
// method is Process(), which is used to run pipelines against data.
//
// Use of this package requires a running instance of a streamdal server©.
// The server can be downloaded at https://github.com/streamdal/streamdal/tree/main/apps/server
//
// The following environment variables must be set unless the corresponding
// Config fields are provided:
// - STREAMDAL_URL: The address of the Streamdal server
// - STREAMDAL_TOKEN: The token to use when connecting to the Streamdal server
// - STREAMDAL_SERVICE_NAME: The name of the service to identify it in the streamdal console
//
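// Optional parameters:
// - STREAMDAL_DRY_RUN: If "true", rule hits will only be logged and no failure modes will be run
// - STREAMDAL_STEP_TIMEOUT: Per-step timeout as a Go duration string (default: 10ms)
// - STREAMDAL_PIPELINE_TIMEOUT: Per-pipeline timeout as a Go duration string (default: 100ms)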
package streamdal
import (
"context"
"fmt"
"os"
"strings"
"sync"
"time"
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/relistan/go-director"
"google.golang.org/protobuf/proto"
"github.com/streamdal/streamdal/libs/protos/build/go/protos"
"github.com/streamdal/go-sdk/hostfunc"
"github.com/streamdal/go-sdk/kv"
"github.com/streamdal/go-sdk/logger"
"github.com/streamdal/go-sdk/metrics"
"github.com/streamdal/go-sdk/server"
"github.com/streamdal/go-sdk/types"
)
type (
	// OperationType indicates whether a Process() call is made on behalf of
	// a consumer or a producer.
	OperationType int

	// ClientType indicates whether this library is being used by a shim or
	// directly (as an SDK).
	ClientType int

	// ProcessResponse is the response struct returned from a Process() call.
	// It shares its underlying structure with protos.SDKResponse so that end
	// users do not need to import the protos package for the common fields.
	ProcessResponse protos.SDKResponse
)
// Timeouts applied when neither Config nor the environment provides one.
// Both are in time.ParseDuration format.
const (
	// DefaultPipelineTimeoutDurationStr is the default timeout for a pipeline execution.
	DefaultPipelineTimeoutDurationStr = "100ms"

	// DefaultStepTimeoutDurationStr is the default timeout for a single step.
	DefaultStepTimeoutDurationStr = "10ms"
)

const (
	// ReconnectSleep determines the length of time to wait between reconnect
	// attempts to streamdal server©.
	ReconnectSleep = 5 * time.Second

	// MaxWASMPayloadSize is the maximum size, in bytes, of data that can be
	// sent to the WASM module.
	MaxWASMPayloadSize = 1 << 20 // 1Mi
)

// ClientTypeSDK & ClientTypeShim are referenced by shims and SDKs to indicate
// in what context this SDK is being used.
const (
	ClientTypeSDK  ClientType = 1
	ClientTypeShim ClientType = 2
)

// OperationTypeConsumer and OperationTypeProducer indicate the type of
// operation a Process() call is performing.
const (
	OperationTypeConsumer OperationType = 1
	OperationTypeProducer OperationType = 2
)

// Human-readable descriptions of a step's abort condition. updateStatus()
// appends one of these to each step's status message.
const (
	AbortAllStr     = "aborted all pipelines"
	AbortCurrentStr = "aborted current pipeline"
	AbortNoneStr    = "no abort condition"
)

// ExecStatusTrue, ExecStatusFalse and ExecStatusError are shorthands for the
// protos.ExecStatus values describing the execution status of a step.
const (
	ExecStatusTrue  = protos.ExecStatus_EXEC_STATUS_TRUE
	ExecStatusFalse = protos.ExecStatus_EXEC_STATUS_FALSE
	ExecStatusError = protos.ExecStatus_EXEC_STATUS_ERROR
)
var (
	// ErrEmptyConfig is returned (wrapped) by New() when cfg is nil.
	ErrEmptyConfig = errors.New("config cannot be empty")

	// ErrEmptyServiceName is returned (wrapped) by New() when neither
	// Config.ServiceName nor the STREAMDAL_SERVICE_NAME environment variable
	// is set.
	ErrEmptyServiceName = errors.New("service name cannot be empty")

	// ErrEmptyOperationName, ErrInvalidOperationType and ErrEmptyComponentName
	// describe an invalid ProcessRequest. Process() reports them through
	// ProcessResponse.StatusMessage rather than returning them.
	ErrEmptyOperationName   = errors.New("operation name cannot be empty")
	ErrInvalidOperationType = errors.New("operation type must be set to either OperationTypeConsumer or OperationTypeProducer")
	ErrEmptyComponentName   = errors.New("component name cannot be empty")

	// ErrMissingShutdownCtx is returned (wrapped) by New() when
	// Config.ShutdownCtx is nil.
	ErrMissingShutdownCtx = errors.New("shutdown context cannot be nil")

	// ErrEmptyCommand indicates that an empty command was encountered.
	// NOTE(review): not referenced in this file; confirm its call sites.
	ErrEmptyCommand = errors.New("command cannot be empty")

	// ErrEmptyProcessRequest is reported by Process() when req is nil.
	ErrEmptyProcessRequest = errors.New("process request cannot be empty")

	// ErrMaxPayloadSizeExceeded is returned when the payload is bigger than MaxWASMPayloadSize
	ErrMaxPayloadSizeExceeded = fmt.Errorf("payload size exceeds maximum of '%d' bytes", MaxWASMPayloadSize)

	// ErrPipelineTimeout is returned when a pipeline exceeds the configured timeout
	ErrPipelineTimeout = errors.New("pipeline timeout exceeded")
)
// IStreamdal is the interface implemented by *Streamdal. Depending on this
// interface instead of the concrete type allows callers to substitute their
// own implementation (for example, in tests).
type IStreamdal interface {
	// Process runs the data pipelines attached to the request's audience
	// against req.Data and returns the (possibly modified) data together
	// with per-pipeline and per-step execution statuses.
	Process(ctx context.Context, req *ProcessRequest) *ProcessResponse
}

// Compile-time check that *Streamdal satisfies IStreamdal.
var _ IStreamdal = (*Streamdal)(nil)
// Streamdal is the main struct for this library. Construct it with New(): the
// zero value is not usable because its maps and *sync.RWMutex fields are nil.
type Streamdal struct {
	// config is the configuration validated (and defaulted) by New().
	config *Config

	// functions caches WASM functions; guarded by functionsMtx.
	// NOTE(review): key appears to be the WASM module ID; confirm in getFunction.
	functions    map[string]*function
	functionsMtx *sync.RWMutex

	// pipelines holds the pipelines attached to each audience; guarded by pipelinesMtx.
	pipelines    map[string][]*protos.Pipeline // k: audienceStr
	pipelinesMtx *sync.RWMutex

	// serverClient talks to the Streamdal server (pipelines, heartbeats, notify).
	serverClient server.IServerClient

	// metrics receives the counters incremented by Process() and handleCondition().
	metrics metrics.IMetrics

	// audiences is the set of audiences seen by this client; guarded by audiencesMtx.
	audiences    map[string]struct{} // k: audienceStr
	audiencesMtx *sync.RWMutex

	// sessionID is a random UUID generated in New() and sent with each heartbeat.
	sessionID string

	kv kv.IKV
	hf *hostfunc.HostFunc

	// tails and pausedTails track active and paused tail requests; each is
	// guarded by its own mutex.
	tailsMtx       *sync.RWMutex
	tails          map[string]map[string]*Tail // k1: audienceStr k2: tailID
	pausedTailsMtx *sync.RWMutex
	pausedTails    map[string]map[string]*Tail // k1: audienceStr k2: tailID

	// schemas holds the schema recorded for each audience; guarded by schemasMtx.
	schemas    map[string]*protos.Schema // k: audienceStr
	schemasMtx *sync.RWMutex
}
// Config configures New(). Where noted, fields left empty are filled in from
// environment variables or defaults during New().
type Config struct {
	// ServerURL is the hostname and port of the gRPC API of the Streamdal
	// server. When empty, it is read from the STREAMDAL_URL environment
	// variable. If it is still empty, the library will not attempt to connect
	// to the server and New() returns nil, nil.
	ServerURL string

	// ServerToken is the authentication token for the gRPC API of the
	// Streamdal server. When empty, it is read from the STREAMDAL_TOKEN
	// environment variable. If it is still empty, the library will not attempt
	// to connect to the server and New() returns nil, nil.
	ServerToken string

	// ServiceName is the name that this library will identify as in the UI.
	// Required: when empty it is read from STREAMDAL_SERVICE_NAME, and New()
	// fails with ErrEmptyServiceName if that is empty too.
	ServiceName string

	// PipelineTimeout defines how long this library will allow a pipeline to
	// run. Optional; when zero it is parsed from STREAMDAL_PIPELINE_TIMEOUT
	// (time.ParseDuration format), defaulting to 100ms.
	PipelineTimeout time.Duration

	// StepTimeout defines how long this library will allow a single step to
	// run. Optional; when zero it is parsed from STREAMDAL_STEP_TIMEOUT
	// (time.ParseDuration format), defaulting to 10ms.
	StepTimeout time.Duration

	// IgnoreStartupError defines how to handle an error on initial startup via
	// New(). If left as false, failure to complete startup (such as bad auth)
	// will cause New() to return an error. If true, the library will block and
	// continue trying to initialize. You may want to adjust this if you want
	// your application to behave a certain way on startup when the server
	// is unavailable. Optional; default: false
	// NOTE(review): New() in this file never reads this field. Confirm
	// where, or whether, it is honored.
	IgnoreStartupError bool

	// DryRun makes the library connect to the server and execute pipelines
	// without affecting the caller: Process() returns the original data and
	// 'notify' conditions are skipped. It is also enabled when the
	// STREAMDAL_DRY_RUN environment variable is "true". Optional; default: false
	DryRun bool

	// ShutdownCtx is a context that the library will listen to for
	// cancellation notices. On cancellation the heartbeat loop stops and
	// active tails are cancelled. Required: New() fails with
	// ErrMissingShutdownCtx when it is nil.
	ShutdownCtx context.Context

	// Logger is a logger you can inject (such as logrus) to allow this library
	// to log output. Optional; defaults to a no-op logger.TinyLogger.
	Logger logger.Logger

	// Audiences is a list of audiences you can specify at registration time.
	// This is useful if you know your audiences in advance and want to populate
	// service groups in the Streamdal UI _before_ your code executes any .Process()
	// calls. Optional; default: nil
	Audiences []*Audience

	// ClientType specifies whether the SDK is used in a shim library or as a
	// standalone SDK. Any value other than ClientTypeShim or ClientTypeSDK is
	// replaced by ClientTypeSDK. Optional; default: ClientTypeSDK
	// NOTE(review): this is reportedly also used for debug info and to decide
	// whether ServerURL/ServerToken are required. That logic is not visible
	// in this file.
	ClientType ClientType
}
// Audience is used to announce an audience to the Streamdal server on library
// initialization (see Config.Audiences). It mirrors protos.Audience so that
// end users do not have to import our protos. The service name is supplied
// separately when converting it with toProto().
type Audience struct {
	// ComponentName identifies the component; lowercased by toProto().
	ComponentName string

	// OperationType is OperationTypeConsumer or OperationTypeProducer.
	OperationType OperationType

	// OperationName identifies the operation; lowercased by toProto().
	OperationName string
}
// ProcessRequest is the input to Process(). It keeps the Process() API
// independent of the protobuf types.
type ProcessRequest struct {
	// ComponentName identifies the component. Required (non-empty).
	ComponentName string

	// OperationType must be OperationTypeConsumer or OperationTypeProducer.
	OperationType OperationType

	// OperationName identifies the operation. Required (non-empty).
	OperationName string

	// Data is the payload pipelines run against. When pipelines are attached,
	// payloads larger than MaxWASMPayloadSize are rejected.
	Data []byte
}
// New validates cfg, connects to the Streamdal server, pulls the pipelines
// already configured for this service and starts the background goroutines
// (register, heartbeat and shutdown watcher).
//
// If no server URL or token is available (neither in cfg nor via the
// STREAMDAL_URL / STREAMDAL_TOKEN environment variables), New returns
// (nil, nil). The caller is expected to bypass the library in that case.
//
// New waits up to 5 seconds for an early registration error. If none arrives
// in that window, the client is returned.
func New(cfg *Config) (*Streamdal, error) {
	if err := validateConfig(cfg); err != nil {
		return nil, errors.Wrap(err, "unable to validate config")
	}

	// We instantiate this library based on whether we have a server URL+token or not.
	// If these are not provided, the wrapper library will not perform rule checks and
	// will act as normal
	if cfg.ServerURL == "" || cfg.ServerToken == "" {
		return nil, nil
	}

	serverClient, err := server.New(cfg.ServerURL, cfg.ServerToken)
	if err != nil {
		return nil, errors.Wrapf(err, "failed to connect to streamdal server© '%s'", cfg.ServerURL)
	}

	m, err := metrics.New(&metrics.Config{
		ServerClient: serverClient,
		ShutdownCtx:  cfg.ShutdownCtx,
		Log:          cfg.Logger,
	})
	if err != nil {
		return nil, errors.Wrap(err, "failed to start metrics service")
	}

	kvInstance, err := kv.New(&kv.Config{
		Logger: cfg.Logger,
	})
	if err != nil {
		return nil, errors.Wrap(err, "failed to start kv service")
	}

	hf, err := hostfunc.New(kvInstance, cfg.Logger)
	if err != nil {
		return nil, errors.Wrap(err, "failed to create hostfunc instance")
	}

	s := &Streamdal{
		functions:      make(map[string]*function),
		functionsMtx:   &sync.RWMutex{},
		serverClient:   serverClient,
		pipelines:      make(map[string][]*protos.Pipeline),
		pipelinesMtx:   &sync.RWMutex{},
		audiences:      map[string]struct{}{},
		audiencesMtx:   &sync.RWMutex{},
		config:         cfg,
		metrics:        m,
		sessionID:      uuid.New().String(),
		kv:             kvInstance,
		hf:             hf,
		tailsMtx:       &sync.RWMutex{},
		tails:          make(map[string]map[string]*Tail),
		pausedTailsMtx: &sync.RWMutex{},
		pausedTails:    make(map[string]map[string]*Tail),
		schemasMtx:     &sync.RWMutex{},
		schemas:        make(map[string]*protos.Schema),
	}

	if cfg.DryRun {
		cfg.Logger.Warn("data pipelines running in dry run mode")
	}

	if err := s.pullInitialPipelines(cfg.ShutdownCtx); err != nil {
		return nil, err
	}

	// Buffered (size 1) so the register goroutine can always deliver its
	// error and exit, even when it fails after New() has stopped listening.
	// With an unbuffered channel a late error would block it forever.
	errCh := make(chan error, 1)

	// Start register
	go func() {
		if err := s.register(director.NewFreeLooper(director.FOREVER, make(chan error, 1))); err != nil {
			errCh <- errors.Wrap(err, "register error")
		}
	}()

	// Start heartbeat
	go s.heartbeat(director.NewTimedLooper(director.FOREVER, time.Second, make(chan error, 1)))

	go s.watchForShutdown()

	// Make sure we were able to start without issues
	select {
	case err := <-errCh:
		return nil, errors.Wrap(err, "received error on startup")
	case <-time.After(time.Second * 5):
		return s, nil
	}
}
// validateConfig checks the required parts of cfg and fills in everything
// that may instead come from the environment (shim use) or from defaults:
// service name, server URL/token, dry-run flag, step and pipeline timeouts,
// logger and client type. cfg is modified in place.
func validateConfig(cfg *Config) error {
	switch {
	case cfg == nil:
		return ErrEmptyConfig
	case cfg.ShutdownCtx == nil:
		return ErrMissingShutdownCtx
	}

	// fromEnvIfEmpty fills *dst from envVar when the config left it blank.
	fromEnvIfEmpty := func(dst *string, envVar string) {
		if *dst == "" {
			*dst = os.Getenv(envVar)
		}
	}

	fromEnvIfEmpty(&cfg.ServiceName, "STREAMDAL_SERVICE_NAME")
	if cfg.ServiceName == "" {
		return ErrEmptyServiceName
	}

	// Can be specified in config for lib use, or via envar for shim use
	fromEnvIfEmpty(&cfg.ServerURL, "STREAMDAL_URL")
	fromEnvIfEmpty(&cfg.ServerToken, "STREAMDAL_TOKEN")

	if os.Getenv("STREAMDAL_DRY_RUN") == "true" {
		cfg.DryRun = true
	}

	// resolveTimeout keeps a non-zero configured value. Otherwise it parses
	// envVar, falling back to fallback when the variable is unset.
	resolveTimeout := func(current time.Duration, envVar, fallback, field string) (time.Duration, error) {
		if current != 0 {
			return current, nil
		}

		raw := os.Getenv(envVar)
		if raw == "" {
			raw = fallback
		}

		parsed, err := time.ParseDuration(raw)
		if err != nil {
			return 0, errors.Wrapf(err, "unable to parse %s '%s'", field, raw)
		}
		return parsed, nil
	}

	stepTimeout, err := resolveTimeout(cfg.StepTimeout, "STREAMDAL_STEP_TIMEOUT", DefaultStepTimeoutDurationStr, "StepTimeout")
	if err != nil {
		return err
	}
	cfg.StepTimeout = stepTimeout

	pipelineTimeout, err := resolveTimeout(cfg.PipelineTimeout, "STREAMDAL_PIPELINE_TIMEOUT", DefaultPipelineTimeoutDurationStr, "PipelineTimeout")
	if err != nil {
		return err
	}
	cfg.PipelineTimeout = pipelineTimeout

	// Default to NOOP logger if none is provided
	if cfg.Logger == nil {
		cfg.Logger = &logger.TinyLogger{}
	}

	// Anything that is not a known client type becomes ClientTypeSDK
	switch cfg.ClientType {
	case ClientTypeShim, ClientTypeSDK:
	default:
		cfg.ClientType = ClientTypeSDK
	}

	return nil
}
// validateProcessRequest reports the first problem found with req: nil
// request, empty operation name, empty component name, or an operation type
// that is neither OperationTypeProducer nor OperationTypeConsumer.
func validateProcessRequest(req *ProcessRequest) error {
	switch {
	case req == nil:
		return ErrEmptyProcessRequest
	case req.OperationName == "":
		return ErrEmptyOperationName
	case req.ComponentName == "":
		return ErrEmptyComponentName
	}

	switch req.OperationType {
	case OperationTypeProducer, OperationTypeConsumer:
		return nil
	default:
		return ErrInvalidOperationType
	}
}
// watchForShutdown blocks until Config.ShutdownCtx is cancelled and then
// calls CancelFunc on every tail in s.tails.
// NOTE(review): s.pausedTails is not cancelled here; confirm that is intended.
func (s *Streamdal) watchForShutdown() {
	<-s.config.ShutdownCtx.Done()

	// Shut down all tails
	s.tailsMtx.RLock()
	defer s.tailsMtx.RUnlock()

	for _, tails := range s.tails {
		for reqID, tail := range tails {
			// Generated protobuf getters are nil-safe. A tail whose request
			// lacks the Tail/Request sub-messages must not panic the shutdown path.
			pipelineID := tail.Request.GetTail().GetRequest().GetPipelineId()
			s.config.Logger.Debugf("Shutting down tail '%s' for pipeline %s", reqID, pipelineID)
			tail.CancelFunc()
		}
	}
}
// pullInitialPipelines fetches every SetPipelines command for this service
// from the server. It fills each step's WASM bytes from the response's
// deduplicated module map and then attaches each command's pipelines.
//
// A step whose WASM module is missing from the response is treated as a bug
// and aborts with an error. A failure to attach a command is only logged.
func (s *Streamdal) pullInitialPipelines(ctx context.Context) error {
	cmds, err := s.serverClient.GetSetPipelinesCommandByService(ctx, s.config.ServiceName)
	if err != nil {
		return errors.Wrap(err, "unable to pull initial pipelines")
	}

	// Commands won't include paused pipelines
	for _, cmd := range cmds.SetPipelineCommands {
		// Nil-safe: a command without a SetPipelines payload has no pipelines
		pipelines := cmd.GetSetPipelines().GetPipelines()
		if len(pipelines) == 0 {
			continue
		}

		for _, p := range pipelines {
			s.config.Logger.Debugf("saving pipeline '%s' for audience '%s' to internal map", p.Name, audToStr(cmd.Audience))

			// Fill in WASM data from the deduplication map
			for _, step := range p.Steps {
				wasmData, ok := cmds.WasmModules[step.GetXWasmId()]
				if !ok {
					return errors.Errorf("BUG: unable to find WASM data for step '%s'", step.Name)
				}

				step.XWasmBytes = wasmData.Bytes
			}
		}

		// Attach once per command, and only after every pipeline in it has
		// its WASM bytes. This way setPipelines never sees a partially
		// hydrated command and the same command is not re-attached repeatedly.
		if err := s.setPipelines(ctx, cmd); err != nil {
			s.config.Logger.Errorf("failed to attach pipeline: %s", err)
		}
	}

	return nil
}
// heartbeat sends a HeartbeatRequest (session ID, current audiences, client
// info and service name) to the server on each iteration of loop. Once
// Config.ShutdownCtx is cancelled it asks the looper to quit. Any iterations
// that still run afterwards only sleep for 50ms.
func (s *Streamdal) heartbeat(loop *director.TimedLooper) {
	shuttingDown := false

	loop.Loop(func() error {
		if shuttingDown {
			time.Sleep(50 * time.Millisecond)
			return nil
		}

		if s.config.ShutdownCtx.Err() != nil {
			shuttingDown = true
			loop.Quit()
			return nil
		}

		err := s.serverClient.HeartBeat(s.config.ShutdownCtx, &protos.HeartbeatRequest{
			SessionId:   s.sessionID,
			Audiences:   s.getCurrentAudiences(),
			ClientInfo:  s.genClientInfo(),
			ServiceName: s.config.ServiceName,
		})

		switch {
		case err == nil:
			// Heartbeat delivered
		case strings.Contains(err.Error(), "connection refused"):
			// Streamdal server went away, log, sleep, and wait for reconnect
			s.config.Logger.Warn("failed to send heartbeat, streamdal server© went away, waiting for reconnect")
			time.Sleep(ReconnectSleep)
		default:
			s.config.Logger.Errorf("failed to send heartbeat: %s", err)
		}

		return nil
	})
}
// runStep executes a single pipeline step. It obtains the step's WASM
// function and serializes a WASMRequest carrying the payload and the previous
// step's result. It then executes it under Config.StepTimeout, decodes the
// WASMResponse and hands it to handleSchema before returning it.
func (s *Streamdal) runStep(ctx context.Context, aud *protos.Audience, step *protos.PipelineStep, data []byte, isr *protos.InterStepResult) (*protos.WASMResponse, error) {
	s.config.Logger.Debugf("Running step '%s'", step.Name)

	fn, err := s.getFunction(ctx, step)
	if err != nil {
		return nil, errors.Wrap(err, "failed to get wasm data")
	}

	// Hold the function's mutex for the whole call (released on return)
	fn.mtx.Lock()
	defer fn.mtx.Unlock()

	// The raw module bytes are not needed past this point and must not be
	// shipped to the wasm function inside the request.
	step.XWasmBytes = nil

	payload, err := proto.Marshal(&protos.WASMRequest{
		InputPayload:    data,
		Step:            step,
		InterStepResult: isr,
	})
	if err != nil {
		return nil, errors.Wrap(err, "failed to marshal WASM request")
	}

	execCtx, cancelExec := context.WithTimeout(ctx, s.config.StepTimeout)
	defer cancelExec()

	raw, err := fn.Exec(execCtx, payload)
	if err != nil {
		return nil, errors.Wrap(err, "failed to execute wasm module")
	}

	out := new(protos.WASMResponse)
	if err := proto.Unmarshal(raw, out); err != nil {
		return nil, errors.Wrap(err, "failed to unmarshal WASM response")
	}

	// A fresh context: the caller's context will already be cancelled by the
	// time the goroutine in handleSchema runs.
	s.handleSchema(context.Background(), aud, step, out)

	return out, nil
}
// getPipelines records aud via addAudience and returns the pipelines attached
// to it, or an empty (non-nil) slice when there are none. The returned slice
// is the one stored in s.pipelines, so callers must treat it as read-only.
// NOTE(review): addAudience runs while pipelinesMtx is read-held; confirm it
// never acquires pipelinesMtx itself.
func (s *Streamdal) getPipelines(ctx context.Context, aud *protos.Audience) []*protos.Pipeline {
	s.pipelinesMtx.RLock()
	defer s.pipelinesMtx.RUnlock()

	s.addAudience(ctx, aud)

	if found, ok := s.pipelines[audToStr(aud)]; ok {
		return found
	}

	return make([]*protos.Pipeline, 0)
}
// getCounterLabels builds the metric labels for req. The pipeline name and ID
// labels are present but empty when pipeline is nil.
func (s *Streamdal) getCounterLabels(req *ProcessRequest, pipeline *protos.Pipeline) map[string]string {
	var pipelineName, pipelineID string
	if pipeline != nil {
		pipelineName, pipelineID = pipeline.Name, pipeline.Id
	}

	return map[string]string{
		"service":       s.config.ServiceName,
		"component":     req.ComponentName,
		"operation":     req.OperationName,
		"pipeline_name": pipelineName,
		"pipeline_id":   pipelineID,
	}
}
// newAudience builds the protobuf audience for a Process() request using the
// configured service name. Both arguments must be non-nil; a nil argument is
// a programming error and panics.
// NOTE(review): unlike Audience.toProto, names are not lowercased here.
// Confirm the two produce matching audience keys.
func newAudience(req *ProcessRequest, cfg *Config) *protos.Audience {
	if req != nil && cfg != nil {
		return &protos.Audience{
			ServiceName:   cfg.ServiceName,
			ComponentName: req.ComponentName,
			OperationType: protos.OperationType(req.OperationType),
			OperationName: req.OperationName,
		}
	}

	panic("BUG: newAudience() called with nil arguments")
}
// Process runs every pipeline attached to the request's audience against
// req.Data. It returns the resulting data together with per-pipeline and
// per-step statuses.
//
// Process never returns an error. Problems are reported through resp.Status,
// resp.StatusMessage and the individual step statuses:
//   - an invalid request yields EXEC_STATUS_ERROR with the validation error;
//   - with no pipelines attached, the data is returned unchanged with
//     EXEC_STATUS_TRUE;
//   - a payload larger than MaxWASMPayloadSize yields EXEC_STATUS_ERROR.
//
// Each pipeline is bounded by Config.PipelineTimeout and each step by
// Config.StepTimeout. Step conditions (OnTrue/OnFalse/OnError) may notify,
// attach metadata, or abort the current pipeline or all remaining ones. In
// dry-run mode the original req.Data is always returned.
func (s *Streamdal) Process(ctx context.Context, req *ProcessRequest) *ProcessResponse {
	resp := &ProcessResponse{
		PipelineStatus: make([]*protos.PipelineStatus, 0),
		Metadata:       make(map[string]string),
	}

	if err := validateProcessRequest(req); err != nil {
		resp.Status = protos.ExecStatus_EXEC_STATUS_ERROR
		resp.StatusMessage = proto.String(err.Error())
		return resp
	}

	resp.Data = req.Data
	payloadSize := int64(len(resp.Data))

	aud := newAudience(req, s.config)

	// TODO: DRY this up
	counterError := types.ConsumeErrorCount
	counterProcessed := types.ConsumeProcessedCount
	counterBytes := types.ConsumeBytes
	rateBytes := types.ConsumeBytesRate
	rateProcessed := types.ConsumeProcessedRate

	if req.OperationType == OperationTypeProducer {
		counterError = types.ProduceErrorCount
		counterProcessed = types.ProduceProcessedCount
		counterBytes = types.ProduceBytes
		rateBytes = types.ProduceBytesRate
		rateProcessed = types.ProduceProcessedRate
	}

	// Rate counters
	_ = s.metrics.Incr(ctx, &types.CounterEntry{Name: rateBytes, Labels: map[string]string{}, Value: payloadSize, Audience: aud})
	_ = s.metrics.Incr(ctx, &types.CounterEntry{Name: rateProcessed, Labels: map[string]string{}, Value: 1, Audience: aud})

	pipelines := s.getPipelines(ctx, aud)

	// WARNING: This case will (usually) only "hit" for the first <100ms of
	// running the SDK - after that, the server will have sent us at least one,
	// "hidden" pipeline - "infer schema". All of this happens asynchronously
	// (to prevent Register() from blocking).
	//
	// This means that setting resp.StatusMessage here means that it will only
	// survive for the first few messages - after that, StatusMessage might get
	// updated by the infer schema pipeline step.
	if len(pipelines) == 0 {
		// Send tail if there is any. Tails do not require a pipeline to operate
		s.sendTail(aud, "", resp.Data, resp.Data)

		// No pipelines for this mode, nothing to do
		resp.Status = protos.ExecStatus_EXEC_STATUS_TRUE

		return resp
	}

	if payloadSize > MaxWASMPayloadSize {
		_ = s.metrics.Incr(ctx, &types.CounterEntry{Name: counterError, Labels: s.getCounterLabels(req, nil), Value: 1, Audience: aud})

		s.config.Logger.Warn(ErrMaxPayloadSizeExceeded)

		resp.Status = protos.ExecStatus_EXEC_STATUS_ERROR
		resp.StatusMessage = proto.String(ErrMaxPayloadSizeExceeded.Error())

		return resp
	}

	totalPipelines := len(pipelines)

	var pIndex int

	// Abort-ALL exits this loop with `break PIPELINE` rather than `return`.
	// That way tails are still sent and dry-run still restores the original
	// payload below.
PIPELINE:
	for _, pipeline := range pipelines {
		var isr *protos.InterStepResult

		pIndex++

		// Aborting the CURRENT pipeline skips the "pipeline completed" update
		// after the step loop. When that happens in the LAST pipeline, the
		// abort branch itself must populate resp, otherwise the response
		// would carry no status at all for the final pipeline.
		isLastPipeline := pIndex == totalPipelines

		pipelineTimeoutCtx, pipelineTimeoutCxl := context.WithTimeout(ctx, s.config.PipelineTimeout)

		pipelineStatus := &protos.PipelineStatus{
			Id:         pipeline.Id,
			Name:       pipeline.Name,
			StepStatus: make([]*protos.StepStatus, 0),
		}

		_ = s.metrics.Incr(ctx, &types.CounterEntry{Name: counterProcessed, Labels: s.getCounterLabels(req, pipeline), Value: 1, Audience: aud})
		_ = s.metrics.Incr(ctx, &types.CounterEntry{Name: counterBytes, Labels: s.getCounterLabels(req, pipeline), Value: payloadSize, Audience: aud})

		for _, step := range pipeline.Steps {
			stepTimeoutCtx, stepTimeoutCxl := context.WithTimeout(ctx, s.config.StepTimeout)

			stepStatus := &protos.StepStatus{
				Name: step.Name,
			}

			select {
			case <-pipelineTimeoutCtx.Done():
				stepStatus.Status = protos.ExecStatus_EXEC_STATUS_ERROR
				stepStatus.StatusMessage = proto.String("Pipeline error: " + ErrPipelineTimeout.Error())

				// Maybe notify, maybe include metadata
				cond := s.handleCondition(ctx, req, resp, step.OnError, step, pipeline, aud)

				// Update the abort condition before we populate statuses in resp
				stepStatus.AbortCondition = cond.abortCondition

				if cond.abortCurrent {
					stepTimeoutCxl()
					pipelineTimeoutCxl()

					// Aborting CURRENT, LOCAL step & pipeline status needs to be updated
					if isLastPipeline {
						s.updateStatus(resp, pipelineStatus, stepStatus)
					} else {
						s.updateStatus(nil, pipelineStatus, stepStatus)
					}

					s.config.Logger.Warnf("exceeded timeout for pipeline '%s' - aborting CURRENT pipeline", pipeline.Name)

					continue PIPELINE
				} else if cond.abortAll {
					stepTimeoutCxl()
					pipelineTimeoutCxl()

					s.config.Logger.Warnf("exceeded timeout for pipeline '%s' - aborting ALL pipelines", pipeline.Name)

					// Aborting ALL, RESP should have step & pipeline status updated
					s.updateStatus(resp, pipelineStatus, stepStatus)

					break PIPELINE
				}

				// NOT aborting: the step still runs below, so its own timeout
				// context must stay live (cancelling it here would make the
				// step fail immediately).
				s.config.Logger.Warnf("exceeded timeout for pipeline '%s' but no abort condition defined - continuing execution", pipeline.Name)
			default:
				// NOOP
			}

			// Pipeline timeout either has not occurred OR it occurred and execution was not aborted
			wasmResp, err := s.runStep(stepTimeoutCtx, aud, step, resp.Data, isr)
			if err != nil {
				stepTimeoutCxl()

				err = fmt.Errorf("wasm error during step '%s:%s': %s", pipeline.Name, step.Name, err)

				stepStatus.Status = protos.ExecStatus_EXEC_STATUS_ERROR
				stepStatus.StatusMessage = proto.String("Wasm Error: " + err.Error())

				// Maybe notify, maybe include metadata
				cond := s.handleCondition(ctx, req, resp, step.OnError, step, pipeline, aud)

				// Update the abort condition before we populate statuses in resp
				stepStatus.AbortCondition = cond.abortCondition

				if cond.abortCurrent {
					pipelineTimeoutCxl()

					// Aborting CURRENT: fill resp if this is the last pipeline,
					// otherwise only the LOCAL step & pipeline status.
					if isLastPipeline {
						s.updateStatus(resp, pipelineStatus, stepStatus)
					} else {
						s.updateStatus(nil, pipelineStatus, stepStatus)
					}

					s.config.Logger.Errorf("%s (aborting CURRENT pipeline)", err)

					continue PIPELINE
				} else if cond.abortAll {
					pipelineTimeoutCxl()

					// Aborting ALL, update RESP step & pipeline status
					s.updateStatus(resp, pipelineStatus, stepStatus)

					s.config.Logger.Errorf("%s (aborting ALL pipelines)", err)

					break PIPELINE
				}

				// NOT aborting, update LOCAL step & pipeline status
				s.updateStatus(nil, pipelineStatus, stepStatus)

				s.config.Logger.Warnf("Step '%s:%s' failed (no abort condition defined - continuing step execution)", pipeline.Name, step.Name)

				continue // Move on to the next step in the pipeline
			}

			// Only update working payload if one is returned. The exit-code
			// switch below must not undo this by assigning an empty payload.
			if len(wasmResp.OutputPayload) > 0 {
				resp.Data = wasmResp.OutputPayload
			}

			isr = wasmResp.InterStepResult // Pass inter-step result to next step

			var (
				stepCondStr    string
				stepConds      *protos.PipelineStepConditions
				stepExecStatus protos.ExecStatus
			)

			// Execution worked - check wasm exit code
			switch wasmResp.ExitCode {
			case protos.WASMExitCode_WASM_EXIT_CODE_TRUE:
				// Data (possibly modified) has already been applied above
				stepCondStr = "true"
				stepConds = step.OnTrue
				stepExecStatus = protos.ExecStatus_EXEC_STATUS_TRUE
			case protos.WASMExitCode_WASM_EXIT_CODE_FALSE:
				// Data (possibly modified) has already been applied above
				stepCondStr = "false"
				stepConds = step.OnFalse
				stepExecStatus = protos.ExecStatus_EXEC_STATUS_FALSE
			case protos.WASMExitCode_WASM_EXIT_CODE_ERROR:
				// Ran into an error - return original data
				resp.Data = req.Data
				stepCondStr = "error"
				stepConds = step.OnError
				stepExecStatus = protos.ExecStatus_EXEC_STATUS_ERROR
			default:
				_ = s.metrics.Incr(ctx, &types.CounterEntry{Name: counterError, Labels: s.getCounterLabels(req, pipeline), Value: 1, Audience: aud})
				s.config.Logger.Debugf("Step '%s:%s' returned unknown exit code %d", pipeline.Name, step.Name, wasmResp.ExitCode)
				// TODO: Is an unknown exit code considered an error?
			}

			stepTimeoutCxl()

			statusMsg := fmt.Sprintf("step '%s:%s' returned %s: %s", pipeline.Name, step.Name, stepCondStr, wasmResp.ExitMsg)

			// Maybe notify, maybe include metadata
			cond := s.handleCondition(ctx, req, resp, stepConds, step, pipeline, aud)

			// Update step status bits
			stepStatus.Status = stepExecStatus
			stepStatus.StatusMessage = proto.String(statusMsg)
			stepStatus.AbortCondition = cond.abortCondition

			// Increase error metrics (if wasm returned err)
			if stepExecStatus == protos.ExecStatus_EXEC_STATUS_ERROR {
				_ = s.metrics.Incr(ctx, &types.CounterEntry{Name: counterError, Labels: s.getCounterLabels(req, pipeline), Value: 1, Audience: aud})
			}

			if cond.abortCurrent {
				pipelineTimeoutCxl()

				// Aborting CURRENT: fill resp if this is the last pipeline,
				// otherwise only the LOCAL step & pipeline status.
				if isLastPipeline {
					s.updateStatus(resp, pipelineStatus, stepStatus)
				} else {
					s.updateStatus(nil, pipelineStatus, stepStatus)
				}

				s.config.Logger.Debug(statusMsg + " (aborting CURRENT pipeline)")

				continue PIPELINE
			} else if cond.abortAll {
				pipelineTimeoutCxl()

				// Aborting ALL, update RESP step & pipeline status
				s.updateStatus(resp, pipelineStatus, stepStatus)

				s.config.Logger.Debug(statusMsg + " (aborting ALL pipelines)")

				break PIPELINE
			}

			// NO abort condition, update LOCAL step & pipeline status
			s.updateStatus(nil, pipelineStatus, stepStatus)

			s.config.Logger.Debug(statusMsg + " (no abort condition defined - continuing execution)")

			// END step loop
		}

		pipelineTimeoutCxl()

		// Pipeline completed, update RESP pipeline status (step status already updated)
		s.updateStatus(resp, pipelineStatus, nil)

		// END pipeline loop
	}

	// Perform tail if necessary
	s.sendTail(aud, "", req.Data, resp.Data)

	// Dry run should not modify anything, but we must allow pipeline to
	// mutate internal state in order to function properly
	if s.config.DryRun {
		resp.Data = req.Data
	}

	return resp
}
// condition is the result of handleCondition(): whether the current pipeline
// or all pipelines should be aborted. handleCondition sets at most one of
// abortCurrent / abortAll.
type condition struct {
	abortCurrent bool
	abortAll     bool

	// abortCondition mirrors the two flags as the proto enum. It is here to
	// make it easier to perform assignment in stepStatus.
	abortCondition protos.AbortCondition
}
// handleCondition is a wrapper for inspecting the step condition and potentially
// performing a notification and injecting metadata back into the response.
//
// A nil stepCond is a no-op. Otherwise:
//   - Notify (skipped in dry-run mode) calls the server and increments
//     types.NotifyCount; notify failures are only logged;
//   - Metadata is merged into resp.Metadata;
//   - Abort is translated into the returned condition.
func (s *Streamdal) handleCondition(
	ctx context.Context,
	req *ProcessRequest,
	resp *ProcessResponse,
	stepCond *protos.PipelineStepConditions,
	step *protos.PipelineStep,
	pipeline *protos.Pipeline,
	aud *protos.Audience,
) condition {
	// If no condition is set, we don't need to do anything
	if stepCond == nil {
		return condition{}
	}

	// Should we notify?
	if stepCond.Notify && !s.config.DryRun {
		s.config.Logger.Debugf("Performing 'notify' condition for step '%s'", step.Name)

		if err := s.serverClient.Notify(ctx, pipeline, step, aud); err != nil {
			s.config.Logger.Errorf("failed to notify condition: %s", err)
		}

		// Same label set as every other per-pipeline counter
		_ = s.metrics.Incr(ctx, &types.CounterEntry{Name: types.NotifyCount, Labels: s.getCounterLabels(req, pipeline), Value: 1, Audience: aud})
	}

	// Should we pass back metadata?
	if len(stepCond.Metadata) > 0 {
		s.config.Logger.Debugf("Performing 'metadata' condition for step '%s'", step.Name)
		s.populateMetadata(resp, stepCond.Metadata)
	}

	// Should we abort current or ALL pipelines?
	switch stepCond.Abort {
	case protos.AbortCondition_ABORT_CONDITION_ABORT_CURRENT:
		s.config.Logger.Debugf("Abort condition set to 'current' for step '%s'", step.Name)
		return condition{
			abortCurrent:   true,
			abortCondition: protos.AbortCondition_ABORT_CONDITION_ABORT_CURRENT,
		}
	case protos.AbortCondition_ABORT_CONDITION_ABORT_ALL:
		s.config.Logger.Debugf("Abort condition set to 'all' for step '%s'", step.Name)
		return condition{
			abortAll:       true,
			abortCondition: protos.AbortCondition_ABORT_CONDITION_ABORT_ALL,
		}
	}

	s.config.Logger.Debugf("No abort conditions set for step '%s'", step.Name)

	// Don't abort anything - continue as-is
	return condition{}
}
// populateMetadata copies every entry of metadata into resp.Metadata,
// overwriting existing keys. resp.Metadata is only allocated when there is at
// least one entry to copy. A nil resp or empty metadata is a no-op.
func (s *Streamdal) populateMetadata(resp *ProcessResponse, metadata map[string]string) {
	if resp == nil || len(metadata) == 0 {
		return
	}

	if resp.Metadata == nil {
		resp.Metadata = make(map[string]string)
	}

	for key, value := range metadata {
		resp.Metadata[key] = value
	}
}
// updateStatus is a wrapper for updating step, pipeline and resp statuses.
//
// This method allows resp OR step status to be nil. This is because we are not
// always returning a final response - we are still going through more pipelines
// and steps.
//
// Similarly, step status can be nil because the FINAL response will only have
// a pipeline status and no step status.
//
// When stepStatus is given, it is appended to pipelineStatus and its message
// is suffixed with a description of its abort condition. When resp is given,
// pipelineStatus is appended to it and resp's status/message are set from the
// last recorded step (left untouched if the pipeline recorded no steps).
// Invalid argument combinations are logged as bugs and ignored.
func (s *Streamdal) updateStatus(resp *ProcessResponse, pipelineStatus *protos.PipelineStatus, stepStatus *protos.StepStatus) {
	// Pipeline status is ALWAYS required; everything below dereferences it
	if pipelineStatus == nil {
		s.config.Logger.Warn("BUG: pipelineStatus cannot be nil in updateStatus()")
		return
	}

	// If resp is nil, there should ALWAYS be a step status to record
	if resp == nil && stepStatus == nil {
		s.config.Logger.Warn("BUG: stepStatus and pipelineStatus cannot be nil when resp is nil in updateStatus()")
		return
	}

	if stepStatus != nil {
		// When returning final response, we won't have a step status
		pipelineStatus.StepStatus = append(pipelineStatus.StepStatus, stepStatus)

		// Prettify status message with abort info
		var abortStatusStr string

		switch stepStatus.AbortCondition {
		case protos.AbortCondition_ABORT_CONDITION_ABORT_CURRENT:
			abortStatusStr = AbortCurrentStr
		case protos.AbortCondition_ABORT_CONDITION_ABORT_ALL:
			abortStatusStr = AbortAllStr
		default:
			abortStatusStr = AbortNoneStr
		}

		// StepStatusMessage can be nil because it is optional
		if stepStatus.StatusMessage != nil {
			if *stepStatus.StatusMessage != "" {
				// Non-empty message: append the abort status
				stepStatus.StatusMessage = proto.String(fmt.Sprintf("%s (%s)", *stepStatus.StatusMessage, abortStatusStr))
			} else {
				// Otherwise, just display the abort status
				stepStatus.StatusMessage = proto.String(abortStatusStr)
			}
		}
	}

	// Response could be nil if we are NOT aborting all pipelines; resp will NOT
	// be nil if this is the LAST pipeline and LAST step.
	if resp == nil {
		return
	}

	resp.PipelineStatus = append(resp.PipelineStatus, pipelineStatus)

	// Resp status should be the status of the LAST step. A pipeline without
	// any steps has nothing to report, so don't index an empty slice.
	if n := len(pipelineStatus.StepStatus); n > 0 {
		last := pipelineStatus.StepStatus[n-1]
		resp.Status = last.Status
		resp.StatusMessage = last.StatusMessage
	}
}
// toProto converts a user-facing Audience into its protobuf form for the
// given service. The service, component and operation names are lowercased;
// the operation type is converted as-is.
func (a *Audience) toProto(serviceName string) *protos.Audience {
	aud := &protos.Audience{
		OperationType: protos.OperationType(a.OperationType),
	}

	aud.ServiceName = strings.ToLower(serviceName)
	aud.ComponentName = strings.ToLower(a.ComponentName)
	aud.OperationName = strings.ToLower(a.OperationName)

	return aud
}