sprawl/sprawl

View on GitHub
service/order.go

Summary

Maintainability
F
4 days
Test Coverage
package service

import (
    "context"
    "crypto/hmac"
    "crypto/sha256"
    "strings"

    "github.com/golang/protobuf/proto"
    ptypes "github.com/golang/protobuf/ptypes"
    "github.com/libp2p/go-libp2p-core/crypto"
    peer "github.com/libp2p/go-libp2p-core/peer"
    "github.com/sprawl/sprawl/errors"
    "github.com/sprawl/sprawl/identity"
    "github.com/sprawl/sprawl/interfaces"
    "github.com/sprawl/sprawl/pb"
)

// OrderService implements the OrderService Server service.proto
type OrderService struct {
    Logger    interfaces.Logger
    Storage   interfaces.Storage
    P2p       interfaces.P2p
    websocket interfaces.WebsocketService
}

func getOrderStorageKey(channelID []byte, orderID []byte) []byte {
    return []byte(strings.Join([]string{string(interfaces.OrderPrefix), string(channelID), string(orderID)}, ""))
}

func getOrderQueryPrefix(channelID []byte) []byte {
    return []byte(strings.Join([]string{string(interfaces.OrderPrefix), string(channelID)}, ""))
}

// RegisterWebsocket registers a websocket service to enable websocket connections between client and node
func (s *OrderService) RegisterWebsocket(websocket interfaces.WebsocketService) {
    s.websocket = websocket
}

// RegisterStorage registers a storage service to store the Orders in
func (s *OrderService) RegisterStorage(storage interfaces.Storage) {
    s.Storage = storage
}

// RegisterP2p registers a p2p service
func (s *OrderService) RegisterP2p(p2p interfaces.P2p) {
    s.P2p = p2p
}

// GetSignature generates signature from order and returns it
func (s *OrderService) GetSignature(order *pb.Order) ([]byte, error) {
    orderCopy := *order
    orderCopy.State = pb.State_OPEN
    orderCopy.Signature = nil
    orderCopy.Nonce = 0
    orderInBytes, err := proto.Marshal(&orderCopy)
    if !errors.IsEmpty(err) {
        return nil, errors.E(errors.Op("Marshal order in GetSignature"), err)
    }

    return identity.Sign(s.Storage, orderInBytes)
}

// VerifyOrder verifies order
func (s *OrderService) VerifyOrder(publicKey crypto.PubKey, order *pb.Order) (bool, error) {
    orderCopy := *order
    sig := orderCopy.Signature
    orderCopy.Signature = nil
    orderCopy.State = pb.State_OPEN
    orderCopy.Nonce = 0
    orderInBytes, err := proto.Marshal(&orderCopy)
    if !errors.IsEmpty(err) {
        return false, errors.E(errors.Op("Marshal order in VerifyOrder"), err)
    }
    return identity.Verify(publicKey, orderInBytes, sig)
}

