
View on GitHub


2 days
Test Coverage
package keeper

import (

    sdk ""
    sdkerrors ""
    executionpb ""
    typespb ""

// Keeper of the execution store.
// It bundles the store key and codec used to persist executions together with
// the keepers of the other modules that the methods of this type call into.
// NOTE(review): the closing brace of this struct is not present in this view of the file.
type Keeper struct {
    // storeKey selects the KVStore in which executions are saved (see ctx.KVStore(k.storeKey)).
    storeKey sdk.StoreKey
    // cdc encodes and decodes the executions written to and read from the store.
    cdc      *codec.Codec

    // Keepers of the other modules queried by this keeper.
    serviceKeeper  types.ServiceKeeper
    instanceKeeper types.InstanceKeeper
    runnerKeeper   types.RunnerKeeper
    processKeeper  types.ProcessKeeper
    creditKeeper   types.CreditKeeper

// NewKeeper creates an execution keeper.
// Each argument is copied unchanged into the field of the same name of the returned Keeper
// (key goes into storeKey).
// NOTE(review): the closing braces of the composite literal and of the function are not
// present in this view of the file.
func NewKeeper(
    cdc *codec.Codec,
    key sdk.StoreKey,
    serviceKeeper types.ServiceKeeper,
    instanceKeeper types.InstanceKeeper,
    runnerKeeper types.RunnerKeeper,
    processKeeper types.ProcessKeeper,
    creditKeeper types.CreditKeeper,
) Keeper {
    return Keeper{
        storeKey:       key,
        cdc:            cdc,
        serviceKeeper:  serviceKeeper,
        instanceKeeper: instanceKeeper,
        runnerKeeper:   runnerKeeper,
        processKeeper:  processKeeper,
        creditKeeper:   creditKeeper,

// Logger returns a module-specific logger.
// It is the logger of ctx with an extra "module" field set to "x/" followed by types.ModuleName.
// NOTE(review): this is the only method with a value receiver; the others use *Keeper.
func (k Keeper) Logger(ctx sdk.Context) log.Logger {
    return ctx.Logger().With("module", fmt.Sprintf("x/%s", types.ModuleName))

// Create creates a new execution with proposed status, or registers one more proposal on an
// execution that is already stored with proposed status.
// Consensus is considered reached when the number of emitters that proposed the execution equals
// ceil(2/3 * number of emitters); exec.Execute is then called and the block height is recorded.
// NOTE(review): the previous wording said "more than 2/3 of emitters", which does not match the
// computation below (for 3 emitters the requirement is 2) — confirm which one is intended.
// TODO: we should split the message and keeper function of execution create from user and for process.
//
// It returns the saved execution. It returns an error when a runner, instance, service, process
// or stored execution cannot be fetched, when the task inputs are rejected, when the signer is not
// allowed to propose, when the stored execution is not in proposed status, or when encoding fails.
//
// NOTE(review): this view of the file lacks the lines holding closing delimiters and a few
// statements (marked below); read the control flow with that in mind.
func (k *Keeper) Create(ctx sdk.Context, msg types.MsgCreate) (*executionpb.Execution, error) {
    // Resolve the executor runner, then its instance, then the service of that instance.
    run, err := k.runnerKeeper.Get(ctx, msg.ExecutorHash)
    if err != nil {
        return nil, err
    inst, err := k.instanceKeeper.Get(ctx, run.InstanceHash)
    if err != nil {
        return nil, err
    srv, err := k.serviceKeeper.Get(ctx, inst.ServiceHash)
    if err != nil {
        return nil, err
    // A failure of the service's task-input check is reported as an invalid request.
    // NOTE(review): err.Error() is used as the format string of Wrapf, so a '%' in the message
    // would be treated as a formatting verb; sdkerrors.Wrap would pass the text through verbatim.
    if err := srv.RequireTaskInputs(msg.TaskKey, msg.Inputs); err != nil {
        return nil, sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, err.Error())
    // proc stays nil when the message carries no process hash.
    var proc *process.Process
    if !msg.ProcessHash.IsZero() {
        proc, err = k.processKeeper.Get(ctx, msg.ProcessHash)
        if err != nil {
            return nil, err

    // Without a process, only the owner of the executor runner may create the execution.
    if proc == nil && run.Owner != msg.Signer.String() {
        return nil, sdkerrors.Wrap(sdkerrors.ErrUnauthorized, "signer is not the execution's executor")

    // NOTE(review): the arguments of executionpb.New are not visible in this view of the file.
    exec, err := executionpb.New(
    if err != nil {
        return nil, sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, err.Error())

    // check if exec already exists
    store := ctx.KVStore(k.storeKey)
    if store.Has(exec.Hash) {
        // reuse the stored execution (it carries the emitters recorded so far) and
        // refuse the proposal unless it is still in proposed status
        if exec, err = k.Get(ctx, exec.Hash); err != nil {
            return nil, err
        if exec.Status != executionpb.Status_Proposed {
            return nil, sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "the execution's status %q should be proposed", exec.Status)
    } else {
        // set everything that should be put at the creation of the exec

        // init execution's emitters; BlockHeight 0 marks an emitter that has not proposed yet
        if proc == nil {
            // no process, set the signer as only emitter
            exec.Emitters = []*executionpb.Execution_Emitter{{
                RunnerHash:  run.Hash,
                BlockHeight: 0,
        } else {
            // with a process, every runner returned by fetchEmitters becomes an emitter
            matchedRuns, err := k.fetchEmitters(ctx, proc, exec.NodeKey)
            if err != nil {
                return nil, err
            if len(matchedRuns) == 0 {
                return nil, sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "no runner is running the instance that should have trigger this execution")
            for _, matchedRun := range matchedRuns {
                exec.Emitters = append(exec.Emitters, &executionpb.Execution_Emitter{
                    RunnerHash:  matchedRun.Hash,
                    BlockHeight: 0,

    // check if signer is in emitters, set emitter's block height, return error if not present.
    emitterIsPresent := false
    for _, emitter := range exec.Emitters {
        runEmitter, err := k.runnerKeeper.Get(ctx, emitter.RunnerHash)
        if err != nil {
            return nil, err
        if runEmitter.Owner == msg.Signer.String() {
            emitterIsPresent = true
            emitter.BlockHeight = ctx.BlockHeight()

            // emit event with action proposed
            // NOTE(review): the event type argument of sdk.NewEvent and the statement that
            // actually emits the event are not visible in this view of the file.
            event := sdk.NewEvent(
                sdk.NewAttribute(sdk.AttributeKeyAction, types.AttributeActionProposed),
                sdk.NewAttribute(types.AttributeKeyHash, exec.Hash.String()),
                sdk.NewAttribute(types.AttributeKeyAddress, exec.Address.String()),
                sdk.NewAttribute(types.AttributeKeyExecutor, exec.ExecutorHash.String()),
                sdk.NewAttribute(types.AttributeKeyInstance, exec.InstanceHash.String()),
            // NOTE(review): the process hash is attached under types.AttributeKeyExecutor, the key
            // already used for the executor hash above — confirm whether a process key was intended.
            if !exec.ProcessHash.IsZero() {
                event = event.AppendAttributes(
                    sdk.NewAttribute(types.AttributeKeyExecutor, exec.ProcessHash.String()),

    if !emitterIsPresent {
        return nil, sdkerrors.Wrapf(sdkerrors.ErrUnauthorized, "message's signer is not in the execution's emitters")

    // define consensus requirements: ceil(2/3 of the number of emitters)
    nbrEmitterRequired := int(math.Ceil(float64(len(exec.Emitters)) * 2 / 3))

    // calculate emitter already proposed
    // NOTE(review): the body of the condition is not visible here; presumably it increments
    // nbrProposedEmitters for each emitter with a non-zero block height — confirm.
    nbrProposedEmitters := 0
    for _, emitter := range exec.Emitters {
        if emitter.BlockHeight > 0 {

    // check if emitter consensus is reached; the comparison is an equality, so this branch
    // runs only for the proposal that brings the count exactly to the requirement
    if nbrProposedEmitters == nbrEmitterRequired {
        // set block height when consensus is reached
        exec.BlockHeight = ctx.BlockHeight()

        // change the status of the exec
        if err := exec.Execute(); err != nil {
            return nil, sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, err.Error())

        // emit event
        event := sdk.NewEvent(
            sdk.NewAttribute(sdk.AttributeKeyAction, types.AttributeActionCreated),
            sdk.NewAttribute(types.AttributeKeyHash, exec.Hash.String()),
            sdk.NewAttribute(types.AttributeKeyAddress, exec.Address.String()),
            sdk.NewAttribute(types.AttributeKeyExecutor, exec.ExecutorHash.String()),
            sdk.NewAttribute(types.AttributeKeyInstance, exec.InstanceHash.String()),
        if !exec.ProcessHash.IsZero() {
            event = event.AppendAttributes(
                sdk.NewAttribute(types.AttributeKeyExecutor, exec.ProcessHash.String()),

        // NOTE(review): the body guarded by this check is not visible in this view of the file.
        if !ctx.IsCheckTx() {

    // save the exec, encoded with the keeper's codec, under its hash
    value, err := k.cdc.MarshalBinaryLengthPrefixed(exec)
    if err != nil {
        return nil, sdkerrors.Wrapf(sdkerrors.ErrJSONMarshal, err.Error())
    store.Set(exec.Hash, value)

    return exec, nil

// Update applies the result carried by msg to the stored execution identified by msg.Hash.
// The result is either outputs (the execution is completed, or failed when the outputs do not
// pass validateOutput) or an error (the execution is failed). Start and Stop are copied from the
// message, the price is computed with calculatePrice and subtracted through the credit keeper,
// and the execution is written back to the store.
//
// It returns the updated execution. It returns an error when the execution does not exist or
// cannot be decoded, when msg.Executor is not the owner of the executor runner, when the status
// transition is refused, when no result is supplied, or when pricing, payment or encoding fails.
//
// NOTE(review): this view of the file lacks the lines holding closing delimiters and a few
// statements (marked below); read the control flow with that in mind.
func (k *Keeper) Update(ctx sdk.Context, msg types.MsgUpdate) (*executionpb.Execution, error) {
    store := ctx.KVStore(k.storeKey)
    if !store.Has(msg.Hash) {
        return nil, sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "execution %q doesn't exist", msg.Hash)
    var exec *executionpb.Execution
    // NOTE(review): here and below err.Error() is used as the format string of Wrapf, so a '%'
    // in the message would be treated as a formatting verb; sdkerrors.Wrap would avoid that.
    if err := k.cdc.UnmarshalBinaryLengthPrefixed(store.Get(msg.Hash), &exec); err != nil {
        return nil, sdkerrors.Wrapf(sdkerrors.ErrJSONUnmarshal, err.Error())

    // check if signer is the executor
    runExecutor, err := k.runnerKeeper.Get(ctx, exec.ExecutorHash)
    if err != nil {
        return nil, err
    if runExecutor.Owner != msg.Executor.String() {
        return nil, sdkerrors.Wrap(sdkerrors.ErrUnauthorized, "signer is not the execution's executor")

    // the event action defaults to "failed" and is overridden in the outputs case
    eventAction := types.AttributeActionFailed
    switch res := msg.Result.(type) {
    case *types.MsgUpdate_Outputs:
        // invalid outputs turn the execution into a failed one instead of rejecting the message
        if err := k.validateOutput(ctx, exec.InstanceHash, exec.TaskKey, res.Outputs); err != nil {
            if err1 := exec.Fail(err); err1 != nil {
                return nil, sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, err1.Error())
        } else if err := exec.Complete(res.Outputs); err != nil {
            return nil, sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, err.Error())
        // NOTE(review): judging by its indentation this assignment applies to the whole outputs
        // case, including the path where validation failed and exec.Fail was called — confirm.
        eventAction = types.AttributeActionCompleted
    case *types.MsgUpdate_Error:
        if err := exec.Fail(errors.New(res.Error)); err != nil {
            return nil, sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, err.Error())
        // NOTE(review): the following return presumably belongs to a default case whose label
        // is not visible in this view of the file.
        return nil, sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "no execution result supplied")

    // record the timing reported by the executor, then price the execution
    exec.Start = msg.Start
    exec.Stop = msg.Stop
    price, err := k.calculatePrice(ctx, exec)
    if err != nil {
        return nil, err
    exec.Price = price.String()

    // encode before charging so that an encoding failure returns before any credit is subtracted
    value, err := k.cdc.MarshalBinaryLengthPrefixed(exec)
    if err != nil {
        return nil, sdkerrors.Wrapf(sdkerrors.ErrJSONMarshal, err.Error())

    // NOTE(review): the body guarded by this check is not visible in this view of the file.
    if !ctx.IsCheckTx() {

    // the payer is the executor, unless the execution belongs to a process, in which case
    // the process's payment address is charged
    from := msg.Executor
    if !exec.ProcessHash.IsZero() {
        proc, err := k.processKeeper.Get(ctx, exec.ProcessHash)
        if err != nil {
            return nil, err
        from = proc.PaymentAddress
    if _, err = k.creditKeeper.Sub(ctx, from, price); err != nil {
        return nil, err

    store.Set(exec.Hash, value)

    // emit event
    // NOTE(review): the event type argument of sdk.NewEvent and the statement that actually
    // emits the event are not visible in this view of the file.
    event := sdk.NewEvent(
        sdk.NewAttribute(sdk.AttributeKeyAction, eventAction),
        sdk.NewAttribute(types.AttributeKeyHash, exec.Hash.String()),
        sdk.NewAttribute(types.AttributeKeyAddress, exec.Address.String()),
        sdk.NewAttribute(types.AttributeKeyExecutor, exec.ExecutorHash.String()),
        sdk.NewAttribute(types.AttributeKeyInstance, exec.InstanceHash.String()),
    // NOTE(review): the process hash is attached under types.AttributeKeyExecutor, the key
    // already used for the executor hash above — confirm whether a process key was intended.
    if !exec.ProcessHash.IsZero() {
        event = event.AppendAttributes(
            sdk.NewAttribute(types.AttributeKeyExecutor, exec.ProcessHash.String()),

    return exec, nil

// Get returns the execution that matches given hash.
// It returns an invalid-request error when no entry exists under hash in the store, and a
// JSON-unmarshal error when the stored bytes cannot be decoded by the keeper's codec.
// NOTE(review): the parameter named hash shadows the hash package inside this function.
func (k *Keeper) Get(ctx sdk.Context, hash hash.Hash) (*executionpb.Execution, error) {
    var exec *executionpb.Execution
    store := ctx.KVStore(k.storeKey)
    if !store.Has(hash) {
        return nil, sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "execution %q not found", hash)
    // NOTE(review): err.Error() is used as the format string of Wrapf, so a '%' in the message
    // would be treated as a formatting verb; sdkerrors.Wrap would pass the text through verbatim.
    if err := k.cdc.UnmarshalBinaryLengthPrefixed(store.Get(hash), &exec); err != nil {
        return nil, sdkerrors.Wrapf(sdkerrors.ErrJSONUnmarshal, err.Error())
    return exec, nil

// List returns the executions of the store for which filter.Match reports true.
// Every entry of the store is decoded with the keeper's codec; the first decoding failure
// aborts the listing and is returned as a JSON-unmarshal error.
// NOTE(review): no call to iter.Next() or iter.Close() is visible in this view of the file.
// Without advancing the iterator the loop would never end, and without closing it the iterator
// would leak — confirm both calls exist in the full file.
func (k *Keeper) List(ctx sdk.Context, filter types.ListFilter) ([]*executionpb.Execution, error) {
    var (
        execs []*executionpb.Execution
        // nil bounds: iterate over the whole store
        iter  = ctx.KVStore(k.storeKey).Iterator(nil, nil)
    for iter.Valid() {
        var exec *executionpb.Execution
        value := iter.Value()
        if err := k.cdc.UnmarshalBinaryLengthPrefixed(value, &exec); err != nil {
            return nil, sdkerrors.Wrapf(sdkerrors.ErrJSONUnmarshal, err.Error())
        if filter.Match(exec) {
            execs = append(execs, exec)
    return execs, nil

// validateOutput checks outputs against the task taskKey of the service behind instanceHash.
// It fetches the instance, then the instance's service, and delegates the check to
// srv.RequireTaskOutputs. Lookup errors are returned unchanged; a failed check is returned
// wrapped as an invalid-request error. It returns nil when the outputs are accepted.
func (k *Keeper) validateOutput(ctx sdk.Context, instanceHash hash.Hash, taskKey string, outputs *typespb.Struct) error {
    inst, err := k.instanceKeeper.Get(ctx, instanceHash)
    if err != nil {
        return err
    srv, err := k.serviceKeeper.Get(ctx, inst.ServiceHash)
    if err != nil {
        return err
    // NOTE(review): err.Error() is used as the format string of Wrapf, so a '%' in the message
    // would be treated as a formatting verb; sdkerrors.Wrap would pass the text through verbatim.
    if err := srv.RequireTaskOutputs(taskKey, outputs); err != nil {
        return sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, err.Error())
    return nil

// fetchEmitters returns the runners running the instance that was responsible for creating this execution from the process.
// It looks up, from node nodeKey of proc, the parent node of type event or result, reads that
// node's instance hash, and returns every runner whose InstanceHash equals it. The returned
// slice is non-nil and may be empty.
// It returns an invalid-request error when no such parent node is found, and forwards the error
// of the runner listing.
// NOTE(review): the "default:" labels of both type switches are not visible in this view of the
// file; the statements that presumably belong to them are marked below.
func (k *Keeper) fetchEmitters(ctx sdk.Context, proc *process.Process, nodeKey string) ([]*runner.Runner, error) {
    // get parent node's instance hash
    parentNode, err := proc.FindParentWithType(nodeKey, func(node *process.Process_Node) bool {
        // accept only event and result nodes
        switch node.GetType().(type) {
        case *process.Process_Node_Event_, *process.Process_Node_Result_:
            return true
            // NOTE(review): presumably the default case — confirm.
            return false
    if err != nil {
        return nil, sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, err.Error())
    var instanceHash hash.Hash
    switch n := parentNode.GetType().(type) {
    case *process.Process_Node_Event_:
        instanceHash = n.Event.InstanceHash
    case *process.Process_Node_Result_:
        instanceHash = n.Result.InstanceHash
        // NOTE(review): presumably the default case — confirm.
        return nil, sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "parent node type should be an event or a result")

    // get runners of this instance: list every runner, keep those with a matching instance hash
    runs, err := k.runnerKeeper.List(ctx)
    if err != nil {
        return nil, err
    matchedRuns := make([]*runner.Runner, 0)
    for _, run := range runs {
        if run.InstanceHash.Equal(instanceHash) {
            matchedRuns = append(matchedRuns, run)
    return matchedRuns, nil

// calculatePrice computes the price of exec from the price definition of its task:
//
//     PerCall + PerSec * duration + PerKB * datasize
//
// where duration is exec.GetDuration() and datasize is the combined length in bytes of the
// length-prefixed binary encodings of exec.Inputs and exec.Outputs, divided by 1000 and rounded up.
// NOTE(review): the unit of GetDuration is not visible here; presumably seconds, since it is
// multiplied by PerSec — confirm.
// Any failure (instance, service or task lookup, encoding) is returned wrapped as an
// invalid-request error together with a zero-value sdk.Int.
func (k *Keeper) calculatePrice(ctx sdk.Context, exec *executionpb.Execution) (sdk.Int, error) {
    // NOTE(review): throughout this function err.Error() is used as the format string of Wrapf,
    // so a '%' in the message would be treated as a formatting verb.
    inst, err := k.instanceKeeper.Get(ctx, exec.InstanceHash)
    if err != nil {
        return sdk.Int{}, sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, err.Error())
    srv, err := k.serviceKeeper.Get(ctx, inst.ServiceHash)
    if err != nil {
        return sdk.Int{}, sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, err.Error())
    task, err := srv.GetTask(exec.TaskKey)
    if err != nil {
        return sdk.Int{}, sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, err.Error())
    duration := sdk.NewInt(exec.GetDuration())
    // inputs and outputs are encoded only to measure their size
    inputs, err := k.cdc.MarshalBinaryLengthPrefixed(exec.Inputs)
    if err != nil {
        return sdk.Int{}, sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, err.Error())
    outputs, err := k.cdc.MarshalBinaryLengthPrefixed(exec.Outputs)
    if err != nil {
        return sdk.Int{}, sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, err.Error())
    // size is billed per started block of 1000 bytes
    datasize := sdk.NewInt(int64(math.Ceil(float64(len(inputs)+len(outputs)) / 1000)))
    return task.Price.PerCall.Add(task.Price.PerSec.Mul(duration)).Add(task.Price.PerKB.Mul(datasize)), nil

// Import imports a list of executions into the store.
// Each execution is encoded with the keeper's codec and written under its own hash, replacing
// any entry already stored under that hash. No validation of the executions is performed here.
// The import stops at the first encoding failure, which is returned as a JSON-marshal error;
// executions written before the failure are not removed by this function.
func (k *Keeper) Import(ctx sdk.Context, execs []*executionpb.Execution) error {
    store := ctx.KVStore(k.storeKey)
    for _, exec := range execs {
        value, err := k.cdc.MarshalBinaryLengthPrefixed(exec)
        if err != nil {
            // NOTE(review): err.Error() is used as the format string of Wrapf, so a '%' in the
            // message would be treated as a formatting verb; sdkerrors.Wrap would avoid that.
            return sdkerrors.Wrapf(sdkerrors.ErrJSONMarshal, err.Error())
        store.Set(exec.Hash, value)
    return nil