Skip to content

Commit

Permalink
Add OpenSearch Dashboards IT (#1046)
Browse files Browse the repository at this point in the history
Signed-off-by: Peng Huo <[email protected]>
  • Loading branch information
penghuo authored Feb 14, 2025
1 parent c6d8793 commit 05dbf33
Show file tree
Hide file tree
Showing 7 changed files with 294 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ object FlintDataType {

val METADATA_ALIAS_PATH_NAME = "aliasPath"

/**
 * OpenSearch field types that isSupported rejects; properties whose "type" is listed here are
 * filtered out before fields are deserialized into the schema.
 */
val UNSUPPORTED_OPENSEARCH_FIELD_TYPE = Set("geo_point")

/**
* parse Flint metadata and extract properties to StructType.
*/
Expand All @@ -49,25 +51,24 @@ object FlintDataType {
}
}

val fields: Seq[StructField] = normalProps.map { case (fieldName, fieldProperties) =>
deserializeField(fieldName, fieldProperties)
}.toSeq
val fields: Seq[StructField] = normalProps
.filter { case (_, fp) => isSupported(fp) }
.map { case (fieldName, fieldProperties) =>
deserializeField(fieldName, fieldProperties)
}
.toSeq

val normalFieldMap: Map[String, StructField] = fields.map(f => f.name -> f).toMap

val aliasFields: Seq[StructField] = aliasProps.map { case (fieldName, fieldProperties) =>
// Process alias fields: only include alias fields if the referenced field exists.
val aliasFields: Seq[StructField] = aliasProps.flatMap { case (fieldName, fieldProperties) =>
val aliasPath = (fieldProperties \ "path").extract[String]
if (!normalFieldMap.contains(aliasPath)) {
throw new IllegalStateException(
s"Alias field [$fieldName] references undefined field [$aliasPath]")
normalFieldMap.get(aliasPath).map { referencedField =>
val metadataBuilder = new MetadataBuilder()
metadataBuilder.putString(METADATA_ALIAS_PATH_NAME, aliasPath)
DataTypes
.createStructField(fieldName, referencedField.dataType, true, metadataBuilder.build())
}
val metadataBuilder = new MetadataBuilder()
metadataBuilder.putString(METADATA_ALIAS_PATH_NAME, aliasPath)
DataTypes.createStructField(
fieldName,
normalFieldMap(aliasPath).dataType,
true,
metadataBuilder.build())
}.toSeq

StructType(fields ++ aliasFields)
Expand Down Expand Up @@ -120,6 +121,13 @@ object FlintDataType {
DataTypes.createStructField(fieldName, dataType, true, metadataBuilder.build())
}

/**
 * Tells whether a field may be included, judged by its "type" property.
 *
 * @param fieldProperties
 *   JSON properties of a single field
 * @return
 *   false only when "type" is a string contained in UNSUPPORTED_OPENSEARCH_FIELD_TYPE; true in
 *   every other case, including when "type" is missing or is not a string
 */
def isSupported(fieldProperties: JValue): Boolean =
  fieldProperties \ "type" match {
    case JString(typeName) if UNSUPPORTED_OPENSEARCH_FIELD_TYPE.contains(typeName) => false
    case _ => true
  }

/**
* parse format in flint metadata
* @return
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{"FlightNum":"9HY9SWR","DestCountry":"AU","OriginWeather":"Sunny","OriginCityName":"Frankfurt am Main","AvgTicketPrice":841.2656419677076,"DistanceMiles":10247.856675613455,"FlightDelay":false,"DestWeather":"Rain","Dest":"Sydney Kingsford Smith International Airport","FlightDelayType":"No Delay","OriginCountry":"DE","dayOfWeek":0,"DistanceKilometers":16492.32665375846,"timestamp":"2025-01-27T00:00:00","DestLocation":{"lat":"-33.94609833","lon":"151.177002"},"DestAirportID":"SYD","Carrier":"OpenSearch Dashboards Airlines","Cancelled":false,"FlightTimeMin":1030.7704158599038,"Origin":"Frankfurt am Main Airport","OriginLocation":{"lat":"50.033333","lon":"8.570556"},"DestRegion":"SE-BD","OriginAirportID":"FRA","OriginRegion":"DE-HE","DestCityName":"Sydney","FlightTimeHour":17.179506930998397,"FlightDelayMin":0}
{"FlightNum":"X98CCZO","DestCountry":"IT","OriginWeather":"Clear","OriginCityName":"Cape Town","AvgTicketPrice":882.9826615595518,"DistanceMiles":5482.606664853586,"FlightDelay":false,"DestWeather":"Sunny","Dest":"Venice Marco Polo Airport","FlightDelayType":"No Delay","OriginCountry":"ZA","dayOfWeek":0,"DistanceKilometers":8823.40014044213,"timestamp":"2025-01-27T18:27:00","DestLocation":{"lat":"45.505299","lon":"12.3519"},"DestAirportID":"VE05","Carrier":"Logstash Airways","Cancelled":false,"FlightTimeMin":464.3894810759016,"Origin":"Cape Town International Airport","OriginLocation":{"lat":"-33.96480179","lon":"18.60169983"},"DestRegion":"IT-34","OriginAirportID":"CPT","OriginRegion":"SE-BD","DestCityName":"Venice","FlightTimeHour":7.73982468459836,"FlightDelayMin":0}
{"FlightNum":"UFK2WIZ","DestCountry":"IT","OriginWeather":"Rain","OriginCityName":"Venice","AvgTicketPrice":190.6369038508356,"DistanceMiles":0,"FlightDelay":false,"DestWeather":"Cloudy","Dest":"Venice Marco Polo Airport","FlightDelayType":"No Delay","OriginCountry":"IT","dayOfWeek":0,"DistanceKilometers":0,"timestamp":"2025-01-27T17:11:14","DestLocation":{"lat":"45.505299","lon":"12.3519"},"DestAirportID":"VE05","Carrier":"Logstash Airways","Cancelled":false,"FlightTimeMin":0,"Origin":"Venice Marco Polo Airport","OriginLocation":{"lat":"45.505299","lon":"12.3519"},"DestRegion":"IT-34","OriginAirportID":"VE05","OriginRegion":"IT-34","DestCityName":"Venice","FlightTimeHour":0,"FlightDelayMin":0}
{"FlightNum":"EAYQW69","DestCountry":"IT","OriginWeather":"Thunder & Lightning","OriginCityName":"Naples","AvgTicketPrice":181.69421554118,"DistanceMiles":345.31943877289535,"FlightDelay":true,"DestWeather":"Clear","Dest":"Treviso-Sant'Angelo Airport","FlightDelayType":"Weather Delay","OriginCountry":"IT","dayOfWeek":0,"DistanceKilometers":555.7377668725265,"timestamp":"2025-01-27T10:33:28","DestLocation":{"lat":"45.648399","lon":"12.1944"},"DestAirportID":"TV01","Carrier":"OpenSearch Dashboards Airlines","Cancelled":true,"FlightTimeMin":222.74905899019436,"Origin":"Naples International Airport","OriginLocation":{"lat":"40.886002","lon":"14.2908"},"DestRegion":"IT-34","OriginAirportID":"NA01","OriginRegion":"IT-72","DestCityName":"Treviso","FlightTimeHour":3.712484316503239,"FlightDelayMin":180}
{"FlightNum":"58U013N","DestCountry":"CN","OriginWeather":"Damaging Wind","OriginCityName":"Mexico City","AvgTicketPrice":730.041778346198,"DistanceMiles":8300.428124665925,"FlightDelay":false,"DestWeather":"Clear","Dest":"Xi'an Xianyang International Airport","FlightDelayType":"No Delay","OriginCountry":"MX","dayOfWeek":0,"DistanceKilometers":13358.24419986236,"timestamp":"2025-01-27T05:13:00","DestLocation":{"lat":"34.447102","lon":"108.751999"},"DestAirportID":"XIY","Carrier":"OpenSearch Dashboards Airlines","Cancelled":false,"FlightTimeMin":785.7790705801389,"Origin":"Licenciado Benito Juarez International Airport","OriginLocation":{"lat":"19.4363","lon":"-99.072098"},"DestRegion":"SE-BD","OriginAirportID":"AICM","OriginRegion":"MX-DIF","DestCityName":"Xi'an","FlightTimeHour":13.096317843002314,"FlightDelayMin":0}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
{
"properties": {
"AvgTicketPrice": {
"type": "float"
},
"Cancelled": {
"type": "boolean"
},
"Carrier": {
"type": "keyword"
},
"Dest": {
"type": "keyword"
},
"DestAirportID": {
"type": "keyword"
},
"DestCityName": {
"type": "keyword"
},
"DestCountry": {
"type": "keyword"
},
"DestLocation": {
"type": "geo_point"
},
"DestRegion": {
"type": "keyword"
},
"DestWeather": {
"type": "keyword"
},
"DistanceKilometers": {
"type": "float"
},
"DistanceMiles": {
"type": "float"
},
"FlightDelay": {
"type": "boolean"
},
"FlightDelayMin": {
"type": "integer"
},
"FlightDelayType": {
"type": "keyword"
},
"FlightNum": {
"type": "keyword"
},
"FlightTimeHour": {
"type": "keyword"
},
"FlightTimeMin": {
"type": "float"
},
"Origin": {
"type": "keyword"
},
"OriginAirportID": {
"type": "keyword"
},
"OriginCityName": {
"type": "keyword"
},
"OriginCountry": {
"type": "keyword"
},
"OriginLocation": {
"type": "geo_point"
},
"OriginRegion": {
"type": "keyword"
},
"OriginWeather": {
"type": "keyword"
},
"dayOfWeek": {
"type": "integer"
},
"timestamp": {
"type": "date"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
{
"tests": [
{ "index": "flights" },
{
"do": {
"sql": {
"query": "SELECT COUNT(*) as count FROM dev.default.flights WHERE FlightDelay = true"
},
"match": [ "[1]" ]
}
},
{
"do": {
"sql": {
"query": "SELECT FlightDelay, COUNT(*) as count FROM dev.default.flights GROUP BY FlightDelay"
},
"match": [ "[false,4]", "[true,1]" ]
}
},
{
"do": {
"sql": {
"query": "SELECT FLOOR(FlightDelayMin / 30) * 30 AS bucket, COUNT(*) AS doc_count FROM dev.default.flights GROUP BY bucket ORDER BY bucket"
},
"match": [ "[0,4]", "[180,1]" ]
}
},
{
"do": {
"sql": {
"query": "SELECT carrier, COUNT(*) as count FROM dev.default.flights GROUP BY Carrier ORDER BY count DESC"
},
"match": [ "[OpenSearch Dashboards Airlines,3]", "[Logstash Airways,2]" ]
}
},
{
"do": {
"sql": {
"query": "SELECT date_format(window.start, 'yyyy-MM-dd HH:mm:ss'),MAX(AvgTicketPrice) AS avg_ticket_price FROM dev.default.flights GROUP BY window(to_utc_timestamp(timestamp, 'UTC'),'1 day') ORDER BY window.start"
},
"match": [ "[2025-01-27 00:00:00,882.98267]" ]
}
},
{
"do": {
"sql": {
"query": "SELECT OriginCountry, DestCountry, cnt
FROM (
SELECT
OriginCountry,
DestCountry,
cnt,
ROW_NUMBER() OVER (PARTITION BY OriginCountry ORDER BY cnt DESC) AS dest_rn
FROM (
SELECT
OriginCountry,
DestCountry,
COUNT(*) AS cnt
FROM dev.default.flights
WHERE OriginCountry IN (
SELECT OriginCountry
FROM (
SELECT OriginCountry, COUNT(*) AS cnt
FROM dev.default.flights
GROUP BY OriginCountry
ORDER BY cnt DESC
LIMIT 5
) AS top_origins
)
GROUP BY OriginCountry, DestCountry
) AS grouped
) AS ranked
WHERE dest_rn <= 5
ORDER BY cnt DESC;"
},
"match": [ "[2025-01-27 00:00:00,No Delay,2]", "[2025-01-27 08:00:00,Weather Delay,1]", "[2025-01-27 16:00:00,No Delay,2]" ]
}
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@ trait OpenSearchCatalogSuite extends FlintSparkSuite {
spark.conf.set(
s"spark.sql.catalog.${catalogName}.opensearch.write.refresh_policy",
"wait_for")
spark.conf.set("spark.sql.session.timeZone", "UTC")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.apache.spark.opensearch.table

import org.opensearch.flint.spark.ppl.FlintPPLSuite

import org.apache.spark.sql.Row

/**
 * Integration suite that replays dashboard-style queries against an OpenSearch-backed table.
 *
 * Every [[QueryTest]] groups several query strings (a SQL form and a PPL form in the cases
 * below) that are all checked against the same expected rows.
 */
class OpenSearchDashboardITSuite extends OpenSearchCatalogSuite with FlintPPLSuite {

  /** A group of queries that must all produce `expected`. */
  case class QueryTest(queries: Seq[String], expected: Seq[Row])

  /**
   * One use case: the name handed to `openSearchDashboardsIndex`, the index to query, and the
   * query groups to run against it.
   */
  case class TestConfig(useCaseName: String, index: String, tests: Seq[QueryTest])

  test("test dashboards queries") {
    for (useCase <- Seq(dashboards_sample_data_flights())) {
      withIndexName(useCase.index) {
        // Load the use case's data before any of its queries run.
        openSearchDashboardsIndex(useCase.useCaseName, useCase.index)
        for {
          group <- useCase.tests
          statement <- group.queries
        } {
          // The query is executed outside withClue; only the answer comparison carries the clue.
          val result = spark.sql(statement)
          withClue(s"Failed query: ${statement}\n") {
            checkAnswer(result, group.expected)
          }
        }
      }
    }
  }

  /**
   * Builds the "flights" use case.
   *
   * @return
   *   configuration naming the `dashboards_sample_data_flights` use case, the `flights` index
   *   and five query groups with their expected rows
   */
  def dashboards_sample_data_flights(): TestConfig = {
    val table = "flights"

    // Airline Carrier
    val airlineCarrier = QueryTest(
      queries = Seq(
        s"""SELECT carrier, COUNT(*) as count
           |FROM dev.default.$table
           |GROUP BY Carrier ORDER BY count DESC""".stripMargin,
        s"""source=dev.default.$table
           || stats count() as count by Carrier
           || sort - count
           || fields Carrier, count""".stripMargin),
      expected = Seq(Row("OpenSearch Dashboards Airlines", 3), Row("Logstash Airways", 2)))

    // Average ticket price (both queries take the MAX of AvgTicketPrice per one-day window)
    val averageTicketPrice = QueryTest(
      queries = Seq(
        s"""SELECT date_format(window.start, 'yyyy-MM-dd HH:mm:ss'), MAX(AvgTicketPrice) AS
           |avg_ticket_price
           | FROM dev.default.$table
           | GROUP BY window(timestamp,'1 day') ORDER BY window.start
           |""".stripMargin,
        s"""source=dev.default.$table
           || stats max(AvgTicketPrice) as avg by span(timestamp, 1d) as window
           || sort window.start
           || eval start = date_format(window.start, 'yyyy-MM-dd HH:mm:ss')
           || fields start, avg""".stripMargin),
      expected = Seq(Row("2025-01-27 00:00:00", 882.98267f)))

    // Total Flight Delays
    val totalFlightDelays = QueryTest(
      queries = Seq(
        s"SELECT COUNT(*) as count FROM dev.default.$table WHERE FlightDelay = true",
        s"""source=dev.default.$table | where FlightDelay=True | stats count()"""),
      expected = Seq(Row(1)))

    // Flight Delays
    val flightDelays = QueryTest(
      queries = Seq(
        s"""SELECT FlightDelay, COUNT(*) as count FROM dev.default.$table GROUP BY FlightDelay""",
        s"""source=dev.default.$table
           || stats count() as cnt by FlightDelay
           || fields FlightDelay, cnt""".stripMargin),
      expected = Seq(Row(false, 4), Row(true, 1)))

    // Delay Buckets
    val delayBuckets = QueryTest(
      queries = Seq(
        s"SELECT FLOOR(FlightDelayMin / 30) * 30 AS bucket, COUNT(*) AS doc_count " +
          s"FROM dev.default.$table GROUP BY bucket ORDER BY bucket",
        s"""source=dev.default.$table
           || eval bucket=FLOOR(FlightDelayMin / 30) * 30
           || stats count() as cnt by bucket
           || sort bucket
           || fields bucket, cnt""".stripMargin),
      expected = Seq(Row(0, 4), Row(180, 1)))

    TestConfig(
      useCaseName = "dashboards_sample_data_flights",
      index = table,
      tests =
        Seq(airlineCarrier, averageTicketPrice, totalFlightDelays, flightDelays, delayBuckets))
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.flint

import scala.io.Source

import org.apache.http.HttpHost
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest
import org.opensearch.action.bulk.BulkRequest
Expand Down Expand Up @@ -123,6 +125,16 @@ trait OpenSearchSuite extends BeforeAndAfterAll {
index(indexName, oneNodeSetting, mappings, docs)
}

/**
 * Indexes the classpath fixtures of a dashboards use case.
 *
 * The mappings are read from `opensearch/<useCaseName>_mappings.json` and the documents, one
 * per line, from `opensearch/<useCaseName>.json`. Both are passed to `index` together with
 * `oneNodeSetting`. Each resource stream is closed once it has been read.
 *
 * @param useCaseName
 *   file-name prefix of the fixture resources under `opensearch/`
 * @param indexName
 *   index name forwarded to `index`
 */
def openSearchDashboardsIndex(useCaseName: String, indexName: String): Unit = {
  // Opens a classpath resource, applies `read`, and closes the stream even if `read` throws.
  def readResource[T](resourcePath: String)(read: Source => T): T = {
    val source = Source.fromResource(resourcePath)
    try read(source)
    finally source.close()
  }

  val mappings = readResource(s"opensearch/${useCaseName}_mappings.json")(_.mkString)
  // toList materializes every line before the source is closed; a lazily evaluated Seq
  // (what Iterator.toSeq yields on Scala 2.12) would read from an already closed stream.
  val docs: Seq[String] =
    readResource(s"opensearch/${useCaseName}.json")(_.getLines().toList)
  index(indexName, oneNodeSetting, mappings, docs)
}

def indexWithAlias(indexName: String): Unit = {
val mappings = """{
| "properties": {
Expand Down

0 comments on commit 05dbf33

Please sign in to comment.