From 96028b56b84bcce6f39da37fe0eb04bd0ac2c520 Mon Sep 17 00:00:00 2001 From: ddebowczyk92 Date: Sat, 28 Sep 2024 11:11:44 +0200 Subject: [PATCH] Make OpenLineageClient and Transports AutoCloseable (#3122) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Dominik Dębowczyk --- .../io/openlineage/client/OpenLineageClient.java | 8 +++++++- .../client/transports/CompositeTransport.java | 14 ++++++++++++++ .../client/transports/HttpTransport.java | 3 +-- .../client/transports/KafkaTransport.java | 5 +++++ .../openlineage/client/transports/Transport.java | 5 ++++- .../transports/dataplex/DataplexTransport.java | 2 +- .../client/transports/gcs/GcsTransport.java | 5 +++++ 7 files changed, 37 insertions(+), 5 deletions(-) diff --git a/client/java/src/main/java/io/openlineage/client/OpenLineageClient.java b/client/java/src/main/java/io/openlineage/client/OpenLineageClient.java index 6610ebb012..91788fc603 100644 --- a/client/java/src/main/java/io/openlineage/client/OpenLineageClient.java +++ b/client/java/src/main/java/io/openlineage/client/OpenLineageClient.java @@ -22,7 +22,7 @@ /** HTTP client used to emit {@link OpenLineage.RunEvent}s to HTTP backend. */ @Slf4j -public final class OpenLineageClient { +public final class OpenLineageClient implements AutoCloseable { final Transport transport; final Optional circuitBreaker; final MeterRegistry meterRegistry; @@ -160,6 +160,12 @@ public static Builder builder() { return new Builder(); } + @Override + public void close() throws Exception { + transport.close(); + meterRegistry.close(); + } + /** * Builder for {@link OpenLineageClient} instances. * diff --git a/client/java/src/main/java/io/openlineage/client/transports/CompositeTransport.java b/client/java/src/main/java/io/openlineage/client/transports/CompositeTransport.java index 5e120ab216..f504b48322 100644 --- a/client/java/src/main/java/io/openlineage/client/transports/CompositeTransport.java +++ b/client/java/src/main/java/io/openlineage/client/transports/CompositeTransport.java @@ -6,6 +6,7 @@ package io.openlineage.client.transports; import io.openlineage.client.OpenLineage; +import io.openlineage.client.OpenLineageClientException; import java.util.ArrayList; import java.util.List; import lombok.NonNull; @@ -73,4 +74,17 @@ private void handleEmissionFailure(Transport transport, Exception e) { "Transport " + transport.getClass().getSimpleName() + " failed to emit event", e); } } + + @Override + public void close() throws Exception { + transports.forEach( + t -> { + try { + t.close(); + } catch (Exception e) { + log.error("Failed to close {} transport", t.getClass().getSimpleName(), e); + throw new OpenLineageClientException(e); + } + }); + } } diff --git a/client/java/src/main/java/io/openlineage/client/transports/HttpTransport.java b/client/java/src/main/java/io/openlineage/client/transports/HttpTransport.java index de371c4d00..e26666d92d 100644 --- a/client/java/src/main/java/io/openlineage/client/transports/HttpTransport.java +++ b/client/java/src/main/java/io/openlineage/client/transports/HttpTransport.java @@ -14,7 +14,6 @@ import io.openlineage.client.OpenLineage; import io.openlineage.client.OpenLineageClientException; import io.openlineage.client.OpenLineageClientUtils; -import java.io.Closeable; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; @@ -39,7 +38,7 @@ import org.apache.http.util.EntityUtils; @Slf4j -public final class HttpTransport extends Transport implements Closeable { +public final class HttpTransport extends Transport { private static final String API_V1 = "/api/v1"; private final CloseableHttpClient http; diff --git a/client/java/src/main/java/io/openlineage/client/transports/KafkaTransport.java b/client/java/src/main/java/io/openlineage/client/transports/KafkaTransport.java index 9b0482d012..f78564408d 100644 --- a/client/java/src/main/java/io/openlineage/client/transports/KafkaTransport.java +++ b/client/java/src/main/java/io/openlineage/client/transports/KafkaTransport.java @@ -99,4 +99,9 @@ private void emit(String eventAsJson, String eventKey) { log.error("Failed to collect lineage event: {}", eventAsJson, e); } } + + @Override + public void close() throws Exception { + producer.close(); + } } diff --git a/client/java/src/main/java/io/openlineage/client/transports/Transport.java b/client/java/src/main/java/io/openlineage/client/transports/Transport.java index 7f560e2716..59485df792 100644 --- a/client/java/src/main/java/io/openlineage/client/transports/Transport.java +++ b/client/java/src/main/java/io/openlineage/client/transports/Transport.java @@ -10,7 +10,7 @@ import lombok.NonNull; @NoArgsConstructor -public abstract class Transport { +public abstract class Transport implements AutoCloseable { enum Type { CONSOLE, FILE, @@ -28,4 +28,7 @@ enum Type { public abstract void emit(@NonNull OpenLineage.DatasetEvent datasetEvent); public abstract void emit(@NonNull OpenLineage.JobEvent jobEvent); + + @Override + public void close() throws Exception {} } diff --git a/client/java/transports-dataplex/src/main/java/io/openlineage/client/transports/dataplex/DataplexTransport.java b/client/java/transports-dataplex/src/main/java/io/openlineage/client/transports/dataplex/DataplexTransport.java index 9a05ce9140..cc2855b418 100644 --- a/client/java/transports-dataplex/src/main/java/io/openlineage/client/transports/dataplex/DataplexTransport.java +++ b/client/java/transports-dataplex/src/main/java/io/openlineage/client/transports/dataplex/DataplexTransport.java @@ -36,7 +36,7 @@ import lombok.extern.slf4j.Slf4j; @Slf4j -public class DataplexTransport extends Transport implements Closeable { +public class DataplexTransport extends Transport { private final ProducerClientWrapper producerClientWrapper; diff --git a/client/java/transports-gcs/src/main/java/io/openlineage/client/transports/gcs/GcsTransport.java b/client/java/transports-gcs/src/main/java/io/openlineage/client/transports/gcs/GcsTransport.java index b4348c923b..c7322aae5a 100644 --- a/client/java/transports-gcs/src/main/java/io/openlineage/client/transports/gcs/GcsTransport.java +++ b/client/java/transports-gcs/src/main/java/io/openlineage/client/transports/gcs/GcsTransport.java @@ -97,4 +97,9 @@ private void uploadObject(String objectName, String contents) { Blob blob = storage.create(blobInfo, content); log.debug("Stored event: {}", blob.asBlobInfo().getBlobId().toGsUtilUri()); } + + @Override + public void close() throws Exception { + storage.close(); + } }