cloudfoundry/stratos

View on GitHub
src/jetstream/plugins/kubernetes/terminal/start.go

Summary

Maintainability
B
5 hrs
Test Coverage
package terminal

import (
    "crypto/tls"
    "errors"
    "fmt"

    //"encoding/base64"
    "encoding/json"
    "net/http"
    "strings"
    "time"

    "github.com/labstack/echo/v4"
    log "github.com/sirupsen/logrus"

    "github.com/cloudfoundry-incubator/stratos/src/jetstream/repository/interfaces"

    "github.com/gorilla/websocket"
)

// TTY Resize, see: https://gitlab.cncf.ci/kubernetes/kubernetes/commit/3b21a9901bcd48bb452d3bf1a0cddc90dae142c4#9691a2f9b9c30711f0397221db0b9ac55ab0e2d1

// Allow connections from any Origin
var upgrader = websocket.Upgrader{
    CheckOrigin: func(r *http.Request) bool { return true },
}

// KeyCode - JSON object that is passed from the front-end to notify of a key press or a term resize
type KeyCode struct {
    Key  string `json:"key"`
    Cols int    `json:"cols"`
    Rows int    `json:"rows"`
}

type terminalSize struct {
    Width  uint16
    Height uint16
}

const (
    // Time allowed to write a message to the peer.
    writeWait = 10 * time.Second

    // Time allowed to read the next pong message from the peer.
    pongWait = 60 * time.Second

    // Send pings to peer with this period. Must be less than pongWait.
    pingPeriod = (pongWait * 9) / 10

    // Time to wait before force close on connection.
    closeGracePeriod = 10 * time.Second
)

// Start handles web-socket request to launch a Kubernetes Terminal
func (k *KubeTerminal) Start(c echo.Context) error {
    log.Debug("Kube Terminal start request")

    endpointGUID := c.Param("guid")
    userGUID := c.Get("user_id").(string)

    cnsiRecord, err := k.PortalProxy.GetCNSIRecord(endpointGUID)
    if err != nil {
        return errors.New("Could not get endpoint information")
    }

    // Get token for this user
    tokenRecord, ok := k.PortalProxy.GetCNSITokenRecord(endpointGUID, userGUID)
    if !ok {
        return errors.New("Could not get token")
    }

    // This is the kube config for the kubernetes endpoint that we want configured in the Terminal
    kubeConfig, err := k.Kube.GetKubeConfigForEndpoint(cnsiRecord.APIEndpoint.String(), tokenRecord, "")
    if err != nil {
        return errors.New("Can not get Kubernetes config for specified endpoint")
    }

    // Determine the Kubernetes version
    version, _ := k.getKubeVersion(endpointGUID, userGUID)
    log.Debugf("Kubernetes Version: %s", version)

    // Upgrade the web socket for the incoming request
    ws, pingTicker, err := interfaces.UpgradeToWebSocket(c)
    if err != nil {
        return err
    }
    defer ws.Close()
    defer pingTicker.Stop()

    // At this point we aer using web sockets, so we can not return errors to the client as the connection
    // has been upgraded to a web socket

    // We are now in web socket land - we don't want any middleware to change the HTTP response
    c.Set("Stratos-WebSocket", "true")

    // Send a message to say that we are creating the pod
    sendProgressMessage(ws, "Launching Kubernetes Terminal ... one moment please")

    podData, err := k.createPod(c, kubeConfig, version, ws)

    // Clear progress message
    sendProgressMessage(ws, "")

    if err != nil {
        log.Errorf("Kubernetes Terminal: Error creating secret or pod: %+v", err)
        k.cleanupPodAndSecret(podData)

        // Send error message
        sendProgressMessage(ws, "!"+err.Error())
        return nil
    }

    // API Endpoint to SSH/exec into a container
    target := fmt.Sprintf("%s/api/v1/namespaces/%s/pods/%s/exec?command=/bin/bash&stdin=true&stderr=true&stdout=true&tty=true", k.APIServer, k.Namespace, podData.PodName)

    dialer := &websocket.Dialer{
        TLSClientConfig: &tls.Config{
            InsecureSkipVerify: true,
        },
    }

    if strings.HasPrefix(target, "https://") {
        target = "wss://" + target[8:]
    } else {
        target = "ws://" + target[7:]
    }

    header := &http.Header{}
    header.Add("Authorization", fmt.Sprintf("Bearer %s", string(k.Token)))
    wsConn, _, err := dialer.Dial(target, *header)

    if err == nil {
        defer wsConn.Close()
    }

    if err != nil {
        k.cleanupPodAndSecret(podData)
        log.Warn("Kube Terminal: Could not connect to pod")
        // No point returning an error - we've already upgraded to web sockets, so we can't use the HTTP response now
        return nil
    }

    stdoutDone := make(chan bool)
    go pumpStdout(ws, wsConn, stdoutDone)
    go ping(ws, stdoutDone)

    // If the downstream connection is closed, close the other web socket as well
    ws.SetCloseHandler(func(code int, text string) error {
        wsConn.Close()
        // Cleanup
        k.cleanupPodAndSecret(podData)
        podData = nil
        return nil
    })

    // Wait a while when reading - can take some time for the container to launch
    ws.SetReadDeadline(time.Now().Add(pongWait))
    ws.SetPongHandler(func(string) error { ws.SetReadDeadline(time.Now().Add(pongWait)); return nil })

    // Read the input from the web socket and pipe it to the SSH client
    for {
        _, r, err := ws.ReadMessage()
        if err != nil {
            // Error reading - so clean up
            k.cleanupPodAndSecret(podData)
            podData = nil

            ws.SetWriteDeadline(time.Now().Add(writeWait))
            ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
            time.Sleep(closeGracePeriod)
            ws.Close()

            // No point returning an error - we've already upgraded to web sockets, so we can't use the HTTP response now
            return nil
        }

        res := KeyCode{}
        json.Unmarshal(r, &res)
        if res.Cols == 0 {
            slice := make([]byte, 1)
            slice[0] = 0
            slice = append(slice, []byte(res.Key)...)
            wsConn.WriteMessage(websocket.TextMessage, slice)
        } else {
            size := terminalSize{
                Width:  uint16(res.Cols),
                Height: uint16(res.Rows),
            }
            j, _ := json.Marshal(size)
            resizeStream := []byte{4}
            slice := append(resizeStream, j...)
            wsConn.WriteMessage(websocket.TextMessage, slice)
        }
    }
}

func pumpStdout(ws *websocket.Conn, source *websocket.Conn, done chan bool) {
    for {
        _, r, err := source.ReadMessage()
        if err != nil {
            // Close
            ws.Close()
            done <- true
            break
        }
        ws.SetWriteDeadline(time.Now().Add(writeWait))
        bytes := fmt.Sprintf("% x\n", r[1:])
        if err := ws.WriteMessage(websocket.TextMessage, []byte(bytes)); err != nil {
            log.Errorf("Kubernetes Terminal failed to write message: %+v", err)
            ws.Close()
            break
        }
    }
}

func ping(ws *websocket.Conn, done chan bool) {
    ticker := time.NewTicker(pingPeriod)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            if err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(writeWait)); err != nil {
                log.Errorf("Web socket ping error: %+v", err)
            }
        case <-done:
            return
        }
    }
}