evalphobia/aws-sdk-go-wrapper

View on GitHub
sqs/queue.go

Summary

Maintainability
A
2 hrs
Test Coverage
// SQS Queue

package sqs

import (
    "encoding/json"
    "fmt"
    "sync"

    SDK "github.com/aws/aws-sdk-go/service/sqs"

    "github.com/evalphobia/aws-sdk-go-wrapper/private/pointers"
)

const (
    // defaultMessageIDPrefix is prepended to the serial number that forms
    // the Id of each entry added to the send spool.
    defaultMessageIDPrefix = "msg_"
    // defaultExpireSecond is the initial value of Queue.expire, which is
    // passed as VisibilityTimeout when receiving messages.
    defaultExpireSecond    = 180
    // defaultWaitTimeSeconds is the initial value of Queue.waitTimeSeconds,
    // which is passed as WaitTimeSeconds when receiving messages.
    defaultWaitTimeSeconds = 0
)

// Queue is SQS Queue wrapper struct.
// It keeps spools of pending send/delete batch entries and the failed
// entries reported by batch requests.
type Queue struct {
    // service is the parent SQS wrapper providing the API client and logger.
    service *SQS

    // name is the queue name as given to NewQueue.
    name           string
    // nameWithPrefix is the service prefix followed by name; used in log messages.
    nameWithPrefix string
    // url is the queue URL sent with every API request.
    url            *string

    // sendSpoolMu guards sendSpool.
    sendSpoolMu sync.Mutex
    // sendSpool holds entries waiting to be sent by Send.
    sendSpool   []*SDK.SendMessageBatchRequestEntry

    // deleteSpoolMu guards deleteSpool.
    deleteSpoolMu sync.Mutex
    // deleteSpool holds entries waiting to be deleted by DeleteListItems.
    deleteSpool   []*SDK.DeleteMessageBatchRequestEntry

    // failedMu is held while appending to or clearing failedSend and failedDelete.
    failedMu     sync.Mutex
    // failedSend accumulates failed entries reported by send batches.
    failedSend   []*SDK.BatchResultErrorEntry
    // failedDelete accumulates failed entries reported by delete batches.
    failedDelete []*SDK.BatchResultErrorEntry

    // autoDel makes Fetch delete the messages it receives.
    autoDel         bool
    // expire is the visibility timeout (seconds) used when receiving messages.
    expire          int
    // waitTimeSeconds is the wait time (seconds) used when receiving messages.
    waitTimeSeconds int
}

// NewQueue builds a *Queue bound to svc for the queue called name at url.
// The queue starts with auto-delete disabled and with the default
// visibility timeout and wait time.
func NewQueue(svc *SQS, name string, url string) *Queue {
    q := &Queue{
        service:         svc,
        name:            name,
        url:             pointers.String(url),
        expire:          defaultExpireSecond,
        waitTimeSeconds: defaultWaitTimeSeconds,
    }
    // The prefixed name is only used for logging.
    q.nameWithPrefix = svc.prefix + name
    return q
}

// AutoDelete sets the auto delete flag.
// When the flag is true, Fetch adds the received messages to the delete
// spool and deletes them before returning.
func (q *Queue) AutoDelete(b bool) {
    q.autoDel = b
}

// SetExpire sets the visibility timeout, in seconds, that Fetch requests
// for the messages it receives.
func (q *Queue) SetExpire(sec int) {
    q.expire = sec
}

// SetWaitTimeSeconds sets the wait time, in seconds, that Fetch passes to
// the receive request.
// Setting a non-zero value allows for a long polling workflow.
func (q *Queue) SetWaitTimeSeconds(sec int) {
    q.waitTimeSeconds = sec
}