// Create creates an Order, storing it locally and broadcasts the Order to all other nodes on the channel
func (s *OrderService) Create(ctx context.Context, in *pb.CreateRequest) (*pb.CreateResponse, error) {

    _, publicKey, err := identity.GetIdentity(s.Storage)
    if !errors.IsEmpty(err) {
        errors.E(errors.Op("Get public key in create order"), err)
    }

    // Get current timestamp as protobuf type
    now := ptypes.TimestampNow()

    secret, err := publicKey.Bytes()
    if !errors.IsEmpty(err) {
        errors.E(errors.Op("Turn public key into bytes"), err)
    }

    // Create a new HMAC by defining the hash type and the key (as byte array)
    h := hmac.New(sha256.New, secret)

    // Write Data to it
    h.Write(append([]byte(in.String()), []byte(now.String())...))

    // Get result and encode as hexadecimal string
    id := h.Sum(nil)

    // Construct the order
    order := &pb.Order{
        Id:           id,
        Created:      now,
        Asset:        in.Asset,
        CounterAsset: in.CounterAsset,
        Amount:       in.Amount,
        Price:        in.Price,
        State:        pb.State_OPEN, //Mutable
        Nonce:        0,             //Mutable
    }

    sig, err := s.GetSignature(order)
    if !errors.IsEmpty(err) {
        return &pb.CreateResponse{
            CreatedOrder: order,
        }, errors.E(errors.Op("Get Signature"), err)
    }

    order.Signature = sig

    // Get order as bytes
    orderInBytes, err := proto.Marshal(order)
    if !errors.IsEmpty(err) {
        s.Logger.Warn(errors.E(errors.Op("Marshal order"), err))
    }

    // Save order to LevelDB locally
    err = s.Storage.Put(getOrderStorageKey(in.GetChannelID(), id), orderInBytes)
    if !errors.IsEmpty(err) {
        err = errors.E(errors.Op("Put order"), err)
    }

    // Construct the message to send to other peers
    wireMessage := &pb.WireMessage{ChannelID: in.GetChannelID(), Operation: pb.Operation_CREATE, Data: orderInBytes}

    if s.P2p != nil {
        // Send the order creation by wire
        s.P2p.Send(wireMessage)
    } else {
        s.Logger.Warn("P2p service not registered with OrderService, not publishing or receiving orders from the network!")
    }

    return &pb.CreateResponse{
        CreatedOrder: order,
    }, err
}

