# oceanprotocol/provider
#
# View on GitHub
# ocean_provider/routes/consume.py
#
# Summary
#
# Maintainability
# C
# 1 day
# Test Coverage
# A
# 93%
#
# Copyright 2023 Ocean Protocol Foundation
# SPDX-License-Identifier: Apache-2.0
#
import json
import logging
from _decimal import Decimal

from flask import jsonify, request
from flask_sieve import validate

from ocean_provider.constants import BaseURLs
from ocean_provider.file_types.file_types_factory import FilesTypeFactory
from ocean_provider.requests_session import get_requests_session
from ocean_provider.routes import services
from ocean_provider.user_nonce import get_nonce, update_nonce
from ocean_provider.utils.asset import (
    check_asset_consumable,
    get_asset_from_metadatastore,
)
from ocean_provider.utils.basics import (
    get_metadata_url,
    get_provider_wallet,
    get_web3,
    get_network_name,
)
from ocean_provider.utils.datatoken import validate_order
from ocean_provider.utils.error_responses import error_response
from ocean_provider.utils.proof import send_proof
from ocean_provider.utils.provider_fees import get_c2d_environments, get_provider_fees
from ocean_provider.utils.services import ServiceType
from ocean_provider.utils.util import get_request_data, get_service_files_list
from ocean_provider.validation.provider_requests import (
    DownloadRequest,
    FileInfoRequest,
    InitializeRequest,
    NonceRequest,
)
from web3.main import Web3

# HTTP session obtained once, at import time, from get_requests_session().
# NOTE(review): not referenced by the route handlers visible in this part of
# the file -- confirm other users before removing.
requests_session = get_requests_session()

# Module-level logger named after this module; every handler below logs
# through it and passes it to error_response().
logger = logging.getLogger(__name__)

# JSON content-type / connection headers.
# NOTE(review): not referenced by the route handlers visible in this part of
# the file -- confirm other users before removing.
standard_headers = {"Content-type": "application/json", "Connection": "close"}


@services.route("/nonce", methods=["GET"])
@validate(NonceRequest)
def nonce():
    """Returns a decimal `nonce` for the given account address.

    When no nonce is stored for the address yet, a nonce of 1 is written with
    `update_nonce` and read back with `get_nonce` before being returned.
    An AssertionError is raised if the value read back is not 1.

    ---
    tags:
      - consume
    consumes:
      - application/json
    parameters:
      - name: userAddress
        description: The address of the account
        required: true
        type: string
    responses:
      200:
        description: nonce returned
      405:
        description: Method rejected.

    return: nonce for user address
    """
    if request.method.upper() in BaseURLs.NOT_ALLOWED_METHODS:
        return error_response("Method Not Allowed", 405, logger)

    data = get_request_data(request)
    logger.info(f"nonce endpoint called with data: {data}")
    address = data.get("userAddress")
    # Named `user_nonce` so the local does not shadow this view function.
    user_nonce = get_nonce(address)

    if not user_nonce:
        new_nonce = 1
        update_nonce(address, new_nonce)
        user_nonce = get_nonce(address)
        # Explicit check instead of an `assert` statement: asserts are removed
        # when Python runs with -O, which would let a failed store go
        # unnoticed. The exception type and message are kept unchanged.
        if user_nonce is None or Decimal(user_nonce) != Decimal(new_nonce):
            raise AssertionError("New nonce could not be stored correctly.")

    logger.info(f"nonce for user {address} is {user_nonce}")

    response = jsonify(nonce=Decimal(user_nonce)), 200
    logger.info(f"nonce response = {response}")

    return response


