Skip to content

Commit

Permalink
feat: Add support for --mem-pool-type and --memory-limit options to m…
Browse files Browse the repository at this point in the history
…ultiple benchmarks (#14642)

* Add support --mem-pool-type and --memory-limit options for all benchmarks

* Add --sort-spill-reservation-bytes option
  • Loading branch information
Kontinuation authored Feb 14, 2025
1 parent 469f18b commit 0e52274
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 64 deletions.
72 changes: 17 additions & 55 deletions benchmarks/src/bin/external_aggr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

//! external_aggr binary entrypoint
use datafusion::execution::memory_pool::GreedyMemoryPool;
use datafusion::execution::memory_pool::MemoryPool;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
Expand All @@ -41,7 +43,7 @@ use datafusion::prelude::*;
use datafusion_benchmarks::util::{BenchmarkRun, CommonOpt};
use datafusion_common::instant::Instant;
use datafusion_common::utils::get_available_parallelism;
use datafusion_common::{exec_datafusion_err, exec_err, DEFAULT_PARQUET_EXTENSION};
use datafusion_common::{exec_err, DEFAULT_PARQUET_EXTENSION};

#[derive(Debug, StructOpt)]
#[structopt(
Expand All @@ -58,10 +60,6 @@ struct ExternalAggrConfig {
#[structopt(short, long)]
query: Option<usize>,

/// Memory limit (e.g. '100M', '1.5G'). If not specified, run all pre-defined memory limits for given query.
#[structopt(long)]
memory_limit: Option<String>,

/// Common options
#[structopt(flatten)]
common: CommonOpt,
Expand Down Expand Up @@ -129,10 +127,8 @@ impl ExternalAggrConfig {
pub async fn run(&self) -> Result<()> {
let mut benchmark_run = BenchmarkRun::new();

let memory_limit = match &self.memory_limit {
Some(limit) => Some(Self::parse_memory_limit(limit)?),
None => None,
};
let memory_limit = self.common.memory_limit.map(|limit| limit as u64);
let mem_pool_type = self.common.mem_pool_type.as_str();

let query_range = match self.query {
Some(query_id) => query_id..=query_id,
Expand Down Expand Up @@ -171,7 +167,9 @@ impl ExternalAggrConfig {
human_readable_size(mem_limit as usize)
));

let query_results = self.benchmark_query(query_id, mem_limit).await?;
let query_results = self
.benchmark_query(query_id, mem_limit, mem_pool_type)
.await?;
for iter in query_results {
benchmark_run.write_iter(iter.elapsed, iter.row_count);
}
Expand All @@ -187,12 +185,20 @@ impl ExternalAggrConfig {
&self,
query_id: usize,
mem_limit: u64,
mem_pool_type: &str,
) -> Result<Vec<QueryResult>> {
let query_name =
format!("Q{query_id}({})", human_readable_size(mem_limit as usize));
let config = self.common.config();
let memory_pool: Arc<dyn MemoryPool> = match mem_pool_type {
"fair" => Arc::new(FairSpillPool::new(mem_limit as usize)),
"greedy" => Arc::new(GreedyMemoryPool::new(mem_limit as usize)),
_ => {
return exec_err!("Invalid memory pool type: {}", mem_pool_type);
}
};
let runtime_env = RuntimeEnvBuilder::new()
.with_memory_pool(Arc::new(FairSpillPool::new(mem_limit as usize)))
.with_memory_pool(memory_pool)
.build_arc()?;
let state = SessionStateBuilder::new()
.with_config(config)
Expand Down Expand Up @@ -331,22 +337,6 @@ impl ExternalAggrConfig {
.partitions
.unwrap_or(get_available_parallelism())
}

/// Parse memory limit from string to number of bytes
/// e.g. '1.5G', '100M' -> 1572864
fn parse_memory_limit(limit: &str) -> Result<u64> {
let (number, unit) = limit.split_at(limit.len() - 1);
let number: f64 = number.parse().map_err(|_| {
exec_datafusion_err!("Failed to parse number from memory limit '{}'", limit)
})?;

match unit {
"K" => Ok((number * 1024.0) as u64),
"M" => Ok((number * 1024.0 * 1024.0) as u64),
"G" => Ok((number * 1024.0 * 1024.0 * 1024.0) as u64),
_ => exec_err!("Unsupported unit '{}' in memory limit '{}'", unit, limit),
}
}
}

#[tokio::main]
Expand All @@ -359,31 +349,3 @@ pub async fn main() -> Result<()> {

Ok(())
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_parse_memory_limit_all() {
// Test valid inputs
assert_eq!(
ExternalAggrConfig::parse_memory_limit("100K").unwrap(),
102400
);
assert_eq!(
ExternalAggrConfig::parse_memory_limit("1.5M").unwrap(),
1572864
);
assert_eq!(
ExternalAggrConfig::parse_memory_limit("2G").unwrap(),
2147483648
);

// Test invalid unit
assert!(ExternalAggrConfig::parse_memory_limit("500X").is_err());

// Test invalid number
assert!(ExternalAggrConfig::parse_memory_limit("abcM").is_err());
}
}
3 changes: 2 additions & 1 deletion benchmarks/src/clickbench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ impl RunOpt {
parquet_options.binary_as_string = true;
}

let ctx = SessionContext::new_with_config(config);
let rt_builder = self.common.runtime_env_builder()?;
let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);
self.register_hits(&ctx).await?;

let iterations = self.common.iterations;
Expand Down
3 changes: 2 additions & 1 deletion benchmarks/src/h2o.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ impl RunOpt {
};

let config = self.common.config();
let ctx = SessionContext::new_with_config(config);
let rt_builder = self.common.runtime_env_builder()?;
let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);

// Register data
self.register_data(&ctx).await?;
Expand Down
10 changes: 8 additions & 2 deletions benchmarks/src/imdb/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,8 @@ impl RunOpt {
.config()
.with_collect_statistics(!self.disable_statistics);
config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;

let ctx = SessionContext::new_with_config(config);
let rt_builder = self.common.runtime_env_builder()?;
let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);

// register tables
self.register_tables(&ctx).await?;
Expand Down Expand Up @@ -515,6 +515,9 @@ mod tests {
iterations: 1,
partitions: Some(2),
batch_size: 8192,
mem_pool_type: "fair".to_string(),
memory_limit: None,
sort_spill_reservation_bytes: None,
debug: false,
};
let opt = RunOpt {
Expand Down Expand Up @@ -548,6 +551,9 @@ mod tests {
iterations: 1,
partitions: Some(2),
batch_size: 8192,
mem_pool_type: "fair".to_string(),
memory_limit: None,
sort_spill_reservation_bytes: None,
debug: false,
};
let opt = RunOpt {
Expand Down
2 changes: 2 additions & 0 deletions benchmarks/src/sort_tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,10 @@ impl RunOpt {
/// Benchmark query `query_id` in `SORT_QUERIES`
async fn benchmark_query(&self, query_id: usize) -> Result<Vec<QueryResult>> {
let config = self.common.config();
let rt_builder = self.common.runtime_env_builder()?;
let state = SessionStateBuilder::new()
.with_config(config)
.with_runtime_env(rt_builder.build_arc()?)
.with_default_features()
.build();
let ctx = SessionContext::from(state);
Expand Down
9 changes: 8 additions & 1 deletion benchmarks/src/tpch/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ impl RunOpt {
.config()
.with_collect_statistics(!self.disable_statistics);
config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;
let ctx = SessionContext::new_with_config(config);
let rt_builder = self.common.runtime_env_builder()?;
let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);

// register tables
self.register_tables(&ctx).await?;
Expand Down Expand Up @@ -342,6 +343,9 @@ mod tests {
iterations: 1,
partitions: Some(2),
batch_size: 8192,
mem_pool_type: "fair".to_string(),
memory_limit: None,
sort_spill_reservation_bytes: None,
debug: false,
};
let opt = RunOpt {
Expand Down Expand Up @@ -375,6 +379,9 @@ mod tests {
iterations: 1,
partitions: Some(2),
batch_size: 8192,
mem_pool_type: "fair".to_string(),
memory_limit: None,
sort_spill_reservation_bytes: None,
debug: false,
};
let opt = RunOpt {
Expand Down
102 changes: 98 additions & 4 deletions benchmarks/src/util/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,17 @@
// specific language governing permissions and limitations
// under the License.

use datafusion::prelude::SessionConfig;
use datafusion_common::utils::get_available_parallelism;
use std::{num::NonZeroUsize, sync::Arc};

use datafusion::{
execution::{
disk_manager::DiskManagerConfig,
memory_pool::{FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool},
runtime_env::RuntimeEnvBuilder,
},
prelude::SessionConfig,
};
use datafusion_common::{utils::get_available_parallelism, DataFusionError, Result};
use structopt::StructOpt;

// Common benchmark options (don't use doc comments otherwise this doc
Expand All @@ -35,6 +44,20 @@ pub struct CommonOpt {
#[structopt(short = "s", long = "batch-size", default_value = "8192")]
pub batch_size: usize,

/// The memory pool type to use, should be one of "fair" or "greedy"
#[structopt(long = "mem-pool-type", default_value = "fair")]
pub mem_pool_type: String,

/// Memory limit (e.g. '100M', '1.5G'). If not specified, run all pre-defined memory limits for given query
/// if there's any, otherwise run with no memory limit.
#[structopt(long = "memory-limit", parse(try_from_str = parse_memory_limit))]
pub memory_limit: Option<usize>,

/// The amount of memory to reserve for sort spill operations. DataFusion's default value will be used
/// if not specified.
#[structopt(long = "sort-spill-reservation-bytes", parse(try_from_str = parse_memory_limit))]
pub sort_spill_reservation_bytes: Option<usize>,

/// Activate debug mode to see more details
#[structopt(short, long)]
pub debug: bool,
Expand All @@ -48,10 +71,81 @@ impl CommonOpt {

/// Modify the existing config appropriately
pub fn update_config(&self, config: SessionConfig) -> SessionConfig {
config
let mut config = config
.with_target_partitions(
self.partitions.unwrap_or(get_available_parallelism()),
)
.with_batch_size(self.batch_size)
.with_batch_size(self.batch_size);
if let Some(sort_spill_reservation_bytes) = self.sort_spill_reservation_bytes {
config =
config.with_sort_spill_reservation_bytes(sort_spill_reservation_bytes);
}
config
}

/// Return an appropriately configured `RuntimeEnvBuilder`
pub fn runtime_env_builder(&self) -> Result<RuntimeEnvBuilder> {
let mut rt_builder = RuntimeEnvBuilder::new();
const NUM_TRACKED_CONSUMERS: usize = 5;
if let Some(memory_limit) = self.memory_limit {
let pool: Arc<dyn MemoryPool> = match self.mem_pool_type.as_str() {
"fair" => Arc::new(TrackConsumersPool::new(
FairSpillPool::new(memory_limit),
NonZeroUsize::new(NUM_TRACKED_CONSUMERS).unwrap(),
)),
"greedy" => Arc::new(TrackConsumersPool::new(
GreedyMemoryPool::new(memory_limit),
NonZeroUsize::new(NUM_TRACKED_CONSUMERS).unwrap(),
)),
_ => {
return Err(DataFusionError::Configuration(format!(
"Invalid memory pool type: {}",
self.mem_pool_type
)))
}
};
rt_builder = rt_builder
.with_memory_pool(pool)
.with_disk_manager(DiskManagerConfig::NewOs);
}
Ok(rt_builder)
}
}

/// Parse memory limit from string to number of bytes
/// e.g. '1.5G', '100M' -> 1572864
fn parse_memory_limit(limit: &str) -> Result<usize, String> {
let (number, unit) = limit.split_at(limit.len() - 1);
let number: f64 = number
.parse()
.map_err(|_| format!("Failed to parse number from memory limit '{}'", limit))?;

match unit {
"K" => Ok((number * 1024.0) as usize),
"M" => Ok((number * 1024.0 * 1024.0) as usize),
"G" => Ok((number * 1024.0 * 1024.0 * 1024.0) as usize),
_ => Err(format!(
"Unsupported unit '{}' in memory limit '{}'",
unit, limit
)),
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_parse_memory_limit_all() {
// Test valid inputs
assert_eq!(parse_memory_limit("100K").unwrap(), 102400);
assert_eq!(parse_memory_limit("1.5M").unwrap(), 1572864);
assert_eq!(parse_memory_limit("2G").unwrap(), 2147483648);

// Test invalid unit
assert!(parse_memory_limit("500X").is_err());

// Test invalid number
assert!(parse_memory_limit("abcM").is_err());
}
}

0 comments on commit 0e52274

Please sign in to comment.