cloudfoundry-incubator/stratos

View on GitHub
src/jetstream/plugins/kubernetes/get_release.go

Summary

Maintainability
A
3 hrs
Test Coverage
package kubernetes

import (
    "encoding/json"
    "fmt"
    "io/ioutil"
    "time"

    "github.com/gorilla/websocket"
    "github.com/labstack/echo/v4"
    log "github.com/sirupsen/logrus"

    "helm.sh/helm/v3/pkg/action"

    "github.com/cloudfoundry-incubator/stratos/src/jetstream/plugins/kubernetes/helm"
    "github.com/cloudfoundry-incubator/stratos/src/jetstream/repository/interfaces"
)

// Message type codes carried in ResourceMessage.MessageType. readLoop
// translates them into values on the pause channel that GetReleaseStatus
// consults before each polling pass.
const (
    // PauseTrue requests that polling of the release resources be paused.
    PauseTrue int = 20000
    // PauseFalse requests that polling of the release resources resume.
    PauseFalse int = PauseTrue + 1
)

// ResourceMessage is the JSON payload read from the web socket. readLoop
// decodes each incoming text message into this type and acts on MessageType.
type ResourceMessage struct {
    // MessageType is the numeric "type" field of the message; readLoop
    // recognises PauseTrue and PauseFalse and ignores any other value.
    MessageType int `json:"type"`
}

// ResourceResponse is the JSON envelope written to the web socket by
// sendResource: a kind label plus the already-encoded payload.
type ResourceResponse struct {
    // Kind labels the payload (for example "Pods", "Graph" or "Resources").
    Kind string `json:"kind"`
    // Data holds the payload as pre-marshalled JSON, emitted verbatim.
    Data json.RawMessage `json:"data"`
}

// kubeReleasesData is a JSON-serialisable record naming a release, its
// namespace, an endpoint and the chart it refers to.
// NOTE(review): not referenced by the handlers in this file; presumably used
// by other files in the package - confirm before changing field tags.
type kubeReleasesData struct {
    Endpoint  string `json:"endpoint"`
    Name      string `json:"releaseName"`
    Namespace string `json:"releaseNamespace"`
    // Chart is serialised as a nested "chart" object.
    Chart struct {
        Name       string `json:"chartName"`
        Repository string `json:"repo"`
        Version    string `json:"version"`
    } `json:"chart"`
}

// GetRelease looks up a single Helm release and writes its status to the
// response as JSON.
//
// The endpoint GUID, release name and namespace are taken from the
// "endpoint", "name" and "namespace" route parameters; the user is the
// "user_id" value stored on the echo context (the type assertion panics if
// that value is not a string). Errors from building the Helm configuration or
// from running the status action are logged and returned unchanged.
func (c *KubernetesSpecification) GetRelease(ec echo.Context) error {
    var (
        endpointGUID = ec.Param("endpoint")
        releaseName  = ec.Param("name")
        ns           = ec.Param("namespace")
        userID       = ec.Get("user_id").(string)
    )

    log.Debugf("Helm: Get Release: %s %s %s", endpointGUID, ns, releaseName)

    // A Helm configuration scoped to the endpoint, user and namespace is
    // needed before any Helm action can be run.
    helmConfig, hc, err := c.GetHelmConfiguration(endpointGUID, userID, ns)
    if err != nil {
        log.Errorf("Helm: GetRelease could not get a Helm Configuration: %s", err)
        return err
    }
    // Deferred so that cleanup runs on every return path from here on.
    defer hc.Cleanup()

    info, err := action.NewStatus(helmConfig).Run(releaseName)
    if err != nil {
        log.Error(err)
        return err
    }

    return ec.JSON(200, info)
}

