synapsecns/sanguine

View on GitHub
services/explorer/graphql/server/graph/queries.resolvers.go

Summary

Maintainability
D
2 days
Test Coverage
package graph

// This file will be automatically regenerated based on the schema, any resolver implementations
// will be copied through when generating and any unknown code will be moved to the end.
// Code generated by github.com/99designs/gqlgen version v0.17.36

import (
    "context"
    "fmt"
    "sort"
    "sync"

    "github.com/synapsecns/sanguine/services/explorer/db/sql"
    "github.com/synapsecns/sanguine/services/explorer/graphql/server/graph/model"
    resolvers "github.com/synapsecns/sanguine/services/explorer/graphql/server/graph/resolver"
    "golang.org/x/sync/errgroup"
)

// BridgeTransactions is the resolver for the bridgeTransactions2 field.
func (r *queryResolver) BridgeTransactions(ctx context.Context, chainIDFrom []*int, chainIDTo []*int, addressFrom *string, addressTo *string, maxAmount *int, minAmount *int, maxAmountUsd *int, minAmountUsd *int, startTime *int, endTime *int, txnHash *string, kappa *string, pending *bool, useMv *bool, page *int, tokenAddressFrom []*string, tokenAddressTo []*string, onlyCctp *bool) ([]*model.BridgeTransaction, error) {
    var results []*model.BridgeTransaction
    if useMv != nil && *useMv {
        var mvResults []*model.BridgeTransaction
        var err error
        mvResults, err = r.GetBridgeTxs(ctx, chainIDFrom, chainIDTo, addressFrom, addressTo, maxAmount, minAmount, maxAmountUsd, minAmountUsd, startTime, endTime, txnHash, tokenAddressTo, tokenAddressFrom, kappa, pending, onlyCctp, page)
        if err != nil {
            return nil, fmt.Errorf("failed to get bridge transaction: %w", err)
        }
        sort.Sort(SortBridgeTxType(mvResults))
        return mvResults, nil
    }

    var fromResults []*model.BridgeTransaction
    var toResults []*model.BridgeTransaction

    var wg sync.WaitGroup
    var originErr error
    var destinationErr error
    wg.Add(1)
    go func() {
        defer wg.Done()
        fromResults, originErr = r.GetBridgeTxsFromOrigin(ctx, useMv, chainIDFrom, chainIDTo, addressFrom, addressTo, maxAmount, minAmount, maxAmountUsd, minAmountUsd, startTime, endTime, txnHash, tokenAddressTo, tokenAddressFrom, kappa, pending, onlyCctp, page, false)
    }()
    wg.Add(1)
    go func() {
        defer wg.Done()
        toResults, destinationErr = r.GetBridgeTxsFromDestination(ctx, useMv, chainIDFrom, chainIDTo, addressFrom, addressTo, maxAmount, minAmount, minAmountUsd, maxAmountUsd, startTime, endTime, txnHash, kappa, tokenAddressFrom, tokenAddressTo, onlyCctp, page, pending)
    }()

    wg.Wait()
    if originErr != nil || destinationErr != nil {
        return nil, fmt.Errorf("error while getting txs. origin err: %w, destination err: %w", originErr, destinationErr)
    }
    results = r.mergeBridgeTransactions(fromResults, toResults)
    sort.Sort(SortBridgeTxType(results))
    return results, nil
}

// MessageBusTransactions is the resolver for the messageBusTransactions field.
func (r *queryResolver) MessageBusTransactions(ctx context.Context, chainID []*int, contractAddress *string, startTime *int, endTime *int, txnHash *string, messageID *string, pending *bool, reverted *bool, page *int) ([]*model.MessageBusTransaction, error) {
    var err error
    var results []*model.MessageBusTransaction
    results, err = r.GetMessageBusTxs(ctx, chainID, contractAddress, startTime, endTime, txnHash, messageID, *pending, *reverted, page)
    if err != nil {
        return nil, fmt.Errorf("could not get message bus transactions %w", err)
    }
    sort.Sort(SortMessageBusTxType(results))
    return results, nil
}