// Receive receives a buffer from p2p and tries to unmarshal it into a struct
func (s *OrderService) Receive(buf []byte, from peer.ID) error {
    wireMessage := &pb.WireMessage{}
    err := proto.Unmarshal(buf, wireMessage)
    if !errors.IsEmpty(err) {
        return errors.E(errors.Op("Unmarshal wiremessage proto in Receive"), err)
    }
    if s.websocket != nil {
        s.websocket.PushToWebsockets(wireMessage)
    }

    // Read operation and data from the WireMessage
    op := wireMessage.GetOperation()
    data := wireMessage.GetData()
    channelID := wireMessage.GetChannelID()

    s.Logger.Debugf("%s: %s.%s", from.String(), channelID, op)

    if s.Storage != nil {
        switch op {

        case pb.Operation_CREATE:
            // Validate order
            order := &pb.Order{}
            err = proto.Unmarshal(data, order)
            if !errors.IsEmpty(err) {
                return errors.E(errors.Op("Unmarshal order proto in Receive"), err)
            }

            publickey, err := from.ExtractPublicKey()
            if !errors.IsEmpty(err) {
                return errors.E(errors.Op("Extract public key in Receive"), err)
            }
            isCreator, err := s.VerifyOrder(publickey, order)
            if !errors.IsEmpty(err) {
                return errors.E(errors.Op("Verify order creator in Receive"), err)
            }
            if isCreator {
                // Save order to LevelDB locally
                err = s.Storage.Put(getOrderStorageKey(channelID, order.GetId()), data)
                if !errors.IsEmpty(err) {
                    err = errors.E(errors.Op("Put order"), err)
                }
            } else {
                s.Logger.Debug("Received create request from someone that doesn't own the order")
            }

        case pb.Operation_DELETE:
            // Unmarshal order to get its key, validate
            order := &pb.Order{}
            err = proto.Unmarshal(data, order)
            if !errors.IsEmpty(err) {
                return errors.E(errors.Op("Unmarshal order proto in Receive"), err)
            }
            publickey, err := from.ExtractPublicKey()
            if !errors.IsEmpty(err) {
                return errors.E(errors.Op("Extract public key in Receive"), err)
            }

            isCreator, err := s.VerifyOrder(publickey, order)
            if !errors.IsEmpty(err) {
                return errors.E(errors.Op("Verify order creator in Receive"), err)
            }
            if isCreator {
                err = s.Storage.Delete(getOrderStorageKey(channelID, order.GetId()))
                if !errors.IsEmpty(err) {
                    return errors.E(errors.Op("Delete order"), err)
                }
            } else {
                s.Logger.Debug("Received delete request from someone that doesn't own the order")
            }

        case pb.Operation_SYNC_REQUEST:
            orders, err := s.Storage.GetAllWithPrefix(string(getOrderQueryPrefix(channelID)))
            if !errors.IsEmpty(err) {
                return errors.E(errors.Op("Fetch orders for sync"), err)
            }

            orderList := &pb.OrderList{}
            for _, value := range orders {
                order := &pb.Order{}
                proto.Unmarshal([]byte(value), order)
                orderList.Orders = append(orderList.Orders, order)
            }

            marshaledOrderList, err := proto.Marshal(orderList)
            if !errors.IsEmpty(err) {
                return errors.E(errors.Op("Marshal orderList in sync request"), err)
            }

            syncMessage := &pb.WireMessage{Operation: pb.Operation_SYNC_RECEIVE, ChannelID: channelID, Data: marshaledOrderList}

            marshaledData, err := proto.Marshal(syncMessage)
            if !errors.IsEmpty(err) {
                return errors.E(errors.Op("Marshal wireMessage in sync request"), err)
            }

            stream, err := s.P2p.OpenStream(from)
            if !errors.IsEmpty(err) {
                return errors.E(errors.Op("Open a sync request stream"), err)
            }

            err = stream.WriteToStream(marshaledData)
            if !errors.IsEmpty(err) {
                return errors.E(errors.Op("Write to stream"), err)
            }
            err = s.P2p.CloseStream(from)
            if !errors.IsEmpty(err) {
                return errors.E(errors.Op("Close the stream"), err)
            }

        case pb.Operation_SYNC_RECEIVE:
            orderList := &pb.OrderList{}
            err = proto.Unmarshal(data, orderList)
            if !errors.IsEmpty(err) {
                return errors.E(errors.Op("Unmarshal order proto in Receive"), err)
            }
            s.Logger.Info(orderList)
            for _, order := range orderList.GetOrders() {
                orderBytes, err := proto.Marshal(order)
                if !errors.IsEmpty(err) {
                    err = errors.E(errors.Op("Marshal order from received orderList"), err)
                }
                err = s.Storage.Put(getOrderStorageKey(channelID, order.GetId()), orderBytes)
                if !errors.IsEmpty(err) {
                    err = errors.E(errors.Op("Put order"), err)
                }
            }
        case pb.Operation_LOCK, pb.Operation_UNLOCK:
            // Unmarshal order to get its key, validate
            order := &pb.Order{}
            err = proto.Unmarshal(data, order)
            if !errors.IsEmpty(err) {
                return errors.E(errors.Op("Unmarshal order proto in Receive"), err)
            }

            previousOrderData, err := s.Storage.Get(getOrderStorageKey(channelID, order.GetId()))
            if !errors.IsEmpty(err) {
                return errors.E(errors.Op("Get previous order"), err)
            }
            previousOrder := &pb.Order{}
            proto.Unmarshal(previousOrderData, previousOrder)
            if previousOrder.Nonce >= order.Nonce {
                return errors.E(errors.Op("Compare nonces"), "received order state is behind current status")
            }

            publickey, err := from.ExtractPublicKey()
            if !errors.IsEmpty(err) {
                return errors.E(errors.Op("Extract public key in Receive"), err)
            }

            isCreator, err := s.VerifyOrder(publickey, order)
            if !errors.IsEmpty(err) {
                return errors.E(errors.Op("Verify order creator in Receive"), err)
            }

            if isCreator {
                // Save order to LevelDB locally
                err = s.Storage.Put(getOrderStorageKey(channelID, order.GetId()), data)
                if !errors.IsEmpty(err) {
                    return errors.E(errors.Op("Store lock/unlock order"), err)
                }
            } else {
                s.Logger.Debug("Received delete request from someone that doesn't own the order")
            }

        }
    } else {
        s.Logger.Warn("Storage not registered with OrderService, not persisting Orders!")
    }

    return err
}

