Skip to content

Commit

Permalink
[source volcano] Allow meta field to be queried in SelectQuerySpec
Browse files Browse the repository at this point in the history
  • Loading branch information
burakku committed Jan 31, 2025
1 parent 5db0f6c commit b2e44d1
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.ObjectNode
import io.airbyte.cdk.StreamIdentifier
import io.airbyte.cdk.command.OpaqueStateValue
import io.airbyte.cdk.discover.Field
import io.airbyte.cdk.discover.FieldOrMetaField
import io.airbyte.cdk.discover.MetaFieldDecorator
import io.airbyte.cdk.output.OutputConsumer
import io.airbyte.cdk.util.Jsons
Expand Down Expand Up @@ -75,12 +75,15 @@ sealed class FeedBootstrap<T : Feed>(
private inner class EfficientStreamRecordConsumer(override val stream: Stream) :
StreamRecordConsumer {

override fun accept(recordData: ObjectNode, changes: Map<Field, FieldValueChange>?) {
override fun accept(
recordData: ObjectNode,
changes: Map<FieldOrMetaField, FieldValueChange>?
) {
if (changes.isNullOrEmpty()) {
acceptWithoutChanges(recordData)
} else {
val protocolChanges: List<AirbyteRecordMessageMetaChange> =
changes.map { (field: Field, fieldValueChange: FieldValueChange) ->
changes.map { (field: FieldOrMetaField, fieldValueChange: FieldValueChange) ->
AirbyteRecordMessageMetaChange()
.withField(field.id)
.withChange(fieldValueChange.protocolChange())
Expand Down Expand Up @@ -234,7 +237,7 @@ interface StreamRecordConsumer {

val stream: Stream

fun accept(recordData: ObjectNode, changes: Map<Field, FieldValueChange>?)
fun accept(recordData: ObjectNode, changes: Map<FieldOrMetaField, FieldValueChange>?)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package io.airbyte.cdk.read.cdc

import com.fasterxml.jackson.databind.node.ObjectNode
import io.airbyte.cdk.command.OpaqueStateValue
import io.airbyte.cdk.discover.Field
import io.airbyte.cdk.discover.FieldOrMetaField
import io.airbyte.cdk.read.FieldValueChange
import io.airbyte.cdk.read.Stream
import org.apache.kafka.connect.source.SourceRecord
Expand Down Expand Up @@ -76,5 +76,5 @@ interface CdcPartitionReaderDebeziumOperations<T : Comparable<T>> {
/** [DeserializedRecord]s are used to generate Airbyte RECORD messages. */
data class DeserializedRecord(
val data: ObjectNode,
val changes: Map<Field, FieldValueChange>,
val changes: Map<FieldOrMetaField, FieldValueChange>,
)
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package io.airbyte.cdk.read

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.ObjectNode
import io.airbyte.cdk.discover.Field
import io.airbyte.cdk.discover.FieldOrMetaField
import io.airbyte.cdk.jdbc.JdbcConnectionFactory
import io.airbyte.cdk.jdbc.JdbcFieldType
import io.airbyte.cdk.util.Jsons
Expand Down Expand Up @@ -39,7 +39,7 @@ interface SelectQuerier {

interface ResultRow {
val data: ObjectNode
val changes: Map<Field, FieldValueChange>
val changes: Map<FieldOrMetaField, FieldValueChange>
}
}

Expand All @@ -55,7 +55,7 @@ class JdbcSelectQuerier(

data class ResultRow(
override var data: ObjectNode = Jsons.objectNode(),
override var changes: MutableMap<Field, FieldValueChange> = mutableMapOf(),
override var changes: MutableMap<FieldOrMetaField, FieldValueChange> = mutableMapOf(),
) : SelectQuerier.ResultRow

class Result(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
package io.airbyte.cdk.read

import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.cdk.discover.Field
import io.airbyte.cdk.discover.FieldOrMetaField
import io.airbyte.cdk.jdbc.LosslessJdbcFieldType

/**
Expand All @@ -11,7 +11,7 @@ import io.airbyte.cdk.jdbc.LosslessJdbcFieldType
*/
data class SelectQuery(
val sql: String,
val columns: List<Field>,
val columns: List<FieldOrMetaField>,
val bindings: List<Binding>,
) {
data class Binding(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
package io.airbyte.cdk.read

import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.cdk.discover.Field
import io.airbyte.cdk.discover.FieldOrMetaField
import java.math.BigDecimal

/**
Expand All @@ -21,19 +21,19 @@ data class SelectQuerySpec(
)

sealed interface SelectNode {
val columns: List<Field>
val columns: List<FieldOrMetaField>
}

data class SelectColumns(
override val columns: List<Field>,
override val columns: List<FieldOrMetaField>,
) : SelectNode {
constructor(vararg columns: Field) : this(columns.toList())
constructor(vararg columns: FieldOrMetaField) : this(columns.toList())
}

data class SelectColumnMaxValue(
val column: Field,
val column: FieldOrMetaField,
) : SelectNode {
override val columns: List<Field>
override val columns: List<FieldOrMetaField>
get() = listOf(column)
}

Expand Down Expand Up @@ -85,41 +85,41 @@ data class Or(
}

sealed interface WhereClauseLeafNode : WhereClauseNode {
val column: Field
val column: FieldOrMetaField
val bindingValue: JsonNode
}

data class GreaterOrEqual(
override val column: Field,
override val column: FieldOrMetaField,
override val bindingValue: JsonNode,
) : WhereClauseLeafNode

data class Greater(
override val column: Field,
override val column: FieldOrMetaField,
override val bindingValue: JsonNode,
) : WhereClauseLeafNode

data class LesserOrEqual(
override val column: Field,
override val column: FieldOrMetaField,
override val bindingValue: JsonNode,
) : WhereClauseLeafNode

data class Lesser(
override val column: Field,
override val column: FieldOrMetaField,
override val bindingValue: JsonNode,
) : WhereClauseLeafNode

data class Equal(
override val column: Field,
override val column: FieldOrMetaField,
override val bindingValue: JsonNode,
) : WhereClauseLeafNode

sealed interface OrderByNode

data class OrderBy(
val columns: List<Field>,
val columns: List<FieldOrMetaField>,
) : OrderByNode {
constructor(vararg columns: Field) : this(columns.toList())
constructor(vararg columns: FieldOrMetaField) : this(columns.toList())
}

data object NoOrderBy : OrderByNode
Expand Down

0 comments on commit b2e44d1

Please sign in to comment.