fix: EnforceSorting should not remove a needed coalesce #14637

Open · wiedld wants to merge 3 commits into main
@@ -220,7 +220,7 @@ fn csv_exec_multiple_sorted(output_ordering: Vec<LexOrdering>) -> Arc<DataSource
.new_exec()
}

fn projection_exec_with_alias(
pub(crate) fn projection_exec_with_alias(
input: Arc<dyn ExecutionPlan>,
alias_pairs: Vec<(String, String)>,
) -> Arc<dyn ExecutionPlan> {
51 changes: 50 additions & 1 deletion datafusion/core/tests/physical_optimizer/enforce_sorting.rs
@@ -17,13 +17,16 @@

use std::sync::Arc;

use crate::physical_optimizer::enforce_distribution::projection_exec_with_alias;
use crate::physical_optimizer::sanity_checker::assert_sanity_check;
use crate::physical_optimizer::test_utils::{
aggregate_exec, bounded_window_exec, bounded_window_exec_non_set_monotonic,
bounded_window_exec_with_partition, check_integrity, coalesce_batches_exec,
coalesce_partitions_exec, create_test_schema, create_test_schema2,
create_test_schema3, create_test_schema4, filter_exec, global_limit_exec,
hash_join_exec, limit_exec, local_limit_exec, memory_exec, parquet_exec,
repartition_exec, sort_exec, sort_expr, sort_expr_options, sort_merge_join_exec,
parquet_exec_with_stats, repartition_exec, schema, single_partitioned_aggregate,
sort_exec, sort_expr, sort_expr_options, sort_merge_join_exec,
sort_preserving_merge_exec, sort_preserving_merge_exec_with_fetch,
spr_repartition_exec, stream_exec_ordered, union_exec, RequirementsTestExec,
};
@@ -2280,3 +2283,49 @@ async fn test_not_replaced_with_partial_sort_for_unbounded_input() -> Result<()>
assert_optimized!(expected_input, expected_no_change, physical_plan, true);
Ok(())
}

#[tokio::test]
async fn test_preserve_needed_coalesce() -> Result<()> {
// Input to EnforceSorting, from our test case.
let plan = projection_exec_with_alias(
union_exec(vec![parquet_exec_with_stats(); 2]),
vec![
("a".to_string(), "a".to_string()),
("b".to_string(), "value".to_string()),
],
);
let plan = Arc::new(CoalescePartitionsExec::new(plan));
let schema = schema();
let sort_key = LexOrdering::new(vec![PhysicalSortExpr {
expr: col("a", &schema).unwrap(),
options: SortOptions::default(),
}]);
let plan: Arc<dyn ExecutionPlan> =
single_partitioned_aggregate(plan, vec![("a".to_string(), "a1".to_string())]);
let plan = sort_exec(sort_key, plan);

// Starting plan: as in our test case.
let starting_plan = vec![
"SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
" AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
" CoalescePartitionsExec",
" ProjectionExec: expr=[a@0 as a, b@1 as value]",
" UnionExec",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
];
assert_eq!(get_plan_string(&plan), starting_plan);

// Test: plan is valid.
assert_sanity_check(&plan, true);

// EnforceSorting will not remove the coalesce, as it's required.
let optimizer = EnforceSorting::new();
let optimized = optimizer.optimize(plan, &Default::default())?;
assert_eq!(get_plan_string(&optimized), starting_plan);

// Test: plan is valid.
assert_sanity_check(&optimized, true);

Ok(())
}
2 changes: 1 addition & 1 deletion datafusion/core/tests/physical_optimizer/sanity_checker.rs
@@ -388,7 +388,7 @@ fn create_test_schema2() -> SchemaRef {
}

/// Check if sanity checker should accept or reject plans.
fn assert_sanity_check(plan: &Arc<dyn ExecutionPlan>, is_sane: bool) {
pub(crate) fn assert_sanity_check(plan: &Arc<dyn ExecutionPlan>, is_sane: bool) {
let sanity_checker = SanityCheckPlan::new();
let opts = ConfigOptions::default();
assert_eq!(
65 changes: 64 additions & 1 deletion datafusion/core/tests/physical_optimizer/test_utils.rs
@@ -28,9 +28,10 @@ use arrow::record_batch::RecordBatch;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
use datafusion_common::config::ConfigOptions;
use datafusion_common::stats::Precision;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::utils::expr::COUNT_STAR_EXPANSION;
use datafusion_common::{JoinType, Result};
use datafusion_common::{ColumnStatistics, JoinType, Result, Statistics};
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::{WindowFrame, WindowFunctionDefinition};
@@ -102,6 +103,44 @@ pub fn schema() -> SchemaRef {
]))
}

fn int64_stats() -> ColumnStatistics {
ColumnStatistics {
null_count: Precision::Absent,
sum_value: Precision::Absent,
max_value: Precision::Exact(1_000_000.into()),
min_value: Precision::Exact(0.into()),
distinct_count: Precision::Absent,
}
}

fn column_stats() -> Vec<ColumnStatistics> {
vec![
int64_stats(), // a
int64_stats(), // b
int64_stats(), // c
ColumnStatistics::default(),
ColumnStatistics::default(),
]
}

/// Create parquet datasource exec using schema from [`schema`].
pub(crate) fn parquet_exec_with_stats() -> Arc<DataSourceExec> {
let mut statistics = Statistics::new_unknown(&schema());
statistics.num_rows = Precision::Inexact(10);
statistics.column_statistics = column_stats();

let config = FileScanConfig::new(
ObjectStoreUrl::parse("test:///").unwrap(),
schema(),
Arc::new(ParquetSource::new(Default::default())),
)
.with_file(PartitionedFile::new("x".to_string(), 10000))
.with_statistics(statistics);
assert_eq!(config.statistics.num_rows, Precision::Inexact(10));

config.new_exec()
}

pub fn create_test_schema() -> Result<SchemaRef> {
let nullable_column = Field::new("nullable_col", DataType::Int32, true);
let non_nullable_column = Field::new("non_nullable_col", DataType::Int32, false);
@@ -575,6 +614,30 @@ pub fn build_group_by(input_schema: &SchemaRef, columns: Vec<String>) -> Physica
PhysicalGroupBy::new_single(group_by_expr.clone())
}

pub(crate) fn single_partitioned_aggregate(
input: Arc<dyn ExecutionPlan>,
alias_pairs: Vec<(String, String)>,
) -> Arc<dyn ExecutionPlan> {
let schema = schema();
let group_by = alias_pairs
.iter()
.map(|(column, alias)| (col(column, &input.schema()).unwrap(), alias.to_string()))
.collect::<Vec<_>>();
let group_by = PhysicalGroupBy::new_single(group_by);

Arc::new(
AggregateExec::try_new(
AggregateMode::SinglePartitioned,
group_by,
vec![],
vec![],
input,
schema,
)
.unwrap(),
)
}

pub fn assert_plan_matches_expected(
plan: &Arc<dyn ExecutionPlan>,
expected: &[&str],
142 changes: 89 additions & 53 deletions datafusion/physical-optimizer/src/enforce_sorting/mod.rs
@@ -47,16 +47,16 @@ use crate::enforce_sorting::sort_pushdown::{
assign_initial_requirements, pushdown_sorts, SortPushDown,
};
use crate::utils::{
add_sort_above, add_sort_above_with_check, is_coalesce_partitions, is_limit,
is_repartition, is_sort, is_sort_preserving_merge, is_union, is_window,
add_sort_above, add_sort_above_with_check, is_aggregate, is_coalesce_partitions,
is_limit, is_repartition, is_sort, is_sort_preserving_merge, is_union, is_window,
};
use crate::PhysicalOptimizerRule;

use datafusion_common::config::ConfigOptions;
use datafusion_common::plan_err;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::Result;
use datafusion_physical_expr::{Distribution, Partitioning};
use datafusion_physical_expr::Distribution;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
Expand Down Expand Up @@ -126,29 +126,65 @@ fn update_sort_ctx_children(
/// [`CoalescePartitionsExec`] descendant(s) for every child of a plan. The data
/// attribute stores whether the plan is a `CoalescePartitionsExec` or is
/// connected to a `CoalescePartitionsExec` via its children.
///
/// The tracker halts at each [`SortExec`] (where the SPM will act to replace the coalesce).
///
/// This requires that a bottom-up traversal has already been performed to update
/// the children.
pub type PlanWithCorrespondingCoalescePartitions = PlanContext<bool>;
Comment on lines +129 to 134 -- @wiedld (Contributor, Author) on Feb 15, 2025:

I did a refactor. See this commit.

  • The parallelize_sorts and its helper remove_bottleneck_in_subplan removed more nodes than they should, including needed nodes. Those nodes were then added back in a few conditionals.
  • Instead, I tightened up the context PlanWithCorrespondingCoalescePartitions and how it is built in update_coalesce_ctx_children -- then (a) it avoids excess node removal, and (b) we no longer need to add back nodes later.
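As a rough sketch of that tightened bookkeeping -- using a hypothetical, simplified `Node` type rather than DataFusion's actual `PlanContext`/`ExecutionPlan` APIs -- the lookahead(1) marking can be pictured like this: while walking bottom-up, each parent decides, per child, whether that child is (or contains) a coalesce it is willing to lose.

```rust
// Hypothetical simplification, not the real DataFusion types.
#[derive(Clone, Copy, PartialEq)]
enum Kind {
    Coalesce,
    Sort,
    Other,
}

struct Node {
    kind: Kind,
    /// Does this node need its input collapsed to a single partition?
    requires_single_partition_input: bool,
    /// true => this node is (or contains) a coalesce the parent allows removing
    data: bool,
    children: Vec<Node>,
}

/// Lookahead(1): the parent's requirements decide each child's flag.
fn update_children(parent: &mut Node) {
    let parent_needs_single = parent.requires_single_partition_input;
    for child in parent.children.iter_mut() {
        child.data = if child.children.is_empty() {
            // A leaf cannot be a CoalescePartitionsExec.
            false
        } else if child.kind == Kind::Coalesce {
            // Removable only when the parent does not rely on the single partition.
            !parent_needs_single
        } else if child.kind == Kind::Sort {
            // Halt: coalesce removal does not look past a sort.
            false
        } else {
            // Propagate: some descendant is a removable coalesce.
            child.children.iter().any(|grandchild| grandchild.data)
        };
    }
}

fn main() {
    // An aggregate that needs single-partition input, sitting on a coalesce:
    // the coalesce must NOT be marked as removable.
    let source = Node {
        kind: Kind::Other,
        requires_single_partition_input: false,
        data: false,
        children: vec![],
    };
    let coalesce = Node {
        kind: Kind::Coalesce,
        requires_single_partition_input: false,
        data: false,
        children: vec![source],
    };
    let mut aggregate = Node {
        kind: Kind::Other,
        requires_single_partition_input: true,
        data: false,
        children: vec![coalesce],
    };
    update_children(&mut aggregate);
    assert!(!aggregate.children[0].data);
}
```

In the actual rule, the "parent allows removing" decision is made by is_coalesce_to_remove (below), which additionally lets repartition and sort parents discard the coalesce.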


/// Determines if the coalesce may be safely removed.
fn is_coalesce_to_remove(
node: &Arc<dyn ExecutionPlan>,
parent: &Arc<dyn ExecutionPlan>,
) -> bool {
node.as_any().downcast_ref::<CoalescePartitionsExec>()
.map(|_coalesce| {
// TODO(wiedld): find a more generalized approach that does not rely on
// pattern matching the structure of the DAG
// Note that `Partitioning::satisfy()` (parent vs. coalesce.child) cannot be used for cases of:
// * Repartition -> Coalesce -> Repartition
// * Coalesce -> AggregateExec(input=hash-partitioned)

let parent_req_single_partition = matches!(parent.required_input_distribution()[0], Distribution::SinglePartition)
// handle aggregates with input=hashPartitioning with a single output partition
|| (is_aggregate(parent) && parent.properties().output_partitioning().partition_count() <= 1);
@wiedld (Contributor, Author):

This single line is the fix for our found issue. It's also isolated in its own commit, with the fixed test case.
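For reference, the shape this guard protects is exactly the one exercised by the new test_preserve_needed_coalesce above: a SinglePartitioned aggregate whose single output partition depends on the coalesce beneath it (plan fragment abbreviated from that test):

```text
AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]   <-- output partition count is 1
  CoalescePartitionsExec                                          <-- required, must be kept
    ProjectionExec: expr=[a@0 as a, b@1 as value]
      UnionExec
        ...
```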

@wiedld (Contributor, Author):

@alamb -- lmk if you want the refactor to go in first, in a separate PR, before the reproducer + fix PR.


// node above does not require single distribution
!parent_req_single_partition
// it doesn't immediately repartition
|| is_repartition(parent)
// any adjacent Coalesce->Sort can be replaced
|| is_sort(parent)
}).unwrap_or(false)
}

fn update_coalesce_ctx_children(
coalesce_context: &mut PlanWithCorrespondingCoalescePartitions,
) {
let children = &coalesce_context.children;
coalesce_context.data = if children.is_empty() {
// Plan has no children, it cannot be a `CoalescePartitionsExec`.
false
} else if is_coalesce_partitions(&coalesce_context.plan) {
// Initiate a connection:
true
} else {
children.iter().enumerate().any(|(idx, node)| {
// Only consider operators that don't require a single partition,
// and connected to some `CoalescePartitionsExec`:
node.data
&& !matches!(
coalesce_context.plan.required_input_distribution()[idx],
Distribution::SinglePartition
)
})
};
// perform lookahead(1) during bottom up traversal
// since we are checking distribution requirements after the coalesce occurs
let parent = &coalesce_context.plan;

for child_context in coalesce_context.children.iter_mut() {
// determine whether the child, or its descendants, contain a coalesce to be removed
child_context.data = if child_context.children.is_empty() {
// Plan has no children, it cannot be a `CoalescePartitionsExec`.
false
} else if is_coalesce_to_remove(&child_context.plan, parent) {
// Initiate a connection:
true
} else if is_sort(&child_context.plan) {
// halt coalesce removals at the sort
false
} else {
// propogate
Member:

propagate?

child_context
.children
.iter()
.any(|grandchild| grandchild.data)
};
}
}

/// The boolean flag `repartition_sorts` defined in the config indicates
@@ -246,32 +282,50 @@ fn replace_with_partial_sort(
/// This function turns plans of the form
/// ```text
/// "SortExec: expr=\[a@0 ASC\]",
/// " CoalescePartitionsExec",
/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
/// " ...nodes..."
/// " CoalescePartitionsExec",
/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
/// ```
/// to
/// ```text
/// "SortPreservingMergeExec: \[a@0 ASC\]",
/// " SortExec: expr=\[a@0 ASC\]",
/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
/// " ...nodes..."
/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
/// ```
/// by following connections from [`CoalescePartitionsExec`]s to [`SortExec`]s.
/// By performing sorting in parallel, we can increase performance in some scenarios.
///
/// This requires that there are no nodes between the [`SortExec`] and [`CoalescePartitionsExec`]
Member:

Suggested change:
- /// This requires that there are no nodes between the [`SortExec`] and [`CoalescePartitionsExec`]
+ /// This requires that there are nodes between the [`SortExec`] and [`CoalescePartitionsExec`]

?

@wiedld (Contributor, Author) on Feb 18, 2025:

Think I need better words. 😆

The context is made to find linked Sort->Coalesce cascades:

SortExec              ctx.data=false (to halt remove_bottleneck_in_subplan)
   ...nodes...        ctx.data=true  (i.e. linked in the cascade)
      Coalesce        ctx.data=true  (i.e. is a coalesce)

This linkage is then used to say "if we find a sort, remove the linked coalesces from the subplan" (specifically, this code). If the link is broken, a.k.a. ctx.data=false, then stop going down the subplan looking for coalesces to remove.

So the link only exists as long as "no nodes" break the link.

Example of an unlinked Coalesce->Sort, since the aggregate requires the coalesce for single-partitioned input:

SortExec                 ctx.data=false (to halt remove_bottleneck_in_subplan)
   AggregateExec         ctx.data=false (to stop the link)
      ...nodes...        ctx.data=true  (i.e. linked in the cascade)
         Coalesce        ctx.data=true  (i.e. is a coalesce)

@wiedld (Contributor, Author) on Feb 18, 2025:

What would be a better way to say/explain this? 🙏🏼

Maybe I should add docs to update_coalesce_ctx_children, which constructs this context? Or reword the current docs for PlanWithCorrespondingCoalescePartitions? 🤔

Member:

Oh, I got it. Now I think the current doc is very good. 😆

/// which require single partitioning. Do not parallelize when the following scenario occurs:
/// ```text
/// "SortExec: expr=\[a@0 ASC\]",
/// " ...nodes requiring single partitioning..."
/// " CoalescePartitionsExec",
/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
/// ```
Comment on lines +301 to +306 -- Member: 💯

pub fn parallelize_sorts(
mut requirements: PlanWithCorrespondingCoalescePartitions,
) -> Result<Transformed<PlanWithCorrespondingCoalescePartitions>> {
requirements = requirements.update_plan_from_children()?;
update_coalesce_ctx_children(&mut requirements);
let coalesce_can_be_removed = requirements.children.iter().any(|child| child.data);

let should_parallelize_sort = (is_sort(&requirements.plan)
|| is_sort_preserving_merge(&requirements.plan))
&& requirements.plan.output_partitioning().partition_count() <= 1
&& coalesce_can_be_removed;

// Repartition -> Coalesce -> Repartition
let unneeded_coalesce = is_repartition(&requirements.plan) && coalesce_can_be_removed;

if requirements.children.is_empty() || !requirements.children[0].data {
// We only take an action when the plan is either a `SortExec`, a
// `SortPreservingMergeExec` or a `CoalescePartitionsExec`, and they
// all have a single child. Therefore, if the first child has no
// connection, we can return immediately.
Ok(Transformed::no(requirements))
} else if (is_sort(&requirements.plan)
|| is_sort_preserving_merge(&requirements.plan))
&& requirements.plan.output_partitioning().partition_count() <= 1
{
} else if should_parallelize_sort {
// Take the initial sort expressions and requirements
let (sort_exprs, fetch) = get_sort_exprs(&requirements.plan)?;
let sort_reqs = LexRequirement::from(sort_exprs.clone());
@@ -286,8 +340,11 @@
// We also need to remove the self node since `remove_corresponding_coalesce_in_sub_plan`
// deals with the children and their children and so on.
requirements = requirements.children.swap_remove(0);
// sync the requirements.plan.children with the mutated requirements.children
requirements = requirements.update_plan_from_children()?;

requirements = add_sort_above_with_check(requirements, sort_reqs, fetch);
requirements = requirements.update_plan_from_children()?;

let spm =
SortPreservingMergeExec::new(sort_exprs, Arc::clone(&requirements.plan));
Expand All @@ -298,20 +355,11 @@ pub fn parallelize_sorts(
vec![requirements],
),
))
} else if is_coalesce_partitions(&requirements.plan) {
// There is an unnecessary `CoalescePartitionsExec` in the plan.
// This will handle the recursive `CoalescePartitionsExec` plans.
} else if unneeded_coalesce {
requirements = remove_bottleneck_in_subplan(requirements)?;
// For the removal of self node which is also a `CoalescePartitionsExec`.
requirements = requirements.children.swap_remove(0);
requirements = requirements.update_plan_from_children()?;

Ok(Transformed::yes(
PlanWithCorrespondingCoalescePartitions::new(
Arc::new(CoalescePartitionsExec::new(Arc::clone(&requirements.plan))),
false,
vec![requirements],
),
))
Comment on lines -301 to -314 -- @wiedld (Contributor, Author):

With the context refactor, remove_bottleneck_in_subplan is no longer removing too many nodes. As a result, we no longer have to add back the coalesce here (old line 310).

Ok(Transformed::yes(requirements))
} else {
Ok(Transformed::yes(requirements))
}
Expand Down Expand Up @@ -546,19 +594,7 @@ fn remove_bottleneck_in_subplan(
})
.collect::<Result<_>>()?;
}
let mut new_reqs = requirements.update_plan_from_children()?;
if let Some(repartition) = new_reqs.plan.as_any().downcast_ref::<RepartitionExec>() {
let input_partitioning = repartition.input().output_partitioning();
// We can remove this repartitioning operator if it is now a no-op:
let mut can_remove = input_partitioning.eq(repartition.partitioning());
// We can also remove it if we ended up with an ineffective RR:
if let Partitioning::RoundRobinBatch(n_out) = repartition.partitioning() {
can_remove |= *n_out == input_partitioning.partition_count();
}
if can_remove {
new_reqs = new_reqs.children.swap_remove(0)
}
}
Comment on lines -549 to -561 -- @wiedld (Contributor, Author) on Feb 18, 2025:

This is about identifying (and removing) Repartition->Coalesce->Repartition, to make it only a singular repartition.

Since the removal decisions were already being made when the context is built, I consolidated this removal decision into the same place (update_coalesce_ctx_children).
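An illustrative shape for that consolidation (a sketch of the pattern, not a plan taken from this PR's tests): removing the coalesce makes one of the repartitions redundant, so only a single repartition remains.

```text
RepartitionExec: partitioning=RoundRobinBatch(8)
  CoalescePartitionsExec
    RepartitionExec: partitioning=RoundRobinBatch(8)
      ...

-- becomes --

RepartitionExec: partitioning=RoundRobinBatch(8)
  ...
```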

let new_reqs = requirements.update_plan_from_children()?;
Ok(new_reqs)
}
