
View on GitHub


4 hrs
Test Coverage
package cmd

import (

    markdown ""
    execConfig ""
    omnirpcClient ""
    scribeAPI ""
    scribeCmd ""

    // used to embed markdown.
    _ "embed"


var help string

// ExecutorInfoCommand gets info about using the executor agent.
var ExecutorInfoCommand = &cli.Command{
    Name:        "executor-info",
    Description: "learn how to use executor cli",
    Action: func(c *cli.Context) error {
        fmt.Println(string(markdown.Render(help, termsize.Width(), 6)))
        return nil

var configFlag = &cli.StringFlag{
    Name:      "config",
    Usage:     "--config /Users/synapsecns/config.yaml",
    TakesFile: true,
    Required:  true,

var metricsPortFlag = &cli.UintFlag{
    Name:  "metrics-port",
    Usage: "--port 5121",
    Value: 0,

var debugFlag = &cli.BoolFlag{
    Name:  "debug",
    Usage: "--debug",

func createExecutorParameters(ctx context.Context, c *cli.Context, metrics metrics.Handler) (executorConfig execConfig.Config, executorDB db.ExecutorDB, err error) {
    executorConfig, err = execConfig.DecodeConfig(core.ExpandOrReturnPath(c.String(configFlag.Name)))
    if err != nil {
        return executorConfig, nil, fmt.Errorf("failed to decode config: %w", err)

    if executorConfig.DBPrefix == "" && executorConfig.DBConfig.Type == dbcommon.Mysql.String() {
        executorConfig.DBPrefix = "executor"

    if executorConfig.DBConfig.Type == dbcommon.Sqlite.String() {
        executorConfig.DBPrefix = ""

    executorDB, err = InitExecutorDB(
    if err != nil {
        return executorConfig, nil, fmt.Errorf("failed to initialize database: %w", err)

    return executorConfig, executorDB, nil

// ExecutorRunCommand runs the executor.
var ExecutorRunCommand = &cli.Command{
    Name:        "executor-run",
    Description: "runs the executor service",
    Flags:       []cli.Flag{configFlag, metricsPortFlag, debugFlag},
    Action: func(c *cli.Context) error {

        var scribeClient client.ScribeClient

        g, ctx := errgroup.WithContext(c.Context)

        handler, err := metrics.NewFromEnv(ctx, metadata.BuildInfo())
        if err != nil {
            return fmt.Errorf("failed to create metrics handler: %w", err)

        executorConfig, executorDB, err := createExecutorParameters(ctx, c, handler)
        if err != nil {
            return err

        switch executorConfig.ScribeConfig.Type {
        case "embedded":
            eventDB, err := scribeAPI.InitDB(
            if err != nil {
                return fmt.Errorf("failed to initialize database: %w", err)

            scribeClients := make(map[uint32][]backend.ScribeBackend)

            for _, client := range executorConfig.ScribeConfig.EmbeddedScribeConfig.Chains {
                for confNum := 1; confNum <= scribeCmd.MaxConfirmations; confNum++ {
                    backendClient, err := backend.DialBackend(ctx, fmt.Sprintf("%s/%d/rpc/%d", executorConfig.ScribeConfig.EmbeddedScribeConfig.RPCURL, confNum, client.ChainID), handler)
                    if err != nil {
                        return fmt.Errorf("could not start client for %s", fmt.Sprintf("%s/1/rpc/%d", executorConfig.ScribeConfig.EmbeddedScribeConfig.RPCURL, client.ChainID))

                    scribeClients[client.ChainID] = append(scribeClients[client.ChainID], backendClient)

            scribe, err := service.NewScribe(eventDB, scribeClients, executorConfig.ScribeConfig.EmbeddedScribeConfig, handler)
            if err != nil {
                return fmt.Errorf("failed to initialize scribe: %w", err)

            g.Go(func() error {
                err := scribe.Start(ctx)
                if err != nil {
                    return fmt.Errorf("failed to start scribe: %w", err)

                return nil

            embedded := client.NewEmbeddedScribe(

            g.Go(func() error {
                err := embedded.Start(ctx)
                if err != nil {
                    return fmt.Errorf("failed to start embedded scribe: %w", err)

                return nil

            scribeClient = embedded.ScribeClient
        case "remote":
            scribeClient = client.NewRemoteScribe(
            return fmt.Errorf("invalid scribe type: %s", executorConfig.ScribeConfig.Type)

        var baseOmniRPCClient omnirpcClient.RPCClient
        if debugFlag.IsSet() {
            baseOmniRPCClient = omnirpcClient.NewOmnirpcClient(executorConfig.BaseOmnirpcURL, handler, omnirpcClient.WithCaptureReqRes())
        } else {
            baseOmniRPCClient = omnirpcClient.NewOmnirpcClient(executorConfig.BaseOmnirpcURL, handler)

        executor, err := executor.NewExecutor(ctx, executorConfig, executorDB, scribeClient, baseOmniRPCClient, handler)
        if err != nil {
            return fmt.Errorf("failed to create executor: %w", err)

        g.Go(func() error {
            err := api.Start(ctx, uint16(c.Uint(metricsPortFlag.Name)))
            if err != nil {
                return fmt.Errorf("failed to start api: %w", err)

            return nil

        g.Go(func() error {
            err := executor.Run(ctx)
            if err != nil {
                return fmt.Errorf("failed to run executor: %w", err)

            return nil

        if err := g.Wait(); err != nil {
            return fmt.Errorf("failed to run executor: %w", err)

        return nil

func init() {
    metricsPortFlag.Value = uint(freeport.GetPort())

// InitExecutorDB initializes a database given a database type and path.
func InitExecutorDB(parentCtx context.Context, database string, path string, tablePrefix string, handler metrics.Handler) (_ db.ExecutorDB, err error) {
    ctx, span := handler.Tracer().Start(parentCtx, "InitExecutorDB", trace.WithAttributes(
        attribute.String("database", database),
        attribute.String("path", path),
        attribute.String("tablePrefix", tablePrefix),

    defer func() {
        metrics.EndSpanWithErr(span, err)

    switch {
    case database == dbcommon.Sqlite.String():
        sqliteStore, err := sqlite.NewSqliteStore(ctx, path, handler, false)
        if err != nil {
            return nil, fmt.Errorf("failed to create sqlite store: %w", err)

        return sqliteStore, nil

    case database == dbcommon.Mysql.String():
        if os.Getenv("OVERRIDE_MYSQL") != "" {
            dbname := os.Getenv("MYSQL_DATABASE")
            connString := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?parseTime=true", core.GetEnv("MYSQL_USER", "root"), os.Getenv("MYSQL_PASSWORD"), core.GetEnv("MYSQL_HOST", ""), core.GetEnvInt("MYSQL_PORT", 3306), dbname)

            mysqlStore, err := mysql.NewMysqlStore(ctx, connString, handler, false)
            if err != nil {
                return nil, fmt.Errorf("failed to create mysql store: %w", err)

            return mysqlStore, nil

        if tablePrefix != "" {
            mysql.NamingStrategy = schema.NamingStrategy{
                TablePrefix: fmt.Sprintf("%s_", tablePrefix),

        mysqlStore, err := mysql.NewMysqlStore(ctx, path, handler, false)
        if err != nil {
            return nil, fmt.Errorf("failed to create mysql store: %w", err)

        return mysqlStore, nil

        return nil, fmt.Errorf("invalid database type: %s", database)