Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add OpenSearch Dashboards IT #1046

Merged
merged 2 commits into from
Feb 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ object FlintDataType {

val METADATA_ALIAS_PATH_NAME = "aliasPath"

val UNSUPPORTED_OPENSEARCH_FIELD_TYPE = Set("geo_point")

/**
* parse Flint metadata and extract properties to StructType.
*/
Expand All @@ -49,25 +51,24 @@ object FlintDataType {
}
}

val fields: Seq[StructField] = normalProps.map { case (fieldName, fieldProperties) =>
deserializeField(fieldName, fieldProperties)
}.toSeq
val fields: Seq[StructField] = normalProps
.filter { case (_, fp) => isSupported(fp) }
.map { case (fieldName, fieldProperties) =>
deserializeField(fieldName, fieldProperties)
}
.toSeq

val normalFieldMap: Map[String, StructField] = fields.map(f => f.name -> f).toMap

val aliasFields: Seq[StructField] = aliasProps.map { case (fieldName, fieldProperties) =>
// Process alias fields: only include alias fields if the referenced field exists.
val aliasFields: Seq[StructField] = aliasProps.flatMap { case (fieldName, fieldProperties) =>
val aliasPath = (fieldProperties \ "path").extract[String]
if (!normalFieldMap.contains(aliasPath)) {
throw new IllegalStateException(
s"Alias field [$fieldName] references undefined field [$aliasPath]")
normalFieldMap.get(aliasPath).map { referencedField =>
val metadataBuilder = new MetadataBuilder()
metadataBuilder.putString(METADATA_ALIAS_PATH_NAME, aliasPath)
DataTypes
.createStructField(fieldName, referencedField.dataType, true, metadataBuilder.build())
}
val metadataBuilder = new MetadataBuilder()
metadataBuilder.putString(METADATA_ALIAS_PATH_NAME, aliasPath)
DataTypes.createStructField(
fieldName,
normalFieldMap(aliasPath).dataType,
true,
metadataBuilder.build())
}.toSeq

StructType(fields ++ aliasFields)
Expand Down Expand Up @@ -120,6 +121,13 @@ object FlintDataType {
DataTypes.createStructField(fieldName, dataType, true, metadataBuilder.build())
}

def isSupported(fieldProperties: JValue): Boolean = {
(fieldProperties \ "type") match {
case JString(fieldType) => !UNSUPPORTED_OPENSEARCH_FIELD_TYPE.contains(fieldType)
case _ => true
}
}

/**
* parse format in flint metadata
* @return
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{"FlightNum":"9HY9SWR","DestCountry":"AU","OriginWeather":"Sunny","OriginCityName":"Frankfurt am Main","AvgTicketPrice":841.2656419677076,"DistanceMiles":10247.856675613455,"FlightDelay":false,"DestWeather":"Rain","Dest":"Sydney Kingsford Smith International Airport","FlightDelayType":"No Delay","OriginCountry":"DE","dayOfWeek":0,"DistanceKilometers":16492.32665375846,"timestamp":"2025-01-27T00:00:00","DestLocation":{"lat":"-33.94609833","lon":"151.177002"},"DestAirportID":"SYD","Carrier":"OpenSearch Dashboards Airlines","Cancelled":false,"FlightTimeMin":1030.7704158599038,"Origin":"Frankfurt am Main Airport","OriginLocation":{"lat":"50.033333","lon":"8.570556"},"DestRegion":"SE-BD","OriginAirportID":"FRA","OriginRegion":"DE-HE","DestCityName":"Sydney","FlightTimeHour":17.179506930998397,"FlightDelayMin":0}
{"FlightNum":"X98CCZO","DestCountry":"IT","OriginWeather":"Clear","OriginCityName":"Cape Town","AvgTicketPrice":882.9826615595518,"DistanceMiles":5482.606664853586,"FlightDelay":false,"DestWeather":"Sunny","Dest":"Venice Marco Polo Airport","FlightDelayType":"No Delay","OriginCountry":"ZA","dayOfWeek":0,"DistanceKilometers":8823.40014044213,"timestamp":"2025-01-27T18:27:00","DestLocation":{"lat":"45.505299","lon":"12.3519"},"DestAirportID":"VE05","Carrier":"Logstash Airways","Cancelled":false,"FlightTimeMin":464.3894810759016,"Origin":"Cape Town International Airport","OriginLocation":{"lat":"-33.96480179","lon":"18.60169983"},"DestRegion":"IT-34","OriginAirportID":"CPT","OriginRegion":"SE-BD","DestCityName":"Venice","FlightTimeHour":7.73982468459836,"FlightDelayMin":0}
{"FlightNum":"UFK2WIZ","DestCountry":"IT","OriginWeather":"Rain","OriginCityName":"Venice","AvgTicketPrice":190.6369038508356,"DistanceMiles":0,"FlightDelay":false,"DestWeather":"Cloudy","Dest":"Venice Marco Polo Airport","FlightDelayType":"No Delay","OriginCountry":"IT","dayOfWeek":0,"DistanceKilometers":0,"timestamp":"2025-01-27T17:11:14","DestLocation":{"lat":"45.505299","lon":"12.3519"},"DestAirportID":"VE05","Carrier":"Logstash Airways","Cancelled":false,"FlightTimeMin":0,"Origin":"Venice Marco Polo Airport","OriginLocation":{"lat":"45.505299","lon":"12.3519"},"DestRegion":"IT-34","OriginAirportID":"VE05","OriginRegion":"IT-34","DestCityName":"Venice","FlightTimeHour":0,"FlightDelayMin":0}
{"FlightNum":"EAYQW69","DestCountry":"IT","OriginWeather":"Thunder & Lightning","OriginCityName":"Naples","AvgTicketPrice":181.69421554118,"DistanceMiles":345.31943877289535,"FlightDelay":true,"DestWeather":"Clear","Dest":"Treviso-Sant'Angelo Airport","FlightDelayType":"Weather Delay","OriginCountry":"IT","dayOfWeek":0,"DistanceKilometers":555.7377668725265,"timestamp":"2025-01-27T10:33:28","DestLocation":{"lat":"45.648399","lon":"12.1944"},"DestAirportID":"TV01","Carrier":"OpenSearch Dashboards Airlines","Cancelled":true,"FlightTimeMin":222.74905899019436,"Origin":"Naples International Airport","OriginLocation":{"lat":"40.886002","lon":"14.2908"},"DestRegion":"IT-34","OriginAirportID":"NA01","OriginRegion":"IT-72","DestCityName":"Treviso","FlightTimeHour":3.712484316503239,"FlightDelayMin":180}
{"FlightNum":"58U013N","DestCountry":"CN","OriginWeather":"Damaging Wind","OriginCityName":"Mexico City","AvgTicketPrice":730.041778346198,"DistanceMiles":8300.428124665925,"FlightDelay":false,"DestWeather":"Clear","Dest":"Xi'an Xianyang International Airport","FlightDelayType":"No Delay","OriginCountry":"MX","dayOfWeek":0,"DistanceKilometers":13358.24419986236,"timestamp":"2025-01-27T05:13:00","DestLocation":{"lat":"34.447102","lon":"108.751999"},"DestAirportID":"XIY","Carrier":"OpenSearch Dashboards Airlines","Cancelled":false,"FlightTimeMin":785.7790705801389,"Origin":"Licenciado Benito Juarez International Airport","OriginLocation":{"lat":"19.4363","lon":"-99.072098"},"DestRegion":"SE-BD","OriginAirportID":"AICM","OriginRegion":"MX-DIF","DestCityName":"Xi'an","FlightTimeHour":13.096317843002314,"FlightDelayMin":0}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
{
"properties": {
"AvgTicketPrice": {
"type": "float"
},
"Cancelled": {
"type": "boolean"
},
"Carrier": {
"type": "keyword"
},
"Dest": {
"type": "keyword"
},
"DestAirportID": {
"type": "keyword"
},
"DestCityName": {
"type": "keyword"
},
"DestCountry": {
"type": "keyword"
},
"DestLocation": {
"type": "geo_point"
},
"DestRegion": {
"type": "keyword"
},
"DestWeather": {
"type": "keyword"
},
"DistanceKilometers": {
"type": "float"
},
"DistanceMiles": {
"type": "float"
},
"FlightDelay": {
"type": "boolean"
},
"FlightDelayMin": {
"type": "integer"
},
"FlightDelayType": {
"type": "keyword"
},
"FlightNum": {
"type": "keyword"
},
"FlightTimeHour": {
"type": "keyword"
},
"FlightTimeMin": {
"type": "float"
},
"Origin": {
"type": "keyword"
},
"OriginAirportID": {
"type": "keyword"
},
"OriginCityName": {
"type": "keyword"
},
"OriginCountry": {
"type": "keyword"
},
"OriginLocation": {
"type": "geo_point"
},
"OriginRegion": {
"type": "keyword"
},
"OriginWeather": {
"type": "keyword"
},
"dayOfWeek": {
"type": "integer"
},
"timestamp": {
"type": "date"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
{
"tests": [
{ "index": "flights" },
{
"do": {
"sql": {
"query": "SELECT COUNT(*) as count FROM dev.default.flights WHERE FlightDelay = true"
},
"match": [ "[1]" ]
}
},
{
"do": {
"sql": {
"query": "SELECT FlightDelay, COUNT(*) as count FROM dev.default.flights GROUP BY FlightDelay"
},
"match": [ "[false,4]", "[true,1]" ]
}
},
{
"do": {
"sql": {
"query": "SELECT FLOOR(FlightDelayMin / 30) * 30 AS bucket, COUNT(*) AS doc_count FROM dev.default.flights GROUP BY bucket ORDER BY bucket"
},
"match": [ "[0,4]", "[180,1]" ]
}
},
{
"do": {
"sql": {
"query": "SELECT carrier, COUNT(*) as count FROM dev.default.flights GROUP BY Carrier ORDER BY count DESC"
},
"match": [ "[OpenSearch Dashboards Airlines,3]", "[Logstash Airways,2]" ]
}
},
{
"do": {
"sql": {
"query": "SELECT date_format(window.start, 'yyyy-MM-dd HH:mm:ss'),MAX(AvgTicketPrice) AS avg_ticket_price FROM dev.default.flights GROUP BY window(to_utc_timestamp(timestamp, 'UTC'),'1 day') ORDER BY window.start"
},
"match": [ "[2025-01-27 00:00:00,882.98267]" ]
}
},
{
"do": {
"sql": {
"query": "SELECT OriginCountry, DestCountry, cnt
FROM (
SELECT
OriginCountry,
DestCountry,
cnt,
ROW_NUMBER() OVER (PARTITION BY OriginCountry ORDER BY cnt DESC) AS dest_rn
FROM (
SELECT
OriginCountry,
DestCountry,
COUNT(*) AS cnt
FROM your_table
WHERE OriginCountry IN (
SELECT OriginCountry
FROM (
SELECT OriginCountry, COUNT(*) AS cnt
FROM your_table
GROUP BY OriginCountry
ORDER BY cnt DESC
LIMIT 5
) AS top_origins
)
GROUP BY OriginCountry, DestCountry
) AS grouped
) AS ranked
WHERE dest_rn <= 5
ORDER BY cnt DESC;"
},
"match": [ "[2025-01-27 00:00:00,No Delay,2]", "[2025-01-27 08:00:00,Weather Delay,1]", "[2025-01-27 16:00:00,No Delay,2]" ]
}
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@ trait OpenSearchCatalogSuite extends FlintSparkSuite {
spark.conf.set(
s"spark.sql.catalog.${catalogName}.opensearch.write.refresh_policy",
"wait_for")
spark.conf.set("spark.sql.session.timeZone", "UTC")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.apache.spark.opensearch.table

import org.opensearch.flint.spark.ppl.FlintPPLSuite

import org.apache.spark.sql.Row

class OpenSearchDashboardITSuite extends OpenSearchCatalogSuite with FlintPPLSuite {
test("test dashboards queries") {
Seq(dashboards_sample_data_flights()).foreach { config =>
withIndexName(config.index) {
openSearchDashboardsIndex(config.useCaseName, config.index)
config.tests.foreach { sqlTest =>
sqlTest.queries.foreach { query =>
val df = spark.sql(query)
withClue(s"Failed query: ${query}\n") {
checkAnswer(df, sqlTest.expected)
}
}
}
}
}
}

case class QueryTest(queries: Seq[String], expected: Seq[Row])
case class TestConfig(useCaseName: String, index: String, tests: Seq[QueryTest])

def dashboards_sample_data_flights(): TestConfig = {
val tbl = "flights"
TestConfig(
"dashboards_sample_data_flights",
tbl,
Seq(
// Airline Carrier
QueryTest(
Seq(
s"""SELECT carrier, COUNT(*) as count
FROM dev.default.$tbl
GROUP BY Carrier ORDER BY count DESC""",
s"""source=dev.default.$tbl
|| stats count() as count by Carrier
|| sort - count
|| fields Carrier, count""".stripMargin),
Seq(Row("OpenSearch Dashboards Airlines", 3), Row("Logstash Airways", 2))),
// Average ticket price
QueryTest(
Seq(
s"""SELECT date_format(window.start, 'yyyy-MM-dd HH:mm:ss'), MAX(AvgTicketPrice) AS
|avg_ticket_price
| FROM dev.default.$tbl
| GROUP BY window(timestamp,'1 day') ORDER BY window.start
|""".stripMargin,
s"""source=dev.default.$tbl
|| stats max(AvgTicketPrice) as avg by span(timestamp, 1d) as window
|| sort window.start
|| eval start = date_format(window.start, 'yyyy-MM-dd HH:mm:ss')
|| fields start, avg""".stripMargin),
Seq(Row("2025-01-27 00:00:00", 882.98267f))),
// Total Flight Delays
QueryTest(
Seq(
s"SELECT COUNT(*) as count FROM dev.default.$tbl WHERE FlightDelay = true",
s"""source=dev.default.$tbl | where FlightDelay=True | stats count()"""),
Seq(Row(1))),
// Flight Delays
QueryTest(
Seq(
s"""SELECT FlightDelay, COUNT(*) as count FROM dev.default.$tbl GROUP BY FlightDelay""",
s"""source=dev.default.$tbl
|| stats count() as cnt by FlightDelay
|| fields FlightDelay, cnt""".stripMargin),
Seq(Row(false, 4), Row(true, 1))),
// Delay Buckets
QueryTest(
Seq(
s"SELECT FLOOR(FlightDelayMin / 30) * 30 AS bucket, COUNT(*) AS doc_count FROM dev" +
s".default.$tbl GROUP BY bucket ORDER BY bucket",
s"""source=dev.default.$tbl
|| eval bucket=FLOOR(FlightDelayMin / 30) * 30
|| stats count() as cnt by bucket
|| sort bucket
|| fields bucket, cnt""".stripMargin),
Seq(Row(0, 4), Row(180, 1)))))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.flint

import scala.io.Source

import org.apache.http.HttpHost
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest
import org.opensearch.action.bulk.BulkRequest
Expand Down Expand Up @@ -123,6 +125,16 @@ trait OpenSearchSuite extends BeforeAndAfterAll {
index(indexName, oneNodeSetting, mappings, docs)
}

def openSearchDashboardsIndex(useCaseName: String, indexName: String): Unit = {
val mappings =
Source
.fromResource(s"opensearch/${useCaseName}_mappings.json")
.mkString
val docs: Seq[String] =
Source.fromResource(s"opensearch/${useCaseName}.json").getLines().toSeq
index(indexName, oneNodeSetting, mappings, docs)
}

def indexWithAlias(indexName: String): Unit = {
val mappings = """{
| "properties": {
Expand Down
Loading