Skip to content

Commit

Permalink
initial commit
Browse files Browse the repository at this point in the history
Signed-off-by: xuxiong1 <[email protected]>
  • Loading branch information
xuxiong1 committed Feb 13, 2025
1 parent 302a3fd commit 2369e4f
Show file tree
Hide file tree
Showing 11 changed files with 173 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
Expand Down Expand Up @@ -140,6 +141,21 @@ public IngestionShardPointer latestPointer() {
return new KafkaOffset(endOffset);
}

/**
 * Resolves the shard pointer for the given timestamp by asking the Kafka consumer for the
 * offset associated with that timestamp on this consumer's topic partition.
 * <p>
 * When the lookup yields no entry for the partition, offset {@code 0} is used as the fallback.
 *
 * @param timestampMillis the timestamp in milliseconds to look up
 * @return a {@link KafkaOffset} for the resolved offset, or for offset 0 when none was found
 */
@Override
public IngestionShardPointer pointerFromTimestampMillis(long timestampMillis) {
    long offset = AccessController.doPrivileged((PrivilegedAction<Long>) () -> {
        // offsetsForTimes maps the partition to null (instead of omitting the key) when no
        // message matches the timestamp, so getOrDefault would hand back that null and the
        // subsequent offset() call would throw. Read the entry and null-check it explicitly.
        final OffsetAndTimestamp match = consumer.offsetsForTimes(Collections.singletonMap(topicPartition, timestampMillis))
            .get(topicPartition);
        return match == null ? 0L : match.offset();
    });
    return new KafkaOffset(offset);
}

/**
 * Wraps the given raw offset in a {@link KafkaOffset}.
 *
 * @param offset the offset value to wrap
 * @return the pointer representing {@code offset}
 */
@Override
public IngestionShardPointer pointerFromOffset(long offset) {
    final KafkaOffset pointer = new KafkaOffset(offset);
    return pointer;
}

