cloudfoundry-community/bosh-cloudstack-cpi

View on GitHub
go_agent/src/bosh/jobsupervisor/monit_job_supervisor.go

Summary

Maintainability
B
4 hrs
Test Coverage
package jobsupervisor

import (
    "fmt"
    "path/filepath"
    "time"

    "github.com/pivotal/go-smtpd/smtpd"

    boshalert "bosh/agent/alert"
    bosherr "bosh/errors"
    boshmonit "bosh/jobsupervisor/monit"
    boshlog "bosh/logger"
    boshdir "bosh/settings/directories"
    boshsys "bosh/system"
)

// monitJobSupervisorLogTag is the tag passed as the first argument to the
// logger for messages written by monitJobSupervisor methods in this file.
const monitJobSupervisorLogTag = "monitJobSupervisor"

// monitJobSupervisor manages jobs through monit: it reloads monit, starts,
// stops and unmonitors services in the "vcap" group, reports their combined
// status, maintains per-job monitrc files and listens for monit alerts.
type monitJobSupervisor struct {
    // fs reads job configs and writes/removes files in the monit jobs directory.
    fs          boshsys.FileSystem
    // runner executes the `monit reload` command.
    runner      boshsys.CmdRunner
    // client queries monit status and starts/stops/unmonitors services.
    client      boshmonit.Client
    // logger receives debug and error messages tagged with monitJobSupervisorLogTag.
    logger      boshlog.Logger
    // dirProvider supplies the monit jobs directory path.
    dirProvider boshdir.DirectoriesProvider

    // jobFailuresServerPort is the port the SMTP server started by
    // MonitorJobFailures listens on.
    jobFailuresServerPort int

    // reloadOptions controls the retry behaviour of Reload.
    reloadOptions MonitReloadOptions
}

// MonitReloadOptions configures how monitJobSupervisor.Reload retries
// `monit reload` and how it polls for the resulting incarnation change.
type MonitReloadOptions struct {
    // Maximum number of times `monit reload` will be executed
    MaxTries int

    // Maximum number of times the monit incarnation will be checked
    // for a difference after each execution of `monit reload`
    MaxCheckTries int

    // Length of time to sleep after each check that found no incarnation difference
    DelayBetweenCheckTries time.Duration
}

// NewMonitJobSupervisor builds a monitJobSupervisor from its collaborators.
//
// fs, runner, client, logger and dirProvider are stored as-is;
// jobFailuresServerPort is the port used by MonitorJobFailures and
// reloadOptions tunes the retry loop in Reload.
func NewMonitJobSupervisor(
    fs boshsys.FileSystem,
    runner boshsys.CmdRunner,
    client boshmonit.Client,
    logger boshlog.Logger,
    dirProvider boshdir.DirectoriesProvider,
    jobFailuresServerPort int,
    reloadOptions MonitReloadOptions,
) (m monitJobSupervisor) {
    // m starts out as the zero value; fill in every field explicitly.
    m.fs = fs
    m.runner = runner
    m.client = client
    m.logger = logger
    m.dirProvider = dirProvider
    m.jobFailuresServerPort = jobFailuresServerPort
    m.reloadOptions = reloadOptions

    return m
}

// Reload runs `monit reload` and waits until monit reports a different
// incarnation than the one observed before the reload.
//
// The command is executed up to reloadOptions.MaxTries times. After each
// execution the incarnation is polled up to reloadOptions.MaxCheckTries times,
// sleeping reloadOptions.DelayBetweenCheckTries after every unchanged reading.
// It returns nil as soon as the incarnation differs, a wrapped error if the
// incarnation cannot be read, and a "Failed to reload monit" error when all
// tries are exhausted.
func (m monitJobSupervisor) Reload() error {
    before, err := m.getIncarnation()
    if err != nil {
        return bosherr.WrapError(err, "Getting monit incarnation")
    }

    var after int

    opts := m.reloadOptions

    // Monit process could be started in the same second as `monit reload` runs
    // so it's ideal for MaxCheckTries * DelayBetweenCheckTries to be greater than 1 sec
    // because monit incarnation id is just a timestamp with 1 sec resolution.
    for attempt := 0; attempt < opts.MaxTries; attempt++ {
        // Exit code or output cannot be trusted, so a failure here is only
        // logged; the incarnation check below decides whether reload worked.
        if _, _, _, runErr := m.runner.RunCommand("monit", "reload"); runErr != nil {
            m.logger.Error(monitJobSupervisorLogTag, "Failed to reload monit %s", runErr.Error())
        }

        for check := 0; check < opts.MaxCheckTries; check++ {
            after, err = m.getIncarnation()
            if err != nil {
                return bosherr.WrapError(err, "Getting monit incarnation")
            }

            // Incarnation id can decrease or increase because
            // monit uses time(...) and system time can be changed,
            // so any difference counts as a successful reload.
            if before != after {
                return nil
            }

            m.logger.Debug(
                monitJobSupervisorLogTag,
                "Waiting for monit to reload: before=%d after=%d",
                before, after,
            )

            time.Sleep(opts.DelayBetweenCheckTries)
        }
    }

    return bosherr.New(
        "Failed to reload monit: before=%d after=%d",
        before, after,
    )
}

