diff --git a/data/pets/pets1.csv b/data/pets/pets1.csv new file mode 100644 index 00000000..19e6c4b0 --- /dev/null +++ b/data/pets/pets1.csv @@ -0,0 +1,4 @@ +nickname,age +fofo,3 +tio,1 +lulu,9 diff --git a/data/pets/pets2.csv b/data/pets/pets2.csv new file mode 100644 index 00000000..2e0c0a2e --- /dev/null +++ b/data/pets/pets2.csv @@ -0,0 +1,3 @@ +nickname,age +ooo,3 +ppp,9 diff --git a/data/pets/pets3.csv b/data/pets/pets3.csv new file mode 100644 index 00000000..96df3d77 --- /dev/null +++ b/data/pets/pets3.csv @@ -0,0 +1,3 @@ +nickname,age +aaa,2 +bbb,4 diff --git a/data/pets/pets4.csv b/data/pets/pets4.csv new file mode 100644 index 00000000..78ee47cd --- /dev/null +++ b/data/pets/pets4.csv @@ -0,0 +1,3 @@ +nickname,age +fff,7 +lll,12 diff --git a/dataframes/05-predicate-pushdown-filtering.ipynb b/dataframes/05-predicate-pushdown-filtering.ipynb new file mode 100644 index 00000000..16d0992d --- /dev/null +++ b/dataframes/05-predicate-pushdown-filtering.ipynb @@ -0,0 +1,387 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# DataFrames: Parquet Predicate Pushdown Filtering\n", + "\n", + "This notebook shows how to perform Parquet predicate pushdown filtering to skip data when reading files.\n", + "\n", + "Parquet stores metadata in the file footer, including the min / max value of each column.\n", + "\n", + "Dask can use the min / max statistics to intelligently skip entire files when performing filtering operations. The performance gains from using predicate pushdown filters depend on how many files can be skipped.\n", + "\n", + "If you have 1,000 files and can skip 990 of them with Parquet predicate pushdown filtering, the performance gains will be massive." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Example setup\n", + "\n", + "There are four CSV files in the `data/pets` directory. We'll convert them into four Parquet files in the `data/pets_parquet` directory:\n", + "\n", + "```\n", + "data/\n", + "  pets_parquet/\n", + "    part.0.parquet\n", + "    part.1.parquet\n", + "    part.2.parquet\n", + "    part.3.parquet\n", + "```\n", + "\n", + "Each Parquet file has `nickname` and `age` columns.\n", + "\n", + "The Parquet footer stores the min and max value for the `age` column in each Parquet file. Here are the min / max values in our example files:\n", + "\n", + "```\n", + "| File           | min | max |\n", + "|----------------|-----|-----|\n", + "| part.0.parquet | 1   | 9   |\n", + "| part.1.parquet | 3   | 9   |\n", + "| part.2.parquet | 2   | 4   |\n", + "| part.3.parquet | 7   | 12  |\n", + "```\n", + "\n", + "Suppose we'd like to perform a filtering operation and fetch pets that are older than 10. We know from the Parquet metadata that `part.0.parquet`, `part.1.parquet`, and `part.2.parquet` don't have any pets with an age greater than 10. We can skip those files entirely and only filter `part.3.parquet`.\n", + "\n", + "Reading files and transferring them to a cluster is time consuming. In this example, we're able to skip 75% of the files, so Parquet predicate pushdown filtering will give a nice performance gain.\n",
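+ "\n", + "As a rough sketch (assuming `pyarrow` is installed, and run after the Parquet files have been written by the conversion step below), you can read the footer metadata directly to confirm these min / max statistics:\n", + "\n", + "```python\n", + "import pyarrow.parquet as pq\n", + "\n", + "# Print the footer statistics for the 'age' column in each Parquet file\n", + "for i in range(4):\n", + "    meta = pq.ParquetFile(f'data/pets_parquet/part.{i}.parquet').metadata\n", + "    row_group = meta.row_group(0)  # these tiny files each hold a single row group\n", + "    for j in range(row_group.num_columns):\n", + "        column = row_group.column(j)\n", + "        if column.path_in_schema == 'age':\n", + "            stats = column.statistics\n", + "            print(f'part.{i}.parquet: min={stats.min}, max={stats.max}')\n", + "```"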
+ ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [], + "source": [ + "import dask.dataframe as dd" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Convert the CSV files into Parquet files" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [], + "source": [ + "dd.read_csv('../data/pets/*.csv').to_parquet('data/pets_parquet')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Inspect the contents of one of the Parquet files" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
nicknameage
0fofo3
1tio1
2lulu9
\n", + "
" + ], + "text/plain": [ + " nickname age\n", + "0 fofo 3\n", + "1 tio 1\n", + "2 lulu 9" + ] + }, + "execution_count": 9, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "dd.read_parquet('data/pets_parquet/part.0.parquet').head(3)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Inefficient approach" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Read in all of the Parquet files into a DataFrame and perform a filtering operation to grab all the pets that are older than 10" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [], + "source": [ + "df1 = dd.read_parquet('data/pets_parquet/*')" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [], + "source": [ + "df1 = df1[df1['age'] > 10]" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
nicknameage
1lll12
\n", + "
" + ], + "text/plain": [ + " nickname age\n", + "1 lll 12" + ] + }, + "execution_count": 12, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df1.head(1, npartitions=4)" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "4" + ] + }, + "execution_count": 13, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df1.npartitions" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "This approach sends all four Parquet files to the cluster. We know all four files are getting sent to Dask because four partitions are created.\n", + "\n", + "Dask needs to filter over all the files, even in the files we know don't have any pets greater older than 10. Let's use predicate pushdown filtering so we don't needlessly filter files that don't contain any matching data." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Efficient approach" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "metadata": {}, + "outputs": [], + "source": [ + "df1 = dd.read_parquet('data/pets_parquet/*', filters=[('age', '>', 10)])" + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
nicknameage
0fff7
\n", + "
" + ], + "text/plain": [ + " nickname age\n", + "0 fff 7" + ] + }, + "execution_count": 16, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df1.head(1, npartitions=1)" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "1" + ] + }, + "execution_count": 17, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df1.npartitions" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "This approach performs the Parquet predicate pushdown filtering. We can tell because the DataFrame only has one partition, so Dask only read one file. When the `filters` parameter is populated, Dask will intelligently inspect the metadata of the Parquet files and skip entire files whenever possible." + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.5" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +}