pkg/jobs/k8sevent/k8sevent.go
// Copyright © 2023 Horizoncd.//// Licensed under the Apache License, Version 2.0 (the "License");// you may not use this file except in compliance with the License.// You may obtain a copy of the License at//// http://www.apache.org/licenses/LICENSE-2.0//// Unless required by applicable law or agreed to in writing, software// distributed under the License is distributed on an "AS IS" BASIS,// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.// See the License for the specific language governing permissions and// limitations under the License. package k8sevent import ( "context" "fmt" "regexp" "sort" "sync" "time" "gorm.io/gorm" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/json" "k8s.io/client-go/informers" "k8s.io/client-go/tools/cache" "github.com/horizoncd/horizon/core/common" "github.com/horizoncd/horizon/lib/q" "github.com/horizoncd/horizon/pkg/config/k8sevent" eventmanager "github.com/horizoncd/horizon/pkg/event/manager" eventmodels "github.com/horizoncd/horizon/pkg/event/models" "github.com/horizoncd/horizon/pkg/param/managerparam" "github.com/horizoncd/horizon/pkg/region/models" "github.com/horizoncd/horizon/pkg/regioninformers" "github.com/horizoncd/horizon/pkg/util/log") const ( cacheMax = 160 savingInterval = 10 * time.Second kubernetesInstanceLabelKey = "app.kubernetes.io/instance") var gvrEvent = schema.GroupVersionResource{ Resource: "events", Version: "v1", Group: "",} type SuperVisor struct { filter *gvkFilter informers *regioninformers.RegionInformers mgr *managerparam.Manager db *gorm.DB cacheMax int} func New(config k8sevent.Config, informers *regioninformers.RegionInformers, mgr *managerparam.Manager, db *gorm.DB) *SuperVisor { v := &SuperVisor{ filter: newGVKFilter(config), informers: informers, mgr: mgr, db: db, cacheMax: cacheMax, } return v} func (v *SuperVisor) Run(ctx context.Context) { v.informers.Register(regioninformers.Resource{GVR: gvrEvent, MakeHandler: v.newEventHandler})} type EventWithTime struct { *eventmodels.Event LastTimestamp time.Time} func (v *SuperVisor) newEventHandler(regionID uint, stopCh <-chan struct{}) (cache.ResourceEventHandler, error) { log.Debugf(context.Background(), "new event handler for region %d", regionID) ctx := context.Background() entity, err := v.mgr.RegionMgr.GetRegionByID(ctx, regionID) if err != nil { return nil, err } h := &eventHandler{ SuperVisor: v, regionID: regionID, region: entity.Region, eventCh: make(chan *corev1.Event), cacheMax: v.cacheMax, stopCh: stopCh, } go func() { eventCache := make([]*corev1.Event, 0, v.cacheMax) ticker := time.NewTicker(savingInterval) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: if len(eventCache) == 0 { ticker.Reset(savingInterval) continue } case event := <-h.eventCh: eventCache = append(eventCache, event) if len(eventCache) < h.cacheMax { continue } } err := h.save(eventCache) if err != nil { log.Errorf(ctx, "failed to save event: %v", err) } eventCache = eventCache[:0] ticker.Reset(savingInterval) } }() return h, nil} type eventHandler struct { *SuperVisor regionID uint region *models.Region cacheMax int stopCh <-chan struct{} eventCh chan *corev1.Event} Method `eventHandler.save` has 63 lines of code (exceeds 50 allowed). Consider refactoring.
Method `eventHandler.save` has 9 return statements (exceeds 4 allowed).
Method `eventHandler.save` has a Cognitive Complexity of 23 (exceeds 20 allowed). Consider refactoring.func (e *eventHandler) save(cache []*corev1.Event) error { select { case <-e.stopCh: return nil default: } ctx := context.Background() eventsWithTime := make([]*EventWithTime, len(cache)) reqIDs := sync.Map{} wg := sync.WaitGroup{} for i, event := range cache { wg.Add(1) go func(index int, event *corev1.Event, dst []*EventWithTime) { defer wg.Done() horizonEvent, err := e.mapEvent(event) if err != nil { log.Errorf(ctx, "failed to map event: %v", err) return } // skip same reqID if _, ok := reqIDs.Load(horizonEvent.ReqID); ok { return } reqIDs.Store(horizonEvent.ReqID, struct{}{}) sameEvents, _ := e.mgr.EventMgr.ListEvents(ctx, &q.Query{Keywords: q.KeyWords{common.ReqID: horizonEvent.ReqID}}) // skip same event for _, sameEvent := range sameEvents { if sameEvent.Extra != nil && horizonEvent.Extra != nil && *sameEvent.Extra == *horizonEvent.Extra { return } } dst[index] = &EventWithTime{horizonEvent, event.LastTimestamp.Time} }(i, event, eventsWithTime) } wg.Wait() // skip nil in eventsWithTime for i := len(eventsWithTime) - 1; i >= 0; i-- { if eventsWithTime[i] == nil { eventsWithTime = append(eventsWithTime[:i], eventsWithTime[i+1:]...) } } if len(eventsWithTime) == 0 { return nil } sort.Slice(eventsWithTime, func(i, j int) bool { return eventsWithTime[i].LastTimestamp.Before(eventsWithTime[j].LastTimestamp) }) tx := e.db.WithContext(ctx).Begin() txEventManager := eventmanager.New(tx) events := make([]*eventmodels.Event, 0, len(eventsWithTime)) for _, event := range eventsWithTime { events = append(events, event.Event) } _, err := txEventManager.CreateEvent(ctx, events...) if err != nil { log.Errorf(ctx, "failed to save regionEvents: %v", err) tx.Rollback() return nil } err = tx.Commit().Error if err != nil { log.Errorf(ctx, "failed to commit: %v", err) tx.Rollback() return nil } return nil} func (*eventHandler) compactEvent(event *corev1.Event) map[string]interface{} { m := make(map[string]interface{}) m["message"] = event.Message m["reason"] = event.Reason m["type"] = event.Type m["lastTimestamp"] = event.LastTimestamp m["name"] = event.Name obj := make(map[string]interface{}) obj["kind"] = event.InvolvedObject.Kind obj["name"] = event.InvolvedObject.Name obj["namespace"] = event.InvolvedObject.Namespace obj["apiVersion"] = event.InvolvedObject.APIVersion m["involvedObject"] = obj return m} var GVKApplication = schema.GroupVersionKind{ Group: "argoproj.io", Version: "v1alpha1", Kind: "Application",} Method `eventHandler.mapEvent` has 70 lines of code (exceeds 50 allowed). Consider refactoring.
Method `eventHandler.mapEvent` has 12 return statements (exceeds 4 allowed).
Method `eventHandler.mapEvent` has a Cognitive Complexity of 24 (exceeds 20 allowed). Consider refactoring.func (e *eventHandler) mapEvent(event *corev1.Event) (*eventmodels.Event, error) { if event == nil { return nil, nil } extra, err := json.Marshal(e.compactEvent(event)) if err != nil { log.Errorf(context.Background(), "failed to marshal event: %v", err) return nil, err } extraStr := string(extra) horizonEvent := &eventmodels.Event{ EventSummary: eventmodels.EventSummary{ ResourceType: common.ResourceCluster, EventType: eventmodels.ClusterKubernetesEvent, Extra: &extraStr, }, ReqID: string(event.UID), } involvedObj := event.InvolvedObject var ( obj runtime.Object ns = involvedObj.Namespace name = involvedObj.Name gvk = involvedObj.GroupVersionKind() ) if gvk == GVKApplication { cluster, err := e.mgr.ClusterMgr.GetByName(context.Background(), name) if err != nil { return nil, err } horizonEvent.ResourceID = cluster.ID return horizonEvent, nil } for { gvr, err := e.informers.GVK2GVR(e.regionID, gvk) if err != nil { return nil, err } err = e.informers.GetDynamicInformer(e.regionID, gvr, func(informer informers.GenericInformer) error { obj, err = informer.Lister().ByNamespace(ns).Get(name) if err != nil { return err } return nil }) if err != nil { return nil, err } un, ok := obj.(*unstructured.Unstructured) if !ok { return nil, fmt.Errorf("failed to convert object to unstructured") } labels := un.GetLabels() if labels != nil && labels[kubernetesInstanceLabelKey] != "" { instanceName := labels[kubernetesInstanceLabelKey] cluster, err := e.mgr.ClusterMgr.GetByName(context.Background(), instanceName) if err != nil { return nil, err } horizonEvent.ResourceID = cluster.ID return horizonEvent, nil } ownerReferences := un.GetOwnerReferences() if len(ownerReferences) != 0 { ownerReference := ownerReferences[0] gvk = schema.FromAPIVersionAndKind(ownerReference.APIVersion, ownerReference.Kind) name = ownerReference.Name } else { return horizonEvent, nil } }} func (e *eventHandler) addEventToCache(un *unstructured.Unstructured) { event := &corev1.Event{} err := runtime.DefaultUnstructuredConverter.FromUnstructured(un.Object, event) if err != nil { log.Errorf(context.Background(), "failed to convert unstructured to event: %v", err) return } if !e.filter.has(event) { return } e.eventCh <- event} func (e *eventHandler) OnAdd(obj interface{}) { log.Debugf(context.Background(), "%p: event added\n", e) un, ok := obj.(*unstructured.Unstructured) if !ok { log.Errorf(context.Background(), "failed to convert object to unstructured") return } e.addEventToCache(un)} func (e *eventHandler) OnUpdate(_, newObj interface{}) { log.Debugf(context.Background(), "%v: event updated", e) un, ok := newObj.(*unstructured.Unstructured) if !ok { log.Errorf(context.Background(), "failed to convert object to unstructured") return } e.addEventToCache(un)} func (e *eventHandler) OnDelete(_ interface{}) { // no need to handle delete event} type reasonIndex map[string][]*regexp.Regexp type gvkIndex map[schema.GroupVersionKind]reasonIndex type gvkFilter struct { index gvkIndex} func newGVKFilter(config k8sevent.Config) *gvkFilter { index := make(gvkIndex) for _, rule := range config.Rules { gvk := rule.GroupVersionKind if _, ok := index[gvk]; !ok { index[gvk] = make(reasonIndex) } for _, reason := range rule.Reasons { if _, ok := index[gvk][reason.Reason]; !ok { index[gvk][reason.Reason] = nil } for _, message := range reason.Messages { if message == "" { continue } pattern := regexp.MustCompile(message) index[gvk][reason.Reason] = append(index[gvk][reason.Reason], pattern) } } } return &gvkFilter{index}} func (f *gvkFilter) has(event *corev1.Event) bool { gvk := schema.FromAPIVersionAndKind(event.InvolvedObject.APIVersion, event.InvolvedObject.Kind) if reasonIndex, ok := f.index[gvk]; ok { if len(reasonIndex) == 0 { return true } if patterns, ok := reasonIndex[event.Reason]; ok { if len(patterns) == 0 { return true } for _, pattern := range patterns { if pattern.MatchString(event.Message) { return true } } } } log.Debugf(context.Background(), "event (%s) has been skipped: gvk = %s, uid = %s", event.Message, gvk.String(), event.UID) return false}