// AddMessage queues message in the send spool; nothing is sent until Send
// is called.
// This assumes a Standard SQS Queue and not a FifoQueue
func (q *Queue) AddMessage(message string) {
    q.sendSpoolMu.Lock()
    defer q.sendSpoolMu.Unlock()

    // The entry Id is the 1-based position in the spool, for convenience sake.
    entryID := fmt.Sprintf("%s%d", defaultMessageIDPrefix, len(q.sendSpool)+1)
    q.sendSpool = append(q.sendSpool, &SDK.SendMessageBatchRequestEntry{
        Id:          pointers.String(entryID),
        MessageBody: pointers.String(message),
    })
}

// AddMessageWithGroupID queues message in the send spool together with the
// message group id required by a SQS FIFO Queue.
// This assumes the SQS FIFO Queue has ContentBasedDeduplication enabled.
func (q *Queue) AddMessageWithGroupID(message string, messageGroupID string) {
    q.sendSpoolMu.Lock()
    defer q.sendSpoolMu.Unlock()

    // The entry Id is the 1-based position in the spool, for convenience sake.
    entryID := fmt.Sprintf("%s%d", defaultMessageIDPrefix, len(q.sendSpool)+1)
    entry := &SDK.SendMessageBatchRequestEntry{
        Id:             pointers.String(entryID),
        MessageGroupId: pointers.String(messageGroupID),
        MessageBody:    pointers.String(message),
    }
    q.sendSpool = append(q.sendSpool, entry)
}

// AddMessageJSONMarshal encodes message as JSON and adds the result to the
// send spool.
// It returns the json.Marshal error, after logging it, when message cannot
// be encoded; in that case nothing is added to the spool.
func (q *Queue) AddMessageJSONMarshal(message interface{}) error {
    msg, err := json.Marshal(message)
    if err != nil {
        // The marshal output is nil on failure, so log the input instead.
        // Only its type is logged: formatting an unmarshalable value itself
        // (e.g. a self-referencing map) is not guaranteed to be safe.
        q.service.Errorf("error on Queue.AddMessageJSONMarshal `json.Marshal` message=%T; error=%s;", message, err.Error())
        return err
    }

    q.AddMessage(string(msg))
    return nil
}

// AddMessageMap adds message to the send spool from map data.
// The map is encoded as JSON via AddMessageJSONMarshal, whose error is
// returned unchanged.
func (q *Queue) AddMessageMap(message map[string]interface{}) error {
    return q.AddMessageJSONMarshal(message)
}

// Send sends every message in the send spool and then empties the spool.
//
// The spool is split into batches of at most ten entries to follow the SQS
// restriction, and the batches are sent sequentially. A failing batch does
// not stop the remaining ones; all batch errors are collected and returned
// together. When the spool is empty no request is made and nil is returned.
func (q *Queue) Send() error {
    // maxBatchSize is the number of entries packed into one batch request.
    const maxBatchSize = 10

    q.sendSpoolMu.Lock()
    defer q.sendSpoolMu.Unlock()

    spool := q.sendSpool
    if len(spool) == 0 {
        // Nothing to send; avoid issuing a batch request without entries.
        return nil
    }

    errList := newErrors()
    for start := 0; start < len(spool); start += maxBatchSize {
        end := start + maxBatchSize
        if end > len(spool) {
            end = len(spool)
        }

        // Cap the sub-slice so the callee cannot grow into the next batch.
        err := q.send(spool[start:end:end])
        if err != nil {
            q.service.Errorf("error on `SendMessageBatch` operation; queue=%s; error=%s;", q.nameWithPrefix, err.Error())
            errList.Add(err)
        }
    }
    // The spool is cleared even when some batches failed.
    q.sendSpool = nil

    if errList.HasError() {
        return errList
    }
    return nil
}

// send issues one SendMessageBatch request for the packed entries in msg.
// Entries reported as failed in the response are appended to failedSend;
// the request error, if any, is returned as is.
func (q *Queue) send(msg []*SDK.SendMessageBatchRequestEntry) error {
    input := &SDK.SendMessageBatchInput{
        QueueUrl: q.url,
        Entries:  msg,
    }
    res, err := q.service.client.SendMessageBatch(input)

    if failed := res.Failed; len(failed) > 0 {
        q.failedMu.Lock()
        q.failedSend = append(q.failedSend, failed...)
        q.failedMu.Unlock()
    }
    return err
}

