From 34a9a0b9ab9eb6d3230d83553e0c5551de0ebbd7 Mon Sep 17 00:00:00 2001
From: xuxiong1
Date: Thu, 13 Feb 2025 23:32:35 +0800
Subject: [PATCH] add tests

---
 .../plugin/kafka/IngestFromKafkaIT.java       | 67 ++++++++++++++++++-
 .../plugin/kafka/KafkaPartitionConsumer.java  | 24 +++++--
 .../kafka/KafkaPartitionConsumerTests.java    | 16 +++++
 .../metadata/IngestionSourceTests.java        |  7 +-
 .../index/engine/FakeIngestionSource.java     |  2 +-
 .../DefaultStreamPollerTests.java             | 16 +++++
 6 files changed, 122 insertions(+), 10 deletions(-)

diff --git a/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java
index e7d8e36acb302..e668f97ce70d6 100644
--- a/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java
+++ b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java
@@ -102,6 +102,67 @@ public void testKafkaIngestion() {
         }
     }
 
+    public void testKafkaIngestion_RewindByTimeStamp() {
+        try {
+            setupKafka();
+            // create an index with ingestion source from kafka
+            createIndex(
+                "test_rewind_by_timestamp",
+                Settings.builder()
+                    .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+                    .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
+                    .put("ingestion_source.type", "kafka")
+                    .put("ingestion_source.pointer.init.reset", "rewind_by_timestamp")
+                    // 1739459500000 is the timestamp of the first message
+                    // 1739459800000 is the timestamp of the second message
+                    // by resetting to 1739459600000, only the second message will be ingested
+                    .put("ingestion_source.pointer.init.reset.value", 1739459600000L)
+                    .put("ingestion_source.param.topic", "test")
+                    .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
+                    .build(),
+                "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}"
+            );
+
+            RangeQueryBuilder query = new RangeQueryBuilder("age").gte(0);
+            await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+                refresh("test_rewind_by_timestamp");
+                SearchResponse response = client().prepareSearch("test_rewind_by_timestamp").setQuery(query).get();
+                assertThat(response.getHits().getTotalHits().value(), is(1L));
+            });
+        } finally {
+            stopKafka();
+        }
+    }
+
+    public void testKafkaIngestion_RewindByOffset() {
+        try {
+            setupKafka();
+            // create an index with ingestion source from kafka
+            createIndex(
+                "test_rewind_by_offset",
+                Settings.builder()
+                    .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+                    .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
+                    .put("ingestion_source.type", "kafka")
+                    .put("ingestion_source.pointer.init.reset", "rewind_by_offset")
+                    // the producer writes two messages at offsets 0 and 1; by resetting to
+                    // offset 1, only the second message will be ingested
+                    .put("ingestion_source.pointer.init.reset.value", 1L)
+                    .put("ingestion_source.param.topic", "test")
+                    .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
+                    .build(),
+                "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}"
+            );
+
+            RangeQueryBuilder query = new RangeQueryBuilder("age").gte(0);
+            await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+                refresh("test_rewind_by_offset");
+                SearchResponse response = client().prepareSearch("test_rewind_by_offset").setQuery(query).get();
+                assertThat(response.getHits().getTotalHits().value(), is(1L));
+            });
+        } finally {
+            stopKafka();
+        }
+    }
+
     private void setupKafka() {
         kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
             // disable topic auto creation
@@ -122,10 +183,14 @@ private void prepareKafkaData() {
         Properties props = new Properties();
         props.put("bootstrap.servers", kafka.getBootstrapServers());
         Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
-        producer.send(new ProducerRecord<>(topicName, "null", "{\"_id\":\"1\",\"_source\":{\"name\":\"bob\", \"age\": 24}}"));
+        producer.send(
+            new ProducerRecord<>(topicName, null, 1739459500000L, "null", "{\"_id\":\"1\",\"_source\":{\"name\":\"bob\", \"age\": 24}}")
+        );
         producer.send(
             new ProducerRecord<>(
                 topicName,
+                null,
+                1739459800000L,
                 "null",
                 "{\"_id\":\"2\", \"_op_type\":\"index\",\"_source\":{\"name\":\"alice\", \"age\": 20}}"
             )
diff --git a/plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java b/plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java
index 462b120e88704..836824c193e03 100644
--- a/plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java
+++ b/plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java
@@ -28,6 +28,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeoutException;
 
@@ -143,11 +144,24 @@ public IngestionShardPointer latestPointer() {
 
     @Override
     public IngestionShardPointer pointerFromTimestampMillis(long timestampMillis) {
-        long offset = AccessController.doPrivileged(
-            (PrivilegedAction<Long>) () -> consumer.offsetsForTimes(Collections.singletonMap(topicPartition, timestampMillis))
-                .getOrDefault(topicPartition, new OffsetAndTimestamp(0L, timestampMillis))
-                .offset()
-        );
+        long offset = AccessController.doPrivileged((PrivilegedAction<Long>) () -> {
+            Map<TopicPartition, OffsetAndTimestamp> position = consumer.offsetsForTimes(
+                Collections.singletonMap(topicPartition, timestampMillis)
+            );
+            if (position == null || position.isEmpty()) {
+                return -1L;
+            }
+            OffsetAndTimestamp offsetAndTimestamp = position.values().iterator().next();
+            if (offsetAndTimestamp == null) {
+                return -1L;
+            }
+            return offsetAndTimestamp.offset();
+        });
+        if (offset < 0) {
+            // no message found at or after the timestamp; fall back to the latest pointer
+            logger.warn("No message found for timestamp {}, seeking to the latest", timestampMillis);
+            return latestPointer();
+        }
         return new KafkaOffset(offset);
     }
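The pointerFromTimestampMillis change above implements a timestamp-to-offset lookup with a fallback to the latest offset when Kafka has no message at or after the requested timestamp. For reference, a minimal standalone sketch of the same lookup-and-fallback pattern against the plain Kafka consumer API is shown below; it is an illustration only, and the broker address, topic name, and class name are assumptions rather than part of this patch:

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class TimestampSeekSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address

        try (KafkaConsumer<String, String> consumer =
            new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer())) {
            TopicPartition tp = new TopicPartition("test", 0); // topic name taken from the IT above
            consumer.assign(Collections.singletonList(tp));

            long timestampMillis = 1739459600000L; // the rewind timestamp used in the IT above
            // offsetsForTimes returns, per partition, the earliest offset whose timestamp
            // is >= the requested one; the entry is null when no such message exists.
            Map<TopicPartition, OffsetAndTimestamp> result =
                consumer.offsetsForTimes(Collections.singletonMap(tp, timestampMillis));
            OffsetAndTimestamp match = result == null ? null : result.get(tp);
            if (match == null) {
                // same fallback as the patched pointerFromTimestampMillis: seek to the latest
                consumer.seekToEnd(Collections.singletonList(tp));
            } else {
                consumer.seek(tp, match.offset());
            }
        }
    }
}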
diff --git a/plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaPartitionConsumerTests.java b/plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaPartitionConsumerTests.java
index 96f639366d887..8883086f48889 100644
--- a/plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaPartitionConsumerTests.java
+++ b/plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaPartitionConsumerTests.java
@@ -90,6 +90,22 @@ public void testLatestPointer() {
         assertEquals(10L, offset.getOffset());
     }
 
+    public void testPointerFromTimestampMillis() {
+        TopicPartition topicPartition = new TopicPartition("test-topic", 0);
+        when(mockConsumer.offsetsForTimes(Collections.singletonMap(topicPartition, 1000L))).thenReturn(
+            Collections.singletonMap(topicPartition, new org.apache.kafka.clients.consumer.OffsetAndTimestamp(5L, 1000L))
+        );
+
+        KafkaOffset offset = (KafkaOffset) consumer.pointerFromTimestampMillis(1000L);
+
+        assertEquals(5L, offset.getOffset());
+    }
+
+    public void testPointerFromOffset() {
+        KafkaOffset offset = new KafkaOffset(5L);
+        assertEquals(5L, offset.getOffset());
+    }
+
     public void testTopicDoesNotExist() {
         Map<String, Object> params = new HashMap<>();
         params.put("topic", "non-existent-topic");
diff --git a/server/src/test/java/org/opensearch/cluster/metadata/IngestionSourceTests.java b/server/src/test/java/org/opensearch/cluster/metadata/IngestionSourceTests.java
index 4d0e05e9f63a1..96c49cf92e883 100644
--- a/server/src/test/java/org/opensearch/cluster/metadata/IngestionSourceTests.java
+++ b/server/src/test/java/org/opensearch/cluster/metadata/IngestionSourceTests.java
@@ -15,7 +15,7 @@
 
 public class IngestionSourceTests extends OpenSearchTestCase {
 
-    private final IngestionSource.PointerInitReset pointerInitReset = new IngestionSource.PointerInitReset("pointerInitReset", 0);
+    private final IngestionSource.PointerInitReset pointerInitReset = new IngestionSource.PointerInitReset("pointerInitReset", 1000L);
 
     public void testConstructorAndGetters() {
         Map<String, Object> params = new HashMap<>();
@@ -23,7 +23,8 @@ public void testConstructorAndGetters() {
         IngestionSource source = new IngestionSource("type", pointerInitReset, params);
 
         assertEquals("type", source.getType());
-        assertEquals("pointerInitReset", source.getPointerInitReset());
+        assertEquals("pointerInitReset", source.getPointerInitReset().getType());
+        assertEquals(1000L, source.getPointerInitReset().getValue());
         assertEquals(params, source.params());
     }
 
@@ -63,7 +64,7 @@ public void testToString() {
         params.put("key", "value");
         IngestionSource source = new IngestionSource("type", pointerInitReset, params);
 
-        String expected = "IngestionSource{type='type',pointer_init_reset='pointerInitReset', params={key=value}}";
+        String expected = "IngestionSource{type='type',pointer_init_reset='PointerInitReset{type='pointerInitReset', value=1000}', params={key=value}}";
         assertEquals(expected, source.toString());
     }
 }
diff --git a/server/src/test/java/org/opensearch/index/engine/FakeIngestionSource.java b/server/src/test/java/org/opensearch/index/engine/FakeIngestionSource.java
index 5372824d0e3c6..fc74c5feb7d51 100644
--- a/server/src/test/java/org/opensearch/index/engine/FakeIngestionSource.java
+++ b/server/src/test/java/org/opensearch/index/engine/FakeIngestionSource.java
@@ -95,7 +95,7 @@ public FakeIngestionShardPointer latestPointer() {
 
     @Override
     public IngestionShardPointer pointerFromTimestampMillis(long timestampMillis) {
-        return new FakeIngestionShardPointer(0); // Placeholder implementation
+        throw new UnsupportedOperationException("Not implemented yet.");
    }
 
     @Override
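The IngestionSourceTests changes above pin down the accessors and the string rendering of the pointer-initialization holder: getType(), getValue(), and a PointerInitReset{type='...', value=...} format. A minimal sketch of a holder consistent with those assertions follows; the field names and the long value type are inferred from the test, not copied from the server source:

// Sketch only: shaped to satisfy the assertions in IngestionSourceTests above;
// the real IngestionSource.PointerInitReset may differ in visibility and detail.
public class PointerInitReset {
    private final String type;  // e.g. "rewind_by_timestamp" or "rewind_by_offset"
    private final long value;   // a timestamp in millis or an offset, depending on type

    public PointerInitReset(String type, long value) {
        this.type = type;
        this.value = value;
    }

    public String getType() {
        return type;
    }

    public long getValue() {
        return value;
    }

    @Override
    public String toString() {
        // matches the expected string in testToString: PointerInitReset{type='pointerInitReset', value=1000}
        return "PointerInitReset{type='" + type + "', value=" + value + "}";
    }
}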
diff --git a/server/src/test/java/org/opensearch/indices/pollingingest/DefaultStreamPollerTests.java b/server/src/test/java/org/opensearch/indices/pollingingest/DefaultStreamPollerTests.java
index bb85841772d52..d3896b6d6390f 100644
--- a/server/src/test/java/org/opensearch/indices/pollingingest/DefaultStreamPollerTests.java
+++ b/server/src/test/java/org/opensearch/indices/pollingingest/DefaultStreamPollerTests.java
@@ -149,6 +149,22 @@ public void testResetStateLatest() throws InterruptedException {
         assertEquals(new FakeIngestionSource.FakeIngestionShardPointer(2), poller.getBatchStartPointer());
     }
 
+    public void testResetStateRewindByOffset() throws InterruptedException {
+        poller = new DefaultStreamPoller(
+            new FakeIngestionSource.FakeIngestionShardPointer(2),
+            persistedPointers,
+            fakeConsumer,
+            processorRunnable,
+            StreamPoller.ResetState.REWIND_BY_OFFSET,
+            1L
+        );
+
+        poller.start();
+        Thread.sleep(sleepTime); // Allow some time for the poller to run
+        // 1 message is processed
+        verify(processor, times(1)).process(any(), any());
+    }
+
     public void testStartPollWithoutStart() {
         try {
             poller.startPoll();
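testResetStateRewindByOffset above constructs DefaultStreamPoller with ResetState.REWIND_BY_OFFSET and a reset value of 1L and expects exactly one processed message, consistent with the fake source holding messages at offsets 0 and 1. A heavily simplified, hypothetical sketch of the dispatch such a poller might perform when resolving its starting pointer follows; only the REWIND_BY_TIMESTAMP delegation to pointerFromTimestampMillis is established by this patch, while resolveStartPointer, pointerFromOffset, and the remaining cases are illustrative assumptions:

// Hypothetical sketch, not DefaultStreamPoller's actual code: the field names
// (resetState, resetValue, consumer, batchStartPointer) and the pointerFromOffset
// helper are assumptions for illustration.
private IngestionShardPointer resolveStartPointer() {
    switch (resetState) {
        case LATEST:
            // matches testResetStateLatest: start from the consumer's latest pointer
            return consumer.latestPointer();
        case REWIND_BY_TIMESTAMP:
            // delegates to the consumer, e.g. KafkaPartitionConsumer.pointerFromTimestampMillis
            return consumer.pointerFromTimestampMillis(resetValue);
        case REWIND_BY_OFFSET:
            // for Kafka this amounts to wrapping the configured offset, cf. testPointerFromOffset
            return pointerFromOffset(resetValue); // assumed helper
        default:
            // no reset requested: keep the pointer the poller was constructed with
            return batchStartPointer;
    }
}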