synapsecns/sanguine

View on GitHub
services/explorer/graphql/server/graph/queryutils.go

Summary

Maintainability
F
1 wk
Test Coverage
package graph

import (
    "context"
    "fmt"
    "math/big"
    "strconv"
    "sync"
    "time"

    "github.com/synapsecns/sanguine/services/explorer/contracts/user"
    "golang.org/x/sync/errgroup"

    "github.com/synapsecns/sanguine/services/explorer/db/sql"
    "github.com/synapsecns/sanguine/services/explorer/graphql/server/graph/model"
    "github.com/synapsecns/sanguine/services/explorer/types/bridge"
)

// nolint:unparam
func generateDeDepQuery(filter string, page *int, offset *int) string {
    if page != nil || offset != nil {
        return fmt.Sprintf("SELECT * FROM bridge_events %s ORDER BY timestamp DESC, block_number DESC, event_index DESC, insert_time DESC LIMIT 1 BY chain_id, contract_address, event_type, block_number, event_index, tx_hash LIMIT %d OFFSET %d", filter, *page, *offset)
    }

    return fmt.Sprintf("SELECT * FROM bridge_events %s ORDER BY timestamp DESC, block_number DESC, event_index DESC, insert_time DESC LIMIT 1 BY chain_id, contract_address, event_type, block_number, event_index, tx_hash", filter)
}

func generateDeDepQueryCTE(filter string, page *int, offset *int, in bool) string {
    minTimestamp := " (SELECT min(timestamp) - 86400 FROM baseQuery) AS minTimestamp, (SELECT count(*) FROM baseQuery) AS rowCount"
    if in {
        minTimestamp = " (SELECT min(timestamp) FROM baseQuery) AS minTimestamp, (SELECT count(*) FROM baseQuery) AS rowCount"
    }
    if page != nil || offset != nil {
        return fmt.Sprintf("WITH baseQuery AS (SELECT * FROM bridge_events %s ORDER BY timestamp DESC, block_number DESC, event_index DESC, insert_time DESC LIMIT 1 BY chain_id, contract_address, event_type, block_number, event_index, tx_hash LIMIT %d OFFSET %d), %s, %s", filter, *page, *offset, minTimestamp, swapDeDup)
    }
    return fmt.Sprintf("WITH baseQuery AS (SELECT * FROM bridge_events %s ORDER BY timestamp DESC, block_number DESC, event_index DESC, insert_time DESC LIMIT 1 BY chain_id, contract_address, event_type, block_number, event_index, tx_hash), %s, %s", filter, minTimestamp, swapDeDup)
}

func (r *queryResolver) getDirectionIn(direction *model.Direction) bool {
    var directionIn bool

    if direction != nil {
        directionIn = *direction == model.DirectionIn
    } else {
        directionIn = true
    }

    return directionIn
}

// GetTargetTime converts the number of hours into a timestamp.
func GetTargetTime(hours *int) uint64 {
    var targetTime uint64

    if hours == nil {
        targetTime = uint64(time.Now().Add(-time.Hour * 24).Unix())
    } else {
        targetTime = uint64(time.Now().Add(-time.Hour * time.Duration(*hours)).Unix())
    }

    return targetTime
}

func (r *queryResolver) mergeBridgeTransactions(origin []*model.BridgeTransaction, destination []*model.BridgeTransaction) []*model.BridgeTransaction {
    var results []*model.BridgeTransaction
    uniqueBridgeTransactions := make(map[string]*model.BridgeTransaction)

    for _, originTx := range origin {
        key := keyGen(fmt.Sprintf("%d", *originTx.FromInfo.ChainID), *originTx.Kappa)
        uniqueBridgeTransactions[key] = originTx
    }

    for _, destinationTx := range destination {
        key := keyGen(fmt.Sprintf("%d", *destinationTx.FromInfo.ChainID), *destinationTx.Kappa)
        uniqueBridgeTransactions[key] = destinationTx
    }

    for _, v := range uniqueBridgeTransactions {
        results = append(results, v)
    }

    return results
}

// generateAddressSpecifierSQL generates a where function with an string.
//
// nolint:unparam
func generateAddressSpecifierSQL(address *string, firstFilter *bool, tablePrefix string) string {
    // if address != nil {
    //    if *firstFilter {
    //        *firstFilter = false
    //
    //        return fmt.Sprintf(" WHERE (%s%s = '%s' OR  %s%s = '%s')", tablePrefix, sql.RecipientFieldName, *address, tablePrefix, sql.SenderFieldName, *address)
    //    }
    //
    //    return fmt.Sprintf(" AND (%s%s = '%s' OR %s%s = '%s')", tablePrefix, sql.RecipientFieldName, *address, tablePrefix, sql.SenderFieldName, *address)
    //}
    //
    // return ""
    if address != nil {
        if *firstFilter {
            *firstFilter = false

            return fmt.Sprintf(" WHERE %s%s = '%s'", tablePrefix, sql.SenderFieldName, *address)
        }

        return fmt.Sprintf(" AND %s%s = '%s'", tablePrefix, sql.SenderFieldName, *address)
    }

    return ""
}

// generateAddressSpecifierSQL generates a where function with an string.
//
// nolint:unparam
func generateAddressSpecifierSQLMv(address *string, firstFilter *bool, firstInLocale *bool, tablePrefix string) string {
    // if address != nil {
    //    if *firstFilter {
    //        *firstFilter = false
    //
    //        return fmt.Sprintf(" WHERE (%s%s = '%s' OR  %s%s = '%s')", tablePrefix, sql.RecipientFieldName, *address, tablePrefix, sql.SenderFieldName, *address)
    //    }
    //
    //    return fmt.Sprintf(" AND (%s%s = '%s' OR %s%s = '%s')", tablePrefix, sql.RecipientFieldName, *address, tablePrefix, sql.SenderFieldName, *address)
    //}
    //
    // return ""
    if address != nil {
        if *firstInLocale {
            *firstFilter = false
            *firstInLocale = false
            return fmt.Sprintf("  %s%s = '%s'", tablePrefix, sql.SenderFieldName, *address)
        }
        if *firstFilter {
            *firstFilter = false

            return fmt.Sprintf(" WHERE %s%s = '%s'", tablePrefix, sql.SenderFieldName, *address)
        }

        return fmt.Sprintf(" AND %s%s = '%s'", tablePrefix, sql.SenderFieldName, *address)
    }

    return ""
}

func generateRecipientSpecifierSQL(address *string, firstFilter *bool, tablePrefix string) string {
    // if address != nil {
    //    if *firstFilter {
    //        *firstFilter = false
    //
    //        return fmt.Sprintf(" WHERE (%s%s = '%s' OR  %s%s = '%s')", tablePrefix, sql.RecipientFieldName, *address, tablePrefix, sql.SenderFieldName, *address)
    //    }
    //
    //    return fmt.Sprintf(" AND (%s%s = '%s' OR %s%s = '%s')", tablePrefix, sql.RecipientFieldName, *address, tablePrefix, sql.SenderFieldName, *address)
    //}
    //
    // return ""
    if address != nil {
        if *firstFilter {
            *firstFilter = false

            return fmt.Sprintf(" WHERE %s%s = '%s'", tablePrefix, sql.RecipientFieldName, *address)
        }

        return fmt.Sprintf(" AND %s%s = '%s'", tablePrefix, sql.RecipientFieldName, *address)
    }

    return ""
}

func generateRecipientSpecifierSQLMv(address *string, firstFilter *bool, firstInLocale *bool, tablePrefix string) string {
    // if address != nil {
    //    if *firstFilter {
    //        *firstFilter = false
    //
    //        return fmt.Sprintf(" WHERE (%s%s = '%s' OR  %s%s = '%s')", tablePrefix, sql.RecipientFieldName, *address, tablePrefix, sql.SenderFieldName, *address)
    //    }
    //
    //    return fmt.Sprintf(" AND (%s%s = '%s' OR %s%s = '%s')", tablePrefix, sql.RecipientFieldName, *address, tablePrefix, sql.SenderFieldName, *address)
    //}
    //
    // return ""
    if address != nil {
        if *firstInLocale {
            *firstFilter = false
            *firstInLocale = false
            return fmt.Sprintf(" %s%s = '%s'", tablePrefix, sql.RecipientFieldName, *address)
        }
        if *firstFilter {
            *firstFilter = false

            return fmt.Sprintf(" WHERE %s%s = '%s'", tablePrefix, sql.RecipientFieldName, *address)
        }

        return fmt.Sprintf(" AND %s%s = '%s'", tablePrefix, sql.RecipientFieldName, *address)
    }

    return ""
}

// generateEqualitySpecifierSQL generates a where function with an equality.
//
// nolint:unparam
func generateEqualitySpecifierSQL(value *int, field string, firstFilter *bool, tablePrefix string, greaterThan bool) string {
    operator := "<"
    if greaterThan {
        operator = ">"
    }
    if value != nil {
        if *firstFilter {
            *firstFilter = false

            return fmt.Sprintf(" WHERE %s%s %s %d", tablePrefix, field, operator, *value)
        }

        return fmt.Sprintf(" AND %s%s %s %d", tablePrefix, field, operator, *value)
    }

    return ""
}

// generateCCTPSpecifierSQLMv generates a where function with event type to filter only cctp events.
func generateCCTPSpecifierSQL(onlyCctp *bool, to bool, field string, firstFilter *bool, tablePrefix string) string {
    if onlyCctp != nil && *onlyCctp {
        // From explorer/types/bridge/eventtypes.go
        eventType := 10
        if to {
            eventType = 11
        }

        if *firstFilter {
            *firstFilter = false

            return fmt.Sprintf(" WHERE %s%s =  %d", tablePrefix, field, eventType)
        }

        return fmt.Sprintf(" AND %s%s = %d", tablePrefix, field, eventType)
    }

    return ""
}

// generateEqualitySpecifierSQL generates a where function with an equality.
//
// nolint:unparam
func generateEqualitySpecifierSQLMv(value *int, field string, firstFilter *bool, firstInLocale *bool, tablePrefix string, greaterThan bool) string {
    operator := "<"
    if greaterThan {
        operator = ">"
    }
    if value != nil {
        if *firstInLocale {
            *firstFilter = false
            *firstInLocale = false
            return fmt.Sprintf(" %s%s %s %d", tablePrefix, field, operator, *value)
        }
        if *firstFilter {
            *firstFilter = false

            return fmt.Sprintf(" WHERE %s%s %s %d", tablePrefix, field, operator, *value)
        }

        return fmt.Sprintf(" AND %s%s %s %d", tablePrefix, field, operator, *value)
    }

    return ""
}

// generateDirectionSpecifierSQL generates a where function with a string.
//
// nolint:unparam
func generateDirectionSpecifierSQL(in bool, firstFilter *bool, tablePrefix string) string {
    if in {
        if *firstFilter {
            *firstFilter = false

            return fmt.Sprintf(" WHERE %s%s > 0", tablePrefix, sql.DestinationChainIDFieldName)
        }

        return fmt.Sprintf(" AND %s%s > 0", tablePrefix, sql.DestinationChainIDFieldName)
    }
    if *firstFilter {
        *firstFilter = false

        return fmt.Sprintf(" WHERE %s%s = 0", tablePrefix, sql.DestinationChainIDFieldName)
    }

    return fmt.Sprintf(" AND %s%s = 0", tablePrefix, sql.DestinationChainIDFieldName)
}

// generateSingleSpecifierI32SQL generates a where function with an uint32.
//
// nolint:unparam
func generateSingleSpecifierI32SQL(value *int, field string, firstFilter *bool, tablePrefix string) string {
    if value != nil {
        if *firstFilter {
            *firstFilter = false

            return fmt.Sprintf(" WHERE %s%s = %d", tablePrefix, field, *value)
        }

        return fmt.Sprintf(" AND %s%s = %d", tablePrefix, field, *value)
    }

    return ""
}

// generateSingleSpecifierI32ArrSQL generates a where function with an uint32.
//
// nolint:unparam
func generateSingleSpecifierI32ArrSQL(values []*int, field string, firstFilter *bool, tablePrefix string) string {
    if len(values) == 0 {
        return ""
    }
    var final string
    if *firstFilter {
        *firstFilter = false
        final += whereString
    }

    for i := range values {
        final += fmt.Sprintf(" %s%s = %d", tablePrefix, field, *values[i])
        if i < len(values)-1 {
            final += orString
        }
    }

    return final + ")"
}

// generateSingleSpecifierI32ArrSQL generates a where function with an uint32.
//
// nolint:unparam
func generateSingleSpecifierI32ArrSQLMv(values []*int, field string, firstFilter *bool, firstInLocale *bool, tablePrefix string) string {
    if len(values) == 0 {
        return ""
    }
    var final string
    if *firstInLocale {
        *firstInLocale = false
        *firstFilter = false
        final += " ("
    } else if *firstFilter {
        *firstFilter = false
        final += whereString
    }
    for i := range values {
        final += fmt.Sprintf(" %s%s = %d", tablePrefix, field, *values[i])
        if i < len(values)-1 {
            final += orString
        }
    }

    return final + ")"
}

// GenerateSingleSpecifierStringSQL generates a where function with a string.
//
// nolint:unparam
func generateSingleSpecifierStringArrSQL(values []*string, field string, firstFilter *bool, tablePrefix string) string {
    if len(values) == 0 {
        return ""
    }
    var final string
    if *firstFilter {
        *firstFilter = false
        final += whereString
    } else {
        final += " AND ("
    }

    for i := range values {
        if values[i] != nil {
            final += fmt.Sprintf(" %s%s = '%s'", tablePrefix, field, *values[i])
            if i < len(values)-1 {
                final += orString
            }
        }
    }

    return final + ")"
}