// SendSingleMessage sends message to SQS right away, skipping the send
// spool and batch submission.
// It returns the GoString representation of the API response together with
// the request error.
func (q *Queue) SendSingleMessage(message string) (string, error) {
    input := &SDK.SendMessageInput{
        QueueUrl:    q.url,
        MessageBody: pointers.String(message),
    }
    res, err := q.service.client.SendMessage(input)
    return res.GoString(), err
}

// Fetch receives up to num messages from the queue.
//
// A request error is logged and returned. When the response is nil or holds
// no messages the returned list is nil. With auto-delete enabled, the
// received messages are added to the delete spool and deleted before Fetch
// returns.
func (q *Queue) Fetch(num int) ([]*Message, error) {
    // With no wait time configured, fall back to a 1sec long-poll when
    // several messages are requested.
    waitSec := q.waitTimeSeconds
    if num > 1 && waitSec == 0 {
        waitSec = 1
    }

    input := &SDK.ReceiveMessageInput{
        QueueUrl:            q.url,
        WaitTimeSeconds:     pointers.Long(waitSec),
        MaxNumberOfMessages: pointers.Long(num),
        VisibilityTimeout:   pointers.Long(q.expire),
    }
    resp, err := q.service.client.ReceiveMessage(input)
    if err != nil {
        q.service.Errorf("error on `ReceiveMessage` operation; queue=%s; error=%s;", q.nameWithPrefix, err.Error())
    }

    if resp == nil || len(resp.Messages) == 0 {
        return nil, err
    }

    // Auto-delete: spool the raw messages now, flush the spool on return.
    if q.autoDel {
        q.AddDeleteList(resp.Messages)
        defer q.DeleteListItems()
    }

    // Wrap each raw SDK message.
    wrapped := make([]*Message, 0, len(resp.Messages))
    for _, raw := range resp.Messages {
        wrapped = append(wrapped, NewMessage(raw))
    }
    return wrapped, err
}

// FetchOne receives at most one message from the queue.
// It returns (nil, nil) when no message is available and (nil, err) when
// Fetch reports an error.
func (q *Queue) FetchOne() (*Message, error) {
    msgs, err := q.Fetch(1)
    if err != nil {
        return nil, err
    }
    if len(msgs) == 0 {
        return nil, nil
    }
    return msgs[0], nil
}

// FetchBody fetches only the body of messages.
// ** cannot handle deletion manually as lack of MessageId and ReceiptHandle **
//
// It returns nil when Fetch reports an error or yields no messages.
// When auto-delete is disabled, the fetched messages are added to the
// delete spool so that a later DeleteListItems call can remove them.
// When auto-delete is enabled, Fetch has already spooled and deleted the
// messages, so they are not spooled a second time.
func (q *Queue) FetchBody(num int) []string {
    msgList, err := q.Fetch(num)
    if err != nil || len(msgList) == 0 {
        return nil
    }

    bodies := make([]string, len(msgList))
    for i, msg := range msgList {
        bodies[i] = msg.Body()
    }

    // Fetch performs the deletion itself in auto-delete mode; spooling the
    // same messages again would only trigger a redundant delete request.
    if !q.autoDel {
        q.AddDeleteList(msgList)
    }
    return bodies
}

// FetchBodyOne returns the body of a single fetched message, or an empty
// string when FetchBody yields nothing.
// ** cannot handle deletion manually as lack of MessageId and ReceiptHandle **
func (q *Queue) FetchBodyOne() string {
    if bodies := q.FetchBody(1); len(bodies) > 0 {
        return bodies[0]
    }
    return ""
}

