services/rfq/relayer/service/statushandler.go
package service
import (
"context"
"fmt"
"math/big"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/jellydator/ttlcache/v3"
"github.com/synapsecns/sanguine/core/mapmutex"
"github.com/synapsecns/sanguine/core/metrics"
"github.com/synapsecns/sanguine/services/rfq/api/client"
"github.com/synapsecns/sanguine/services/rfq/relayer/chain"
"github.com/synapsecns/sanguine/services/rfq/relayer/inventory"
"github.com/synapsecns/sanguine/services/rfq/relayer/limiter"
"github.com/synapsecns/sanguine/services/rfq/relayer/quoter"
"github.com/synapsecns/sanguine/services/rfq/relayer/relconfig"
"github.com/synapsecns/sanguine/services/rfq/relayer/reldb"
"github.com/synapsecns/sanguine/services/rfq/util"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
// TODO: everything in this file should be moved to it's own module, at least as an interface
// QuoteRequestHandler is the helper for a quote request.
// lowercase fields are private, uppercase are public.
// the plan is to move this out of relayer which is when this distinction will matter.
type QuoteRequestHandler struct {
// Origin is the origin chain.
Origin chain.Chain
// Dest is the destination chain.
Dest chain.Chain
// db is the database.
db reldb.Service
// Inventory is the inventory.
Inventory inventory.Manager
// Quoter is the quoter.
Quoter quoter.Quoter
// handlers is the map of handlers.
handlers map[reldb.QuoteRequestStatus]Handler
// claimCache is the cache of claims used for figuring out when we should retry the claim method.
claimCache *ttlcache.Cache[common.Hash, bool]
// RelayerAddress is the relayer RelayerAddress
RelayerAddress common.Address
// metrics is the metrics handler.
metrics metrics.Handler
// apiClient is used to get acks before submitting a relay transaction.
apiClient client.AuthenticatedClient
// mutexMiddlewareFunc is used to wrap the handler in a mutex middleware.
// this should only be done if Handling, not forwarding.
mutexMiddlewareFunc func(func(ctx context.Context, span trace.Span, req reldb.QuoteRequest) error) func(ctx context.Context, span trace.Span, req reldb.QuoteRequest) error
// handlerMtx is the mutex for relaying.
handlerMtx mapmutex.StringMapMutex
// limiter is the rate limiter.
limiter limiter.Limiter
// tokenNames is the map of addresses to token names
tokenNames map[string]relconfig.TokenConfig
// balanceMtx is the mutex for balances.
balanceMtx mapmutex.StringMapMutex
// cfg is the relayer config.
cfg relconfig.Config
}
func getBalanceMtxKey(chainID uint32, token common.Address) string {
return fmt.Sprintf("%d-%s", chainID, token.Hex())
}
// Handler is the handler for a quote request.
type Handler func(ctx context.Context, span trace.Span, req reldb.QuoteRequest) error
func (r *Relayer) requestToHandler(ctx context.Context, req reldb.QuoteRequest) (*QuoteRequestHandler, error) {
origin, err := r.chainIDToChain(ctx, req.Transaction.OriginChainId)
if err != nil {
return nil, fmt.Errorf("could not get origin chain: %w", err)
}
dest, err := r.chainIDToChain(ctx, req.Transaction.DestChainId)
if err != nil {
return nil, fmt.Errorf("could not get dest chain: %w", err)
}
originTokens, err := r.cfg.GetTokens(req.Transaction.OriginChainId)
if err != nil {
return nil, fmt.Errorf("could not get tokens: %w", err)
}
qr := &QuoteRequestHandler{
Origin: *origin,
Dest: *dest,
db: r.db,
Inventory: r.inventory,
Quoter: r.quoter,
handlers: make(map[reldb.QuoteRequestStatus]Handler),
metrics: r.metrics,
RelayerAddress: r.signer.Address(),
claimCache: r.claimCache,
apiClient: r.apiClient,
mutexMiddlewareFunc: r.mutexMiddleware,
handlerMtx: r.handlerMtx,
limiter: limiter.NewRateLimiter(
r.cfg,
r.chainListeners[int(req.Transaction.OriginChainId)],
r.quoter,
r.metrics,
originTokens,
),
tokenNames: originTokens,
balanceMtx: r.balanceMtx,
cfg: r.cfg,
}
// wrap in deadline middleware since the relay has not yet happened
qr.handlers[reldb.Seen] = r.deadlineMiddleware(r.gasMiddleware(qr.handleSeen))
qr.handlers[reldb.CommittedPending] = r.deadlineMiddleware(r.gasMiddleware(qr.handleCommitPending))
qr.handlers[reldb.CommittedConfirmed] = r.deadlineMiddleware(r.gasMiddleware(qr.handleCommitConfirmed))
// no-op edge case, but we still want to check the deadline
qr.handlers[reldb.RelayStarted] = r.deadlineMiddleware(func(_ context.Context, _ trace.Span, _ reldb.QuoteRequest) error { return nil })
// no more need for deadline middleware now, we already relayed
qr.handlers[reldb.RelayCompleted] = qr.handleRelayCompleted
qr.handlers[reldb.ProvePosted] = qr.handleProofPosted
// error handlers only
qr.handlers[reldb.NotEnoughInventory] = r.deadlineMiddleware(qr.handleNotEnoughInventory)
return qr, nil
}
func (r *Relayer) mutexMiddleware(next func(ctx context.Context, span trace.Span, req reldb.QuoteRequest) error) func(ctx context.Context, span trace.Span, req reldb.QuoteRequest) error {
return func(ctx context.Context, span trace.Span, req reldb.QuoteRequest) (err error) {
unlocker, ok := r.handlerMtx.TryLock(hexutil.Encode(req.TransactionID[:]))
if !ok {
span.SetAttributes(
attribute.Bool("locked", true),
attribute.StringSlice("current_locks", r.handlerMtx.Keys()),
)
return nil
}
defer unlocker.Unlock()
// make sure the status has not changed since we last saw it
dbReq, err := r.db.GetQuoteRequestByID(ctx, req.TransactionID)
if err != nil {
return fmt.Errorf("could not get request: %w", err)
}
if dbReq.Status != req.Status {
span.SetAttributes(
attribute.Bool("status_changed", true),
attribute.String("db_status", dbReq.Status.String()),
attribute.String("handler_status", req.Status.String()),
)
return nil
}
return next(ctx, span, req)
}
}
func (r *Relayer) deadlineMiddleware(next func(ctx context.Context, span trace.Span, req reldb.QuoteRequest) error) func(ctx context.Context, span trace.Span, req reldb.QuoteRequest) error {
return func(ctx context.Context, span trace.Span, req reldb.QuoteRequest) error {
// apply deadline buffer
buffer, err := r.cfg.GetDeadlineBuffer(int(req.Transaction.DestChainId))
if err != nil {
return fmt.Errorf("could not get deadline buffer: %w", err)
}
almostNow := time.Now().Add(-buffer)
// if deadline < now, we don't even have to bother calling the underlying function
if req.Transaction.Deadline.Cmp(big.NewInt(almostNow.Unix())) < 0 {
err := r.db.UpdateQuoteRequestStatus(ctx, req.TransactionID, reldb.DeadlineExceeded, &req.Status)
if err != nil {
return fmt.Errorf("could not update request status: %w", err)
}
return nil
}
return next(ctx, span, req)
}
}
// gasMiddleware checks that we have sufficient gas to process a request on origin and destination.
func (r *Relayer) gasMiddleware(next func(ctx context.Context, span trace.Span, req reldb.QuoteRequest) error) func(ctx context.Context, span trace.Span, req reldb.QuoteRequest) error {
return func(ctx context.Context, span trace.Span, req reldb.QuoteRequest) (err error) {
var sufficientGasOrigin, sufficientGasDest bool
defer func() {
span.SetAttributes(
attribute.Bool("sufficient_gas_origin", sufficientGasOrigin),
attribute.Bool("sufficient_gas_dest", sufficientGasDest),
)
}()
sufficientGasOrigin, err = r.inventory.HasSufficientGas(ctx, int(req.Transaction.OriginChainId), nil)
if err != nil {
return fmt.Errorf("could not check gas on origin: %w", err)
}
// on destination, we need to check transactor.Value as well if we are dealing with ETH
// However, all requests with statuses CommittedPending, CommittedConfirmed and RelayStarted are considered
// in-flight and their respective amounts are already deducted from the inventory: see Manager.GetCommittableBalances().
// Therefore, we only need to check the gas value for requests with all the other statuses.
isInFlight := req.Status == reldb.CommittedPending || req.Status == reldb.CommittedConfirmed || req.Status == reldb.RelayStarted
var destGasValue *big.Int
if req.Transaction.DestToken == util.EthAddress && !isInFlight {
destGasValue = req.Transaction.DestAmount
span.SetAttributes(attribute.String("dest_gas_value", destGasValue.String()))
}
sufficientGasDest, err = r.inventory.HasSufficientGas(ctx, int(req.Transaction.DestChainId), destGasValue)
if err != nil {
return fmt.Errorf("could not check gas on dest: %w", err)
}
if !sufficientGasOrigin || !sufficientGasDest {
return nil
}
return next(ctx, span, req)
}
}
func (r *Relayer) chainIDToChain(ctx context.Context, chainID uint32) (*chain.Chain, error) {
id := int(chainID)
chainClient, err := r.client.GetChainClient(ctx, id)
if err != nil {
return nil, fmt.Errorf("could not get origin client: %w", err)
}
chain, err := chain.NewChain(ctx, r.cfg, chainClient, r.chainListeners[id], r.submitter)
if err != nil {
return nil, fmt.Errorf("could not create chain: %w", err)
}
return chain, nil
}
// shouldCheckClaim checks if we should check the claim method.
// if so it checks the claim method and updates the cache.
func (q *QuoteRequestHandler) shouldCheckClaim(request reldb.QuoteRequest) bool {
// we use claim cache to make sure we don't hit the rpc to check to often
if q.claimCache.Has(request.TransactionID) {
return false
}
q.claimCache.Set(request.TransactionID, true, 30*time.Second)
return true
}
// Handle handles a quote request.
// Note: this will panic if no method is available. This is done on purpose.
func (q *QuoteRequestHandler) Handle(ctx context.Context, request reldb.QuoteRequest) (err error) {
ctx, span := q.metrics.Tracer().Start(ctx, fmt.Sprintf("handle-%s", request.Status.String()), trace.WithAttributes(
attribute.String("transaction_id", hexutil.Encode(request.TransactionID[:])),
))
defer func() {
metrics.EndSpanWithErr(span, err)
}()
// we're handling and not forwarding, so we need to wrap the handler in a mutex middleware
handler := q.mutexMiddlewareFunc(q.handlers[request.Status])
return handler(ctx, span, request)
}
// Forward forwards a quote request.
// this ignores the mutex middleware.
func (q *QuoteRequestHandler) Forward(ctx context.Context, request reldb.QuoteRequest) (err error) {
txID := hexutil.Encode(request.TransactionID[:])
ctx, span := q.metrics.Tracer().Start(ctx, fmt.Sprintf("forward-%s", request.Status.String()), trace.WithAttributes(
attribute.String("transaction_id", txID),
))
defer func() {
metrics.EndSpanWithErr(span, err)
}()
// sanity check to make sure that the lock is already acquired for this tx
_, ok := q.handlerMtx.TryLock(txID)
if ok {
panic(fmt.Sprintf("attempted forward while lock was not acquired for tx: %s", txID))
}
return q.handlers[request.Status](ctx, span, request)
}