@services.route("/fileinfo", methods=["POST"])
@validate(FileInfoRequest)
def fileinfo():
    """Reports details about the file(s) behind a URL-style payload or an asset.

    The payload either carries a `did` (together with a `serviceId`), in which
    case the files of that asset service are inspected, or it is itself treated
    as a single file description. Each file is validated through
    `FilesTypeFactory.validate_and_create` and then queried with
    `check_details`; an optional `checksum` flag in the payload is forwarded
    to `check_details` as `with_checksum`.

    ---
    tags:
      - consume

    responses:
      200:
        description: the URL(s) could be analysed (returns the result).
      400:
        description: the URL(s) could not be analysed (bad request).
      503:
        description: Service Unavailable.

    return: list of file info (index, valid, contentLength, contentType)
    """
    payload = get_request_data(request)
    logger.info(f"fileinfo called. arguments = {payload}")
    did = payload.get("did")
    service_id = payload.get("serviceId")

    if not did:
        # No DID supplied: the request payload itself describes the one file.
        candidate_files = [payload]
    else:
        asset = get_asset_from_metadatastore(get_metadata_url(), did)
        if not asset:
            return error_response("Cannot resolve DID", 400, logger)

        network_name = get_network_name(asset.chain_id)
        logger.info(
            f"Provider {network_name}: Retrieved asset {did} from Metadata store."
        )

        service = asset.get_service_by_id(service_id)
        if not service:
            return error_response(
                f"Provider {network_name}: Invalid serviceId.", 400, logger
            )

        wallet = get_provider_wallet(asset.chain_id)
        candidate_files = get_service_files_list(service, wallet, asset)
        if not candidate_files:
            return error_response(
                f"Provider {network_name}: Unable to get dataset files", 400, logger
            )

    with_checksum = payload.get("checksum", False)

    collected = []
    for position, descriptor in enumerate(candidate_files):
        # Each file description gets the request's userdata attached before
        # it is validated.
        descriptor["userdata"] = payload.get("userdata")
        logger.debug(f"Validating :{descriptor}")

        # On success the second element is the file object, otherwise it is
        # the error message.
        is_valid, outcome = FilesTypeFactory.validate_and_create(descriptor)
        if not is_valid:
            return error_response(outcome, 400, logger)

        details_ok, details = outcome.check_details(with_checksum=with_checksum)
        entry = dict(index=position, valid=details_ok, type=descriptor["type"])
        entry.update(details)
        collected.append(entry)

    response = jsonify(collected), 200
    logger.info(f"fileinfo response = {response}")

    return response


