vorteil/direktiv

View on GitHub
pkg/flow/trace.go

Summary

Maintainability
B
4 hrs
Test Coverage
package flow

import (
    "context"
    "encoding/json"
    "fmt"

    "github.com/cloudevents/sdk-go/v2/event"
    "github.com/direktiv/direktiv/pkg/flow/nohome/recipient"
    "github.com/direktiv/direktiv/pkg/utils"
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/codes"
    "go.opentelemetry.io/otel/trace"
)

// Carrier controls the trace.
type Carrier struct {
    Trace map[string]string
}

// Get returns a value of a trace.
func (c *Carrier) Get(key string) string {
    v := c.Trace[key]
    return v
}

// Keys returns all the keys of the trace.
func (c *Carrier) Keys() []string {
    var keys []string
    for k := range c.Trace {
        keys = append(keys, k)
    }

    return keys
}

// Set sets a trace key and value.
func (c *Carrier) Set(key, val string) {
    c.Trace[key] = val
}

func dbTrace(ctx context.Context) *Carrier {
    carrier := &Carrier{
        Trace: make(map[string]string),
    }
    prop := otel.GetTextMapPropagator()
    prop.Inject(ctx, carrier)

    return carrier
}

func traceAddWorkflowInstance(ctx context.Context, im *instanceMemory) {
    span := trace.SpanFromContext(ctx)

    m := im.instance.GetAttributes(recipient.Instance)
    delete(m, "recipientType")

    attrs := make([]attribute.KeyValue, 0)
    for k, v := range m {
        attrs = append(attrs, attribute.KeyValue{
            Key:   attribute.Key(k),
            Value: attribute.StringValue(v),
        })
    }

    span.SetAttributes(attrs...)
}

func traceFullAddWorkflowInstance(ctx context.Context, im *instanceMemory) (context.Context, error) {
    traceAddWorkflowInstance(ctx, im)
    tp := otel.GetTracerProvider()
    tr := tp.Tracer("direktiv/flow")
    ctx, span := tr.Start(ctx, "new-workflow-instance", trace.WithSpanKind(trace.SpanKindInternal))
    defer span.End()
    traceAddWorkflowInstance(ctx, im)

    x := dbTrace(ctx)
    s := utils.Marshal(x)

    im.instance.TelemetryInfo.TraceID = s
    data, err := im.instance.TelemetryInfo.MarshalJSON()
    if err != nil {
        return nil, err
    }

    im.updateArgs.TelemetryInfo = &data

    return ctx, nil
}

func traceStateError(ctx context.Context, err error) {
    span := trace.SpanFromContext(ctx)
    span.SetStatus(codes.Error, err.Error())
    span.AddEvent(fmt.Sprintf("state error: %v", err.Error()))
}

func traceSubflowInvoke(ctx context.Context, name, child string) {
    span := trace.SpanFromContext(ctx)

    span.SetAttributes(
        attribute.KeyValue{
            Key:   "child-instance",
            Value: attribute.StringValue(child),
        },
    )

    span.AddEvent(fmt.Sprintf("Calling subflow: %s (%s)", name, child))
}

func traceStateGenericBegin(ctx context.Context, im *instanceMemory) (context.Context, func(), error) {
    tp := otel.GetTracerProvider()
    tr := tp.Tracer("direktiv/flow")
    prop := otel.GetTextMapPropagator()
    var span trace.Span

    carrier := new(Carrier)

    err := json.Unmarshal([]byte(im.instance.TelemetryInfo.TraceID), carrier)
    if err != nil {
        return ctx, nil, err
    }

    ctx = prop.Extract(ctx, carrier)

    ctx, span = tr.Start(ctx, im.logic.GetType().String(), trace.WithSpanKind(trace.SpanKindInternal))

    x := dbTrace(ctx)
    s := utils.Marshal(x)

    im.instance.TelemetryInfo.SpanID = s
    data, err := im.instance.TelemetryInfo.MarshalJSON()
    if err != nil {
        return ctx, nil, err
    }

    im.updateArgs.TelemetryInfo = &data

    finish := func() {
        span.End()
    }

    return ctx, finish, nil
}

func traceStateGenericLogicThread(ctx context.Context, im *instanceMemory) (context.Context, func(), error) {
    tp := otel.GetTracerProvider()
    tr := tp.Tracer("direktiv/flow")
    prop := otel.GetTextMapPropagator()
    var span trace.Span

    carrier := new(Carrier)
    err := json.Unmarshal([]byte(im.instance.TelemetryInfo.SpanID), carrier)
    if err != nil {
        return nil, nil, err
    }

    ctx = prop.Extract(ctx, carrier)

    ctx, span = tr.Start(ctx, fmt.Sprintf("%s-logic", im.logic.GetType().String()), trace.WithSpanKind(trace.SpanKindInternal))

    finish := func() {
        span.End()
    }

    return ctx, finish, nil
}

func traceActionResult(ctx context.Context, results *actionResultPayload) {
    span := trace.SpanFromContext(ctx)
    span.SetAttributes(
        attribute.KeyValue{
            Key:   "returned-action",
            Value: attribute.StringValue(results.ActionID),
        },
    )
}

func traceAddtoEventlog(ctx context.Context) (context.Context, func()) {
    tp := otel.GetTracerProvider()
    tr := tp.Tracer("direktiv/flow")
    ctx, span := tr.Start(ctx, "addToEventLog", trace.WithSpanKind(trace.SpanKindInternal))
    finish := func() {
        span.End()
    }

    return ctx, finish
}

func traceBrokerMessage(ctx context.Context, ev event.Event) (context.Context, func()) {
    tp := otel.GetTracerProvider()
    tr := tp.Tracer("direktiv/flow")
    ctx, span := tr.Start(ctx, "BroadcastCloudevent", trace.WithSpanKind(trace.SpanKindInternal))
    span.SetAttributes(
        attribute.KeyValue{
            Key:   "event-registered",
            Value: attribute.StringValue(ev.Source() + "-" + ev.ID()),
        },
    )
    finish := func() {
        span.End()
    }

    return ctx, finish
}

func traceGetListenersByTopic(ctx context.Context, topic string) (context.Context, func()) {
    tp := otel.GetTracerProvider()
    tr := tp.Tracer("direktiv/flow")
    ctx, span := tr.Start(ctx, "GetListenersByTopic", trace.WithSpanKind(trace.SpanKindInternal))
    span.SetAttributes(
        attribute.KeyValue{
            Key:   "topic",
            Value: attribute.StringValue(topic),
        },
    )
    finish := func() {
        span.End()
    }

    return ctx, finish
}

func traceProcessingMessage(ctx context.Context) (context.Context, func()) {
    tp := otel.GetTracerProvider()
    tr := tp.Tracer("direktiv/flow")
    ctx, span := tr.Start(ctx, "processingCloudevent", trace.WithSpanKind(trace.SpanKindInternal))

    finish := func() {
        span.End()
    }

    return ctx, finish
}

func traceMessageTrigger(ctx context.Context, triggerDescription string) (context.Context, func()) { //nolint:unparam
    tp := otel.GetTracerProvider()
    tr := tp.Tracer("direktiv/flow")
    ctx, span := tr.Start(ctx, "triggered-by-event", trace.WithSpanKind(trace.SpanKindInternal))
    span.SetAttributes(
        attribute.KeyValue{
            Key:   "trigger-desc",
            Value: attribute.StringValue(triggerDescription),
        },
    )
    finish := func() {
        span.End()
    }

    return ctx, finish
}