cloudfoundry-incubator/stratos

View on GitHub
src/jetstream/plugins/cloudfoundry/cf_websocket_streams.go

Summary

Maintainability
A
2 hrs
Test Coverage
package cloudfoundry

import (
    "crypto/tls"
    "encoding/json"
    "fmt"
    "net/http"
    "strconv"
    "time"

    "github.com/cloudfoundry-incubator/stratos/src/jetstream/repository/interfaces"
    "github.com/cloudfoundry/noaa"
    "github.com/cloudfoundry/noaa/consumer"
    noaa_errors "github.com/cloudfoundry/noaa/errors"
    "github.com/cloudfoundry/sonde-go/events"
    "github.com/gorilla/websocket"
    "github.com/labstack/echo/v4"
    log "github.com/sirupsen/logrus"
)

// Ping/pong timing constants.
const (
    // pongWait is the longest we wait for the peer's next pong message.
    pongWait = 30 * time.Second

    // pingPeriod is how often ping messages are sent to the peer. It is nine
    // tenths of pongWait, so it always stays below pongWait as required.
    pingPeriod = pongWait / 10 * 9
)

// upgrader is a WebSocket upgrader whose origin check accepts every request,
// i.e. connections are allowed from any Origin.
var upgrader = websocket.Upgrader{
    CheckOrigin: func(*http.Request) bool {
        return true
    },
}

// appStream is the route handler that runs appStreamHandler inside the common
// streaming flow (see commonStreamHandler).
func (c CloudFoundrySpecification) appStream(echoContext echo.Context) error {
    streamErr := c.commonStreamHandler(echoContext, appStreamHandler)
    return streamErr
}

// firehose is the route handler that runs firehoseStreamHandler inside the
// common streaming flow (see commonStreamHandler).
func (c CloudFoundrySpecification) firehose(echoContext echo.Context) error {
    streamErr := c.commonStreamHandler(echoContext, firehoseStreamHandler)
    return streamErr
}

// appFirehose is the route handler that runs appFirehoseStreamHandler inside
// the common streaming flow (see commonStreamHandler).
func (c CloudFoundrySpecification) appFirehose(echoContext echo.Context) error {
    streamErr := c.commonStreamHandler(echoContext, appFirehoseStreamHandler)
    return streamErr
}

// commonStreamHandler carries out the steps shared by every streaming endpoint:
// it opens an authorized Noaa consumer, upgrades the request to a WebSocket,
// hands both to bespokeStreamHandler and, if that succeeds, blocks reading from
// the client until the WebSocket is closed.
//
// Any error from opening the consumer, upgrading the connection or from
// bespokeStreamHandler is returned as-is. The consumer, the WebSocket and the
// ping ticker are released via defers when this function returns.
func (c CloudFoundrySpecification) commonStreamHandler(echoContext echo.Context, bespokeStreamHandler func(echo.Context, *AuthorizedConsumer, *websocket.Conn) error) error {
    authorized, err := c.openNoaaConsumer(echoContext)
    if err != nil {
        return err
    }
    defer authorized.consumer.Close()

    socket, ticker, err := interfaces.UpgradeToWebSocket(echoContext)
    if err != nil {
        return err
    }
    // Defers run last-in-first-out: the ticker is stopped first, then the
    // WebSocket is closed, and finally the consumer.
    defer socket.Close()
    defer ticker.Stop()

    handlerErr := bespokeStreamHandler(echoContext, authorized, socket)
    if handlerErr == nil {
        // Blocks here until the WebSocket is closed
        drainClientMessages(socket)
    }
    return handlerErr
}

// AuthorizedConsumer bundles a Noaa consumer with the authorization token used
// for its requests and a hook for renewing that token.
type AuthorizedConsumer struct {
    // consumer is the Noaa consumer used to fetch logs and events.
    consumer     *consumer.Consumer
    // authToken is the value passed as the auth token to consumer calls; it is
    // stored with a "bearer " prefix.
    authToken    string
    // refreshToken obtains a new token and stores it in authToken, returning
    // an error if the refresh fails.
    refreshToken func() error
}