// CountByChainID is the resolver for the countByChainId field.
func (r *queryResolver) CountByChainID(ctx context.Context, chainID *int, address *string, direction *model.Direction, hours *int) ([]*model.TransactionCountResult, error) {
    directionIn := r.getDirectionIn(direction)
    targetTime := GetTargetTime(hours)
    results, err := r.DB.GetTxCounts(ctx, generateBridgeEventCountQuery(chainID, address, nil, directionIn, &targetTime, false))
    if err != nil {
        return nil, fmt.Errorf("failed to get count by chain ID: %w", err)
    }
    return results, nil
}

// CountByTokenAddress is the resolver for the countByTokenAddress field.
func (r *queryResolver) CountByTokenAddress(ctx context.Context, chainID *int, address *string, direction *model.Direction, hours *int) ([]*model.TokenCountResult, error) {
    directionIn := r.getDirectionIn(direction)
    targetTime := GetTargetTime(hours)
    results, err := r.DB.GetTokenCounts(ctx, generateBridgeEventCountQuery(chainID, address, nil, directionIn, &targetTime, true))

    if err != nil {
        return nil, fmt.Errorf("failed to get count by chain ID: %w", err)
    }

    return results, nil
}

// AddressRanking is the resolver for the addressRanking field.
func (r *queryResolver) AddressRanking(ctx context.Context, hours *int) ([]*model.AddressRanking, error) {
    targetTime := GetTargetTime(hours)
    firstFilter := true
    timeStampSpecifier := generateTimestampSpecifierSQL(&targetTime, sql.TimeStampFieldName, &firstFilter, "")
    directionSpecifier := generateDirectionSpecifierSQL(true, &firstFilter, "")
    compositeFilters := fmt.Sprintf("%s%s", timeStampSpecifier, directionSpecifier)
    query := fmt.Sprintf(`SELECT %s AS address, COUNT(DISTINCT %s) AS Count FROM (%s) GROUP BY %s ORDER BY Count Desc`, sql.SenderFieldName, sql.TxHashFieldName, generateDeDepQuery(compositeFilters, nil, nil), sql.SenderFieldName)
    res, err := r.DB.GetAddressRanking(ctx, query)
    if err != nil {
        return nil, fmt.Errorf("failed to get count by chain ID: %w", err)
    }

    return res, nil
}

// AmountStatistic is the resolver for the amountStatistic field.
func (r *queryResolver) AmountStatistic(ctx context.Context, typeArg model.StatisticType, duration *model.Duration, platform *model.Platform, chainID *int, address *string, tokenAddress *string, useCache *bool, useMv *bool) (*model.ValueResult, error) {
    if useCache != nil && *useCache {
        res, err := r.getValueResultFromCache(fmt.Sprintf("amountStatistic, %s, %s, %s, %s, %s, %s", typeArg.String(), platform.String(), duration.String(), keyGenHandleNilInt(chainID), keyGenHandleNilString(address), keyGenHandleNilString(tokenAddress)))
        if err == nil {
            return res, nil
        }
    }

    var err error
    firstFilter := true
    timestampSpecifier := GetDurationFilter(duration, &firstFilter, "")
    addressSpecifier := generateSingleSpecifierStringSQL(address, sql.SenderFieldName, &firstFilter, "")
    chainIDSpecifier := generateSingleSpecifierI32SQL(chainID, sql.ChainIDFieldName, &firstFilter, "")

    compositeFilters := fmt.Sprintf(
        `%s%s%s`,
        timestampSpecifier, addressSpecifier, chainIDSpecifier,
    )
    var finalSQL *string
    switch *platform {
    case model.PlatformBridge:
        finalSQL, err = GenerateAmountStatisticBridgeSQL(typeArg, address, chainID, tokenAddress)
        if err != nil {
            return nil, err
        }
    case model.PlatformSwap:
        finalSQL, err = GenerateAmountStatisticSwapSQL(typeArg, compositeFilters, tokenAddress)
        if err != nil {
            return nil, err
        }
    case model.PlatformMessageBus:
        if tokenAddress != nil {
            return nil, fmt.Errorf("cannot filter by token on message bus events")
        }

        finalSQL, err = GenerateAmountStatisticMessageBusSQL(typeArg, compositeFilters)

        if err != nil {
            return nil, err
        }

    case model.PlatformAll:
        var value *string
        value, err = r.getAmountStatisticsAll(ctx, typeArg, chainID, address, tokenAddress, compositeFilters)
        if err != nil {
            return nil, fmt.Errorf("could not calculate value across all platforms, %w", err)
        }
        output := model.ValueResult{
            Value: value,
        }
        err = r.Cache.CacheResponse(fmt.Sprintf("amountStatistic, %s, %s, %s, %s, %s, %s", typeArg.String(), platform.String(), duration.String(), keyGenHandleNilInt(chainID), keyGenHandleNilString(address), keyGenHandleNilString(tokenAddress)), &output)
        if err != nil {
            return nil, fmt.Errorf("error caching results, %w", err)
        }
        return &output, nil
    default:
        return nil, fmt.Errorf("invalid statistic type: %s", typeArg)
    }
    if finalSQL == nil {
        return nil, fmt.Errorf("invalid statistic or platform type: %s", typeArg)
    }
    res, err := r.DB.GetFloat64(ctx, *finalSQL)
    if err != nil {
        return nil, fmt.Errorf("failed to get amount data stats: %w", err)
    }

    value := fmt.Sprintf("%f", res)
    output := model.ValueResult{
        Value: &value,
    }
    err = r.Cache.CacheResponse(fmt.Sprintf("amountStatistic, %s, %s, %s, %s, %s, %s", typeArg.String(), platform.String(), duration.String(), keyGenHandleNilInt(chainID), keyGenHandleNilString(address), keyGenHandleNilString(tokenAddress)), &output)
    if err != nil {
        return nil, fmt.Errorf("error storing cache data, %w", err)
    }
    return &output, nil
}

