pkg/flow/grpc-namespace-variables.go
package flow
import (
"bytes"
"context"
"errors"
"io"
"log/slog"
"time"
"github.com/direktiv/direktiv/pkg/datastore"
"github.com/direktiv/direktiv/pkg/flow/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
)
// NamespaceVariable returns the metadata and payload of a single
// namespace-scoped runtime variable.
//
// The namespace is resolved by the name in the request and the variable is
// looked up by the request key inside one SQL transaction, which is always
// rolled back on return since nothing is written.
//
// If the variable does not exist (datastore.ErrNotFound) no error is returned;
// instead an empty placeholder response is produced whose timestamps are the
// current time, whose size is zero and whose data is an empty byte slice.
//
// If the stored size is greater than parcelSize, a ResourceExhausted status
// error is returned and the data is not loaded.
func (flow *flow) NamespaceVariable(ctx context.Context, req *grpc.NamespaceVariableRequest) (*grpc.NamespaceVariableResponse, error) {
	slog.Debug("Handling gRPC request", "this", this())

	tx, err := flow.beginSQLTx(ctx)
	if err != nil {
		return nil, err
	}
	defer tx.Rollback()

	ns, err := tx.DataStore().Namespaces().GetByName(ctx, req.GetNamespace())
	if err != nil {
		return nil, err
	}

	variable, err := tx.DataStore().RuntimeVariables().GetForNamespace(ctx, ns.Name, req.GetKey())
	switch {
	case err == nil:
		// Found: fall through to building the real response below.
	case errors.Is(err, datastore.ErrNotFound):
		// A missing variable is reported as an empty one rather than as an error.
		now := time.Now()

		return &grpc.NamespaceVariableResponse{
			Namespace: ns.Name,
			Key:       req.GetKey(),
			CreatedAt: timestamppb.New(now),
			UpdatedAt: timestamppb.New(now),
			TotalSize: 0,
			MimeType:  "",
			Data:      []byte{},
		}, nil
	default:
		return nil, err
	}

	resp := &grpc.NamespaceVariableResponse{
		Namespace: ns.Name,
		Key:       variable.Name,
		CreatedAt: timestamppb.New(variable.CreatedAt),
		UpdatedAt: timestamppb.New(variable.UpdatedAt),
		TotalSize: int64(variable.Size),
		MimeType:  variable.MimeType,
	}

	// Check the size before touching the payload so oversized variables are
	// rejected without loading their data.
	if resp.GetTotalSize() > parcelSize {
		return nil, status.Error(codes.ResourceExhausted, "variable too large to return without using the parcelling API")
	}

	payload, err := tx.DataStore().RuntimeVariables().LoadData(ctx, variable.ID)
	if err != nil {
		return nil, err
	}
	resp.Data = payload

	return resp, nil
}
// SetNamespaceVariable stores a namespace-scoped runtime variable and returns
// the metadata of the stored record.
//
// The namespace is resolved by the name in the request; the variable is then
// written with the request's key, data and MIME type, and the transaction is
// committed. Any error from the lookup, the write or the commit is returned
// as-is, and the deferred rollback runs on every return path.
//
// The response carries the namespace name, the requested key, and the
// timestamps, size and MIME type reported back by the datastore.
func (flow *flow) SetNamespaceVariable(ctx context.Context, req *grpc.SetNamespaceVariableRequest) (*grpc.SetNamespaceVariableResponse, error) {
	slog.Debug("Handling gRPC request", "this", this())

	tx, err := flow.beginSQLTx(ctx)
	if err != nil {
		return nil, err
	}
	defer tx.Rollback()

	ns, err := tx.DataStore().Namespaces().GetByName(ctx, req.GetNamespace())
	if err != nil {
		return nil, err
	}

	variable := &datastore.RuntimeVariable{
		Namespace: ns.Name,
		Name:      req.GetKey(),
		Data:      req.GetData(),
		MimeType:  req.GetMimeType(),
	}

	stored, err := tx.DataStore().RuntimeVariables().Set(ctx, variable)
	if err != nil {
		return nil, err
	}

	if err := tx.Commit(ctx); err != nil {
		return nil, err
	}

	// TODO: Alex, please fix here.
	// flow.logger.Infof(ctx, cached.Namespace.ID, cached.GetAttributes(recipient.Namespace), "Set namespace variable '%s'.", req.GetKey())
	// flow.pubsub.NotifyNamespaceVariables(cached.Namespace)

	return &grpc.SetNamespaceVariableResponse{
		Namespace: ns.Name,
		Key:       req.GetKey(),
		CreatedAt: timestamppb.New(stored.CreatedAt),
		UpdatedAt: timestamppb.New(stored.UpdatedAt),
		TotalSize: int64(stored.Size),
		MimeType:  stored.MimeType,
	}, nil
}
// SetNamespaceVariableParcels is the client-streaming variant of
// SetNamespaceVariable: it reassembles a variable payload that arrives as a
// sequence of messages ("parcels") and then stores it in one call.
//
// The first message fixes the expected total size and supplies the namespace,
// key and MIME type used for the final SetNamespaceVariable call; the Data
// fields of all accepted messages are concatenated in arrival order.
//
// Stream rules:
//   - Every later message must declare the same total size as the first one,
//     otherwise "totalSize changed mid stream" is returned. The single
//     exception is a message with a non-positive total size arriving once
//     enough bytes have been buffered: it ends the stream and its data is
//     ignored.
//   - As soon as the buffered payload exceeds the declared total size the
//     call fails with "received more data than expected". This is checked
//     after every parcel so a client cannot make the server buffer an
//     unbounded amount of data beyond what it declared.
//   - If the first message declares a non-positive total size, only that
//     message is read.
//   - io.EOF from the stream ends reception normally; any other receive error
//     is returned.
//
// NOTE(review): a stream that ends (io.EOF) before totalSize bytes were
// received is still accepted and stored as-is; confirm whether short uploads
// should be rejected instead.
//
//nolint:dupl
func (flow *flow) SetNamespaceVariableParcels(srv grpc.Flow_SetNamespaceVariableParcelsServer) error {
	slog.Debug("Handling gRPC request", "this", this())

	ctx := srv.Context()

	req, err := srv.Recv()
	if err != nil {
		return err
	}

	firstReq := req
	totalSize := int(req.GetTotalSize())

	buf := new(bytes.Buffer)

	for {
		// bytes.Buffer.Write always returns a nil error, so there is nothing
		// to check here.
		buf.Write(req.GetData())

		// Fail fast instead of buffering the rest of an oversized stream.
		if buf.Len() > totalSize {
			return errors.New("received more data than expected")
		}

		// Only the first message can reach this point with a non-positive
		// total size (later ones are filtered below); in that case nothing
		// more is read from the stream.
		if req.GetTotalSize() <= 0 && buf.Len() >= totalSize {
			break
		}

		req, err = srv.Recv()
		if err != nil {
			if errors.Is(err, io.EOF) {
				break
			}

			return err
		}

		// A message without a total size terminates the stream once the
		// payload is complete; its data is intentionally not appended.
		if req.GetTotalSize() <= 0 && buf.Len() >= totalSize {
			break
		}

		if int(req.GetTotalSize()) != totalSize {
			return errors.New("totalSize changed mid stream")
		}
	}

	// The per-parcel check above guarantees buf.Len() <= totalSize here.
	firstReq.Data = buf.Bytes()

	resp, err := flow.SetNamespaceVariable(ctx, firstReq)
	if err != nil {
		return err
	}

	return srv.SendAndClose(resp)
}