// GenerateSingleSpecifierStringSQL generates a where function with a string.
//
// nolint:unparam
func generateSingleSpecifierStringArrSQLMv(values []*string, field string, firstFilter *bool, firstInLocale *bool, tablePrefix string) string {
    if len(values) == 0 {
        return ""
    }
    var final string
    if *firstInLocale {
        *firstInLocale = false
        *firstFilter = false
        final += " ("
    } else {
        if *firstFilter {
            *firstFilter = false
            final += whereString
        } else {
            final += " AND ("
        }
    }
    for i := range values {
        if values[i] != nil {
            final += fmt.Sprintf(" %s%s = '%s'", tablePrefix, field, *values[i])
            if i < len(values)-1 {
                final += orString
            }
        }
    }

    return final + ")"
}

// generateTimestampSpecifierSQL generates a where function with an uint64.
//
// nolint:unparam
func generateTimestampSpecifierSQL(value *uint64, field string, firstFilter *bool, tablePrefix string) string {
    if value != nil {
        if *firstFilter {
            *firstFilter = false

            return fmt.Sprintf(" WHERE %s%s >= %d", tablePrefix, field, *value)
        }

        return fmt.Sprintf(" AND %s%s >= %d", tablePrefix, field, *value)
    }

    return ""
}

// GenerateSingleSpecifierStringSQL generates a where function with a string.
//
// nolint:unparam
func generateSingleSpecifierStringSQL(value *string, field string, firstFilter *bool, tablePrefix string) string {
    if value != nil {
        if *firstFilter {
            *firstFilter = false

            return fmt.Sprintf(" WHERE %s%s =  '%s'", tablePrefix, field, *value)
        }

        return fmt.Sprintf(" AND %s%s =  '%s'", tablePrefix, field, *value)
    }

    return ""
}

// GenerateSingleSpecifierStringSQL generates a where function with a string.
//
// nolint:unparam
func generateSingleSpecifierStringSQLMv(value *string, field string, firstFilter *bool, firstLocale *bool, tablePrefix string) string {
    if value != nil {
        if *firstLocale {
            *firstFilter = false
            *firstLocale = false
            return fmt.Sprintf(" %s%s = '%s'", tablePrefix, field, *value)
        }
        if *firstFilter {
            *firstFilter = false

            return fmt.Sprintf(" WHERE %s%s = '%s'", tablePrefix, field, *value)
        }

        return fmt.Sprintf(" AND %s%s = '%s'", tablePrefix, field, *value)
    }

    return ""
}

// generateKappaSpecifierSQL generates a where function with a string.
func generateKappaSpecifierSQL(value *string, field string, firstFilter *bool, tablePrefix string) string {
    if value != nil {
        if *firstFilter {
            *firstFilter = false

            return fmt.Sprintf(" WHERE %s%s = '%s'", tablePrefix, field, *value)
        }

        return fmt.Sprintf(" AND %s%s = '%s'", tablePrefix, field, *value)
    }

    return ""
}

// generateKappaSpecifierSQL generates a where function with a string.
func generateKappaSpecifierSQLMv(value *string, field string, firstFilter *bool, firstInLocale *bool, tablePrefix string) string {
    if value != nil {
        if *firstInLocale {
            *firstFilter = false
            *firstInLocale = false
            return fmt.Sprintf(" %s%s = '%s'", tablePrefix, field, *value)
        }
        if *firstFilter {
            *firstFilter = false

            return fmt.Sprintf(" WHERE %s%s = '%s'", tablePrefix, field, *value)
        }

        return fmt.Sprintf(" AND %s%s = '%s'", tablePrefix, field, *value)
    }

    return ""
}

// generateCCTPSpecifierSQLMv generates a where function with event type to filter only cctp events.
func generateCCTPSpecifierSQLMv(onlyCctp *bool, to bool, field string, firstFilter *bool, firstInLocale *bool, tablePrefix string) string {
    if onlyCctp != nil && *onlyCctp {
        // From explorer/types/bridge/eventtypes.go
        eventType := 10
        if to {
            eventType = 11
        }

        if *firstInLocale {
            *firstFilter = false
            *firstInLocale = false
            return fmt.Sprintf(" %s%s = %d", tablePrefix, field, eventType)
        }
        if *firstFilter {
            *firstFilter = false

            return fmt.Sprintf(" WHERE %s%s =  %d", tablePrefix, field, eventType)
        }

        return fmt.Sprintf(" AND %s%s = %d", tablePrefix, field, eventType)
    }

    return ""
}

//// generateDestinationChainIDSpecifierSQL generates a where function with a string.
// func generateDestinationChainIDSpecifierSQL(field string, firstFilter *bool, tablePrefix string, destination bool) string {
//    if destination {
//        if *firstFilter {
//            *firstFilter = false
//
//            return fmt.Sprintf(" WHERE %s%s == 0", tablePrefix, field)
//        }
//
//        return fmt.Sprintf(" AND %s%s  == 0", tablePrefix, field)
//    }
//    if *firstFilter {
//        *firstFilter = false
//
//        return fmt.Sprintf(" WHERE %s%s > 0", tablePrefix, field)
//    }
//    return fmt.Sprintf(" AND %s%s  > 0", tablePrefix, field)
//}

// generateBridgeEventCountQuery creates the query for bridge event count.
func generateBridgeEventCountQuery(chainID *int, address *string, tokenAddress *string, directionIn bool, timestamp *uint64, isTokenCount bool) string {
    chainField := sql.ChainIDFieldName

    firstFilter := true
    directionSpecifier := generateDirectionSpecifierSQL(directionIn, &firstFilter, "")
    chainIDSpecifier := generateSingleSpecifierI32SQL(chainID, chainField, &firstFilter, "")
    addressSpecifier := generateSingleSpecifierStringSQL(address, sql.RecipientFieldName, &firstFilter, "")
    tokenAddressSpecifier := generateSingleSpecifierStringSQL(tokenAddress, sql.TokenFieldName, &firstFilter, "")
    timestampSpecifier := generateTimestampSpecifierSQL(timestamp, sql.TimeStampFieldName, &firstFilter, "")

    compositeFilters := fmt.Sprintf(
        `%s%s%s%s%s`,
        directionSpecifier, chainIDSpecifier, addressSpecifier, tokenAddressSpecifier, timestampSpecifier,
    )
    var query string
    if isTokenCount {
        query = fmt.Sprintf(`%s SELECT %s, %s AS TokenAddress, COUNT(DISTINCT (%s)) AS Count FROM (SELECT %s FROM %s %s) GROUP BY %s, %s ORDER BY Count Desc`,
            generateDeDepQueryCTE(compositeFilters, nil, nil, true), sql.ChainIDFieldName, sql.TokenFieldName, sql.TxHashFieldName, singleSideCol, "baseQuery", singleSideJoinsCTE, sql.TokenFieldName, sql.ChainIDFieldName)
    } else {
        query = fmt.Sprintf(`%s SELECT %s, COUNT(DISTINCT (%s)) AS Count FROM (SELECT %s FROM %s %s) GROUP BY %s ORDER BY Count Desc`,
            generateDeDepQueryCTE(compositeFilters, nil, nil, true), sql.ChainIDFieldName, sql.TxHashFieldName, singleSideCol, "baseQuery", singleSideJoinsCTE, sql.ChainIDFieldName)
    }
    return query
}

// GetPartialInfoFromBridgeEventHybrid returns the partial info from bridge event.
//
// nolint:cyclop
func GetPartialInfoFromBridgeEventHybrid(bridgeEvent sql.HybridBridgeEvent, includePending *bool) (*model.BridgeTransaction, error) {
    if includePending != nil && *includePending && bridgeEvent.TTxHash != "" {
        // nolint:nilnil
        return nil, nil
    }
    var bridgeTx model.BridgeTransaction
    fromChainID := int(bridgeEvent.FChainID)
    fromDestinationChainID := int(bridgeEvent.FDestinationChainID.Uint64())
    fromBlockNumber := int(bridgeEvent.FBlockNumber)
    fromValue := bridgeEvent.FAmount.String()
    fromEventTypeFormatted := bridge.GetEventType(bridgeEvent.FEventType)
    fromEventType := int(bridgeEvent.FEventType)

    var fromTimestamp int
    var fromFormattedValue *float64
    var fromTimeStampFormatted string
    if bridgeEvent.FTokenDecimal != nil {
        fromFormattedValue = getAdjustedValue(bridgeEvent.FAmount, *bridgeEvent.FTokenDecimal)
    } else {
        return nil, fmt.Errorf("token decimal is not valid")
    }
    if bridgeEvent.FTimeStamp != nil {
        fromTimestamp = int(*bridgeEvent.FTimeStamp)
        fromTimeStampFormatted = time.Unix(int64(*bridgeEvent.FTimeStamp), 0).String()
    } else {
        return nil, fmt.Errorf("timestamp is not valid")
    }

    fAddress := bridgeEvent.FRecipient.String
    if bridgeEvent.FEventType == bridge.CircleRequestSentEvent.Int() {
        fAddress = bridgeEvent.FSender
    }
    fromInfos := &model.PartialInfo{
        ChainID:            &fromChainID,
        DestinationChainID: &fromDestinationChainID,
        Address:            &fAddress,
        TxnHash:            &bridgeEvent.FTxHash,
        Value:              &fromValue,
        FormattedValue:     fromFormattedValue,
        USDValue:           bridgeEvent.FAmountUSD,
        TokenAddress:       &bridgeEvent.FToken,
        TokenSymbol:        &bridgeEvent.FTokenSymbol.String,
        BlockNumber:        &fromBlockNumber,
        Time:               &fromTimestamp,
        FormattedTime:      &fromTimeStampFormatted,
        FormattedEventType: &fromEventTypeFormatted,
        EventType:          &fromEventType,
    }

    // If not pending, return a destination partial, otherwise toInfos will be null.
    var pending bool
    var toInfos *model.PartialInfo
    // nolint:nestif
    if bridgeEvent.TTxHash != "" {
        toChainID := int(bridgeEvent.TChainID)
        toBlockNumber := int(bridgeEvent.TBlockNumber)
        toValue := bridgeEvent.TAmount.String()
        var toTimestamp int
        var toFormattedValue *float64
        var toTimeStampFormatted string
        if bridgeEvent.TTokenDecimal != nil {
            toFormattedValue = getAdjustedValue(bridgeEvent.TAmount, *bridgeEvent.TTokenDecimal)
        } else {
            return nil, fmt.Errorf("token decimal is not valid")
        }
        if bridgeEvent.TTimeStamp != nil {
            toTimestamp = int(*bridgeEvent.TTimeStamp)
            toTimeStampFormatted = time.Unix(int64(*bridgeEvent.TTimeStamp), 0).String()
        } else {
            return nil, fmt.Errorf("timestamp is not valid")
        }
        toEventTypeFormatted := bridge.GetEventType(bridgeEvent.TEventType)
        toEventType := int(bridgeEvent.TEventType)

        tAddress := bridgeEvent.TRecipient.String
        if bridgeEvent.FEventType == bridge.CircleRequestFulfilledEvent.Int() {
            tAddress = bridgeEvent.TSender
        }

        toInfos = &model.PartialInfo{
            ChainID:            &toChainID,
            Address:            &tAddress,
            TxnHash:            &bridgeEvent.TTxHash,
            Value:              &toValue,
            FormattedValue:     toFormattedValue,
            USDValue:           bridgeEvent.TAmountUSD,
            TokenAddress:       &bridgeEvent.TToken,
            TokenSymbol:        &bridgeEvent.TTokenSymbol.String,
            BlockNumber:        &toBlockNumber,
            Time:               &toTimestamp,
            FormattedTime:      &toTimeStampFormatted,
            FormattedEventType: &toEventTypeFormatted,
            EventType:          &toEventType,
        }
    } else {
        toInfos = nil
        pending = true
    }

    var swapSuccess bool
    if bridgeEvent.TSwapSuccess.Uint64() == 1 {
        swapSuccess = true
    }
    if includePending != nil && !*includePending && pending {
        // nolint:nilnil
        return nil, nil
    }
    kappa := bridgeEvent.FDestinationKappa
    if kappa == "" {
        kappa = bridgeEvent.TKappa.String
    }
    bridgeModule := getBridgeModule(int(bridgeEvent.FEventType))
    bridgeTx = model.BridgeTransaction{
        FromInfo:     fromInfos,
        ToInfo:       toInfos,
        Kappa:        &kappa,
        Pending:      &pending,
        SwapSuccess:  &swapSuccess,
        BridgeModule: &bridgeModule,
    }
    return &bridgeTx, nil
}

