From 2369e4f4a22c2c688c74f97080ac7b714bf49655 Mon Sep 17 00:00:00 2001 From: xuxiong1 Date: Thu, 13 Feb 2025 15:50:17 +0800 Subject: [PATCH] initial commit Signed-off-by: xuxiong1 --- .../plugin/kafka/KafkaPartitionConsumer.java | 16 ++++++ .../cluster/metadata/IndexMetadata.java | 51 +++++++++++++++++-- .../cluster/metadata/IngestionSource.java | 43 ++++++++++++++-- .../common/settings/IndexScopedSettings.java | 1 + .../index/IngestionShardConsumer.java | 16 ++++++ .../index/engine/IngestionEngine.java | 5 +- .../pollingingest/DefaultStreamPoller.java | 23 +++++++-- .../indices/pollingingest/StreamPoller.java | 2 + .../metadata/IngestionSourceTests.java | 18 ++++--- .../index/engine/FakeIngestionSource.java | 10 ++++ .../DefaultStreamPollerTests.java | 12 +++-- 11 files changed, 173 insertions(+), 24 deletions(-) diff --git a/plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java b/plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java index a20e52a06eecd..462b120e88704 100644 --- a/plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java +++ b/plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java @@ -12,6 +12,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.ByteArrayDeserializer; @@ -140,6 +141,21 @@ public IngestionShardPointer latestPointer() { return new KafkaOffset(endOffset); } + @Override + public IngestionShardPointer pointerFromTimestampMillis(long timestampMillis) { + // Kafka maps the partition to null when no record is at or after the timestamp; keep the offset-0 fallback. + final OffsetAndTimestamp match = AccessController.doPrivileged( + (PrivilegedAction<OffsetAndTimestamp>) () -> 
consumer.offsetsForTimes(Collections.singletonMap(topicPartition, timestampMillis)) + .get(topicPartition) + ); + return new KafkaOffset(match == null ? 0L : match.offset()); + } + + @Override + public IngestionShardPointer pointerFromOffset(long offset) { + return new KafkaOffset(offset); + } + private synchronized List> fetch(long startOffset, long maxMessages, int timeoutMillis) { if (lastFetchedOffset < 0 || lastFetchedOffset != startOffset - 1) { logger.info("Seeking to offset {}", startOffset); diff --git a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java index d50192f106cfe..66552c903892f 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java @@ -716,8 +716,7 @@ public void validate(final String value, final Map, Object> settings) @Override public void validate(final String value) { - if (!(value.equalsIgnoreCase(StreamPoller.ResetState.LATEST.name()) - || value.equalsIgnoreCase(StreamPoller.ResetState.EARLIEST.name()))) { + if (!isValidResetState(value)) { throw new IllegalArgumentException( "Invalid value for " + SETTING_INGESTION_SOURCE_POINTER_INIT_RESET + " [" + value + "]" ); @@ -725,12 +724,51 @@ public void validate(final String value) { } @Override - public void validate(final String value, final Map, Object> settings) {} + public void validate(final String value, final Map, Object> settings) { + if (isRewindState(value)) { + // Ensure the reset value setting is provided when rewinding.
+ final long resetValue = (long) settings.get(INGESTION_SOURCE_POINTER_INIT_RESET_VALUE_SETTING); + if (resetValue <= 0) { + throw new IllegalArgumentException( + "Setting " + INGESTION_SOURCE_POINTER_INIT_RESET_VALUE_SETTING.getKey() + " should be set and greater than 0" + ); + } + } + } + + private boolean isValidResetState(String value) { + return StreamPoller.ResetState.LATEST.name().equalsIgnoreCase(value) + || StreamPoller.ResetState.EARLIEST.name().equalsIgnoreCase(value) + || isRewindState(value); + } + + private boolean isRewindState(String value) { + return StreamPoller.ResetState.REWIND_BY_OFFSET.name().equalsIgnoreCase(value) + || StreamPoller.ResetState.REWIND_BY_TIMESTAMP.name().equalsIgnoreCase(value); + } + + @Override + public Iterator> settings() { + final List> settings = Collections.singletonList(INGESTION_SOURCE_POINTER_INIT_RESET_VALUE_SETTING); + return settings.iterator(); + } }, Property.IndexScope, Property.Dynamic ); + /** + * Defines the setting for the value to be used when resetting by offset or timestamp. 
+ */ + public static final String SETTING_INGESTION_SOURCE_POINTER_INIT_RESET_VALUE = "index.ingestion_source.pointer.init.reset.value"; + public static final Setting INGESTION_SOURCE_POINTER_INIT_RESET_VALUE_SETTING = Setting.longSetting( + SETTING_INGESTION_SOURCE_POINTER_INIT_RESET_VALUE, + 0, + 0, + Property.IndexScope, + Property.Dynamic + ); + public static final Setting.AffixSetting INGESTION_SOURCE_PARAMS_SETTING = Setting.prefixKeySetting( "index.ingestion_source.param.", key -> new Setting<>(key, "", (value) -> { @@ -954,7 +992,12 @@ public Version getCreationVersion() { public IngestionSource getIngestionSource() { final String ingestionSourceType = INGESTION_SOURCE_TYPE_SETTING.get(settings); if (ingestionSourceType != null && !(NONE_INGESTION_SOURCE_TYPE.equals(ingestionSourceType))) { - final String pointerInitReset = INGESTION_SOURCE_POINTER_INIT_RESET_SETTING.get(settings); + final String pointerInitResetType = INGESTION_SOURCE_POINTER_INIT_RESET_SETTING.get(settings); + final long pointerInitResetValue = INGESTION_SOURCE_POINTER_INIT_RESET_VALUE_SETTING.get(settings); + IngestionSource.PointerInitReset pointerInitReset = new IngestionSource.PointerInitReset( + pointerInitResetType, + pointerInitResetValue + ); final Map ingestionSourceParams = INGESTION_SOURCE_PARAMS_SETTING.getAsMap(settings); return new IngestionSource(ingestionSourceType, pointerInitReset, ingestionSourceParams); } diff --git a/server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java b/server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java index 583114d9ecbd2..30270a33f0e80 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java @@ -19,10 +19,10 @@ @ExperimentalApi public class IngestionSource { private String type; - private String pointerInitReset; + private PointerInitReset pointerInitReset; private Map params; - public 
IngestionSource(String type, String pointerInitReset, Map params) { + public IngestionSource(String type, PointerInitReset pointerInitReset, Map params) { this.type = type; this.pointerInitReset = pointerInitReset; this.params = params; @@ -32,7 +32,7 @@ public String getType() { return type; } - public String getPointerInitReset() { + public PointerInitReset getPointerInitReset() { return pointerInitReset; } @@ -59,4 +59,41 @@ public int hashCode() { public String toString() { return "IngestionSource{" + "type='" + type + '\'' + ",pointer_init_reset='" + pointerInitReset + '\'' + ", params=" + params + '}'; } + + @ExperimentalApi + public static class PointerInitReset { + private final String type; + private final long value; + + public PointerInitReset(String policy, long value) { + this.type = policy; + this.value = value; + } + + public String getType() { + return type; + } + + public long getValue() { + return value; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + PointerInitReset pointerInitReset = (PointerInitReset) o; + return Objects.equals(type, pointerInitReset.type) && Objects.equals(value, pointerInitReset.value); + } + + @Override + public int hashCode() { + return Objects.hash(type, value); + } + + @Override + public String toString() { + return "PointerInitReset{" + "type='" + type + '\'' + ", value=" + value + '}'; + } + } } diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index 0e21104fb6426..946d7fe734deb 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -263,6 +263,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { // Settings for ingestion source 
IndexMetadata.INGESTION_SOURCE_TYPE_SETTING, IndexMetadata.INGESTION_SOURCE_POINTER_INIT_RESET_SETTING, + IndexMetadata.INGESTION_SOURCE_POINTER_INIT_RESET_VALUE_SETTING, IndexMetadata.INGESTION_SOURCE_PARAMS_SETTING, // validate that built-in similarities don't get redefined diff --git a/server/src/main/java/org/opensearch/index/IngestionShardConsumer.java b/server/src/main/java/org/opensearch/index/IngestionShardConsumer.java index 02a9f5a18ebb1..230a8c42540eb 100644 --- a/server/src/main/java/org/opensearch/index/IngestionShardConsumer.java +++ b/server/src/main/java/org/opensearch/index/IngestionShardConsumer.java @@ -82,6 +82,22 @@ public M getMessage() { */ IngestionShardPointer latestPointer(); + /** + * Returns an ingestion shard pointer based on the provided timestamp in milliseconds. + * + * @param timestampMillis the timestamp in milliseconds + * @return the ingestion shard pointer corresponding to the given timestamp + */ + IngestionShardPointer pointerFromTimestampMillis(long timestampMillis); + + /** + * Returns an ingestion shard pointer based on the provided offset. 
+ * + * @param offset the offset value + * @return the ingestion shard pointer corresponding to the given offset + */ + IngestionShardPointer pointerFromOffset(long offset); + /** * @return the shard id */ diff --git a/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java b/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java index 58c6371d51c0a..3ea72a6bbad94 100644 --- a/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java @@ -173,7 +173,7 @@ public void start() { Map commitData = commitDataAsMap(); StreamPoller.ResetState resetState = StreamPoller.ResetState.valueOf( - ingestionSource.getPointerInitReset().toUpperCase(Locale.ROOT) + ingestionSource.getPointerInitReset().getType().toUpperCase(Locale.ROOT) ); IngestionShardPointer startPointer = null; Set persistedPointers = new HashSet<>(); @@ -191,7 +191,8 @@ public void start() { resetState = StreamPoller.ResetState.NONE; } - streamPoller = new DefaultStreamPoller(startPointer, persistedPointers, ingestionShardConsumer, this, resetState); + long resetValue = ingestionSource.getPointerInitReset().getValue(); + streamPoller = new DefaultStreamPoller(startPointer, persistedPointers, ingestionShardConsumer, this, resetState, resetValue); streamPoller.start(); } diff --git a/server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java b/server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java index b5c1db999544a..2312658d4bb1e 100644 --- a/server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java +++ b/server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java @@ -52,6 +52,7 @@ public class DefaultStreamPoller implements StreamPoller { private IngestionShardPointer batchStartPointer; private ResetState resetState; + private final long resetValue; private Set persistedPointers; @@ -68,14 +69,16 @@ 
public DefaultStreamPoller( Set persistedPointers, IngestionShardConsumer consumer, IngestionEngine ingestionEngine, - ResetState resetState + ResetState resetState, + long resetValue ) { this( startPointer, persistedPointers, consumer, new MessageProcessorRunnable(new ArrayBlockingQueue<>(100), ingestionEngine), - resetState + resetState, + resetValue ); } @@ -84,10 +87,12 @@ public DefaultStreamPoller( Set persistedPointers, IngestionShardConsumer consumer, MessageProcessorRunnable processorRunnable, - ResetState resetState + ResetState resetState, + long resetValue ) { this.consumer = Objects.requireNonNull(consumer); this.resetState = resetState; + this.resetValue = resetValue; batchStartPointer = startPointer; this.persistedPointers = persistedPointers; if (!this.persistedPointers.isEmpty()) { @@ -151,6 +156,18 @@ protected void startPoll() { batchStartPointer = consumer.latestPointer(); logger.info("Resetting offset by seeking to latest offset {}", batchStartPointer.asString()); break; + case REWIND_BY_OFFSET: + batchStartPointer = consumer.pointerFromOffset(resetValue); + logger.info("Resetting offset by seeking to offset {}", batchStartPointer.asString()); + break; + case REWIND_BY_TIMESTAMP: + batchStartPointer = consumer.pointerFromTimestampMillis(resetValue); + logger.info( + "Resetting offset by seeking to timestamp {}, corresponding offset {}", + resetValue, + batchStartPointer.asString() + ); + break; } resetState = ResetState.NONE; } diff --git a/server/src/main/java/org/opensearch/indices/pollingingest/StreamPoller.java b/server/src/main/java/org/opensearch/indices/pollingingest/StreamPoller.java index f674f6dc55c85..4ca6658b39d17 100644 --- a/server/src/main/java/org/opensearch/indices/pollingingest/StreamPoller.java +++ b/server/src/main/java/org/opensearch/indices/pollingingest/StreamPoller.java @@ -66,6 +66,8 @@ enum State { enum ResetState { EARLIEST, LATEST, + REWIND_BY_OFFSET, + REWIND_BY_TIMESTAMP, NONE, } } diff --git 
a/server/src/test/java/org/opensearch/cluster/metadata/IngestionSourceTests.java b/server/src/test/java/org/opensearch/cluster/metadata/IngestionSourceTests.java index f67d13e54e608..4d0e05e9f63a1 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/IngestionSourceTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/IngestionSourceTests.java @@ -15,10 +15,12 @@ public class IngestionSourceTests extends OpenSearchTestCase { + private final IngestionSource.PointerInitReset pointerInitReset = new IngestionSource.PointerInitReset("pointerInitReset", 0); + public void testConstructorAndGetters() { Map params = new HashMap<>(); params.put("key", "value"); - IngestionSource source = new IngestionSource("type", "pointerInitReset", params); + IngestionSource source = new IngestionSource("type", pointerInitReset, params); assertEquals("type", source.getType()); - assertEquals("pointerInitReset", source.getPointerInitReset()); + assertEquals(pointerInitReset, source.getPointerInitReset()); @@ -28,38 +30,38 @@ public void testConstructorAndGetters() { public void testEquals() { Map params1 = new HashMap<>(); params1.put("key", "value"); - IngestionSource source1 = new IngestionSource("type", "pointerInitReset", params1); + IngestionSource source1 = new IngestionSource("type", pointerInitReset, params1); Map params2 = new HashMap<>(); params2.put("key", "value"); - IngestionSource source2 = new IngestionSource("type", "pointerInitReset", params2); + IngestionSource source2 = new IngestionSource("type", pointerInitReset, params2); assertTrue(source1.equals(source2)); assertTrue(source2.equals(source1)); - IngestionSource source3 = new IngestionSource("differentType", "pointerInitReset", params1); + IngestionSource source3 = new IngestionSource("differentType", pointerInitReset, params1); assertFalse(source1.equals(source3)); } public void testHashCode() { Map params1 = new HashMap<>(); params1.put("key", "value"); - IngestionSource source1 = new IngestionSource("type", "pointerInitReset", params1); + 
IngestionSource source1 = new IngestionSource("type", pointerInitReset, params1); Map params2 = new HashMap<>(); params2.put("key", "value"); - IngestionSource source2 = new IngestionSource("type", "pointerInitReset", params2); + IngestionSource source2 = new IngestionSource("type", pointerInitReset, params2); assertEquals(source1.hashCode(), source2.hashCode()); - IngestionSource source3 = new IngestionSource("differentType", "pointerInitReset", params1); + IngestionSource source3 = new IngestionSource("differentType", pointerInitReset, params1); assertNotEquals(source1.hashCode(), source3.hashCode()); } public void testToString() { Map params = new HashMap<>(); params.put("key", "value"); - IngestionSource source = new IngestionSource("type", "pointerInitReset", params); + IngestionSource source = new IngestionSource("type", pointerInitReset, params); - String expected = "IngestionSource{type='type',pointer_init_reset='pointerInitReset', params={key=value}}"; + String expected = "IngestionSource{type='type',pointer_init_reset='PointerInitReset{type='pointerInitReset', value=0}', params={key=value}}"; assertEquals(expected, source.toString()); diff --git a/server/src/test/java/org/opensearch/index/engine/FakeIngestionSource.java b/server/src/test/java/org/opensearch/index/engine/FakeIngestionSource.java index de03dcd313c29..5372824d0e3c6 100644 --- a/server/src/test/java/org/opensearch/index/engine/FakeIngestionSource.java +++ b/server/src/test/java/org/opensearch/index/engine/FakeIngestionSource.java @@ -93,6 +93,16 @@ public FakeIngestionShardPointer latestPointer() { return new FakeIngestionShardPointer(messages.size()); } + @Override + public IngestionShardPointer pointerFromTimestampMillis(long timestampMillis) { + return new FakeIngestionShardPointer(0); // Placeholder implementation + } + + @Override + public IngestionShardPointer pointerFromOffset(long offset) { + return new FakeIngestionShardPointer(offset); + } + @Override public int getShardId() { return shardId; diff --git a/server/src/test/java/org/opensearch/indices/pollingingest/DefaultStreamPollerTests.java 
b/server/src/test/java/org/opensearch/indices/pollingingest/DefaultStreamPollerTests.java index 1a98f65d04f7c..bb85841772d52 100644 --- a/server/src/test/java/org/opensearch/indices/pollingingest/DefaultStreamPollerTests.java +++ b/server/src/test/java/org/opensearch/indices/pollingingest/DefaultStreamPollerTests.java @@ -52,7 +52,8 @@ public void setUp() throws Exception { persistedPointers, fakeConsumer, processorRunnable, - StreamPoller.ResetState.NONE + StreamPoller.ResetState.NONE, + 0L ); } @@ -90,7 +91,8 @@ public void testSkipProcessed() throws InterruptedException { persistedPointers, fakeConsumer, processorRunnable, - StreamPoller.ResetState.NONE + StreamPoller.ResetState.NONE, + 0L ); poller.start(); Thread.sleep(sleepTime); // Allow some time for the poller to run @@ -118,7 +120,8 @@ public void testResetStateEarliest() throws InterruptedException { persistedPointers, fakeConsumer, processorRunnable, - StreamPoller.ResetState.EARLIEST + StreamPoller.ResetState.EARLIEST, + 0L ); poller.start(); @@ -134,7 +137,8 @@ public void testResetStateLatest() throws InterruptedException { persistedPointers, fakeConsumer, processorRunnable, - StreamPoller.ResetState.LATEST + StreamPoller.ResetState.LATEST, + 0L ); poller.start();