cloudfoundry/stratos

View on GitHub
src/jetstream/plugins/cfapppush/deploy.go

Summary

Maintainability
D
3 days
Test Coverage
package cfapppush

import (
    "encoding/base64"
    "encoding/json"
    "errors"
    "fmt"
    "io/ioutil"
    "net/url"
    "os"
    "path/filepath"
    "strings"
    "time"

    "github.com/cloudfoundry-incubator/stratos/src/jetstream/repository/interfaces"
    "github.com/gorilla/websocket"
    "github.com/labstack/echo/v4"
    log "github.com/sirupsen/logrus"
    yaml "gopkg.in/yaml.v2"

    archiver "github.com/mholt/archiver"
)

// Success
const (
    DATA MessageType = iota + 20000
    MANIFEST
    CLOSE_SUCCESS
    APP_GUID_NOTIFY
)

// Close - error cases
const (
    CLOSE_PUSH_ERROR MessageType = iota + 40000
    CLOSE_NO_MANIFEST
    CLOSE_INVALID_MANIFEST
    CLOSE_FAILED_CLONE
    CLOSE_FAILED_NO_BRANCH
    CLOSE_FAILURE
    CLOSE_NO_SESSION
    CLOSE_NO_CNSI
    CLOSE_NO_CNSI_USERTOKEN
    CLOSE_ACK
)

// Events
const (
    EVENT_CLONED MessageType = iota + 10000
    EVENT_FETCHED_MANIFEST
    EVENT_PUSH_STARTED
    EVENT_PUSH_COMPLETED
)

// Source exchange messages
const (
    SOURCE_REQUIRED MessageType = iota + 30000
    SOURCE_GITSCM
    SOURCE_FOLDER
    SOURCE_FILE
    SOURCE_FILE_DATA
    SOURCE_FILE_ACK
    SOURCE_GITURL
    SOURCE_WAIT_ACK
    SOURCE_DOCKER_IMG
)

// Application Overrides messages
const (
    OVERRIDES_REQUIRED MessageType = iota + 50000
    OVERRIDES_SUPPLIED
)

const (
    SCM_TYPE_GITHUB = "github"
    SCM_TYPE_GITLAB = "gitlab"
)

const (
    stratosProjectKey = "STRATOS_PROJECT"
)

