migraf/fhir-kindling

View on GitHub
fhir_kindling/privacy/k_anonymity.py

Summary

Maintainability
A
2 hrs
Test Coverage
from typing import List

import pandas as pd
from pandas.api.types import is_categorical_dtype, is_numeric_dtype, is_string_dtype
from pandas.api.types import is_datetime64_any_dtype as is_datetime


def is_k_anonymized(
    df: pd.DataFrame,
    k: int = 3,
    id_cols: List[str] = None,
    excluded_cols: List[str] = None,
) -> bool:
    """

    Checks if a dataframe satisfies k-anonymity for the given k. If id_cols is given only these columns are checked
    :param df: pandas dataframe to check
    :param k: k value to check for
    :param id_cols: optional list of columns to check for k-anonymity
    :param excluded_cols: optional list of columns to exclude from the k-anonymity check
    :return: bool value indicating whether the dataframe satisfies k-anonymity
    """

    for index, row in df.iterrows():
        query = _make_k_anon_query_for_row(
            row, df, excluded_cols=excluded_cols, id_cols=id_cols
        )
        rows = df.query(query)
        if rows.shape[0] < k:
            return False
    return True


def anonymize(df: pd.DataFrame, k: int = 3, id_cols: List[str] = None) -> pd.DataFrame:
    """
    Attempts to generalize the given dataframe to make it k-anonymized

    :param df: dataframe to check
    :param k: k value to check for
    :param id_cols: optional parameter specifying a subset of columns in the dataframe to generalize
    :return: anonymized dataframe
    """
    anon_df = df.copy()
    # If id cols are given anonymize those otherwise use all columns
    for col in id_cols if id_cols else df.columns:
        if is_datetime(df[col]):
            print("Datetime column detected")
            anon_df[col] = generalize_datetime_column(df[col])

        elif is_numeric_dtype(df[col]):
            print("Numeric column detected")
            anon_df[col] = generalize_numeric_column(df[col])

        elif is_string_dtype(df[col]) or is_categorical_dtype(df[col]):
            print("String column detected")
            # TODO categorical/string variable handling
            anon_df[col] = df[col]

        else:
            print("Unknown column type:", df[col].dtype)
            anon_df[col] = df[col]

    if is_k_anonymized(anon_df, k=k):
        return anon_df

    else:
        print("More generalization required")


def _make_k_anon_query_for_row(
    row: pd.Series,
    df: pd.DataFrame,
    excluded_cols: List[str] = None,
    id_cols: List[str] = None,
) -> str:
    """
    Creates a query string for a given row of a dataframe to check if there are enough rows in the dataframe to satisfy
    k-anonymity
    :param row: the row of a dataframe to check
    :param df: underlying dataframe
    :param excluded_cols: columns in the dataframe to exclude from the k-anonymity check
    :param id_cols:
    :return: query string to run against the dataframe
    """
    if id_cols:
        cols = id_cols
    else:
        if excluded_cols:
            cols = [col for col in df.columns if col not in excluded_cols]
        else:
            cols = df.columns
    query_cols = []
    for col in cols:
        if isinstance(row[col], int) or isinstance(row[col], float):
            query_cols.append(f"{col} == {row[col]}")
        else:
            query_cols.append(f"{col} == '{row[col]}'")
    query = " & ".join(query_cols)

    return query


def generalize_numeric_column(num_col: pd.Series):
    return num_col


def generalize_datetime_column(date_col: pd.Series, level: int = 2):
    col = pd.to_datetime(date_col)

    if level == 2:
        generalized_col = col.apply(lambda x: x.strftime("m-%Y"))
        return pd.to_datetime(generalized_col)

    elif level == 3:
        generalized_col = col.apply(lambda x: x.strftime("%Y"))
        return pd.to_datetime(generalized_col)