streamdal/go-sdk

View on GitHub
audience.go

Summary

Maintainability
A
0 mins
Test Coverage
A
92%
package streamdal

import (
    "context"
    "fmt"
    "strconv"
    "strings"

    "github.com/streamdal/streamdal/libs/protos/build/go/protos"
)

func (s *Streamdal) addAudience(ctx context.Context, aud *protos.Audience) {
    // Don't need to add twice
    if s.seenAudience(ctx, aud) {
        return
    }

    s.audiencesMtx.Lock()

    if s.audiences == nil {
        s.audiences = make(map[string]struct{})
    }

    s.audiences[audToStr(aud)] = struct{}{}
    s.audiencesMtx.Unlock()

    // Run as goroutine to avoid blocking processing
    go func() {
        if err := s.serverClient.NewAudience(ctx, aud, s.sessionID); err != nil {
            s.config.Logger.Errorf("failed to add audience: %s", err)
        }
    }()
}

// addAudiences is used for RE-adding audiences that may have timed out after
// a server reconnect. The method will re-add all known audiences to the server
// via internal gRPC NewAudience() endpoint. This is a non-blocking method.
func (s *Streamdal) addAudiences(ctx context.Context) {
    s.audiencesMtx.RLock()
    defer s.audiencesMtx.RUnlock()

    for audStr := range s.audiences {
        aud := strToAud(audStr)

        if aud == nil {
            s.config.Logger.Errorf("unexpected strToAud resulted in nil audience (audStr: %s)", audStr)
            continue
        }

        // Run as goroutine to avoid blocking processing
        go func() {
            if err := s.serverClient.NewAudience(ctx, aud, s.sessionID); err != nil {
                s.config.Logger.Errorf("failed to add audience: %s", err)
            }
        }()
    }
}

func (s *Streamdal) seenAudience(_ context.Context, aud *protos.Audience) bool {
    s.audiencesMtx.RLock()
    defer s.audiencesMtx.RUnlock()

    if s.audiences == nil {
        return false
    }

    _, ok := s.audiences[audToStr(aud)]
    return ok
}

func (s *Streamdal) getCurrentAudiences() []*protos.Audience {
    s.audiencesMtx.RLock()
    defer s.audiencesMtx.RUnlock()

    auds := make([]*protos.Audience, 0)
    for aud := range s.audiences {
        auds = append(auds, strToAud(aud))
    }

    return auds
}

func audToStr(aud *protos.Audience) string {
    if aud == nil {
        return ""
    }

    return strings.ToLower(fmt.Sprintf("%s:%s:%d:%s", aud.ServiceName, aud.ComponentName, aud.OperationType, aud.OperationName))
}

func strToAud(str string) *protos.Audience {
    if str == "" {
        return nil
    }

    str = strings.ToLower(str)

    parts := strings.Split(str, ":")
    if len(parts) != 4 {
        return nil
    }

    opType, err := strconv.Atoi(parts[2])
    if err != nil {
        return nil
    }

    return &protos.Audience{
        ServiceName:   parts[0],
        ComponentName: parts[1],
        OperationType: protos.OperationType(opType),
        OperationName: parts[3],
    }
}