status-im/status-go

View on GitHub
protocol/anonmetrics/client.go

Summary

Maintainability
A
0 mins
Test Coverage
F
0%
package anonmetrics

import (
    "context"
    "crypto/ecdsa"
    "errors"
    "sync"
    "time"

    "github.com/golang/protobuf/proto"
    "go.uber.org/zap"

    "github.com/status-im/status-go/appmetrics"
    gocommon "github.com/status-im/status-go/common"
    "github.com/status-im/status-go/eth-node/crypto"
    "github.com/status-im/status-go/protocol/common"
    "github.com/status-im/status-go/protocol/protobuf"
)

// ActiveClientPhrase is the exact string ClientConfig.Active must hold for the
// client to do any sending: sendUnprocessedMetrics, mainLoop and startMainLoop
// all return immediately when Config.Active differs from it.
const ActiveClientPhrase = "yes i am wanting the activation of the anon metrics client, please thank you lots thank you"

// ClientConfig holds the settings of the anonymous metrics Client.
//
//   - ShouldSend is not read anywhere in this file.
//     NOTE(review): presumably an opt-in flag consulted by callers — confirm.
//   - SendAddress is the public key passed to the message sender as the
//     recipient of every metrics batch.
//   - Active must equal ActiveClientPhrase for the send loop to run; with any
//     other value the sending code paths return without doing anything.
type ClientConfig struct {
    ShouldSend  bool
    SendAddress *ecdsa.PublicKey
    Active      string
}

// Client periodically sends locally stored anonymous metrics in batches and
// prunes old metrics from the database. It contains a sync.Mutex, so it must
// be used through a pointer and never copied.
type Client struct {
    // Config, DB, Identity and Logger are not set by NewClient. The methods in
    // this file dereference them, so they must be assigned before Start is called.
    // Identity's public key is handed to adaptModelsToProtoBatch when batches
    // are built; the outgoing message itself is sent with a per-session
    // ephemeral key.
    Config   *ClientConfig
    DB       *appmetrics.Database
    Identity *ecdsa.PrivateKey
    Logger   *zap.Logger

    // messageSender is the message processor used to send metric batch messages.
    messageSender *common.MessageSender

    // IntervalInc yields the number of seconds the main loop waits between
    // two send passes.
    IntervalInc *FibonacciIntervalIncrementer

    // mainLoopQuit is closed by stopMainLoop to tell the main loop to terminate.
    // It is nil while no main loop has been started or after it was stopped.
    mainLoopQuit chan struct{}

    // deleteLoopQuit is closed by stopDeleteLoop to tell the delete loop to
    // terminate. It is nil while no delete loop has been started or after it
    // was stopped.
    deleteLoopQuit chan struct{}

    // DBLock is held by a send pass and by each delete pass, so items are
    // never deleted from the DB while they are being sent.
    DBLock sync.Mutex
}

// NewClient returns a Client that sends through the given message sender and
// whose wait-interval incrementer starts at Last 0 / Current 1.
//
// Config, DB, Identity and Logger are left at their zero values; the caller
// has to assign them before starting the client.
func NewClient(sender *common.MessageSender) *Client {
    client := new(Client)
    client.messageSender = sender
    client.IntervalInc = &FibonacciIntervalIncrementer{Last: 0, Current: 1}
    return client
}