func generateMessageBusQuery(chainID []*int, address *string, startTime *int, endTime *int, messageID *string, pending bool, reverted bool, txHash *string, page int) string {
    firstFilter := true

    chainIDSpecifier := generateSingleSpecifierI32ArrSQL(chainID, sql.ChainIDFieldName, &firstFilter, "")

    minTimeSpecfier := generateEqualitySpecifierSQL(startTime, sql.TimeStampFieldName, &firstFilter, "", true)
    maxTimeSpecfier := generateEqualitySpecifierSQL(endTime, sql.TimeStampFieldName, &firstFilter, "", false)

    addressSpecifier := generateAddressSpecifierSQL(address, &firstFilter, "")
    messageIDSpecifier := generateSingleSpecifierStringSQL(messageID, "message_id", &firstFilter, "")
    txHashSpecifier := generateSingleSpecifierStringSQL(txHash, sql.TxHashFieldName, &firstFilter, "")
    operation := " = ''"
    if !pending {
        operation = " != ''"
    }
    pendingSpecifier := fmt.Sprintf(" WHERE t.message_id %s", operation)
    compositeFilters := chainIDSpecifier + minTimeSpecfier + maxTimeSpecfier + addressSpecifier + messageIDSpecifier + txHashSpecifier
    pageValue := sql.PageSize
    pageOffset := (page - 1) * sql.PageSize

    cte := fmt.Sprintf("WITH baseQuery AS (SELECT * FROM message_bus_events %s ORDER BY timestamp DESC, block_number DESC, event_index DESC, insert_time DESC LIMIT 1 BY chain_id, contract_address, event_type, block_number, event_index, tx_hash), (SELECT min(timestamp) FROM baseQuery) AS minTimestamp", compositeFilters)

    finalQuery := fmt.Sprintf("%s SELECT * FROM (SELECT * FROM (SELECT * FROM %s WHERE %s = 1 ) f LEFT JOIN (SELECT * FROM (%s) WHERE %s = 0) t ON f.%s = t.%s %s)  LIMIT %d OFFSET %d", cte, "baseQuery", sql.EventTypeFieldName, baseMessageBus, sql.EventTypeFieldName, "message_id", "message_id", pendingSpecifier, pageValue, pageOffset)

    if reverted {
        finalQuery = fmt.Sprintf("%s SELECT * FROM  (SELECT * FROM (select * from (%s) WHERE %s = 1) f RIGHT OUTER JOIN (Select r.reverted_reason AS reverted_reason, j.reverted_reason AS rrr, * FROM (select * from %s WHERE event_type = 0 and status = 'Fail') j LEFT JOIN (select reverted_reason, tx_hash from (%s) WHERE %s = 2) r on j.tx_hash = r.tx_hash) t ON f.%s = t.%s)  LIMIT %d OFFSET %d", cte, baseMessageBus, sql.EventTypeFieldName, "baseQuery", baseMessageBus, sql.EventTypeFieldName, "message_id", "message_id", pageValue, pageOffset)
    }
    return finalQuery
}
func generateAllBridgeEventsQueryFromDestination(chainIDTo []*int, chainIDFrom []*int, addressFrom *string, addressTo *string, maxAmount *int, minAmount *int, maxAmountUsd *int, minAmountUsd *int, startTime *int, endTime *int, tokenAddressFrom []*string, tokenAddressTo []*string, kappa *string, txHash *string, onlyCctp *bool, page int, in bool) string {
    firstFilter := true
    chainIDToFilter := generateSingleSpecifierI32ArrSQL(chainIDTo, sql.ChainIDFieldName, &firstFilter, "")
    minTimeFilter := generateEqualitySpecifierSQL(startTime, sql.TimeStampFieldName, &firstFilter, "", true)
    maxTimeFilter := generateEqualitySpecifierSQL(endTime, sql.TimeStampFieldName, &firstFilter, "", false)
    addressToFilter := generateAddressSpecifierSQL(addressTo, &firstFilter, "")
    kappaFilter := generateKappaSpecifierSQL(kappa, sql.KappaFieldName, &firstFilter, "")
    txHashFilter := generateSingleSpecifierStringSQL(txHash, sql.TxHashFieldName, &firstFilter, "")
    directionFilter := generateDirectionSpecifierSQL(in, &firstFilter, "")
    cctpFilter := generateCCTPSpecifierSQL(onlyCctp, true, sql.EventTypeFieldName, &firstFilter, "")

    toFilters := chainIDToFilter + minTimeFilter + maxTimeFilter + addressToFilter + kappaFilter + txHashFilter + directionFilter + cctpFilter

    firstFilter = false
    chainIDFromFilter := generateSingleSpecifierI32ArrSQL(chainIDFrom, sql.ChainIDFieldName, &firstFilter, "")
    addressFromFilter := generateAddressSpecifierSQL(addressFrom, &firstFilter, "")

    fromFilters := chainIDFromFilter + addressFromFilter

    firstFilter = true
    minAmountFilter := generateEqualitySpecifierSQL(minAmount, "tamount", &firstFilter, "", true)
    minAmountFilterUsd := generateEqualitySpecifierSQL(minAmountUsd, "tamount_usd", &firstFilter, "", true)
    maxAmountFilter := generateEqualitySpecifierSQL(maxAmount, "famount", &firstFilter, "", false)
    maxAmountFilterUsd := generateEqualitySpecifierSQL(maxAmountUsd, "famount_usd", &firstFilter, "", false)
    tokenAddressToFilter := generateSingleSpecifierStringArrSQL(tokenAddressTo, "ttoken", &firstFilter, "")
    tokenAddressFromFilter := generateSingleSpecifierStringArrSQL(tokenAddressFrom, "ftoken", &firstFilter, "")
    postJoinFilters := minAmountFilter + minAmountFilterUsd + maxAmountFilter + maxAmountFilterUsd + tokenAddressToFilter + tokenAddressFromFilter

    pageValue := sql.PageSize
    pageOffset := (page - 1) * sql.PageSize
    if postJoinFilters == "" {
        return fmt.Sprintf("%s SELECT %s FROM %s %s %s %s", generateDeDepQueryCTE(toFilters, &pageValue, &pageOffset, false), destToOriginCol, "baseQuery", destToOriginJoinsPt1, fromFilters, destToOriginJoinsPt2)
    }
    return fmt.Sprintf("%s SELECT * FROM (SELECT %s FROM %s %s %s %s) %s LIMIT %d OFFSET %d", generateDeDepQueryCTE(toFilters, nil, nil, false), destToOriginCol, "baseQuery", destToOriginJoinsPt1, fromFilters, destToOriginJoinsPt2, postJoinFilters, pageValue, pageOffset)
}

func generateAllBridgeEventsQueryFromDestinationMv(chainIDTo []*int, addressTo *string, minAmount *int, minAmountUsd *int, startTime *int, endTime *int, tokenAddressTo []*string, kappa *string, txHash *string, pending *bool, page int) string {
    firstFilter := true
    chainIDToFilter := generateSingleSpecifierI32ArrSQL(chainIDTo, sql.ChainIDFieldName, &firstFilter, "t")
    minTimeFilter := generateEqualitySpecifierSQL(startTime, sql.TimeStampFieldName, &firstFilter, "t", true)
    maxTimeFilter := generateEqualitySpecifierSQL(endTime, sql.TimeStampFieldName, &firstFilter, "t", false)
    addressToFilter := generateRecipientSpecifierSQL(addressTo, &firstFilter, "t")
    kappaFilter := generateKappaSpecifierSQL(kappa, sql.KappaFieldName, &firstFilter, "t")
    txHashFilter := generateSingleSpecifierStringSQL(txHash, sql.TxHashFieldName, &firstFilter, "t")
    minAmountFilter := generateEqualitySpecifierSQL(minAmount, "tamount", &firstFilter, "", true)
    minAmountFilterUsd := generateEqualitySpecifierSQL(minAmountUsd, "tamount_usd", &firstFilter, "", true)
    tokenAddressToFilter := generateSingleSpecifierStringArrSQL(tokenAddressTo, "ttoken", &firstFilter, "")

    // firstFilter = false
    // chainIDFromFilter := generateSingleSpecifierI32ArrSQL(chainIDFrom, sql.ChainIDFieldName, &firstFilter, "")
    // addressFromFilter := generateAddressSpecifierSQL(addressFrom, &firstFilter, "")
    // maxAmountFilter := generateEqualitySpecifierSQL(maxAmount, "famount", &firstFilter, "", false)
    // maxAmountFilterUsd := generateEqualitySpecifierSQL(maxAmountUsd, "famount_usd", &firstFilter, "", false)
    // tokenAddressFromFilter := generateSingleSpecifierStringArrSQL(tokenAddressFrom, "ftoken", &firstFilter, "")

    // fromFilters := chainIDFromFilter + addressFromFilter

    // firstFilter = true
    // minAmountFilter := generateEqualitySpecifierSQL(minAmount, "tamount", &firstFilter, "", true)
    // minAmountFilterUsd := generateEqualitySpecifierSQL(minAmountUsd, "tamount_usd", &firstFilter, "", true)
    pendingFilter := ""
    if pending != nil {
        prefix := " AND "
        if firstFilter {
            prefix = " WHERE "
        }
        if *pending {
            pendingFilter = prefix + "fdestination_kappa = ''"
        } else {
            pendingFilter = prefix + "fdestination_tkappa != ''"
        }
    }

    toFilters := chainIDToFilter + minTimeFilter + maxTimeFilter + addressToFilter + kappaFilter + txHashFilter + minAmountFilter + minAmountFilterUsd + tokenAddressToFilter + pendingFilter

    pageValue := sql.PageSize
    pageOffset := (page - 1) * sql.PageSize

    return fmt.Sprintf("SELECT * FROM mv_bridge_events %s ORDER BY ftimestamp DESC, fblock_number DESC, fevent_index DESC, insert_time DESC LIMIT 1 BY fchain_id, fcontract_address, fevent_type, fblock_number, fevent_index, ftx_hash LIMIT %d OFFSET %d ", toFilters, pageValue, pageOffset)
}

// generateAllBridgeEventsQueryFromOrigin gets all the filters for query from origin.
//
// nolint:dupl
func generateAllBridgeEventsQueryFromOrigin(chainIDFrom []*int, chainIDTo []*int, addressFrom *string, addressTo *string, maxAmount *int, minAmount *int, maxAmountUsd *int, minAmountUsd *int, startTime *int, endTime *int, tokenAddressFrom []*string, tokenAddressTo []*string, txHash *string, pending *bool, onlyCctp *bool, page int, in bool) string {
    firstFilter := true
    chainIDFromFilter := generateSingleSpecifierI32ArrSQL(chainIDFrom, sql.ChainIDFieldName, &firstFilter, "")
    minTimeFilter := generateEqualitySpecifierSQL(startTime, sql.TimeStampFieldName, &firstFilter, "", true)
    maxTimeFilter := generateEqualitySpecifierSQL(endTime, sql.TimeStampFieldName, &firstFilter, "", false)
    addressFromFilter := generateAddressSpecifierSQL(addressFrom, &firstFilter, "")
    txHashFilter := generateSingleSpecifierStringSQL(txHash, sql.TxHashFieldName, &firstFilter, "")
    directionFilter := generateDirectionSpecifierSQL(in, &firstFilter, "")
    cctpFilter := generateCCTPSpecifierSQL(onlyCctp, false, sql.EventTypeFieldName, &firstFilter, "")
    fromFilters := chainIDFromFilter + minTimeFilter + maxTimeFilter + addressFromFilter + txHashFilter + directionFilter + cctpFilter

    firstFilter = false
    chainIDToFilter := generateSingleSpecifierI32ArrSQL(chainIDTo, sql.ChainIDFieldName, &firstFilter, "")
    addressToFilter := generateAddressSpecifierSQL(addressTo, &firstFilter, "")

    toFilters := chainIDToFilter + addressToFilter

    firstFilter = false
    minAmountFilter := generateEqualitySpecifierSQL(minAmount, "tamount", &firstFilter, "", true)
    minAmountFilterUsd := generateEqualitySpecifierSQL(minAmountUsd, "tamount_usd", &firstFilter, "", true)
    maxAmountFilter := generateEqualitySpecifierSQL(maxAmount, "famount", &firstFilter, "", false)
    maxAmountFilterUsd := generateEqualitySpecifierSQL(maxAmountUsd, "famount_usd", &firstFilter, "", false)
    tokenAddressToFilter := generateSingleSpecifierStringArrSQL(tokenAddressTo, "ttoken", &firstFilter, "")
    tokenAddressFromFilter := generateSingleSpecifierStringArrSQL(tokenAddressFrom, "ftoken", &firstFilter, "")

    operation := " = ''"
    if pending != nil && !*pending {
        operation = " != ''"
    }
    pendingFilter := fmt.Sprintf(" WHERE t%s %s", sql.KappaFieldName, operation)
    postJoinFilters := minAmountFilter + minAmountFilterUsd + maxAmountFilter + maxAmountFilterUsd + tokenAddressToFilter + tokenAddressFromFilter

    pageValue := sql.PageSize
    pageOffset := (page - 1) * sql.PageSize
    if pending != nil && !*pending && postJoinFilters == "" {
        return fmt.Sprintf("%s SELECT %s FROM %s %s %s %s", generateDeDepQueryCTE(fromFilters, &pageValue, &pageOffset, false), originToDestCol, "baseQuery", originToDestJoinsPt1, toFilters, originToDestJoinsPt2)
    }
    return fmt.Sprintf("%s SELECT * FROM (SELECT %s FROM %s %s %s %s) %s LIMIT %d OFFSET %d", generateDeDepQueryCTE(fromFilters, nil, nil, false), originToDestCol, "baseQuery", originToDestJoinsPt1, toFilters, originToDestJoinsPt2, pendingFilter+postJoinFilters, pageValue, pageOffset)
}