@services.route("/initialize", methods=["GET"])
@validate(InitializeRequest)
def initialize():
    """Initialize a service access request.
    In order to consume a data service the user is required to send
    one datatoken to the provider.

    The datatoken is transferred via the ethereum blockchain network
    by requesting the user to sign an ERC20 approval transaction
    where the approval is given to the provider's ethereum account for
    the number of tokens required by the service.

    Request fields read by this handler: documentId, consumerAddress,
    serviceId, and optionally transferTxId, fileIndex and userdata.

    If transferTxId is supplied and the order validates, the response is
    only `{"validOrder": <order-tx-hash>}`. If that validation raises, the
    error is logged and the regular initialize response is produced instead.

    If fileIndex is supplied (and is not negative), the referenced file must
    exist in the service's file list and pass validation, otherwise a 400
    response is returned.

    tags:
      - consume
    responses:
      400:
        description: One or more of the required attributes are missing or invalid.
      503:
        description: Service Unavailable.

    return:
        json object as follows:
        ```JSON
        {
            "datatoken": <data-token-contract-address>,
            "nonce": <nonce-used-in-consumer-signature>,
            "providerFee": <object containing provider fees>
        }
        ```
    """
    data = get_request_data(request)
    logger.info(f"initialize endpoint called. arguments = {data}")

    did = data.get("documentId")
    consumer_address = data.get("consumerAddress")

    asset = get_asset_from_metadatastore(get_metadata_url(), did)
    if not asset:
        return error_response(
            "Cannot resolve DID",
            400,
            logger,
        )
    network_name = get_network_name(asset.chain_id)
    logger.info(f"Provider {network_name}: Retrieved asset {did} from Metadata store.")

    consumable, message = check_asset_consumable(asset, consumer_address, logger)
    if not consumable:
        return error_response(f"Provider {network_name}: " + message, 400, logger)

    service_id = data.get("serviceId")
    service = asset.get_service_by_id(service_id)
    if not service:
        return error_response(
            f"Provider {network_name}: Invalid serviceId.",
            400,
            logger,
        )
    if service.type == "compute":
        return error_response(
            f"Provider {network_name}: Use the initializeCompute endpoint to initialize compute jobs.",
            400,
            logger,
        )

    if "transferTxId" in data:
        # Best effort: an existing order that validates short-circuits the
        # request; any failure is only logged and the normal flow continues.
        try:
            _tx, _order_log, _, _ = validate_order(
                get_web3(asset.chain_id),
                consumer_address,
                data["transferTxId"],
                asset,
                service,
                allow_expired_provider_fees=True,
            )
            return {"validOrder": _order_log.transactionHash.hex()}, 200
        except Exception as e:
            logger.error(
                f"Provider {network_name}: Received error when validation order: {e}"
            )

    token_address = service.datatoken_address

    file_index = int(data.get("fileIndex", "-1"))
    # we check if the file is valid only if we have fileIndex
    if file_index > -1:
        provider_wallet = get_provider_wallet(asset.chain_id)
        files_list = get_service_files_list(service, provider_wallet, asset)
        # Guard the lookup so a missing file list or an out-of-range index
        # yields a 400 response instead of an unhandled TypeError/IndexError.
        if not files_list:
            return error_response(
                f"Provider {network_name}: Unable to get dataset files", 400, logger
            )
        if file_index >= len(files_list):
            return error_response(
                f"Provider {network_name}: No such fileIndex {file_index}", 400, logger
            )

        url_object = files_list[file_index]
        url_object["userdata"] = data.get("userdata")
        # On success `message` is the file object, otherwise the error text.
        valid, message = FilesTypeFactory.validate_and_create(url_object)
        if not valid:
            return error_response(f"Provider {network_name}: " + message, 400, logger)

        file_instance = message
        valid, url_details = file_instance.check_details(with_checksum=False)
        if not valid or not url_details:
            return error_response(
                f"Provider {network_name}: Asset URL not found, not available or invalid. \n"
                f"Payload was: {data}",
                400,
                logger,
            )

    # Prepare the `transfer` tokens transaction with the appropriate number
    # of tokens required for this service
    # The consumer must sign and execute this transaction in order to be
    # able to consume the service
    provider_fee = get_provider_fees(asset, service, consumer_address, 0)
    if provider_fee:
        # The fee amount is sent as a string in the JSON response.
        provider_fee["providerFeeAmount"] = str(provider_fee["providerFeeAmount"])
    approve_params = {
        "datatoken": token_address,
        "nonce": get_nonce(consumer_address),
        "providerFee": provider_fee,
    }

    response = jsonify(approve_params), 200
    logger.info(f"Provider {network_name}: initialize response = {response}")

    return response