// openNoaaConsumer builds an AuthorizedConsumer for the CNSI named by the
// "cnsiGuid" route parameter, acting for the user stored under "user_id" in
// the request context.
//
// The stored auth token for that user/CNSI pair is loaded and, if its expiry
// time has already passed, refreshed before the Noaa consumer is created.
// TLS certificate verification towards the Doppler endpoint follows the CNSI
// record's SkipSSLValidation setting rather than being disabled unconditionally.
//
// An error is returned when the user ID is missing from the context, the CNSI
// record cannot be loaded, no connected token record exists, or the token
// refresh fails.
func (c CloudFoundrySpecification) openNoaaConsumer(echoContext echo.Context) (*AuthorizedConsumer, error) {
    ac := &AuthorizedConsumer{}

    // Get the CNSI ID from the route parameters and the user ID from the context
    cnsiGUID := echoContext.Param("cnsiGuid")
    userGUID, ok := echoContext.Get("user_id").(string)
    if !ok {
        // A missing or non-string user ID is reported as an error instead of
        // panicking on the type assertion.
        return nil, fmt.Errorf("Error getting user ID from the request context for CNSI %s", cnsiGUID)
    }

    // Extract the Doppler endpoint from the CNSI record
    cnsiRecord, err := c.portalProxy.GetCNSIRecord(cnsiGUID)
    if err != nil {
        return nil, fmt.Errorf("Failed to get record for CNSI %s: [%v]", cnsiGUID, err)
    }

    // Closure used both here and later (e.g. on a 401 from Doppler) to renew
    // the token; it updates ac.authToken in place.
    ac.refreshToken = func() error {
        newTokenRecord, err := c.portalProxy.RefreshOAuthToken(cnsiRecord.SkipSSLValidation, cnsiGUID, userGUID, cnsiRecord.ClientId, cnsiRecord.ClientSecret, cnsiRecord.TokenEndpoint)
        if err != nil {
            msg := fmt.Sprintf("Error refreshing token for CNSI %s : [%v]", cnsiGUID, err)
            return echo.NewHTTPError(http.StatusUnauthorized, msg)
        }
        ac.authToken = "bearer " + newTokenRecord.AuthToken
        return nil
    }

    dopplerAddress := cnsiRecord.DopplerLoggingEndpoint
    log.Debugf("CNSI record Obtained! Using Doppler Logging Endpoint: %s", dopplerAddress)

    // Get the auth token for the CNSI from the DB, refresh it if it's expired
    tokenRecord, ok := c.portalProxy.GetCNSITokenRecord(cnsiGUID, userGUID)
    if !ok || tokenRecord.Disconnected {
        return nil, fmt.Errorf("Error getting token for user %s on CNSI %s", userGUID, cnsiGUID)
    }
    ac.authToken = "bearer " + tokenRecord.AuthToken
    if time.Unix(tokenRecord.TokenExpiry, 0).Before(time.Now()) {
        log.Debug("Token obtained has expired, refreshing!")
        if err := ac.refreshToken(); err != nil {
            return nil, err
        }
    }

    // Open a Noaa consumer to the doppler endpoint. Certificate verification is
    // only skipped when the endpoint was registered with SSL validation disabled.
    log.Debugf("Creating Noaa consumer for Doppler endpoint %s", dopplerAddress)
    tlsConfig := &tls.Config{InsecureSkipVerify: cnsiRecord.SkipSSLValidation}
    ac.consumer = consumer.New(dopplerAddress, tlsConfig, http.ProxyFromEnvironment)

    return ac, nil
}

