{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    ">### 🚩 *Create a free WhyLabs account to get more value out of whylogs!*<br> \n",
    ">*Did you know you can store, visualize, and monitor whylogs profiles with the [WhyLabs Observability Platform](https://whylabs.ai/whylogs-free-signup?utm_source=whylogs-Github&utm_medium=whylogs-example&utm_campaign=Pyspark_Profiling)? Sign up for a [free WhyLabs account](https://whylabs.ai/whylogs-free-signup?utm_source=whylogs-Github&utm_medium=whylogs-example&utm_campaign=Pyspark_Profiling) to leverage the power of whylogs and WhyLabs together!*"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# PySpark Integration\n",
    "[![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/whylabs/whylogs/blob/mainline/python/examples/integrations/Pyspark_Profiling.ipynb)\n",
    "\n",
    "\n",
    "Hi! Perhaps you're already feeling confident with our library, but you really wish there was an easy way to plug our profiling into your existing PySpark jobs. Well, glad you've made it here, because this is what we are going to cover in this example notebook 😃\n",
    "\n",
    "If you wish to have other insights on how to use whylogs, feel free to check our [other existing examples](https://github.com/whylabs/whylogs/tree/mainline/python/examples), as they might be extremely useful!"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Installing the extra dependency\n",
    "\n",
    "So that you install only what you need from whylogs, the `pyspark` integration comes as an extra dependency. To make it available, simply run the following cell:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Note: you may need to restart the kernel to use updated packages.\n",
    "%pip install 'whylogs[spark]'"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Initializing a SparkSession\n",
    "\n",
    "Here we will initialize a SparkSession. We also enable the Arrow (`pyarrow`) execution config, because it makes our methods even more performant.\n",
    "\n",
    ">**IMPORTANT**: Make sure you have Spark 3.0+ available in your environment, as our implementation relies on it for a smoother integration."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [],
   "source": [
    "from pyspark.sql import SparkSession"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [],
   "source": [
    "spark = SparkSession.builder.appName('whylogs-testing').getOrCreate()\n",
    "arrow_config_key = \"spark.sql.execution.arrow.pyspark.enabled\"\n",
    "spark.conf.set(arrow_config_key, \"true\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Reading the data\n",
    "\n",
    "To keep things simple (and light enough to run this notebook on your local machine), we will read the Wine Quality dataset, available at this URL: \"http://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv\"."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark import SparkFiles\n",
    "\n",
    "data_url = \"http://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv\"\n",
    "spark.sparkContext.addFile(data_url)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "spark_dataframe = spark.read.option(\"delimiter\", \";\").option(\"inferSchema\", \"true\").csv(SparkFiles.get(\"winequality-red.csv\"), header=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "-RECORD 0----------------------\n",
      " fixed acidity        | 7.4    \n",
      " volatile acidity     | 0.7    \n",
      " citric acid          | 0.0    \n",
      " residual sugar       | 1.9    \n",
      " chlorides            | 0.076  \n",
      " free sulfur dioxide  | 11.0   \n",
      " total sulfur dioxide | 34.0   \n",
      " density              | 0.9978 \n",
      " pH                   | 3.51   \n",
      " sulphates            | 0.56   \n",
      " alcohol              | 9.4    \n",
      " quality              | 5      \n",
      "only showing top 1 row\n",
      "\n"
     ]
    }
   ],
   "source": [
    "spark_dataframe.show(n=1, vertical=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- fixed acidity: double (nullable = true)\n",
      " |-- volatile acidity: double (nullable = true)\n",
      " |-- citric acid: double (nullable = true)\n",
      " |-- residual sugar: double (nullable = true)\n",
      " |-- chlorides: double (nullable = true)\n",
      " |-- free sulfur dioxide: double (nullable = true)\n",
      " |-- total sulfur dioxide: double (nullable = true)\n",
      " |-- density: double (nullable = true)\n",
      " |-- pH: double (nullable = true)\n",
      " |-- sulphates: double (nullable = true)\n",
      " |-- alcohol: double (nullable = true)\n",
      " |-- quality: integer (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "spark_dataframe.printSchema()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Profiling the data with whylogs\n",
    "\n",
    "Now that we have a Spark DataFrame in place, let's see how easy it is to profile our data with whylogs."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [],
   "source": [
    "from whylogs.api.pyspark.experimental import collect_column_profile_views\n",
    "\n",
    "column_views_dict = collect_column_profile_views(spark_dataframe)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Yep. It's done. It is **that** easy.\n",
    "\n",
    "But what do we get with a `column_views_dict`?"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "{'alcohol': <whylogs.core.view.column_profile_view.ColumnProfileView object at 0x11d240e20>, 'chlorides': <whylogs.core.view.column_profile_view.ColumnProfileView object at 0x11d29ec70>, 'citric acid': <whylogs.core.view.column_profile_view.ColumnProfileView object at 0x11d2a2d00>, 'density': <whylogs.core.view.column_profile_view.ColumnProfileView object at 0x11d2a6d90>, 'fixed acidity': <whylogs.core.view.column_profile_view.ColumnProfileView object at 0x11d2a3e20>, 'free sulfur dioxide': <whylogs.core.view.column_profile_view.ColumnProfileView object at 0x11d2aaeb0>, 'pH': <whylogs.core.view.column_profile_view.ColumnProfileView object at 0x11d2adf40>, 'quality': <whylogs.core.view.column_profile_view.ColumnProfileView object at 0x11d2b7100>, 'residual sugar': <whylogs.core.view.column_profile_view.ColumnProfileView object at 0x11d2b7d60>, 'sulphates': <whylogs.core.view.column_profile_view.ColumnProfileView object at 0x11d2bddf0>, 'total sulfur dioxide': <whylogs.core.view.column_profile_view.ColumnProfileView object at 0x11d2c0d90>, 'volatile acidity': <whylogs.core.view.column_profile_view.ColumnProfileView object at 0x11d2c4e20>}\n"
     ]
    }
   ],
   "source": [
    "print(column_views_dict)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "It is a dictionary with one `ColumnProfileView` object per column in your dataset. We can inspect some of the metrics on each of them, such as the counts for a given column:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "(1599, 1599)"
      ]
     },
     "execution_count": 11,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "column_views_dict[\"density\"].get_metric(\"counts\").n.value, spark_dataframe.count()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Or their `mean` value:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "0.9967466791744841"
      ]
     },
     "execution_count": 12,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "column_views_dict[\"density\"].get_metric(\"distribution\").mean.value"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now let's check how accurately whylogs stored that `mean` by comparing it with Spark's own calculation."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------------------+\n",
      "|      avg(density)|\n",
      "+------------------+\n",
      "|0.9967466791744831|\n",
      "+------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "from pyspark.sql.functions import mean\n",
    "spark_dataframe.select(mean(\"density\")).show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "It is not exactly the same value, but it gets really close, right? That is because we are not extracting the exact information, but we are also **not sampling** the data. `whylogs` will look at **every data point** and *statistically* decide whether or not that data point is relevant to the final calculation.\n",
    "\n",
    "Is it just me, or is this extremely powerful? Yes, it is.\n",
    "\n",
    "> \"Cool! But what can I do with a bunch of `ColumnProfileView`s from my dataset? I want to see everything together.\"\n",
    "\n",
    "Well, you've come to the right place, because we will inspect the next method that does just that!"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [],
   "source": [
    "from whylogs.api.pyspark.experimental import collect_dataset_profile_view\n",
    "\n",
    "dataset_profile_view = collect_dataset_profile_view(input_df=spark_dataframe)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Yes, that easy. You now have a `DatasetProfileView`. As you might have seen from other example notebooks in our repo, you can turn this *lightweight* object into a pandas DataFrame, and visualize all the important metrics that we've profiled, like this:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>counts/n</th>\n",
       "      <th>counts/null</th>\n",
       "      <th>types/integral</th>\n",
       "      <th>types/fractional</th>\n",
       "      <th>types/boolean</th>\n",
       "      <th>types/string</th>\n",
       "      <th>types/object</th>\n",
       "      <th>cardinality/est</th>\n",
       "      <th>cardinality/upper_1</th>\n",
       "      <th>cardinality/lower_1</th>\n",
       "      <th>...</th>\n",
       "      <th>distribution/min</th>\n",
       "      <th>distribution/q_10</th>\n",
       "      <th>distribution/q_25</th>\n",
       "      <th>distribution/median</th>\n",
       "      <th>distribution/q_75</th>\n",
       "      <th>distribution/q_90</th>\n",
       "      <th>type</th>\n",
       "      <th>ints/max</th>\n",
       "      <th>ints/min</th>\n",
       "      <th>frequent_items/frequent_strings</th>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>column</th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>alcohol</th>\n",
       "      <td>1599</td>\n",
       "      <td>0</td>\n",
       "      <td>0</td>\n",
       "      <td>1599</td>\n",
       "      <td>0</td>\n",
       "      <td>0</td>\n",
       "      <td>0</td>\n",
       "      <td>65.000010</td>\n",
       "      <td>65.003256</td>\n",
       "      <td>65.000000</td>\n",
       "      <td>...</td>\n",
       "      <td>8.40000</td>\n",
       "      <td>9.30000</td>\n",
       "      <td>9.5000</td>\n",
       "      <td>10.20000</td>\n",
       "      <td>11.10000</td>\n",
       "      <td>12.00000</td>\n",
       "      <td>SummaryType.COLUMN</td>\n",
       "      <td>NaN</td>\n",
       "      <td>NaN</td>\n",
       "      <td>NaN</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>chlorides</th>\n",
       "      <td>1599</td>\n",
       "      <td>0</td>\n",
       "      <td>0</td>\n",
       "      <td>1599</td>\n",
       "      <td>0</td>\n",
       "      <td>0</td>\n",
       "      <td>0</td>\n",
       "      <td>153.000058</td>\n",
       "      <td>153.007697</td>\n",
       "      <td>153.000000</td>\n",
       "      <td>...</td>\n",
       "      <td>0.01200</td>\n",
       "      <td>0.06000</td>\n",
       "      <td>0.0700</td>\n",
       "      <td>0.07900</td>\n",
       "      <td>0.09100</td>\n",
       "      <td>0.10900</td>\n",
       "      <td>SummaryType.COLUMN</td>\n",
       "      <td>NaN</td>\n",
       "      <td>NaN</td>\n",
       "      <td>NaN</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>citric acid</th>\n",
       "      <td>1599</td>\n",
       "      <td>0</td>\n",
       "      <td>0</td>\n",
       "      <td>1599</td>\n",
       "      <td>0</td>\n",
       "      <td>0</td>\n",
       "      <td>0</td>\n",
       "      <td>80.000016</td>\n",
       "      <td>80.004010</td>\n",
       "      <td>80.000000</td>\n",
       "      <td>...</td>\n",
       "      <td>0.00000</td>\n",
       "      <td>0.01000</td>\n",
       "      <td>0.0900</td>\n",
       "      <td>0.26000</td>\n",
       "      <td>0.43000</td>\n",
       "      <td>0.53000</td>\n",
       "      <td>SummaryType.COLUMN</td>\n",
       "      <td>NaN</td>\n",
       "      <td>NaN</td>\n",
       "      <td>NaN</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>density</th>\n",
       "      <td>1599</td>\n",
       "      <td>0</td>\n",
       "      <td>0</td>\n",
       "      <td>1599</td>\n",
       "      <td>0</td>\n",
       "      <td>0</td>\n",
       "      <td>0</td>\n",
       "      <td>439.557368</td>\n",
       "      <td>445.310933</td>\n",
       "      <td>433.943761</td>\n",
       "      <td>...</td>\n",
       "      <td>0.99007</td>\n",
       "      <td>0.99451</td>\n",
       "      <td>0.9956</td>\n",
       "      <td>0.99675</td>\n",
       "      <td>0.99786</td>\n",
       "      <td>0.99914</td>\n",
       "      <td>SummaryType.COLUMN</td>\n",
       "      <td>NaN</td>\n",
       "      <td>NaN</td>\n",
       "      <td>NaN</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>fixed acidity</th>\n",
       "      <td>1599</td>\n",
       "      <td>0</td>\n",
       "      <td>0</td>\n",
       "      <td>1599</td>\n",
       "      <td>0</td>\n",
       "      <td>0</td>\n",
       "      <td>0</td>\n",
       "      <td>96.000023</td>\n",
       "      <td>96.004816</td>\n",
       "      <td>96.000000</td>\n",
       "      <td>...</td>\n",
       "      <td>4.60000</td>\n",
       "      <td>6.60000</td>\n",
       "      <td>7.1000</td>\n",
       "      <td>7.90000</td>\n",
       "      <td>9.20000</td>\n",
       "      <td>10.70000</td>\n",
       "      <td>SummaryType.COLUMN</td>\n",
       "      <td>NaN</td>\n",
       "      <td>NaN</td>\n",
       "      <td>NaN</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "<p>5 rows × 24 columns</p>\n",
       "</div>"
      ],
      "text/plain": [
       "               counts/n  counts/null  types/integral  types/fractional  \\\n",
       "column                                                                   \n",
       "alcohol            1599            0               0              1599   \n",
       "chlorides          1599            0               0              1599   \n",
       "citric acid        1599            0               0              1599   \n",
       "density            1599            0               0              1599   \n",
       "fixed acidity      1599            0               0              1599   \n",
       "\n",
       "               types/boolean  types/string  types/object  cardinality/est  \\\n",
       "column                                                                      \n",
       "alcohol                    0             0             0        65.000010   \n",
       "chlorides                  0             0             0       153.000058   \n",
       "citric acid                0             0             0        80.000016   \n",
       "density                    0             0             0       439.557368   \n",
       "fixed acidity              0             0             0        96.000023   \n",
       "\n",
       "               cardinality/upper_1  cardinality/lower_1  ...  \\\n",
       "column                                                   ...   \n",
       "alcohol                  65.003256            65.000000  ...   \n",
       "chlorides               153.007697           153.000000  ...   \n",
       "citric acid              80.004010            80.000000  ...   \n",
       "density                 445.310933           433.943761  ...   \n",
       "fixed acidity            96.004816            96.000000  ...   \n",
       "\n",
       "               distribution/min  distribution/q_10  distribution/q_25  \\\n",
       "column                                                                  \n",
       "alcohol                 8.40000            9.30000             9.5000   \n",
       "chlorides               0.01200            0.06000             0.0700   \n",
       "citric acid             0.00000            0.01000             0.0900   \n",
       "density                 0.99007            0.99451             0.9956   \n",
       "fixed acidity           4.60000            6.60000             7.1000   \n",
       "\n",
       "               distribution/median  distribution/q_75  distribution/q_90  \\\n",
       "column                                                                     \n",
       "alcohol                   10.20000           11.10000           12.00000   \n",
       "chlorides                  0.07900            0.09100            0.10900   \n",
       "citric acid                0.26000            0.43000            0.53000   \n",
       "density                    0.99675            0.99786            0.99914   \n",
       "fixed acidity              7.90000            9.20000           10.70000   \n",
       "\n",
       "                             type  ints/max  ints/min  \\\n",
       "column                                                  \n",
       "alcohol        SummaryType.COLUMN       NaN       NaN   \n",
       "chlorides      SummaryType.COLUMN       NaN       NaN   \n",
       "citric acid    SummaryType.COLUMN       NaN       NaN   \n",
       "density        SummaryType.COLUMN       NaN       NaN   \n",
       "fixed acidity  SummaryType.COLUMN       NaN       NaN   \n",
       "\n",
       "               frequent_items/frequent_strings  \n",
       "column                                          \n",
       "alcohol                                    NaN  \n",
       "chlorides                                  NaN  \n",
       "citric acid                                NaN  \n",
       "density                                    NaN  \n",
       "fixed acidity                              NaN  \n",
       "\n",
       "[5 rows x 24 columns]"
      ]
     },
     "execution_count": 15,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import pandas as pd\n",
    "\n",
    "dataset_profile_view.to_pandas().head()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Persisting as a file\n",
    "\n",
    "After collecting profiles, it is good practice to store them as files. This allows you to read them back later, merge them with future profiles, and track how your data behaves over time."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [],
   "source": [
    "dataset_profile_view.write(path=\"my_super_awesome_profile.bin\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "And that's it, you have just written a profile generated with Spark to your local environment! If you wish to upload it to other locations, such as S3 or WhyLabs, please make sure to check out our [other examples](https://github.com/whylabs/whylogs/tree/mainline/python/examples) page.\n",
    "\n",
    "Hopefully this tutorial will help you get started profiling and observing your data's behaviour in your Spark jobs with almost no friction :)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Important note\n",
    "\n",
    "As you might have seen from the imports, this PySpark implementation is currently in the **experimental** phase. We ran some benchmarks ourselves with it and, for the sake of example, a `90Gb` dataset with 80M rows could be profiled in under 3 minutes! Cool, right? But we still want more users to try this on their own, see if there are places to improve, and give us feedback before we make it officially **the** Spark module here.\n",
    "Please feel free to reach out on our [community Slack](https://communityinviter.com/apps/whylabs-community/rsqrd-ai-community) and interact with us there. We would love to hear from you :)"
   ]
  }
 ],
 "metadata": {
  "interpreter": {
   "hash": "16a10773934acde374a1cd808bcd53b1085f60e17ec18f4c0c26564dd890a5a0"
  },
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.9.14"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}