cyberark/secretless-broker

View on GitHub
internal/proxyservice/proxy_service.go

Summary

Maintainability
B
4 hrs
Test Coverage
C
75%
package proxyservice

import (
    "fmt"
    "net"
    "os"
    "strings"

    "github.com/go-ozzo/ozzo-validation"

    "github.com/cyberark/secretless-broker/internal"
    "github.com/cyberark/secretless-broker/internal/plugin"
    httpproxy "github.com/cyberark/secretless-broker/internal/plugin/connectors/http"
    sshproxy "github.com/cyberark/secretless-broker/internal/plugin/connectors/ssh"
    sshagentproxy "github.com/cyberark/secretless-broker/internal/plugin/connectors/sshagent"
    tcpproxy "github.com/cyberark/secretless-broker/internal/plugin/connectors/tcp"
    v1 "github.com/cyberark/secretless-broker/internal/plugin/v1"
    "github.com/cyberark/secretless-broker/internal/providers"
    v2 "github.com/cyberark/secretless-broker/pkg/secretless/config/v2"
    logapi "github.com/cyberark/secretless-broker/pkg/secretless/log"
    plugin2 "github.com/cyberark/secretless-broker/pkg/secretless/plugin"
    "github.com/cyberark/secretless-broker/pkg/secretless/plugin/connector"
    "github.com/cyberark/secretless-broker/pkg/secretless/plugin/connector/http"
    "github.com/cyberark/secretless-broker/pkg/secretless/plugin/connector/tcp"
)

// TODO: move to impl package
type proxyServices struct {
    availPlugins    plugin2.AvailablePlugins
    config          v2.Config
    configsByType   v2.ConfigsByType
    eventNotifier   v1.EventNotifier
    logger          logapi.Logger
    resolver        v1.Resolver
    runningServices []internal.Service
}

// Start starts all proxy services
func (s *proxyServices) Start() error {
    for _, svc := range s.servicesToStart() {
        err := svc.Start()
        if err != nil {
            // TODO: Upgrade our logger so we can use Fatalf here
            s.logger.Panicf("could not start proxy service: %s", err)
        }
        s.runningServices = append(s.runningServices, svc)
    }
    return nil
}

// Stop stops all proxy services
func (s *proxyServices) Stop() error {
    var stopFailures []string

    s.logger.Infoln("Stopping all services...")
    for _, svc := range s.runningServices {
        err := svc.Stop()
        if err != nil {
            stopFailures = append(stopFailures, err.Error())
        }
    }

    if len(stopFailures) > 0 {
        return fmt.Errorf(
            "these errors occured while stopping all services: %s",
            strings.Join(stopFailures, "; "),
        )
    }
    return nil
}

func (s *proxyServices) servicesToStart() (servicesToStart []internal.Service) {
    httpPlugins := s.availPlugins.HTTPPlugins()
    tcpPlugins := s.availPlugins.TCPPlugins()

    errors := validation.Errors{}

    // TCP Plugins
    for _, cfg := range s.configsByType.TCP {
        // Validation will have already happened
        tcpSvc, err := s.createTCPService(cfg, tcpPlugins[cfg.Connector])
        if err != nil {
            // TODO: Add Fatalf to our logger and use that
            errors[cfg.Name] = fmt.Errorf(
                "unable to create TCP service: %s",
                err,
            )
            continue
        }
        servicesToStart = append(servicesToStart, tcpSvc)
    }

    // HTTP Plugins
    for _, httpSvcConfig := range s.configsByType.HTTP {
        // Validation will have already happened
        httpSvc, err := s.createHTTPService(httpSvcConfig, httpPlugins)
        if err != nil {
            // TODO: Add Fatalf to our logger and use that
            errors[httpSvcConfig.Name()] = fmt.Errorf(
                "unable to create HTTP proxy service on '%s': %s",
                httpSvcConfig.SharedListenOn,
                err,
            )
            continue
        }
        servicesToStart = append(servicesToStart, httpSvc)
    }

    // SSH Plugins
    for _, cfg := range s.configsByType.SSH {
        // Validation will have already happened
        sshSvc, err := s.createSSHService(cfg)
        if err != nil {
            errors[cfg.Name] = fmt.Errorf(
                "unable to create SSH service: %s",
                err,
            )
            continue
        }
        servicesToStart = append(servicesToStart, sshSvc)
    }

    // SSH Agent Plugins
    for _, cfg := range s.configsByType.SSHAgent {
        // Validation will have already happened
        sshAgentSvc, err := s.createSSHAgentService(cfg)
        if err != nil {
            errors[cfg.Name] = fmt.Errorf(
                "unable to create SSH Agent service: %s",
                err,
            )
            continue
        }
        servicesToStart = append(servicesToStart, sshAgentSvc)
    }

    // If there are errors, we need to show them. This method exits the
    // program if any errors are detected.
    handleErrors(errors, s.logger)

    return servicesToStart
}

