Support OpenSearch alias field type #1032

Merged: 5 commits merged on Feb 6, 2025
Changes from 3 commits
31 changes: 31 additions & 0 deletions docs/opensearch-table.md
@@ -47,6 +47,37 @@ Using a wildcard index name:
val df = spark.sql("SELECT * FROM dev.default.`my_index*`")
df.show()
```
## Data Types
The following table defines the data type mapping between OpenSearch index field types and Spark data types.

| **OpenSearch Field Type** | **Spark Data Type**               |
|---------------------------|-----------------------------------|
| boolean                   | BooleanType                       |
| long                      | LongType                          |
| integer                   | IntegerType                       |
| short                     | ShortType                         |
| byte                      | ByteType                          |
| double                    | DoubleType                        |
| float                     | FloatType                         |
| date(Date)                | DateType                          |
| date(Timestamp)           | TimestampType                     |
| keyword | StringType, VarcharType, CharType |
| text | StringType(meta(osType)=text) |
| object | StructType |
| alias | Inherits referenced field type |

* OpenSearch data type date is mapped to a Spark data type based on its format:
  * Maps to DateType if format = strict_date (format = date is also supported; this may change in the future)
  * Maps to TimestampType if format = strict_date_optional_time_nanos (format = strict_date_optional_time | epoch_millis are also supported; this may change in the future)
* Spark data types VarcharType(length) and CharType(length) are both currently mapped to Flint data
type *keyword*, dropping their length property. On the other hand, Flint data type *keyword* only
maps to StringType.
* Spark data type MapType is mapped to an empty OpenSearch object. The inner fields then rely on
dynamic mapping. On the other hand, Flint data type *object* only maps to StructType.
* Spark data type DecimalType is mapped to an OpenSearch double. On the other hand, Flint data type
*double* only maps to DoubleType.
* OpenSearch alias fields provide alternative names for existing fields in the schema without duplicating data. An alias inherits the data type and nullability of the referenced field and resolves dynamically to that field in queries; see the example below.
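
For illustration, a mapping that defines an alias field might look as follows (the field names mirror this PR's tests; the index name `routes` is a placeholder):

```json
{
  "properties": {
    "distance": {
      "type": "long"
    },
    "route_length_miles": {
      "type": "alias",
      "path": "distance"
    }
  }
}
```

The alias column can then be selected and filtered like the field it references:

```scala
val df = spark.sql("SELECT distance, route_length_miles FROM dev.default.routes WHERE route_length_miles = 42")
df.show()
```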

## Limitation
### catalog operation
FlintDataType.scala
@@ -30,6 +30,8 @@ object FlintDataType {
"dateFormat" -> DateFormatter.defaultPattern,
"timestampFormat" -> STRICT_DATE_OPTIONAL_TIME_FORMATTER_WITH_NANOS)

val METADATA_ALIAS_PATH_NAME = "aliasPath"

/**
* parse Flint metadata and extract properties to StructType.
*/
@@ -39,14 +41,38 @@

def deserializeJValue(json: JValue): StructType = {
val properties = (json \ "properties").extract[Map[String, JValue]]
val fields = properties.map { case (fieldName, fieldProperties) =>
deserializeFiled(fieldName, fieldProperties)
val (aliasProps, normalProps) = properties.partition { case (_, fieldProperties) =>
(fieldProperties \ "type") match {
case JString("alias") => true
case _ => false
}
}

StructType(fields.toSeq)
val fields: Seq[StructField] = normalProps.map { case (fieldName, fieldProperties) =>
deserializeField(fieldName, fieldProperties)
}.toSeq

val normalFieldMap: Map[String, StructField] = fields.map(f => f.name -> f).toMap

val aliasFields: Seq[StructField] = aliasProps.map { case (fieldName, fieldProperties) =>
val aliasPath = (fieldProperties \ "path").extract[String]
if (!normalFieldMap.contains(aliasPath)) {
throw new IllegalStateException(
s"Alias field [$fieldName] references undefined field [$aliasPath]")
}
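// Record the referenced path in field metadata so the JSON parser can route
// parsed values to this alias column as well as to the original field.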
val metadataBuilder = new MetadataBuilder()
metadataBuilder.putString(METADATA_ALIAS_PATH_NAME, aliasPath)
DataTypes.createStructField(
fieldName,
normalFieldMap(aliasPath).dataType,
true,
metadataBuilder.build())
}.toSeq

StructType(fields ++ aliasFields)
}

def deserializeFiled(fieldName: String, fieldProperties: JValue): StructField = {
def deserializeField(fieldName: String, fieldProperties: JValue): StructField = {
val metadataBuilder = new MetadataBuilder()
val dataType = fieldProperties \ "type" match {
// boolean
FlintJacksonParser.scala
@@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.json.{JacksonUtils, JsonFilters, JSONOption
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, BadRecordException, DateFormatter, DateTimeUtils, GenericArrayData, IntervalUtils, MapData, PartialResultException, RebaseDateTime, TimestampFormatter}
import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.flint.datatype.FlintDataType
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types._
@@ -448,13 +449,33 @@ class FlintJacksonParser(
var badRecordException: Option[Throwable] = None
var skipRow = false

// Build mapping from JSON key to sequence of schema field indices.
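// An alias column is keyed by the path it references, so a single JSON key can map to the
// original field's index plus the indices of any alias columns that point at it.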
val fieldMapping: Map[String, Seq[Int]] = {
schema.fields.zipWithIndex.foldLeft(Map.empty[String, Seq[Int]]) {
case (acc, (field, idx)) =>
val jsonKey = if (field.metadata.contains(FlintDataType.METADATA_ALIAS_PATH_NAME)) {
field.metadata.getString(FlintDataType.METADATA_ALIAS_PATH_NAME)
} else {
field.name
}
acc.updated(jsonKey, acc.getOrElse(jsonKey, Seq.empty[Int]) :+ idx)
}
}

structFilters.reset()
while (!skipRow && nextUntil(parser, JsonToken.END_OBJECT)) {
schema.getFieldIndex(parser.getCurrentName) match {
case Some(index) =>
fieldMapping.get(parser.getCurrentName) match {
case Some(indices) =>
try {
row.update(index, fieldConverters(index).apply(parser))
skipRow = structFilters.skipRow(row, index)
// All schema fields mapped to this key share the same data type, so parse the value once.
val fieldValue = fieldConverters(indices.head).apply(parser)
// Assign the parsed value to all schema fields mapped to this JSON key.
indices.foreach { idx =>
row.update(idx, fieldValue)
if (structFilters.skipRow(row, idx)) {
skipRow = true
}
}
} catch {
case e: SparkUpgradeException => throw e
case NonFatal(e) if isRoot =>
FlintDataTypeSuite.scala
@@ -279,4 +279,45 @@ class FlintDataTypeSuite extends FlintSuite with Matchers {
val data: JValue = JsonMethods.parse(json)
JsonMethods.compact(JsonMethods.render(data))
}

test("alias field deserialize") {
val flintDataType =
"""{
| "properties": {
| "distance": {
| "type": "long"
| },
| "route_length_miles": {
| "type": "alias",
| "path": "distance"
| },
| "transit_mode": {
| "type": "keyword"
| }
| }
|}""".stripMargin

val expectedStructType = StructType(
Seq(
StructField("distance", LongType, true),
StructField("transit_mode", StringType, true),
StructField(
"route_length_miles",
LongType,
true,
new MetadataBuilder().putString("aliasPath", "distance").build())))

val deserialized = FlintDataType.deserialize(flintDataType)

deserialized.fields should have length (3)
deserialized.fields(0) shouldEqual expectedStructType.fields(0)
deserialized.fields(1) shouldEqual expectedStructType.fields(1)

val aliasField = deserialized.fields(2)
aliasField.name shouldEqual "route_length_miles"
aliasField.dataType shouldEqual LongType
aliasField.metadata.contains("aliasPath") shouldBe true
aliasField.metadata.getString("aliasPath") shouldEqual "distance"
}

}
OpenSearchTableITSuite.scala
@@ -5,6 +5,8 @@

package org.apache.spark.opensearch.table

import org.apache.spark.sql.Row

class OpenSearchTableITSuite extends OpenSearchCatalogSuite {

def multipleShardsIndex(indexName: String): Unit = {
@@ -61,4 +63,22 @@ class OpenSearchTableITSuite extends OpenSearchCatalogSuite {
}
}
}

test("Query index with alias data type") {
val index1 = "t0001"
withIndexName(index1) {
indexWithAlias(index1)
// select original field and alias field
var df = spark.sql(s"""SELECT id, alias FROM ${catalogName}.default.$index1""")
checkAnswer(df, Seq(Row(1, 1), Row(2, 2)))

// filter on alias field
df = spark.sql(s"""SELECT id, alias FROM ${catalogName}.default.$index1 WHERE alias=1""")
checkAnswer(df, Row(1, 1))

// filter on original field
df = spark.sql(s"""SELECT id, alias FROM ${catalogName}.default.$index1 WHERE id=1""")
checkAnswer(df, Row(1, 1))
}
}
}
OpenSearchSuite.scala
@@ -123,6 +123,22 @@ trait OpenSearchSuite extends BeforeAndAfterAll {
index(indexName, oneNodeSetting, mappings, docs)
}

def indexWithAlias(indexName: String): Unit = {
val mappings = """{
| "properties": {
| "id": {
| "type": "integer"
| },
| "alias": {
| "type": "alias",
| "path": "id"
| }
| }
|}""".stripMargin
val docs = Seq("""{"id": 1}""", """{"id": 2}""")
index(indexName, oneNodeSetting, mappings, docs)
}

def index(index: String, settings: String, mappings: String, docs: Seq[String]): Unit = {
openSearchClient.indices.create(
new CreateIndexRequest(index)