pstuifzand/ekster

View on GitHub
pkg/timeline/redisset.go

Summary

Maintainability
A
2 hrs
Test Coverage
package timeline

import (
    "encoding/json"
    "fmt"
    "log"
    "time"

    "github.com/gomodule/redigo/redis"
    "p83.nl/go/ekster/pkg/microsub"
)

type redisSortedSetTimeline struct {
    channel string
    pool    *redis.Pool
}

/*
 * REDIS SORTED SETS TIMELINE
 */
func (timeline *redisSortedSetTimeline) Init() error {
    return nil
}

func (timeline *redisSortedSetTimeline) Items(before, after string) (microsub.Timeline, error) {
    conn := timeline.pool.Get()
    defer conn.Close()

    items := []microsub.Item{}

    channel := timeline.channel

    zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel)

    afterScore := "-inf"
    if len(after) != 0 {
        afterScore = "(" + after
    }
    beforeScore := "+inf"
    if len(before) != 0 {
        beforeScore = "(" + before
    }

    var itemJSONs [][]byte

    itemScores, err := redis.Strings(
        conn.Do(
            "ZRANGEBYSCORE",
            zchannelKey,
            afterScore,
            beforeScore,
            "LIMIT",
            0,
            20,
            "WITHSCORES",
        ),
    )

    if err != nil {
        return microsub.Timeline{
            Paging: microsub.Pagination{},
            Items:  items,
        }, err
    }

    if len(itemScores) >= 2 {
        before = itemScores[1]
        after = itemScores[len(itemScores)-1]
    } else {
        before = ""
        after = ""
    }

    for i := 0; i < len(itemScores); i += 2 {
        itemID := itemScores[i]
        itemJSON, err := redis.Bytes(conn.Do("HGET", itemID, "Data"))
        if err != nil {
            log.Println(err)
            continue
        }
        itemJSONs = append(itemJSONs, itemJSON)
    }

    for _, obj := range itemJSONs {
        item := microsub.Item{}
        err := json.Unmarshal(obj, &item)
        if err != nil {
            // FIXME: what should we do if one of the items doen't unmarshal?
            log.Println(err)
            continue
        }
        item.Read = false
        items = append(items, item)
    }
    paging := microsub.Pagination{
        After:  after,
        Before: before,
    }

    return microsub.Timeline{
        Paging: paging,
        Items:  items,
    }, nil
}

func (timeline *redisSortedSetTimeline) AddItem(item microsub.Item) (bool, error) {
    conn := timeline.pool.Get()
    defer conn.Close()

    channel := timeline.channel
    zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel)

    if item.Published == "" {
        item.Published = time.Now().Format(time.RFC3339)
    }

    // Fix date when it almost matches with RFC3339, except the colon in the timezone
    format := "2006-01-02T15:04:05Z0700"
    if parsedDate, err := time.Parse(format, item.Published); err == nil {
        item.Published = parsedDate.Format(time.RFC3339)
    }

    data, err := json.Marshal(item)
    if err != nil {
        return false, fmt.Errorf("couldn't marshal item for redis: %s", err)
    }

    forRedis := redisItem{
        ID:        item.ID,
        Published: item.Published,
        Read:      item.Read,
        Data:      data,
    }

    itemKey := fmt.Sprintf("item:%s", item.ID)
    _, err = redis.String(conn.Do("HMSET", redis.Args{}.Add(itemKey).AddFlat(&forRedis)...))
    if err != nil {
        return false, fmt.Errorf("writing failed for item to redis: %v", err)
    }

    readChannelKey := fmt.Sprintf("channel:%s:read", channel)
    isRead, err := redis.Bool(conn.Do("SISMEMBER", readChannelKey, itemKey))
    if err != nil {
        return false, err
    }

    if isRead {
        return false, nil
    }

    score, err := time.Parse(time.RFC3339, item.Published)
    if err != nil {
        return false, fmt.Errorf("can't parse %s as time", item.Published)
    }

    n, err := redis.Int64(conn.Do("ZADD", zchannelKey, score.Unix()*1.0, itemKey))
    if err != nil {
        return false, fmt.Errorf("zadding failed item %s to channel %s for redis: %v", itemKey, zchannelKey, err)
    }

    return n == 1, nil
}

func (timeline *redisSortedSetTimeline) Count() (int, error) {
    conn := timeline.pool.Get()
    defer conn.Close()

    channel := timeline.channel
    zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel)
    unread, err := redis.Int(conn.Do("ZCARD", zchannelKey))
    if err != nil {
        return -1, fmt.Errorf("while updating channel unread count for %s: %s", channel, err)
    }
    return unread, nil
}

func (timeline *redisSortedSetTimeline) MarkRead(uids []string) error {
    conn := timeline.pool.Get()
    defer conn.Close()

    channel := timeline.channel

    itemUIDs := []string{}
    for _, uid := range uids {
        itemUIDs = append(itemUIDs, "item:"+uid)
    }

    channelKey := fmt.Sprintf("channel:%s:read", channel)
    args := redis.Args{}.Add(channelKey).AddFlat(itemUIDs)

    if _, err := conn.Do("SADD", args...); err != nil {
        return fmt.Errorf("marking read for channel %s has failed: %s", channel, err)
    }

    zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel)
    args = redis.Args{}.Add(zchannelKey).AddFlat(itemUIDs)

    if _, err := conn.Do("ZREM", args...); err != nil {
        return fmt.Errorf("marking read for channel %s has failed: %s", channel, err)
    }

    return nil
}

func (timeline *redisSortedSetTimeline) MarkUnread(uids []string) error {
    panic("implement me")
}