Skip to content

Commit

Permalink
Early exit on column normalisation to improve DataFrame performance (#…
Browse files Browse the repository at this point in the history
…14636)

* Exit early if the column is normalized

* `Alias` original column

* Add references test
  • Loading branch information
blaginin authored Feb 17, 2025
1 parent 42eabb9 commit 580e622
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 7 deletions.
3 changes: 2 additions & 1 deletion datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1811,7 +1811,8 @@ impl DataFrame {
.iter()
.map(|(qualifier, field)| {
if qualifier.eq(&qualifier_rename) && field.as_ref() == field_rename {
col(Column::from((qualifier, field))).alias(new_name)
col(Column::from((qualifier, field)))
.alias_qualified(qualifier.cloned(), new_name)
} else {
col(Column::from((qualifier, field)))
}
Expand Down
26 changes: 21 additions & 5 deletions datafusion/core/tests/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ use datafusion::{assert_batches_eq, assert_batches_sorted_eq};
use datafusion_catalog::TableProvider;
use datafusion_common::{
assert_contains, Constraint, Constraints, DataFusionError, ParamValues, ScalarValue,
UnnestOptions,
TableReference, UnnestOptions,
};
use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::config::SessionConfig;
Expand Down Expand Up @@ -1617,9 +1617,25 @@ async fn with_column_renamed() -> Result<()> {
// accepts table qualifier
.with_column_renamed("aggregate_test_100.c2", "two")?
// no-op for missing column
.with_column_renamed("c4", "boom")?
.collect()
.await?;
.with_column_renamed("c4", "boom")?;

let references: Vec<_> = df_sum_renamed
.schema()
.iter()
.map(|(a, _)| a.cloned())
.collect();

assert_eq!(
references,
vec![
Some(TableReference::bare("aggregate_test_100")), // table name is preserved
Some(TableReference::bare("aggregate_test_100")),
Some(TableReference::bare("aggregate_test_100")),
None // total column
]
);

let batches = &df_sum_renamed.collect().await?;

assert_batches_sorted_eq!(
[
Expand All @@ -1629,7 +1645,7 @@ async fn with_column_renamed() -> Result<()> {
"| a | 3 | -72 | -69 |",
"+-----+-----+-----+-------+",
],
&df_sum_renamed
batches
);

Ok(())
Expand Down
8 changes: 7 additions & 1 deletion datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -847,10 +847,16 @@ impl LogicalPlanBuilder {
plan: &LogicalPlan,
column: impl Into<Column>,
) -> Result<Column> {
let column = column.into();
if column.relation.is_some() {
// column is already normalized
return Ok(column);
}

let schema = plan.schema();
let fallback_schemas = plan.fallback_normalize_schemas();
let using_columns = plan.using_columns()?;
column.into().normalize_with_schemas_and_ambiguity_check(
column.normalize_with_schemas_and_ambiguity_check(
&[&[schema], &fallback_schemas],
&using_columns,
)
Expand Down

0 comments on commit 580e622

Please sign in to comment.