oceanprotocol/aquarius

View on GitHub
aquarius/app/assets.py

Summary

Maintainability
C
1 day
Test Coverage
A
95%
#
# Copyright 2023 Ocean Protocol Foundation
# SPDX-License-Identifier: Apache-2.0
#
import copy
import elasticsearch
from flask import Blueprint, jsonify, request
from datetime import timedelta
import json
import logging
import os

from aquarius.app.es_instance import ElasticsearchInstance
from aquarius.app.util import (
    sanitize_record,
    sanitize_query_result,
    get_signature_vrs,
)
from aquarius.ddo_checker.shacl_checker import validate_dict
from aquarius.log import setup_logging
from aquarius.myapp import app
from aquarius.events.purgatory import Purgatory
from aquarius.retry_mechanism import RetryMechanism
from aquarius.rbac import RBAC


# Project logging setup (implemented in aquarius.log); runs at import time.
setup_logging()
# Flask blueprint on which the route handlers below are registered.
assets = Blueprint("assets", __name__)

# Logger shared by the handlers in this module.
logger = logging.getLogger("aquarius")
# Module-level Elasticsearch wrapper used by the handlers below for lookups
# (`.get(did)`) and native searches (`.es.search(...)`).
es_instance = ElasticsearchInstance()


@assets.route("/ddo/<did>", methods=["GET"])
def get_ddo(did):
    """Get DDO of a particular asset.
    ---
    tags:
      - ddo
    parameters:
      - name: did
        in: path
        description: DID of the asset.
        required: true
        type: string
    responses:
      200:
        description: On successful operation returns DDO information.
        example:
          application/json: {
              "@context": "https://w3id.org/did/v1",
              "id": "did:op:00018b5b84eA05930f9D0dB8FFbb3B93EF86983b",
              "publicKey": [
                  {
                      "id": "did:op:00018b5b84eA05930f9D0dB8FFbb3B93EF86983b",
                      "type": "EthereumECDSAKey",
                      "owner": "0x8aa92201E19E4930d4D25c0f7a245c1dCdD5A242"
                  }
              ],
              "authentication": [
                  {
                      "type": "RsaSignatureAuthentication2018",
                      "publicKey": "did:op:00018b5b84eA05930f9D0dB8FFbb3B93EF86983b"
                  }
              ],
              "service": [
                  {
                      "type": "metadata",
                      "attributes": {
                          "main": {
                              "type": "dataset",
                              "name": "Nu nl",
                              "dateCreated": "2021-04-02T17:59:32Z",
                              "author": "ab",
                              "license": "MIT",
                              "files": [
                                  {
                                      "index": 0,
                                      "contentType": "application/json"
                                  }
                              ],
                              "datePublished": "2021-04-20T22:56:01Z"
                          },
                          "encryptedFiles": "0x047c992274f3fa2bf9c5cc57d0e0852f7b3ec22d7ab4e798e3e73e77e7f97"
                      },
                      "index": 0
                  },
                  {
                      "type": "access",
                      "index": 1,
                      "serviceEndpoint": "https://provider.datatunnel.allianceblock.io",
                      "attributes": {
                          "main": {
                              "creator": "0x8aa92201E19E4930d4D25c0f7a245c1dCdD5A242",
                              "datePublished": "2021-04-02T17:57:57Z",
                              "cost": "1",
                              "timeout": 2592000000,
                              "name": "dataAssetAccess"
                          }
                      }
                  }
              ],
              "datatoken": "0x00018b5b84eA05930f9D0dB8FFbb3B93EF86983b",
              "created": "2021-04-02T18:00:01Z",
              "proof": {
                  "created": "2021-04-02T17:59:33Z",
                  "creator": "0x8aa92201E19E4930d4D25c0f7a245c1dCdD5A242",
                  "type": "AddressHash",
                  "signatureValue": "0xd23d2f28fcf152347e5b5f1064422ba0288dd608f0ea6cf433a3717fb735a92d"
              },
              "datatokenInfo": {
                  "address": "0x00018b5b84eA05930f9D0dB8FFbb3B93EF86983b",
                  "name": "Parsimonious Plankton Token",
                  "symbol": "PARPLA-59",
                  "decimals": 18,
                  "totalSupply": 100.0,
                  "cap": 1000.0,
                  "minter": "0x8aa92201E19E4930d4D25c0f7a245c1dCdD5A242",
                  "minterBalance": 99.999
              },
              "updated": "2021-04-02T18:00:01Z",
              "price": {
                  "datatoken": 0.0,
                  "ocean": 0.0,
                  "value": 0.0,
                  "type": "",
                  "exchange_id": "",
                  "address": "",
                  "pools": [],
                  "isConsumable": ""
              },
              "isInPurgatory": "false"
            }
        content:
          application/json:
            schema:
              type: object
      404:
        description: This asset DID is not in ES.
    """
    try:
        asset_record = es_instance.get(did)
        # sanitize_record produces the response body; it is sent as-is with a
        # JSON mimetype rather than being passed through jsonify.
        return app.response_class(
            response=sanitize_record(asset_record),
            status=200,
            mimetype="application/json",
        )
    except elasticsearch.exceptions.NotFoundError:
        # Expected outcome for an unknown DID: no error log needed.
        return jsonify(error=f"Asset DID {did} not found in Elasticsearch."), 404
    except Exception as e:
        # Anything else is unexpected (connection problems, malformed record,
        # ...). Log it so the failure is visible server-side, as the other
        # handlers in this module do. The 404 status is kept unchanged for
        # backward compatibility with existing clients.
        logger.error(f"get_ddo: {str(e)}")
        return (
            jsonify(
                error=f"Error encountered while searching the asset DID {did}: {str(e)}."
            ),
            404,
        )


