From 0ec6f34d9edcffe6fa68d60084137d3167dffa97 Mon Sep 17 00:00:00 2001 From: wiedld Date: Fri, 14 Feb 2025 12:32:57 -0800 Subject: [PATCH 1/6] chore: add documentation for EnforceSorting --- .../src/enforce_sorting/mod.rs | 87 +++++++++++++++---- datafusion/physical-plan/src/tree_node.rs | 12 ++- 2 files changed, 80 insertions(+), 19 deletions(-) diff --git a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs index 6bfa02adf6dc..80a630b0a424 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs @@ -84,33 +84,43 @@ impl EnforceSorting { } } -/// This object is used within the [`EnforceSorting`] rule to track the closest +/// This context object is used within the [`EnforceSorting`] rule to track the closest /// [`SortExec`] descendant(s) for every child of a plan. The data attribute /// stores whether the plan is a `SortExec` or is connected to a `SortExec` /// via its children. pub type PlanWithCorrespondingSort = PlanContext; +/// For a given node, update the [`PlanContext.data`] attribute. +/// +/// If the node is a `SortExec`, or any of the node's children are a `SortExec`, +/// then set the attribute to true. +/// +/// This requires that a bottom-up traversal has already been performed, so that the +/// children were updated beforehand. fn update_sort_ctx_children( - mut node: PlanWithCorrespondingSort, + mut node_and_ctx: PlanWithCorrespondingSort, data: bool, ) -> Result { - for child_node in node.children.iter_mut() { - let plan = &child_node.plan; - child_node.data = if is_sort(plan) { - // Initiate connection: + // Update `child.data` for all children. 
+ for child_node in node_and_ctx.children.iter_mut() { + let child_plan = &child_node.plan; + child_node.data = if is_sort(child_plan) { + // child is sort true - } else if is_limit(plan) { + } else if is_limit(child_plan) { // There is no sort linkage for this path, it starts at a limit. false } else { - let is_spm = is_sort_preserving_merge(plan); - let required_orderings = plan.required_input_ordering(); - let flags = plan.maintains_input_order(); + // If a descendent is a sort, and the child maintains the sort. + let is_spm = is_sort_preserving_merge(child_plan); + let required_orderings = child_plan.required_input_ordering(); + let flags = child_plan.maintains_input_order(); // Add parent node to the tree if there is at least one child with // a sort connection: izip!(flags, required_orderings).any(|(maintains, required_ordering)| { let propagates_ordering = (maintains && required_ordering.is_none()) || is_spm; + // `connected_to_sort` only returns the correct answer with bottom-up traversal let connected_to_sort = child_node.children.iter().any(|child| child.data); propagates_ordering && connected_to_sort @@ -118,8 +128,10 @@ fn update_sort_ctx_children( } } - node.data = data; - node.update_plan_from_children() + // set data attribute on current node + node_and_ctx.data = data; + // update the [`PlanContext.plan.children()`] from the mutated [`PlanContext.children`]. + node_and_ctx.update_plan_from_children() } /// This object is used within the [`EnforceSorting`] rule to track the closest @@ -151,10 +163,51 @@ fn update_coalesce_ctx_children( }; } -/// The boolean flag `repartition_sorts` defined in the config indicates -/// whether we elect to transform [`CoalescePartitionsExec`] + [`SortExec`] cascades -/// into [`SortExec`] + [`SortPreservingMergeExec`] cascades, which enables us to -/// perform sorting in parallel. 
+/// If `repartition_sorts` is enabled, +/// then transform [`CoalescePartitionsExec`] + [`SortExec`] cascades +/// into [`SortExec`] + [`SortPreservingMergeExec`] cascades. +/// +/// The [`CoalescePartitionsExec`] + [`SortExec`] cascades +/// combine the partitions first, and then sort: +/// +/// ┌ ─ ─ ─ ─ ─ ┐ +/// ┌─┬─┬─┐ +/// ││B│A│D│... ├──┐ +/// └─┴─┴─┘ │ +/// └ ─ ─ ─ ─ ─ ┘ │ ┌────────────────────────┐ ┌ ─ ─ ─ ─ ─ ─ ┐ ┌────────┐ ┌ ─ ─ ─ ─ ─ ─ ─ ┐ +/// Partition 1 │ │ Coalesce │ ┌─┬─┬─┬─┬─┐ │ │ ┌─┬─┬─┬─┬─┐ +/// ├──▶(no ordering guarantees)│──▶││B│E│A│D│C│...───▶ Sort ├───▶││A│B│C│D│E│... │ +/// │ │ │ └─┴─┴─┴─┴─┘ │ │ └─┴─┴─┴─┴─┘ +/// ┌ ─ ─ ─ ─ ─ ┐ │ └────────────────────────┘ └ ─ ─ ─ ─ ─ ─ ┘ └────────┘ └ ─ ─ ─ ─ ─ ─ ─ ┘ +/// ┌─┬─┐ │ Partition Partition +/// ││E│C│ ... ├──┘ +/// └─┴─┘ +/// └ ─ ─ ─ ─ ─ ┘ +/// Partition 2 +/// +/// +/// +/// The [`SortExec`] + [`SortPreservingMergeExec`] cascades +/// sorts each partition first, then merge partitions while retaining the sort: +/// +/// ┌ ─ ─ ─ ─ ─ ┐ ┌────────┐ ┌ ─ ─ ─ ─ ─ ┐ +/// ┌─┬─┬─┐ │ │ ┌─┬─┬─┐ +/// ││B│A│D│... │──▶│ Sort │──▶││A│B│D│... │──┐ +/// └─┴─┴─┘ │ │ └─┴─┴─┘ │ +/// └ ─ ─ ─ ─ ─ ┘ └────────┘ └ ─ ─ ─ ─ ─ ┘ │ ┌─────────────────────┐ ┌ ─ ─ ─ ─ ─ ─ ─ ┐ +/// Partition 1 Partition 1 │ │ │ ┌─┬─┬─┬─┬─┐ +/// ├──▶ SortPreservingMerge ├───▶││A│B│C│D│E│... │ +/// │ │ │ └─┴─┴─┴─┴─┘ +/// ┌ ─ ─ ─ ─ ─ ┐ ┌────────┐ ┌ ─ ─ ─ ─ ─ ┐ │ └─────────────────────┘ └ ─ ─ ─ ─ ─ ─ ─ ┘ +/// ┌─┬─┐ │ │ ┌─┬─┐ │ Partition +/// ││E│C│ ... │──▶│ Sort ├──▶││C│E│ ... │──┘ +/// └─┴─┘ │ │ └─┴─┘ +/// └ ─ ─ ─ ─ ─ ┘ └────────┘ └ ─ ─ ─ ─ ─ ┘ +/// Partition 2 Partition 2 +/// +/// +/// The latter [`SortExec`] + [`SortPreservingMergeExec`] cascade performs the +/// sort first on a per-partition basis, thereby paralleling the sort. 
impl PhysicalOptimizerRule for EnforceSorting { fn optimize( &self, @@ -318,7 +371,7 @@ pub fn parallelize_sorts( } /// This function enforces sorting requirements and makes optimizations without -/// violating these requirements whenever possible. +/// violating these requirements whenever possible. Requires a bottom-up traversal. pub fn ensure_sorting( mut requirements: PlanWithCorrespondingSort, ) -> Result> { diff --git a/datafusion/physical-plan/src/tree_node.rs b/datafusion/physical-plan/src/tree_node.rs index 96bd0de3d37c..2e16af62dbf2 100644 --- a/datafusion/physical-plan/src/tree_node.rs +++ b/datafusion/physical-plan/src/tree_node.rs @@ -39,8 +39,14 @@ impl DynTreeNode for dyn ExecutionPlan { } } -/// A node object beneficial for writing optimizer rules, encapsulating an [`ExecutionPlan`] node with a payload. -/// Since there are two ways to access child plans—directly from the plan and through child nodes—it's recommended +/// A node context object beneficial for writing optimizer rules. +/// This context encapsulates an [`ExecutionPlan`] node with a payload. +/// +/// Since each wrapped node has its children within both the [`PlanContext.plan.children()`], +/// as well as separately within the [`PlanContext.children`] (which are child nodes wrapped in the context), +/// it's important to keep these child plans in sync when performing mutations. +/// +/// Since there are two ways to access child plans—directly -— it's recommended /// to perform mutable operations via [`Self::update_plan_from_children`]. #[derive(Debug)] pub struct PlanContext { @@ -61,6 +67,8 @@ impl PlanContext { } } + /// Update the [`PlanContext.plan.children()`] from the [`PlanContext.children`], + /// if the `PlanContext.children` have been changed. 
pub fn update_plan_from_children(mut self) -> Result { let children_plans = self.children.iter().map(|c| Arc::clone(&c.plan)).collect(); self.plan = with_new_children_if_necessary(self.plan, children_plans)?; From e2f5218937517dda726c8ce019461f2bf940155b Mon Sep 17 00:00:00 2001 From: wiedld Date: Fri, 14 Feb 2025 13:02:19 -0800 Subject: [PATCH 2/6] chore: doc temporary TODO for xudong963's PR --- datafusion/physical-optimizer/src/enforce_sorting/mod.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs index 80a630b0a424..268d20250043 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs @@ -130,7 +130,9 @@ fn update_sort_ctx_children( // set data attribute on current node node_and_ctx.data = data; - // update the [`PlanContext.plan.children()`] from the mutated [`PlanContext.children`]. + + // TODO(xudong963): the plans are not mutated, only the `data` attribute is set. + // Therefore this should be called before this function. node_and_ctx.update_plan_from_children() } From 6bf54ae0c6e2903c3b954236e7494540a3e9354b Mon Sep 17 00:00:00 2001 From: wiedld Date: Fri, 14 Feb 2025 21:02:06 -0800 Subject: [PATCH 3/6] chore: remove unneeded reference to other PR --- datafusion/physical-optimizer/src/enforce_sorting/mod.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs index 268d20250043..e718b1d5b56e 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs @@ -131,8 +131,6 @@ fn update_sort_ctx_children( // set data attribute on current node node_and_ctx.data = data; - // TODO(xudong963): the plans are not mutated, only the `data` attribute is set. 
- // Therefore this should be called before this function. node_and_ctx.update_plan_from_children() } From 2aac2bfc4106c83c9f91c160c130e4c5f15e3adc Mon Sep 17 00:00:00 2001 From: wiedld Date: Fri, 14 Feb 2025 21:26:54 -0800 Subject: [PATCH 4/6] chore: fix docs to appropriately refer to subrules --- .../src/enforce_sorting/mod.rs | 114 ++++++++++-------- 1 file changed, 62 insertions(+), 52 deletions(-) diff --git a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs index e718b1d5b56e..cd1b40283277 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs @@ -72,8 +72,8 @@ use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties, InputOrde use itertools::izip; -/// This rule inspects [`SortExec`]'s in the given physical plan and removes the -/// ones it can prove unnecessary. +/// This rule inspects [`SortExec`]'s in the given physical plan in order to +/// remove unnecessary sorts, and optimize sort performance across the plan. #[derive(Default, Debug)] pub struct EnforceSorting {} @@ -163,51 +163,15 @@ fn update_coalesce_ctx_children( }; } -/// If `repartition_sorts` is enabled, -/// then transform [`CoalescePartitionsExec`] + [`SortExec`] cascades -/// into [`SortExec`] + [`SortPreservingMergeExec`] cascades. +/// Performs optimizations based upon a series of subrules. /// -/// The [`CoalescePartitionsExec`] + [`SortExec`] cascades -/// combine the partitions first, and then sort: +/// Refer to each subrule for detailed descriptions of the optimizations performed: +/// [`ensure_sorting`], [`parallelize_sorts`], [`replace_with_order_preserving_variants()`], +/// and [`pushdown_sorts`]. /// -/// ┌ ─ ─ ─ ─ ─ ┐ -/// ┌─┬─┬─┐ -/// ││B│A│D│... 
├──┐ -/// └─┴─┴─┘ │ -/// └ ─ ─ ─ ─ ─ ┘ │ ┌────────────────────────┐ ┌ ─ ─ ─ ─ ─ ─ ┐ ┌────────┐ ┌ ─ ─ ─ ─ ─ ─ ─ ┐ -/// Partition 1 │ │ Coalesce │ ┌─┬─┬─┬─┬─┐ │ │ ┌─┬─┬─┬─┬─┐ -/// ├──▶(no ordering guarantees)│──▶││B│E│A│D│C│...───▶ Sort ├───▶││A│B│C│D│E│... │ -/// │ │ │ └─┴─┴─┴─┴─┘ │ │ └─┴─┴─┴─┴─┘ -/// ┌ ─ ─ ─ ─ ─ ┐ │ └────────────────────────┘ └ ─ ─ ─ ─ ─ ─ ┘ └────────┘ └ ─ ─ ─ ─ ─ ─ ─ ┘ -/// ┌─┬─┐ │ Partition Partition -/// ││E│C│ ... ├──┘ -/// └─┴─┘ -/// └ ─ ─ ─ ─ ─ ┘ -/// Partition 2 -/// +/// Subrule application is ordering dependent. /// -/// -/// The [`SortExec`] + [`SortPreservingMergeExec`] cascades -/// sorts each partition first, then merge partitions while retaining the sort: -/// -/// ┌ ─ ─ ─ ─ ─ ┐ ┌────────┐ ┌ ─ ─ ─ ─ ─ ┐ -/// ┌─┬─┬─┐ │ │ ┌─┬─┬─┐ -/// ││B│A│D│... │──▶│ Sort │──▶││A│B│D│... │──┐ -/// └─┴─┴─┘ │ │ └─┴─┴─┘ │ -/// └ ─ ─ ─ ─ ─ ┘ └────────┘ └ ─ ─ ─ ─ ─ ┘ │ ┌─────────────────────┐ ┌ ─ ─ ─ ─ ─ ─ ─ ┐ -/// Partition 1 Partition 1 │ │ │ ┌─┬─┬─┬─┬─┐ -/// ├──▶ SortPreservingMerge ├───▶││A│B│C│D│E│... │ -/// │ │ │ └─┴─┴─┴─┴─┘ -/// ┌ ─ ─ ─ ─ ─ ┐ ┌────────┐ ┌ ─ ─ ─ ─ ─ ┐ │ └─────────────────────┘ └ ─ ─ ─ ─ ─ ─ ─ ┘ -/// ┌─┬─┐ │ │ ┌─┬─┐ │ Partition -/// ││E│C│ ... │──▶│ Sort ├──▶││C│E│ ... │──┘ -/// └─┴─┘ │ │ └─┴─┘ -/// └ ─ ─ ─ ─ ─ ┘ └────────┘ └ ─ ─ ─ ─ ─ ┘ -/// Partition 2 Partition 2 -/// -/// -/// The latter [`SortExec`] + [`SortPreservingMergeExec`] cascade performs the -/// sort first on a per-partition basis, thereby paralleling the sort. +/// The subrule `parallelize_sorts` is only applied if `repartition_sorts` is enabled. 
impl PhysicalOptimizerRule for EnforceSorting { fn optimize( &self, @@ -296,20 +260,66 @@ fn replace_with_partial_sort( Ok(plan) } -/// This function turns plans of the form +/// Transform [`CoalescePartitionsExec`] + [`SortExec`] into +/// [`SortExec`] + [`SortPreservingMergeExec`] as illustrated below: +/// +/// The [`CoalescePartitionsExec`] + [`SortExec`] cascades +/// combine the partitions first, and then sort: +/// ```text +/// ┌ ─ ─ ─ ─ ─ ┐ +/// ┌─┬─┬─┐ +/// ││B│A│D│... ├──┐ +/// └─┴─┴─┘ │ +/// └ ─ ─ ─ ─ ─ ┘ │ ┌────────────────────────┐ ┌ ─ ─ ─ ─ ─ ─ ┐ ┌────────┐ ┌ ─ ─ ─ ─ ─ ─ ─ ┐ +/// Partition 1 │ │ Coalesce │ ┌─┬─┬─┬─┬─┐ │ │ ┌─┬─┬─┬─┬─┐ +/// ├──▶(no ordering guarantees)│──▶││B│E│A│D│C│...───▶ Sort ├───▶││A│B│C│D│E│... │ +/// │ │ │ └─┴─┴─┴─┴─┘ │ │ └─┴─┴─┴─┴─┘ +/// ┌ ─ ─ ─ ─ ─ ┐ │ └────────────────────────┘ └ ─ ─ ─ ─ ─ ─ ┘ └────────┘ └ ─ ─ ─ ─ ─ ─ ─ ┘ +/// ┌─┬─┐ │ Partition Partition +/// ││E│C│ ... ├──┘ +/// └─┴─┘ +/// └ ─ ─ ─ ─ ─ ┘ +/// Partition 2 +/// ``` +/// +/// +/// The [`SortExec`] + [`SortPreservingMergeExec`] cascades +/// sorts each partition first, then merge partitions while retaining the sort: +/// ```text +/// ┌ ─ ─ ─ ─ ─ ┐ ┌────────┐ ┌ ─ ─ ─ ─ ─ ┐ +/// ┌─┬─┬─┐ │ │ ┌─┬─┬─┐ +/// ││B│A│D│... │──▶│ Sort │──▶││A│B│D│... │──┐ +/// └─┴─┴─┘ │ │ └─┴─┴─┘ │ +/// └ ─ ─ ─ ─ ─ ┘ └────────┘ └ ─ ─ ─ ─ ─ ┘ │ ┌─────────────────────┐ ┌ ─ ─ ─ ─ ─ ─ ─ ┐ +/// Partition 1 Partition 1 │ │ │ ┌─┬─┬─┬─┬─┐ +/// ├──▶ SortPreservingMerge ├───▶││A│B│C│D│E│... │ +/// │ │ │ └─┴─┴─┴─┴─┘ +/// ┌ ─ ─ ─ ─ ─ ┐ ┌────────┐ ┌ ─ ─ ─ ─ ─ ┐ │ └─────────────────────┘ └ ─ ─ ─ ─ ─ ─ ─ ┘ +/// ┌─┬─┐ │ │ ┌─┬─┐ │ Partition +/// ││E│C│ ... │──▶│ Sort ├──▶││C│E│ ... │──┘ +/// └─┴─┘ │ │ └─┴─┘ +/// └ ─ ─ ─ ─ ─ ┘ └────────┘ └ ─ ─ ─ ─ ─ ┘ +/// Partition 2 Partition 2 +/// ``` +/// +/// The latter [`SortExec`] + [`SortPreservingMergeExec`] cascade performs the +/// sort first on a per-partition basis, thereby paralleling the sort. 
+/// +/// +/// The outcome is that plans of the form /// ```text /// "SortExec: expr=\[a@0 ASC\]", -/// " CoalescePartitionsExec", -/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", +/// " ...nodes..." +/// " CoalescePartitionsExec", +/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", /// ``` -/// to +/// are transformed into /// ```text /// "SortPreservingMergeExec: \[a@0 ASC\]", -/// " SortExec: expr=\[a@0 ASC\]", -/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", +/// " ...nodes..." +/// " SortExec: expr=\[a@0 ASC\]", +/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", /// ``` -/// by following connections from [`CoalescePartitionsExec`]s to [`SortExec`]s. -/// By performing sorting in parallel, we can increase performance in some scenarios. pub fn parallelize_sorts( mut requirements: PlanWithCorrespondingCoalescePartitions, ) -> Result> { From 27543005afc457ac1abfd10f45eb8dc68ab6cee7 Mon Sep 17 00:00:00 2001 From: wiedld Date: Fri, 14 Feb 2025 21:35:40 -0800 Subject: [PATCH 5/6] chore: fix typos --- datafusion/physical-optimizer/src/enforce_sorting/mod.rs | 2 +- datafusion/physical-plan/src/tree_node.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs index cd1b40283277..91431331dc67 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs @@ -303,7 +303,7 @@ fn replace_with_partial_sort( /// ``` /// /// The latter [`SortExec`] + [`SortPreservingMergeExec`] cascade performs the -/// sort first on a per-partition basis, thereby paralleling the sort. +/// sort first on a per-partition basis, thereby parallelizing the sort. 
/// /// The outcome is that plans of the form diff --git a/datafusion/physical-plan/src/tree_node.rs b/datafusion/physical-plan/src/tree_node.rs index 2e16af62dbf2..aa531ced9d05 100644 --- a/datafusion/physical-plan/src/tree_node.rs +++ b/datafusion/physical-plan/src/tree_node.rs @@ -46,7 +46,7 @@ impl DynTreeNode for dyn ExecutionPlan { /// as well as separately within the [`PlanContext.children`] (which are child nodes wrapped in the context), /// it's important to keep these child plans in sync when performing mutations. /// -/// Since there are two ways to access child plans—directly -— it's recommended +/// Since there are two ways to access child plans directly, it's recommended /// to perform mutable operations via [`Self::update_plan_from_children`]. #[derive(Debug)] pub struct PlanContext { From 2b24136fa29aebc0bc14b228ca79905d24d7e77f Mon Sep 17 00:00:00 2001 From: wiedld Date: Sat, 15 Feb 2025 22:35:11 -0800 Subject: [PATCH 6/6] chore: cleanup docs after merge from main --- datafusion/physical-plan/src/tree_node.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/tree_node.rs b/datafusion/physical-plan/src/tree_node.rs index 27f9a9e60948..69b0a165315e 100644 --- a/datafusion/physical-plan/src/tree_node.rs +++ b/datafusion/physical-plan/src/tree_node.rs @@ -48,8 +48,8 @@ impl DynTreeNode for dyn ExecutionPlan { /// /// Since there are two ways to access child plans directly, it's recommended /// to perform mutable operations via [`Self::update_plan_from_children`]. -/// After update `children`, please do the sync updating for `plan`'s children. -/// Or after creating the `PlanContext`, if you can't guarantee they are consistent, call `update_plan_from_children` to sync. +/// After mutating the `PlanContext.children`, or after creating the `PlanContext`, +/// call `update_plan_from_children` to sync. #[derive(Debug)] pub struct PlanContext { /// The execution plan associated with this context.