Support OpenSearch alias field type (#1032)
Signed-off-by: Peng Huo <[email protected]>
penghuo authored Feb 6, 2025
1 parent 785d02b commit eff717a
Showing 6 changed files with 161 additions and 8 deletions.
31 changes: 31 additions & 0 deletions docs/opensearch-table.md
@@ -47,6 +47,37 @@ Using a wildcard index name:
val df = spark.sql("SELECT * FROM dev.default.`my_index*`")
df.show()
```
## Data Types
The following table defines the data type mapping between OpenSearch index field types and Spark data types.

| **OpenSearch Field Type** | **Spark Data Type**               |
|---------------------------|-----------------------------------|
| boolean | BooleanType |
| long | LongType |
| integer | IntegerType |
| short | ShortType |
| byte | ByteType |
| double | DoubleType |
| float | FloatType |
| date(Timestamp) | TimestampType |
| date(Date) | DateType |
| keyword | StringType, VarcharType, CharType |
| text | StringType(meta(osType)=text) |
| object | StructType |
| alias | Inherits referenced field type |

* The OpenSearch data type `date` is mapped to a Spark data type based on its format:
  * Maps to `DateType` if format = `strict_date` (format = `date` is also supported; this may change in the future)
  * Maps to `TimestampType` if format = `strict_date_optional_time_nanos` (format = `strict_date_optional_time` or `epoch_millis` is also supported; this may change in the future)
* Spark data types `VarcharType(length)` and `CharType(length)` are both currently mapped to the Flint data
  type *keyword*, dropping their length property. In the other direction, the Flint data type *keyword* only
  maps to `StringType`.
* Spark data type `MapType` is mapped to an empty OpenSearch object. The inner fields then rely on
  dynamic mapping. In the other direction, the Flint data type *object* only maps to `StructType`.
* Spark data type `DecimalType` is mapped to an OpenSearch double. In the other direction, the Flint data type
  *double* only maps to `DoubleType`.
* OpenSearch alias fields provide alternative names for existing fields in the schema without duplicating data. They inherit the data type and nullability of the referenced field and resolve dynamically to that field in queries (see the example below).
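
The sketch below assumes a hypothetical index `my_index` whose mapping declares `route_length_miles` as an alias of the long field `distance`. The alias column can be selected and filtered like any other column, with predicates resolved against the referenced field:
```scala
// Hypothetical mapping excerpt for my_index:
//   "distance":           { "type": "long" },
//   "route_length_miles": { "type": "alias", "path": "distance" }
// In the Spark schema, route_length_miles inherits LongType from distance.
val df = spark.sql(
  "SELECT distance, route_length_miles FROM dev.default.my_index WHERE route_length_miles > 100")
df.show()
```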

Join two indices:
```scala
FlintDataType.scala

@@ -30,6 +30,8 @@ object FlintDataType {
"dateFormat" -> DateFormatter.defaultPattern,
"timestampFormat" -> STRICT_DATE_OPTIONAL_TIME_FORMATTER_WITH_NANOS)

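  /**
   * Schema metadata key under which an alias field stores the name of its referenced (path) field.
   */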
val METADATA_ALIAS_PATH_NAME = "aliasPath"

/**
* parse Flint metadata and extract properties to StructType.
*/
@@ -39,14 +41,38 @@

  def deserializeJValue(json: JValue): StructType = {
    val properties = (json \ "properties").extract[Map[String, JValue]]
    val (aliasProps, normalProps) = properties.partition { case (_, fieldProperties) =>
      (fieldProperties \ "type") match {
        case JString("alias") => true
        case _ => false
      }
    }

    val fields: Seq[StructField] = normalProps.map { case (fieldName, fieldProperties) =>
      deserializeField(fieldName, fieldProperties)
    }.toSeq

    val normalFieldMap: Map[String, StructField] = fields.map(f => f.name -> f).toMap

    val aliasFields: Seq[StructField] = aliasProps.map { case (fieldName, fieldProperties) =>
      val aliasPath = (fieldProperties \ "path").extract[String]
      if (!normalFieldMap.contains(aliasPath)) {
        throw new IllegalStateException(
          s"Alias field [$fieldName] references undefined field [$aliasPath]")
      }
      val metadataBuilder = new MetadataBuilder()
      metadataBuilder.putString(METADATA_ALIAS_PATH_NAME, aliasPath)
      DataTypes.createStructField(
        fieldName,
        normalFieldMap(aliasPath).dataType,
        true,
        metadataBuilder.build())
    }.toSeq

    StructType(fields ++ aliasFields)
  }

  def deserializeField(fieldName: String, fieldProperties: JValue): StructField = {
    val metadataBuilder = new MetadataBuilder()
    val dataType = fieldProperties \ "type" match {
      // boolean
FlintJacksonParser.scala
Expand Up @@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.json.{JacksonUtils, JsonFilters, JSONOption
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, BadRecordException, DateFormatter, DateTimeUtils, GenericArrayData, IntervalUtils, MapData, PartialResultException, RebaseDateTime, TimestampFormatter}
import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.flint.datatype.FlintDataType
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types._
@@ -448,13 +449,33 @@ class FlintJacksonParser(
var badRecordException: Option[Throwable] = None
var skipRow = false

// Build mapping from JSON key to sequence of schema field indices.
val fieldMapping: Map[String, Seq[Int]] = {
schema.fields.zipWithIndex.foldLeft(Map.empty[String, Seq[Int]]) {
case (acc, (field, idx)) =>
val jsonKey = if (field.metadata.contains(FlintDataType.METADATA_ALIAS_PATH_NAME)) {
field.metadata.getString(FlintDataType.METADATA_ALIAS_PATH_NAME)
} else {
field.name
}
acc.updated(jsonKey, acc.getOrElse(jsonKey, Seq.empty[Int]) :+ idx)
}
}
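    // Example (hypothetical schema): for fields [distance: LongType,
    // route_length_miles: LongType with metadata aliasPath = "distance"],
    // fieldMapping == Map("distance" -> Seq(0, 1)): the single JSON key "distance"
    // populates both the original column and the alias column.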

structFilters.reset()
while (!skipRow && nextUntil(parser, JsonToken.END_OBJECT)) {
      fieldMapping.get(parser.getCurrentName) match {
        case Some(indices) =>
          try {
            // All fields sharing a JSON key have the same data type, so parse once.
            val fieldValue = fieldConverters(indices.head).apply(parser)
            // Assign the parsed value to every schema field mapped to this JSON key.
            indices.foreach { idx =>
              row.update(idx, fieldValue)
              if (structFilters.skipRow(row, idx)) {
                skipRow = true
              }
            }
} catch {
case e: SparkUpgradeException => throw e
case NonFatal(e) if isRoot =>
FlintDataTypeSuite.scala

@@ -279,4 +279,45 @@ class FlintDataTypeSuite extends FlintSuite with Matchers {
val data: JValue = JsonMethods.parse(json)
JsonMethods.compact(JsonMethods.render(data))
}

test("alias field deserialize") {
val flintDataType =
"""{
| "properties": {
| "distance": {
| "type": "long"
| },
| "route_length_miles": {
| "type": "alias",
| "path": "distance"
| },
| "transit_mode": {
| "type": "keyword"
| }
| }
|}""".stripMargin

val expectedStructType = StructType(
Seq(
StructField("distance", LongType, true),
StructField("transit_mode", StringType, true),
StructField(
"route_length_miles",
LongType,
true,
new MetadataBuilder().putString("aliasPath", "distance").build())))

val deserialized = FlintDataType.deserialize(flintDataType)

deserialized.fields should have length (3)
deserialized.fields(0) shouldEqual expectedStructType.fields(0)
deserialized.fields(1) shouldEqual expectedStructType.fields(1)

val aliasField = deserialized.fields(2)
aliasField.name shouldEqual "route_length_miles"
aliasField.dataType shouldEqual LongType
aliasField.metadata.contains("aliasPath") shouldBe true
aliasField.metadata.getString("aliasPath") shouldEqual "distance"
}

}
OpenSearchTableQueryITSuite.scala

@@ -47,4 +47,22 @@ class OpenSearchTableQueryITSuite extends OpenSearchCatalogSuite with FlintPPLSuite {
}
}
}

test("Query index with alias data type") {
val index1 = "t0001"
withIndexName(index1) {
indexWithAlias(index1)
// select original field and alias field
var df = spark.sql(s"""SELECT id, alias FROM ${catalogName}.default.$index1""")
checkAnswer(df, Seq(Row(1, 1), Row(2, 2)))

// filter on alias field
df = spark.sql(s"""SELECT id, alias FROM ${catalogName}.default.$index1 WHERE alias=1""")
checkAnswer(df, Row(1, 1))

// filter on original field
df = spark.sql(s"""SELECT id, alias FROM ${catalogName}.default.$index1 WHERE id=1""")
checkAnswer(df, Row(1, 1))
}
}
}
OpenSearchSuite.scala

@@ -123,6 +123,22 @@ trait OpenSearchSuite extends BeforeAndAfterAll {
index(indexName, oneNodeSetting, mappings, docs)
}

def indexWithAlias(indexName: String): Unit = {
val mappings = """{
| "properties": {
| "id": {
| "type": "integer"
| },
| "alias": {
| "type": "alias",
| "path": "id"
| }
| }
|}""".stripMargin
val docs = Seq("""{"id": 1}""", """{"id": 2}""")
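    // Alias fields are not stored: documents carry only the referenced field ("id"),
    // and searches against "alias" are resolved to "id" at query time.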
index(indexName, oneNodeSetting, mappings, docs)
}

def index(index: String, settings: String, mappings: String, docs: Seq[String]): Unit = {
openSearchClient.indices.create(
new CreateIndexRequest(index)
