ipfs-search/ipfs-search

View on GitHub
components/sniffer/queuer/queuer.go

Summary

Maintainability
A
0 mins
Test Coverage
A
100%
package queuer

import (
    "context"
    "time"

    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/codes"
    "go.opentelemetry.io/otel/trace"

    "github.com/ipfs-search/ipfs-search/components/queue"

    "github.com/ipfs-search/ipfs-search/instr"
    t "github.com/ipfs-search/ipfs-search/types"
)

// Queuer publishes an AnnotatedResource to a queue Publisher for each Provider it receives on a channel.
type Queuer struct {
    // queue is the Publisher every received Provider's resource is published to.
    queue        queue.Publisher
    // providers is the receive-only channel Providers are read from, one per iteration.
    providers    <-chan t.Provider
    // queueTimeout is the timeout of the context created for each iteration.
    queueTimeout time.Duration
    // Instrumentation is embedded; its Tracer is used to start the publish span.
    *instr.Instrumentation
}

// New returns a Queuer which reads Providers from providers and publishes them to q.
// The Queuer gets a queue timeout of 5 minutes and a fresh Instrumentation.
func New(q queue.Publisher, providers <-chan t.Provider) Queuer {
    // Give up after 5 minutes of waiting for a provider.
    const defaultQueueTimeout = 5 * time.Minute

    var queuer Queuer
    queuer.queue = q
    queuer.providers = providers
    queuer.queueTimeout = defaultQueueTimeout
    queuer.Instrumentation = instr.New()

    return queuer
}

// providersClosedError is the concrete type behind ErrProvidersClosed.
type providersClosedError struct{}

// Error implements the error interface.
func (providersClosedError) Error() string { return "providers channel closed" }

// ErrProvidersClosed is returned by iterate, and thereby by Queue, when the
// providers channel has been closed. Compare using errors.Is.
var ErrProvidersClosed error = providersClosedError{}

// iterate waits for a single Provider and publishes it to the queue.
//
// It returns the context's error when ctx is done or no Provider arrives within
// queueTimeout, ErrProvidersClosed when the providers channel has been closed,
// and otherwise the result of publishing the received Provider.
func (q *Queuer) iterate(ctx context.Context) error {
    // Never wait more than queueTimeout for a message
    ctx, cancel := context.WithTimeout(ctx, q.queueTimeout)
    defer cancel()

    select {
    case <-ctx.Done():
        return ctx.Err()
    case p, ok := <-q.providers:
        if !ok {
            // A closed channel yields zero-value Providers forever; stop here
            // instead of publishing empty resources in a busy loop.
            return ErrProvidersClosed
        }

        // The timeout context is passed on, so publishing shares the
        // deadline that was set for the wait.
        return q.publishProvider(ctx, p)
    }
}

// publishProvider publishes p's Resource, annotated with the sniffer source,
// inside a "queue.Publish" producer span which continues p's remote span context.
// It returns the error from the queue's Publish, or nil on success.
func (q *Queuer) publishProvider(ctx context.Context, p t.Provider) error {
    ctx = trace.ContextWithRemoteSpanContext(ctx, p.SpanContext)

    // Keep the context returned by Start, so that spans started by Publish
    // from this context become children of the producer span.
    ctx, span := q.Tracer.Start(ctx, "queue.Publish", trace.WithAttributes(
        attribute.String("cid", p.ID),
        attribute.String("peerid", p.Provider),
    ), trace.WithSpanKind(trace.SpanKindProducer))
    defer span.End()

    // TODO: Queue provider here, not AnnotatedResource.

    r := t.AnnotatedResource{
        Resource: p.Resource,
        Source:   t.SnifferSource,
    }

    // Add with highest priority (9), as this is supposed to be available
    if err := q.queue.Publish(ctx, &r, 9); err != nil {
        // RecordError only adds an event to the span; the status has to be
        // set explicitly for the span to be marked as failed.
        span.RecordError(err)
        span.SetStatus(codes.Error, err.Error())

        return err
    }

    span.SetStatus(codes.Ok, "published")

    return nil
}

// Queue keeps reading from the providers channel and queues each item, until
// an iteration fails. It only returns once an error occurred, so the returned
// error is never nil.
func (q *Queuer) Queue(ctx context.Context) error {
    var err error

    for err == nil {
        err = q.iterate(ctx)
    }

    return err
}