@assets.route("/metadata/<did>", methods=["GET"])
def get_metadata(did):
    """Get metadata of a particular asset
    ---
    tags:
      - metadata
    parameters:
      - name: did
        in: path
        description: DID of the asset.
        required: true
        type: string
    responses:
      200:
        description: successful operation.
        example:
          application/json: {
            "main": {
                "type": "dataset",
                "name": "Nu nl",
                "dateCreated": "2021-04-02T17:59:32Z",
                "author": "ab",
                "license": "MIT",
                "files": [
                    {
                        "index": 0,
                        "contentType": "application/json"
                    }
                ],
                "datePublished": "2021-04-20T22:56:01Z"
            },
            "encryptedFiles": "0x047c992274f3fa2bf9c5cc57d0e0852f7b3ec22d7ab4e798e3e73e77e7f971ff04896129c9f58deac"
          }
      404:
        description: This asset DID is not in ES.
    """
    try:
        record = es_instance.get(did)
        # Only the "metadata" section of the stored record is exposed here.
        metadata_body = sanitize_record(record["metadata"])
        return app.response_class(
            response=metadata_body,
            status=200,
            mimetype="application/json",
        )
    except Exception as e:
        # Every failure (unknown DID, missing "metadata" key, ES errors) is
        # logged and reported to the caller as a 404.
        reason = str(e)
        logger.error(f"get_metadata: {reason}")
        error_body = jsonify(
            error=f"Error encountered while retrieving metadata: {reason}."
        )
        return error_body, 404


@assets.route("/names", methods=["POST"])
def get_assets_names():
    """Get names of assets as specified in the payload
    ---
    tags:
      - name
    consumes:
      - application/json
    parameters:
      - in: body
        name: body
        required: true
        description: list of asset DIDs
        schema:
          type: object
          properties:
            didList:
              type: list
              description: list of dids
              example: ["did:op:123455644356", "did:op:533443322223344"]
    responses:
      200:
        description: successful operation.
        example:
          application/json: {
            "did:op:03115a5Dc5fC8Ff8DA0270E61F87EEB3ed2b3798": "SVM Classifier v2.0",
            "did:op:01738AA29Ce1D4028C0719F7A0fd497a1BFBe918": "Wine dataset 1.0"
          }
      404:
        description: assets not found
    """
    # Query-string arguments win when present; otherwise use the JSON body.
    payload = request.args or request.json
    if not isinstance(payload, dict):
        message = "Invalid payload. The request could not be converted into a dict."
        return jsonify(error=message), 400

    if "didList" not in payload:
        return jsonify(error="`didList` is required in the request payload."), 400

    requested_dids = payload.get("didList", [])
    # The emptiness check runs first, so an empty string or empty list is
    # reported as "empty" rather than as a type problem.
    if not requested_dids:
        return jsonify(error="The requested didList can not be empty."), 400
    if not isinstance(requested_dids, list):
        return jsonify(error="The didList must be a list."), 400

    name_by_did = {}
    for did in requested_dids:
        try:
            name_by_did[did] = es_instance.get(did)["metadata"]["name"]
        except Exception:
            # Best effort: a DID that cannot be resolved maps to an empty name
            # instead of failing the whole request.
            name_by_did[did] = ""

    return jsonify(name_by_did)


@assets.route("/query", methods=["POST"])
def query_ddo():
    """Runs a native ES query.
    ---
    tags:
      - ddo
    consumes:
      - application/json
    responses:
      200:
        description: successful action
      500:
        description: elasticsearch exception
    """
    payload = request.json
    if not isinstance(payload, dict):
        message = "Invalid payload. The request could not be converted into a dict."
        return jsonify(error=message), 400

    try:
        # Work on a copy so the parsed request body is never mutated.
        search_kwargs = copy.deepcopy(payload)
        # "from" is a Python keyword, so the search call takes it as "from_".
        if "from" in search_kwargs:
            search_kwargs["from_"] = search_kwargs.pop("from")
        es_response = es_instance.es.search(**search_kwargs)
        return jsonify(sanitize_query_result(es_response.body))
    except elasticsearch.exceptions.TransportError as e:
        error = e.message
        logger.info(f"Received elasticsearch TransportError: {error}.")
        return jsonify(error=error), 400
    except Exception as e:
        reason = str(e)
        logger.error(f"Received elasticsearch Error: {reason}.")
        return jsonify(error=f"Encountered Elasticsearch Exception: {reason}"), 500


