From 479a277e4573e31f4a7897d78109259f4ef66104 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Fri, 7 Feb 2025 18:08:02 +0100 Subject: [PATCH] Relax physical schema validation (#14519) Physical plan can be further optimized. In particular, an expression can be determined as never null even if it wasn't known at the time of logical planning. Thus, the final schema check needs to be relax, allowing now-non-null data where nullable data was expected. This replaces schema equality check, with asymmetric "is satisfied by" relation. --- datafusion/core/src/lib.rs | 1 + datafusion/core/src/physical_planner.rs | 8 +- datafusion/core/src/schema_equivalence.rs | 84 ++++++++++++++++++++ datafusion/sqllogictest/test_files/union.slt | 10 +++ 4 files changed, 101 insertions(+), 2 deletions(-) create mode 100644 datafusion/core/src/schema_equivalence.rs diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index 9d42580178a4..70b302a55c22 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -812,6 +812,7 @@ pub mod variable { #[cfg(not(target_arch = "wasm32"))] pub mod test; +mod schema_equivalence; pub mod test_util; #[cfg(doctest)] diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index cee3acc08dae..d96e60c25f40 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -89,6 +89,7 @@ use datafusion_physical_plan::placeholder_row::PlaceholderRowExec; use datafusion_physical_plan::unnest::ListUnnest; use datafusion_sql::utils::window_expr_common_partition_keys; +use crate::schema_equivalence::schema_satisfied_by; use async_trait::async_trait; use futures::{StreamExt, TryStreamExt}; use itertools::{multiunzip, Itertools}; @@ -659,7 +660,10 @@ impl DefaultPhysicalPlanner { let physical_input_schema_from_logical = logical_input_schema.inner(); if !options.execution.skip_physical_aggregate_schema_check - && &physical_input_schema != physical_input_schema_from_logical + && !schema_satisfied_by( + physical_input_schema_from_logical, + &physical_input_schema, + ) { let mut differences = Vec::new(); if physical_input_schema.fields().len() @@ -688,7 +692,7 @@ impl DefaultPhysicalPlanner { if physical_field.data_type() != logical_field.data_type() { differences.push(format!("field data type at index {} [{}]: (physical) {} vs (logical) {}", i, physical_field.name(), physical_field.data_type(), logical_field.data_type())); } - if physical_field.is_nullable() != logical_field.is_nullable() { + if physical_field.is_nullable() && !logical_field.is_nullable() { differences.push(format!("field nullability at index {} [{}]: (physical) {} vs (logical) {}", i, physical_field.name(), physical_field.is_nullable(), logical_field.is_nullable())); } } diff --git a/datafusion/core/src/schema_equivalence.rs b/datafusion/core/src/schema_equivalence.rs new file mode 100644 index 000000000000..f0d2acad6be9 --- /dev/null +++ b/datafusion/core/src/schema_equivalence.rs @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow_schema::{DataType, Field, Fields, Schema}; + +/// Verifies whether the original planned schema can be satisfied with data +/// adhering to the candidate schema. In practice, this is equality check on the +/// schemas except that original schema can have nullable fields where candidate +/// is constrained to not provide null data. +pub(crate) fn schema_satisfied_by(original: &Schema, candidate: &Schema) -> bool { + original.metadata() == candidate.metadata() + && fields_satisfied_by(original.fields(), candidate.fields()) +} + +/// See [`schema_satisfied_by`] for the contract. +fn fields_satisfied_by(original: &Fields, candidate: &Fields) -> bool { + original.len() == candidate.len() + && original + .iter() + .zip(candidate) + .all(|(original, candidate)| field_satisfied_by(original, candidate)) +} + +/// See [`schema_satisfied_by`] for the contract. +fn field_satisfied_by(original: &Field, candidate: &Field) -> bool { + original.name() == candidate.name() + && (original.is_nullable() || !candidate.is_nullable()) + && original.metadata() == candidate.metadata() + && data_type_satisfied_by(original.data_type(), candidate.data_type()) +} + +/// See [`schema_satisfied_by`] for the contract. +fn data_type_satisfied_by(original: &DataType, candidate: &DataType) -> bool { + match (original, candidate) { + (DataType::List(original_field), DataType::List(candidate_field)) => { + field_satisfied_by(original_field, candidate_field) + } + + (DataType::ListView(original_field), DataType::ListView(candidate_field)) => { + field_satisfied_by(original_field, candidate_field) + } + + ( + DataType::FixedSizeList(original_field, original_size), + DataType::FixedSizeList(candidate_field, candidate_size), + ) => { + original_size == candidate_size + && field_satisfied_by(original_field, candidate_field) + } + + (DataType::LargeList(original_field), DataType::LargeList(candidate_field)) => { + field_satisfied_by(original_field, candidate_field) + } + + ( + DataType::LargeListView(original_field), + DataType::LargeListView(candidate_field), + ) => field_satisfied_by(original_field, candidate_field), + + (DataType::Struct(original_fields), DataType::Struct(candidate_fields)) => { + fields_satisfied_by(original_fields, candidate_fields) + } + + // TODO (DataType::Union(, _), DataType::Union(_, _)) => {} + // TODO (DataType::Dictionary(_, _), DataType::Dictionary(_, _)) => {} + // TODO (DataType::Map(_, _), DataType::Map(_, _)) => {} + // TODO (DataType::RunEndEncoded(_, _), DataType::RunEndEncoded(_, _)) => {} + _ => original == candidate, + } +} diff --git a/datafusion/sqllogictest/test_files/union.slt b/datafusion/sqllogictest/test_files/union.slt index 484743fc1664..dfac9c031074 100644 --- a/datafusion/sqllogictest/test_files/union.slt +++ b/datafusion/sqllogictest/test_files/union.slt @@ -850,3 +850,13 @@ FROM ( ---- NULL false foo true + +query T +SELECT combined +FROM ( + SELECT concat('A', 'B') AS combined UNION ALL + SELECT concat('A', 'B') AS combined +) +GROUP BY combined +---- +AB