synapsecns/sanguine

View on GitHub
services/rfq/api/rest/server.go

Summary

Maintainability
A
0 mins
Test Coverage
// Package rest provides RESTful API services for RFQ
package rest

import (
    "context"
    "encoding/json"
    "math/big"

    "fmt"
    "net/http"
    "sync"
    "time"

    "github.com/google/uuid"
    "github.com/ipfs/go-log"
    "github.com/puzpuzpuz/xsync"
    swaggerfiles "github.com/swaggo/files"
    ginSwagger "github.com/swaggo/gin-swagger"
    "github.com/synapsecns/sanguine/core/ginhelper"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/metric"
    "go.opentelemetry.io/otel/trace"

    "github.com/ethereum/go-ethereum/accounts/abi/bind"
    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/crypto"
    "github.com/gin-gonic/gin"
    "github.com/gorilla/websocket"
    "github.com/jellydator/ttlcache/v3"
    "github.com/synapsecns/sanguine/core/metrics"
    baseServer "github.com/synapsecns/sanguine/core/server"
    omniClient "github.com/synapsecns/sanguine/services/omnirpc/client"
    "github.com/synapsecns/sanguine/services/rfq/api/config"
    "github.com/synapsecns/sanguine/services/rfq/api/db"
    "github.com/synapsecns/sanguine/services/rfq/api/docs"
    "github.com/synapsecns/sanguine/services/rfq/api/model"
    "github.com/synapsecns/sanguine/services/rfq/contracts/fastbridge"
    "github.com/synapsecns/sanguine/services/rfq/relayer/relapi"
)

const meterName = "github.com/synapsecns/sanguine/services/rfq/api/rest"

func getCurrentVersion() (string, error) {
    if len(APIversions.Versions) == 0 {
        return "", fmt.Errorf("no versions found")
    }

    return APIversions.Versions[0].Version, nil
}

// QuoterAPIServer is a struct that holds the configuration, database connection, gin engine, RPC client, metrics handler, and fast bridge contracts.
// It is used to initialize and run the API server.
type QuoterAPIServer struct {
    cfg                 config.Config
    db                  db.APIDB
    engine              *gin.Engine
    upgrader            websocket.Upgrader
    omnirpcClient       omniClient.RPCClient
    handler             metrics.Handler
    meter               metric.Meter
    fastBridgeContracts map[uint32]*fastbridge.FastBridge
    roleCache           map[uint32]*ttlcache.Cache[string, bool]
    // relayAckCache contains a set of transactionID values that reflect
    // transactions that have been acked for relay
    relayAckCache *ttlcache.Cache[string, string]
    // ackMux is a mutex used to ensure that only one transaction id can be acked at a time.
    ackMux sync.Mutex
    // latestQuoteAgeGauge is a gauge that records the age of the latest quote.
    latestQuoteAgeGauge metric.Float64ObservableGauge
    // wsClients maintains a mapping of connection ID to a channel for sending quote requests.
    wsClients     *xsync.MapOf[string, WsClient]
    pubSubManager PubSubManager
}

