Skip to content

Commit

Permalink
fix: Support NOT <field> IN (<subquery>) via anti join (#10936)
Browse files Browse the repository at this point in the history
* fix: rewriting NOT IN (<subquery>) to anti join

* add wrapped_not_not_in_subquery test
  • Loading branch information
akoshchiy authored Jun 17, 2024
1 parent f373a86 commit a8847e1
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 5 deletions.
86 changes: 81 additions & 5 deletions datafusion/optimizer/src/decorrelate_predicate_subquery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use datafusion_expr::expr_rewriter::create_col_from_scalar_expr;
use datafusion_expr::logical_plan::{JoinType, Subquery};
use datafusion_expr::utils::{conjunction, split_conjunction, split_conjunction_owned};
use datafusion_expr::{
exists, in_subquery, not_exists, not_in_subquery, BinaryExpr, Expr, Filter,
exists, in_subquery, not, not_exists, not_in_subquery, BinaryExpr, Expr, Filter,
LogicalPlan, LogicalPlanBuilder, Operator,
};

Expand Down Expand Up @@ -79,6 +79,25 @@ impl DecorrelatePredicateSubquery {
let mut others = vec![];
for it in filters.into_iter() {
match it {
Expr::Not(not_expr) => match *not_expr {
Expr::InSubquery(InSubquery {
expr,
subquery,
negated,
}) => {
let new_subquery = self.rewrite_subquery(subquery, config)?;
subqueries.push(SubqueryInfo::new_with_in_expr(
new_subquery,
*expr,
!negated,
));
}
Expr::Exists(Exists { subquery, negated }) => {
let new_subquery = self.rewrite_subquery(subquery, config)?;
subqueries.push(SubqueryInfo::new(new_subquery, !negated));
}
expr => others.push(not(expr)),
},
Expr::InSubquery(InSubquery {
expr,
subquery,
Expand Down Expand Up @@ -126,9 +145,17 @@ impl OptimizerRule for DecorrelatePredicateSubquery {
};

// if there are no subqueries in the predicate, return the original plan
let has_subqueries = split_conjunction(&filter.predicate)
.iter()
.any(|expr| matches!(expr, Expr::InSubquery(_) | Expr::Exists(_)));
let has_subqueries =
split_conjunction(&filter.predicate)
.iter()
.any(|expr| match expr {
Expr::Not(not_expr) => {
matches!(not_expr.as_ref(), Expr::InSubquery(_) | Expr::Exists(_))
}
Expr::InSubquery(_) | Expr::Exists(_) => true,
_ => false,
});

if !has_subqueries {
return Ok(Transformed::no(LogicalPlan::Filter(filter)));
}
Expand Down Expand Up @@ -351,7 +378,7 @@ mod tests {
use crate::test::*;

use arrow::datatypes::DataType;
use datafusion_expr::{and, binary_expr, col, lit, or, out_ref_col};
use datafusion_expr::{and, binary_expr, col, lit, not, or, out_ref_col};

fn assert_optimized_plan_equal(plan: LogicalPlan, expected: &str) -> Result<()> {
assert_optimized_plan_eq_display_indent(
Expand Down Expand Up @@ -1099,6 +1126,55 @@ mod tests {
Ok(())
}

#[test]
fn wrapped_not_in_subquery() -> Result<()> {
let table_scan = test_table_scan()?;
let plan = LogicalPlanBuilder::from(table_scan)
.filter(not(in_subquery(col("c"), test_subquery_with_name("sq")?)))?
.project(vec![col("test.b")])?
.build()?;

let expected = "Projection: test.b [b:UInt32]\
\n LeftAnti Join: Filter: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
\n SubqueryAlias: __correlated_sq_1 [c:UInt32]\
\n Projection: sq.c [c:UInt32]\
\n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";

assert_optimized_plan_eq_display_indent(
Arc::new(DecorrelatePredicateSubquery::new()),
plan,
expected,
);
Ok(())
}

#[test]
fn wrapped_not_not_in_subquery() -> Result<()> {
let table_scan = test_table_scan()?;
let plan = LogicalPlanBuilder::from(table_scan)
.filter(not(not_in_subquery(
col("c"),
test_subquery_with_name("sq")?,
)))?
.project(vec![col("test.b")])?
.build()?;

let expected = "Projection: test.b [b:UInt32]\
\n LeftSemi Join: Filter: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
\n SubqueryAlias: __correlated_sq_1 [c:UInt32]\
\n Projection: sq.c [c:UInt32]\
\n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";

assert_optimized_plan_eq_display_indent(
Arc::new(DecorrelatePredicateSubquery::new()),
plan,
expected,
);
Ok(())
}

#[test]
fn in_subquery_both_side_expr() -> Result<()> {
let table_scan = test_table_scan()?;
Expand Down
13 changes: 13 additions & 0 deletions datafusion/sqllogictest/test_files/subquery.slt
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,19 @@ where t1.t1_id + 12 not in (
----
22 b 2

# wrapped_not_in_subquery_to_join_with_correlated_outer_filter
query ITI rowsort
select t1.t1_id,
t1.t1_name,
t1.t1_int
from t1
where not t1.t1_id + 12 in (
select t2.t2_id + 1 from t2 where t1.t1_int > 0
)
----
22 b 2


# in subquery with two parentheses, see #5529
query ITI rowsort
select t1.t1_id,
Expand Down

0 comments on commit a8847e1

Please sign in to comment.