Skip to content

Commit

Permalink
[Source-Postgres] : Add config to throw an error on invalid CDC posit…
Browse files Browse the repository at this point in the history
…ion (#35304)
  • Loading branch information
akashkulk authored and xiaohansong committed Feb 27, 2024
1 parent 47d0182 commit b7ad7fd
Show file tree
Hide file tree
Showing 10 changed files with 102 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerImageTag: 3.3.10
dockerImageTag: 3.3.11
dockerRepository: airbyte/source-postgres
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
githubIssueLabel: source-postgres
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.postgres;

// Constants defined in
// airbyte-integrations/connectors/source-postgres/src/main/resources/spec.json.
public class PostgresSpecConstants {

public static final String INVALID_CDC_CURSOR_POSITION_PROPERTY = "invalid_cdc_cursor_position_behavior";
public static final String FAIL_SYNC_OPTION = "Fail sync";
public static final String RESYNC_DATA_OPTION = "Re-sync data";

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

import static io.airbyte.cdk.db.DbAnalyticsUtils.cdcCursorInvalidMessage;
import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.streamsUnderVacuum;
import static io.airbyte.integrations.source.postgres.PostgresSpecConstants.FAIL_SYNC_OPTION;
import static io.airbyte.integrations.source.postgres.PostgresSpecConstants.INVALID_CDC_CURSOR_POSITION_PROPERTY;
import static io.airbyte.integrations.source.postgres.PostgresUtils.isDebugMode;
import static io.airbyte.integrations.source.postgres.PostgresUtils.prettyPrintConfiguredAirbyteStreamList;

Expand Down Expand Up @@ -112,6 +114,11 @@ public static List<AutoCloseableIterator<AirbyteMessage>> cdcCtidIteratorsCombin

if (!savedOffsetAfterReplicationSlotLSN) {
AirbyteTraceMessageUtility.emitAnalyticsTrace(cdcCursorInvalidMessage());
if (!sourceConfig.get("replication_method").has(INVALID_CDC_CURSOR_POSITION_PROPERTY) || sourceConfig.get("replication_method").get(
INVALID_CDC_CURSOR_POSITION_PROPERTY).asText().equals(FAIL_SYNC_OPTION)) {
throw new ConfigErrorException(
"Saved offset is before replication slot's confirmed lsn. Please reset the connection, and then increase WAL retention or reduce sync frequency to prevent this from happening in the future. See https://docs.airbyte.com/integrations/sources/postgres/postgres-troubleshooting#under-cdc-incremental-mode-there-are-still-full-refresh-syncs for more details.");
}
LOGGER.warn("Saved offset is before Replication slot's confirmed_flush_lsn, Airbyte will trigger sync from scratch");
} else if (!isDebugMode(sourceConfig) && PostgresUtils.shouldFlushAfterSync(sourceConfig)) {
// We do not want to acknowledge the WAL logs in debug mode.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,14 @@
"description": "Specifies a query that the connector executes on the source database when the connector sends a heartbeat message. Please see the <a href=\"https://docs.airbyte.com/integrations/sources/postgres/postgres-wal-disk-consumption-and-heartbeat-action-query\">setup guide</a> for how and when to configure this setting.",
"default": "",
"order": 8
},
"invalid_cdc_cursor_position_behavior": {
"type": "string",
"title": "Invalid CDC position behavior (Advanced)",
"description": "Determines whether Airbyte should fail or re-sync data in case of an stale/invalid cursor value into the WAL. If 'Fail sync' is chosen, a user will have to manually reset the connection before being able to continue syncing data. If 'Re-sync data' is chosen, Airbyte will automatically trigger a refresh but could lead to higher cloud costs and data loss.",
"enum": ["Fail sync", "Re-sync data"],
"default": "Fail sync",
"order": 9
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,14 @@
"description": "Specifies a query that the connector executes on the source database when the connector sends a heartbeat message. Please see the <a href=\"https://docs.airbyte.com/integrations/sources/postgres/postgres-wal-disk-consumption-and-heartbeat-action-query\">setup guide</a> for how and when to configure this setting.",
"default": "",
"order": 8
},
"invalid_cdc_cursor_position_behavior": {
"type": "string",
"title": "Invalid CDC position behavior (Advanced)",
"description": "Determines whether Airbyte should fail or re-sync data in case of an stale/invalid cursor value into the WAL. If 'Fail sync' is chosen, a user will have to manually reset the connection before being able to continue syncing data. If 'Re-sync data' is chosen, Airbyte will automatically trigger a refresh but could lead to higher cloud costs and data loss.",
"enum": ["Fail sync", "Re-sync data"],
"default": "Fail sync",
"order": 9
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,14 @@
"description": "Specifies a query that the connector executes on the source database when the connector sends a heartbeat message. Please see the <a href=\"https://docs.airbyte.com/integrations/sources/postgres/postgres-wal-disk-consumption-and-heartbeat-action-query\">setup guide</a> for how and when to configure this setting.",
"default": "",
"order": 8
},
"invalid_cdc_cursor_position_behavior": {
"type": "string",
"title": "Invalid CDC position behavior (Advanced)",
"description": "Determines whether Airbyte should fail or re-sync data in case of an stale/invalid cursor value into the WAL. If 'Fail sync' is chosen, a user will have to manually reset the connection before being able to continue syncing data. If 'Re-sync data' is chosen, Airbyte will automatically trigger a refresh but could lead to higher cloud costs and data loss.",
"enum": ["Fail sync", "Re-sync data"],
"default": "Fail sync",
"order": 9
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import static io.airbyte.cdk.integrations.debezium.internals.DebeziumEventConverter.CDC_DELETED_AT;
import static io.airbyte.cdk.integrations.debezium.internals.DebeziumEventConverter.CDC_LSN;
import static io.airbyte.cdk.integrations.debezium.internals.DebeziumEventConverter.CDC_UPDATED_AT;
import static io.airbyte.integrations.source.postgres.PostgresSpecConstants.FAIL_SYNC_OPTION;
import static io.airbyte.integrations.source.postgres.PostgresSpecConstants.RESYNC_DATA_OPTION;
import static io.airbyte.integrations.source.postgres.ctid.CtidStateManager.STATE_TYPE_KEY;
import static io.airbyte.integrations.source.postgres.ctid.InitialSyncCtidIteratorConstants.USE_TEST_CHUNK_SIZE;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -95,7 +97,7 @@ protected JsonNode config() {
return testdb.testConfigBuilder()
.withSchemas(modelsSchema(), modelsSchema() + "_random")
.withoutSsl()
.withCdcReplication("After loading Data in the destination")
.withCdcReplication("After loading Data in the destination", RESYNC_DATA_OPTION)
.with(SYNC_CHECKPOINT_RECORDS_PROPERTY, 1)
.with("heartbeat_action_query", "")
.build();
Expand All @@ -122,7 +124,7 @@ void testDebugMode() {
final JsonNode invalidDebugConfig = testdb.testConfigBuilder()
.withSchemas(modelsSchema(), modelsSchema() + "_random")
.withoutSsl()
.withCdcReplication("While reading Data")
.withCdcReplication("While reading Data", RESYNC_DATA_OPTION)
.with(SYNC_CHECKPOINT_RECORDS_PROPERTY, 1)
.with("debug_mode", true)
.build();
Expand Down Expand Up @@ -604,6 +606,47 @@ private void createAndPopulateTimestampTable() {
}
}

@Test
void testSyncShouldFailPurgedLogs() throws Exception {
final int recordsToCreate = 20;

final JsonNode config = testdb.testConfigBuilder()
.withSchemas(modelsSchema(), modelsSchema() + "_random")
.withoutSsl()
.withCdcReplication("While reading Data", FAIL_SYNC_OPTION)
.with(SYNC_CHECKPOINT_RECORDS_PROPERTY, 1)
.build();
final AutoCloseableIterator<AirbyteMessage> firstBatchIterator = source()
.read(config, getConfiguredCatalog(), null);
final List<AirbyteMessage> dataFromFirstBatch = AutoCloseableIterators
.toListAndClose(firstBatchIterator);
final List<AirbyteStateMessage> stateAfterFirstBatch = extractStateMessages(dataFromFirstBatch);
assertExpectedStateMessages(stateAfterFirstBatch);
// second batch of records again 20 being created
bulkInsertRecords(recordsToCreate);

// Extract the last state message
final JsonNode state = Jsons.jsonNode(Collections.singletonList(stateAfterFirstBatch.get(stateAfterFirstBatch.size() - 1)));
final AutoCloseableIterator<AirbyteMessage> secondBatchIterator = source()
.read(config, getConfiguredCatalog(), state);
final List<AirbyteMessage> dataFromSecondBatch = AutoCloseableIterators
.toListAndClose(secondBatchIterator);
final List<AirbyteStateMessage> stateAfterSecondBatch = extractStateMessages(dataFromSecondBatch);
assertExpectedStateMessagesFromIncrementalSync(stateAfterSecondBatch);

for (int recordsCreated = 0; recordsCreated < 1; recordsCreated++) {
final JsonNode record =
Jsons.jsonNode(ImmutableMap
.of(COL_ID, 400 + recordsCreated, COL_MAKE_ID, 1, COL_MODEL,
"H-" + recordsCreated));
writeModelRecord(record);
}

// Triggering sync with the first sync's state only which would mimic a scenario that the second
// sync failed on destination end, and we didn't save state
assertThrows(ConfigErrorException.class, () -> source().read(config, getConfiguredCatalog(), state));
}

@Test
protected void syncShouldHandlePurgedLogsGracefully() throws Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

package io.airbyte.integrations.source.postgres;

import static io.airbyte.integrations.source.postgres.PostgresSpecConstants.INVALID_CDC_CURSOR_POSITION_PROPERTY;
import static io.airbyte.integrations.source.postgres.PostgresSpecConstants.RESYNC_DATA_OPTION;

import com.google.common.collect.ImmutableMap;
import io.airbyte.cdk.db.factory.DatabaseDriver;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
Expand Down Expand Up @@ -174,10 +177,10 @@ public PostgresConfigBuilder withStandardReplication() {
}

public PostgresConfigBuilder withCdcReplication() {
return withCdcReplication("While reading Data");
return withCdcReplication("While reading Data", RESYNC_DATA_OPTION);
}

public PostgresConfigBuilder withCdcReplication(String LsnCommitBehaviour) {
public PostgresConfigBuilder withCdcReplication(String LsnCommitBehaviour, String cdcCursorFailBehaviour) {
return this
.with("is_test", true)
.with("replication_method", Jsons.jsonNode(ImmutableMap.builder()
Expand All @@ -186,6 +189,7 @@ public PostgresConfigBuilder withCdcReplication(String LsnCommitBehaviour) {
.put("publication", testDatabase.getPublicationName())
.put("initial_waiting_seconds", DEFAULT_CDC_REPLICATION_INITIAL_WAIT.getSeconds())
.put("lsn_commit_behaviour", LsnCommitBehaviour)
.put(INVALID_CDC_CURSOR_POSITION_PROPERTY, cdcCursorFailBehaviour)
.build()));
}

Expand Down
5 changes: 3 additions & 2 deletions docs/integrations/sources/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -292,9 +292,10 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp

| Version | Date | Pull Request | Subject |
|---------|------------|----------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.3.11 | 2024-02-20 | [35304](https://github.com/airbytehq/airbyte/pull/35304) | Add config to throw an error on invalid CDC position and enable it by default. |
| 3.3.10 | 2024-02-13 | [35036](https://github.com/airbytehq/airbyte/pull/34751) | Emit analytics message for invalid CDC cursor. |
| 3.3.9 | 2024-02-13 | [35224](https://github.com/airbytehq/airbyte/pull/35224) | Adopt CDK 0.20.4 |
| 3.3.8 | 2024-02-08 | [34751](https://github.com/airbytehq/airbyte/pull/34751) | Adopt CDK 0.19.0 |
| 3.3.9 | 2024-02-13 | [35224](https://github.com/airbytehq/airbyte/pull/35224) | Adopt CDK 0.20.4 |
| 3.3.8 | 2024-02-08 | [34751](https://github.com/airbytehq/airbyte/pull/34751) | Adopt CDK 0.19.0 |
| 3.3.7 | 2024-02-08 | [34781](https://github.com/airbytehq/airbyte/pull/34781) | Add a setting in the setup page to advance the LSN. |
| 3.3.6 | 2024-02-07 | [34892](https://github.com/airbytehq/airbyte/pull/34892) | Adopt CDK v0.16.6 |
| 3.3.5 | 2024-02-07 | [34948](https://github.com/airbytehq/airbyte/pull/34948) | Adopt CDK v0.16.5 |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ Normally under the CDC mode, the Postgres source will first run a full refresh s
The root causes is that the WALs needed for the incremental sync has been removed by Postgres. This can occur under the following scenarios:

- When there are lots of database updates resulting in more WAL files than allowed in the `pg_wal` directory, Postgres will purge or archive the WAL files. This scenario is preventable. Possible solutions include:
- Sync the data source more frequently. The downside is that more computation resources will be consumed, leading to a higher Airbyte bill.
- Sync the data source more frequently.
- Set a higher `wal_keep_size`. If no unit is provided, it is in megabytes, and the default is `0`. See detailed documentation [here](https://www.postgresql.org/docs/current/runtime-config-replication.html#GUC-WAL-KEEP-SIZE). The downside of this approach is that more disk space will be needed.
- When the Postgres connector successfully reads the WAL and acknowledges it to Postgres, but the destination connector fails to consume the data, the Postgres connector will try to read the same WAL again, which may have been removed by Postgres, since the WAL record is already acknowledged. This scenario is rare, because it can happen, and currently there is no way to prevent it. The correct behavior is to perform a full refresh.

Expand Down

0 comments on commit b7ad7fd

Please sign in to comment.