Skip to content

Commit

Permalink
add tests
Browse files Browse the repository at this point in the history
Signed-off-by: xuxiong1 <[email protected]>
  • Loading branch information
xuxiong1 committed Feb 14, 2025
1 parent 5097521 commit bbefff4
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,67 @@ public void testKafkaIngestion() {
}
}

public void testKafkaIngestion_RewindByTimeStamp() {
try {
setupKafka();
// create an index with ingestion source from kafka
createIndex(
"test_rewind_by_timestamp",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("ingestion_source.type", "kafka")
.put("ingestion_source.pointer.init.reset", "rewind_by_timestamp")
// 1739459500000 is the timestamp of the first message
// 1739459800000 is the timestamp of the second message
// by resetting to 1739459600000, only the second message will be ingested
.put("ingestion_source.pointer.init.reset.value", 1739459600000L)
.put("ingestion_source.param.topic", "test")
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
.build(),
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
);

RangeQueryBuilder query = new RangeQueryBuilder("age").gte(0);
await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
refresh("test_rewind_by_timestamp");
SearchResponse response = client().prepareSearch("test_rewind_by_timestamp").setQuery(query).get();
assertThat(response.getHits().getTotalHits().value(), is(1L));
});
} finally {
stopKafka();
}
}

public void testKafkaIngestion_RewindByOffset() {
try {
setupKafka();
// create an index with ingestion source from kafka
createIndex(
"test_rewind_by_offset",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("ingestion_source.type", "kafka")
.put("ingestion_source.pointer.init.reset", "rewind_by_offset")
.put("ingestion_source.pointer.init.reset.value", 1L)
.put("ingestion_source.param.topic", "test")
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
.build(),
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
);

RangeQueryBuilder query = new RangeQueryBuilder("age").gte(0);
await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
refresh("test_rewind_by_offset");
SearchResponse response = client().prepareSearch("test_rewind_by_offset").setQuery(query).get();
assertThat(response.getHits().getTotalHits().value(), is(1L));
});
} finally {
stopKafka();
}
}

private void setupKafka() {
kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
// disable topic auto creation
Expand All @@ -122,10 +183,14 @@ private void prepareKafkaData() {
Properties props = new Properties();
props.put("bootstrap.servers", kafka.getBootstrapServers());
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
producer.send(new ProducerRecord<>(topicName, "null", "{\"_id\":\"1\",\"_source\":{\"name\":\"bob\", \"age\": 24}}"));
producer.send(
new ProducerRecord<>(topicName, null, 1739459500000L, "null", "{\"_id\":\"1\",\"_source\":{\"name\":\"bob\", \"age\": 24}}")
);
producer.send(
new ProducerRecord<>(
topicName,
null,
1739459800000L,
"null",
"{\"_id\":\"2\", \"_op_type:\":\"index\",\"_source\":{\"name\":\"alice\", \"age\": 20}}"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeoutException;

Expand Down Expand Up @@ -143,11 +144,24 @@ public IngestionShardPointer latestPointer() {

@Override
public IngestionShardPointer pointerFromTimestampMillis(long timestampMillis) {
long offset = AccessController.doPrivileged(
(PrivilegedAction<Long>) () -> consumer.offsetsForTimes(Collections.singletonMap(topicPartition, timestampMillis))
.getOrDefault(topicPartition, new OffsetAndTimestamp(0L, timestampMillis))
.offset()
);
long offset = AccessController.doPrivileged((PrivilegedAction<Long>) () -> {
Map<TopicPartition, OffsetAndTimestamp> position = consumer.offsetsForTimes(
Collections.singletonMap(topicPartition, timestampMillis)
);
if (position == null || position.isEmpty()) {
return -1L;
}
OffsetAndTimestamp offsetAndTimestamp = position.values().iterator().next();
if (offsetAndTimestamp == null) {
return -1L;
}
return offsetAndTimestamp.offset();
});
if (offset < 0) {
// no message found for the timestamp, return the latest offset
logger.warn("No message found for timestamp {}, seeking to the latest", timestampMillis);
return latestPointer();
}
return new KafkaOffset(offset);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,22 @@ public void testLatestPointer() {
assertEquals(10L, offset.getOffset());
}

public void testPointerFromTimestampMillis() {
TopicPartition topicPartition = new TopicPartition("test-topic", 0);
when(mockConsumer.offsetsForTimes(Collections.singletonMap(topicPartition, 1000L))).thenReturn(
Collections.singletonMap(topicPartition, new org.apache.kafka.clients.consumer.OffsetAndTimestamp(5L, 1000L))
);

KafkaOffset offset = (KafkaOffset) consumer.pointerFromTimestampMillis(1000L);

assertEquals(5L, offset.getOffset());
}

public void testPointerFromOffset() {
KafkaOffset offset = new KafkaOffset(5L);
assertEquals(5L, offset.getOffset());
}

public void testTopicDoesNotExist() {
Map<String, Object> params = new HashMap<>();
params.put("topic", "non-existent-topic");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,16 @@

public class IngestionSourceTests extends OpenSearchTestCase {

private final IngestionSource.PointerInitReset pointerInitReset = new IngestionSource.PointerInitReset("pointerInitReset", 0);
private final IngestionSource.PointerInitReset pointerInitReset = new IngestionSource.PointerInitReset("pointerInitReset", 1000L);

public void testConstructorAndGetters() {
Map<String, Object> params = new HashMap<>();
params.put("key", "value");
IngestionSource source = new IngestionSource("type", pointerInitReset, params);

assertEquals("type", source.getType());
assertEquals("pointerInitReset", source.getPointerInitReset());
assertEquals("pointerInitReset", source.getPointerInitReset().getType());
assertEquals(1000L, source.getPointerInitReset().getValue());
assertEquals(params, source.params());
}

Expand Down Expand Up @@ -63,7 +64,7 @@ public void testToString() {
params.put("key", "value");
IngestionSource source = new IngestionSource("type", pointerInitReset, params);

String expected = "IngestionSource{type='type',pointer_init_reset='pointerInitReset', params={key=value}}";
String expected = "IngestionSource{type='type',pointer_init_reset='PointerInitReset{type='pointerInitReset', value=1000}', params={key=value}}";
assertEquals(expected, source.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public FakeIngestionShardPointer latestPointer() {

@Override
public IngestionShardPointer pointerFromTimestampMillis(long timestampMillis) {
return new FakeIngestionShardPointer(0); // Placeholder implementation
throw new UnsupportedOperationException("Not implemented yet.");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,22 @@ public void testResetStateLatest() throws InterruptedException {
assertEquals(new FakeIngestionSource.FakeIngestionShardPointer(2), poller.getBatchStartPointer());
}

public void testResetStateRewindByOffset() throws InterruptedException {
poller = new DefaultStreamPoller(
new FakeIngestionSource.FakeIngestionShardPointer(2),
persistedPointers,
fakeConsumer,
processorRunnable,
StreamPoller.ResetState.REWIND_BY_OFFSET,
1L
);

poller.start();
Thread.sleep(sleepTime); // Allow some time for the poller to run
// 1 message is processed
verify(processor, times(1)).process(any(), any());
}

public void testStartPollWithoutStart() {
try {
poller.startPoll();
Expand Down

0 comments on commit bbefff4

Please sign in to comment.