@services.route("/download", methods=["GET"])
@validate(DownloadRequest)
def download():
    """Allows download of asset data file.

    Request fields read by this handler: documentId, consumerAddress,
    serviceId, transferTxId, fileIndex, nonce, signature and optionally
    userdata. The order identified by transferTxId is validated, the
    requested file is checked, the consumer nonce is updated, the download
    response is built and a proof is sent with `send_proof`.

    ---
    tags:
      - consume
    consumes:
      - application/json
    parameters:
      - name: consumerAddress
        in: query
        description: The consumer address.
        required: true
        type: string
      - name: documentId
        in: query
        description: The ID of the asset/document (the DID).
        required: true
        type: string
      - name: signature
        in: query
        description: Signature of the documentId to verify that the consumer has rights to download the asset.
      - name: index
        in: query
        description: Index of the file in the array of files.
    responses:
      200:
        description: Redirect to valid asset url.
      400:
        description: One or more of the required attributes are missing or invalid.
      401:
        description: Invalid asset data.
      405:
        description: Method rejected.
      503:
        description: Service Unavailable
    """
    if request.method.upper() in BaseURLs.NOT_ALLOWED_METHODS:
        return error_response("Method Not Allowed", 405, logger)

    data = get_request_data(request)
    logger.info(f"download called. arguments = {data}")

    did = data.get("documentId")
    consumer_address = data.get("consumerAddress")
    service_id = data.get("serviceId")
    tx_id = data.get("transferTxId")

    # grab asset for did from the metadatastore associated with
    # the datatoken address
    asset = get_asset_from_metadatastore(get_metadata_url(), did)
    if not asset:
        return error_response(
            "Cannot resolve DID",
            400,
            logger,
        )
    network_name = get_network_name(asset.chain_id)
    logger.info(f"Provider {network_name}: Retrieved asset {did} from Metadata store.")

    consumable, message = check_asset_consumable(asset, consumer_address, logger)
    if not consumable:
        return error_response(f"Provider {network_name}: " + message, 400, logger)

    service = asset.get_service_by_id(service_id)
    # Same guard as the other endpoints: without it an unknown serviceId
    # fails on `service.type` below instead of producing a 400 response.
    if not service:
        return error_response(
            f"Provider {network_name}: Invalid serviceId.",
            400,
            logger,
        )

    if service.type != ServiceType.ACCESS:
        # allow our C2D to download a compute asset
        c2d_environments = get_c2d_environments(flat=True)

        is_c2d_consumer_address = any(
            Web3.toChecksumAddress(env["consumerAddress"])
            == Web3.toChecksumAddress(consumer_address)
            for env in c2d_environments
        )

        if not is_c2d_consumer_address:
            return error_response(
                f"Provider {network_name}: Service with index={service_id} is not an access service.",
                400,
                logger,
            )

    logger.info(
        f"Provider {network_name}: validate_order called from download endpoint."
    )

    try:
        _tx, _order_log, _, _ = validate_order(
            get_web3(asset.chain_id), consumer_address, tx_id, asset, service
        )
    except Exception as e:
        return error_response(
            f"Provider {network_name}: Order with tx_id {tx_id} could not be validated due to error: {e}",
            400,
            logger,
        )

    file_index = int(data.get("fileIndex"))
    provider_wallet = get_provider_wallet(asset.chain_id)
    files_list = get_service_files_list(service, provider_wallet, asset)
    if not files_list:
        return error_response(
            f"Provider {network_name}: Unable to get dataset files", 400, logger
        )
    # Valid indices are 0 .. len-1. The previous `>` comparison let
    # file_index == len(files_list) through to an IndexError, and negative
    # values silently wrapped around to the end of the list.
    if not 0 <= file_index < len(files_list):
        return error_response(
            f"Provider {network_name}: No such fileIndex {file_index}", 400, logger
        )
    url_object = files_list[file_index]
    url_object["userdata"] = data.get("userdata")
    # On success `message` is the file object, otherwise the error text.
    url_valid, message = FilesTypeFactory.validate_and_create(url_object)

    if not url_valid:
        return error_response(f"Provider {network_name}: " + message, 400, logger)

    file_instance = message
    valid, details = file_instance.check_details(with_checksum=True)

    if not valid:
        # Formatted rather than concatenated so a non-string `details`
        # (fileinfo treats it as a dict) cannot raise TypeError here.
        return error_response(f"Provider {network_name}: {details}", 400, logger)

    logger.debug(
        f"Provider {network_name}: Done processing consume request for asset {did}, "
        f" url {file_instance.get_download_url()}"
    )
    update_nonce(consumer_address, data.get("nonce"))

    response = file_instance.build_download_response(request)
    logger.info(f"Provider {network_name}: download response = {response}")

    # Compact JSON (no spaces after separators) describing what was served.
    provider_proof_data = json.dumps(
        {
            "documentId": did,
            "serviceId": service_id,
            "fileIndex": file_index,
            "downloadedBytes": 0,  # TODO
        },
        separators=(",", ":"),
    )

    consumer_data = f'{did}{data.get("nonce")}'

    send_proof(
        asset.chain_id,
        order_tx_id=_tx.hash,
        provider_data=provider_proof_data,
        consumer_data=consumer_data,
        consumer_signature=data.get("signature"),
        consumer_address=consumer_address,
        datatoken_address=service.datatoken_address,
    )
    logger.info(f"Provider {network_name}: proof approved")

    return response