Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve EnforceSorting docs. #14673

Merged
merged 7 commits into from
Feb 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 90 additions & 27 deletions datafusion/physical-optimizer/src/enforce_sorting/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties, InputOrde

use itertools::izip;

/// This rule inspects [`SortExec`]'s in the given physical plan and removes the
/// ones it can prove unnecessary.
/// This rule inspects [`SortExec`]'s in the given physical plan in order to
/// remove unnecessary sorts, and optimize sort performance across the plan.
#[derive(Default, Debug)]
pub struct EnforceSorting {}

Expand All @@ -84,42 +84,54 @@ impl EnforceSorting {
}
}

/// This object is used within the [`EnforceSorting`] rule to track the closest
/// This context object is used within the [`EnforceSorting`] rule to track the closest
/// [`SortExec`] descendant(s) for every child of a plan. The data attribute
/// stores whether the plan is a `SortExec` or is connected to a `SortExec`
/// via its children.
pub type PlanWithCorrespondingSort = PlanContext<bool>;

/// For a given node, update the [`PlanContext.data`] attribute.
///
/// If the node is a `SortExec`, or any of the node's children are a `SortExec`,
/// then set the attribute to true.
///
/// This requires a bottom-up traversal was previously performed, updating the
/// children previously.
fn update_sort_ctx_children_data(
mut node: PlanWithCorrespondingSort,
mut node_and_ctx: PlanWithCorrespondingSort,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💯

data: bool,
) -> Result<PlanWithCorrespondingSort> {
for child_node in node.children.iter_mut() {
let plan = &child_node.plan;
child_node.data = if is_sort(plan) {
// Initiate connection:
// Update `child.data` for all children.
for child_node in node_and_ctx.children.iter_mut() {
let child_plan = &child_node.plan;
child_node.data = if is_sort(child_plan) {
// child is sort
true
} else if is_limit(plan) {
} else if is_limit(child_plan) {
// There is no sort linkage for this path, it starts at a limit.
false
} else {
let is_spm = is_sort_preserving_merge(plan);
let required_orderings = plan.required_input_ordering();
let flags = plan.maintains_input_order();
// If a descendent is a sort, and the child maintains the sort.
let is_spm = is_sort_preserving_merge(child_plan);
let required_orderings = child_plan.required_input_ordering();
let flags = child_plan.maintains_input_order();
// Add parent node to the tree if there is at least one child with
// a sort connection:
izip!(flags, required_orderings).any(|(maintains, required_ordering)| {
let propagates_ordering =
(maintains && required_ordering.is_none()) || is_spm;
// `connected_to_sort` only returns the correct answer with bottom-up traversal
let connected_to_sort =
child_node.children.iter().any(|child| child.data);
propagates_ordering && connected_to_sort
})
}
}

node.data = data;
Ok(node)
// set data attribute on current node
node_and_ctx.data = data;

Ok(node_and_ctx)
}

/// This object is used within the [`EnforceSorting`] rule to track the closest
Expand Down Expand Up @@ -151,10 +163,15 @@ fn update_coalesce_ctx_children(
};
}

/// The boolean flag `repartition_sorts` defined in the config indicates
/// whether we elect to transform [`CoalescePartitionsExec`] + [`SortExec`] cascades
/// into [`SortExec`] + [`SortPreservingMergeExec`] cascades, which enables us to
/// perform sorting in parallel.
/// Performs optimizations based upon a series of subrules.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

///
/// Refer to each subrule for detailed descriptions of the optimizations performed:
/// [`ensure_sorting`], [`parallelize_sorts`], [`replace_with_order_preserving_variants()`],
/// and [`pushdown_sorts`].
///
/// Subrule application is ordering dependent.
///
/// The subrule `parallelize_sorts` is only applied if `repartition_sorts` is enabled.
impl PhysicalOptimizerRule for EnforceSorting {
fn optimize(
&self,
Expand Down Expand Up @@ -243,20 +260,66 @@ fn replace_with_partial_sort(
Ok(plan)
}

/// This function turns plans of the form
/// Transform [`CoalescePartitionsExec`] + [`SortExec`] into
/// [`SortExec`] + [`SortPreservingMergeExec`] as illustrated below:
///
/// The [`CoalescePartitionsExec`] + [`SortExec`] cascades
/// combine the partitions first, and then sort:
/// ```text
/// ┌ ─ ─ ─ ─ ─ ┐
/// ┌─┬─┬─┐
/// ││B│A│D│... ├──┐
/// └─┴─┴─┘ │
/// └ ─ ─ ─ ─ ─ ┘ │ ┌────────────────────────┐ ┌ ─ ─ ─ ─ ─ ─ ┐ ┌────────┐ ┌ ─ ─ ─ ─ ─ ─ ─ ┐
/// Partition 1 │ │ Coalesce │ ┌─┬─┬─┬─┬─┐ │ │ ┌─┬─┬─┬─┬─┐
/// ├──▶(no ordering guarantees)│──▶││B│E│A│D│C│...───▶ Sort ├───▶││A│B│C│D│E│... │
/// │ │ │ └─┴─┴─┴─┴─┘ │ │ └─┴─┴─┴─┴─┘
/// ┌ ─ ─ ─ ─ ─ ┐ │ └────────────────────────┘ └ ─ ─ ─ ─ ─ ─ ┘ └────────┘ └ ─ ─ ─ ─ ─ ─ ─ ┘
/// ┌─┬─┐ │ Partition Partition
/// ││E│C│ ... ├──┘
/// └─┴─┘
/// └ ─ ─ ─ ─ ─ ┘
/// Partition 2
/// ```
///
///
/// The [`SortExec`] + [`SortPreservingMergeExec`] cascades
/// sorts each partition first, then merge partitions while retaining the sort:
/// ```text
/// ┌ ─ ─ ─ ─ ─ ┐ ┌────────┐ ┌ ─ ─ ─ ─ ─ ┐
/// ┌─┬─┬─┐ │ │ ┌─┬─┬─┐
/// ││B│A│D│... │──▶│ Sort │──▶││A│B│D│... │──┐
/// └─┴─┴─┘ │ │ └─┴─┴─┘ │
/// └ ─ ─ ─ ─ ─ ┘ └────────┘ └ ─ ─ ─ ─ ─ ┘ │ ┌─────────────────────┐ ┌ ─ ─ ─ ─ ─ ─ ─ ┐
/// Partition 1 Partition 1 │ │ │ ┌─┬─┬─┬─┬─┐
/// ├──▶ SortPreservingMerge ├───▶││A│B│C│D│E│... │
/// │ │ │ └─┴─┴─┴─┴─┘
/// ┌ ─ ─ ─ ─ ─ ┐ ┌────────┐ ┌ ─ ─ ─ ─ ─ ┐ │ └─────────────────────┘ └ ─ ─ ─ ─ ─ ─ ─ ┘
/// ┌─┬─┐ │ │ ┌─┬─┐ │ Partition
/// ││E│C│ ... │──▶│ Sort ├──▶││C│E│ ... │──┘
/// └─┴─┘ │ │ └─┴─┘
/// └ ─ ─ ─ ─ ─ ┘ └────────┘ └ ─ ─ ─ ─ ─ ┘
/// Partition 2 Partition 2
/// ```
///
/// The latter [`SortExec`] + [`SortPreservingMergeExec`] cascade performs the
/// sort first on a per-partition basis, thereby parallelizing the sort.
///
///
/// The outcome is that plans of the form
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice

/// ```text
/// "SortExec: expr=\[a@0 ASC\]",
/// " CoalescePartitionsExec",
/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
/// " ...nodes..."
/// " CoalescePartitionsExec",
/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
/// ```
/// to
/// are transformed into
/// ```text
/// "SortPreservingMergeExec: \[a@0 ASC\]",
/// " SortExec: expr=\[a@0 ASC\]",
/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
/// " ...nodes..."
/// " SortExec: expr=\[a@0 ASC\]",
/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
/// ```
/// by following connections from [`CoalescePartitionsExec`]s to [`SortExec`]s.
/// By performing sorting in parallel, we can increase performance in some scenarios.
pub fn parallelize_sorts(
mut requirements: PlanWithCorrespondingCoalescePartitions,
) -> Result<Transformed<PlanWithCorrespondingCoalescePartitions>> {
Expand Down Expand Up @@ -318,7 +381,7 @@ pub fn parallelize_sorts(
}

/// This function enforces sorting requirements and makes optimizations without
/// violating these requirements whenever possible.
/// violating these requirements whenever possible. Requires a bottom-up traversal.
pub fn ensure_sorting(
mut requirements: PlanWithCorrespondingSort,
) -> Result<Transformed<PlanWithCorrespondingSort>> {
Expand Down
16 changes: 12 additions & 4 deletions datafusion/physical-plan/src/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,17 @@ impl DynTreeNode for dyn ExecutionPlan {
}
}

/// A node object beneficial for writing optimizer rules, encapsulating an [`ExecutionPlan`] node with a payload.
/// Since there are two ways to access child plans—directly from the plan and through child nodes—it's recommended
/// A node context object beneficial for writing optimizer rules.
/// This context encapsulating an [`ExecutionPlan`] node with a payload.
///
/// Since each wrapped node has it's children within both the [`PlanContext.plan.children()`],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

this is indeed quite subtle -- it would be awesome if we could find some way to make it harder to forget to call update_plan_from_children

/// as well as separately within the [`PlanContext.children`] (which are child nodes wrapped in the context),
/// it's important to keep these child plans in sync when performing mutations.
///
/// Since there are two ways to access child plans directly -— it's recommended
/// to perform mutable operations via [`Self::update_plan_from_children`].
/// After update `children`, please do the sync updating for `plan`'s children.
/// Or after creating the `PlanContext`, if you can't guarantee they are consistent, call `update_plan_from_children` to sync.
/// After mutating the `PlanContext.children`, or after creating the `PlanContext`,
/// call `update_plan_from_children` to sync.
#[derive(Debug)]
pub struct PlanContext<T: Sized> {
/// The execution plan associated with this context.
Expand All @@ -63,6 +69,8 @@ impl<T> PlanContext<T> {
}
}

/// Update the [`PlanContext.plan.children()`] from the [`PlanContext.children`],
/// if the `PlanContext.children` have been changed.
pub fn update_plan_from_children(mut self) -> Result<Self> {
let children_plans = self.children.iter().map(|c| Arc::clone(&c.plan)).collect();
self.plan = with_new_children_if_necessary(self.plan, children_plans)?;
Expand Down
Loading