manager/controlapi/cluster.go
package controlapi
import (
"context"
"strings"
"time"
gogotypes "github.com/gogo/protobuf/types"
"github.com/moby/swarmkit/v2/api"
"github.com/moby/swarmkit/v2/ca"
"github.com/moby/swarmkit/v2/log"
"github.com/moby/swarmkit/v2/manager/encryption"
"github.com/moby/swarmkit/v2/manager/state/store"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// Built-in defaults and tunables for the cluster control API.
const (
	// expiredCertGrace is how long an entry stays in the certificate
	// blacklist after the certificate's own expiry timestamp has passed
	// (one week).
	expiredCertGrace = 7 * 24 * time.Hour

	// inbuiltSubnetSize is the subnet size reported alongside the inbuilt
	// default address pool.
	inbuiltSubnetSize = 24

	// defaultVXLANPort is the VXLAN UDP port reported for clusters that
	// have no explicit VXLANUDPPort set.
	defaultVXLANPort = 4789
)
// inbuiltDefaultAddressPool is the address pool reported for clusters that
// have no DefaultAddressPool of their own.
var inbuiltDefaultAddressPool = []string{"10.0.0.0/8"}
// validateClusterSpec checks a ClusterSpec submitted through the control API.
//
// It returns nil when the spec is acceptable, and a gRPC InvalidArgument
// status error when:
//   - spec is nil;
//   - CAConfig.NodeCertExpiry cannot be converted to a duration, or is shorter
//     than ca.MinNodeCertExpiration;
//   - an acceptance policy carries a secret whose algorithm is not bcrypt
//     (compared case-insensitively);
//   - Dispatcher.HeartbeatPeriod cannot be converted to a duration, or is
//     negative;
//   - the annotation name differs from store.DefaultClusterName.
func validateClusterSpec(spec *api.ClusterSpec) error {
	if spec == nil {
		// status.Error (not Errorf): the message is not a format string.
		return status.Error(codes.InvalidArgument, errInvalidArgument.Error())
	}

	// Validate that expiry time being provided is valid, and over our minimum
	if spec.CAConfig.NodeCertExpiry != nil {
		expiry, err := gogotypes.DurationFromProto(spec.CAConfig.NodeCertExpiry)
		if err != nil {
			return status.Error(codes.InvalidArgument, errInvalidArgument.Error())
		}
		if expiry < ca.MinNodeCertExpiration {
			return status.Errorf(codes.InvalidArgument, "minimum certificate expiry time is: %s", ca.MinNodeCertExpiration)
		}
	}

	// Validate that AcceptancePolicies only include Secrets that are bcrypted
	// TODO(diogo): Add a global list of acceptance algorithms. We only support bcrypt for now.
	for _, policy := range spec.AcceptancePolicy.Policies {
		if policy.Secret != nil && !strings.EqualFold(policy.Secret.Alg, "bcrypt") {
			return status.Errorf(codes.InvalidArgument, "hashing algorithm is not supported: %s", policy.Secret.Alg)
		}
	}

	// Validate that heartbeatPeriod time being provided is valid
	if spec.Dispatcher.HeartbeatPeriod != nil {
		heartbeatPeriod, err := gogotypes.DurationFromProto(spec.Dispatcher.HeartbeatPeriod)
		if err != nil {
			return status.Error(codes.InvalidArgument, errInvalidArgument.Error())
		}
		if heartbeatPeriod < 0 {
			return status.Errorf(codes.InvalidArgument, "heartbeat time period cannot be a negative duration")
		}
	}

	// The cluster name is fixed; any other name is treated as a rename attempt.
	if spec.Annotations.Name != store.DefaultClusterName {
		return status.Errorf(codes.InvalidArgument, "modification of cluster name is not allowed")
	}

	return nil
}
// GetCluster returns a Cluster given a ClusterID.
//   - Returns `InvalidArgument` if ClusterID is not provided.
//   - Returns `NotFound` if the Cluster is not found.
//
// The returned Cluster is a redacted copy produced by redactClusters, not the
// stored object.
func (s *Server) GetCluster(ctx context.Context, request *api.GetClusterRequest) (*api.GetClusterResponse, error) {
	if request.ClusterID == "" {
		// status.Error (not Errorf): the message is not a format string.
		return nil, status.Error(codes.InvalidArgument, errInvalidArgument.Error())
	}

	var cluster *api.Cluster
	s.store.View(func(tx store.ReadTx) {
		cluster = store.GetCluster(tx, request.ClusterID)
	})
	if cluster == nil {
		return nil, status.Errorf(codes.NotFound, "cluster %s not found", request.ClusterID)
	}

	// WARN: we should never return cluster here. We need to redact the private fields first.
	redactedClusters := redactClusters([]*api.Cluster{cluster})
	return &api.GetClusterResponse{
		Cluster: redactedClusters[0],
	}, nil
}
// UpdateCluster updates a Cluster referenced by ClusterID with the given ClusterSpec.
//   - Returns `InvalidArgument` if ClusterID or ClusterVersion is missing, or
//     if the ClusterSpec is malformed.
//   - Returns `NotFound` if the Cluster is not found.
//   - Returns `Unimplemented` if the ClusterSpec references unimplemented features.
//   - Returns `Internal` if the stored root CA cannot be loaded.
//   - Returns an error if the update fails.
//
// Besides replacing the spec, the update expires old blacklisted certificates,
// optionally rotates the worker/manager join tokens and the manager unlock
// key (as requested in request.Rotation), and re-validates the CA
// configuration. The returned Cluster is a redacted copy.
func (s *Server) UpdateCluster(ctx context.Context, request *api.UpdateClusterRequest) (*api.UpdateClusterResponse, error) {
	if request.ClusterID == "" || request.ClusterVersion == nil {
		// status.Error (not Errorf): the message is not a format string.
		return nil, status.Error(codes.InvalidArgument, errInvalidArgument.Error())
	}
	if err := validateClusterSpec(request.Spec); err != nil {
		return nil, err
	}

	var cluster *api.Cluster
	err := s.store.Update(func(tx store.Tx) error {
		cluster = store.GetCluster(tx, request.ClusterID)
		if cluster == nil {
			return status.Errorf(codes.NotFound, "cluster %s not found", request.ClusterID)
		}

		// This ensures that we have the current rootCA with which to generate tokens (expiration doesn't matter
		// for generating the tokens)
		rootCA, err := ca.RootCAFromAPI(&cluster.RootCA, ca.DefaultNodeCertExpiration)
		if err != nil {
			log.G(ctx).WithField(
				"method", "(*controlapi.Server).UpdateCluster").WithError(err).Error("invalid cluster root CA")
			return status.Errorf(codes.Internal, "error loading cluster rootCA for update")
		}

		cluster.Meta.Version = *request.ClusterVersion
		cluster.Spec = *request.Spec.Copy()

		expireBlacklistedCerts(cluster)

		if request.Rotation.WorkerJoinToken {
			cluster.RootCA.JoinTokens.Worker = ca.GenerateJoinToken(&rootCA, cluster.FIPS)
		}
		if request.Rotation.ManagerJoinToken {
			cluster.RootCA.JoinTokens.Manager = ca.GenerateJoinToken(&rootCA, cluster.FIPS)
		}

		updatedRootCA, err := validateCAConfig(ctx, s.securityConfig, cluster)
		if err != nil {
			return err
		}
		cluster.RootCA = *updatedRootCA

		// Rebuild the unlock key list: every non-manager key is kept, and the
		// manager key is kept only while manager auto-lock is enabled.
		var unlockKeys []*api.EncryptionKey
		var managerKey *api.EncryptionKey
		for _, eKey := range cluster.UnlockKeys {
			if eKey.Subsystem == ca.ManagerRole {
				if !cluster.Spec.EncryptionConfig.AutoLockManagers {
					continue
				}
				managerKey = eKey
			}
			unlockKeys = append(unlockKeys, eKey)
		}

		switch {
		case !cluster.Spec.EncryptionConfig.AutoLockManagers:
			// Auto-lock is off: nothing to generate or rotate.
			break
		case managerKey == nil:
			// Auto-lock is on but no manager key exists yet: create one.
			unlockKeys = append(unlockKeys, &api.EncryptionKey{
				Subsystem: ca.ManagerRole,
				Key:       encryption.GenerateSecretKey(),
			})
		case request.Rotation.ManagerUnlockKey:
			// managerKey points at the entry already in unlockKeys, so the
			// new key material is replaced in place.
			managerKey.Key = encryption.GenerateSecretKey()
		}
		cluster.UnlockKeys = unlockKeys

		return store.UpdateCluster(tx, cluster)
	})
	if err != nil {
		return nil, err
	}

	// WARN: we should never return cluster here. We need to redact the private fields first.
	redactedClusters := redactClusters([]*api.Cluster{cluster})
	return &api.UpdateClusterResponse{
		Cluster: redactedClusters[0],
	}, nil
}
// filterClusters returns the candidates accepted by every one of the given
// filters, in their original order. With no filters, all candidates are
// returned. The result is always a non-nil slice.
func filterClusters(candidates []*api.Cluster, filters ...func(*api.Cluster) bool) []*api.Cluster {
	kept := make([]*api.Cluster, 0)

nextCandidate:
	for _, candidate := range candidates {
		for _, accept := range filters {
			if !accept(candidate) {
				// One rejection is enough; skip the remaining filters.
				continue nextCandidate
			}
		}
		kept = append(kept, candidate)
	}

	return kept
}
// ListClusters returns the clusters in the store, narrowed by the optional
// request filters. The store is queried using the first non-empty filter
// among Names, NamePrefixes and IDPrefixes (or all clusters when none is
// set); every filter, including Labels, is then applied in memory. The
// clusters in the response are redacted copies.
func (s *Server) ListClusters(ctx context.Context, request *api.ListClustersRequest) (*api.ListClustersResponse, error) {
	var (
		found   []*api.Cluster
		findErr error
	)

	s.store.View(func(readTx store.ReadTx) {
		filters := request.Filters
		if filters == nil {
			found, findErr = store.FindClusters(readTx, store.All)
			return
		}

		switch {
		case len(filters.Names) > 0:
			found, findErr = store.FindClusters(readTx, buildFilters(store.ByName, filters.Names))
		case len(filters.NamePrefixes) > 0:
			found, findErr = store.FindClusters(readTx, buildFilters(store.ByNamePrefix, filters.NamePrefixes))
		case len(filters.IDPrefixes) > 0:
			found, findErr = store.FindClusters(readTx, buildFilters(store.ByIDPrefix, filters.IDPrefixes))
		default:
			found, findErr = store.FindClusters(readTx, store.All)
		}
	})
	if findErr != nil {
		return nil, findErr
	}

	if filters := request.Filters; filters != nil {
		matchesName := func(c *api.Cluster) bool {
			return filterContains(c.Spec.Annotations.Name, filters.Names)
		}
		matchesNamePrefix := func(c *api.Cluster) bool {
			return filterContainsPrefix(c.Spec.Annotations.Name, filters.NamePrefixes)
		}
		matchesIDPrefix := func(c *api.Cluster) bool {
			return filterContainsPrefix(c.ID, filters.IDPrefixes)
		}
		matchesLabels := func(c *api.Cluster) bool {
			return filterMatchLabels(c.Spec.Annotations.Labels, filters.Labels)
		}
		found = filterClusters(found, matchesName, matchesNamePrefix, matchesIDPrefix, matchesLabels)
	}

	// WARN: we should never return cluster here. We need to redact the private fields first.
	return &api.ListClustersResponse{
		Clusters: redactClusters(found),
	}, nil
}
// redactClusters builds, for each input cluster, a new Cluster holding only
// the fields that are safe to hand back to API clients. Signing keys and CA
// private keys are cleared, and display defaults are filled in for the
// address pool, subnet size and VXLAN port when the cluster has none set.
// An empty input yields a nil slice.
func redactClusters(clusters []*api.Cluster) []*api.Cluster {
	var sanitized []*api.Cluster

	for _, original := range clusters {
		// Work on copies so the stored cluster is not modified.
		spec := original.Spec.Copy()
		// Never expose the signing key.
		spec.CAConfig.SigningCAKey = nil
		// the cert is not a secret, but if API users get the cluster spec and then update,
		// then because the cert is included but not the key, the user can get update errors
		// or unintended consequences (such as telling swarm to forget about the key so long
		// as there is a corresponding external CA)
		spec.CAConfig.SigningCACert = nil

		rootCA := original.RootCA.Copy()
		rootCA.CAKey = nil
		if rootCA.RootRotation != nil {
			rootCA.RootRotation.CAKey = nil
		}

		// This is just for CLI display. Fall back to the inbuilt default pool
		// (and its subnet size) for user reference when none is configured.
		addressPool, subnetSize := original.DefaultAddressPool, original.SubnetSize
		if addressPool == nil {
			addressPool, subnetSize = inbuiltDefaultAddressPool, inbuiltSubnetSize
		}

		vxlanPort := original.VXLANUDPPort
		if vxlanPort == 0 {
			vxlanPort = defaultVXLANPort
		}

		// Only whitelisted fields are carried over to the redacted cluster.
		sanitized = append(sanitized, &api.Cluster{
			ID:                      original.ID,
			Meta:                    original.Meta,
			Spec:                    *spec,
			RootCA:                  *rootCA,
			BlacklistedCertificates: original.BlacklistedCertificates,
			DefaultAddressPool:      addressPool,
			SubnetSize:              subnetSize,
			VXLANUDPPort:            vxlanPort,
		})
	}

	return sanitized
}
// expireBlacklistedCerts removes from cluster.BlacklistedCertificates every
// entry whose expiry timestamp lies more than expiredCertGrace in the past.
// Entries without an expiry, or whose expiry cannot be converted to a
// time.Time, are left in place.
func expireBlacklistedCerts(cluster *api.Cluster) {
	cutoff := time.Now().Add(-expiredCertGrace)

	for name, entry := range cluster.BlacklistedCertificates {
		if entry.Expiry == nil {
			continue
		}

		expiresAt, err := gogotypes.TimestampFromProto(entry.Expiry)
		if err != nil {
			// Unparseable timestamp: keep the entry rather than guess.
			continue
		}

		if expiresAt.Before(cutoff) {
			delete(cluster.BlacklistedCertificates, name)
		}
	}
}