Skip to content

Commit

Permalink
Support multi-fields for text fields and improve push down logic
Browse files Browse the repository at this point in the history
Signed-off-by: Peng Huo <[email protected]>
  • Loading branch information
penghuo committed Feb 7, 2025
1 parent eff717a commit f23e174
Show file tree
Hide file tree
Showing 10 changed files with 385 additions and 22 deletions.
25 changes: 13 additions & 12 deletions docs/opensearch-table.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ Using a wildcard index name:
val df = spark.sql("SELECT * FROM dev.default.`my_index*`")
df.show()
```
Join two indices
```scala
val df = spark.sql("SELECT * FROM dev.default.my_index as t1 JOIN dev.default.my_index as t2 ON t1.id == t2.id")
df.show()
```

## Data Types
The following table defines the data type mapping between OpenSearch index field type and Spark data type.

Expand All @@ -70,20 +76,15 @@ The following table defines the data type mapping between OpenSearch index field
* Map to DateType if format = strict_date, (we also support format = date, may change in future)
* Map to TimestampType if format = strict_date_optional_time_nanos, (we also support format =
strict_date_optional_time | epoch_millis, may change in future)
* Spark data types VarcharType(length) and CharType(length) are both currently mapped to Flint data
type *keyword*, dropping their length property. On the other hand, Flint data type *keyword* only
maps to StringType.
* Spark data types VarcharType(length) and CharType(length) are both currently mapped to OpenSearch
data type *keyword*, dropping their length property. On the other hand, OpenSearch data type
*keyword* only maps to StringType.
* Spark data type MapType is mapped to an empty OpenSearch object. The inner fields then rely on
dynamic mapping. On the other hand, Flint data type *object* only maps to StructType.
* Spark data type DecimalType is mapped to an OpenSearch double. On the other hand, Flint data type
*double* only maps to DoubleType.
dynamic mapping. On the other hand, OpenSearch data type *object* only maps to StructType.
* Spark data type DecimalType is mapped to an OpenSearch double. On the other hand, OpenSearch data
type *double* only maps to DoubleType.
* OpenSearch alias fields allow alternative names for existing fields in the schema without duplicating data. They inherit the data type and nullability of the referenced field and resolve dynamically to the primary field in queries.

Join two indices
```scala
val df = spark.sql("SELECT * FROM dev.default.my_index as t1 JOIN dev.default.my_index as t2 ON t1.id == t2.id")
df.show()
```
* OpenSearch multi-fields on text fields are supported. These multi-fields are stored as part of the field's metadata and cannot be selected directly. Instead, they are used automatically during DSL query translation.

## Limitation
### catalog operation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,19 @@ object FlintDataType {
(fieldProperties \ "format")
.extractOrElse(DEFAULT_DATE_FORMAT))

// Text
// Text with possible multi-fields
case JString("text") =>
metadataBuilder.putString("osType", "text")
StringType
FlintMetadataHelper.addTextFieldMetadata(metadataBuilder)
(fieldProperties \ "fields") match {
case fields: JObject =>
FlintMetadataHelper.addMultiFieldMetadata(
metadataBuilder,
fields.obj.map { case (name, props) =>
(s"$fieldName.$name", (props \ "type").extract[String])
}.toMap)
StringType
case _ => StringType
}

// object types
case JString("object") | JNothing => deserializeJValue(fieldProperties)
Expand Down Expand Up @@ -155,7 +164,7 @@ object FlintDataType {

// string
case StringType | _: VarcharType | _: CharType =>
if (metadata.contains("osType") && metadata.getString("osType") == "text") {
if (FlintMetadataHelper.isTextField(metadata)) {
JObject("type" -> JString("text"))
} else {
JObject("type" -> JString("keyword"))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.apache.spark.sql.flint.datatype

import org.apache.spark.sql.types.{MetadataBuilder, _}

/**
 * Utility object for reading and writing OpenSearch-specific field metadata
 * carried on Spark StructField entries (via Spark's Metadata mechanism).
 */
object FlintMetadataHelper {
  // Metadata key recording the OpenSearch mapping type of a field.
  // OpenSearch Mappings. https://opensearch.org/docs/latest/field-types/
  val OS_TYPE_KEY = "osType"
  // Metadata key under which multi-field (sub-field) names are stored, grouped by type.
  val FIELDS_NAMES_KEY = "fields"

  // OpenSearch field types. https://opensearch.org/docs/latest/field-types/supported-field-types/index/
  val TEXT_TYPE = "text"
  val KEYWORD_TYPE = "keyword"

  /**
   * Check whether the metadata marks the field as an OpenSearch `text` field.
   */
  def isTextField(metadata: Metadata): Boolean =
    metadata.contains(OS_TYPE_KEY) && metadata.getString(OS_TYPE_KEY) == TEXT_TYPE

  /**
   * Tag the field as an OpenSearch `text` field on the given builder.
   */
  def addTextFieldMetadata(builder: MetadataBuilder): MetadataBuilder =
    builder.putString(OS_TYPE_KEY, TEXT_TYPE)

  /**
   * Record multi-field (sub-field) information on the given builder.
   *
   * The supplied fields are grouped by their OpenSearch type; for every type the
   * matching sub-field names are stored as a string array in a nested metadata
   * object, which is attached to the builder under FIELDS_NAMES_KEY.
   *
   * @param builder
   *   the MetadataBuilder to update with multi-field metadata.
   * @param fields
   *   a map from sub-field name to its OpenSearch field type.
   * @return
   *   the updated MetadataBuilder containing the multi-field metadata.
   */
  def addMultiFieldMetadata(
      builder: MetadataBuilder,
      fields: Map[String, String]): MetadataBuilder = {
    val grouped = new MetadataBuilder()
    fields
      .groupBy { case (_, fieldType) => fieldType }
      .foreach { case (fieldType, members) =>
        // members preserves the per-type sub-map; store just the names.
        grouped.putStringArray(fieldType, members.keys.toArray)
      }
    builder.putMetadata(FIELDS_NAMES_KEY, grouped.build())
  }

  /**
   * Look up the first sub-field of type KEYWORD_TYPE, if one was recorded.
   *
   * Checks the nested multi-field metadata under FIELDS_NAMES_KEY for a group
   * keyed by KEYWORD_TYPE and returns the head of that name array.
   *
   * @param metadata
   *   the field metadata to inspect.
   * @return
   *   Some(first keyword sub-field name) when present; otherwise None.
   */
  def getKeywordSubfield(metadata: Metadata): Option[String] =
    if (!metadata.contains(FIELDS_NAMES_KEY)) {
      None
    } else {
      val subfields = metadata.getMetadata(FIELDS_NAMES_KEY)
      if (subfields.contains(KEYWORD_TYPE)) {
        subfields.getStringArray(KEYWORD_TYPE).headOption
      } else {
        None
      }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import org.apache.spark.sql.catalyst.util.{DateTimeUtils, TimestampFormatter}
import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, LiteralValue}
import org.apache.spark.sql.connector.expressions.filter.{And, Predicate}
import org.apache.spark.sql.flint.datatype.FlintDataType.STRICT_DATE_OPTIONAL_TIME_FORMATTER_WITH_NANOS
import org.apache.spark.sql.flint.datatype.FlintMetadataHelper
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

Expand Down Expand Up @@ -89,7 +90,16 @@ case class FlintQueryCompiler(schema: StructType) {
case "NOT" =>
s"""{"bool":{"must_not":${compile(p.children()(0))}}}"""
case "=" =>
s"""{"term":{"${compile(p.children()(0))}":{"value":${compile(p.children()(1))}}}}"""
val fieldName = compile(p.children()(0))
if (isTextField(fieldName)) {
getKeywordSubfield(fieldName) match {
case Some(keywordField) =>
s"""{"term":{"$keywordField":{"value":${compile(p.children()(1))}}}}"""
case None => ""
}
} else {
s"""{"term":{"$fieldName":{"value":${compile(p.children()(1))}}}}"""
}
case ">" =>
s"""{"range":{"${compile(p.children()(0))}":{"gt":${compile(p.children()(1))}}}}"""
case ">=" =>
Expand Down Expand Up @@ -144,8 +154,19 @@ case class FlintQueryCompiler(schema: StructType) {
protected def isTextField(attribute: String): Boolean = {
schema.apply(attribute) match {
case StructField(_, StringType, _, metadata) =>
metadata.contains("osType") && metadata.getString("osType") == "text"
FlintMetadataHelper.isTextField(metadata)
case _ => false
}
}

/**
* Get keyword subfield name if available for text fields
*/
protected def getKeywordSubfield(attribute: String): Option[String] = {
schema.apply(attribute) match {
case StructField(_, StringType, _, metadata) =>
FlintMetadataHelper.getKeywordSubfield(metadata)
case _ => None
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,51 @@ class FlintDataTypeSuite extends FlintSuite with Matchers {
Nil)
}

test("text field with multi-fields deserialize") {
val flintDataType = """{
| "properties": {
| "city": {
| "type": "text",
| "fields": {
| "raw": {
| "type": "keyword"
| },
| "keyword": {
| "type": "keyword"
| }
| }
| }
| }
|}""".stripMargin
val mb = new MetadataBuilder()
FlintMetadataHelper.addTextFieldMetadata(mb)
FlintMetadataHelper.addMultiFieldMetadata(
mb,
Map("city.raw" -> "keyword", "city.keyword" -> "keyword"))
val metadata = mb.build()
val expectedStructType = StructType(StructField("city", StringType, true, metadata) :: Nil)

FlintDataType.deserialize(flintDataType) should contain theSameElementsAs expectedStructType
}

test("text field without multi-fields deserialize") {
val flintDataType = """{
| "properties": {
| "description": {
| "type": "text"
| }
| }
|}""".stripMargin

val mb = new MetadataBuilder()
FlintMetadataHelper.addTextFieldMetadata(mb)
val metadata = mb.build()

val expectedStructType =
StructType(StructField("description", StringType, true, metadata) :: Nil)
FlintDataType.deserialize(flintDataType) should contain theSameElementsAs expectedStructType
}

test("flint date type deserialize and serialize") {
val flintDataType = """{
| "properties": {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.apache.spark.sql.flint.datatype

import org.scalatest.matchers.should.Matchers

import org.apache.spark.FlintSuite
import org.apache.spark.sql.types._

/**
 * Unit tests for the metadata helpers in FlintMetadataHelper.
 */
class FlintMetadataHelperSuite extends FlintSuite with Matchers {

  test("isTextField returns true when osType is text") {
    val metadata: Metadata = new MetadataBuilder()
      .putString(FlintMetadataHelper.OS_TYPE_KEY, FlintMetadataHelper.TEXT_TYPE)
      .build()
    assert(FlintMetadataHelper.isTextField(metadata))
  }

  test("isTextField returns false when osType is not text") {
    val metadata: Metadata = new MetadataBuilder()
      .putString(FlintMetadataHelper.OS_TYPE_KEY, "non-text")
      .build()
    assert(!FlintMetadataHelper.isTextField(metadata))
  }

  test("addTextFieldMetadata sets osType to text") {
    val metadata: Metadata =
      FlintMetadataHelper.addTextFieldMetadata(new MetadataBuilder()).build()
    assert(metadata.getString(FlintMetadataHelper.OS_TYPE_KEY) == FlintMetadataHelper.TEXT_TYPE)
  }

  test("addMultiFieldMetadata groups fields by field type") {
    val subfields = Map(
      "field1" -> FlintMetadataHelper.TEXT_TYPE,
      "field2" -> FlintMetadataHelper.KEYWORD_TYPE,
      "field3" -> FlintMetadataHelper.KEYWORD_TYPE)
    val metadata: Metadata =
      FlintMetadataHelper.addMultiFieldMetadata(new MetadataBuilder(), subfields).build()

    // The grouped names must live in a nested metadata object under FIELDS_NAMES_KEY.
    assert(metadata.contains(FlintMetadataHelper.FIELDS_NAMES_KEY))
    val nested: Metadata = metadata.getMetadata(FlintMetadataHelper.FIELDS_NAMES_KEY)

    // Exactly one text sub-field expected.
    assert(nested.contains(FlintMetadataHelper.TEXT_TYPE))
    assert(nested.getStringArray(FlintMetadataHelper.TEXT_TYPE).sameElements(Array("field1")))

    // Both keyword sub-fields expected; grouping order is unspecified, so sort first.
    assert(nested.contains(FlintMetadataHelper.KEYWORD_TYPE))
    val keywordNames = nested.getStringArray(FlintMetadataHelper.KEYWORD_TYPE)
    assert(keywordNames.sorted.sameElements(Array("field2", "field3")))
  }

  test("getKeywordSubfield returns the first keyword field if available") {
    val subfields = Map(
      "field1" -> FlintMetadataHelper.TEXT_TYPE,
      "field2" -> FlintMetadataHelper.KEYWORD_TYPE,
      "field3" -> FlintMetadataHelper.KEYWORD_TYPE)
    val metadata: Metadata =
      FlintMetadataHelper.addMultiFieldMetadata(new MetadataBuilder(), subfields).build()

    // The helper must agree with the head of the stored keyword name array.
    val stored = metadata
      .getMetadata(FlintMetadataHelper.FIELDS_NAMES_KEY)
      .getStringArray(FlintMetadataHelper.KEYWORD_TYPE)
    assert(FlintMetadataHelper.getKeywordSubfield(metadata) == stored.headOption)
  }

  test("getKeywordSubfield returns None if no keyword field exists") {
    val metadata: Metadata = FlintMetadataHelper
      .addMultiFieldMetadata(
        new MetadataBuilder(),
        Map("field1" -> FlintMetadataHelper.TEXT_TYPE))
      .build()

    // With no keyword-typed sub-field recorded, the lookup must be empty.
    assert(FlintMetadataHelper.getKeywordSubfield(metadata).isEmpty)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import scala.io.Source
import org.apache.spark.FlintSuite
import org.apache.spark.sql.connector.expressions.{FieldReference, GeneralScalarExpression, LiteralValue}
import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.flint.datatype.FlintMetadataHelper
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._

Expand Down Expand Up @@ -167,6 +168,37 @@ class FlintQueryCompilerSuite extends FlintSuite {
|""".stripMargin)(query)
}

test("compile text field with keyword subfield should use term query on subfield") {
val schema = StructType(
Seq(StructField(
"city",
StringType,
nullable = true, {
val mb = new MetadataBuilder()
FlintMetadataHelper.addTextFieldMetadata(mb)
FlintMetadataHelper.addMultiFieldMetadata(mb, Map("city.raw" -> "keyword"))
mb.build()
})))
val query = FlintQueryCompiler(schema).compile(EqualTo("city", "Seattle").toV2)
assertResult("""{"term":{"city.raw":{"value":"Seattle"}}}""")(query)
}

test("compile text field without keyword subfield should return empty") {
val schema = StructType(
Seq(
StructField(
"aText",
StringType,
nullable = true, {
val mb = new MetadataBuilder()
FlintMetadataHelper.addTextFieldMetadata(mb)
mb.build()
})))

val query = FlintQueryCompiler(schema).compile(EqualTo("aText", "text").toV2)
assertResult("")(query)
}

protected def schema(): StructType = {
StructType(
Seq(
Expand Down
Loading

0 comments on commit f23e174

Please sign in to comment.