func (cfAppPush *CFAppPush) deploy(echoContext echo.Context) error {

    cnsiGUID := echoContext.Param("cnsiGuid")
    orgGUID := echoContext.Param("orgGuid")
    spaceGUID := echoContext.Param("spaceGuid")
    spaceName := echoContext.QueryParam("space")
    orgName := echoContext.QueryParam("org")

    // App ID is this is a redeploy
    appID := echoContext.QueryParam("app")
    userGUID := echoContext.Get("user_id").(string)

    log.Debug("UpgradeToWebSocket")
    clientWebSocket, pingTicker, err := interfaces.UpgradeToWebSocket(echoContext)
    log.Debug("UpgradeToWebSocket done")
    if err != nil {
        log.Errorf("Upgrade to websocket failed due to: %+v", err)
        return err
    }
    defer clientWebSocket.Close()
    defer pingTicker.Stop()

    // We use a simple protocol to get the source to use for cf push and any cf push cli overrides

    // Ask for source first, then overrides - to support the case that local file/folder source is uploaded first before overrides are avialable

    // Send a message to the client to say that we are awaiting source details
    sendEvent(clientWebSocket, SOURCE_REQUIRED)

    // Wait for a message from the client
    log.Debug("Waiting for source information from client")

    msg := SocketMessage{}
    if err := clientWebSocket.ReadJSON(&msg); err != nil {
        log.Errorf("Error reading JSON: %v+", err)
        return err
    }

    log.Debugf("Source %v+", msg)

    // Temporary folder for the application source
    tempDir, err := ioutil.TempDir("", "cf-push-")
    defer os.RemoveAll(tempDir)

    var appDir string
    var stratosProject StratosProject

    // Get the source, depending on the source type
    switch msg.Type {
    case SOURCE_GITSCM:
        stratosProject, appDir, err = cfAppPush.getGitSCMSource(clientWebSocket, tempDir, msg, userGUID)
    case SOURCE_FOLDER:
        stratosProject, appDir, err = getFolderSource(clientWebSocket, tempDir, msg)
    case SOURCE_GITURL:
        stratosProject, appDir, err = getGitURLSource(clientWebSocket, tempDir, msg)
    case SOURCE_DOCKER_IMG:
        stratosProject, appDir, err = getDockerURLSource(clientWebSocket, tempDir, msg)
    default:
        err = errors.New("Unsupported source type; don't know how to get the source for the application")
    }

    if err != nil {
        log.Errorf("Failed to fetch source: %v+", err)
        return err
    }

    // Send a message to the client to say that we are awaiting application overrides
    sendEvent(clientWebSocket, OVERRIDES_REQUIRED)

    // Wait for a message from the client
    log.Debug("Waiting for app overrides from client")

    msgOverrides := SocketMessage{}
    if err := clientWebSocket.ReadJSON(&msgOverrides); err != nil {
        log.Errorf("Error reading JSON: %v+", err)
        return err
    }

    if msgOverrides.Type != OVERRIDES_SUPPLIED {
        log.Errorf("Expected app deploy override but received event with type: %v", msgOverrides.Type)
        return errors.New("Expected app deploy override message but received another type")
    }

    log.Debugf("Overrides: %v+", msgOverrides)
    overrides := CFPushAppOverrides{}
    if err = json.Unmarshal([]byte(msgOverrides.Message), &overrides); err != nil {
        log.Errorf("Error marshalling json: %v+", err)
        return err
    }

    stratosProject.DeployOverrides = overrides

    // Source fetched - read manifest
    manifest, manifestFile, err := fetchManifest(appDir, stratosProject, clientWebSocket)
    if err != nil {
        log.Warnf("Failed to find manifest file: %s", err)
        sendErrorMessage(clientWebSocket, err, CLOSE_FAILURE)
        return err
    }

    sendEvent(clientWebSocket, EVENT_FETCHED_MANIFEST)

    err = sendManifest(manifest, clientWebSocket)
    if err != nil {
        log.Warnf("Failed to read or send manifest due to %s", err)
        sendErrorMessage(clientWebSocket, err, CLOSE_FAILURE)
        return err
    }

    socketWriter := &SocketWriter{
        clientWebSocket: clientWebSocket,
    }
    pushConfig, err := cfAppPush.getConfigData(echoContext, cnsiGUID, orgGUID, spaceGUID, spaceName, orgName, clientWebSocket)
    if err != nil {
        log.Warnf("Failed to initialise config due to error %+v", err)
        sendErrorMessage(clientWebSocket, err, CLOSE_FAILURE)
        return err
    }

    // Send App ID now if we have it (redeploy)
    if len(appID) > 0 {
        cfAppPush.SendEvent(clientWebSocket, APP_GUID_NOTIFY, appID)
    }

    dialTimeout := cfAppPush.portalProxy.Env().String("CF_DIAL_TIMEOUT", "")
    pushConfig.OutputWriter = socketWriter
    pushConfig.DialTimeout = dialTimeout

    // Initialise Push Command
    cfPush := Constructor(pushConfig, cfAppPush.portalProxy)

    err = cfPush.Init(appDir, manifestFile, overrides)
    if err != nil {
        log.Warnf("Failed to parse due to: %+v", err)
        sendErrorMessage(clientWebSocket, err, CLOSE_FAILURE)
        return err
    }

    sendEvent(clientWebSocket, EVENT_PUSH_STARTED)

    err = cfPush.Run(cfAppPush, clientWebSocket)
    if err != nil {
        log.Warnf("Failed to execute due to: %+v", err)
        sendErrorMessage(clientWebSocket, err, CLOSE_PUSH_ERROR)
        return err
    }

    log.Debug("Sending message to front-end to indicate push completed")
    sendEvent(clientWebSocket, EVENT_PUSH_COMPLETED)

    sendEvent(clientWebSocket, CLOSE_SUCCESS)

    log.Debug("Waiting for close acknowledgement from the client")

    wait := 30 * time.Second
    clientWebSocket.SetReadDeadline(time.Now().Add(wait))

    // Wait for the client to acknowledge the close - timeout ?
    if err := clientWebSocket.ReadJSON(&msg); err != nil {
        log.Errorf("Error reading JSON: %v+", err)
        return nil
    }

    if msg.Type != CLOSE_ACK {
        log.Errorf("Expected a close acknowledgement - got: %s", string(msg.Type))
    } else {
        log.Debug("Got close acknowledgement from the client")
    }

    // Close the web socket - should we wait for ack from client?
    clientWebSocket.Close()

    return nil
}

