diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index 9d42580178a4..70b302a55c22 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -812,6 +812,7 @@ pub mod variable { #[cfg(not(target_arch = "wasm32"))] pub mod test; +mod schema_equivalence; pub mod test_util; #[cfg(doctest)] diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index cee3acc08dae..d96e60c25f40 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -89,6 +89,7 @@ use datafusion_physical_plan::placeholder_row::PlaceholderRowExec; use datafusion_physical_plan::unnest::ListUnnest; use datafusion_sql::utils::window_expr_common_partition_keys; +use crate::schema_equivalence::schema_satisfied_by; use async_trait::async_trait; use futures::{StreamExt, TryStreamExt}; use itertools::{multiunzip, Itertools}; @@ -659,7 +660,10 @@ impl DefaultPhysicalPlanner { let physical_input_schema_from_logical = logical_input_schema.inner(); if !options.execution.skip_physical_aggregate_schema_check - && &physical_input_schema != physical_input_schema_from_logical + && !schema_satisfied_by( + physical_input_schema_from_logical, + &physical_input_schema, + ) { let mut differences = Vec::new(); if physical_input_schema.fields().len() @@ -688,7 +692,7 @@ impl DefaultPhysicalPlanner { if physical_field.data_type() != logical_field.data_type() { differences.push(format!("field data type at index {} [{}]: (physical) {} vs (logical) {}", i, physical_field.name(), physical_field.data_type(), logical_field.data_type())); } - if physical_field.is_nullable() != logical_field.is_nullable() { + if physical_field.is_nullable() && !logical_field.is_nullable() { differences.push(format!("field nullability at index {} [{}]: (physical) {} vs (logical) {}", i, physical_field.name(), physical_field.is_nullable(), logical_field.is_nullable())); } } diff --git a/datafusion/core/src/schema_equivalence.rs 
b/datafusion/core/src/schema_equivalence.rs new file mode 100644 index 000000000000..f0d2acad6be9 --- /dev/null +++ b/datafusion/core/src/schema_equivalence.rs @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow_schema::{DataType, Field, Fields, Schema}; + +/// Verifies whether the original planned schema can be satisfied with data +/// adhering to the candidate schema. In practice, this is an equality check on the +/// schemas (including schema metadata), except that the original schema can have +/// nullable fields where the candidate is constrained to not provide null data. +pub(crate) fn schema_satisfied_by(original: &Schema, candidate: &Schema) -> bool { + original.metadata() == candidate.metadata() + && fields_satisfied_by(original.fields(), candidate.fields()) +} + +/// Pairwise check over two field lists of equal length; see [`schema_satisfied_by`] for the contract. +fn fields_satisfied_by(original: &Fields, candidate: &Fields) -> bool { + original.len() == candidate.len() + && original + .iter() + .zip(candidate) + .all(|(original, candidate)| field_satisfied_by(original, candidate)) +} + +/// Checks one field: same name and metadata, a satisfying data type, and the candidate is nullable only if the original is. See [`schema_satisfied_by`] for the contract.
+fn field_satisfied_by(original: &Field, candidate: &Field) -> bool { + original.name() == candidate.name() + && (original.is_nullable() || !candidate.is_nullable()) + && original.metadata() == candidate.metadata() + && data_type_satisfied_by(original.data_type(), candidate.data_type()) +} + +/// Recurses into the element fields of list-like and struct types; any other pair of data types must be equal. See [`schema_satisfied_by`] for the contract. +fn data_type_satisfied_by(original: &DataType, candidate: &DataType) -> bool { + match (original, candidate) { + (DataType::List(original_field), DataType::List(candidate_field)) => { + field_satisfied_by(original_field, candidate_field) + } + + (DataType::ListView(original_field), DataType::ListView(candidate_field)) => { + field_satisfied_by(original_field, candidate_field) + } + + ( + DataType::FixedSizeList(original_field, original_size), + DataType::FixedSizeList(candidate_field, candidate_size), + ) => { + original_size == candidate_size + && field_satisfied_by(original_field, candidate_field) + } + + (DataType::LargeList(original_field), DataType::LargeList(candidate_field)) => { + field_satisfied_by(original_field, candidate_field) + } + + ( + DataType::LargeListView(original_field), + DataType::LargeListView(candidate_field), + ) => field_satisfied_by(original_field, candidate_field), + + (DataType::Struct(original_fields), DataType::Struct(candidate_fields)) => { + fields_satisfied_by(original_fields, candidate_fields) + } + + // TODO (DataType::Union(_, _), DataType::Union(_, _)) => {} + // TODO (DataType::Dictionary(_, _), DataType::Dictionary(_, _)) => {} + // TODO (DataType::Map(_, _), DataType::Map(_, _)) => {} + // TODO (DataType::RunEndEncoded(_, _), DataType::RunEndEncoded(_, _)) => {} + _ => original == candidate, + } +} diff --git a/datafusion/sqllogictest/test_files/union.slt b/datafusion/sqllogictest/test_files/union.slt index 484743fc1664..dfac9c031074 100644 --- a/datafusion/sqllogictest/test_files/union.slt +++ b/datafusion/sqllogictest/test_files/union.slt @@ -850,3 +850,13 @@ FROM ( ---- NULL false
foo true + +query T +SELECT combined +FROM ( + SELECT concat('A', 'B') AS combined UNION ALL + SELECT concat('A', 'B') AS combined +) +GROUP BY combined +---- +AB