// DailyStatisticsByChain is the resolver for the dailyStatisticsByChain field.
func (r *queryResolver) DailyStatisticsByChain(ctx context.Context, chainID *int, typeArg *model.DailyStatisticType, platform *model.Platform, duration *model.Duration, useCache *bool, useMv *bool) ([]*model.DateResultByChain, error) {
    cacheKey := fmt.Sprintf("dailyStatisticsByChain, %s, %s, %s, %s", keyGenHandleNilInt(chainID), typeArg.String(), duration.String(), platform.String())

    if useCache != nil && *useCache {
        locker := r.CacheMutex.Lock(cacheKey)
        defer locker.Unlock()

        res, err := r.getDateResultByChainFromCache(cacheKey)
        if err == nil {
            return res, nil
        }
    }

    if useMv != nil && *useMv {
        mvResult, err := r.getDateResultByChainMv(ctx, chainID, typeArg, platform, duration)
        if err != nil {
            return nil, fmt.Errorf("error getting mv data, %w", err)
        }
        return mvResult, nil
    }

    var err error
    firstFilter := true
    timestampSpecifier := GetDurationFilter(duration, &firstFilter, "")
    chainIDSpecifier := generateSingleSpecifierI32SQL(chainID, sql.ChainIDFieldName, &firstFilter, "")
    compositeFilters := fmt.Sprintf(
        `%s%s`,
        timestampSpecifier, chainIDSpecifier,
    )

    var res []*model.DateResultByChain
    var query *string
    g, groupCtx := errgroup.WithContext(ctx)
    switch *platform {
    case model.PlatformBridge:
        query, err = GenerateDailyStatisticByChainBridgeSQL(typeArg, compositeFilters, &firstFilter)
        if err != nil {
            return nil, err
        }
    case model.PlatformSwap:
        query, err = GenerateDailyStatisticByChainSwapSQL(typeArg, compositeFilters)
        if err != nil {
            return nil, err
        }
    case model.PlatformMessageBus:
        query, err = GenerateDailyStatisticByChainMessageBusSQL(typeArg, compositeFilters)
        if err != nil {
            return nil, err
        }
    case model.PlatformAll:
        query, err = GenerateDailyStatisticByChainAllSQL(typeArg, compositeFilters, &firstFilter)
    default:
        return nil, fmt.Errorf("unsupported platform")
    }
    g.Go(func() error {
        res, err = r.DB.GetDailyTotals(groupCtx, *query)
        if err != nil {
            return fmt.Errorf("failed to get dateResults: %w", err)
        }
        return nil
    })

    err = g.Wait()
    if err != nil {
        return nil, fmt.Errorf("could not get daily data by chain: %w", err)
    }
    err = r.Cache.CacheResponse(fmt.Sprintf("dailyStatisticsByChain, %s, %s, %s, %s", keyGenHandleNilInt(chainID), typeArg.String(), duration.String(), platform.String()), res)
    if err != nil {
        return nil, fmt.Errorf("error cahcing response, %w", err)
    }
    return res, nil
}