func getFolderSource(clientWebSocket *websocket.Conn, tempDir string, msg SocketMessage) (StratosProject, string, error) {
    // The msg data is JSON for the Folder info
    info := FolderSourceInfo{
        WaitAfterUpload: false,
    }

    if err := json.Unmarshal([]byte(msg.Message), &info); err != nil {
        return StratosProject{}, tempDir, err
    }

    // Create all of the folders
    for _, folder := range info.Folders {
        path := filepath.Join(tempDir, folder)
        err := os.Mkdir(path, 0700)
        if err != nil {
            return StratosProject{}, tempDir, errors.New("Failed to create folder")
        }
    }

    var transfers = info.Files
    var lastFilePath string
    for transfers > 0 {
        log.Debugf("Waiting for a file: %d remaining", transfers)

        // Send an ACK to ask the client to start sending us files
        sendEvent(clientWebSocket, SOURCE_FILE_ACK)

        // We should get a SOURCE_FILE message next
        msg := SocketMessage{}
        if err := clientWebSocket.ReadJSON(&msg); err != nil {
            log.Errorf("Error reading JSON: %v+", err)
            return StratosProject{}, tempDir, err
        }

        // Expecting a file
        if msg.Type != SOURCE_FILE {
            return StratosProject{}, tempDir, errors.New("Unexpected web socket message type")
        }

        log.Debugf("Transferring file: %s", msg.Message)

        // Now expecting a binary message
        messageType, p, err := clientWebSocket.ReadMessage()

        if err != nil {
            return StratosProject{}, tempDir, err
        }

        if messageType != websocket.BinaryMessage {
            return StratosProject{}, tempDir, errors.New("Expecting binary file data")
        }

        // Write the file
        path := filepath.Join(tempDir, msg.Message)
        err = ioutil.WriteFile(path, p, 0644)
        if err != nil {
            return StratosProject{}, tempDir, err
        }

        lastFilePath = path
        transfers--

        // Acknowledge last file transfer
        if transfers == 0 {
            sendEvent(clientWebSocket, SOURCE_FILE_ACK)
        }
    }

    // Check to see if we received only 1 file and check if that was an archive file
    if info.Files == 1 {
        log.Debugf("Checking for archive file - %s", lastFilePath)

        // Overwrite generic 'filefolder' type
        info.DeploySource.SourceType = "archive"

        log.Debug("Unpacking archive ......")
        unpackPath := filepath.Join(tempDir, "application")
        err := os.Mkdir(unpackPath, 0700)

        err = archiver.Unarchive(lastFilePath, unpackPath)
        if err != nil {
            return StratosProject{}, tempDir, err
        }

        // Just check to see if we actually unpacked into a root folder
        contents, err := ioutil.ReadDir(unpackPath)
        if err != nil {
            return StratosProject{}, tempDir, err
        }

        if len(contents) == 1 && contents[0].IsDir() {
            unpackPath = filepath.Join(unpackPath, contents[0].Name())
        }

        // Archive done
        tempDir = unpackPath
    }

    // The client (v2) can request only source upload and for deploy to wait until it sends a message
    if info.WaitAfterUpload {
        msg := SocketMessage{}
        if err := clientWebSocket.ReadJSON(&msg); err != nil {
            log.Errorf("Error reading JSON: %v+", err)
            return StratosProject{}, tempDir, err
        }

        if msg.Type != SOURCE_WAIT_ACK {
            return StratosProject{}, tempDir, errors.New("Expecting ACK message to begin deployment")
        }
    }

    // All done!

    // Return a string that can be added to the manifest as an application env var to trace where the source originated
    info.Timestamp = time.Now().Unix()
    info.Folders = nil
    stratosProject := StratosProject{
        DeploySource: info,
    }

    return stratosProject, tempDir, nil
}