// AddDeleteList adds one or more messages to the delete spool.
// msg may be a *SDK.Message, a *Message, or a slice of either; values of
// any other type are ignored.
func (q *Queue) AddDeleteList(msg interface{}) {
    // enqueue appends a single delete entry while holding the spool lock.
    enqueue := func(id, receiptHandle *string) {
        q.deleteSpoolMu.Lock()
        defer q.deleteSpoolMu.Unlock()
        q.deleteSpool = append(q.deleteSpool, &SDK.DeleteMessageBatchRequestEntry{
            Id:            id,
            ReceiptHandle: receiptHandle,
        })
    }

    switch v := msg.(type) {
    case *SDK.Message:
        enqueue(v.MessageId, v.ReceiptHandle)
    case *Message:
        enqueue(v.GetMessageID(), v.GetReceiptHandle())
    case []*SDK.Message:
        for i := range v {
            q.AddDeleteList(v[i])
        }
    case []*Message:
        // Wrapped messages are spooled through their underlying SDK message.
        for i := range v {
            q.AddDeleteList(v[i].message)
        }
    }
}

// ChangeMessageVisibility asks the AWS api to set the visibility timeout of
// msg to timeoutInSeconds.
// A request error is logged and returned.
func (q *Queue) ChangeMessageVisibility(msg *Message, timeoutInSeconds int) error {
    input := &SDK.ChangeMessageVisibilityInput{
        QueueUrl:          q.url,
        VisibilityTimeout: pointers.Long(timeoutInSeconds),
        ReceiptHandle:     msg.GetReceiptHandle(),
    }
    if _, err := q.service.client.ChangeMessageVisibility(input); err != nil {
        q.service.Errorf("error on `ChangeMessageVisibility`; queue=%s; error=%s;", q.nameWithPrefix, err.Error())
        return err
    }
    return nil
}

// DeleteMessage asks the AWS api to delete msg, identified by its receipt
// handle.
// A request error is logged and returned.
func (q *Queue) DeleteMessage(msg *Message) error {
    input := &SDK.DeleteMessageInput{
        QueueUrl:      q.url,
        ReceiptHandle: msg.GetReceiptHandle(),
    }
    if _, err := q.service.client.DeleteMessage(input); err != nil {
        q.service.Errorf("error on `DeleteMessage`; queue=%s; error=%s;", q.nameWithPrefix, err.Error())
        return err
    }
    return nil
}

// DeleteMessageWithReceipt asks the AWS api to delete the message that
// owns the receipt handle msgReceipt.
// A request error is logged and returned.
func (q *Queue) DeleteMessageWithReceipt(msgReceipt string) error {
    input := &SDK.DeleteMessageInput{
        QueueUrl:      q.url,
        ReceiptHandle: pointers.String(msgReceipt),
    }
    if _, err := q.service.client.DeleteMessage(input); err != nil {
        q.service.Errorf("error on `DeleteMessage`; queue=%s; error=%s;", q.nameWithPrefix, err.Error())
        return err
    }
    return nil
}

// DeleteListItems deletes every message in the delete spool and then
// empties the spool.
//
// The spool is split into batches of at most ten entries to meet the SQS
// restriction, and the batches are deleted sequentially. A failing batch
// does not stop the remaining ones; all batch errors are collected and
// returned together. When the spool is empty no request is made and nil is
// returned.
func (q *Queue) DeleteListItems() error {
    // maxBatchSize is the number of entries packed into one batch request.
    const maxBatchSize = 10

    q.deleteSpoolMu.Lock()
    defer q.deleteSpoolMu.Unlock()

    spool := q.deleteSpool
    if len(spool) == 0 {
        return nil
    }

    // delete messages sequentially, one full batch at a time
    errList := newErrors()
    for start := 0; start < len(spool); start += maxBatchSize {
        end := start + maxBatchSize
        if end > len(spool) {
            end = len(spool)
        }

        // Cap the sub-slice so the callee cannot grow into the next batch.
        err := q.delete(spool[start:end:end])
        if err != nil {
            errList.Add(err)
        }
    }
    // The spool is cleared even when some batches failed.
    q.deleteSpool = nil

    if errList.HasError() {
        return errList
    }
    return nil
}

