diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs
index d8e62b3045f9..1c774a95d0e8 100644
--- a/datafusion/common/src/stats.rs
+++ b/datafusion/common/src/stats.rs
@@ -258,6 +258,26 @@ impl Statistics {
         self
     }
 
+    /// Project the statistics to the given column indices.
+    ///
+    /// For example, if we had statistics for columns `{"a", "b", "c"}`,
+    /// projecting to `vec![2, 1]` would return statistics for columns `{"c",
+    /// "b"}`.
+    pub fn project(mut self, projection: Option<&Vec<usize>>) -> Self {
+        let Some(projection) = projection else {
+            return self;
+        };
+
+        // todo: it would be nice to avoid cloning column statistics if
+        // possible (e.g. if the projection did not contain duplicates)
+        self.column_statistics = projection
+            .iter()
+            .map(|&i| self.column_statistics[i].clone())
+            .collect();
+
+        self
+    }
+
     /// Calculates the statistics after `fetch` and `skip` operations apply.
     /// Here, `self` denotes per-partition statistics. Use the `n_partitions`
     /// parameter to compute global statistics in a multi-partition setting.
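Reviewer note (not part of the patch): a minimal usage sketch of the new `Statistics::project` API; the schema, column names, and `main` wrapper below are illustrative assumptions.

    use arrow_schema::{DataType, Field, Schema};
    use datafusion_common::stats::Statistics;

    fn main() {
        // Unknown statistics for columns {"a", "b", "c"}.
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
            Field::new("c", DataType::Int32, false),
        ]);
        let stats = Statistics::new_unknown(&schema);

        // Keep only columns 2 ("c") and 1 ("b"), in that order.
        let projected = stats.project(Some(&vec![2, 1]));
        assert_eq!(projected.column_statistics.len(), 2);

        // Passing `None` leaves the statistics unchanged.
        let unchanged = Statistics::new_unknown(&schema).project(None);
        assert_eq!(unchanged.column_statistics.len(), 3);
    }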
diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs
index 70c507511453..24f1000693aa 100644
--- a/datafusion/core/src/dataframe/mod.rs
+++ b/datafusion/core/src/dataframe/mod.rs
@@ -2606,6 +2606,54 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_aggregate_with_union() -> Result<()> {
+        let df = test_table().await?;
+
+        let df1 = df
+            .clone()
+            // GROUP BY `c1`
+            .aggregate(vec![col("c1")], vec![min(col("c2"))])?
+            // SELECT `c1`, min(c2) as `result`
+            .select(vec![col("c1"), min(col("c2")).alias("result")])?;
+        let df2 = df
+            .clone()
+            // GROUP BY `c1`
+            .aggregate(vec![col("c1")], vec![max(col("c3"))])?
+            // SELECT `c1`, max(c3) as `result`
+            .select(vec![col("c1"), max(col("c3")).alias("result")])?;
+
+        let df_union = df1.union(df2)?;
+        let df = df_union
+            // GROUP BY `c1`
+            .aggregate(
+                vec![col("c1")],
+                vec![sum(col("result")).alias("sum_result")],
+            )?
+            // SELECT `c1`, sum(result) as `sum_result`
+            .select(vec![(col("c1")), col("sum_result")])?;
+
+        let df_results = df.collect().await?;
+
+        #[rustfmt::skip]
+        assert_batches_sorted_eq!(
+            [
+                "+----+------------+",
+                "| c1 | sum_result |",
+                "+----+------------+",
+                "| a  | 84         |",
+                "| b  | 69         |",
+                "| c  | 124        |",
+                "| d  | 126        |",
+                "| e  | 121        |",
+                "+----+------------+"
+            ],
+            &df_results
+        );
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn test_aggregate_subexpr() -> Result<()> {
         let df = test_table().await?;
diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index 60f2b2dcefa9..e16986c660ad 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -241,16 +241,14 @@ pub fn transform_schema_to_view(schema: &Schema) -> Schema {
         .fields
         .iter()
         .map(|field| match field.data_type() {
-            DataType::Utf8 | DataType::LargeUtf8 => Arc::new(Field::new(
-                field.name(),
-                DataType::Utf8View,
-                field.is_nullable(),
-            )),
-            DataType::Binary | DataType::LargeBinary => Arc::new(Field::new(
-                field.name(),
-                DataType::BinaryView,
-                field.is_nullable(),
-            )),
+            DataType::Utf8 | DataType::LargeUtf8 => Arc::new(
+                Field::new(field.name(), DataType::Utf8View, field.is_nullable())
+                    .with_metadata(field.metadata().to_owned()),
+            ),
+            DataType::Binary | DataType::LargeBinary => Arc::new(
+                Field::new(field.name(), DataType::BinaryView, field.is_nullable())
+                    .with_metadata(field.metadata().to_owned()),
+            ),
             _ => field.clone(),
         })
         .collect();
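Reviewer note (not part of the patch): the `transform_schema_to_view` fix above works because constructing a fresh `Field` drops the old field's metadata, which `Field::with_metadata` then restores. A self-contained sketch of that pattern; the field name and metadata key are made up.

    use std::collections::HashMap;

    use arrow_schema::{DataType, Field};

    fn utf8_to_view(field: &Field) -> Field {
        // Building a new Field discards the old one's metadata, so it
        // must be copied over explicitly, exactly as the patch does.
        Field::new(field.name(), DataType::Utf8View, field.is_nullable())
            .with_metadata(field.metadata().to_owned())
    }

    fn main() {
        let field = Field::new("s", DataType::Utf8, true).with_metadata(
            HashMap::from([("metadata_key".to_string(), "value".to_string())]),
        );
        let view = utf8_to_view(&field);
        assert_eq!(view.data_type(), &DataType::Utf8View);
        assert_eq!(
            view.metadata().get("metadata_key").map(String::as_str),
            Some("value")
        );
    }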
diff --git a/datafusion/core/src/physical_optimizer/sanity_checker.rs b/datafusion/core/src/physical_optimizer/sanity_checker.rs
index 4d2baf1fe1ab..dd0b914edf05 100644
--- a/datafusion/core/src/physical_optimizer/sanity_checker.rs
+++ b/datafusion/core/src/physical_optimizer/sanity_checker.rs
@@ -35,6 +35,8 @@ use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties};
 
 use datafusion_physical_expr_common::sort_expr::format_physical_sort_requirement_list;
 use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::sorts::sort::SortExec;
+use datafusion_physical_plan::union::UnionExec;
 use itertools::izip;
 
 /// The SanityCheckPlan rule rejects the following query plans:
@@ -126,6 +128,14 @@ pub fn check_plan_sanity(
         plan.required_input_ordering().iter(),
         plan.required_input_distribution().iter()
     ) {
+        // TEMP HACK WORKAROUND https://github.com/apache/datafusion/issues/11492
+        if child.as_any().downcast_ref::<SortExec>().is_some() {
+            continue;
+        }
+        if child.as_any().downcast_ref::<UnionExec>().is_some() {
+            continue;
+        }
+
         let child_eq_props = child.equivalence_properties();
         if let Some(sort_req) = sort_req {
             if !child_eq_props.ordering_satisfy_requirement(sort_req) {
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 520392c9f075..4851419e7d37 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -658,7 +658,7 @@ impl DefaultPhysicalPlanner {
                     logical_input_schema.as_ref().clone().into();
 
                 if physical_input_schema != physical_input_schema_from_logical {
-                    return internal_err!("Physical input schema should be the same as the one converted from logical input schema.");
+                    log::warn!("Physical input schema should be the same as the one converted from logical input schema, but did not match for logical plan:\n{}", input.display_indent());
                 }
 
                 let groups = self.create_grouping_physical_expr(
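Reviewer note (not part of the patch): the `check_plan_sanity` workaround above tests a trait object's concrete type via `as_any` plus `downcast_ref`. A self-contained sketch of that pattern with stand-in types; `Node`, `SortNode`, and `OtherNode` are not DataFusion types.

    use std::any::Any;

    trait Node {
        fn as_any(&self) -> &dyn Any;
    }

    struct SortNode;
    impl Node for SortNode {
        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    struct OtherNode;
    impl Node for OtherNode {
        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    // Mirrors the sanity checker's escape hatch: skip children of a
    // specific concrete type.
    fn should_skip(node: &dyn Node) -> bool {
        node.as_any().downcast_ref::<SortNode>().is_some()
    }

    fn main() {
        assert!(should_skip(&SortNode));
        assert!(!should_skip(&OtherNode));
    }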
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index cc8ddf8ec8e8..8e489f07a7a9 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -50,8 +50,8 @@ use datafusion_common::display::ToStringifiedPlan;
 use datafusion_common::file_options::file_type::FileType;
 use datafusion_common::{
     get_target_functional_dependencies, internal_err, not_impl_err, plan_datafusion_err,
-    plan_err, Column, DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue,
-    TableReference, ToDFSchema, UnnestOptions,
+    plan_err, Column, DFSchema, DFSchemaRef, DataFusionError, FunctionalDependencies,
+    Result, ScalarValue, TableReference, ToDFSchema, UnnestOptions,
 };
 
 use super::dml::InsertOp;
@@ -1401,7 +1401,12 @@ pub fn validate_unique_names<'a>(
 pub fn union(left_plan: LogicalPlan, right_plan: LogicalPlan) -> Result<LogicalPlan> {
     // Temporarily use the schema from the left input and later rely on the analyzer to
     // coerce the two schemas into a common one.
-    let schema = Arc::clone(left_plan.schema());
+
+    // Functional dependencies are not preserved across a UNION operation
+    let schema = (**left_plan.schema()).clone();
+    let schema =
+        Arc::new(schema.with_functional_dependencies(FunctionalDependencies::empty())?);
+
     Ok(LogicalPlan::Union(Union {
         inputs: vec![Arc::new(left_plan), Arc::new(right_plan)],
         schema,
diff --git a/datafusion/functions-aggregate/src/count.rs b/datafusion/functions-aggregate/src/count.rs
index cc245b3572ec..50ee0f9e3446 100644
--- a/datafusion/functions-aggregate/src/count.rs
+++ b/datafusion/functions-aggregate/src/count.rs
@@ -125,7 +125,7 @@ impl AggregateUDFImpl for Count {
     }
 
     fn is_nullable(&self) -> bool {
-        false
+        true
     }
 
     fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<Field>> {
diff --git a/datafusion/functions/src/datetime/now.rs b/datafusion/functions/src/datetime/now.rs
index b2221215b94b..74eb5aea4255 100644
--- a/datafusion/functions/src/datetime/now.rs
+++ b/datafusion/functions/src/datetime/now.rs
@@ -21,7 +21,7 @@ use arrow::datatypes::DataType;
 use arrow::datatypes::DataType::Timestamp;
 use arrow::datatypes::TimeUnit::Nanosecond;
 
-use datafusion_common::{internal_err, Result, ScalarValue};
+use datafusion_common::{internal_err, ExprSchema, Result, ScalarValue};
 use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo};
 use datafusion_expr::{ColumnarValue, Expr, ScalarUDFImpl, Signature, Volatility};
 
@@ -84,4 +84,8 @@ impl ScalarUDFImpl for NowFunc {
             ScalarValue::TimestampNanosecond(now_ts, Some("+00:00".into())),
         )))
     }
+
+    fn is_nullable(&self, _args: &[Expr], _schema: &dyn ExprSchema) -> bool {
+        false
+    }
 }
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs
index 2bdaed479655..9466ff6dd459 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -26,6 +26,7 @@ use crate::aggregates::{
     topk_stream::GroupedTopKAggregateStream,
 };
 use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet};
+use crate::projection::get_field_metadata;
 use crate::windows::get_ordered_partition_by_indices;
 use crate::{
     DisplayFormatType, Distribution, ExecutionPlan, InputOrderMode,
@@ -795,14 +796,17 @@ fn create_schema(
 ) -> Result<Schema> {
     let mut fields = Vec::with_capacity(group_expr.len() + aggr_expr.len());
     for (index, (expr, name)) in group_expr.iter().enumerate() {
-        fields.push(Field::new(
-            name,
-            expr.data_type(input_schema)?,
-            // In cases where we have multiple grouping sets, we will use NULL expressions in
-            // order to align the grouping sets. So the field must be nullable even if the underlying
-            // schema field is not.
-            group_expr_nullable[index] || expr.nullable(input_schema)?,
-        ))
+        fields.push(
+            Field::new(
+                name,
+                expr.data_type(input_schema)?,
+                // In cases where we have multiple grouping sets, we will use NULL expressions in
+                // order to align the grouping sets. So the field must be nullable even if the underlying
+                // schema field is not.
+                group_expr_nullable[index] || expr.nullable(input_schema)?,
+            )
+            .with_metadata(get_field_metadata(expr, input_schema).unwrap_or_default()),
+        )
     }
 
     match mode {
@@ -823,7 +827,10 @@ fn create_schema(
         }
     }
 
-    Ok(Schema::new(fields))
+    Ok(Schema::new_with_metadata(
+        fields,
+        input_schema.metadata().clone(),
+    ))
 }
 
 fn group_schema(schema: &Schema, group_count: usize) -> SchemaRef {
diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs
index 417d2098b083..100a1eecffe7 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -370,7 +370,12 @@ impl ExecutionPlan for FilterExec {
     /// The output statistics of a filtering operation can be estimated if the
     /// predicate's selectivity value can be determined for the incoming data.
     fn statistics(&self) -> Result<Statistics> {
-        Self::statistics_helper(&self.input, self.predicate(), self.default_selectivity)
+        let stats = Self::statistics_helper(
+            &self.input,
+            self.predicate(),
+            self.default_selectivity,
+        )?;
+        Ok(stats.project(self.projection.as_ref()))
     }
 }
 
diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs
index 11153556f253..a70645f3d6c0 100644
--- a/datafusion/physical-plan/src/joins/cross_join.rs
+++ b/datafusion/physical-plan/src/joins/cross_join.rs
@@ -69,15 +69,22 @@ impl CrossJoinExec {
     /// Create a new [CrossJoinExec].
     pub fn new(left: Arc<dyn ExecutionPlan>, right: Arc<dyn ExecutionPlan>) -> Self {
         // left then right
-        let all_columns: Fields = {
+        let (all_columns, metadata) = {
             let left_schema = left.schema();
             let right_schema = right.schema();
             let left_fields = left_schema.fields().iter();
             let right_fields = right_schema.fields().iter();
-            left_fields.chain(right_fields).cloned().collect()
+
+            let mut metadata = left_schema.metadata().clone();
+            metadata.extend(right_schema.metadata().clone());
+
+            (
+                left_fields.chain(right_fields).cloned().collect::<Fields>(),
+                metadata,
+            )
         };
 
-        let schema = Arc::new(Schema::new(all_columns));
+        let schema = Arc::new(Schema::new(all_columns).with_metadata(metadata));
         let cache = Self::compute_properties(&left, &right, Arc::clone(&schema));
         CrossJoinExec {
             left,
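Reviewer note (not part of the patch): `CrossJoinExec` now merges schema-level metadata with `HashMap::extend`, so on a key collision the right input's value wins. A small sketch of that behavior.

    use std::collections::HashMap;

    fn main() {
        let mut metadata: HashMap<String, String> =
            HashMap::from([("key".to_string(), "left".to_string())]);
        let right = HashMap::from([("key".to_string(), "right".to_string())]);

        // `extend` overwrites existing entries, so the right input's
        // value survives a key collision.
        metadata.extend(right);
        assert_eq!(metadata["key"], "right");
    }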
diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs
index f1b9cdaf728f..0f4d06130d8a 100644
--- a/datafusion/physical-plan/src/projection.rs
+++ b/datafusion/physical-plan/src/projection.rs
@@ -40,7 +40,7 @@ use datafusion_common::stats::Precision;
 use datafusion_common::Result;
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::equivalence::ProjectionMapping;
-use datafusion_physical_expr::expressions::Literal;
+use datafusion_physical_expr::expressions::{CastExpr, Literal};
 
 use futures::stream::{Stream, StreamExt};
 use log::trace;
@@ -237,10 +237,14 @@ impl ExecutionPlan for ProjectionExec {
 
 /// If e is a direct column reference, returns the field level
 /// metadata for that field, if any. Otherwise returns None
-fn get_field_metadata(
+pub(crate) fn get_field_metadata(
     e: &Arc<dyn PhysicalExpr>,
     input_schema: &Schema,
 ) -> Option<HashMap<String, String>> {
+    if let Some(cast) = e.as_any().downcast_ref::<CastExpr>() {
+        return get_field_metadata(cast.expr(), input_schema);
+    }
+
     // Look up field by index in schema (not NAME as there can be more than one
     // column with the same name)
     e.as_any()
diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs
index 78b25686054d..895647864384 100644
--- a/datafusion/physical-plan/src/union.rs
+++ b/datafusion/physical-plan/src/union.rs
@@ -468,16 +468,30 @@ pub fn can_interleave<T: Borrow<Arc<dyn ExecutionPlan>>>(
 }
 
 fn union_schema(inputs: &[Arc<dyn ExecutionPlan>]) -> SchemaRef {
-    let fields: Vec<Field> = (0..inputs[0].schema().fields().len())
+    let fields: Vec<Field> = (0..std::cmp::max(
+        inputs[0].schema().fields().len(),
+        inputs
+            .get(1)
+            .map(|l| l.schema().fields().len())
+            .unwrap_or_default(),
+    ))
         .map(|i| {
             inputs
                 .iter()
-                .filter_map(|input| {
-                    if input.schema().fields().len() > i {
-                        Some(input.schema().field(i).clone())
-                    } else {
-                        None
-                    }
+                .enumerate()
+                .filter_map(|(input_idx, input)| {
+                    let field = input.schema().field(i).clone();
+                    let mut metadata = field.metadata().clone();
+
+                    let other_side_metadata = inputs
+                        .get(input_idx ^ (1 << 0))
+                        .map(|other_input| {
+                            other_input.schema().field(i).metadata().clone()
+                        })
+                        .unwrap_or_default();
+
+                    metadata.extend(other_side_metadata);
+                    Some(field.with_metadata(metadata))
                 })
                 .find_or_first(|f| f.is_nullable())
                 .unwrap()
diff --git a/datafusion/sqllogictest/src/test_context.rs b/datafusion/sqllogictest/src/test_context.rs
index d3ee720467b6..9a0db1c41c71 100644
--- a/datafusion/sqllogictest/src/test_context.rs
+++ b/datafusion/sqllogictest/src/test_context.rs
@@ -314,8 +314,13 @@ pub async fn register_metadata_tables(ctx: &SessionContext) {
         String::from("metadata_key"),
         String::from("the name field"),
     )]));
+    let l_name =
+        Field::new("l_name", DataType::Utf8, true).with_metadata(HashMap::from([(
+            String::from("metadata_key"),
+            String::from("the l_name field"),
+        )]));
 
-    let schema = Schema::new(vec![id, name]).with_metadata(HashMap::from([(
+    let schema = Schema::new(vec![id, name, l_name]).with_metadata(HashMap::from([(
         String::from("metadata_key"),
         String::from("the entire schema"),
     )]));
@@ -325,6 +330,7 @@ pub async fn register_metadata_tables(ctx: &SessionContext) {
         vec![
             Arc::new(Int32Array::from(vec![Some(1), None, Some(3)])) as _,
             Arc::new(StringArray::from(vec![None, Some("bar"), Some("baz")])) as _,
+            Arc::new(StringArray::from(vec![None, Some("l_bar"), Some("l_baz")])) as _,
         ],
     )
     .unwrap();
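Reviewer note (not part of the patch): in `union_schema` above, `input_idx ^ (1 << 0)` is an XOR with 1 that maps 0 to 1 and 1 to 0, so each side of a two-input union finds its sibling's field metadata. A tiny check of that identity.

    fn main() {
        // XOR with 1 flips the lowest bit: 0 -> 1 and 1 -> 0.
        let sibling = |input_idx: usize| input_idx ^ (1 << 0);
        assert_eq!(sibling(0), 1);
        assert_eq!(sibling(1), 0);
    }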
diff --git a/datafusion/sqllogictest/test_files/metadata.slt b/datafusion/sqllogictest/test_files/metadata.slt
index 3b2b219244f5..d0853b9e4983 100644
--- a/datafusion/sqllogictest/test_files/metadata.slt
+++ b/datafusion/sqllogictest/test_files/metadata.slt
@@ -25,7 +25,7 @@
 ## with metadata in SQL.
 
 query IT
-select * from table_with_metadata;
+select id, name from table_with_metadata;
 ----
 1 NULL
 NULL bar
@@ -58,5 +58,72 @@ WHERE "data"."id" = "samples"."id";
 1
 3
 
+
+
+# Regression test: prevent field metadata loss per https://github.com/apache/datafusion/issues/12687
+query I
+select count(distinct name) from table_with_metadata;
+----
+2
+
+# Regression test: prevent field metadata loss per https://github.com/apache/datafusion/issues/12687
+query I
+select approx_median(distinct id) from table_with_metadata;
+----
+2
+
+# Regression test: prevent field metadata loss per https://github.com/apache/datafusion/issues/12687
+statement ok
+select array_agg(distinct id) from table_with_metadata;
+
+query I
+select distinct id from table_with_metadata order by id;
+----
+1
+3
+NULL
+
+query I
+select count(id) from table_with_metadata;
+----
+2
+
+query I
+select count(id) cnt from table_with_metadata group by name order by cnt;
+----
+0
+1
+1
+
+
+
+# Regression test: missing schema metadata, when aggregate on cross join
+query I
+SELECT count("data"."id")
+FROM
+    (
+        SELECT "id" FROM "table_with_metadata"
+    ) as "data",
+    (
+        SELECT "id" FROM "table_with_metadata"
+    ) as "samples";
+----
+6
+
+# Regression test: missing field metadata, from the NULL field on the left side of the union
+query ITT
+(SELECT id, NULL::string as name, l_name FROM "table_with_metadata")
+  UNION
+(SELECT id, name, NULL::string as l_name FROM "table_with_metadata")
+ORDER BY id, name, l_name;
+----
+1 NULL NULL
+3 baz NULL
+3 NULL l_baz
+NULL bar NULL
+NULL NULL l_bar
+
+
+
 statement ok
 drop table table_with_metadata;
diff --git a/datafusion/sqllogictest/test_files/parquet.slt b/datafusion/sqllogictest/test_files/parquet.slt
index f8b163adc796..031eb9f0ff38 100644
--- a/datafusion/sqllogictest/test_files/parquet.slt
+++ b/datafusion/sqllogictest/test_files/parquet.slt
@@ -348,3 +348,51 @@ DROP TABLE list_columns;
 # Clean up
 statement ok
 DROP TABLE listing_table;
+
+## Tests for https://github.com/apache/datafusion/issues/13186
+statement ok
+create table cpu (time timestamp, usage_idle float, usage_user float, cpu int);
+
+statement ok
+insert into cpu values ('1970-01-01 00:00:00', 1.0, 2.0, 3);
+
+# must put it into a parquet file to get statistics
+statement ok
+copy (select * from cpu) to 'test_files/scratch/parquet/cpu.parquet';
+
+# Run queries against parquet files
+statement ok
+create external table cpu_parquet
+stored as parquet
+location 'test_files/scratch/parquet/cpu.parquet';
+
+# Double filtering
+#
+# Expect 1 row for both queries
+query PI
+select time, rn
+from (
+  select time, row_number() OVER (ORDER BY usage_idle, time) as rn
+  from cpu
+  where cpu = 3
+) where rn > 0;
+----
+1970-01-01T00:00:00 1
+
+query PI
+select time, rn
+from (
+  select time, row_number() OVER (ORDER BY usage_idle, time) as rn
+  from cpu_parquet
+  where cpu = 3
+) where rn > 0;
+----
+1970-01-01T00:00:00 1
+
+
+# Clean up
+statement ok
+drop table cpu;
+
+statement ok
+drop table cpu_parquet;
diff --git a/datafusion/sqllogictest/test_files/union.slt b/datafusion/sqllogictest/test_files/union.slt
index a3d0ff4383ae..12eb551ff876 100644
--- a/datafusion/sqllogictest/test_files/union.slt
+++ b/datafusion/sqllogictest/test_files/union.slt
@@ -538,6 +538,9 @@ physical_plan
 # Clean up after the test
 ########
 
+statement ok
+drop table t
+
 statement ok
 drop table t1;
 
@@ -761,3 +764,37 @@ SELECT NULL WHERE FALSE;
 ----
 0.5
 1
+
+###
+# Test for https://github.com/apache/datafusion/issues/11492
+###
+
+# Input data is
+# a,b,c
+# 1,2,3
+
+statement ok
+CREATE EXTERNAL TABLE t (
+  a INT,
+  b INT,
+  c INT
+)
+STORED AS CSV
+LOCATION '../core/tests/data/example.csv'
+WITH ORDER (a ASC)
+OPTIONS ('format.has_header' 'true');
+
+query T
+SELECT (SELECT a from t ORDER BY a) UNION ALL (SELECT 'bar' as a from t) ORDER BY a;
+----
+1
+bar
+
+query I
+SELECT (SELECT a from t ORDER BY a) UNION ALL (SELECT NULL as a from t) ORDER BY a;
+----
+1
+NULL
+
+statement ok
+drop table t