cloudfoundry/stratos

View on GitHub
src/jetstream/plugins/analysis/run.go

Summary

Maintainability
A
3 hrs
Test Coverage
package analysis

import (
    "bytes"
    "encoding/json"
    "errors"
    "fmt"
    "io/ioutil"
    "mime/multipart"
    "net/http"
    "net/textproto"
    "time"

    "github.com/cloudfoundry-incubator/stratos/src/jetstream/plugins/analysis/store"

    "github.com/labstack/echo/v4"
    uuid "github.com/satori/go.uuid"
    log "github.com/sirupsen/logrus"
)

// popeyeConfig holds a namespace and an app name, mapped to the JSON keys
// "namespace" and "app".
// NOTE(review): not referenced in this part of the file; presumably the
// options posted for a Popeye analysis run - confirm against the caller.
type popeyeConfig struct {
    Namespace string `json:"namespace"`
    App       string `json:"app"`
}

// KubeConfigExporter is the interface that the "kubernetes" plugin is expected
// to implement so that doRunReport can obtain a Kube config to send to the
// analysis server.
type KubeConfigExporter interface {
    // GetKubeConfigForEndpointUser returns, as a string, the Kube config for
    // the given endpoint and user, or an error if it cannot be obtained.
    GetKubeConfigForEndpointUser(endpointID, userID string) (string, error)
}

// idHeaderName is the HTTP header set on requests to the analysis server; its
// value has the form "<userID>/<endpointID>/<reportID>".
// NOTE(review): "Analaysis" looks misspelled, but the value is sent over the
// wire, so presumably the receiver expects this exact spelling - confirm
// before changing it.
const idHeaderName = "X-Stratos-Analaysis-ID"

// runReport handles a request to run an analysis report against an endpoint.
//
// The analyzer name and endpoint ID are read from the "analyzer" and
// "endpoint" route parameters and the user from the "user_id" context value.
// A report record with status "pending" is saved to the analysis store and the
// job is then handed to doRunReport. If doRunReport fails, the record is
// updated with status "error" and the error text as its result, and the error
// is returned to the caller.
func (c *Analysis) runReport(ec echo.Context) error {
    log.Debug("runReport")

    analyzer := ec.Param("analyzer")
    endpointID := ec.Param("endpoint")

    // Use a checked assertion: a missing or non-string user_id must surface as
    // an error rather than panic the handler.
    userID, ok := ec.Get("user_id").(string)
    if !ok {
        return errors.New("Could not get user information")
    }

    // Look up the endpoint for the user
    p := c.portalProxy
    endpoint, err := p.GetCNSIRecord(endpointID)
    if err != nil {
        // Keep the underlying cause in the log; the caller only gets the
        // generic message.
        log.Debugf("Could not get endpoint %s: %v", endpointID, err)
        return errors.New("Could not get endpoint information")
    }

    report := store.AnalysisRecord{
        ID:           uuid.NewV4().String(),
        Name:         fmt.Sprintf("Analysis report %s", analyzer),
        EndpointID:   endpointID,
        EndpointType: endpoint.CNSIType,
        UserID:       userID,
        Path:         "",
        Created:      time.Now(),
        Read:         false,
        Duration:     0,
        Status:       "pending",
        Result:       "",
    }

    // Create a record in the reports datastore
    dbStore, err := store.NewAnalysisDBStore(p.GetDatabaseConnection())
    if err != nil {
        return err
    }

    // NOTE(review): the result of Save is ignored here, as it was before; its
    // signature is not visible from this file - consider handling its error.
    dbStore.Save(report)

    err = c.doRunReport(ec, analyzer, endpointID, userID, dbStore, &report)
    if err != nil {
        // Record the failure against the report so it does not stay "pending".
        report.Status = "error"
        report.Result = err.Error()
        if updateErr := dbStore.UpdateReport(userID, &report); updateErr != nil {
            log.Warnf("Could not update report %s with error status: %v", report.ID, updateErr)
        }
    }

    return err
}