// delete issues one DeleteMessageBatch request for the packed entries in
// msg. It does nothing when msg is empty.
//
// A request error is logged and returned. Entries reported as failed in
// the response are appended to failedDelete whether or not the request
// itself returned an error, because a batch response can carry per-entry
// failures alongside a nil request error.
func (q *Queue) delete(msg []*SDK.DeleteMessageBatchRequestEntry) error {
    if len(msg) == 0 {
        return nil
    }

    res, err := q.service.client.DeleteMessageBatch(&SDK.DeleteMessageBatchInput{
        Entries:  msg,
        QueueUrl: q.url,
    })
    if err != nil {
        q.service.Errorf("error on `DeleteMessageBatch`; queue=%s; error=%s;", q.nameWithPrefix, err.Error())
    }

    // Record per-entry failures independently of the request error.
    if res != nil && len(res.Failed) != 0 {
        q.failedMu.Lock()
        q.failedDelete = append(q.failedDelete, res.Failed...)
        q.failedMu.Unlock()
    }

    return err
}

// CountMessage asks the AWS api for the approximate number of messages
// left in the Queue.
// It returns the ApproximateNumberOfMessages attribute as visible and the
// ApproximateNumberOfMessagesNotVisible attribute as invisible; both are 0
// when the request fails.
func (q *Queue) CountMessage() (visible int, invisible int, err error) {
    attr, err := q.service.GetQueueAttributes(
        *q.url,
        AttributeApproximateNumberOfMessages,
        AttributeApproximateNumberOfMessagesNotVisible,
    )
    if err == nil {
        return attr.ApproximateNumberOfMessages, attr.ApproximateNumberOfMessagesNotVisible, nil
    }
    return 0, 0, err
}

// GetAttributes sends request to AWS api to get the queue's attributes.
// No attribute names are passed to GetQueueAttributes here;
// `AttributeNames` will be set as `All` (behavior of GetQueueAttributes,
// which is defined outside this file — confirm there).
func (q *Queue) GetAttributes() (AttributesResponse, error) {
    return q.service.GetQueueAttributes(*q.url)
}

// Purge sends a PurgeQueue request to delete all messages in the Queue.
// A failure is logged as an error and returned; success is logged at info
// level.
func (q *Queue) Purge() error {
    input := &SDK.PurgeQueueInput{QueueUrl: q.url}
    if _, err := q.service.client.PurgeQueue(input); err != nil {
        q.service.Errorf("error on `PurgeQueue` operation; queue=%s; error=%s;", q.nameWithPrefix, err.Error())
        return err
    }

    q.service.Infof("success on `PurgeQueue` operation; queue=%s;", q.nameWithPrefix)
    return nil
}

// FailedResults contains failed results of batch request.
type FailedResults struct {
    // Send holds the failed entries collected from send batches.
    Send   []*SDK.BatchResultErrorEntry
    // Delete holds the failed entries collected from delete batches.
    Delete []*SDK.BatchResultErrorEntry
}

// GetFailedResults gets failed results of batch request.
// The returned value holds the failed send and delete entries accumulated
// since the last ClearFailedResults call.
func (q *Queue) GetFailedResults() FailedResults {
    // Writers of these fields hold failedMu, so readers must hold it too.
    q.failedMu.Lock()
    defer q.failedMu.Unlock()

    return FailedResults{
        Send:   q.failedSend,
        Delete: q.failedDelete,
    }
}

// ClearFailedResults discards the failed send and delete entries collected
// from batch requests so far.
func (q *Queue) ClearFailedResults() {
    q.failedMu.Lock()
    q.failedSend, q.failedDelete = nil, nil
    q.failedMu.Unlock()
}