func (cfAppPush *CFAppPush) getGitSCMSource(clientWebSocket *websocket.Conn, tempDir string, msg SocketMessage, userGUID string) (StratosProject, string, error) {
    var (
        err error
    )

    // The msg data is JSON for the GitSCM info
    info := GitSCMSourceInfo{}
    if err = json.Unmarshal([]byte(msg.Message), &info); err != nil {
        return StratosProject{}, tempDir, err
    }

    loggerURL := info.URL
    cloneURL := info.URL
    skipSLL := false

    // Apply credentials associated with the endpoint
    if len(info.EndpointGUID) != 0 {
        parsedURL, err := url.Parse(info.URL)
        if err != nil {
            return StratosProject{}, tempDir, errors.New("Failed to parse SCM URL")
        }

        cnsiRecord, err := cfAppPush.portalProxy.GetCNSIRecord(info.EndpointGUID)
        if err != nil {
            return StratosProject{}, tempDir, errors.New("Failed to find endpoint with guid " + info.EndpointGUID)
        }

        skipSLL = cnsiRecord.SkipSSLValidation

        tokenRecord, isTokenFound := cfAppPush.portalProxy.GetCNSITokenRecord(info.EndpointGUID, userGUID)
        if isTokenFound {
            authTokenDecodedBytes, err := base64.StdEncoding.DecodeString(tokenRecord.AuthToken)
            if err != nil {
                return StratosProject{}, tempDir, errors.New("Failed to decode auth token")
            }

            var (
                username string
                password string
            )

            switch info.SCM {
            case SCM_TYPE_GITHUB:
                // GitHub API uses token auth: username and password are stored in the token information
                username = tokenRecord.RefreshToken
                password = string(authTokenDecodedBytes)
            case SCM_TYPE_GITLAB:
                // GitLab API uses token auth: username and password are stored in the token information
                username = tokenRecord.RefreshToken
                password = string(authTokenDecodedBytes)
            default:
                return StratosProject{}, tempDir, fmt.Errorf("Unknown SCM type '%s'", info.SCM)
            }

            if len(username) == 0 {
                return StratosProject{}, tempDir, errors.New("Username is empty")
            }

            // mask the credentials for the logs and env var
            parsedURL.User = url.UserPassword("REDACTED", "REDACTED")
            loggerURL = parsedURL.String()

            // apply the correct credentials
            parsedURL.User = url.UserPassword(username, password)
            cloneURL = parsedURL.String()
        }
    }

    log.Debugf("GitSCM SCM: %s, Source: %s, branch %s, url: %s", info.SCM, info.Project, info.Branch, loggerURL)
    cloneDetails := CloneDetails{
        Url:       cloneURL,
        LoggerUrl: loggerURL,
        Branch:    info.Branch,
        Commit:    info.CommitHash,
        SkipSSL:   skipSLL,
    }
    info.CommitHash, err = cloneRepository(cloneDetails, clientWebSocket, tempDir)
    if err != nil {
        return StratosProject{}, tempDir, err
    }

    sendEvent(clientWebSocket, EVENT_CLONED)

    // Return a string that can be added to the manifest as an application env var to trace where the source originated
    info.Timestamp = time.Now().Unix()
    stratosProject := StratosProject{
        DeploySource: info,
    }

    return stratosProject, tempDir, nil
}

func getGitURLSource(clientWebSocket *websocket.Conn, tempDir string, msg SocketMessage) (StratosProject, string, error) {

    var (
        err error
    )

    // The msg data is JSON for the GitHub info
    info := GitUrlSourceInfo{}

    if err = json.Unmarshal([]byte(msg.Message), &info); err != nil {
        return StratosProject{}, tempDir, err
    }

    log.Debugf("Git Url Source: %s, branch %s", info.Url, info.Branch)
    cloneDetails := CloneDetails{
        Url:    info.Url,
        Branch: info.Branch,
        Commit: info.CommitHash,
    }
    info.CommitHash, err = cloneRepository(cloneDetails, clientWebSocket, tempDir)
    if err != nil {
        return StratosProject{}, tempDir, err
    }

    sendEvent(clientWebSocket, EVENT_CLONED)

    // Return a string that can be added to the manifest as an application env var to trace where the source originated
    info.Timestamp = time.Now().Unix()
    stratosProject := StratosProject{
        DeploySource: info,
    }

    return stratosProject, tempDir, nil
}

func getDockerURLSource(clientWebSocket *websocket.Conn, tempDir string, msg SocketMessage) (StratosProject, string, error) {

    var (
        err error
    )

    // The msg data is JSON for the docker info
    info := DockerImageSourceInfo{}

    if err = json.Unmarshal([]byte(msg.Message), &info); err != nil {
        return StratosProject{}, tempDir, err
    }

    log.Debugf("Docker Image: '%s', Username '%s'", info.DockerImage, info.DockerUsername)

    // Create a manifest using the application name. This sets up the environment as if it were a git clone
    applicationData := RawManifestApplication{
        Name: info.ApplicationName,
    }
    manifest := Applications{
        Applications: []RawManifestApplication{applicationData},
    }
    marshalledYaml, err := yaml.Marshal(manifest)
    manifestPath := fmt.Sprintf("%s/manifest.yml", tempDir)
    err = ioutil.WriteFile(manifestPath, marshalledYaml, 0600)
    if err != nil {
        log.Warnf("Failed to write manifest in path %s", manifestPath)
        return StratosProject{}, tempDir, err
    }

    sendEvent(clientWebSocket, EVENT_CLONED)

    // Return a string that can be added to the manifest as an application env var to trace where the source originated
    info.Timestamp = time.Now().Unix()
    stratosProject := StratosProject{
        DeploySource: info,
    }

    return stratosProject, tempDir, nil
}

