From 21eefddc55a3b0b882ee3113f3108b663578d95d Mon Sep 17 00:00:00 2001 From: xuxiong1 Date: Fri, 14 Feb 2025 15:31:20 +0800 Subject: [PATCH] resolve comments Signed-off-by: xuxiong1 --- .../plugin/kafka/IngestFromKafkaIT.java | 4 ++-- .../plugin/kafka/KafkaPartitionConsumer.java | 10 ++++---- .../kafka/KafkaPartitionConsumerTests.java | 2 +- .../cluster/metadata/IndexMetadata.java | 23 +++++++++++-------- .../cluster/metadata/IngestionSource.java | 13 ++++++----- .../index/IngestionShardConsumer.java | 4 ++-- .../index/engine/IngestionEngine.java | 7 ++---- .../pollingingest/DefaultStreamPoller.java | 6 ++--- .../indices/pollingingest/StreamPoller.java | 2 ++ .../metadata/IngestionSourceTests.java | 13 +++++++---- .../index/engine/FakeIngestionSource.java | 6 ++--- .../DefaultStreamPollerTests.java | 10 ++++---- 12 files changed, 55 insertions(+), 45 deletions(-) diff --git a/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java index e668f97ce70d6..36619a10d0d59 100644 --- a/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java +++ b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java @@ -116,7 +116,7 @@ public void testKafkaIngestion_RewindByTimeStamp() { // 1739459500000 is the timestamp of the first message // 1739459800000 is the timestamp of the second message // by resetting to 1739459600000, only the second message will be ingested - .put("ingestion_source.pointer.init.reset.value", 1739459600000L) + .put("ingestion_source.pointer.init.reset.value", "1739459600000") .put("ingestion_source.param.topic", "test") .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers()) .build(), @@ -145,7 +145,7 @@ public void testKafkaIngestion_RewindByOffset() { .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) .put("ingestion_source.type", "kafka") .put("ingestion_source.pointer.init.reset", "rewind_by_offset") - .put("ingestion_source.pointer.init.reset.value", 1L) + .put("ingestion_source.pointer.init.reset.value", "1") .put("ingestion_source.param.topic", "test") .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers()) .build(), diff --git a/plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java b/plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java index 836824c193e03..87dc2460c8a53 100644 --- a/plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java +++ b/plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java @@ -143,8 +143,9 @@ public IngestionShardPointer latestPointer() { } @Override - public IngestionShardPointer pointerFromTimestampMillis(long timestampMillis) { + public IngestionShardPointer pointerFromTimestampMillis(String timestampMillisStr) { long offset = AccessController.doPrivileged((PrivilegedAction) () -> { + long timestampMillis = Long.parseLong(timestampMillisStr); Map position = consumer.offsetsForTimes( Collections.singletonMap(topicPartition, timestampMillis) ); @@ -159,15 +160,16 @@ public IngestionShardPointer pointerFromTimestampMillis(long timestampMillis) { }); if (offset < 0) { // no message found for the timestamp, return the latest offset - logger.warn("No message found for timestamp {}, seeking to the latest", timestampMillis); + logger.warn("No message found for timestamp {}, seeking to the latest", timestampMillisStr); return latestPointer(); } return new KafkaOffset(offset); } @Override - public IngestionShardPointer pointerFromOffset(long offset) { - return new KafkaOffset(offset); + public IngestionShardPointer pointerFromOffset(String offset) { + long offsetValue = Long.parseLong(offset); + return new KafkaOffset(offsetValue); } private synchronized List> fetch(long startOffset, long maxMessages, int timeoutMillis) { diff --git a/plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaPartitionConsumerTests.java b/plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaPartitionConsumerTests.java index 8883086f48889..1f35d31e76861 100644 --- a/plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaPartitionConsumerTests.java +++ b/plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaPartitionConsumerTests.java @@ -96,7 +96,7 @@ public void testPointerFromTimestampMillis() { Collections.singletonMap(topicPartition, new org.apache.kafka.clients.consumer.OffsetAndTimestamp(5L, 1000L)) ); - KafkaOffset offset = (KafkaOffset) consumer.pointerFromTimestampMillis(1000L); + KafkaOffset offset = (KafkaOffset) consumer.pointerFromTimestampMillis("1000"); assertEquals(5L, offset.getOffset()); } diff --git a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java index 66552c903892f..d4fcadc4ac56d 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java @@ -727,10 +727,12 @@ public void validate(final String value) { public void validate(final String value, final Map, Object> settings) { if (isRewindState(value)) { // Ensure the reset value setting is provided when rewinding. - final long resetValue = (long) settings.get(INGESTION_SOURCE_POINTER_INIT_RESET_VALUE_SETTING); - if (resetValue <= 0) { + final String resetValue = (String) settings.get(INGESTION_SOURCE_POINTER_INIT_RESET_VALUE_SETTING); + if (resetValue == null || resetValue.isEmpty()) { throw new IllegalArgumentException( - "Setting " + INGESTION_SOURCE_POINTER_INIT_RESET_VALUE_SETTING.getKey() + " should be set and greater than 0" + "Setting " + + INGESTION_SOURCE_POINTER_INIT_RESET_VALUE_SETTING.getKey() + + " should be set when REWIND_BY_OFFSET or REWIND_BY_TIMESTAMP" ); } } @@ -754,19 +756,18 @@ public Iterator> settings() { } }, Property.IndexScope, - Property.Dynamic + Property.Final ); /** * Defines the setting for the value to be used when resetting by offset or timestamp. */ public static final String SETTING_INGESTION_SOURCE_POINTER_INIT_RESET_VALUE = "index.ingestion_source.pointer.init.reset.value"; - public static final Setting INGESTION_SOURCE_POINTER_INIT_RESET_VALUE_SETTING = Setting.longSetting( + public static final Setting INGESTION_SOURCE_POINTER_INIT_RESET_VALUE_SETTING = Setting.simpleString( SETTING_INGESTION_SOURCE_POINTER_INIT_RESET_VALUE, - 0, - 0, + "", Property.IndexScope, - Property.Dynamic + Property.Final ); public static final Setting.AffixSetting INGESTION_SOURCE_PARAMS_SETTING = Setting.prefixKeySetting( @@ -992,8 +993,10 @@ public Version getCreationVersion() { public IngestionSource getIngestionSource() { final String ingestionSourceType = INGESTION_SOURCE_TYPE_SETTING.get(settings); if (ingestionSourceType != null && !(NONE_INGESTION_SOURCE_TYPE.equals(ingestionSourceType))) { - final String pointerInitResetType = INGESTION_SOURCE_POINTER_INIT_RESET_SETTING.get(settings); - final long pointerInitResetValue = INGESTION_SOURCE_POINTER_INIT_RESET_VALUE_SETTING.get(settings); + final StreamPoller.ResetState pointerInitResetType = StreamPoller.ResetState.valueOf( + INGESTION_SOURCE_POINTER_INIT_RESET_SETTING.get(settings).toUpperCase(Locale.ROOT) + ); + final String pointerInitResetValue = INGESTION_SOURCE_POINTER_INIT_RESET_VALUE_SETTING.get(settings); IngestionSource.PointerInitReset pointerInitReset = new IngestionSource.PointerInitReset( pointerInitResetType, pointerInitResetValue diff --git a/server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java b/server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java index 30270a33f0e80..9bb06b5460eb3 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java @@ -9,6 +9,7 @@ package org.opensearch.cluster.metadata; import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.indices.pollingingest.StreamPoller; import java.util.Map; import java.util.Objects; @@ -62,19 +63,19 @@ public String toString() { @ExperimentalApi public static class PointerInitReset { - private final String type; - private final long value; + private final StreamPoller.ResetState type; + private final String value; - public PointerInitReset(String policy, long value) { - this.type = policy; + public PointerInitReset(StreamPoller.ResetState type, String value) { + this.type = type; this.value = value; } - public String getType() { + public StreamPoller.ResetState getType() { return type; } - public long getValue() { + public String getValue() { return value; } diff --git a/server/src/main/java/org/opensearch/index/IngestionShardConsumer.java b/server/src/main/java/org/opensearch/index/IngestionShardConsumer.java index 230a8c42540eb..455833e936daf 100644 --- a/server/src/main/java/org/opensearch/index/IngestionShardConsumer.java +++ b/server/src/main/java/org/opensearch/index/IngestionShardConsumer.java @@ -88,7 +88,7 @@ public M getMessage() { * @param timestampMillis the timestamp in milliseconds * @return the ingestion shard pointer corresponding to the given timestamp */ - IngestionShardPointer pointerFromTimestampMillis(long timestampMillis); + IngestionShardPointer pointerFromTimestampMillis(String timestampMillis); /** * Returns an ingestion shard pointer based on the provided offset. @@ -96,7 +96,7 @@ public M getMessage() { * @param offset the offset value * @return the ingestion shard pointer corresponding to the given offset */ - IngestionShardPointer pointerFromOffset(long offset); + IngestionShardPointer pointerFromOffset(String offset); /** * @return the shard id diff --git a/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java b/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java index 3ea72a6bbad94..b37281b9d1582 100644 --- a/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java @@ -65,7 +65,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -172,9 +171,7 @@ public void start() { logger.info("created ingestion consumer for shard [{}]", engineConfig.getShardId()); Map commitData = commitDataAsMap(); - StreamPoller.ResetState resetState = StreamPoller.ResetState.valueOf( - ingestionSource.getPointerInitReset().getType().toUpperCase(Locale.ROOT) - ); + StreamPoller.ResetState resetState = ingestionSource.getPointerInitReset().getType(); IngestionShardPointer startPointer = null; Set persistedPointers = new HashSet<>(); if (commitData.containsKey(StreamPoller.BATCH_START)) { @@ -191,7 +188,7 @@ public void start() { resetState = StreamPoller.ResetState.NONE; } - long resetValue = ingestionSource.getPointerInitReset().getValue(); + String resetValue = ingestionSource.getPointerInitReset().getValue(); streamPoller = new DefaultStreamPoller(startPointer, persistedPointers, ingestionShardConsumer, this, resetState, resetValue); streamPoller.start(); } diff --git a/server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java b/server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java index 2312658d4bb1e..4b87587196e4b 100644 --- a/server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java +++ b/server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java @@ -52,7 +52,7 @@ public class DefaultStreamPoller implements StreamPoller { private IngestionShardPointer batchStartPointer; private ResetState resetState; - private final long resetValue; + private final String resetValue; private Set persistedPointers; @@ -70,7 +70,7 @@ public DefaultStreamPoller( IngestionShardConsumer consumer, IngestionEngine ingestionEngine, ResetState resetState, - long resetValue + String resetValue ) { this( startPointer, @@ -88,7 +88,7 @@ public DefaultStreamPoller( IngestionShardConsumer consumer, MessageProcessorRunnable processorRunnable, ResetState resetState, - long resetValue + String resetValue ) { this.consumer = Objects.requireNonNull(consumer); this.resetState = resetState; diff --git a/server/src/main/java/org/opensearch/indices/pollingingest/StreamPoller.java b/server/src/main/java/org/opensearch/indices/pollingingest/StreamPoller.java index 4ca6658b39d17..5010982991ecc 100644 --- a/server/src/main/java/org/opensearch/indices/pollingingest/StreamPoller.java +++ b/server/src/main/java/org/opensearch/indices/pollingingest/StreamPoller.java @@ -8,6 +8,7 @@ package org.opensearch.indices.pollingingest; +import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.index.IngestionShardPointer; import java.io.Closeable; @@ -63,6 +64,7 @@ enum State { /** * a reset state to indicate how to reset the pointer */ + @ExperimentalApi enum ResetState { EARLIEST, LATEST, diff --git a/server/src/test/java/org/opensearch/cluster/metadata/IngestionSourceTests.java b/server/src/test/java/org/opensearch/cluster/metadata/IngestionSourceTests.java index 96c49cf92e883..0afe67002517b 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/IngestionSourceTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/IngestionSourceTests.java @@ -8,6 +8,7 @@ package org.opensearch.cluster.metadata; +import org.opensearch.indices.pollingingest.StreamPoller; import org.opensearch.test.OpenSearchTestCase; import java.util.HashMap; @@ -15,7 +16,10 @@ public class IngestionSourceTests extends OpenSearchTestCase { - private final IngestionSource.PointerInitReset pointerInitReset = new IngestionSource.PointerInitReset("pointerInitReset", 1000L); + private final IngestionSource.PointerInitReset pointerInitReset = new IngestionSource.PointerInitReset( + StreamPoller.ResetState.REWIND_BY_OFFSET, + "1000" + ); public void testConstructorAndGetters() { Map params = new HashMap<>(); @@ -23,8 +27,8 @@ public void testConstructorAndGetters() { IngestionSource source = new IngestionSource("type", pointerInitReset, params); assertEquals("type", source.getType()); - assertEquals("pointerInitReset", source.getPointerInitReset().getType()); - assertEquals(1000L, source.getPointerInitReset().getValue()); + assertEquals(StreamPoller.ResetState.REWIND_BY_OFFSET, source.getPointerInitReset().getType()); + assertEquals("1000", source.getPointerInitReset().getValue()); assertEquals(params, source.params()); } @@ -64,7 +68,8 @@ public void testToString() { params.put("key", "value"); IngestionSource source = new IngestionSource("type", pointerInitReset, params); - String expected = "IngestionSource{type='type',pointer_init_reset='PointerInitReset{type='pointerInitReset', value=1000}', params={key=value}}"; + String expected = + "IngestionSource{type='type',pointer_init_reset='PointerInitReset{type='REWIND_BY_OFFSET', value=1000}', params={key=value}}"; assertEquals(expected, source.toString()); } } diff --git a/server/src/test/java/org/opensearch/index/engine/FakeIngestionSource.java b/server/src/test/java/org/opensearch/index/engine/FakeIngestionSource.java index fc74c5feb7d51..dfed0240e7547 100644 --- a/server/src/test/java/org/opensearch/index/engine/FakeIngestionSource.java +++ b/server/src/test/java/org/opensearch/index/engine/FakeIngestionSource.java @@ -94,13 +94,13 @@ public FakeIngestionShardPointer latestPointer() { } @Override - public IngestionShardPointer pointerFromTimestampMillis(long timestampMillis) { + public IngestionShardPointer pointerFromTimestampMillis(String timestampMillis) { throw new UnsupportedOperationException("Not implemented yet."); } @Override - public IngestionShardPointer pointerFromOffset(long offset) { - return new FakeIngestionShardPointer(offset); + public IngestionShardPointer pointerFromOffset(String offset) { + return new FakeIngestionShardPointer(Long.parseLong(offset)); } @Override diff --git a/server/src/test/java/org/opensearch/indices/pollingingest/DefaultStreamPollerTests.java b/server/src/test/java/org/opensearch/indices/pollingingest/DefaultStreamPollerTests.java index d3896b6d6390f..3b7c799a98b89 100644 --- a/server/src/test/java/org/opensearch/indices/pollingingest/DefaultStreamPollerTests.java +++ b/server/src/test/java/org/opensearch/indices/pollingingest/DefaultStreamPollerTests.java @@ -53,7 +53,7 @@ public void setUp() throws Exception { fakeConsumer, processorRunnable, StreamPoller.ResetState.NONE, - 0L + "" ); } @@ -92,7 +92,7 @@ public void testSkipProcessed() throws InterruptedException { fakeConsumer, processorRunnable, StreamPoller.ResetState.NONE, - 0L + "" ); poller.start(); Thread.sleep(sleepTime); // Allow some time for the poller to run @@ -121,7 +121,7 @@ public void testResetStateEarliest() throws InterruptedException { fakeConsumer, processorRunnable, StreamPoller.ResetState.EARLIEST, - 0L + "" ); poller.start(); @@ -138,7 +138,7 @@ public void testResetStateLatest() throws InterruptedException { fakeConsumer, processorRunnable, StreamPoller.ResetState.LATEST, - 0L + "" ); poller.start(); @@ -156,7 +156,7 @@ public void testResetStateRewindByOffset() throws InterruptedException { fakeConsumer, processorRunnable, StreamPoller.ResetState.REWIND_BY_OFFSET, - 1L + "1" ); poller.start();