// generateAllBridgeEventsQueryFromOriginMv gets all the filters for query from origin.
//
// nolint:dupl
func generateAllBridgeEventsQueryFromOriginMv(chainIDFrom []*int, addressFrom *string, maxAmount *int, maxAmountUsd *int, startTime *int, endTime *int, tokenAddressFrom []*string, txHash *string, kappa *string, pending *bool, page int) string {
    firstFilter := true
    chainIDFromFilter := generateSingleSpecifierI32ArrSQL(chainIDFrom, sql.ChainIDFieldName, &firstFilter, "f")
    minTimeFilter := generateEqualitySpecifierSQL(startTime, sql.TimeStampFieldName, &firstFilter, "f", true)
    maxTimeFilter := generateEqualitySpecifierSQL(endTime, sql.TimeStampFieldName, &firstFilter, "f", false)
    addressFromFilter := generateAddressSpecifierSQL(addressFrom, &firstFilter, "f")
    txHashFilter := generateSingleSpecifierStringSQL(txHash, sql.TxHashFieldName, &firstFilter, "f")
    tokenAddressFromFilter := generateSingleSpecifierStringArrSQL(tokenAddressFrom, "ftoken", &firstFilter, "")
    maxAmountFilter := generateEqualitySpecifierSQL(maxAmount, "famount", &firstFilter, "", false)
    maxAmountFilterUsd := generateEqualitySpecifierSQL(maxAmountUsd, "famount_usd", &firstFilter, "", false)
    kappaFilter := generateKappaSpecifierSQL(kappa, sql.DestinationKappaFieldName, &firstFilter, "f")
    // firstFilter = false
    // chainIDToFilter := generateSingleSpecifierI32ArrSQL(chainIDTo, sql.ChainIDFieldName, &firstFilter, "")
    // addressToFilter := generateAddressSpecifierSQL(addressTo, &firstFilter, "")
    // minAmountFilter := generateEqualitySpecifierSQL(minAmount, "tamount", &firstFilter, "", true)
    // minAmountFilterUsd := generateEqualitySpecifierSQL(minAmountUsd, "tamount_usd", &firstFilter, "", true)
    // tokenAddressToFilter := generateSingleSpecifierStringArrSQL(tokenAddressTo, "ttoken", &firstFilter, "")

    pendingFilter := ""
    if pending != nil {
        prefix := " AND "
        if firstFilter {
            prefix = " WHERE "
        }
        if *pending {
            pendingFilter = prefix + "tkappa = ''"
        } else {
            pendingFilter = prefix + "tkappa != ''"
        }
    }

    fromFilters := chainIDFromFilter + minTimeFilter + maxTimeFilter + addressFromFilter + txHashFilter + tokenAddressFromFilter + maxAmountFilter + maxAmountFilterUsd + pendingFilter + kappaFilter
    pageValue := sql.PageSize
    pageOffset := (page - 1) * sql.PageSize
    return fmt.Sprintf("SELECT * FROM mv_bridge_events %s ORDER BY ftimestamp DESC, fblock_number DESC, fevent_index DESC, insert_time DESC LIMIT 1 BY fchain_id, fcontract_address, fevent_type, fblock_number, fevent_index, ftx_hash LIMIT %d OFFSET %d ", fromFilters, pageValue, pageOffset)
}
func generateAllBridgeEventsQueryMv(chainIDFrom []*int, chainIDTo []*int, addressFrom *string, addressTo *string, maxAmount *int, minAmount *int, maxAmountUsd *int, minAmountUsd *int, startTime *int, endTime *int, tokenAddressFrom []*string, tokenAddressTo []*string, txHash *string, kappa *string, pending *bool, onlyCctp *bool, page int) string {
    firstFilter := true
    firstInLocale := true
    chainIDFromFilter := generateSingleSpecifierI32ArrSQLMv(chainIDFrom, sql.ChainIDFieldName, &firstFilter, &firstInLocale, "f")
    addressFromFilter := generateAddressSpecifierSQLMv(addressFrom, &firstFilter, &firstInLocale, "f")
    txHashFromFilter := generateSingleSpecifierStringSQLMv(txHash, sql.TxHashFieldName, &firstFilter, &firstInLocale, "f")
    tokenAddressFromFilter := generateSingleSpecifierStringArrSQLMv(tokenAddressFrom, sql.TokenFieldName, &firstFilter, &firstInLocale, "f")
    maxAmountFilter := generateEqualitySpecifierSQLMv(maxAmount, sql.AmountFieldName, &firstFilter, &firstInLocale, "f", false)
    maxAmountFilterUsd := generateEqualitySpecifierSQLMv(maxAmountUsd, sql.AmountUSDFieldName, &firstFilter, &firstInLocale, "f", false)
    kappaFromFilter := generateKappaSpecifierSQLMv(kappa, sql.DestinationKappaFieldName, &firstFilter, &firstInLocale, "f")
    onlyCCTPFromFilter := generateCCTPSpecifierSQLMv(onlyCctp, false, sql.EventTypeFieldName, &firstFilter, &firstInLocale, "f")

    // firstFilter = false
    firstInLocale = true
    chainIDToFilter := generateSingleSpecifierI32ArrSQLMv(chainIDTo, sql.ChainIDFieldName, &firstFilter, &firstInLocale, "t")
    addressToFilter := generateRecipientSpecifierSQLMv(addressTo, &firstFilter, &firstInLocale, "t")
    txHashToFilter := generateSingleSpecifierStringSQLMv(txHash, sql.TxHashFieldName, &firstFilter, &firstInLocale, "t")
    tokenAddressToFilter := generateSingleSpecifierStringArrSQLMv(tokenAddressTo, sql.TokenFieldName, &firstFilter, &firstInLocale, "t")
    minAmountFilter := generateEqualitySpecifierSQLMv(minAmount, sql.AmountFieldName, &firstFilter, &firstInLocale, "t", true)
    minAmountFilterUsd := generateEqualitySpecifierSQLMv(minAmountUsd, sql.AmountUSDFieldName, &firstFilter, &firstInLocale, "t", true)
    kappaToFilter := generateKappaSpecifierSQLMv(kappa, sql.KappaFieldName, &firstFilter, &firstInLocale, "t")
    onlyCCTPToFilter := generateCCTPSpecifierSQLMv(onlyCctp, true, sql.EventTypeFieldName, &firstFilter, &firstInLocale, "t")

    toFilters := chainIDFromFilter + addressFromFilter + txHashFromFilter + tokenAddressFromFilter + maxAmountFilter + maxAmountFilterUsd + kappaFromFilter + onlyCCTPFromFilter
    fromFilters := chainIDToFilter + addressToFilter + txHashToFilter + tokenAddressToFilter + minAmountFilter + minAmountFilterUsd + kappaToFilter + onlyCCTPToFilter

    minTimeFilter := generateEqualitySpecifierSQL(startTime, sql.TimeStampFieldName, &firstFilter, "f", true)
    maxTimeFilter := generateEqualitySpecifierSQL(endTime, sql.TimeStampFieldName, &firstFilter, "f", false)

    var allFilters string
    switch {
    case fromFilters != "" && toFilters != "":
        allFilters = fmt.Sprintf("WHERE ((%s) OR (%s)) %s", fromFilters, toFilters, minTimeFilter+maxTimeFilter)
    case fromFilters != "" && toFilters == "":
        allFilters = fmt.Sprintf("WHERE (%s) %s", fromFilters, minTimeFilter+maxTimeFilter)
    case fromFilters == "" && toFilters != "":
        allFilters = fmt.Sprintf("WHERE (%s) %s ", toFilters, minTimeFilter+maxTimeFilter)
    default:
        allFilters = minTimeFilter + maxTimeFilter
    }

    var pendingFilter string
    if pending != nil {
        if *pending {
            pendingFilter = "WHERE tkappa = '' AND fdestination_chain_id != 121014925"
        } else {
            pendingFilter = " WHERE tkappa != ''"
        }
    }
    pageValue := sql.PageSize
    pageOffset := (page - 1) * sql.PageSize
    return fmt.Sprintf("SELECT * FROM(SELECT * FROM mv_bridge_events %s ORDER BY ftimestamp DESC, fblock_number DESC, fevent_index DESC, insert_time DESC LIMIT 1 BY fchain_id, fcontract_address, fevent_type, fblock_number, fevent_index, ftx_hash) %s LIMIT %d OFFSET %d SETTINGS memory_overcommit_ratio_denominator=4000, memory_usage_overcommit_max_wait_microseconds=500 ", allFilters, pendingFilter, pageValue, pageOffset)
}

// nolint:cyclop
func (r *queryResolver) GetBridgeTxsFromDestination(ctx context.Context, useMv *bool, chainIDFrom []*int, chainIDTo []*int, addressFrom *string, addressTo *string, maxAmount *int, minAmount *int, maxAmountUsd *int, minAmountUsd *int, startTime *int, endTime *int, txHash *string, kappa *string, tokenAddressFrom []*string, tokenAddressTo []*string, onlyCctp *bool, page *int, pending *bool) ([]*model.BridgeTransaction, error) {
    var err error
    var results []*model.BridgeTransaction
    var query string
    if useMv != nil && *useMv {
        if chainIDTo == nil && addressTo == nil && minAmount == nil && minAmountUsd == nil && startTime == nil && endTime == nil && tokenAddressTo == nil && kappa == nil && txHash == nil {
            return nil, nil
        }
        query = generateAllBridgeEventsQueryFromDestinationMv(chainIDTo, addressTo, minAmount, minAmountUsd, startTime, endTime, tokenAddressTo, kappa, txHash, pending, *page)
    } else {
        query = generateAllBridgeEventsQueryFromDestination(chainIDFrom, chainIDTo, addressFrom, addressTo, maxAmount, minAmount, minAmountUsd, maxAmountUsd, startTime, endTime, tokenAddressFrom, tokenAddressTo, kappa, txHash, onlyCctp, *page, false)
    }
    allBridgeEvents, err := r.DB.GetAllBridgeEvents(ctx, query)

    if err != nil {
        return nil, fmt.Errorf("failed to get destinationbridge events from identifiers: %w", err)
    }
    if len(allBridgeEvents) == 0 {
        return nil, nil
    }

    // Iterate through all bridge events and return all partials
    for i := range allBridgeEvents {
        bridgeTx, err := GetPartialInfoFromBridgeEventHybrid(allBridgeEvents[i], nil)
        if err != nil {
            return nil, fmt.Errorf("failed to get partial info from bridge event: %w", err)
        }
        if bridgeTx != nil {
            results = append(results, bridgeTx)
        }
    }
    return results, nil
}

func (r *queryResolver) GetBridgeTxsFromOrigin(ctx context.Context, useMv *bool, chainIDFrom []*int, chainIDTo []*int, addressFrom *string, addressTo *string, maxAmount *int, minAmount *int, maxAmountUsd *int, minAmountUsd *int, startTime *int, endTime *int, txHash *string, tokenAddressTo []*string, tokenAddressFrom []*string, kappa *string, pending *bool, onlyCctp *bool, page *int, latest bool) ([]*model.BridgeTransaction, error) {
    var err error
    var chainMap = make(map[uint32]bool)
    var results []*model.BridgeTransaction
    query := generateAllBridgeEventsQueryFromOrigin(chainIDFrom, chainIDTo, addressFrom, addressTo, maxAmount, minAmount, maxAmountUsd, minAmountUsd, startTime, endTime, tokenAddressFrom, tokenAddressTo, txHash, pending, onlyCctp, *page, true)
    if useMv != nil && *useMv {
        query = generateAllBridgeEventsQueryFromOriginMv(chainIDFrom, addressFrom, maxAmount, maxAmountUsd, startTime, endTime, tokenAddressFrom, txHash, kappa, pending, *page)
    }
    allBridgeEvents, err := r.DB.GetAllBridgeEvents(ctx, query)

    if err != nil {
        return nil, fmt.Errorf("failed to get destinationbridge events from identifiers: %w", err)
    }
    if len(allBridgeEvents) == 0 {
        return nil, nil
    }

    // Iterate through all bridge events and return all partials
    for i := range allBridgeEvents {
        if latest && chainMap[allBridgeEvents[i].FChainID] {
            continue
        }

        bridgeTx, err := GetPartialInfoFromBridgeEventHybrid(allBridgeEvents[i], pending)
        if err != nil {
            return nil, fmt.Errorf("failed to get partial info from bridge event: %w", err)
        }
        if bridgeTx != nil {
            results = append(results, bridgeTx)
            chainMap[allBridgeEvents[i].FChainID] = true
        }
    }
    return results, nil
}

func (r *queryResolver) GetBridgeTxs(ctx context.Context, chainIDFrom []*int, chainIDTo []*int, addressFrom *string, addressTo *string, maxAmount *int, minAmount *int, maxAmountUsd *int, minAmountUsd *int, startTime *int, endTime *int, txHash *string, tokenAddressTo []*string, tokenAddressFrom []*string, kappa *string, pending *bool, onlyCctp *bool, page *int) ([]*model.BridgeTransaction, error) {
    var err error
    var results []*model.BridgeTransaction
    query := generateAllBridgeEventsQueryMv(chainIDFrom, chainIDTo, addressFrom, addressTo, maxAmount, minAmount, maxAmountUsd, minAmountUsd, startTime, endTime, tokenAddressFrom, tokenAddressTo, txHash, kappa, pending, onlyCctp, *page)
    allBridgeEvents, err := r.DB.GetAllBridgeEvents(ctx, query)

    if err != nil {
        return nil, fmt.Errorf("failed to get destinationbridge events from identifiers: %w", err)
    }
    if len(allBridgeEvents) == 0 {
        return nil, nil
    }

    // Iterate through all bridge events and return all partials
    for i := range allBridgeEvents {
        bridgeTx, err := GetPartialInfoFromBridgeEventHybrid(allBridgeEvents[i], pending)
        if err != nil {
            return nil, fmt.Errorf("failed to get partial info from bridge event: %w", err)
        }
        if bridgeTx != nil {
            results = append(results, bridgeTx)
        }
    }
    return results, nil
}

