synapsecns/sanguine

View on GitHub
services/explorer/consumer/parser/swapparser.go

Summary

Maintainability
F
4 days
Test Coverage
package parser

import (
    "context"
    "database/sql"
    "fmt"
    "math"
    "math/big"
    "time"

    "github.com/synapsecns/sanguine/core"
    "github.com/synapsecns/sanguine/services/explorer/consumer/fetcher/tokenprice"
    "golang.org/x/sync/errgroup"

    "github.com/synapsecns/sanguine/services/explorer/consumer/parser/tokendata"
    "github.com/synapsecns/sanguine/services/explorer/consumer/parser/tokenpool"
    "github.com/synapsecns/sanguine/services/explorer/contracts/metaswap"

    "github.com/ethereum/go-ethereum/common"
    ethTypes "github.com/ethereum/go-ethereum/core/types"
    "github.com/synapsecns/sanguine/services/explorer/consumer/fetcher"
    "github.com/synapsecns/sanguine/services/explorer/contracts/swap"
    "github.com/synapsecns/sanguine/services/explorer/db"
    model "github.com/synapsecns/sanguine/services/explorer/db/sql"
    "github.com/synapsecns/sanguine/services/explorer/static"
    swapTypes "github.com/synapsecns/sanguine/services/explorer/types/swap"
)

// SwapParser parses events from the swap contract.
type SwapParser struct {
    // consumerDB is the database to store parsed data in.
    consumerDB db.ConsumerDB
    // swap is the address of the bridge.
    swapAddress common.Address
    // Filterer is the swap Filterer we use to parse events.
    Filterer *swap.SwapFlashLoanFilterer
    // consumerFetcher is the ScribeFetcher for sender and timestamp.
    consumerFetcher fetcher.ScribeFetcher
    // tokenDataService contains the token data service/cache
    tokenDataService tokendata.Service
    // poolTokenDataService get the token address from the token index
    poolTokenDataService tokenpool.Service
    // tokenPriceService get the token price from the coingecko id
    tokenPriceService tokenprice.Service
    // swapService is the swap service
    swapService fetcher.SwapService
    // FiltererMetaSwap is the meta swap Filterer we use to parse events.
    FiltererMetaSwap *metaswap.MetaSwapFilterer
    // coinGeckoIDs is a mapping from coin token symbol to coin gecko ID
    coinGeckoIDs map[string]string
}

// NewSwapParser creates a new parser for a given bridge.
func NewSwapParser(consumerDB db.ConsumerDB, swapAddress common.Address, metaSwap bool, consumerFetcher fetcher.ScribeFetcher, swapService fetcher.SwapService, tokenDataService tokendata.Service, tokenPriceService tokenprice.Service) (*SwapParser, error) {
    var filterer *swap.SwapFlashLoanFilterer
    var filtererMetaSwap *metaswap.MetaSwapFilterer
    var err error
    if metaSwap {
        filtererMetaSwap, err = metaswap.NewMetaSwapFilterer(swapAddress, nil)
        if err != nil {
            return nil, fmt.Errorf("could not create %T: %w", metaswap.MetaSwapFilterer{}, err)
        }
        filterer = nil
    } else {
        filterer, err = swap.NewSwapFlashLoanFilterer(swapAddress, nil)
        if err != nil {
            return nil, fmt.Errorf("could not create %T: %w", swap.SwapFlashLoanFilterer{}, err)
        }
        filtererMetaSwap = nil
    }

    poolTokenDataService, err := tokenpool.NewPoolTokenDataService(swapService, consumerDB)
    if err != nil {
        return nil, fmt.Errorf("could not create token data service: %w", err)
    }

    coinGeckoIDs, err := ParseYaml(static.GetTokenIDToCoingekoConfig())
    if err != nil {
        return nil, fmt.Errorf("could not open yaml file: %w", err)
    }

    return &SwapParser{
        consumerDB:           consumerDB,
        swapAddress:          swapAddress,
        Filterer:             filterer,
        consumerFetcher:      consumerFetcher,
        tokenDataService:     tokenDataService,
        poolTokenDataService: poolTokenDataService,
        tokenPriceService:    tokenPriceService,
        swapService:          swapService,
        FiltererMetaSwap:     filtererMetaSwap,
        coinGeckoIDs:         coinGeckoIDs}, nil
}