// sendUnprocessedMetrics sends all unprocessed metrics stored in the DB, one
// batch message per session id, and marks every successfully sent batch as
// processed. It is a no-op unless Config.Active equals ActiveClientPhrase.
//
// DBLock is held for the whole call so the delete loop cannot remove rows
// while they are being sent. Errors are logged, not returned: a failure to
// load, convert, sign-key, marshal or send aborts the pass, leaving the
// remaining batches unprocessed in the DB; a failure to mark a batch as
// processed is logged and the pass continues with the next session.
func (c *Client) sendUnprocessedMetrics() {
    if c.Config.Active != ActiveClientPhrase {
        return
    }

    c.Logger.Debug("sendUnprocessedMetrics() triggered")

    c.DBLock.Lock()
    defer c.DBLock.Unlock()

    // Get all unsent metrics grouped by session id
    uam, err := c.DB.GetUnprocessedGroupedBySession()
    if err != nil {
        c.Logger.Error("failed to get unprocessed messages grouped by session", zap.Error(err))
        // Nothing usable was loaded; stop here like every other failure path
        // instead of carrying on with an undefined result.
        return
    }
    c.Logger.Debug("unprocessed metrics from db", zap.Reflect("uam", uam))

    for session, batch := range uam {
        c.Logger.Debug("processing uam from session", zap.String("session", session))

        // Convert the metrics into protobuf
        amb, err := adaptModelsToProtoBatch(batch, &c.Identity.PublicKey)
        if err != nil {
            c.Logger.Error("failed to adapt models to protobuf batch", zap.Error(err))
            return
        }

        // Generate an ephemeral key per session id, so batches of different
        // sessions are not sent under the same sender key.
        ephemeralKey, err := crypto.GenerateKey()
        if err != nil {
            c.Logger.Error("failed to generate an ephemeral key", zap.Error(err))
            return
        }

        // Prepare the protobuf message
        encodedMessage, err := proto.Marshal(amb)
        if err != nil {
            c.Logger.Error("failed to marshal protobuf", zap.Error(err))
            return
        }
        rawMessage := common.RawMessage{
            Payload:             encodedMessage,
            Sender:              ephemeralKey,
            SkipEncryptionLayer: true,
            SendOnPersonalTopic: true,
            MessageType:         protobuf.ApplicationMetadataMessage_ANONYMOUS_METRIC_BATCH,
        }

        // NOTE(review): this reflects the whole rawMessage, including the
        // Sender field that holds the ephemeral key — confirm that is
        // acceptable in debug logs.
        c.Logger.Debug("rawMessage prepared from unprocessed anonymous metrics", zap.Reflect("rawMessage", rawMessage))

        // Send the metrics batch
        _, err = c.messageSender.SendPrivate(context.Background(), c.Config.SendAddress, &rawMessage)
        if err != nil {
            c.Logger.Error("failed to send metrics batch message", zap.Error(err))
            return
        }

        // Mark metrics as processed
        err = c.DB.SetToProcessed(batch)
        if err != nil {
            c.Logger.Error("failed to set metrics as processed in db", zap.Error(err))
        }
    }
}

// mainLoop runs the send loop on the calling goroutine, using the quit channel
// stored in c.mainLoopQuit at the moment of the call. It returns nil once that
// channel is closed, or immediately when the client is not active.
func (c *Client) mainLoop() error {
    return c.runMainLoop(c.mainLoopQuit)
}

// runMainLoop repeatedly sends unprocessed metrics and then waits for the
// number of seconds given by IntervalInc.Next() before the next pass. It
// returns nil when quit is closed, or immediately when Config.Active does not
// equal ActiveClientPhrase.
//
// The quit channel is a parameter rather than read from c.mainLoopQuit on each
// iteration: stopMainLoop sets that field to nil right after closing it, and
// startMainLoop replaces it, so re-reading the field could make a stopped loop
// wait on a nil channel (never ready) or on the channel of a newer loop.
func (c *Client) runMainLoop(quit <-chan struct{}) error {
    if c.Config.Active != ActiveClientPhrase {
        return nil
    }

    c.Logger.Debug("mainLoop() triggered")

    for {
        c.sendUnprocessedMetrics()

        waitFor := time.Duration(c.IntervalInc.Next()) * time.Second
        c.Logger.Debug("mainLoop() wait interval set", zap.Duration("waitFor", waitFor))

        timer := time.NewTimer(waitFor)
        select {
        case <-timer.C:
        case <-quit:
            // Release the pending timer instead of leaving it to expire.
            timer.Stop()
            return nil
        }
    }
}

// startMainLoop stops any running main loop and starts a new one in its own
// goroutine. It does nothing when Config.Active does not equal
// ActiveClientPhrase.
func (c *Client) startMainLoop() {
    if c.Config.Active != ActiveClientPhrase {
        return
    }

    c.Logger.Debug("startMainLoop() triggered")

    c.stopMainLoop()

    // Hand the goroutine its own reference to the quit channel so that a later
    // stop/start cycle cannot swap the channel from under it.
    quit := make(chan struct{})
    c.mainLoopQuit = quit
    go func() {
        defer gocommon.LogOnPanic()
        c.Logger.Debug("startMainLoop() anonymous go routine triggered")
        if err := c.runMainLoop(quit); err != nil {
            c.Logger.Error("main loop exited with an error", zap.Error(err))
        }
    }()
}

