status-im/status-go

View on GitHub
protocol/anonmetrics/server.go

Summary

Maintainability
A
0 mins
Test Coverage
F
0%
package anonmetrics

import (
    "database/sql"

    // Import postgres driver
    _ "github.com/lib/pq"
    "go.uber.org/zap"

    "github.com/status-im/migrate/v4"
    "github.com/status-im/migrate/v4/database/postgres"
    bindata "github.com/status-im/migrate/v4/source/go_bindata"

    "github.com/status-im/status-go/appmetrics"
    "github.com/status-im/status-go/protocol/anonmetrics/migrations"
    "github.com/status-im/status-go/protocol/protobuf"
)

const ActiveServerPhrase = "I was thinking that it would be a pretty nice idea if the server functionality was working now, I express gratitude in the anticipation"

type ServerConfig struct {
    Enabled     bool
    PostgresURI string
    Active      string
}

type Server struct {
    Config     *ServerConfig
    Logger     *zap.Logger
    PostgresDB *sql.DB
}

func NewServer(postgresURI string) (*Server, error) {
    postgresMigration := bindata.Resource(migrations.AssetNames(), migrations.Asset)
    db, err := NewMigratedDB(postgresURI, postgresMigration)
    if err != nil {
        return nil, err
    }

    return &Server{
        PostgresDB: db,
    }, nil
}

func (s *Server) Stop() error {
    if s.PostgresDB != nil {
        return s.PostgresDB.Close()
    }
    return nil
}

func (s *Server) StoreMetrics(appMetricsBatch *protobuf.AnonymousMetricBatch) (appMetrics []*appmetrics.AppMetric, err error) {
    if s.Config.Active != ActiveServerPhrase {
        return nil, nil
    }

    s.Logger.Debug("StoreMetrics() triggered with payload",
        zap.Reflect("appMetricsBatch", appMetricsBatch))
    appMetrics, err = adaptProtoBatchToModels(appMetricsBatch)
    if err != nil {
        return
    }

    var (
        tx     *sql.Tx
        insert *sql.Stmt
    )

    // start txn
    tx, err = s.PostgresDB.Begin()
    if err != nil {
        return
    }

    defer func() {
        if err == nil {
            err = tx.Commit()
            return
        }
        _ = tx.Rollback()
    }()

    //noinspection ALL
    query := `INSERT INTO app_metrics (message_id, event, value, app_version, operating_system, session_id, created_at)
VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (message_id) DO NOTHING;`

    insert, err = tx.Prepare(query)
    if err != nil {
        return
    }

    for _, metric := range appMetrics {
        _, err = insert.Exec(
            metric.MessageID,
            metric.Event,
            metric.Value,
            metric.AppVersion,
            metric.OS,
            metric.SessionID,
            metric.CreatedAt,
        )
        if err != nil {
            return
        }
    }
    return
}

func (s *Server) getFromRows(rows *sql.Rows) (appMetrics []appmetrics.AppMetric, err error) {
    for rows.Next() {
        metric := appmetrics.AppMetric{}
        err = rows.Scan(
            &metric.ID,
            &metric.MessageID,
            &metric.Event,
            &metric.Value,
            &metric.AppVersion,
            &metric.OS,
            &metric.SessionID,
            &metric.CreatedAt,
            &metric.Processed,
            &metric.ReceivedAt,
        )
        if err != nil {
            return nil, err
        }
        appMetrics = append(appMetrics, metric)
    }
    return appMetrics, nil
}

func (s *Server) GetAppMetrics(limit int, offset int) ([]appmetrics.AppMetric, error) {
    if s.Config.Active != ActiveServerPhrase {
        return nil, nil
    }

    rows, err := s.PostgresDB.Query("SELECT id, message_id, event, value, app_version, operating_system, session_id, created_at, processed, received_at FROM app_metrics LIMIT $1 OFFSET $2", limit, offset)
    if err != nil {
        return nil, err
    }
    defer rows.Close()

    return s.getFromRows(rows)
}

func NewMigratedDB(uri string, migrationResource *bindata.AssetSource) (*sql.DB, error) {
    db, err := sql.Open("postgres", uri)
    if err != nil {
        return nil, err
    }

    if err := setup(db, migrationResource); err != nil {
        return nil, err
    }

    return db, nil
}

func setup(d *sql.DB, migrationResource *bindata.AssetSource) error {
    m, err := MakeMigration(d, migrationResource)
    if err != nil {
        return err
    }

    if err = m.Up(); err != migrate.ErrNoChange {
        return err
    }

    return nil
}

func MakeMigration(d *sql.DB, migrationResource *bindata.AssetSource) (*migrate.Migrate, error) {
    source, err := bindata.WithInstance(migrationResource)
    if err != nil {
        return nil, err
    }

    driver, err := postgres.WithInstance(d, &postgres.Config{})
    if err != nil {
        return nil, err
    }

    return migrate.NewWithInstance(
        "go-bindata",
        source,
        "postgres",
        driver)
}