Support segment replication for pull-based ingestion
Signed-off-by: Varun Bharadwaj <[email protected]>
varunbharadwaj committed Feb 14, 2025
1 parent 38e4b33 commit 7a682ef
Showing 8 changed files with 326 additions and 69 deletions.
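
At a high level, the change lets a pull-based (Kafka) ingestion index carry replica shards that are kept in sync through segment replication, while the stream poller runs only on the primary shard. A minimal sketch of the index settings exercised by the integration test below (the topic and bootstrap-servers values are illustrative, not part of this commit):

Settings ingestionIndexSettings = Settings.builder()
    .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
    .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)                   // replicas are now supported for ingestion indices
    .put("ingestion_source.type", "kafka")                              // pull-based ingestion source
    .put("ingestion_source.pointer.init.reset", "earliest")
    .put("ingestion_source.param.topic", "test")
    .put("ingestion_source.param.bootstrap_servers", "localhost:9092")  // illustrative; the test uses the Kafka container's address
    .put("index.replication.type", "SEGMENT")                           // replicas copy segments from the primary instead of re-ingesting
    .build();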
@@ -45,7 +45,7 @@
/**
* Integration test for Kafka ingestion
*/
@ThreadLeakFilters(filters = TestContainerWatchdogThreadLeakFilter.class)
@ThreadLeakFilters(filters = TestContainerThreadLeakFilter.class)
public class IngestFromKafkaIT extends OpenSearchIntegTestCase {
static final String topicName = "test";

@@ -82,20 +82,27 @@ public void testKafkaIngestion() {
"test",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put("ingestion_source.type", "kafka")
.put("ingestion_source.pointer.init.reset", "earliest")
.put("ingestion_source.param.topic", "test")
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
.put("index.replication.type", "SEGMENT")
.build(),
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
);

ensureGreen("test");
flushAndRefresh("test");
RangeQueryBuilder query = new RangeQueryBuilder("age").gte(21);

await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
refresh("test");
SearchResponse response = client().prepareSearch("test").setQuery(query).get();
assertThat(response.getHits().getTotalHits().value(), is(1L));
SearchResponse primaryResponse = client().prepareSearch("test").setQuery(query).setPreference("_primary").get();
assertThat(primaryResponse.getHits().getTotalHits().value(), is(1L));

SearchResponse replicaResponse = client().prepareSearch("test").setQuery(query).setPreference("_replica").get();
assertThat(replicaResponse.getHits().getTotalHits().value(), is(1L));
});
} finally {
stopKafka();
@@ -132,4 +139,22 @@ private void prepareKafkaData() {
);
producer.close();
}

// wait for documents to be visible in the primary and replica shards
private void waitForSearchableDocs(String indexName, long docCount) throws Exception {
// wait until the replica has the latest segment generation.
assertBusy(() -> {
final SearchResponse response = client().prepareSearch(indexName).setSize(0).setPreference("_primary").get();
final long hits = response.getHits().getTotalHits().value();
if (hits < docCount) {
fail("Primary shard did not return expected hit count");
}

final SearchResponse response2 = client().prepareSearch(indexName).setSize(0).setPreference("_replica").get();
final long hits2 = response2.getHits().getTotalHits().value();
if (hits2 < docCount) {
fail("Replica shard did not return expected hit count");
}
}, 1, TimeUnit.MINUTES);
}
}
@@ -13,11 +13,12 @@
/**
* The {@link org.testcontainers.images.TimeLimitedLoggedPullImageResultCallback} instance used by test containers,
* for example {@link org.testcontainers.containers.KafkaContainer} creates a watcher daemon thread which is never
* stopped. This filter excludes that thread from the thread leak detection logic.
* stopped. This filter excludes that thread from the thread leak detection logic. It also excludes the Ryuk resource
* reaper thread, which is not stopped in time.
*/
public final class TestContainerWatchdogThreadLeakFilter implements ThreadFilter {
public final class TestContainerThreadLeakFilter implements ThreadFilter {
@Override
public boolean reject(Thread t) {
return t.getName().startsWith("testcontainers-pull-watchdog-");
return t.getName().startsWith("testcontainers-pull-watchdog-") || t.getName().startsWith("testcontainers-ryuk");
}
}
108 changes: 93 additions & 15 deletions server/src/main/java/org/opensearch/index/engine/IngestionEngine.java
@@ -9,13 +9,7 @@
package org.opensearch.index.engine;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.*;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.store.AlreadyClosedException;
@@ -48,6 +42,7 @@
import org.opensearch.index.merge.MergeStats;
import org.opensearch.index.merge.OnGoingMerge;
import org.opensearch.index.seqno.SeqNoStats;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.OpenSearchMergePolicy;
import org.opensearch.index.translog.NoOpTranslogManager;
import org.opensearch.index.translog.Translog;
@@ -129,6 +124,15 @@ public IngestionEngine(EngineConfig engineConfig, IngestionConsumerFactory inges
documentMapperForType = engineConfig.getDocumentMapperForTypeSupplier().get();
this.ingestionConsumerFactory = Objects.requireNonNull(ingestionConsumerFactory);

// Register internal and external refresh listeners. These include the replication checkpoint listener, among
// others, required to initiate segment replication after a refresh
for (ReferenceManager.RefreshListener listener : engineConfig.getExternalRefreshListener()) {
this.externalReaderManager.addListener(listener);
}
for (ReferenceManager.RefreshListener listener : engineConfig.getInternalRefreshListener()) {
this.internalReaderManager.addListener(listener);
}

success = true;
} catch (IOException | TranslogCorruptedException e) {
throw new EngineCreationFailureException(shardId, "failed to create engine", e);
@@ -192,7 +196,11 @@ public void start() {
}

streamPoller = new DefaultStreamPoller(startPointer, persistedPointers, ingestionShardConsumer, this, resetState);
streamPoller.start();

// Poller is only started on the primary shard. Replica shards will rely on segment replication.
if (!engineConfig.isReadOnlyReplica()) {
streamPoller.start();
}
}

private IndexWriter createWriter() throws IOException {
@@ -391,7 +399,28 @@ protected SegmentInfos getLastCommittedSegmentInfos() {

@Override
protected SegmentInfos getLatestSegmentInfos() {
throw new UnsupportedOperationException();
try (final GatedCloseable<SegmentInfos> snapshot = getSegmentInfosSnapshot()) {
return snapshot.get();
} catch (IOException e) {
throw new EngineException(shardId, e.getMessage(), e);
}
}

@Override
public GatedCloseable<SegmentInfos> getSegmentInfosSnapshot() {
final OpenSearchDirectoryReader reader;
try {
reader = internalReaderManager.acquire();
return new GatedCloseable<>(((StandardDirectoryReader) reader.getDelegate()).getSegmentInfos(), () -> {
try {
internalReaderManager.release(reader);
} catch (AlreadyClosedException e) {
logger.warn("Engine is already closed.", e);
}
});
} catch (IOException e) {
throw new EngineException(shardId, e.getMessage(), e);
}
}

@Override
@@ -461,12 +490,20 @@ public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> search

@Override
protected ReferenceManager<OpenSearchDirectoryReader> getReferenceManager(SearcherScope scope) {
return externalReaderManager;
switch (scope) {
case INTERNAL:
return internalReaderManager;
case EXTERNAL:
return externalReaderManager;
default:
throw new IllegalStateException("unknown scope: " + scope);
}
}

@Override
public Closeable acquireHistoryRetentionLock() {
throw new UnsupportedOperationException("Not implemented");
// do not need to retain operations as they can be replayed from ingestion source
return () -> {};
}

@Override
@@ -477,7 +514,7 @@ public Translog.Snapshot newChangesSnapshot(
boolean requiredFullRange,
boolean accurateCount
) throws IOException {
throw new UnsupportedOperationException("Not implemented");
return EMPTY_TRANSLOG_SNAPSHOT;
}

@Override
@@ -507,7 +544,8 @@ public long getProcessedLocalCheckpoint() {

@Override
public SeqNoStats getSeqNoStats(long globalCheckpoint) {
return null;
// Sequence numbers and checkpoints are not maintained since ingestion only supports segment replication at the moment
return new SeqNoStats(0, 0, 0);
}

@Override
@@ -600,6 +638,11 @@ public void writeIndexingBuffer() throws EngineException {

@Override
public boolean shouldPeriodicallyFlush() {
ensureOpen();
if (shouldPeriodicallyFlushAfterBigMerge.get()) {
return true;
}

return false;
}

@@ -632,7 +675,8 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
// (3) the newly created commit points to a different translog generation (can free translog),
// or (4) the local checkpoint information in the last commit is stale, which slows down future recoveries.
boolean hasUncommittedChanges = indexWriter.hasUncommittedChanges();
if (hasUncommittedChanges || force) {
boolean shouldPeriodicallyFlush = shouldPeriodicallyFlush();
if (hasUncommittedChanges || force || shouldPeriodicallyFlush) {
logger.trace("starting commit for flush;");

// TODO: do we need to close the latest commit as done in InternalEngine?
Expand All @@ -646,6 +690,8 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
// we need to refresh in order to clear older version values
refresh("version_table_flush", SearcherScope.INTERNAL, true);
}

refreshLastCommittedSegmentInfos();
} catch (FlushFailedEngineException ex) {
maybeFailEngine("flush", ex);
throw ex;
Expand All @@ -657,6 +703,33 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
}
}

private void refreshLastCommittedSegmentInfos() {
/*
* we have to inc-ref the store here since if the engine is closed by a tragic event
* we don't acquire the write lock and wait until we have exclusive access. This might also
* dec the store reference which can essentially close the store and unless we can inc the reference
* we can't use it.
*/
store.incRef();
try {
// reread the last committed segment infos
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
} catch (Exception e) {
if (isClosed.get() == false) {
try {
logger.warn("failed to read latest segment infos on flush", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
if (Lucene.isCorruptionException(e)) {
throw new FlushFailedEngineException(shardId, e);
}
}
} finally {
store.decRef();
}
}

/**
* Commits the specified index writer.
*
@@ -668,16 +741,21 @@ protected void commitIndexWriter(final IndexWriter writer) throws IOException {
/*
* The user data captured the min and max range of the stream poller
*/
final Map<String, String> commitData = new HashMap<>(2);
final Map<String, String> commitData = new HashMap<>(3);

// ingestion engine only tracks batch start pointer and does not explicitly track checkpoints
commitData.put(StreamPoller.BATCH_START, streamPoller.getBatchStartPointer().asString());
commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, "0");
commitData.put(Translog.TRANSLOG_UUID_KEY, "");
final String currentForceMergeUUID = forceMergeUUID;
if (currentForceMergeUUID != null) {
commitData.put(FORCE_MERGE_UUID_KEY, currentForceMergeUUID);
}
logger.trace("committing writer with commit data [{}]", commitData);
return commitData.entrySet().iterator();
});

shouldPeriodicallyFlushAfterBigMerge.set(false);
writer.commit();
} catch (final Exception ex) {
try {
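
For context, replica shards obtain their segments through the new getSegmentInfosSnapshot()/getLatestSegmentInfos() implementations rather than by polling the ingestion source. A minimal usage sketch, not part of this commit (it assumes an engine reference to the IngestionEngine; copySegmentFiles is a hypothetical stand-in for the segment replication plumbing):

try (GatedCloseable<SegmentInfos> snapshot = engine.getSegmentInfosSnapshot()) {
    SegmentInfos segmentInfos = snapshot.get();
    // the internal reader remains acquired until close(), so the referenced
    // segment files cannot be deleted while they are being copied
    copySegmentFiles(segmentInfos); // hypothetical helper
} catch (IOException e) {
    throw new UncheckedIOException(e);
}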