// NewAPI holds the configuration, database connection, gin engine, RPC client, metrics handler, and fast bridge contracts.
// It is used to initialize and run the API server.
func NewAPI(
    ctx context.Context,
    cfg config.Config,
    handler metrics.Handler,
    omniRPCClient omniClient.RPCClient,
    store db.APIDB,
) (*QuoterAPIServer, error) {
    if ctx == nil {
        return nil, fmt.Errorf("context is nil")
    }
    if handler == nil {
        return nil, fmt.Errorf("handler is nil")
    }
    if omniRPCClient == nil {
        return nil, fmt.Errorf("omniRPCClient is nil")
    }
    if store == nil {
        return nil, fmt.Errorf("store is nil")
    }

    docs.SwaggerInfo.Title = "RFQ Quoter API"

    bridges := make(map[uint32]*fastbridge.FastBridge)
    roles := make(map[uint32]*ttlcache.Cache[string, bool])
    for chainID, bridge := range cfg.Bridges {
        chainClient, err := omniRPCClient.GetChainClient(ctx, int(chainID))
        if err != nil {
            return nil, fmt.Errorf("could not create omnirpc client: %w", err)
        }
        bridges[chainID], err = fastbridge.NewFastBridge(common.HexToAddress(bridge), chainClient)
        if err != nil {
            return nil, fmt.Errorf("could not create bridge contract: %w", err)
        }

        // create the roles cache
        roles[chainID] = ttlcache.New[string, bool](
            ttlcache.WithTTL[string, bool](cacheInterval),
        )
        roleCache := roles[chainID]
        go roleCache.Start()
        go func() {
            <-ctx.Done()
            roleCache.Stop()
        }()
    }

    // create the relay ack cache
    relayAckCache := ttlcache.New[string, string](
        ttlcache.WithTTL[string, string](cfg.GetRelayAckTimeout()),
        ttlcache.WithDisableTouchOnHit[string, string](),
    )
    go relayAckCache.Start()
    go func() {
        <-ctx.Done()
        relayAckCache.Stop()
    }()

    q := &QuoterAPIServer{
        cfg:                 cfg,
        db:                  store,
        omnirpcClient:       omniRPCClient,
        handler:             handler,
        meter:               handler.Meter(meterName),
        fastBridgeContracts: bridges,
        roleCache:           roles,
        relayAckCache:       relayAckCache,
        ackMux:              sync.Mutex{},
        wsClients:           xsync.NewMapOf[WsClient](),
        pubSubManager:       NewPubSubManager(),
    }

    // Prometheus metrics setup
    var err error
    q.latestQuoteAgeGauge, err = q.meter.Float64ObservableGauge("latest_quote_age")
    if err != nil {
        return nil, fmt.Errorf("could not create latest quote age gauge: %w", err)
    }

    _, err = q.meter.RegisterCallback(q.recordLatestQuoteAge, q.latestQuoteAgeGauge)
    if err != nil {
        return nil, fmt.Errorf("could not register callback: %w", err)
    }

    return q, nil
}

const (
    // QuoteRoute is the API endpoint for handling quote related requests.
    QuoteRoute = "/quotes"
    // BulkQuotesRoute is the API endpoint for handling bulk quote related requests.
    BulkQuotesRoute = "/bulk_quotes"
    // AckRoute is the API endpoint for handling relay ack related requests.
    AckRoute = "/ack"
    // ContractsRoute is the API endpoint for returning a list of contracts.
    ContractsRoute = "/contracts"
    // RFQStreamRoute is the API endpoint for handling active quote requests via websocket.
    RFQStreamRoute = "/rfq_stream"
    // RFQRoute is the API endpoint for handling RFQ requests.
    RFQRoute = "/rfq"
    // ChainsHeader is the header for specifying chains during a websocket handshake.
    ChainsHeader = "Chains"
    // AuthorizationHeader is the header for specifying the authorization.
    AuthorizationHeader = "Authorization"
    cacheInterval       = time.Minute
)

var logger = log.Logger("rfq-api")

// Run runs the quoter api server.
func (r *QuoterAPIServer) Run(ctx context.Context) error {
    engine := ginhelper.New(logger)
    h := NewHandler(r.db, r.cfg)

    versionNumber, versionNumErr := getCurrentVersion()
    if versionNumErr != nil {
        return fmt.Errorf("could not get current API version: %w", versionNumErr)
    }
    engine.Use(APIVersionMiddleware(versionNumber))
    engine.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerfiles.Handler))

    // Authenticated routes
    quotesPut := engine.Group(QuoteRoute)
    quotesPut.Use(r.AuthMiddleware())
    quotesPut.PUT("", h.ModifyQuote)
    bulkQuotesPut := engine.Group(BulkQuotesRoute)
    bulkQuotesPut.Use(r.AuthMiddleware())
    bulkQuotesPut.PUT("", h.ModifyBulkQuotes)
    ackPut := engine.Group(AckRoute)
    ackPut.Use(r.AuthMiddleware())
    ackPut.PUT("", r.PutRelayAck)
    openQuoteRequestsGet := engine.Group(RFQRoute)
    openQuoteRequestsGet.Use(r.AuthMiddleware())
    openQuoteRequestsGet.GET("", h.GetOpenQuoteRequests)

    // WebSocket route
    wsRoute := engine.Group(RFQStreamRoute)
    wsRoute.Use(r.AuthMiddleware())
    wsRoute.GET("", func(c *gin.Context) {
        r.GetActiveRFQWebsocket(ctx, c)
    })

    // Unauthenticated routes
    engine.GET(QuoteRoute, h.GetQuotes)
    engine.GET(ContractsRoute, h.GetContracts)
    engine.PUT(RFQRoute, r.PutRFQRequest)

    // WebSocket upgrader
    r.upgrader = websocket.Upgrader{
        CheckOrigin: func(_ *http.Request) bool {
            return true // TODO: Implement a more secure check
        },
    }

    r.engine = engine

    // Start the main HTTP server
    connection := baseServer.Server{}
    fmt.Printf("starting api at http://localhost:%s\n", r.cfg.Port)

    err := connection.ListenAndServe(ctx, fmt.Sprintf(":%s", r.cfg.Port), r.engine)
    if err != nil {
        return fmt.Errorf("could not start rest api server: %w", err)
    }

    return nil
}

