Skip to content

Commit

Permalink
feat: Add Destination MSSQL v2 rc. (#52096)
Browse files Browse the repository at this point in the history
Co-authored-by: Jonathan Pearlin <[email protected]>
Co-authored-by: Jonathan Pearlin <[email protected]>
Co-authored-by: Octavia Squidington III <[email protected]>
Co-authored-by: Edward Gao <[email protected]>
  • Loading branch information
5 people authored Jan 31, 2025
1 parent c8022ba commit cbb1a36
Show file tree
Hide file tree
Showing 35 changed files with 2,733 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.cdk.load.command

import io.airbyte.cdk.ConfigErrorException
import io.airbyte.cdk.ConnectorErrorException
import io.airbyte.cdk.command.ConfigurationSpecification

interface DestinationConfigurationFactory<
Expand All @@ -15,6 +16,8 @@ interface DestinationConfigurationFactory<
fun make(spec: I): O =
try {
makeWithoutExceptionHandling(spec)
} catch (e: ConnectorErrorException) {
throw e
} catch (e: Exception) {
// Wrap NPEs (mostly) in ConfigErrorException.
throw ConfigErrorException("Failed to build ConnectorConfiguration.", e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ enum class UnionBehavior {

abstract class BasicFunctionalityIntegrationTest(
/** The config to pass into the connector, as a serialized JSON blob */
val configContents: String,
configContents: String,
val configSpecClass: Class<out ConfigurationSpecification>,
dataDumper: DestinationDataDumper,
destinationCleaner: DestinationCleaner,
Expand Down Expand Up @@ -197,8 +197,10 @@ abstract class BasicFunctionalityIntegrationTest(
configUpdater = configUpdater,
micronautProperties = micronautProperties,
) {
val parsedConfig =
ValidatedJsonUtils.parseOne(configSpecClass, configUpdater.update(configContents))

// Update config with any replacements. This may be necessary when using testcontainers.
val updatedConfig = configUpdater.update(configContents)
val parsedConfig = ValidatedJsonUtils.parseOne(configSpecClass, updatedConfig)

@Test
open fun testBasicWrite() {
Expand All @@ -213,7 +215,7 @@ abstract class BasicFunctionalityIntegrationTest(
)
val messages =
runSync(
configContents,
updatedConfig,
stream,
listOf(
InputRecord(
Expand Down Expand Up @@ -264,7 +266,7 @@ abstract class BasicFunctionalityIntegrationTest(
{
if (verifyDataWriting) {
dumpAndDiffRecords(
ValidatedJsonUtils.parseOne(configSpecClass, configContents),
ValidatedJsonUtils.parseOne(configSpecClass, updatedConfig),
listOf(
OutputRecord(
extractedAt = 1234,
Expand Down Expand Up @@ -325,7 +327,7 @@ abstract class BasicFunctionalityIntegrationTest(

val messages =
runSync(
configContents,
updatedConfig,
stream,
listOf(
InputFile(
Expand Down Expand Up @@ -363,7 +365,7 @@ abstract class BasicFunctionalityIntegrationTest(
)
})

val config = ValidatedJsonUtils.parseOne(configSpecClass, configContents)
val config = ValidatedJsonUtils.parseOne(configSpecClass, updatedConfig)
val fileContent = dataDumper.dumpFile(config, stream)

assertEquals(listOf("123"), fileContent)
Expand All @@ -384,7 +386,7 @@ abstract class BasicFunctionalityIntegrationTest(
)
val stateMessage =
runSyncUntilStateAck(
configContents,
this@BasicFunctionalityIntegrationTest.updatedConfig,
stream,
listOf(
InputRecord(
Expand All @@ -402,7 +404,7 @@ abstract class BasicFunctionalityIntegrationTest(
),
allowGracefulShutdown = false,
)
runSync(configContents, stream, emptyList())
runSync(this@BasicFunctionalityIntegrationTest.updatedConfig, stream, emptyList())

val streamName = stateMessage.stream.streamDescriptor.name
val streamNamespace = stateMessage.stream.streamDescriptor.namespace
Expand Down Expand Up @@ -465,7 +467,7 @@ abstract class BasicFunctionalityIntegrationTest(
val stream1 = makeStream(randomizedNamespace + "_1")
val stream2 = makeStream(randomizedNamespace + "_2")
runSync(
configContents,
updatedConfig,
DestinationCatalog(
listOf(
stream1,
Expand Down Expand Up @@ -594,7 +596,7 @@ abstract class BasicFunctionalityIntegrationTest(
serialized = "",
)
}
runSync(configContents, catalog, messages)
runSync(updatedConfig, catalog, messages)
assertAll(
catalog.streams.map { stream ->
{
Expand Down Expand Up @@ -634,7 +636,7 @@ abstract class BasicFunctionalityIntegrationTest(
syncId,
)
runSync(
configContents,
updatedConfig,
makeStream(generationId = 12, minimumGenerationId = 0, syncId = 42),
listOf(
InputRecord(
Expand All @@ -647,7 +649,7 @@ abstract class BasicFunctionalityIntegrationTest(
)
val finalStream = makeStream(generationId = 13, minimumGenerationId = 13, syncId = 43)
runSync(
configContents,
updatedConfig,
finalStream,
listOf(
InputRecord(
Expand Down Expand Up @@ -729,7 +731,7 @@ abstract class BasicFunctionalityIntegrationTest(
syncId = 41,
)
runSync(
configContents,
updatedConfig,
stream1,
listOf(
makeInputRecord(1, "2024-01-23T01:00:00Z", 100),
Expand Down Expand Up @@ -768,7 +770,7 @@ abstract class BasicFunctionalityIntegrationTest(
)
// Run a sync, but emit a status incomplete. This should not delete any existing data.
runSyncUntilStateAck(
configContents,
updatedConfig,
stream2,
listOf(makeInputRecord(1, "2024-01-23T02:00:00Z", 200)),
StreamCheckpoint(
Expand Down Expand Up @@ -817,7 +819,7 @@ abstract class BasicFunctionalityIntegrationTest(
// Run a third sync, this time with a successful status.
// This should delete the first sync's data, and retain the second+third syncs' data.
runSync(
configContents,
updatedConfig,
stream2,
listOf(makeInputRecord(2, "2024-01-23T03:00:00Z", 300)),
)
Expand Down Expand Up @@ -898,7 +900,7 @@ abstract class BasicFunctionalityIntegrationTest(
)
// Run a sync, but emit a stream status incomplete.
runSyncUntilStateAck(
configContents,
updatedConfig,
stream,
listOf(makeInputRecord(1, "2024-01-23T02:00:00Z", 200)),
StreamCheckpoint(
Expand Down Expand Up @@ -933,7 +935,7 @@ abstract class BasicFunctionalityIntegrationTest(
// Run a second sync, this time with a successful status.
// This should retain the first syncs' data.
runSync(
configContents,
updatedConfig,
stream,
listOf(makeInputRecord(2, "2024-01-23T03:00:00Z", 300)),
)
Expand Down Expand Up @@ -1019,7 +1021,7 @@ abstract class BasicFunctionalityIntegrationTest(
syncId = 41,
)
runSync(
configContents,
updatedConfig,
stream1,
listOf(
makeInputRecord(1, "2024-01-23T01:00:00Z", 100),
Expand Down Expand Up @@ -1059,7 +1061,7 @@ abstract class BasicFunctionalityIntegrationTest(
// Run a sync, but emit a stream status incomplete. This should not delete any existing
// data.
runSyncUntilStateAck(
configContents,
updatedConfig,
stream2,
listOf(makeInputRecord(1, "2024-01-23T02:00:00Z", 200)),
StreamCheckpoint(
Expand Down Expand Up @@ -1114,7 +1116,7 @@ abstract class BasicFunctionalityIntegrationTest(
syncId = 43,
)
runSync(
configContents,
updatedConfig,
stream3,
listOf(makeInputRecord(2, "2024-01-23T03:00:00Z", 300)),
)
Expand Down Expand Up @@ -1174,7 +1176,7 @@ abstract class BasicFunctionalityIntegrationTest(
syncId,
)
runSync(
configContents,
updatedConfig,
makeStream(syncId = 42),
listOf(
InputRecord(
Expand All @@ -1187,7 +1189,7 @@ abstract class BasicFunctionalityIntegrationTest(
)
val finalStream = makeStream(syncId = 43)
runSync(
configContents,
updatedConfig,
finalStream,
listOf(
InputRecord(
Expand Down Expand Up @@ -1240,7 +1242,7 @@ abstract class BasicFunctionalityIntegrationTest(
syncId,
)
runSync(
configContents,
updatedConfig,
makeStream(
syncId = 42,
linkedMapOf("id" to intType, "to_drop" to stringType, "to_change" to intType)
Expand All @@ -1260,7 +1262,7 @@ abstract class BasicFunctionalityIntegrationTest(
linkedMapOf("id" to intType, "to_change" to stringType, "to_add" to stringType)
)
runSync(
configContents,
updatedConfig,
finalStream,
listOf(
InputRecord(
Expand Down Expand Up @@ -1334,7 +1336,7 @@ abstract class BasicFunctionalityIntegrationTest(

val sync1Stream = makeStream(syncId = 42)
runSync(
configContents,
updatedConfig,
sync1Stream,
listOf(
// emitted_at:1000 is equal to 1970-01-01 00:00:01Z.
Expand Down Expand Up @@ -1395,7 +1397,7 @@ abstract class BasicFunctionalityIntegrationTest(

val sync2Stream = makeStream(syncId = 43)
runSync(
configContents,
updatedConfig,
sync2Stream,
listOf(
// Update both Alice and Bob
Expand Down Expand Up @@ -1479,9 +1481,9 @@ abstract class BasicFunctionalityIntegrationTest(
// instead of being able to fallback onto extractedAt.
emittedAtMs = 100,
)
runSync(configContents, makeStream("cursor1"), listOf(makeRecord("cursor1")))
runSync(updatedConfig, makeStream("cursor1"), listOf(makeRecord("cursor1")))
val stream2 = makeStream("cursor2")
runSync(configContents, stream2, listOf(makeRecord("cursor2")))
runSync(updatedConfig, stream2, listOf(makeRecord("cursor2")))
dumpAndDiffRecords(
parsedConfig,
listOf(
Expand Down Expand Up @@ -1538,7 +1540,7 @@ abstract class BasicFunctionalityIntegrationTest(
)
}
// Just verify that we don't crash.
assertDoesNotThrow { runSync(configContents, DestinationCatalog(streams), messages) }
assertDoesNotThrow { runSync(updatedConfig, DestinationCatalog(streams), messages) }
}

/**
Expand Down Expand Up @@ -1591,7 +1593,7 @@ abstract class BasicFunctionalityIntegrationTest(
emittedAtMs = 100,
)
runSync(
configContents,
updatedConfig,
stream,
listOf(
// A record with valid values for all fields
Expand Down Expand Up @@ -1875,7 +1877,7 @@ abstract class BasicFunctionalityIntegrationTest(
syncId = 42,
)
runSync(
configContents,
updatedConfig,
stream,
listOf(
InputRecord(
Expand Down Expand Up @@ -2054,7 +2056,7 @@ abstract class BasicFunctionalityIntegrationTest(
syncId = 42,
)
runSync(
configContents,
updatedConfig,
stream,
listOf(
InputRecord(
Expand Down Expand Up @@ -2225,7 +2227,7 @@ abstract class BasicFunctionalityIntegrationTest(
syncId = 42,
)
runSync(
configContents,
updatedConfig,
stream,
listOf(
InputRecord(
Expand Down Expand Up @@ -2418,7 +2420,7 @@ abstract class BasicFunctionalityIntegrationTest(
syncId = 42,
)
runSync(
configContents,
updatedConfig,
stream,
listOf(
InputRecord(
Expand Down Expand Up @@ -2462,7 +2464,7 @@ abstract class BasicFunctionalityIntegrationTest(
minimumGenerationId = 0,
syncId = 42,
)
assertDoesNotThrow { runSync(configContents, stream, messages = emptyList()) }
assertDoesNotThrow { runSync(updatedConfig, stream, messages = emptyList()) }
dumpAndDiffRecords(
parsedConfig,
canonicalExpectedRecords = emptyList(),
Expand All @@ -2484,7 +2486,7 @@ abstract class BasicFunctionalityIntegrationTest(
syncId = 42,
)
assertDoesNotThrow {
runSync(configContents, stream, messages = listOf(InputGlobalCheckpoint(null)))
runSync(updatedConfig, stream, messages = listOf(InputGlobalCheckpoint(null)))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ data class PerformanceTestSummary(
val namespace: String?,
val streamName: String,
val recordCount: Long?,
val expectedRecordCount: Long,
val emittedRecordCount: Long,
val recordPerSeconds: Double,
val megabytePerSeconds: Double,
Expand Down Expand Up @@ -345,6 +346,7 @@ abstract class BasicPerformanceTest(
namespace = testScenario.catalog.streams[0].descriptor.namespace,
streamName = testScenario.catalog.streams[0].descriptor.name,
recordCount = recordCount,
expectedRecordCount = summary.expectedRecordsCount,
emittedRecordCount = summary.records,
recordPerSeconds = recordPerSeconds,
megabytePerSeconds = megabytePerSeconds,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ application {
//applicationDefaultJvmArgs = listOf("-XX:+ExitOnOutOfMemoryError", "-XX:MaxRAMPercentage=75.0", "--add-opens", "java.base/sun.nio.ch=ALL-UNNAMED", "--add-opens", "java.base/sun.security.action=ALL-UNNAMED", "--add-opens", "java.base/java.lang=ALL-UNNAMED")
}

val junitVersion = "5.11.3"
val junitVersion = "5.11.4"

configurations.configureEach {
// Exclude additional SLF4J providers from all classpaths
Expand All @@ -38,13 +38,16 @@ dependencies {
implementation("com.microsoft.sqlserver:mssql-jdbc:12.8.1.jre11")
implementation("io.github.oshai:kotlin-logging-jvm:7.0.0")
implementation("jakarta.inject:jakarta.inject-api:2.0.1")
implementation("com.github.spotbugs:spotbugs-annotations:4.8.6")
implementation("io.micronaut:micronaut-inject:4.6.1")
implementation("com.github.spotbugs:spotbugs-annotations:4.9.0")
implementation("io.micronaut:micronaut-inject:4.7.12")
implementation("com.zaxxer:HikariCP:6.2.1")

testImplementation("io.mockk:mockk:1.13.13")
testImplementation("io.mockk:mockk:1.13.16")
testImplementation("org.junit.jupiter:junit-jupiter-api:$junitVersion")
testImplementation("org.junit.jupiter:junit-jupiter-params:$junitVersion")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:$junitVersion")

integrationTestImplementation("org.testcontainers:mssqlserver:1.20.4")
}

tasks.named<Test>("test") {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
services:
sql-server:
image: mcr.microsoft.com/mssql/server:2022-latest
ports:
- "1433:1433"
environment:
- ACCEPT_EULA=Y
- MSSQL_SA_PASSWORD=A_Str0ng_Required_Password
Loading

0 comments on commit cbb1a36

Please sign in to comment.