
View on GitHub


35 mins
Test Coverage
package queue

import (


var (
    client    *redis.Client
    pubsub    *redis.PubSub
    scheduler *cron.Cron
    isDev     bool

    emailer *Email
    biller  *Billing

    executors map[TaskID]TaskExecutor

// New initializes the queue tasks.
func New(rc *redis.Client, isDev bool, ex map[TaskID]TaskExecutor) {
    client = rc

    // built-in executor
    emailer = &Email{}
    biller = &Billing{}
    if isDev {
        emailer.Send = emailer.sendEmailDev
    } else {
        emailer.Send = emailer.sendEmailProd

    executors = ex

// SetAsSubscriber makes this instance a Pub/Sub subscriber. Each message queued
// will be processed by this instance.
func SetAsSubscriber() {
    scheduler = cron.New()

    pubsub = client.Subscribe("q")
    if err := pubsub.Ping("test"); err != nil {
        log.Fatal("unable to ping pubsub", err)
    defer func() {

    if _, err := pubsub.Receive(); err != nil {
        log.Fatal("unable to receive from pubsub channel", err)

    // we initialize our scheduler (cron)
    go setupCron()

    ch := pubsub.Channel()

    for {
        msg, ok := <-ch
        if !ok {
            log.Fatal("redis pub/sub is down")

        go process(msg)

func setupCron() {
    if _, err := os.Stat("tasks.cron"); os.IsNotExist(err) {
        log.Println("no tasks.cron file found, skipping scheduler setup")

    b, err := ioutil.ReadFile("tasks.cron")
    if err != nil {
        log.Println("error while reading tasks.cron", err)

    lines := strings.Split(string(b), "\n")
    if len(lines) == 0 {
        log.Println("no tasks found in tasks.cron, skipping scheduler setup")

    for _, line := range lines {
        exp, url := parseTask(line)

        err := scheduler.AddFunc(exp, func() {
            req, err := http.NewRequest("POST", url, bytes.NewReader(b))
            if err != nil {
                log.Println("error while creating an HTTP request to", url)

            req.SetBasicAuth("todo", "here")

            resp, err := http.DefaultClient.Do(req)
            if err != nil {
                log.Println("error while executing an HTTP request to", url)
            defer resp.Body.Close()

            if resp.StatusCode >= 400 {
                log.Println("scheduler HTTP request to ", url, "failed with HTTP status", resp.StatusCode)

        if err != nil {
            log.Fatal("unable to create cron tasks", err)


// parseTask splits one tasks.cron line into its cron expression and its URL.
//
// The URL is the last whitespace-separated field of s; the expression is every
// preceding field joined with single spaces. Leading and trailing whitespace
// (including a trailing "\r"), tabs and repeated spaces are tolerated. A line
// with a single field yields an empty expression, and an empty or blank line
// yields two empty strings.
func parseTask(s string) (exp string, url string) {
	fields := strings.Fields(s)
	if len(fields) == 0 {
		return "", ""
	}

	last := len(fields) - 1
	return strings.Join(fields[:last], " "), fields[last]
}

// Enqueue adds a task to the queue.
func Enqueue(id TaskID, data interface{}) error {
    qt := QueueTask{
        ID:      id,
        Data:    data,
        Created: time.Now(),

    b, err := json.Marshal(qt)
    if err != nil {
        return err
    return client.Publish("q", string(b)).Err()

func process(msg *redis.Message) {
    var qt QueueTask
    if err := json.Unmarshal([]byte(msg.Payload), &qt); err != nil {
        log.Fatal("unable to decode this Redis message", err)

    var exec TaskExecutor

    switch qt.ID {
    case TaskEmail:
        exec = emailer
    case TaskCreateInvoice:
        exec = biller
        if ex, ok := executors[qt.ID]; ok {
            exec = ex

    if err := exec.Run(qt); err != nil {
        //TODO: better to log those critical errors
        log.Println("error while executing this task", qt.ID, err)