// RankedChainIDsByVolume is the resolver for the rankedChainIDsByVolume field.
func (r *queryResolver) RankedChainIDsByVolume(ctx context.Context, duration *model.Duration, useCache *bool) ([]*model.VolumeByChainID, error) {
    var err error
    firstFilter := true
    timestampSpecifier := GetDurationFilter(duration, &firstFilter, "")

    query := GenerateRankedChainsByVolumeSQL(timestampSpecifier, &firstFilter)

    var res []*model.VolumeByChainID
    g, groupCtx := errgroup.WithContext(ctx)

    g.Go(func() error {
        res, err = r.DB.GetRankedChainsByVolume(groupCtx, query)
        if err != nil {
            return fmt.Errorf("failed to get dateResults: %w", err)
        }
        return nil
    })

    err = g.Wait()

    if err != nil {
        return nil, fmt.Errorf("could not get daily data: %w", err)
    }

    return res, nil
}

// AddressData is the resolver for the addressData field.
func (r *queryResolver) AddressData(ctx context.Context, address string) (*model.AddressData, error) {
    bridgeQuery := fmt.Sprintf("SELECT toFloat64(sumKahan(famount_usd)) AS volumeTotal, toFloat64(sumKahan(tfee_amount_usd)) AS feeTotal, toInt64(uniq(fchain_id, ftx_hash)) AS txTotal FROM (SELECT * FROM mv_bridge_events where fsender = '%s' LIMIT 1 BY fchain_id,fcontract_address, fevent_type, fblock_number, fevent_index, ftx_hash)", address)
    swapQuery := fmt.Sprintf("SELECT toFloat64(sumKahan(multiIf(event_type = 0, amount_usd[sold_id], event_type = 1, arraySum(mapValues(amount_usd)), event_type = 9, arraySum(mapValues(amount_usd)), event_type = 10, amount_usd[sold_id],0))) AS volumeTotal, toFloat64(sumKahan(arraySum(mapValues(fee_usd)))) AS feeTotal,  toInt64(uniq(chain_id, tx_hash)) AS txTotal FROM (SELECT * FROM swap_events where sender = '%s' LIMIT 1 BY chain_id, contract_address, event_type, block_number, event_index, tx_hash)", address)
    rankingQuery := fmt.Sprintf("select rowNumber from (select sender, row_number() over (order by sumTotal desc ) as rowNumber from (select fsender as sender, sumKahan(famount_usd) as sumTotal from (SELECT * FROM mv_bridge_events where fsender != '' LIMIT 1 BY fchain_id, fcontract_address, fevent_type, fblock_number, fevent_index, ftx_hash) where fsender != '' group by fsender)) where sender = '%s'", address)
    firstTx := fmt.Sprintf("SELECT min(ftimestamp) AS earliestTime FROM (SELECT * FROM mv_bridge_events where fsender = '%s' LIMIT 1 BY fchain_id,fcontract_address, fevent_type, fblock_number, fevent_index, ftx_hash)", address)
    dailyDataQuery := fmt.Sprintf("SELECT coalesce(toString(date), toString(s.date)) AS date, toFloat64(coalesce(sumTotal, 0)) + toFloat64(coalesce(s.sumTotal, 0)) as count FROM (SELECT * FROM (SELECT %s, uniq(fchain_id, ftx_hash) AS sumTotal FROM (SELECT * FROM mv_bridge_events where fsender = '%s' LIMIT 1 BY fchain_id, fcontract_address, fevent_type, fblock_number, fevent_index, ftx_hash) group by date order by date) b FULL OUTER JOIN (SELECT %s, uniq(chain_id, tx_hash) AS sumTotal FROM (SELECT * FROM swap_events WHERE sender = '%s' LIMIT 1 BY chain_id, contract_address, event_type, block_number, event_index, tx_hash) group by date) s ON b.date = s.date) SETTINGS join_use_nulls=1", toDateSelectMv, address, toDateSelect, address)
    chainRankingQuery := fmt.Sprintf("SELECT row_number() over (order by VolumeUsd desc ) as Rank, tchain_id as ChainID, sumKahan(tamount_usd) AS VolumeUsd FROM (SELECT * FROM mv_bridge_events where fsender = '%s' LIMIT 1 BY fchain_id, fcontract_address, fevent_type, fblock_number, fevent_index, ftx_hash) where ChainID > 0 group by ChainID", address)
    var bridgeVolume float64
    var bridgeFees float64
    var bridgeTxs int
    var swapVolume float64
    var swapFees float64
    var swapTxs int
    var rank int
    var earliestTxTimestamp int
    var addressChainRanking []*model.AddressChainRanking

    var addressDailyData []*model.AddressDailyCount

    g, groupCtx := errgroup.WithContext(ctx)
    g.Go(func() error {
        var err error
        bridgeVolume, bridgeFees, bridgeTxs, err = r.DB.GetAddressData(groupCtx, bridgeQuery)
        if err != nil {
            return fmt.Errorf("failed to get bridge data for address %s: %w", address, err)
        }
        return nil
    })
    g.Go(func() error {
        var err error
        swapVolume, swapFees, swapTxs, err = r.DB.GetAddressData(groupCtx, swapQuery)
        if err != nil {
            return fmt.Errorf("failed to get swap data for address %s: %w", address, err)
        }
        return nil
    })
    g.Go(func() error {
        res, err := r.DB.GetUint64(groupCtx, rankingQuery)
        if err != nil {
            return fmt.Errorf("failed to get ranking for address %s: %w", address, err)
        }
        rank = int(res)
        return nil
    })
    g.Go(func() error {
        res, err := r.DB.GetUint64(groupCtx, firstTx)
        if err != nil {
            return fmt.Errorf("failed to get first timestamp for address %s: %w", address, err)
        }
        earliestTxTimestamp = int(res)
        return nil
    })
    g.Go(func() error {
        var err error
        addressDailyData, err = r.DB.GetAddressDailyData(groupCtx, dailyDataQuery)
        if err != nil {
            return fmt.Errorf("failed to get first daily data for address %s: %w", address, err)
        }
        return nil
    })

    g.Go(func() error {
        var err error
        addressChainRanking, err = r.DB.GetAddressChainRanking(groupCtx, chainRankingQuery)
        if err != nil {
            return fmt.Errorf("failed to get ranking for address %s: %w", address, err)
        }
        return nil
    })

    err := g.Wait()
    if err != nil {
        return nil, fmt.Errorf("could not get address data, %w", err)
    }
    res := &model.AddressData{
        BridgeVolume: &bridgeVolume,
        BridgeFees:   &bridgeFees,
        BridgeTxs:    &bridgeTxs,
        SwapVolume:   &swapVolume,
        SwapFees:     &swapFees,
        SwapTxs:      &swapTxs,
        Rank:         &rank,
        EarliestTx:   &earliestTxTimestamp,
        ChainRanking: addressChainRanking,
        DailyData:    addressDailyData,
    }
    return res, nil
}