// GetPartialInfoFromMessageBusEventHybrid returns the partial info from message bus event.
//
// nolint:cyclop
func GetPartialInfoFromMessageBusEventHybrid(ctx context.Context, messageBusEvent sql.HybridMessageBusEvent, pending bool) (*model.MessageBusTransaction, error) {
    var messageBusTx model.MessageBusTransaction
    fromChainID := int(messageBusEvent.FChainID)
    fromDestinationChainID := int(messageBusEvent.FDestinationChainID.Uint64())
    fromBlockNumber := int(messageBusEvent.FBlockNumber)
    fromTimeStamp := int(*messageBusEvent.FTimeStamp)
    fromTimeStampFormatted := time.Unix(int64(*messageBusEvent.FTimeStamp), 0).String()

    toChainID := int(messageBusEvent.TChainID)
    toBlockNumber := int(messageBusEvent.TBlockNumber)
    toTimeStamp := int(*messageBusEvent.TTimeStamp)
    toTimeStampFormatted := time.Unix(int64(*messageBusEvent.TTimeStamp), 0).String()

    fromInfos := &model.PartialMessageBusInfo{
        ChainID:            &fromChainID,
        DestinationChainID: &fromDestinationChainID,
        ContractAddress:    &messageBusEvent.FContractAddress,
        TxnHash:            &messageBusEvent.FTxHash,
        Message:            &messageBusEvent.FMessage.String,
        BlockNumber:        &fromBlockNumber,
        Time:               &fromTimeStamp,
        FormattedTime:      &fromTimeStampFormatted,
        RevertedReason:     nil,
    }

    toInfos := &model.PartialMessageBusInfo{
        ChainID:            &toChainID,
        DestinationChainID: nil,
        ContractAddress:    &messageBusEvent.TContractAddress,
        TxnHash:            &messageBusEvent.TTxHash,
        Message:            &messageBusEvent.TMessage.String,
        BlockNumber:        &toBlockNumber,
        Time:               &toTimeStamp,
        FormattedTime:      &toTimeStampFormatted,
        RevertedReason:     &messageBusEvent.TRevertedReason.String,
    }

    var wg sync.WaitGroup
    wg.Add(2)

    go func() {
        defer wg.Done()
        fromInfos.MessageType = user.Decode(ctx, messageBusEvent.FMessage.String)
    }()

    go func() {
        defer wg.Done()
        toInfos.MessageType = user.Decode(ctx, messageBusEvent.TMessage.String)
    }()

    wg.Wait()

    messageBusTx = model.MessageBusTransaction{
        FromInfo:  fromInfos,
        ToInfo:    toInfos,
        MessageID: &messageBusEvent.FMessageID.String,
        Pending:   &pending,
    }
    return &messageBusTx, nil
}

// nolint:gocognit,cyclop
func (r *queryResolver) GetMessageBusTxs(ctx context.Context, chainID []*int, address *string, startTime *int, endTime *int, txHash *string, messageID *string, pending bool, reverted bool, page *int) ([]*model.MessageBusTransaction, error) {
    var err error
    allMessageBusEvents, err := r.DB.GetAllMessageBusEvents(ctx, generateMessageBusQuery(chainID, address, startTime, endTime, messageID, pending, reverted, txHash, *page))
    if err != nil {
        return nil, fmt.Errorf("failed to get destinationbridge events from identifiers: %w", err)
    }

    if len(allMessageBusEvents) == 0 {
        return nil, nil
    }

    results := make([]*model.MessageBusTransaction, len(allMessageBusEvents))
    var sliceMux sync.Mutex
    g, ctx := errgroup.WithContext(ctx)
    // Iterate through all bridge events and return all partials
    for i := range allMessageBusEvents {
        i := i // capture func literal
        g.Go(func() error {
            messageBusTx, err := GetPartialInfoFromMessageBusEventHybrid(ctx, allMessageBusEvents[i], pending)
            if err != nil {
                return fmt.Errorf("failed to get partial info from bridge event: %w", err)
            }
            if messageBusTx != nil {
                sliceMux.Lock()
                results[i] = messageBusTx
                sliceMux.Unlock()
            }

            return nil
        })
    }
    err = g.Wait()

    if err != nil {
        return nil, fmt.Errorf("could not get partial info from message bus event: %w", err)
    }
    return results, nil
}

// getAdjustedValue gets the adjusted value.
func getAdjustedValue(amount *big.Int, decimals uint8) *float64 {
    decimalMultiplier := new(big.Float).SetInt(big.NewInt(0).Exp(big.NewInt(10), big.NewInt(int64(decimals)), nil))
    adjustedAmount := new(big.Float).Quo(new(big.Float).SetInt(amount), decimalMultiplier)
    trueAmountStr := adjustedAmount.SetMode(big.AwayFromZero).Text('f', 4)
    priceFloat, err := strconv.ParseFloat(trueAmountStr, 64)
    if err != nil {
        return nil
    }
    return &priceFloat
}
func keyGen(chainID string, kappa string) string {
    return fmt.Sprintf("%s-%s", chainID, kappa)
}

// GenerateAmountStatisticBridgeSQL generate sql for the bridge platform.
func GenerateAmountStatisticBridgeSQL(typeArg model.StatisticType, address *string, chainID *int, tokenAddress *string) (*string, error) {
    var operation string
    var finalSQL string
    firstFilter2 := true
    addressFilter := generateSingleSpecifierStringSQL(address, sql.SenderFieldName, &firstFilter2, "f")
    chainIDFilter := generateSingleSpecifierI32SQL(chainID, sql.ChainIDFieldName, &firstFilter2, "f")
    tokenAddressFilter := generateSingleSpecifierStringSQL(tokenAddress, sql.TokenFieldName, &firstFilter2, "f")
    compositeFilters := addressFilter + chainIDFilter + tokenAddressFilter
    switch typeArg {
    case model.StatisticTypeMeanVolumeUsd:
        operation = fmt.Sprintf("AVG(f%s)", sql.AmountUSDFieldName)
        finalSQL = fmt.Sprintf("SELECT %s from (select * FROM mv_bridge_events %s ORDER BY ftimestamp DESC, fblock_number DESC, fevent_index DESC, insert_time DESC LIMIT 1 BY fchain_id, fcontract_address, fevent_type, fblock_number, fevent_index, ftx_hash)  ", operation, compositeFilters)
    case model.StatisticTypeMedianVolumeUsd:
        operation = fmt.Sprintf("median(f%s)", sql.AmountUSDFieldName)
        finalSQL = fmt.Sprintf("SELECT %s FROM (select * FROM mv_bridge_events %s ORDER BY ftimestamp DESC, fblock_number DESC, fevent_index DESC, insert_time DESC LIMIT 1 BY fchain_id, fcontract_address, fevent_type, fblock_number, fevent_index, ftx_hash)  ", operation, compositeFilters)
    case model.StatisticTypeTotalVolumeUsd:
        operation = fmt.Sprintf("sumKahan(f%s)", sql.AmountUSDFieldName)
        finalSQL = fmt.Sprintf("SELECT %s from ( select * FROM mv_bridge_events %s ORDER BY ftimestamp DESC, fblock_number DESC, fevent_index DESC, insert_time DESC LIMIT 1 BY fchain_id, fcontract_address, fevent_type, fblock_number, fevent_index, ftx_hash)", operation, compositeFilters)
    case model.StatisticTypeCountTransactions:
        operation = fmt.Sprintf("uniq(f%s, f%s) AS res", sql.ChainIDFieldName, sql.TxHashFieldName)
        finalSQL = fmt.Sprintf("SELECT %s from ( select * FROM mv_bridge_events %s ORDER BY ftimestamp DESC, fblock_number DESC, fevent_index DESC, insert_time DESC LIMIT 1 BY fchain_id, fcontract_address, fevent_type, fblock_number, fevent_index, ftx_hash)", operation, compositeFilters)
    case model.StatisticTypeCountAddresses:
        operation = fmt.Sprintf("uniq(f%s, f%s) AS res", sql.ChainIDFieldName, sql.SenderFieldName)
        finalSQL = fmt.Sprintf("SELECT %s from ( select * FROM mv_bridge_events %s ORDER BY ftimestamp DESC, fblock_number DESC, fevent_index DESC, insert_time DESC LIMIT 1 BY fchain_id, fcontract_address, fevent_type, fblock_number, fevent_index, ftx_hash)", operation, compositeFilters)
    case model.StatisticTypeMeanFeeUsd:
        operation = fmt.Sprintf("AVG(%s)", "tfee_amount_usd")
        finalSQL = fmt.Sprintf("SELECT %s from ( select * FROM mv_bridge_events %s ORDER BY ftimestamp DESC, fblock_number DESC, fevent_index DESC, insert_time DESC LIMIT 1 BY fchain_id, fcontract_address, fevent_type, fblock_number, fevent_index, ftx_hash)", operation, compositeFilters)
    case model.StatisticTypeMedianFeeUsd:
        operation = fmt.Sprintf("median(%s)", "tfee_amount_usd")
        finalSQL = fmt.Sprintf("SELECT %s from ( select * FROM mv_bridge_events %s ORDER BY ftimestamp DESC, fblock_number DESC, fevent_index DESC, insert_time DESC LIMIT 1 BY fchain_id, fcontract_address, fevent_type, fblock_number, fevent_index, ftx_hash)", operation, compositeFilters)
    case model.StatisticTypeTotalFeeUsd:
        operation = fmt.Sprintf("sumKahan(%s)", "tfee_amount_usd")
        finalSQL = fmt.Sprintf("SELECT %s from ( select * FROM mv_bridge_events %s ORDER BY ftimestamp DESC, fblock_number DESC, fevent_index DESC, insert_time DESC LIMIT 1 BY fchain_id, fcontract_address, fevent_type, fblock_number, fevent_index, ftx_hash)", operation, compositeFilters)

    default:
        return nil, fmt.Errorf("invalid statistic type: %s", typeArg)
    }
    return &finalSQL, nil
}

// GenerateAmountStatisticSwapSQL generates sql to get statistics on the swap platform.
//
// nolint:cyclop
func GenerateAmountStatisticSwapSQL(typeArg model.StatisticType, compositeFilters string, tokenAddress *string) (*string, error) {
    var operation string
    var finalSQL string

    switch typeArg {
    case model.StatisticTypeMeanVolumeUsd:
        operation = fmt.Sprintf("AVG(%s)", swapVolumeSelect)
    case model.StatisticTypeMedianVolumeUsd:
        operation = fmt.Sprintf("median(%s)", swapVolumeSelect)
    case model.StatisticTypeTotalVolumeUsd:
        operation = fmt.Sprintf("sumKahan(%s)", swapVolumeSelect)
    case model.StatisticTypeCountTransactions:
        operation = fmt.Sprintf("uniq(%s, %s) AS res", sql.ChainIDFieldName, sql.TxHashFieldName)
    case model.StatisticTypeCountAddresses:
        operation = fmt.Sprintf("uniq(%s, %s) AS res", sql.ChainIDFieldName, sql.SenderFieldName)
    case model.StatisticTypeMeanFeeUsd:
        operation = fmt.Sprintf("AVG(arraySum(mapValues(%s)))", sql.FeeUSDFieldName)
    case model.StatisticTypeMedianFeeUsd:
        operation = fmt.Sprintf("median(arraySum(mapValues(%s)))", sql.FeeUSDFieldName)
    case model.StatisticTypeTotalFeeUsd:
        operation = fmt.Sprintf("sumKahan(arraySum(mapValues(%s)))", sql.FeeUSDFieldName)
    default:
        return nil, fmt.Errorf("invalid statistic type: %s", typeArg)
    }
    if tokenAddress == nil {
        finalSQL = fmt.Sprintf("SELECT %s FROM (SELECT * FROM swap_events %s LIMIT 1 BY chain_id, contract_address, event_type, block_number, event_index, tx_hash)", operation, compositeFilters)
    } else {
        firstFilter := true
        tokenAddressSpecifier := generateSingleSpecifierStringSQL(tokenAddress, sql.TokenFieldName, &firstFilter, "")
        finalSQL = fmt.Sprintf("SELECT %s FROM (%s %s %s %s", operation, baseSwapWithTokenPt1, compositeFilters, baseSwapWithTokenPt2, tokenAddressSpecifier)
    }
    return &finalSQL, nil
}

// GenerateAmountStatisticMessageBusSQL generates sql for getting stats on the message bus platform.
func GenerateAmountStatisticMessageBusSQL(typeArg model.StatisticType, compositeFilters string) (*string, error) {
    var operation string
    var finalSQL string
    switch typeArg {
    case model.StatisticTypeMeanVolumeUsd:
        return nil, fmt.Errorf("cannot calculate volume data for messagebus events")
    case model.StatisticTypeMedianVolumeUsd:
        return nil, fmt.Errorf("cannot calculate volume data for messagebus events")
    case model.StatisticTypeTotalVolumeUsd:
        return nil, fmt.Errorf("cannot calculate volume data for messagebus events")
    case model.StatisticTypeCountTransactions:
        operation = fmt.Sprintf("uniq(%s, %s) AS res", sql.ChainIDFieldName, sql.TxHashFieldName)
        finalSQL = fmt.Sprintf("SELECT %s FROM (%s) %s", operation, baseMessageBus, compositeFilters)
    case model.StatisticTypeCountAddresses:
        operation = fmt.Sprintf("uniq(%s, source_address) AS res", sql.ChainIDFieldName)
        finalSQL = fmt.Sprintf("SELECT %s FROM (%s) %s", operation, baseMessageBus, compositeFilters)
    case model.StatisticTypeMeanFeeUsd:
        operation = fmt.Sprintf("AVG(%s)", sql.FeeUSDFieldName)
        finalSQL = fmt.Sprintf("SELECT %s FROM (%s) %s", operation, baseMessageBus, compositeFilters)
    case model.StatisticTypeMedianFeeUsd:
        operation = fmt.Sprintf("median(%s)", sql.FeeUSDFieldName)
        finalSQL = fmt.Sprintf("SELECT %s FROM (%s) %s", operation, baseMessageBus, compositeFilters)
    case model.StatisticTypeTotalFeeUsd:
        operation = fmt.Sprintf("sumKahan(%s)", sql.FeeUSDFieldName)
        finalSQL = fmt.Sprintf("SELECT %s FROM (%s) %s", operation, baseMessageBus, compositeFilters)
    default:
        return nil, fmt.Errorf("invalid statistic type: %s", typeArg)
    }
    return &finalSQL, nil
}

