cloudfoundry-incubator/stratos

View on GitHub
src/jetstream/plugins/monocular/sync.go

Summary

Maintainability
A
0 mins
Test Coverage
package monocular

import (
    "encoding/json"

    "github.com/cloudfoundry-incubator/stratos/src/jetstream/repository/interfaces"
    "github.com/labstack/echo/v4"
    log "github.com/sirupsen/logrus"
)

type SyncJob struct {
    Action   interfaces.EndpointAction
    Endpoint *interfaces.CNSIRecord
}

type SyncMetadata struct {
    Status string `json:"status"`
    Busy   bool   `json:"busy"`
}

// Sync Channel
var syncChan = make(chan SyncJob, 100)

// InitSync starts the go routine that will sync repositories in the background
func (m *Monocular) InitSync() {
    go m.processSyncRequests()
}

// syncRepo is endpoint to force a re-sync of a given Helm Repository
func (m *Monocular) syncRepo(c echo.Context) error {
    log.Debug("syncRepo")

    // Lookup repository by GUID
    var p = m.portalProxy
    guid := c.Param("guid")
    endpoint, err := p.GetCNSIRecord(guid)
    if err != nil {
        return interfaces.NewJetstreamErrorf("Could not find Helm Repository: %v+", err)
    }

    m.Sync(interfaces.EndpointRegisterAction, &endpoint)

    response := "OK"
    return c.JSON(200, response)
}

// Sync schedules a sync action for the given endpoint
func (m *Monocular) Sync(action interfaces.EndpointAction, endpoint *interfaces.CNSIRecord) {
    // Delete and Update are Synchronously handled
    // Add (Sync) is handled Asynchronously via a SyncJob
    if action == 0 {
        // If the sync job is busy, it won't update the status of this new job until it completes the previous one
        // Set the status to indicate it is pending
        metadata := SyncMetadata{
            Status: "Pending",
            Busy:   true,
        }
        m.portalProxy.UpdateEndpointMetadata(endpoint.GUID, marshalSyncMetadata(metadata))

        // Add the job to the queue to be processed
        job := SyncJob{
            Action:   action,
            Endpoint: endpoint,
        }

        // Schedula a sync job
        syncChan <- job
    } else if action == 1 {
        log.Debugf("Deleting Helm Repository: %s", endpoint.Name)
        m.deleteChartStoreForEndpoint(endpoint.GUID)
    } else if action == 2 {
        log.Debugf("Helm Repository has been updated - renaming the Helm repository field in the associated charts")
        if err := m.ChartStore.RenameEndpoint(endpoint.GUID, endpoint.Name); err != nil {
            log.Errorf("An error occurred renameing the Helm Repository for endpoint %s to %s - %+v", endpoint.GUID, endpoint.Name, err)
        }
    }
}

func (m *Monocular) deleteChartStoreForEndpoint(id string) {
    // Delete the records from the database
    if err := m.ChartStore.DeleteForEndpoint(id); err != nil {
        log.Warnf("Unable to delete Helm Charts for endpoint %s - %+v", id, err)
    }

    // Delete files from the cache
    if err := m.deleteCacheForEndpoint(id); err != nil {
        log.Warnf("Unable to delete Helm Chart Cache for endpoint %s - %+v", err)
    }
}

func (m *Monocular) processSyncRequests() {
    log.Info("Helm Repository Sync init")
    for job := range syncChan {
        log.Debugf("Processing Helm Repository Sync Job: %s", job.Endpoint.Name)
        metadata := SyncMetadata{
            Status: "Synchronizing",
            Busy:   true,
        }
        m.portalProxy.UpdateEndpointMetadata(job.Endpoint.GUID, marshalSyncMetadata(metadata))

        chartIndexURL := job.Endpoint.APIEndpoint.String()
        metadata.Status = "Synchronized"
        metadata.Busy = false
        err := m.syncHelmRepository(job.Endpoint.GUID, job.Endpoint.Name, chartIndexURL)
        if err != nil {
            log.Warn("Helm Repository sync repository failed for repository %s - %v", job.Endpoint.GUID, err)
            metadata.Status = "Sync Failed"
        }

        // Update the job status
        m.updateMetadata(job.Endpoint.GUID, metadata)
    }
    log.Debug("processSyncRequests finished")
}

func (m *Monocular) updateMetadata(endpoint string, metadata SyncMetadata) {
    err := m.portalProxy.UpdateEndpointMetadata(endpoint, marshalSyncMetadata(metadata))
    if err != nil {
        log.Errorf("Failed to update endpoint metadata: %v+", err)
    }
}

func marshalSyncMetadata(metadata SyncMetadata) string {
    jsonString, err := json.Marshal(metadata)
    if err != nil {
        return ""
    }
    return string(jsonString)
}