// AuthMiddleware is the Gin authentication middleware that authenticates requests using EIP191.
//
//nolint:gosec
func (r *QuoterAPIServer) AuthMiddleware() gin.HandlerFunc {
    return func(c *gin.Context) {
        var loggedRequest interface{}
        var err error
        destChainIDs := []uint32{}

        // Parse the dest chain id from the request
        switch c.Request.URL.Path {
        case QuoteRoute:
            var req model.PutRelayerQuoteRequest
            err = c.BindJSON(&req)
            if err == nil {
                destChainIDs = append(destChainIDs, uint32(req.DestChainID))
                loggedRequest = &req
            }
        case BulkQuotesRoute:
            var req model.PutBulkQuotesRequest
            err = c.BindJSON(&req)
            if err == nil {
                for _, quote := range req.Quotes {
                    destChainIDs = append(destChainIDs, uint32(quote.DestChainID))
                }
                loggedRequest = &req
            }
        case AckRoute:
            var req model.PutAckRequest
            err = c.BindJSON(&req)
            if err == nil {
                destChainIDs = append(destChainIDs, uint32(req.DestChainID))
                loggedRequest = &req
            }
        case RFQRoute, RFQStreamRoute:
            chainsHeader := c.GetHeader(ChainsHeader)
            if chainsHeader != "" {
                var chainIDs []int
                err = json.Unmarshal([]byte(chainsHeader), &chainIDs)
                if err == nil {
                    for _, chainID := range chainIDs {
                        destChainIDs = append(destChainIDs, uint32(chainID))
                    }
                }
            }
        default:
            err = fmt.Errorf("unexpected request path: %s", c.Request.URL.Path)
        }
        if err != nil {
            c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
            c.Abort()
            return
        }

        // Authenticate and fetch the address from the request
        var addressRecovered *common.Address
        for _, destChainID := range destChainIDs {
            addr, err := r.checkRole(c, destChainID)
            if err != nil {
                c.JSON(http.StatusBadRequest, gin.H{"msg": err.Error()})
                c.Abort()
                return
            }
            if addressRecovered == nil {
                addressRecovered = &addr
            } else if *addressRecovered != addr {
                c.JSON(http.StatusBadRequest, gin.H{"msg": "relayer address mismatch"})
                c.Abort()
                return
            }
        }

        // Log and pass to the next middleware if authentication succeeds
        // Store the request in context after binding and validation
        c.Set("putRequest", loggedRequest)
        c.Set("relayerAddr", addressRecovered.Hex())
        c.Next()
    }
}

func (r *QuoterAPIServer) checkRole(c *gin.Context, destChainID uint32) (addressRecovered common.Address, err error) {
    bridge, ok := r.fastBridgeContracts[destChainID]
    if !ok {
        err = fmt.Errorf("dest chain id not supported: %d", destChainID)
        return addressRecovered, err
    }

    ops := &bind.CallOpts{Context: c}
    relayerRole := crypto.Keccak256Hash([]byte("RELAYER_ROLE"))

    // authenticate relayer signature with EIP191
    deadline := time.Now().Unix() - 1000 // TODO: Replace with some type of r.cfg.AuthExpiryDelta
    addressRecovered, err = EIP191Auth(c, deadline)
    if err != nil {
        err = fmt.Errorf("unable to authenticate relayer: %w", err)
        return addressRecovered, err
    }

    // Check and update cache
    cachedRoleItem := r.roleCache[destChainID].Get(addressRecovered.Hex())
    var hasRole bool

    if cachedRoleItem == nil || cachedRoleItem.IsExpired() {
        // Cache miss or expired, check on-chain
        hasRole, err = bridge.HasRole(ops, relayerRole, addressRecovered)
        if err != nil {
            return addressRecovered, fmt.Errorf("unable to check relayer role on-chain: %w", err)
        }
        // Update cache
        r.roleCache[destChainID].Set(addressRecovered.Hex(), hasRole, cacheInterval)
    } else {
        // Use cached value
        hasRole = cachedRoleItem.Value()
    }

    // Verify role
    if !hasRole {
        return addressRecovered, fmt.Errorf("relayer not an on-chain relayer")
    }

    return addressRecovered, nil
}

