Fix IndexOutOfBoundsException #74

Closed · wants to merge 1 commit
First changed file (the streaming channel and its StreamingBuffer):
@@ -654,10 +654,10 @@ public InsertRowsResponse get() throws Throwable {
"Invoking insertRows API for channel:{}, streamingBuffer:{}",
this.channel.getFullyQualifiedName(),
this.insertRowsStreamingBuffer);
- Pair<List<Map<String, Object>>, List<Long>> recordsAndOffsets =
+ Pair<List<Map<String, Object>>, List<SinkRecord>> recordsAndOriginalSinkRecords =
this.insertRowsStreamingBuffer.getData();
- List<Map<String, Object>> records = recordsAndOffsets.getKey();
- List<Long> offsets = recordsAndOffsets.getValue();
+ List<Map<String, Object>> records = recordsAndOriginalSinkRecords.getKey();
+ List<SinkRecord> originalSinkRecords = recordsAndOriginalSinkRecords.getValue();
InsertValidationResponse finalResponse = new InsertValidationResponse();
boolean needToResetOffset = false;
if (!enableSchemaEvolution) {
@@ -671,14 +671,18 @@ public InsertRowsResponse get() throws Throwable {
// For schema evolution, we need to call the insertRows API row by row in order to
// preserve the original order; anything after the first schema mismatch error will be
// retried after the evolution
- InsertValidationResponse response =
- this.channel.insertRow(records.get(idx), Long.toString(offsets.get(idx)));
+ SinkRecord originalSinkRecord = originalSinkRecords.get(idx);
+ InsertValidationResponse response = this.channel.insertRow(
+ records.get(idx), Long.toString(originalSinkRecord.kafkaOffset())
+ );
if (response.hasErrors()) {
InsertValidationResponse.InsertError insertError = response.getInsertErrors().get(0);
List<String> extraColNames = insertError.getExtraColNames();
List<String> nonNullableColumns = insertError.getMissingNotNullColNames();
- long originalSinkRecordIdx =
- offsets.get(idx) - this.insertRowsStreamingBuffer.getFirstOffset();
+
+ // TODO : originalSinkRecordIdx can be replaced by idx
+ long originalSinkRecordIdx = originalSinkRecord.kafkaOffset()
+ - this.insertRowsStreamingBuffer.getFirstOffset();
if (extraColNames == null && nonNullableColumns == null) {
InsertValidationResponse.InsertError newInsertError =
new InsertValidationResponse.InsertError(
@@ -694,7 +698,8 @@ public InsertRowsResponse get() throws Throwable {
this.channel.getTableName(),
nonNullableColumns,
extraColNames,
- this.insertRowsStreamingBuffer.getSinkRecord(originalSinkRecordIdx));
+ originalSinkRecord
+ );
// Offset reset needed since it's possible that we successfully ingested partial batch
needToResetOffset = true;
break;
@@ -1251,7 +1256,7 @@ protected long getApproxSizeOfRecordInBytes(SinkRecord kafkaSinkRecord) {
*/
@VisibleForTesting
protected class StreamingBuffer
- extends PartitionBuffer<Pair<List<Map<String, Object>>, List<Long>>> {
+ extends PartitionBuffer<Pair<List<Map<String, Object>>, List<SinkRecord>>> {
// Records coming from Kafka
private final List<SinkRecord> sinkRecords;

@@ -1285,9 +1290,9 @@ public void insert(SinkRecord kafkaSinkRecord) {
* @return A pair that contains the records and their corresponding offsets
*/
@Override
- public Pair<List<Map<String, Object>>, List<Long>> getData() {
+ public Pair<List<Map<String, Object>>, List<SinkRecord>> getData() {
final List<Map<String, Object>> records = new ArrayList<>();
- final List<Long> offsets = new ArrayList<>();
+ final List<SinkRecord> filteredOriginalSinkRecords = new ArrayList<>();
for (SinkRecord kafkaSinkRecord : sinkRecords) {
SinkRecord snowflakeRecord = getSnowflakeSinkRecordFromKafkaRecord(kafkaSinkRecord);

@@ -1313,7 +1318,7 @@ public Pair<List<Map<String, Object>>, List<Long>> getData() {
Map<String, Object> tableRow =
recordService.getProcessedRecordForStreamingIngest(snowflakeRecord);
records.add(tableRow);
- offsets.add(snowflakeRecord.kafkaOffset());
+ filteredOriginalSinkRecords.add(kafkaSinkRecord);
} catch (JsonProcessingException e) {
LOGGER.warn(
"Record has JsonProcessingException offset:{}, topic:{}",
@@ -1329,7 +1334,7 @@ public Pair<List<Map<String, Object>>, List<Long>> getData() {
getBufferSizeBytes(),
getFirstOffset(),
getLastOffset());
- return new Pair<>(records, offsets);
+ return new Pair<>(records, filteredOriginalSinkRecords);
}

@Override
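Why the change above matters: the removed code looked a buffered record back up by index, computing offsets.get(idx) - getFirstOffset() and passing the result to getSinkRecord(...). That arithmetic only works when the buffered offsets are contiguous. The following standalone sketch is not part of the PR; the class name is made up, and it simply replays the failing arithmetic with the gapped offsets used by the new test further down.

import java.util.Arrays;
import java.util.List;

class OffsetGapSketch {
  public static void main(String[] args) {
    // Buffered records with an offset gap, as in the new test: [0, 1, 300, 301]
    List<Long> offsets = Arrays.asList(0L, 1L, 300L, 301L);
    long firstOffset = offsets.get(0);
    for (int idx = 0; idx < offsets.size(); idx++) {
      // Old lookup: getSinkRecord(offset - firstOffset) indexes into a 4-element buffer,
      // so offset 300 maps to index 300, the IndexOutOfBoundsException in the PR title.
      long bufferIndex = offsets.get(idx) - firstOffset;
      System.out.printf("idx=%d, kafkaOffset=%d -> old buffer index=%d (buffer size=%d)%n",
          idx, offsets.get(idx), bufferIndex, offsets.size());
    }
    // The PR instead carries the original SinkRecord alongside each row, so no index
    // arithmetic is needed when reporting a schema-evolution mismatch.
  }
}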
Second changed file (the shared test record-creation helpers):
@@ -649,17 +649,41 @@ public static List<SinkRecord> createNativeJsonSinkRecords(
final long startOffset,
final long noOfRecords,
final String topicName,
- final int partitionNo) {
- ArrayList<SinkRecord> records = new ArrayList<>();
+ final int partitionNo
+ ) {
+ return createJsonRecords(
+ startOffset, noOfRecords, topicName, partitionNo,
+ TestUtils.JSON_WITH_SCHEMA.getBytes(StandardCharsets.UTF_8),
+ Collections.singletonMap("schemas.enable", Boolean.toString(true))
+ );
+ }
+
+ /* Generate noOfRecords blank records for a given topic and partition, starting at startOffset. */
+ public static List<SinkRecord> createBlankJsonSinkRecords(
+ final long startOffset,
+ final long noOfRecords,
+ final String topicName,
+ final int partitionNo
+ ) {
+ return createJsonRecords(
+ startOffset, noOfRecords, topicName, partitionNo, null,
+ Collections.singletonMap("schemas.enable", Boolean.toString(false))
+ );
+ }
+
+ private static List<SinkRecord> createJsonRecords(
+ final long startOffset,
+ final long noOfRecords,
+ final String topicName,
+ final int partitionNo,
+ byte[] value,
+ Map<String, String> converterConfig
+ ) {
JsonConverter converter = new JsonConverter();
- HashMap<String, String> converterConfig = new HashMap<>();
- converterConfig.put("schemas.enable", "true");
converter.configure(converterConfig, false);
- SchemaAndValue schemaInputValue =
- converter.toConnectData(
- "test", TestUtils.JSON_WITH_SCHEMA.getBytes(StandardCharsets.UTF_8));
+ SchemaAndValue schemaInputValue = converter.toConnectData("test", value);

+ ArrayList<SinkRecord> records = new ArrayList<>();
for (long i = startOffset; i < startOffset + noOfRecords; ++i) {
records.add(
new SinkRecord(
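For context, a short fragment (not part of the PR, and assuming a test class where topic, PARTITION, and TestUtils are available) showing how the two helpers combine to build the gapped batch exercised by the new test in the next file:

// Two blank-value records at offsets 0 and 1 (null payload, schemas.enable=false) ...
List<SinkRecord> records = TestUtils.createBlankJsonSinkRecords(0, 2, topic, PARTITION);
// ... followed by two schema-bearing records at offsets 300 and 301
records.addAll(TestUtils.createNativeJsonSinkRecords(300, 2, topic, PARTITION));
// records now covers offsets [0, 1, 300, 301]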
Third changed file (the sink service tests):
@@ -709,6 +709,30 @@ public void testChannelMigrateOffsetTokenSystemFunction_NullOffsetTokenInFormatV
service.closeAll();
}

+ @Test
+ public void testInsertRowsWithGaps_schematization_largeBufferSize_largeGap() throws Exception {
+ SnowflakeSinkService service = setupSnowflakeService(true, 4);
+
+ // insert blank records that do not evolve the schema: offsets 0, 1
+ List<SinkRecord> records = TestUtils.createBlankJsonSinkRecords(0, 2, topic, PARTITION);
+
+ // add records that change the schema, with a large gap in offsets
+ records.addAll(TestUtils.createNativeJsonSinkRecords(300, 2, topic, PARTITION));
+
+ // records' offsets -> [0, 1, 300, 301]
+ service.insert(records);
+ // With schematization, resending the same batch should succeed even if there is an offset
+ // gap from the previously committed offset
+ service.insert(records);
+
+ TestUtils.assertWithRetry(
+ () -> service.getOffset(new TopicPartition(topic, PARTITION)) == 302, 10, 10);
+
+ assert TestUtils.tableSize(testTableName) == 4
+ : "expected: " + 4 + " actual: " + TestUtils.tableSize(testTableName);
+ service.closeAll();
+ }
+
@Test
public void testInsertRowsWithGaps_schematization() throws Exception {
testInsertRowsWithGaps(true);
@@ -720,41 +744,10 @@ public void testInsertRowsWithGaps_nonSchematization() throws Exception {
}

private void testInsertRowsWithGaps(boolean withSchematization) throws Exception {
- // setup
- Map<String, String> config = TestUtils.getConfForStreaming();
- SnowflakeSinkConnectorConfig.setDefaultValues(config);
- config.put(
- SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG,
- Boolean.toString(withSchematization));
-
- // create tpChannel
- SnowflakeSinkService service =
- SnowflakeSinkServiceFactory.builder(conn, IngestionMethodConfig.SNOWPIPE_STREAMING, config)
- .setRecordNumber(1)
- .setErrorReporter(new InMemoryKafkaRecordErrorReporter())
- .setSinkTaskContext(new InMemorySinkTaskContext(Collections.singleton(topicPartition)))
- .addTask(testTableName, topicPartition)
- .build();
+ SnowflakeSinkService service = setupSnowflakeService(withSchematization, 1);

// insert blank records that do not evolve schema: 0, 1
- JsonConverter converter = new JsonConverter();
- HashMap<String, String> converterConfig = new HashMap<>();
- converterConfig.put("schemas.enable", "false");
- converter.configure(converterConfig, false);
- SchemaAndValue schemaInputValue = converter.toConnectData("test", null);
- List<SinkRecord> blankRecords = new ArrayList<>();
- for (int i = 0; i < 2; i++) {
- blankRecords.add(
- new SinkRecord(
- topic,
- PARTITION,
- Schema.STRING_SCHEMA,
- "test",
- schemaInputValue.schema(),
- schemaInputValue.value(),
- i));
- }
-
+ List<SinkRecord> blankRecords = TestUtils.createBlankJsonSinkRecords(0, 2, topic, PARTITION);
service.insert(blankRecords);
TestUtils.assertWithRetry(
() -> service.getOffset(new TopicPartition(topic, PARTITION)) == 2, 20, 5);
@@ -777,4 +770,23 @@ private void testInsertRowsWithGaps(boolean withSchematization) throws Exception
: "expected: " + 4 + " actual: " + TestUtils.tableSize(testTableName);
service.closeAll();
}

+ private SnowflakeSinkService setupSnowflakeService(boolean withSchematization, int recordNumber) {
+ // setup
+ Map<String, String> config = TestUtils.getConfForStreaming();
+ SnowflakeSinkConnectorConfig.setDefaultValues(config);
+ config.put(
+ SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG,
+ Boolean.toString(withSchematization)
+ );
+
+ // create tpChannel
+ return SnowflakeSinkServiceFactory
+ .builder(conn, IngestionMethodConfig.SNOWPIPE_STREAMING, config)
+ .setRecordNumber(recordNumber)
+ .setErrorReporter(new InMemoryKafkaRecordErrorReporter())
+ .setSinkTaskContext(new InMemorySinkTaskContext(Collections.singleton(topicPartition)))
+ .addTask(testTableName, topicPartition)
+ .build();
+ }
}
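Finally, a minimal sketch (not from the PR; the class name is made up) spelling out the arithmetic behind the 302 and 4 that the gap test asserts:

class GapTestExpectations {
  public static void main(String[] args) {
    long[] insertedOffsets = {0, 1, 300, 301};             // offsets ingested by the new test
    int expectedTableRows = insertedOffsets.length;        // tableSize(testTableName) == 4
    long expectedCommittedOffset = insertedOffsets[3] + 1; // getOffset(...) == 302, one past the last offset
    System.out.println(expectedTableRows + " rows, committed offset " + expectedCommittedOffset);
  }
}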