Change delete index API to logical delete (#191)

* Change deleteIndex to logical delete

Signed-off-by: Chen Dai <[email protected]>

* Fix SQL IT failures

Signed-off-by: Chen Dai <[email protected]>

* Update user manual

Signed-off-by: Chen Dai <[email protected]>

---------

Signed-off-by: Chen Dai <[email protected]>
dai-chen authored Jan 19, 2024
1 parent dae36ec commit fa0becd
Showing 15 changed files with 74 additions and 89 deletions.
19 changes: 19 additions & 0 deletions docs/index.md
@@ -122,6 +122,13 @@ High level API is dependent on query engine implementation. Please see Query Eng

### SQL

- **CREATE:** Create a Flint index with the specified indexing logic. If the auto-refresh option is enabled, a background job will continually update the index with the latest data from the source.
- **REFRESH:** Manually refresh a Flint index. This command is applicable only to indexes with the auto-refresh option disabled.
- **SHOW:** Display all Flint indexes in the specified catalog or source table.
- **DESCRIBE:** Retrieve detailed information about a Flint index.
- **DROP:** Delete a Flint index logically. This stops the background refresh job and excludes the index from query rewrite.
- **VACUUM:** Physically remove all data associated with a Flint index, including index metadata and data, cleaning up and freeing the resources it occupied (see the Scala sketch after this list).
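
The DROP/VACUUM split also shows up in the Flint Scala API that this commit's integration tests exercise. Below is a minimal sketch of the two-step teardown; the session setup and index name are illustrative assumptions, while the `deleteIndex` and `vacuumIndex` calls are the ones used by the tests in this commit:

```scala
import org.apache.spark.sql.SparkSession
import org.opensearch.flint.spark.FlintSpark

// Hypothetical session and index name, for illustration only.
val spark = SparkSession.builder().appName("flint-example").getOrCreate()
val flint = new FlintSpark(spark)
val indexName = "flint_spark_catalog_default_alb_logs_skipping_index"

flint.deleteIndex(indexName) // logical delete: stop refresh, exclude from query rewrite
flint.vacuumIndex(indexName) // physical delete: remove index metadata and data
```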

#### Skipping Index

The default maximum size for the value set is 100. If a file contains a column with high-cardinality values, its value set becomes null. This trade-off prevents excessive memory consumption at the cost of not skipping that file.
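
For example, a VALUE_SET entry on a high-cardinality column illustrates this trade-off. This is a sketch only: `addValueSet` is assumed to be the VALUE_SET counterpart of the `addPartitions` builder call that appears in the tests below, and the table name is hypothetical.

```scala
flint
  .skippingIndex()
  .onTable("spark_catalog.default.alb_logs") // hypothetical table
  .addValueSet("clientip") // high-cardinality column
  .create()

// For any file whose distinct clientip values exceed the default limit of 100,
// the stored value set becomes null and that file is never skipped.
```
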
@@ -139,6 +146,8 @@ REFRESH SKIPPING INDEX ON <object>

DROP SKIPPING INDEX ON <object>

VACUUM SKIPPING INDEX ON <object>

<object> ::= [db_name].[schema_name].table_name
```

@@ -167,6 +176,8 @@ REFRESH SKIPPING INDEX ON alb_logs
DESCRIBE SKIPPING INDEX ON alb_logs

DROP SKIPPING INDEX ON alb_logs

VACUUM SKIPPING INDEX ON alb_logs
```

#### Covering Index
@@ -184,6 +195,8 @@ SHOW [INDEX|INDEXES] ON <object>
[DESC|DESCRIBE] INDEX name ON <object>

DROP INDEX name ON <object>

VACUUM INDEX name ON <object>
```

Example:
@@ -199,6 +212,8 @@ SHOW INDEX ON alb_logs
DESCRIBE INDEX elb_and_requestUri ON alb_logs

DROP INDEX elb_and_requestUri ON alb_logs

VACUUM INDEX elb_and_requestUri ON alb_logs
```

#### Materialized View
@@ -215,6 +230,8 @@ SHOW MATERIALIZED [VIEW|VIEWS] IN catalog[.database]
[DESC|DESCRIBE] MATERIALIZED VIEW name

DROP MATERIALIZED VIEW name

VACUUM MATERIALIZED VIEW name
```

Example:
@@ -235,6 +252,8 @@ SHOW MATERIALIZED VIEWS IN spark_catalog.default
DESC MATERIALIZED VIEW alb_logs_metrics

DROP MATERIALIZED VIEW alb_logs_metrics

VACUUM MATERIALIZED VIEW alb_logs_metrics
```

#### Create Index Options
@@ -96,6 +96,7 @@ materializedViewStatement
| showMaterializedViewStatement
| describeMaterializedViewStatement
| dropMaterializedViewStatement
| vacuumMaterializedViewStatement
;

createMaterializedViewStatement
@@ -226,7 +226,6 @@ class FlintSpark(val spark: SparkSession) extends Logging {
// TODO: share same transaction for now
flintIndexMonitor.stopMonitor(indexName)
stopRefreshingJob(indexName)
flintClient.deleteIndex(indexName)
true
})
} catch {
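
With `flintClient.deleteIndex` removed from the logical-delete path above, the physical removal presumably moves into the new `vacuumIndex` API that the updated tests call. The following is a hedged sketch of what that counterpart might look like, an assumption about its shape rather than this commit's verbatim implementation:

```scala
// Assumed shape only. flintClient.deleteIndex (relocated from the logical-delete
// path above) and the Logging helpers are the only parts known from this commit.
def vacuumIndex(indexName: String): Boolean = {
  logInfo(s"Vacuuming Flint index $indexName")
  try {
    flintClient.deleteIndex(indexName) // physically remove index data and metadata
    true
  } catch {
    case e: Exception =>
      logError("Failed to vacuum Flint index", e)
      false
  }
}
```
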
@@ -31,7 +31,7 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
super.afterEach()

// Delete all test indices
flint.deleteIndex(testFlintIndex)
deleteTestIndex(testFlintIndex)
}

test("create covering index with metadata successfully") {
@@ -126,6 +126,6 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
.onTable(testTable)
.addIndexColumns("address")
.create()
flint.deleteIndex(getFlintIndexName(newIndex, testTable))
deleteTestIndex(getFlintIndexName(newIndex, testTable))
}
}
@@ -15,7 +15,7 @@ import org.opensearch.flint.core.FlintOptions
import org.opensearch.flint.core.storage.FlintOpenSearchClient
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
import org.scalatest.matchers.must.Matchers.{defined, have}
import org.scalatest.matchers.must.Matchers.defined
import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the}

import org.apache.spark.sql.Row
@@ -38,7 +38,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
super.afterEach()

// Delete all test indices
flint.deleteIndex(testFlintIndex)
deleteTestIndex(testFlintIndex)
}

test("create covering index with auto refresh") {
@@ -252,8 +252,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
val result = sql(s"SHOW INDEX ON $testTable")
checkAnswer(result, Seq(Row(testIndex), Row("idx_address")))

flint.deleteIndex(getFlintIndexName("idx_address", testTable))
flint.deleteIndex(getSkippingIndexName(testTable))
deleteTestIndex(getFlintIndexName("idx_address", testTable), getSkippingIndexName(testTable))
}

test("describe covering index") {
Expand All @@ -268,7 +267,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
checkAnswer(result, Seq(Row("name", "string", "indexed"), Row("age", "int", "indexed")))
}

test("drop covering index") {
test("drop and vacuum covering index") {
flint
.coveringIndex()
.name(testIndex)
@@ -277,7 +276,7 @@
.create()

sql(s"DROP INDEX $testIndex ON $testTable")

sql(s"VACUUM INDEX $testIndex ON $testTable")
flint.describeIndex(testFlintIndex) shouldBe empty
}
}
@@ -29,17 +29,7 @@ class FlintSparkIndexJobITSuite extends OpenSearchTransactionSuite with Matchers
}

override def afterEach(): Unit = {

/**
* Todo, if state is not valid, will throw IllegalStateException. Should check flint
* .isRefresh before cleanup resource. Current solution, (1) try to delete flint index, (2) if
* failed, delete index itself.
*/
try {
flint.deleteIndex(testIndex)
} catch {
case _: IllegalStateException => deleteIndex(testIndex)
}
deleteTestIndex(testIndex)
super.afterEach()
}

@@ -117,7 +117,7 @@ class FlintSparkIndexJobSqlITSuite extends FlintSparkSuite with Matchers {
try {
test(new AssertionHelper(flintIndexName, checkpointDir))
} finally {
flint.deleteIndex(flintIndexName)
deleteTestIndex(flintIndexName)
}
}
}
@@ -65,13 +65,7 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matc
FlintSparkIndexMonitor.indexMonitorTracker.clear()

try {
flint.deleteIndex(testFlintIndex)
} catch {
// Index maybe end up with failed state in some test
case _: IllegalStateException =>
openSearchClient
.indices()
.delete(new DeleteIndexRequest(testFlintIndex), RequestOptions.DEFAULT)
deleteTestIndex(testFlintIndex)
} finally {
super.afterEach()
}
@@ -56,6 +56,7 @@ class FlintSparkIndexNameITSuite extends FlintSparkSuite {
indexData should have size 1

sql(s"DROP SKIPPING INDEX ON $testTable")
sql(s"VACUUM SKIPPING INDEX ON $testTable")
flint.describeIndex(flintIndexName) shouldBe empty
}

@@ -76,6 +77,7 @@ class FlintSparkIndexNameITSuite extends FlintSparkSuite {
indexData should have size 1

sql(s"DROP INDEX $testIndex ON $testTable")
sql(s"VACUUM INDEX $testIndex ON $testTable")
flint.describeIndex(flintIndexName) shouldBe empty
}
}
@@ -38,7 +38,7 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {

override def afterEach(): Unit = {
super.afterEach()
flint.deleteIndex(testFlintIndex)
deleteTestIndex(testFlintIndex)
}

test("create materialized view with metadata successfully") {
@@ -44,7 +44,7 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {

override def afterEach(): Unit = {
super.afterEach()
flint.deleteIndex(testFlintIndex)
deleteTestIndex(testFlintIndex)
}

test("create materialized view with auto refresh") {
@@ -255,15 +255,15 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
checkAnswer(sql("DESC MATERIALIZED VIEW nonexistent_mv"), Seq())
}

test("drop materialized view") {
test("drop and vacuum materialized view") {
flint
.materializedView()
.name(testMvName)
.query(testQuery)
.create()

sql(s"DROP MATERIALIZED VIEW $testMvName")

sql(s"VACUUM MATERIALIZED VIEW $testMvName")
flint.describeIndex(testFlintIndex) shouldBe empty
}

@@ -38,7 +38,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
super.afterEach()

// Delete all test indices
flint.deleteIndex(testIndex)
deleteTestIndex(testIndex)
}

test("create skipping index with metadata successfully") {
@@ -615,7 +615,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
| }
|""".stripMargin)

flint.deleteIndex(testIndex)
deleteTestIndex(testIndex)
}

test("can build skipping index for varchar and char and rewrite applicable query") {
@@ -659,7 +659,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
hasIndexFilter((isnull(col("varchar_col")) || col("varchar_col") === "sample varchar") &&
(isnull(col("char_col")) || col("char_col") === paddedChar)))

flint.deleteIndex(testIndex)
deleteTestIndex(testIndex)
}

// Custom matcher to check if a SparkPlan uses FlintSparkSkippingFileIndex
@@ -36,7 +36,7 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
protected override def afterEach(): Unit = {
super.afterEach()

flint.deleteIndex(testIndex)
deleteTestIndex(testIndex)
}

test("create skipping index with auto refresh") {
@@ -266,15 +266,15 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
checkAnswer(result, Seq.empty)
}

test("drop skipping index") {
test("drop and vacuum skipping index") {
flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year")
.create()

sql(s"DROP SKIPPING INDEX ON $testTable")

sql(s"VACUUM SKIPPING INDEX ON $testTable")
flint.describeIndex(testIndex) shouldBe empty
}
}
@@ -12,6 +12,9 @@ import scala.concurrent.duration.TimeUnit
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.when
import org.mockito.invocation.InvocationOnMock
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest
import org.opensearch.client.RequestOptions
import org.opensearch.client.indices.GetIndexRequest
import org.opensearch.flint.OpenSearchSuite
import org.scalatestplus.mockito.MockitoSugar.mock

@@ -46,6 +49,29 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
FlintSparkIndexMonitor.executor = mockExecutor
}

protected def deleteTestIndex(testIndexNames: String*): Unit = {
  testIndexNames.foreach(testIndex => {

    /**
     * TODO: if the index state is not valid, deleteIndex throws IllegalStateException. We
     * should check flint.isRefresh before cleaning up resources. Current solution: (1) try
     * to delete the Flint index; (2) if that fails, delete the OpenSearch index itself.
     */
    try {
      flint.deleteIndex(testIndex)
      flint.vacuumIndex(testIndex)
    } catch {
      case _: IllegalStateException =>
        if (openSearchClient
            .indices()
            .exists(new GetIndexRequest(testIndex), RequestOptions.DEFAULT)) {
          openSearchClient
            .indices()
            .delete(new DeleteIndexRequest(testIndex), RequestOptions.DEFAULT)
        }
    }
  })
}

protected def awaitStreamingComplete(jobId: String): Unit = {
val job = spark.streams.get(jobId)
failAfter(streamingTimeout) {
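
Returning to the `deleteTestIndex` helper above: since it accepts varargs, a suite that creates several indices can clean them all up in one call, as the covering index SQL suite in this commit now does. A representative `afterEach` mirroring those updated call sites (the index name and `testTable` are assumptions from a typical suite setup):

```scala
override def afterEach(): Unit = {
  super.afterEach()
  // Tear down both indices created by the test in a single call.
  deleteTestIndex(
    getFlintIndexName("idx_address", testTable),
    getSkippingIndexName(testTable))
}
```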