From 05dbf33e79f75b26a3a4d9be6ccdea309a052a0a Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Fri, 14 Feb 2025 08:13:08 -0800 Subject: [PATCH] Add OpenSearch Dashboards IT (#1046) Signed-off-by: Peng Huo --- .../sql/flint/datatype/FlintDataType.scala | 36 +++++--- .../dashboards_sample_data_flights.json | 5 ++ ...shboards_sample_data_flights_mappings.json | 85 ++++++++++++++++++ .../dashboards_sample_data_flights_tests.json | 80 +++++++++++++++++ .../table/OpenSearchCatalogSuite.scala | 1 + .../table/OpenSearchDashboardITSuite.scala | 89 +++++++++++++++++++ .../opensearch/flint/OpenSearchSuite.scala | 12 +++ 7 files changed, 294 insertions(+), 14 deletions(-) create mode 100644 integ-test/src/integration/resources/opensearch/dashboards_sample_data_flights.json create mode 100644 integ-test/src/integration/resources/opensearch/dashboards_sample_data_flights_mappings.json create mode 100644 integ-test/src/integration/resources/opensearch/dashboards_sample_data_flights_tests.json create mode 100644 integ-test/src/integration/scala/org/apache/spark/opensearch/table/OpenSearchDashboardITSuite.scala diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/datatype/FlintDataType.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/datatype/FlintDataType.scala index 9c2aecdf6..d1dab9cc5 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/datatype/FlintDataType.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/datatype/FlintDataType.scala @@ -33,6 +33,8 @@ object FlintDataType { val METADATA_ALIAS_PATH_NAME = "aliasPath" + val UNSUPPORTED_OPENSEARCH_FIELD_TYPE = Set("geo_point") + /** * parse Flint metadata and extract properties to StructType. 
*/ @@ -49,25 +51,24 @@ object FlintDataType { } } - val fields: Seq[StructField] = normalProps.map { case (fieldName, fieldProperties) => - deserializeField(fieldName, fieldProperties) - }.toSeq + val fields: Seq[StructField] = normalProps + .filter { case (_, fp) => isSupported(fp) } + .map { case (fieldName, fieldProperties) => + deserializeField(fieldName, fieldProperties) + } + .toSeq val normalFieldMap: Map[String, StructField] = fields.map(f => f.name -> f).toMap - val aliasFields: Seq[StructField] = aliasProps.map { case (fieldName, fieldProperties) => + // Process alias fields: only include alias fields if the referenced field exists. + val aliasFields: Seq[StructField] = aliasProps.flatMap { case (fieldName, fieldProperties) => val aliasPath = (fieldProperties \ "path").extract[String] - if (!normalFieldMap.contains(aliasPath)) { - throw new IllegalStateException( - s"Alias field [$fieldName] references undefined field [$aliasPath]") + normalFieldMap.get(aliasPath).map { referencedField => + val metadataBuilder = new MetadataBuilder() + metadataBuilder.putString(METADATA_ALIAS_PATH_NAME, aliasPath) + DataTypes + .createStructField(fieldName, referencedField.dataType, true, metadataBuilder.build()) } - val metadataBuilder = new MetadataBuilder() - metadataBuilder.putString(METADATA_ALIAS_PATH_NAME, aliasPath) - DataTypes.createStructField( - fieldName, - normalFieldMap(aliasPath).dataType, - true, - metadataBuilder.build()) }.toSeq StructType(fields ++ aliasFields) @@ -120,6 +121,13 @@ object FlintDataType { DataTypes.createStructField(fieldName, dataType, true, metadataBuilder.build()) } + def isSupported(fieldProperties: JValue): Boolean = { + (fieldProperties \ "type") match { + case JString(fieldType) => !UNSUPPORTED_OPENSEARCH_FIELD_TYPE.contains(fieldType) + case _ => true + } + } + /** * parse format in flint metadata * @return diff --git a/integ-test/src/integration/resources/opensearch/dashboards_sample_data_flights.json 
b/integ-test/src/integration/resources/opensearch/dashboards_sample_data_flights.json new file mode 100644 index 000000000..8e681d0b3 --- /dev/null +++ b/integ-test/src/integration/resources/opensearch/dashboards_sample_data_flights.json @@ -0,0 +1,5 @@ +{"FlightNum":"9HY9SWR","DestCountry":"AU","OriginWeather":"Sunny","OriginCityName":"Frankfurt am Main","AvgTicketPrice":841.2656419677076,"DistanceMiles":10247.856675613455,"FlightDelay":false,"DestWeather":"Rain","Dest":"Sydney Kingsford Smith International Airport","FlightDelayType":"No Delay","OriginCountry":"DE","dayOfWeek":0,"DistanceKilometers":16492.32665375846,"timestamp":"2025-01-27T00:00:00","DestLocation":{"lat":"-33.94609833","lon":"151.177002"},"DestAirportID":"SYD","Carrier":"OpenSearch Dashboards Airlines","Cancelled":false,"FlightTimeMin":1030.7704158599038,"Origin":"Frankfurt am Main Airport","OriginLocation":{"lat":"50.033333","lon":"8.570556"},"DestRegion":"SE-BD","OriginAirportID":"FRA","OriginRegion":"DE-HE","DestCityName":"Sydney","FlightTimeHour":17.179506930998397,"FlightDelayMin":0} +{"FlightNum":"X98CCZO","DestCountry":"IT","OriginWeather":"Clear","OriginCityName":"Cape Town","AvgTicketPrice":882.9826615595518,"DistanceMiles":5482.606664853586,"FlightDelay":false,"DestWeather":"Sunny","Dest":"Venice Marco Polo Airport","FlightDelayType":"No Delay","OriginCountry":"ZA","dayOfWeek":0,"DistanceKilometers":8823.40014044213,"timestamp":"2025-01-27T18:27:00","DestLocation":{"lat":"45.505299","lon":"12.3519"},"DestAirportID":"VE05","Carrier":"Logstash Airways","Cancelled":false,"FlightTimeMin":464.3894810759016,"Origin":"Cape Town International Airport","OriginLocation":{"lat":"-33.96480179","lon":"18.60169983"},"DestRegion":"IT-34","OriginAirportID":"CPT","OriginRegion":"SE-BD","DestCityName":"Venice","FlightTimeHour":7.73982468459836,"FlightDelayMin":0} 
+{"FlightNum":"UFK2WIZ","DestCountry":"IT","OriginWeather":"Rain","OriginCityName":"Venice","AvgTicketPrice":190.6369038508356,"DistanceMiles":0,"FlightDelay":false,"DestWeather":"Cloudy","Dest":"Venice Marco Polo Airport","FlightDelayType":"No Delay","OriginCountry":"IT","dayOfWeek":0,"DistanceKilometers":0,"timestamp":"2025-01-27T17:11:14","DestLocation":{"lat":"45.505299","lon":"12.3519"},"DestAirportID":"VE05","Carrier":"Logstash Airways","Cancelled":false,"FlightTimeMin":0,"Origin":"Venice Marco Polo Airport","OriginLocation":{"lat":"45.505299","lon":"12.3519"},"DestRegion":"IT-34","OriginAirportID":"VE05","OriginRegion":"IT-34","DestCityName":"Venice","FlightTimeHour":0,"FlightDelayMin":0} +{"FlightNum":"EAYQW69","DestCountry":"IT","OriginWeather":"Thunder & Lightning","OriginCityName":"Naples","AvgTicketPrice":181.69421554118,"DistanceMiles":345.31943877289535,"FlightDelay":true,"DestWeather":"Clear","Dest":"Treviso-Sant'Angelo Airport","FlightDelayType":"Weather Delay","OriginCountry":"IT","dayOfWeek":0,"DistanceKilometers":555.7377668725265,"timestamp":"2025-01-27T10:33:28","DestLocation":{"lat":"45.648399","lon":"12.1944"},"DestAirportID":"TV01","Carrier":"OpenSearch Dashboards Airlines","Cancelled":true,"FlightTimeMin":222.74905899019436,"Origin":"Naples International Airport","OriginLocation":{"lat":"40.886002","lon":"14.2908"},"DestRegion":"IT-34","OriginAirportID":"NA01","OriginRegion":"IT-72","DestCityName":"Treviso","FlightTimeHour":3.712484316503239,"FlightDelayMin":180} +{"FlightNum":"58U013N","DestCountry":"CN","OriginWeather":"Damaging Wind","OriginCityName":"Mexico City","AvgTicketPrice":730.041778346198,"DistanceMiles":8300.428124665925,"FlightDelay":false,"DestWeather":"Clear","Dest":"Xi'an Xianyang International Airport","FlightDelayType":"No 
Delay","OriginCountry":"MX","dayOfWeek":0,"DistanceKilometers":13358.24419986236,"timestamp":"2025-01-27T05:13:00","DestLocation":{"lat":"34.447102","lon":"108.751999"},"DestAirportID":"XIY","Carrier":"OpenSearch Dashboards Airlines","Cancelled":false,"FlightTimeMin":785.7790705801389,"Origin":"Licenciado Benito Juarez International Airport","OriginLocation":{"lat":"19.4363","lon":"-99.072098"},"DestRegion":"SE-BD","OriginAirportID":"AICM","OriginRegion":"MX-DIF","DestCityName":"Xi'an","FlightTimeHour":13.096317843002314,"FlightDelayMin":0} diff --git a/integ-test/src/integration/resources/opensearch/dashboards_sample_data_flights_mappings.json b/integ-test/src/integration/resources/opensearch/dashboards_sample_data_flights_mappings.json new file mode 100644 index 000000000..727e0ca80 --- /dev/null +++ b/integ-test/src/integration/resources/opensearch/dashboards_sample_data_flights_mappings.json @@ -0,0 +1,85 @@ +{ + "properties": { + "AvgTicketPrice": { + "type": "float" + }, + "Cancelled": { + "type": "boolean" + }, + "Carrier": { + "type": "keyword" + }, + "Dest": { + "type": "keyword" + }, + "DestAirportID": { + "type": "keyword" + }, + "DestCityName": { + "type": "keyword" + }, + "DestCountry": { + "type": "keyword" + }, + "DestLocation": { + "type": "geo_point" + }, + "DestRegion": { + "type": "keyword" + }, + "DestWeather": { + "type": "keyword" + }, + "DistanceKilometers": { + "type": "float" + }, + "DistanceMiles": { + "type": "float" + }, + "FlightDelay": { + "type": "boolean" + }, + "FlightDelayMin": { + "type": "integer" + }, + "FlightDelayType": { + "type": "keyword" + }, + "FlightNum": { + "type": "keyword" + }, + "FlightTimeHour": { + "type": "keyword" + }, + "FlightTimeMin": { + "type": "float" + }, + "Origin": { + "type": "keyword" + }, + "OriginAirportID": { + "type": "keyword" + }, + "OriginCityName": { + "type": "keyword" + }, + "OriginCountry": { + "type": "keyword" + }, + "OriginLocation": { + "type": "geo_point" + }, + "OriginRegion": { + 
"type": "keyword" + }, + "OriginWeather": { + "type": "keyword" + }, + "dayOfWeek": { + "type": "integer" + }, + "timestamp": { + "type": "date" + } + } +} diff --git a/integ-test/src/integration/resources/opensearch/dashboards_sample_data_flights_tests.json b/integ-test/src/integration/resources/opensearch/dashboards_sample_data_flights_tests.json new file mode 100644 index 000000000..5330a4599 --- /dev/null +++ b/integ-test/src/integration/resources/opensearch/dashboards_sample_data_flights_tests.json @@ -0,0 +1,80 @@ +{ + "tests": [ + { "index": "flights" }, + { + "do": { + "sql": { + "query": "SELECT COUNT(*) as count FROM dev.default.flights WHERE FlightDelay = true" + }, + "match": [ "[1]" ] + } + }, + { + "do": { + "sql": { + "query": "SELECT FlightDelay, COUNT(*) as count FROM dev.default.flights GROUP BY FlightDelay" + }, + "match": [ "[false,4]", "[true,1]" ] + } + }, + { + "do": { + "sql": { + "query": "SELECT FLOOR(FlightDelayMin / 30) * 30 AS bucket, COUNT(*) AS doc_count FROM dev.default.flights GROUP BY bucket ORDER BY bucket" + }, + "match": [ "[0,4]", "[180,1]" ] + } + }, + { + "do": { + "sql": { + "query": "SELECT carrier, COUNT(*) as count FROM dev.default.flights GROUP BY Carrier ORDER BY count DESC" + }, + "match": [ "[OpenSearch Dashboards Airlines,3]", "[Logstash Airways,2]" ] + } + }, + { + "do": { + "sql": { + "query": "SELECT date_format(window.start, 'yyyy-MM-dd HH:mm:ss'),MAX(AvgTicketPrice) AS avg_ticket_price FROM dev.default.flights GROUP BY window(to_utc_timestamp(timestamp, 'UTC'),'1 day') ORDER BY window.start" + }, + "match": [ "[2025-01-27 00:00:00,882.98267]" ] + } + }, + { + "do": { + "sql": { + "query": "SELECT OriginCountry, DestCountry, cnt + FROM ( + SELECT + OriginCountry, + DestCountry, + cnt, + ROW_NUMBER() OVER (PARTITION BY OriginCountry ORDER BY cnt DESC) AS dest_rn + FROM ( + SELECT + OriginCountry, + DestCountry, + COUNT(*) AS cnt + FROM your_table + WHERE OriginCountry IN ( + SELECT OriginCountry + FROM ( + SELECT 
OriginCountry, COUNT(*) AS cnt + FROM your_table + GROUP BY OriginCountry + ORDER BY cnt DESC + LIMIT 5 + ) AS top_origins + ) + GROUP BY OriginCountry, DestCountry + ) AS grouped + ) AS ranked + WHERE dest_rn <= 5 + ORDER BY cnt DESC;" + }, + "match": [ "[2025-01-27 00:00:00,No Delay,2]", "[2025-01-27 08:00:00,Weather Delay,1]", "[2025-01-27 16:00:00,No Delay,2]" ] + } + } + ] +} diff --git a/integ-test/src/integration/scala/org/apache/spark/opensearch/table/OpenSearchCatalogSuite.scala b/integ-test/src/integration/scala/org/apache/spark/opensearch/table/OpenSearchCatalogSuite.scala index 832642088..042350638 100644 --- a/integ-test/src/integration/scala/org/apache/spark/opensearch/table/OpenSearchCatalogSuite.scala +++ b/integ-test/src/integration/scala/org/apache/spark/opensearch/table/OpenSearchCatalogSuite.scala @@ -21,5 +21,6 @@ trait OpenSearchCatalogSuite extends FlintSparkSuite { spark.conf.set( s"spark.sql.catalog.${catalogName}.opensearch.write.refresh_policy", "wait_for") + spark.conf.set("spark.sql.session.timeZone", "UTC") } } diff --git a/integ-test/src/integration/scala/org/apache/spark/opensearch/table/OpenSearchDashboardITSuite.scala b/integ-test/src/integration/scala/org/apache/spark/opensearch/table/OpenSearchDashboardITSuite.scala new file mode 100644 index 000000000..b5cfec9b4 --- /dev/null +++ b/integ-test/src/integration/scala/org/apache/spark/opensearch/table/OpenSearchDashboardITSuite.scala @@ -0,0 +1,89 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.spark.opensearch.table + +import org.opensearch.flint.spark.ppl.FlintPPLSuite + +import org.apache.spark.sql.Row + +class OpenSearchDashboardITSuite extends OpenSearchCatalogSuite with FlintPPLSuite { + test("test dashboards queries") { + Seq(dashboards_sample_data_flights()).foreach { config => + withIndexName(config.index) { + openSearchDashboardsIndex(config.useCaseName, config.index) + config.tests.foreach { sqlTest => + 
sqlTest.queries.foreach { query => + val df = spark.sql(query) + withClue(s"Failed query: ${query}\n") { + checkAnswer(df, sqlTest.expected) + } + } + } + } + } + } + + case class QueryTest(queries: Seq[String], expected: Seq[Row]) + case class TestConfig(useCaseName: String, index: String, tests: Seq[QueryTest]) + + def dashboards_sample_data_flights(): TestConfig = { + val tbl = "flights" + TestConfig( + "dashboards_sample_data_flights", + tbl, + Seq( + // Airline Carrier + QueryTest( + Seq( + s"""SELECT carrier, COUNT(*) as count + FROM dev.default.$tbl + GROUP BY Carrier ORDER BY count DESC""", + s"""source=dev.default.$tbl + || stats count() as count by Carrier + || sort - count + || fields Carrier, count""".stripMargin), + Seq(Row("OpenSearch Dashboards Airlines", 3), Row("Logstash Airways", 2))), + // Average ticket price + QueryTest( + Seq( + s"""SELECT date_format(window.start, 'yyyy-MM-dd HH:mm:ss'), MAX(AvgTicketPrice) AS + |avg_ticket_price + | FROM dev.default.$tbl + | GROUP BY window(timestamp,'1 day') ORDER BY window.start + |""".stripMargin, + s"""source=dev.default.$tbl + || stats max(AvgTicketPrice) as avg by span(timestamp, 1d) as window + || sort window.start + || eval start = date_format(window.start, 'yyyy-MM-dd HH:mm:ss') + || fields start, avg""".stripMargin), + Seq(Row("2025-01-27 00:00:00", 882.98267f))), + // Total Flight Delays + QueryTest( + Seq( + s"SELECT COUNT(*) as count FROM dev.default.$tbl WHERE FlightDelay = true", + s"""source=dev.default.$tbl | where FlightDelay=True | stats count()"""), + Seq(Row(1))), + // Flight Delays + QueryTest( + Seq( + s"""SELECT FlightDelay, COUNT(*) as count FROM dev.default.$tbl GROUP BY FlightDelay""", + s"""source=dev.default.$tbl + || stats count() as cnt by FlightDelay + || fields FlightDelay, cnt""".stripMargin), + Seq(Row(false, 4), Row(true, 1))), + // Delay Buckets + QueryTest( + Seq( + s"SELECT FLOOR(FlightDelayMin / 30) * 30 AS bucket, COUNT(*) AS doc_count FROM dev" + + s".default.$tbl 
GROUP BY bucket ORDER BY bucket", + s"""source=dev.default.$tbl + || eval bucket=FLOOR(FlightDelayMin / 30) * 30 + || stats count() as cnt by bucket + || sort bucket + || fields bucket, cnt""".stripMargin), + Seq(Row(0, 4), Row(180, 1))))) + } +} diff --git a/integ-test/src/integration/scala/org/opensearch/flint/OpenSearchSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/OpenSearchSuite.scala index 8899c629a..c803d9bc8 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/OpenSearchSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/OpenSearchSuite.scala @@ -5,6 +5,8 @@ package org.opensearch.flint +import scala.io.Source + import org.apache.http.HttpHost import org.opensearch.action.admin.indices.delete.DeleteIndexRequest import org.opensearch.action.bulk.BulkRequest @@ -123,6 +125,25 @@ trait OpenSearchSuite extends BeforeAndAfterAll { index(indexName, oneNodeSetting, mappings, docs) } + /** + * Create index `indexName` from a bundled OpenSearch Dashboards sample data set. Mappings are + * read from the classpath resource `opensearch/<useCaseName>_mappings.json` and documents + * (one per line) from `opensearch/<useCaseName>.json`. + */ + def openSearchDashboardsIndex(useCaseName: String, indexName: String): Unit = { + // Read one classpath resource and always close the underlying stream. + def readResource[T](path: String)(read: Source => T): T = { + val source = Source.fromResource(path) + try read(source) + finally source.close() + } + val mappings = readResource(s"opensearch/${useCaseName}_mappings.json")(_.mkString) + // getLines() is lazy, so materialize the documents before the source is closed. + val docs: Seq[String] = + readResource(s"opensearch/${useCaseName}.json")(_.getLines().toList) + index(indexName, oneNodeSetting, mappings, docs) + } + def indexWithAlias(indexName: String): Unit = { val mappings = """{ | "properties": {