// Start asks the monit client to start every service in the "vcap" group.
//
// Services are started in the order returned by the client. The first failure
// aborts the loop and is returned wrapped; a debug message is logged after
// each successful start request.
func (m monitJobSupervisor) Start() error {
    names, err := m.client.ServicesInGroup("vcap")
    if err != nil {
        return bosherr.WrapError(err, "Getting vcap services")
    }

    for _, name := range names {
        if startErr := m.client.StartService(name); startErr != nil {
            return bosherr.WrapError(startErr, "Starting service %s", name)
        }

        m.logger.Debug(monitJobSupervisorLogTag, "Starting service %s", name)
    }

    return nil
}

// Stop asks the monit client to stop every service in the "vcap" group.
//
// Services are stopped in the order returned by the client. The first failure
// aborts the loop and is returned wrapped; a debug message is logged after
// each successful stop request.
func (m monitJobSupervisor) Stop() error {
    names, err := m.client.ServicesInGroup("vcap")
    if err != nil {
        return bosherr.WrapError(err, "Getting vcap services")
    }

    for _, name := range names {
        if stopErr := m.client.StopService(name); stopErr != nil {
            return bosherr.WrapError(stopErr, "Stopping service %s", name)
        }

        m.logger.Debug(monitJobSupervisorLogTag, "Stopping service %s", name)
    }

    return nil
}

// Unmonitor asks the monit client to stop monitoring every service in the
// "vcap" group.
//
// The first failure aborts the loop and is returned wrapped; a debug message
// is logged after each successful unmonitor request.
func (m monitJobSupervisor) Unmonitor() error {
    names, err := m.client.ServicesInGroup("vcap")
    if err != nil {
        return bosherr.WrapError(err, "Getting vcap services")
    }

    for _, name := range names {
        if unmonitorErr := m.client.UnmonitorService(name); unmonitorErr != nil {
            return bosherr.WrapError(unmonitorErr, "Unmonitoring service %s", name)
        }

        m.logger.Debug(monitJobSupervisorLogTag, "Unmonitoring service %s", name)
    }

    return nil
}

// Status summarises the state of the services in the "vcap" group as one of
// "unknown", "starting", "failing" or "running".
//
//   - "unknown"  when the monit status cannot be fetched;
//   - "starting" as soon as any service reports status "starting" (this wins
//     over a "failing" verdict from a service seen earlier in the list);
//   - "failing"  when some service is unmonitored or not "running";
//   - "running"  otherwise, including when the group has no services.
func (m monitJobSupervisor) Status() (status string) {
    monitStatus, err := m.client.Status()
    if err != nil {
        return "unknown"
    }

    status = "running"

    for _, svc := range monitStatus.ServicesInGroup("vcap") {
        switch {
        case svc.Status == "starting":
            return "starting"
        case !svc.Monitored || svc.Status != "running":
            // Keep scanning: a later service may still be "starting".
            status = "failing"
        }
    }

    return status
}

// getIncarnation fetches the current monit status and returns the incarnation
// it reports. When the status cannot be fetched it returns -1 and that error.
func (m monitJobSupervisor) getIncarnation() (int, error) {
    current, err := m.client.Status()
    if err == nil {
        return current.GetIncarnation()
    }

    return -1, err
}

// AddJob copies the job config found at configPath into the monit jobs
// directory under the name "<jobIndex, zero-padded to 4 digits>_<jobName>.monitrc".
//
// It returns a wrapped error if the source cannot be read or the target
// cannot be written.
func (m monitJobSupervisor) AddJob(jobName string, jobIndex int, configPath string) error {
    destination := filepath.Join(
        m.dirProvider.MonitJobsDir(),
        fmt.Sprintf("%04d_%s.monitrc", jobIndex, jobName),
    )

    contents, err := m.fs.ReadFile(configPath)
    if err != nil {
        return bosherr.WrapError(err, "Reading job config from file")
    }

    if writeErr := m.fs.WriteFile(destination, contents); writeErr != nil {
        return bosherr.WrapError(writeErr, "Writing to job config file")
    }

    return nil
}

// RemoveAllJobs removes the monit jobs directory via the file system's
// RemoveAll and returns whatever error that call reports.
func (m monitJobSupervisor) RemoveAllJobs() error {
    jobsDir := m.dirProvider.MonitJobsDir()

    return m.fs.RemoveAll(jobsDir)
}

// MonitorJobFailures starts an SMTP server on jobFailuresServerPort (all
// interfaces) and hands each new mail an alertEnvelope bound to handler.
//
// The error from ListenAndServe, if any, is returned wrapped with
// "Listen for SMTP".
// NOTE(review): ListenAndServe presumably blocks while serving, so callers
// likely run this in its own goroutine — confirm against the smtpd package.
func (m monitJobSupervisor) MonitorJobFailures(handler JobFailureHandler) (err error) {
    server := &smtpd.Server{
        Addr: fmt.Sprintf(":%d", m.jobFailuresServerPort),
        // Every incoming mail gets a fresh envelope and a fresh MonitAlert.
        OnNewMail: func(smtpd.Connection, smtpd.MailAddress) (smtpd.Envelope, error) {
            return &alertEnvelope{
                new(smtpd.BasicEnvelope),
                handler,
                new(boshalert.MonitAlert),
            }, nil
        },
    }

    if err = server.ListenAndServe(); err != nil {
        return bosherr.WrapError(err, "Listen for SMTP")
    }

    return nil
}