Vizzuality/landgriffon

View on GitHub
data/h3_data_importer/utils.py

Summary

Maintainability
A
50 mins
Test Coverage
import json
import logging
import os
from pathlib import Path
from re import sub

import jsonschema
import pandas as pd
import psycopg
from jsonschema import ValidationError
from psycopg2 import sql
from psycopg2.extensions import connection

log = logging.getLogger(__name__)  # here we can use __name__ because it is an imported module

DTYPES_TO_PG = {
    "boolean": "bool",
    "uint8": "smallint",
    "uint16": "int",
    "uint32": "bigint",
    "uint64": "bigint",
    "int8": "smallint",
    "int16": "smallint",
    "int32": "int",
    "int64": "bigint",
    "float32": "real",
    "float64": "double precision",
}


def slugify(s):
    # TODO: IS THIS NECESSARY? FIND A PACKAGE THAT DOES IT
    s = sub(r"[_-]+", " ", s).title().replace(" ", "")
    return "".join([s[0].lower(), s[1:]])


def snakify(s):
    return sub(r"(?<!^)(?=[A-Z])", "_", s).lower()


def get_contextual_layer_category_enum(conn: connection) -> set:
    """Get the enum of contextual layer categories"""

    with conn.cursor() as cursor:
        cursor.execute("SELECT unnest(enum_range(NULL::contextual_layer_category));")
        values = set(r[0] for r in cursor.fetchall())
    return values


def insert_to_h3_data_and_contextual_layer_tables(
    table: str,
    column: str,
    h3_res: int,
    dataset: str,
    category: str,
    year: int,
    connection: connection,
):
    categories_enum = get_contextual_layer_category_enum(connection)
    if category not in categories_enum:
        log.error(f"Category '{category}' not supported. Supported categories: {categories_enum}")
        return

    with connection:
        with connection.cursor() as cursor:
            # remove existing entries
            cursor.execute('DELETE FROM "h3_data" WHERE "h3tableName" = (%s)', (table,))
            cursor.execute('DELETE FROM "contextual_layer" WHERE "name" = (%s)', (dataset,))

            # insert new entries
            log.info("Inserting record into h3_data table...")
            h3_data_query = sql.SQL(
                """
                INSERT INTO "h3_data" ("h3tableName", "h3columnName", "h3resolution", "year")
                VALUES ({table}, {column}, {h3_res}, {year})
                """
            ).format(
                table=sql.Literal(table),
                column=sql.Literal(column),
                h3_res=sql.Literal(h3_res),
                year=sql.Literal(year),
            )
            cursor.execute(h3_data_query)

            log.info("Inserting record into contextual_layer table...")
            metadata = json.dumps(get_metadata(table))
            insert_query = sql.SQL(
                """
                INSERT INTO "contextual_layer" ("name", "metadata", "category")
                VALUES ({dataset}, {metadata}, {category})
                RETURNING id;
            """
            ).format(dataset=sql.Literal(dataset), metadata=sql.Literal(metadata), category=sql.Literal(category))
            cursor.execute(insert_query)
            contextual_data_id = cursor.fetchall()[0][0]
            update_query = sql.SQL(
                """
                UPDATE "h3_data" SET "contextualLayerId" = {contextual_data_id}
                WHERE "h3tableName" = {table};
            """
            ).format(contextual_data_id=sql.Literal(contextual_data_id), table=sql.Literal(table))
            cursor.execute(update_query)


def get_metadata(table: str) -> dict:
    """Returns the metadata for the given table"""
    metadata_base_path = Path(__file__).parent / "contextual_layers_metadata"
    # load the json schema
    with open(metadata_base_path / "contextual_metadata_schema.json") as f:
        schema = json.load(f)

    metadata_path = metadata_base_path / f"{table}_metadata.json"

    if not metadata_path.exists():
        log.error(f"No metadata found for {table} with the name {metadata_path.name}")
        # todo: should we raise exception or return empty metadata and keep going?
        raise FileNotFoundError(f"Metadata file {metadata_path.name} not found")

    with open(metadata_path, "r") as f:
        metadata = json.load(f)
        try:
            jsonschema.validate(metadata, schema)
        except ValidationError as e:
            log.error(f"Metadata for {table} is not valid: {e}")
            # todo: should we raise exception or return empty metadata and keep going?
            raise e
        return metadata


def link_to_indicator_table(connection: connection, indicator_code: str, h3_table_name: str, h3_column_name: str):
    """Gets indicatorID and links it to the h3table corresponding entry"""
    with connection:
        with connection.cursor() as cursor:
            cursor.execute(f"""select id from "indicator" where "nameCode" = '{indicator_code}'""")
            indicator_id = cursor.fetchall()[0][0]
            if indicator_id:
                cursor.execute(
                    f"""update "h3_data"  set "indicatorId" = '{indicator_id}'
                    where "h3tableName" = '{h3_table_name}' and "h3columnName" = '{h3_column_name}'"""
                )
                log.info(f"Updated indicatorId '{indicator_id}' in h3_data for {h3_table_name}.{h3_column_name}")
            else:
                log.error(f"Indicator with name code {indicator_code} does not exist")


def get_connection_info() -> str:
    """Returns a connection info string for psycopg based on env variables"""
    return psycopg.conninfo.make_conninfo(
        host=os.getenv("API_POSTGRES_HOST"),
        port=os.getenv("API_POSTGRES_PORT"),
        user=os.getenv("API_POSTGRES_USERNAME"),
        password=os.getenv("API_POSTGRES_PASSWORD"),
    )


def h3_table_schema(df: pd.DataFrame) -> sql.Composable:
    """Construct an SQL schema for an H3 table from a pandas DataFrame
    TODO: make this func used everywhere and carefull with psycpg version in the future.
    Examples:
        >>> schema = h3_table_schema(df)
        >>> sql.SQL("CREATE TABLE {} ({})").format(sql.Identifier(table), schema)
    """
    index = [sql.SQL("h3index h3index PRIMARY KEY")]
    extra = [
        sql.SQL("{} {}").format(sql.Identifier(col), sql.SQL(DTYPES_TO_PG[str(dtype)]))
        for col, dtype in zip(df.columns, df.dtypes)
    ]
    schema = sql.SQL(", ").join(index + extra)
    return schema