Skip to content

Commit

Permalink
Make OpenLineageClient and Transports AutoCloseable (OpenLineage#3122)
Browse files Browse the repository at this point in the history
Signed-off-by: Dominik Dębowczyk <[email protected]>
  • Loading branch information
ddebowczyk92 authored Sep 28, 2024
1 parent c3c7be9 commit 96028b5
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

/** HTTP client used to emit {@link OpenLineage.RunEvent}s to HTTP backend. */
@Slf4j
public final class OpenLineageClient {
public final class OpenLineageClient implements AutoCloseable {
final Transport transport;
final Optional<CircuitBreaker> circuitBreaker;
final MeterRegistry meterRegistry;
Expand Down Expand Up @@ -160,6 +160,12 @@ public static Builder builder() {
return new Builder();
}

@Override
public void close() throws Exception {
transport.close();
meterRegistry.close();
}

/**
* Builder for {@link OpenLineageClient} instances.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package io.openlineage.client.transports;

import io.openlineage.client.OpenLineage;
import io.openlineage.client.OpenLineageClientException;
import java.util.ArrayList;
import java.util.List;
import lombok.NonNull;
Expand Down Expand Up @@ -73,4 +74,17 @@ private void handleEmissionFailure(Transport transport, Exception e) {
"Transport " + transport.getClass().getSimpleName() + " failed to emit event", e);
}
}

@Override
public void close() throws Exception {
transports.forEach(
t -> {
try {
t.close();
} catch (Exception e) {
log.error("Failed to close {} transport", t.getClass().getSimpleName(), e);
throw new OpenLineageClientException(e);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import io.openlineage.client.OpenLineage;
import io.openlineage.client.OpenLineageClientException;
import io.openlineage.client.OpenLineageClientUtils;
import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
Expand All @@ -39,7 +38,7 @@
import org.apache.http.util.EntityUtils;

@Slf4j
public final class HttpTransport extends Transport implements Closeable {
public final class HttpTransport extends Transport {
private static final String API_V1 = "/api/v1";

private final CloseableHttpClient http;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,9 @@ private void emit(String eventAsJson, String eventKey) {
log.error("Failed to collect lineage event: {}", eventAsJson, e);
}
}

@Override
public void close() throws Exception {
producer.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import lombok.NonNull;

@NoArgsConstructor
public abstract class Transport {
public abstract class Transport implements AutoCloseable {
enum Type {
CONSOLE,
FILE,
Expand All @@ -28,4 +28,7 @@ enum Type {
public abstract void emit(@NonNull OpenLineage.DatasetEvent datasetEvent);

public abstract void emit(@NonNull OpenLineage.JobEvent jobEvent);

@Override
public void close() throws Exception {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class DataplexTransport extends Transport implements Closeable {
public class DataplexTransport extends Transport {

private final ProducerClientWrapper producerClientWrapper;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,9 @@ private void uploadObject(String objectName, String contents) {
Blob blob = storage.create(blobInfo, content);
log.debug("Stored event: {}", blob.asBlobInfo().getBlobId().toGsUtilUri());
}

@Override
public void close() throws Exception {
storage.close();
}
}

0 comments on commit 96028b5

Please sign in to comment.