
View on GitHub


1 day
Test Coverage
package guard

import (


    ethTypes ""
    signerConfig ""
    omnirpcClient ""
    pbscribe ""


// Guard scans origins for latest state and submits snapshots to the Summit.
type Guard struct {
    bondedSigner       signer.Signer
    unbondedSigner     signer.Signer
    domains            map[uint32]domains.DomainClient
    summitDomainID     uint32
    refreshInterval    time.Duration
    summitLatestStates map[uint32]types.State
    // TODO: change to metrics type
    originLatestStates   map[uint32]types.State
    handler              metrics.Handler
    grpcClient           pbscribe.ScribeServiceClient
    grpcConn             *grpc.ClientConn
    logChans             map[uint32]chan *ethTypes.Log
    inboxParser          inbox.Parser
    lightInboxParser     lightinbox.Parser
    bondingManagerParser bondingmanager.Parser
    lightManagerParser   lightmanager.Parser
    boundOrigins         map[uint32]*origin.Origin
    txSubmitter          submitter.TransactionSubmitter
    retryConfig          []retry.WithBackoffConfigurator
    guardDB              db.GuardDB

const (
    logChanSize          = 1000
    scribeConnectTimeout = 30 * time.Second

func makeScribeClient(parentCtx context.Context, handler metrics.Handler, url string) (*grpc.ClientConn, pbscribe.ScribeServiceClient, error) {
    ctx, cancel := context.WithTimeout(parentCtx, scribeConnectTimeout)
    defer cancel()

    conn, err := grpc.DialContext(ctx, url,
    if err != nil {
        return nil, nil, fmt.Errorf("could not dial grpc: %w", err)

    scribeClient := pbscribe.NewScribeServiceClient(conn)

    // Ensure that gRPC is up and running.
    healthCheck, err := scribeClient.Check(ctx, &pbscribe.HealthCheckRequest{}, grpc.WaitForReady(true))
    if err != nil {
        return nil, nil, fmt.Errorf("could not check: %w", err)
    if healthCheck.Status != pbscribe.HealthCheckResponse_SERVING {
        return nil, nil, fmt.Errorf("not serving: %s", healthCheck.Status)

    return conn, scribeClient, nil

// NewGuard creates a new guard.
func NewGuard(ctx context.Context, cfg config.AgentConfig, omniRPCClient omnirpcClient.RPCClient, scribeClient client.ScribeClient, guardDB db.GuardDB, handler metrics.Handler) (guard *Guard, err error) {
    guard = &Guard{
        refreshInterval: time.Second * time.Duration(cfg.RefreshIntervalSeconds),
        domains:         make(map[uint32]domains.DomainClient),
        logChans:        make(map[uint32]chan *ethTypes.Log),
        boundOrigins:    make(map[uint32]*origin.Origin),

    guard.grpcConn, guard.grpcClient, err = makeScribeClient(ctx, handler, fmt.Sprintf("%s:%d", scribeClient.URL, scribeClient.Port))
    if err != nil {
        return nil, fmt.Errorf("could not create scribe client: %w", err)

    guard.bondedSigner, err = signerConfig.SignerFromConfig(ctx, cfg.BondedSigner)
    if err != nil {
        return nil, fmt.Errorf("error with bondedSigner, could not create guard: %w", err)

    guard.unbondedSigner, err = signerConfig.SignerFromConfig(ctx, cfg.UnbondedSigner)
    if err != nil {
        return nil, fmt.Errorf("error with unbondedSigner, could not create guard: %w", err)

    // Set up evm utilities for each domain
    for domainName, domain := range cfg.Domains {
        omnirpcClient, err := omniRPCClient.GetConfirmationsClient(ctx, int(domain.DomainID), 1)
        if err != nil {
            return nil, fmt.Errorf("error with omniRPCClient, could not create guard: %w", err)

        chainRPCURL := omniRPCClient.GetDefaultEndpoint(int(domain.DomainID))
        domainClient, err := evm.NewEVM(ctx, domainName, domain, chainRPCURL)
        if err != nil {
            return nil, fmt.Errorf("failing to create evm for domain, could not create guard for: %w", err)
        }[domain.DomainID] = domainClient

        guard.logChans[domain.DomainID] = make(chan *ethTypes.Log, logChanSize)
        guard.boundOrigins[domain.DomainID], err = origin.NewOrigin(
        if err != nil {
            return nil, fmt.Errorf("could not create origin: %w", err)

        // Initialize contract parsers for the summit domain.
        if domain.DomainID == cfg.SummitDomainID {
            guard.summitDomainID = domain.DomainID

            guard.inboxParser, err = inbox.NewParser(common.HexToAddress(domain.InboxAddress))
            if err != nil {
                return nil, fmt.Errorf("could not create inbox parser: %w", err)

            guard.lightInboxParser, err = lightinbox.NewParser(common.HexToAddress(domain.LightInboxAddress))
            if err != nil {
                return nil, fmt.Errorf("could not create inbox parser: %w", err)

            guard.bondingManagerParser, err = bondingmanager.NewParser(common.HexToAddress(domain.BondingManagerAddress))
            if err != nil {
                return nil, fmt.Errorf("could not create bonding manager parser: %w", err)

            guard.lightManagerParser, err = lightmanager.NewParser(common.HexToAddress(domain.LightManagerAddress))
            if err != nil {
                return nil, fmt.Errorf("could not create light manager parser: %w", err)

    _, ok :=[guard.summitDomainID]
    if !ok {
        return nil, fmt.Errorf("summit domain not set: %d", guard.summitDomainID)

    guard.summitLatestStates = make(map[uint32]types.State, len(
    guard.originLatestStates = make(map[uint32]types.State, len(
    guard.handler = handler
    guard.txSubmitter = submitter.NewTransactionSubmitter(handler, guard.unbondedSigner, omniRPCClient, guardDB.SubmitterDB(), &cfg.SubmitterConfig)

    if cfg.MaxRetrySeconds == 0 {
        cfg.MaxRetrySeconds = 60

    guard.retryConfig = []retry.WithBackoffConfigurator{
        retry.WithMaxAttemptTime(time.Second * time.Duration(cfg.MaxRetrySeconds)),
    guard.guardDB = guardDB

    return guard, nil

// streamLogs uses the grpcConnection to Scribe, with a chainID and address to get all logs from that address.
func (g Guard) streamLogs(ctx context.Context, chainID uint32, address string) error {
    // TODO: Get last block number to define starting point for streamLogs.
    fromBlock := strconv.FormatUint(0, 16)
    toBlock := "latest"
    stream, err := g.grpcClient.StreamLogs(ctx, &pbscribe.StreamLogsRequest{
        Filter: &pbscribe.LogFilter{
            ContractAddress: &pbscribe.NullableString{Kind: &pbscribe.NullableString_Data{Data: address}},
            ChainId:         chainID,
        FromBlock: fromBlock,
        ToBlock:   toBlock,
    if err != nil {
        return fmt.Errorf("could not stream logs: %w", err)

    for {
        response, err := stream.Recv()
        if err != nil {
            return fmt.Errorf("could not receive: %w", err)

        log := response.Log.ToLog()
        if log == nil {
            return fmt.Errorf("could not convert log")

        select {
        case <-ctx.Done():
            err := stream.CloseSend()
            if err != nil {
                return fmt.Errorf("could not close stream: %w", err)

            err = g.grpcConn.Close()
            if err != nil {
                return fmt.Errorf("could not close connection: %w", err)

            return fmt.Errorf("context done: %w", ctx.Err())
        case g.logChans[chainID] <- log:
            logger.Info("Received log with topic: %s", log.Topics[0].String())

// receiveLogs continuously receives logs from the log channel and processes them.
func (g Guard) receiveLogs(ctx context.Context, chainID uint32) error {
    for {
        select {
        case <-ctx.Done():
            return fmt.Errorf("context canceled: %w", ctx.Err())
        case log := <-g.logChans[chainID]:
            if log == nil {
                return fmt.Errorf("log is nil")

            err := g.handleLog(ctx, *log, chainID)
            if err != nil {
                return fmt.Errorf("could not process log: %w", err)

func (g Guard) handleLog(ctx context.Context, log ethTypes.Log, chainID uint32) error {
    switch {
    case isSnapshotAcceptedEvent(g.inboxParser, log):
        return g.handleSnapshotAccepted(ctx, log)
    case isAttestationAcceptedEvent(g.lightInboxParser, log):
        return g.handleAttestationAccepted(ctx, log)
    case isReceiptAcceptedEvent(g.inboxParser, log):
        return g.handleReceiptAccepted(ctx, log)
    case isStatusUpdatedEvent(g.bondingManagerParser, log):
        return g.handleStatusUpdated(ctx, log, chainID)
    case isRootUpdatedEvent(g.bondingManagerParser, log):
        return g.handleRootUpdated(ctx, log, chainID)
    return nil

func (g Guard) loadSummitLatestStates(parentCtx context.Context) {
    for _, domain := range {
        ctx, span := g.handler.Tracer().Start(parentCtx, "loadSummitLatestStates", trace.WithAttributes(
            attribute.Int("domain", int(domain.Config().DomainID)),

        originID := domain.Config().DomainID

        var latestState types.State
        var err error
        contractCall := func(ctx context.Context) error {
            latestState, err =[g.summitDomainID].Summit().GetLatestAgentState(ctx, originID, g.bondedSigner)
            if err != nil {
                return fmt.Errorf("failed calling GetLatestAgentState for originID %d on the Summit contract: err = %w", originID, err)

            return nil
        err = retry.WithBackoff(ctx, contractCall, g.retryConfig...)
        if err == nil && latestState.Nonce() > uint32(0) {
            g.summitLatestStates[originID] = latestState


func (g Guard) loadOriginLatestStates(parentCtx context.Context) {
    for _, d := range {
        domain := d
        ctx, span := g.handler.Tracer().Start(parentCtx, "loadOriginLatestStates", trace.WithAttributes(
            attribute.Int("domain", int(domain.Config().DomainID)),

        originID := domain.Config().DomainID

        // TODO: Wrap this with a retry if `Start` behavior changes.
        latestState, err := domain.Origin().SuggestLatestState(ctx)
        if err != nil {
            latestState = nil
            logger.Errorf("Failed calling SuggestLatestState for originID %d on the Origin contract: %v", originID, err)
            span.AddEvent("Failed calling SuggestLatestState for originID on the Origin contract", trace.WithAttributes(
                attribute.Int("originID", int(originID)),
                attribute.String("err", err.Error()),
        } else if latestState == nil || latestState.Nonce() == uint32(0) {
            logger.Errorf("No latest state found for origin id %d", originID)
            span.AddEvent("No latest state found for origin id", trace.WithAttributes(
                attribute.Int("originID", int(originID)),
        if latestState != nil {
            // TODO: if overwriting, end span and start a new one
            g.originLatestStates[originID] = latestState


func (g Guard) getLatestSnapshot() (types.Snapshot, map[uint32]types.State) {
    statesToSubmit := make(map[uint32]types.State, len(
    for _, domain := range {
        originID := domain.Config().DomainID
        summitLatest, ok := g.summitLatestStates[originID]
        if !ok || summitLatest == nil || summitLatest.Nonce() == 0 {
            summitLatest = nil
        originLatest, ok := g.originLatestStates[originID]
        if !ok || originLatest == nil || originLatest.Nonce() == 0 {
        if summitLatest != nil && summitLatest.Nonce() >= originLatest.Nonce() {
            // Here this guard already submitted this state
        // TODO: add event for submitting that state
        statesToSubmit[originID] = originLatest
    snapshotStates := make([]types.State, 0, len(statesToSubmit))
    for _, state := range statesToSubmit {
        if state.Nonce() == 0 {
        snapshotStates = append(snapshotStates, state)
    if len(snapshotStates) > 0 {
        return types.NewSnapshot(snapshotStates), statesToSubmit
    return nil, nil

func (g Guard) submitLatestSnapshot(parentCtx context.Context) {
    summitDomain :=[g.summitDomainID]

    ctx, span := g.handler.Tracer().Start(parentCtx, "submitLatestSnapshot", trace.WithAttributes(
        attribute.Int("domain", int(g.summitDomainID)),

    defer func() {

    snapshot, statesToSubmit := g.getLatestSnapshot()
    if snapshot == nil {

    snapshotSignature, encodedSnapshot, _, err := snapshot.SignSnapshot(ctx, g.bondedSigner)

    if err != nil {
        logger.Errorf("Error signing snapshot: %v", err)
        span.AddEvent("Error signing snapshot", trace.WithAttributes(
            attribute.String("err", err.Error()),
    } else {
        _, err = g.txSubmitter.SubmitTransaction(ctx, big.NewInt(int64(g.summitDomainID)), func(transactor *bind.TransactOpts) (tx *ethTypes.Transaction, err error) {
            tx, err = summitDomain.Inbox().SubmitSnapshot(transactor, encodedSnapshot, snapshotSignature)
            if err != nil {
                return nil, fmt.Errorf("failed to submit snapshot: %w", err)

        if err != nil {
            logger.Errorf("Failed to submit snapshot to inbox: %v", err)
            span.AddEvent("Failed to submit snapshot to inbox", trace.WithAttributes(
                attribute.String("err", err.Error()),
        } else {
            for originID, state := range statesToSubmit {
                g.summitLatestStates[originID] = state

// Start starts the guard.
func (g Guard) Start(parentCtx context.Context) error {
    // First initialize a map to track what was the last state signed by this guard

    group, ctx := errgroup.WithContext(parentCtx)

    group.Go(func() error {
        err := g.txSubmitter.Start(ctx)
        if err != nil {
            err = fmt.Errorf("could not start tx submitter: %w", err)
        return err

    group.Go(func() error {
        return g.streamLogs(ctx, g.summitDomainID,[g.summitDomainID].Config().InboxAddress)

    group.Go(func() error {
        return g.streamLogs(ctx, g.summitDomainID,[g.summitDomainID].Config().BondingManagerAddress)

    group.Go(func() error {
        return g.receiveLogs(ctx, g.summitDomainID)

    for _, domain := range {
        domainCfg := domain.Config()
        if domainCfg.DomainID == g.summitDomainID {

        group.Go(func() error {
            return g.streamLogs(ctx, domainCfg.DomainID, domainCfg.LightInboxAddress)

        group.Go(func() error {
            return g.streamLogs(ctx, domainCfg.DomainID, domainCfg.LightManagerAddress)

        group.Go(func() error {
            return g.receiveLogs(ctx, domainCfg.DomainID)

    group.Go(func() error {
        for {
            select {
            // parent loop terminated
            case <-ctx.Done():
                logger.Info("Guard exiting without error")
                return nil
            case <-time.After(g.refreshInterval):
                err := g.updateAgentStatuses(ctx)
                if err != nil {
                    return err

    if err := group.Wait(); err != nil {
        return fmt.Errorf("guard error: %w", err)

    return nil