// GenerateRankedChainsByVolumeSQL generates sql for getting all chains ranked in order of volume.
func GenerateRankedChainsByVolumeSQL(compositeFilters string, firstFilter *bool) string {
    directionSpecifier := generateDirectionSpecifierSQL(true, firstFilter, "")
    return fmt.Sprintf("%s %s FULL OUTER JOIN (SELECT chain_id, sumKahan(multiIf(event_type = 0, amount_usd[sold_id], event_type = 1, arraySum(mapValues(amount_usd)), event_type = 9, arraySum(mapValues(amount_usd)), event_type = 10, amount_usd[sold_id], 0)) as usdTotal FROM (SELECT * FROM swap_events %s LIMIT 1 BY chain_id, contract_address, event_type, block_number, event_index, tx_hash) group by chain_id) s ON b.pre_fchain_id = s.chain_id ORDER BY total DESC SETTINGS join_use_nulls = 1", generateDeDepQueryCTE(compositeFilters+directionSpecifier, nil, nil, true), rankedChainsBridgeVolume, compositeFilters)
}

// GenerateDailyStatisticByChainAllSQL generates sql for getting daily stats across all chains.
func GenerateDailyStatisticByChainAllSQL(typeArg *model.DailyStatisticType, compositeFilters string, firstFilter *bool) (*string, error) {
    var query string
    switch *typeArg {
    case model.DailyStatisticTypeVolume:
        directionSpecifier := generateDirectionSpecifierSQL(true, firstFilter, "")
        query = fmt.Sprintf("%s %s FULL OUTER JOIN (SELECT %s, chain_id, sumKahan(multiIf(event_type = 0, amount_usd[sold_id], event_type = 1,    arraySum(mapValues(amount_usd)), event_type = 9,    arraySum(mapValues(amount_usd)), event_type = 10, amount_usd[sold_id],    0) )     as usdTotal FROM (SELECT * FROM swap_events %s LIMIT 1 BY chain_id, contract_address, event_type, block_number, event_index, tx_hash) group by date, chain_id ) s ON b.date = s.date AND b.pre_fchain_id = s.chain_id) group by date order by date) SETTINGS join_use_nulls=1", generateDeDepQueryCTE(compositeFilters+directionSpecifier, nil, nil, true), dailyVolumeBridge, toDateSelect, compositeFilters)
    case model.DailyStatisticTypeFee:
        query = fmt.Sprintf("%s FROM ( SELECT %s, chain_id, sumKahan(fee_usd) as sumTotal FROM (SELECT * FROM bridge_events %s LIMIT 1 BY chain_id, contract_address, event_type, block_number, event_index, tx_hash) GROUP BY date, chain_id) b  FULL OUTER JOIN ( SELECT %s, chain_id, sumKahan(arraySum(mapValues(fee_usd))) AS sumTotal FROM (SELECT * FROM swap_events %s LIMIT 1 BY chain_id, contract_address, event_type, block_number, event_index, tx_hash) group by date, chain_id ) s ON b.date = s.date AND b.chain_id = s.chain_id  FULL OUTER JOIN ( SELECT %s, chain_id, sumKahan(fee_usd) AS sumTotal FROM (SELECT * FROM message_bus_events %s LIMIT 1 BY chain_id, contract_address, event_type, block_number, event_index, tx_hash) group by date, chain_id ) m ON b.date = m.date AND b.chain_id = m.chain_id) group by date order by date ) SETTINGS join_use_nulls = 1", dailyStatisticGenericSelect, toDateSelect, compositeFilters, toDateSelect, compositeFilters, toDateSelect, compositeFilters)
    case model.DailyStatisticTypeAddresses:
        query = fmt.Sprintf("%s FROM ( SELECT %s, chain_id, uniq(chain_id, sender) as sumTotal FROM (SELECT * FROM bridge_events %s LIMIT 1 BY chain_id, contract_address, event_type, block_number, event_index, tx_hash) GROUP BY date, chain_id) b  FULL OUTER JOIN ( SELECT %s, chain_id, uniq(chain_id, sender) AS sumTotal FROM (SELECT * FROM swap_events %s LIMIT 1 BY chain_id, contract_address, event_type, block_number, event_index, tx_hash) group by date, chain_id ) s ON b.date = s.date AND b.chain_id = s.chain_id  FULL OUTER JOIN ( SELECT %s, chain_id, uniq(chain_id, source_address) AS sumTotal FROM (SELECT * FROM message_bus_events %s LIMIT 1 BY chain_id, contract_address, event_type, block_number, event_index, tx_hash) group by date, chain_id ) m ON b.date = m.date AND b.chain_id = m.chain_id) group by date order by date ) SETTINGS join_use_nulls = 1", dailyStatisticGenericSelect, toDateSelect, compositeFilters, toDateSelect, compositeFilters, toDateSelect, compositeFilters)
    case model.DailyStatisticTypeTransactions:
        directionSpecifier := generateDirectionSpecifierSQL(true, firstFilter, "")
        query = fmt.Sprintf("%s FROM ( SELECT %s, chain_id, uniq(chain_id, tx_hash) as sumTotal FROM (SELECT * FROM bridge_events %s LIMIT 1 BY chain_id, contract_address, event_type, block_number, event_index, tx_hash) GROUP BY date, chain_id) b  FULL OUTER JOIN ( SELECT %s, chain_id, uniq(chain_id, tx_hash) AS sumTotal FROM (SELECT * FROM swap_events %s LIMIT 1 BY chain_id, contract_address, event_type, block_number, event_index, tx_hash) group by date, chain_id ) s ON b.date = s.date AND b.chain_id = s.chain_id  FULL OUTER JOIN ( SELECT %s, chain_id, uniq(chain_id, tx_hash) AS sumTotal FROM (SELECT * FROM message_bus_events %s LIMIT 1 BY chain_id, contract_address, event_type, block_number, event_index, tx_hash) group by date, chain_id ) m ON b.date = m.date AND b.chain_id = m.chain_id) group by date order by date ) SETTINGS join_use_nulls = 1", dailyStatisticGenericSelect, toDateSelect, compositeFilters+directionSpecifier, toDateSelect, compositeFilters, toDateSelect, compositeFilters)
    default:
        return nil, fmt.Errorf("unsupported statistic type")
    }
    return &query, nil
}

// TODO make this more dynamic.

// GenerateDailyStatisticByChainBridgeSQL generates sql for getting data for daily stats across the bridge platform.
func GenerateDailyStatisticByChainBridgeSQL(typeArg *model.DailyStatisticType, compositeFilters string, firstFilter *bool) (*string, error) {
    var query string
    switch *typeArg {
    case model.DailyStatisticTypeVolume:
        directionSpecifier := generateDirectionSpecifierSQL(true, firstFilter, "")
        query = fmt.Sprintf("%s  %s sumKahan(amount_usd) AS sumTotal %s group by date, chain_id order by date, chain_id) group by date order by date )", generateDeDepQueryCTE(compositeFilters+directionSpecifier, nil, nil, true), dailyStatisticGenericSinglePlatform, dailyStatisticBridge)
    case model.DailyStatisticTypeFee:
        query = fmt.Sprintf("%s  %s sumKahan(fee_usd) AS sumTotal %s group by date, chain_id order by date, chain_id) group by date order by date )", generateDeDepQueryCTE(compositeFilters, nil, nil, true), dailyStatisticGenericSinglePlatform, dailyStatisticBridge)
    case model.DailyStatisticTypeAddresses:
        query = fmt.Sprintf("%s  %s uniq(chain_id, sender) AS sumTotal %s group by date, chain_id order by date, chain_id) group by date order by date )", generateDeDepQueryCTE(compositeFilters, nil, nil, true), dailyStatisticGenericSinglePlatform, dailyStatisticBridge)
    case model.DailyStatisticTypeTransactions:
        directionSpecifier := generateDirectionSpecifierSQL(true, firstFilter, "")
        query = fmt.Sprintf("%s %s uniq(chain_id, tx_hash) AS sumTotal  %s group by date, chain_id order by date, chain_id) group by date order by date )", generateDeDepQueryCTE(compositeFilters+directionSpecifier, nil, nil, true), dailyStatisticGenericSinglePlatform, dailyStatisticBridge)
    default:
        return nil, fmt.Errorf("unsupported statistic type")
    }
    return &query, nil
}

// GenerateDailyStatisticByChainSwapSQL generates sql for getting daily stats across the swap platform.
func GenerateDailyStatisticByChainSwapSQL(typeArg *model.DailyStatisticType, compositeFilters string) (*string, error) {
    var query string
    switch *typeArg {
    case model.DailyStatisticTypeVolume:
        query = fmt.Sprintf("%s sumKahan(multiIf(event_type = 0, amount_usd[sold_id], event_type = 1, arraySum(mapValues(amount_usd)), event_type = 9, arraySum(mapValues(amount_usd)), event_type = 10, amount_usd[sold_id],0)) AS sumTotal FROM (%s) %s group by date, chain_id) group by date order by date)", dailyStatisticGenericSinglePlatform, baseSwap, compositeFilters)
    case model.DailyStatisticTypeFee:
        query = fmt.Sprintf("%s sumKahan(arraySum(mapValues(%s))) AS sumTotal FROM (%s) %s group by date, chain_id) group by date order by date)", dailyStatisticGenericSinglePlatform, sql.FeeUSDFieldName, baseSwap, compositeFilters)
    case model.DailyStatisticTypeAddresses:
        query = fmt.Sprintf("%s uniq(%s, %s) AS sumTotal FROM (%s) %s group by date, chain_id) group by date order by date)", dailyStatisticGenericSinglePlatform, sql.ChainIDFieldName, sql.SenderFieldName, baseSwap, compositeFilters)
    case model.DailyStatisticTypeTransactions:
        query = fmt.Sprintf("%s uniq(%s, %s) AS sumTotal FROM (%s) %s group by date, chain_id) group by date order by date)", dailyStatisticGenericSinglePlatform, sql.ChainIDFieldName, sql.TxHashFieldName, baseSwap, compositeFilters)
    default:
        return nil, fmt.Errorf("unsupported statistic type")
    }
    return &query, nil
}

// GenerateDailyStatisticByChainMessageBusSQL generates sql for getting daily stats across the message bus platform.
func GenerateDailyStatisticByChainMessageBusSQL(typeArg *model.DailyStatisticType, compositeFilters string) (*string, error) {
    var query string
    switch *typeArg {
    case model.DailyStatisticTypeVolume:
        return nil, fmt.Errorf("cannot calculate volume for messagebus")
    case model.DailyStatisticTypeFee:
        query = fmt.Sprintf("%s sumKahan(%s) AS sumTotal FROM (%s) %s group by date, chain_id) group by date order by date)", dailyStatisticGenericSinglePlatform, sql.FeeUSDFieldName, baseMessageBus, compositeFilters)
    case model.DailyStatisticTypeAddresses:
        query = fmt.Sprintf("%s uniq(%s, %s) AS sumTotal FROM (%s)%s group by date, chain_id) group by date order by date)", dailyStatisticGenericSinglePlatform, sql.ChainIDFieldName, sql.SenderFieldName, baseMessageBus, compositeFilters)
    case model.DailyStatisticTypeTransactions:
        query = fmt.Sprintf("%s uniq(%s, %s) AS sumTotal FROM (%s) %s group by date, chain_id) group by date order by date)", dailyStatisticGenericSinglePlatform, sql.ChainIDFieldName, sql.TxHashFieldName, baseMessageBus, compositeFilters)
    default:
        return nil, fmt.Errorf("unsupported statistic type")
    }
    return &query, nil
}

// SortBridgeTxType sorts bridge transactions by time.
type SortBridgeTxType []*model.BridgeTransaction

func (s SortBridgeTxType) Len() int           { return len(s) }
func (s SortBridgeTxType) Less(i, j int) bool { return *s[i].FromInfo.Time > *s[j].FromInfo.Time }
func (s SortBridgeTxType) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }

// SortMessageBusTxType sorts message bus transactions by time.
type SortMessageBusTxType []*model.MessageBusTransaction

func (s SortMessageBusTxType) Len() int           { return len(s) }
func (s SortMessageBusTxType) Less(i, j int) bool { return *s[i].FromInfo.Time > *s[j].FromInfo.Time }
func (s SortMessageBusTxType) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }

func keyGenHandleNilInt(item *int) string {
    if item != nil {
        return fmt.Sprintf("%d", *item)
    }
    return ""
}

func keyGenHandleNilString(item *string) string {
    if item != nil {
        return *item
    }
    return ""
}

// Gets the value result from cache.
func (r *queryResolver) getValueResultFromCache(key string) (*model.ValueResult, error) {
    cacheResult := r.Cache.GetCache(key)
    if cacheResult != nil {
        rawCache, ok := cacheResult.(*interface{})
        if !ok || rawCache == nil {
            return nil, fmt.Errorf("type assertion error when converting to *interface{}, rawCache %v", rawCache)
        }
        res, ok := (*rawCache).(*model.ValueResult)
        if !ok || res == nil {
            return nil, fmt.Errorf("type assertion error when converting to *model.ValueResult, res %v", res)
        }
        return res, nil
    }
    return nil, fmt.Errorf("could not get cached data")
}

