vorteil/direktiv

View on GitHub
cmd/sidecar/api_v2_clients.go

Summary

Maintainability
B
5 hrs
Test Coverage
package sidecar

import (
    "bytes"
    "context"
    "encoding/base64"
    "encoding/json"
    "errors"
    "fmt"
    "io"
    "net/http"
    "slices"

    "github.com/direktiv/direktiv/pkg/datastore"
    "github.com/direktiv/direktiv/pkg/utils"
)

func getInstanceVariables(ctx context.Context, flowToken string, flowAddr string, ir *functionRequest) (*variablesResponse, int, error) {
    addr := fmt.Sprintf("http://%v/api/v2/namespaces/%v/variables?instanceId=%v", flowAddr, ir.Namespace, ir.Instance)

    return getVariables(ctx, flowToken, addr)
}

func getNamespaceVariables(ctx context.Context, flowToken string, flowAddr string, ir *functionRequest) (*variablesResponse, int, error) {
    addr := fmt.Sprintf("http://%v/api/v2/namespaces/%v/variables", flowAddr, ir.Namespace)

    return getVariables(ctx, flowToken, addr)
}

func getWorkflowVariables(ctx context.Context, flowToken string, flowAddr string, ir *functionRequest) (*variablesResponse, int, error) {
    addr := fmt.Sprintf("http://%v/api/v2/namespaces/%v/variables?workflowPath=%v", flowAddr, ir.Namespace, ir.Workflow)

    return getVariables(ctx, flowToken, addr)
}

func getVariables(ctx context.Context, flowToken, addr string) (*variablesResponse, int, error) {
    resp, err := doRequest(ctx, http.MethodGet, flowToken, addr, nil)
    if err != nil {
        return nil, resp.StatusCode, err
    }
    variables := variablesResponse{}

    statusCode, err := handleResponse(resp, func(resp *http.Response) (int, error) {
        decoder := json.NewDecoder(resp.Body)
        if err = decoder.Decode(&variables); err != nil {
            return resp.StatusCode, fmt.Errorf("failed to decode response body: %w", err)
        }
        return http.StatusOK, nil
    })
    if err != nil {
        return nil, statusCode, err
    }

    return &variables, resp.StatusCode, nil
}

func getReferencedFile(ctx context.Context, flowToken, flowAddr, namespace string, path string) ([]byte, int, error) {
    addr := fmt.Sprintf("http://%v/api/v2/namespaces/%v/files/%v", flowAddr, namespace, path)
    var d []byte

    resp, err := doRequest(ctx, http.MethodGet, flowToken, addr, nil)
    if resp.StatusCode == http.StatusNotFound {
        // some very special magic
        return nil, resp.StatusCode, &RessourceNotFoundError{
            Key:   path,
            Scope: "file",
        }
    }
    if err != nil {
        return nil, resp.StatusCode, fmt.Errorf("failed to do request: %w", err)
    }
    statusCode, err := handleResponse(resp, func(resp *http.Response) (int, error) {
        var respData decodedFilesResponse
        if resp.Body == nil {
            return http.StatusInternalServerError, fmt.Errorf("unexpected failure: response body is nil")
        }
        decoder := json.NewDecoder(resp.Body)
        if err := decoder.Decode(&respData); err != nil {
            target := &json.UnmarshalTypeError{}
            if errors.As(err, &target) {
                return http.StatusBadRequest, fmt.Errorf("invalid response format: %w", err)
            }
            return http.StatusInternalServerError, fmt.Errorf("failed to decode response: %w", err)
        }

        if len(respData.Data.Data) > 0 {
            d, err = base64.StdEncoding.DecodeString(respData.Data.Data)
            if err != nil {
                return http.StatusInternalServerError, fmt.Errorf("failed to decode base64 data: %w", err)
            }
        }
        return http.StatusOK, nil
    })
    if err != nil {
        return nil, statusCode, err
    }

    return d, statusCode, nil
}