// deleteLoop runs the delete loop on the calling goroutine, using the quit
// channel stored in c.deleteLoopQuit at the moment of the call. It returns nil
// once that channel is closed.
func (c *Client) deleteLoop() error {
    return c.runDeleteLoop(c.deleteLoopQuit)
}

// runDeleteLoop waits 10 seconds and then, once per hour, deletes all metrics
// older than one week from the DB. It returns nil as soon as quit is closed,
// including during the initial wait.
//
// The quit channel is a parameter rather than read from c.deleteLoopQuit on
// each iteration: stopDeleteLoop sets that field to nil right after closing
// it, and startDeleteLoop replaces it, so re-reading the field could make a
// stopped loop wait on a nil channel (never ready) or on the channel of a
// newer loop.
func (c *Client) runDeleteLoop(quit <-chan struct{}) error {
    // Give the main loop time to process any old messages first, but still
    // honour a stop request that arrives in the meantime.
    startDelay := time.NewTimer(10 * time.Second)
    select {
    case <-startDelay.C:
    case <-quit:
        startDelay.Stop()
        return nil
    }

    for {
        // Run each pass in its own function so the deferred unlock fires at
        // the end of the pass, not at the end of the loop.
        func() {
            c.DBLock.Lock()
            defer c.DBLock.Unlock()

            oneWeekAgo := time.Now().Add(time.Hour * 24 * 7 * -1)
            err := c.DB.DeleteOlderThan(&oneWeekAgo)
            if err != nil {
                c.Logger.Error("failed to delete metrics older than given time",
                    zap.Time("time given", oneWeekAgo),
                    zap.Error(err))
            }
        }()

        timer := time.NewTimer(time.Hour)
        select {
        case <-timer.C:
        case <-quit:
            // Release the pending one-hour timer instead of leaving it to expire.
            timer.Stop()
            return nil
        }
    }
}

// startDeleteLoop stops any running delete loop and starts a new one in its
// own goroutine.
func (c *Client) startDeleteLoop() {
    c.stopDeleteLoop()

    // Hand the goroutine its own reference to the quit channel so that a later
    // stop/start cycle cannot swap the channel from under it.
    quit := make(chan struct{})
    c.deleteLoopQuit = quit
    go func() {
        defer gocommon.LogOnPanic()
        if err := c.runDeleteLoop(quit); err != nil {
            c.Logger.Error("delete loop exited with an error", zap.Error(err))
        }
    }()
}

// Start launches the main (send) loop and the delete loop.
// It returns an error, without starting anything, when the client has no
// message sender.
func (c *Client) Start() error {
    c.Logger.Debug("Main Start() triggered")

    if c.messageSender != nil {
        c.startMainLoop()
        c.startDeleteLoop()
        return nil
    }

    return errors.New("can't start, missing message processor")
}

// stopMainLoop signals a running main loop to terminate by closing
// mainLoopQuit, then clears the field so that a repeated call does not close
// the channel twice. It is a no-op (apart from logging) when no main loop
// quit channel is set.
func (c *Client) stopMainLoop() {
    c.Logger.Debug("stopMainLoop() triggered")

    if c.mainLoopQuit == nil {
        return
    }

    // This branch runs only when the channel IS set; the log text says so.
    c.Logger.Debug("mainLoopQuit is set, attempting to close")

    close(c.mainLoopQuit)
    c.mainLoopQuit = nil
}

// stopDeleteLoop signals a running delete loop to terminate by closing
// deleteLoopQuit, then clears the field so that a repeated call does not
// close the channel twice. It does nothing when the field is already nil.
func (c *Client) stopDeleteLoop() {
    if c.deleteLoopQuit == nil {
        return
    }

    close(c.deleteLoopQuit)
    c.deleteLoopQuit = nil
}

// Stop signals the main loop and then the delete loop to terminate.
// It always returns nil.
func (c *Client) Stop() error {
    for _, halt := range []func(){c.stopMainLoop, c.stopDeleteLoop} {
        halt()
    }
    return nil
}