Skip to content

Commit

Permalink
Merge pull request #13 from nubank/nufacet-resolved-input-map-enhance…
Browse files Browse the repository at this point in the history
…ment

Nufacet resolved input map enhancement
  • Loading branch information
jrosend authored Nov 27, 2024
2 parents 78946e4 + ac82254 commit 8a8e2f6
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 21 deletions.
2 changes: 2 additions & 0 deletions integration/spark/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@ bin/
*/spark-warehouse

.sdkmanrc

integration/spark/docker/notebooks/
Original file line number Diff line number Diff line change
Expand Up @@ -6,48 +6,56 @@
package io.openlineage.spark.agent.facets;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import io.openlineage.client.OpenLineage;
import io.openlineage.spark.agent.Versions;

import java.util.NoSuchElementException;
import java.util.Properties;
import java.util.Map;

import lombok.Getter;
import lombok.NonNull;
import io.openlineage.spark.api.OpenLineageContext;
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.sql.SparkSession;

import static io.openlineage.spark.agent.util.NuFacetsUtils.getConfigValue;
import static io.openlineage.spark.agent.util.NuFacetsUtils.parseJsonToMap;

/** Captures information related to the Apache Spark job. */
@Getter
@Slf4j
public class NuFacet extends OpenLineage.DefaultRunFacet {
// @JsonProperty("jobId")
// @NonNull
// private Integer jobId;

// @JsonProperty("jobDescription")
// private String jobDescription;

@JsonProperty("jobNurn")
private String jobNurn;

private String fetchJobNurn(OpenLineageContext olContext) {
if (olContext.getSparkSession().isPresent()) {
SparkSession sparkSession = olContext.getSparkSession().get();
try {
return sparkSession.conf().get("spark.job.name");
} catch (NoSuchElementException e) {
log.warn("spark.job.name property not found in the context");
return null;
}
}
/**
* Resolved inputs for the job.
* Map of input dataset NURNs by their location path
*/
@JsonProperty("resolvedInputs")
private Map<String, String> resolvedInputs;

log.warn("spark.job.name property not found because the SparkContext could not be retrieved from OpenLineageContext");
return null;
private String getJobNurn(SparkSession sparkSession) {
return getConfigValue("spark.job.name", sparkSession);
}

private Map<String, String> getResolvedInputs(SparkSession sparkSession) {
String resolvedInputsJson = getConfigValue("spark.job.resolvedInputsMap", sparkSession);
try {
return parseJsonToMap(resolvedInputsJson);
} catch (JsonProcessingException e) {
log.warn("Error parsing resolvedInputsJson JSON", e);
return null;
}
}

public NuFacet(@NonNull OpenLineageContext olContext) {
super(Versions.OPEN_LINEAGE_PRODUCER_URI);
this.jobNurn = fetchJobNurn(olContext);
if (olContext.getSparkSession().isPresent()) {
SparkSession sparkSession = olContext.getSparkSession().get();
this.jobNurn = getJobNurn(sparkSession);
this.resolvedInputs = getResolvedInputs(sparkSession);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io.openlineage.spark.agent.util;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.sql.SparkSession;

import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;

@Slf4j
public class NuFacetsUtils {
public static Map<String, String> parseJsonToMap(String jsonString) throws JsonProcessingException {
if (Objects.isNull(jsonString)) {
return null;
}
ObjectMapper mapper = new ObjectMapper();
TypeReference<HashMap<String, String>> typeRef = new TypeReference<HashMap<String, String>>() {};
return mapper.readValue(jsonString, typeRef);
}

public static String getConfigValue(String key, SparkSession sparkSession) {
try {
return sparkSession.conf().get(key);
} catch (NoSuchElementException e) {
log.warn("Property {} not found in the context", key);
return null;
}
}
}

0 comments on commit 8a8e2f6

Please sign in to comment.