Commit: Add test
sudeshwasnik committed Dec 21, 2024
1 parent 85be666 commit c348699
Showing 3 changed files with 77 additions and 41 deletions.
@@ -1318,7 +1318,7 @@ public Pair<List<Map<String, Object>>, List<SinkRecord>> getData() {
           Map<String, Object> tableRow =
               recordService.getProcessedRecordForStreamingIngest(snowflakeRecord);
           records.add(tableRow);
-          filteredOriginalSinkRecords.add(snowflakeRecord);
+          filteredOriginalSinkRecords.add(kafkaSinkRecord);
         } catch (JsonProcessingException e) {
           LOGGER.warn(
               "Record has JsonProcessingException offset:{}, topic:{}",
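As context for the one-line fix above: getData() returns the processed row maps alongside the sink records they came from, and the change stores the original kafkaSinkRecord rather than the transformed snowflakeRecord in that second list. Below is a minimal, hypothetical sketch of that parallel-list pattern; the class and method names are illustrative, not the connector's actual code, and it only assumes the Kafka Connect API is on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.kafka.connect.sink.SinkRecord;

/** Illustrative only: keep each processed row paired with the original record it was derived from. */
class ProcessedBatch {
  private final List<Map<String, Object>> rows = new ArrayList<>();
  private final List<SinkRecord> originalRecords = new ArrayList<>();

  void add(SinkRecord originalKafkaRecord, Map<String, Object> processedRow) {
    rows.add(processedRow);
    // Store the record as it came from Kafka, not a transformed copy, so that anything
    // done with this list later (offsets, error reporting, resends) sees the original.
    originalRecords.add(originalKafkaRecord);
  }

  List<Map<String, Object>> rows() {
    return rows;
  }

  List<SinkRecord> originalRecords() {
    return originalRecords;
  }
}
```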
@@ -649,17 +649,41 @@ public static List<SinkRecord> createNativeJsonSinkRecords(
       final long startOffset,
       final long noOfRecords,
       final String topicName,
-      final int partitionNo) {
-    ArrayList<SinkRecord> records = new ArrayList<>();
+      final int partitionNo
+  ) {
+    return createJsonRecords(
+        startOffset, noOfRecords, topicName, partitionNo,
+        TestUtils.JSON_WITH_SCHEMA.getBytes(StandardCharsets.UTF_8),
+        Collections.singletonMap("schemas.enable", Boolean.toString(true))
+    );
+  }
+
+  /* Generate noOfRecords blank records for the given topic and partition, starting at offset startOffset. */
+  public static List<SinkRecord> createBlankJsonSinkRecords(
+      final long startOffset,
+      final long noOfRecords,
+      final String topicName,
+      final int partitionNo
+  ) {
+    return createJsonRecords(
+        startOffset, noOfRecords, topicName, partitionNo, null,
+        Collections.singletonMap("schemas.enable", Boolean.toString(false))
+    );
+  }
+
+  private static List<SinkRecord> createJsonRecords(
+      final long startOffset,
+      final long noOfRecords,
+      final String topicName,
+      final int partitionNo,
+      byte[] value,
+      Map<String, String> converterConfig
+  ) {
     JsonConverter converter = new JsonConverter();
-    HashMap<String, String> converterConfig = new HashMap<>();
-    converterConfig.put("schemas.enable", "true");
     converter.configure(converterConfig, false);
-    SchemaAndValue schemaInputValue =
-        converter.toConnectData(
-            "test", TestUtils.JSON_WITH_SCHEMA.getBytes(StandardCharsets.UTF_8));
+    SchemaAndValue schemaInputValue = converter.toConnectData("test", value);
+
+    ArrayList<SinkRecord> records = new ArrayList<>();
     for (long i = startOffset; i < startOffset + noOfRecords; ++i) {
       records.add(
           new SinkRecord(
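Not part of the diff: a minimal standalone sketch of the Kafka Connect converter behaviour these helpers lean on, assuming connect-api and connect-json are on the classpath. With schemas.enable=true, JsonConverter parses a schema-bearing JSON envelope into a SchemaAndValue; a null payload comes back with a null schema and value, which is what makes the "blank" records blank. The class name and inline JSON below are illustrative, not taken from TestUtils.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;

public class ConverterSketch {
  public static void main(String[] args) {
    // Schema-bearing JSON envelope, the shape JsonConverter expects when schemas are enabled.
    String jsonWithSchema =
        "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"field\":\"name\"}]},"
            + "\"payload\":{\"name\":\"sf\"}}";

    JsonConverter withSchema = new JsonConverter();
    withSchema.configure(Collections.singletonMap("schemas.enable", "true"), false);
    SchemaAndValue parsed =
        withSchema.toConnectData("test", jsonWithSchema.getBytes(StandardCharsets.UTF_8));
    System.out.println(parsed.schema()); // a STRUCT schema recovered from the envelope

    JsonConverter schemaless = new JsonConverter();
    schemaless.configure(Collections.singletonMap("schemas.enable", "false"), false);
    SchemaAndValue blank = schemaless.toConnectData("test", null);
    System.out.println(blank.schema() + " / " + blank.value()); // null / null

    // A "blank" record at offset 0, mirroring what createBlankJsonSinkRecords builds per offset.
    SinkRecord record =
        new SinkRecord("test", 0, Schema.STRING_SCHEMA, "test", blank.schema(), blank.value(), 0L);
    System.out.println(record);
  }
}
```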
@@ -709,6 +709,30 @@ public void testChannelMigrateOffsetTokenSystemFunction_NullOffsetTokenInFormatV
     service.closeAll();
   }
 
+  @Test
+  public void testInsertRowsWithGaps_schematization_largeBufferSize_largeGap() throws Exception {
+    SnowflakeSinkService service = setupSnowflakeService(true, 4);
+
+    // insert blank records that do not evolve the schema: offsets 0, 1
+    List<SinkRecord> records = TestUtils.createBlankJsonSinkRecords(0, 2, topic, PARTITION);
+
+    // add records that change the schema, with a large gap in offsets
+    records.addAll(TestUtils.createNativeJsonSinkRecords(300, 2, topic, PARTITION));
+
+    // records' offsets -> [0, 1, 300, 301]
+    service.insert(records);
+    // With schematization the batch has to be re-sent; the resend should succeed
+    // even if there is an offset gap from the previously committed offset.
+    service.insert(records);
+
+    TestUtils.assertWithRetry(
+        () -> service.getOffset(new TopicPartition(topic, PARTITION)) == 302, 10, 10);
+
+    assert TestUtils.tableSize(testTableName) == 4
+        : "expected: " + 4 + " actual: " + TestUtils.tableSize(testTableName);
+    service.closeAll();
+  }
+
   @Test
   public void testInsertRowsWithGaps_schematization() throws Exception {
     testInsertRowsWithGaps(true);
@@ -720,41 +744,10 @@ public void testInsertRowsWithGaps_nonSchematization() throws Exception {
   }
 
   private void testInsertRowsWithGaps(boolean withSchematization) throws Exception {
-    // setup
-    Map<String, String> config = TestUtils.getConfForStreaming();
-    SnowflakeSinkConnectorConfig.setDefaultValues(config);
-    config.put(
-        SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG,
-        Boolean.toString(withSchematization));
-
-    // create tpChannel
-    SnowflakeSinkService service =
-        SnowflakeSinkServiceFactory.builder(conn, IngestionMethodConfig.SNOWPIPE_STREAMING, config)
-            .setRecordNumber(1)
-            .setErrorReporter(new InMemoryKafkaRecordErrorReporter())
-            .setSinkTaskContext(new InMemorySinkTaskContext(Collections.singleton(topicPartition)))
-            .addTask(testTableName, topicPartition)
-            .build();
+    SnowflakeSinkService service = setupSnowflakeService(withSchematization, 1);
 
     // insert blank records that do not evolve schema: 0, 1
-    JsonConverter converter = new JsonConverter();
-    HashMap<String, String> converterConfig = new HashMap<>();
-    converterConfig.put("schemas.enable", "false");
-    converter.configure(converterConfig, false);
-    SchemaAndValue schemaInputValue = converter.toConnectData("test", null);
-    List<SinkRecord> blankRecords = new ArrayList<>();
-    for (int i = 0; i < 2; i++) {
-      blankRecords.add(
-          new SinkRecord(
-              topic,
-              PARTITION,
-              Schema.STRING_SCHEMA,
-              "test",
-              schemaInputValue.schema(),
-              schemaInputValue.value(),
-              i));
-    }
-
+    List<SinkRecord> blankRecords = TestUtils.createBlankJsonSinkRecords(0, 2, topic, PARTITION);
     service.insert(blankRecords);
     TestUtils.assertWithRetry(
         () -> service.getOffset(new TopicPartition(topic, PARTITION)) == 2, 20, 5);
@@ -777,4 +770,23 @@ private void testInsertRowsWithGaps(boolean withSchematization) throws Exception
         : "expected: " + 4 + " actual: " + TestUtils.tableSize(testTableName);
     service.closeAll();
   }
+
+  private SnowflakeSinkService setupSnowflakeService(boolean withSchematization, int recordNumber) {
+    // setup
+    Map<String, String> config = TestUtils.getConfForStreaming();
+    SnowflakeSinkConnectorConfig.setDefaultValues(config);
+    config.put(
+        SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG,
+        Boolean.toString(withSchematization)
+    );
+
+    // create tpChannel
+    return SnowflakeSinkServiceFactory
+        .builder(conn, IngestionMethodConfig.SNOWPIPE_STREAMING, config)
+        .setRecordNumber(recordNumber)
+        .setErrorReporter(new InMemoryKafkaRecordErrorReporter())
+        .setSinkTaskContext(new InMemorySinkTaskContext(Collections.singleton(topicPartition)))
+        .addTask(testTableName, topicPartition)
+        .build();
+  }
 }
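One more sketch, outside the diff, of the arithmetic behind the new test's assertion: as the assertions in both tests encode, the channel's reported offset is one past the last ingested Kafka offset, independent of gaps, so ingesting offsets [0, 1, 300, 301] should land the channel at 302 rather than 4. The class and helper below are illustrative, not part of the connector.

```java
import java.util.Arrays;
import java.util.List;

public class OffsetExpectation {

  /** Next offset to commit: one past the highest ingested Kafka offset; gaps in offsets do not matter. */
  static long expectedCommittedOffset(List<Long> ingestedOffsets) {
    return ingestedOffsets.stream().mapToLong(Long::longValue).max().orElse(-1L) + 1;
  }

  public static void main(String[] args) {
    // The new test ingests records at offsets 0, 1 and then 300, 301 ...
    List<Long> offsets = Arrays.asList(0L, 1L, 300L, 301L);
    // ... and waits for the channel offset to reach 302, not 4.
    System.out.println(expectedCommittedOffset(offsets)); // prints 302
  }
}
```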