// EventType returns the event type of a swap log.
func (p *SwapParser) EventType(log ethTypes.Log) (_ swapTypes.EventType, ok bool) {
    for _, logTopic := range log.Topics {
        eventType := swap.EventTypeFromTopic(logTopic)
        if eventType == nil {
            continue
        }

        return *eventType, true
    }

    // Return an unknown event to avoid cases where user failed to check the event type.
    return swapTypes.EventType(len(swapTypes.AllEventTypes()) + 2), false
}

// eventToSwapEvent stores a swap event.
func eventToSwapEvent(event swapTypes.EventLog, chainID uint32) model.SwapEvent {
    var buyer sql.NullString

    if event.GetBuyer() != nil {
        buyer.Valid = true
        buyer.String = event.GetBuyer().String()
    } else {
        buyer.Valid = false
    }

    var provider sql.NullString

    if event.GetProvider() != nil {
        provider.Valid = true
        provider.String = event.GetProvider().String()
    } else {
        provider.Valid = false
    }

    var receiver sql.NullString

    if event.GetReceiver() != nil {
        receiver.Valid = true
        receiver.String = event.GetReceiver().String()
    } else {
        receiver.Valid = false
    }

    return model.SwapEvent{
        InsertTime:      uint64(time.Now().UnixNano()),
        ContractAddress: event.GetContractAddress().String(),
        ChainID:         chainID,
        EventType:       event.GetEventType().Int(),
        BlockNumber:     event.GetBlockNumber(),
        TxHash:          event.GetTxHash().String(),
        EventIndex:      event.GetEventIndex(),
        Sender:          "",
        Buyer:           buyer,
        TokensSold:      event.GetTokensSold(),
        TokensBought:    event.GetTokensBought(),
        SoldID:          event.GetSoldID(),
        BoughtID:        event.GetBoughtID(),
        Provider:        provider,

        Invariant:     event.GetInvariant(),
        LPTokenSupply: event.GetLPTokenSupply(),
        LPTokenAmount: event.GetLPTokenAmount(),
        NewAdminFee:   event.GetNewAdminFee(),
        NewSwapFee:    event.GetNewSwapFee(),
        Amount:        event.GetAmount(),
        Fee:           event.GetAmountFee(),
        ProtocolFee:   event.GetProtocolFee(),
        OldA:          event.GetOldA(),
        NewA:          event.GetNewA(),
        InitialTime:   event.GetInitialTime(),
        FutureTime:    event.GetFutureTime(),
        CurrentA:      event.GetCurrentA(),
        Time:          event.GetTime(),
        Receiver:      receiver,

        TimeStamp:    nil,
        TokenPrice:   nil,
        TokenSymbol:  nil,
        TokenDecimal: nil,
        AdminFee:     nil,
        FeeUSD:       nil,
        AdminFeeUSD:  nil,
        AmountUSD:    nil,
    }
}

// ParserType returns the type of parser.
func (p *SwapParser) ParserType() string {
    return "swap"
}

