Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ppl spark join command #69

Merged
merged 60 commits into from
Dec 14, 2023

Conversation

YANG-DB
Copy link
Member

@YANG-DB YANG-DB commented Oct 11, 2023

Description

New Correlation Query Command

Here is the new command that would allow this type of investigation :

source alb_logs, traces | where alb_logs.ip="10.0.0.1" AND alb_logs.cloud.provider="aws"|
correlate exact fields(traceId, ip) scope(@timestamp, 1D) mapping(alb_logs.ip = traces.attributes.http.server.address, alb_logs.traceId = traces.traceId )

Lets break this down a bit:

1. source alb_logs, traces allows to select all the data-sources that will be correlated to one another

2. where ip="10.0.0.1" AND cloud.provider="aws" predicate clause constraints the scope of the search corpus

3. correlate exact fields(traceId, ip) express the correlation operation on the following list of field :

- ip has an explicit filter condition so this will be propagated into the correlation condition for all the data-sources
- traceId has no explicit filter so the correlation will only match same traceId’s from all the data-sources

The fields names indicate the logical meaning the function within the correlation command, the actual join condition will take the mapping statement described bellow.

The term exact means that the correlation statements will require all the fields to match in order to fulfill the query statement.

Other alternative for this can be approximate that will attempt to match on a best case scenario and will not reject rows with partially match.

Addressing different field mapping

In cases where the same logical field (such as ip ) may have different mapping within several data-sources, the explicit mapping field path is expected.

The next syntax will extend the correlation conditions to allow matching different field names with similar logical meaning
alb_logs.ip = traces.attributes.http.server.address, alb_logs.traceId = traces.traceId

It is expected that for each field that participates in the correlation join, there should be a relevant mapping statement that includes all the tables that should be joined by this correlation command.

Example**:**
In our case there are 2 sources : alb_logs, traces
There are 2 fields: traceId, ip
These are 2 mapping statements : alb_logs.ip = traces.attributes.http.server.address, alb_logs.traceId = traces.traceId

Scoping the correlation timeframes

In order to simplify the work that has to be done by the execution engine (driver) the scope statement was added to explicitly direct the join query on the time it should scope for this search.

scope(@timestamp, 1D) in this example, the scope of the search should be focused on a daily basis so that correlations appearing in the same day should be grouped together. This scoping mechanism simplifies and allows better control over results and allows incremental search resolution base on the user’s needs.

Issues Resolved

#68

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

YANG-DB added 30 commits August 23, 2023 14:06
add ppl statement logical plan elements
add ppl parser components
add ppl expressions components

Signed-off-by: YANGDB <[email protected]>
 -  source = $testTable
 -  source = $testTable | fields name, age
 -  source = $testTable age=25 | fields name, age

Signed-off-by: YANGDB <[email protected]>
add AggregateFunction translation & tests
remove unused DSL builder

Signed-off-by: YANGDB <[email protected]>
# Conflicts:
#	build.sbt
#	flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala
add actual ppl based table content fetch and verification

Signed-off-by: YANGDB <[email protected]>
Signed-off-by: YANGDB <[email protected]>
add README.md details for supported commands and planned future support

Signed-off-by: YANGDB <[email protected]>
add missing license header
update supported command in readme

Signed-off-by: YANGDB <[email protected]>
add join ast builder

Signed-off-by: YANGDB <[email protected]>
# Conflicts:
#	build.sbt
#	integ-test/src/test/scala/org/opensearch/flint/spark/LogicalPlanTestUtils.scala
#	ppl-spark-integration/README.md
#	ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4
#	ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4
#	ppl-spark-integration/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java
#	ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Project.java
#	ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystPlanContext.java
#	ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java
#	ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java
#	ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java
#	ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/AggregatorTranslator.java
#	ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/ComparatorTransformer.java
#	ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/DataTypeTransformer.java
#	ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/SortUtils.java
#	ppl-spark-integration/src/main/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLParser.scala
#	ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/LogicalPlanTestUtils.scala
#	ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalAdvancedTranslatorTestSuite.scala
#	ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanAggregationQueriesTranslatorTestSuite.scala
#	ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanBasicQueriesTranslatorTestSuite.scala
#	ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanFiltersTranslatorTestSuite.scala
add test parts

Signed-off-by: YANGDB <[email protected]>
 - add plan branches context traversal
 - add resolving of un-resolved attributes (columns)
 - add join spec transformer util API
 - add documentation about the correlation design considerations

Signed-off-by: YANGDB <[email protected]>
@YANG-DB YANG-DB marked this pull request as ready for review October 14, 2023 07:12
@YANG-DB YANG-DB merged commit a0ac1fb into opensearch-project:main Dec 14, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant