From adc3959088952f5378723d0d2c1c2541ee155573 Mon Sep 17 00:00:00 2001
From: kazdy
Date: Fri, 11 Oct 2024 23:36:12 +0200
Subject: [PATCH 01/30] introduce table builder

---
 crates/core/src/table/builder.rs | 28 ++++++++++++++++++++++++++++
 crates/core/src/table/mod.rs     |  1 +
 2 files changed, 29 insertions(+)
 create mode 100644 crates/core/src/table/builder.rs

diff --git a/crates/core/src/table/builder.rs b/crates/core/src/table/builder.rs
new file mode 100644
index 00000000..75ac62a8
--- /dev/null
+++ b/crates/core/src/table/builder.rs
@@ -0,0 +1,28 @@
+use std::collections::HashMap;
+use std::hash::Hash;
+use std::sync::Arc;
+use object_store::DynObjectStore;
+use url::Url;
+use crate::table::Table;
+
+pub struct TableBuilder {
+    base_uri: str,
+    hudi_options: Option<HashMap<String, String>>,
+    storage_option: Option<HashMap<String, String>>,
+    storage_backend: Option<(Arc<DynObjectStore>, Url)>
+}
+
+impl TableBuilder {
+    pub fn with_hudi_options(mut self, _hudi_options: HashMap<String, String>) -> Self {
+        self
+    }
+
+    pub fn with_storage_options(mut self, _storage_options: HashMap<String, String>) -> Self {
+        self
+    }
+
+    pub async fn build(self) -> anyhow::Result<Table> {
+        let mut table = Table::new(&self.base_uri).await;
+        table
+    }
+}
\ No newline at end of file
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 9ca58178..668da5f6 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -117,6 +117,7 @@ use crate::table::timeline::Timeline;
 mod fs_view;
 mod partition;
 mod timeline;
+mod builder;
 
 /// Hudi Table in-memory
 #[derive(Clone, Debug)]
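The struct above is only the API skeleton; the commits that follow make `from_uri`/`from_base_uri`, the `with_*` setters, and `build` functional. A minimal sketch of the call pattern this series converges on, assuming a Tokio async context, the `anyhow` crate, and a hypothetical local table path (the method names `from_base_uri`, `with_hudi_options`, `build`, and `get_schema` are taken from later commits in this series, not from this first patch):

    use std::collections::HashMap;
    use hudi::table::builder::TableBuilder;

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        // Hypothetical base URI; any Hudi table location (local or cloud) would go here.
        let table = TableBuilder::from_base_uri("/tmp/hudi_trips_cow")
            .with_hudi_options(HashMap::from([(
                "hoodie.read.as.of.timestamp".to_string(),
                "20240402123035233".to_string(),
            )]))
            .build()
            .await?;
        // Read the table schema to confirm the table loaded.
        let schema = table.get_schema().await?;
        println!("{schema:?}");
        Ok(())
    }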
From 3ceaeb6c78c93b4718bca93360e1088b2f0060f8 Mon Sep 17 00:00:00 2001
From: kazdy
Date: Sat, 12 Oct 2024 01:35:37 +0200
Subject: [PATCH 02/30] builder prototype works

---
 crates/core/src/table/builder.rs | 214 +++++++++++++++++++++++++++++--
 1 file changed, 205 insertions(+), 9 deletions(-)

diff --git a/crates/core/src/table/builder.rs b/crates/core/src/table/builder.rs
index 75ac62a8..32a270ec 100644
--- a/crates/core/src/table/builder.rs
+++ b/crates/core/src/table/builder.rs
@@ -1,28 +1,224 @@
 use std::collections::HashMap;
-use std::hash::Hash;
+use std::env;
+use std::path::PathBuf;
+use std::str::FromStr;
 use std::sync::Arc;
-use object_store::DynObjectStore;
+use anyhow::{anyhow, Context};
+use strum::IntoEnumIterator;
 use url::Url;
+use crate::config::{HudiConfigs, HUDI_CONF_DIR};
+use crate::config::internal::HudiInternalConfig::SkipConfigValidation;
+use crate::config::read::HudiReadConfig;
+use crate::config::table::{HudiTableConfig, TableTypeValue};
+use crate::config::table::HudiTableConfig::{DropsPartitionFields, TableType, TableVersion};
+use crate::config::table::TableTypeValue::CopyOnWrite;
+use crate::storage::Storage;
+use crate::storage::utils::{parse_config_data, parse_uri};
+use crate::table::fs_view::FileSystemView;
 use crate::table::Table;
+use crate::table::timeline::Timeline;
 
+#[derive(Debug)]
 pub struct TableBuilder {
-    base_uri: str,
+    base_url: Url,
     hudi_options: Option<HashMap<String, String>>,
-    storage_option: Option<HashMap<String, String>>,
-    storage_backend: Option<(Arc<DynObjectStore>, Url)>
+    storage_options: Option<HashMap<String, String>>
 }
 
 impl TableBuilder {
-    pub fn with_hudi_options(mut self, _hudi_options: HashMap<String, String>) -> Self {
+
+    pub fn from_uri(uri: &str) -> Self {
+        let base_url = parse_uri(uri).unwrap(); // TODO: handle err
+        TableBuilder {
+            base_url: base_url.into(),
+            storage_options: None,
+            hudi_options: None
+        }
+    }
+    pub fn with_hudi_options(mut self, hudi_options: HashMap<String, String>) -> Self {
+        self.hudi_options = Some(hudi_options);
         self
     }
 
-    pub fn with_storage_options(mut self, _storage_options: HashMap<String, String>) -> Self {
+    pub fn with_storage_options(mut self, storage_options: HashMap<String, String>) -> Self {
+        self.storage_options = Some(storage_options);
         self
     }
 
     pub async fn build(self) -> anyhow::Result<Table>
{ - let mut table = Table::new(&self.base_uri).await; - table + let base_url = Arc::new(self.base_url); + + let hudi_options = self.hudi_options.unwrap().clone(); + let mut storage_options = self.storage_options.unwrap_or(Default::default()).clone(); + + Self::load_storage_configs(&mut storage_options); + + let hudi_configs = Self::load_hudi_configs(base_url.clone(), hudi_options, &storage_options).await + .context("Failed to load table properties")?; + + let timeline = Timeline::new(base_url.clone(), Arc::from(storage_options.clone()), Arc::from(hudi_configs.clone())) + .await + .context("Failed to load timeline")?; + + let file_system_view = + FileSystemView::new(base_url.clone(), Arc::from(storage_options.clone()), Arc::from(hudi_configs.clone())) + .await + .context("Failed to load file system view")?; + + Ok(Table { + base_url, + configs: Arc::from(hudi_configs), + extra_options: Arc::from(storage_options), + timeline, + file_system_view, + }) + } + + fn load_storage_configs( + mut storage_options: &mut HashMap + ) { + Self::imbue_cloud_env_vars(&mut storage_options); + } + + async fn load_hudi_configs( + base_url: Arc, + mut hudi_options: HashMap, + storage_configs: &HashMap, + ) -> anyhow::Result + { + // let mut hudi_options = HashMap::new(); + + let storage = Storage::new(base_url, &storage_configs)?; + + Self::imbue_table_properties(&mut hudi_options, storage.clone()).await?; + + Self::imbue_global_hudi_configs(&mut hudi_options, storage.clone()).await?; + + let hudi_configs = HudiConfigs::new(hudi_options); + + Self::validate_configs(&hudi_configs).expect("Hudi configs are not valid."); + Ok(hudi_configs) + } + + fn imbue_cloud_env_vars(options: &mut HashMap) { + const PREFIXES: [&str; 3] = ["AWS_", "AZURE_", "GOOGLE_"]; + + for (key, value) in env::vars() { + if PREFIXES.iter().any(|prefix| key.starts_with(prefix)) + && !options.contains_key(&key.to_ascii_lowercase()) + { + options.insert(key.to_ascii_lowercase(), value); + } + } + } + + async fn imbue_table_properties( + options: &mut HashMap, + storage: Arc, + ) -> anyhow::Result<()> { + let bytes = storage.get_file_data(".hoodie/hoodie.properties").await?; + let table_properties = parse_config_data(&bytes, "=").await?; + + // TODO: handle the case where the same key is present in both table properties and options + for (k, v) in table_properties { + options.insert(k.to_string(), v.to_string()); + } + + Ok(()) + } + + async fn imbue_global_hudi_configs( + options: &mut HashMap, + storage: Arc, + ) -> anyhow::Result<()> { + let global_config_path = env::var(HUDI_CONF_DIR) + .map(PathBuf::from) + .unwrap_or_else(|_| PathBuf::from("/etc/hudi/conf")) + .join("hudi-defaults.conf"); + + if let Ok(bytes) = storage + .get_file_data_from_absolute_path(global_config_path.to_str().unwrap()) + .await + { + if let Ok(global_configs) = parse_config_data(&bytes, " \t=").await { + for (key, value) in global_configs { + if key.starts_with("hoodie.") && !options.contains_key(&key) { + options.insert(key.to_string(), value.to_string()); + } + } + } + } + + Ok(()) + } + + fn validate_configs(hudi_configs: &HudiConfigs) -> anyhow::Result<()> { + if hudi_configs + .get_or_default(SkipConfigValidation) + .to::() + { + return Ok(()); + } + + for conf in HudiTableConfig::iter() { + hudi_configs.validate(conf)? + } + + for conf in HudiReadConfig::iter() { + hudi_configs.validate(conf)? + } + + // additional validation + let table_type = hudi_configs.get(TableType)?.to::(); + if TableTypeValue::from_str(&table_type)? 
!= CopyOnWrite { + return Err(anyhow!("Only support copy-on-write table.")); + } + + let table_version = hudi_configs.get(TableVersion)?.to::(); + if !(5..=6).contains(&table_version) { + return Err(anyhow!("Only support table version 5 and 6.")); + } + + let drops_partition_cols = hudi_configs + .get_or_default(DropsPartitionFields) + .to::(); + if drops_partition_cols { + return Err(anyhow!( + "Only support when `{}` is disabled", + DropsPartitionFields.as_ref() + )); + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + + use std::collections::{HashMap}; + use std::fs::canonicalize; + use std::path::PathBuf; + use url::Url; + + use crate::table::builder::TableBuilder; + use crate::table::Table; + + /// Test helper to create a new `Table` instance without validating the configuration. + /// + /// # Arguments + /// + /// * `table_dir_name` - Name of the table root directory; all under `crates/core/tests/data/`. + async fn build_test_table_without_validation(table_dir_name: &str) -> Table { + let base_url = Url::from_file_path( + canonicalize(PathBuf::from("tests").join("data").join(table_dir_name)).unwrap(), + ) + .unwrap(); + + TableBuilder::from_uri(base_url.as_str()) + .with_hudi_options(HashMap::from([("hoodie.internal.skip.config.validation".to_string(), "true".to_string())])) + .build() + .await + .unwrap() } } \ No newline at end of file From 6858ec7df781b3122e5f3d8070cbbaac092e98ca Mon Sep 17 00:00:00 2001 From: kazdy Date: Sat, 12 Oct 2024 01:42:29 +0200 Subject: [PATCH 03/30] hudi options validation is Table responsibility not the TableBuilder --- crates/core/src/table/builder.rs | 51 ++------------------------------ 1 file changed, 2 insertions(+), 49 deletions(-) diff --git a/crates/core/src/table/builder.rs b/crates/core/src/table/builder.rs index 32a270ec..40ff2c3b 100644 --- a/crates/core/src/table/builder.rs +++ b/crates/core/src/table/builder.rs @@ -3,15 +3,10 @@ use std::env; use std::path::PathBuf; use std::str::FromStr; use std::sync::Arc; -use anyhow::{anyhow, Context}; +use anyhow::{Context}; use strum::IntoEnumIterator; use url::Url; use crate::config::{HudiConfigs, HUDI_CONF_DIR}; -use crate::config::internal::HudiInternalConfig::SkipConfigValidation; -use crate::config::read::HudiReadConfig; -use crate::config::table::{HudiTableConfig, TableTypeValue}; -use crate::config::table::HudiTableConfig::{DropsPartitionFields, TableType, TableVersion}; -use crate::config::table::TableTypeValue::CopyOnWrite; use crate::storage::Storage; use crate::storage::utils::{parse_config_data, parse_uri}; use crate::table::fs_view::FileSystemView; @@ -86,8 +81,6 @@ impl TableBuilder { storage_configs: &HashMap, ) -> anyhow::Result { - // let mut hudi_options = HashMap::new(); - let storage = Storage::new(base_url, &storage_configs)?; Self::imbue_table_properties(&mut hudi_options, storage.clone()).await?; @@ -96,7 +89,7 @@ impl TableBuilder { let hudi_configs = HudiConfigs::new(hudi_options); - Self::validate_configs(&hudi_configs).expect("Hudi configs are not valid."); + Table::validate_configs(&hudi_configs).expect("Hudi configs are not valid."); Ok(hudi_configs) } @@ -151,46 +144,6 @@ impl TableBuilder { Ok(()) } - - fn validate_configs(hudi_configs: &HudiConfigs) -> anyhow::Result<()> { - if hudi_configs - .get_or_default(SkipConfigValidation) - .to::() - { - return Ok(()); - } - - for conf in HudiTableConfig::iter() { - hudi_configs.validate(conf)? - } - - for conf in HudiReadConfig::iter() { - hudi_configs.validate(conf)? 
- } - - // additional validation - let table_type = hudi_configs.get(TableType)?.to::(); - if TableTypeValue::from_str(&table_type)? != CopyOnWrite { - return Err(anyhow!("Only support copy-on-write table.")); - } - - let table_version = hudi_configs.get(TableVersion)?.to::(); - if !(5..=6).contains(&table_version) { - return Err(anyhow!("Only support table version 5 and 6.")); - } - - let drops_partition_cols = hudi_configs - .get_or_default(DropsPartitionFields) - .to::(); - if drops_partition_cols { - return Err(anyhow!( - "Only support when `{}` is disabled", - DropsPartitionFields.as_ref() - )); - } - - Ok(()) - } } #[cfg(test)] From fcdb878e1c71e1a149c8e718fc3416211642ec8e Mon Sep 17 00:00:00 2001 From: kazdy Date: Sat, 12 Oct 2024 01:46:11 +0200 Subject: [PATCH 04/30] Table uses TableBuilder for plain table creation, does not panic when empty hudi_options --- crates/core/src/table/builder.rs | 2 +- crates/core/src/table/mod.rs | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/core/src/table/builder.rs b/crates/core/src/table/builder.rs index 40ff2c3b..8f9eceef 100644 --- a/crates/core/src/table/builder.rs +++ b/crates/core/src/table/builder.rs @@ -43,7 +43,7 @@ impl TableBuilder { pub async fn build(self) -> anyhow::Result
{ let base_url = Arc::new(self.base_url); - let hudi_options = self.hudi_options.unwrap().clone(); + let hudi_options = self.hudi_options.unwrap_or(Default::default()).clone(); let mut storage_options = self.storage_options.unwrap_or(Default::default()).clone(); Self::load_storage_configs(&mut storage_options); diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs index 668da5f6..6507a4aa 100644 --- a/crates/core/src/table/mod.rs +++ b/crates/core/src/table/mod.rs @@ -110,6 +110,7 @@ use crate::config::HudiConfigs; use crate::config::HUDI_CONF_DIR; use crate::file_group::FileSlice; use crate::storage::Storage; +use crate::table::builder::TableBuilder; use crate::table::fs_view::FileSystemView; use crate::table::partition::PartitionPruner; use crate::table::timeline::Timeline; @@ -131,7 +132,7 @@ pub struct Table { impl Table { /// Create hudi table by base_uri pub async fn new(base_uri: &str) -> Result { - Self::new_with_options(base_uri, empty_options()).await + TableBuilder::from_uri(base_uri).build().await } /// Create hudi table with options From 29561cbf0e80fb541ae2fa82b0c91dd0bc075688 Mon Sep 17 00:00:00 2001 From: kazdy Date: Sat, 12 Oct 2024 01:46:28 +0200 Subject: [PATCH 05/30] Table uses TableBuilder for plain table creation, does not panic when empty hudi_options --- crates/core/src/table/mod.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs index 6507a4aa..eb35f482 100644 --- a/crates/core/src/table/mod.rs +++ b/crates/core/src/table/mod.rs @@ -39,7 +39,7 @@ //! //! pub async fn test() { //! use arrow_schema::Schema; -//! let base_uri = Url::from_file_path("/tmp/hudi_data").unwrap(); +//! let base_uri = Url::from_file_path("/tmp/hudi_data").unwrap(); //! let hudi_table = Table::new(base_uri.path()).await.unwrap(); //! let schema = hudi_table.get_schema().await.unwrap(); //! 
} @@ -95,6 +95,7 @@ use arrow::record_batch::RecordBatch; use arrow_schema::{Field, Schema}; use strum::IntoEnumIterator; use url::Url; + use HudiInternalConfig::SkipConfigValidation; use HudiTableConfig::{DropsPartitionFields, TableType, TableVersion}; use TableTypeValue::CopyOnWrite; @@ -152,13 +153,15 @@ impl Table { .await .context("Failed to load timeline")?; - let file_system_view = FileSystemView::new(hudi_configs.clone(), storage_options.clone()) - .await - .context("Failed to load file system view")?; + let file_system_view = + FileSystemView::new(base_url.clone(), extra_options.clone(), configs.clone()) + .await + .context("Failed to load file system view")?; Ok(Table { - hudi_configs, - storage_options, + base_url, + configs, + extra_options, timeline, file_system_view, }) From 05b3dbe2cf3d79464bbcccae88d1b3ee77b8d50b Mon Sep 17 00:00:00 2001 From: kazdy Date: Sat, 12 Oct 2024 01:53:47 +0200 Subject: [PATCH 06/30] use TableBuilder in Table::new_with_options, nothing breaks --- crates/core/src/table/mod.rs | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs index eb35f482..d5eeb045 100644 --- a/crates/core/src/table/mod.rs +++ b/crates/core/src/table/mod.rs @@ -110,6 +110,7 @@ use crate::config::utils::{empty_options, split_hudi_options_from_others}; use crate::config::HudiConfigs; use crate::config::HUDI_CONF_DIR; use crate::file_group::FileSlice; +use crate::storage::utils::{parse_config_data}; use crate::storage::Storage; use crate::table::builder::TableBuilder; use crate::table::fs_view::FileSystemView; @@ -153,18 +154,6 @@ impl Table { .await .context("Failed to load timeline")?; - let file_system_view = - FileSystemView::new(base_url.clone(), extra_options.clone(), configs.clone()) - .await - .context("Failed to load file system view")?; - - Ok(Table { - base_url, - configs, - extra_options, - timeline, - file_system_view, - }) } pub fn base_url(&self) -> Result { From 07b1ef3a01731f3c4a10dfbf51c670ff2a66dc8e Mon Sep 17 00:00:00 2001 From: kazdy Date: Sat, 12 Oct 2024 01:59:42 +0200 Subject: [PATCH 07/30] tidy up --- crates/core/src/table/builder.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/crates/core/src/table/builder.rs b/crates/core/src/table/builder.rs index 8f9eceef..1922528f 100644 --- a/crates/core/src/table/builder.rs +++ b/crates/core/src/table/builder.rs @@ -1,10 +1,8 @@ use std::collections::HashMap; use std::env; use std::path::PathBuf; -use std::str::FromStr; use std::sync::Arc; use anyhow::{Context}; -use strum::IntoEnumIterator; use url::Url; use crate::config::{HudiConfigs, HUDI_CONF_DIR}; use crate::storage::Storage; @@ -162,6 +160,7 @@ mod tests { /// # Arguments /// /// * `table_dir_name` - Name of the table root directory; all under `crates/core/tests/data/`. 
+ #[cfg(test)] async fn build_test_table_without_validation(table_dir_name: &str) -> Table { let base_url = Url::from_file_path( canonicalize(PathBuf::from("tests").join("data").join(table_dir_name)).unwrap(), From 5a55f60ea08c65e9acff1a2d1cd4cc188783ca73 Mon Sep 17 00:00:00 2001 From: kazdy Date: Sat, 12 Oct 2024 03:03:36 +0200 Subject: [PATCH 08/30] python bindings support hudi_options and storage_options, tidy up --- crates/core/src/table/builder.rs | 107 ++++++++++++------------------- python/tests/test_table_read.py | 4 +- 2 files changed, 45 insertions(+), 66 deletions(-) diff --git a/crates/core/src/table/builder.rs b/crates/core/src/table/builder.rs index 1922528f..9f3c49d3 100644 --- a/crates/core/src/table/builder.rs +++ b/crates/core/src/table/builder.rs @@ -1,31 +1,30 @@ +use crate::config::{HudiConfigs, HUDI_CONF_DIR}; +use crate::storage::utils::{parse_config_data, parse_uri}; +use crate::storage::Storage; +use crate::table::fs_view::FileSystemView; +use crate::table::timeline::Timeline; +use crate::table::Table; +use anyhow::Context; use std::collections::HashMap; use std::env; use std::path::PathBuf; use std::sync::Arc; -use anyhow::{Context}; use url::Url; -use crate::config::{HudiConfigs, HUDI_CONF_DIR}; -use crate::storage::Storage; -use crate::storage::utils::{parse_config_data, parse_uri}; -use crate::table::fs_view::FileSystemView; -use crate::table::Table; -use crate::table::timeline::Timeline; #[derive(Debug)] pub struct TableBuilder { base_url: Url, hudi_options: Option>, - storage_options: Option> + storage_options: Option>, } impl TableBuilder { - pub fn from_uri(uri: &str) -> Self { let base_url = parse_uri(uri).unwrap(); // TODO: handle err TableBuilder { - base_url: base_url.into(), + base_url, storage_options: None, - hudi_options: None + hudi_options: None, } } pub fn with_hudi_options(mut self, hudi_options: HashMap) -> Self { @@ -41,45 +40,54 @@ impl TableBuilder { pub async fn build(self) -> anyhow::Result
{ let base_url = Arc::new(self.base_url); - let hudi_options = self.hudi_options.unwrap_or(Default::default()).clone(); - let mut storage_options = self.storage_options.unwrap_or(Default::default()).clone(); + let hudi_options = self.hudi_options.unwrap_or_default().clone(); + let mut storage_options = self.storage_options.unwrap_or_default().clone(); - Self::load_storage_configs(&mut storage_options); + Self::load_storage_options(&mut storage_options); - let hudi_configs = Self::load_hudi_configs(base_url.clone(), hudi_options, &storage_options).await - .context("Failed to load table properties")?; + let hudi_configs = + Self::load_hudi_configs(base_url.clone(), hudi_options, &storage_options) + .await + .context("Failed to load table properties")?; - let timeline = Timeline::new(base_url.clone(), Arc::from(storage_options.clone()), Arc::from(hudi_configs.clone())) - .await - .context("Failed to load timeline")?; + let hudi_configs = Arc::from(hudi_configs); + let storage_options = Arc::from(storage_options); - let file_system_view = - FileSystemView::new(base_url.clone(), Arc::from(storage_options.clone()), Arc::from(hudi_configs.clone())) - .await - .context("Failed to load file system view")?; + let timeline = Timeline::new( + base_url.clone(), + storage_options.clone(), + hudi_configs.clone(), + ) + .await + .context("Failed to load timeline")?; + + let file_system_view = FileSystemView::new( + base_url.clone(), + storage_options.clone(), + hudi_configs.clone(), + ) + .await + .context("Failed to load file system view")?; Ok(Table { base_url, - configs: Arc::from(hudi_configs), - extra_options: Arc::from(storage_options), + configs: hudi_configs, + extra_options: storage_options, timeline, file_system_view, }) } - - fn load_storage_configs( - mut storage_options: &mut HashMap - ) { - Self::imbue_cloud_env_vars(&mut storage_options); + + fn load_storage_options(storage_options: &mut HashMap) { + Self::imbue_cloud_env_vars(storage_options); } - + async fn load_hudi_configs( base_url: Arc, mut hudi_options: HashMap, storage_configs: &HashMap, - ) -> anyhow::Result - { - let storage = Storage::new(base_url, &storage_configs)?; + ) -> anyhow::Result { + let storage = Storage::new(base_url, storage_configs)?; Self::imbue_table_properties(&mut hudi_options, storage.clone()).await?; @@ -143,34 +151,3 @@ impl TableBuilder { Ok(()) } } - -#[cfg(test)] -mod tests { - - use std::collections::{HashMap}; - use std::fs::canonicalize; - use std::path::PathBuf; - use url::Url; - - use crate::table::builder::TableBuilder; - use crate::table::Table; - - /// Test helper to create a new `Table` instance without validating the configuration. - /// - /// # Arguments - /// - /// * `table_dir_name` - Name of the table root directory; all under `crates/core/tests/data/`. 
- #[cfg(test)] - async fn build_test_table_without_validation(table_dir_name: &str) -> Table { - let base_url = Url::from_file_path( - canonicalize(PathBuf::from("tests").join("data").join(table_dir_name)).unwrap(), - ) - .unwrap(); - - TableBuilder::from_uri(base_url.as_str()) - .with_hudi_options(HashMap::from([("hoodie.internal.skip.config.validation".to_string(), "true".to_string())])) - .build() - .await - .unwrap() - } -} \ No newline at end of file diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py index 2d1fd74c..f34881df 100644 --- a/python/tests/test_table_read.py +++ b/python/tests/test_table_read.py @@ -133,7 +133,9 @@ def test_sample_table(get_sample_table): }, ] - table = HudiTable(table_path, {"hoodie.read.as.of.timestamp": "20240402123035233"}) + table = HudiTable( + table_path, hudi_options={"hoodie.read.as.of.timestamp": "20240402123035233"} + ) batches = table.read_snapshot() t = pa.Table.from_batches(batches).select([0, 5, 6, 9]).sort_by("ts") assert t.to_pylist() == [ From 7c7127746dd3207cd8f618257dde6db7f8c8bf99 Mon Sep 17 00:00:00 2001 From: kazdy Date: Sat, 12 Oct 2024 03:04:41 +0200 Subject: [PATCH 09/30] add license to builder.rs --- crates/core/src/table/builder.rs | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/crates/core/src/table/builder.rs b/crates/core/src/table/builder.rs index 9f3c49d3..34286d7e 100644 --- a/crates/core/src/table/builder.rs +++ b/crates/core/src/table/builder.rs @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + use crate::config::{HudiConfigs, HUDI_CONF_DIR}; use crate::storage::utils::{parse_config_data, parse_uri}; use crate::storage::Storage; From 991f89c59fe255d23ad69f6ca24313281523055c Mon Sep 17 00:00:00 2001 From: kazdy Date: Sat, 12 Oct 2024 11:45:35 +0200 Subject: [PATCH 10/30] TableBuilder handles generic options --- crates/core/src/table/builder.rs | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/crates/core/src/table/builder.rs b/crates/core/src/table/builder.rs index 34286d7e..b9f7bff2 100644 --- a/crates/core/src/table/builder.rs +++ b/crates/core/src/table/builder.rs @@ -46,6 +46,29 @@ impl TableBuilder { hudi_options: None, } } + + pub fn with_options(mut self, all_options: I) -> Self + where + I: IntoIterator, + K: AsRef, + V: Into, + { + let mut hudi_options = HashMap::new(); + let mut storage_options = HashMap::new(); + + for (k, v) in all_options { + if k.as_ref().starts_with("hoodie.") { + hudi_options.insert(k.as_ref().to_string(), v.into()); + } else { + storage_options.insert(k.as_ref().to_string(), v.into()); + } + } + + self.storage_options = Some(storage_options); + self.hudi_options = Some(hudi_options); + self + } + pub fn with_hudi_options(mut self, hudi_options: HashMap) -> Self { self.hudi_options = Some(hudi_options); self From d94e9a9893fd20186e8b0faf4818e0780952ae82 Mon Sep 17 00:00:00 2001 From: kazdy Date: Sat, 12 Oct 2024 12:12:41 +0200 Subject: [PATCH 11/30] Table builder supports chaining multiple ::with_* --- crates/core/src/table/builder.rs | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/crates/core/src/table/builder.rs b/crates/core/src/table/builder.rs index b9f7bff2..eb6234e3 100644 --- a/crates/core/src/table/builder.rs +++ b/crates/core/src/table/builder.rs @@ -47,7 +47,7 @@ impl TableBuilder { } } - pub fn with_options(mut self, all_options: I) -> Self + pub fn with_options(self, all_options: I) -> Self where I: IntoIterator, K: AsRef, @@ -63,19 +63,23 @@ impl TableBuilder { storage_options.insert(k.as_ref().to_string(), v.into()); } } - - self.storage_options = Some(storage_options); - self.hudi_options = Some(hudi_options); - self + self.with_hudi_options(hudi_options) + .with_storage_options(storage_options) } pub fn with_hudi_options(mut self, hudi_options: HashMap) -> Self { - self.hudi_options = Some(hudi_options); + match self.hudi_options { + None => self.hudi_options = Some(hudi_options), + Some(options) => self.hudi_options = Some(Self::merge(options, hudi_options)), + } self } pub fn with_storage_options(mut self, storage_options: HashMap) -> Self { - self.storage_options = Some(storage_options); + match self.storage_options { + None => self.storage_options = Some(storage_options), + Some(options) => self.storage_options = Some(Self::merge(options, storage_options)), + } self } @@ -192,4 +196,8 @@ impl TableBuilder { Ok(()) } + + fn merge(map1: HashMap, map2: HashMap) -> HashMap { + map1.into_iter().chain(map2).collect() + } } From f548a81a2b4f5a7664b387f5781a5896ed489c3e Mon Sep 17 00:00:00 2001 From: kazdy Date: Sat, 12 Oct 2024 20:38:00 +0200 Subject: [PATCH 12/30] adjust according to the reviewer comments --- crates/core/src/table/builder.rs | 83 +++++++++++----------- crates/core/src/table/mod.rs | 114 ++----------------------------- python/hudi/__init__.py | 1 + python/hudi/_internal.pyi | 11 +++ python/src/internal.rs | 52 ++++++++++++++ python/src/lib.rs | 3 +- python/tests/test_table_read.py | 4 +- 7 files changed, 112 insertions(+), 156 deletions(-) diff 
--git a/crates/core/src/table/builder.rs b/crates/core/src/table/builder.rs index eb6234e3..a8ac71ef 100644 --- a/crates/core/src/table/builder.rs +++ b/crates/core/src/table/builder.rs @@ -17,8 +17,9 @@ * under the License. */ +use crate::config::table::HudiTableConfig; +use crate::config::utils::{parse_data_for_options, split_hudi_options_from_others}; use crate::config::{HudiConfigs, HUDI_CONF_DIR}; -use crate::storage::utils::{parse_config_data, parse_uri}; use crate::storage::Storage; use crate::table::fs_view::FileSystemView; use crate::table::timeline::Timeline; @@ -28,26 +29,24 @@ use std::collections::HashMap; use std::env; use std::path::PathBuf; use std::sync::Arc; -use url::Url; -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct TableBuilder { - base_url: Url, + base_uri: String, hudi_options: Option>, storage_options: Option>, } impl TableBuilder { pub fn from_uri(uri: &str) -> Self { - let base_url = parse_uri(uri).unwrap(); // TODO: handle err TableBuilder { - base_url, + base_uri: uri.to_string(), storage_options: None, hudi_options: None, } } - pub fn with_options(self, all_options: I) -> Self + pub fn with_options(self, all_options: I) -> Self where I: IntoIterator, K: AsRef, @@ -56,13 +55,10 @@ impl TableBuilder { let mut hudi_options = HashMap::new(); let mut storage_options = HashMap::new(); - for (k, v) in all_options { - if k.as_ref().starts_with("hoodie.") { - hudi_options.insert(k.as_ref().to_string(), v.into()); - } else { - storage_options.insert(k.as_ref().to_string(), v.into()); - } - } + let (hudi_opts, others) = split_hudi_options_from_others(all_options); + hudi_options.extend(hudi_opts); + storage_options.extend(others); + self.with_hudi_options(hudi_options) .with_storage_options(storage_options) } @@ -84,41 +80,30 @@ impl TableBuilder { } pub async fn build(self) -> anyhow::Result
{ - let base_url = Arc::new(self.base_url); - let hudi_options = self.hudi_options.unwrap_or_default().clone(); let mut storage_options = self.storage_options.unwrap_or_default().clone(); Self::load_storage_options(&mut storage_options); let hudi_configs = - Self::load_hudi_configs(base_url.clone(), hudi_options, &storage_options) + Self::load_hudi_configs(self.base_uri.clone(), hudi_options, &storage_options) .await .context("Failed to load table properties")?; let hudi_configs = Arc::from(hudi_configs); let storage_options = Arc::from(storage_options); - let timeline = Timeline::new( - base_url.clone(), - storage_options.clone(), - hudi_configs.clone(), - ) - .await - .context("Failed to load timeline")?; - - let file_system_view = FileSystemView::new( - base_url.clone(), - storage_options.clone(), - hudi_configs.clone(), - ) - .await - .context("Failed to load file system view")?; + let timeline = Timeline::new(hudi_configs.clone(), storage_options.clone()) + .await + .context("Failed to load timeline")?; + + let file_system_view = FileSystemView::new(hudi_configs.clone(), storage_options.clone()) + .await + .context("Failed to load file system view")?; Ok(Table { - base_url, - configs: hudi_configs, - extra_options: storage_options, + hudi_configs, + storage_options, timeline, file_system_view, }) @@ -129,15 +114,24 @@ impl TableBuilder { } async fn load_hudi_configs( - base_url: Arc, + base_uri: String, mut hudi_options: HashMap, - storage_configs: &HashMap, + storage_options: &HashMap, ) -> anyhow::Result { - let storage = Storage::new(base_url, storage_configs)?; + hudi_options.insert( + HudiTableConfig::BasePath.as_ref().to_string(), + base_uri.to_string(), + ); + + // create a [Storage] instance to load properties from storage layer. + let storage = Storage::new( + Arc::new(storage_options.clone()), + Arc::new(HudiConfigs::new(&hudi_options)), + )?; Self::imbue_table_properties(&mut hudi_options, storage.clone()).await?; - Self::imbue_global_hudi_configs(&mut hudi_options, storage.clone()).await?; + Self::imbue_global_hudi_configs_if_not_present(&mut hudi_options, storage.clone()).await?; let hudi_configs = HudiConfigs::new(hudi_options); @@ -162,7 +156,7 @@ impl TableBuilder { storage: Arc, ) -> anyhow::Result<()> { let bytes = storage.get_file_data(".hoodie/hoodie.properties").await?; - let table_properties = parse_config_data(&bytes, "=").await?; + let table_properties = parse_data_for_options(&bytes, "=")?; // TODO: handle the case where the same key is present in both table properties and options for (k, v) in table_properties { @@ -172,7 +166,7 @@ impl TableBuilder { Ok(()) } - async fn imbue_global_hudi_configs( + async fn imbue_global_hudi_configs_if_not_present( options: &mut HashMap, storage: Arc, ) -> anyhow::Result<()> { @@ -185,7 +179,7 @@ impl TableBuilder { .get_file_data_from_absolute_path(global_config_path.to_str().unwrap()) .await { - if let Ok(global_configs) = parse_config_data(&bytes, " \t=").await { + if let Ok(global_configs) = parse_data_for_options(&bytes, " \t=") { for (key, value) in global_configs { if key.starts_with("hoodie.") && !options.contains_key(&key) { options.insert(key.to_string(), value.to_string()); @@ -197,7 +191,10 @@ impl TableBuilder { Ok(()) } - fn merge(map1: HashMap, map2: HashMap) -> HashMap { + fn merge( + map1: HashMap, + map2: HashMap, + ) -> HashMap { map1.into_iter().chain(map2).collect() } } diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs index d5eeb045..10ad3686 100644 --- 
a/crates/core/src/table/mod.rs +++ b/crates/core/src/table/mod.rs @@ -39,7 +39,7 @@ //! //! pub async fn test() { //! use arrow_schema::Schema; -//! let base_uri = Url::from_file_path("/tmp/hudi_data").unwrap(); +//! let base_uri = Url::from_file_path("/tmp/hudi_data").unwrap(); //! let hudi_table = Table::new(base_uri.path()).await.unwrap(); //! let schema = hudi_table.get_schema().await.unwrap(); //! } @@ -85,8 +85,6 @@ //! ``` use std::collections::{HashMap, HashSet}; -use std::env; -use std::path::PathBuf; use std::str::FromStr; use std::sync::Arc; @@ -95,7 +93,6 @@ use arrow::record_batch::RecordBatch; use arrow_schema::{Field, Schema}; use strum::IntoEnumIterator; use url::Url; - use HudiInternalConfig::SkipConfigValidation; use HudiTableConfig::{DropsPartitionFields, TableType, TableVersion}; use TableTypeValue::CopyOnWrite; @@ -105,22 +102,17 @@ use crate::config::read::HudiReadConfig; use crate::config::read::HudiReadConfig::AsOfTimestamp; use crate::config::table::HudiTableConfig::PartitionFields; use crate::config::table::{HudiTableConfig, TableTypeValue}; -use crate::config::utils::parse_data_for_options; -use crate::config::utils::{empty_options, split_hudi_options_from_others}; use crate::config::HudiConfigs; -use crate::config::HUDI_CONF_DIR; use crate::file_group::FileSlice; -use crate::storage::utils::{parse_config_data}; -use crate::storage::Storage; use crate::table::builder::TableBuilder; use crate::table::fs_view::FileSystemView; use crate::table::partition::PartitionPruner; use crate::table::timeline::Timeline; +pub mod builder; mod fs_view; mod partition; mod timeline; -mod builder; /// Hudi Table in-memory #[derive(Clone, Debug)] @@ -144,16 +136,10 @@ impl Table { K: AsRef, V: Into, { - let (hudi_configs, storage_options) = Self::load_configs(base_uri, options) + TableBuilder::from_uri(base_uri) + .with_options(options) + .build() .await - .context("Failed to load table properties")?; - let hudi_configs = Arc::new(hudi_configs); - let storage_options = Arc::new(storage_options); - - let timeline = Timeline::new(hudi_configs.clone(), storage_options.clone()) - .await - .context("Failed to load timeline")?; - } pub fn base_url(&self) -> Result { @@ -173,96 +159,6 @@ impl Table { .register_object_store(runtime_env.clone()); } - async fn load_configs( - base_uri: &str, - all_options: I, - ) -> Result<(HudiConfigs, HashMap)> - where - I: IntoIterator, - K: AsRef, - V: Into, - { - let mut hudi_options = HashMap::new(); - let mut other_options = HashMap::new(); - - Self::imbue_cloud_env_vars(&mut other_options); - - let (hudi_opts, others) = split_hudi_options_from_others(all_options); - hudi_options.extend(hudi_opts); - other_options.extend(others); - - hudi_options.insert( - HudiTableConfig::BasePath.as_ref().to_string(), - base_uri.to_string(), - ); - - // create a [Storage] instance to load properties from storage layer. 
- let storage = Storage::new( - Arc::new(other_options.clone()), - Arc::new(HudiConfigs::new(&hudi_options)), - )?; - - Self::imbue_table_properties(&mut hudi_options, storage.clone()).await?; - - Self::imbue_global_hudi_configs_if_not_present(&mut hudi_options, storage.clone()).await?; - - let hudi_configs = HudiConfigs::new(hudi_options); - - Self::validate_configs(&hudi_configs).map(|_| (hudi_configs, other_options)) - } - - fn imbue_cloud_env_vars(options: &mut HashMap) { - const PREFIXES: [&str; 3] = ["AWS_", "AZURE_", "GOOGLE_"]; - - for (key, value) in env::vars() { - if PREFIXES.iter().any(|prefix| key.starts_with(prefix)) - && !options.contains_key(&key.to_ascii_lowercase()) - { - options.insert(key.to_ascii_lowercase(), value); - } - } - } - - async fn imbue_table_properties( - options: &mut HashMap, - storage: Arc, - ) -> Result<()> { - let bytes = storage.get_file_data(".hoodie/hoodie.properties").await?; - let table_properties = parse_data_for_options(&bytes, "=")?; - - // TODO: handle the case where the same key is present in both table properties and options - for (k, v) in table_properties { - options.insert(k.to_string(), v.to_string()); - } - - Ok(()) - } - - async fn imbue_global_hudi_configs_if_not_present( - options: &mut HashMap, - storage: Arc, - ) -> Result<()> { - let global_config_path = env::var(HUDI_CONF_DIR) - .map(PathBuf::from) - .unwrap_or_else(|_| PathBuf::from("/etc/hudi/conf")) - .join("hudi-defaults.conf"); - - if let Ok(bytes) = storage - .get_file_data_from_absolute_path(global_config_path.to_str().unwrap()) - .await - { - if let Ok(global_configs) = parse_data_for_options(&bytes, " \t=") { - for (key, value) in global_configs { - if key.starts_with("hoodie.") && !options.contains_key(&key) { - options.insert(key.to_string(), value.to_string()); - } - } - } - } - - Ok(()) - } - fn validate_configs(hudi_configs: &HudiConfigs) -> Result<()> { if hudi_configs .get_or_default(SkipConfigValidation) diff --git a/python/hudi/__init__.py b/python/hudi/__init__.py index b0a792e5..a5ef6f47 100644 --- a/python/hudi/__init__.py +++ b/python/hudi/__init__.py @@ -17,4 +17,5 @@ from ._internal import HudiFileSlice as HudiFileSlice from ._internal import HudiTable as HudiTable +from ._internal import HudiTableBuilder as HudiTableBuilder from ._internal import __version__ as __version__ diff --git a/python/hudi/_internal.pyi b/python/hudi/_internal.pyi index 47da6aaf..ee15fbd0 100644 --- a/python/hudi/_internal.pyi +++ b/python/hudi/_internal.pyi @@ -50,3 +50,14 @@ class HudiTable: def read_snapshot( self, filters: Optional[List[str]] ) -> List["pyarrow.RecordBatch"]: ... + +@dataclass(init=False) +class HudiTableBuilder: + def __init__( + self + ): ... + def from_uri(self, base_uri: str) -> "HudiTableBuilder": ... + def with_options(self, options: Optional[Dict[str, str]]) -> "HudiTableBuilder": ... + def with_hudi_options(self, hudi_options: Optional[Dict[str, str]]) -> "HudiTableBuilder": ... + def with_storage_options(self, storage_options: Optional[Dict[str, str]]) -> "HudiTableBuilder": ... + def build(self) -> "HudiTable": ... diff --git a/python/src/internal.rs b/python/src/internal.rs index 4c448b51..50bcb038 100644 --- a/python/src/internal.rs +++ b/python/src/internal.rs @@ -27,6 +27,7 @@ use tokio::runtime::Runtime; use hudi::file_group::FileSlice; use hudi::table::Table; +use hudi::table::builder::TableBuilder; macro_rules! 
vec_string_to_slice { ($vec:expr) => { @@ -171,6 +172,57 @@ impl HudiTable { } } +#[cfg(not(tarpaulin))] +#[pyclass] +pub struct HudiTableBuilder { + inner: TableBuilder, +} + +#[cfg(not(tarpaulin))] +impl HudiTableBuilder { + + fn from_uri( + base_uri: &str + ) -> PyResult { + let inner: TableBuilder = TableBuilder::from_uri(base_uri); + Ok(HudiTableBuilder { inner }) + } + + fn with_options( + &mut self, + options: Option> + ) -> PyResult<&Self> { + let inner = self.inner.clone().with_options(options.unwrap_or_default()); + self.inner = inner; + Ok(self) + } + + fn with_hudi_options( + &mut self, + hudi_options: Option> + ) -> PyResult<&Self> { + let inner = self.inner.clone().with_options(hudi_options.unwrap_or_default()); + self.inner = inner; + Ok(self) + } + + fn with_storage_options( + &mut self, + storage_options: Option> + ) -> PyResult<&Self> { + let inner = self.inner.clone().with_options(storage_options.unwrap_or_default()); + self.inner = inner; + Ok(self) + } + + fn build( + &mut self, + ) -> PyResult { + let table = rt().block_on(self.inner.clone().build())?; + Ok(HudiTable { inner: table }) + } +} + #[cfg(not(tarpaulin))] fn rt() -> &'static Runtime { static TOKIO_RT: OnceLock = OnceLock::new(); diff --git a/python/src/lib.rs b/python/src/lib.rs index ad96dc60..d035ebb4 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -25,8 +25,9 @@ mod internal; fn _internal(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> { m.add("__version__", env!("CARGO_PKG_VERSION"))?; - use internal::{HudiFileSlice, HudiTable}; + use internal::{HudiFileSlice, HudiTable, HudiTableBuilder}; m.add_class::()?; m.add_class::()?; + m.add_class::()?; Ok(()) } diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py index f34881df..794d84ae 100644 --- a/python/tests/test_table_read.py +++ b/python/tests/test_table_read.py @@ -133,9 +133,7 @@ def test_sample_table(get_sample_table): }, ] - table = HudiTable( - table_path, hudi_options={"hoodie.read.as.of.timestamp": "20240402123035233"} - ) + table = HudiTable(table_path, options={"hoodie.read.as.of.timestamp": "20240402123035233"}) batches = table.read_snapshot() t = pa.Table.from_batches(batches).select([0, 5, 6, 9]).sort_by("ts") assert t.to_pylist() == [ From e09ab3fe2797fdc4c81053b2ad7ef7ed1db0102e Mon Sep 17 00:00:00 2001 From: Shiyan Xu <2701446+xushiyan@users.noreply.github.com> Date: Sun, 13 Oct 2024 00:12:03 -0500 Subject: [PATCH 13/30] fix build --- crates/core/src/table/mod.rs | 1 + python/hudi/__init__.py | 1 - python/hudi/_internal.pyi | 15 +++----- python/src/internal.rs | 66 ++++++++------------------------- python/src/lib.rs | 6 ++- python/tests/test_table_read.py | 4 +- 6 files changed, 29 insertions(+), 64 deletions(-) diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs index dc87511b..8d8bfde3 100644 --- a/crates/core/src/table/mod.rs +++ b/crates/core/src/table/mod.rs @@ -104,6 +104,7 @@ use crate::config::read::HudiReadConfig::AsOfTimestamp; use crate::config::table::HudiTableConfig::PartitionFields; use crate::config::table::{HudiTableConfig, TableTypeValue}; use crate::config::HudiConfigs; +use crate::file_group::reader::FileGroupReader; use crate::file_group::FileSlice; use crate::table::builder::TableBuilder; use crate::table::fs_view::FileSystemView; diff --git a/python/hudi/__init__.py b/python/hudi/__init__.py index ebae6855..37738c76 100644 --- a/python/hudi/__init__.py +++ b/python/hudi/__init__.py @@ -18,5 +18,4 @@ from ._internal import HudiFileGroupReader as 
HudiFileGroupReader from ._internal import HudiFileSlice as HudiFileSlice from ._internal import HudiTable as HudiTable -from ._internal import HudiTableBuilder as HudiTableBuilder from ._internal import __version__ as __version__ diff --git a/python/hudi/_internal.pyi b/python/hudi/_internal.pyi index 55d3ddea..6a6d9544 100644 --- a/python/hudi/_internal.pyi +++ b/python/hudi/_internal.pyi @@ -58,13 +58,8 @@ class HudiTable: self, filters: Optional[List[str]] ) -> List["pyarrow.RecordBatch"]: ... -@dataclass(init=False) -class HudiTableBuilder: - def __init__( - self - ): ... - def from_uri(self, base_uri: str) -> "HudiTableBuilder": ... - def with_options(self, options: Optional[Dict[str, str]]) -> "HudiTableBuilder": ... - def with_hudi_options(self, hudi_options: Optional[Dict[str, str]]) -> "HudiTableBuilder": ... - def with_storage_options(self, storage_options: Optional[Dict[str, str]]) -> "HudiTableBuilder": ... - def build(self) -> "HudiTable": ... +def build_hudi_table( + base_uri: str, + hudi_options: Optional[Dict[str, str]] = None, + storage_options: Optional[Dict[str, str]] = None, +) -> HudiTable: ... diff --git a/python/src/internal.rs b/python/src/internal.rs index 4571a84f..8bb7f8ac 100644 --- a/python/src/internal.rs +++ b/python/src/internal.rs @@ -22,13 +22,13 @@ use std::sync::OnceLock; use anyhow::Context; use arrow::pyarrow::ToPyArrow; -use pyo3::{pyclass, pymethods, PyErr, PyObject, PyResult, Python}; +use pyo3::{pyclass, pyfunction, pymethods, PyErr, PyObject, PyResult, Python}; use tokio::runtime::Runtime; use hudi::file_group::reader::FileGroupReader; use hudi::file_group::FileSlice; -use hudi::table::Table; use hudi::table::builder::TableBuilder; +use hudi::table::Table; macro_rules! vec_string_to_slice { ($vec:expr) => { @@ -201,54 +201,20 @@ impl HudiTable { } #[cfg(not(tarpaulin))] -#[pyclass] -pub struct HudiTableBuilder { - inner: TableBuilder, -} - -#[cfg(not(tarpaulin))] -impl HudiTableBuilder { - - fn from_uri( - base_uri: &str - ) -> PyResult { - let inner: TableBuilder = TableBuilder::from_uri(base_uri); - Ok(HudiTableBuilder { inner }) - } - - fn with_options( - &mut self, - options: Option> - ) -> PyResult<&Self> { - let inner = self.inner.clone().with_options(options.unwrap_or_default()); - self.inner = inner; - Ok(self) - } - - fn with_hudi_options( - &mut self, - hudi_options: Option> - ) -> PyResult<&Self> { - let inner = self.inner.clone().with_options(hudi_options.unwrap_or_default()); - self.inner = inner; - Ok(self) - } - - fn with_storage_options( - &mut self, - storage_options: Option> - ) -> PyResult<&Self> { - let inner = self.inner.clone().with_options(storage_options.unwrap_or_default()); - self.inner = inner; - Ok(self) - } - - fn build( - &mut self, - ) -> PyResult { - let table = rt().block_on(self.inner.clone().build())?; - Ok(HudiTable { inner: table }) - } +#[pyfunction] +#[pyo3(signature = (base_uri, hudi_options=None, storage_options=None))] +pub fn build_hudi_table( + base_uri: String, + hudi_options: Option>, + storage_options: Option>, +) -> PyResult { + let inner = rt().block_on( + TableBuilder::from_uri(&base_uri) + .with_hudi_options(hudi_options.unwrap_or_default()) + .with_storage_options(storage_options.unwrap_or_default()) + .build(), + )?; + Ok(HudiTable { inner }) } #[cfg(not(tarpaulin))] diff --git a/python/src/lib.rs b/python/src/lib.rs index 7966fb5f..b44832b1 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -25,10 +25,12 @@ mod internal; fn _internal(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> 
{ m.add("__version__", env!("CARGO_PKG_VERSION"))?; - use internal::{HudiFileGroupReader, HudiFileSlice, HudiTable, HudiTableBuilder}; + use internal::{HudiFileGroupReader, HudiFileSlice, HudiTable}; m.add_class::()?; m.add_class::()?; m.add_class::()?; - m.add_class::()?; + + use internal::build_hudi_table; + m.add_function(wrap_pyfunction!(build_hudi_table, m)?)?; Ok(()) } diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py index 624c554e..b5491348 100644 --- a/python/tests/test_table_read.py +++ b/python/tests/test_table_read.py @@ -135,7 +135,9 @@ def test_sample_table(get_sample_table): }, ] - table = HudiTable(table_path, options={"hoodie.read.as.of.timestamp": "20240402123035233"}) + table = HudiTable( + table_path, options={"hoodie.read.as.of.timestamp": "20240402123035233"} + ) batches = table.read_snapshot() t = pa.Table.from_batches(batches).select([0, 5, 6, 9]).sort_by("ts") assert t.to_pylist() == [ From 99a466fa1d21871b1b078348537a7a93d9aa6686 Mon Sep 17 00:00:00 2001 From: Shiyan Xu <2701446+xushiyan@users.noreply.github.com> Date: Sun, 13 Oct 2024 00:50:18 -0500 Subject: [PATCH 14/30] add getters --- crates/core/src/table/mod.rs | 8 ++++++++ python/hudi/_internal.pyi | 2 ++ python/src/internal.rs | 8 ++++++++ 3 files changed, 18 insertions(+) diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs index 8d8bfde3..4d9c9412 100644 --- a/crates/core/src/table/mod.rs +++ b/crates/core/src/table/mod.rs @@ -148,6 +148,14 @@ impl Table { self.hudi_configs.get(HudiTableConfig::BasePath)?.to_url() } + pub fn hudi_options(&self) -> HashMap { + self.hudi_configs.as_options() + } + + pub fn storage_options(&self) -> HashMap { + self.storage_options.as_ref().clone() + } + #[cfg(feature = "datafusion")] pub fn register_storage( &self, diff --git a/python/hudi/_internal.pyi b/python/hudi/_internal.pyi index 6a6d9544..1b77b23a 100644 --- a/python/hudi/_internal.pyi +++ b/python/hudi/_internal.pyi @@ -47,6 +47,8 @@ class HudiTable: base_uri: str, options: Optional[Dict[str, str]] = None, ): ... + def hudi_options(self) -> Dict[str, str]: ... + def storage_options(self) -> Dict[str, str]: ... def get_schema(self) -> "pyarrow.Schema": ... def get_partition_schema(self) -> "pyarrow.Schema": ... 
def split_file_slices( diff --git a/python/src/internal.rs b/python/src/internal.rs index 8bb7f8ac..0ce9ff66 100644 --- a/python/src/internal.rs +++ b/python/src/internal.rs @@ -142,6 +142,14 @@ impl HudiTable { Ok(HudiTable { inner }) } + fn hudi_options(&self) -> HashMap { + self.inner.hudi_options() + } + + fn storage_options(&self) -> HashMap { + self.inner.storage_options() + } + fn get_schema(&self, py: Python) -> PyResult { rt().block_on(self.inner.get_schema())?.to_pyarrow(py) } From 06a02995ff26395b405e4ea809cec755959f9a0e Mon Sep 17 00:00:00 2001 From: Shiyan Xu <2701446+xushiyan@users.noreply.github.com> Date: Sun, 13 Oct 2024 01:10:50 -0500 Subject: [PATCH 15/30] align python and rust apis --- crates/core/src/table/builder.rs | 4 +-- crates/core/src/table/mod.rs | 4 +-- python/hudi/_internal.pyi | 1 + python/hudi/table/__init__.py | 16 +++++++++ python/hudi/table/builder.py | 58 ++++++++++++++++++++++++++++++++ python/src/internal.rs | 6 ++-- 6 files changed, 83 insertions(+), 6 deletions(-) create mode 100644 python/hudi/table/__init__.py create mode 100644 python/hudi/table/builder.py diff --git a/crates/core/src/table/builder.rs b/crates/core/src/table/builder.rs index a8ac71ef..bd40b237 100644 --- a/crates/core/src/table/builder.rs +++ b/crates/core/src/table/builder.rs @@ -38,9 +38,9 @@ pub struct TableBuilder { } impl TableBuilder { - pub fn from_uri(uri: &str) -> Self { + pub fn from_base_uri(base_uri: &str) -> Self { TableBuilder { - base_uri: uri.to_string(), + base_uri: base_uri.to_string(), storage_options: None, hudi_options: None, } diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs index 4d9c9412..6c5605b3 100644 --- a/crates/core/src/table/mod.rs +++ b/crates/core/src/table/mod.rs @@ -128,7 +128,7 @@ pub struct Table { impl Table { /// Create hudi table by base_uri pub async fn new(base_uri: &str) -> Result { - TableBuilder::from_uri(base_uri).build().await + TableBuilder::from_base_uri(base_uri).build().await } /// Create hudi table with options @@ -138,7 +138,7 @@ impl Table { K: AsRef, V: Into, { - TableBuilder::from_uri(base_uri) + TableBuilder::from_base_uri(base_uri) .with_options(options) .build() .await diff --git a/python/hudi/_internal.pyi b/python/hudi/_internal.pyi index 1b77b23a..d296821c 100644 --- a/python/hudi/_internal.pyi +++ b/python/hudi/_internal.pyi @@ -64,4 +64,5 @@ def build_hudi_table( base_uri: str, hudi_options: Optional[Dict[str, str]] = None, storage_options: Optional[Dict[str, str]] = None, + options: Optional[Dict[str, str]] = None, ) -> HudiTable: ... diff --git a/python/hudi/table/__init__.py b/python/hudi/table/__init__.py new file mode 100644 index 00000000..a67d5ea2 --- /dev/null +++ b/python/hudi/table/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/python/hudi/table/builder.py b/python/hudi/table/builder.py new file mode 100644 index 00000000..5b767f58 --- /dev/null +++ b/python/hudi/table/builder.py @@ -0,0 +1,58 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from dataclasses import dataclass +from typing import Dict, Optional + +from hudi._internal import HudiTable, build_hudi_table + + +@dataclass +class HudiTableBuilder: + base_uri: str + options: Optional[Dict[str, str]] = None + hudi_options: Optional[Dict[str, str]] = None + storage_options: Optional[Dict[str, str]] = None + + @classmethod + def from_base_uri(cls, base_uri: str) -> "HudiTableBuilder": + builder = cls(base_uri) + return builder + + def with_options(self, options: Dict[str, str]) -> "HudiTableBuilder": + if self.options is None: + self.options = {} + self.options = options + return self + + def with_hudi_options(self, hudi_options: Dict[str, str]) -> "HudiTableBuilder": + if self.hudi_options is None: + self.hudi_options = {} + self.hudi_options.update(hudi_options) + return self + + def with_storage_options( + self, storage_options: Dict[str, str] + ) -> "HudiTableBuilder": + if self.storage_options is None: + self.storage_options = {} + self.storage_options.update(storage_options) + return self + + def build(self) -> "HudiTable": + return build_hudi_table( + self.base_uri, self.hudi_options, self.storage_options, self.options + ) diff --git a/python/src/internal.rs b/python/src/internal.rs index 0ce9ff66..b26a511d 100644 --- a/python/src/internal.rs +++ b/python/src/internal.rs @@ -210,16 +210,18 @@ impl HudiTable { #[cfg(not(tarpaulin))] #[pyfunction] -#[pyo3(signature = (base_uri, hudi_options=None, storage_options=None))] +#[pyo3(signature = (base_uri, hudi_options=None, storage_options=None, options=None))] pub fn build_hudi_table( base_uri: String, hudi_options: Option>, storage_options: Option>, + options: Option>, ) -> PyResult { let inner = rt().block_on( - TableBuilder::from_uri(&base_uri) + TableBuilder::from_base_uri(&base_uri) .with_hudi_options(hudi_options.unwrap_or_default()) .with_storage_options(storage_options.unwrap_or_default()) + .with_options(options.unwrap_or_default()) .build(), )?; Ok(HudiTable { inner }) From 2b366ee362588461c0c51d3cccf602465daf10d0 Mon Sep 17 00:00:00 2001 From: Shiyan Xu <2701446+xushiyan@users.noreply.github.com> Date: Sun, 13 Oct 2024 01:12:01 -0500 Subject: [PATCH 16/30] add python top level import --- python/hudi/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/hudi/__init__.py b/python/hudi/__init__.py index 37738c76..73d661e5 100644 --- a/python/hudi/__init__.py +++ b/python/hudi/__init__.py @@ -19,3 +19,4 @@ from ._internal import 
HudiFileSlice as HudiFileSlice from ._internal import HudiTable as HudiTable from ._internal import __version__ as __version__ +from .table.builder import HudiTableBuilder as HudiTableBuilder From f4885d2ede1a2d6bd39a6b473096d57ed5e74815 Mon Sep 17 00:00:00 2001 From: kazdy Date: Sun, 13 Oct 2024 13:35:27 +0200 Subject: [PATCH 17/30] improve python HudiTable test, add tests for build_hudi_table --- python/hudi/__init__.py | 2 +- python/tests/test_table_builder.py | 114 +++++++++++++++++++++++++++++ python/tests/test_table_read.py | 28 ++++++- 3 files changed, 142 insertions(+), 2 deletions(-) create mode 100644 python/tests/test_table_builder.py diff --git a/python/hudi/__init__.py b/python/hudi/__init__.py index 73d661e5..d8c5f816 100644 --- a/python/hudi/__init__.py +++ b/python/hudi/__init__.py @@ -18,5 +18,5 @@ from ._internal import HudiFileGroupReader as HudiFileGroupReader from ._internal import HudiFileSlice as HudiFileSlice from ._internal import HudiTable as HudiTable +from ._internal import build_hudi_table from ._internal import __version__ as __version__ -from .table.builder import HudiTableBuilder as HudiTableBuilder diff --git a/python/tests/test_table_builder.py b/python/tests/test_table_builder.py new file mode 100644 index 00000000..6855ac29 --- /dev/null +++ b/python/tests/test_table_builder.py @@ -0,0 +1,114 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +import pyarrow as pa +import pytest + +from hudi import build_hudi_table + +PYARROW_LE_8_0_0 = tuple(int(s) for s in pa.__version__.split(".") if s.isnumeric()) < ( + 8, + 0, + 0, +) +pytestmark = pytest.mark.skipif( + PYARROW_LE_8_0_0, reason="hudi only supported if pyarrow >= 8.0.0" +) + +def test_read_table_returns_correct_data(get_sample_table): + table_path = get_sample_table + table = build_hudi_table(table_path) + + batches = table.read_snapshot() + t = pa.Table.from_batches(batches).select([0, 5, 6, 9]).sort_by("ts") + assert t.to_pylist() == [ + { + "_hoodie_commit_time": "20240402144910683", + "ts": 1695046462179, + "uuid": "9909a8b1-2d15-4d3d-8ec9-efc48c536a00", + "fare": 339.0, + }, + { + "_hoodie_commit_time": "20240402123035233", + "ts": 1695091554788, + "uuid": "e96c4396-3fad-413a-a942-4cb36106d721", + "fare": 27.7, + }, + { + "_hoodie_commit_time": "20240402123035233", + "ts": 1695115999911, + "uuid": "c8abbe79-8d89-47ea-b4ce-4d224bae5bfa", + "fare": 17.85, + }, + { + "_hoodie_commit_time": "20240402123035233", + "ts": 1695159649087, + "uuid": "334e26e9-8355-45cc-97c6-c31daf0df330", + "fare": 19.1, + }, + { + "_hoodie_commit_time": "20240402123035233", + "ts": 1695516137016, + "uuid": "e3cf430c-889d-4015-bc98-59bdce1e530c", + "fare": 34.15, + }, + ] + +@pytest.mark.parametrize("hudi_options,storage_options,options", [ +({"hoodie.read.as.of.timestamp": "20240402123035233"}, None, None), +(None, None, {"hoodie.read.as.of.timestamp": "20240402123035233"})]) +def test_read_table_as_of_timestamp(get_sample_table, hudi_options, storage_options, options): + table_path = get_sample_table + table = build_hudi_table(base_uri=table_path, + hudi_options=hudi_options, + storage_options=storage_options, + options=options) + + batches = table.read_snapshot() + t = pa.Table.from_batches(batches).select([0, 5, 6, 9]).sort_by("ts") + assert t.to_pylist() == [ + { + "_hoodie_commit_time": "20240402123035233", + "ts": 1695046462179, + "uuid": "9909a8b1-2d15-4d3d-8ec9-efc48c536a00", + "fare": 33.9, + }, + { + "_hoodie_commit_time": "20240402123035233", + "ts": 1695091554788, + "uuid": "e96c4396-3fad-413a-a942-4cb36106d721", + "fare": 27.7, + }, + { + "_hoodie_commit_time": "20240402123035233", + "ts": 1695115999911, + "uuid": "c8abbe79-8d89-47ea-b4ce-4d224bae5bfa", + "fare": 17.85, + }, + { + "_hoodie_commit_time": "20240402123035233", + "ts": 1695159649087, + "uuid": "334e26e9-8355-45cc-97c6-c31daf0df330", + "fare": 19.1, + }, + { + "_hoodie_commit_time": "20240402123035233", + "ts": 1695516137016, + "uuid": "e3cf430c-889d-4015-bc98-59bdce1e530c", + "fare": 34.15, + }, + ] diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py index b5491348..d9c7142d 100644 --- a/python/tests/test_table_read.py +++ b/python/tests/test_table_read.py @@ -30,7 +30,7 @@ ) -def test_sample_table(get_sample_table): +def test_read_table_has_correct_schema(get_sample_table): table_path = get_sample_table table = HudiTable(table_path) @@ -47,8 +47,16 @@ def test_sample_table(get_sample_table): "fare", "city", ] + +def test_read_table_has_correct_partition_schema(get_sample_table): + table_path = get_sample_table + table = HudiTable(table_path) assert table.get_partition_schema().names == ["city"] +def test_read_table_returns_correct_file_slices(get_sample_table): + table_path = get_sample_table + table = HudiTable(table_path) + file_slices = table.get_file_slices() assert len(file_slices) == 5 assert set(f.commit_time for f in file_slices) == { @@ -66,6 +74,12 @@ def 
test_sample_table(get_sample_table): "sao_paulo/ee915c68-d7f8-44f6-9759-e691add290d8-0_3-11-0_20240402123035233.parquet", } +def test_read_table_can_read_from_batches(get_sample_table): + table_path = get_sample_table + table = HudiTable(table_path) + + file_slices = table.get_file_slices() + file_slice_paths = [f.base_file_relative_path() for f in file_slices] batch = table.create_file_group_reader().read_file_slice_by_base_file_path( file_slice_paths[0] ) @@ -77,6 +91,10 @@ def test_sample_table(get_sample_table): assert len(next(file_slices_gen)) == 3 assert len(next(file_slices_gen)) == 2 +def test_read_table_returns_correct_data(get_sample_table): + table_path = get_sample_table + table = HudiTable(table_path) + batches = table.read_snapshot() t = pa.Table.from_batches(batches).select([0, 5, 6, 9]).sort_by("ts") assert t.to_pylist() == [ @@ -112,6 +130,10 @@ def test_sample_table(get_sample_table): }, ] +def test_read_table_for_partition(get_sample_table): + table_path = get_sample_table + table = HudiTable(table_path) + batches = table.read_snapshot(["city = san_francisco"]) t = pa.Table.from_batches(batches).select([0, 5, 6, 9]).sort_by("ts") assert t.to_pylist() == [ @@ -135,6 +157,10 @@ def test_sample_table(get_sample_table): }, ] +def test_read_table_as_of_timestamp(get_sample_table): + table_path = get_sample_table + table = HudiTable(table_path) + table = HudiTable( table_path, options={"hoodie.read.as.of.timestamp": "20240402123035233"} ) From 91d3086a27797d6101bd4f76ca9d0bcce4049f26 Mon Sep 17 00:00:00 2001 From: kazdy Date: Sun, 13 Oct 2024 14:27:34 +0200 Subject: [PATCH 18/30] minimal tests for handling options in table builder --- crates/core/src/table/builder.rs | 88 ++++++++++++++++++++++++++++++++ 1 file changed, 88 insertions(+) diff --git a/crates/core/src/table/builder.rs b/crates/core/src/table/builder.rs index bd40b237..919d8ae2 100644 --- a/crates/core/src/table/builder.rs +++ b/crates/core/src/table/builder.rs @@ -198,3 +198,91 @@ impl TableBuilder { map1.into_iter().chain(map2).collect() } } + +#[cfg(test)] +mod tests { + use crate::table::TableBuilder; + use std::collections::HashMap; + #[test] + fn test_build_from_mixed_options() { + let options = vec![ + ("hoodie.option1", "value1"), + ("AWS_REGION", "us-east-1"), + ("hoodie.option3", "value3"), + ("AWS_ENDPOINT", "s3.us-east-1.amazonaws.com"), + ]; + let builder = TableBuilder::from_base_uri("./tmp").with_options(options); + let hudi_options = builder.hudi_options.clone().unwrap(); + let storage_options = builder.storage_options.clone().unwrap(); + assert_eq!(hudi_options.len(), 2); + assert_eq!(hudi_options["hoodie.option1"], "value1"); + assert_eq!(hudi_options["hoodie.option3"], "value3"); + assert_eq!(storage_options.len(), 2); + assert_eq!(storage_options["AWS_REGION"], "us-east-1"); + assert_eq!( + storage_options["AWS_ENDPOINT"], + "s3.us-east-1.amazonaws.com" + ); + } + + #[test] + fn test_build_from_explicit_options() { + let hudi_options = HashMap::from([ + ("hoodie.option1".to_string(), "value1".to_string()), + ("hoodie.option3".to_string(), "value3".to_string()), + ]); + let storage_options = HashMap::from([ + ("AWS_REGION".to_string(), "us-east-1".to_string()), + ( + "AWS_ENDPOINT".to_string(), + "s3.us-east-1.amazonaws.com".to_string(), + ), + ]); + let builder = TableBuilder::from_base_uri("./tmp") + .with_hudi_options(hudi_options) + .with_storage_options(storage_options); + let hudi_options = builder.hudi_options.clone().unwrap(); + let storage_options = 
builder.storage_options.clone().unwrap(); + assert_eq!(hudi_options.len(), 2); + assert_eq!(hudi_options["hoodie.option1"], "value1"); + assert_eq!(hudi_options["hoodie.option3"], "value3"); + assert_eq!(storage_options.len(), 2); + assert_eq!(storage_options["AWS_REGION"], "us-east-1"); + assert_eq!( + storage_options["AWS_ENDPOINT"], + "s3.us-east-1.amazonaws.com" + ); + } + + #[test] + fn test_build_from_explicit_options_chained() { + let builder = TableBuilder::from_base_uri("./tmp") + .with_hudi_options(HashMap::from([( + "hoodie.option1".to_string(), + "value1".to_string(), + )])) + .with_hudi_options(HashMap::from([( + "hoodie.option3".to_string(), + "value3".to_string(), + )])) + .with_storage_options(HashMap::from([( + "AWS_REGION".to_string(), + "us-east-1".to_string(), + )])) + .with_storage_options(HashMap::from([( + "AWS_ENDPOINT".to_string(), + "s3.us-east-1.amazonaws.com".to_string(), + )])); + let hudi_options = builder.hudi_options.clone().unwrap(); + let storage_options = builder.storage_options.clone().unwrap(); + assert_eq!(hudi_options.len(), 2); + assert_eq!(hudi_options["hoodie.option1"], "value1"); + assert_eq!(hudi_options["hoodie.option3"], "value3"); + assert_eq!(storage_options.len(), 2); + assert_eq!(storage_options["AWS_REGION"], "us-east-1"); + assert_eq!( + storage_options["AWS_ENDPOINT"], + "s3.us-east-1.amazonaws.com" + ); + } +} From b7d902db95ff7b9d49ea673c29eda30a6b521d9f Mon Sep 17 00:00:00 2001 From: kazdy Date: Sun, 13 Oct 2024 14:43:09 +0200 Subject: [PATCH 19/30] tidy up python bindings, fix formatting --- python/hudi/__init__.py | 3 ++- python/tests/test_table_builder.py | 31 ++++++++++++++++++++---------- python/tests/test_table_read.py | 6 ++++++ 3 files changed, 29 insertions(+), 11 deletions(-) diff --git a/python/hudi/__init__.py b/python/hudi/__init__.py index d8c5f816..b44835b2 100644 --- a/python/hudi/__init__.py +++ b/python/hudi/__init__.py @@ -15,8 +15,9 @@ # specific language governing permissions and limitations # under the License. 
+from hudi.table.builder import HudiTableBuilder as HudiTableBuilder + from ._internal import HudiFileGroupReader as HudiFileGroupReader from ._internal import HudiFileSlice as HudiFileSlice from ._internal import HudiTable as HudiTable -from ._internal import build_hudi_table from ._internal import __version__ as __version__ diff --git a/python/tests/test_table_builder.py b/python/tests/test_table_builder.py index 6855ac29..1c16f62b 100644 --- a/python/tests/test_table_builder.py +++ b/python/tests/test_table_builder.py @@ -18,7 +18,7 @@ import pyarrow as pa import pytest -from hudi import build_hudi_table +from hudi import HudiTableBuilder PYARROW_LE_8_0_0 = tuple(int(s) for s in pa.__version__.split(".") if s.isnumeric()) < ( 8, @@ -29,9 +29,10 @@ PYARROW_LE_8_0_0, reason="hudi only supported if pyarrow >= 8.0.0" ) + def test_read_table_returns_correct_data(get_sample_table): table_path = get_sample_table - table = build_hudi_table(table_path) + table = HudiTableBuilder.from_base_uri(table_path).build() batches = table.read_snapshot() t = pa.Table.from_batches(batches).select([0, 5, 6, 9]).sort_by("ts") @@ -68,15 +69,25 @@ def test_read_table_returns_correct_data(get_sample_table): }, ] -@pytest.mark.parametrize("hudi_options,storage_options,options", [ -({"hoodie.read.as.of.timestamp": "20240402123035233"}, None, None), -(None, None, {"hoodie.read.as.of.timestamp": "20240402123035233"})]) -def test_read_table_as_of_timestamp(get_sample_table, hudi_options, storage_options, options): + +@pytest.mark.parametrize( + "hudi_options,storage_options,options", + [ + ({"hoodie.read.as.of.timestamp": "20240402123035233"}, {}, {}), + ({}, {}, {"hoodie.read.as.of.timestamp": "20240402123035233"}), + ], +) +def test_read_table_as_of_timestamp( + get_sample_table, hudi_options, storage_options, options +): table_path = get_sample_table - table = build_hudi_table(base_uri=table_path, - hudi_options=hudi_options, - storage_options=storage_options, - options=options) + table = ( + HudiTableBuilder.from_base_uri(table_path) + .with_hudi_options(hudi_options) + .with_storage_options(storage_options) + .with_options(options) + .build() + ) batches = table.read_snapshot() t = pa.Table.from_batches(batches).select([0, 5, 6, 9]).sort_by("ts") diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py index d9c7142d..3347783f 100644 --- a/python/tests/test_table_read.py +++ b/python/tests/test_table_read.py @@ -48,11 +48,13 @@ def test_read_table_has_correct_schema(get_sample_table): "city", ] + def test_read_table_has_correct_partition_schema(get_sample_table): table_path = get_sample_table table = HudiTable(table_path) assert table.get_partition_schema().names == ["city"] + def test_read_table_returns_correct_file_slices(get_sample_table): table_path = get_sample_table table = HudiTable(table_path) @@ -74,6 +76,7 @@ def test_read_table_returns_correct_file_slices(get_sample_table): "sao_paulo/ee915c68-d7f8-44f6-9759-e691add290d8-0_3-11-0_20240402123035233.parquet", } + def test_read_table_can_read_from_batches(get_sample_table): table_path = get_sample_table table = HudiTable(table_path) @@ -91,6 +94,7 @@ def test_read_table_can_read_from_batches(get_sample_table): assert len(next(file_slices_gen)) == 3 assert len(next(file_slices_gen)) == 2 + def test_read_table_returns_correct_data(get_sample_table): table_path = get_sample_table table = HudiTable(table_path) @@ -130,6 +134,7 @@ def test_read_table_returns_correct_data(get_sample_table): }, ] + def 
test_read_table_for_partition(get_sample_table): table_path = get_sample_table table = HudiTable(table_path) @@ -157,6 +162,7 @@ def test_read_table_for_partition(get_sample_table): }, ] + def test_read_table_as_of_timestamp(get_sample_table): table_path = get_sample_table table = HudiTable(table_path) From 7b7a7676be626fab468592960938b0f580ec16b6 Mon Sep 17 00:00:00 2001 From: kazdy Date: Sun, 13 Oct 2024 21:47:29 +0200 Subject: [PATCH 20/30] add docstrings for HudiTableBuilder --- python/hudi/table/builder.py | 52 ++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/python/hudi/table/builder.py b/python/hudi/table/builder.py index 5b767f58..3d2b76e2 100644 --- a/python/hudi/table/builder.py +++ b/python/hudi/table/builder.py @@ -22,6 +22,16 @@ @dataclass class HudiTableBuilder: + """ + A builder class for constructing a HudiTable object with customizable options. + + Attributes: + base_uri (str): The base URI of the Hudi table. + options (Optional[Dict[str, str]]): Both hudi and storage options for building the table. + hudi_options (Optional[Dict[str, str]]): Hudi configuration options. + storage_options (Optional[Dict[str, str]]): Storage-related options. + """ + base_uri: str options: Optional[Dict[str, str]] = None hudi_options: Optional[Dict[str, str]] = None @@ -29,16 +39,43 @@ class HudiTableBuilder: @classmethod def from_base_uri(cls, base_uri: str) -> "HudiTableBuilder": + """ + Initializes a HudiTableBuilder using the base URI of the Hudi table. + + Parameters: + base_uri (str): The base URI of the Hudi table. + + Returns: + HudiTableBuilder: An instance of the builder. + """ builder = cls(base_uri) return builder def with_options(self, options: Dict[str, str]) -> "HudiTableBuilder": + """ + Adds general options for configuring the HudiTable. + + Parameters: + options (Dict[str, str]): General options to be applied, can pass hudi and storage options. + + Returns: + HudiTableBuilder: The builder instance. + """ if self.options is None: self.options = {} self.options = options return self def with_hudi_options(self, hudi_options: Dict[str, str]) -> "HudiTableBuilder": + """ + Adds Hudi options to the builder. + + Parameters: + - hudi_options (Dict[str, str]): Hudi options to be applied. + + Returns: + - HudiTableBuilder: The builder instance. + """ if self.hudi_options is None: self.hudi_options = {} self.hudi_options.update(hudi_options) @@ -47,12 +84,27 @@ def with_hudi_options(self, hudi_options: Dict[str, str]) -> "HudiTableBuilder": def with_storage_options( self, storage_options: Dict[str, str] ) -> "HudiTableBuilder": + """ + Adds storage-related options for configuring the table. + + Parameters: + - storage_options (Dict[str, str]): Storage-related options to be applied. + + Returns: + - HudiTableBuilder: The builder instance. + """ if self.storage_options is None: self.storage_options = {} self.storage_options.update(storage_options) return self def build(self) -> "HudiTable": + """ + Constructs and returns a HudiTable object with the specified options. + + Returns: + - HudiTable: The constructed HudiTable object. 
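+
+        Example (an illustrative sketch; the table path below is a placeholder):
+
+            table = HudiTableBuilder.from_base_uri("/tmp/trips_table").build()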
+ """ return build_hudi_table( self.base_uri, self.hudi_options, self.storage_options, self.options ) From 47dc24e08baa4ec1502543056516eda24cb1f571 Mon Sep 17 00:00:00 2001 From: kazdy Date: Sun, 13 Oct 2024 21:48:23 +0200 Subject: [PATCH 21/30] remove hyphens --- python/hudi/table/builder.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/python/hudi/table/builder.py b/python/hudi/table/builder.py index 3d2b76e2..d6c87d5d 100644 --- a/python/hudi/table/builder.py +++ b/python/hudi/table/builder.py @@ -71,10 +71,10 @@ def with_hudi_options(self, hudi_options: Dict[str, str]) -> "HudiTableBuilder": Adds Hudi options to the builder. Parameters: - - hudi_options (Dict[str, str]): Hudi options to be applied. + hudi_options (Dict[str, str]): Hudi options to be applied. Returns: - - HudiTableBuilder: The builder instance. + HudiTableBuilder: The builder instance. """ if self.hudi_options is None: self.hudi_options = {} @@ -88,10 +88,10 @@ def with_storage_options( Adds storage-related options for configuring the table. Parameters: - - storage_options (Dict[str, str]): Storage-related options to be applied. + storage_options (Dict[str, str]): Storage-related options to be applied. Returns: - - HudiTableBuilder: The builder instance. + HudiTableBuilder: The builder instance. """ if self.storage_options is None: self.storage_options = {} @@ -103,7 +103,7 @@ def build(self) -> "HudiTable": Constructs and returns a HudiTable object with the specified options. Returns: - - HudiTable: The constructed HudiTable object. + HudiTable: The constructed HudiTable object. """ return build_hudi_table( self.base_uri, self.hudi_options, self.storage_options, self.options From 8add39931d66da79a35c0f9a60ea4da616c1254c Mon Sep 17 00:00:00 2001 From: kazdy Date: Sun, 13 Oct 2024 21:57:47 +0200 Subject: [PATCH 22/30] add minimal docs to TableBuilder --- crates/core/src/table/builder.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/crates/core/src/table/builder.rs b/crates/core/src/table/builder.rs index 919d8ae2..717d4b57 100644 --- a/crates/core/src/table/builder.rs +++ b/crates/core/src/table/builder.rs @@ -30,6 +30,7 @@ use std::env; use std::path::PathBuf; use std::sync::Arc; +/// Hudi Table builder #[derive(Debug, Clone)] pub struct TableBuilder { base_uri: String, @@ -38,6 +39,7 @@ pub struct TableBuilder { } impl TableBuilder { + /// Create Hudi table builder from base table uri pub fn from_base_uri(base_uri: &str) -> Self { TableBuilder { base_uri: base_uri.to_string(), @@ -46,6 +48,7 @@ impl TableBuilder { } } + /// Add hudi and/or storage options for configuring the TableBuilder. pub fn with_options(self, all_options: I) -> Self where I: IntoIterator, @@ -63,6 +66,7 @@ impl TableBuilder { .with_storage_options(storage_options) } + /// Add hudi options for configuring the TableBuilder. pub fn with_hudi_options(mut self, hudi_options: HashMap) -> Self { match self.hudi_options { None => self.hudi_options = Some(hudi_options), @@ -71,6 +75,7 @@ impl TableBuilder { self } + /// Add hudi options for configuring the TableBuilder. pub fn with_storage_options(mut self, storage_options: HashMap) -> Self { match self.storage_options { None => self.storage_options = Some(storage_options), @@ -79,6 +84,7 @@ impl TableBuilder { self } + /// Construct and return a Table object with the specified options. pub async fn build(self) -> anyhow::Result
{ let hudi_options = self.hudi_options.unwrap_or_default().clone(); let mut storage_options = self.storage_options.unwrap_or_default().clone(); From bf439ffe834e6a859ebf969bad4d4901025deba6 Mon Sep 17 00:00:00 2001 From: Shiyan Xu <2701446+xushiyan@users.noreply.github.com> Date: Sun, 13 Oct 2024 16:57:20 -0500 Subject: [PATCH 23/30] improve logic to resolve options --- Cargo.toml | 1 + crates/core/Cargo.toml | 1 + crates/core/src/table/builder.rs | 179 +++++++++++++++++-------------- 3 files changed, 98 insertions(+), 83 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 71f3ca01..60de5812 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -63,6 +63,7 @@ serde_json = { version = "1" } # "stdlib" anyhow = { version = "1.0.86" } bytes = { version = "1" } +paste = { version = "1.0.15" } once_cell = { version = "1.19.0" } strum = { version = "0.26.3", features = ["derive"] } strum_macros = "0.26.4" diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 1969d9a5..90fae7dc 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -50,6 +50,7 @@ serde_json = { workspace = true } # "stdlib" anyhow = { workspace = true } bytes = { workspace = true } +paste = { workspace = true } strum = { workspace = true } strum_macros = { workspace = true } url = { workspace = true } diff --git a/crates/core/src/table/builder.rs b/crates/core/src/table/builder.rs index 717d4b57..68e23cbe 100644 --- a/crates/core/src/table/builder.rs +++ b/crates/core/src/table/builder.rs @@ -24,80 +24,67 @@ use crate::storage::Storage; use crate::table::fs_view::FileSystemView; use crate::table::timeline::Timeline; use crate::table::Table; -use anyhow::Context; +use anyhow::{Context, Result}; +use paste::paste; use std::collections::HashMap; use std::env; +use std::hash::Hash; use std::path::PathBuf; use std::sync::Arc; -/// Hudi Table builder +/// Builder for creating a [Table] instance. #[derive(Debug, Clone)] pub struct TableBuilder { base_uri: String, - hudi_options: Option>, - storage_options: Option>, + hudi_options: HashMap, + storage_options: HashMap, + options: HashMap, } +macro_rules! impl_with_options { + ($struct_name:ident, $($field:ident),+) => { + impl $struct_name { + $( + paste! { + /// Add options to the builder. + /// Subsequent calls overwrite the previous values if the key already exists. + pub fn [](mut self, options: I) -> Self + where + I: IntoIterator, + K: AsRef, + V: Into, + { + self.$field.extend(options.into_iter().map(|(k, v)| (k.as_ref().to_string(), v.into()))); + self + } + } + )+ + } + }; +} + +impl_with_options!(TableBuilder, hudi_options, storage_options, options); + impl TableBuilder { /// Create Hudi table builder from base table uri pub fn from_base_uri(base_uri: &str) -> Self { TableBuilder { base_uri: base_uri.to_string(), - storage_options: None, - hudi_options: None, + storage_options: HashMap::new(), + hudi_options: HashMap::new(), + options: HashMap::new(), } } - /// Add hudi and/or storage options for configuring the TableBuilder. - pub fn with_options(self, all_options: I) -> Self - where - I: IntoIterator, - K: AsRef, - V: Into, - { - let mut hudi_options = HashMap::new(); - let mut storage_options = HashMap::new(); + pub async fn build(&mut self) -> Result
{ + self.resolve_options().await?; - let (hudi_opts, others) = split_hudi_options_from_others(all_options); - hudi_options.extend(hudi_opts); - storage_options.extend(others); + let hudi_configs = HudiConfigs::new(self.hudi_options.iter()); - self.with_hudi_options(hudi_options) - .with_storage_options(storage_options) - } - - /// Add hudi options for configuring the TableBuilder. - pub fn with_hudi_options(mut self, hudi_options: HashMap) -> Self { - match self.hudi_options { - None => self.hudi_options = Some(hudi_options), - Some(options) => self.hudi_options = Some(Self::merge(options, hudi_options)), - } - self - } - - /// Add hudi options for configuring the TableBuilder. - pub fn with_storage_options(mut self, storage_options: HashMap) -> Self { - match self.storage_options { - None => self.storage_options = Some(storage_options), - Some(options) => self.storage_options = Some(Self::merge(options, storage_options)), - } - self - } - - /// Construct and return a Table object with the specified options. - pub async fn build(self) -> anyhow::Result
{ - let hudi_options = self.hudi_options.unwrap_or_default().clone(); - let mut storage_options = self.storage_options.unwrap_or_default().clone(); - - Self::load_storage_options(&mut storage_options); - - let hudi_configs = - Self::load_hudi_configs(self.base_uri.clone(), hudi_options, &storage_options) - .await - .context("Failed to load table properties")?; + Table::validate_configs(&hudi_configs).expect("Hudi configs are not valid."); let hudi_configs = Arc::from(hudi_configs); - let storage_options = Arc::from(storage_options); + let storage_options = Arc::from(self.storage_options.clone()); let timeline = Timeline::new(hudi_configs.clone(), storage_options.clone()) .await @@ -115,37 +102,42 @@ impl TableBuilder { }) } - fn load_storage_options(storage_options: &mut HashMap) { - Self::imbue_cloud_env_vars(storage_options); - } - - async fn load_hudi_configs( - base_uri: String, - mut hudi_options: HashMap, - storage_options: &HashMap, - ) -> anyhow::Result { - hudi_options.insert( + /// Resolve all options by combining the ones from hoodie.properties, user-provided options, + /// env vars, and global Hudi configs. The precedence order is as follows: + /// + /// 1. hoodie.properties + /// 2. Explicit options provided by the user + /// 3. Generic options provided by the user + /// 4. Env vars + /// 5. Global Hudi configs + /// + /// [note] Error may occur when 1 and 2 have conflicts. + async fn resolve_options(&mut self) -> Result<()> { + // Insert the base path into hudi options since it is explicitly provided + self.hudi_options.insert( HudiTableConfig::BasePath.as_ref().to_string(), - base_uri.to_string(), + self.base_uri.clone(), ); - // create a [Storage] instance to load properties from storage layer. - let storage = Storage::new( - Arc::new(storage_options.clone()), - Arc::new(HudiConfigs::new(&hudi_options)), - )?; - - Self::imbue_table_properties(&mut hudi_options, storage.clone()).await?; + let (generic_hudi_opts, generic_other_opts) = + split_hudi_options_from_others(self.options.iter()); - Self::imbue_global_hudi_configs_if_not_present(&mut hudi_options, storage.clone()).await?; + // Combine generic options (lower precedence) with explicit options. + // Note that we treat all non-Hudi options as storage options + Self::extend_if_absent(&mut self.hudi_options, &generic_hudi_opts); + Self::extend_if_absent(&mut self.storage_options, &generic_other_opts); - let hudi_configs = HudiConfigs::new(hudi_options); + // if any user-provided options are intended for cloud storage and in uppercase, + // convert them to lowercase. This is to allow `object_store` to pick them up. + // Note that we do not need to look up env vars for storage as `object_store` does that for us. + Self::format_cloud_env_vars(&mut self.storage_options); - Table::validate_configs(&hudi_configs).expect("Hudi configs are not valid."); - Ok(hudi_configs) + // At this point, we have resolved the storage options needed for accessing the storage layer. + // We can now resolve the hudi options + Self::resolve_hudi_options(&self.storage_options, &mut self.hudi_options).await } - fn imbue_cloud_env_vars(options: &mut HashMap) { + fn format_cloud_env_vars(options: &mut HashMap) { const PREFIXES: [&str; 3] = ["AWS_", "AZURE_", "GOOGLE_"]; for (key, value) in env::vars() { @@ -157,13 +149,31 @@ impl TableBuilder { } } + async fn resolve_hudi_options( + storage_options: &HashMap, + hudi_options: &mut HashMap, + ) -> Result<()> { + // create a [Storage] instance to load properties from storage layer. 
+ let storage = Storage::new( + Arc::new(storage_options.clone()), + Arc::new(HudiConfigs::new(hudi_options.iter())), + )?; + + Self::imbue_table_properties(hudi_options, storage.clone()).await?; + + // TODO load Hudi configs from env vars here before loading global configs + + Self::imbue_global_hudi_configs_if_absent(hudi_options, storage.clone()).await + } + async fn imbue_table_properties( options: &mut HashMap, storage: Arc, - ) -> anyhow::Result<()> { + ) -> Result<()> { let bytes = storage.get_file_data(".hoodie/hoodie.properties").await?; let table_properties = parse_data_for_options(&bytes, "=")?; + // We currently treat all table properties as the highest precedence, which is valid for most cases. // TODO: handle the case where the same key is present in both table properties and options for (k, v) in table_properties { options.insert(k.to_string(), v.to_string()); @@ -172,10 +182,10 @@ impl TableBuilder { Ok(()) } - async fn imbue_global_hudi_configs_if_not_present( + async fn imbue_global_hudi_configs_if_absent( options: &mut HashMap, storage: Arc, - ) -> anyhow::Result<()> { + ) -> Result<()> { let global_config_path = env::var(HUDI_CONF_DIR) .map(PathBuf::from) .unwrap_or_else(|_| PathBuf::from("/etc/hudi/conf")) @@ -197,11 +207,14 @@ impl TableBuilder { Ok(()) } - fn merge( - map1: HashMap, - map2: HashMap, - ) -> HashMap { - map1.into_iter().chain(map2).collect() + fn extend_if_absent(target: &mut HashMap, source: &HashMap) + where + K: Eq + Hash + Clone, + V: Clone, + { + for (key, value) in source { + target.entry(key.clone()).or_insert_with(|| value.clone()); + } } } From 0c922833f98827ffe389a38eda9e9f9689bb8185 Mon Sep 17 00:00:00 2001 From: Shiyan Xu <2701446+xushiyan@users.noreply.github.com> Date: Sun, 13 Oct 2024 17:14:16 -0500 Subject: [PATCH 24/30] update test case --- crates/core/src/table/builder.rs | 74 ++++++++------------------------ 1 file changed, 18 insertions(+), 56 deletions(-) diff --git a/crates/core/src/table/builder.rs b/crates/core/src/table/builder.rs index 68e23cbe..a305bdd7 100644 --- a/crates/core/src/table/builder.rs +++ b/crates/core/src/table/builder.rs @@ -220,48 +220,20 @@ impl TableBuilder { #[cfg(test)] mod tests { - use crate::table::TableBuilder; - use std::collections::HashMap; + use super::*; + #[test] - fn test_build_from_mixed_options() { - let options = vec![ - ("hoodie.option1", "value1"), + fn test_build_from_explicit_options() { + let hudi_options = [("hoodie.option1", "value1"), ("hoodie.option3", "value3")]; + let storage_options = [ ("AWS_REGION", "us-east-1"), - ("hoodie.option3", "value3"), ("AWS_ENDPOINT", "s3.us-east-1.amazonaws.com"), ]; - let builder = TableBuilder::from_base_uri("./tmp").with_options(options); - let hudi_options = builder.hudi_options.clone().unwrap(); - let storage_options = builder.storage_options.clone().unwrap(); - assert_eq!(hudi_options.len(), 2); - assert_eq!(hudi_options["hoodie.option1"], "value1"); - assert_eq!(hudi_options["hoodie.option3"], "value3"); - assert_eq!(storage_options.len(), 2); - assert_eq!(storage_options["AWS_REGION"], "us-east-1"); - assert_eq!( - storage_options["AWS_ENDPOINT"], - "s3.us-east-1.amazonaws.com" - ); - } - - #[test] - fn test_build_from_explicit_options() { - let hudi_options = HashMap::from([ - ("hoodie.option1".to_string(), "value1".to_string()), - ("hoodie.option3".to_string(), "value3".to_string()), - ]); - let storage_options = HashMap::from([ - ("AWS_REGION".to_string(), "us-east-1".to_string()), - ( - "AWS_ENDPOINT".to_string(), - 
"s3.us-east-1.amazonaws.com".to_string(), - ), - ]); - let builder = TableBuilder::from_base_uri("./tmp") + let builder = TableBuilder::from_base_uri("/tmp/hudi_data") .with_hudi_options(hudi_options) .with_storage_options(storage_options); - let hudi_options = builder.hudi_options.clone().unwrap(); - let storage_options = builder.storage_options.clone().unwrap(); + let hudi_options = &builder.hudi_options; + let storage_options = &builder.storage_options; assert_eq!(hudi_options.len(), 2); assert_eq!(hudi_options["hoodie.option1"], "value1"); assert_eq!(hudi_options["hoodie.option3"], "value3"); @@ -275,27 +247,17 @@ mod tests { #[test] fn test_build_from_explicit_options_chained() { - let builder = TableBuilder::from_base_uri("./tmp") - .with_hudi_options(HashMap::from([( - "hoodie.option1".to_string(), - "value1".to_string(), - )])) - .with_hudi_options(HashMap::from([( - "hoodie.option3".to_string(), - "value3".to_string(), - )])) - .with_storage_options(HashMap::from([( - "AWS_REGION".to_string(), - "us-east-1".to_string(), - )])) - .with_storage_options(HashMap::from([( - "AWS_ENDPOINT".to_string(), - "s3.us-east-1.amazonaws.com".to_string(), - )])); - let hudi_options = builder.hudi_options.clone().unwrap(); - let storage_options = builder.storage_options.clone().unwrap(); + let builder = TableBuilder::from_base_uri("/tmp/hudi_data") + .with_hudi_options([("hoodie.option1", "value1")]) + .with_hudi_options([("hoodie.option1", "value1-1")]) + .with_hudi_options([("hoodie.option3", "value3")]) + .with_storage_options([("AWS_REGION", "us-east-2")]) + .with_storage_options([("AWS_REGION", "us-east-1")]) + .with_storage_options([("AWS_ENDPOINT", "s3.us-east-1.amazonaws.com")]); + let hudi_options = &builder.hudi_options.clone(); + let storage_options = &builder.storage_options.clone(); assert_eq!(hudi_options.len(), 2); - assert_eq!(hudi_options["hoodie.option1"], "value1"); + assert_eq!(hudi_options["hoodie.option1"], "value1-1"); assert_eq!(hudi_options["hoodie.option3"], "value3"); assert_eq!(storage_options.len(), 2); assert_eq!(storage_options["AWS_REGION"], "us-east-1"); From 43cc8db5ec0a9918d35aaa5b0cf0a3b269dc1e90 Mon Sep 17 00:00:00 2001 From: Shiyan Xu <2701446+xushiyan@users.noreply.github.com> Date: Sun, 13 Oct 2024 17:42:12 -0500 Subject: [PATCH 25/30] update python apis --- python/hudi/table/builder.py | 48 +++++++++++++++------------------ python/tests/test_table_read.py | 3 +-- 2 files changed, 22 insertions(+), 29 deletions(-) diff --git a/python/hudi/table/builder.py b/python/hudi/table/builder.py index d6c87d5d..966fabf1 100644 --- a/python/hudi/table/builder.py +++ b/python/hudi/table/builder.py @@ -14,8 +14,8 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -from dataclasses import dataclass -from typing import Dict, Optional +from dataclasses import dataclass, field +from typing import Dict from hudi._internal import HudiTable, build_hudi_table @@ -27,15 +27,15 @@ class HudiTableBuilder: Attributes: base_uri (str): The base URI of the Hudi table. - options (Optional[Dict[str, str]]): Both hudi and storage options for building the table. - hudi_options (Optional[Dict[str, str]]): Hudi configuration options. - storage_options (Optional[Dict[str, str]]): Storage-related options. + options (Dict[str, str]): Both hudi and storage options for building the table. + hudi_options (Dict[str, str]): Hudi configuration options. + storage_options (Dict[str, str]): Storage-related options. 
""" base_uri: str - options: Optional[Dict[str, str]] = None - hudi_options: Optional[Dict[str, str]] = None - storage_options: Optional[Dict[str, str]] = None + options: Dict[str, str] = field(default_factory=dict) + hudi_options: Dict[str, str] = field(default_factory=dict) + storage_options: Dict[str, str] = field(default_factory=dict) @classmethod def from_base_uri(cls, base_uri: str) -> "HudiTableBuilder": @@ -51,21 +51,6 @@ def from_base_uri(cls, base_uri: str) -> "HudiTableBuilder": builder = cls(base_uri) return builder - def with_options(self, options: Dict[str, str]) -> "HudiTableBuilder": - """ - Adds general options for configuring the HudiTable. - - Parameters: - options (Dict[str, str]): General options to be applied, can pass hudi and storage options. - - Returns: - HudiTableBuilder: The builder instance. - """ - if self.options is None: - self.options = {} - self.options = options - return self - def with_hudi_options(self, hudi_options: Dict[str, str]) -> "HudiTableBuilder": """ Adds Hudi options to the builder. @@ -76,8 +61,6 @@ def with_hudi_options(self, hudi_options: Dict[str, str]) -> "HudiTableBuilder": Returns: HudiTableBuilder: The builder instance. """ - if self.hudi_options is None: - self.hudi_options = {} self.hudi_options.update(hudi_options) return self @@ -93,11 +76,22 @@ def with_storage_options( Returns: HudiTableBuilder: The builder instance. """ - if self.storage_options is None: - self.storage_options = {} self.storage_options.update(storage_options) return self + def with_options(self, options: Dict[str, str]) -> "HudiTableBuilder": + """ + Adds general options for configuring the HudiTable. + + Parameters: + options (Dict[str, str]): General options to be applied, can pass hudi and storage options. + + Returns: + HudiTableBuilder: The builder instance. + """ + self.options = options + return self + def build(self) -> "HudiTable": """ Constructs and returns a HudiTable object with the specified options. diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py index 3347783f..6ee5ae58 100644 --- a/python/tests/test_table_read.py +++ b/python/tests/test_table_read.py @@ -165,11 +165,10 @@ def test_read_table_for_partition(get_sample_table): def test_read_table_as_of_timestamp(get_sample_table): table_path = get_sample_table - table = HudiTable(table_path) - table = HudiTable( table_path, options={"hoodie.read.as.of.timestamp": "20240402123035233"} ) + batches = table.read_snapshot() t = pa.Table.from_batches(batches).select([0, 5, 6, 9]).sort_by("ts") assert t.to_pylist() == [ From 18c7f976fb690c70d8c06dad8ce3f1a01e0a3c04 Mon Sep 17 00:00:00 2001 From: Shiyan Xu <2701446+xushiyan@users.noreply.github.com> Date: Sun, 13 Oct 2024 17:46:04 -0500 Subject: [PATCH 26/30] fix python api --- python/hudi/table/builder.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/hudi/table/builder.py b/python/hudi/table/builder.py index 966fabf1..81790bfe 100644 --- a/python/hudi/table/builder.py +++ b/python/hudi/table/builder.py @@ -89,7 +89,7 @@ def with_options(self, options: Dict[str, str]) -> "HudiTableBuilder": Returns: HudiTableBuilder: The builder instance. 
""" - self.options = options + self.options.update(options) return self def build(self) -> "HudiTable": From 797e2ed6e2434642a01c0be6ab0de0811a51964b Mon Sep 17 00:00:00 2001 From: Shiyan Xu <2701446+xushiyan@users.noreply.github.com> Date: Sun, 13 Oct 2024 17:52:19 -0500 Subject: [PATCH 27/30] move validation logic to builder --- crates/core/src/table/builder.rs | 65 +++++++++++++++++++++++++++----- crates/core/src/table/mod.rs | 48 +---------------------- 2 files changed, 57 insertions(+), 56 deletions(-) diff --git a/crates/core/src/table/builder.rs b/crates/core/src/table/builder.rs index a305bdd7..3003a3dc 100644 --- a/crates/core/src/table/builder.rs +++ b/crates/core/src/table/builder.rs @@ -17,20 +17,27 @@ * under the License. */ -use crate::config::table::HudiTableConfig; -use crate::config::utils::{parse_data_for_options, split_hudi_options_from_others}; -use crate::config::{HudiConfigs, HUDI_CONF_DIR}; -use crate::storage::Storage; -use crate::table::fs_view::FileSystemView; -use crate::table::timeline::Timeline; -use crate::table::Table; -use anyhow::{Context, Result}; +use anyhow::{anyhow, Context, Result}; use paste::paste; use std::collections::HashMap; use std::env; use std::hash::Hash; use std::path::PathBuf; +use std::str::FromStr; use std::sync::Arc; +use strum::IntoEnumIterator; + +use crate::config::internal::HudiInternalConfig::SkipConfigValidation; +use crate::config::read::HudiReadConfig; +use crate::config::table::HudiTableConfig::{DropsPartitionFields, TableType, TableVersion}; +use crate::config::table::TableTypeValue::CopyOnWrite; +use crate::config::table::{HudiTableConfig, TableTypeValue}; +use crate::config::utils::{parse_data_for_options, split_hudi_options_from_others}; +use crate::config::{HudiConfigs, HUDI_CONF_DIR}; +use crate::storage::Storage; +use crate::table::fs_view::FileSystemView; +use crate::table::timeline::Timeline; +use crate::table::Table; /// Builder for creating a [Table] instance. #[derive(Debug, Clone)] @@ -81,7 +88,7 @@ impl TableBuilder { let hudi_configs = HudiConfigs::new(self.hudi_options.iter()); - Table::validate_configs(&hudi_configs).expect("Hudi configs are not valid."); + Self::validate_configs(&hudi_configs).expect("Hudi configs are not valid."); let hudi_configs = Arc::from(hudi_configs); let storage_options = Arc::from(self.storage_options.clone()); @@ -207,6 +214,46 @@ impl TableBuilder { Ok(()) } + fn validate_configs(hudi_configs: &HudiConfigs) -> Result<()> { + if hudi_configs + .get_or_default(SkipConfigValidation) + .to::() + { + return Ok(()); + } + + for conf in HudiTableConfig::iter() { + hudi_configs.validate(conf)? + } + + for conf in HudiReadConfig::iter() { + hudi_configs.validate(conf)? + } + + // additional validation + let table_type = hudi_configs.get(TableType)?.to::(); + if TableTypeValue::from_str(&table_type)? 
!= CopyOnWrite { + return Err(anyhow!("Only support copy-on-write table.")); + } + + let table_version = hudi_configs.get(TableVersion)?.to::(); + if !(5..=6).contains(&table_version) { + return Err(anyhow!("Only support table version 5 and 6.")); + } + + let drops_partition_cols = hudi_configs + .get_or_default(DropsPartitionFields) + .to::(); + if drops_partition_cols { + return Err(anyhow!( + "Only support when `{}` is disabled", + DropsPartitionFields.as_ref() + )); + } + + Ok(()) + } + fn extend_if_absent(target: &mut HashMap, source: &HashMap) where K: Eq + Hash + Clone, diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs index 6c5605b3..0ba61aa6 100644 --- a/crates/core/src/table/mod.rs +++ b/crates/core/src/table/mod.rs @@ -94,15 +94,9 @@ use arrow_schema::{Field, Schema}; use strum::IntoEnumIterator; use url::Url; -use HudiInternalConfig::SkipConfigValidation; -use HudiTableConfig::{DropsPartitionFields, TableType, TableVersion}; -use TableTypeValue::CopyOnWrite; - -use crate::config::internal::HudiInternalConfig; -use crate::config::read::HudiReadConfig; use crate::config::read::HudiReadConfig::AsOfTimestamp; +use crate::config::table::HudiTableConfig; use crate::config::table::HudiTableConfig::PartitionFields; -use crate::config::table::{HudiTableConfig, TableTypeValue}; use crate::config::HudiConfigs; use crate::file_group::reader::FileGroupReader; use crate::file_group::FileSlice; @@ -169,46 +163,6 @@ impl Table { .register_object_store(runtime_env.clone()); } - fn validate_configs(hudi_configs: &HudiConfigs) -> Result<()> { - if hudi_configs - .get_or_default(SkipConfigValidation) - .to::() - { - return Ok(()); - } - - for conf in HudiTableConfig::iter() { - hudi_configs.validate(conf)? - } - - for conf in HudiReadConfig::iter() { - hudi_configs.validate(conf)? - } - - // additional validation - let table_type = hudi_configs.get(TableType)?.to::(); - if TableTypeValue::from_str(&table_type)? != CopyOnWrite { - return Err(anyhow!("Only support copy-on-write table.")); - } - - let table_version = hudi_configs.get(TableVersion)?.to::(); - if !(5..=6).contains(&table_version) { - return Err(anyhow!("Only support table version 5 and 6.")); - } - - let drops_partition_cols = hudi_configs - .get_or_default(DropsPartitionFields) - .to::(); - if drops_partition_cols { - return Err(anyhow!( - "Only support when `{}` is disabled", - DropsPartitionFields.as_ref() - )); - } - - Ok(()) - } - /// Get the latest [Schema] of the table. pub async fn get_schema(&self) -> Result { self.timeline.get_latest_schema().await From f3fcd649900a1f7f93dfdf1401798ec57d704e1f Mon Sep 17 00:00:00 2001 From: Shiyan Xu <2701446+xushiyan@users.noreply.github.com> Date: Sun, 13 Oct 2024 17:53:51 -0500 Subject: [PATCH 28/30] fix style --- crates/core/src/table/mod.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs index 0ba61aa6..b9119f17 100644 --- a/crates/core/src/table/mod.rs +++ b/crates/core/src/table/mod.rs @@ -85,13 +85,11 @@ //! 
``` use std::collections::{HashMap, HashSet}; -use std::str::FromStr; use std::sync::Arc; use anyhow::{anyhow, Context, Result}; use arrow::record_batch::RecordBatch; use arrow_schema::{Field, Schema}; -use strum::IntoEnumIterator; use url::Url; use crate::config::read::HudiReadConfig::AsOfTimestamp; From e372cf30743133dfbd99da89222e80d3a5a1709b Mon Sep 17 00:00:00 2001 From: Shiyan Xu <2701446+xushiyan@users.noreply.github.com> Date: Sun, 13 Oct 2024 18:23:28 -0500 Subject: [PATCH 29/30] update macro doc --- crates/core/src/table/builder.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/crates/core/src/table/builder.rs b/crates/core/src/table/builder.rs index 3003a3dc..3ebdfcce 100644 --- a/crates/core/src/table/builder.rs +++ b/crates/core/src/table/builder.rs @@ -53,8 +53,7 @@ macro_rules! impl_with_options { impl $struct_name { $( paste! { - /// Add options to the builder. - /// Subsequent calls overwrite the previous values if the key already exists. + #[doc = "Add " $field " to the builder. Subsequent calls overwrite the previous values if the key already exists."] pub fn [](mut self, options: I) -> Self where I: IntoIterator, From 48821ab25479d45f656109f2297d77d9a4effdf0 Mon Sep 17 00:00:00 2001 From: Shiyan Xu <2701446+xushiyan@users.noreply.github.com> Date: Sun, 13 Oct 2024 19:21:55 -0500 Subject: [PATCH 30/30] add UTs --- crates/core/src/storage/mod.rs | 2 ++ crates/core/src/table/builder.rs | 11 ++++----- crates/core/src/table/mod.rs | 39 ++++++++++++++++++++++++++++++++ 3 files changed, 46 insertions(+), 6 deletions(-) diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs index 2b4c1186..9a4aae14 100644 --- a/crates/core/src/storage/mod.rs +++ b/crates/core/src/storage/mod.rs @@ -53,6 +53,8 @@ pub struct Storage { } impl Storage { + pub const CLOUD_STORAGE_PREFIXES: [&'static str; 3] = ["AWS_", "AZURE_", "GOOGLE_"]; + pub fn new( options: Arc>, hudi_configs: Arc, diff --git a/crates/core/src/table/builder.rs b/crates/core/src/table/builder.rs index 3ebdfcce..ff34571b 100644 --- a/crates/core/src/table/builder.rs +++ b/crates/core/src/table/builder.rs @@ -135,19 +135,18 @@ impl TableBuilder { // if any user-provided options are intended for cloud storage and in uppercase, // convert them to lowercase. This is to allow `object_store` to pick them up. - // Note that we do not need to look up env vars for storage as `object_store` does that for us. - Self::format_cloud_env_vars(&mut self.storage_options); + Self::imbue_cloud_env_vars(&mut self.storage_options); // At this point, we have resolved the storage options needed for accessing the storage layer. 
// We can now resolve the hudi options Self::resolve_hudi_options(&self.storage_options, &mut self.hudi_options).await } - fn format_cloud_env_vars(options: &mut HashMap) { - const PREFIXES: [&str; 3] = ["AWS_", "AZURE_", "GOOGLE_"]; - + fn imbue_cloud_env_vars(options: &mut HashMap) { for (key, value) in env::vars() { - if PREFIXES.iter().any(|prefix| key.starts_with(prefix)) + if Storage::CLOUD_STORAGE_PREFIXES + .iter() + .any(|prefix| key.starts_with(prefix)) && !options.contains_key(&key.to_ascii_lowercase()) { options.insert(key.to_ascii_lowercase(), value); diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs index b9119f17..486e3204 100644 --- a/crates/core/src/table/mod.rs +++ b/crates/core/src/table/mod.rs @@ -300,6 +300,7 @@ mod tests { }; use crate::config::HUDI_CONF_DIR; use crate::storage::utils::join_url_segments; + use crate::storage::Storage; use crate::table::Table; /// Test helper to create a new `Table` instance without validating the configuration. @@ -320,6 +321,44 @@ mod tests { .unwrap() } + #[tokio::test] + async fn test_hudi_table_get_hudi_options() { + let base_url = TestTable::V6Nonpartitioned.url(); + let hudi_table = Table::new(base_url.path()).await.unwrap(); + let hudi_options = hudi_table.hudi_options(); + for (k, v) in hudi_options.iter() { + assert!(k.starts_with("hoodie.")); + assert!(!v.is_empty()); + } + } + + #[tokio::test] + async fn test_hudi_table_get_storage_options() { + let base_url = TestTable::V6Nonpartitioned.url(); + let hudi_table = Table::new(base_url.path()).await.unwrap(); + + let cloud_prefixes: HashSet<_> = Storage::CLOUD_STORAGE_PREFIXES + .iter() + .map(|prefix| prefix.to_lowercase()) + .collect(); + + for (key, value) in hudi_table.storage_options.iter() { + let key_lower = key.to_lowercase(); + assert!( + cloud_prefixes + .iter() + .any(|prefix| key_lower.starts_with(prefix)), + "Storage option key '{}' should start with a cloud storage prefix", + key + ); + assert!( + !value.is_empty(), + "Storage option value for key '{}' should not be empty", + key + ); + } + } + #[tokio::test] async fn hudi_table_get_schema() { let base_url = TestTable::V6Nonpartitioned.url();