[REVIEW] Fix dynamic_partition_pruning to handle a single Parquet file (#1280)

* handle single pq file

* style

* add pytest

* style fix

* remove version restriction

* remove sys

* Linting

* set dask configs

---------

Co-authored-by: Charles Blackmon-Luca <[email protected]>
sarahyurick and charlesbluca authored Jan 31, 2024
1 parent db7931d commit f2025c1
Showing 2 changed files with 49 additions and 4 deletions.
18 changes: 14 additions & 4 deletions src/sql/optimizer/dynamic_partition_pruning.rs
@@ -486,11 +486,21 @@ fn read_table(
     field_string: String,
     tables: HashMap<String, TableInfo>,
 ) -> Option<HashSet<RowValue>> {
-    // Obtain filepaths to all relevant Parquet files, e.g., in a directory of Parquet files
-    let paths = fs::read_dir(tables.get(&table_string).unwrap().filepath.clone()).unwrap();
+    let file_path = tables.get(&table_string).unwrap().filepath.clone();
+    let paths: fs::ReadDir;
     let mut files = vec![];
-    for path in paths {
-        files.push(path.unwrap().path().display().to_string())
+    if fs::metadata(&file_path)
+        .map(|metadata| metadata.is_dir())
+        .unwrap_or(false)
+    {
+        // Obtain filepaths to all relevant Parquet files, e.g., in a directory of Parquet files
+        paths = fs::read_dir(&file_path).unwrap();
+        for path in paths {
+            files.push(path.unwrap().path().display().to_string())
+        }
+    } else {
+        // Obtain single Parquet file
+        files.push(file_path);
     }
 
     // Using the filepaths to the Parquet tables, obtain the schemas of the relevant tables
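For context outside the diff, here is a minimal self-contained sketch of the pattern this hunk introduces: branch on whether the stored filepath is a directory, and pass a lone Parquet file through unchanged instead of trying to read it as a directory. The collect_parquet_paths helper is a hypothetical name used for illustration and is not part of the dask-sql source.

    use std::fs;

    // Hypothetical helper mirroring the fixed read_table logic: accept either
    // a directory of Parquet files or a single Parquet file.
    fn collect_parquet_paths(file_path: &str) -> Vec<String> {
        let mut files = vec![];
        if fs::metadata(file_path)
            .map(|metadata| metadata.is_dir())
            .unwrap_or(false)
        {
            // Directory: list every entry, as the pre-fix code always assumed
            for entry in fs::read_dir(file_path).unwrap() {
                files.push(entry.unwrap().path().display().to_string());
            }
        } else {
            // Single file (or a path whose metadata cannot be read): pass through
            files.push(file_path.to_string());
        }
        files
    }

    fn main() {
        // Prints the lone file path itself, since it is not a directory
        for f in collect_parquet_paths("df1_single_file/part.0.parquet") {
            println!("{}", f);
        }
    }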
35 changes: 35 additions & 0 deletions tests/unit/test_config.py
@@ -153,3 +153,38 @@ def test_dynamic_partition_pruning(tmpdir):
     dask_config.set({"sql.optimizer.verbose": True})
     explain_string = c.explain(query)
     assert inlist_expr in explain_string
+
+
+def test_dpp_single_file_parquet(tmpdir):
+    c = Context()
+
+    dask_config.set({"sql.dynamic_partition_pruning": True})
+    dask_config.set({"sql.optimizer.verbose": True})
+
+    df1 = pd.DataFrame(
+        {
+            "x": [1, 2, 3],
+            "z": [7, 8, 9],
+        },
+    )
+    dd.from_pandas(df1, npartitions=1).to_parquet(
+        os.path.join(tmpdir, "df1_single_file")
+    )
+    df1 = dd.read_parquet(os.path.join(tmpdir, "df1_single_file/part.0.parquet"))
+    c.create_table("df1", df1)
+
+    df2 = pd.DataFrame(
+        {
+            "x": [1, 2, 3] * 1000,
+            "y": [4, 5, 6] * 1000,
+        },
+    )
+    dd.from_pandas(df2, npartitions=3).to_parquet(os.path.join(tmpdir, "df2"))
+    df2 = dd.read_parquet(os.path.join(tmpdir, "df2"))
+    c.create_table("df2", df2)
+
+    query = "SELECT * FROM df1, df2 WHERE df1.x = df2.x AND df1.z=7"
+    inlist_expr = "df2.x IN ([Int64(1)])"
+
+    explain_string = c.explain(query)
+    assert inlist_expr in explain_string
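Note that df1 is registered from df1_single_file/part.0.parquet, a single Parquet file rather than the dataset directory, so the query exercises the new single-file branch of read_table; df2 still points at a directory of partitioned files and covers the pre-existing path.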