// Gets the value result from cache.
func (r *queryResolver) getDateResultByChainFromCache(key string) ([]*model.DateResultByChain, error) {
    cacheResult := r.Cache.GetCache(key)
    if cacheResult != nil {
        rawCache, ok := cacheResult.(*interface{})
        if !ok || rawCache == nil {
            return nil, fmt.Errorf("type assertion error when converting to *interface{}, rawCache %v", rawCache)
        }
        res, ok := (*rawCache).([]*model.DateResultByChain)
        if !ok || res == nil {
            return nil, fmt.Errorf("type assertion error when converting to []*model.DateResultByChain, res %v", res)
        }
        return res, nil
    }
    return nil, fmt.Errorf("could not get cached data")
}

// GetDurationFilter creates a filter for the various time ranges for analysis.
func GetDurationFilter(duration *model.Duration, firstFilter *bool, prefix string) string {
    var timestampSpecifier string
    switch *duration {
    case model.DurationPastDay:
        hours := 24
        targetTime := GetTargetTime(&hours)
        timestampSpecifier = generateTimestampSpecifierSQL(&targetTime, sql.TimeStampFieldName, firstFilter, prefix)
    case model.DurationPastMonth:
        hours := 720
        targetTime := GetTargetTime(&hours)
        timestampSpecifier = generateTimestampSpecifierSQL(&targetTime, sql.TimeStampFieldName, firstFilter, prefix)
    case model.DurationPast3Months:
        hours := 2190
        targetTime := GetTargetTime(&hours)
        timestampSpecifier = generateTimestampSpecifierSQL(&targetTime, sql.TimeStampFieldName, firstFilter, prefix)
    case model.DurationPast6Months:
        hours := 4380
        targetTime := GetTargetTime(&hours)
        timestampSpecifier = generateTimestampSpecifierSQL(&targetTime, sql.TimeStampFieldName, firstFilter, prefix)
    case model.DurationPastYear:
        hours := 8760
        targetTime := GetTargetTime(&hours)
        timestampSpecifier = generateTimestampSpecifierSQL(&targetTime, sql.TimeStampFieldName, firstFilter, prefix)
    case model.DurationAllTime:
        timestampSpecifier = ""
    }
    return timestampSpecifier
}

// nolint:cyclop
func (r *queryResolver) getAmountStatisticsAll(ctx context.Context, typeArg model.StatisticType, chainID *int, address *string, tokenAddress *string, compositeFilters string) (*string, error) {
    if typeArg == model.StatisticTypeMedianVolumeUsd || typeArg == model.StatisticTypeMeanVolumeUsd || typeArg == model.StatisticTypeMedianFeeUsd || typeArg == model.StatisticTypeMeanFeeUsd {
        return nil, fmt.Errorf("cannot calculate averages or medians across all platforms")
    }
    var bridgeFinalSQL *string
    var swapFinalSQL *string
    var messageBusFinalSQL *string
    var err error
    var bridgeSum float64
    var swapSum float64
    var messageBusSum float64

    bridgeFinalSQL, err = GenerateAmountStatisticBridgeSQL(typeArg, address, chainID, tokenAddress)
    if err != nil {
        return nil, err
    }

    swapFinalSQL, err = GenerateAmountStatisticSwapSQL(typeArg, compositeFilters, tokenAddress)
    if err != nil {
        return nil, err
    }

    g, groupCtx := errgroup.WithContext(ctx)

    if tokenAddress == nil && typeArg != model.StatisticTypeTotalVolumeUsd && typeArg != model.StatisticTypeMedianVolumeUsd && typeArg != model.StatisticTypeMeanVolumeUsd {
        messageBusFinalSQL, err = GenerateAmountStatisticMessageBusSQL(typeArg, compositeFilters)
        if err != nil {
            return nil, err
        }
        g.Go(func() error {
            messageBusSum, err = r.DB.GetFloat64(groupCtx, *messageBusFinalSQL)

            if err != nil {
                return fmt.Errorf("failed to get dateResults: %w", err)
            }
            return nil
        })
    }
    g.Go(func() error {
        bridgeSum, err = r.DB.GetFloat64(groupCtx, *bridgeFinalSQL)
        if err != nil {
            return fmt.Errorf("failed to get dateResults: %w", err)
        }
        return nil
    })
    g.Go(func() error {
        swapSum, err = r.DB.GetFloat64(groupCtx, *swapFinalSQL)
        if err != nil {
            return fmt.Errorf("failed to get dateResults: %w", err)
        }

        return nil
    })

    err = g.Wait()
    if err != nil {
        return nil, fmt.Errorf("error getting data from all platforms, %w", err)
    }
    value := fmt.Sprintf("%f", bridgeSum+swapSum+messageBusSum)
    return &value, nil
}

// nolint:cyclop
func (r *queryResolver) getDateResultByChainMv(ctx context.Context, chainID *int, typeArg *model.DailyStatisticType, platform *model.Platform, duration *model.Duration) ([]*model.DateResultByChain, error) {
    var err error
    firstFilter := true
    timestampSpecifierMv := GetDurationFilter(duration, &firstFilter, "f")
    chainIDSpecifierMv := generateSingleSpecifierI32SQL(chainID, sql.ChainIDFieldName, &firstFilter, "f")
    compositeFiltersMv := fmt.Sprintf(
        `%s%s`,
        timestampSpecifierMv, chainIDSpecifierMv,
    )
    firstFilter = true
    timestampSpecifier := GetDurationFilter(duration, &firstFilter, "")
    chainIDSpecifier := generateSingleSpecifierI32SQL(chainID, sql.ChainIDFieldName, &firstFilter, "")
    compositeFilters := fmt.Sprintf(
        `%s%s`,
        timestampSpecifier, chainIDSpecifier,
    )

    var res []*model.DateResultByChain
    var query *string
    g, groupCtx := errgroup.WithContext(ctx)
    switch *platform {
    case model.PlatformBridge:
        // Change chainID filter to destination chainID as that's where fees are collected.
        if *typeArg == model.DailyStatisticTypeFee {
            chainIDSpecifierMv = generateSingleSpecifierI32SQL(chainID, sql.ChainIDFieldName, &firstFilter, "t")
            compositeFiltersMv = fmt.Sprintf(
                `%s%s`,
                timestampSpecifierMv, chainIDSpecifierMv,
            )
        }
        query, err = GenerateDailyStatisticByChainBridgeSQLMv(typeArg, compositeFiltersMv)
        if err != nil {
            return nil, err
        }
    case model.PlatformSwap:
        query, err = GenerateDailyStatisticByChainSwapSQL(typeArg, compositeFilters)
        if err != nil {
            return nil, err
        }
    case model.PlatformMessageBus:
        query, err = GenerateDailyStatisticByChainMessageBusSQL(typeArg, compositeFilters)
        if err != nil {
            return nil, err
        }
    case model.PlatformAll:
        query, err = GenerateDailyStatisticByChainAllSQLMv(typeArg, compositeFilters, compositeFiltersMv)
    default:
        return nil, fmt.Errorf("unsupported platform")
    }
    g.Go(func() error {
        res, err = r.DB.GetDailyTotals(groupCtx, *query)
        if err != nil {
            return fmt.Errorf("failed to get dateResults: %w", err)
        }
        return nil
    })

    err = g.Wait()
    if err != nil {
        return nil, fmt.Errorf("could not get daily data by chain: %w", err)
    }
    err = r.Cache.CacheResponse(fmt.Sprintf("dailyStatisticsByChain, %s, %s, %s, %s", keyGenHandleNilInt(chainID), typeArg.String(), duration.String(), platform.String()), res)
    if err != nil {
        return nil, fmt.Errorf("error caching response, %w", err)
    }
    return res, nil
}

// GenerateDailyStatisticByChainBridgeSQLMv generates sql for getting data for daily stats across the bridge platform.
func GenerateDailyStatisticByChainBridgeSQLMv(typeArg *model.DailyStatisticType, compositeFilters string) (*string, error) {
    var query string
    switch *typeArg {
    case model.DailyStatisticTypeVolume:
        query = fmt.Sprintf("%s  sumKahan(famount_usd) AS sumTotal from (select * FROM mv_bridge_events %s ORDER BY ftimestamp DESC, fblock_number DESC, fevent_index DESC, insert_time DESC LIMIT 1 BY fchain_id, fcontract_address, fevent_type, fblock_number, fevent_index, ftx_hash) group by date, chain_id) group by date order by date)  ", dailyStatisticGenericSinglePlatformMv, compositeFilters)
    case model.DailyStatisticTypeFee:
        query = fmt.Sprintf("%s  sumKahan(tfee_amount_usd) AS sumTotal from (select * FROM mv_bridge_events %s ORDER BY ftimestamp DESC, fblock_number DESC, fevent_index DESC, insert_time DESC LIMIT 1 BY fchain_id, fcontract_address, fevent_type, fblock_number, fevent_index, ftx_hash) group by date, chain_id) group by date order by date)  ", dailyStatisticGenericSinglePlatformMvFee, compositeFilters)
    case model.DailyStatisticTypeAddresses:
        query = fmt.Sprintf("%s  uniq(fchain_id, fsender) AS sumTotal from (select * FROM mv_bridge_events %s ORDER BY ftimestamp DESC, fblock_number DESC, fevent_index DESC, insert_time DESC LIMIT 1 BY fchain_id, fcontract_address, fevent_type, fblock_number, fevent_index, ftx_hash) group by date, chain_id) group by date order by date)  ", dailyStatisticGenericSinglePlatformMv, compositeFilters)
    case model.DailyStatisticTypeTransactions:
        query = fmt.Sprintf("%s  uniq(fchain_id, ftx_hash) AS sumTotal from (select * FROM mv_bridge_events %s ORDER BY ftimestamp DESC, fblock_number DESC, fevent_index DESC, insert_time DESC LIMIT 1 BY fchain_id, fcontract_address, fevent_type, fblock_number, fevent_index, ftx_hash) group by date, chain_id) group by date order by date) ", dailyStatisticGenericSinglePlatformMv, compositeFilters)
    default:
        return nil, fmt.Errorf("unsupported statistic type")
    }
    return &query, nil
}

// GenerateDailyStatisticByChainAllSQLMv generates sql for getting daily stats across all chains.
func GenerateDailyStatisticByChainAllSQLMv(typeArg *model.DailyStatisticType, compositeFilters string, compositeFiltersMv string) (*string, error) {
    var query string
    switch *typeArg {
    case model.DailyStatisticTypeVolume:
        query = fmt.Sprintf("%s %s %s FULL OUTER JOIN (SELECT %s, chain_id, sumKahan(multiIf(event_type = 0, amount_usd[sold_id], event_type = 1,    arraySum(mapValues(amount_usd)), event_type = 9,    arraySum(mapValues(amount_usd)), event_type = 10, amount_usd[sold_id],    0) )     as usdTotal FROM (SELECT * FROM swap_events %s LIMIT 1 BY chain_id, contract_address, event_type, block_number, event_index, tx_hash) group by date, chain_id ) s ON b.date = s.date AND b.chain_id = s.chain_id) group by date order by date) SETTINGS join_use_nulls=1", dailyVolumeBridgeMvPt1, compositeFiltersMv, dailyVolumeBridgeMvPt2, toDateSelect, compositeFilters)
    case model.DailyStatisticTypeFee: // destination chain fee used
        query = fmt.Sprintf("%s FROM ( SELECT %s, tchain_id AS chain_id, sumKahan(tfee_amount_usd) as sumTotal FROM (SELECT * FROM mv_bridge_events %s LIMIT 1 BY fchain_id, fcontract_address, fevent_type, fblock_number, fevent_index, ftx_hash) GROUP BY date, chain_id) b FULL OUTER JOIN ( SELECT %s, chain_id, sumKahan(arraySum(mapValues(fee_usd))) AS sumTotal FROM (SELECT * FROM swap_events %s LIMIT 1 BY chain_id, contract_address, event_type, block_number, event_index, tx_hash) group by date, chain_id ) s ON b.date = s.date AND b.chain_id = s.chain_id  FULL OUTER JOIN ( SELECT %s, chain_id, sumKahan(fee_usd) AS sumTotal FROM (SELECT * FROM message_bus_events %s LIMIT 1 BY chain_id, contract_address, event_type, block_number, event_index, tx_hash) group by date, chain_id ) m ON b.date = m.date AND b.chain_id = m.chain_id) group by date order by date ) SETTINGS join_use_nulls = 1", dailyStatisticGenericSelect, toDateSelectMv, compositeFiltersMv, toDateSelect, compositeFilters, toDateSelect, compositeFilters)
    case model.DailyStatisticTypeAddresses:
        query = fmt.Sprintf("%s FROM ( SELECT %s, fchain_id AS chain_id, uniq(fchain_id, fsender) as sumTotal FROM (SELECT * FROM mv_bridge_events %s LIMIT 1 BY fchain_id, fcontract_address, fevent_type, fblock_number, fevent_index, ftx_hash) GROUP BY date, chain_id) b FULL OUTER JOIN ( SELECT %s, chain_id, uniq(chain_id, sender) AS sumTotal FROM (SELECT * FROM swap_events %s LIMIT 1 BY chain_id, contract_address, event_type, block_number, event_index, tx_hash) group by date, chain_id ) s ON b.date = s.date AND b.chain_id = s.chain_id  FULL OUTER JOIN ( SELECT %s, chain_id, uniq(chain_id, source_address) AS sumTotal FROM (SELECT * FROM message_bus_events %s LIMIT 1 BY chain_id, contract_address, event_type, block_number, event_index, tx_hash) group by date, chain_id ) m ON b.date = m.date AND b.chain_id = m.chain_id) group by date order by date ) SETTINGS join_use_nulls = 1", dailyStatisticGenericSelect, toDateSelectMv, compositeFiltersMv, toDateSelect, compositeFilters, toDateSelect, compositeFilters)
    case model.DailyStatisticTypeTransactions:
        query = fmt.Sprintf("%s FROM ( SELECT %s, fchain_id AS chain_id, uniq(fchain_id, ftx_hash) as sumTotal FROM (SELECT * FROM mv_bridge_events %s LIMIT 1 BY fchain_id, fcontract_address, fevent_type, fblock_number, fevent_index, ftx_hash) GROUP BY date, chain_id) b FULL OUTER JOIN ( SELECT %s, chain_id, uniq(chain_id, tx_hash) AS sumTotal FROM (SELECT * FROM swap_events %s LIMIT 1 BY chain_id, contract_address, event_type, block_number, event_index, tx_hash) group by date, chain_id ) s ON b.date = s.date AND b.chain_id = s.chain_id  FULL OUTER JOIN ( SELECT %s, chain_id, uniq(chain_id, tx_hash) AS sumTotal FROM (SELECT * FROM message_bus_events %s LIMIT 1 BY chain_id, contract_address, event_type, block_number, event_index, tx_hash) group by date, chain_id ) m ON b.date = m.date AND b.chain_id = m.chain_id) group by date order by date ) SETTINGS join_use_nulls = 1", dailyStatisticGenericSelect, toDateSelectMv, compositeFiltersMv, toDateSelect, compositeFilters, toDateSelect, compositeFilters)
    default:
        return nil, fmt.Errorf("unsupported statistic type")
    }
    return &query, nil
}