// Leaderboard is the resolver for the leaderboard field.
func (r *queryResolver) Leaderboard(ctx context.Context, duration *model.Duration, chainID *int, useMv *bool, page *int) ([]*model.Leaderboard, error) {
    if !*useMv {
        return nil, fmt.Errorf("the leaderboard query does not support non-mv based queries")
    }
    firstFilter := false
    timestampSpecifier := GetDurationFilter(duration, &firstFilter, "f")
    chainIDSpecifier := generateSingleSpecifierI32SQL(chainID, sql.ChainIDFieldName, &firstFilter, "f")
    pageValue := sql.PageSize
    pageOffset := (*page - 1) * sql.PageSize
    filters := timestampSpecifier + chainIDSpecifier
    leaderboardQuery := fmt.Sprintf("select row_number() over (order by VolumeUsd desc ) as Rank, * from (select fsender as Address, toFloat64(sumKahan(famount_usd)) as VolumeUsd,toFloat64(avg(famount_usd)) as AvgVolumeUsd, count(DISTINCT ftx_hash) as Txs,toFloat64(sumKahan(tfee_amount_usd)) as Fees from (SELECT * FROM mv_bridge_events where fsender != '' LIMIT 1 BY fchain_id, fcontract_address, fevent_type, fblock_number, fevent_index, ftx_hash) where fsender != '' %s group by fsender) LIMIT %d OFFSET %d", filters, pageValue, pageOffset)
    leaderboardRes, err := r.DB.GetLeaderboard(ctx, leaderboardQuery)
    if err != nil {
        return nil, fmt.Errorf("failed to get leaderboard %w", err)
    }

    return leaderboardRes, nil
}

