vorteil/direktiv

View on GitHub
pkg/pubsub/pubsub_pg.go

Summary

Maintainability
A
0 mins
Test Coverage
package pubsub

import (
    "fmt"
    "reflect"
    "strings"
    "sync"
    "time"

    "github.com/direktiv/direktiv/pkg/core"
    "github.com/google/uuid"
    "k8s.io/apimachinery/pkg/util/json"
)

type Bus struct {
    coreBus CoreBus

    subscribers  sync.Map
    fingerprints sync.Map
}

const defaultDebouncePublishDuration = 200 * time.Millisecond

func NewBus(coreBus CoreBus) *Bus {
    return &Bus{
        coreBus: coreBus,
    }
}

func (p *Bus) Loop(circuit *core.Circuit) error {
    return p.coreBus.Loop(circuit.Done(), func(channel string, data string) {
        p.subscribers.Range(func(key, f any) bool {
            k, _ := key.(string)
            h, _ := f.(func(data string))

            if strings.HasPrefix(k, channel) {
                go h(data)
            }

            return true
        })
    })
}

func (p *Bus) Publish(event any) error {
    channel := typeToChannelName(event)

    data, err := json.Marshal(event)
    if err != nil {
        panic("Logic error: " + err.Error())
    }

    return p.coreBus.Publish(channel, string(data))
}

func (p *Bus) debouncedPublishWithInterval(i time.Duration, event any) error {
    // This function works by associating input with a signature, sleep for a duration and nly publish the message
    // when the signature matches.

    channel := typeToChannelName(event)

    data, err := json.Marshal(event)
    if err != nil {
        panic("Logic error: " + err.Error())
    }

    input := fmt.Sprintf("%d_%s_%s", i, channel, data)
    signature := uuid.New()
    p.fingerprints.Store(input, signature)

    go func() {
        time.Sleep(i)
        currentSignature, _ := p.fingerprints.Load(input)
        // When signature matches, this means no later async publish was recorded.
        if signature == currentSignature {
            err = p.coreBus.Publish(channel, string(data))
            if err != nil {
                panic(">>>>>>>>>>Logic error453: " + err.Error())
            }
        }
    }()

    return nil
}

// DebouncedPublish prevents multiple concussive publishes of the same input during an interval.
func (p *Bus) DebouncedPublish(event any) error {
    return p.debouncedPublishWithInterval(defaultDebouncePublishDuration, event)
}

func (p *Bus) Subscribe(channel any, handler func(data string)) {
    if channel == nil {
        panic("nil channel")
    }
    channelStr := typeToChannelName(channel)

    p.subscribers.Store(fmt.Sprintf("%s_%s", channelStr, uuid.New().String()), handler)
    err := p.coreBus.Listen(channelStr)
    if err != nil {
        panic("TODO: handle this pubsub error: " + err.Error())
    }
}

type FileSystemChangeEvent struct {
    Action       string
    Namespace    string
    NamespaceID  uuid.UUID
    FileType     string
    FilePath     string
    OldPath      string
    DeleteFileID uuid.UUID
}

type NamespacesChangeEvent struct {
    Name   string
    Action string
}

type InstanceMessageEvent struct {
    Message string
}

func typeToChannelName(channel any) string {
    channelStr := reflect.TypeOf(channel).String()
    channelStr = strings.ReplaceAll(channelStr, "*", "")
    channelStr = strings.ReplaceAll(channelStr, ".", "")
    channelStr = strings.ToLower(channelStr)

    return channelStr
}