diff --git a/Cargo.lock b/Cargo.lock index a03b295dabfc..47d73b8a1fab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -514,6 +514,18 @@ dependencies = [ "wait-timeout", ] +[[package]] +name = "async-channel" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89b47800b0be77592da0afd425cc03468052844aff33b84e33cc696f64e77b6a" +dependencies = [ + "concurrent-queue", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + [[package]] name = "async-compression" version = "0.4.18" @@ -1417,6 +1429,15 @@ dependencies = [ "unicode-width 0.2.0", ] +[[package]] +name = "concurrent-queue" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "console" version = "0.15.10" @@ -2247,6 +2268,7 @@ dependencies = [ "arrow", "arrow-ord", "arrow-schema", + "async-channel", "async-trait", "chrono", "criterion", @@ -2597,6 +2619,27 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "event-listener" +version = "5.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3492acde4c3fc54c845eaab3eed8bd00c7a7d881f78bfc801e43a93dec1331ae" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c3e4e0dd3673c1139bf041f3008816d9cf2946bbfac2945c09e523b8d7b05b2" +dependencies = [ + "event-listener", + "pin-project-lite", +] + [[package]] name = "fallible-iterator" version = "0.2.0" @@ -4016,6 +4059,12 @@ version = "4.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fb37767f6569cd834a413442455e0f066d0d522de8630436e2a1761d9726ba56" +[[package]] +name = "parking" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" + [[package]] name = "parking_lot" version = "0.12.3" diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh index 96c90aa1f60d..a5574448f884 100755 --- a/benchmarks/bench.sh +++ b/benchmarks/bench.sh @@ -36,6 +36,7 @@ DATAFUSION_DIR=${DATAFUSION_DIR:-$SCRIPT_DIR/..} DATA_DIR=${DATA_DIR:-$SCRIPT_DIR/data} CARGO_COMMAND=${CARGO_COMMAND:-"cargo run --release"} PREFER_HASH_JOIN=${PREFER_HASH_JOIN:-true} +PREFER_ROUND_ROBIN=${PREFER_ROUND_ROBIN:-true} VIRTUAL_ENV=${VIRTUAL_ENV:-$SCRIPT_DIR/venv} usage() { @@ -93,6 +94,7 @@ CARGO_COMMAND command that runs the benchmark binary DATAFUSION_DIR directory to use (default $DATAFUSION_DIR) RESULTS_NAME folder where the benchmark files are stored PREFER_HASH_JOIN Prefer hash join algorithm (default true) +PREFER_ROUND_ROBIN Prefer round robin partitioning (default true) VENV_PATH Python venv to use for compare and venv commands (default ./venv, override by /bin/activate) " exit 1 @@ -163,6 +165,9 @@ main() { tpch10) data_tpch "10" ;; + tpch50) + data_tpch "50" + ;; tpch_mem10) # same data as for tpch10 data_tpch "10" @@ -220,6 +225,7 @@ main() { echo "RESULTS_DIR: ${RESULTS_DIR}" echo "CARGO_COMMAND: ${CARGO_COMMAND}" echo "PREFER_HASH_JOIN: ${PREFER_HASH_JOIN}" + echo "PREFER_ROUND_ROBIN: ${PREFER_ROUND_ROBIN}" echo "***************************" # navigate to the appropriate directory @@ -252,6 +258,9 @@ main() { tpch10) run_tpch "10" ;; + tpch50) + run_tpch "50" + ;; tpch_mem10) run_tpch_mem "10" ;; @@ -378,7 +387,7 @@ run_tpch() { RESULTS_FILE="${RESULTS_DIR}/tpch_sf${SCALE_FACTOR}.json" echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running tpch benchmark..." - $CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --format parquet -o "${RESULTS_FILE}" + $CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --prefer_round_robin "${PREFER_ROUND_ROBIN}" --format parquet -o "${RESULTS_FILE}" } # Runs the tpch in memory @@ -394,7 +403,7 @@ run_tpch_mem() { echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running tpch_mem benchmark..." # -m means in memory - $CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" -m --format parquet -o "${RESULTS_FILE}" + $CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" -m --prefer_round_robin "${PREFER_ROUND_ROBIN}" --format parquet -o "${RESULTS_FILE}" } # Runs the parquet filter benchmark @@ -472,7 +481,7 @@ run_clickbench_partitioned() { RESULTS_FILE="${RESULTS_DIR}/clickbench_partitioned.json" echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running clickbench (partitioned, 100 files) benchmark..." - $CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits_partitioned" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o "${RESULTS_FILE}" + $CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits_partitioned" --prefer_round_robin "${PREFER_ROUND_ROBIN}" --queries-path "${SCRIPT_DIR}/queries/clickbench/queries.sql" -o "${RESULTS_FILE}" } # Runs the clickbench "extended" benchmark with a single large parquet file diff --git a/benchmarks/src/clickbench.rs b/benchmarks/src/clickbench.rs index a9750d9b4b84..5ffda65fd792 100644 --- a/benchmarks/src/clickbench.rs +++ b/benchmarks/src/clickbench.rs @@ -27,6 +27,9 @@ use datafusion_common::exec_datafusion_err; use datafusion_common::instant::Instant; use structopt::StructOpt; +// hack to avoid `default_value is meaningless for bool` errors +type BoolDefaultTrue = bool; + /// Run the clickbench benchmark /// /// The ClickBench[1] benchmarks are widely cited in the industry and @@ -68,6 +71,11 @@ pub struct RunOpt { /// If present, write results json here #[structopt(parse(from_os_str), short = "o", long = "output")] output_path: Option, + + /// If true then round robin repartitioning is used, if false then on demand repartitioning + /// True by default. + #[structopt(short = "r", long = "prefer_round_robin", default_value = "true")] + prefer_round_robin: BoolDefaultTrue, } struct AllQueries { @@ -124,6 +132,10 @@ impl RunOpt { parquet_options.binary_as_string = true; } + config + .options_mut() + .optimizer + .prefer_round_robin_repartition = self.prefer_round_robin; let rt_builder = self.common.runtime_env_builder()?; let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?); self.register_hits(&ctx).await?; diff --git a/benchmarks/src/tpch/run.rs b/benchmarks/src/tpch/run.rs index eb9db821db02..40dfab8d0525 100644 --- a/benchmarks/src/tpch/run.rs +++ b/benchmarks/src/tpch/run.rs @@ -90,6 +90,11 @@ pub struct RunOpt { /// True by default. #[structopt(short = "j", long = "prefer_hash_join", default_value = "true")] prefer_hash_join: BoolDefaultTrue, + + /// If true then round robin repartitioning is used, if false then on demand repartitioning + /// True by default. + #[structopt(short = "r", long = "prefer_round_robin", default_value = "true")] + prefer_round_robin: BoolDefaultTrue, } const TPCH_QUERY_START_ID: usize = 1; @@ -121,6 +126,10 @@ impl RunOpt { .config() .with_collect_statistics(!self.disable_statistics); config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join; + config + .options_mut() + .optimizer + .prefer_round_robin_repartition = self.prefer_round_robin; let rt_builder = self.common.runtime_env_builder()?; let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?); @@ -357,6 +366,7 @@ mod tests { output_path: None, disable_statistics: false, prefer_hash_join: true, + prefer_round_robin: true, }; opt.register_tables(&ctx).await?; let queries = get_query_sql(query)?; @@ -393,6 +403,7 @@ mod tests { output_path: None, disable_statistics: false, prefer_hash_join: true, + prefer_round_robin: true, }; opt.register_tables(&ctx).await?; let queries = get_query_sql(query)?; diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index c9900204b97f..1ddbe6241b11 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -574,6 +574,10 @@ config_namespace! { /// repartitioning to increase parallelism to leverage more CPU cores pub enable_round_robin_repartition: bool, default = true + /// When set to false, the physical plan optimizer will replace the round robin + /// repartitioning with on demand repartitioning + pub prefer_round_robin_repartition: bool, default = true + /// When set to true, the optimizer will attempt to perform limit operations /// during aggregations, if possible pub enable_topk_aggregation: bool, default = true diff --git a/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs b/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs index d23408743f9f..531337500b0a 100644 --- a/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs @@ -45,8 +45,11 @@ mod sp_repartition_fuzz_tests { use test_utils::add_empty_batches; use datafusion_physical_expr_common::sort_expr::LexOrdering; - use datafusion_physical_plan::memory::MemorySourceConfig; use datafusion_physical_plan::source::DataSourceExec; + use datafusion_physical_plan::{ + memory::MemorySourceConfig, + repartition::on_demand_repartition::OnDemandRepartitionExec, + }; use itertools::izip; use rand::{rngs::StdRng, seq::SliceRandom, Rng, SeedableRng}; @@ -296,25 +299,40 @@ mod sp_repartition_fuzz_tests { // behaviour. We can choose, n_distinct as we like. However, // we chose it a large number to decrease probability of having same rows in the table. let n_distinct = 1_000_000; - for (is_first_roundrobin, is_first_sort_preserving) in - [(false, false), (false, true), (true, false), (true, true)] - { - for is_second_roundrobin in [false, true] { - let mut handles = Vec::new(); - - for seed in seed_start..seed_end { - #[allow(clippy::disallowed_methods)] // spawn allowed only in tests - let job = tokio::spawn(run_sort_preserving_repartition_test( - make_staggered_batches::(n_row, n_distinct, seed as u64), - is_first_roundrobin, - is_first_sort_preserving, - is_second_roundrobin, - )); - handles.push(job); - } - - for job in handles { - job.await.unwrap(); + for use_on_demand_repartition in [false, true] { + for (is_first_roundrobin, is_first_sort_preserving) in + [(false, false), (false, true), (true, false), (true, true)] + { + for is_second_roundrobin in [false, true] { + // On demand repartition only replaces the roundrobin repartition + if use_on_demand_repartition + && !is_first_roundrobin + && !is_second_roundrobin + { + continue; + } + let mut handles = Vec::new(); + + for seed in seed_start..seed_end { + #[allow(clippy::disallowed_methods)] + // spawn allowed only in tests + let job = tokio::spawn(run_sort_preserving_repartition_test( + make_staggered_batches::( + n_row, + n_distinct, + seed as u64, + ), + is_first_roundrobin, + is_first_sort_preserving, + is_second_roundrobin, + use_on_demand_repartition, + )); + handles.push(job); + } + + for job in handles { + job.await.unwrap(); + } } } } @@ -343,9 +361,17 @@ mod sp_repartition_fuzz_tests { // If `true`, second repartition executor after `DataSourceExec` will be in `RoundRobin` mode // else it will be in `Hash` mode is_second_roundrobin: bool, + // If `true`, `OnDemandRepartitionExec` will be used instead of `RepartitionExec` + use_on_demand_repartition: bool, ) { let schema = input1[0].schema(); - let session_config = SessionConfig::new().with_batch_size(50); + let mut session_config = SessionConfig::new().with_batch_size(50); + if use_on_demand_repartition { + session_config + .options_mut() + .optimizer + .prefer_round_robin_repartition = false; + } let ctx = SessionContext::new_with_config(session_config); let mut sort_keys = LexOrdering::default(); for ordering_col in ["a", "b", "c"] { @@ -367,8 +393,20 @@ mod sp_repartition_fuzz_tests { let hash_exprs = vec![col("c", &schema).unwrap()]; let intermediate = match (is_first_roundrobin, is_first_sort_preserving) { - (true, true) => sort_preserving_repartition_exec_round_robin(running_source), - (true, false) => repartition_exec_round_robin(running_source), + (true, true) => { + if use_on_demand_repartition { + sort_preserving_repartition_exec_on_demand(running_source) + } else { + sort_preserving_repartition_exec_round_robin(running_source) + } + } + (true, false) => { + if use_on_demand_repartition { + repartition_exec_on_demand(running_source) + } else { + repartition_exec_round_robin(running_source) + } + } (false, true) => { sort_preserving_repartition_exec_hash(running_source, hash_exprs.clone()) } @@ -376,7 +414,11 @@ mod sp_repartition_fuzz_tests { }; let intermediate = if is_second_roundrobin { - sort_preserving_repartition_exec_round_robin(intermediate) + if use_on_demand_repartition { + sort_preserving_repartition_exec_on_demand(intermediate) + } else { + sort_preserving_repartition_exec_round_robin(intermediate) + } } else { sort_preserving_repartition_exec_hash(intermediate, hash_exprs.clone()) }; @@ -399,6 +441,16 @@ mod sp_repartition_fuzz_tests { ) } + fn sort_preserving_repartition_exec_on_demand( + input: Arc, + ) -> Arc { + Arc::new( + OnDemandRepartitionExec::try_new(input, Partitioning::OnDemand(2)) + .unwrap() + .with_preserve_order(), + ) + } + fn repartition_exec_round_robin( input: Arc, ) -> Arc { @@ -407,6 +459,14 @@ mod sp_repartition_fuzz_tests { ) } + fn repartition_exec_on_demand( + input: Arc, + ) -> Arc { + Arc::new( + OnDemandRepartitionExec::try_new(input, Partitioning::OnDemand(2)).unwrap(), + ) + } + fn sort_preserving_repartition_exec_hash( input: Arc, hash_expr: Vec>, diff --git a/datafusion/physical-expr/src/partitioning.rs b/datafusion/physical-expr/src/partitioning.rs index eb7e1ea6282b..d7d2e74d521f 100644 --- a/datafusion/physical-expr/src/partitioning.rs +++ b/datafusion/physical-expr/src/partitioning.rs @@ -119,6 +119,8 @@ pub enum Partitioning { Hash(Vec>, usize), /// Unknown partitioning scheme with a known number of partitions UnknownPartitioning(usize), + /// On Demand partitioning, where the partitioning is determined at runtime + OnDemand(usize), } impl Display for Partitioning { @@ -136,6 +138,7 @@ impl Display for Partitioning { Partitioning::UnknownPartitioning(size) => { write!(f, "UnknownPartitioning({size})") } + Partitioning::OnDemand(size) => write!(f, "OnDemand({size})"), } } } @@ -144,7 +147,7 @@ impl Partitioning { pub fn partition_count(&self) -> usize { use Partitioning::*; match self { - RoundRobinBatch(n) | Hash(_, n) | UnknownPartitioning(n) => *n, + RoundRobinBatch(n) | Hash(_, n) | UnknownPartitioning(n) | OnDemand(n) => *n, } } @@ -232,6 +235,11 @@ impl PartialEq for Partitioning { { true } + (Partitioning::OnDemand(count1), Partitioning::OnDemand(count2)) + if count1 == count2 => + { + true + } _ => false, } } diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs index 5e76edad1f56..8d4ed73081b5 100644 --- a/datafusion/physical-optimizer/src/enforce_distribution.rs +++ b/datafusion/physical-optimizer/src/enforce_distribution.rs @@ -27,8 +27,8 @@ use std::sync::Arc; use crate::optimizer::PhysicalOptimizerRule; use crate::output_requirements::OutputRequirementExec; use crate::utils::{ - add_sort_above_with_check, is_coalesce_partitions, is_repartition, - is_sort_preserving_merge, + add_sort_above_with_check, is_coalesce_partitions, is_on_demand_repartition, + is_repartition, is_sort_preserving_merge, }; use arrow::compute::SortOptions; @@ -52,6 +52,7 @@ use datafusion_physical_plan::joins::{ CrossJoinExec, HashJoinExec, PartitionMode, SortMergeJoinExec, }; use datafusion_physical_plan::projection::ProjectionExec; +use datafusion_physical_plan::repartition::on_demand_repartition::OnDemandRepartitionExec; use datafusion_physical_plan::repartition::RepartitionExec; use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use datafusion_physical_plan::tree_node::PlanContext; @@ -416,6 +417,10 @@ pub fn adjust_input_keys_ordering( requirements.data.clear(); } } else if plan.as_any().downcast_ref::().is_some() + || plan + .as_any() + .downcast_ref::() + .is_some() || plan .as_any() .downcast_ref::() @@ -869,6 +874,32 @@ fn add_roundrobin_on_top( } } +fn add_on_demand_on_top( + input: DistributionContext, + n_target: usize, +) -> Result { + // Adding repartition is helpful: + if input.plan.output_partitioning().partition_count() < n_target { + // When there is an existing ordering, we preserve ordering + // during repartition. This will be un-done in the future + // If any of the following conditions is true + // - Preserving ordering is not helpful in terms of satisfying ordering requirements + // - Usage of order preserving variants is not desirable + // (determined by flag `config.optimizer.prefer_existing_sort`) + let partitioning = Partitioning::OnDemand(n_target); + let repartition = + OnDemandRepartitionExec::try_new(Arc::clone(&input.plan), partitioning)? + .with_preserve_order(); + + let new_plan = Arc::new(repartition) as _; + + Ok(DistributionContext::new(new_plan, true, vec![input])) + } else { + // Partition is not helpful, we already have desired number of partitions. + Ok(input) + } +} + /// Adds a hash repartition operator: /// - to increase parallelism, and/or /// - to satisfy requirements of the subsequent operators. @@ -988,6 +1019,7 @@ fn remove_dist_changing_operators( mut distribution_context: DistributionContext, ) -> Result { while is_repartition(&distribution_context.plan) + || is_on_demand_repartition(&distribution_context.plan) || is_coalesce_partitions(&distribution_context.plan) || is_sort_preserving_merge(&distribution_context.plan) { @@ -1047,6 +1079,57 @@ fn replace_order_preserving_variants( )?); return Ok(context); } + } else if let Some(repartition) = context + .plan + .as_any() + .downcast_ref::() + { + if repartition.preserve_order() { + context.plan = Arc::new(OnDemandRepartitionExec::try_new( + Arc::clone(&context.children[0].plan), + repartition.partitioning().clone(), + )?); + return Ok(context); + } + } + + context.update_plan_from_children() +} + +fn replace_round_robin_repartition_with_on_demand( + mut context: DistributionContext, +) -> Result { + context.children = context + .children + .into_iter() + .map(|child| { + if child.data { + replace_round_robin_repartition_with_on_demand(child) + } else { + Ok(child) + } + }) + .collect::>>()?; + + if let Some(repartition) = context.plan.as_any().downcast_ref::() { + if let Partitioning::RoundRobinBatch(n) = repartition.partitioning() { + let child_plan = Arc::clone(&context.children[0].plan); + context.plan = if repartition.preserve_order() { + Arc::new( + OnDemandRepartitionExec::try_new( + child_plan, + Partitioning::OnDemand(*n), + )? + .with_preserve_order(), + ) + } else { + Arc::new(OnDemandRepartitionExec::try_new( + child_plan, + Partitioning::OnDemand(*n), + )?) + }; + return Ok(context); + } } context.update_plan_from_children() @@ -1168,6 +1251,8 @@ pub fn ensure_distribution( let target_partitions = config.execution.target_partitions; // When `false`, round robin repartition will not be added to increase parallelism let enable_round_robin = config.optimizer.enable_round_robin_repartition; + // When `false`, replace round robin repartition with on-demand repartition + let prefer_round_robin_repartition = config.optimizer.prefer_round_robin_repartition; let repartition_file_scans = config.optimizer.repartition_file_scans; let batch_size = config.execution.batch_size; let should_use_estimates = config @@ -1261,7 +1346,11 @@ pub fn ensure_distribution( if add_roundrobin { // Add round-robin repartitioning on top of the operator // to increase parallelism. - child = add_roundrobin_on_top(child, target_partitions)?; + child = if prefer_round_robin_repartition { + add_roundrobin_on_top(child, target_partitions)? + } else { + add_on_demand_on_top(child, target_partitions)? + }; } // When inserting hash is necessary to satisfy hash requirement, insert hash repartition. if hash_necessary { @@ -1273,7 +1362,11 @@ pub fn ensure_distribution( if add_roundrobin { // Add round-robin repartitioning on top of the operator // to increase parallelism. - child = add_roundrobin_on_top(child, target_partitions)?; + child = if prefer_round_robin_repartition { + add_roundrobin_on_top(child, target_partitions)? + } else { + add_on_demand_on_top(child, target_partitions)? + } } } }; @@ -1320,6 +1413,9 @@ pub fn ensure_distribution( } } } + if !prefer_round_robin_repartition { + child = replace_round_robin_repartition_with_on_demand(child)?; + } Ok(child) }, ) @@ -1405,5 +1501,6 @@ fn update_children(mut dist_context: DistributionContext) -> Result() + { + let input_partitioning = repartition.input().output_partitioning(); + // We can remove this repartitioning operator if it is now a no-op: + let mut can_remove = input_partitioning.eq(repartition.partitioning()); + // We can also remove it if we ended up with an ineffective RR: + if let Partitioning::OnDemand(n_out) = repartition.partitioning() { + can_remove |= *n_out == input_partitioning.partition_count(); + } + if can_remove { + new_reqs = new_reqs.children.swap_remove(0) + } } + Ok(new_reqs) } @@ -624,6 +643,13 @@ fn remove_corresponding_sort_from_sub_plan( Arc::clone(&node.children[0].plan), repartition.properties().output_partitioning().clone(), )?) as _; + } else if let Some(repartition) = + node.plan.as_any().downcast_ref::() + { + node.plan = Arc::new(OnDemandRepartitionExec::try_new( + Arc::clone(&node.children[0].plan), + repartition.properties().output_partitioning().clone(), + )?) as _; } }; // Deleting a merging sort may invalidate distribution requirements. diff --git a/datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs b/datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs index c542f9261a24..4bfb01336cac 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs @@ -22,7 +22,8 @@ use std::sync::Arc; use crate::utils::{ - is_coalesce_partitions, is_repartition, is_sort, is_sort_preserving_merge, + is_coalesce_partitions, is_on_demand_repartition, is_repartition, is_sort, + is_sort_preserving_merge, }; use datafusion_common::config::ConfigOptions; @@ -31,6 +32,7 @@ use datafusion_common::Result; use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion_physical_plan::execution_plan::EmissionType; +use datafusion_physical_plan::repartition::on_demand_repartition::OnDemandRepartitionExec; use datafusion_physical_plan::repartition::RepartitionExec; use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use datafusion_physical_plan::tree_node::PlanContext; @@ -57,6 +59,7 @@ pub fn update_children(opc: &mut OrderPreservationContext) { maintains_input_order[idx] || is_coalesce_partitions(plan) || is_repartition(plan) + || is_on_demand_repartition(plan) }; // We cut the path towards nodes that do not maintain ordering. @@ -68,7 +71,8 @@ pub fn update_children(opc: &mut OrderPreservationContext) { *data = if plan_children.is_empty() { false } else if !children[0].data - && ((is_repartition(plan) && !maintains_input_order[0]) + && (((is_repartition(plan) || is_on_demand_repartition(plan)) + && !maintains_input_order[0]) || (is_coalesce_partitions(plan) && plan_children[0].output_ordering().is_some())) { @@ -135,6 +139,20 @@ fn plan_with_order_preserving_variants( ) as _; sort_input.children[0].data = true; return Ok(sort_input); + } + if is_on_demand_repartition(&sort_input.plan) + && !sort_input.plan.maintains_input_order()[0] + && is_spr_better + { + // When a `OnDemandRepartitionExec` doesn't preserve ordering, replace it with + // a sort-preserving variant if appropriate: + let child = Arc::clone(&sort_input.children[0].plan); + let partitioning = sort_input.plan.output_partitioning().clone(); + sort_input.plan = Arc::new( + OnDemandRepartitionExec::try_new(child, partitioning)?.with_preserve_order(), + ) as _; + sort_input.children[0].data = true; + return Ok(sort_input); } else if is_coalesce_partitions(&sort_input.plan) && is_spm_better { let child = &sort_input.children[0].plan; if let Some(ordering) = child.output_ordering() { @@ -188,6 +206,13 @@ fn plan_with_order_breaking_variants( let child = Arc::clone(&sort_input.children[0].plan); let partitioning = plan.output_partitioning().clone(); sort_input.plan = Arc::new(RepartitionExec::try_new(child, partitioning)?) as _; + } else if is_on_demand_repartition(plan) && plan.maintains_input_order()[0] { + // When a `OnDemandRepartitionExec` preserves ordering, replace it with a + // non-sort-preserving variant: + let child = Arc::clone(&sort_input.children[0].plan); + let partitioning = plan.output_partitioning().clone(); + sort_input.plan = + Arc::new(OnDemandRepartitionExec::try_new(child, partitioning)?) as _; } else if is_sort_preserving_merge(plan) { // Replace `SortPreservingMergeExec` with a `CoalescePartitionsExec`: let child = Arc::clone(&sort_input.children[0].plan); diff --git a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs index 13d46940c87c..9f9a4f56d7c2 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs @@ -38,6 +38,7 @@ use datafusion_physical_plan::joins::utils::{ }; use datafusion_physical_plan::joins::{HashJoinExec, SortMergeJoinExec}; use datafusion_physical_plan::projection::ProjectionExec; +use datafusion_physical_plan::repartition::on_demand_repartition::OnDemandRepartitionExec; use datafusion_physical_plan::repartition::RepartitionExec; use datafusion_physical_plan::sorts::sort::SortExec; use datafusion_physical_plan::tree_node::PlanContext; @@ -282,6 +283,7 @@ fn pushdown_requirement_to_children( } else if maintains_input_order.is_empty() || !maintains_input_order.iter().any(|o| *o) || plan.as_any().is::() + || plan.as_any().is::() || plan.as_any().is::() // TODO: Add support for Projection push down || plan.as_any().is::() diff --git a/datafusion/physical-optimizer/src/utils.rs b/datafusion/physical-optimizer/src/utils.rs index 636e78a06ce7..973487af0021 100644 --- a/datafusion/physical-optimizer/src/utils.rs +++ b/datafusion/physical-optimizer/src/utils.rs @@ -21,6 +21,7 @@ use datafusion_physical_expr::LexRequirement; use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; +use datafusion_physical_plan::repartition::on_demand_repartition::OnDemandRepartitionExec; use datafusion_physical_plan::repartition::RepartitionExec; use datafusion_physical_plan::sorts::sort::SortExec; use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; @@ -100,6 +101,11 @@ pub fn is_repartition(plan: &Arc) -> bool { plan.as_any().is::() } +/// Checks whether the given operator is a [`OnDemandRepartitionExec`]. +pub fn is_on_demand_repartition(plan: &Arc) -> bool { + plan.as_any().is::() +} + /// Checks whether the given operator is a limit; /// i.e. either a [`LocalLimitExec`] or a [`GlobalLimitExec`]. pub fn is_limit(plan: &Arc) -> bool { diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index b84243b1b56b..aac0f35ebd74 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -43,6 +43,7 @@ ahash = { workspace = true } arrow = { workspace = true } arrow-ord = { workspace = true } arrow-schema = { workspace = true } +async-channel = "2.3.1" async-trait = { workspace = true } chrono = { workspace = true } datafusion-common = { workspace = true, default-features = true } diff --git a/datafusion/physical-plan/src/repartition/distributor_channels.rs b/datafusion/physical-plan/src/repartition/distributor_channels.rs index 6e06c87a4821..b6dae8cd7415 100644 --- a/datafusion/physical-plan/src/repartition/distributor_channels.rs +++ b/datafusion/physical-plan/src/repartition/distributor_channels.rs @@ -79,6 +79,18 @@ pub fn channels( (senders, receivers) } +pub fn tokio_channels( + n: usize, +) -> ( + Vec>, + Vec>, +) { + let (senders, receivers) = (0..n) + .map(|_| tokio::sync::mpsc::unbounded_channel()) + .unzip(); + (senders, receivers) +} + type PartitionAwareSenders = Vec>>; type PartitionAwareReceivers = Vec>>; @@ -92,6 +104,20 @@ pub fn partition_aware_channels( (0..n_in).map(|_| channels(n_out)).unzip() } +type OnDemandPartitionAwareSenders = Vec>>; +type OnDemandPartitionAwareReceivers = + Vec>>; + +pub fn on_demand_partition_aware_channels( + n_in: usize, + n_out: usize, +) -> ( + OnDemandPartitionAwareSenders, + OnDemandPartitionAwareReceivers, +) { + (0..n_in).map(|_| tokio_channels(n_out)).unzip() +} + /// Erroring during [send](DistributionSender::send). /// /// This occurs when the [receiver](DistributionReceiver) is gone. diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 25668fa67d5b..3142eaeab72e 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -53,16 +53,75 @@ use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr}; use datafusion_physical_expr_common::sort_expr::LexOrdering; use futures::stream::Stream; -use futures::{FutureExt, StreamExt, TryStreamExt}; +use futures::{ready, FutureExt, StreamExt, TryStreamExt}; use log::trace; use parking_lot::Mutex; mod distributor_channels; +pub mod on_demand_repartition; type MaybeBatch = Option>; type InputPartitionsToCurrentPartitionSender = Vec>; type InputPartitionsToCurrentPartitionReceiver = Vec>; +/// create channels for sending batches from input partitions to output partitions. +fn create_repartition_channels( + preserve_order: bool, + num_input_partitions: usize, + num_output_partitions: usize, +) -> ( + Vec, + Vec, +) { + if preserve_order { + let (txs, rxs) = + partition_aware_channels(num_input_partitions, num_output_partitions); + // Take transpose of senders and receivers. `state.channels` keeps track of entries per output partition + let txs = transpose(txs); + let rxs = transpose(rxs); + (txs, rxs) + } else { + // create one channel per *output* partition + // note we use a custom channel that ensures there is always data for each receiver + // but limits the amount of buffering if required. + let (txs, rxs) = channels(num_output_partitions); + // Clone sender for each input partitions + let txs = txs + .into_iter() + .map(|item| vec![item; num_input_partitions]) + .collect::>(); + let rxs = rxs.into_iter().map(|item| vec![item]).collect::>(); + (txs, rxs) + } +} + +/// Create a hashmap of channels for sending batches from input partitions to output partitions. +fn create_partition_channels_hashmap( + txs: Vec, + rxs: Vec, + name: String, + context: Arc, +) -> HashMap< + usize, + ( + InputPartitionsToCurrentPartitionSender, + InputPartitionsToCurrentPartitionReceiver, + SharedMemoryReservation, + ), +> { + let mut channels = HashMap::with_capacity(txs.len()); + + for (partition, (tx, rx)) in txs.into_iter().zip(rxs).enumerate() { + let reservation = Arc::new(Mutex::new( + MemoryConsumer::new(format!("{}[{partition}]", name)) + .register(context.memory_pool()), + )); + channels.insert(partition, (tx, rx, reservation)); + } + + channels +} + /// Inner state of [`RepartitionExec`]. #[derive(Debug)] struct RepartitionExecState { @@ -93,35 +152,14 @@ impl RepartitionExecState { let num_input_partitions = input.output_partitioning().partition_count(); let num_output_partitions = partitioning.partition_count(); - let (txs, rxs) = if preserve_order { - let (txs, rxs) = - partition_aware_channels(num_input_partitions, num_output_partitions); - // Take transpose of senders and receivers. `state.channels` keeps track of entries per output partition - let txs = transpose(txs); - let rxs = transpose(rxs); - (txs, rxs) - } else { - // create one channel per *output* partition - // note we use a custom channel that ensures there is always data for each receiver - // but limits the amount of buffering if required. - let (txs, rxs) = channels(num_output_partitions); - // Clone sender for each input partitions - let txs = txs - .into_iter() - .map(|item| vec![item; num_input_partitions]) - .collect::>(); - let rxs = rxs.into_iter().map(|item| vec![item]).collect::>(); - (txs, rxs) - }; + let (txs, rxs) = create_repartition_channels( + preserve_order, + num_input_partitions, + num_output_partitions, + ); - let mut channels = HashMap::with_capacity(txs.len()); - for (partition, (tx, rx)) in txs.into_iter().zip(rxs).enumerate() { - let reservation = Arc::new(Mutex::new( - MemoryConsumer::new(format!("{}[{partition}]", name)) - .register(context.memory_pool()), - )); - channels.insert(partition, (tx, rx, reservation)); - } + let channels = + create_partition_channels_hashmap(txs, rxs, name, Arc::clone(&context)); // launch one async task per *input* partition let mut spawned_tasks = Vec::with_capacity(num_input_partitions); @@ -146,15 +184,16 @@ impl RepartitionExecState { // In a separate task, wait for each input to be done // (and pass along any errors, including panic!s) + let wait_for_task = SpawnedTask::spawn(RepartitionExec::wait_for_task( input_task, txs.into_iter() .map(|(partition, (tx, _reservation))| (partition, tx)) .collect(), )); + spawned_tasks.push(wait_for_task); } - Self { channels, abort_helper: Arc::new(spawned_tasks), @@ -193,6 +232,9 @@ enum BatchPartitionerState { num_partitions: usize, next_idx: usize, }, + OnDemand { + num_partitions: usize, + }, } impl BatchPartitioner { @@ -214,6 +256,9 @@ impl BatchPartitioner { random_state: ahash::RandomState::with_seeds(0, 0, 0, 0), hash_buffer: vec![], }, + Partitioning::OnDemand(num_partitions) => { + BatchPartitionerState::OnDemand { num_partitions } + } other => return not_impl_err!("Unsupported repartitioning scheme {other:?}"), }; @@ -258,6 +303,9 @@ impl BatchPartitioner { *next_idx = (*next_idx + 1) % *num_partitions; Box::new(std::iter::once(Ok((idx, batch)))) } + BatchPartitionerState::OnDemand { .. } => { + Box::new(std::iter::once(Ok((0, batch)))) + } BatchPartitionerState::Hash { random_state, exprs, @@ -328,6 +376,90 @@ impl BatchPartitioner { match self.state { BatchPartitionerState::RoundRobin { num_partitions, .. } => num_partitions, BatchPartitionerState::Hash { num_partitions, .. } => num_partitions, + BatchPartitionerState::OnDemand { num_partitions } => num_partitions, + } + } +} + +#[derive(Debug, Clone)] +pub struct RepartitionExecBase { + /// Input execution plan + input: Arc, + /// Execution metrics + metrics: ExecutionPlanMetricsSet, + /// Boolean flag to decide whether to preserve ordering. If true means + /// `SortPreservingRepartitionExec`, false means `RepartitionExec`. + preserve_order: bool, + /// Cache holding plan properties like equivalences, output partitioning etc. + cache: PlanProperties, +} + +impl RepartitionExecBase { + fn maintains_input_order_helper( + input: &Arc, + preserve_order: bool, + ) -> Vec { + // We preserve ordering when repartition is order preserving variant or input partitioning is 1 + vec![preserve_order || input.output_partitioning().partition_count() <= 1] + } + + fn eq_properties_helper( + input: &Arc, + preserve_order: bool, + ) -> EquivalenceProperties { + // Equivalence Properties + let mut eq_properties = input.equivalence_properties().clone(); + // If the ordering is lost, reset the ordering equivalence class: + if !Self::maintains_input_order_helper(input, preserve_order)[0] { + eq_properties.clear_orderings(); + } + // When there are more than one input partitions, they will be fused at the output. + // Therefore, remove per partition constants. + if input.output_partitioning().partition_count() > 1 { + eq_properties.clear_per_partition_constants(); + } + eq_properties + } + + /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. + fn compute_properties( + input: &Arc, + partitioning: Partitioning, + preserve_order: bool, + ) -> PlanProperties { + PlanProperties::new( + Self::eq_properties_helper(input, preserve_order), + partitioning, + input.pipeline_behavior(), + input.boundedness(), + ) + } + + /// Specify if this repartitioning operation should preserve the order of + /// rows from its input when producing output. Preserving order is more + /// expensive at runtime, so should only be set if the output of this + /// operator can take advantage of it. + /// + /// If the input is not ordered, or has only one partition, this is a no op, + /// and the node remains a `RepartitionExec`. + pub fn with_preserve_order(mut self) -> Self { + self.preserve_order = + // If the input isn't ordered, there is no ordering to preserve + self.input.output_ordering().is_some() && + // if there is only one input partition, merging is not required + // to maintain order + self.input.output_partitioning().partition_count() > 1; + let eq_properties = Self::eq_properties_helper(&self.input, self.preserve_order); + self.cache = self.cache.with_eq_properties(eq_properties); + self + } + + /// Return the sort expressions that are used to merge + fn sort_exprs(&self) -> Option<&LexOrdering> { + if self.preserve_order { + self.input.output_ordering() + } else { + None } } } @@ -400,17 +532,10 @@ impl BatchPartitioner { /// data across threads. #[derive(Debug, Clone)] pub struct RepartitionExec { - /// Input execution plan - input: Arc, + /// Common fields for all repartitioning executors + base: RepartitionExecBase, /// Inner state that is initialized when the first output stream is created. state: LazyState, - /// Execution metrics - metrics: ExecutionPlanMetricsSet, - /// Boolean flag to decide whether to preserve ordering. If true means - /// `SortPreservingRepartitionExec`, false means `RepartitionExec`. - preserve_order: bool, - /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, } #[derive(Debug, Clone)] @@ -461,18 +586,30 @@ impl RepartitionMetrics { impl RepartitionExec { /// Input execution plan pub fn input(&self) -> &Arc { - &self.input + &self.base.input } /// Partitioning scheme to use pub fn partitioning(&self) -> &Partitioning { - &self.cache.partitioning + &self.base.cache.partitioning } /// Get preserve_order flag of the RepartitionExecutor /// `true` means `SortPreservingRepartitionExec`, `false` means `RepartitionExec` pub fn preserve_order(&self) -> bool { - self.preserve_order + self.base.preserve_order + } + + /// Specify if this reparititoning operation should preserve the order of + /// rows from its input when producing output. Preserving order is more + /// expensive at runtime, so should only be set if the output of this + /// operator can take advantage of it. + /// + /// If the input is not ordered, or has only one partition, this is a no op, + /// and the node remains a `RepartitionExec`. + pub fn with_preserve_order(mut self) -> Self { + self.base = self.base.with_preserve_order(); + self } /// Get name used to display this Exec @@ -494,14 +631,14 @@ impl DisplayAs for RepartitionExec { "{}: partitioning={}, input_partitions={}", self.name(), self.partitioning(), - self.input.output_partitioning().partition_count() + self.base.input.output_partitioning().partition_count() )?; - if self.preserve_order { + if self.base.preserve_order { write!(f, ", preserve_order=true")?; } - if let Some(sort_exprs) = self.sort_exprs() { + if let Some(sort_exprs) = self.base.sort_exprs() { write!(f, ", sort_exprs={}", sort_exprs.clone())?; } Ok(()) @@ -521,11 +658,11 @@ impl ExecutionPlan for RepartitionExec { } fn properties(&self) -> &PlanProperties { - &self.cache + &self.base.cache } fn children(&self) -> Vec<&Arc> { - vec![&self.input] + vec![&self.base.input] } fn with_new_children( @@ -536,7 +673,7 @@ impl ExecutionPlan for RepartitionExec { children.swap_remove(0), self.partitioning().clone(), )?; - if self.preserve_order { + if self.base.preserve_order { repartition = repartition.with_preserve_order(); } Ok(Arc::new(repartition)) @@ -547,7 +684,10 @@ impl ExecutionPlan for RepartitionExec { } fn maintains_input_order(&self) -> Vec { - Self::maintains_input_order_helper(self.input(), self.preserve_order) + RepartitionExecBase::maintains_input_order_helper( + self.input(), + self.base.preserve_order, + ) } fn execute( @@ -562,16 +702,16 @@ impl ExecutionPlan for RepartitionExec { ); let lazy_state = Arc::clone(&self.state); - let input = Arc::clone(&self.input); + let input = Arc::clone(&self.base.input); let partitioning = self.partitioning().clone(); - let metrics = self.metrics.clone(); - let preserve_order = self.preserve_order; + let metrics = self.base.metrics.clone(); + let preserve_order = self.base.preserve_order; let name = self.name().to_owned(); let schema = self.schema(); let schema_captured = Arc::clone(&schema); // Get existing ordering to use for merging - let sort_exprs = self.sort_exprs().cloned().unwrap_or_default(); + let sort_exprs = self.base.sort_exprs().cloned().unwrap_or_default(); let stream = futures::stream::once(async move { let num_input_partitions = input.output_partitioning().partition_count(); @@ -584,7 +724,7 @@ impl ExecutionPlan for RepartitionExec { .get_or_init(|| async move { Mutex::new(RepartitionExecState::new( input_captured, - partitioning, + partitioning.clone(), metrics_captured, preserve_order, name_captured, @@ -661,11 +801,11 @@ impl ExecutionPlan for RepartitionExec { } fn metrics(&self) -> Option { - Some(self.metrics.clone_inner()) + Some(self.base.metrics.clone_inner()) } fn statistics(&self) -> Result { - self.input.statistics() + self.base.input.statistics() } fn cardinality_effect(&self) -> CardinalityEffect { @@ -722,85 +862,22 @@ impl RepartitionExec { partitioning: Partitioning, ) -> Result { let preserve_order = false; - let cache = - Self::compute_properties(&input, partitioning.clone(), preserve_order); + let cache = RepartitionExecBase::compute_properties( + &input, + partitioning.clone(), + preserve_order, + ); Ok(RepartitionExec { - input, + base: RepartitionExecBase { + input, + metrics: ExecutionPlanMetricsSet::new(), + preserve_order, + cache, + }, state: Default::default(), - metrics: ExecutionPlanMetricsSet::new(), - preserve_order, - cache, }) } - fn maintains_input_order_helper( - input: &Arc, - preserve_order: bool, - ) -> Vec { - // We preserve ordering when repartition is order preserving variant or input partitioning is 1 - vec![preserve_order || input.output_partitioning().partition_count() <= 1] - } - - fn eq_properties_helper( - input: &Arc, - preserve_order: bool, - ) -> EquivalenceProperties { - // Equivalence Properties - let mut eq_properties = input.equivalence_properties().clone(); - // If the ordering is lost, reset the ordering equivalence class: - if !Self::maintains_input_order_helper(input, preserve_order)[0] { - eq_properties.clear_orderings(); - } - // When there are more than one input partitions, they will be fused at the output. - // Therefore, remove per partition constants. - if input.output_partitioning().partition_count() > 1 { - eq_properties.clear_per_partition_constants(); - } - eq_properties - } - - /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. - fn compute_properties( - input: &Arc, - partitioning: Partitioning, - preserve_order: bool, - ) -> PlanProperties { - PlanProperties::new( - Self::eq_properties_helper(input, preserve_order), - partitioning, - input.pipeline_behavior(), - input.boundedness(), - ) - } - - /// Specify if this repartitioning operation should preserve the order of - /// rows from its input when producing output. Preserving order is more - /// expensive at runtime, so should only be set if the output of this - /// operator can take advantage of it. - /// - /// If the input is not ordered, or has only one partition, this is a no op, - /// and the node remains a `RepartitionExec`. - pub fn with_preserve_order(mut self) -> Self { - self.preserve_order = - // If the input isn't ordered, there is no ordering to preserve - self.input.output_ordering().is_some() && - // if there is only one input partition, merging is not required - // to maintain order - self.input.output_partitioning().partition_count() > 1; - let eq_properties = Self::eq_properties_helper(&self.input, self.preserve_order); - self.cache = self.cache.with_eq_properties(eq_properties); - self - } - - /// Return the sort expressions that are used to merge - fn sort_exprs(&self) -> Option<&LexOrdering> { - if self.preserve_order { - self.input.output_ordering() - } else { - None - } - } - /// Pulls data from the specified input plan, feeding it to the /// output partitions based on the desired partitioning /// @@ -958,8 +1035,8 @@ impl Stream for RepartitionStream { cx: &mut Context<'_>, ) -> Poll> { loop { - match self.input.recv().poll_unpin(cx) { - Poll::Ready(Some(Some(v))) => { + match ready!(self.input.recv().poll_unpin(cx)) { + Some(Some(v)) => { if let Ok(batch) = &v { self.reservation .lock() @@ -968,7 +1045,7 @@ impl Stream for RepartitionStream { return Poll::Ready(Some(v)); } - Poll::Ready(Some(None)) => { + Some(None) => { self.num_input_partitions_processed += 1; if self.num_input_partitions == self.num_input_partitions_processed { @@ -979,12 +1056,9 @@ impl Stream for RepartitionStream { continue; } } - Poll::Ready(None) => { + None => { return Poll::Ready(None); } - Poll::Pending => { - return Poll::Pending; - } } } } @@ -1020,21 +1094,21 @@ impl Stream for PerPartitionStream { mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - match self.receiver.recv().poll_unpin(cx) { - Poll::Ready(Some(Some(v))) => { + match ready!(self.receiver.recv().poll_unpin(cx)) { + Some(Some(v)) => { if let Ok(batch) = &v { self.reservation .lock() .shrink(batch.get_array_memory_size()); } + Poll::Ready(Some(v)) } - Poll::Ready(Some(None)) => { + Some(None) => { // Input partition has finished sending batches Poll::Ready(None) } - Poll::Ready(None) => Poll::Ready(None), - Poll::Pending => Poll::Pending, + None => Poll::Ready(None), } } } @@ -1052,6 +1126,9 @@ mod tests { use super::*; use crate::{ + collect, + expressions::col, + memory::MemorySourceConfig, test::{ assert_is_pending, exec::{ @@ -1059,7 +1136,6 @@ mod tests { ErrorExec, MockExec, }, }, - {collect, expressions::col, memory::MemorySourceConfig}, }; use arrow::array::{ArrayRef, StringArray, UInt32Array}; @@ -1596,19 +1672,11 @@ mod tests { ) .unwrap() } -} - -#[cfg(test)] -mod test { use arrow::compute::SortOptions; - use arrow::datatypes::{DataType, Field, Schema}; - use super::*; - use crate::memory::MemorySourceConfig; use crate::source::DataSourceExec; use crate::union::UnionExec; - use datafusion_physical_expr::expressions::col; use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; /// Asserts that the plan is as expected @@ -1698,10 +1766,6 @@ mod test { Ok(()) } - fn test_schema() -> Arc { - Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)])) - } - fn sort_exprs(schema: &Schema) -> LexOrdering { let options = SortOptions::default(); LexOrdering::new(vec![PhysicalSortExpr { diff --git a/datafusion/physical-plan/src/repartition/on_demand_repartition.rs b/datafusion/physical-plan/src/repartition/on_demand_repartition.rs new file mode 100644 index 000000000000..f5e757ef13ef --- /dev/null +++ b/datafusion/physical-plan/src/repartition/on_demand_repartition.rs @@ -0,0 +1,1576 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This file implements the [`OnDemandRepartitionExec`] operator, which maps N input +//! partitions to M output partitions based on a partitioning scheme, optionally +//! maintaining the order of the input rows in the output. The operator is similar to the [`RepartitionExec`] +//! operator, but it doesn't distribute the data to the output streams until the downstreams request the data. +//! +//! [`RepartitionExec`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/repartition/struct.RepartitionExec.html + +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; +use std::{any::Any, vec}; + +use super::distributor_channels::{on_demand_partition_aware_channels, tokio_channels}; +use super::metrics::{ExecutionPlanMetricsSet, MetricsSet}; +use super::{ + DisplayAs, ExecutionPlanProperties, MaybeBatch, RecordBatchStream, + RepartitionExecBase, SendableRecordBatchStream, +}; +use crate::common::SharedMemoryReservation; +use crate::execution_plan::CardinalityEffect; +use crate::metrics::{self, BaselineMetrics, MetricBuilder}; +use crate::projection::{all_columns, make_with_child, ProjectionExec}; +use crate::sorts::streaming_merge::StreamingMergeBuilder; +use crate::stream::RecordBatchStreamAdapter; +use crate::{DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, Statistics}; + +use arrow::datatypes::SchemaRef; +use arrow::record_batch::RecordBatch; +use async_channel::{Receiver, Sender, TrySendError}; + +use datafusion_common::utils::transpose; +use datafusion_common::DataFusionError; +use datafusion_common::{internal_datafusion_err, internal_err, Result}; +use datafusion_common_runtime::SpawnedTask; +use datafusion_execution::memory_pool::MemoryConsumer; +use datafusion_execution::TaskContext; + +use datafusion_common::HashMap; +use futures::stream::Stream; +use futures::{ready, StreamExt, TryStreamExt}; +use log::trace; +use parking_lot::Mutex; + +type PartitionChannels = (Vec>, Vec>); + +/// The OnDemandRepartitionExec operator repartitions the input data based on a push-based model. +/// It is similar to the RepartitionExec operator, but it doesn't distribute the data to the output +/// partitions until the output partitions request the data. +/// +/// When polling, the operator sends the output partition number to the one partition channel, then the prefetch buffer will distribute the data based on the order of the partition number. +/// Each input steams has a prefetch buffer(channel) to distribute the data to the output partitions. +/// +/// The following diagram illustrates the data flow of the OnDemandRepartitionExec operator with 3 output partitions for the input stream 1: +/// ```text +/// /\ /\ /\ +/// ││ ││ ││ +/// ││ ││ ││ +/// ││ ││ ││ +/// ┌───────┴┴────────┐ ┌───────┴┴────────┐ ┌───────┴┴────────┐ +/// │ Stream │ │ Stream │ │ Stream │ +/// │ (1) │ │ (2) │ │ (3) │ +/// └────────┬────────┘ └───────┬─────────┘ └────────┬────────┘ +/// │ │ │ / \ +/// │ │ │ | | +/// │ │ │ | | +/// └────────────────┐ │ ┌──────────────────┘ | | +/// │ │ │ | | +/// ▼ ▼ ▼ | | +/// ┌─────────────────┐ | | +/// Send the partition │ partion channel │ | | +/// number when polling │ │ | | +/// └────────┬────────┘ | | +/// │ | | +/// │ | | +/// │ Get the partition number | | +/// ▼ then send data | | +/// ┌─────────────────┐ | | +/// │ Prefetch Buffer │───────────────────┘ | +/// │ (1) │─────────────────────┘ +/// └─────────────────┘ Distribute data to the output partitions +/// +/// ``` +type OnDemandDistributionSender = tokio::sync::mpsc::UnboundedSender; +type OnDemandDistributionReceiver = tokio::sync::mpsc::UnboundedReceiver; + +type OnDemandInputPartitionsToCurrentPartitionSender = Vec; +type OnDemandInputPartitionsToCurrentPartitionReceiver = + Vec; +/// Inner state of [`OnDemandRepartitionExec`]. +#[derive(Debug)] +struct OnDemandRepartitionExecState { + /// Channels for sending batches from input partitions to output partitions. + /// Key is the partition number. + channels: HashMap< + usize, + ( + OnDemandInputPartitionsToCurrentPartitionSender, + OnDemandInputPartitionsToCurrentPartitionReceiver, + SharedMemoryReservation, + ), + >, + + /// Helper that ensures that that background job is killed once it is no longer needed. + abort_helper: Arc>>, +} + +/// create channels for sending batches from input partitions to output partitions for on-demand repartitioning. +fn create_on_demand_repartition_channels( + preserve_order: bool, + num_input_partitions: usize, + num_output_partitions: usize, +) -> ( + Vec, + Vec, +) { + if preserve_order { + let (txs, rxs) = on_demand_partition_aware_channels( + num_input_partitions, + num_output_partitions, + ); + // Take transpose of senders and receivers. `state.channels` keeps track of entries per output partition + let txs = transpose(txs); + let rxs = transpose(rxs); + (txs, rxs) + } else { + // create one channel per *output* partition + // note we use a custom channel that ensures there is always data for each receiver + // but limits the amount of buffering if required. + let (txs, rxs) = tokio_channels(num_output_partitions); + // Clone sender for each input partitions + let txs = txs + .into_iter() + .map(|item| vec![item; num_input_partitions]) + .collect::>(); + let rxs = rxs.into_iter().map(|item| vec![item]).collect::>(); + (txs, rxs) + } +} + +/// Create a hashmap of channels for sending batches from input partitions to output partitions. +fn create_on_demand_partition_channels_hashmap( + txs: Vec, + rxs: Vec, + name: String, + context: Arc, +) -> HashMap< + usize, + ( + OnDemandInputPartitionsToCurrentPartitionSender, + OnDemandInputPartitionsToCurrentPartitionReceiver, + SharedMemoryReservation, + ), +> { + let mut channels = HashMap::with_capacity(txs.len()); + + for (partition, (tx, rx)) in txs.into_iter().zip(rxs).enumerate() { + let reservation = Arc::new(Mutex::new( + MemoryConsumer::new(format!("{}[{partition}]", name)) + .register(context.memory_pool()), + )); + channels.insert(partition, (tx, rx, reservation)); + } + + channels +} + +impl OnDemandRepartitionExecState { + #[allow(clippy::too_many_arguments)] + fn new( + input: Arc, + partitioning: Partitioning, + metrics: ExecutionPlanMetricsSet, + preserve_order: bool, + name: String, + context: Arc, + partition_receivers: Vec>, + ) -> Self { + let num_input_partitions = input.output_partitioning().partition_count(); + let num_output_partitions = partitioning.partition_count(); + + let (txs, rxs) = create_on_demand_repartition_channels( + preserve_order, + num_input_partitions, + num_output_partitions, + ); + + let channels = create_on_demand_partition_channels_hashmap( + txs, + rxs, + name, + Arc::clone(&context), + ); + + // launch one async task per *input* partition + let mut spawned_tasks = Vec::with_capacity(num_input_partitions); + for i in 0..num_input_partitions { + let txs: HashMap<_, _> = channels + .iter() + .map(|(partition, (tx, _rx, reservation))| { + (*partition, (tx[i].clone(), Arc::clone(reservation))) + }) + .collect(); + + let input_task = { + let partition_rx = if preserve_order { + partition_receivers.clone()[i].clone() + } else { + partition_receivers.clone()[0].clone() + }; + let r_metrics = + OnDemandRepartitionMetrics::new(i, num_output_partitions, &metrics); + + SpawnedTask::spawn(OnDemandRepartitionExec::pull_from_input( + Arc::clone(&input), + i, + txs.clone(), + partitioning.clone(), + partition_rx, + r_metrics, + Arc::clone(&context), + )) + }; + + // In a separate task, wait for each input to be done + // (and pass along any errors, including panic!s) + + let wait_for_task = + SpawnedTask::spawn(OnDemandRepartitionExec::wait_for_task( + input_task, + txs.into_iter() + .map(|(partition, (tx, _reservation))| (partition, tx)) + .collect(), + )); + + spawned_tasks.push(wait_for_task); + } + Self { + channels, + abort_helper: Arc::new(spawned_tasks), + } + } +} + +#[derive(Debug, Clone)] +pub struct OnDemandRepartitionExec { + base: RepartitionExecBase, + /// Channel to send partition number to the downstream task + partition_channels: Arc>>, + state: Arc>>, +} + +impl OnDemandRepartitionExec { + /// Input execution plan + pub fn input(&self) -> &Arc { + &self.base.input + } + + /// Partitioning scheme to use + pub fn partitioning(&self) -> &Partitioning { + &self.base.cache.partitioning + } + + /// Get preserve_order flag of the RepartitionExecutor + /// `true` means `SortPreservingRepartitionExec`, `false` means `OnDemandRepartitionExec` + pub fn preserve_order(&self) -> bool { + self.base.preserve_order + } + + /// Specify if this reparititoning operation should preserve the order of + /// rows from its input when producing output. Preserving order is more + /// expensive at runtime, so should only be set if the output of this + /// operator can take advantage of it. + /// + /// If the input is not ordered, or has only one partition, this is a no op, + /// and the node remains a `OnDemandRepartitionExec`. + pub fn with_preserve_order(mut self) -> Self { + self.base = self.base.with_preserve_order(); + self + } + + /// Get name used to display this Exec + pub fn name(&self) -> &str { + "OnDemandRepartitionExec" + } +} + +impl DisplayAs for OnDemandRepartitionExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!( + f, + "{}: partitioning={}, input_partitions={}", + self.name(), + self.partitioning(), + self.base.input.output_partitioning().partition_count() + )?; + + if self.base.preserve_order { + write!(f, ", preserve_order=true")?; + } + + if let Some(sort_exprs) = self.base.sort_exprs() { + write!(f, ", sort_exprs={}", sort_exprs.clone())?; + } + Ok(()) + } + } + } +} + +impl ExecutionPlan for OnDemandRepartitionExec { + fn name(&self) -> &'static str { + "OnDemandRepartitionExec" + } + + /// Return a reference to Any that can be used for downcasting + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &PlanProperties { + &self.base.cache + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.base.input] + } + + fn with_new_children( + self: Arc, + mut children: Vec>, + ) -> Result> { + let mut repartition = OnDemandRepartitionExec::try_new( + children.swap_remove(0), + self.partitioning().clone(), + )?; + if self.base.preserve_order { + repartition = repartition.with_preserve_order(); + } + Ok(Arc::new(repartition)) + } + + fn benefits_from_input_partitioning(&self) -> Vec { + vec![false] + } + + fn maintains_input_order(&self) -> Vec { + RepartitionExecBase::maintains_input_order_helper( + self.input(), + self.base.preserve_order, + ) + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> Result { + trace!( + "Start {}::execute for partition: {}", + self.name(), + partition + ); + + let lazy_state = Arc::clone(&self.state); + let partition_channels = Arc::clone(&self.partition_channels); + let input = Arc::clone(&self.base.input); + let partitioning = self.partitioning().clone(); + let metrics = self.base.metrics.clone(); + let preserve_order = self.base.preserve_order; + let name = self.name().to_owned(); + let schema = self.schema(); + let schema_captured = Arc::clone(&schema); + + // Get existing ordering to use for merging + let sort_exprs = self.base.sort_exprs().cloned().unwrap_or_default(); + + let stream = futures::stream::once(async move { + let num_input_partitions = input.output_partitioning().partition_count(); + let input_captured = Arc::clone(&input); + let metrics_captured = metrics.clone(); + let name_captured = name.clone(); + let context_captured = Arc::clone(&context); + let partition_channels = partition_channels + .get_or_init(|| async move { + let (txs, rxs) = if preserve_order { + (0..num_input_partitions) + .map(|_| async_channel::unbounded()) + .unzip::<_, _, Vec<_>, Vec<_>>() + } else { + let (tx, rx) = async_channel::unbounded(); + (vec![tx], vec![rx]) + }; + Mutex::new((txs, rxs)) + }) + .await; + let (partition_txs, partition_rxs) = { + let channel = partition_channels.lock(); + (channel.0.clone(), channel.1.clone()) + }; + + let state = lazy_state + .get_or_init(|| async move { + Mutex::new(OnDemandRepartitionExecState::new( + input_captured, + partitioning.clone(), + metrics_captured, + preserve_order, + name_captured, + context_captured, + partition_rxs.clone(), + )) + }) + .await; + + // lock scope + let (mut rx, reservation, abort_helper) = { + // lock mutexes + let mut state = state.lock(); + + // now return stream for the specified *output* partition which will + // read from the channel + let (_tx, rx, reservation) = state + .channels + .remove(&partition) + .expect("partition not used yet"); + + (rx, reservation, Arc::clone(&state.abort_helper)) + }; + + trace!( + "Before returning stream in {}::execute for partition: {}", + name, + partition + ); + + if preserve_order { + // Store streams from all the input partitions: + let input_streams = rx + .into_iter() + .enumerate() + .map(|(i, receiver)| { + // sender should be partition-wise + Box::pin(OnDemandPerPartitionStream { + schema: Arc::clone(&schema_captured), + receiver, + _drop_helper: Arc::clone(&abort_helper), + reservation: Arc::clone(&reservation), + sender: partition_txs[i].clone(), + partition, + is_requested: false, + }) as SendableRecordBatchStream + }) + .collect::>(); + // Note that receiver size (`rx.len()`) and `num_input_partitions` are same. + + // Merge streams (while preserving ordering) coming from + // input partitions to this partition: + let fetch = None; + let merge_reservation = + MemoryConsumer::new(format!("{}[Merge {partition}]", name)) + .register(context.memory_pool()); + StreamingMergeBuilder::new() + .with_streams(input_streams) + .with_schema(schema_captured) + .with_expressions(&sort_exprs) + .with_metrics(BaselineMetrics::new(&metrics, partition)) + .with_batch_size(context.session_config().batch_size()) + .with_fetch(fetch) + .with_reservation(merge_reservation) + .build() + } else { + Ok(Box::pin(OnDemandRepartitionStream { + num_input_partitions, + num_input_partitions_processed: 0, + schema: input.schema(), + input: rx.swap_remove(0), + _drop_helper: abort_helper, + reservation, + sender: partition_txs[0].clone(), + partition, + is_requested: false, + }) as SendableRecordBatchStream) + } + }) + .try_flatten(); + let stream = RecordBatchStreamAdapter::new(schema, stream); + Ok(Box::pin(stream)) + } + + fn metrics(&self) -> Option { + Some(self.base.metrics.clone_inner()) + } + + fn statistics(&self) -> Result { + self.base.input.statistics() + } + + fn cardinality_effect(&self) -> CardinalityEffect { + CardinalityEffect::Equal + } + + fn try_swapping_with_projection( + &self, + projection: &ProjectionExec, + ) -> Result>> { + // If the projection does not narrow the schema, we should not try to push it down. + if projection.expr().len() >= projection.input().schema().fields().len() { + return Ok(None); + } + + // If pushdown is not beneficial or applicable, break it. + if projection.benefits_from_input_partitioning()[0] + || !all_columns(projection.expr()) + { + return Ok(None); + } + + let new_projection = make_with_child(projection, self.input())?; + + Ok(Some(Arc::new(OnDemandRepartitionExec::try_new( + new_projection, + self.partitioning().clone(), + )?))) + } +} + +impl OnDemandRepartitionExec { + /// Create a new RepartitionExec, that produces output `partitioning`, and + /// does not preserve the order of the input (see [`Self::with_preserve_order`] + /// for more details) + pub fn try_new( + input: Arc, + partitioning: Partitioning, + ) -> Result { + let preserve_order = false; + let cache = RepartitionExecBase::compute_properties( + &input, + partitioning.clone(), + preserve_order, + ); + Ok(OnDemandRepartitionExec { + base: RepartitionExecBase { + input, + metrics: ExecutionPlanMetricsSet::new(), + preserve_order, + cache, + }, + partition_channels: Default::default(), + state: Default::default(), + }) + } + // Executes the input plan and poll stream into the buffer, records fetch_time and buffer_time metrics + async fn process_input( + input: Arc, + partition: usize, + buffer_tx: tokio::sync::mpsc::Sender, + context: Arc, + fetch_time: metrics::Time, + send_buffer_time: metrics::Time, + ) -> Result<()> { + let timer = fetch_time.timer(); + let mut stream = input.execute(partition, context).map_err(|e| { + internal_datafusion_err!( + "Error executing input partition {} for on demand repartitioning: {}", + partition, + e + ) + })?; + timer.done(); + + loop { + let timer = fetch_time.timer(); + let batch = stream.next().await; + timer.done(); + + let Some(batch) = batch else { + break; + }; + let timer = send_buffer_time.timer(); + // Feed the buffer with batch, since the buffer channel has limited capacity + // The process waits here until one is consumed + buffer_tx.send(batch?).await.map_err(|e| { + internal_datafusion_err!( + "Error sending batch to buffer channel for partition {}: {}", + partition, + e + ) + })?; + timer.done(); + } + + Ok(()) + } + + /// Pulls data from the specified input plan, feeding it to the + /// output partitions based on the desired partitioning + /// + /// txs hold the output sending channels for each output partition + pub(crate) async fn pull_from_input( + input: Arc, + input_partition: usize, + mut output_channels: HashMap< + usize, + (OnDemandDistributionSender, SharedMemoryReservation), + >, + partitioning: Partitioning, + output_partition_rx: Receiver, + metrics: OnDemandRepartitionMetrics, + context: Arc, + ) -> Result<()> { + // initialize buffer channel so that we can pre-fetch from input + let (buffer_tx, mut buffer_rx) = tokio::sync::mpsc::channel(2); + // execute the child operator in a separate task + // that pushes batches into buffer channel with limited capacity + let processing_task = SpawnedTask::spawn(Self::process_input( + Arc::clone(&input), + input_partition, + buffer_tx, + Arc::clone(&context), + metrics.fetch_time.clone(), + metrics.send_buffer_time.clone(), + )); + + let mut batches_until_yield = partitioning.partition_count(); + // When the input is done, break the loop + while !output_channels.is_empty() { + // Wait until a partition is requested, then get the output partition information + let partition = output_partition_rx.recv().await.map_err(|e| { + internal_datafusion_err!( + "Error receiving partition number from output partition: {}", + e + ) + })?; + + // Fetch the batch from the buffer, ideally this should reduce the time gap between the requester and the input stream + let batch = match buffer_rx.recv().await { + Some(batch) => batch, + None => break, + }; + + let size = batch.get_array_memory_size(); + + let timer = metrics.send_time[partition].timer(); + // if there is still a receiver, send to it + if let Some((tx, reservation)) = output_channels.get_mut(&partition) { + reservation.lock().try_grow(size)?; + + if tx.send(Some(Ok(batch))).is_err() { + // If the other end has hung up, it was an early shutdown (e.g. LIMIT) + reservation.lock().shrink(size); + output_channels.remove(&partition); + } + } + timer.done(); + + // If the input stream is endless, we may spin forever and + // never yield back to tokio. See + // https://github.com/apache/datafusion/issues/5278. + // + // However, yielding on every batch causes a bottleneck + // when running with multiple cores. See + // https://github.com/apache/datafusion/issues/6290 + // + // Thus, heuristically yield after producing num_partition + // batches + if batches_until_yield == 0 { + tokio::task::yield_now().await; + batches_until_yield = partitioning.partition_count(); + } else { + batches_until_yield -= 1; + } + } + + processing_task.join().await.map_err(|e| { + internal_datafusion_err!("Error waiting for processing task to finish: {}", e) + })??; + Ok(()) + } + + /// Waits for `input_task` which is consuming one of the inputs to + /// complete. Upon each successful completion, sends a `None` to + /// each of the output tx channels to signal one of the inputs is + /// complete. Upon error, propagates the errors to all output tx + /// channels. + async fn wait_for_task( + input_task: SpawnedTask>, + txs: HashMap, + ) { + // wait for completion, and propagate error + // note we ignore errors on send (.ok) as that means the receiver has already shutdown. + + match input_task.join().await { + // Error in joining task + Err(e) => { + let e = Arc::new(e); + + for (_, tx) in txs { + let err = Err(DataFusionError::Context( + "Join Error".to_string(), + Box::new(DataFusionError::External(Box::new(Arc::clone(&e)))), + )); + tx.send(Some(err)).ok(); + } + } + // Error from running input task + Ok(Err(e)) => { + // send the same Arc'd error to all output partitions + let e = Arc::new(e); + + for (_, tx) in txs { + // wrap it because need to send error to all output partitions + let err = Err(DataFusionError::from(&e)); + tx.send(Some(err)).ok(); + } + } + // Input task completed successfully + Ok(Ok(())) => { + // notify each output partition that this input partition has no more data + for (_, tx) in txs { + tx.send(None).ok(); + } + } + } + } +} + +#[derive(Debug, Clone)] +pub(crate) struct OnDemandRepartitionMetrics { + /// Time in nanos to execute child operator and fetch batches + fetch_time: metrics::Time, + /// Time in nanos for sending resulting batches to buffer channels. + send_buffer_time: metrics::Time, + /// Time in nanos for sending resulting batches to channels. + /// + /// One metric per output partition. + send_time: Vec, +} + +impl OnDemandRepartitionMetrics { + pub fn new( + input_partition: usize, + num_output_partitions: usize, + metrics: &ExecutionPlanMetricsSet, + ) -> Self { + // Time in nanos to execute child operator and fetch batches + let fetch_time = + MetricBuilder::new(metrics).subset_time("fetch_time", input_partition); + + // Time in nanos for sending resulting batches to channels + let send_time = (0..num_output_partitions) + .map(|output_partition| { + let label = + metrics::Label::new("outputPartition", output_partition.to_string()); + MetricBuilder::new(metrics) + .with_label(label) + .subset_time("send_time", input_partition) + }) + .collect(); + + // Time in nanos for sending resulting batches to buffer channels + let send_buffer_time = + MetricBuilder::new(metrics).subset_time("send_buffer_time", input_partition); + Self { + fetch_time, + send_time, + send_buffer_time, + } + } +} + +/// This struct converts a receiver to a stream. +/// Receiver receives data on an SPSC channel. +struct OnDemandPerPartitionStream { + /// Schema wrapped by Arc + schema: SchemaRef, + + /// channel containing the repartitioned batches + receiver: OnDemandDistributionReceiver, + + /// Handle to ensure background tasks are killed when no longer needed. + _drop_helper: Arc>>, + + /// Memory reservation. + reservation: SharedMemoryReservation, + + /// Sender to send partititon number to the receiver + sender: Sender, + + /// Partition number + partition: usize, + + /// Avoid sending partition number multiple times, set to true after sending partition number + is_requested: bool, +} + +impl Stream for OnDemandPerPartitionStream { + type Item = Result; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + if !self.is_requested { + match self.sender.try_send(self.partition) { + Ok(_) => self.is_requested = true, + // unobunded channel, should not be full + Err(TrySendError::Full(_)) => { + return internal_err!("Partition sender {} is full", self.partition)?; + } + Err(TrySendError::Closed(_)) => { + return Poll::Ready(None); + } + } + } + + let result = ready!(self.receiver.poll_recv(cx)); + // set is_requested to false, when receiving a batch + self.is_requested = false; + + match result { + Some(Some(batch_result)) => { + if let Ok(batch) = &batch_result { + self.reservation + .lock() + .shrink(batch.get_array_memory_size()); + } + Poll::Ready(Some(batch_result)) + } + _ => Poll::Ready(None), + } + } +} + +impl RecordBatchStream for OnDemandPerPartitionStream { + /// Get the schema + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } +} + +struct OnDemandRepartitionStream { + /// Number of input partitions that will be sending batches to this output channel + num_input_partitions: usize, + + /// Number of input partitions that have finished sending batches to this output channel + num_input_partitions_processed: usize, + + /// Schema wrapped by Arc + schema: SchemaRef, + + /// channel containing the repartitioned batches + input: OnDemandDistributionReceiver, + + /// Handle to ensure background tasks are killed when no longer needed. + _drop_helper: Arc>>, + + /// Memory reservation. + reservation: SharedMemoryReservation, + + /// Sender for the output partition + sender: Sender, + + /// Partition number + partition: usize, + + /// Avoid sending partition number multiple times, set to true after sending partition number + is_requested: bool, +} + +impl Stream for OnDemandRepartitionStream { + type Item = Result; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + loop { + // Send partition number to input partitions + if !self.is_requested { + match self.sender.try_send(self.partition) { + Ok(_) => self.is_requested = true, + // unobunded channel, should not be full + Err(TrySendError::Full(_)) => { + return internal_err!( + "Partition sender {} is full", + self.partition + )?; + } + Err(TrySendError::Closed(_)) => { + return Poll::Ready(None); + } + } + } + + let result = ready!(self.input.poll_recv(cx)); + // set is_requested to false, when receiving a batch + self.is_requested = false; + + match result { + Some(Some(v)) => { + if let Ok(batch) = &v { + self.reservation + .lock() + .shrink(batch.get_array_memory_size()); + } + return Poll::Ready(Some(v)); + } + Some(None) => { + self.num_input_partitions_processed += 1; + + if self.num_input_partitions == self.num_input_partitions_processed { + // all input partitions have finished sending batches + return Poll::Ready(None); + } else { + // other partitions still have data to send + continue; + } + } + None => { + return Poll::Ready(None); + } + } + } + } +} + +impl RecordBatchStream for OnDemandRepartitionStream { + /// Get the schema + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + + use super::*; + use crate::{ + collect, + memory::MemorySourceConfig, + source::DataSourceExec, + test::{ + assert_is_pending, + exec::{ + assert_strong_count_converges_to_zero, BarrierExec, BlockingExec, + ErrorExec, MockExec, + }, + }, + }; + + use arrow::array::{ArrayRef, StringArray, UInt32Array}; + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_common::cast::as_string_array; + use datafusion_common::{assert_batches_sorted_eq, exec_err}; + use futures::FutureExt; + use tokio::task::JoinSet; + + use arrow_schema::SortOptions; + + use crate::coalesce_partitions::CoalescePartitionsExec; + use crate::union::UnionExec; + use datafusion_physical_expr::expressions::col; + use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; + + /// Asserts that the plan is as expected + /// + /// `$EXPECTED_PLAN_LINES`: input plan + /// `$PLAN`: the plan to optimized + /// + macro_rules! assert_plan { + ($EXPECTED_PLAN_LINES: expr, $PLAN: expr) => { + let physical_plan = $PLAN; + let formatted = crate::displayable(&physical_plan).indent(true).to_string(); + let actual: Vec<&str> = formatted.trim().lines().collect(); + + let expected_plan_lines: Vec<&str> = $EXPECTED_PLAN_LINES + .iter().map(|s| *s).collect(); + + assert_eq!( + expected_plan_lines, actual, + "\n**Original Plan Mismatch\n\nexpected:\n\n{expected_plan_lines:#?}\nactual:\n\n{actual:#?}\n\n" + ); + }; + } + + fn test_schema() -> Arc { + Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)])) + } + + async fn repartition( + schema: &SchemaRef, + input_partitions: Vec>, + partitioning: Partitioning, + ) -> Result>> { + let task_ctx = Arc::new(TaskContext::default()); + // create physical plan + let exec = MemorySourceConfig::try_new_exec( + &input_partitions, + Arc::clone(schema), + None, + )?; + let exec = OnDemandRepartitionExec::try_new(exec, partitioning)?; + + // execute and collect results + let mut output_partitions = vec![]; + for i in 0..exec.partitioning().partition_count() { + // execute this *output* partition and collect all batches + let mut stream = exec.execute(i, Arc::clone(&task_ctx))?; + let mut batches = vec![]; + while let Some(result) = stream.next().await { + batches.push(result?); + } + output_partitions.push(batches); + } + Ok(output_partitions) + } + + #[tokio::test] + async fn many_to_one_on_demand() -> Result<()> { + // define input partitions + let schema = test_schema(); + let partition = create_vec_batches(50); + let partitions = vec![partition.clone(), partition.clone(), partition.clone()]; + + // repartition from 3 input to 1 output + let output_partitions = + repartition(&schema, partitions, Partitioning::OnDemand(1)).await?; + + assert_eq!(1, output_partitions.len()); + assert_eq!(150, output_partitions[0].len()); + + Ok(()) + } + + #[tokio::test] + async fn many_to_many_on_demand() -> Result<()> { + // define input partitions + let schema = test_schema(); + let partition = create_vec_batches(50); + let partitions = vec![partition.clone(), partition.clone(), partition.clone()]; + + let output_partitions = + repartition(&schema, partitions, Partitioning::OnDemand(8)).await?; + + let total_rows: usize = output_partitions + .iter() + .map(|x| x.iter().map(|x| x.num_rows()).sum::()) + .sum(); + + assert_eq!(8, output_partitions.len()); + assert_eq!(total_rows, 8 * 50 * 3); + + Ok(()) + } + + #[tokio::test] + async fn many_to_many_on_demand_with_coalesce() -> Result<()> { + let schema = test_schema(); + let partition: Vec = create_vec_batches(2); + let partitions = vec![partition.clone(), partition.clone()]; + let input = + MemorySourceConfig::try_new_exec(&partitions, Arc::clone(&schema), None)?; + let exec = + OnDemandRepartitionExec::try_new(input, Partitioning::OnDemand(3)).unwrap(); + + let coalesce_exec = + CoalescePartitionsExec::new(Arc::new(exec) as Arc); + + let expected_plan = [ + "CoalescePartitionsExec", + " OnDemandRepartitionExec: partitioning=OnDemand(3), input_partitions=2", + " DataSourceExec: partitions=2, partition_sizes=[2, 2]", + ]; + assert_plan!(expected_plan, coalesce_exec.clone()); + + // execute the plan + let task_ctx = Arc::new(TaskContext::default()); + let stream = coalesce_exec.execute(0, task_ctx)?; + let batches = crate::common::collect(stream).await?; + + #[rustfmt::skip] + let expected = vec![ + "+----+", + "| c0 |", + "+----+", + "| 1 |", + "| 1 |", + "| 1 |", + "| 1 |", + "| 2 |", + "| 2 |", + "| 2 |", + "| 2 |", + "| 3 |", + "| 3 |", + "| 3 |", + "| 3 |", + "| 4 |", + "| 4 |", + "| 4 |", + "| 4 |", + "| 5 |", + "| 5 |", + "| 5 |", + "| 5 |", + "| 6 |", + "| 6 |", + "| 6 |", + "| 6 |", + "| 7 |", + "| 7 |", + "| 7 |", + "| 7 |", + "| 8 |", + "| 8 |", + "| 8 |", + "| 8 |", + "+----+", + ]; + + assert_batches_sorted_eq!(&expected, &batches); + + Ok(()) + } + + #[tokio::test] + async fn error_for_input_exec() { + // This generates an error on a call to execute. The error + // should be returned and no results produced. + + let task_ctx = Arc::new(TaskContext::default()); + let input = ErrorExec::new(); + let partitioning = Partitioning::OnDemand(1); + let exec = + OnDemandRepartitionExec::try_new(Arc::new(input), partitioning).unwrap(); + + // Note: this should pass (the stream can be created) but the + // error when the input is executed should get passed back + let output_stream = exec.execute(0, task_ctx).unwrap(); + + // Expect that an error is returned + let result_string = crate::common::collect(output_stream) + .await + .unwrap_err() + .to_string(); + assert!( + result_string.contains("ErrorExec, unsurprisingly, errored in partition 0"), + "actual: {result_string}" + ); + } + + #[tokio::test] + async fn repartition_with_error_in_stream() { + let task_ctx = Arc::new(TaskContext::default()); + let batch = RecordBatch::try_from_iter(vec![( + "my_awesome_field", + Arc::new(StringArray::from(vec!["foo", "bar"])) as ArrayRef, + )]) + .unwrap(); + + // input stream returns one good batch and then one error. The + // error should be returned. + let err = exec_err!("bad data error"); + + let schema = batch.schema(); + let input = MockExec::new(vec![Ok(batch), err], schema); + let partitioning = Partitioning::OnDemand(1); + let exec = + OnDemandRepartitionExec::try_new(Arc::new(input), partitioning).unwrap(); + + // Note: this should pass (the stream can be created) but the + // error when the input is executed should get passed back + let output_stream = exec.execute(0, task_ctx).unwrap(); + + // Expect that an error is returned + let result_string = crate::common::collect(output_stream) + .await + .unwrap_err() + .to_string(); + assert!( + result_string.contains("bad data error"), + "actual: {result_string}" + ); + } + + #[tokio::test] + async fn repartition_with_delayed_stream() { + let task_ctx = Arc::new(TaskContext::default()); + let batch1 = RecordBatch::try_from_iter(vec![( + "my_awesome_field", + Arc::new(StringArray::from(vec!["foo", "bar"])) as ArrayRef, + )]) + .unwrap(); + + let batch2 = RecordBatch::try_from_iter(vec![( + "my_awesome_field", + Arc::new(StringArray::from(vec!["frob", "baz"])) as ArrayRef, + )]) + .unwrap(); + + // The mock exec doesn't return immediately (instead it + // requires the input to wait at least once) + let schema = batch1.schema(); + let expected_batches = vec![batch1.clone(), batch2.clone()]; + let input = MockExec::new(vec![Ok(batch1), Ok(batch2)], schema); + let partitioning = Partitioning::OnDemand(1); + + let exec = + OnDemandRepartitionExec::try_new(Arc::new(input), partitioning).unwrap(); + + let expected = vec![ + "+------------------+", + "| my_awesome_field |", + "+------------------+", + "| foo |", + "| bar |", + "| frob |", + "| baz |", + "+------------------+", + ]; + + assert_batches_sorted_eq!(&expected, &expected_batches); + + let output_stream = exec.execute(0, task_ctx).unwrap(); + let batches = crate::common::collect(output_stream).await.unwrap(); + + assert_batches_sorted_eq!(&expected, &batches); + } + + #[tokio::test] + async fn hash_repartition_avoid_empty_batch() -> Result<()> { + let task_ctx = Arc::new(TaskContext::default()); + let batch = RecordBatch::try_from_iter(vec![( + "a", + Arc::new(StringArray::from(vec!["foo"])) as ArrayRef, + )]) + .unwrap(); + let partitioning = Partitioning::OnDemand(2); + let schema = batch.schema(); + let input = MockExec::new(vec![Ok(batch)], schema); + let exec = + OnDemandRepartitionExec::try_new(Arc::new(input), partitioning).unwrap(); + let output_stream0 = exec.execute(0, Arc::clone(&task_ctx)).unwrap(); + let batch0 = crate::common::collect(output_stream0).await.unwrap(); + let output_stream1 = exec.execute(1, Arc::clone(&task_ctx)).unwrap(); + let batch1 = crate::common::collect(output_stream1).await.unwrap(); + assert!(batch0.is_empty() || batch1.is_empty()); + Ok(()) + } + + #[tokio::test] + async fn on_demand_repartition_with_dropping_output_stream() { + let task_ctx = Arc::new(TaskContext::default()); + let partitioning = Partitioning::OnDemand(2); + + // We first collect the results without dropping the output stream. + let input = Arc::new(make_barrier_exec()); + let exec = OnDemandRepartitionExec::try_new( + Arc::clone(&input) as Arc, + partitioning.clone(), + ) + .unwrap(); + let output_stream1 = exec.execute(1, Arc::clone(&task_ctx)).unwrap(); + let mut background_task = JoinSet::new(); + background_task.spawn(async move { + input.wait().await; + }); + let batches_without_drop = crate::common::collect(output_stream1).await.unwrap(); + + // run some checks on the result + let items_vec = str_batches_to_vec(&batches_without_drop); + let items_set: HashSet<&str> = items_vec.iter().copied().collect(); + assert_eq!(items_vec.len(), items_set.len()); + let source_str_set: HashSet<&str> = + ["foo", "bar", "frob", "baz", "goo", "gar", "grob", "gaz"] + .iter() + .copied() + .collect(); + assert_eq!(items_set.difference(&source_str_set).count(), 0); + + // Now do the same but dropping the stream before waiting for the barrier + let input = Arc::new(make_barrier_exec()); + let exec = OnDemandRepartitionExec::try_new( + Arc::clone(&input) as Arc, + partitioning, + ) + .unwrap(); + let output_stream0 = exec.execute(0, Arc::clone(&task_ctx)).unwrap(); + let output_stream1 = exec.execute(1, Arc::clone(&task_ctx)).unwrap(); + // now, purposely drop output stream 0 + // *before* any outputs are produced + drop(output_stream0); + let mut background_task = JoinSet::new(); + background_task.spawn(async move { + input.wait().await; + }); + let batches_with_drop = crate::common::collect(output_stream1).await.unwrap(); + + assert_eq!(batches_without_drop, batches_with_drop); + } + + fn str_batches_to_vec(batches: &[RecordBatch]) -> Vec<&str> { + batches + .iter() + .flat_map(|batch| { + assert_eq!(batch.columns().len(), 1); + let string_array = as_string_array(batch.column(0)) + .expect("Unexpected type for repartitoned batch"); + + string_array + .iter() + .map(|v| v.expect("Unexpected null")) + .collect::>() + }) + .collect::>() + } + + /// Create a BarrierExec that returns two partitions of two batches each + fn make_barrier_exec() -> BarrierExec { + let batch1 = RecordBatch::try_from_iter(vec![( + "my_awesome_field", + Arc::new(StringArray::from(vec!["foo", "bar"])) as ArrayRef, + )]) + .unwrap(); + + let batch2 = RecordBatch::try_from_iter(vec![( + "my_awesome_field", + Arc::new(StringArray::from(vec!["frob", "baz"])) as ArrayRef, + )]) + .unwrap(); + + let batch3 = RecordBatch::try_from_iter(vec![( + "my_awesome_field", + Arc::new(StringArray::from(vec!["goo", "gar"])) as ArrayRef, + )]) + .unwrap(); + + let batch4 = RecordBatch::try_from_iter(vec![( + "my_awesome_field", + Arc::new(StringArray::from(vec!["grob", "gaz"])) as ArrayRef, + )]) + .unwrap(); + + // The barrier exec waits to be pinged + // requires the input to wait at least once) + let schema = batch1.schema(); + BarrierExec::new(vec![vec![batch1, batch2], vec![batch3, batch4]], schema) + } + + #[tokio::test] + async fn test_drop_cancel() -> Result<()> { + let task_ctx = Arc::new(TaskContext::default()); + let schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)])); + + let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 2)); + let refs = blocking_exec.refs(); + let repartition_exec = Arc::new(OnDemandRepartitionExec::try_new( + blocking_exec, + Partitioning::UnknownPartitioning(1), + )?); + + let fut = collect(repartition_exec, task_ctx); + let mut fut = fut.boxed(); + + assert_is_pending(&mut fut); + drop(fut); + assert_strong_count_converges_to_zero(refs).await; + + Ok(()) + } + + /// Create vector batches + fn create_vec_batches(n: usize) -> Vec { + let batch = create_batch(); + (0..n).map(|_| batch.clone()).collect() + } + + /// Create batch + fn create_batch() -> RecordBatch { + let schema = test_schema(); + RecordBatch::try_new( + schema, + vec![Arc::new(UInt32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8]))], + ) + .unwrap() + } + + #[tokio::test] + async fn test_preserve_order() -> Result<()> { + let schema = test_schema(); + let sort_exprs = sort_exprs(&schema); + let source1 = sorted_memory_exec(&schema, sort_exprs.clone()); + let source2 = sorted_memory_exec(&schema, sort_exprs); + // output has multiple partitions, and is sorted + let union = UnionExec::new(vec![source1, source2]); + let exec = + OnDemandRepartitionExec::try_new(Arc::new(union), Partitioning::OnDemand(10)) + .unwrap() + .with_preserve_order(); + + // Repartition should preserve order + let expected_plan = [ + "OnDemandRepartitionExec: partitioning=OnDemand(10), input_partitions=2, preserve_order=true, sort_exprs=c0@0 ASC", + " UnionExec", + " DataSourceExec: partitions=1, partition_sizes=[0], output_ordering=c0@0 ASC", + " DataSourceExec: partitions=1, partition_sizes=[0], output_ordering=c0@0 ASC", + ]; + assert_plan!(expected_plan, exec); + Ok(()) + } + + #[tokio::test] + async fn test_preserve_order_with_coalesce() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new( + "my_awesome_field", + DataType::UInt32, + false, + )])); + let options = SortOptions::default(); + let sort_exprs = LexOrdering::new(vec![PhysicalSortExpr { + expr: col("my_awesome_field", &schema).unwrap(), + options, + }]); + + let batch = RecordBatch::try_from_iter(vec![( + "my_awesome_field", + Arc::new(UInt32Array::from(vec![1, 2, 3, 4])) as ArrayRef, + )])?; + + let source = Arc::new(DataSourceExec::new(Arc::new( + MemorySourceConfig::try_new( + &[vec![batch.clone()]], + Arc::clone(&schema), + None, + ) + .unwrap() + .try_with_sort_information(vec![sort_exprs]) + .unwrap(), + ))); + + // output has multiple partitions, and is sorted + let union = UnionExec::new(vec![Arc::::clone(&source), source]); + let repartition_exec = + OnDemandRepartitionExec::try_new(Arc::new(union), Partitioning::OnDemand(5)) + .unwrap() + .with_preserve_order(); + + let coalesce_exec = CoalescePartitionsExec::new( + Arc::new(repartition_exec) as Arc + ); + + // Repartition should preserve order + let expected_plan = [ + "CoalescePartitionsExec", + " OnDemandRepartitionExec: partitioning=OnDemand(5), input_partitions=2, preserve_order=true, sort_exprs=my_awesome_field@0 ASC", + " UnionExec", + " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=my_awesome_field@0 ASC", + " DataSourceExec: partitions=1, partition_sizes=[1], output_ordering=my_awesome_field@0 ASC", + + ]; + assert_plan!(expected_plan, coalesce_exec.clone()); + + let task_ctx = Arc::new(TaskContext::default()); + let stream = coalesce_exec.execute(0, task_ctx)?; + let expected_batches = crate::common::collect(stream).await?; + + let expected = vec![ + "+------------------+", + "| my_awesome_field |", + "+------------------+", + "| 1 |", + "| 1 |", + "| 2 |", + "| 2 |", + "| 3 |", + "| 3 |", + "| 4 |", + "| 4 |", + "+------------------+", + ]; + + assert_batches_sorted_eq!(&expected, &expected_batches); + Ok(()) + } + + #[tokio::test] + async fn test_preserve_order_one_partition() -> Result<()> { + let schema = test_schema(); + let sort_exprs = sort_exprs(&schema); + let source = sorted_memory_exec(&schema, sort_exprs); + // output is sorted, but has only a single partition, so no need to sort + let exec = OnDemandRepartitionExec::try_new(source, Partitioning::OnDemand(10)) + .unwrap() + .with_preserve_order(); + + // Repartition should not preserve order + let expected_plan = [ + "OnDemandRepartitionExec: partitioning=OnDemand(10), input_partitions=1", + " DataSourceExec: partitions=1, partition_sizes=[0], output_ordering=c0@0 ASC", + ]; + assert_plan!(expected_plan, exec); + Ok(()) + } + + #[tokio::test] + async fn test_preserve_order_input_not_sorted() -> Result<()> { + let schema = test_schema(); + let source1 = memory_exec(&schema); + let source2 = memory_exec(&schema); + // output has multiple partitions, but is not sorted + let union = UnionExec::new(vec![source1, source2]); + let exec = + OnDemandRepartitionExec::try_new(Arc::new(union), Partitioning::OnDemand(10)) + .unwrap() + .with_preserve_order(); + + // Repartition should not preserve order, as there is no order to preserve + let expected_plan = [ + "OnDemandRepartitionExec: partitioning=OnDemand(10), input_partitions=2", + " UnionExec", + " DataSourceExec: partitions=1, partition_sizes=[0]", + " DataSourceExec: partitions=1, partition_sizes=[0]", + ]; + assert_plan!(expected_plan, exec); + Ok(()) + } + + fn sort_exprs(schema: &Schema) -> LexOrdering { + let options = SortOptions::default(); + LexOrdering::new(vec![PhysicalSortExpr { + expr: col("c0", schema).unwrap(), + options, + }]) + } + + fn memory_exec(schema: &SchemaRef) -> Arc { + MemorySourceConfig::try_new_exec(&[vec![]], Arc::clone(schema), None).unwrap() + } + + fn sorted_memory_exec( + schema: &SchemaRef, + sort_exprs: LexOrdering, + ) -> Arc { + Arc::new(DataSourceExec::new(Arc::new( + MemorySourceConfig::try_new(&[vec![]], Arc::clone(schema), None) + .unwrap() + .try_with_sort_information(vec![sort_exprs]) + .unwrap(), + ))) + } +} diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 1cdfe6d216e3..ca32e48b555e 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -1233,6 +1233,7 @@ message Partitioning { uint64 round_robin = 1; PhysicalHashRepartition hash = 2; uint64 unknown = 3; + uint64 on_demand = 4; } } diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 6e09e9a797ea..c164d2aa9b97 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -13251,6 +13251,11 @@ impl serde::Serialize for Partitioning { #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("unknown", ToString::to_string(&v).as_str())?; } + partitioning::PartitionMethod::OnDemand(v) => { + #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] + struct_ser.serialize_field("onDemand", ToString::to_string(&v).as_str())?; + } } } struct_ser.end() @@ -13267,6 +13272,8 @@ impl<'de> serde::Deserialize<'de> for Partitioning { "roundRobin", "hash", "unknown", + "on_demand", + "onDemand", ]; #[allow(clippy::enum_variant_names)] @@ -13274,6 +13281,7 @@ impl<'de> serde::Deserialize<'de> for Partitioning { RoundRobin, Hash, Unknown, + OnDemand, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -13298,6 +13306,7 @@ impl<'de> serde::Deserialize<'de> for Partitioning { "roundRobin" | "round_robin" => Ok(GeneratedField::RoundRobin), "hash" => Ok(GeneratedField::Hash), "unknown" => Ok(GeneratedField::Unknown), + "onDemand" | "on_demand" => Ok(GeneratedField::OnDemand), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -13339,6 +13348,12 @@ impl<'de> serde::Deserialize<'de> for Partitioning { } partition_method__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| partitioning::PartitionMethod::Unknown(x.0)); } + GeneratedField::OnDemand => { + if partition_method__.is_some() { + return Err(serde::de::Error::duplicate_field("onDemand")); + } + partition_method__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| partitioning::PartitionMethod::OnDemand(x.0)); + } } } Ok(Partitioning { diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index f5ec45da48f2..00ff83ce3caa 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1832,7 +1832,7 @@ pub struct RepartitionExecNode { } #[derive(Clone, PartialEq, ::prost::Message)] pub struct Partitioning { - #[prost(oneof = "partitioning::PartitionMethod", tags = "1, 2, 3")] + #[prost(oneof = "partitioning::PartitionMethod", tags = "1, 2, 3, 4")] pub partition_method: ::core::option::Option, } /// Nested message and enum types in `Partitioning`. @@ -1845,6 +1845,8 @@ pub mod partitioning { Hash(super::PhysicalHashRepartition), #[prost(uint64, tag = "3")] Unknown(u64), + #[prost(uint64, tag = "4")] + OnDemand(u64), } } #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index 641dfe7b5fb8..a0217cc95612 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -1417,6 +1417,7 @@ impl AsLogicalPlan for LogicalPlanNode { Partitioning::RoundRobinBatch(partition_count) => { PartitionMethod::RoundRobin(*partition_count as u64) } + Partitioning::DistributeBy(_) => { return not_impl_err!("DistributeBy") } diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 34fb5bb6ddc1..f1f3caed8092 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -451,6 +451,9 @@ pub fn parse_protobuf_partitioning( )) => Ok(Some(Partitioning::RoundRobinBatch( *partition_count as usize, ))), + Some(protobuf::partitioning::PartitionMethod::OnDemand(partition_count)) => { + Ok(Some(Partitioning::OnDemand(*partition_count as usize))) + } Some(protobuf::partitioning::PartitionMethod::Hash(hash_repartition)) => { parse_protobuf_hash_partitioning( Some(hash_repartition), diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 3f67842fe625..8d7a8e92d72b 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -398,6 +398,11 @@ pub fn serialize_partitioning( *partition_count as u64, )), }, + Partitioning::OnDemand(partition_count) => protobuf::Partitioning { + partition_method: Some(protobuf::partitioning::PartitionMethod::OnDemand( + *partition_count as u64, + )), + }, Partitioning::Hash(exprs, partition_count) => { let serialized_exprs = serialize_physical_exprs(exprs, codec)?; protobuf::Partitioning { diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 5a1caad46732..5c8613791516 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -249,6 +249,7 @@ datafusion.optimizer.max_passes 3 datafusion.optimizer.prefer_existing_sort false datafusion.optimizer.prefer_existing_union false datafusion.optimizer.prefer_hash_join true +datafusion.optimizer.prefer_round_robin_repartition true datafusion.optimizer.repartition_aggregations true datafusion.optimizer.repartition_file_min_size 10485760 datafusion.optimizer.repartition_file_scans true @@ -344,6 +345,7 @@ datafusion.optimizer.max_passes 3 Number of times that the optimizer will attemp datafusion.optimizer.prefer_existing_sort false When true, DataFusion will opportunistically remove sorts when the data is already sorted, (i.e. setting `preserve_order` to true on `RepartitionExec` and using `SortPreservingMergeExec`) When false, DataFusion will maximize plan parallelism using `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`. datafusion.optimizer.prefer_existing_union false When set to true, the optimizer will not attempt to convert Union to Interleave datafusion.optimizer.prefer_hash_join true When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory +datafusion.optimizer.prefer_round_robin_repartition true When set to false, the physical plan optimizer will replace the round robin repartitioning with on demand repartitioning datafusion.optimizer.repartition_aggregations true Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level datafusion.optimizer.repartition_file_min_size 10485760 Minimum total files size in bytes to perform file scan repartitioning. datafusion.optimizer.repartition_file_scans true When set to `true`, file groups will be repartitioned to achieve maximum parallelism. Currently Parquet and CSV formats are supported. If set to `true`, all files will be repartitioned evenly (i.e., a single large file might be partitioned into smaller chunks) for parallel scanning. If set to `false`, different files will be read in parallel, but repartitioning won't happen within a single file. diff --git a/datafusion/sqllogictest/test_files/on_demand_repartition.slt b/datafusion/sqllogictest/test_files/on_demand_repartition.slt new file mode 100644 index 000000000000..afc92eac83d1 --- /dev/null +++ b/datafusion/sqllogictest/test_files/on_demand_repartition.slt @@ -0,0 +1,445 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +########## +# Tests for On-Demand Repartitioning +########## + +# Set 4 partitions for deterministic output plans +statement ok +set datafusion.execution.target_partitions = 4; + +# enable round robin repartitioning +statement ok +set datafusion.optimizer.enable_round_robin_repartition = true; + +########## +# Read from parquet source with on-demand repartitioning +########## + +statement ok +COPY (VALUES (1, 2), (2, 5), (3, 2), (4, 5), (5, 0)) TO 'test_files/scratch/repartition/parquet_table/2.parquet' +STORED AS PARQUET; + +statement ok +CREATE EXTERNAL TABLE parquet_table(column1 int, column2 int) +STORED AS PARQUET +LOCATION 'test_files/scratch/repartition/parquet_table/'; + +# Enable on-demand repartitioning +statement ok +set datafusion.optimizer.prefer_round_robin_repartition = false; + +query TT +EXPLAIN SELECT column1, SUM(column2) FROM parquet_table GROUP BY column1; +---- +logical_plan +01)Aggregate: groupBy=[[parquet_table.column1]], aggr=[[sum(CAST(parquet_table.column2 AS Int64))]] +02)--TableScan: parquet_table projection=[column1, column2] +physical_plan +01)AggregateExec: mode=FinalPartitioned, gby=[column1@0 as column1], aggr=[sum(parquet_table.column2)] +02)--CoalesceBatchesExec: target_batch_size=8192 +03)----RepartitionExec: partitioning=Hash([column1@0], 4), input_partitions=4 +04)------AggregateExec: mode=Partial, gby=[column1@0 as column1], aggr=[sum(parquet_table.column2)] +05)--------OnDemandRepartitionExec: partitioning=OnDemand(4), input_partitions=1 +06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition/parquet_table/2.parquet]]}, projection=[column1, column2], file_type=parquet + +# Disable on-demand repartitioning +statement ok +set datafusion.optimizer.prefer_round_robin_repartition = true; + +query TT +EXPLAIN SELECT column1, SUM(column2) FROM parquet_table GROUP BY column1; +---- +logical_plan +01)Aggregate: groupBy=[[parquet_table.column1]], aggr=[[sum(CAST(parquet_table.column2 AS Int64))]] +02)--TableScan: parquet_table projection=[column1, column2] +physical_plan +01)AggregateExec: mode=FinalPartitioned, gby=[column1@0 as column1], aggr=[sum(parquet_table.column2)] +02)--CoalesceBatchesExec: target_batch_size=8192 +03)----RepartitionExec: partitioning=Hash([column1@0], 4), input_partitions=4 +04)------AggregateExec: mode=Partial, gby=[column1@0 as column1], aggr=[sum(parquet_table.column2)] +05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition/parquet_table/2.parquet]]}, projection=[column1, column2], file_type=parquet + +# Cleanup +statement ok +DROP TABLE parquet_table; + +########## +# Read from CSV source with on-demand repartitioning +########## + +# Test CSV source with on-demand repartitioning +statement ok +set datafusion.optimizer.enable_round_robin_repartition = true; + +statement ok +set datafusion.optimizer.prefer_round_robin_repartition = false; + + +# create_external_table_with_quote_escape +statement ok +CREATE EXTERNAL TABLE csv_with_quote ( +column1 VARCHAR, +column2 VARCHAR +) STORED AS CSV +LOCATION '../core/tests/data/quote.csv' +OPTIONS ('format.quote' '~', + 'format.delimiter' ',', + 'format.has_header' 'true'); + +statement ok +CREATE EXTERNAL TABLE csv_with_escape ( +column1 VARCHAR, +column2 VARCHAR +) STORED AS CSV +OPTIONS ('format.escape' '\', + 'format.delimiter' ',', + 'format.has_header' 'true') +LOCATION '../core/tests/data/escape.csv'; + +query TT +EXPLAIN SELECT column1 FROM csv_with_quote GROUP BY column1; +---- +logical_plan +01)Aggregate: groupBy=[[csv_with_quote.column1]], aggr=[[]] +02)--TableScan: csv_with_quote projection=[column1] +physical_plan +01)AggregateExec: mode=FinalPartitioned, gby=[column1@0 as column1], aggr=[] +02)--CoalesceBatchesExec: target_batch_size=8192 +03)----RepartitionExec: partitioning=Hash([column1@0], 4), input_partitions=4 +04)------AggregateExec: mode=Partial, gby=[column1@0 as column1], aggr=[] +05)--------OnDemandRepartitionExec: partitioning=OnDemand(4), input_partitions=1 +06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/quote.csv]]}, projection=[column1], file_type=csv, has_header=true + +query TT +EXPLAIN SELECT column1 FROM csv_with_escape GROUP BY column1; +---- +logical_plan +01)Aggregate: groupBy=[[csv_with_escape.column1]], aggr=[[]] +02)--TableScan: csv_with_escape projection=[column1] +physical_plan +01)AggregateExec: mode=FinalPartitioned, gby=[column1@0 as column1], aggr=[] +02)--CoalesceBatchesExec: target_batch_size=8192 +03)----RepartitionExec: partitioning=Hash([column1@0], 4), input_partitions=4 +04)------AggregateExec: mode=Partial, gby=[column1@0 as column1], aggr=[] +05)--------OnDemandRepartitionExec: partitioning=OnDemand(4), input_partitions=1 +06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/escape.csv]]}, projection=[column1], file_type=csv, has_header=true + +statement ok +DROP TABLE csv_with_quote; + +statement ok +DROP TABLE csv_with_escape; + +########## +# Read from arrow source with on-demand repartitioning +########## + +statement ok +CREATE EXTERNAL TABLE arrow_simple( +column1 INT, +column2 VARCHAR, +column3 BOOLEAN +) STORED AS ARROW +LOCATION '../core/tests/data/example.arrow'; + +query TT +EXPLAIN SELECT column1 FROM arrow_simple GROUP BY column1; +---- +logical_plan +01)Aggregate: groupBy=[[arrow_simple.column1]], aggr=[[]] +02)--TableScan: arrow_simple projection=[column1] +physical_plan +01)AggregateExec: mode=FinalPartitioned, gby=[column1@0 as column1], aggr=[] +02)--CoalesceBatchesExec: target_batch_size=8192 +03)----RepartitionExec: partitioning=Hash([column1@0], 4), input_partitions=4 +04)------AggregateExec: mode=Partial, gby=[column1@0 as column1], aggr=[] +05)--------OnDemandRepartitionExec: partitioning=OnDemand(4), input_partitions=1 +06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.arrow]]}, projection=[column1], file_type=arrow + +statement ok +DROP TABLE arrow_simple; + +########## +# Read from steaming table source with on-demand repartitioning +########## + +# Unbounded repartition +# See https://github.com/apache/datafusion/issues/5278 +# Set up unbounded table and run a query - the query plan should display a `RepartitionExec` +# and a `CoalescePartitionsExec` +statement ok +CREATE UNBOUNDED EXTERNAL TABLE sink_table ( + c1 VARCHAR NOT NULL, + c2 TINYINT NOT NULL, + c3 SMALLINT NOT NULL, + c4 SMALLINT NOT NULL, + c5 INTEGER NOT NULL, + c6 BIGINT NOT NULL, + c7 SMALLINT NOT NULL, + c8 INT NOT NULL, + c9 INT UNSIGNED NOT NULL, + c10 BIGINT UNSIGNED NOT NULL, + c11 FLOAT NOT NULL, + c12 DOUBLE NOT NULL, + c13 VARCHAR NOT NULL + ) +STORED AS CSV +LOCATION '../../testing/data/csv/aggregate_test_100.csv' +OPTIONS ('format.has_header' 'true'); + +query TII +SELECT c1, c2, c3 FROM sink_table WHERE c3 > 0 LIMIT 5; +---- +c 2 1 +b 1 29 +e 3 104 +a 3 13 +d 1 38 + +statement ok +set datafusion.execution.target_partitions = 3; + +statement ok +set datafusion.optimizer.enable_round_robin_repartition = true; + +statement ok +set datafusion.optimizer.prefer_round_robin_repartition = false; + +query TT +EXPLAIN SELECT c1, c2, c3 FROM sink_table WHERE c3 > 0 LIMIT 5; +---- +logical_plan +01)Limit: skip=0, fetch=5 +02)--Filter: sink_table.c3 > Int16(0) +03)----TableScan: sink_table projection=[c1, c2, c3] +physical_plan +01)CoalescePartitionsExec: fetch=5 +02)--CoalesceBatchesExec: target_batch_size=8192, fetch=5 +03)----FilterExec: c3@2 > 0 +04)------OnDemandRepartitionExec: partitioning=OnDemand(3), input_partitions=1 +05)--------StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true + +# Start repratition on empty column test. +# See https://github.com/apache/datafusion/issues/12057 + +statement ok +CREATE TABLE t1(v1 int); + +statement ok +INSERT INTO t1 values(42); + +query I +SELECT sum(1) OVER (PARTITION BY false=false) +FROM t1 WHERE ((false > (v1 = v1)) IS DISTINCT FROM true); +---- +1 + +statement ok +DROP TABLE t1; + +# End repartition on empty columns test + +########## +# Read from memory table source with on-demand repartitioning +########## + +statement ok +CREATE TABLE memory_table AS SELECT * FROM (VALUES (1, 2), (2, 5), (3, 2), (4, 5), (5, 0)) AS t(column1, column2); + +query TT +EXPLAIN SELECT column1 FROM memory_table GROUP BY column1; +---- +logical_plan +01)Aggregate: groupBy=[[memory_table.column1]], aggr=[[]] +02)--TableScan: memory_table projection=[column1] +physical_plan +01)AggregateExec: mode=FinalPartitioned, gby=[column1@0 as column1], aggr=[] +02)--CoalesceBatchesExec: target_batch_size=8192 +03)----RepartitionExec: partitioning=Hash([column1@0], 3), input_partitions=3 +04)------OnDemandRepartitionExec: partitioning=OnDemand(3), input_partitions=1 +05)--------AggregateExec: mode=Partial, gby=[column1@0 as column1], aggr=[] +06)----------DataSourceExec: partitions=1, partition_sizes=[1] + +statement ok +DROP TABLE memory_table; + + +########## +# Tests for Join with On-Demand Repartitioning +########## + +statement ok +set datafusion.execution.target_partitions = 1; + +statement ok +set datafusion.execution.batch_size = 2; + +statement ok +CREATE TABLE join_t1(t1_id INT UNSIGNED, t1_name VARCHAR, t1_int INT UNSIGNED) +AS VALUES +(11, 'a', 1), +(22, 'b', 2), +(33, 'c', 3), +(44, 'd', 4); + +statement ok +CREATE TABLE join_t2(t2_id INT UNSIGNED, t2_name VARCHAR, t2_int INT UNSIGNED) +AS VALUES +(11, 'z', 3), +(22, 'y', 1), +(44, 'x', 3), +(55, 'w', 3); + +statement ok +set datafusion.optimizer.repartition_joins = true; + +statement ok +set datafusion.execution.target_partitions = 2; + +# left join +query TT +EXPLAIN +select join_t1.t1_id, join_t2.t2_id +from join_t1 +left join join_t2 on join_t1.t1_id = join_t2.t2_id; +---- +logical_plan +01)Left Join: join_t1.t1_id = join_t2.t2_id +02)--TableScan: join_t1 projection=[t1_id] +03)--TableScan: join_t2 projection=[t2_id] +physical_plan +01)CoalesceBatchesExec: target_batch_size=2 +02)--HashJoinExec: mode=Partitioned, join_type=Left, on=[(t1_id@0, t2_id@0)] +03)----CoalesceBatchesExec: target_batch_size=2 +04)------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2 +05)--------OnDemandRepartitionExec: partitioning=OnDemand(2), input_partitions=1 +06)----------DataSourceExec: partitions=1, partition_sizes=[1] +07)----CoalesceBatchesExec: target_batch_size=2 +08)------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2 +09)--------OnDemandRepartitionExec: partitioning=OnDemand(2), input_partitions=1 +10)----------DataSourceExec: partitions=1, partition_sizes=[1] + +# right join +query TT +EXPLAIN +select join_t1.t1_id, join_t2.t2_id +from join_t1 +right join join_t2 on join_t1.t1_id = join_t2.t2_id; +---- +logical_plan +01)Right Join: join_t1.t1_id = join_t2.t2_id +02)--TableScan: join_t1 projection=[t1_id] +03)--TableScan: join_t2 projection=[t2_id] +physical_plan +01)CoalesceBatchesExec: target_batch_size=2 +02)--HashJoinExec: mode=Partitioned, join_type=Right, on=[(t1_id@0, t2_id@0)] +03)----CoalesceBatchesExec: target_batch_size=2 +04)------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2 +05)--------OnDemandRepartitionExec: partitioning=OnDemand(2), input_partitions=1 +06)----------DataSourceExec: partitions=1, partition_sizes=[1] +07)----CoalesceBatchesExec: target_batch_size=2 +08)------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2 +09)--------OnDemandRepartitionExec: partitioning=OnDemand(2), input_partitions=1 +10)----------DataSourceExec: partitions=1, partition_sizes=[1] + +# inner join +query TT +EXPLAIN +select join_t1.t1_id, join_t2.t2_id +from join_t1 +inner join join_t2 on join_t1.t1_id = join_t2.t2_id; +---- +logical_plan +01)Inner Join: join_t1.t1_id = join_t2.t2_id +02)--TableScan: join_t1 projection=[t1_id] +03)--TableScan: join_t2 projection=[t2_id] +physical_plan +01)CoalesceBatchesExec: target_batch_size=2 +02)--HashJoinExec: mode=Partitioned, join_type=Inner, on=[(t1_id@0, t2_id@0)] +03)----CoalesceBatchesExec: target_batch_size=2 +04)------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2 +05)--------OnDemandRepartitionExec: partitioning=OnDemand(2), input_partitions=1 +06)----------DataSourceExec: partitions=1, partition_sizes=[1] +07)----CoalesceBatchesExec: target_batch_size=2 +08)------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2 +09)--------OnDemandRepartitionExec: partitioning=OnDemand(2), input_partitions=1 +10)----------DataSourceExec: partitions=1, partition_sizes=[1] + +statement ok +DROP TABLE join_t1; + +statement ok +DROP TABLE join_t2; + + +########## +# Tests for Join with On-Demand Repartitioning +########## +statement ok +CREATE EXTERNAL TABLE aggregate_test_100 ( + c1 VARCHAR NOT NULL, + c2 TINYINT NOT NULL, + c3 SMALLINT NOT NULL, + c4 SMALLINT, + c5 INT, + c6 BIGINT NOT NULL, + c7 SMALLINT NOT NULL, + c8 INT NOT NULL, + c9 BIGINT UNSIGNED NOT NULL, + c10 VARCHAR NOT NULL, + c11 FLOAT NOT NULL, + c12 DOUBLE NOT NULL, + c13 VARCHAR NOT NULL +) +STORED AS CSV +LOCATION '../../testing/data/csv/aggregate_test_100.csv' +OPTIONS ('format.has_header' 'true'); + +statement ok +set datafusion.execution.batch_size = 4096; + +statement ok +set datafusion.optimizer.repartition_windows = true; + +query TT +EXPLAIN SELECT + SUM(c4) OVER(PARTITION BY c1, c2 ORDER BY c2 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING), + COUNT(*) OVER(PARTITION BY c1 ORDER BY c2 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) + FROM aggregate_test_100 +---- +logical_plan +01)Projection: sum(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, count(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING +02)--WindowAggr: windowExpr=[[count(Int64(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS count(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]] +03)----Projection: aggregate_test_100.c1, aggregate_test_100.c2, sum(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING +04)------WindowAggr: windowExpr=[[sum(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]] +05)--------TableScan: aggregate_test_100 projection=[c1, c2, c4] +physical_plan +01)ProjectionExec: expr=[sum(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@2 as sum(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, count(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as count(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING] +02)--BoundedWindowAggExec: wdw=[count(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "count(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)), is_causal: false }], mode=[Sorted] +03)----SortExec: expr=[c1@0 ASC NULLS LAST, c2@1 ASC NULLS LAST], preserve_partitioning=[true] +04)------CoalesceBatchesExec: target_batch_size=4096 +05)--------RepartitionExec: partitioning=Hash([c1@0], 2), input_partitions=2 +06)----------ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, sum(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as sum(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING] +07)------------BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "sum(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)), is_causal: false }], mode=[Sorted] +08)--------------SortExec: expr=[c1@0 ASC NULLS LAST, c2@1 ASC NULLS LAST], preserve_partitioning=[true] +09)----------------CoalesceBatchesExec: target_batch_size=4096 +10)------------------RepartitionExec: partitioning=Hash([c1@0, c2@1], 2), input_partitions=2 +11)--------------------OnDemandRepartitionExec: partitioning=OnDemand(2), input_partitions=1 +12)----------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c4], file_type=csv, has_header=true diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 999735f4c059..9ab42ea80c44 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -97,6 +97,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.execution.enforce_batch_size_in_joins | false | Should DataFusion enforce batch size in joins or not. By default, DataFusion will not enforce batch size in joins. Enforcing batch size in joins can reduce memory usage when joining large tables with a highly-selective join filter, but is also slightly slower. | | datafusion.optimizer.enable_distinct_aggregation_soft_limit | true | When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. | | datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores | +| datafusion.optimizer.prefer_round_robin_repartition | true | When set to false, the physical plan optimizer will replace the round robin repartitioning with on demand repartitioning | | datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible | | datafusion.optimizer.filter_null_join_keys | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. | | datafusion.optimizer.repartition_aggregations | true | Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level |