func getVariableMetaFromFlow(ctx context.Context, flowToken string, flowAddr string, ir *functionRequest, scope, key string) (variable, int, error) {
    var varResp *variablesResponse
    var err error
    var typ string
    statusCode := http.StatusOK

    // Determine scope and retrieve variables
    switch scope {
    case utils.VarScopeInstance:
        varResp, statusCode, err = getInstanceVariables(ctx, flowToken, flowAddr, ir)
        if err != nil {
            return variable{}, statusCode, fmt.Errorf("failed to get instance variables: %w", err)
        }
        typ = "instance-variable"

    case utils.VarScopeWorkflow:
        varResp, statusCode, err = getWorkflowVariables(ctx, flowToken, flowAddr, ir)
        if err != nil {
            return variable{}, statusCode, fmt.Errorf("failed to get workflow variables: %w", err)
        }
        typ = "workflow-variable"

    case utils.VarScopeNamespace:
        varResp, statusCode, err = getNamespaceVariables(ctx, flowToken, flowAddr, ir)
        if err != nil {
            return variable{}, statusCode, fmt.Errorf("failed to get namespace variables: %w", err)
        }
        typ = "namespace-variable"

    default:
        return variable{}, statusCode, fmt.Errorf("unknown scope: %s", scope)
    }

    idx := slices.IndexFunc(varResp.Data, func(e variable) bool { return e.Typ == typ && e.Name == key })
    if idx < 0 {
        return variable{}, statusCode, &RessourceNotFoundError{Key: key, Scope: scope}
    }

    return varResp.Data[idx], statusCode, nil
}

func getVariableDataViaID(ctx context.Context, flowToken string, flowAddr string, namespace string, id string) (variable, error) {
    addr := fmt.Sprintf("http://%v/api/v2/namespaces/%v/variables/%v", flowAddr, namespace, id)
    client := &http.Client{}
    req, err := http.NewRequestWithContext(ctx, http.MethodGet, addr, nil)
    if err != nil {
        return variable{}, err
    }
    req.Header.Set("Direktiv-Token", flowToken)
    resp, err := client.Do(req)
    if err != nil {
        return variable{}, err
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        return variable{}, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
    }

    v := variable{}
    decoder := json.NewDecoder(resp.Body)
    if err = decoder.Decode(&v); err != nil {
        return variable{}, err
    }

    return v, nil
}

func postVarData(ctx context.Context, flowToken string, flowAddr string, namespace string, body createVarRequest) (int, error) {
    reqD, err := json.Marshal(body)
    if err != nil {
        return http.StatusBadRequest, fmt.Errorf("failed to marshal request body: %w", err)
    }
    read := bytes.NewReader(reqD)
    url := fmt.Sprintf("http://%v/api/v2/namespaces/%v/variables", flowAddr, namespace)

    resp, err := doRequest(ctx, http.MethodPost, flowToken, url, read)
    if err != nil {
        return resp.StatusCode, err
    }

    if statusCode, err := handleResponse(resp, nil); err != nil {
        return statusCode, err
    }

    return http.StatusOK, nil
}

func patchVarData(ctx context.Context, flowToken string, flowAddr string, namespace string, id string, body datastore.RuntimeVariablePatch) (int, error) {
    reqD, err := json.Marshal(body)
    if err != nil {
        return http.StatusBadRequest, fmt.Errorf("failed to marshal request body: %w", err)
    }
    read := bytes.NewReader(reqD)
    url := fmt.Sprintf("http://%v/api/v2/namespaces/%v/variables/%v", flowAddr, namespace, id)

    resp, err := doRequest(ctx, http.MethodPatch, flowToken, url, read)
    if err != nil {
        return resp.StatusCode, err
    }

    if statusCode, err := handleResponse(resp, nil); err != nil {
        return statusCode, err
    }

    return http.StatusOK, nil
}

func doRequest(ctx context.Context, method, flowToken, url string, body io.Reader) (*http.Response, error) {
    client := &http.Client{}
    req, err := http.NewRequestWithContext(ctx, method, url, body)
    if err != nil {
        return nil, fmt.Errorf("failed to create new request: %w", err)
    }
    req.Header.Set("Direktiv-Token", flowToken)

    resp, err := client.Do(req)
    if err != nil {
        return nil, fmt.Errorf("failed to execute request: %w", err)
    }

    return resp, nil
}

func handleResponse(resp *http.Response, next func(resp *http.Response) (int, error)) (int, error) {
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        var apiErrorResp apiError
        if err := json.NewDecoder(resp.Body).Decode(&apiErrorResp); err != nil {
            if err == io.EOF {
                return resp.StatusCode, fmt.Errorf("empty error response body")
            }
            return http.StatusInternalServerError, fmt.Errorf("failed to decode error response: %w", err)
        }
        return resp.StatusCode, fmt.Errorf("API error: code %v - message: %v", apiErrorResp.Error.Code, apiErrorResp.Error.Message)
    }

    if next != nil {
        return next(resp)
    }

    return http.StatusOK, nil
}