// GetOrder fetches a single order from the database
func (s *OrderService) GetOrder(ctx context.Context, in *pb.OrderSpecificRequest) (*pb.Order, error) {
    data, err := s.Storage.Get(getOrderStorageKey(in.GetChannelID(), in.GetOrderID()))
    if !errors.IsEmpty(err) {
        return nil, errors.E(errors.Op("Get order"), err)
    }
    order := &pb.Order{}
    proto.Unmarshal(data, order)
    return order, nil
}

// GetAllOrders fetches all orders from the database
func (s *OrderService) GetAllOrders(ctx context.Context, in *pb.Empty) (*pb.OrderList, error) {
    data, err := s.Storage.GetAllWithPrefix(string(interfaces.OrderPrefix))
    if !errors.IsEmpty(err) {
        return nil, errors.E(errors.Op("Get all orders"), err)
    }

    orders := make([]*pb.Order, 0)
    i := 0
    for _, value := range data {
        order := &pb.Order{}
        proto.Unmarshal([]byte(value), order)
        orders = append(orders, order)
        i++
    }

    OrderList := &pb.OrderList{Orders: orders}
    return OrderList, nil
}

// Delete removes the Order with the specified ID locally, and broadcasts the same request to all other nodes on the channel
func (s *OrderService) Delete(ctx context.Context, in *pb.OrderSpecificRequest) (*pb.Empty, error) {
    orderInBytes, err := s.Storage.Get(getOrderStorageKey(in.GetChannelID(), in.GetOrderID()))
    if !errors.IsEmpty(err) {
        return nil, errors.E(errors.Op("Delete order"), err)
    }

    order := &pb.Order{}
    err = proto.Unmarshal(orderInBytes, order)
    if !errors.IsEmpty(err) {
        return nil, errors.E(errors.Op("Unmarshal order proto in Delete"), err)
    }

    _, publickey, err := identity.GetIdentity(s.Storage)
    if !errors.IsEmpty(err) {
        return nil, errors.E(errors.Op("Get public key in Delete"), err)
    }

    isCreator, err := s.VerifyOrder(publickey, order)
    if !errors.IsEmpty(err) {
        return nil, errors.E(errors.Op("Verify the order"), err)
    }

    // Construct the message to send to other peers
    wireMessage := &pb.WireMessage{ChannelID: in.GetChannelID(), Operation: pb.Operation_DELETE, Data: orderInBytes}

    if s.P2p != nil {
        if isCreator {
            // Send the order creation by wire
            s.P2p.Send(wireMessage)
        }
    } else {
        s.Logger.Warn("P2p service not registered with OrderService, not publishing or receiving orders from the network!")
    }

    // Try to delete the Order from LevelDB with specified ID
    err = s.Storage.Delete(getOrderStorageKey(in.GetChannelID(), in.GetOrderID()))
    if !errors.IsEmpty(err) {
        return nil, errors.E(errors.Op("Delete order"), err)
    }

    return &pb.Empty{}, nil
}

