
Commit

[BUG] Fix the PPL Lookup command behavior when inputField is missing and REPLACE with existing fields (#1035)

Signed-off-by: Lantao Jin <[email protected]>
LantaoJin authored Feb 10, 2025
1 parent 1a6b9f3 commit 9925289
Showing 10 changed files with 306 additions and 122 deletions.
6 changes: 3 additions & 3 deletions docs/ppl-lang/ppl-lookup-command.md
@@ -29,17 +29,17 @@ LOOKUP <lookupIndex> (<lookupMappingField> [AS <sourceMappingField>])...
**inputField**
- Optional
- Default: All fields of \<lookupIndex\> where matched values are applied to result output if no field is specified.
- Description: A field in \<lookupIndex\> where matched values are applied to result output. You can specify multiple \<inputField\> with comma-delimited. If you don't specify any \<inputField\>, all fields of \<lookupIndex\> where matched values are applied to result output.
- Description: A field in \<lookupIndex\> whose matched values are applied to the result output. You can specify multiple \<inputField\> values, comma-delimited. If you don't specify any \<inputField\>, all fields of \<lookupIndex\> except \<lookupMappingField\> are applied to the result output.

**outputField**
- Optional
- Default: \<inputField\>
- Description: A field of output. You can specify multiple \<outputField\>. If you specify \<outputField\> with an existing field name in source query, its values will be replaced or appended by matched values from \<inputField\>. If the field specified in \<outputField\> is a new field, an extended new field will be applied to the results.
- Description: A field of the output. You can specify zero or multiple \<outputField\> values. If you specify \<outputField\> with a field name that already exists in the source query, its values are replaced or appended with matched values from \<inputField\>. If the field specified in \<outputField\> is a new field, the REPLACE strategy adds it to the results as an extended new field, while the APPEND strategy fails.

**REPLACE | APPEND**
- Optional
- Default: REPLACE
- Description: If you specify REPLACE, matched values in \<lookupIndex\> field overwrite the values in result. If you specify APPEND, matched values in \<lookupIndex\> field only append to the missing values in result.
- Description: The output strategy. If you specify REPLACE, matched values in the \<lookupIndex\> field overwrite the values in the result. If you specify APPEND, matched values in the \<lookupIndex\> field are only appended to the missing values in the result.

### Usage
- `LOOKUP <lookupIndex> id AS cid REPLACE mail AS email`
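The REPLACE and APPEND strategies documented above can be illustrated with plain Spark DataFrames. This is a minimal sketch, not part of the commit; the table contents and column names (a source with [id, col1, col2] and a lookup index with [id, col1, col3], joined on id) are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.coalesce

// Hypothetical data standing in for the source query output and the lookup index.
val spark = SparkSession.builder().master("local[1]").appName("lookup-sketch").getOrCreate()
import spark.implicits._

val source = Seq((1, Option.empty[String], "x"), (2, Some("s2"), "y")).toDF("id", "col1", "col2")
val lookup = Seq((1, "l1", "z")).toDF("id", "col1", "col3")

// REPLACE (default): the duplicated non-key source column is dropped, so matched lookup
// values take its place; unmatched rows end up with null in col1 and col3.
val replaced = source.drop("col1").join(lookup, Seq("id"), "left")   // columns: id, col2, col1, col3

// APPEND: keep the source value when it is present and only fill in missing values
// from the lookup side, i.e. coalesce(source value, lookup value).
val appended = source.as("s")
  .join(lookup.as("l"), Seq("id"), "left")
  .select($"id", coalesce($"s.col1", $"l.col1").as("col1"), $"col2", $"col3")

replaced.show()
appended.show()
```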

Large diffs are not rendered by default.

@@ -368,6 +368,7 @@ public Expression visitWindowFunction(WindowFunction node, CatalystPlanContext c
@Override
public Expression visitInSubquery(InSubquery node, CatalystPlanContext outerContext) {
CatalystPlanContext innerContext = new CatalystPlanContext();
innerContext.withSparkSession(outerContext.getSparkSession());
visitExpressionList(node.getChild(), innerContext);
Seq<Expression> values = innerContext.retainAllNamedParseExpressions(p -> p);
UnresolvedPlan outerPlan = node.getQuery();
@@ -387,6 +388,7 @@ public Expression visitInSubquery(InSubquery node, CatalystPlanContext outerCont
@Override
public Expression visitScalarSubquery(ScalarSubquery node, CatalystPlanContext context) {
CatalystPlanContext innerContext = new CatalystPlanContext();
innerContext.withSparkSession(context.getSparkSession());
UnresolvedPlan outerPlan = node.getQuery();
LogicalPlan subSearch = outerPlan.accept(planVisitor, innerContext);
Expression scalarSubQuery = ScalarSubquery$.MODULE$.apply(
@@ -402,6 +404,7 @@ public Expression visitScalarSubquery(ScalarSubquery node, CatalystPlanContext c
@Override
public Expression visitExistsSubquery(ExistsSubquery node, CatalystPlanContext context) {
CatalystPlanContext innerContext = new CatalystPlanContext();
innerContext.withSparkSession(context.getSparkSession());
UnresolvedPlan outerPlan = node.getQuery();
LogicalPlan subSearch = outerPlan.accept(planVisitor, innerContext);
Expression existsSubQuery = Exists$.MODULE$.apply(
@@ -6,6 +6,7 @@
package org.opensearch.sql.ppl;

import lombok.Getter;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation;
import org.apache.spark.sql.catalyst.expressions.AttributeReference;
import org.apache.spark.sql.catalyst.expressions.Expression;
@@ -38,6 +39,8 @@
* The context used for Catalyst logical plan.
*/
public class CatalystPlanContext {

@Getter private SparkSession sparkSession;
/**
* Catalyst relations list
**/
@@ -283,4 +286,8 @@ public Expression resolveJoinCondition(
isResolvingJoinCondition = false;
return result;
}

public void withSparkSession(SparkSession sparkSession) {
this.sparkSession = sparkSession;
}
}
@@ -79,6 +79,7 @@
import org.opensearch.sql.ppl.utils.FieldSummaryTransformer;
import org.opensearch.sql.ppl.utils.GeoIpCatalystLogicalPlanTranslator;
import org.opensearch.sql.ppl.utils.ParseTransformer;
import org.opensearch.sql.ppl.utils.RelationUtils;
import org.opensearch.sql.ppl.utils.SortUtils;
import org.opensearch.sql.ppl.utils.TrendlineCatalystUtils;
import org.opensearch.sql.ppl.utils.WindowSpecTransformer;
@@ -88,9 +89,11 @@
import scala.collection.Seq;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import static java.util.Collections.emptyList;
@@ -193,41 +196,56 @@ public LogicalPlan visitFilter(Filter node, CatalystPlanContext context) {
public LogicalPlan visitLookup(Lookup node, CatalystPlanContext context) {
visitFirstChild(node, context);
return context.apply( searchSide -> {
context.retainAllNamedParseExpressions(p -> p);
context.retainAllPlans(p -> p);
LogicalPlan target;
LogicalPlan lookupTable = node.getLookupRelation().accept(this, context);
Expression lookupCondition = buildLookupMappingCondition(node, expressionAnalyzer, context);
// If no output field is specified, all fields (except the mapping fields) from the lookup table are applied to the output.
if (node.allFieldsShouldAppliedToOutputList()) {
context.retainAllNamedParseExpressions(p -> p);
context.retainAllPlans(p -> p);
return join(searchSide, lookupTable, Join.JoinType.LEFT, Optional.of(lookupCondition), new Join.JoinHint());
}

// If the output fields are specified, build a project list for lookup table.
// The mapping fields of lookup table should be added in this project list, otherwise join will fail.
// So the mapping fields of lookup table should be dropped after join.
List<NamedExpression> lookupTableProjectList = buildLookupRelationProjectList(node, expressionAnalyzer, context);
LogicalPlan lookupTableWithProject = Project$.MODULE$.apply(seq(lookupTableProjectList), lookupTable);

LogicalPlan join = join(searchSide, lookupTableWithProject, Join.JoinType.LEFT, Optional.of(lookupCondition), new Join.JoinHint());
// When no output field is specified, all fields except the mapping fields from the lookup table are applied to the output.
// If some fields on the source side duplicate fields of the lookup table, those fields will
// be replaced by the fields from the lookup table in the output.
// For example, the lookup table contains fields [id, col1, col3] and the source side fields are [id, col1, col2].
// For the query "index = sourceTable | fields id, col1, col2 | LOOKUP lookupTable id",
// col1 is a duplicated field and id is the mapping key (and also duplicated).
// The query outputs 4 fields: [id, col1, col2, col3]. Among them, `col2` is the original field from the source,
// and the matched values of `col1` from the lookup table replace the values of `col1` from the source.
Set<Field> duplicatedFieldsMaybeDrop =
new HashSet<>(RelationUtils.getFieldsFromCatalogTable(context.getSparkSession(), lookupTable));
Set<Field> mappingFieldsOfLookup = node.getLookupMappingMap().keySet();
// The lookup mapping keys are not dropped here; they are checked later.
duplicatedFieldsMaybeDrop.removeAll(mappingFieldsOfLookup);
List<Expression> duplicated =
buildProjectListFromFields(new ArrayList<>(duplicatedFieldsMaybeDrop), expressionAnalyzer, context)
.stream().map(e -> (Expression) e).collect(Collectors.toList());
LogicalPlan searchSideWithDropped = DataFrameDropColumns$.MODULE$.apply(seq(duplicated), searchSide);
target = join(searchSideWithDropped, lookupTable, Join.JoinType.LEFT, Optional.of(lookupCondition), new Join.JoinHint());
} else {
// When output fields are specified, build a project list for lookup table.
// The mapping fields of lookup table should be added in this project list, otherwise join will fail.
// So the mapping fields of lookup table should be dropped after join.
List<NamedExpression> lookupTableProjectList = buildLookupRelationProjectList(node, expressionAnalyzer, context);
LogicalPlan lookupTableWithProject = Project$.MODULE$.apply(seq(lookupTableProjectList), lookupTable);

// Add all outputFields by __auto_generated_subquery_name_s.*
List<NamedExpression> outputFieldsWithNewAdded = new ArrayList<>();
outputFieldsWithNewAdded.add(UnresolvedStar$.MODULE$.apply(Option.apply(seq(node.getSourceSubqueryAliasName()))));
LogicalPlan join = join(searchSide, lookupTableWithProject, Join.JoinType.LEFT, Optional.of(lookupCondition), new Join.JoinHint());

// Add new columns based on different strategies:
// Append: coalesce($outputField, $"inputField").as(outputFieldName)
// Replace: $outputField.as(outputFieldName)
outputFieldsWithNewAdded.addAll(buildOutputProjectList(node, node.getOutputStrategy(), expressionAnalyzer, context));
// Add all outputFields by __auto_generated_subquery_name_s.*
List<NamedExpression> outputFieldsWithNewAdded = new ArrayList<>();
outputFieldsWithNewAdded.add(UnresolvedStar$.MODULE$.apply(Option.apply(seq(node.getSourceSubqueryAliasName()))));

org.apache.spark.sql.catalyst.plans.logical.Project outputWithNewAdded = Project$.MODULE$.apply(seq(outputFieldsWithNewAdded), join);
// Add new columns based on different strategies:
// Append: coalesce($outputField, $"inputField").as(outputFieldName)
// Replace: $outputField.as(outputFieldName)
outputFieldsWithNewAdded.addAll(buildOutputProjectList(node, node.getOutputStrategy(), expressionAnalyzer, context, searchSide));

target = Project$.MODULE$.apply(seq(outputFieldsWithNewAdded), join);
}
// Drop the mapping fields of lookup table in result:
// For example, in command "LOOKUP lookTbl Field1 AS Field2, Field3",
// the Field1 and Field3 are projection fields and join keys which will be dropped in result.
List<Field> mappingFieldsOfLookup = node.getLookupMappingMap().entrySet().stream()
.map(kv -> kv.getKey().getField() == kv.getValue().getField() ? buildFieldWithLookupSubqueryAlias(node, kv.getKey()) : kv.getKey())
.collect(Collectors.toList());
// List<Field> mappingFieldsOfLookup = new ArrayList<>(node.getLookupMappingMap().keySet());
List<Expression> dropListOfLookupMappingFields =
buildProjectListFromFields(mappingFieldsOfLookup, expressionAnalyzer, context).stream()
.map(Expression.class::cast).collect(Collectors.toList());
@@ -237,7 +255,7 @@ public LogicalPlan visitLookup(Lookup node, CatalystPlanContext context) {
List<Expression> toDrop = new ArrayList<>(dropListOfLookupMappingFields);
toDrop.addAll(dropListOfSourceFields);

LogicalPlan outputWithDropped = DataFrameDropColumns$.MODULE$.apply(seq(toDrop), outputWithNewAdded);
LogicalPlan outputWithDropped = DataFrameDropColumns$.MODULE$.apply(seq(toDrop), target);

context.retainAllNamedParseExpressions(p -> p);
context.retainAllPlans(p -> p);
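To make the new no-inputField branch above easier to follow, here is a rough Scala sketch of the Catalyst plan it now assembles, reusing the example from the comment (source [id, col1, col2], lookup [id, col1, col3]). The table names and the generated subquery aliases are assumptions, and the real code builds these pieces through the expression analyzer rather than constructing them directly.

```scala
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
import org.apache.spark.sql.catalyst.expressions.EqualTo
import org.apache.spark.sql.catalyst.plans.LeftOuter
import org.apache.spark.sql.catalyst.plans.logical.{DataFrameDropColumns, Join, JoinHint, SubqueryAlias}

// Hypothetical relations standing in for the search side and the lookup table.
val searchSide = SubqueryAlias("__auto_generated_subquery_name_s", UnresolvedRelation(Seq("sourceTable")))
val lookupSide = SubqueryAlias("__auto_generated_subquery_name_l", UnresolvedRelation(Seq("lookupTable")))

// Drop the duplicated non-key source column (col1) so the matched lookup values replace it.
val searchSideDropped = DataFrameDropColumns(Seq(UnresolvedAttribute("col1")), searchSide)

// Left outer join on the mapping key.
val condition = EqualTo(
  UnresolvedAttribute("__auto_generated_subquery_name_s.id"),
  UnresolvedAttribute("__auto_generated_subquery_name_l.id"))
val joined = Join(searchSideDropped, lookupSide, LeftOuter, Some(condition), JoinHint.NONE)

// The lookup-side mapping key is dropped afterwards, leaving the four output columns
// [id, col2, col1, col3] (the same set as in the comment, up to column order).
val result = DataFrameDropColumns(Seq(UnresolvedAttribute("__auto_generated_subquery_name_l.id")), joined)
```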
@@ -11,13 +11,13 @@
import org.apache.spark.sql.catalyst.expressions.EqualTo$;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.NamedExpression;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.opensearch.sql.ast.expression.Alias;
import org.opensearch.sql.ast.expression.Field;
import org.opensearch.sql.ast.expression.QualifiedName;
import org.opensearch.sql.ast.tree.Lookup;
import org.opensearch.sql.ppl.CatalystExpressionVisitor;
import org.opensearch.sql.ppl.CatalystPlanContext;
import org.opensearch.sql.ppl.CatalystQueryPlanVisitor;
import scala.Option;

import java.util.ArrayList;
@@ -83,31 +83,36 @@ static List<NamedExpression> buildOutputProjectList(
Lookup node,
Lookup.OutputStrategy strategy,
CatalystExpressionVisitor expressionAnalyzer,
CatalystPlanContext context) {
CatalystPlanContext context,
LogicalPlan searchSide) {
List<NamedExpression> outputProjectList = new ArrayList<>();
for (Map.Entry<Alias, Field> entry : node.getOutputCandidateMap().entrySet()) {
Alias inputFieldWithAlias = entry.getKey();
Field inputField = (Field) inputFieldWithAlias.getDelegated();
Field outputField = entry.getValue();
Expression inputCol = expressionAnalyzer.visitField(inputField, context);
Expression outputCol = expressionAnalyzer.visitField(outputField, context);

// Always resolve the inputCol expression with the alias: __auto_generated_subquery_name_l.<fieldName>
// If the outputField exists in the source table, resolve the outputCol expression with the alias: __auto_generated_subquery_name_s.<fieldName>
// If not, resolve the outputCol expression without an alias (<fieldName>) to avoid an "unable to resolve attribute" failure.
Expression inputCol = expressionAnalyzer.visitField(buildFieldWithLookupSubqueryAlias(node, inputField), context);
Expression outputCol;
if (RelationUtils.columnExistsInCatalogTable(context.getSparkSession(), searchSide, outputField)) {
outputCol = expressionAnalyzer.visitField(buildFieldWithSourceSubqueryAlias(node, outputField), context);
} else {
outputCol = expressionAnalyzer.visitField(outputField, context);
}
Expression child;
if (strategy == Lookup.OutputStrategy.APPEND) {
child = Coalesce$.MODULE$.apply(seq(outputCol, inputCol));
} else {
child = inputCol;
}
// The result output project list we build here is used to replace the source output,
// for the unmatched rows of left outer join, the outputs are null, so fall back to source output.
Expression nullSafeOutput = Coalesce$.MODULE$.apply(seq(child, outputCol));
NamedExpression nullSafeOutputCol = Alias$.MODULE$.apply(nullSafeOutput,
NamedExpression output = Alias$.MODULE$.apply(child,
inputFieldWithAlias.getName(),
NamedExpression.newExprId(),
seq(new java.util.ArrayList<String>()),
Option.empty(),
seq(new java.util.ArrayList<String>()));
outputProjectList.add(nullSafeOutputCol);
outputProjectList.add(output);
}
context.retainAllNamedParseExpressions(p -> p);
return outputProjectList;
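For the changed buildOutputProjectList above, the expressions emitted per output field now look roughly like the following. This is a sketch only; the field names are taken from the usage example (`mail AS email`) and the generated subquery aliases are assumed.

```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{Alias, Coalesce}

// inputCol always resolves against the lookup-side alias; outputCol resolves against the
// source-side alias only when the field already exists there (hypothetical names below).
val inputCol  = UnresolvedAttribute("__auto_generated_subquery_name_l.mail")
val outputCol = UnresolvedAttribute("__auto_generated_subquery_name_s.email")

// APPEND: keep the existing source value and fall back to the matched lookup value.
val appendExpr: Alias = Alias(Coalesce(Seq(outputCol, inputCol)), "email")()

// REPLACE (default): take the matched lookup value directly; the extra null-safe
// Coalesce(child, outputCol) wrapper removed by this commit is no longer applied.
val replaceExpr: Alias = Alias(inputCol, "email")()
```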
@@ -5,16 +5,29 @@

package org.opensearch.sql.ppl.utils;

import com.google.common.collect.ImmutableList;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.TableIdentifier;
import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation;
import org.apache.spark.sql.catalyst.catalog.CatalogTable;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.opensearch.flint.spark.ppl.PPLSparkUtils;
import org.opensearch.sql.ast.expression.Field;
import org.opensearch.sql.ast.expression.QualifiedName;
import scala.Option$;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public interface RelationUtils {
Logger LOG = LogManager.getLogger(RelationUtils.class);

/**
* Attempt to resolve whether the field relates to the given relation.
* If the name doesn't contain a table prefix, add the current relation's prefix to the field's name and return true
@@ -65,4 +78,27 @@ static TableIdentifier getTableIdentifier(QualifiedName qualifiedName) {
}
return identifier;
}

static boolean columnExistsInCatalogTable(SparkSession spark, LogicalPlan plan, Field field) {
return getFieldsFromCatalogTable(spark, plan).stream().anyMatch(f -> f.getField().equals(field.getField()));
}

static List<Field> getFieldsFromCatalogTable(SparkSession spark, LogicalPlan plan) {
UnresolvedRelation relation = PPLSparkUtils.findLogicalRelations(plan).head();
QualifiedName tableQualifiedName = QualifiedName.of(Arrays.asList(relation.tableName().split("\\.")));
TableIdentifier tableIdentifier = getTableIdentifier(tableQualifiedName);
boolean tableExists = spark.sessionState().catalog().tableExists(tableIdentifier);
if (tableExists) {
try {
CatalogTable table = spark.sessionState().catalog().getTableMetadata(getTableIdentifier(tableQualifiedName));
return Arrays.stream(table.dataSchema().fields()).map(f -> new Field(QualifiedName.of(f.name()))).collect(Collectors.toList());
} catch (NoSuchDatabaseException | NoSuchTableException e) {
LOG.info("Table or database {} not found", tableIdentifier);
return ImmutableList.of();
}
} else {
LOG.info("Table {} not found", tableIdentifier);
return ImmutableList.of();
}
}
}
@@ -16,7 +16,7 @@ class FlintPPLSparkExtensions extends (SparkSessionExtensions => Unit) {

override def apply(extensions: SparkSessionExtensions): Unit = {
extensions.injectParser { (spark, parser) =>
new FlintSparkPPLParser(parser)
new FlintSparkPPLParser(parser, spark)
}
}
}
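As a usage note for the extension change above, the parser is wired in through Spark's standard extension mechanism. A sketch follows; the fully qualified class name, the index name, and the sample PPL query are assumptions rather than taken from this commit.

```scala
import org.apache.spark.sql.SparkSession

// Register the PPL extension so FlintSparkPPLParser (now constructed with the SparkSession)
// handles query text; the package of FlintPPLSparkExtensions is assumed here.
val spark = SparkSession.builder()
  .appName("ppl-extension-sketch")
  .master("local[1]")
  .config("spark.sql.extensions", "org.opensearch.flint.spark.FlintPPLSparkExtensions")
  .getOrCreate()

// With the extension active, PPL text can typically be submitted through spark.sql();
// plain SQL still falls back to the wrapped Spark parser.
spark.sql("source = my_index | fields id").show()
```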
@@ -31,6 +31,7 @@ import org.opensearch.flint.spark.ppl.PlaneUtils.plan
import org.opensearch.sql.common.antlr.SyntaxCheckException
import org.opensearch.sql.ppl.{CatalystPlanContext, CatalystQueryPlanVisitor}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser._
@@ -44,7 +45,8 @@ import org.apache.spark.sql.types.{DataType, StructType}
* @param sparkParser
* Spark SQL parser
*/
class FlintSparkPPLParser(sparkParser: ParserInterface) extends ParserInterface {
class FlintSparkPPLParser(sparkParser: ParserInterface, val spark: SparkSession)
extends ParserInterface {

/** OpenSearch (PPL) AST builder. */
private val planTransformer = new CatalystQueryPlanVisitor()
@@ -55,6 +57,7 @@ class FlintSparkPPLParser(sparkParser: ParserInterface) extends ParserInterface
try {
// if successful build ppl logical plan and translate to catalyst logical plan
val context = new CatalystPlanContext
context.withSparkSession(spark)
planTransformer.visit(plan(pplParser, sqlText), context)
context.getPlan
} catch {