// getRecentLogs fetches the recent log messages for appGUID through the given
// consumer. If the first attempt fails with an unauthorized error, the auth
// token is refreshed and the request is retried exactly once.
//
// Any other error on the first attempt, or a failed token refresh, is returned
// as a plain error; a failure of the retried request is returned as an HTTP 401
// error.
func getRecentLogs(ac *AuthorizedConsumer, cnsiGUID, appGUID string) ([]*events.LogMessage, error) {
    log.Debug("getRecentLogs")

    messages, err := ac.consumer.RecentLogs(appGUID, ac.authToken)
    if err == nil {
        return messages, nil
    }

    const errorPattern = "Failed to get recent messages for App %s on CNSI %s [%v]"

    if _, unauthorized := err.(*noaa_errors.UnauthorizedError); !unauthorized {
        return nil, fmt.Errorf(errorPattern, appGUID, cnsiGUID, err)
    }

    // If unauthorized, we may need to refresh our Auth token
    // Note: older versions of CF also answer "401 - Unauthorized" when the app
    // doesn't exist, so we can end up here even though our token is valid
    if refreshErr := ac.refreshToken(); refreshErr != nil {
        return nil, fmt.Errorf(errorPattern, appGUID, cnsiGUID, refreshErr)
    }

    messages, err = ac.consumer.RecentLogs(appGUID, ac.authToken)
    if err != nil {
        return nil, echo.NewHTTPError(http.StatusUnauthorized, fmt.Sprintf(errorPattern, appGUID, cnsiGUID, err))
    }
    return messages, nil
}

// drainErrors reads from errorChan until it is closed, logging every non-nil
// error it receives.
func drainErrors(errorChan <-chan error) {
    for {
        err, open := <-errorChan
        if !open {
            return
        }
        // Note: a nil error arrives before the channel is closed, so skip those
        if err == nil {
            continue
        }
        log.Errorf("Received error from Doppler %v", err.Error())
    }
}

// drainLogMessages passes each log message received on msgChan to callback and
// returns once msgChan has been closed.
func drainLogMessages(msgChan <-chan *events.LogMessage, callback func(msg *events.LogMessage)) {
    for {
        logMsg, open := <-msgChan
        if !open {
            return
        }
        callback(logMsg)
    }
}

// drainFirehoseEvents passes each envelope received on eventChan to callback
// and returns once eventChan has been closed.
func drainFirehoseEvents(eventChan <-chan *events.Envelope, callback func(msg *events.Envelope)) {
    for {
        envelope, open := <-eventChan
        if !open {
            return
        }
        callback(envelope)
    }
}

// drainClientMessages reads and throws away every message the WebSocket client
// sends, which effectively makes our WebSocket read-only. It returns as soon as
// a read fails.
func drainClientMessages(clientWebSocket *websocket.Conn) {
    for {
        if _, _, readErr := clientWebSocket.ReadMessage(); readErr != nil {
            // We get here when the client (browser) disconnects
            return
        }
    }
}

// appStreamHandler streams the logs of the app named by the "appGuid" route
// parameter to clientWebSocket as JSON text messages.
//
// It first sends the app's recent log messages in chronological order, then
// starts tailing the live log stream in background goroutines and returns.
// The only error returned is a failure to fetch the recent logs; marshal and
// WebSocket write failures are logged and the affected message is dropped.
func appStreamHandler(echoContext echo.Context, ac *AuthorizedConsumer, clientWebSocket *websocket.Conn) error {
    // Get the CNSI and app IDs from route parameters
    cnsiGUID := echoContext.Param("cnsiGuid")
    appGUID := echoContext.Param("appGuid")

    log.Infof("Received request for log stream for App ID: %s - in CNSI: %s", appGUID, cnsiGUID)

    messages, err := getRecentLogs(ac, cnsiGUID, appGUID)
    if err != nil {
        return err
    }

    // Reusable closure to pump messages from Noaa to the client WebSocket
    // N.B. We convert protobuf messages to JSON for ease of use in the frontend
    relayLogMsg := func(msg *events.LogMessage) {
        jsonMsg, err := json.Marshal(msg)
        if err != nil {
            // Log the message itself: the marshalled bytes are nil on failure
            log.Errorf("Received unparsable message from Doppler %v, %v", msg, err)
            return
        }
        if err := clientWebSocket.WriteMessage(websocket.TextMessage, jsonMsg); err != nil {
            log.Errorf("Error writing data to WebSocket, %v", err)
        }
    }

    // Send the recent messages, sorted in Chronological order
    for _, msg := range noaa.SortRecent(messages) {
        relayLogMsg(msg)
    }

    msgChan, errorChan := ac.consumer.TailingLogs(appGUID, ac.authToken)

    // Process the app stream; both goroutines end when their channel is closed
    go drainErrors(errorChan)
    go drainLogMessages(msgChan, relayLogMsg)

    log.Infof("Now streaming log for App ID: %s - on CNSI: %s", appGUID, cnsiGUID)
    return nil
}

