[HUDI-8934] Claim of RFC-87. Avro elimination for Flink writer #12729
Change Logs
Inspired by RFC-84 (HUDI-8920): there is an opinion that Avro is not the best choice for Hudi. It requires extra ser/de operations, and not only between Flink operators (that part will be fixed by RFC-84).
I decided to benchmark a POC version with a native Flink RowData writer for Hudi. It was simple enough because Hudi already has a native RowData-to-Parquet writer used by append mode; I reused this writer to create log blocks, and two bottlenecks were found (see the sketch below):
Hudi performs a lot of Avro ser/de operations in the writer runtime.
Hudi stores Avro records as a List, which causes GC pressure in the writer runtime; in my benchmarks garbage collection accounts for about 30% of the whole Hudi writer runtime.
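To make the bottleneck concrete, here is a minimal, illustrative Java sketch (not Hudi's actual writer code) of the shape of per-record work on the current log-block path, assuming Flink's flink-avro converters and a hypothetical three-column subset of the lineitem schema:

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.RowDataToAvroConverters;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

public class AvroDetourSketch {

    public static void main(String[] args) {
        // Hypothetical three-column subset of the lineitem schema.
        RowType rowType = RowType.of(
                new LogicalType[] {
                        new BigIntType(), new IntType(), new VarCharType(VarCharType.MAX_LENGTH)},
                new String[] {"l_orderkey", "l_linenumber", "l_comment"});

        Schema avroSchema = AvroSchemaConverter.convertToSchema(rowType);
        RowDataToAvroConverters.RowDataToAvroConverter toAvro =
                RowDataToAvroConverters.createConverter(rowType);

        // Bottleneck shape: every incoming RowData is converted to an Avro
        // GenericRecord and buffered on the heap in a List until the log block
        // is flushed -- one extra object graph per record, hence the ser/de
        // cost and the GC pressure.
        List<GenericRecord> buffered = new ArrayList<>();
        for (RowData row : sampleRows()) {
            buffered.add((GenericRecord) toAvro.convert(avroSchema, row));
        }
        System.out.println("Buffered " + buffered.size() + " Avro records");
    }

    private static List<RowData> sampleRows() {
        List<RowData> rows = new ArrayList<>();
        rows.add(GenericRowData.of(1L, 1, StringData.fromString("first line")));
        rows.add(GenericRowData.of(1L, 2, StringData.fromString("second line")));
        return rows;
    }
}
```

The POC instead keeps records as RowData end to end and feeds them to the existing RowData-to-Parquet writer when building log blocks, so neither the per-record Avro conversion nor the buffered GenericRecord objects are needed.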
Impact
As a result, write time dropped from ~4 min to ~1 min 20 sec (a ~3x write performance boost).
I have a POC version that we are already testing in our cloud environment; the key improvements target the two bottlenecks described above.
My config
PC: 32 CPU, 128 GiB
Data: 60 million records of the TPC-H lineitem table
Java: OpenJDK 17
Flink: 1.20, single JM + single TM, standalone, taskmanager.memory.process.size: 8G
Write: Hadoop HDFS 3.3.1, 9-node cluster
Read: Kafka 2.8, 3-node cluster, 8 partitions
Hudi table:
'connector' = 'hudi',
'path' = '<hdfs_path>',
'table.type' = 'MERGE_ON_READ',
'metadata.enabled' = 'false',
'index.type' = 'BUCKET',
'hoodie.bucket.index.hash.field' = 'l_orderkey,l_linenumber',
'hoodie.bucket.index.num.buckets' = '8',
'hoodie.parquet.compression.codec' = 'snappy',
'hoodie.logfile.data.block.format' = 'parquet',
'hoodie.enable.fast.sort.write' = 'true',
'write.operation' = 'upsert',
'write.batch.size' = '256',
'write.tasks' = '8',
'compaction.async.enabled' = 'false',
'clean.async.enabled' = 'false',
'hoodie.archive.automatic' = 'false',
'hoodie.clean.automatic' = 'false'
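For completeness, a minimal Table API sketch of how such a benchmark pipeline could be wired end to end. The Kafka source definition (topic name, brokers, JSON format) and the trimmed lineitem column list are illustrative assumptions; the Hudi sink reuses a shortened version of the option list above, with <hdfs_path> left as a placeholder:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class LineitemUpsertBenchmarkSketch {

    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Hypothetical Kafka source for TPC-H lineitem (topic, brokers and
        // format are assumptions, not taken from this PR).
        tEnv.executeSql(
                "CREATE TABLE lineitem_source (\n"
                        + "  l_orderkey BIGINT,\n"
                        + "  l_linenumber INT,\n"
                        + "  l_quantity DECIMAL(15, 2),\n"
                        + "  l_extendedprice DECIMAL(15, 2),\n"
                        + "  l_shipdate DATE,\n"
                        + "  l_comment STRING\n"
                        + ") WITH (\n"
                        + "  'connector' = 'kafka',\n"
                        + "  'topic' = 'lineitem',\n"
                        + "  'properties.bootstrap.servers' = '<kafka_brokers>',\n"
                        + "  'scan.startup.mode' = 'earliest-offset',\n"
                        + "  'format' = 'json'\n"
                        + ")");

        // Hudi MOR sink using a shortened version of the option list above;
        // <hdfs_path> stays a placeholder, columns are a trimmed subset.
        tEnv.executeSql(
                "CREATE TABLE lineitem_hudi (\n"
                        + "  l_orderkey BIGINT,\n"
                        + "  l_linenumber INT,\n"
                        + "  l_quantity DECIMAL(15, 2),\n"
                        + "  l_extendedprice DECIMAL(15, 2),\n"
                        + "  l_shipdate DATE,\n"
                        + "  l_comment STRING,\n"
                        + "  PRIMARY KEY (l_orderkey, l_linenumber) NOT ENFORCED\n"
                        + ") WITH (\n"
                        + "  'connector' = 'hudi',\n"
                        + "  'path' = '<hdfs_path>',\n"
                        + "  'table.type' = 'MERGE_ON_READ',\n"
                        + "  'index.type' = 'BUCKET',\n"
                        + "  'hoodie.bucket.index.hash.field' = 'l_orderkey,l_linenumber',\n"
                        + "  'hoodie.bucket.index.num.buckets' = '8',\n"
                        + "  'hoodie.logfile.data.block.format' = 'parquet',\n"
                        + "  'write.operation' = 'upsert',\n"
                        + "  'write.tasks' = '8'\n"
                        + ")");

        // Drive the upsert from Kafka into Hudi.
        tEnv.executeSql("INSERT INTO lineitem_hudi SELECT * FROM lineitem_source");
    }
}
```

The primary key mirrors the bucket index hash field (l_orderkey,l_linenumber) configured above.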
Risk level (write none, low, medium or high below)
None
Documentation Update
None
Contributor's checklist