
[BUG] Fix the PPL Lookup command behavior when inputField is missing and REPLACE with existing fields #1035

Merged 4 commits on Feb 10, 2025
6 changes: 3 additions & 3 deletions docs/ppl-lang/ppl-lookup-command.md
Original file line number Diff line number Diff line change
@@ -29,17 +29,17 @@ LOOKUP <lookupIndex> (<lookupMappingField> [AS <sourceMappingField>])...
**inputField**
- Optional
- Default: All fields of \<lookupIndex\> where matched values are applied to result output if no field is specified.
- Description: A field in \<lookupIndex\> where matched values are applied to result output. You can specify multiple \<inputField\> with comma-delimited. If you don't specify any \<inputField\>, all fields of \<lookupIndex\> where matched values are applied to result output.
- Description: A field in \<lookupIndex\> whose matched values are applied to the result output. You can specify multiple \<inputField\> values as a comma-delimited list. If you don't specify any \<inputField\>, the matched values of all fields except \<lookupMappingField\> from \<lookupIndex\> are applied to the result output.

**outputField**
- Optional
- Default: \<inputField\>
- Description: A field of output. You can specify multiple \<outputField\>. If you specify \<outputField\> with an existing field name in source query, its values will be replaced or appended by matched values from \<inputField\>. If the field specified in \<outputField\> is a new field, an extended new field will be applied to the results.
- Description: A field of the output. You can specify zero or multiple \<outputField\> values. If you specify an \<outputField\> with an existing field name in the source query, its values will be replaced or appended by matched values from \<inputField\>. If the field specified in \<outputField\> is a new field, in the REPLACE strategy an extended new field is added to the results, but the query fails in the APPEND strategy.

**REPLACE | APPEND**
- Optional
- Default: REPLACE
- Description: If you specify REPLACE, matched values in \<lookupIndex\> field overwrite the values in result. If you specify APPEND, matched values in \<lookupIndex\> field only append to the missing values in result.
- Description: The output strategy. If you specify REPLACE, matched values from \<lookupIndex\> overwrite the values in the result. If you specify APPEND, matched values from \<lookupIndex\> only fill in the missing values in the result.

### Usage
- `LOOKUP <lookupIndex> id AS cid REPLACE mail AS email`
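For illustration, with hypothetical values: for the usage example above, suppose a source row has `cid = 1001` and `email = "old@example.com"`, and the row in \<lookupIndex\> with `id = 1001` has `mail = "new@example.com"`. With REPLACE, the output `email` becomes `"new@example.com"`; with APPEND, the existing `"old@example.com"` is kept, and the lookup value is used only when the source `email` is missing.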

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -368,6 +368,7 @@ public Expression visitWindowFunction(WindowFunction node, CatalystPlanContext c
@Override
public Expression visitInSubquery(InSubquery node, CatalystPlanContext outerContext) {
CatalystPlanContext innerContext = new CatalystPlanContext();
innerContext.withSparkSession(outerContext.getSparkSession());
visitExpressionList(node.getChild(), innerContext);
Seq<Expression> values = innerContext.retainAllNamedParseExpressions(p -> p);
UnresolvedPlan outerPlan = node.getQuery();
@@ -387,6 +388,7 @@ public Expression visitInSubquery(InSubquery node, CatalystPlanContext outerCont
@Override
public Expression visitScalarSubquery(ScalarSubquery node, CatalystPlanContext context) {
CatalystPlanContext innerContext = new CatalystPlanContext();
innerContext.withSparkSession(context.getSparkSession());
UnresolvedPlan outerPlan = node.getQuery();
LogicalPlan subSearch = outerPlan.accept(planVisitor, innerContext);
Expression scalarSubQuery = ScalarSubquery$.MODULE$.apply(
@@ -402,6 +404,7 @@ public Expression visitScalarSubquery(ScalarSubquery node, CatalystPlanContext c
@Override
public Expression visitExistsSubquery(ExistsSubquery node, CatalystPlanContext context) {
CatalystPlanContext innerContext = new CatalystPlanContext();
innerContext.withSparkSession(context.getSparkSession());
UnresolvedPlan outerPlan = node.getQuery();
LogicalPlan subSearch = outerPlan.accept(planVisitor, innerContext);
Expression existsSubQuery = Exists$.MODULE$.apply(
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@
package org.opensearch.sql.ppl;

import lombok.Getter;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation;
import org.apache.spark.sql.catalyst.expressions.AttributeReference;
import org.apache.spark.sql.catalyst.expressions.Expression;
@@ -38,6 +39,8 @@
* The context used for Catalyst logical plan.
*/
public class CatalystPlanContext {

@Getter private SparkSession sparkSession;
/**
* Catalyst relations list
**/
@@ -283,4 +286,8 @@ public Expression resolveJoinCondition(
isResolvingJoinCondition = false;
return result;
}

public void withSparkSession(SparkSession sparkSession) {
this.sparkSession = sparkSession;
}
}
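A minimal sketch of how the new setter is intended to be used (mirroring the Scala wiring in FlintSparkPPLParser at the end of this diff; `spark`, `pplAst` and `planTransformer` are assumed to exist in the caller):

```java
// Sketch only: attach the active SparkSession before plan translation so that
// catalog/schema lookups (see RelationUtils below) can be made during the visit.
CatalystPlanContext context = new CatalystPlanContext();
context.withSparkSession(spark);
planTransformer.visit(pplAst, context);
LogicalPlan translated = context.getPlan();
```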
Original file line number Diff line number Diff line change
@@ -79,6 +79,7 @@
import org.opensearch.sql.ppl.utils.FieldSummaryTransformer;
import org.opensearch.sql.ppl.utils.GeoIpCatalystLogicalPlanTranslator;
import org.opensearch.sql.ppl.utils.ParseTransformer;
import org.opensearch.sql.ppl.utils.RelationUtils;
import org.opensearch.sql.ppl.utils.SortUtils;
import org.opensearch.sql.ppl.utils.TrendlineCatalystUtils;
import org.opensearch.sql.ppl.utils.WindowSpecTransformer;
@@ -88,9 +89,11 @@
import scala.collection.Seq;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import static java.util.Collections.emptyList;
@@ -193,41 +196,56 @@ public LogicalPlan visitFilter(Filter node, CatalystPlanContext context) {
public LogicalPlan visitLookup(Lookup node, CatalystPlanContext context) {
visitFirstChild(node, context);
return context.apply( searchSide -> {
context.retainAllNamedParseExpressions(p -> p);
context.retainAllPlans(p -> p);
LogicalPlan target;
LogicalPlan lookupTable = node.getLookupRelation().accept(this, context);
Expression lookupCondition = buildLookupMappingCondition(node, expressionAnalyzer, context);
// If no output field is specified, all fields from lookup table are applied to the output.
if (node.allFieldsShouldAppliedToOutputList()) {
context.retainAllNamedParseExpressions(p -> p);
context.retainAllPlans(p -> p);
return join(searchSide, lookupTable, Join.JoinType.LEFT, Optional.of(lookupCondition), new Join.JoinHint());
}

// If the output fields are specified, build a project list for lookup table.
// The mapping fields of lookup table should be added in this project list, otherwise join will fail.
// So the mapping fields of lookup table should be dropped after join.
List<NamedExpression> lookupTableProjectList = buildLookupRelationProjectList(node, expressionAnalyzer, context);
LogicalPlan lookupTableWithProject = Project$.MODULE$.apply(seq(lookupTableProjectList), lookupTable);

LogicalPlan join = join(searchSide, lookupTableWithProject, Join.JoinType.LEFT, Optional.of(lookupCondition), new Join.JoinHint());
// When no output field is specified, all fields except mapping fields from lookup table are applied to the output.
// If some fields from the source side duplicate fields of the lookup table, those fields will
// be replaced by the lookup table fields in the output.
// For example, the lookup table contains fields [id, col1, col3] and the source side fields are [id, col1, col2].
// For the query "index = sourceTable | fields id, col1, col2 | LOOKUP lookupTable id",
// col1 is the duplicated field and id is the mapping key (and also duplicated).
// The query outputs 4 fields: [id, col1, col2, col3]. Among them, `col2` is the original field from the source,
// and the matched values of `col1` from the lookup table replace the values of `col1` from the source.
Set<Field> duplicatedFieldsMaybeDrop =
new HashSet<>(RelationUtils.getFieldsFromCatalogTable(context.getSparkSession(), lookupTable));
Set<Field> mappingFieldsOfLookup = node.getLookupMappingMap().keySet();
// The lookup mapping keys are not dropped here; they are checked later.
duplicatedFieldsMaybeDrop.removeAll(mappingFieldsOfLookup);
List<Expression> duplicated =
buildProjectListFromFields(new ArrayList<>(duplicatedFieldsMaybeDrop), expressionAnalyzer, context)
.stream().map(e -> (Expression) e).collect(Collectors.toList());
LogicalPlan searchSideWithDropped = DataFrameDropColumns$.MODULE$.apply(seq(duplicated), searchSide);
target = join(searchSideWithDropped, lookupTable, Join.JoinType.LEFT, Optional.of(lookupCondition), new Join.JoinHint());
} else {
// When output fields are specified, build a project list for lookup table.
// The mapping fields of lookup table should be added in this project list, otherwise join will fail.
// So the mapping fields of lookup table should be dropped after join.
List<NamedExpression> lookupTableProjectList = buildLookupRelationProjectList(node, expressionAnalyzer, context);
LogicalPlan lookupTableWithProject = Project$.MODULE$.apply(seq(lookupTableProjectList), lookupTable);

// Add all outputFields by __auto_generated_subquery_name_s.*
List<NamedExpression> outputFieldsWithNewAdded = new ArrayList<>();
outputFieldsWithNewAdded.add(UnresolvedStar$.MODULE$.apply(Option.apply(seq(node.getSourceSubqueryAliasName()))));
LogicalPlan join = join(searchSide, lookupTableWithProject, Join.JoinType.LEFT, Optional.of(lookupCondition), new Join.JoinHint());

// Add new columns based on different strategies:
// Append: coalesce($outputField, $"inputField").as(outputFieldName)
// Replace: $outputField.as(outputFieldName)
outputFieldsWithNewAdded.addAll(buildOutputProjectList(node, node.getOutputStrategy(), expressionAnalyzer, context));
// Add all outputFields by __auto_generated_subquery_name_s.*
List<NamedExpression> outputFieldsWithNewAdded = new ArrayList<>();
outputFieldsWithNewAdded.add(UnresolvedStar$.MODULE$.apply(Option.apply(seq(node.getSourceSubqueryAliasName()))));

org.apache.spark.sql.catalyst.plans.logical.Project outputWithNewAdded = Project$.MODULE$.apply(seq(outputFieldsWithNewAdded), join);
// Add new columns based on different strategies:
// Append: coalesce($outputField, $"inputField").as(outputFieldName)
// Replace: $outputField.as(outputFieldName)
outputFieldsWithNewAdded.addAll(buildOutputProjectList(node, node.getOutputStrategy(), expressionAnalyzer, context, searchSide));

target = Project$.MODULE$.apply(seq(outputFieldsWithNewAdded), join);
}
// Drop the mapping fields of lookup table in result:
// For example, in command "LOOKUP lookTbl Field1 AS Field2, Field3",
// the Field1 and Field3 are projection fields and join keys which will be dropped in result.
List<Field> mappingFieldsOfLookup = node.getLookupMappingMap().entrySet().stream()
.map(kv -> kv.getKey().getField() == kv.getValue().getField() ? buildFieldWithLookupSubqueryAlias(node, kv.getKey()) : kv.getKey())
.collect(Collectors.toList());
// List<Field> mappingFieldsOfLookup = new ArrayList<>(node.getLookupMappingMap().keySet());
List<Expression> dropListOfLookupMappingFields =
buildProjectListFromFields(mappingFieldsOfLookup, expressionAnalyzer, context).stream()
.map(Expression.class::cast).collect(Collectors.toList());
@@ -237,7 +255,7 @@ public LogicalPlan visitLookup(Lookup node, CatalystPlanContext context) {
List<Expression> toDrop = new ArrayList<>(dropListOfLookupMappingFields);
toDrop.addAll(dropListOfSourceFields);

LogicalPlan outputWithDropped = DataFrameDropColumns$.MODULE$.apply(seq(toDrop), outputWithNewAdded);
LogicalPlan outputWithDropped = DataFrameDropColumns$.MODULE$.apply(seq(toDrop), target);

context.retainAllNamedParseExpressions(p -> p);
context.retainAllPlans(p -> p);
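For intuition, the no-outputField branch above behaves roughly like the following DataFrame sketch. The table and column names are taken from the example in the code comments; the DataFrame form is illustrative only, not the plan the visitor actually builds:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Illustrative only: a DataFrame rendering of
// "index = sourceTable | fields id, col1, col2 | LOOKUP lookupTable id".
static Dataset<Row> lookupAllFieldsSketch(SparkSession spark) {
    Dataset<Row> source = spark.table("sourceTable").select("id", "col1", "col2");
    Dataset<Row> lookup = spark.table("lookupTable"); // columns: id, col1, col3
    // col1 exists on both sides, so the source copy is dropped before the join;
    // id is the mapping key, so it is kept to build the join condition.
    Dataset<Row> dropped = source.drop("col1");
    Dataset<Row> joined =
        dropped.join(lookup, dropped.col("id").equalTo(lookup.col("id")), "left_outer");
    // the lookup-side copy of the mapping key is dropped afterwards
    return joined.drop(lookup.col("id")); // remaining columns: id, col2, col1, col3
}
```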
Original file line number Diff line number Diff line change
@@ -11,13 +11,13 @@
import org.apache.spark.sql.catalyst.expressions.EqualTo$;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.NamedExpression;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.opensearch.sql.ast.expression.Alias;
import org.opensearch.sql.ast.expression.Field;
import org.opensearch.sql.ast.expression.QualifiedName;
import org.opensearch.sql.ast.tree.Lookup;
import org.opensearch.sql.ppl.CatalystExpressionVisitor;
import org.opensearch.sql.ppl.CatalystPlanContext;
import org.opensearch.sql.ppl.CatalystQueryPlanVisitor;
import scala.Option;

import java.util.ArrayList;
@@ -83,31 +83,36 @@ static List<NamedExpression> buildOutputProjectList(
Lookup node,
Lookup.OutputStrategy strategy,
CatalystExpressionVisitor expressionAnalyzer,
CatalystPlanContext context) {
CatalystPlanContext context,
LogicalPlan searchSide) {
List<NamedExpression> outputProjectList = new ArrayList<>();
for (Map.Entry<Alias, Field> entry : node.getOutputCandidateMap().entrySet()) {
Alias inputFieldWithAlias = entry.getKey();
Field inputField = (Field) inputFieldWithAlias.getDelegated();
Field outputField = entry.getValue();
Expression inputCol = expressionAnalyzer.visitField(inputField, context);
Expression outputCol = expressionAnalyzer.visitField(outputField, context);

// Always resolve the inputCol expression with the alias: __auto_generated_subquery_name_l.<fieldName>
// If the outputField exists in the source table, resolve the outputCol expression with the alias: __auto_generated_subquery_name_s.<fieldName>
// If not, resolve the outputCol expression without an alias: <fieldName>, to avoid an "unable to resolve attribute" failure.
Expression inputCol = expressionAnalyzer.visitField(buildFieldWithLookupSubqueryAlias(node, inputField), context);
Expression outputCol;
if (RelationUtils.columnExistsInCatalogTable(context.getSparkSession(), searchSide, outputField)) {
outputCol = expressionAnalyzer.visitField(buildFieldWithSourceSubqueryAlias(node, outputField), context);
} else {
outputCol = expressionAnalyzer.visitField(outputField, context);
}
Expression child;
if (strategy == Lookup.OutputStrategy.APPEND) {
child = Coalesce$.MODULE$.apply(seq(outputCol, inputCol));
} else {
child = inputCol;
}
// The result output project list we build here is used to replace the source output,
// for the unmatched rows of left outer join, the outputs are null, so fall back to source output.
Expression nullSafeOutput = Coalesce$.MODULE$.apply(seq(child, outputCol));
NamedExpression nullSafeOutputCol = Alias$.MODULE$.apply(nullSafeOutput,
NamedExpression output = Alias$.MODULE$.apply(child,
inputFieldWithAlias.getName(),
NamedExpression.newExprId(),
seq(new java.util.ArrayList<String>()),
Option.empty(),
seq(new java.util.ArrayList<String>()));
outputProjectList.add(nullSafeOutputCol);
outputProjectList.add(output);
}
context.retainAllNamedParseExpressions(p -> p);
return outputProjectList;
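Concretely, for the usage example `LOOKUP lookupTable id AS cid REPLACE mail AS email` (field names borrowed from the docs above, table name from the visitor comments), the project expressions built here roughly correspond to:

- REPLACE: `__auto_generated_subquery_name_l.mail AS email`
- APPEND: `coalesce(__auto_generated_subquery_name_s.email, __auto_generated_subquery_name_l.mail) AS email`, assuming `email` already exists in the source; if it does not, the bare `email` reference is used instead.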
Original file line number Diff line number Diff line change
@@ -5,16 +5,29 @@

package org.opensearch.sql.ppl.utils;

import com.google.common.collect.ImmutableList;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.TableIdentifier;
import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation;
import org.apache.spark.sql.catalyst.catalog.CatalogTable;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.opensearch.flint.spark.ppl.PPLSparkUtils;
import org.opensearch.sql.ast.expression.Field;
import org.opensearch.sql.ast.expression.QualifiedName;
import scala.Option$;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public interface RelationUtils {
Logger LOG = LogManager.getLogger(RelationUtils.class);

/**
* attempt resolving if the field is relating to the given relation
* if name doesnt contain table prefix - add the current relation prefix to the fields name - returns true
@@ -65,4 +78,27 @@ static TableIdentifier getTableIdentifier(QualifiedName qualifiedName) {
}
return identifier;
}

static boolean columnExistsInCatalogTable(SparkSession spark, LogicalPlan plan, Field field) {
return getFieldsFromCatalogTable(spark, plan).stream().anyMatch(f -> f.getField().equals(field.getField()));
}

static List<Field> getFieldsFromCatalogTable(SparkSession spark, LogicalPlan plan) {
UnresolvedRelation relation = PPLSparkUtils.findLogicalRelations(plan).head();
QualifiedName tableQualifiedName = QualifiedName.of(Arrays.asList(relation.tableName().split("\\.")));
TableIdentifier tableIdentifier = getTableIdentifier(tableQualifiedName);
boolean tableExists = spark.sessionState().catalog().tableExists(tableIdentifier);
if (tableExists) {
try {
CatalogTable table = spark.sessionState().catalog().getTableMetadata(getTableIdentifier(tableQualifiedName));
@LantaoJin (Member Author) commented on Feb 5, 2025:
This is just a metadata call via getting the external catalog from the SparkSession passed in. No additional Spark job will be submitted.

@YANG-DB (Member) commented on Feb 5, 2025:
Would it make sense to do this metadata discovery more generically so that all relations have this info?

@LantaoJin (Member Author) replied:
Yes, if we see more requirements later. For now I am still willing to leave the metadata binding to Spark internals if not necessary.

return Arrays.stream(table.dataSchema().fields()).map(f -> new Field(QualifiedName.of(f.name()))).collect(Collectors.toList());
} catch (NoSuchDatabaseException | NoSuchTableException e) {
LOG.info("Table or database {} not found", tableIdentifier);
return ImmutableList.of();
}
} else {
LOG.info("Table {} not found", tableIdentifier);
return ImmutableList.of();
}
}
}
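A minimal usage sketch of the new helpers (`context` and `searchSide` are the bindings available in LookupTransformer; the field name is hypothetical):

```java
// Sketch only: decide whether an output field already exists in the source-side table.
Field email = new Field(QualifiedName.of("email"));
boolean existsInSource =
    RelationUtils.columnExistsInCatalogTable(context.getSparkSession(), searchSide, email);
// If the table or database has no catalog metadata, getFieldsFromCatalogTable logs the
// miss and returns an empty list, so the field is simply treated as new.
```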
Original file line number Diff line number Diff line change
@@ -16,7 +16,7 @@ class FlintPPLSparkExtensions extends (SparkSessionExtensions => Unit) {

override def apply(extensions: SparkSessionExtensions): Unit = {
extensions.injectParser { (spark, parser) =>
new FlintSparkPPLParser(parser)
new FlintSparkPPLParser(parser, spark)
@LantaoJin (Member Author):
The SparkSession in use is passed in from here.

}
}
}
Original file line number Diff line number Diff line change
@@ -31,6 +31,7 @@ import org.opensearch.flint.spark.ppl.PlaneUtils.plan
import org.opensearch.sql.common.antlr.SyntaxCheckException
import org.opensearch.sql.ppl.{CatalystPlanContext, CatalystQueryPlanVisitor}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser._
@@ -44,7 +45,8 @@ import org.apache.spark.sql.types.{DataType, StructType}
* @param sparkParser
* Spark SQL parser
*/
class FlintSparkPPLParser(sparkParser: ParserInterface) extends ParserInterface {
class FlintSparkPPLParser(sparkParser: ParserInterface, val spark: SparkSession)
extends ParserInterface {

/** OpenSearch (PPL) AST builder. */
private val planTransformer = new CatalystQueryPlanVisitor()
@@ -55,6 +57,7 @@ class FlintSparkPPLParser(sparkParser: ParserInterface) extends ParserInterface
try {
// if successful build ppl logical plan and translate to catalyst logical plan
val context = new CatalystPlanContext
context.withSparkSession(spark)
planTransformer.visit(plan(pplParser, sqlText), context)
context.getPlan
} catch {