vorteil/direktiv

View on GitHub
pkg/flow/grpc-namespace-variables.go

Summary

Maintainability
C
1 day
Test Coverage
package flow

import (
    "bytes"
    "context"
    "errors"
    "io"
    "log/slog"
    "time"

    "github.com/direktiv/direktiv/pkg/datastore"
    "github.com/direktiv/direktiv/pkg/flow/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
    "google.golang.org/protobuf/types/known/timestamppb"
)

// NamespaceVariable serves the gRPC request for a single namespace-scoped
// runtime variable, returning its metadata together with its data.
//
// The lookup runs inside a SQL transaction that is never committed; the
// deferred rollback releases it on every return path.
//
// If the variable does not exist, no error is reported: an empty variable
// (zero size, empty MIME type, empty data) stamped with the current time is
// returned instead. If the stored variable is larger than parcelSize, a
// ResourceExhausted status error is returned and no data is loaded.
func (flow *flow) NamespaceVariable(ctx context.Context, req *grpc.NamespaceVariableRequest) (*grpc.NamespaceVariableResponse, error) {
    slog.Debug("Handling gRPC request", "this", this())

    tx, err := flow.beginSQLTx(ctx)
    if err != nil {
        return nil, err
    }
    defer tx.Rollback()

    ns, err := tx.DataStore().Namespaces().GetByName(ctx, req.GetNamespace())
    if err != nil {
        return nil, err
    }

    variable, err := tx.DataStore().RuntimeVariables().GetForNamespace(ctx, ns.Name, req.GetKey())
    if errors.Is(err, datastore.ErrNotFound) {
        // A missing variable is reported as an empty one created "now".
        now := time.Now()

        return &grpc.NamespaceVariableResponse{
            Namespace: ns.Name,
            Key:       req.GetKey(),
            CreatedAt: timestamppb.New(now),
            UpdatedAt: timestamppb.New(now),
            TotalSize: 0,
            MimeType:  "",
            Data:      []byte{},
        }, nil
    }
    if err != nil {
        return nil, err
    }

    resp := &grpc.NamespaceVariableResponse{
        Namespace: ns.Name,
        Key:       variable.Name,
        CreatedAt: timestamppb.New(variable.CreatedAt),
        UpdatedAt: timestamppb.New(variable.UpdatedAt),
        TotalSize: int64(variable.Size),
        MimeType:  variable.MimeType,
    }

    // Refuse oversized payloads before touching the data itself.
    if resp.TotalSize > parcelSize {
        return nil, status.Error(codes.ResourceExhausted, "variable too large to return without using the parcelling API")
    }

    payload, err := tx.DataStore().RuntimeVariables().LoadData(ctx, variable.ID)
    if err != nil {
        return nil, err
    }
    resp.Data = payload

    return resp, nil
}

// SetNamespaceVariable serves the gRPC request that stores a namespace-scoped
// runtime variable.
//
// The namespace is resolved by name and the variable is written inside a
// single SQL transaction, which is committed before the response is built.
// The deferred rollback covers every earlier return path. The response
// carries the metadata of the stored variable (timestamps, size, MIME type)
// but not its data.
func (flow *flow) SetNamespaceVariable(ctx context.Context, req *grpc.SetNamespaceVariableRequest) (*grpc.SetNamespaceVariableResponse, error) {
    slog.Debug("Handling gRPC request", "this", this())

    tx, err := flow.beginSQLTx(ctx)
    if err != nil {
        return nil, err
    }
    defer tx.Rollback()

    ns, err := tx.DataStore().Namespaces().GetByName(ctx, req.GetNamespace())
    if err != nil {
        return nil, err
    }

    record := &datastore.RuntimeVariable{
        Namespace: ns.Name,
        Name:      req.GetKey(),
        Data:      req.GetData(),
        MimeType:  req.GetMimeType(),
    }

    stored, err := tx.DataStore().RuntimeVariables().Set(ctx, record)
    if err != nil {
        return nil, err
    }

    if commitErr := tx.Commit(ctx); commitErr != nil {
        return nil, commitErr
    }

    // TODO: Alex, please fix here.

    // flow.logger.Infof(ctx, cached.Namespace.ID, cached.GetAttributes(recipient.Namespace), "Set namespace variable '%s'.", req.GetKey())
    // flow.pubsub.NotifyNamespaceVariables(cached.Namespace)

    return &grpc.SetNamespaceVariableResponse{
        Namespace: ns.Name,
        Key:       req.GetKey(),
        CreatedAt: timestamppb.New(stored.CreatedAt),
        UpdatedAt: timestamppb.New(stored.UpdatedAt),
        TotalSize: int64(stored.Size),
        MimeType:  stored.MimeType,
    }, nil
}

// SetNamespaceVariableParcels is the client-streaming variant of
// SetNamespaceVariable: the variable's data arrives as a sequence of parcels
// that are concatenated and then stored through SetNamespaceVariable.
//
// The first message fixes the expected total size and is reused as the
// request passed to SetNamespaceVariable, with its Data replaced by the
// assembled payload. Receiving stops when:
//   - the announced total size is zero or negative (only the first message is
//     considered),
//   - the client closes the stream (io.EOF), or
//   - a message without a positive total size arrives after the announced
//     number of bytes has been buffered (that message's data is ignored).
//
// It returns an error if more data than announced is received, if a message
// announces a different total size than the first one, or if receiving,
// storing, or replying fails.
//
// NOTE(review): a stream that ends with fewer bytes than announced is stored
// as-is; confirm whether short uploads should be rejected.
//
//nolint:dupl
func (flow *flow) SetNamespaceVariableParcels(srv grpc.Flow_SetNamespaceVariableParcelsServer) error {
    slog.Debug("Handling gRPC request", "this", this())
    ctx := srv.Context()

    req, err := srv.Recv()
    if err != nil {
        return err
    }

    firstReq := req
    totalSize := int(req.GetTotalSize())

    var buf bytes.Buffer

    for {
        // bytes.Buffer.Write always returns a nil error.
        buf.Write(req.GetData())

        // Fail as soon as the client exceeds the size it announced, so the
        // buffer cannot grow without bound while waiting for end of stream.
        if buf.Len() > totalSize {
            return errors.New("received more data than expected")
        }

        // Without a positive announced size there is nothing more to wait for.
        if totalSize <= 0 {
            break
        }

        req, err = srv.Recv()
        if err != nil {
            if errors.Is(err, io.EOF) {
                break
            }

            return err
        }

        // A trailing message without a total size ends the upload once all
        // announced bytes have been buffered.
        if req.GetTotalSize() <= 0 && buf.Len() >= totalSize {
            break
        }

        if int(req.GetTotalSize()) != totalSize {
            return errors.New("totalSize changed mid stream")
        }
    }

    firstReq.Data = buf.Bytes()

    resp, err := flow.SetNamespaceVariable(ctx, firstReq)
    if err != nil {
        return err
    }

    return srv.SendAndClose(resp)
}