
Commit

fix: refactor update_coalesce_ctx_children to more tightly define the context, selectively indicating when coalesce should be removed
wiedld committed Feb 15, 2025
1 parent 670eff3 commit 411d882
Showing 1 changed file with 89 additions and 51 deletions.
140 changes: 89 additions & 51 deletions datafusion/physical-optimizer/src/enforce_sorting/mod.rs
@@ -56,7 +56,7 @@ use datafusion_common::config::ConfigOptions;
use datafusion_common::plan_err;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::Result;
use datafusion_physical_expr::{Distribution, Partitioning};
use datafusion_physical_expr::Distribution;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
@@ -126,29 +126,67 @@ fn update_sort_ctx_children(
/// [`CoalescePartitionsExec`] descendant(s) for every child of a plan. The data
/// attribute stores whether the plan is a `CoalescePartitionsExec` or is
/// connected to a `CoalescePartitionsExec` via its children.
///
/// The tracker halts at each [`SortExec`] (where the `SortPreservingMergeExec` will replace the coalesce).
///
/// This requires that a bottom-up traversal has already been performed to
/// update the children.
pub type PlanWithCorrespondingCoalescePartitions = PlanContext<bool>;

/// Determines if the coalesce may be safely removed.
fn is_coalesce_to_remove(
    node: &Arc<dyn ExecutionPlan>,
    parent: &Arc<dyn ExecutionPlan>,
) -> bool {
    node.as_any()
        .downcast_ref::<CoalescePartitionsExec>()
        .map(|_coalesce| {
            // TODO(wiedld): find a more generalized approach that does not rely on
            // pattern matching the structure of the DAG
            // Note that `Partitioning::satisfy()` (parent vs. coalesce.child) cannot be
            // used for cases of:
            // * Repartition -> Coalesce -> Repartition

            let parent_req_single_partition = matches!(
                parent.required_input_distribution()[0],
                Distribution::SinglePartition
            );

            // The parent does not require a single partition,
            !parent_req_single_partition
                // or the parent immediately repartitions again,
                || is_repartition(parent)
                // or the parent is a sort, so the adjacent Coalesce->Sort can be replaced.
                || is_sort(parent)
        })
        .unwrap_or(false)
}
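The predicate reduces to a check on the parent operator alone. A minimal, standalone sketch of the same three conditions, using a hypothetical `ParentKind` enum rather than DataFusion's plan types:

```rust
/// Hypothetical simplification of the parent operators that matter to
/// `is_coalesce_to_remove`; real plans carry much more information.
enum ParentKind {
    /// The parent declares `Distribution::SinglePartition` for this child.
    RequiresSinglePartition,
    /// The parent is a `RepartitionExec` (Repartition -> Coalesce -> Repartition).
    Repartition,
    /// The parent is a `SortExec`, so Coalesce -> Sort can become Sort -> SPM.
    Sort,
    /// Any other parent without a single-partition requirement.
    Other,
}

/// The coalesce is kept only when the parent insists on a single partition and
/// is neither a repartition nor a sort; every other case allows removal.
fn coalesce_is_removable(parent: ParentKind) -> bool {
    !matches!(parent, ParentKind::RequiresSinglePartition)
        || matches!(parent, ParentKind::Repartition | ParentKind::Sort)
}

fn main() {
    assert!(!coalesce_is_removable(ParentKind::RequiresSinglePartition));
    assert!(coalesce_is_removable(ParentKind::Repartition));
    assert!(coalesce_is_removable(ParentKind::Other));
}
```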

fn update_coalesce_ctx_children(
    coalesce_context: &mut PlanWithCorrespondingCoalescePartitions,
) {
    let children = &coalesce_context.children;
    coalesce_context.data = if children.is_empty() {
        // Plan has no children, it cannot be a `CoalescePartitionsExec`.
        false
    } else if is_coalesce_partitions(&coalesce_context.plan) {
        // Initiate a connection:
        true
    } else {
        children.iter().enumerate().any(|(idx, node)| {
            // Only consider operators that don't require a single partition,
            // and connected to some `CoalescePartitionsExec`:
            node.data
                && !matches!(
                    coalesce_context.plan.required_input_distribution()[idx],
                    Distribution::SinglePartition
                )
        })
    };
    // Perform a lookahead(1) during the bottom-up traversal, since we are
    // checking the distribution requirements of the node above the coalesce.
    let parent = &coalesce_context.plan;

    for child_context in coalesce_context.children.iter_mut() {
        // Determine if the child, or its descendants, contain a coalesce to be removed.
        child_context.data = if child_context.children.is_empty() {
            // Plan has no children, it cannot be a `CoalescePartitionsExec`.
            false
        } else if is_coalesce_to_remove(&child_context.plan, parent) {
            // Initiate a connection:
            true
        } else if is_sort(&child_context.plan) {
            // Halt coalesce removals at the sort.
            false
        } else {
            // Propagate
            child_context
                .children
                .iter()
                .any(|grandchild| grandchild.data)
        };
    }
}
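A minimal, self-contained sketch of the lookahead-by-one flagging performed by the loop above, using a hypothetical `Node` type in place of `PlanContext<bool>` (the string operator names are assumptions for illustration only):

```rust
/// Hypothetical stand-in for `PlanContext<bool>`: an operator name plus the
/// boolean "connected to a removable coalesce" flag.
struct Node {
    name: &'static str,
    data: bool,
    children: Vec<Node>,
}

/// Mirrors the loop in `update_coalesce_ctx_children`: the parent inspects each
/// child and flags whether that child (or its descendants) holds a coalesce
/// that may be removed, halting at sorts.
fn update_children(parent_name: &str, children: &mut [Node]) {
    for child in children.iter_mut() {
        child.data = if child.children.is_empty() {
            // A leaf cannot be a CoalescePartitionsExec.
            false
        } else if child.name == "coalesce" && parent_name != "requires_single_partition" {
            // A coalesce whose parent permits its removal.
            true
        } else if child.name == "sort" {
            // Halt propagation: the sort boundary is handled by the SPM rewrite.
            false
        } else {
            // Otherwise propagate the flag up from the grandchildren.
            child.children.iter().any(|grandchild| grandchild.data)
        };
    }
}

fn main() {
    // repartition -> coalesce -> repartition -> leaf
    let leaf = Node { name: "leaf", data: false, children: vec![] };
    let inner = Node { name: "repartition", data: false, children: vec![leaf] };
    let mut children = vec![Node { name: "coalesce", data: false, children: vec![inner] }];
    update_children("repartition", &mut children);
    assert!(children[0].data); // the coalesce is flagged as removable
}
```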

/// The boolean flag `repartition_sorts` defined in the config indicates
@@ -246,32 +284,50 @@ fn replace_with_partial_sort(
/// This function turns plans of the form
/// ```text
/// "SortExec: expr=\[a@0 ASC\]",
/// " CoalescePartitionsExec",
/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
/// " ...nodes..."
/// " CoalescePartitionsExec",
/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
/// ```
/// to
/// ```text
/// "SortPreservingMergeExec: \[a@0 ASC\]",
/// " SortExec: expr=\[a@0 ASC\]",
/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
/// " ...nodes..."
/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
/// ```
/// by following connections from [`CoalescePartitionsExec`]s to [`SortExec`]s.
/// By performing sorting in parallel, we can increase performance in some scenarios.
///
/// This requires that there are no nodes between the [`SortExec`] and [`CoalescePartitionsExec`]
/// which require a single partition. Do not parallelize when the following scenario occurs:
/// ```text
/// "SortExec: expr=\[a@0 ASC\]",
/// " ...nodes requiring single partitioning..."
/// " CoalescePartitionsExec",
/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
/// ```
pub fn parallelize_sorts(
    mut requirements: PlanWithCorrespondingCoalescePartitions,
) -> Result<Transformed<PlanWithCorrespondingCoalescePartitions>> {
    requirements = requirements.update_plan_from_children()?;
    update_coalesce_ctx_children(&mut requirements);
    let coalesce_can_be_removed = requirements.children.iter().any(|child| child.data);

    let should_parallelize_sort = (is_sort(&requirements.plan)
        || is_sort_preserving_merge(&requirements.plan))
        && requirements.plan.output_partitioning().partition_count() <= 1
        && coalesce_can_be_removed;

    // Repartition -> Coalesce -> Repartition
    let unneeded_coalesce = is_repartition(&requirements.plan) && coalesce_can_be_removed;

    if requirements.children.is_empty() || !requirements.children[0].data {
        // We only take an action when the plan is either a `SortExec`, a
        // `SortPreservingMergeExec` or a `CoalescePartitionsExec`, and they
        // all have a single child. Therefore, if the first child has no
        // connection, we can return immediately.
        Ok(Transformed::no(requirements))
    } else if (is_sort(&requirements.plan)
        || is_sort_preserving_merge(&requirements.plan))
        && requirements.plan.output_partitioning().partition_count() <= 1
    {
    } else if should_parallelize_sort {
        // Take the initial sort expressions and requirements
        let (sort_exprs, fetch) = get_sort_exprs(&requirements.plan)?;
        let sort_reqs = LexRequirement::from(sort_exprs.clone());
@@ -286,8 +342,11 @@ pub fn parallelize_sorts(
        // We also need to remove the self node since `remove_corresponding_coalesce_in_sub_plan`
        // deals with the children and their children and so on.
        requirements = requirements.children.swap_remove(0);
        // Sync the requirements.plan.children with the mutated requirements.children.
        requirements = requirements.update_plan_from_children()?;

        requirements = add_sort_above_with_check(requirements, sort_reqs, fetch);
        requirements = requirements.update_plan_from_children()?;

        let spm =
            SortPreservingMergeExec::new(sort_exprs, Arc::clone(&requirements.plan));
@@ -298,20 +357,11 @@ pub fn parallelize_sorts(
                vec![requirements],
            ),
        ))
    } else if is_coalesce_partitions(&requirements.plan) {
        // There is an unnecessary `CoalescePartitionsExec` in the plan.
        // This will handle the recursive `CoalescePartitionsExec` plans.
    } else if unneeded_coalesce {
        requirements = remove_bottleneck_in_subplan(requirements)?;
        // For the removal of self node which is also a `CoalescePartitionsExec`.
        requirements = requirements.children.swap_remove(0);
        requirements = requirements.update_plan_from_children()?;

        Ok(Transformed::yes(
            PlanWithCorrespondingCoalescePartitions::new(
                Arc::new(CoalescePartitionsExec::new(Arc::clone(&requirements.plan))),
                false,
                vec![requirements],
            ),
        ))
        Ok(Transformed::yes(requirements))
    } else {
        Ok(Transformed::yes(requirements))
    }
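To make the two new decision flags concrete, here is a hedged standalone sketch (hypothetical `PlanSummary` and `Rewrite` types, not part of this file) of how `should_parallelize_sort` and `unneeded_coalesce` select among the branches above:

```rust
/// Simplified view of the current node, keeping only what the two flags inspect.
struct PlanSummary {
    is_sort_or_spm: bool,
    is_repartition: bool,
    single_output_partition: bool,
    child_has_removable_coalesce: bool,
}

#[derive(Debug, PartialEq)]
enum Rewrite {
    /// Replace Coalesce -> Sort with parallel sorts under a SortPreservingMergeExec.
    ParallelizeSort,
    /// Drop a coalesce that is repartitioned again immediately.
    RemoveCoalesce,
    /// Leave the plan unchanged.
    Keep,
}

fn choose_rewrite(p: &PlanSummary) -> Rewrite {
    let should_parallelize_sort =
        p.is_sort_or_spm && p.single_output_partition && p.child_has_removable_coalesce;
    // Repartition -> Coalesce -> Repartition
    let unneeded_coalesce = p.is_repartition && p.child_has_removable_coalesce;

    if should_parallelize_sort {
        Rewrite::ParallelizeSort
    } else if unneeded_coalesce {
        Rewrite::RemoveCoalesce
    } else {
        Rewrite::Keep
    }
}

fn main() {
    let sort_over_coalesce = PlanSummary {
        is_sort_or_spm: true,
        is_repartition: false,
        single_output_partition: true,
        child_has_removable_coalesce: true,
    };
    assert_eq!(choose_rewrite(&sort_over_coalesce), Rewrite::ParallelizeSort);
}
```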
@@ -546,19 +596,7 @@ fn remove_bottleneck_in_subplan(
            })
            .collect::<Result<_>>()?;
    }
    let mut new_reqs = requirements.update_plan_from_children()?;
    if let Some(repartition) = new_reqs.plan.as_any().downcast_ref::<RepartitionExec>() {
        let input_partitioning = repartition.input().output_partitioning();
        // We can remove this repartitioning operator if it is now a no-op:
        let mut can_remove = input_partitioning.eq(repartition.partitioning());
        // We can also remove it if we ended up with an ineffective RR:
        if let Partitioning::RoundRobinBatch(n_out) = repartition.partitioning() {
            can_remove |= *n_out == input_partitioning.partition_count();
        }
        if can_remove {
            new_reqs = new_reqs.children.swap_remove(0)
        }
    }
    let new_reqs = requirements.update_plan_from_children()?;
    Ok(new_reqs)
}