// firehoseStreamHandler connects to the Firehose of the CNSI named by the
// "cnsiGuid" route parameter and relays every envelope to clientWebSocket as a
// JSON text message.
//
// The subscription ID is built from the user ID stored under "user_id" in the
// request context plus the current time in nanoseconds. Relaying happens in
// background goroutines; marshal and WebSocket write failures are logged and
// the affected envelope is dropped. The function itself always returns nil.
func firehoseStreamHandler(echoContext echo.Context, ac *AuthorizedConsumer, clientWebSocket *websocket.Conn) error {
    log.Debug("firehose")

    // Get the CNSI ID from route parameters
    cnsiGUID := echoContext.Param("cnsiGuid")

    log.Infof("Received request for Firehose stream for CNSI: %s", cnsiGUID)

    userGUID := echoContext.Get("user_id").(string)
    firehoseSubscriptionID := userGUID + "@" + strconv.FormatInt(time.Now().UnixNano(), 10)
    log.Debugf("Connecting the Firehose with subscription ID: %s", firehoseSubscriptionID)

    eventChan, errorChan := ac.consumer.Firehose(firehoseSubscriptionID, ac.authToken)

    // Process the Firehose stream; both goroutines end when their channel is closed
    go drainErrors(errorChan)
    go drainFirehoseEvents(eventChan, func(msg *events.Envelope) {
        jsonMsg, err := json.Marshal(msg)
        if err != nil {
            // Log the envelope itself: the marshalled bytes are nil on failure
            log.Errorf("Received unparsable message from Doppler %v, %v", msg, err)
            return
        }
        if err := clientWebSocket.WriteMessage(websocket.TextMessage, jsonMsg); err != nil {
            log.Errorf("Error writing data to WebSocket, %v", err)
        }
    })

    log.Infof("Firehose connected and streaming for CNSI: %s - subscription ID: %s", cnsiGUID, firehoseSubscriptionID)
    return nil
}

// appFirehoseStreamHandler opens the consumer's envelope stream for the app
// named by the "appGuid" route parameter and relays every envelope to
// clientWebSocket as a JSON text message.
//
// Relaying happens in background goroutines; marshal and WebSocket write
// failures are logged and the affected envelope is dropped. The function
// itself always returns nil.
func appFirehoseStreamHandler(echoContext echo.Context, ac *AuthorizedConsumer, clientWebSocket *websocket.Conn) error {
    log.Debug("appFirehoseStreamHandler")

    // Get the CNSI and app IDs from route parameters
    cnsiGUID := echoContext.Param("cnsiGuid")
    appGUID := echoContext.Param("appGuid")

    log.Infof("Received request for log stream for App ID: %s - in CNSI: %s", appGUID, cnsiGUID)

    msgChan, errorChan := ac.consumer.Stream(appGUID, ac.authToken)

    // Process the app stream; both goroutines end when their channel is closed
    go drainErrors(errorChan)
    go drainFirehoseEvents(msgChan, func(msg *events.Envelope) {
        jsonMsg, err := json.Marshal(msg)
        if err != nil {
            // Log the envelope itself: the marshalled bytes are nil on failure
            log.Errorf("Received unparsable message from Doppler %v, %v", msg, err)
            return
        }
        if err := clientWebSocket.WriteMessage(websocket.TextMessage, jsonMsg); err != nil {
            log.Errorf("Error writing data to WebSocket, %v", err)
        }
    })

    log.Infof("Now streaming for App ID: %s - on CNSI: %s", appGUID, cnsiGUID)
    return nil
}