func handleErrors(errors validation.Errors, logger logapi.Logger) {
    if len(errors) > 0 {
        for cfgName, err := range errors {
            logger.Errorf("Fatal error in '%s': %s", cfgName, err)
        }

        os.Exit(1)
    }
}

func (s *proxyServices) createHTTPService(
    httpSvcCfg v2.HTTPServiceConfig,
    plugins map[string]http.Plugin,
) (internal.Service, error) {

    // Create the listener
    // TODO: If we want to unit test this, we'll need to inject net.Listen

    netAddr := httpSvcCfg.SharedListenOn
    listener, err := net.Listen(netAddr.Network(), netAddr.Address())
    if err != nil {
        s.logger.Errorf("listener creation failed: %s", httpSvcCfg.SharedListenOn)
        return nil, err
    }

    s.logger.Infof("Starting HTTP listener on %s...", netAddr.Address())

    // Create the subservices

    var subservices []httpproxy.Subservice
    for _, subCfg := range httpSvcCfg.SubserviceConfigs {
        // "cur" naming prefix needed to avoid package name collision
        curPlugin := plugins[subCfg.Connector]
        connResources := s.connectorResources(subCfg)
        curConnector := curPlugin.NewConnector(connResources)
        credsRetriever := s.credsRetriever(subCfg.Credentials)

        // Get the http traffic patterns to match from the connector config.
        httpCfg, err := v2.NewHTTPConfig(subCfg.ConnectorConfig)
        if err != nil {
            s.logger.Errorf("configuration parsing of '%s' failed: %s", subCfg.Connector, err)
            return nil, err
        }

        s.logger.Infof("Starting HTTP subservice %s...", subCfg.Connector)

        subservices = append(subservices, httpproxy.Subservice{
            ConnectorID:              subCfg.Connector, // TODO: Rename connectorID
            Connector:                curConnector,
            RetrieveCredentials:      credsRetriever,
            AuthenticateURLsMatching: httpCfg.AuthenticateURLsMatching,
        })
    }

    // Create the logger
    // HTTP proxy service gets its own logger (subservices have own loggers)

    proxyName := httpSvcCfg.Name()
    svcLogger := s.loggerFor(proxyName)

    // TODO: NewHTTPProxyFunc needs to be injected
    newSvc, err := httpproxy.NewProxyService(subservices, listener, svcLogger)
    if err != nil {
        s.logger.Errorf("could not create http proxy service '%s'", proxyName)
        return nil, err
    }
    return newSvc, nil
}

func (s *proxyServices) createSSHService(
    config v2.Service,
) (internal.Service, error) {

    // TODO: Add validation somewhere about overlapping listenOns
    // TODO: v2.NetworkAddress is a value type.  It needs to be moved to its
    //   own package with no deps (stdlib deps are ok).
    netAddr := config.ListenOn
    listener, err := net.Listen(netAddr.Network(), netAddr.Address())
    if err != nil {
        return nil, err
    }

    s.logger.Infof("Starting SSH listener on %s...", netAddr.Address())

    connResources := s.connectorResources(config)
    credsRetriever := s.credsRetriever(config.Credentials)

    // TODO: NewProxyService needs to be injected
    newSvc, err := sshproxy.NewProxyService(
        listener,
        connResources.Logger(),
        credsRetriever,
    )

    if err != nil {
        s.logger.Errorf("could not create proxy service '%s'", config.Name)
        return nil, err
    }

    return newSvc, nil
}