// Parse parses the swap logs.
//
//nolint:gocognit,cyclop,dupl,maintidx
func (p *SwapParser) Parse(ctx context.Context, log ethTypes.Log, chainID uint32) (interface{}, error) {
    logTopic := log.Topics[0]

    iFace, err := func(log ethTypes.Log) (swapTypes.EventLog, error) {
        // nolint:nestif
        if p.FiltererMetaSwap != nil {
            switch logTopic {
            case swap.Topic(swapTypes.TokenSwapUnderlyingEvent):

                iFace, err := p.FiltererMetaSwap.ParseTokenSwapUnderlying(log)
                if err != nil {
                    return nil, fmt.Errorf("could not store token swap underlying: %w", err)
                }

                return iFace, nil
            default:
                logger.Warnf("ErrUnknownTopic in meta swap: %s %s chain: %d address: %s", log.TxHash, logTopic.String(), chainID, log.Address.Hex())

                return nil, fmt.Errorf(ErrUnknownTopic)
            }
        } else {
            switch logTopic {
            case swap.Topic(swapTypes.TokenSwapEvent):

                iFace, err := p.Filterer.ParseTokenSwap(log)
                if err != nil {
                    return nil, fmt.Errorf("could not parse token swap: %w", err)
                }

                return iFace, nil
            case swap.Topic(swapTypes.AddLiquidityEvent):
                iFace, err := p.Filterer.ParseAddLiquidity(log)
                if err != nil {
                    return nil, fmt.Errorf("could not parse add liquidity: %w", err)
                }

                return iFace, nil
            case swap.Topic(swapTypes.RemoveLiquidityEvent):
                iFace, err := p.Filterer.ParseRemoveLiquidity(log)
                if err != nil {
                    return nil, fmt.Errorf("could not parse remove liquidity: %w", err)
                }

                return iFace, nil
            case swap.Topic(swapTypes.RemoveLiquidityOneEvent):
                iFace, err := p.Filterer.ParseRemoveLiquidityOne(log)
                if err != nil {
                    return nil, fmt.Errorf("could not parse remove liquidity one: %w", err)
                }

                return iFace, nil
            case swap.Topic(swapTypes.RemoveLiquidityImbalanceEvent):
                iFace, err := p.Filterer.ParseRemoveLiquidityImbalance(log)
                if err != nil {
                    return nil, fmt.Errorf("could not parse remove liquidity imbalance: %w", err)
                }

                return iFace, nil
            case swap.Topic(swapTypes.NewAdminFeeEvent):
                iFace, err := p.Filterer.ParseNewAdminFee(log)
                if err != nil {
                    return nil, fmt.Errorf("could not parse new admin fee: %w", err)
                }
                err = p.consumerDB.StoreSwapFee(ctx, chainID, log.BlockNumber, log.Address.String(), iFace.NewAdminFee.Uint64(), "admin")
                if err != nil {
                    return nil, fmt.Errorf("could not store new admin fee : %w", err)
                }
                return iFace, nil
            case swap.Topic(swapTypes.NewSwapFeeEvent):
                iFace, err := p.Filterer.ParseNewSwapFee(log)
                if err != nil {
                    return nil, fmt.Errorf("could not parse new swap fee: %w", err)
                }
                err = p.consumerDB.StoreSwapFee(ctx, chainID, log.BlockNumber, log.Address.String(), iFace.NewSwapFee.Uint64(), "swap")
                if err != nil {
                    return nil, fmt.Errorf("could not store new admin fee : %w", err)
                }
                return iFace, nil
            case swap.Topic(swapTypes.RampAEvent):
                iFace, err := p.Filterer.ParseRampA(log)
                if err != nil {
                    return nil, fmt.Errorf("could not parse ramp a: %w", err)
                }

                return iFace, nil
            case swap.Topic(swapTypes.StopRampAEvent):
                iFace, err := p.Filterer.ParseStopRampA(log)
                if err != nil {
                    return nil, fmt.Errorf("could not parse stop ramp a: %w", err)
                }

                return iFace, nil
            case swap.Topic(swapTypes.FlashLoanEvent):
                iFace, err := p.Filterer.ParseFlashLoan(log)
                if err != nil {
                    return nil, fmt.Errorf("could not parse flash loan: %w", err)
                }

                return iFace, nil

            default:
                logger.Warnf("ErrUnknownTopic in swap: %s %s chain: %d address: %s", log.TxHash, logTopic.String(), chainID, log.Address.Hex())

                return nil, fmt.Errorf(ErrUnknownTopic)
            }
        }
    }(log)
    if err != nil {
        // Switch failed.
        return nil, err
    }
    swapEvent := eventToSwapEvent(iFace, chainID)

    var sender *string
    var timeStamp *uint64

    timeStamp, sender, err = p.consumerFetcher.FetchTx(ctx, iFace.GetTxHash().String(), int(chainID), int(swapEvent.BlockNumber))
    if err != nil {
        return nil, fmt.Errorf("could not get timestamp, sender on chain %d and tx %s from tx %w", chainID, iFace.GetTxHash().String(), err)
    }

    if *timeStamp == 0 {
        logger.Errorf("empty block time: chain: %d address %s", chainID, log.Address.Hex())
        return nil, fmt.Errorf("empty block time: chain: %d address %s", chainID, log.Address.Hex())
    }

    swapEvent.TimeStamp = timeStamp
    swapEvent.Sender = *sender

    maxIndex := uint8(0)
    totalTokenIndexRange := make(map[uint8]bool)
    if swapEvent.Amount != nil {
        for k := range swapEvent.Amount {
            totalTokenIndexRange[k] = true
            if k > maxIndex {
                maxIndex = k
            }
        }
    }
    if swapEvent.Fee != nil {
        for k := range swapEvent.Fee {
            totalTokenIndexRange[k] = true
            if k > maxIndex {
                maxIndex = k
            }
        }
    }

    adminFee := uint64(6000000000)
    swapFee := uint64(4000000)

    tokenPricesArr := make([]float64, maxIndex+1)
    tokenDecimalsArr := make([]uint8, maxIndex+1)
    tokenSymbolsArr := make([]string, maxIndex+1)
    tokenCoinGeckoIDsArr := make([]string, maxIndex+1)

    tokenPrices := make(map[uint8]float64, len(totalTokenIndexRange))
    tokenDecimals := make(map[uint8]uint8, len(totalTokenIndexRange))
    tokenSymbols := make(map[uint8]string, len(totalTokenIndexRange))
    tokenCoinGeckoIDs := make(map[uint8]string, len(totalTokenIndexRange))
    g, groupCtx := errgroup.WithContext(ctx)

    // TODO: need to deploy the test swap contracts with token indexes that match the test token address
    // nolint:nestif
    if !core.IsTest() {
        for i := range totalTokenIndexRange {
            tokenIndex := i
            g.Go(func() error {
                var tokenData tokendata.ImmutableTokenData
                // Get token symbol and decimals from the erc20 contract associated to the token.
                tokenAddress, err := p.poolTokenDataService.GetTokenAddress(groupCtx, chainID, tokenIndex, swapEvent.ContractAddress)
                if err != nil {
                    logger.Errorf("token with index %d not in pool: %v, %d, %s, %v %s, %d, %v", tokenIndex, err, chainID, swapEvent.ContractAddress, swapEvent.Amount, swapEvent.TxHash, swapEvent.EventType, p.FiltererMetaSwap)
                    return fmt.Errorf("token with index %d not in pool: %w, %d, %s, %v %s, %d, %v", tokenIndex, err, chainID, swapEvent.ContractAddress, swapEvent.Amount, swapEvent.TxHash, swapEvent.EventType, p.FiltererMetaSwap)
                }
                tokenData, err = p.tokenDataService.GetPoolTokenData(groupCtx, chainID, *tokenAddress, p.swapService)
                if err != nil {
                    logger.Errorf("could not get token data: %v", err)
                    return fmt.Errorf("could not get pool token data: %w", err)
                }
                tokenSymbolsArr[tokenIndex] = tokenData.TokenID()
                tokenDecimalsArr[tokenIndex] = tokenData.Decimals()
                coinGeckoID := p.coinGeckoIDs[tokenData.TokenID()]
                tokenCoinGeckoIDsArr[tokenIndex] = coinGeckoID

                if !(coinGeckoID == "xjewel" && *timeStamp < 1649030400) && !(coinGeckoID == "synapse-2" && *timeStamp < 1630281600) && !(coinGeckoID == "governance-ohm" && *timeStamp < 1638316800) && !(coinGeckoID == "highstreet" && *timeStamp < 1634263200) {
                    tokenPrice := p.tokenPriceService.GetPriceData(groupCtx, int(*swapEvent.TimeStamp), coinGeckoID)
                    if (tokenPrice == nil) && coinGeckoID != noTokenID && coinGeckoID != noPrice {
                        return fmt.Errorf("SWAP could not get token price for coingeckotoken:  %s chain: %d txhash %s %d", coinGeckoID, chainID, swapEvent.TxHash, swapEvent.TimeStamp)
                    }
                    tokenPricesArr[tokenIndex] = *tokenPrice
                }

                // TODO DELETE
                if tokenPricesArr[tokenIndex] == 0 {
                    logger.Warnf("SWAP - TOKEN PRICE IS ZERO tokenPricesArr[tokenIndex]: s%s, chainID: %d, tokenIndex: %d, tokenAddress: %s", tokenPricesArr[tokenIndex], chainID, tokenIndex, tokenAddress)
                }
                return nil
            })
        }
    }
    g.Go(func() error {
        // Check for all fee emitting event types
        if swapEvent.EventType == 0 || swapEvent.EventType == 1 || swapEvent.EventType == 4 || swapEvent.EventType == 9 || swapEvent.EventType == 10 {
            dbAdminFee, dbSwapFee, err := p.GetCorrectSwapFee(ctx, swapEvent)
            if err != nil {
                return fmt.Errorf("could not process swap event: %w", err)
            }
            if dbAdminFee > 0 {
                logger.Infof("USING DIFFERENT ADMIN FEE: %d", dbAdminFee)
                adminFee = dbAdminFee
            }
            if dbSwapFee > 0 {
                logger.Infof("USING DIFFERENT SWAP FEE: %d", dbSwapFee)
                swapFee = dbSwapFee
            }
        }
        return nil
    })

    err = g.Wait()
    if err != nil {
        return nil, fmt.Errorf("could get all swap data from go routines: %w", err)
    }

    for i := range totalTokenIndexRange {
        tokenPrices[i] = tokenPricesArr[i]
        tokenDecimals[i] = tokenDecimalsArr[i]
        tokenSymbols[i] = tokenSymbolsArr[i]
        tokenCoinGeckoIDs[i] = tokenCoinGeckoIDsArr[i]
    }

    swapEvent.TokenPrice = tokenPrices
    swapEvent.TokenDecimal = tokenDecimals
    swapEvent.TokenSymbol = tokenSymbols
    swapEvent.TokenCoinGeckoID = tokenCoinGeckoIDs

    amountResults := make(map[uint8]float64, len(totalTokenIndexRange))
    feeResults := make(map[uint8]float64, len(totalTokenIndexRange))
    adminFeeResults := make(map[uint8]float64, len(totalTokenIndexRange))
    adminFeeAmountResults := make(map[uint8]string, len(totalTokenIndexRange))

    if swapEvent.EventType == 0 || swapEvent.EventType == 10 {
        fee, err := convertFee(swapEvent.TokensSold, swapEvent.TokenDecimal[uint8(swapEvent.SoldID.Uint64())], swapFee)
        if err != nil {
            return nil, fmt.Errorf("could not convert fee: %w %d %d %d %d", err, swapEvent.TokensSold, uint8(swapEvent.SoldID.Uint64()), swapFee, adminFee)
        }
        swapEvent.Fee[uint8(swapEvent.SoldID.Uint64())] = fee
    }

    for i := range swapEvent.Amount {
        n := new(big.Int)
        n, ok := n.SetString(swapEvent.Amount[i], 10)
        if !ok {
            return nil, fmt.Errorf("error in parsing amount %s", swapEvent.Amount[i])
        }
        price := swapEvent.TokenPrice[i]
        amountResults[i] = *GetAmountUSD(n, swapEvent.TokenDecimal[i], &price)
    }

    for i := range swapEvent.Fee {
        n := new(big.Int)
        n, ok := n.SetString(swapEvent.Fee[i], 10)
        if !ok {
            return nil, fmt.Errorf("error in parsing fee amount %s", swapEvent.Fee[i])
        }

        price := swapEvent.TokenPrice[i]
        feeResults[i] = *GetAmountUSD(n, swapEvent.TokenDecimal[i], &price)
        adminFeeAmountResults[i], err = convertFee(n, swapEvent.TokenDecimal[i], adminFee)
        if err != nil {
            return nil, fmt.Errorf("could not convert fee: %w %d %d %d %d", err, swapEvent.TokensSold, uint8(swapEvent.SoldID.Uint64()), swapFee, adminFee)
        }

        adminFeeResults[i] = feeResults[i] * getAdjustedFee(adminFee, 10)
    }

    swapEvent.AmountUSD = amountResults
    swapEvent.FeeUSD = feeResults
    swapEvent.AdminFee = adminFeeAmountResults
    swapEvent.AdminFeeUSD = adminFeeResults

    return swapEvent, nil
}

