diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java
index 92243328d..e6e2d99fb 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java
@@ -654,10 +654,10 @@ public InsertRowsResponse get() throws Throwable {
           "Invoking insertRows API for channel:{}, streamingBuffer:{}",
           this.channel.getFullyQualifiedName(),
           this.insertRowsStreamingBuffer);
-      Pair<List<Map<String, Object>>, List<Long>> recordsAndOffsets =
+      Pair<List<Map<String, Object>>, List<SinkRecord>> recordsAndOriginalSinkRecords =
           this.insertRowsStreamingBuffer.getData();
-      List<Map<String, Object>> records = recordsAndOffsets.getKey();
-      List<Long> offsets = recordsAndOffsets.getValue();
+      List<Map<String, Object>> records = recordsAndOriginalSinkRecords.getKey();
+      List<SinkRecord> originalSinkRecords = recordsAndOriginalSinkRecords.getValue();
       InsertValidationResponse finalResponse = new InsertValidationResponse();
       boolean needToResetOffset = false;
       if (!enableSchemaEvolution) {
@@ -671,14 +671,18 @@ public InsertRowsResponse get() throws Throwable {
           // For schema evolution, we need to call the insertRows API row by row in order to
           // preserve the original order, for anything after the first schema mismatch error we will
           // retry after the evolution
-          InsertValidationResponse response =
-              this.channel.insertRow(records.get(idx), Long.toString(offsets.get(idx)));
+          SinkRecord originalSinkRecord = originalSinkRecords.get(idx);
+          InsertValidationResponse response = this.channel.insertRow(
+              records.get(idx), Long.toString(originalSinkRecord.kafkaOffset())
+          );
           if (response.hasErrors()) {
             InsertValidationResponse.InsertError insertError = response.getInsertErrors().get(0);
             List<String> extraColNames = insertError.getExtraColNames();
             List<String> nonNullableColumns = insertError.getMissingNotNullColNames();
-            long originalSinkRecordIdx =
-                offsets.get(idx) - this.insertRowsStreamingBuffer.getFirstOffset();
+
+            // TODO : originalSinkRecordIdx can be replaced by idx
+            long originalSinkRecordIdx = originalSinkRecord.kafkaOffset()
+                - this.insertRowsStreamingBuffer.getFirstOffset();
             if (extraColNames == null && nonNullableColumns == null) {
               InsertValidationResponse.InsertError newInsertError =
                   new InsertValidationResponse.InsertError(
@@ -694,7 +698,8 @@ public InsertRowsResponse get() throws Throwable {
                   this.channel.getTableName(),
                   nonNullableColumns,
                   extraColNames,
-                  this.insertRowsStreamingBuffer.getSinkRecord(originalSinkRecordIdx));
+                  originalSinkRecord
+              );
               // Offset reset needed since it's possible that we successfully ingested partial batch
               needToResetOffset = true;
               break;
@@ -1251,7 +1256,7 @@ protected long getApproxSizeOfRecordInBytes(SinkRecord kafkaSinkRecord) {
    */
   @VisibleForTesting
   protected class StreamingBuffer
-      extends PartitionBuffer<Pair<List<Map<String, Object>>, List<Long>>> {
+      extends PartitionBuffer<Pair<List<Map<String, Object>>, List<SinkRecord>>> {
     // Records coming from Kafka
     private final List<SinkRecord> sinkRecords;
 
@@ -1285,9 +1290,9 @@ public void insert(SinkRecord kafkaSinkRecord) {
      * @return A pair that contains the records and their corresponding offsets
      */
     @Override
-    public Pair<List<Map<String, Object>>, List<Long>> getData() {
+    public Pair<List<Map<String, Object>>, List<SinkRecord>> getData() {
       final List<Map<String, Object>> records = new ArrayList<>();
-      final List<Long> offsets = new ArrayList<>();
+      final List<SinkRecord> filteredOriginalSinkRecords = new ArrayList<>();
       for (SinkRecord kafkaSinkRecord : sinkRecords) {
         SinkRecord snowflakeRecord = getSnowflakeSinkRecordFromKafkaRecord(kafkaSinkRecord);
 
@@ -1313,7 +1318,7 @@ public Pair<List<Map<String, Object>>, List<Long>> getData() {
           Map<String, Object> tableRow =
               recordService.getProcessedRecordForStreamingIngest(snowflakeRecord);
           records.add(tableRow);
-          offsets.add(snowflakeRecord.kafkaOffset());
+          filteredOriginalSinkRecords.add(kafkaSinkRecord);
         } catch (JsonProcessingException e) {
           LOGGER.warn(
               "Record has JsonProcessingException offset:{}, topic:{}",
@@ -1329,7 +1334,7 @@ public Pair<List<Map<String, Object>>, List<Long>> getData() {
           getBufferSizeBytes(),
           getFirstOffset(),
           getLastOffset());
-      return new Pair<>(records, offsets);
+      return new Pair<>(records, filteredOriginalSinkRecords);
     }
 
     @Override
diff --git a/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java b/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java
index 82ff5e70b..6801f8713 100644
--- a/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java
+++ b/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java
@@ -649,17 +649,41 @@ public static List<SinkRecord> createNativeJsonSinkRecords(
       final long startOffset,
       final long noOfRecords,
       final String topicName,
-      final int partitionNo) {
-    ArrayList<SinkRecord> records = new ArrayList<>();
+      final int partitionNo
+  ) {
+    return createJsonRecords(
+        startOffset, noOfRecords, topicName, partitionNo,
+        TestUtils.JSON_WITH_SCHEMA.getBytes(StandardCharsets.UTF_8),
+        Collections.singletonMap("schemas.enable", Boolean.toString(true))
+    );
+  }
+
+  /* Generate noOfRecords blank records for a given topic and partition, starting at startOffset. */
+  public static List<SinkRecord> createBlankJsonSinkRecords(
+      final long startOffset,
+      final long noOfRecords,
+      final String topicName,
+      final int partitionNo
+  ) {
+    return createJsonRecords(
+        startOffset, noOfRecords, topicName, partitionNo, null,
+        Collections.singletonMap("schemas.enable", Boolean.toString(false))
+    );
+  }
 
+  private static List<SinkRecord> createJsonRecords(
+      final long startOffset,
+      final long noOfRecords,
+      final String topicName,
+      final int partitionNo,
+      byte[] value,
+      Map<String, String> converterConfig
+  ) {
     JsonConverter converter = new JsonConverter();
-    HashMap<String, String> converterConfig = new HashMap<>();
-    converterConfig.put("schemas.enable", "true");
     converter.configure(converterConfig, false);
-    SchemaAndValue schemaInputValue =
-        converter.toConnectData(
-            "test", TestUtils.JSON_WITH_SCHEMA.getBytes(StandardCharsets.UTF_8));
+    SchemaAndValue schemaInputValue = converter.toConnectData("test", value);
 
+    ArrayList<SinkRecord> records = new ArrayList<>();
     for (long i = startOffset; i < startOffset + noOfRecords; ++i) {
       records.add(
           new SinkRecord(
diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java
index 6dab5e6f3..317651daf 100644
--- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java
+++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java
@@ -709,6 +709,30 @@ public void testChannelMigrateOffsetTokenSystemFunction_NullOffsetTokenInFormatV
     service.closeAll();
   }
 
+  @Test
+  public void testInsertRowsWithGaps_schematization_largeBufferSize_largeGap() throws Exception {
+    SnowflakeSinkService service = setupSnowflakeService(true, 4);
+
+    // insert blank records that do not evolve schema: 0, 1
+    List<SinkRecord> records = TestUtils.createBlankJsonSinkRecords(0, 2, topic, PARTITION);
+
+    // add records that change the schema, with an extreme gap in offsets
+    records.addAll(TestUtils.createNativeJsonSinkRecords(300, 2, topic, PARTITION));
+
+    // records' offsets -> [0, 1, 300, 301]
+    service.insert(records);
+    // With schematization, resending the same batch should succeed even if there is an offset
+    // gap from the previously committed offset
+    service.insert(records);
+
+    TestUtils.assertWithRetry(
+        () -> service.getOffset(new TopicPartition(topic, PARTITION)) == 302, 10, 10);
+
+    assert TestUtils.tableSize(testTableName) == 4
+        : "expected: " + 4 + " actual: " + TestUtils.tableSize(testTableName);
+    service.closeAll();
+  }
+
   @Test
   public void testInsertRowsWithGaps_schematization() throws Exception {
     testInsertRowsWithGaps(true);
@@ -720,41 +744,10 @@ public void testInsertRowsWithGaps_nonSchematization() throws Exception {
   }
 
   private void testInsertRowsWithGaps(boolean withSchematization) throws Exception {
-    // setup
-    Map<String, String> config = TestUtils.getConfForStreaming();
-    SnowflakeSinkConnectorConfig.setDefaultValues(config);
-    config.put(
-        SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG,
-        Boolean.toString(withSchematization));
-
-    // create tpChannel
-    SnowflakeSinkService service =
-        SnowflakeSinkServiceFactory.builder(conn, IngestionMethodConfig.SNOWPIPE_STREAMING, config)
-            .setRecordNumber(1)
-            .setErrorReporter(new InMemoryKafkaRecordErrorReporter())
-            .setSinkTaskContext(new InMemorySinkTaskContext(Collections.singleton(topicPartition)))
-            .addTask(testTableName, topicPartition)
-            .build();
+    SnowflakeSinkService service = setupSnowflakeService(withSchematization, 1);
 
     // insert blank records that do not evolve schema: 0, 1
-    JsonConverter converter = new JsonConverter();
-    HashMap<String, String> converterConfig = new HashMap<>();
-    converterConfig.put("schemas.enable", "false");
-    converter.configure(converterConfig, false);
-    SchemaAndValue schemaInputValue = converter.toConnectData("test", null);
-    List<SinkRecord> blankRecords = new ArrayList<>();
-    for (int i = 0; i < 2; i++) {
-      blankRecords.add(
-          new SinkRecord(
-              topic,
-              PARTITION,
-              Schema.STRING_SCHEMA,
-              "test",
-              schemaInputValue.schema(),
-              schemaInputValue.value(),
-              i));
-    }
-
+    List<SinkRecord> blankRecords = TestUtils.createBlankJsonSinkRecords(0, 2, topic, PARTITION);
     service.insert(blankRecords);
     TestUtils.assertWithRetry(
         () -> service.getOffset(new TopicPartition(topic, PARTITION)) == 2, 20, 5);
@@ -777,4 +770,23 @@ private void testInsertRowsWithGaps(boolean withSchematization) throws Exception
         : "expected: " + 4 + " actual: " + TestUtils.tableSize(testTableName);
     service.closeAll();
   }
+
+  private SnowflakeSinkService setupSnowflakeService(boolean withSchematization, int recordNumber) {
+    // setup
+    Map<String, String> config = TestUtils.getConfForStreaming();
+    SnowflakeSinkConnectorConfig.setDefaultValues(config);
+    config.put(
+        SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG,
+        Boolean.toString(withSchematization)
+    );
+
+    // create tpChannel
+    return SnowflakeSinkServiceFactory
+        .builder(conn, IngestionMethodConfig.SNOWPIPE_STREAMING, config)
+        .setRecordNumber(recordNumber)
+        .setErrorReporter(new InMemoryKafkaRecordErrorReporter())
+        .setSinkTaskContext(new InMemorySinkTaskContext(Collections.singleton(topicPartition)))
+        .addTask(testTableName, topicPartition)
+        .build();
+  }
 }
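
Reviewer note, not part of the diff: a minimal, self-contained sketch of the buffer contract the main change moves to. `StreamingBufferSketch` and `FakeSinkRecord` are hypothetical stand-ins (not the connector's `StreamingBuffer` or Kafka Connect's `SinkRecord`); the only point illustrated is that `getData()` now returns the filtered original records index-aligned with the processed rows, so the row-by-row schema-evolution path can read the Kafka offset from the record at the same index instead of keeping a parallel `List<Long>` of offsets.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class StreamingBufferSketch {

  /** Hypothetical stand-in for org.apache.kafka.connect.sink.SinkRecord. */
  static final class FakeSinkRecord {
    private final long offset;
    private final Map<String, Object> parsedValue; // null models a record that fails JSON processing

    FakeSinkRecord(long offset, Map<String, Object> parsedValue) {
      this.offset = offset;
      this.parsedValue = parsedValue;
    }

    long kafkaOffset() {
      return offset;
    }
  }

  /**
   * Mirrors the new getData() shape: processed rows plus the original records that produced them,
   * index-aligned; records that cannot be processed are dropped from both lists.
   */
  static SimpleImmutableEntry<List<Map<String, Object>>, List<FakeSinkRecord>> getData(
      List<FakeSinkRecord> buffered) {
    List<Map<String, Object>> rows = new ArrayList<>();
    List<FakeSinkRecord> filteredOriginals = new ArrayList<>();
    for (FakeSinkRecord record : buffered) {
      if (record.parsedValue == null) {
        continue; // skipped in both lists, so index i in one list matches index i in the other
      }
      rows.add(record.parsedValue);
      filteredOriginals.add(record);
    }
    return new SimpleImmutableEntry<>(rows, filteredOriginals);
  }

  public static void main(String[] args) {
    // Offsets with a gap (0 ... 300), plus one unprocessable record that gets filtered out.
    List<FakeSinkRecord> buffered =
        Arrays.asList(
            new FakeSinkRecord(0, Collections.singletonMap("c1", "a")),
            new FakeSinkRecord(1, null),
            new FakeSinkRecord(300, Collections.singletonMap("c1", "b")));

    SimpleImmutableEntry<List<Map<String, Object>>, List<FakeSinkRecord>> data = getData(buffered);
    for (int idx = 0; idx < data.getKey().size(); idx++) {
      // The offset passed alongside each row comes from the original record at the same index,
      // so gaps in Kafka offsets need no separate bookkeeping.
      long offset = data.getValue().get(idx).kafkaOffset();
      System.out.println("row " + idx + " -> insertRow offset " + offset);
    }
  }
}
```

Because the two lists stay index-aligned, the `originalSinkRecordIdx` computed in the main change can eventually be replaced by `idx`, as the TODO in the diff notes.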