internal/queue/client.go
package queue
import (
"context"
"errors"
"time"
"github.com/hibiken/asynq"
"github.com/opcotech/elemo/internal/config"
"github.com/opcotech/elemo/internal/pkg/log"
"github.com/opcotech/elemo/internal/pkg/tracing"
)
// ClientOption is a functional option that NewClient applies to the Client
// being built. A non-nil error returned by an option aborts construction.
type ClientOption func(*Client) error
// WithClientConfig returns an option that stores conf on the Client. When
// conf is nil the returned option leaves the Client untouched and fails with
// config.ErrNoConfig.
func WithClientConfig(conf *config.WorkerConfig) ClientOption {
	if conf == nil {
		return func(*Client) error {
			return config.ErrNoConfig
		}
	}

	return func(client *Client) error {
		client.conf = conf
		return nil
	}
}
// WithClientLogger returns an option that replaces the Client's logger. When
// logger is nil the returned option leaves the Client untouched and fails
// with log.ErrNoLogger.
func WithClientLogger(logger log.Logger) ClientOption {
	if logger == nil {
		return func(*Client) error {
			return log.ErrNoLogger
		}
	}

	return func(client *Client) error {
		client.logger = logger
		return nil
	}
}
// WithClientTracer returns an option that replaces the Client's tracer. When
// tracer is nil the returned option leaves the Client untouched and fails
// with tracing.ErrNoTracer.
func WithClientTracer(tracer tracing.Tracer) ClientOption {
	if tracer == nil {
		return func(*Client) error {
			return tracing.ErrNoTracer
		}
	}

	return func(client *Client) error {
		client.tracer = tracer
		return nil
	}
}
// Client sends async tasks to the worker for processing and can look up the
// state of tasks it has sent.
type Client struct {
// conf is the worker configuration; NewClient reads the broker connection
// settings from it.
conf *config.WorkerConfig
// logger defaults to log.DefaultLogger() and can be replaced with
// WithClientLogger.
logger log.Logger
// tracer starts a span in Enqueue, Ping and Close. It defaults to
// tracing.NoopTracer() and can be replaced with WithClientTracer.
tracer tracing.Tracer
// client is the asynq client used by Enqueue to submit tasks.
client *asynq.Client
// inspector is the asynq inspector used by GetTaskInfo to look up tasks.
inspector *asynq.Inspector
}
// Enqueue hands task to the asynq client inside a tracing span, forwarding
// opts unchanged. On success it returns the task info reported by asynq; on
// failure it returns a nil info and the underlying error joined with
// ErrSendTask.
func (c *Client) Enqueue(ctx context.Context, task *asynq.Task, opts ...asynq.Option) (*asynq.TaskInfo, error) {
	spanCtx, span := c.tracer.Start(ctx, "transport.asynq.Client/Enqueue")
	defer span.End()

	taskInfo, enqueueErr := c.client.EnqueueContext(spanCtx, task, opts...)
	if enqueueErr != nil {
		return nil, errors.Join(ErrSendTask, enqueueErr)
	}

	return taskInfo, nil
}
// GetTaskInfo asks the asynq inspector for the task identified by id in the
// named queue and returns the inspector's result and error unchanged.
func (c *Client) GetTaskInfo(queue string, id string) (*asynq.TaskInfo, error) {
	taskInfo, lookupErr := c.inspector.GetTaskInfo(queue, id)
	return taskInfo, lookupErr
}
// Ping enqueues a system health check task and polls its state until the
// worker has completed it.
//
// The wait is bounded by ctx and by an internal 5 second timeout, whichever
// expires first. The task state is polled at a fixed interval instead of in a
// tight loop, so a slow or absent worker does not flood the broker with
// requests.
//
// It returns nil when the task reaches the completed state without a recorded
// error. Errors from creating or enqueueing the task are returned as-is.
// Every other failure (timeout or cancellation, a failed state lookup, an
// archived task, or a completed task with a non-empty LastErr) is joined with
// ErrReceiveTask.
func (c *Client) Ping(ctx context.Context) error {
	const (
		// pingTimeout is the longest Ping waits for the task to complete.
		pingTimeout = 5 * time.Second
		// pingPollInterval is the pause between two task state lookups.
		pingPollInterval = 50 * time.Millisecond
	)

	ctx, span := c.tracer.Start(ctx, "transport.asynq.Client/Ping")
	defer span.End()

	// WithTimeout keeps an earlier deadline of the caller's context, so this
	// only ever shortens the wait.
	ctx, cancel := context.WithTimeout(ctx, pingTimeout)
	defer cancel()

	task, err := NewSystemHealthCheckTask()
	if err != nil {
		return err
	}

	info, err := c.Enqueue(ctx, task)
	if err != nil {
		return err
	}

	ticker := time.NewTicker(pingPollInterval)
	defer ticker.Stop()

	for info.State != asynq.TaskStateCompleted {
		// An archived task is not run again on its own, so it can never
		// reach the completed state; fail now instead of waiting for the
		// timeout.
		if info.State == asynq.TaskStateArchived {
			return errors.Join(ErrReceiveTask, errors.New(info.LastErr))
		}

		select {
		case <-ctx.Done():
			return errors.Join(ErrReceiveTask, ctx.Err())
		case <-ticker.C:
		}

		if info, err = c.GetTaskInfo(info.Queue, info.ID); err != nil {
			return errors.Join(ErrReceiveTask, err)
		}
	}

	// The loop only exits once the task is completed; a recorded error still
	// counts as a failed ping.
	if info.LastErr != "" {
		return errors.Join(ErrReceiveTask, errors.New(info.LastErr))
	}

	return nil
}
// Close releases the broker connections held by the Client: the asynq client
// used for enqueueing and, when present, the asynq inspector used for task
// lookups. Both are always closed, even if the first close fails.
//
// It returns the asynq client's close error unchanged when the inspector
// closes cleanly; if the inspector fails to close, both results are joined.
func (c *Client) Close(ctx context.Context) error {
	_, span := c.tracer.Start(ctx, "transport.asynq.Client/Close")
	defer span.End()

	closeErr := c.client.Close()

	// NewClient always sets the inspector; the guard only protects Clients
	// that were assembled without one.
	if c.inspector == nil {
		return closeErr
	}

	if inspectorErr := c.inspector.Close(); inspectorErr != nil {
		return errors.Join(closeErr, inspectorErr)
	}

	return closeErr
}
// NewClient builds a Client for sending async tasks to the worker.
//
// The Client starts with log.DefaultLogger() and tracing.NoopTracer(); opts
// are then applied in order and the first option error is returned as-is. A
// worker config is mandatory: if no option supplied one, NewClient returns
// config.ErrNoConfig instead of dereferencing a nil config.
//
// The asynq client and inspector are both created from the same broker
// options taken from the config.
func NewClient(opts ...ClientOption) (*Client, error) {
	c := &Client{
		logger: log.DefaultLogger(),
		tracer: tracing.NoopTracer(),
	}

	for _, opt := range opts {
		if err := opt(c); err != nil {
			return nil, err
		}
	}

	// Without WithClientConfig the config is still nil here and reading the
	// broker settings below would panic.
	if c.conf == nil {
		return nil, config.ErrNoConfig
	}

	// The configured timeouts are scaled by time.Second.
	// NOTE(review): presumably they are stored as plain second counts; confirm
	// against the config package.
	brokerOpts := asynq.RedisClientOpt{
		Addr:         c.conf.Broker.Address(),
		Username:     c.conf.Broker.Username,
		Password:     c.conf.Broker.Password,
		DB:           c.conf.Broker.Database,
		DialTimeout:  c.conf.Broker.DialTimeout * time.Second,
		ReadTimeout:  c.conf.Broker.ReadTimeout * time.Second,
		WriteTimeout: c.conf.Broker.WriteTimeout * time.Second,
		PoolSize:     c.conf.Broker.PoolSize,
	}

	c.client = asynq.NewClient(brokerOpts)
	c.inspector = asynq.NewInspector(brokerOpts)

	return c, nil
}