pstuifzand/ekster

View on GitHub
pkg/timeline/redisstreams.go

Summary

Maintainability
A
0 mins
Test Coverage
package timeline

import (
    "encoding/json"
    "fmt"
    "log"
    "time"

    "github.com/gomodule/redigo/redis"
    "p83.nl/go/ekster/pkg/microsub"
)

type redisStreamTimeline struct {
    channel, channelKey string

    pool *redis.Pool
}

/*
 * REDIS STREAMS TIMELINE
 */
func (timeline *redisStreamTimeline) Init() error {
    timeline.channelKey = fmt.Sprintf("stream:%s", timeline.channel)
    return nil
}

func (timeline *redisStreamTimeline) Items(before, after string) (microsub.Timeline, error) {
    conn := timeline.pool.Get()
    defer conn.Close()

    if before == "" {
        before = "-"
    }

    if after == "" {
        after = "+"
    }

    results, err := redis.Values(conn.Do("XREVRANGE", redis.Args{}.Add(timeline.channelKey, after, before, "COUNT", "20")...))
    if err != nil {
        return microsub.Timeline{}, err
    }

    var forRedis redisItem

    var items []microsub.Item
    for _, result := range results {
        if value, ok := result.([]interface{}); ok {
            id, ok2 := value[0].([]uint8)

            if item, ok3 := value[1].([]interface{}); ok3 {
                err = redis.ScanStruct(item, &forRedis)
                if err != nil {
                    continue
                }
                item := forRedis.Item()
                if ok2 {
                    item.ID = string(id)
                }
                items = append(items, item)
            }
        }
    }

    return microsub.Timeline{
        Items: items,
        Paging: microsub.Pagination{
            After: items[len(items)-1].ID,
        },
    }, nil
}

func (timeline *redisStreamTimeline) AddItem(item microsub.Item) (bool, error) {
    conn := timeline.pool.Get()
    defer conn.Close()

    if item.Published == "" {
        item.Published = time.Now().Format(time.RFC3339)
    }

    data, err := json.Marshal(item)
    if err != nil {
        log.Printf("error while creating item for redis: %v\n", err)
        return false, err
    }

    args := redis.Args{}.Add(timeline.channelKey).Add("*").Add("ID").Add(item.ID).Add("Published").Add(item.Published).Add("Read").Add(item.Read).Add("Data").Add(data)

    _, err = redis.String(conn.Do("XADD", args...))

    _, _ = conn.Do("XTRIM", timeline.channelKey, "MAXLEN", "~", "250")

    return err == nil, err
}

func (timeline *redisStreamTimeline) Count() (int, error) {
    conn := timeline.pool.Get()
    defer conn.Close()

    return redis.Int(conn.Do("XLEN", timeline.channelKey))
}

func (timeline *redisStreamTimeline) MarkRead(uids []string) error {
    // panic("implement me")
    return nil
}

func (timeline *redisStreamTimeline) MarkUnread(uids []string) error {
    // panic("implement me")
    return nil
}