diff --git a/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java index e7d8e36acb302..d6b099c6b24d8 100644 --- a/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java +++ b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java @@ -102,6 +102,69 @@ public void testKafkaIngestion() { } } + public void testKafkaIngestion_RewindByTimeStamp() { + try { + setupKafka(); + // create an index with ingestion source from kafka + createIndex( + "test_rewind_by_timestamp", + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put("ingestion_source.type", "kafka") + .put("ingestion_source.pointer.init.reset", "rewind_by_timestamp") + // 1739459500000 is the timestamp of the first message + // 1739459800000 is the timestamp of the second message + // by resetting to 1739459600000, only the second message will be ingested + .put("ingestion_source.pointer.init.reset.value", "1739459600000") + .put("ingestion_source.param.topic", "test") + .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers()) + .put("ingestion_source.param.auto.offset.reset", "latest") + .build(), + "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}" + ); + + RangeQueryBuilder query = new RangeQueryBuilder("age").gte(0); + await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { + refresh("test_rewind_by_timestamp"); + SearchResponse response = client().prepareSearch("test_rewind_by_timestamp").setQuery(query).get(); + assertThat(response.getHits().getTotalHits().value(), is(1L)); + }); + } finally { + stopKafka(); + } + } + + public void testKafkaIngestion_RewindByOffset() { + try { + setupKafka(); + // create an index with ingestion source from kafka + createIndex( + "test_rewind_by_offset", + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put("ingestion_source.type", "kafka") + .put("ingestion_source.pointer.init.reset", "rewind_by_offset") + .put("ingestion_source.pointer.init.reset.value", "1") + .put("ingestion_source.param.topic", "test") + .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers()) + .put("ingestion_source.param.auto.offset.reset", "latest") + .build(), + "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}" + ); + + RangeQueryBuilder query = new RangeQueryBuilder("age").gte(0); + await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { + refresh("test_rewind_by_offset"); + SearchResponse response = client().prepareSearch("test_rewind_by_offset").setQuery(query).get(); + assertThat(response.getHits().getTotalHits().value(), is(1L)); + }); + } finally { + stopKafka(); + } + } + private void setupKafka() { kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1")) // disable topic auto creation @@ -122,10 +185,14 @@ private void prepareKafkaData() { Properties props = new Properties(); props.put("bootstrap.servers", kafka.getBootstrapServers()); Producer producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer()); - producer.send(new ProducerRecord<>(topicName, "null", "{\"_id\":\"1\",\"_source\":{\"name\":\"bob\", \"age\": 24}}")); + producer.send( + new ProducerRecord<>(topicName, null, 1739459500000L, "null", "{\"_id\":\"1\",\"_source\":{\"name\":\"bob\", \"age\": 24}}") + ); producer.send( new ProducerRecord<>( topicName, + null, + 1739459800000L, "null", "{\"_id\":\"2\", \"_op_type:\":\"index\",\"_source\":{\"name\":\"alice\", \"age\": 20}}" ) diff --git a/plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java b/plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java index a20e52a06eecd..d8ad7a2bd1749 100644 --- a/plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java +++ b/plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java @@ -9,9 +9,12 @@ package org.opensearch.plugin.kafka; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndTimestamp; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.ByteArrayDeserializer; @@ -27,6 +30,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.concurrent.TimeoutException; @@ -47,6 +51,7 @@ public class KafkaPartitionConsumer implements IngestionShardConsumer consumer) { this.clientId = clientId; this.consumer = consumer; + this.config = config; String topic = config.getTopic(); List partitionInfos = AccessController.doPrivileged( (PrivilegedAction>) () -> consumer.partitionsFor(topic, Duration.ofMillis(timeoutMillis)) @@ -93,6 +99,9 @@ protected static Consumer createConsumer(String clientId, KafkaS Properties consumerProp = new Properties(); consumerProp.put("bootstrap.servers", config.getBootstrapServers()); consumerProp.put("client.id", clientId); + if (config.getAutoOffsetResetConfig() != null && !config.getAutoOffsetResetConfig().isEmpty()) { + consumerProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, config.getAutoOffsetResetConfig()); + } // TODO: why Class org.apache.kafka.common.serialization.StringDeserializer could not be found if set the deserializer as prop? // consumerProp.put("key.deserializer", // "org.apache.kafka.common.serialization.StringDeserializer"); @@ -140,6 +149,44 @@ public IngestionShardPointer latestPointer() { return new KafkaOffset(endOffset); } + @Override + public IngestionShardPointer pointerFromTimestampMillis(String timestampMillisStr) { + long offset = AccessController.doPrivileged((PrivilegedAction) () -> { + long timestampMillis = Long.parseLong(timestampMillisStr); + Map position = consumer.offsetsForTimes( + Collections.singletonMap(topicPartition, timestampMillis) + ); + if (position == null || position.isEmpty()) { + return -1L; + } + OffsetAndTimestamp offsetAndTimestamp = position.values().iterator().next(); + if (offsetAndTimestamp == null) { + return -1L; + } + return offsetAndTimestamp.offset(); + }); + if (offset < 0) { + logger.warn("No message found for timestamp {}, fall back to auto.offset.reset policy", timestampMillisStr); + String autoOffsetResetConfig = config.getAutoOffsetResetConfig(); + if (OffsetResetStrategy.EARLIEST.toString().equals(autoOffsetResetConfig)) { + logger.warn("The auto.offset.reset is set to earliest, seek to earliest pointer"); + return earliestPointer(); + } else if (OffsetResetStrategy.LATEST.toString().equals(autoOffsetResetConfig)) { + logger.warn("The auto.offset.reset is set to latest, seek to latest pointer"); + return latestPointer(); + } else { + throw new IllegalArgumentException("No message found for timestamp " + timestampMillisStr); + } + } + return new KafkaOffset(offset); + } + + @Override + public IngestionShardPointer pointerFromOffset(String offset) { + long offsetValue = Long.parseLong(offset); + return new KafkaOffset(offsetValue); + } + private synchronized List> fetch(long startOffset, long maxMessages, int timeoutMillis) { if (lastFetchedOffset < 0 || lastFetchedOffset != startOffset - 1) { logger.info("Seeking to offset {}", startOffset); diff --git a/plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaSourceConfig.java b/plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaSourceConfig.java index 722883d353ebf..48963572f6461 100644 --- a/plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaSourceConfig.java +++ b/plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaSourceConfig.java @@ -18,9 +18,11 @@ public class KafkaSourceConfig { private final String PROP_TOPIC = "topic"; private final String PROP_BOOTSTRAP_SERVERS = "bootstrap_servers"; + private final String PROP_AUTO_OFFSET_RESET = "auto.offset.reset"; private final String topic; private final String bootstrapServers; + private final String autoOffsetResetConfig; /** * Constructor @@ -29,6 +31,7 @@ public class KafkaSourceConfig { public KafkaSourceConfig(Map params) { this.topic = ConfigurationUtils.readStringProperty(params, PROP_TOPIC); this.bootstrapServers = ConfigurationUtils.readStringProperty(params, PROP_BOOTSTRAP_SERVERS); + this.autoOffsetResetConfig = ConfigurationUtils.readOptionalStringProperty(params, PROP_AUTO_OFFSET_RESET); } /** @@ -47,4 +50,13 @@ public String getTopic() { public String getBootstrapServers() { return bootstrapServers; } + + /** + * Get the auto offset reset configuration + * + * @return the auto offset reset configuration + */ + public String getAutoOffsetResetConfig() { + return autoOffsetResetConfig; + } } diff --git a/plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaPartitionConsumerTests.java b/plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaPartitionConsumerTests.java index 96f639366d887..1f35d31e76861 100644 --- a/plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaPartitionConsumerTests.java +++ b/plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaPartitionConsumerTests.java @@ -90,6 +90,22 @@ public void testLatestPointer() { assertEquals(10L, offset.getOffset()); } + public void testPointerFromTimestampMillis() { + TopicPartition topicPartition = new TopicPartition("test-topic", 0); + when(mockConsumer.offsetsForTimes(Collections.singletonMap(topicPartition, 1000L))).thenReturn( + Collections.singletonMap(topicPartition, new org.apache.kafka.clients.consumer.OffsetAndTimestamp(5L, 1000L)) + ); + + KafkaOffset offset = (KafkaOffset) consumer.pointerFromTimestampMillis("1000"); + + assertEquals(5L, offset.getOffset()); + } + + public void testPointerFromOffset() { + KafkaOffset offset = new KafkaOffset(5L); + assertEquals(5L, offset.getOffset()); + } + public void testTopicDoesNotExist() { Map params = new HashMap<>(); params.put("topic", "non-existent-topic"); diff --git a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java index d50192f106cfe..d4fcadc4ac56d 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java @@ -716,8 +716,7 @@ public void validate(final String value, final Map, Object> settings) @Override public void validate(final String value) { - if (!(value.equalsIgnoreCase(StreamPoller.ResetState.LATEST.name()) - || value.equalsIgnoreCase(StreamPoller.ResetState.EARLIEST.name()))) { + if (!isValidResetState(value)) { throw new IllegalArgumentException( "Invalid value for " + SETTING_INGESTION_SOURCE_POINTER_INIT_RESET + " [" + value + "]" ); @@ -725,10 +724,50 @@ public void validate(final String value) { } @Override - public void validate(final String value, final Map, Object> settings) {} + public void validate(final String value, final Map, Object> settings) { + if (isRewindState(value)) { + // Ensure the reset value setting is provided when rewinding. + final String resetValue = (String) settings.get(INGESTION_SOURCE_POINTER_INIT_RESET_VALUE_SETTING); + if (resetValue == null || resetValue.isEmpty()) { + throw new IllegalArgumentException( + "Setting " + + INGESTION_SOURCE_POINTER_INIT_RESET_VALUE_SETTING.getKey() + + " should be set when REWIND_BY_OFFSET or REWIND_BY_TIMESTAMP" + ); + } + } + } + + private boolean isValidResetState(String value) { + return StreamPoller.ResetState.LATEST.name().equalsIgnoreCase(value) + || StreamPoller.ResetState.EARLIEST.name().equalsIgnoreCase(value) + || isRewindState(value); + } + + private boolean isRewindState(String value) { + return StreamPoller.ResetState.REWIND_BY_OFFSET.name().equalsIgnoreCase(value) + || StreamPoller.ResetState.REWIND_BY_TIMESTAMP.name().equalsIgnoreCase(value); + } + + @Override + public Iterator> settings() { + final List> settings = Collections.singletonList(INGESTION_SOURCE_POINTER_INIT_RESET_VALUE_SETTING); + return settings.iterator(); + } }, Property.IndexScope, - Property.Dynamic + Property.Final + ); + + /** + * Defines the setting for the value to be used when resetting by offset or timestamp. + */ + public static final String SETTING_INGESTION_SOURCE_POINTER_INIT_RESET_VALUE = "index.ingestion_source.pointer.init.reset.value"; + public static final Setting INGESTION_SOURCE_POINTER_INIT_RESET_VALUE_SETTING = Setting.simpleString( + SETTING_INGESTION_SOURCE_POINTER_INIT_RESET_VALUE, + "", + Property.IndexScope, + Property.Final ); public static final Setting.AffixSetting INGESTION_SOURCE_PARAMS_SETTING = Setting.prefixKeySetting( @@ -954,7 +993,14 @@ public Version getCreationVersion() { public IngestionSource getIngestionSource() { final String ingestionSourceType = INGESTION_SOURCE_TYPE_SETTING.get(settings); if (ingestionSourceType != null && !(NONE_INGESTION_SOURCE_TYPE.equals(ingestionSourceType))) { - final String pointerInitReset = INGESTION_SOURCE_POINTER_INIT_RESET_SETTING.get(settings); + final StreamPoller.ResetState pointerInitResetType = StreamPoller.ResetState.valueOf( + INGESTION_SOURCE_POINTER_INIT_RESET_SETTING.get(settings).toUpperCase(Locale.ROOT) + ); + final String pointerInitResetValue = INGESTION_SOURCE_POINTER_INIT_RESET_VALUE_SETTING.get(settings); + IngestionSource.PointerInitReset pointerInitReset = new IngestionSource.PointerInitReset( + pointerInitResetType, + pointerInitResetValue + ); final Map ingestionSourceParams = INGESTION_SOURCE_PARAMS_SETTING.getAsMap(settings); return new IngestionSource(ingestionSourceType, pointerInitReset, ingestionSourceParams); } diff --git a/server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java b/server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java index 583114d9ecbd2..9bb06b5460eb3 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java @@ -9,6 +9,7 @@ package org.opensearch.cluster.metadata; import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.indices.pollingingest.StreamPoller; import java.util.Map; import java.util.Objects; @@ -19,10 +20,10 @@ @ExperimentalApi public class IngestionSource { private String type; - private String pointerInitReset; + private PointerInitReset pointerInitReset; private Map params; - public IngestionSource(String type, String pointerInitReset, Map params) { + public IngestionSource(String type, PointerInitReset pointerInitReset, Map params) { this.type = type; this.pointerInitReset = pointerInitReset; this.params = params; @@ -32,7 +33,7 @@ public String getType() { return type; } - public String getPointerInitReset() { + public PointerInitReset getPointerInitReset() { return pointerInitReset; } @@ -59,4 +60,41 @@ public int hashCode() { public String toString() { return "IngestionSource{" + "type='" + type + '\'' + ",pointer_init_reset='" + pointerInitReset + '\'' + ", params=" + params + '}'; } + + @ExperimentalApi + public static class PointerInitReset { + private final StreamPoller.ResetState type; + private final String value; + + public PointerInitReset(StreamPoller.ResetState type, String value) { + this.type = type; + this.value = value; + } + + public StreamPoller.ResetState getType() { + return type; + } + + public String getValue() { + return value; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + PointerInitReset pointerInitReset = (PointerInitReset) o; + return Objects.equals(type, pointerInitReset.type) && Objects.equals(value, pointerInitReset.value); + } + + @Override + public int hashCode() { + return Objects.hash(type, value); + } + + @Override + public String toString() { + return "PointerInitReset{" + "type='" + type + '\'' + ", value=" + value + '}'; + } + } } diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index 0e21104fb6426..946d7fe734deb 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -263,6 +263,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { // Settings for ingestion source IndexMetadata.INGESTION_SOURCE_TYPE_SETTING, IndexMetadata.INGESTION_SOURCE_POINTER_INIT_RESET_SETTING, + IndexMetadata.INGESTION_SOURCE_POINTER_INIT_RESET_VALUE_SETTING, IndexMetadata.INGESTION_SOURCE_PARAMS_SETTING, // validate that built-in similarities don't get redefined diff --git a/server/src/main/java/org/opensearch/index/IngestionShardConsumer.java b/server/src/main/java/org/opensearch/index/IngestionShardConsumer.java index 02a9f5a18ebb1..455833e936daf 100644 --- a/server/src/main/java/org/opensearch/index/IngestionShardConsumer.java +++ b/server/src/main/java/org/opensearch/index/IngestionShardConsumer.java @@ -82,6 +82,22 @@ public M getMessage() { */ IngestionShardPointer latestPointer(); + /** + * Returns an ingestion shard pointer based on the provided timestamp in milliseconds. + * + * @param timestampMillis the timestamp in milliseconds + * @return the ingestion shard pointer corresponding to the given timestamp + */ + IngestionShardPointer pointerFromTimestampMillis(String timestampMillis); + + /** + * Returns an ingestion shard pointer based on the provided offset. + * + * @param offset the offset value + * @return the ingestion shard pointer corresponding to the given offset + */ + IngestionShardPointer pointerFromOffset(String offset); + /** * @return the shard id */ diff --git a/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java b/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java index 58c6371d51c0a..b37281b9d1582 100644 --- a/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java @@ -65,7 +65,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -172,9 +171,7 @@ public void start() { logger.info("created ingestion consumer for shard [{}]", engineConfig.getShardId()); Map commitData = commitDataAsMap(); - StreamPoller.ResetState resetState = StreamPoller.ResetState.valueOf( - ingestionSource.getPointerInitReset().toUpperCase(Locale.ROOT) - ); + StreamPoller.ResetState resetState = ingestionSource.getPointerInitReset().getType(); IngestionShardPointer startPointer = null; Set persistedPointers = new HashSet<>(); if (commitData.containsKey(StreamPoller.BATCH_START)) { @@ -191,7 +188,8 @@ public void start() { resetState = StreamPoller.ResetState.NONE; } - streamPoller = new DefaultStreamPoller(startPointer, persistedPointers, ingestionShardConsumer, this, resetState); + String resetValue = ingestionSource.getPointerInitReset().getValue(); + streamPoller = new DefaultStreamPoller(startPointer, persistedPointers, ingestionShardConsumer, this, resetState, resetValue); streamPoller.start(); } diff --git a/server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java b/server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java index b5c1db999544a..4b87587196e4b 100644 --- a/server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java +++ b/server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java @@ -52,6 +52,7 @@ public class DefaultStreamPoller implements StreamPoller { private IngestionShardPointer batchStartPointer; private ResetState resetState; + private final String resetValue; private Set persistedPointers; @@ -68,14 +69,16 @@ public DefaultStreamPoller( Set persistedPointers, IngestionShardConsumer consumer, IngestionEngine ingestionEngine, - ResetState resetState + ResetState resetState, + String resetValue ) { this( startPointer, persistedPointers, consumer, new MessageProcessorRunnable(new ArrayBlockingQueue<>(100), ingestionEngine), - resetState + resetState, + resetValue ); } @@ -84,10 +87,12 @@ public DefaultStreamPoller( Set persistedPointers, IngestionShardConsumer consumer, MessageProcessorRunnable processorRunnable, - ResetState resetState + ResetState resetState, + String resetValue ) { this.consumer = Objects.requireNonNull(consumer); this.resetState = resetState; + this.resetValue = resetValue; batchStartPointer = startPointer; this.persistedPointers = persistedPointers; if (!this.persistedPointers.isEmpty()) { @@ -151,6 +156,18 @@ protected void startPoll() { batchStartPointer = consumer.latestPointer(); logger.info("Resetting offset by seeking to latest offset {}", batchStartPointer.asString()); break; + case REWIND_BY_OFFSET: + batchStartPointer = consumer.pointerFromOffset(resetValue); + logger.info("Resetting offset by seeking to offset {}", batchStartPointer.asString()); + break; + case REWIND_BY_TIMESTAMP: + batchStartPointer = consumer.pointerFromTimestampMillis(resetValue); + logger.info( + "Resetting offset by seeking to timestamp {}, corresponding offset {}", + resetValue, + batchStartPointer.asString() + ); + break; } resetState = ResetState.NONE; } diff --git a/server/src/main/java/org/opensearch/indices/pollingingest/StreamPoller.java b/server/src/main/java/org/opensearch/indices/pollingingest/StreamPoller.java index f674f6dc55c85..5010982991ecc 100644 --- a/server/src/main/java/org/opensearch/indices/pollingingest/StreamPoller.java +++ b/server/src/main/java/org/opensearch/indices/pollingingest/StreamPoller.java @@ -8,6 +8,7 @@ package org.opensearch.indices.pollingingest; +import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.index.IngestionShardPointer; import java.io.Closeable; @@ -63,9 +64,12 @@ enum State { /** * a reset state to indicate how to reset the pointer */ + @ExperimentalApi enum ResetState { EARLIEST, LATEST, + REWIND_BY_OFFSET, + REWIND_BY_TIMESTAMP, NONE, } } diff --git a/server/src/test/java/org/opensearch/cluster/metadata/IngestionSourceTests.java b/server/src/test/java/org/opensearch/cluster/metadata/IngestionSourceTests.java index f67d13e54e608..0afe67002517b 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/IngestionSourceTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/IngestionSourceTests.java @@ -8,6 +8,7 @@ package org.opensearch.cluster.metadata; +import org.opensearch.indices.pollingingest.StreamPoller; import org.opensearch.test.OpenSearchTestCase; import java.util.HashMap; @@ -15,53 +16,60 @@ public class IngestionSourceTests extends OpenSearchTestCase { + private final IngestionSource.PointerInitReset pointerInitReset = new IngestionSource.PointerInitReset( + StreamPoller.ResetState.REWIND_BY_OFFSET, + "1000" + ); + public void testConstructorAndGetters() { Map params = new HashMap<>(); params.put("key", "value"); - IngestionSource source = new IngestionSource("type", "pointerInitReset", params); + IngestionSource source = new IngestionSource("type", pointerInitReset, params); assertEquals("type", source.getType()); - assertEquals("pointerInitReset", source.getPointerInitReset()); + assertEquals(StreamPoller.ResetState.REWIND_BY_OFFSET, source.getPointerInitReset().getType()); + assertEquals("1000", source.getPointerInitReset().getValue()); assertEquals(params, source.params()); } public void testEquals() { Map params1 = new HashMap<>(); params1.put("key", "value"); - IngestionSource source1 = new IngestionSource("type", "pointerInitReset", params1); + IngestionSource source1 = new IngestionSource("type", pointerInitReset, params1); Map params2 = new HashMap<>(); params2.put("key", "value"); - IngestionSource source2 = new IngestionSource("type", "pointerInitReset", params2); + IngestionSource source2 = new IngestionSource("type", pointerInitReset, params2); assertTrue(source1.equals(source2)); assertTrue(source2.equals(source1)); - IngestionSource source3 = new IngestionSource("differentType", "pointerInitReset", params1); + IngestionSource source3 = new IngestionSource("differentType", pointerInitReset, params1); assertFalse(source1.equals(source3)); } public void testHashCode() { Map params1 = new HashMap<>(); params1.put("key", "value"); - IngestionSource source1 = new IngestionSource("type", "pointerInitReset", params1); + IngestionSource source1 = new IngestionSource("type", pointerInitReset, params1); Map params2 = new HashMap<>(); params2.put("key", "value"); - IngestionSource source2 = new IngestionSource("type", "pointerInitReset", params2); + IngestionSource source2 = new IngestionSource("type", pointerInitReset, params2); assertEquals(source1.hashCode(), source2.hashCode()); - IngestionSource source3 = new IngestionSource("differentType", "pointerInitReset", params1); + IngestionSource source3 = new IngestionSource("differentType", pointerInitReset, params1); assertNotEquals(source1.hashCode(), source3.hashCode()); } public void testToString() { Map params = new HashMap<>(); params.put("key", "value"); - IngestionSource source = new IngestionSource("type", "pointerInitReset", params); + IngestionSource source = new IngestionSource("type", pointerInitReset, params); - String expected = "IngestionSource{type='type',pointer_init_reset='pointerInitReset', params={key=value}}"; + String expected = + "IngestionSource{type='type',pointer_init_reset='PointerInitReset{type='REWIND_BY_OFFSET', value=1000}', params={key=value}}"; assertEquals(expected, source.toString()); } } diff --git a/server/src/test/java/org/opensearch/index/engine/FakeIngestionSource.java b/server/src/test/java/org/opensearch/index/engine/FakeIngestionSource.java index de03dcd313c29..dfed0240e7547 100644 --- a/server/src/test/java/org/opensearch/index/engine/FakeIngestionSource.java +++ b/server/src/test/java/org/opensearch/index/engine/FakeIngestionSource.java @@ -93,6 +93,16 @@ public FakeIngestionShardPointer latestPointer() { return new FakeIngestionShardPointer(messages.size()); } + @Override + public IngestionShardPointer pointerFromTimestampMillis(String timestampMillis) { + throw new UnsupportedOperationException("Not implemented yet."); + } + + @Override + public IngestionShardPointer pointerFromOffset(String offset) { + return new FakeIngestionShardPointer(Long.parseLong(offset)); + } + @Override public int getShardId() { return shardId; diff --git a/server/src/test/java/org/opensearch/indices/pollingingest/DefaultStreamPollerTests.java b/server/src/test/java/org/opensearch/indices/pollingingest/DefaultStreamPollerTests.java index 1a98f65d04f7c..3b7c799a98b89 100644 --- a/server/src/test/java/org/opensearch/indices/pollingingest/DefaultStreamPollerTests.java +++ b/server/src/test/java/org/opensearch/indices/pollingingest/DefaultStreamPollerTests.java @@ -52,7 +52,8 @@ public void setUp() throws Exception { persistedPointers, fakeConsumer, processorRunnable, - StreamPoller.ResetState.NONE + StreamPoller.ResetState.NONE, + "" ); } @@ -90,7 +91,8 @@ public void testSkipProcessed() throws InterruptedException { persistedPointers, fakeConsumer, processorRunnable, - StreamPoller.ResetState.NONE + StreamPoller.ResetState.NONE, + "" ); poller.start(); Thread.sleep(sleepTime); // Allow some time for the poller to run @@ -118,7 +120,8 @@ public void testResetStateEarliest() throws InterruptedException { persistedPointers, fakeConsumer, processorRunnable, - StreamPoller.ResetState.EARLIEST + StreamPoller.ResetState.EARLIEST, + "" ); poller.start(); @@ -134,7 +137,8 @@ public void testResetStateLatest() throws InterruptedException { persistedPointers, fakeConsumer, processorRunnable, - StreamPoller.ResetState.LATEST + StreamPoller.ResetState.LATEST, + "" ); poller.start(); @@ -145,6 +149,22 @@ public void testResetStateLatest() throws InterruptedException { assertEquals(new FakeIngestionSource.FakeIngestionShardPointer(2), poller.getBatchStartPointer()); } + public void testResetStateRewindByOffset() throws InterruptedException { + poller = new DefaultStreamPoller( + new FakeIngestionSource.FakeIngestionShardPointer(2), + persistedPointers, + fakeConsumer, + processorRunnable, + StreamPoller.ResetState.REWIND_BY_OFFSET, + "1" + ); + + poller.start(); + Thread.sleep(sleepTime); // Allow some time for the poller to run + // 1 message is processed + verify(processor, times(1)).process(any(), any()); + } + public void testStartPollWithoutStart() { try { poller.startPoll();