private synchronized List<ReadResult<KafkaOffset, KafkaMessage>> fetch(long startOffset, long maxMessages, int timeoutMillis) {
if (lastFetchedOffset < 0 || lastFetchedOffset != startOffset - 1) {
logger.info("Seeking to offset {}", startOffset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -716,21 +716,59 @@ public void validate(final String value, final Map<Setting<?>, Object> settings)

@Override
public void validate(final String value) {
if (!(value.equalsIgnoreCase(StreamPoller.ResetState.LATEST.name())
|| value.equalsIgnoreCase(StreamPoller.ResetState.EARLIEST.name()))) {
if (!isValidResetState(value)) {
throw new IllegalArgumentException(
"Invalid value for " + SETTING_INGESTION_SOURCE_POINTER_INIT_RESET + " [" + value + "]"
);
}
}

@Override
public void validate(final String value, final Map<Setting<?>, Object> settings) {}
public void validate(final String value, final Map<Setting<?>, Object> settings) {
    // Only the rewind states depend on the companion reset value setting.
    if (!isRewindState(value)) {
        return;
    }
    final long configuredResetValue = (long) settings.get(INGESTION_SOURCE_POINTER_INIT_RESET_VALUE_SETTING);
    if (configuredResetValue > 0) {
        return;
    }
    // A rewind was requested without a usable reset value.
    throw new IllegalArgumentException(
        "Setting " + INGESTION_SOURCE_POINTER_INIT_RESET_VALUE_SETTING.getKey() + " should be set and greater than 0"
    );
}

/**
 * Checks, ignoring case, whether the value names one of the reset states accepted by this
 * validator: LATEST, EARLIEST, or one of the rewind states.
 */
private boolean isValidResetState(String value) {
    if (StreamPoller.ResetState.LATEST.name().equalsIgnoreCase(value)) {
        return true;
    }
    if (StreamPoller.ResetState.EARLIEST.name().equalsIgnoreCase(value)) {
        return true;
    }
    return isRewindState(value);
}

/**
 * Checks, ignoring case, whether the value names REWIND_BY_OFFSET or REWIND_BY_TIMESTAMP.
 */
private boolean isRewindState(String value) {
    final String byOffset = StreamPoller.ResetState.REWIND_BY_OFFSET.name();
    final String byTimestamp = StreamPoller.ResetState.REWIND_BY_TIMESTAMP.name();
    return byOffset.equalsIgnoreCase(value) || byTimestamp.equalsIgnoreCase(value);
}

/**
 * Lists the other settings this validator reads: only the pointer init reset value setting.
 */
@Override
public Iterator<Setting<?>> settings() {
    return Collections.<Setting<?>>singletonList(INGESTION_SOURCE_POINTER_INIT_RESET_VALUE_SETTING).iterator();
}
},
Property.IndexScope,
Property.Dynamic
);

/**
 * Defines the setting for the value to be used when resetting by offset or timestamp.
 * <p>
 * The validator of the pointer init reset setting requires this value to be greater than 0
 * whenever a rewind reset state is selected.
 */
public static final String SETTING_INGESTION_SOURCE_POINTER_INIT_RESET_VALUE = "index.ingestion_source.pointer.init.reset.value";
public static final Setting<Long> INGESTION_SOURCE_POINTER_INIT_RESET_VALUE_SETTING = Setting.longSetting(
SETTING_INGESTION_SOURCE_POINTER_INIT_RESET_VALUE,
0, // NOTE(review): presumably the default value — confirm against Setting.longSetting
0, // NOTE(review): presumably the minimum allowed value — confirm against Setting.longSetting
Property.IndexScope,
Property.Dynamic
);

public static final Setting.AffixSetting<Object> INGESTION_SOURCE_PARAMS_SETTING = Setting.prefixKeySetting(
"index.ingestion_source.param.",
key -> new Setting<>(key, "", (value) -> {
Expand Down Expand Up @@ -954,7 +992,12 @@ public Version getCreationVersion() {
public IngestionSource getIngestionSource() {
final String ingestionSourceType = INGESTION_SOURCE_TYPE_SETTING.get(settings);
if (ingestionSourceType != null && !(NONE_INGESTION_SOURCE_TYPE.equals(ingestionSourceType))) {
final String pointerInitReset = INGESTION_SOURCE_POINTER_INIT_RESET_SETTING.get(settings);
final String pointerInitResetType = INGESTION_SOURCE_POINTER_INIT_RESET_SETTING.get(settings);
final long pointerInitResetValue = INGESTION_SOURCE_POINTER_INIT_RESET_VALUE_SETTING.get(settings);
IngestionSource.PointerInitReset pointerInitReset = new IngestionSource.PointerInitReset(
pointerInitResetType,
pointerInitResetValue
);
final Map<String, Object> ingestionSourceParams = INGESTION_SOURCE_PARAMS_SETTING.getAsMap(settings);
return new IngestionSource(ingestionSourceType, pointerInitReset, ingestionSourceParams);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
@ExperimentalApi
public class IngestionSource {
private String type;
private String pointerInitReset;
private PointerInitReset pointerInitReset;
private Map<String, Object> params;

public IngestionSource(String type, String pointerInitReset, Map<String, Object> params) {
public IngestionSource(String type, PointerInitReset pointerInitReset, Map<String, Object> params) {
this.type = type;
this.pointerInitReset = pointerInitReset;
this.params = params;
Expand All @@ -32,7 +32,7 @@ public String getType() {
return type;
}

public String getPointerInitReset() {
public PointerInitReset getPointerInitReset() {
return pointerInitReset;
}

Expand All @@ -59,4 +59,41 @@ public int hashCode() {
public String toString() {
return "IngestionSource{" + "type='" + type + '\'' + ",pointer_init_reset='" + pointerInitReset + '\'' + ", params=" + params + '}';
}

@ExperimentalApi
/**
 * Immutable pairing of a pointer-init reset type with its accompanying numeric value.
 */
public static class PointerInitReset {
    private final String type;
    private final long value;

    /**
     * @param policy the reset type, stored as-is and exposed through {@link #getType()}
     * @param value  the numeric value that accompanies the reset type
     */
    public PointerInitReset(String policy, long value) {
        this.type = policy;
        this.value = value;
    }

    /** @return the reset type passed to the constructor */
    public String getType() {
        return type;
    }

    /** @return the numeric value passed to the constructor */
    public long getValue() {
        return value;
    }

    /** Two instances are equal when they share the same class, type and value. */
    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o == null || o.getClass() != getClass()) {
            return false;
        }
        final PointerInitReset that = (PointerInitReset) o;
        return value == that.value && Objects.equals(type, that.type);
    }

    @Override
    public int hashCode() {
        return Objects.hash(type, value);
    }

    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("PointerInitReset{type='");
        text.append(type).append("', value=").append(value).append('}');
        return text.toString();
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
// Settings for ingestion source
IndexMetadata.INGESTION_SOURCE_TYPE_SETTING,
IndexMetadata.INGESTION_SOURCE_POINTER_INIT_RESET_SETTING,
IndexMetadata.INGESTION_SOURCE_POINTER_INIT_RESET_VALUE_SETTING,
IndexMetadata.INGESTION_SOURCE_PARAMS_SETTING,

// validate that built-in similarities don't get redefined
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,22 @@ public M getMessage() {
*/
IngestionShardPointer latestPointer();

/**
* Returns an ingestion shard pointer based on the provided timestamp in milliseconds.
*
* @param timestampMillis the timestamp in milliseconds
* @return the ingestion shard pointer corresponding to the given timestamp
*/
IngestionShardPointer pointerFromTimestampMillis(long timestampMillis);

/**
* Returns an ingestion shard pointer based on the provided offset.
*
* @param offset the offset value
* @return the ingestion shard pointer corresponding to the given offset
*/
IngestionShardPointer pointerFromOffset(long offset);

/**
* @return the shard id
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public void start() {

Map<String, String> commitData = commitDataAsMap();
StreamPoller.ResetState resetState = StreamPoller.ResetState.valueOf(
ingestionSource.getPointerInitReset().toUpperCase(Locale.ROOT)
ingestionSource.getPointerInitReset().getType().toUpperCase(Locale.ROOT)
);
IngestionShardPointer startPointer = null;
Set<IngestionShardPointer> persistedPointers = new HashSet<>();
Expand All @@ -191,7 +191,8 @@ public void start() {
resetState = StreamPoller.ResetState.NONE;
}

streamPoller = new DefaultStreamPoller(startPointer, persistedPointers, ingestionShardConsumer, this, resetState);
long resetValue = ingestionSource.getPointerInitReset().getValue();
streamPoller = new DefaultStreamPoller(startPointer, persistedPointers, ingestionShardConsumer, this, resetState, resetValue);
streamPoller.start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class DefaultStreamPoller implements StreamPoller {
private IngestionShardPointer batchStartPointer;

private ResetState resetState;
private final long resetValue;

private Set<IngestionShardPointer> persistedPointers;

Expand All @@ -68,14 +69,16 @@ public DefaultStreamPoller(
Set<IngestionShardPointer> persistedPointers,
IngestionShardConsumer consumer,
IngestionEngine ingestionEngine,
ResetState resetState
ResetState resetState,
long resetValue
) {
this(
startPointer,
persistedPointers,
consumer,
new MessageProcessorRunnable(new ArrayBlockingQueue<>(100), ingestionEngine),
resetState
resetState,
resetValue
);
}

Expand All @@ -84,10 +87,12 @@ public DefaultStreamPoller(
Set<IngestionShardPointer> persistedPointers,
IngestionShardConsumer consumer,
MessageProcessorRunnable processorRunnable,
ResetState resetState
ResetState resetState,
long resetValue
) {
this.consumer = Objects.requireNonNull(consumer);
this.resetState = resetState;
this.resetValue = resetValue;
batchStartPointer = startPointer;
this.persistedPointers = persistedPointers;
if (!this.persistedPointers.isEmpty()) {
Expand Down Expand Up @@ -151,6 +156,18 @@ protected void startPoll() {
batchStartPointer = consumer.latestPointer();
logger.info("Resetting offset by seeking to latest offset {}", batchStartPointer.asString());
break;
case REWIND_BY_OFFSET:
batchStartPointer = consumer.pointerFromOffset(resetValue);
logger.info("Resetting offset by seeking to offset {}", batchStartPointer.asString());
break;
case REWIND_BY_TIMESTAMP:
batchStartPointer = consumer.pointerFromTimestampMillis(resetValue);
logger.info(
"Resetting offset by seeking to timestamp {}, corresponding offset {}",
resetValue,
batchStartPointer.asString()
);
break;
}
resetState = ResetState.NONE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ enum State {
/**
 * Reset policies for the pointer a poller starts from. The pointer init reset setting value is
 * upper-cased and resolved to one of these constants with {@code valueOf}.
 */
enum ResetState {
// NOTE(review): presumably seeks to the earliest available pointer — handling is not visible here; confirm
EARLIEST,
// DefaultStreamPoller starts from consumer.latestPointer()
LATEST,
// DefaultStreamPoller starts from consumer.pointerFromOffset(resetValue)
REWIND_BY_OFFSET,
// DefaultStreamPoller starts from consumer.pointerFromTimestampMillis(resetValue)
REWIND_BY_TIMESTAMP,
// No reset pending; DefaultStreamPoller assigns this after a reset has been handled
NONE,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@

public class IngestionSourceTests extends OpenSearchTestCase {

private final IngestionSource.PointerInitReset pointerInitReset = new IngestionSource.PointerInitReset("pointerInitReset", 0);

public void testConstructorAndGetters() {
Map<String, Object> params = new HashMap<>();
params.put("key", "value");
IngestionSource source = new IngestionSource("type", "pointerInitReset", params);
IngestionSource source = new IngestionSource("type", pointerInitReset, params);

assertEquals("type", source.getType());
assertEquals("pointerInitReset", source.getPointerInitReset());
Expand All @@ -28,38 +30,38 @@ public void testConstructorAndGetters() {
public void testEquals() {
Map<String, Object> params1 = new HashMap<>();
params1.put("key", "value");
IngestionSource source1 = new IngestionSource("type", "pointerInitReset", params1);
IngestionSource source1 = new IngestionSource("type", pointerInitReset, params1);

Map<String, Object> params2 = new HashMap<>();
params2.put("key", "value");
IngestionSource source2 = new IngestionSource("type", "pointerInitReset", params2);
IngestionSource source2 = new IngestionSource("type", pointerInitReset, params2);

assertTrue(source1.equals(source2));
assertTrue(source2.equals(source1));

IngestionSource source3 = new IngestionSource("differentType", "pointerInitReset", params1);
IngestionSource source3 = new IngestionSource("differentType", pointerInitReset, params1);
assertFalse(source1.equals(source3));
}

public void testHashCode() {
Map<String, Object> params1 = new HashMap<>();
params1.put("key", "value");
IngestionSource source1 = new IngestionSource("type", "pointerInitReset", params1);
IngestionSource source1 = new IngestionSource("type", pointerInitReset, params1);

Map<String, Object> params2 = new HashMap<>();
params2.put("key", "value");
IngestionSource source2 = new IngestionSource("type", "pointerInitReset", params2);
IngestionSource source2 = new IngestionSource("type", pointerInitReset, params2);

assertEquals(source1.hashCode(), source2.hashCode());

IngestionSource source3 = new IngestionSource("differentType", "pointerInitReset", params1);
IngestionSource source3 = new IngestionSource("differentType", pointerInitReset, params1);
assertNotEquals(source1.hashCode(), source3.hashCode());
}

public void testToString() {
Map<String, Object> params = new HashMap<>();
params.put("key", "value");
IngestionSource source = new IngestionSource("type", "pointerInitReset", params);
IngestionSource source = new IngestionSource("type", pointerInitReset, params);

String expected = "IngestionSource{type='type',pointer_init_reset='pointerInitReset', params={key=value}}";
assertEquals(expected, source.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,16 @@ public FakeIngestionShardPointer latestPointer() {
return new FakeIngestionShardPointer(messages.size());
}

/**
 * Placeholder for tests: ignores the timestamp and always returns the pointer built from 0.
 */
@Override
public IngestionShardPointer pointerFromTimestampMillis(long timestampMillis) {
    final FakeIngestionShardPointer placeholder = new FakeIngestionShardPointer(0);
    return placeholder;
}

/**
 * Wraps the given offset in a {@link FakeIngestionShardPointer}.
 */
@Override
public IngestionShardPointer pointerFromOffset(long offset) {
    final FakeIngestionShardPointer pointer = new FakeIngestionShardPointer(offset);
    return pointer;
}

@Override
public int getShardId() {
return shardId;
Expand Down
Loading

0 comments on commit 2369e4f

Please sign in to comment.