[BUG] Fix the PPL Lookup command behavior when inputField is missing and REPLACE with existing fields #1035

Merged · 4 commits · Feb 10, 2025
Changes from 2 commits
docs/ppl-lang/ppl-lookup-command.md · 2 changes: 1 addition & 1 deletion
@@ -29,7 +29,7 @@ LOOKUP <lookupIndex> (<lookupMappingField> [AS <sourceMappingField>])...
**inputField**
- Optional
- Default: If no field is specified, all fields of \<lookupIndex\> with matched values are applied to the result output.
- Description: A field in \<lookupIndex\> whose matched values are applied to the result output. You can specify multiple \<inputField\> values as a comma-delimited list. If you don't specify any \<inputField\>, all fields of \<lookupIndex\> with matched values are applied to the result output.
- Description: A field in \<lookupIndex\> whose matched values are applied to the result output. You can specify multiple \<inputField\> values as a comma-delimited list. If you don't specify any \<inputField\>, all fields except \<lookupMappingField\> from \<lookupIndex\> with matched values are applied to the result output.

**outputField**
- Optional
FlintSparkPPLLookupITSuite.scala
@@ -51,25 +51,24 @@ class FlintSparkPPLLookupITSuite

test("test LOOKUP lookupTable uid AS id REPLACE department") {
val frame = sql(s"source = $sourceTable| LOOKUP $lookupTable uid AS id REPLACE department")
// frame.show()
// frame.explain(true)
val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] = Array(
Row(1000, "Jake", "Engineer", "England", 100000, "IT"),
Row(1001, "Hello", "Artist", "USA", 70000, null),
Row(1002, "John", "Doctor", "Canada", 120000, "DATA"),
Row(1003, "David", "Doctor", null, 120000, "HR"),
Row(1004, "David", null, "Canada", 0, null),
Row(1005, "Jane", "Scientist", "Canada", 90000, "DATA"))
assertSameRows(expectedResults, frame)

implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Integer](_.getAs[Integer](0))
assert(results.sorted.sameElements(expectedResults.sorted))
val lookupProject =
Project(Seq(UnresolvedAttribute("department"), UnresolvedAttribute("uid")), lookupAlias)
val joinCondition = EqualTo(UnresolvedAttribute("uid"), UnresolvedAttribute("id"))
val joinPlan = Join(sourceAlias, lookupProject, LeftOuter, Some(joinCondition), JoinHint.NONE)
val coalesceForSafeExpr =
Coalesce(Seq(UnresolvedAttribute("department"), UnresolvedAttribute("department")))
Coalesce(
Seq(
UnresolvedAttribute("__auto_generated_subquery_name_l.department"),
UnresolvedAttribute("department")))
val projectAfterJoin = Project(
Seq(
UnresolvedStar(Some(Seq("__auto_generated_subquery_name_s"))),
@@ -88,26 +87,24 @@ class FlintSparkPPLLookupITSuite

test("test LOOKUP lookupTable uid AS id APPEND department") {
val frame = sql(s"source = $sourceTable| LOOKUP $lookupTable uid AS id APPEND department")
// frame.show()
// frame.explain(true)
val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] = Array(
Row(1000, "Jake", "Engineer", "England", 100000, "IT"),
Row(1001, "Hello", "Artist", "USA", 70000, null),
Row(1002, "John", "Doctor", "Canada", 120000, "DATA"),
Row(1003, "David", "Doctor", null, 120000, "HR"),
Row(1004, "David", null, "Canada", 0, null),
Row(1005, "Jane", "Scientist", "Canada", 90000, "DATA"))

implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Integer](_.getAs[Integer](0))
assert(results.sorted.sameElements(expectedResults.sorted))
assertSameRows(expectedResults, frame)

val lookupProject =
Project(Seq(UnresolvedAttribute("department"), UnresolvedAttribute("uid")), lookupAlias)
val joinCondition = EqualTo(UnresolvedAttribute("uid"), UnresolvedAttribute("id"))
val joinPlan = Join(sourceAlias, lookupProject, LeftOuter, Some(joinCondition), JoinHint.NONE)
val coalesceExpr =
Coalesce(Seq(UnresolvedAttribute("department"), UnresolvedAttribute("department")))
Coalesce(
Seq(
UnresolvedAttribute("department"),
UnresolvedAttribute("__auto_generated_subquery_name_l.department")))
val coalesceForSafeExpr = Coalesce(Seq(coalesceExpr, UnresolvedAttribute("department")))
val projectAfterJoin = Project(
Seq(
@@ -128,26 +125,24 @@ class FlintSparkPPLLookupITSuite
test("test LOOKUP lookupTable uid AS id REPLACE department AS country") {
val frame =
sql(s"source = $sourceTable| LOOKUP $lookupTable uid AS id REPLACE department AS country")
// frame.show()
// frame.explain(true)
val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] = Array(
Row(1000, "Jake", "Engineer", 100000, "IT"),
Row(1001, "Hello", "Artist", 70000, "USA"),
Row(1002, "John", "Doctor", 120000, "DATA"),
Row(1003, "David", "Doctor", 120000, "HR"),
Row(1004, "David", null, 0, "Canada"),
Row(1005, "Jane", "Scientist", 90000, "DATA"))

implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Integer](_.getAs[Integer](0))
assert(results.sorted.sameElements(expectedResults.sorted))
assertSameRows(expectedResults, frame)

val lookupProject =
Project(Seq(UnresolvedAttribute("department"), UnresolvedAttribute("uid")), lookupAlias)
val joinCondition = EqualTo(UnresolvedAttribute("uid"), UnresolvedAttribute("id"))
val joinPlan = Join(sourceAlias, lookupProject, LeftOuter, Some(joinCondition), JoinHint.NONE)
val coalesceForSafeExpr =
Coalesce(Seq(UnresolvedAttribute("department"), UnresolvedAttribute("country")))
Coalesce(
Seq(
UnresolvedAttribute("__auto_generated_subquery_name_l.department"),
UnresolvedAttribute("__auto_generated_subquery_name_s.country")))
val projectAfterJoin = Project(
Seq(
UnresolvedStar(Some(Seq("__auto_generated_subquery_name_s"))),
@@ -167,24 +162,26 @@ class FlintSparkPPLLookupITSuite
test("test LOOKUP lookupTable uid AS id APPEND department AS country") {
val frame =
sql(s"source = $sourceTable| LOOKUP $lookupTable uid AS id APPEND department AS country")
// frame.show()
// frame.explain(true)
val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] = Array(
Row(1000, "Jake", "Engineer", 100000, "England"),
Row(1001, "Hello", "Artist", 70000, "USA"),
Row(1002, "John", "Doctor", 120000, "Canada"),
Row(1003, "David", "Doctor", 120000, "HR"),
Row(1004, "David", null, 0, "Canada"),
Row(1005, "Jane", "Scientist", 90000, "Canada"))
assertSameRows(expectedResults, frame)

val lookupProject =
Project(Seq(UnresolvedAttribute("department"), UnresolvedAttribute("uid")), lookupAlias)
val joinCondition = EqualTo(UnresolvedAttribute("uid"), UnresolvedAttribute("id"))
val joinPlan = Join(sourceAlias, lookupProject, LeftOuter, Some(joinCondition), JoinHint.NONE)
val coalesceExpr =
Coalesce(Seq(UnresolvedAttribute("country"), UnresolvedAttribute("department")))
val coalesceForSafeExpr = Coalesce(Seq(coalesceExpr, UnresolvedAttribute("country")))
Coalesce(
Seq(
UnresolvedAttribute("__auto_generated_subquery_name_s.country"),
UnresolvedAttribute("__auto_generated_subquery_name_l.department")))
val coalesceForSafeExpr =
Coalesce(Seq(coalesceExpr, UnresolvedAttribute("__auto_generated_subquery_name_s.country")))
val projectAfterJoin = Project(
Seq(
UnresolvedStar(Some(Seq("__auto_generated_subquery_name_s"))),
@@ -204,19 +201,15 @@ class FlintSparkPPLLookupITSuite
test("test LOOKUP lookupTable uid AS id, name REPLACE department") {
val frame =
sql(s"source = $sourceTable| LOOKUP $lookupTable uID AS id, name REPLACE department")
// frame.show()
// frame.explain(true)
val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] = Array(
Row(1000, "Jake", "Engineer", "England", 100000, "IT"),
Row(1001, "Hello", "Artist", "USA", 70000, null),
Row(1002, "John", "Doctor", "Canada", 120000, "DATA"),
Row(1003, "David", "Doctor", null, 120000, "HR"),
Row(1004, "David", null, "Canada", 0, null),
Row(1005, "Jane", "Scientist", "Canada", 90000, "DATA"))
assertSameRows(expectedResults, frame)

implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Integer](_.getAs[Integer](0))
assert(results.sorted.sameElements(expectedResults.sorted))
val lookupProject =
Project(
Seq(
@@ -232,7 +225,10 @@ class FlintSparkPPLLookupITSuite
UnresolvedAttribute("__auto_generated_subquery_name_s.name")))
val joinPlan = Join(sourceAlias, lookupProject, LeftOuter, Some(joinCondition), JoinHint.NONE)
val coalesceForSafeExpr =
Coalesce(Seq(UnresolvedAttribute("department"), UnresolvedAttribute("department")))
Coalesce(
Seq(
UnresolvedAttribute("__auto_generated_subquery_name_l.department"),
UnresolvedAttribute("department")))
val projectAfterJoin = Project(
Seq(
UnresolvedStar(Some(Seq("__auto_generated_subquery_name_s"))),
@@ -253,19 +249,14 @@ class FlintSparkPPLLookupITSuite
test("test LOOKUP lookupTable uid AS id, name APPEND department") {
val frame =
sql(s"source = $sourceTable| LOOKUP $lookupTable uid AS ID, name APPEND department")
// frame.show()
// frame.explain(true)
val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] = Array(
Row(1000, "Jake", "Engineer", "England", 100000, "IT"),
Row(1001, "Hello", "Artist", "USA", 70000, null),
Row(1002, "John", "Doctor", "Canada", 120000, "DATA"),
Row(1003, "David", "Doctor", null, 120000, "HR"),
Row(1004, "David", null, "Canada", 0, null),
Row(1005, "Jane", "Scientist", "Canada", 90000, "DATA"))

implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Integer](_.getAs[Integer](0))
assert(results.sorted.sameElements(expectedResults.sorted))
assertSameRows(expectedResults, frame)

val lookupProject =
Project(
@@ -282,7 +273,10 @@ class FlintSparkPPLLookupITSuite
UnresolvedAttribute("__auto_generated_subquery_name_s.name")))
val joinPlan = Join(sourceAlias, lookupProject, LeftOuter, Some(joinCondition), JoinHint.NONE)
val coalesceExpr =
Coalesce(Seq(UnresolvedAttribute("department"), UnresolvedAttribute("department")))
Coalesce(
Seq(
UnresolvedAttribute("department"),
UnresolvedAttribute("__auto_generated_subquery_name_l.department")))
val coalesceForSafeExpr = Coalesce(Seq(coalesceExpr, UnresolvedAttribute("department")))
val projectAfterJoin = Project(
Seq(
@@ -303,38 +297,29 @@ class FlintSparkPPLLookupITSuite

test("test LOOKUP lookupTable uid AS id, name") {
val frame = sql(s"source = $sourceTable| LOOKUP $lookupTable uID AS id, name")
// frame.show()
// frame.explain(true)
val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] = Array(
Row(1000, "Jake", "Engineer", "England", 100000, 1000, "Jake", "IT", "Engineer"),
Row(1001, "Hello", "Artist", "USA", 70000, null, null, null, null),
Row(1002, "John", "Doctor", "Canada", 120000, 1002, "John", "DATA", "Scientist"),
Row(1003, "David", "Doctor", null, 120000, 1003, "David", "HR", "Doctor"),
Row(1004, "David", null, "Canada", 0, null, null, null, null),
Row(1005, "Jane", "Scientist", "Canada", 90000, 1005, "Jane", "DATA", "Engineer"))

implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Integer](_.getAs[Integer](0))
assert(results.sorted.sameElements(expectedResults.sorted))
Row(1000, "Jake", "England", 100000, "IT", "Engineer"),
Row(1001, "Hello", "USA", 70000, null, null),
Row(1002, "John", "Canada", 120000, "DATA", "Scientist"),
Row(1003, "David", null, 120000, "HR", "Doctor"),
Row(1004, "David", "Canada", 0, null, null),
Row(1005, "Jane", "Canada", 90000, "DATA", "Engineer"))

assertSameRows(expectedResults, frame)
}

test("test LOOKUP lookupTable name REPLACE occupation") {
val frame =
sql(
s"source = $sourceTable | eval major = occupation | fields id, name, major, country, salary | LOOKUP $lookupTable name REPLACE occupation AS major")
// frame.show()
// frame.explain(true)
val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] = Array(
Row(1000, "Jake", "England", 100000, "Engineer"),
Row(1001, "Hello", "USA", 70000, "Artist"),
Row(1002, "John", "Canada", 120000, "Scientist"),
Row(1003, "David", null, 120000, "Doctor"),
Row(1004, "David", "Canada", 0, "Doctor"),
Row(1005, "Jane", "Canada", 90000, "Engineer"))

implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Integer](_.getAs[Integer](0))
assert(results.sorted.sameElements(expectedResults.sorted))
assertSameRows(expectedResults, frame)

val sourceTbl = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1"))
val eval = Project(
@@ -356,7 +341,10 @@ class FlintSparkPPLLookupITSuite
UnresolvedAttribute("__auto_generated_subquery_name_l.name"))
val joinPlan = Join(sourceAlias, lookupProject, LeftOuter, Some(joinCondition), JoinHint.NONE)
val coalesceForSafeExpr =
Coalesce(Seq(UnresolvedAttribute("occupation"), UnresolvedAttribute("major")))
Coalesce(
Seq(
UnresolvedAttribute("__auto_generated_subquery_name_l.occupation"),
UnresolvedAttribute("major")))
val projectAfterJoin = Project(
Seq(
UnresolvedStar(Some(Seq("__auto_generated_subquery_name_s"))),
@@ -377,19 +365,14 @@ class FlintSparkPPLLookupITSuite
val frame =
sql(
s"source = $sourceTable | eval major = occupation | fields id, name, major, country, salary | LOOKUP $lookupTable name APPEND occupation AS major")
// frame.show()
// frame.explain(true)
val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] = Array(
Row(1000, "Jake", "England", 100000, "Engineer"),
Row(1001, "Hello", "USA", 70000, "Artist"),
Row(1002, "John", "Canada", 120000, "Doctor"),
Row(1003, "David", null, 120000, "Doctor"),
Row(1004, "David", "Canada", 0, "Doctor"),
Row(1005, "Jane", "Canada", 90000, "Scientist"))

implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Integer](_.getAs[Integer](0))
assert(results.sorted.sameElements(expectedResults.sorted))
assertSameRows(expectedResults, frame)

val sourceTbl = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1"))
val eval = Project(
@@ -411,7 +394,10 @@ class FlintSparkPPLLookupITSuite
UnresolvedAttribute("__auto_generated_subquery_name_l.name"))
val joinPlan = Join(sourceAlias, lookupProject, LeftOuter, Some(joinCondition), JoinHint.NONE)
val coalesceExpr =
Coalesce(Seq(UnresolvedAttribute("major"), UnresolvedAttribute("occupation")))
Coalesce(
Seq(
UnresolvedAttribute("major"),
UnresolvedAttribute("__auto_generated_subquery_name_l.occupation")))
val coalesceForSafeExpr =
Coalesce(Seq(coalesceExpr, UnresolvedAttribute("major")))
val projectAfterJoin = Project(
@@ -429,4 +415,32 @@ class FlintSparkPPLLookupITSuite

comparePlans(expectedPlan, logicalPlan, checkAnalysis = false)
}

test("test LOOKUP lookupTable name") {
@penghuo (Collaborator) commented on Feb 5, 2025:

expectedResults should only have one occupation column, but it has 2 occupation columns now.

IMO, the lookup command enriches your fact table by pulling in columns from a lookup (dimension) table. If the same column name appears in both tables, you can choose one of two conflict-resolution strategies (see the Spark sketch after this list):

  • REPLACE overwrites values in the source column with values from the lookup table.
  • APPEND only writes values to the source column if the column is currently missing or null.
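
In Spark terms, the two strategies are just opposite coalesce orders over a left-joined frame. A minimal sketch with hypothetical miniature tables (joined on uid AS id), mirroring the Coalesce orderings asserted in the test plans above:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{coalesce, col}

val spark = SparkSession.builder().master("local[1]").getOrCreate()
import spark.implicits._

// hypothetical miniature tables that share a department column
val source = Seq((1, "eng"), (2, "sales")).toDF("id", "department")
val lookup = Seq((1, "IT"), (3, "HR")).toDF("uid", "department")

val joined = source.alias("s")
  .join(lookup.alias("l"), col("s.id") === col("l.uid"), "left_outer")

// REPLACE: the lookup value wins wherever the join matched
joined.select(col("s.id"),
  coalesce(col("l.department"), col("s.department")).as("department")).show()

// APPEND: the source value keeps priority; the lookup only fills in nulls
joined.select(col("s.id"),
  coalesce(col("s.department"), col("l.department")).as("department")).show()
```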

@LantaoJin (Member Author) commented on Feb 6, 2025:

Let me double-check the behaviour of SPL. My understanding is that we keep 2 occupation columns because we don't know which strategy applies for the command | LOOKUP lookupTable name.

When using the lookup command, if an OUTPUT or OUTPUTNEW clause is not specified, all of the fields in the lookup table that are not the match fields are used as output fields. If the OUTPUT clause is specified, the output lookup fields overwrite existing fields. If the OUTPUTNEW clause is specified, the lookup is not performed for events in which the output fields already exist.

@penghuo (Collaborator) replied:

Is OUTPUT the default option? Then | LOOKUP lookupTable name is equivalent to | LOOKUP lookupTable name REPLACE dimensionTable.*, and column conflict resolution should be applied.

BTW, if the output includes 2 occupation columns, don't those names conflict?

@LantaoJin (Member Author) replied:

You are right: the occupation field from the source should be replaced with the occupation field from the lookup if there are duplicated columns. Double-confirmed via SPL.

@LantaoJin (Member Author) commented on Feb 6, 2025:

But achieving this may bring more complexity than expected, because what is duplicated is not the field names of the lookup table versus the source table, but the field names of the lookup table versus the output of the source plan. Our current implementation passes an unresolved logical plan to Spark, and at that point we have not yet obtained the output of the source plan.

@LantaoJin (Member Author) commented on Feb 6, 2025:

@penghuo, I have fixed this to address your comment in 2e954b2. Here is the new behaviour:

source:

id    col1   col2
1     a      b
2     aa     bb

lookup:

id    col1   col3
1     x      y
3     xx     yy

Running index=source | lookup id will output:

id    col1   col2   col3
1     x      b      y
2     null   bb     null

Notice that the value of col1 for row 2 is null instead of aa; by my testing, this behavior produces the same result as SPL's (although it looks a bit strange).
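
A DataFrame sketch (hypothetical miniature frames, reusing the session from the earlier sketch) makes the null explicit: the conflicted col1 is projected from the lookup side regardless of whether the join matched:

```scala
import org.apache.spark.sql.functions.col
import spark.implicits._ // reuses the SparkSession from the earlier sketch

val source = Seq((1, "a", "b"), (2, "aa", "bb")).toDF("id", "col1", "col2")
val lookup = Seq((1, "x", "y"), (3, "xx", "yy")).toDF("id", "col1", "col3")

source.alias("s")
  .join(lookup.alias("l"), col("s.id") === col("l.id"), "left_outer")
  .select(
    col("s.id"),
    col("l.col1"), // conflicted column comes from the lookup side: null for row 2
    col("s.col2"),
    col("l.col3"))
  .show() // 1,x,b,y and 2,null,bb,null, matching the table above
```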

@penghuo (Collaborator) replied:

Agree, the behavior is strange, but let's align with it.

Updating my understanding of lookup based on SPL testing.

The lookup command enriches your fact table by performing a left join with a lookup (dimension) table. Specifically, it:

  1. Joins the fact table with the lookup table based on a join key (lookupMappingField). By default, the command appends all columns from the lookup table (except the join key) to the fact table. Alternatively, you can explicitly specify the desired output fields.
  2. Resolves column name conflicts between the two tables using one of two strategies:
    • REPLACE: The lookup table’s value replaces the fact table’s value.
    • APPEND: The lookup table’s value is used only if the corresponding fact table column is missing or null.

One highlight:

  • fact table:

    id    col1   col2
    1     a      b
    2     aa     bb
    3     null   ccc

  • dimension table:

    id    col1   col3
    1     x      y
    3     xx     yy

If we run source="fact.csv" | lookup "dim.csv" id OUTPUT id, col1, col3, the fact table's id column is replaced with the dimension table's id column, and the result is (a DataFrame sketch follows the table):

id     col1   col2   col3
1      x      b      y
null   null   bb     null
3      xx     ccc    yy
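
A DataFrame sketch of this, again with hypothetical frames: every column listed in OUTPUT, the join key included, is projected from the dimension side:

```scala
import org.apache.spark.sql.functions.col
import spark.implicits._ // reuses the SparkSession from the earlier sketch

val fact = Seq((1, "a", "b"), (2, "aa", "bb"), (3, null, "ccc")).toDF("id", "col1", "col2")
val dim = Seq((1, "x", "y"), (3, "xx", "yy")).toDF("id", "col1", "col3")

fact.alias("s")
  .join(dim.alias("l"), col("s.id") === col("l.id"), "left_outer")
  .select(
    col("l.id"),   // the join key is in OUTPUT, so it is replaced too: null when unmatched
    col("l.col1"),
    col("s.col2"),
    col("l.col3"))
  .show() // 1,x,b,y / null,null,bb,null / 3,xx,ccc,yy, matching the SPL result above
```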

@LantaoJin (Member Author) commented on Feb 7, 2025:

The null values for fields id and col1 in row 2 really surprised me. In our implementation, only the matched rows (explicitly joined rows) have the output strategies applied (whether REPLACE or APPEND). So the output of source="fact.csv" | lookup "dim.csv" id REPLACE id, col1, col3 is:

id    col1   col2   col3
1     x      b      y
2     aa     bb     null
3     xx     ccc    yy

But it seems that in Splunk, whether a row is matched or not, the conflicted column from the source is replaced with the new value from the lookup. @penghuo, will we refactor to align with Splunk's output?

@LantaoJin (Member Author) replied:

Aligned all behaviours in 42eb9e0

@penghuo (Collaborator) replied:

Yes, it is not intuitive, but let's align with SPL.

val frame =
sql(s"source = $sourceTable | LOOKUP $lookupTable name")
val expectedResults: Array[Row] = Array(
Row(1000, "Jake", "England", 100000, 1000, "IT", "Engineer"),
Row(1001, "Hello", "USA", 70000, null, null, null),
Row(1002, "John", "Canada", 120000, 1002, "DATA", "Scientist"),
Row(1003, "David", null, 120000, 1003, "HR", "Doctor"),
Row(1004, "David", "Canada", 0, 1003, "HR", "Doctor"),
Row(1005, "Jane", "Canada", 90000, 1005, "DATA", "Engineer"))
assertSameRows(expectedResults, frame)
}

test("test LOOKUP lookupTable name REPLACE occupation - 2") {
val frame =
sql(s"source = $sourceTable | LOOKUP $lookupTable name REPLACE occupation")
frame.show()
frame.explain(true)
val expectedResults: Array[Row] = Array(
Row(1000, "Jake", "England", 100000, "Engineer"),
Row(1001, "Hello", "USA", 70000, "Artist"),
Row(1002, "John", "Canada", 120000, "Scientist"),
Row(1003, "David", null, 120000, "Doctor"),
Row(1004, "David", "Canada", 0, "Doctor"),
Row(1005, "Jane", "Canada", 90000, "Engineer"))
assertSameRows(expectedResults, frame)
}
}
@@ -368,6 +368,7 @@ public Expression visitWindowFunction(WindowFunction node, CatalystPlanContext c
@Override
public Expression visitInSubquery(InSubquery node, CatalystPlanContext outerContext) {
CatalystPlanContext innerContext = new CatalystPlanContext();
innerContext.withSparkSession(outerContext.getSparkSession());
visitExpressionList(node.getChild(), innerContext);
Seq<Expression> values = innerContext.retainAllNamedParseExpressions(p -> p);
UnresolvedPlan outerPlan = node.getQuery();
@@ -387,6 +388,7 @@ public Expression visitInSubquery(InSubquery node, CatalystPlanContext outerCont
@Override
public Expression visitScalarSubquery(ScalarSubquery node, CatalystPlanContext context) {
CatalystPlanContext innerContext = new CatalystPlanContext();
innerContext.withSparkSession(context.getSparkSession());
UnresolvedPlan outerPlan = node.getQuery();
LogicalPlan subSearch = outerPlan.accept(planVisitor, innerContext);
Expression scalarSubQuery = ScalarSubquery$.MODULE$.apply(
@@ -402,6 +404,7 @@ public Expression visitExistsSubquery(ExistsSubquery node, CatalystPlanContext c
@Override
public Expression visitExistsSubquery(ExistsSubquery node, CatalystPlanContext context) {
CatalystPlanContext innerContext = new CatalystPlanContext();
innerContext.withSparkSession(context.getSparkSession());
UnresolvedPlan outerPlan = node.getQuery();
LogicalPlan subSearch = outerPlan.accept(planVisitor, innerContext);
Expression existsSubQuery = Exists$.MODULE$.apply(
CatalystPlanContext.java
@@ -6,6 +6,7 @@
package org.opensearch.sql.ppl;

import lombok.Getter;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation;
import org.apache.spark.sql.catalyst.expressions.AttributeReference;
import org.apache.spark.sql.catalyst.expressions.Expression;
@@ -38,6 +39,8 @@
* The context used for Catalyst logical plan.
*/
public class CatalystPlanContext {

@Getter private SparkSession sparkSession;
/**
* Catalyst relations list
**/
@@ -283,4 +286,8 @@ public Expression resolveJoinCondition(
isResolvingJoinCondition = false;
return result;
}

public void withSparkSession(SparkSession sparkSession) {
this.sparkSession = sparkSession;
}
}