func getMarshalledSocketMessage(data string, messageType MessageType) ([]byte, error) {

    messageStruct := SocketMessage{
        Message:   string(data),
        Timestamp: time.Now().Unix(),
        Type:      messageType,
    }
    marshalledJSON, err := json.Marshal(messageStruct)
    return marshalledJSON, err
}

func (cfAppPush *CFAppPush) getConfigData(echoContext echo.Context, cnsiGUID string, orgGUID string, spaceGUID string, spaceName string, orgName string, clientWebSocket *websocket.Conn) (*CFPushAppConfig, error) {

    cnsiRecord, err := cfAppPush.portalProxy.GetCNSIRecord(cnsiGUID)
    if err != nil {
        log.Warnf("Failed to retrieve record for CNSI %s, error is %+v", cnsiGUID, err)
        sendErrorMessage(clientWebSocket, err, CLOSE_NO_CNSI)
        return nil, err
    }

    userID, err := cfAppPush.portalProxy.GetSessionStringValue(echoContext, "user_id")

    if err != nil {
        log.Warnf("Failed to retrieve session user")
        sendErrorMessage(clientWebSocket, err, CLOSE_NO_SESSION)
        return nil, err
    }

    token, found := cfAppPush.portalProxy.GetCNSITokenRecord(cnsiGUID, userID)
    if !found {
        log.Warnf("Failed to retrieve record for CNSI %s", cnsiGUID)
        sendErrorMessage(clientWebSocket, err, CLOSE_NO_CNSI_USERTOKEN)
        return nil, errors.New("Failed to find token record")
    }

    config := &CFPushAppConfig{
        AuthorizationEndpoint:  cnsiRecord.AuthorizationEndpoint,
        CFClient:               cnsiRecord.ClientId,
        CFClientSecret:         cnsiRecord.ClientSecret,
        APIEndpointURL:         cnsiRecord.APIEndpoint.String(),
        DopplerLoggingEndpoint: cnsiRecord.DopplerLoggingEndpoint,
        SkipSSLValidation:      cnsiRecord.SkipSSLValidation,
        AuthToken:              token.AuthToken,
        OrgGUID:                orgGUID,
        OrgName:                orgName,
        SpaceGUID:              spaceGUID,
        SpaceName:              spaceName,
        EndpointID:             cnsiGUID,
        UserID:                 userID,
    }

    return config, nil
}

func cloneRepository(cloneDetails CloneDetails, clientWebSocket *websocket.Conn, tempDir string) (string, error) {

    if len(cloneDetails.Branch) == 0 {
        err := errors.New("No branch supplied")
        log.Infof("Failed to checkout repo %s due to %+v", cloneDetails.LoggerUrl, err)
        sendErrorMessage(clientWebSocket, err, CLOSE_FAILED_NO_BRANCH)
        return "", err
    }

    vcsGit := GetVCS()

    err := vcsGit.Create(cloneDetails.SkipSSL, tempDir, cloneDetails.Url, cloneDetails.Branch)
    if err != nil {
        log.Infof("Failed to clone repo %s due to %+v", cloneDetails.LoggerUrl, err)
        sendErrorMessage(clientWebSocket, err, CLOSE_FAILED_CLONE)
        return "", err
    }

    return getCommit(cloneDetails, clientWebSocket, tempDir, vcsGit)

}

func getCommit(cloneDetails CloneDetails, clientWebSocket *websocket.Conn, tempDir string, vcsGit *vcsCmd) (string, error) {

    if cloneDetails.Commit != "" {
        log.Debugf("Checking out commit %s", cloneDetails.Commit)
        err := vcsGit.ResetBranchToCommit(tempDir, cloneDetails.Commit)
        if err != nil {
            log.Infof("Failed to checkout commit %s", cloneDetails.Commit)
            sendErrorMessage(clientWebSocket, err, CLOSE_FAILED_CLONE)
            return "", err
        }
        return cloneDetails.Commit, nil
    }

    head, err := vcsGit.Head(tempDir)
    if err != nil {
        log.Infof("Unable to fetch HEAD in branch due to %s", err)
        return "", err
    }
    return strings.TrimSpace(head), nil

}

