package main

import (

// RabbitMQ connection flags.
var (
	// rabbitHost has no default; the host must be supplied on the command line.
	rabbitHost = flag.String("rabbit-host", "", "RabbitMQ host")
	rabbitPort = flag.Int("rabbit-port", 5672, "RabbitMQ port")
	// rabbitUser and rabbitPassword default to the "guest" account.
	rabbitUser     = flag.String("rabbit-user", "guest", "RabbitMQ username")
	rabbitPassword = flag.String("rabbit-password", "guest", "RabbitMQ password")
	rabbitExchange = flag.String("rabbit-exchange", "logs", "RabbitMQ exchange name")
)

// Redis connection flags.
var (
	// redisHost has no default; the host must be supplied on the command line.
	redisHost = flag.String("redis-host", "", "redis host")
	redisPort = flag.Int("redis-port", 6379, "redis port")
)

// Process-level flags.
var (
	// debugMode is a boolean switch even though the flag is named "loglevel".
	debugMode = flag.Bool("loglevel", false, "debug mode")
	// metricsPort is the port main binds the default HTTP mux to.
	metricsPort = flag.Int("metrics-port", 44444, "expvar stats port")
)

// Time-bucketing parameters. They are typed int64 so they combine directly
// with Unix timestamps.
const (
	// secondsInDay is the length of one day, in seconds.
	secondsInDay int64 = 24 * 60 * 60
	// bucketLength is the number of days covered by one bucket.
	bucketLength int64 = 30
	// secondsInBucket is the length of one bucket, in seconds.
	secondsInBucket int64 = bucketLength * secondsInDay
)

// Redis key names used by this worker.
const (
	// bucketPrefix is prepended to a day's Unix timestamp to name a merged bucket key.
	bucketPrefix = "m_"
	// processQueue is the key of the sorted set listing the days that still
	// have per-day data to merge.
	processQueue = "queue"
)

// redisPool is the receiver type for the Redis-backed metric operations in
// this file (processMetric and processBuckets).
// NOTE(review): the struct body is not visible here. OpenRedis constructs it
// as redisPool{c} from a *redis.Pool and callers use r.Get() and r.Close(),
// so it presumably embeds *redis.Pool — confirm.
type redisPool struct {

func main() {
    common.DebugLevel = *debugMode
    common.Info("disctinct name Worker")
    common.Info(`collects daily occurrences of distinct events in Redis. 
                Metrics that are older than 30 days are merged into a monthly bucket, 
                then cleared.`)

    common.Info("connecting to Redis...")

    r, err := OpenRedis(*redisHost, *redisPort)
    if err != nil {
        log.Fatalln("Cannot dial redis", err)
    defer r.Close()
    common.Info("connecting to RabbitMQ...")

    connector, err := common.BuildRabbitMQConnector(*rabbitHost, *rabbitPort, *rabbitUser, *rabbitPassword, *rabbitExchange)
    if err != nil {
        log.Fatalln("cannot connect to rabbitmq", err)
    defer connector.Close()
    common.Info("Starting worker proccesor")

    connector.Handle("distinct-name", func(b []byte) bool {
        d := common.MustUnmarshallFromJSON(b)
        if _, err = r.processMetric(d); err != nil {
            log.Println("error inserting in redis", err)
            return false //requeue
        return true

    if err != nil {
        log.Fatalln("error connecting to Rabbit", err)
    common.Info("Starting a metrics http server")

    bindTo := fmt.Sprintf(":%v", *metricsPort)
    go http.ListenAndServe(bindTo, nil)

    common.Info("Starting old buckets ticker")
    for range time.Tick(time.Minute) {
        if _, err = r.processBuckets(); err != nil {
            log.Println("error proccesing buckets:", err)

func OpenRedis(host string, port int) (*redisPool, error) {
    c := &redis.Pool{
        MaxIdle:     5,
        MaxActive:   30,
        IdleTimeout: 240 * time.Second,
        Dial: func() (redis.Conn, error) {
            return redis.Dial("tcp", fmt.Sprintf("%s:%v", host, port))
        TestOnBorrow: func(c redis.Conn, t time.Time) error {
            _, err := c.Do("PING")
            return err

    r := &redisPool{c}
    return r, nil

func (r *redisPool) processMetric(m common.MetricEntry) (interface{}, error) {
    c := r.Get()
    defer c.Close()

    s := begginingOfDayUnix(m.Time)

    //increments the occurrences for the given metric
    c.Send("ZINCRBY", s, 1, m.Metric)
    //adds metrics unix day number(begginingOfDayUnix) to the process queue.
    //example: if metric occured at 18th nov 9pm UTC (1447095600)
    // it will add the day identified by begginingOfDayUnix(m.Time)
    //to the process queue

    c.Send("ZADD", processQueue, s, s)
    return c.Do("EXEC")

func (r *redisPool) processBuckets() (string, error) {
    c := r.Get()
    defer c.Close()

    t := time.Now()
    //get all elements which correpond to an older bucket.
    //NOTE: currently supports just a single element
    ids, err := redis.Values(c.Do("ZRANGEBYSCORE", "queue", "-inf", begginingOfBucketUnix(t)))
    if err != nil {
        return "", err
    total := len(ids)

    if total == 0 { //nothing to process
        return "", nil

    //name the proccesed bucket
    dest := bucketPrefix + strconv.FormatInt(begginingOfDayUnix(t), 10)

    // //check if similar ids have been proccesed
    // //if there is a previously computed set, add to the arguments
    // exists, err := redis.Bool(r.Do("EXISTS", dest))
    // if err != nil {
    //     return nil, err
    // }
    // if exists {
    //     total++
    //     ids = append(ids, dest)
    // }


    params := []interface{}{dest, strconv.Itoa(total)}
    args := append(params, ids...)

    //sum all daily based occurences (loaded form the queue set) and save the result at 'dest'
    c.Send("ZUNIONSTORE", args...)

    right := begginingOfDayUnix(t) - 1 // [....) interval
    left := right - secondsInBucket

    //delete all the days corresponding to the bucket from the processing queue
    c.Send("ZREMRANGEBYSCORE", "queue", left, right)

    //delete all day-based keys
    c.Send("DEL", ids...)
    _, err = c.Do("EXEC")
    return dest, err

// begginingOfDayUnix returns the Unix timestamp, in seconds, of 00:00:00 UTC
// on the day that contains t. The result does not depend on t's Location.
func begginingOfDayUnix(t time.Time) int64 {
	// Truncate rounds down to a multiple of 24h counted from the zero Time,
	// which is itself midnight UTC, so the result always lands on a UTC day
	// boundary. Unlike tu - tu%86400, this also rounds down for instants
	// before 1970, where Go's % would round toward the epoch instead.
	return t.Truncate(24 * time.Hour).Unix()
}

func begginingOfBucketUnix(t time.Time) int64 {
    tu := t.UTC().Unix()
    return tu - tu%secondsInBucket