// PutRelayAck checks if a relay is pending or not.
// Note that the ack is not binding; that is, any relayer can still relay the transaction
// on chain if they ignore the response to this call.
// Also, this will not work if the API is run on multiple servers, since there is no inter-server
// communication to maintain the cache.
//
// PUT /ack.
// @dev Protected Method: Authentication is handled through middleware in server.go.
// @Summary Relay ack
// @Schemes
// @Description cache an ack request to synchronize relayer actions.
// @Param request body model.PutRelayerQuoteRequest true "query params"
// @Tags ack
// @Accept json
// @Produce json
// @Success 200
// @Header 200 {string} X-Api-Version "API Version Number - See docs for more info"
// @Router /ack [put].
func (r *QuoterAPIServer) PutRelayAck(c *gin.Context) {
    req, exists := c.Get("putRequest")
    if !exists {
        c.JSON(http.StatusBadRequest, gin.H{"error": "Request not found"})
        return
    }
    rawRelayerAddr, exists := c.Get("relayerAddr")
    if !exists {
        c.JSON(http.StatusBadRequest, gin.H{"error": "No relayer address recovered from signature"})
        return
    }
    relayerAddr, ok := rawRelayerAddr.(string)
    if !ok {
        c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid relayer address type"})
        return
    }
    ackReq, ok := req.(*model.PutAckRequest)
    if !ok {
        c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request type"})
        return
    }

    // If the tx id is already in the cache, it should not be relayed.
    // Otherwise, insert the current relayer's address into the cache.
    r.ackMux.Lock()
    ack := r.relayAckCache.Get(ackReq.TxID)
    shouldRelay := ack == nil || common.HexToAddress(relayerAddr).Hex() == common.HexToAddress(ack.Value()).Hex()
    if shouldRelay {
        r.relayAckCache.Set(ackReq.TxID, relayerAddr, ttlcache.DefaultTTL)
    } else {
        relayerAddr = ack.Value()
    }
    r.ackMux.Unlock()

    resp := relapi.PutRelayAckResponse{
        TxID:           ackReq.TxID,
        ShouldRelay:    shouldRelay,
        RelayerAddress: relayerAddr,
    }
    c.JSON(http.StatusOK, resp)
}

// GetActiveRFQWebsocket handles the WebSocket connection for active quote requests.
// GET /rfq_stream.
// @Summary Listen for Active RFQs
// @Schemes
// @Description Establish a WebSocket connection to listen for streaming active quote requests.
// @Tags quotes
// @Produce json
// @Success 101 {string} string "Switching Protocols"
// @Header 101 {string} X-Api-Version "API Version Number - See docs for more info"
// @Router /rfq_stream [get].
func (r *QuoterAPIServer) GetActiveRFQWebsocket(ctx context.Context, c *gin.Context) {
    ctx, span := r.handler.Tracer().Start(ctx, "GetActiveRFQWebsocket")
    defer func() {
        metrics.EndSpan(span)
    }()

    ws, err := r.upgrader.Upgrade(c.Writer, c.Request, nil)
    if err != nil {
        logger.Error("Failed to set websocket upgrade", "error", err)
        return
    }

    // use the relayer address as the ID for the connection
    rawRelayerAddr, exists := c.Get("relayerAddr")
    if !exists {
        c.JSON(http.StatusBadRequest, gin.H{"error": "No relayer address recovered from signature"})
        return
    }
    relayerAddr, ok := rawRelayerAddr.(string)
    if !ok {
        c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid relayer address type"})
        return
    }

    span.SetAttributes(
        attribute.String("relayer_address", relayerAddr),
    )

    // only one connection per relayer allowed
    _, ok = r.wsClients.Load(relayerAddr)
    if ok {
        c.JSON(http.StatusBadRequest, gin.H{"error": "relayer already connected"})
        return
    }

    defer func() {
        // cleanup ws registry
        r.wsClients.Delete(relayerAddr)
    }()

    client := newWsClient(relayerAddr, ws, r.pubSubManager, r.handler)
    r.wsClients.Store(relayerAddr, client)
    span.AddEvent("registered ws client")
    err = client.Run(ctx)
    if err != nil {
        logger.Error("Error running websocket client", "error", err)
    }
}

const (
    quoteTypeActive  = "active"
    quoteTypePassive = "passive"
)