// doRunReport submits the analysis job described by report to the analysis
// server and stores the updated job that the server returns.
//
// The request body is multipart with three parts, identified by their
// Content-ID header: "kubeconfig" (the Kube config for the endpoint and user),
// "body" (whatever was posted to this handler) and "job" (the report record
// serialized as JSON). The request carries the idHeaderName header set to
// "<userID>/<endpointID>/<reportID>".
//
// On a 200 response the returned job's Duration, Status, Name, Format, Type
// and Path are copied onto report, the report is updated in dbStore and it is
// written to ec as JSON. Any failure is returned as an error.
func (c *Analysis) doRunReport(ec echo.Context, analyzer, endpointID, userID string, dbStore store.AnalysisStore, report *store.AnalysisRecord) error {

    // Get Kube Config
    k8s := c.portalProxy.GetPlugin("kubernetes")
    if k8s == nil {
        return errors.New("Could not find Kubernetes plugin")
    }

    k8sConfig, ok := k8s.(KubeConfigExporter)
    if !ok {
        return errors.New("Could not find Kubernetes plugin interface")
    }

    config, err := k8sConfig.GetKubeConfigForEndpointUser(endpointID, userID)
    if err != nil {
        return errors.New("Could not get Kube Config for the endpoint")
    }

    id := fmt.Sprintf("%s/%s/%s", userID, endpointID, report.ID)

    // Read the content that was posted to us. This is best-effort: if it
    // cannot be read, an empty "body" part is sent.
    defer ec.Request().Body.Close()
    requestBody, err := ioutil.ReadAll(ec.Request().Body)
    if err != nil {
        log.Debugf("Could not read request body: %v", err)
        requestBody = make([]byte, 0)
    }

    // Report config
    job, err := json.Marshal(report)
    if err != nil {
        return errors.New("Could not serialize job")
    }

    // Create a multi-part form to send to the analyzer container
    body := new(bytes.Buffer)
    writer := multipart.NewWriter(body)
    if err := writeAnalysisPart(writer, "application/yaml", "kubeconfig", []byte(config)); err != nil {
        return fmt.Errorf("Could not create analysis request: %v", err)
    }
    if err := writeAnalysisPart(writer, "application/json", "body", requestBody); err != nil {
        return fmt.Errorf("Could not create analysis request: %v", err)
    }
    if err := writeAnalysisPart(writer, "application/json", "job", job); err != nil {
        return fmt.Errorf("Could not create analysis request: %v", err)
    }
    // Close writes the terminating boundary; without it the body is malformed.
    if err := writer.Close(); err != nil {
        return fmt.Errorf("Could not create analysis request: %v", err)
    }

    // Post this to the Analyzer API
    // NOTE(review): analyzer comes from the route parameter and is placed in
    // the URL path unescaped - confirm it is validated upstream.
    contentType := fmt.Sprintf("multipart/form-data; boundary=%s", writer.Boundary())
    uploadURL := fmt.Sprintf("%s/api/v1/run/%s", c.analysisServer, analyzer)
    r, err := http.NewRequest(http.MethodPost, uploadURL, bytes.NewReader(body.Bytes()))
    if err != nil {
        // A malformed server URL would otherwise leave r nil and panic below.
        return errors.New("Analysis job failed - could not create request for Analysis Server")
    }
    r.Header.Set("Content-Type", contentType)
    r.Header.Set(idHeaderName, id)
    client := &http.Client{Timeout: 180 * time.Second}
    rsp, err := client.Do(r)
    if err != nil {
        return errors.New("Analysis job failed - could not contact Analysis Server")
    }
    // Close the body on every path, including the non-200 early return.
    defer rsp.Body.Close()

    if rsp.StatusCode != http.StatusOK {
        log.Debugf("Request failed with response code: %d", rsp.StatusCode)
        return fmt.Errorf("Analysis job failed with response code: %d", rsp.StatusCode)
    }

    // Job submitted okay
    // Updated job is in the response
    response, err := ioutil.ReadAll(rsp.Body)
    if err != nil {
        return errors.New("Could not read response")
    }

    updatedJob := store.AnalysisRecord{}
    if err = json.Unmarshal(response, &updatedJob); err != nil {
        return errors.New("Could not read response - could not deserialize response")
    }

    report.Duration = updatedJob.Duration
    report.Status = updatedJob.Status
    report.Name = updatedJob.Name
    report.Format = updatedJob.Format
    report.Type = updatedJob.Type
    report.Path = updatedJob.Path

    log.Debug("OK => Job submitted okay")
    log.Debug("=======================================================")
    log.Debugf("%+v", report)
    log.Debug("=======================================================")

    err = dbStore.UpdateReport(userID, report)
    if err != nil {
        return fmt.Errorf("Could not save report %s", err)
    }

    log.Debug("All done - job saved")

    return ec.JSON(http.StatusOK, report)
}

// writeAnalysisPart adds one part to the multipart body being built by writer,
// with the given Content-Type and Content-ID headers and data as its content.
func writeAnalysisPart(writer *multipart.Writer, contentType, contentID string, data []byte) error {
    header := textproto.MIMEHeader{}
    header.Set("Content-Type", contentType)
    header.Set("Content-ID", contentID)
    part, err := writer.CreatePart(header)
    if err != nil {
        return err
    }
    _, err = part.Write(data)
    return err
}