Vizzuality/landgriffon

View on GitHub
data/preprocessing/livestock_processed/preprocess_faostats_ha_prod.py

Summary

Maintainability
D
2 days
Test Coverage
import os
import logging
import argparse

import psycopg2
import pandas as pd
import geopandas as gpd

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("preprocessing_processed_livestock_faostats_file")


# Define the function to get the country geometry from gadm database
def get_country_geometry():
    """
    Get the geometry and the iso3 code from the database
    This geometry is used to spatalised the faostat data
    """
    # Connect to the database

    conn = psycopg2.connect(
        host=os.getenv("API_POSTGRES_HOST"),
        port=os.getenv("API_POSTGRES_PORT"),
        database=os.getenv("API_POSTGRES_DATABASE"),
        user=os.getenv("API_POSTGRES_USERNAME"),
        password=os.getenv("API_POSTGRES_PASSWORD"),
    )

    # Get the countries geometries
    countries_df = gpd.read_postgis(
        """SELECT ar."isoA3", gr."theGeom"
                                    FROM admin_region ar
                                    INNER JOIN geo_region gr ON gr.id = ar."geoRegionId"
                                    WHERE ar."level" = 0
                                    """,
        conn,
        geom_col="theGeom",
    )

    return countries_df


def open_clean(file, data_type):
    # Define the units available for conversion
    units_to_tonnes = {"100 g/An": 100000, "100 mg/An": 1000000, "LSU/ha": 1}

    # Open the file
    df = pd.read_csv(file)

    # Check unique units from the dataframe
    unit_list = list(df["Unit"].unique())

    # Check if there is a single unit and it exists in the conversion dictionary
    if len(unit_list) == 1 and unit_list[0] in units_to_tonnes:
        unit = unit_list[0]
        conversion = units_to_tonnes[unit]
    else:
        # Check which units from the dataframe exist in the conversion dictionary
        valid_units = [unit for unit in unit_list if unit in units_to_tonnes]
        if len(valid_units) == 1:
            unit = valid_units[0]
            conversion = units_to_tonnes[unit]
        else:
            log.info("Multiple valid units or conversions found.")
            return None

    # Clean file and get just the specified unit value
    df = df.loc[df["Unit"] == unit, ["Area Code (ISO3)", "Year", "Unit", "Value"]]

    # Convert the 100mg /Animal to T/Animal
    df["Value"] = df["Value"] / conversion
    if data_type == "production":
        df["Unit"] = "T/An"
    elif data_type == "harvest":
        df["Unit"] = "LSU/ha"
    else:
        log.warning("Invalid data type specified.")
        return None

    return df


# Define the fuction to merge the FAOSTAT data with the country geometry
def merge_faostat_country(df, countries_df, output_file):
    # Merge the data with the country geometry
    df = df.merge(countries_df, left_on="Area Code (ISO3)", right_on="isoA3")
    # Set geoeometry and crs
    df = df.set_geometry("theGeom")
    df = df.set_crs("EPSG:4326")

    # Save to file
    df.to_file(output_file)

    return df


def process_faostats_data(input_file, output_file, data_type):
    # Get the country geometry
    countries_df = get_country_geometry()

    # Open and clean the file
    df = open_clean(input_file, data_type)

    # create a preprocess directory if it does not exist
    if not os.path.exists("./data/faostats_processed") and data_type == "production":
        os.makedirs("./data/faostats_processed/production")
    elif not os.path.exists("./data/faostats_processed") and data_type == "harvest":
        os.makedirs("./data/faostats_processed/harvest")

    # Merge the data with the country geometry
    df = merge_faostat_country(df, countries_df, output_file)

    return df


def main():
    # Parse command-line arguments
    parser = argparse.ArgumentParser(description="Process livestock preprocessed faostats data.")
    parser.add_argument("input_file", type=str, help="Path to the input file containing vector files")
    parser.add_argument("output_file", type=str, help="Path to the output file to save processed data")
    parser.add_argument("data_type", type=str, help="Type of data to process")
    args = parser.parse_args()

    # Process the specified folder
    process_faostats_data(args.input_file, args.output_file, args.data_type)


if __name__ == "__main__":
    main()