
[SUPPORT] Cannot use org.apache.spark.sql.hudi.HoodieSparkSessionExtension to configure session extensions. Caused by: ClassNotFoundException: org.apache.spark.sql.avro.HoodieAvroSerializer #12633

Closed
ductrieu opened this issue Jan 14, 2025 · 2 comments

ductrieu commented Jan 14, 2025

Environment Description

  • Hudi version : 1.0.0
  • Spark version : 3.3.1
  • Storage : S3 minio
  • Running on Docker? (yes/no) : K8S

I'm trying to submit a PySpark job in a K8s environment and I'm getting an error that the class HoodieAvroSerializer cannot be found.

Here's my code:
import logging
import os
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, DoubleType, StringType
from pyspark.sql.functions import lit, col
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger("MinioSparkJob")
my_packages = ["org.apache.hadoop:hadoop-aws:3.3.1",
               "org.apache.spark:spark-avro_2.12:3.3.1",
               "org.apache.hudi:hudi-spark3.3.x_2.12:1.0.0",
               "org.apache.hudi:hudi-spark3-bundle_2.12:1.0.0"]

spark = (
    SparkSession.builder
    .appName("MinioSparkJob")
    .config("spark.jars.packages", ",".join(my_packages))
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
    .getOrCreate()
)

def load_config(spark_context: SparkContext):
    conf = spark_context._jsc.hadoopConfiguration()
    conf.set("fs.s3a.access.key", os.getenv("AWS_ACCESS_KEY_ID", "minioadmin"))
    conf.set("fs.s3a.secret.key", os.getenv("AWS_SECRET_ACCESS_KEY", "minioadmin"))
    conf.set("fs.s3a.endpoint", os.getenv("ENDPOINT", "http://minio-service.default.svc.cluster.local:9000"))
    conf.set("fs.s3a.path.style.access", "true")
    conf.set("fs.s3a.connection.ssl.enabled", "false")
    conf.set("fs.s3a.attempts.maximum", "1")
    conf.set("fs.s3a.connection.establish.timeout", "5000")
    conf.set("fs.s3a.connection.timeout", "10000")

load_config(spark.sparkContext)

schema = StructType([
    StructField('VendorID', LongType(), True),
    StructField('tpep_pickup_datetime', StringType(), True),
    StructField('tpep_dropoff_datetime', StringType(), True),
    StructField('passenger_count', DoubleType(), True),
    StructField('trip_distance', DoubleType(), True),
    StructField('RatecodeID', DoubleType(), True),
    StructField('store_and_fwd_flag', StringType(), True),
    StructField('PULocationID', LongType(), True),
    StructField('DOLocationID', LongType(), True),
    StructField('payment_type', LongType(), True),
    StructField('fare_amount', DoubleType(), True),
    StructField('extra', DoubleType(), True),
    StructField('mta_tax', DoubleType(), True),
    StructField('tip_amount', DoubleType(), True),
    StructField('tolls_amount', DoubleType(), True),
    StructField('improvement_surcharge', DoubleType(), True),
    StructField('total_amount', DoubleType(), True),
    StructField('congestion_surcharge', DoubleType(), True)])

df = spark.read.option("header", "true").schema(schema).csv(
os.getenv("INPUT_PATH", "s3a://openlake/taxi-data.csv")).limit(100)

large_passengers_df = df.filter(df.passenger_count > 6)
total_rows_count = df.count()

tableName = "trips_table"
basePath = "s3a://openlake/hudi/trieu/trips_table"

hudi_options = {
    "hoodie.table.name": tableName,
    "hoodie.datasource.write.precombine.field": "tpep_pickup_datetime",
    "hoodie.datasource.write.partitionpath.field": "VendorID",
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
}
logger.info("Writing data to Hudi")

(df.write.format("hudi")
    .options(**hudi_options)
    .mode("overwrite")
    .save(basePath))
logger.info("Reading data from Hudi")
taxiDF = spark.read.format("hudi").load(basePath).limit(10)

taxiDF.printSchema()

Error log:
25/01/14 09:39:40 INFO SparkContext: Added JAR local:///app/hudi-spark3-bundle_2.12-1.0.0.jar at file:/app/hudi-spark3-bundle_2.12-1.0.0.jar with timestamp 1736847579978
25/01/14 09:39:40 INFO SparkContext: Added JAR local:///app/hadoop-aws-3.3.1.jar at file:/app/hadoop-aws-3.3.1.jar with timestamp 1736847579978
25/01/14 09:39:40 INFO SparkContext: Added JAR local:///app/hudi-spark3.3.x_2.12-1.0.0.jar at file:/app/hudi-spark3.3.x_2.12-1.0.0.jar with timestamp 1736847579978
25/01/14 09:39:40 INFO SparkContext: Added JAR local:///app/hudi-spark3-common-1.0.0.jar at file:/app/hudi-spark3-common-1.0.0.jar with timestamp 1736847579978
25/01/14 09:39:40 INFO SparkContext: Added JAR local:///app/spark-avro_2.13-3.3.1.jar at file:/app/spark-avro_2.13-3.3.1.jar with timestamp 1736847579978
25/01/14 09:39:40 INFO SparkContext: Added JAR local:///app/hudi-spark3.3-bundle_2.12-1.0.0.jar at file:/app/hudi-spark3.3-bundle_2.12-1.0.0.jar with timestamp 1736847579978

25/01/14 09:39:46 WARN SparkSession: Cannot use org.apache.spark.sql.hudi.HoodieSparkSessionExtension to configure session extensions.
java.lang.NoClassDefFoundError: org/apache/spark/sql/avro/HoodieAvroSerializer
at java.base/java.lang.Class.getDeclaredConstructors0(Native Method)
at java.base/java.lang.Class.privateGetDeclaredConstructors(Unknown Source)
at java.base/java.lang.Class.getConstructor0(Unknown Source)
at java.base/java.lang.Class.newInstance(Unknown Source)

py4j.protocol.Py4JJavaError: An error occurred while calling o84.csv.
: java.lang.NoClassDefFoundError: org/apache/spark/sql/avro/HoodieAvroSerializer
at java.base/java.lang.Class.getDeclaredConstructors0(Native Method)
at java.base/java.lang.Class.privateGetDeclaredConstructors(Unknown Source)
at java.base/java.lang.Class.getConstructor0(Unknown Source)
at java.base/java.lang.Class.newInstance(Unknown Source)
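
Note the jar list above: spark-avro_2.13-3.3.1.jar sits next to Scala 2.12 Hudi artifacts, and several overlapping Hudi jars are shipped at once. Either mismatch can leave HoodieAvroSerializer unresolvable when the session extension loads. As a rough sanity check (the class_is_loadable helper below is a sketch added for illustration, not part of the original job), you can ask the driver JVM via py4j whether it can load the class the stack trace names:

# Hypothetical diagnostic helper: probe the driver JVM's classpath via py4j.
from py4j.protocol import Py4JJavaError

def class_is_loadable(spark, fqcn: str) -> bool:
    # Class.forName raises ClassNotFoundException (surfaced as Py4JJavaError)
    # if the class is missing from the driver's classpath.
    try:
        spark.sparkContext._jvm.java.lang.Class.forName(fqcn)
        return True
    except Py4JJavaError:
        return False

print(class_is_loadable(spark, "org.apache.spark.sql.avro.HoodieAvroSerializer"))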

Has anyone encountered this problem, and how can it be fixed? Please help!

rangareddy commented Jan 15, 2025

Hi @ductrieu

Could you please update your code to use the my_packages list shown below:

SPARK_VERSION = '3.5.3'
SPARK_MAJOR_VERSION = '3.5'
HUDI_VERSION = '1.0.0'
HADOOP_VERSION = '3.3.4'

my_packages = [f"org.apache.hadoop:hadoop-aws:{HADOOP_VERSION}",
               f"org.apache.spark:spark-avro_2.12:{SPARK_VERSION}",
               f"org.apache.hudi:hudi-spark{SPARK_MAJOR_VERSION}-bundle_2.12:{HUDI_VERSION}"]

ad1happy2go (Collaborator) commented

@ductrieu Did the suggestion from @rangareddy work? Do you need any other help on this?

github-project-automation bot moved this to ⏳ Awaiting Triage in Hudi Issue Support on Jan 20, 2025
ad1happy2go moved this from ⏳ Awaiting Triage to 👤 User Action in Hudi Issue Support on Jan 27, 2025
github-project-automation bot moved this from 👤 User Action to ✅ Done in Hudi Issue Support on Feb 12, 2025