Skip to content

Commit

Permalink
Fix indexOutOfBoundException
Browse files Browse the repository at this point in the history
  • Loading branch information
sudeshwasnik committed Dec 21, 2024
1 parent ecc4fe4 commit 917ef41
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -654,10 +654,10 @@ public InsertRowsResponse get() throws Throwable {
"Invoking insertRows API for channel:{}, streamingBuffer:{}",
this.channel.getFullyQualifiedName(),
this.insertRowsStreamingBuffer);
Pair<List<Map<String, Object>>, List<Long>> recordsAndOffsets =
Pair<List<Map<String, Object>>, List<SinkRecord>> recordsAndOriginalSinkRecords =
this.insertRowsStreamingBuffer.getData();
List<Map<String, Object>> records = recordsAndOffsets.getKey();
List<Long> offsets = recordsAndOffsets.getValue();
List<Map<String, Object>> records = recordsAndOriginalSinkRecords.getKey();
List<SinkRecord> originalSinkRecords = recordsAndOriginalSinkRecords.getValue();
InsertValidationResponse finalResponse = new InsertValidationResponse();
boolean needToResetOffset = false;
if (!enableSchemaEvolution) {
Expand All @@ -671,14 +671,18 @@ public InsertRowsResponse get() throws Throwable {
// For schema evolution, we need to call the insertRows API row by row in order to
// preserve the original order, for anything after the first schema mismatch error we will
// retry after the evolution
InsertValidationResponse response =
this.channel.insertRow(records.get(idx), Long.toString(offsets.get(idx)));
SinkRecord originalSinkRecord = originalSinkRecords.get(idx);
InsertValidationResponse response = this.channel.insertRow(
records.get(idx), Long.toString(originalSinkRecord.kafkaOffset())
);
if (response.hasErrors()) {
InsertValidationResponse.InsertError insertError = response.getInsertErrors().get(0);
List<String> extraColNames = insertError.getExtraColNames();
List<String> nonNullableColumns = insertError.getMissingNotNullColNames();
long originalSinkRecordIdx =
offsets.get(idx) - this.insertRowsStreamingBuffer.getFirstOffset();

// TODO : originalSinkRecordIdx can be replaced by idx
long originalSinkRecordIdx = originalSinkRecord.kafkaOffset()
- this.insertRowsStreamingBuffer.getFirstOffset();
if (extraColNames == null && nonNullableColumns == null) {
InsertValidationResponse.InsertError newInsertError =
new InsertValidationResponse.InsertError(
Expand All @@ -694,7 +698,8 @@ public InsertRowsResponse get() throws Throwable {
this.channel.getTableName(),
nonNullableColumns,
extraColNames,
this.insertRowsStreamingBuffer.getSinkRecord(originalSinkRecordIdx));
originalSinkRecord
);
// Offset reset needed since it's possible that we successfully ingested partial batch
needToResetOffset = true;
break;
Expand Down Expand Up @@ -1251,7 +1256,7 @@ protected long getApproxSizeOfRecordInBytes(SinkRecord kafkaSinkRecord) {
*/
@VisibleForTesting
protected class StreamingBuffer
extends PartitionBuffer<Pair<List<Map<String, Object>>, List<Long>>> {
extends PartitionBuffer<Pair<List<Map<String, Object>>, List<SinkRecord>>> {
// Records coming from Kafka
private final List<SinkRecord> sinkRecords;

Expand Down Expand Up @@ -1285,9 +1290,9 @@ public void insert(SinkRecord kafkaSinkRecord) {
* @return A pair that contains the records and their corresponding offsets
*/
@Override
public Pair<List<Map<String, Object>>, List<Long>> getData() {
public Pair<List<Map<String, Object>>, List<SinkRecord>> getData() {
final List<Map<String, Object>> records = new ArrayList<>();
final List<Long> offsets = new ArrayList<>();
final List<SinkRecord> filteredOriginalSinkRecords = new ArrayList<>();
for (SinkRecord kafkaSinkRecord : sinkRecords) {
SinkRecord snowflakeRecord = getSnowflakeSinkRecordFromKafkaRecord(kafkaSinkRecord);

Expand All @@ -1313,7 +1318,7 @@ public Pair<List<Map<String, Object>>, List<Long>> getData() {
Map<String, Object> tableRow =
recordService.getProcessedRecordForStreamingIngest(snowflakeRecord);
records.add(tableRow);
offsets.add(snowflakeRecord.kafkaOffset());
filteredOriginalSinkRecords.add(kafkaSinkRecord);
} catch (JsonProcessingException e) {
LOGGER.warn(
"Record has JsonProcessingException offset:{}, topic:{}",
Expand All @@ -1329,7 +1334,7 @@ public Pair<List<Map<String, Object>>, List<Long>> getData() {
getBufferSizeBytes(),
getFirstOffset(),
getLastOffset());
return new Pair<>(records, offsets);
return new Pair<>(records, filteredOriginalSinkRecords);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -649,17 +649,41 @@ public static List<SinkRecord> createNativeJsonSinkRecords(
final long startOffset,
final long noOfRecords,
final String topicName,
final int partitionNo) {
ArrayList<SinkRecord> records = new ArrayList<>();
final int partitionNo
) {
return createJsonRecords(
startOffset, noOfRecords, topicName, partitionNo,
TestUtils.JSON_WITH_SCHEMA.getBytes(StandardCharsets.UTF_8),
Collections.singletonMap("schemas.enable", Boolean.toString(true))
);
}

/* Generate (noOfRecords - startOffset) blank records for a given topic and partition. */
public static List<SinkRecord> createBlankJsonSinkRecords(
final long startOffset,
final long noOfRecords,
final String topicName,
final int partitionNo
) {
return createJsonRecords(
startOffset, noOfRecords, topicName, partitionNo, null,
Collections.singletonMap("schemas.enable", Boolean.toString(false))
);
}

private static List<SinkRecord> createJsonRecords(
final long startOffset,
final long noOfRecords,
final String topicName,
final int partitionNo,
byte[] value,
Map<String, String> converterConfig
) {
JsonConverter converter = new JsonConverter();
HashMap<String, String> converterConfig = new HashMap<>();
converterConfig.put("schemas.enable", "true");
converter.configure(converterConfig, false);
SchemaAndValue schemaInputValue =
converter.toConnectData(
"test", TestUtils.JSON_WITH_SCHEMA.getBytes(StandardCharsets.UTF_8));
SchemaAndValue schemaInputValue = converter.toConnectData("test", value);

ArrayList<SinkRecord> records = new ArrayList<>();
for (long i = startOffset; i < startOffset + noOfRecords; ++i) {
records.add(
new SinkRecord(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,30 @@ public void testChannelMigrateOffsetTokenSystemFunction_NullOffsetTokenInFormatV
service.closeAll();
}

@Test
public void testInsertRowsWithGaps_schematization_largeBufferSize_largeGap() throws Exception {
SnowflakeSinkService service = setupSnowflakeService(true, 4);

// insert blank records that do not evolve schema: 0, 1
List<SinkRecord> records = TestUtils.createBlankJsonSinkRecords(0, 2, topic, PARTITION);

// add records with change in schema with extreme gap in offsets.
records.addAll(TestUtils.createNativeJsonSinkRecords(300, 2, topic, PARTITION));

// records' offsets -> [0, 1, 300, 301]
service.insert(records);
// With schematization, we need to resend a new batch should succeed even if there is an offset
// gap from the previous committed offset
service.insert(records);

TestUtils.assertWithRetry(
() -> service.getOffset(new TopicPartition(topic, PARTITION)) == 302, 10, 10);

assert TestUtils.tableSize(testTableName) == 4
: "expected: " + 4 + " actual: " + TestUtils.tableSize(testTableName);
service.closeAll();
}

@Test
public void testInsertRowsWithGaps_schematization() throws Exception {
testInsertRowsWithGaps(true);
Expand All @@ -720,41 +744,10 @@ public void testInsertRowsWithGaps_nonSchematization() throws Exception {
}

private void testInsertRowsWithGaps(boolean withSchematization) throws Exception {
// setup
Map<String, String> config = TestUtils.getConfForStreaming();
SnowflakeSinkConnectorConfig.setDefaultValues(config);
config.put(
SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG,
Boolean.toString(withSchematization));

// create tpChannel
SnowflakeSinkService service =
SnowflakeSinkServiceFactory.builder(conn, IngestionMethodConfig.SNOWPIPE_STREAMING, config)
.setRecordNumber(1)
.setErrorReporter(new InMemoryKafkaRecordErrorReporter())
.setSinkTaskContext(new InMemorySinkTaskContext(Collections.singleton(topicPartition)))
.addTask(testTableName, topicPartition)
.build();
SnowflakeSinkService service = setupSnowflakeService(withSchematization, 1);

// insert blank records that do not evolve schema: 0, 1
JsonConverter converter = new JsonConverter();
HashMap<String, String> converterConfig = new HashMap<>();
converterConfig.put("schemas.enable", "false");
converter.configure(converterConfig, false);
SchemaAndValue schemaInputValue = converter.toConnectData("test", null);
List<SinkRecord> blankRecords = new ArrayList<>();
for (int i = 0; i < 2; i++) {
blankRecords.add(
new SinkRecord(
topic,
PARTITION,
Schema.STRING_SCHEMA,
"test",
schemaInputValue.schema(),
schemaInputValue.value(),
i));
}

List<SinkRecord> blankRecords = TestUtils.createBlankJsonSinkRecords(0, 2, topic, PARTITION);
service.insert(blankRecords);
TestUtils.assertWithRetry(
() -> service.getOffset(new TopicPartition(topic, PARTITION)) == 2, 20, 5);
Expand All @@ -777,4 +770,23 @@ private void testInsertRowsWithGaps(boolean withSchematization) throws Exception
: "expected: " + 4 + " actual: " + TestUtils.tableSize(testTableName);
service.closeAll();
}

private SnowflakeSinkService setupSnowflakeService(boolean withSchematization, int recordNumber) {
// setup
Map<String, String> config = TestUtils.getConfForStreaming();
SnowflakeSinkConnectorConfig.setDefaultValues(config);
config.put(
SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG,
Boolean.toString(withSchematization)
);

// create tpChannel
return SnowflakeSinkServiceFactory
.builder(conn, IngestionMethodConfig.SNOWPIPE_STREAMING, config)
.setRecordNumber(recordNumber)
.setErrorReporter(new InMemoryKafkaRecordErrorReporter())
.setSinkTaskContext(new InMemorySinkTaskContext(Collections.singleton(topicPartition)))
.addTask(testTableName, topicPartition)
.build();
}
}

0 comments on commit 917ef41

Please sign in to comment.