rancher/opni-monitoring

View on GitHub
pkg/management/clusters.go

Summary

Maintainability
B
4 hrs
Test Coverage
package management

import (
    "context"
    "time"

    "github.com/rancher/opni-monitoring/pkg/core"
    "github.com/rancher/opni-monitoring/pkg/validation"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
    "google.golang.org/protobuf/types/known/emptypb"
)

func (m *Server) ListClusters(
    ctx context.Context,
    in *ListClustersRequest,
) (*core.ClusterList, error) {
    if err := validation.Validate(in); err != nil {
        return nil, err
    }
    clusterList, err := m.coreDataSource.StorageBackend().ListClusters(ctx, in.MatchLabels, in.MatchOptions)
    if err != nil {
        return nil, err
    }
    return clusterList, nil
}

func (m *Server) DeleteCluster(
    ctx context.Context,
    ref *core.Reference,
) (*emptypb.Empty, error) {
    if err := validation.Validate(ref); err != nil {
        return nil, err
    }
    return &emptypb.Empty{}, m.coreDataSource.StorageBackend().DeleteCluster(ctx, ref)
}

func (m *Server) GetCluster(
    ctx context.Context,
    ref *core.Reference,
) (*core.Cluster, error) {
    if err := validation.Validate(ref); err != nil {
        return nil, err
    }
    if cluster, err := m.coreDataSource.StorageBackend().GetCluster(ctx, ref); err != nil {
        return nil, err
    } else {
        return cluster, nil
    }
}

func (m *Server) WatchClusters(
    in *WatchClustersRequest,
    stream Management_WatchClustersServer,
) error {
    if err := validation.Validate(in); err != nil {
        return err
    }
    known := map[string]*core.Reference{}
    for _, cluster := range in.KnownClusters.Items {
        if _, err := m.coreDataSource.StorageBackend().GetCluster(context.Background(), cluster); err != nil {
            return err
        }
        known[cluster.Id] = cluster
    }
    tick := time.NewTicker(1 * time.Second)
    defer tick.Stop()
    for {
        select {
        case <-tick.C:
            clusters, err := m.coreDataSource.StorageBackend().ListClusters(context.Background(), nil, 0)
            updatedIds := map[string]struct{}{}
            if err != nil {
                return err
            }
            for _, cluster := range clusters.Items {
                updatedIds[cluster.Id] = struct{}{}
                if _, ok := known[cluster.Id]; !ok {
                    ref := cluster.Reference()
                    known[cluster.Id] = ref
                    if err := stream.Send(&WatchEvent{
                        Cluster: ref,
                        Type:    WatchEventType_Added,
                    }); err != nil {
                        return status.Error(codes.Internal, err.Error())
                    }
                }
            }
            for id, cluster := range known {
                if _, ok := updatedIds[id]; !ok {
                    delete(known, id)
                    if err := stream.Send(&WatchEvent{
                        Cluster: cluster,
                        Type:    WatchEventType_Deleted,
                    }); err != nil {
                        return status.Error(codes.Internal, err.Error())
                    }
                }
            }
        case <-stream.Context().Done():
            return stream.Context().Err()
        }
    }
}

func (m *Server) EditCluster(
    ctx context.Context,
    in *EditClusterRequest,
) (*core.Cluster, error) {
    if err := validation.Validate(in); err != nil {
        return nil, err
    }
    return m.coreDataSource.StorageBackend().UpdateCluster(ctx, in.GetCluster(), func(cluster *core.Cluster) {
        if cluster.Metadata == nil {
            cluster.Metadata = &core.ClusterMetadata{}
        }
        cluster.Metadata.Labels = in.GetLabels()
    })
}