
View on GitHub


1 hr
Test Coverage
package queue

import (



// ClientOption configures a Client. It receives the client being built and
// returns an error when the supplied setting cannot be applied.
type ClientOption func(c *Client) error

// WithClientConfig sets the config for the worker.
func WithClientConfig(conf *config.WorkerConfig) ClientOption {
    return func(c *Client) error {
        if conf == nil {
            return config.ErrNoConfig

        c.conf = conf

        return nil

// WithClientLogger sets the logger for the worker.
func WithClientLogger(logger log.Logger) ClientOption {
    return func(c *Client) error {
        if logger == nil {
            return log.ErrNoLogger

        c.logger = logger

        return nil

// WithClientTracer sets the tracer for the worker.
func WithClientTracer(tracer tracing.Tracer) ClientOption {
    return func(c *Client) error {
        if tracer == nil {
            return tracing.ErrNoTracer

        c.tracer = tracer
        return nil

// Client is sending async task to the worker for processing.
type Client struct {
    conf      *config.WorkerConfig
    logger    log.Logger
    tracer    tracing.Tracer
    client    *asynq.Client
    inspector *asynq.Inspector

// Enqueue sends a task to the worker for processing.
func (c *Client) Enqueue(ctx context.Context, task *asynq.Task, opts ...asynq.Option) (*asynq.TaskInfo, error) {
    ctx, span := c.tracer.Start(ctx, "transport.asynq.Client/Enqueue")
    defer span.End()

    info, err := c.client.EnqueueContext(ctx, task, opts...)
    if err != nil {
        return nil, errors.Join(ErrSendTask, err)
    return info, nil

// GetTaskInfo returns the task info for the given task ID in a queue.
func (c *Client) GetTaskInfo(queue string, id string) (*asynq.TaskInfo, error) {
    return c.inspector.GetTaskInfo(queue, id)

// Ping sends a sample task to the worker and waits for it to finish. If the
// task is not completed within 5 seconds, the task is canceled.
func (c *Client) Ping(ctx context.Context) error {
    ctx, span := c.tracer.Start(ctx, "transport.asynq.Client/Ping")
    defer span.End()

    task, err := NewSystemHealthCheckTask()
    if err != nil {
        return err

    info, err := c.Enqueue(ctx, task)
    if err != nil {
        return err

    for info.State != asynq.TaskStateCompleted {
        select {
        case <-ctx.Done():
            return errors.Join(ErrReceiveTask, ctx.Err())
            if info, err = c.GetTaskInfo(info.Queue, info.ID); err != nil {
                return errors.Join(ErrReceiveTask, err)

    if info.State != asynq.TaskStateCompleted || info.LastErr != "" {
        return errors.Join(ErrReceiveTask, errors.New(info.LastErr))

    return nil

// Close closes the connection with the message broker.
func (c *Client) Close(ctx context.Context) error {
    _, span := c.tracer.Start(ctx, "transport.asynq.Client/Close")
    defer span.End()

    return c.client.Close()

// NewClient creates a new client to send async tasks to Worker.
func NewClient(opts ...ClientOption) (*Client, error) {
    c := &Client{
        logger: log.DefaultLogger(),
        tracer: tracing.NoopTracer(),

    for _, opt := range opts {
        if err := opt(c); err != nil {
            return nil, err

    brokerOpts := asynq.RedisClientOpt{
        Addr:         c.conf.Broker.Address(),
        Username:     c.conf.Broker.Username,
        Password:     c.conf.Broker.Password,
        DB:           c.conf.Broker.Database,
        DialTimeout:  c.conf.Broker.DialTimeout * time.Second,
        ReadTimeout:  c.conf.Broker.ReadTimeout * time.Second,
        WriteTimeout: c.conf.Broker.WriteTimeout * time.Second,
        PoolSize:     c.conf.Broker.PoolSize,

    c.client = asynq.NewClient(brokerOpts)
    c.inspector = asynq.NewInspector(brokerOpts)

    return c, nil