diff --git a/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java
index e7d8e36acb302..976edc39a3430 100644
--- a/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java
+++ b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java
@@ -45,7 +45,7 @@
 /**
  * Integration test for Kafka ingestion
  */
-@ThreadLeakFilters(filters = TestContainerWatchdogThreadLeakFilter.class)
+@ThreadLeakFilters(filters = TestContainerThreadLeakFilter.class)
 public class IngestFromKafkaIT extends OpenSearchIntegTestCase {
     static final String topicName = "test";

@@ -82,20 +82,27 @@ public void testKafkaIngestion() {
             "test",
             Settings.builder()
                 .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
-                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
+                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
                 .put("ingestion_source.type", "kafka")
                 .put("ingestion_source.pointer.init.reset", "earliest")
                 .put("ingestion_source.param.topic", "test")
                 .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
+                .put("index.replication.type", "SEGMENT")
                 .build(),
             "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}"
         );
+        ensureGreen("test");
+        flushAndRefresh("test");
         RangeQueryBuilder query = new RangeQueryBuilder("age").gte(21);
+
         await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
             refresh("test");
-            SearchResponse response = client().prepareSearch("test").setQuery(query).get();
-            assertThat(response.getHits().getTotalHits().value(), is(1L));
+            SearchResponse primaryResponse = client().prepareSearch("test").setQuery(query).setPreference("_primary").get();
+            assertThat(primaryResponse.getHits().getTotalHits().value(), is(1L));
+
+            SearchResponse replicaResponse = client().prepareSearch("test").setQuery(query).setPreference("_replica").get();
+            assertThat(replicaResponse.getHits().getTotalHits().value(), is(1L));
         });
     } finally {
         stopKafka();
@@ -132,4 +139,22 @@ private void prepareKafkaData() {
         );
         producer.close();
     }
+
+    // wait for documents to be visible in the primary and replica shards
+    private void waitForSearchableDocs(String indexName, long docCount) throws Exception {
+        // wait until the replica has caught up via segment replication and both shards return the expected hit count
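+        // assertBusy retries these assertions until they pass, failing only if they still fail after the 1 minute timeout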
+        assertBusy(() -> {
+            final SearchResponse response = client().prepareSearch(indexName).setSize(0).setPreference("_primary").get();
+            final long hits = response.getHits().getTotalHits().value();
+            if (hits < docCount) {
+                fail("Primary shard did not return expected hit count");
+            }
+
+            final SearchResponse response2 = client().prepareSearch(indexName).setSize(0).setPreference("_replica").get();
+            final long hits2 = response2.getHits().getTotalHits().value();
+            if (hits2 < docCount) {
+                fail("Replica shard did not return expected hit count");
+            }
+        }, 1, TimeUnit.MINUTES);
+    }
 }
diff --git a/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/TestContainerWatchdogThreadLeakFilter.java b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/TestContainerThreadLeakFilter.java
similarity index 76%
rename from plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/TestContainerWatchdogThreadLeakFilter.java
rename to plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/TestContainerThreadLeakFilter.java
index 50b88c6233a46..91e2c83ebfa48 100644
--- a/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/TestContainerWatchdogThreadLeakFilter.java
+++ b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/TestContainerThreadLeakFilter.java
@@ -13,11 +13,12 @@
 /**
  * The {@link org.testcontainers.images.TimeLimitedLoggedPullImageResultCallback} instance used by test containers,
  * for example {@link org.testcontainers.containers.KafkaContainer} creates a watcher daemon thread which is never
- * stopped. This filter excludes that thread from the thread leak detection logic.
+ * stopped. This filter excludes that thread from the thread leak detection logic. It also excludes the Ryuk resource
+ * reaper thread, which is not stopped in time.
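+ * Ryuk is the sidecar container Testcontainers uses to reap leftover containers once the test JVM exits.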
 */
-public final class TestContainerWatchdogThreadLeakFilter implements ThreadFilter {
+public final class TestContainerThreadLeakFilter implements ThreadFilter {
     @Override
     public boolean reject(Thread t) {
-        return t.getName().startsWith("testcontainers-pull-watchdog-");
+        return t.getName().startsWith("testcontainers-pull-watchdog-") || t.getName().startsWith("testcontainers-ryuk");
     }
 }
diff --git a/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java b/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java
index 58c6371d51c0a..b23cc8fbdf4e0 100644
--- a/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java
+++ b/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java
@@ -9,13 +9,7 @@
 package org.opensearch.index.engine;

 import org.apache.lucene.analysis.standard.StandardAnalyzer;
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexCommit;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.index.MergePolicy;
-import org.apache.lucene.index.SegmentCommitInfo;
-import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.index.*;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.ReferenceManager;
 import org.apache.lucene.store.AlreadyClosedException;
@@ -48,6 +42,7 @@
 import org.opensearch.index.merge.MergeStats;
 import org.opensearch.index.merge.OnGoingMerge;
 import org.opensearch.index.seqno.SeqNoStats;
+import org.opensearch.index.seqno.SequenceNumbers;
 import org.opensearch.index.shard.OpenSearchMergePolicy;
 import org.opensearch.index.translog.NoOpTranslogManager;
 import org.opensearch.index.translog.Translog;
@@ -129,6 +124,15 @@ public IngestionEngine(EngineConfig engineConfig, IngestionConsumerFactory inges
             documentMapperForType = engineConfig.getDocumentMapperForTypeSupplier().get();
             this.ingestionConsumerFactory = Objects.requireNonNull(ingestionConsumerFactory);

+            // Register internal and external refresh listeners. These include the replication checkpoint listener,
+            // among others, which is required to initiate segment replication after a refresh.
+            for (ReferenceManager.RefreshListener listener : engineConfig.getExternalRefreshListener()) {
+                this.externalReaderManager.addListener(listener);
+            }
+            for (ReferenceManager.RefreshListener listener : engineConfig.getInternalRefreshListener()) {
+                this.internalReaderManager.addListener(listener);
+            }
+
             success = true;
         } catch (IOException | TranslogCorruptedException e) {
             throw new EngineCreationFailureException(shardId, "failed to create engine", e);
@@ -192,7 +196,11 @@ public void start() {
         }

         streamPoller = new DefaultStreamPoller(startPointer, persistedPointers, ingestionShardConsumer, this, resetState);
-        streamPoller.start();
+
+        // Poller is only started on the primary shard. Replica shards will rely on segment replication.
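+        // The primary publishes a replication checkpoint on refresh (via the listeners registered in the
+        // constructor), and replicas copy the resulting segments instead of polling the ingestion source.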
+        if (!engineConfig.isReadOnlyReplica()) {
+            streamPoller.start();
+        }
     }

     private IndexWriter createWriter() throws IOException {
@@ -391,7 +399,28 @@ protected SegmentInfos getLastCommittedSegmentInfos() {

     @Override
     protected SegmentInfos getLatestSegmentInfos() {
-        throw new UnsupportedOperationException();
+        try (final GatedCloseable<SegmentInfos> snapshot = getSegmentInfosSnapshot()) {
+            return snapshot.get();
+        } catch (IOException e) {
+            throw new EngineException(shardId, e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public GatedCloseable<SegmentInfos> getSegmentInfosSnapshot() {
+        final OpenSearchDirectoryReader reader;
+        try {
+            reader = internalReaderManager.acquire();
+            return new GatedCloseable<>(((StandardDirectoryReader) reader.getDelegate()).getSegmentInfos(), () -> {
+                try {
+                    internalReaderManager.release(reader);
+                } catch (AlreadyClosedException e) {
+                    logger.warn("Engine is already closed.", e);
+                }
+            });
+        } catch (IOException e) {
+            throw new EngineException(shardId, e.getMessage(), e);
+        }
     }

     @Override
@@ -461,12 +490,20 @@ public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> search

     @Override
     protected ReferenceManager<OpenSearchDirectoryReader> getReferenceManager(SearcherScope scope) {
-        return externalReaderManager;
+        switch (scope) {
+            case INTERNAL:
+                return internalReaderManager;
+            case EXTERNAL:
+                return externalReaderManager;
+            default:
+                throw new IllegalStateException("unknown scope: " + scope);
+        }
     }

     @Override
     public Closeable acquireHistoryRetentionLock() {
-        throw new UnsupportedOperationException("Not implemented");
+        // do not need to retain operations as they can be replayed from ingestion source
+        return () -> {};
     }

     @Override
@@ -477,7 +514,7 @@ public Translog.Snapshot newChangesSnapshot(
         boolean requiredFullRange,
         boolean accurateCount
     ) throws IOException {
-        throw new UnsupportedOperationException("Not implemented");
+        return EMPTY_TRANSLOG_SNAPSHOT;
     }

     @Override
@@ -507,7 +544,8 @@ public long getProcessedLocalCheckpoint() {

     @Override
     public SeqNoStats getSeqNoStats(long globalCheckpoint) {
-        return null;
+        // Sequence numbers and checkpoints are not maintained since ingestion only supports segment replication at the moment
+        return new SeqNoStats(0, 0, 0);
     }

     @Override
@@ -600,6 +638,11 @@ public void writeIndexingBuffer() throws EngineException {

     @Override
     public boolean shouldPeriodicallyFlush() {
+        ensureOpen();
+        if (shouldPeriodicallyFlushAfterBigMerge.get()) {
+            return true;
+        }
+
         return false;
     }

@@ -632,7 +675,8 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
         // (3) the newly created commit points to a different translog generation (can free translog),
         // or (4) the local checkpoint information in the last commit is stale, which slows down future recoveries.
         boolean hasUncommittedChanges = indexWriter.hasUncommittedChanges();
-        if (hasUncommittedChanges || force) {
+        boolean shouldPeriodicallyFlush = shouldPeriodicallyFlush();
+        if (hasUncommittedChanges || force || shouldPeriodicallyFlush) {
             logger.trace("starting commit for flush;");
             // TODO: do we need to close the latest commit as done in InternalEngine?
@@ -646,6 +690,8 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
                 // we need to refresh in order to clear older version values
                 refresh("version_table_flush", SearcherScope.INTERNAL, true);
             }
+
+            refreshLastCommittedSegmentInfos();
         } catch (FlushFailedEngineException ex) {
             maybeFailEngine("flush", ex);
             throw ex;
@@ -657,6 +703,33 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
         }
     }

+    private void refreshLastCommittedSegmentInfos() {
+        /*
+         * we have to inc-ref the store here since if the engine is closed by a tragic event
+         * we don't acquire the write lock and wait until we have exclusive access. This might also
+         * dec the store reference which can essentially close the store and unless we can inc the reference
+         * we can't use it.
+         */
+        store.incRef();
+        try {
+            // reread the last committed segment infos
+            lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
+        } catch (Exception e) {
+            if (isClosed.get() == false) {
+                try {
+                    logger.warn("failed to read latest segment infos on flush", e);
+                } catch (Exception inner) {
+                    e.addSuppressed(inner);
+                }
+                if (Lucene.isCorruptionException(e)) {
+                    throw new FlushFailedEngineException(shardId, e);
+                }
+            }
+        } finally {
+            store.decRef();
+        }
+    }
+
     /**
      * Commits the specified index writer.
      *
@@ -668,9 +741,12 @@ protected void commitIndexWriter(final IndexWriter writer) throws IOException {
             /*
              * The user data captured the min and max range of the stream poller
              */
-            final Map<String, String> commitData = new HashMap<>(2);
+            final Map<String, String> commitData = new HashMap<>(3);
+            // ingestion engine only tracks batch start pointer and does not explicitly track checkpoints
             commitData.put(StreamPoller.BATCH_START, streamPoller.getBatchStartPointer().asString());
+            commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, "0");
+            commitData.put(Translog.TRANSLOG_UUID_KEY, "");
             final String currentForceMergeUUID = forceMergeUUID;
             if (currentForceMergeUUID != null) {
                 commitData.put(FORCE_MERGE_UUID_KEY, currentForceMergeUUID);
@@ -678,6 +754,8 @@ protected void commitIndexWriter(final IndexWriter writer) throws IOException {
             logger.trace("committing writer with commit data [{}]", commitData);
             return commitData.entrySet().iterator();
         });
+
+        shouldPeriodicallyFlushAfterBigMerge.set(false);
         writer.commit();
     } catch (final Exception ex) {
         try {
diff --git a/server/src/main/java/org/opensearch/index/engine/IngestionNRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/IngestionNRTReplicationEngine.java
new file mode 100644
index 0000000000000..e9cf8a1cfb491
--- /dev/null
+++ b/server/src/main/java/org/opensearch/index/engine/IngestionNRTReplicationEngine.java
@@ -0,0 +1,124 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */ + +package org.opensearch.index.engine; + +import org.apache.lucene.index.SegmentInfos; +import org.opensearch.common.annotation.PublicApi; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.concurrent.ReleasableLock; +import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.index.seqno.SeqNoStats; +import org.opensearch.index.translog.*; + +import java.io.IOException; +import java.util.Collection; + +import static org.opensearch.index.translog.Translog.EMPTY_TRANSLOG_LOCATION; +import static org.opensearch.index.translog.Translog.EMPTY_TRANSLOG_SNAPSHOT; + +/** + * This is a {@link NRTReplicationEngine} variant to be used with pull-based ingestion on replica shards when segment + * replication is enabled. This engine uses a no-op translog. + */ +public class IngestionNRTReplicationEngine extends NRTReplicationEngine { + protected final TranslogManager translogManager; + + + public IngestionNRTReplicationEngine(EngineConfig engineConfig) { + super(engineConfig); + + try { + this.translogManager = new NoOpTranslogManager( + shardId, + readLock, + this::ensureOpen, + new TranslogStats(0, 0, 0, 0, 0), + EMPTY_TRANSLOG_SNAPSHOT + ); + } catch (IOException | TranslogCorruptedException e) { + throw new EngineCreationFailureException(shardId, "failed to create engine", e); + } + } + + @Override + public TranslogManager translogManager() { + return translogManager; + } + + @Override + protected WriteOnlyTranslogManager createWriteOnlyTranslogManager(String translogUUID) { + // This engine does not use a translog. A separate NoOpTranslogManager will be used. + return null; + } + + @Override + public synchronized void updateSegments(final SegmentInfos infos) throws IOException { + try (ReleasableLock lock = writeLock.acquire()) { + // Update the current infos reference on the Engine's reader. + ensureOpen(); + final long incomingGeneration = infos.getGeneration(); + readerManager.updateSegments(infos); + // Ensure that we commit and clear the local translog if a new commit has been made on the primary. + // We do not compare against the last local commit gen here because it is possible to receive + // a lower gen from a newly elected primary shard that is behind this shard's last commit gen. + // In that case we still commit into the next local generation. + if (incomingGeneration != this.lastReceivedPrimaryGen) { + flush(false, true); + } + this.lastReceivedPrimaryGen = incomingGeneration; + } + } + + /** + * Persist the latest live SegmentInfos. This method creates a commit point from the latest SegmentInfos. + * + * @throws IOException - When there is an IO error committing the SegmentInfos. + */ + @Override + protected void commitSegmentInfos(SegmentInfos infos) throws IOException { + // get a reference to the previous commit files so they can be decref'd once a new commit is made. + final Collection previousCommitFiles = getLastCommittedSegmentInfos().files(true); + store.commitSegmentInfos(infos, 0, 0); + this.lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo(); + // incref the latest on-disk commit. + replicaFileTracker.incRef(this.lastCommittedSegmentInfos.files(true)); + // decref the prev commit. 
+        replicaFileTracker.decRef(previousCommitFiles);
+    }
+
+    @Override
+    protected Translog.Location getTranslogIndexLocation(Index index, IndexResult indexResult) throws IOException {
+        return EMPTY_TRANSLOG_LOCATION;
+    }
+
+    @Override
+    protected Translog.Location getTranslogDeleteLocation(Delete delete, DeleteResult deleteResult) throws IOException {
+        return EMPTY_TRANSLOG_LOCATION;
+    }
+
+    @Override
+    protected Translog.Location getTranslogNoOpLocation(NoOp noOp) throws IOException {
+        return EMPTY_TRANSLOG_LOCATION;
+    }
+
+    @Override
+    public SeqNoStats getSeqNoStats(long globalCheckpoint) {
+        // sequence numbers are not used as ingestion only supports segment replication
+        return new SeqNoStats(0, 0, 0);
+    }
+
+    @Override
+    public long getLastSyncedGlobalCheckpoint() {
+        return 0;
+    }
+
+    @Override
+    public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) {}
+
+}
diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java
index d759423ce5a55..b6909f12e8900 100644
--- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java
+++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java
@@ -57,17 +57,17 @@
 @PublicApi(since = "1.0.0")
 public class NRTReplicationEngine extends Engine {

-    private volatile SegmentInfos lastCommittedSegmentInfos;
-    private final NRTReplicationReaderManager readerManager;
-    private final CompletionStatsCache completionStatsCache;
-    private final LocalCheckpointTracker localCheckpointTracker;
-    private final WriteOnlyTranslogManager translogManager;
-    private final Lock flushLock = new ReentrantLock();
+    protected volatile SegmentInfos lastCommittedSegmentInfos;
+    protected final NRTReplicationReaderManager readerManager;
+    protected final CompletionStatsCache completionStatsCache;
+    protected final LocalCheckpointTracker localCheckpointTracker;
+    protected final WriteOnlyTranslogManager translogManager;
+    protected final Lock flushLock = new ReentrantLock();
     protected final ReplicaFileTracker replicaFileTracker;

-    private volatile long lastReceivedPrimaryGen = SequenceNumbers.NO_OPS_PERFORMED;
+    protected volatile long lastReceivedPrimaryGen = SequenceNumbers.NO_OPS_PERFORMED;

-    private static final int SI_COUNTER_INCREMENT = 10;
+    protected static final int SI_COUNTER_INCREMENT = 10;

     public NRTReplicationEngine(EngineConfig engineConfig) {
         super(engineConfig);
@@ -99,34 +99,7 @@ public NRTReplicationEngine(EngineConfig engineConfig) {
             }
             final Map<String, String> userData = this.lastCommittedSegmentInfos.getUserData();
             final String translogUUID = Objects.requireNonNull(userData.get(Translog.TRANSLOG_UUID_KEY));
-            translogManagerRef = new WriteOnlyTranslogManager(
-                engineConfig.getTranslogConfig(),
-                engineConfig.getPrimaryTermSupplier(),
-                engineConfig.getGlobalCheckpointSupplier(),
-                getTranslogDeletionPolicy(engineConfig),
-                shardId,
-                readLock,
-                this::getLocalCheckpointTracker,
-                translogUUID,
-                new TranslogEventListener() {
-                    @Override
-                    public void onFailure(String reason, Exception ex) {
-                        failEngine(reason, ex);
-                    }
-
-                    @Override
-                    public void onAfterTranslogSync() {
-                        try {
-                            translogManager.trimUnreferencedReaders();
-                        } catch (IOException ex) {
-                            throw new TranslogException(shardId, "failed to trim unreferenced translog readers", ex);
-                        }
-                    }
-                },
-                this,
-                engineConfig.getTranslogFactory(),
-                engineConfig.getStartedPrimarySupplier()
-            );
+            translogManagerRef = createWriteOnlyTranslogManager(translogUUID);
             this.translogManager = translogManagerRef;
             success = true;
         } catch (IOException | TranslogCorruptedException e) {
@@ -142,11 +115,42 @@ public void onAfterTranslogSync() {
         }
     }

+    protected WriteOnlyTranslogManager createWriteOnlyTranslogManager(String translogUUID) throws IOException {
+        return new WriteOnlyTranslogManager(
+            engineConfig.getTranslogConfig(),
+            engineConfig.getPrimaryTermSupplier(),
+            engineConfig.getGlobalCheckpointSupplier(),
+            getTranslogDeletionPolicy(engineConfig),
+            shardId,
+            readLock,
+            this::getLocalCheckpointTracker,
+            translogUUID,
+            new TranslogEventListener() {
+                @Override
+                public void onFailure(String reason, Exception ex) {
+                    failEngine(reason, ex);
+                }
+
+                @Override
+                public void onAfterTranslogSync() {
+                    try {
+                        translogManager.trimUnreferencedReaders();
+                    } catch (IOException ex) {
+                        throw new TranslogException(shardId, "failed to trim unreferenced translog readers", ex);
+                    }
+                }
+            },
+            this,
+            engineConfig.getTranslogFactory(),
+            engineConfig.getStartedPrimarySupplier()
+        );
+    }
+
     public void cleanUnreferencedFiles() throws IOException {
         replicaFileTracker.deleteUnreferencedFiles(store.directory().listAll());
     }

-    private NRTReplicationReaderManager buildReaderManager() throws IOException {
+    protected NRTReplicationReaderManager buildReaderManager() throws IOException {
         return new NRTReplicationReaderManager(
             OpenSearchDirectoryReader.wrap(getDirectoryReader(), shardId),
             replicaFileTracker::incRef,
@@ -187,7 +191,7 @@ public synchronized void updateSegments(final SegmentInfos infos) throws IOExcep
      *
      * @throws IOException - When there is an IO error committing the SegmentInfos.
      */
-    private void commitSegmentInfos(SegmentInfos infos) throws IOException {
+    protected void commitSegmentInfos(SegmentInfos infos) throws IOException {
         // get a reference to the previous commit files so they can be decref'd once a new commit is made.
         final Collection<String> previousCommitFiles = getLastCommittedSegmentInfos().files(true);
         store.commitSegmentInfos(infos, localCheckpointTracker.getMaxSeqNo(), localCheckpointTracker.getProcessedCheckpoint());
@@ -199,7 +203,7 @@ private void commitSegmentInfos(SegmentInfos infos) throws IOException {
         translogManager.syncTranslog();
     }

-    private void commitSegmentInfos() throws IOException {
+    protected void commitSegmentInfos() throws IOException {
         commitSegmentInfos(getLatestSegmentInfos());
     }

@@ -232,7 +236,7 @@ public boolean isThrottled() {
     public IndexResult index(Index index) throws IOException {
         ensureOpen();
         IndexResult indexResult = new IndexResult(index.version(), index.primaryTerm(), index.seqNo(), false);
-        final Translog.Location location = translogManager.add(new Translog.Index(index, indexResult));
+        final Translog.Location location = getTranslogIndexLocation(index, indexResult);
         indexResult.setTranslogLocation(location);
         indexResult.setTook(System.nanoTime() - index.startTime());
         indexResult.freeze();
@@ -244,7 +248,7 @@ public DeleteResult delete(Delete delete) throws IOException {
         ensureOpen();
         DeleteResult deleteResult = new DeleteResult(delete.version(), delete.primaryTerm(), delete.seqNo(), true);
-        final Translog.Location location = translogManager.add(new Translog.Delete(delete, deleteResult));
+        final Translog.Location location = getTranslogDeleteLocation(delete, deleteResult);
         deleteResult.setTranslogLocation(location);
         deleteResult.setTook(System.nanoTime() - delete.startTime());
         deleteResult.freeze();
@@ -256,7 +260,7 @@ public NoOpResult noOp(NoOp noOp) throws IOException {
         ensureOpen();
         NoOpResult noOpResult = new NoOpResult(noOp.primaryTerm(), noOp.seqNo());
-        final Translog.Location location = translogManager.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
+        final Translog.Location location = getTranslogNoOpLocation(noOp);
         noOpResult.setTranslogLocation(location);
         noOpResult.setTook(System.nanoTime() - noOp.startTime());
         noOpResult.freeze();
@@ -264,6 +268,18 @@ public NoOpResult noOp(NoOp noOp) throws IOException {
         return noOpResult;
     }

+    protected Translog.Location getTranslogIndexLocation(Index index, IndexResult indexResult) throws IOException {
+        return translogManager.add(new Translog.Index(index, indexResult));
+    }
+
+    protected Translog.Location getTranslogDeleteLocation(Delete delete, DeleteResult deleteResult) throws IOException {
+        return translogManager.add(new Translog.Delete(delete, deleteResult));
+    }
+
+    protected Translog.Location getTranslogNoOpLocation(NoOp noOp) throws IOException {
+        return translogManager.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
+    }
+
     @Override
     public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> searcherFactory) throws EngineException {
         return getFromSearcher(get, searcherFactory, SearcherScope.EXTERNAL);
     }
@@ -528,7 +544,7 @@ protected LocalCheckpointTracker getLocalCheckpointTracker() {
         return localCheckpointTracker;
     }

-    private DirectoryReader getDirectoryReader() throws IOException {
+    protected DirectoryReader getDirectoryReader() throws IOException {
         // for segment replication: replicas should create the reader from store, we don't want an open IW on replicas.
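+        // the soft-deletes wrapper hides documents the primary marked as deleted in the copied segments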
         return new SoftDeletesDirectoryReaderWrapper(DirectoryReader.open(store.directory()), Lucene.SOFT_DELETES_FIELD);
     }
diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
index f26e53967b873..f0405faafc5b7 100644
--- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -2483,6 +2483,11 @@ int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot, Engine.Operat
     }

     private void loadGlobalCheckpointToReplicationTracker() throws IOException {
+        if (isIngestionSource()) {
+            // global checkpoints are not used for ingestion source
+            return;
+        }
+
         // we have to set it before we open an engine and recover from the translog because
         // acquiring a snapshot from the translog causes a sync which causes the global checkpoint to be pulled in,
         // and an engine can be forced to close in ctor which also causes the global checkpoint to be pulled in.
@@ -5420,4 +5425,9 @@ static ShardMigrationState getShardMigrationState(IndexSettings indexSettings, b
         }
         return ShardMigrationState.DOCREP_NON_MIGRATING;
     }
+
+    private boolean isIngestionSource() {
+        return indexSettings().getIndexMetadata().useIngestionSource();
+    }
+
 }
diff --git a/server/src/main/java/org/opensearch/index/translog/Translog.java b/server/src/main/java/org/opensearch/index/translog/Translog.java
index ffda06d8d8292..b1e88624c9906 100644
--- a/server/src/main/java/org/opensearch/index/translog/Translog.java
+++ b/server/src/main/java/org/opensearch/index/translog/Translog.java
@@ -899,6 +899,8 @@ public TranslogDeletionPolicy getDeletionPolicy() {
         return deletionPolicy;
     }

+    public static final Translog.Location EMPTY_TRANSLOG_LOCATION = new Translog.Location(0, 0, 0);
+
     /**
      * Location in the translog
      *
diff --git a/server/src/main/java/org/opensearch/indices/pollingingest/IngestionEngineFactory.java b/server/src/main/java/org/opensearch/indices/pollingingest/IngestionEngineFactory.java
index e124adb90365b..c6921990a88e6 100644
--- a/server/src/main/java/org/opensearch/indices/pollingingest/IngestionEngineFactory.java
+++ b/server/src/main/java/org/opensearch/indices/pollingingest/IngestionEngineFactory.java
@@ -9,10 +9,7 @@
 package org.opensearch.indices.pollingingest;

 import org.opensearch.index.IngestionConsumerFactory;
-import org.opensearch.index.engine.Engine;
-import org.opensearch.index.engine.EngineConfig;
-import org.opensearch.index.engine.EngineFactory;
-import org.opensearch.index.engine.IngestionEngine;
+import org.opensearch.index.engine.*;

 import java.util.Objects;

@@ -29,6 +26,10 @@ public IngestionEngineFactory(IngestionConsumerFactory ingestionConsumerFactory)

     @Override
     public Engine newReadWriteEngine(EngineConfig config) {
+        if (config.isReadOnlyReplica()) {
+            return new IngestionNRTReplicationEngine(config);
+        }
+
         IngestionEngine ingestionEngine = new IngestionEngine(config, ingestionConsumerFactory);
         ingestionEngine.start();
         return ingestionEngine;