Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Destination postgres: improve sentry error grouping #35266

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,34 +64,22 @@ public void uncaughtException(final Thread thread, final Throwable throwable) {
LOGGER.error(logMessage, throwable);

// Attempt to deinterpolate the error message before emitting a trace message
final String mangledMessage;
// If any exception in the chain is of a deinterpolatable type, find it and deinterpolate its
// message.
// This assumes that any wrapping exceptions are just noise (e.g. runtime exception).
final Optional<Throwable> deinterpolatableException = ExceptionUtils.getThrowableList(throwable).stream()
.filter(t -> THROWABLES_TO_DEINTERPOLATE.stream().anyMatch(deinterpolatableClass -> deinterpolatableClass.isAssignableFrom(t.getClass())))
.findFirst();
final boolean messageWasMangled;
if (deinterpolatableException.isPresent()) {
final String originalMessage = deinterpolatableException.get().getMessage();
mangledMessage = STRINGS_TO_DEINTERPOLATE.stream()
final String mangledMessage = STRINGS_TO_DEINTERPOLATE.stream()
// Sort the strings longest to shortest, in case any target string is a substring of another
// e.g. "airbyte_internal" should be swapped out before "airbyte"
.sorted(Comparator.comparing(String::length).reversed())
.reduce(originalMessage, AirbyteExceptionHandler::deinterpolate);
messageWasMangled = !mangledMessage.equals(originalMessage);
AirbyteTraceMessageUtility.emitCustomErrorTrace(throwable.getMessage(), mangledMessage);
} else {
mangledMessage = throwable.getMessage();
messageWasMangled = false;
}

if (!messageWasMangled) {
// If we did not modify the message (either not a deinterpolatable class, or we tried to
// deinterpolate but made no changes) then emit our default trace message
AirbyteTraceMessageUtility.emitSystemErrorTrace(throwable, logMessage);
} else {
// If we did modify the message, then emit a custom trace message
AirbyteTraceMessageUtility.emitCustomErrorTrace(throwable.getMessage(), mangledMessage);
}

terminate();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,6 @@ public void setup() {
AirbyteExceptionHandler.addThrowableForDeinterpolation(RuntimeException.class);
}

@Test
void testTraceMessageEmission() throws Exception {
runTestWithMessage("error");

final AirbyteMessage traceMessage = findFirstTraceMessage();
assertAll(
() -> assertEquals(AirbyteTraceMessage.Type.ERROR, traceMessage.getTrace().getType()),
() -> assertEquals(AirbyteExceptionHandler.logMessage, traceMessage.getTrace().getError().getMessage()),
() -> assertEquals(AirbyteErrorTraceMessage.FailureType.SYSTEM_ERROR, traceMessage.getTrace().getError().getFailureType()));
}

@Test
void testMessageDeinterpolation() throws Exception {
AirbyteExceptionHandler.addStringForDeinterpolation("foo");
Expand Down Expand Up @@ -89,12 +78,10 @@ void testMessageSmartDeinterpolation() throws Exception {
runTestWithMessage("Error happened in foobar");

final AirbyteMessage traceMessage = findFirstTraceMessage();
// We shouldn't deinterpolate at all in this case, so we will get the default trace message
// behavior.
assertAll(
() -> assertEquals(AirbyteExceptionHandler.logMessage, traceMessage.getTrace().getError().getMessage()),
() -> assertEquals("Error happened in foobar", traceMessage.getTrace().getError().getMessage()),
() -> assertEquals(
"java.lang.RuntimeException: Error happened in foobar",
"Error happened in foobar",
traceMessage.getTrace().getError().getInternalMessage()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 25c5221d-dce2-4163-ade9-739ef790f503
dockerImageTag: 2.0.0
dockerImageTag: 2.0.1
dockerRepository: airbyte/destination-postgres-strict-encrypt
documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres
githubIssueLabel: destination-postgres
Expand All @@ -18,8 +18,8 @@ data:
breakingChanges:
2.0.0:
message: >
This version introduces [Destinations V2](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#what-is-destinations-v2), which provides better error handling, incremental delivery of data for large syncs, and improved final table structures.
To review the breaking changes, and how to upgrade, see [here](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#quick-start-to-upgrading).
This version introduces [Destinations V2](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#what-is-destinations-v2), which provides better error handling, incremental delivery of data for large syncs, and improved final table structures.
To review the breaking changes, and how to upgrade, see [here](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#quick-start-to-upgrading).
These changes will likely require updates to downstream dbt / SQL models, which we walk through [here](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#updating-downstream-transformations).
Selecting `Upgrade` will upgrade **all** connections using this destination at their next sync. For more controlled upgrade [see instructions](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#upgrading-connections-one-by-one-with-dual-writing).
upgradeDeadline: "2024-05-31"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.integrations.base.AirbyteExceptionHandler;
import io.airbyte.cdk.integrations.base.Destination;
import io.airbyte.cdk.integrations.base.IntegrationRunner;
import io.airbyte.cdk.integrations.base.spec_modification.SpecModifyingDestination;
Expand All @@ -15,6 +16,7 @@
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.v0.ConnectorSpecification;
import java.util.Set;
import org.postgresql.util.PSQLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -66,6 +68,7 @@ public boolean isV2Destination() {
}

public static void main(final String[] args) throws Exception {
AirbyteExceptionHandler.addThrowableForDeinterpolation(PSQLException.class);
final Destination destination = new PostgresDestinationStrictEncrypt();
LOGGER.info("starting destination: {}", PostgresDestinationStrictEncrypt.class);
new IntegrationRunner(destination).run(args);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ plugins {
airbyteJavaConnector {
cdkVersionRequired = '0.20.4'
features = ['db-destinations', 'datastore-postgres', 'typing-deduping']
useLocalCdk = false
useLocalCdk = true
}

application {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 25c5221d-dce2-4163-ade9-739ef790f503
dockerImageTag: 2.0.0
dockerImageTag: 2.0.1
dockerRepository: airbyte/destination-postgres
documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres
githubIssueLabel: destination-postgres
Expand All @@ -22,8 +22,8 @@ data:
breakingChanges:
2.0.0:
message: >
This version introduces [Destinations V2](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#what-is-destinations-v2), which provides better error handling, incremental delivery of data for large syncs, and improved final table structures.
To review the breaking changes, and how to upgrade, see [here](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#quick-start-to-upgrading).
This version introduces [Destinations V2](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#what-is-destinations-v2), which provides better error handling, incremental delivery of data for large syncs, and improved final table structures.
To review the breaking changes, and how to upgrade, see [here](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#quick-start-to-upgrading).
These changes will likely require updates to downstream dbt / SQL models, which we walk through [here](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#updating-downstream-transformations).
Selecting `Upgrade` will upgrade **all** connections using this destination at their next sync. For more controlled upgrade [see instructions](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#upgrading-connections-one-by-one-with-dual-writing).
upgradeDeadline: "2024-05-31"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.airbyte.cdk.db.factory.DataSourceFactory;
import io.airbyte.cdk.db.factory.DatabaseDriver;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.integrations.base.AirbyteExceptionHandler;
import io.airbyte.cdk.integrations.base.Destination;
import io.airbyte.cdk.integrations.base.IntegrationRunner;
import io.airbyte.cdk.integrations.base.ssh.SshWrappedDestination;
Expand All @@ -28,6 +29,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.postgresql.util.PSQLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -133,6 +135,7 @@ public boolean isV2Destination() {
}

public static void main(final String[] args) throws Exception {
AirbyteExceptionHandler.addThrowableForDeinterpolation(PSQLException.class);
final Destination destination = PostgresDestination.sshWrappedDestination();
LOGGER.info("starting destination: {}", PostgresDestination.class);
new IntegrationRunner(destination).run(args);
Expand Down
7 changes: 4 additions & 3 deletions docs/integrations/destinations/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@ following[ sync modes](https://docs.airbyte.com/cloud/core-concepts#connection-s

### Output Schema (Raw Tables)

Each stream will be mapped to a separate raw table in Postgres. The default schema in which the raw tables are
created is `airbyte_internal`. This can be overridden in the configuration.
Each stream will be mapped to a separate raw table in Postgres. The default schema in which the raw tables are
created is `airbyte_internal`. This can be overridden in the configuration.
Each table will contain 3 columns:

- `_airbyte_raw_id`: a uuid assigned by Airbyte to each event that is processed. The column type in
Expand Down Expand Up @@ -193,6 +193,7 @@ Now that you have set up the Postgres destination connector, check out the follo

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------|
| 2.0.1 | 2024-02-13 | [35266](https://github.com/airbytehq/airbyte/pull/35266) | Improve error reporting. |
| 2.0.0 | 2024-02-09 | [35042](https://github.com/airbytehq/airbyte/pull/35042) | GA release V2 destinations format. |
| 0.6.3 | 2024-02-06 | [34891](https://github.com/airbytehq/airbyte/pull/34891) | Remove varchar limit, use system defaults |
| 0.6.2 | 2024-01-30 | [34683](https://github.com/airbytehq/airbyte/pull/34683) | CDK Upgrade 0.16.3; Fix dependency mismatches in slf4j lib |
Expand Down Expand Up @@ -220,4 +221,4 @@ Now that you have set up the Postgres destination connector, check out the follo
| 0.3.13 | 2021-12-01 | [\#8371](https://github.com/airbytehq/airbyte/pull/8371) | Fixed incorrect handling "\n" in ssh key |
| 0.3.12 | 2021-11-08 | [\#7719](https://github.com/airbytehq/airbyte/pull/7719) | Improve handling of wide rows by buffering records based on their byte size rather than their count |
| 0.3.11 | 2021-09-07 | [\#5743](https://github.com/airbytehq/airbyte/pull/5743) | Add SSH Tunnel support |
| 0.3.10 | 2021-08-11 | [\#5336](https://github.com/airbytehq/airbyte/pull/5336) | Destination Postgres: fix \u0000\(NULL\) value processing |
| 0.3.10 | 2021-08-11 | [\#5336](https://github.com/airbytehq/airbyte/pull/5336) | Destination Postgres: fix \u0000\(NULL\) value processing |
Loading