From f2025c1ad90f9c341e12f1f0cff5469db86b8c98 Mon Sep 17 00:00:00 2001
From: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com>
Date: Wed, 31 Jan 2024 13:45:10 -0800
Subject: [PATCH] [REVIEW] Fix `dynamic_partition_pruning` to handle a single
 Parquet file (#1280)

* handle single pq file

* style

* add pytest

* style fix

* remove version restriction

* remove sys

* Linting

* set dask configs

---------

Co-authored-by: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com>
---
 .../optimizer/dynamic_partition_pruning.rs | 18 +++++++---
 tests/unit/test_config.py                  | 35 +++++++++++++++++++
 2 files changed, 49 insertions(+), 4 deletions(-)

diff --git a/src/sql/optimizer/dynamic_partition_pruning.rs b/src/sql/optimizer/dynamic_partition_pruning.rs
index b6ab70508..00aa6c965 100644
--- a/src/sql/optimizer/dynamic_partition_pruning.rs
+++ b/src/sql/optimizer/dynamic_partition_pruning.rs
@@ -486,11 +486,21 @@ fn read_table(
     field_string: String,
     tables: HashMap<String, TableInfo>,
 ) -> Option<HashSet<RowValue>> {
-    // Obtain filepaths to all relevant Parquet files, e.g., in a directory of Parquet files
-    let paths = fs::read_dir(tables.get(&table_string).unwrap().filepath.clone()).unwrap();
+    let file_path = tables.get(&table_string).unwrap().filepath.clone();
+    let paths: fs::ReadDir;
     let mut files = vec![];
-    for path in paths {
-        files.push(path.unwrap().path().display().to_string())
+    if fs::metadata(&file_path)
+        .map(|metadata| metadata.is_dir())
+        .unwrap_or(false)
+    {
+        // Obtain filepaths to all relevant Parquet files, e.g., in a directory of Parquet files
+        paths = fs::read_dir(&file_path).unwrap();
+        for path in paths {
+            files.push(path.unwrap().path().display().to_string())
+        }
+    } else {
+        // Obtain single Parquet file
+        files.push(file_path);
     }
 
     // Using the filepaths to the Parquet tables, obtain the schemas of the relevant tables
diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py
index ad4fb2883..aad045656 100644
--- a/tests/unit/test_config.py
+++ b/tests/unit/test_config.py
@@ -153,3 +153,38 @@ def test_dynamic_partition_pruning(tmpdir):
     dask_config.set({"sql.optimizer.verbose": True})
     explain_string = c.explain(query)
     assert inlist_expr in explain_string
+
+
+def test_dpp_single_file_parquet(tmpdir):
+    c = Context()
+
+    dask_config.set({"sql.dynamic_partition_pruning": True})
+    dask_config.set({"sql.optimizer.verbose": True})
+
+    df1 = pd.DataFrame(
+        {
+            "x": [1, 2, 3],
+            "z": [7, 8, 9],
+        },
+    )
+    dd.from_pandas(df1, npartitions=1).to_parquet(
+        os.path.join(tmpdir, "df1_single_file")
+    )
+    df1 = dd.read_parquet(os.path.join(tmpdir, "df1_single_file/part.0.parquet"))
+    c.create_table("df1", df1)
+
+    df2 = pd.DataFrame(
+        {
+            "x": [1, 2, 3] * 1000,
+            "y": [4, 5, 6] * 1000,
+        },
+    )
+    dd.from_pandas(df2, npartitions=3).to_parquet(os.path.join(tmpdir, "df2"))
+    df2 = dd.read_parquet(os.path.join(tmpdir, "df2"))
+    c.create_table("df2", df2)
+
+    query = "SELECT * FROM df1, df2 WHERE df1.x = df2.x AND df1.z = 7"
+    inlist_expr = "df2.x IN ([Int64(1)])"
+
+    explain_string = c.explain(query)
+    assert inlist_expr in explain_string
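
Note for reviewers: the sketch below isolates the directory-vs-single-file
logic that the Rust hunk above adds to `read_table`, so it can be read (and
tried) on its own. It is illustrative only: the `collect_parquet_paths`
helper name and the `main` driver are hypothetical (the real code inlines
this logic inside `read_table`), and errors are propagated with `io::Result`
where the patch itself calls `unwrap()`.

use std::fs;
use std::io;

/// Hypothetical helper: collect Parquet file paths whether `file_path`
/// names a directory of Parquet files or a single Parquet file.
fn collect_parquet_paths(file_path: &str) -> io::Result<Vec<String>> {
    let mut files = vec![];
    // As in the patch: if `fs::metadata` fails (e.g., the path does not
    // exist), `unwrap_or(false)` falls through to the single-file branch
    // instead of panicking.
    let is_dir = fs::metadata(file_path)
        .map(|metadata| metadata.is_dir())
        .unwrap_or(false);
    if is_dir {
        // Directory of Parquet files: collect the path of every entry.
        for entry in fs::read_dir(file_path)? {
            files.push(entry?.path().display().to_string());
        }
    } else {
        // Single Parquet file: use the given path as-is.
        files.push(file_path.to_string());
    }
    Ok(files)
}

fn main() -> io::Result<()> {
    // A single-file path like the one exercised by the new pytest.
    for f in collect_parquet_paths("df1_single_file/part.0.parquet")? {
        println!("{f}");
    }
    Ok(())
}

Before this change, `fs::read_dir` was called unconditionally, so a table
registered from a single Parquet file (rather than a directory of part
files) made the optimizer panic; the new pytest covers exactly that case by
reading `df1_single_file/part.0.parquet` directly.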