streamdal/go-sdk

View on GitHub
schema.go

Summary

Maintainability
A
0 mins
Test Coverage
B
88%
package streamdal

import (
    "context"

    "github.com/streamdal/streamdal/libs/protos/build/go/protos"
)

func (s *Streamdal) getSchema(_ context.Context, aud *protos.Audience) []byte {
    s.schemasMtx.RLock()
    defer s.schemasMtx.RUnlock()

    schema, ok := s.schemas[audToStr(aud)]
    if !ok {
        return []byte(``)
    }

    return schema.JsonSchema
}

func (s *Streamdal) setSchema(_ context.Context, aud *protos.Audience, schema []byte) {
    s.schemasMtx.Lock()
    defer s.schemasMtx.Unlock()

    s.schemas[audToStr(aud)] = &protos.Schema{
        JsonSchema: schema,
    }
}

// handleSchema will handle the schema step in the pipeline, if necessary
func (s *Streamdal) handleSchema(ctx context.Context, aud *protos.Audience, step *protos.PipelineStep, resp *protos.WASMResponse) bool {
    inferSchema := step.GetInferSchema()

    if inferSchema == nil {
        // nothing to do
        return false
    }

    if resp.ExitCode != protos.WASMExitCode_WASM_EXIT_CODE_TRUE {
        return false
    }

    // Get existing schema for audience
    existingSchema := s.getSchema(ctx, aud)

    if string(resp.OutputStep) == string(existingSchema) {
        // Schema matches what we have in memory, nothing to do
        return false
    }

    // Schema is new or modified, update in memory and send to the server
    s.setSchema(ctx, aud, resp.OutputStep)

    go func() {
        err := s.serverClient.SendSchema(ctx, aud, resp.OutputStep)
        if err != nil {
            s.config.Logger.Errorf("failed to send schema: %s", err)
        }

        s.config.Logger.Debugf("published schema for audience '%s'", audToStr(aud))
    }()

    return true
}