// GetOriginBridgeTx is the resolver for the getOriginBridgeTx field.
func (r *queryResolver) GetOriginBridgeTx(ctx context.Context, chainID int, txnHash string, bridgeType model.BridgeType) (*model.BridgeWatcherTx, error) {
    var results *model.BridgeWatcherTx
    var err error
    if !r.checkIfChainIDExists(uint32(chainID), bridgeType) {
        return nil, fmt.Errorf("chainID not supported by server")
    }

    results, err = r.GetOriginBridgeTxBW(ctx, chainID, txnHash, bridgeType)

    if err != nil {
        return nil, fmt.Errorf("could not get origin tx %w", err)
    }
    return results, nil
}

// GetDestinationBridgeTx is the resolver for the getDestinationBridgeTx field.
func (r *queryResolver) GetDestinationBridgeTx(ctx context.Context, chainID int, address string, kappa string, timestamp int, bridgeType model.BridgeType, historical *bool) (*model.BridgeWatcherTx, error) {
    if historical == nil {
        return nil, fmt.Errorf("historical flag must be set")
    }
    if !r.checkIfChainIDExists(uint32(chainID), bridgeType) {
        return nil, fmt.Errorf("chainID not supported by server")
    }
    var results *model.BridgeWatcherTx
    var err error
    results, err = r.GetDestinationBridgeTxBW(ctx, chainID, address, kappa, timestamp, *historical, bridgeType)

    if err != nil {
        return nil, fmt.Errorf("could not get destination tx %w", err)
    }
    return results, nil
}

// GetBlockHeight is the resolver for the getBlockHeight field.
func (r *queryResolver) GetBlockHeight(ctx context.Context, contracts []*model.ContractQuery) ([]*model.BlockHeight, error) {
    // Generate string for right side of IN clause
    var contractString string
    contractTypeMap := make(map[string]model.ContractType)
    for i, contract := range contracts {
        contractAddr, err := r.getContractAddressFromType(uint32(contract.ChainID), contract.Type)
        if err != nil {
            return nil, fmt.Errorf("could not get contract address from type %s, %w", contract.Type, err)
        }
        contractTypeMap[contractAddr] = contract.Type
        if i == 0 {
            contractString += fmt.Sprintf("('%s', %d)", contractAddr, contract.ChainID)
        } else {
            contractString += fmt.Sprintf(", ('%s', %d)", contractAddr, contract.ChainID)
        }
    }

    query := fmt.Sprintf("SELECT contract_address, chain_id, block_number FROM last_blocks WHERE (contract_address, chain_id) IN (%s) ORDER BY block_number", contractString)
    results, err := r.DB.GetBlockHeights(ctx, query, contractTypeMap)
    if err != nil {
        return nil, fmt.Errorf("could not get block heights from database %w", err)
    }
    return results, nil
}

// Query returns resolvers.QueryResolver implementation.
func (r *Resolver) Query() resolvers.QueryResolver { return &queryResolver{r} }

type queryResolver struct{ *Resolver }