// PutRFQRequest handles a user request for a quote.
// PUT /rfq.
// @Summary Initiate an Active RFQ
// @Schemes
// @Description Initiate an Active Request-For-Quote and return the best quote available.
// @Param request body model.PutRFQRequest true "Initiate an Active Request-For-Quote"
// @Tags quotes
// @Accept json
// @Produce json
// @Success 200 {object} model.PutRFQResponse
// @Header 200 {string} X-Api-Version "API Version Number - See docs for more info"
// @Router /rfq [put].
//
//nolint:cyclop
func (r *QuoterAPIServer) PutRFQRequest(c *gin.Context) {
    var req model.PutRFQRequest
    err := c.BindJSON(&req)
    if err != nil {
        c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
        return
    }

    requestID := uuid.New().String()
    ctx, span := r.handler.Tracer().Start(c.Request.Context(), "PutRFQRequest", trace.WithAttributes(
        attribute.String("request_id", requestID),
    ))
    defer func() {
        metrics.EndSpan(span)
    }()

    err = r.db.InsertActiveQuoteRequest(ctx, &req, requestID)
    if err != nil {
        logger.Warnf("Error inserting active quote request: %w", err)
    }

    var isActiveRFQ bool
    for _, quoteType := range req.QuoteTypes {
        if quoteType == quoteTypeActive {
            isActiveRFQ = true
            break
        }
    }
    span.SetAttributes(attribute.Bool("is_active_rfq", isActiveRFQ))

    // if specified, fetch the active quote. always consider passive quotes
    var activeQuote *model.QuoteData
    if isActiveRFQ {
        activeQuote = r.handleActiveRFQ(ctx, &req, requestID)
        if activeQuote != nil && activeQuote.DestAmount != nil {
            span.SetAttributes(attribute.String("active_quote_dest_amount", *activeQuote.DestAmount))
        }
    }
    passiveQuote, err := r.handlePassiveRFQ(ctx, &req)
    if err != nil {
        logger.Error("Error handling passive RFQ", "error", err)
    }
    if passiveQuote != nil && passiveQuote.DestAmount != nil {
        span.SetAttributes(attribute.String("passive_quote_dest_amount", *passiveQuote.DestAmount))
    }
    quote := getBestQuote(activeQuote, passiveQuote)
    var quoteType string
    if quote == activeQuote {
        quoteType = quoteTypeActive
    } else if quote == passiveQuote {
        quoteType = quoteTypePassive
    }

    // build and return the response
    resp := getQuoteResponse(ctx, quote, quoteType)
    c.JSON(http.StatusOK, resp)
}

func getQuoteResponse(ctx context.Context, quote *model.QuoteData, quoteType string) (resp model.PutRFQResponse) {
    span := trace.SpanFromContext(ctx)

    destAmount := big.NewInt(0)
    if quote != nil && quote.DestAmount != nil {
        amt, ok := destAmount.SetString(*quote.DestAmount, base10)
        if ok {
            destAmount = amt
        }
    }
    if destAmount.Sign() <= 0 {
        span.AddEvent("no quotes found")
        resp = model.PutRFQResponse{
            Success: false,
            Reason:  "no quotes found",
        }
    } else {
        span.SetAttributes(
            attribute.String("quote_type", quoteType),
            attribute.String("quote_dest_amount", *quote.DestAmount),
        )
        resp = model.PutRFQResponse{
            Success:        true,
            QuoteType:      quoteType,
            QuoteID:        quote.QuoteID,
            DestAmount:     *quote.DestAmount,
            RelayerAddress: *quote.RelayerAddress,
        }
    }

    return resp
}

func (r *QuoterAPIServer) recordLatestQuoteAge(ctx context.Context, observer metric.Observer) (err error) {
    if r.handler == nil || r.latestQuoteAgeGauge == nil {
        return nil
    }

    quotes, err := r.db.GetAllQuotes(ctx)
    if err != nil {
        return fmt.Errorf("could not get latest quote age: %w", err)
    }

    ageByRelayer := make(map[string]float64)
    for _, quote := range quotes {
        age := time.Since(quote.UpdatedAt).Seconds()
        prevAge, ok := ageByRelayer[quote.RelayerAddr]
        if !ok || age < prevAge {
            ageByRelayer[quote.RelayerAddr] = age
        }
    }

    for relayer, age := range ageByRelayer {
        opts := metric.WithAttributes(
            attribute.String("relayer", relayer),
        )
        observer.ObserveFloat64(r.latestQuoteAgeGauge, age, opts)
    }

    return nil
}