// Check if file exists
func fileExists(filename string) bool {
    info, err := os.Stat(filename)
    if os.IsNotExist(err) {
        return false
    }
    return !info.IsDir()
}

// This assumes manifest lives in the root of the app
func fetchManifest(repoPath string, stratosProject StratosProject, clientWebSocket *websocket.Conn) (Applications, string, error) {

    var manifest Applications

    // Can be either manifest.yml or manifest.yaml
    manifestPath := filepath.Join(repoPath, "manifest.yml")
    if !fileExists(manifestPath) {
        manifestPath = filepath.Join(repoPath, "manifest.yaml")
        if !fileExists(manifestPath) {
            return manifest, manifestPath, fmt.Errorf("Can not find manifest file")
        }
    }

    // Read the manifest
    data, err := ioutil.ReadFile(manifestPath)
    if err != nil {
        log.Warnf("Failed to read manifest in path %s", manifestPath)
        sendErrorMessage(clientWebSocket, err, CLOSE_NO_MANIFEST)
        return manifest, manifestPath, err
    }

    err = yaml.Unmarshal(data, &manifest)
    if err != nil {
        log.Warnf("Failed to unmarshall manifest in path %s", manifestPath)
        sendErrorMessage(clientWebSocket, err, CLOSE_INVALID_MANIFEST)
        return manifest, manifestPath, err
    }

    marshalledJSON, _ := json.Marshal(stratosProject)
    envVarMetaData := string(marshalledJSON)

    // If we have metadata to indicate the source origin, add it to the manifest
    if len(envVarMetaData) > 0 {
        for i, app := range manifest.Applications {
            if len(app.EnvironmentVariables) == 0 {
                app.EnvironmentVariables = make(map[string]string)
            }
            app.EnvironmentVariables[stratosProjectKey] = envVarMetaData
            manifest.Applications[i] = app
        }

        marshalledYaml, err := yaml.Marshal(manifest)
        if err != nil {
            log.Warnf("Failed to marshall manifest in path %v", manifest)
            sendErrorMessage(clientWebSocket, err, CLOSE_FAILURE)
            return manifest, manifestPath, err
        }
        ioutil.WriteFile(manifestPath, marshalledYaml, 0600)
    }

    return manifest, manifestPath, nil
}

func (sw *SocketWriter) Write(data []byte) (int, error) {

    defer func() {
        if r := recover(); r != nil {
            fmt.Println("WebSocket write recovered from panic", r)
        }
    }()

    message, _ := getMarshalledSocketMessage(string(data), DATA)

    err := sw.clientWebSocket.WriteMessage(websocket.TextMessage, message)
    if err != nil {
        log.Warnf("Failed to write data to web socket: %s", err)
        return 0, err
    }
    return len(data), nil
}

func sendManifest(manifest Applications, clientWebSocket *websocket.Conn) error {

    manifestBytes, err := json.Marshal(manifest)
    if err != nil {
        return err
    }
    manifestJSON := string(manifestBytes)
    message, _ := getMarshalledSocketMessage(manifestJSON, MANIFEST)

    clientWebSocket.WriteMessage(websocket.TextMessage, message)
    return nil
}

func sendErrorMessage(clientWebSocket *websocket.Conn, err error, errorType MessageType) {
    closingMessage, _ := getMarshalledSocketMessage(fmt.Sprintf("Failed due to %s!", err), errorType)
    if err := clientWebSocket.WriteMessage(websocket.TextMessage, closingMessage); err != nil {
        log.Warnf("Failed to write error message to web socket: %s", err)
    }
}

func sendEvent(clientWebSocket *websocket.Conn, event MessageType) {
    msg, _ := getMarshalledSocketMessage("", event)
    if err := clientWebSocket.WriteMessage(websocket.TextMessage, msg); err != nil {
        log.Warnf("Failed to write message to web socket: %s", err)
    }
}

// SendEvent sends a message over the web socket
func (cfAppPush *CFAppPush) SendEvent(clientWebSocket *websocket.Conn, event MessageType, data string) {
    msg, _ := getMarshalledSocketMessage(data, event)
    if err := clientWebSocket.WriteMessage(websocket.TextMessage, msg); err != nil {
        log.Warnf("Failed to write message to web socket: %s", err)
    }
}