func (s *proxyServices) createSSHAgentService(
    config v2.Service,
) (internal.Service, error) {

    // TODO: Add validation somewhere about overlapping listenOns
    // TODO: v2.NetworkAddress is a value type.  It needs to be moved to its
    //   own package with no deps (stdlib deps are ok).
    netAddr := config.ListenOn
    listener, err := net.Listen(netAddr.Network(), netAddr.Address())
    if err != nil {
        return nil, err
    }

    s.logger.Infof("Starting SSH Agent listener on %s...", netAddr.Address())

    connResources := s.connectorResources(config)
    credsRetriever := s.credsRetriever(config.Credentials)

    // TODO: NewProxyService needs to be injected
    newSvc, err := sshagentproxy.NewProxyService(
        listener,
        connResources.Logger(),
        credsRetriever,
    )

    if err != nil {
        s.logger.Errorf("could not create proxy service '%s'", config.Name)
        return nil, err
    }

    return newSvc, nil
}

func (s *proxyServices) createTCPService(
    config v2.Service,
    pluginInst tcp.Plugin,
) (internal.Service, error) {

    // TODO: Add validation somewhere about overlapping listenOns
    // TODO: v2.NetworkAddress is a value type.  It needs to be moved to its
    //   own package with no deps (stdlib deps are ok).
    netAddr := config.ListenOn
    listener, err := net.Listen(netAddr.Network(), netAddr.Address())
    if err != nil {
        return nil, err
    }

    s.logger.Infof("Starting TCP listener on %s...", netAddr.Address())

    connResources := s.connectorResources(config)
    svcConnector := pluginInst.NewConnector(connResources)
    credsRetriever := s.credsRetriever(config.Credentials)

    // TODO: NewTCPProxyFunc needs to be injected
    newSvc, err := tcpproxy.NewProxyService(
        svcConnector,
        listener,
        connResources.Logger(),
        credsRetriever,
    )

    if err != nil {
        s.logger.Errorf("could not create proxy service '%s'", config.Name)
        return nil, err
    }

    return newSvc, nil
}

func (s *proxyServices) connectorResources(svc v2.Service) connector.Resources {
    svcLogger := s.loggerFor(svc.Name)
    return connector.NewResources(svc.ConnectorConfig, svcLogger)
}

func (s *proxyServices) loggerFor(name string) logapi.Logger {
    return s.logger.CopyWith(name, s.logger.DebugEnabled())
}

func (s *proxyServices) credsRetriever(
    creds []*v2.Credential,
) internal.CredentialsRetriever {
    return func() (map[string][]byte, error) {
        return s.resolver.Resolve(creds)
    }
}

// NewProxyServices returns a new ProxyServices instance.
// TODO: Reconsider the Resolver design so it's exactly what we need for the new code.
// TODO: v1.Provider options should be an interface
func NewProxyServices(
    cfg v2.Config,
    availPlugins plugin2.AvailablePlugins,
    logger logapi.Logger,
    evtNotifier v1.EventNotifier,
) internal.Service {

    // Setup our resolver
    providerFactories := make(map[string]func(v1.ProviderOptions) (v1.Provider, error))

    for providerID, providerFactory := range providers.ProviderFactories {
        providerFactories[providerID] = providerFactory
    }

    resolver := plugin.NewResolver(providerFactories, nil, nil)

    // Create the proxyServices object
    services := proxyServices{
        availPlugins:  availPlugins,
        config:        cfg,
        eventNotifier: evtNotifier,
        logger:        logger,
        resolver:      resolver,
    }

    // TODO: v2.NewConfigsByType should be an interface, so we can remove this
    //   hardcoded dep on an impl type of v2.  All deps need to be injected.
    services.configsByType = v2.NewConfigsByType(cfg.Services, availPlugins)

    return &services
}