0x4b53/amqp-rpc

View on GitHub
request.go

Summary

Maintainability
A
0 mins
Test Coverage
A
100%
package amqprpc

import (
    "context"
    "fmt"
    "sync"
    "time"

    amqp "github.com/rabbitmq/amqp091-go"
)

// Request is a requet to perform with the client.
type Request struct {
    // Exchange is the exchange to which the rquest will be published when
    // passing it to the clients send function.
    Exchange string

    // Routing key is the routing key that will be used in the amqp.Publishing
    // request.
    RoutingKey string

    // Reply is a boolean value telling if the request should wait for a reply
    // or just send the request without waiting.
    Reply bool

    // Timeout is the time we should wait after a request is published before
    // we assume the request got lost.
    Timeout time.Duration

    // timeoutAt is the exact time when the request times out. This is set by
    // the client when starting the countdown.
    timeoutAt time.Time

    // Publishing is the publising that are going to be published.
    Publishing amqp.Publishing

    // Context is a context which you can use to pass data from where the
    // request is created to middlewares. By default this will be a
    // context.Background()
    Context context.Context //nolint:containedctx // Needed in the struct.

    // middlewares holds slice of middlewares to run before or after the client
    // sends a request. This is only executed for the specific request.
    middlewares []ClientMiddlewareFunc

    // These channels are used by the repliesConsumer and correlcationIdMapping and will send the
    // replies to this Request here.
    response chan *amqp.Delivery
    errChan  chan error // If we get a client error (e.g we can't publish) it will end up here.

    // the number of times that the publisher should retry.
    numRetries int

    deliveryTag uint64

    confirmed chan struct{}
    returned  *amqp.Return
}

// NewRequest will generate a new request to be published. The default request
// will use the content type "text/plain" and always wait for reply.
func NewRequest() *Request {
    r := Request{
        Context:     context.Background(),
        Reply:       true,
        middlewares: []ClientMiddlewareFunc{},
        Publishing: amqp.Publishing{
            ContentType: "text/plain",
            Headers:     amqp.Table{},
        },
    }

    return &r
}

// WithRoutingKey will set the routing key for the request.
func (r *Request) WithRoutingKey(rk string) *Request {
    r.RoutingKey = rk

    return r
}

// WithCorrelationID will add/overwrite the correlation ID used for the
// request and set it on the Publishing.
func (r *Request) WithCorrelationID(id string) *Request {
    r.Publishing.CorrelationId = id

    return r
}

// WithContext will set the context on the request.
func (r *Request) WithContext(ctx context.Context) *Request {
    r.Context = ctx

    return r
}

// WriteHeader will write a header for the specified key.
func (r *Request) WriteHeader(header string, value interface{}) {
    r.Publishing.Headers[header] = value
}

// WithExchange will set the exchange on to which the request will be published.
func (r *Request) WithExchange(e string) *Request {
    r.Exchange = e

    return r
}

// WithHeaders will set the full amqp.Table as the headers for the request.
// Note that this will overwrite anything previously set on the headers.
func (r *Request) WithHeaders(h amqp.Table) *Request {
    r.Publishing.Headers = h

    return r
}

// WithTimeout will set the client timeout used when publishing messages.
// t will be rounded using the duration's Round function to the nearest
// multiple of a millisecond. Rounding will be away from zero.
func (r *Request) WithTimeout(t time.Duration) *Request {
    r.Timeout = t.Round(time.Millisecond)

    return r
}

// WithResponse sets the value determining wether the request should wait for a
// response or not. A request that does not require a response will only catch
// errors occurring before the reuqest has been published.
func (r *Request) WithResponse(wr bool) *Request {
    r.Reply = wr

    return r
}

// WithContentType will update the content type passed in the header of the
// request. This value will bee set as the ContentType in the amqp.Publishing
// type but also preserved as a header value.
func (r *Request) WithContentType(ct string) *Request {
    r.Publishing.ContentType = ct

    return r
}

// WithBody will convert a string to a byte slice and add as the body
// passed for the request.
func (r *Request) WithBody(b string) *Request {
    r.Publishing.Body = []byte(b)

    return r
}

// Write will write the response Body of the amqp.Publishing.
// It is safe to call Write multiple times.
func (r *Request) Write(p []byte) (int, error) {
    r.Publishing.Body = append(r.Publishing.Body, p...)

    return len(p), nil
}

// AddMiddleware will add a middleware which will be executed when the request
// is published.
func (r *Request) AddMiddleware(m ClientMiddlewareFunc) *Request {
    r.middlewares = append(r.middlewares, m)

    return r
}

// startTimeout will start the timeout counter. Is will also set the Expiration
// field for the Publishing so that amqp won't hold on to the message in the
// queue after the timeout has happened.
func (r *Request) startTimeout(defaultTimeout time.Duration) {
    if r.Timeout.Nanoseconds() == 0 {
        r.WithTimeout(defaultTimeout)
    }

    if r.Reply {
        r.Publishing.Expiration = fmt.Sprintf("%d", r.Timeout.Nanoseconds()/1e6)
    }

    r.timeoutAt = time.Now().Add(r.Timeout)
}

// AfterTimeout waits for the duration of the timeout.
func (r *Request) AfterTimeout() <-chan time.Time {
    return time.After(time.Until(r.timeoutAt))
}

// RequestMap keeps track of requests based on their DeliveryTag and/or
// CorrelationID.
type RequestMap struct {
    byDeliveryTag   map[uint64]*Request
    byCorrelationID map[string]*Request
    mu              sync.RWMutex
}

// GetByCorrelationID returns the request with the provided correlation id.
func (m *RequestMap) GetByCorrelationID(key string) (*Request, bool) {
    m.mu.RLock()
    r, ok := m.byCorrelationID[key]
    m.mu.RUnlock()

    return r, ok
}

// GetByDeliveryTag returns the request with the provided delivery tag.
func (m *RequestMap) GetByDeliveryTag(key uint64) (*Request, bool) {
    m.mu.RLock()
    r, ok := m.byDeliveryTag[key]
    m.mu.RUnlock()

    return r, ok
}

// Set will add r to m so it can be fetched later using it's correlation id or
// delivery tag.
func (m *RequestMap) Set(r *Request) {
    m.mu.Lock()
    m.byDeliveryTag[r.deliveryTag] = r
    m.byCorrelationID[r.Publishing.CorrelationId] = r
    m.mu.Unlock()
}

// Delete will remove r from m.
func (m *RequestMap) Delete(r *Request) {
    m.mu.Lock()
    delete(m.byDeliveryTag, r.deliveryTag)
    delete(m.byCorrelationID, r.Publishing.CorrelationId)
    m.mu.Unlock()
}