// increase this to enable querying the db
// this has been disabled in prod to prevent the db from falling over.
var timeToFallback = time.Second * 0

// GetFallbackTime gets the fallback time for the bridge watcher.
// this is intended only for testing
func GetFallbackTime() time.Duration {
    return timeToFallback
}

// UnsafeSetFallbackTime sets the fallback time for the bridge watcher.
// it is intended for testing. Plese remember to reset this value.
func UnsafeSetFallbackTime(ttf time.Duration) {
    timeToFallback = ttf
}

// GetOriginBridgeTxBW gets an origin bridge tx.
func (r *queryResolver) GetOriginBridgeTxBW(ctx context.Context, chainID int, txnHash string, eventType model.BridgeType) (*model.BridgeWatcherTx, error) {
    txType := model.BridgeTxTypeOrigin
    query := fmt.Sprintf("SELECT * FROM mv_bridge_events WHERE fchain_id = %d AND ftx_hash = '%s' ORDER BY insert_time desc LIMIT 1 BY fchain_id, fcontract_address, fevent_type, fblock_number, fevent_index, ftx_hash", chainID, txnHash)

    bwQueryCtx, cancel := context.WithTimeout(ctx, timeToFallback)
    defer cancel()

    bridgeEventMV, err := r.DB.GetMVBridgeEvent(bwQueryCtx, query)

    if err != nil || bridgeEventMV == nil || bridgeEventMV.FChainID == 0 {
        switch eventType {
        case model.BridgeTypeBridge:
            return r.bwOriginFallback(ctx, uint32(chainID), txnHash)
        case model.BridgeTypeCctp:
            return r.bwOriginFallbackCCTP(ctx, uint32(chainID), txnHash)
        }
    }
    return bwBridgeMVToBWTxOrigin(bridgeEventMV, txType)
}

// GetDestinationBridgeTxBW returns the destination bridge transaction for the bridgewatcher.
func (r *queryResolver) GetDestinationBridgeTxBW(ctx context.Context, chainID int, address string, kappa string, timestamp int, historical bool, bridgeType model.BridgeType) (*model.BridgeWatcherTx, error) {
    var err error
    txType := model.BridgeTxTypeDestination
    bwQueryCtx, cancel := context.WithTimeout(ctx, timeToFallback)
    defer cancel()

    query := fmt.Sprintf("SELECT * FROM mv_bridge_events WHERE tchain_id = %d AND tkappa = '%s' ORDER BY insert_time desc LIMIT 1 BY tchain_id, tcontract_address, tevent_type, tblock_number, tevent_index, ttx_hash", chainID, kappa)
    bridgeEventMV, err := r.DB.GetMVBridgeEvent(bwQueryCtx, query)

    var bridgeTx model.PartialInfo
    isPending := true

    if err != nil || bridgeEventMV == nil || bridgeEventMV.TChainID == 0 {
        var txFromChain *model.BridgeWatcherTx
        txFromChain, err = r.bwDestinationFallback(ctx, uint32(chainID), address, kappa, timestamp, historical, bridgeType)
        if err != nil {
            if err.Error() == kappaDoesNotExist {
                pendingKappa := model.KappaStatusPending
                return &model.BridgeWatcherTx{
                    BridgeTx:    &bridgeTx,
                    Pending:     &isPending,
                    Type:        &txType,
                    Kappa:       &kappa,
                    KappaStatus: &pendingKappa,
                }, nil
            }
            return nil, fmt.Errorf("failed to get destination bridge event from chain: %w", err)
        }
        return txFromChain, nil
    }
    return bwBridgeMVToBWTxDestination(bridgeEventMV, txType)
}

func bwBridgeToBWTx(bridgeEvent *sql.BridgeEvent, txType model.BridgeTxType) (*model.BridgeWatcherTx, error) {
    var bridgeTx model.PartialInfo
    chainID := int(bridgeEvent.ChainID)
    isPending := false
    blockNumber := int(bridgeEvent.BlockNumber)
    value := bridgeEvent.Amount.String()
    var timestamp int
    var formattedValue *float64
    var timeStampFormatted string
    if bridgeEvent.TokenDecimal != nil {
        formattedValue = getAdjustedValue(bridgeEvent.Amount, *bridgeEvent.TokenDecimal)
    } else {
        return nil, fmt.Errorf("token decimal is not valid")
    }
    if bridgeEvent.TimeStamp != nil {
        timestamp = int(*bridgeEvent.TimeStamp)
        timeStampFormatted = time.Unix(int64(*bridgeEvent.TimeStamp), 0).String()
    } else {
        return nil, fmt.Errorf("timestamp is not valid")
    }

    kappa := bridgeEvent.DestinationKappa
    destinationChainID := int(bridgeEvent.ChainID)
    if txType == model.BridgeTxTypeOrigin {
        destinationChainID = int(bridgeEvent.DestinationChainID.Uint64())
    }
    if txType == model.BridgeTxTypeDestination {
        kappa = bridgeEvent.Kappa.String
    }
    bridgeTx = model.PartialInfo{
        ChainID:            &chainID,
        DestinationChainID: &destinationChainID,
        Address:            &bridgeEvent.Recipient.String,
        TxnHash:            &bridgeEvent.TxHash,
        Value:              &value,
        FormattedValue:     formattedValue,
        USDValue:           bridgeEvent.AmountUSD,
        TokenAddress:       &bridgeEvent.Token,
        TokenSymbol:        &bridgeEvent.TokenSymbol.String,
        BlockNumber:        &blockNumber,
        Time:               &timestamp,
        FormattedTime:      &timeStampFormatted,
    }
    result := &model.BridgeWatcherTx{
        BridgeTx: &bridgeTx,
        Pending:  &isPending,
        Type:     &txType,
        Kappa:    &kappa,
    }
    return result, nil
}

func bwBridgeMVToBWTxOrigin(bridgeEvent *sql.HybridBridgeEvent, txType model.BridgeTxType) (*model.BridgeWatcherTx, error) {
    var bridgeTx model.PartialInfo
    chainID := int(bridgeEvent.FChainID)
    isPending := false
    blockNumber := int(bridgeEvent.FBlockNumber)
    value := bridgeEvent.FAmount.String()
    var timestamp int
    var formattedValue *float64
    var timeStampFormatted string
    if bridgeEvent.FTokenDecimal != nil {
        formattedValue = getAdjustedValue(bridgeEvent.FAmount, *bridgeEvent.FTokenDecimal)
    } else {
        return nil, fmt.Errorf("token decimal is not valid")
    }
    if bridgeEvent.FTimeStamp != nil {
        timestamp = int(*bridgeEvent.FTimeStamp)
        timeStampFormatted = time.Unix(int64(*bridgeEvent.FTimeStamp), 0).String()
    } else {
        return nil, fmt.Errorf("timestamp is not valid")
    }

    kappa := bridgeEvent.FDestinationKappa
    destinationChainID := int(bridgeEvent.FDestinationChainID.Uint64())
    kappaStatus := model.KappaStatusUnknown
    bridgeTx = model.PartialInfo{
        ChainID:            &chainID,
        DestinationChainID: &destinationChainID,
        Address:            &bridgeEvent.FRecipient.String,
        TxnHash:            &bridgeEvent.FTxHash,
        Value:              &value,
        FormattedValue:     formattedValue,
        USDValue:           bridgeEvent.FAmountUSD,
        TokenAddress:       &bridgeEvent.FToken,
        TokenSymbol:        &bridgeEvent.FTokenSymbol.String,
        BlockNumber:        &blockNumber,
        Time:               &timestamp,
        FormattedTime:      &timeStampFormatted,
    }
    bridgeModule := getBridgeModule(int(bridgeEvent.FEventType))
    result := &model.BridgeWatcherTx{
        BridgeTx:     &bridgeTx,
        Pending:      &isPending,
        Type:         &txType,
        Kappa:        &kappa,
        KappaStatus:  &kappaStatus,
        BridgeModule: &bridgeModule,
    }
    return result, nil
}

func bwBridgeMVToBWTxDestination(bridgeEvent *sql.HybridBridgeEvent, txType model.BridgeTxType) (*model.BridgeWatcherTx, error) {
    var bridgeTx model.PartialInfo
    chainID := int(bridgeEvent.TChainID)
    isPending := false
    blockNumber := int(bridgeEvent.TBlockNumber)
    value := bridgeEvent.TAmount.String()
    var timestamp int
    var formattedValue *float64
    var timeStampFormatted string
    if bridgeEvent.TTokenDecimal != nil {
        formattedValue = getAdjustedValue(bridgeEvent.TAmount, *bridgeEvent.TTokenDecimal)
    } else {
        return nil, fmt.Errorf("token decimal is not valid")
    }
    if bridgeEvent.TTimeStamp != nil {
        timestamp = int(*bridgeEvent.TTimeStamp)
        timeStampFormatted = time.Unix(int64(*bridgeEvent.TTimeStamp), 0).String()
    } else {
        return nil, fmt.Errorf("timestamp is not valid")
    }

    destinationChainID := int(bridgeEvent.TChainID)
    kappa := bridgeEvent.TKappa.String
    kappaStatus := model.KappaStatusExists
    bridgeTx = model.PartialInfo{
        ChainID:            &chainID,
        DestinationChainID: &destinationChainID,
        Address:            &bridgeEvent.TRecipient.String,
        TxnHash:            &bridgeEvent.TTxHash,
        Value:              &value,
        FormattedValue:     formattedValue,
        USDValue:           bridgeEvent.TAmountUSD,
        TokenAddress:       &bridgeEvent.TToken,
        TokenSymbol:        &bridgeEvent.TTokenSymbol.String,
        BlockNumber:        &blockNumber,
        Time:               &timestamp,
        FormattedTime:      &timeStampFormatted,
    }
    bridgeModule := getBridgeModule(int(bridgeEvent.TEventType))
    result := &model.BridgeWatcherTx{
        BridgeTx:     &bridgeTx,
        Pending:      &isPending,
        Type:         &txType,
        Kappa:        &kappa,
        KappaStatus:  &kappaStatus,
        BridgeModule: &bridgeModule,
    }
    return result, nil
}

func (r *queryResolver) checkIfChainIDExists(chainIDNeeded uint32, bridgeType model.BridgeType) bool {
    exists := false
    for chainID, chainConfig := range r.Config.Chains {
        if chainID == chainIDNeeded {
            switch bridgeType {
            case model.BridgeTypeBridge:
                if chainConfig.Contracts.Bridge != "" {
                    exists = true
                }
            case model.BridgeTypeCctp:
                if chainConfig.Contracts.CCTP != "" {
                    exists = true
                }
            }
        }
    }
    return exists
}

func (r *queryResolver) getContractAddressFromType(chainID uint32, contractType model.ContractType) (string, error) {
    if _, ok := r.Config.Chains[chainID]; !ok {
        return "", fmt.Errorf("chain ID not found")
    }
    switch contractType {
    case model.ContractTypeBridge:
        return r.Config.Chains[chainID].Contracts.Bridge, nil
    case model.ContractTypeCctp:
        return r.Config.Chains[chainID].Contracts.CCTP, nil
    default:
        return "", fmt.Errorf("contract type not supported")
    }
}

// Module type constants for bridge event classification.
const (
    // ModuleSynapseBridge represents events with type less than 10, indicating standard bridge operations.
    ModuleSynapseBridge = 0
    // ModuleSynapseCCTP represents CCTP (Cross-Chain Transfer Protocol) bridge events.
    ModuleSynapseCCTP = 10
    // ModuleSynapseRFQ represents RFQ (Request for Quote) bridge events.
    ModuleSynapseRFQ = 12
)

func getBridgeModule(eventType int) string {
    switch {
    case eventType < ModuleSynapseCCTP:
        return "SynapseBridge"
    case eventType == ModuleSynapseCCTP:
        return "SynapseCCTP"
    case eventType == ModuleSynapseRFQ:
        return "SynapseRFQ"
    default:
        return ""
    }
}