fix: EnforceSorting should not remove a needed coalesces #14637

Open · wants to merge 3 commits into main from wiedld/enforce-sorting-removes-coalesce
Conversation

@wiedld (Contributor) commented Feb 12, 2025

Which issue does this PR close?

I'll make a new issue, once we confirm this reproducer is a bug.

Closes: #14691

Rationale for this change

We have physical plans which are failing in the sanity checker. The plan, as provided to EnforceSorting, is valid ("valid" meaning it passes the sanity check). After EnforceSorting runs, it fails due to the removal of a needed coalesce.

I've added a temporary patch to our forked DataFusion which extends this conditional, in order to prevent the removal of the coalesce. However, I'm unclear whether the proper fix should instead replace the coalesce with an SPM at the correct location (rather than placing the SPM after the aggregation, where it no longer fulfills the distribution requirements of the aggregate). A simplified sketch of the failing transformation follows.
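
For illustration, here is a simplified before/after sketch, reconstructed from the test plans shown later in this PR (the exact operators in the real reproducer may differ):

Input plan (valid; the CoalescePartitionsExec gives the aggregate a single partition):

SortExec: expr=[a@0 ASC]
  AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]
    CoalescePartitionsExec
      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1

After EnforceSorting (invalid; the coalesce is removed, so the aggregate sees multiple partitions and the sanity checker rejects the plan):

SortPreservingMergeExec: [a@0 ASC]
  SortExec: expr=[a@0 ASC], preserve_partitioning=[true]
    AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]
      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1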

What changes are included in this PR?

Demonstrate the bug.

Are these changes tested?

Are there any user-facing changes?

@github-actions bot added the core (Core DataFusion crate) label on Feb 12, 2025
@wiedld (Contributor Author) commented Feb 12, 2025

Asking for advice from @alamb, @mustafasrepo , or anyone else on the expected behavior. 🙏🏼

@alamb (Contributor) left a comment:


Thank you @wiedld -- I agree this is a bug

cc @berkaysynnada

vec![
"SortPreservingMergeExec: [a@0 ASC]",
" SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
" AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
Contributor commented:

I agree this is a bug -- the AggregateExec (with mode=SinglePartitioned) requires the input to have a single partition

In the input plan, CoalescePartitionsExec makes a single partition

In the new plan, that CoalescePartitionsExec is removed incorrectly.

I would expect the AggregateExec to report that its required input distribution is a single partition, and that the enforce sorting pass would respect this.

However, the actual AggregateExec::required_input_distribution seems somewhat more subtle:

https://docs.rs/datafusion-physical-plan/45.0.0/src/datafusion_physical_plan/aggregates/mod.rs.html#812-824

I think it is saying the input needs to be hash partitioned by the group keys (which this plan has clearly violated)
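
For reference, the logic at that link is roughly the following (a paraphrase, not the verbatim source; the exact field accessors may differ):

fn required_input_distribution(&self) -> Vec<Distribution> {
    match &self.mode {
        AggregateMode::Partial => vec![Distribution::UnspecifiedDistribution],
        // Partitioned modes: input must be hash-partitioned on the group-by keys
        AggregateMode::FinalPartitioned | AggregateMode::SinglePartitioned => {
            vec![Distribution::HashPartitioned(self.group_by.input_exprs())]
        }
        // Non-partitioned modes: input must be a single partition
        AggregateMode::Final | AggregateMode::Single => {
            vec![Distribution::SinglePartition]
        }
    }
}

So for mode=SinglePartitioned the stated requirement is hash partitioning on the group keys rather than a single partition, which appears to be why a generic single-partition check misses this case.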

Comment on lines +129 to 134
///
/// The tracker halts at each [`SortExec`] (where the SPM will act to replace the coalesce).
///
/// This requires that a bottom-up traversal has already been performed,
/// updating the children first.
pub type PlanWithCorrespondingCoalescePartitions = PlanContext<bool>;
@wiedld (Contributor Author) commented Feb 15, 2025:

I did a refactor. See this commit.

  • parallelize_sorts and its helper remove_bottleneck_in_subplan removed more nodes than they should, including needed nodes; those nodes were then added back in a few conditionals.
  • Instead, by tightening up the PlanWithCorrespondingCoalescePartitions context and how it is built in update_coalesce_ctx_children, (a) we avoid excess node removal, and (b) we no longer need to add nodes back later.

Comment on lines -301 to -314
} else if is_coalesce_partitions(&requirements.plan) {
    // There is an unnecessary `CoalescePartitionsExec` in the plan.
    // This will handle the recursive `CoalescePartitionsExec` plans.
} else if unneeded_coalesce {
    requirements = remove_bottleneck_in_subplan(requirements)?;
    // For the removal of self node which is also a `CoalescePartitionsExec`.
    requirements = requirements.children.swap_remove(0);
    requirements = requirements.update_plan_from_children()?;

    Ok(Transformed::yes(
        PlanWithCorrespondingCoalescePartitions::new(
            Arc::new(CoalescePartitionsExec::new(Arc::clone(&requirements.plan))),
            false,
            vec![requirements],
        ),
    ))
@wiedld (Contributor Author) commented:

With the context refactor, remove_bottleneck_in_subplan no longer removes too many nodes.
As a result, we no longer have to add the coalesce back here (old line 310).

@wiedld force-pushed the wiedld/enforce-sorting-removes-coalesce branch from 05ff553 to 411d882 on February 15, 2025 04:47
@wiedld force-pushed the wiedld/enforce-sorting-removes-coalesce branch from 411d882 to 89556c2 on February 15, 2025 04:49
);
let parent_req_single_partition = matches!(parent.required_input_distribution()[0], Distribution::SinglePartition)
// handle aggregates with input=hashPartitioning with a single output partition
|| (is_aggregate(parent) && parent.properties().output_partitioning().partition_count() <= 1);
@wiedld (Contributor Author) commented:

This single line is the fix for our found issue.
It's also isolated in its own commit, with the fixed test case.

@wiedld (Contributor Author) commented:

@alamb -- lmk if you want the refactor to go in first, in a separate PR, before the reproducer + fix PR.

@wiedld marked this pull request as ready for review on February 15, 2025 05:31
@alamb (Contributor) left a comment:

The title of this PR is still

Demonstrate EnforceSorting can remove a needed coalesce #14637

But it seems like it is actually a bug fix

I'll make a new issue, once we confirm this reproducer is a bug.

Thank you -- that would be super helpful -- specifically some sort of end-user description would be most helpful. Like "what symptom in what circumstance would a user see this" (does it only affect systems like influxdb_iox that use low level plans?)

@xudong963 (Member) commented:
I plan to review it tomorrow or next Monday, thank you @wiedld

@wiedld changed the title from "Demonstrate EnforceSorting can remove a needed coalesce" to "fix: EnforceSorting should not remove a needed coalesces" on Feb 15, 2025
@wiedld (Contributor Author) commented Feb 16, 2025

I'll make a new issue, once we confirm this reproducer is a bug.

Thank you -- that would be super helpful -- specifically some sort of end-user description would be most helpful. Like "what symptom in what circumstance would a user see this" (does it only affect systems like influxdb_iox that use low level plans?)

See new issue #14691

/// ```
/// by following connections from [`CoalescePartitionsExec`]s to [`SortExec`]s.
/// By performing sorting in parallel, we can increase performance in some scenarios.
///
/// This requires that there are no nodes between the [`SortExec`] and [`CoalescePartitionsExec`]
Member commented:

Suggested change
/// This requires that there are no nodes between the [`SortExec`] and [`CoalescePartitionsExec`]
/// This requires that there are nodes between the [`SortExec`] and [`CoalescePartitionsExec`]

?

@wiedld (Contributor Author) commented Feb 18, 2025:

Think I need better words. 😆

The context is made to find linked Sort->Coalesce cascades.

SortExec ctx.data=false (to halt remove_bottleneck_in_subplan)
   ...nodes...   ctx.data=true (e.g. are linked in the cascade)
      Coalesce   ctx.data=true (e.g. is a coalesce)

This linkage is then used to say "if we find a sort, remove the linked coalesces from the subplan". Specifically, this code. If the link is broken (i.e. ctx.data=false), we stop descending the subplan looking for coalesces to remove.

So the link only exists as long as "no nodes" break the link.

Example of an unlinked Coalesce->Sort, since the aggregate requires the coalesce for single partitioned input:

SortExec ctx.data=false (to halt remove_bottleneck_in_subplan)
   AggregateExec ctx.data=false (to stop the link)
      ...nodes...   ctx.data=true (e.g. are linked in the cascade)
         Coalesce   ctx.data=true (e.g. is a coalesce)
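
As a hypothetical Rust sketch of that rule (the function and helper names here are illustrative, not the exact ones in the PR):

// Illustrative only: how ctx.data could be assigned during the bottom-up pass.
// `is_sort`, `is_coalesce_partitions`, and `requires_single_partition` are
// assumed helper predicates standing in for the real checks.
fn update_ctx(ctx: &mut PlanWithCorrespondingCoalescePartitions) {
    ctx.data = if is_coalesce_partitions(&ctx.plan) {
        // A coalesce starts a (potential) Coalesce -> ... -> Sort linkage.
        true
    } else if is_sort(&ctx.plan) || requires_single_partition(&ctx.plan) {
        // A sort consumes the linkage (the SPM replaces the coalesce there);
        // a node needing a single partition must keep its coalesce, so the
        // link is broken and removal halts here.
        false
    } else {
        // Other nodes simply propagate the linkage from their children.
        ctx.children.iter().any(|child| child.data)
    };
}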

@wiedld (Contributor Author) commented Feb 18, 2025:

What would be a better way to say/explain this? 🙏🏼

Maybe I should add docs to update_coalesce_ctx_children, which constructs this context? Or improve the wording of the current docs for PlanWithCorrespondingCoalescePartitions? 🤔

Member commented:

Oh, I got it.

Now I think the current doc is very good. 😆

@xudong963 (Member) commented:
Will continue to review after working out.

// halt coalesce removals at the sort
false
} else {
// propogate
Member commented:

propagate?

@xudong963 (Member) left a comment:

Generally LGTM

Comment on lines -549 to -561
let mut new_reqs = requirements.update_plan_from_children()?;
if let Some(repartition) = new_reqs.plan.as_any().downcast_ref::<RepartitionExec>() {
    let input_partitioning = repartition.input().output_partitioning();
    // We can remove this repartitioning operator if it is now a no-op:
    let mut can_remove = input_partitioning.eq(repartition.partitioning());
    // We can also remove it if we ended up with an ineffective RR:
    if let Partitioning::RoundRobinBatch(n_out) = repartition.partitioning() {
        can_remove |= *n_out == input_partitioning.partition_count();
    }
    if can_remove {
        new_reqs = new_reqs.children.swap_remove(0)
    }
}
@wiedld (Contributor Author) commented Feb 18, 2025:

This is about identifying (and removing) Repartition->Coalesce->Repartition chains, collapsing them into a single repartition.

Since the removal decisions were already being made when the context is built, I consolidated this removal decision into the same place (update_coalesce_ctx_children). An example follows.
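
For example (hypothetical plan shapes, in the plan-string style used elsewhere in this PR), a chain like:

RepartitionExec: partitioning=Hash([a@0], 8)
  CoalescePartitionsExec
    RepartitionExec: partitioning=RoundRobinBatch(8)

can be collapsed to a single repartition of the original input:

RepartitionExec: partitioning=Hash([a@0], 8)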

@alamb (Contributor) commented Feb 18, 2025

Thanks for working on this one. I am almost out of time today for reviews but I can take a look tomorrow (or feel free to merge it in and I will review afterwards)

Comment on lines +301 to +306
/// ```text
/// "SortExec: expr=\[a@0 ASC\]",
/// " ...nodes requiring single partitioning..."
/// " CoalescePartitionsExec",
/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
/// ```
Member commented:

💯

@xudong963 (Member) left a comment:

LGTM, I think it's good to go if the PR blocks something from your side.

@berkaysynnada (Contributor) commented:
I'd like to take a look at this tomorrow, please hold if it's not urgent.

Labels
core Core DataFusion crate · optimizer Optimizer rules

Successfully merging this pull request may close these issues:

ParallelizeSorts, a subrule of EnforceSorting optimizer, should not remove necessary coalesce.

4 participants