// convertFee gets the fee amount.
func convertFee(amount *big.Int, decimal uint8, feeAmount uint64) (string, error) {
    adjustedAmount := GetAdjustedAmount(amount, decimal)
    if adjustedAmount == nil {
        return "", fmt.Errorf("SWAP - adjusted amount IS NIL %d", adjustedAmount)
    }

    fee := big.NewFloat(*adjustedAmount * getAdjustedFee(feeAmount, 10))
    feeDecimals := big.NewFloat(math.Pow(10, float64(decimal)))
    fee.Mul(fee, feeDecimals)

    result := new(big.Int)
    fee.Int(result)

    return fee.Text('f', 0), nil
}

func getAdjustedFee(fee uint64, decimal uint8) float64 {
    return float64(fee) / math.Pow(10, float64(decimal))
}

// TODO make more dynamic

// GetCorrectSwapFee returns the correct swap fee for the given pool contract.
func (p *SwapParser) GetCorrectSwapFee(ctx context.Context, swapEvent model.SwapEvent) (uint64, uint64, error) {
    var dbAdminFee uint64
    var dbSwapFee uint64
    var err error
    g, groupCtx := errgroup.WithContext(ctx)
    g.Go(func() error {
        dbAdminFee, err = p.consumerDB.GetUint64(groupCtx, fmt.Sprintf("SELECT fee FROM swap_fees WHERE chain_id = %d AND contract_address = '%s' AND fee_type = '%s' AND block_number <= %d ORDER BY block_number DESC LIMIT 1", swapEvent.ChainID, swapEvent.ContractAddress, "admin", swapEvent.BlockNumber))
        if err != nil {
            return fmt.Errorf("could not get admin fee: %w", err)
        }
        return nil
    })
    g.Go(func() error {
        dbSwapFee, err = p.consumerDB.GetUint64(groupCtx, fmt.Sprintf("SELECT fee FROM swap_fees WHERE chain_id = %d AND contract_address = '%s' AND fee_type = '%s' AND block_number <= %d ORDER BY block_number DESC LIMIT 1", swapEvent.ChainID, swapEvent.ContractAddress, "swap", swapEvent.BlockNumber))
        if err != nil {
            return fmt.Errorf("could not get swap fee: %w", err)
        }
        return nil
    })
    err = g.Wait()
    if err != nil {
        return 0, 0, fmt.Errorf("could notget newest swap fees: %w", err)
    }
    return dbAdminFee, dbSwapFee, nil
}