// GetReleaseStatus streams the status of a Helm release over a web socket.
//
// The release is identified by the "endpoint", "namespace" and "name" route
// parameters and looked up on behalf of the "user_id" context value (the type
// assertion panics if that value is not a string). Failures to build the Helm
// configuration, to read the release status, or to upgrade the connection are
// returned before anything is written to the socket.
//
// After the upgrade the handler sends an initial series of messages (release
// prefix, resources, graph, pods, manifest errors) and then polls, re-sending
// pods, resources and graph, until readLoop closes the stop channel. It
// returns nil once that happens.
//
// Errors from sendResource are intentionally not checked: returning early
// could leave readLoop blocked on the pause channel, and the loop already
// ends when readLoop reports that the socket can no longer be read.
func (c *KubernetesSpecification) GetReleaseStatus(ec echo.Context) error {

    // Need to get a config object for the target endpoint
    endpointGUID := ec.Param("endpoint")
    release := ec.Param("name")
    namespace := ec.Param("namespace")
    userID := ec.Get("user_id").(string)

    log.Debugf("Helm: Get Release Status: %s %s %s", endpointGUID, namespace, release)

    config, hc, err := c.GetHelmConfiguration(endpointGUID, userID, namespace)
    if err != nil {
        log.Errorf("Helm: GetReleaseStatus could not get a Helm Configuration: %s", err)
        return err
    }

    defer hc.Cleanup()

    status := action.NewStatus(config)
    res, err := status.Run(release)
    if err != nil {
        log.Error(err)
        return err
    }

    // Upgrade to a web socket
    ws, pingTicker, err := interfaces.UpgradeToWebSocket(ec)
    if err != nil {
        return err
    }
    defer ws.Close()
    defer pingTicker.Stop()

    // ws is the websocket ready for use

    // Write the release info first - we will then go fetch the status of everything in the release and send
    // this back incrementally

    // Parse the manifest
    rel := helm.NewHelmRelease(res, endpointGUID, userID, c.portalProxy)

    graph := helm.NewHelmReleaseGraph(rel)

    id := fmt.Sprintf("%s-%s", endpointGUID, rel.Namespace)

    // Send over the namespace details of the release
    sendResource(ws, "ReleasePrefix", id)

    // Send the resources for the release as currently known
    sendResource(ws, "Resources", rel.GetResources())

    // Send the graph as we have it now
    sendResource(ws, "Graph", graph)

    // Get the pods first and send those
    rel.UpdatePods(c.portalProxy)
    sendResource(ws, "Pods", rel.GetPods())

    sendResource(ws, "Graph", graph)

    // Send the manifest for the release again (ReplicaSets will now be added)
    // NOTE(review): this is the only message sent with kind "Manifest"; every
    // other send of rel.GetResources() uses "Resources" - confirm the client
    // expects this kind.
    sendResource(ws, "Manifest", rel.GetResources())

    // Now get all of the resources in the manifest
    rel.UpdateResources(c.portalProxy)
    sendResource(ws, "Resources", rel.GetResources())

    graph.ParseManifest(rel)
    sendResource(ws, "Graph", graph)

    sendResource(ws, "ManifestErrors", rel.ManifestErrors)

    stopchan := make(chan bool)
    pausechan := make(chan bool)

    // readLoop owns the read side of the socket: it reports pause/resume
    // requests on pausechan and closes stopchan when reading ends.
    go readLoop(ws, stopchan, pausechan)

    // The first poll follows the initial send after one second; later polls
    // are spaced ten seconds apart.
    sleep := time.Second
    paused := false

    // Now we have everything, so loop, polling to get status until the web
    // socket is closed
    for {

        select {
        case paused = <-pausechan:
            // Pause state changed; fall through to the paused check below.
        case <-stopchan:
            // readLoop has already closed the connection and the deferred
            // ws.Close() runs on return, so no explicit close is needed.
            return nil
        case <-time.After(sleep):
        }

        if paused {
            log.Debug("Updating release resources paused ....")
            continue
        }

        log.Debug("Updating release resources ....")

        // Pods
        rel.UpdatePods(c.portalProxy)
        sendResource(ws, "Pods", rel.GetPods())

        graph.ParseManifest(rel)
        sendResource(ws, "Graph", graph)

        // Now get all of the resources in the manifest
        rel.UpdateResources(c.portalProxy)
        sendResource(ws, "Resources", rel.GetResources())

        graph.ParseManifest(rel)
        sendResource(ws, "Graph", graph)

        sleep = 10 * time.Second
    }
}

// readLoop consumes messages sent by the client on the web socket until the
// connection can no longer be read, then closes the connection and stopchan.
//
// Text messages are decoded as ResourceMessage: PauseTrue sends true and
// PauseFalse sends false on pausechan (the send blocks until the receiver
// takes the value); other message types values are ignored. A text message
// that cannot be read or parsed is logged and skipped. A read error or any
// non-text message ends the loop.
//
// stopchan is closed exactly once. The function must return immediately after
// closing it: looping again would hit a read error on the closed connection
// and close stopchan a second time, which panics.
func readLoop(c *websocket.Conn, stopchan chan<- bool, pausechan chan<- bool) {
    for {
        messageType, r, err := c.NextReader()
        if err != nil || messageType != websocket.TextMessage {
            c.Close()
            close(stopchan)
            return
        }

        data, err := ioutil.ReadAll(r)
        if err != nil {
            log.Warnf("Failed to read content of helm resource websocket message: %+v", err)
            continue
        }

        message := ResourceMessage{}
        if err = json.Unmarshal(data, &message); err != nil {
            log.Warnf("Failed to parse content of helm resource websocket message: %+v", err)
            continue
        }

        switch message.MessageType {
        case PauseTrue:
            pausechan <- true
        case PauseFalse:
            pausechan <- false
        }
    }
}

// sendResource wraps data in a ResourceResponse of the given kind and writes
// it to the web socket as a single JSON text message.
//
// It returns the first error encountered while marshalling data, marshalling
// the envelope, or writing to the connection; nil means the write succeeded.
func sendResource(ws *websocket.Conn, kind string, data interface{}) error {
    payload, err := json.Marshal(data)
    if err != nil {
        return err
    }

    envelope, err := json.Marshal(ResourceResponse{
        Kind: kind,
        Data: json.RawMessage(payload),
    })
    if err != nil {
        return err
    }

    // Return the write error itself: it must not be dropped in favour of the
    // (nil) marshalling error, or write failures would be reported as success.
    return ws.WriteMessage(websocket.TextMessage, envelope)
}