@assets.route("/ddo/validate", methods=["POST"])
def validate_remote():
    """Validate DDO content.
    ---
    tags:
      - ddo
    consumes:
      - application/octet-stream
    parameters:
      - in: body
        name: body
        required: true
        description: Asset DDO.
        schema:
          type: object
    responses:
      200:
        description: successfully request.
      400:
        description: Invalid DDO format
      500:
        description: Error
    """
    if request.content_type != "application/octet-stream":
        return (
            jsonify(
                error="Invalid request content type: should be application/octet-stream"
            ),
            400,
        )

    # Keep the raw bytes: they are what gets passed to get_signature_vrs once
    # the DDO is found valid.
    raw = request.get_data()

    try:
        invalid_payload_error = (
            "Invalid payload. The request could not be converted into a dict."
        )
        try:
            data = json.loads(raw.decode("utf-8"))
        except (UnicodeDecodeError, json.decoder.JSONDecodeError):
            # Non-UTF-8 bytes and malformed JSON are both client errors.
            return jsonify(error=invalid_payload_error), 400

        # Valid JSON that is not an object (list, string, number, null) would
        # otherwise fail on `.get` below and surface as a 500.
        if not isinstance(data, dict):
            return jsonify(error=invalid_payload_error), 400

        # RBAC validation is only performed when an RBAC server is configured.
        if os.getenv("RBAC_SERVER_URL"):
            valid = RBAC.validate_ddo_rbac(data)

            if not valid:
                return (
                    jsonify(error="DDO marked invalid by the RBAC server."),
                    400,
                )

        version = data.get("version", None)
        if not version:
            return (jsonify([{"message": "no version provided for DDO."}]), 400)

        valid, errors = validate_dict(
            data, data.get("chainId", ""), data.get("nftAddress", "")
        )

        if valid:
            return jsonify(get_signature_vrs(raw))

        return (jsonify(errors=errors), 400)
    except Exception as e:
        logger.error(f"validate_remote failed: {str(e)}.")
        return jsonify(error=f"Encountered error when validating asset: {str(e)}."), 500


@assets.route("/triggerCaching", methods=["POST"])
def trigger_caching():
    """Triggers manual caching of a specific transaction (MetadataCreated or MetadataUpdated event)
    ---
    tags:
      - name
    consumes:
      - application/json
    parameters:
      - in: body
        name: body
        required: true
        description: JSON object containing transaction details
        schema:
          type: object
          properties:
            transactionId:
              type: string
              description: transaction id containing MetadataCreated or MetadataUpdated event
              example: "0xaabbccdd"
            chainId:
              type: integer
              description: chain id of MetadataCreated or MetadataUpdated event
              example: 8996
            logIndex:
              type: integer
              required: false
              description: log index for the event in the transaction
              example: 1
    responses:
      200:
        description: successful operation.
      400:
        description: bad request. Log index not found or event not found.
      500:
        description: server error/exception
    """
    try:
        # Query-string arguments win when present; otherwise use the JSON body.
        data = request.args if request.args else request.json
        if not isinstance(data, dict):
            return (
                jsonify(
                    error="Invalid payload. The request could not be converted into a dict."
                ),
                400,
            )

        tx_id = data.get("transactionId")
        # `chainId` is the documented field name. `chain_id` is still accepted
        # so callers that relied on the previous (undocumented) key keep working.
        chain_id = data.get("chainId") or data.get("chain_id")
        if not tx_id or not chain_id:
            return (
                jsonify(error="Invalid transactionId or chainId"),
                400,
            )

        try:
            log_index = int(data.get("logIndex", 0))
        except (TypeError, ValueError):
            # A null or non-numeric logIndex is a client error, not a server one.
            return jsonify(error="Invalid logIndex"), 400

        # A dedicated Elasticsearch wrapper is created for the retry queue;
        # it is named differently to avoid shadowing the module-level instance.
        retry_es_instance = ElasticsearchInstance()
        retries_db_index = f"{retry_es_instance.db_index}_retries"
        # Purgatory handling is only wired in when a purgatory URL is configured.
        purgatory = (
            Purgatory(retry_es_instance)
            if (os.getenv("ASSET_PURGATORY_URL") or os.getenv("ACCOUNT_PURGATORY_URL"))
            else None
        )

        retry_mechanism = RetryMechanism(
            retry_es_instance, retries_db_index, purgatory, chain_id, None
        )
        retry_mechanism.retry_interval = timedelta(seconds=1)
        retry_mechanism.add_tx_to_retry_queue(tx_id, log_index)
        return app.response_class(
            response="Queued",
            status=200,
            mimetype="application/json",
        )
    except Exception as e:
        logger.error(f"trigger_caching failed: {str(e)}.")
        return (
            jsonify(error=f"Encountered error when triggering caching: {str(e)}."),
            500,
        )