// Lock locks the given Order if the Order is created by this node, broadcasts the lock to other nodes on the channel.
func (s *OrderService) Lock(ctx context.Context, in *pb.OrderSpecificRequest) (*pb.Empty, error) {

    orderInBytes, err := s.Storage.Get(getOrderStorageKey(in.GetChannelID(), in.GetOrderID()))
    if !errors.IsEmpty(err) {
        return nil, errors.E(errors.Op("Get order in Lock"), err)
    }

    order := &pb.Order{}
    err = proto.Unmarshal(orderInBytes, order)
    if !errors.IsEmpty(err) {
        return nil, errors.E(errors.Op("Unmarshal order proto in Lock"), err)
    }

    if order.State == pb.State_LOCKED {
        return nil, errors.E(errors.Op("Check state"), "Trying to lock something that is already locked")
    }

    _, publickey, err := identity.GetIdentity(s.Storage)
    if !errors.IsEmpty(err) {
        return nil, errors.E(errors.Op("Get public key in Lock"), err)
    }

    isCreator, err := s.VerifyOrder(publickey, order)
    if !errors.IsEmpty(err) {
        return nil, errors.E(errors.Op("Verify the order in Lock"), err)
    }

    order.State = pb.State_LOCKED
    order.Nonce++

    // Get order as bytes
    orderInBytes, err = proto.Marshal(order)
    if !errors.IsEmpty(err) {
        s.Logger.Warn(errors.E(errors.Op("Marshal order"), err))
    }

    // Construct the message to send to other peers
    wireMessage := &pb.WireMessage{ChannelID: in.GetChannelID(), Operation: pb.Operation_LOCK, Data: orderInBytes}

    if s.P2p != nil {
        if isCreator {
            // Send the order creation by wire
            s.P2p.Send(wireMessage)
        }
    } else {
        s.Logger.Warn("P2p service not registered with OrderService, not publishing or receiving orders from the network!")
    }

    // Save order to LevelDB locally
    err = s.Storage.Put(getOrderStorageKey(in.GetChannelID(), in.GetOrderID()), orderInBytes)
    if !errors.IsEmpty(err) {
        err = errors.E(errors.Op("Put order"), err)
    }

    return &pb.Empty{}, nil
}

// Unlock unlocks the given Order if it's created by this node, broadcasts the unlocking operation to other nodes on the channel.
func (s *OrderService) Unlock(ctx context.Context, in *pb.OrderSpecificRequest) (*pb.Empty, error) {

    orderInBytes, err := s.Storage.Get(getOrderStorageKey(in.GetChannelID(), in.GetOrderID()))
    if !errors.IsEmpty(err) {
        return nil, errors.E(errors.Op("Get order in Unlock"), err)
    }

    order := &pb.Order{}
    err = proto.Unmarshal(orderInBytes, order)
    if !errors.IsEmpty(err) {
        return nil, errors.E(errors.Op("Unmarshal order proto in Unlock"), err)
    }

    //Might cause problem
    if order.State == pb.State_OPEN {
        return nil, errors.E(errors.Op("Check state"), "Trying to unlock something that is already open")
    }

    _, publickey, err := identity.GetIdentity(s.Storage)
    if !errors.IsEmpty(err) {
        return nil, errors.E(errors.Op("Get public key in Unlock"), err)
    }

    isCreator, err := s.VerifyOrder(publickey, order)
    if !errors.IsEmpty(err) {
        return nil, errors.E(errors.Op("Verify the order in Unlock"), err)
    }

    order.State = pb.State_OPEN
    order.Nonce++

    // Get order as bytes
    orderInBytes, err = proto.Marshal(order)
    if !errors.IsEmpty(err) {
        s.Logger.Warn(errors.E(errors.Op("Marshal order"), err))
    }

    // Construct the message to send to other peers
    wireMessage := &pb.WireMessage{ChannelID: in.GetChannelID(), Operation: pb.Operation_UNLOCK, Data: orderInBytes}

    if s.P2p != nil {
        if isCreator {
            // Send the order creation by wire
            s.P2p.Send(wireMessage)
        }
    } else {
        s.Logger.Warn("P2p service not registered with OrderService, not publishing or receiving orders from the network!")
    }

    // Save order to LevelDB locally
    err = s.Storage.Put(getOrderStorageKey(in.GetChannelID(), in.GetOrderID()), orderInBytes)
    if !errors.IsEmpty(err) {
        err = errors.E(errors.Op("Put order"), err)
    }

    return &pb.Empty{}, nil
}