diff --git a/Cargo.toml b/Cargo.toml index 71f3ca01..60de5812 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -63,6 +63,7 @@ serde_json = { version = "1" } # "stdlib" anyhow = { version = "1.0.86" } bytes = { version = "1" } +paste = { version = "1.0.15" } once_cell = { version = "1.19.0" } strum = { version = "0.26.3", features = ["derive"] } strum_macros = "0.26.4" diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 1969d9a5..90fae7dc 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -50,6 +50,7 @@ serde_json = { workspace = true } # "stdlib" anyhow = { workspace = true } bytes = { workspace = true } +paste = { workspace = true } strum = { workspace = true } strum_macros = { workspace = true } url = { workspace = true } diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs index 2b4c1186..9a4aae14 100644 --- a/crates/core/src/storage/mod.rs +++ b/crates/core/src/storage/mod.rs @@ -53,6 +53,8 @@ pub struct Storage { } impl Storage { + pub const CLOUD_STORAGE_PREFIXES: [&'static str; 3] = ["AWS_", "AZURE_", "GOOGLE_"]; + pub fn new( options: Arc>, hudi_configs: Arc, diff --git a/crates/core/src/table/builder.rs b/crates/core/src/table/builder.rs new file mode 100644 index 00000000..ff34571b --- /dev/null +++ b/crates/core/src/table/builder.rs @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use anyhow::{anyhow, Context, Result}; +use paste::paste; +use std::collections::HashMap; +use std::env; +use std::hash::Hash; +use std::path::PathBuf; +use std::str::FromStr; +use std::sync::Arc; +use strum::IntoEnumIterator; + +use crate::config::internal::HudiInternalConfig::SkipConfigValidation; +use crate::config::read::HudiReadConfig; +use crate::config::table::HudiTableConfig::{DropsPartitionFields, TableType, TableVersion}; +use crate::config::table::TableTypeValue::CopyOnWrite; +use crate::config::table::{HudiTableConfig, TableTypeValue}; +use crate::config::utils::{parse_data_for_options, split_hudi_options_from_others}; +use crate::config::{HudiConfigs, HUDI_CONF_DIR}; +use crate::storage::Storage; +use crate::table::fs_view::FileSystemView; +use crate::table::timeline::Timeline; +use crate::table::Table; + +/// Builder for creating a [Table] instance. +#[derive(Debug, Clone)] +pub struct TableBuilder { + base_uri: String, + hudi_options: HashMap, + storage_options: HashMap, + options: HashMap, +} + +macro_rules! impl_with_options { + ($struct_name:ident, $($field:ident),+) => { + impl $struct_name { + $( + paste! { + #[doc = "Add " $field " to the builder. 
Subsequent calls overwrite the previous values if the key already exists."] + pub fn [](mut self, options: I) -> Self + where + I: IntoIterator, + K: AsRef, + V: Into, + { + self.$field.extend(options.into_iter().map(|(k, v)| (k.as_ref().to_string(), v.into()))); + self + } + } + )+ + } + }; +} + +impl_with_options!(TableBuilder, hudi_options, storage_options, options); + +impl TableBuilder { + /// Create Hudi table builder from base table uri + pub fn from_base_uri(base_uri: &str) -> Self { + TableBuilder { + base_uri: base_uri.to_string(), + storage_options: HashMap::new(), + hudi_options: HashMap::new(), + options: HashMap::new(), + } + } + + pub async fn build(&mut self) -> Result { + self.resolve_options().await?; + + let hudi_configs = HudiConfigs::new(self.hudi_options.iter()); + + Self::validate_configs(&hudi_configs).expect("Hudi configs are not valid."); + + let hudi_configs = Arc::from(hudi_configs); + let storage_options = Arc::from(self.storage_options.clone()); + + let timeline = Timeline::new(hudi_configs.clone(), storage_options.clone()) + .await + .context("Failed to load timeline")?; + + let file_system_view = FileSystemView::new(hudi_configs.clone(), storage_options.clone()) + .await + .context("Failed to load file system view")?; + + Ok(Table { + hudi_configs, + storage_options, + timeline, + file_system_view, + }) + } + + /// Resolve all options by combining the ones from hoodie.properties, user-provided options, + /// env vars, and global Hudi configs. The precedence order is as follows: + /// + /// 1. hoodie.properties + /// 2. Explicit options provided by the user + /// 3. Generic options provided by the user + /// 4. Env vars + /// 5. Global Hudi configs + /// + /// [note] Error may occur when 1 and 2 have conflicts. + async fn resolve_options(&mut self) -> Result<()> { + // Insert the base path into hudi options since it is explicitly provided + self.hudi_options.insert( + HudiTableConfig::BasePath.as_ref().to_string(), + self.base_uri.clone(), + ); + + let (generic_hudi_opts, generic_other_opts) = + split_hudi_options_from_others(self.options.iter()); + + // Combine generic options (lower precedence) with explicit options. + // Note that we treat all non-Hudi options as storage options + Self::extend_if_absent(&mut self.hudi_options, &generic_hudi_opts); + Self::extend_if_absent(&mut self.storage_options, &generic_other_opts); + + // if any user-provided options are intended for cloud storage and in uppercase, + // convert them to lowercase. This is to allow `object_store` to pick them up. + Self::imbue_cloud_env_vars(&mut self.storage_options); + + // At this point, we have resolved the storage options needed for accessing the storage layer. + // We can now resolve the hudi options + Self::resolve_hudi_options(&self.storage_options, &mut self.hudi_options).await + } + + fn imbue_cloud_env_vars(options: &mut HashMap) { + for (key, value) in env::vars() { + if Storage::CLOUD_STORAGE_PREFIXES + .iter() + .any(|prefix| key.starts_with(prefix)) + && !options.contains_key(&key.to_ascii_lowercase()) + { + options.insert(key.to_ascii_lowercase(), value); + } + } + } + + async fn resolve_hudi_options( + storage_options: &HashMap, + hudi_options: &mut HashMap, + ) -> Result<()> { + // create a [Storage] instance to load properties from storage layer. 
+ let storage = Storage::new( + Arc::new(storage_options.clone()), + Arc::new(HudiConfigs::new(hudi_options.iter())), + )?; + + Self::imbue_table_properties(hudi_options, storage.clone()).await?; + + // TODO load Hudi configs from env vars here before loading global configs + + Self::imbue_global_hudi_configs_if_absent(hudi_options, storage.clone()).await + } + + async fn imbue_table_properties( + options: &mut HashMap, + storage: Arc, + ) -> Result<()> { + let bytes = storage.get_file_data(".hoodie/hoodie.properties").await?; + let table_properties = parse_data_for_options(&bytes, "=")?; + + // We currently treat all table properties as the highest precedence, which is valid for most cases. + // TODO: handle the case where the same key is present in both table properties and options + for (k, v) in table_properties { + options.insert(k.to_string(), v.to_string()); + } + + Ok(()) + } + + async fn imbue_global_hudi_configs_if_absent( + options: &mut HashMap, + storage: Arc, + ) -> Result<()> { + let global_config_path = env::var(HUDI_CONF_DIR) + .map(PathBuf::from) + .unwrap_or_else(|_| PathBuf::from("/etc/hudi/conf")) + .join("hudi-defaults.conf"); + + if let Ok(bytes) = storage + .get_file_data_from_absolute_path(global_config_path.to_str().unwrap()) + .await + { + if let Ok(global_configs) = parse_data_for_options(&bytes, " \t=") { + for (key, value) in global_configs { + if key.starts_with("hoodie.") && !options.contains_key(&key) { + options.insert(key.to_string(), value.to_string()); + } + } + } + } + + Ok(()) + } + + fn validate_configs(hudi_configs: &HudiConfigs) -> Result<()> { + if hudi_configs + .get_or_default(SkipConfigValidation) + .to::() + { + return Ok(()); + } + + for conf in HudiTableConfig::iter() { + hudi_configs.validate(conf)? + } + + for conf in HudiReadConfig::iter() { + hudi_configs.validate(conf)? + } + + // additional validation + let table_type = hudi_configs.get(TableType)?.to::(); + if TableTypeValue::from_str(&table_type)? 
!= CopyOnWrite { + return Err(anyhow!("Only support copy-on-write table.")); + } + + let table_version = hudi_configs.get(TableVersion)?.to::(); + if !(5..=6).contains(&table_version) { + return Err(anyhow!("Only support table version 5 and 6.")); + } + + let drops_partition_cols = hudi_configs + .get_or_default(DropsPartitionFields) + .to::(); + if drops_partition_cols { + return Err(anyhow!( + "Only support when `{}` is disabled", + DropsPartitionFields.as_ref() + )); + } + + Ok(()) + } + + fn extend_if_absent(target: &mut HashMap, source: &HashMap) + where + K: Eq + Hash + Clone, + V: Clone, + { + for (key, value) in source { + target.entry(key.clone()).or_insert_with(|| value.clone()); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_build_from_explicit_options() { + let hudi_options = [("hoodie.option1", "value1"), ("hoodie.option3", "value3")]; + let storage_options = [ + ("AWS_REGION", "us-east-1"), + ("AWS_ENDPOINT", "s3.us-east-1.amazonaws.com"), + ]; + let builder = TableBuilder::from_base_uri("/tmp/hudi_data") + .with_hudi_options(hudi_options) + .with_storage_options(storage_options); + let hudi_options = &builder.hudi_options; + let storage_options = &builder.storage_options; + assert_eq!(hudi_options.len(), 2); + assert_eq!(hudi_options["hoodie.option1"], "value1"); + assert_eq!(hudi_options["hoodie.option3"], "value3"); + assert_eq!(storage_options.len(), 2); + assert_eq!(storage_options["AWS_REGION"], "us-east-1"); + assert_eq!( + storage_options["AWS_ENDPOINT"], + "s3.us-east-1.amazonaws.com" + ); + } + + #[test] + fn test_build_from_explicit_options_chained() { + let builder = TableBuilder::from_base_uri("/tmp/hudi_data") + .with_hudi_options([("hoodie.option1", "value1")]) + .with_hudi_options([("hoodie.option1", "value1-1")]) + .with_hudi_options([("hoodie.option3", "value3")]) + .with_storage_options([("AWS_REGION", "us-east-2")]) + .with_storage_options([("AWS_REGION", "us-east-1")]) + .with_storage_options([("AWS_ENDPOINT", "s3.us-east-1.amazonaws.com")]); + let hudi_options = &builder.hudi_options.clone(); + let storage_options = &builder.storage_options.clone(); + assert_eq!(hudi_options.len(), 2); + assert_eq!(hudi_options["hoodie.option1"], "value1-1"); + assert_eq!(hudi_options["hoodie.option3"], "value3"); + assert_eq!(storage_options.len(), 2); + assert_eq!(storage_options["AWS_REGION"], "us-east-1"); + assert_eq!( + storage_options["AWS_ENDPOINT"], + "s3.us-east-1.amazonaws.com" + ); + } +} diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs index fa694e42..486e3204 100644 --- a/crates/core/src/table/mod.rs +++ b/crates/core/src/table/mod.rs @@ -85,37 +85,25 @@ //! 
``` use std::collections::{HashMap, HashSet}; -use std::env; -use std::path::PathBuf; -use std::str::FromStr; use std::sync::Arc; use anyhow::{anyhow, Context, Result}; use arrow::record_batch::RecordBatch; use arrow_schema::{Field, Schema}; -use strum::IntoEnumIterator; use url::Url; -use HudiInternalConfig::SkipConfigValidation; -use HudiTableConfig::{DropsPartitionFields, TableType, TableVersion}; -use TableTypeValue::CopyOnWrite; - -use crate::config::internal::HudiInternalConfig; -use crate::config::read::HudiReadConfig; use crate::config::read::HudiReadConfig::AsOfTimestamp; +use crate::config::table::HudiTableConfig; use crate::config::table::HudiTableConfig::PartitionFields; -use crate::config::table::{HudiTableConfig, TableTypeValue}; -use crate::config::utils::parse_data_for_options; -use crate::config::utils::{empty_options, split_hudi_options_from_others}; use crate::config::HudiConfigs; -use crate::config::HUDI_CONF_DIR; use crate::file_group::reader::FileGroupReader; use crate::file_group::FileSlice; -use crate::storage::Storage; +use crate::table::builder::TableBuilder; use crate::table::fs_view::FileSystemView; use crate::table::partition::PartitionPruner; use crate::table::timeline::Timeline; +pub mod builder; mod fs_view; mod partition; mod timeline; @@ -132,7 +120,7 @@ pub struct Table { impl Table { /// Create hudi table by base_uri pub async fn new(base_uri: &str) -> Result { - Self::new_with_options(base_uri, empty_options()).await + TableBuilder::from_base_uri(base_uri).build().await } /// Create hudi table with options @@ -142,32 +130,24 @@ impl Table { K: AsRef, V: Into, { - let (hudi_configs, storage_options) = Self::load_configs(base_uri, options) - .await - .context("Failed to load table properties")?; - let hudi_configs = Arc::new(hudi_configs); - let storage_options = Arc::new(storage_options); - - let timeline = Timeline::new(hudi_configs.clone(), storage_options.clone()) - .await - .context("Failed to load timeline")?; - - let file_system_view = FileSystemView::new(hudi_configs.clone(), storage_options.clone()) + TableBuilder::from_base_uri(base_uri) + .with_options(options) + .build() .await - .context("Failed to load file system view")?; - - Ok(Table { - hudi_configs, - storage_options, - timeline, - file_system_view, - }) } pub fn base_url(&self) -> Result { self.hudi_configs.get(HudiTableConfig::BasePath)?.to_url() } + pub fn hudi_options(&self) -> HashMap { + self.hudi_configs.as_options() + } + + pub fn storage_options(&self) -> HashMap { + self.storage_options.as_ref().clone() + } + #[cfg(feature = "datafusion")] pub fn register_storage( &self, @@ -181,136 +161,6 @@ impl Table { .register_object_store(runtime_env.clone()); } - async fn load_configs( - base_uri: &str, - all_options: I, - ) -> Result<(HudiConfigs, HashMap)> - where - I: IntoIterator, - K: AsRef, - V: Into, - { - let mut hudi_options = HashMap::new(); - let mut other_options = HashMap::new(); - - Self::imbue_cloud_env_vars(&mut other_options); - - let (hudi_opts, others) = split_hudi_options_from_others(all_options); - hudi_options.extend(hudi_opts); - other_options.extend(others); - - hudi_options.insert( - HudiTableConfig::BasePath.as_ref().into(), - base_uri.to_string(), - ); - - // create a [Storage] instance to load properties from storage layer. 
- let storage = Storage::new( - Arc::new(other_options.clone()), - Arc::new(HudiConfigs::new(&hudi_options)), - )?; - - Self::imbue_table_properties(&mut hudi_options, storage.clone()).await?; - - Self::imbue_global_hudi_configs_if_not_present(&mut hudi_options, storage.clone()).await?; - - let hudi_configs = HudiConfigs::new(hudi_options); - - Self::validate_configs(&hudi_configs).map(|_| (hudi_configs, other_options)) - } - - fn imbue_cloud_env_vars(options: &mut HashMap) { - const PREFIXES: [&str; 3] = ["AWS_", "AZURE_", "GOOGLE_"]; - - for (key, value) in env::vars() { - if PREFIXES.iter().any(|prefix| key.starts_with(prefix)) - && !options.contains_key(&key.to_ascii_lowercase()) - { - options.insert(key.to_ascii_lowercase(), value); - } - } - } - - async fn imbue_table_properties( - options: &mut HashMap, - storage: Arc, - ) -> Result<()> { - let bytes = storage.get_file_data(".hoodie/hoodie.properties").await?; - let table_properties = parse_data_for_options(&bytes, "=")?; - - // TODO: handle the case where the same key is present in both table properties and options - for (k, v) in table_properties { - options.insert(k.to_string(), v.to_string()); - } - - Ok(()) - } - - async fn imbue_global_hudi_configs_if_not_present( - options: &mut HashMap, - storage: Arc, - ) -> Result<()> { - let global_config_path = env::var(HUDI_CONF_DIR) - .map(PathBuf::from) - .unwrap_or_else(|_| PathBuf::from("/etc/hudi/conf")) - .join("hudi-defaults.conf"); - - if let Ok(bytes) = storage - .get_file_data_from_absolute_path(global_config_path.to_str().unwrap()) - .await - { - if let Ok(global_configs) = parse_data_for_options(&bytes, " \t=") { - for (key, value) in global_configs { - if key.starts_with("hoodie.") && !options.contains_key(&key) { - options.insert(key.to_string(), value.to_string()); - } - } - } - } - - Ok(()) - } - - fn validate_configs(hudi_configs: &HudiConfigs) -> Result<()> { - if hudi_configs - .get_or_default(SkipConfigValidation) - .to::() - { - return Ok(()); - } - - for conf in HudiTableConfig::iter() { - hudi_configs.validate(conf)? - } - - for conf in HudiReadConfig::iter() { - hudi_configs.validate(conf)? - } - - // additional validation - let table_type = hudi_configs.get(TableType)?.to::(); - if TableTypeValue::from_str(&table_type)? != CopyOnWrite { - return Err(anyhow!("Only support copy-on-write table.")); - } - - let table_version = hudi_configs.get(TableVersion)?.to::(); - if !(5..=6).contains(&table_version) { - return Err(anyhow!("Only support table version 5 and 6.")); - } - - let drops_partition_cols = hudi_configs - .get_or_default(DropsPartitionFields) - .to::(); - if drops_partition_cols { - return Err(anyhow!( - "Only support when `{}` is disabled", - DropsPartitionFields.as_ref() - )); - } - - Ok(()) - } - /// Get the latest [Schema] of the table. pub async fn get_schema(&self) -> Result { self.timeline.get_latest_schema().await @@ -450,6 +300,7 @@ mod tests { }; use crate::config::HUDI_CONF_DIR; use crate::storage::utils::join_url_segments; + use crate::storage::Storage; use crate::table::Table; /// Test helper to create a new `Table` instance without validating the configuration. 
@@ -470,6 +321,44 @@ mod tests { .unwrap() } + #[tokio::test] + async fn test_hudi_table_get_hudi_options() { + let base_url = TestTable::V6Nonpartitioned.url(); + let hudi_table = Table::new(base_url.path()).await.unwrap(); + let hudi_options = hudi_table.hudi_options(); + for (k, v) in hudi_options.iter() { + assert!(k.starts_with("hoodie.")); + assert!(!v.is_empty()); + } + } + + #[tokio::test] + async fn test_hudi_table_get_storage_options() { + let base_url = TestTable::V6Nonpartitioned.url(); + let hudi_table = Table::new(base_url.path()).await.unwrap(); + + let cloud_prefixes: HashSet<_> = Storage::CLOUD_STORAGE_PREFIXES + .iter() + .map(|prefix| prefix.to_lowercase()) + .collect(); + + for (key, value) in hudi_table.storage_options.iter() { + let key_lower = key.to_lowercase(); + assert!( + cloud_prefixes + .iter() + .any(|prefix| key_lower.starts_with(prefix)), + "Storage option key '{}' should start with a cloud storage prefix", + key + ); + assert!( + !value.is_empty(), + "Storage option value for key '{}' should not be empty", + key + ); + } + } + #[tokio::test] async fn hudi_table_get_schema() { let base_url = TestTable::V6Nonpartitioned.url(); diff --git a/python/hudi/__init__.py b/python/hudi/__init__.py index 37738c76..b44835b2 100644 --- a/python/hudi/__init__.py +++ b/python/hudi/__init__.py @@ -15,6 +15,8 @@ # specific language governing permissions and limitations # under the License. +from hudi.table.builder import HudiTableBuilder as HudiTableBuilder + from ._internal import HudiFileGroupReader as HudiFileGroupReader from ._internal import HudiFileSlice as HudiFileSlice from ._internal import HudiTable as HudiTable diff --git a/python/hudi/_internal.pyi b/python/hudi/_internal.pyi index e0acc996..d296821c 100644 --- a/python/hudi/_internal.pyi +++ b/python/hudi/_internal.pyi @@ -47,6 +47,8 @@ class HudiTable: base_uri: str, options: Optional[Dict[str, str]] = None, ): ... + def hudi_options(self) -> Dict[str, str]: ... + def storage_options(self) -> Dict[str, str]: ... def get_schema(self) -> "pyarrow.Schema": ... def get_partition_schema(self) -> "pyarrow.Schema": ... def split_file_slices( @@ -57,3 +59,10 @@ class HudiTable: def read_snapshot( self, filters: Optional[List[str]] ) -> List["pyarrow.RecordBatch"]: ... + +def build_hudi_table( + base_uri: str, + hudi_options: Optional[Dict[str, str]] = None, + storage_options: Optional[Dict[str, str]] = None, + options: Optional[Dict[str, str]] = None, +) -> HudiTable: ... diff --git a/python/hudi/table/__init__.py b/python/hudi/table/__init__.py new file mode 100644 index 00000000..a67d5ea2 --- /dev/null +++ b/python/hudi/table/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
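For reference, a minimal sketch of how the new Rust `TableBuilder` introduced in this patch is meant to be chained. The base path and option values are illustrative only (borrowed from the tests in this patch), and an async context with `?` error propagation is assumed:

    use hudi::table::builder::TableBuilder;

    // Hypothetical local table path; option values mirror those used in the tests in this patch.
    let table = TableBuilder::from_base_uri("/tmp/hudi_data")
        .with_hudi_options([("hoodie.read.as.of.timestamp", "20240402123035233")])
        .with_storage_options([("AWS_REGION", "us-east-1")])
        .build()
        .await?;

The Python `HudiTableBuilder` added below exposes the same chained-builder pattern and delegates to this Rust builder through the new `build_hudi_table` binding.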
diff --git a/python/hudi/table/builder.py b/python/hudi/table/builder.py new file mode 100644 index 00000000..81790bfe --- /dev/null +++ b/python/hudi/table/builder.py @@ -0,0 +1,104 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from dataclasses import dataclass, field +from typing import Dict + +from hudi._internal import HudiTable, build_hudi_table + + +@dataclass +class HudiTableBuilder: + """ + A builder class for constructing a HudiTable object with customizable options. + + Attributes: + base_uri (str): The base URI of the Hudi table. + options (Dict[str, str]): Both hudi and storage options for building the table. + hudi_options (Dict[str, str]): Hudi configuration options. + storage_options (Dict[str, str]): Storage-related options. + """ + + base_uri: str + options: Dict[str, str] = field(default_factory=dict) + hudi_options: Dict[str, str] = field(default_factory=dict) + storage_options: Dict[str, str] = field(default_factory=dict) + + @classmethod + def from_base_uri(cls, base_uri: str) -> "HudiTableBuilder": + """ + Initializes a HudiTableBuilder using the base URI of the Hudi table. + + Parameters: + base_uri (str): The base URI of the Hudi table. + + Returns: + HudiTableBuilder: An instance of the builder. + """ + builder = cls(base_uri) + return builder + + def with_hudi_options(self, hudi_options: Dict[str, str]) -> "HudiTableBuilder": + """ + Adds Hudi options to the builder. + + Parameters: + hudi_options (Dict[str, str]): Hudi options to be applied. + + Returns: + HudiTableBuilder: The builder instance. + """ + self.hudi_options.update(hudi_options) + return self + + def with_storage_options( + self, storage_options: Dict[str, str] + ) -> "HudiTableBuilder": + """ + Adds storage-related options for configuring the table. + + Parameters: + storage_options (Dict[str, str]): Storage-related options to be applied. + + Returns: + HudiTableBuilder: The builder instance. + """ + self.storage_options.update(storage_options) + return self + + def with_options(self, options: Dict[str, str]) -> "HudiTableBuilder": + """ + Adds general options for configuring the HudiTable. + + Parameters: + options (Dict[str, str]): General options to be applied, can pass hudi and storage options. + + Returns: + HudiTableBuilder: The builder instance. + """ + self.options.update(options) + return self + + def build(self) -> "HudiTable": + """ + Constructs and returns a HudiTable object with the specified options. + + Returns: + HudiTable: The constructed HudiTable object. 
+ """ + return build_hudi_table( + self.base_uri, self.hudi_options, self.storage_options, self.options + ) diff --git a/python/src/internal.rs b/python/src/internal.rs index f303134d..b26a511d 100644 --- a/python/src/internal.rs +++ b/python/src/internal.rs @@ -22,11 +22,12 @@ use std::sync::OnceLock; use anyhow::Context; use arrow::pyarrow::ToPyArrow; -use pyo3::{pyclass, pymethods, PyErr, PyObject, PyResult, Python}; +use pyo3::{pyclass, pyfunction, pymethods, PyErr, PyObject, PyResult, Python}; use tokio::runtime::Runtime; use hudi::file_group::reader::FileGroupReader; use hudi::file_group::FileSlice; +use hudi::table::builder::TableBuilder; use hudi::table::Table; macro_rules! vec_string_to_slice { @@ -141,6 +142,14 @@ impl HudiTable { Ok(HudiTable { inner }) } + fn hudi_options(&self) -> HashMap { + self.inner.hudi_options() + } + + fn storage_options(&self) -> HashMap { + self.inner.storage_options() + } + fn get_schema(&self, py: Python) -> PyResult { rt().block_on(self.inner.get_schema())?.to_pyarrow(py) } @@ -199,6 +208,25 @@ impl HudiTable { } } +#[cfg(not(tarpaulin))] +#[pyfunction] +#[pyo3(signature = (base_uri, hudi_options=None, storage_options=None, options=None))] +pub fn build_hudi_table( + base_uri: String, + hudi_options: Option>, + storage_options: Option>, + options: Option>, +) -> PyResult { + let inner = rt().block_on( + TableBuilder::from_base_uri(&base_uri) + .with_hudi_options(hudi_options.unwrap_or_default()) + .with_storage_options(storage_options.unwrap_or_default()) + .with_options(options.unwrap_or_default()) + .build(), + )?; + Ok(HudiTable { inner }) +} + #[cfg(not(tarpaulin))] fn rt() -> &'static Runtime { static TOKIO_RT: OnceLock = OnceLock::new(); diff --git a/python/src/lib.rs b/python/src/lib.rs index 817d936f..b44832b1 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -29,5 +29,8 @@ fn _internal(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + + use internal::build_hudi_table; + m.add_function(wrap_pyfunction!(build_hudi_table, m)?)?; Ok(()) } diff --git a/python/tests/test_table_builder.py b/python/tests/test_table_builder.py new file mode 100644 index 00000000..1c16f62b --- /dev/null +++ b/python/tests/test_table_builder.py @@ -0,0 +1,125 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +import pyarrow as pa +import pytest + +from hudi import HudiTableBuilder + +PYARROW_LE_8_0_0 = tuple(int(s) for s in pa.__version__.split(".") if s.isnumeric()) < ( + 8, + 0, + 0, +) +pytestmark = pytest.mark.skipif( + PYARROW_LE_8_0_0, reason="hudi only supported if pyarrow >= 8.0.0" +) + + +def test_read_table_returns_correct_data(get_sample_table): + table_path = get_sample_table + table = HudiTableBuilder.from_base_uri(table_path).build() + + batches = table.read_snapshot() + t = pa.Table.from_batches(batches).select([0, 5, 6, 9]).sort_by("ts") + assert t.to_pylist() == [ + { + "_hoodie_commit_time": "20240402144910683", + "ts": 1695046462179, + "uuid": "9909a8b1-2d15-4d3d-8ec9-efc48c536a00", + "fare": 339.0, + }, + { + "_hoodie_commit_time": "20240402123035233", + "ts": 1695091554788, + "uuid": "e96c4396-3fad-413a-a942-4cb36106d721", + "fare": 27.7, + }, + { + "_hoodie_commit_time": "20240402123035233", + "ts": 1695115999911, + "uuid": "c8abbe79-8d89-47ea-b4ce-4d224bae5bfa", + "fare": 17.85, + }, + { + "_hoodie_commit_time": "20240402123035233", + "ts": 1695159649087, + "uuid": "334e26e9-8355-45cc-97c6-c31daf0df330", + "fare": 19.1, + }, + { + "_hoodie_commit_time": "20240402123035233", + "ts": 1695516137016, + "uuid": "e3cf430c-889d-4015-bc98-59bdce1e530c", + "fare": 34.15, + }, + ] + + +@pytest.mark.parametrize( + "hudi_options,storage_options,options", + [ + ({"hoodie.read.as.of.timestamp": "20240402123035233"}, {}, {}), + ({}, {}, {"hoodie.read.as.of.timestamp": "20240402123035233"}), + ], +) +def test_read_table_as_of_timestamp( + get_sample_table, hudi_options, storage_options, options +): + table_path = get_sample_table + table = ( + HudiTableBuilder.from_base_uri(table_path) + .with_hudi_options(hudi_options) + .with_storage_options(storage_options) + .with_options(options) + .build() + ) + + batches = table.read_snapshot() + t = pa.Table.from_batches(batches).select([0, 5, 6, 9]).sort_by("ts") + assert t.to_pylist() == [ + { + "_hoodie_commit_time": "20240402123035233", + "ts": 1695046462179, + "uuid": "9909a8b1-2d15-4d3d-8ec9-efc48c536a00", + "fare": 33.9, + }, + { + "_hoodie_commit_time": "20240402123035233", + "ts": 1695091554788, + "uuid": "e96c4396-3fad-413a-a942-4cb36106d721", + "fare": 27.7, + }, + { + "_hoodie_commit_time": "20240402123035233", + "ts": 1695115999911, + "uuid": "c8abbe79-8d89-47ea-b4ce-4d224bae5bfa", + "fare": 17.85, + }, + { + "_hoodie_commit_time": "20240402123035233", + "ts": 1695159649087, + "uuid": "334e26e9-8355-45cc-97c6-c31daf0df330", + "fare": 19.1, + }, + { + "_hoodie_commit_time": "20240402123035233", + "ts": 1695516137016, + "uuid": "e3cf430c-889d-4015-bc98-59bdce1e530c", + "fare": 34.15, + }, + ] diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py index 2b30a923..6ee5ae58 100644 --- a/python/tests/test_table_read.py +++ b/python/tests/test_table_read.py @@ -30,7 +30,7 @@ ) -def test_sample_table(get_sample_table): +def test_read_table_has_correct_schema(get_sample_table): table_path = get_sample_table table = HudiTable(table_path) @@ -47,8 +47,18 @@ def test_sample_table(get_sample_table): "fare", "city", ] + + +def test_read_table_has_correct_partition_schema(get_sample_table): + table_path = get_sample_table + table = HudiTable(table_path) assert table.get_partition_schema().names == ["city"] + +def test_read_table_returns_correct_file_slices(get_sample_table): + table_path = get_sample_table + table = HudiTable(table_path) + file_slices = table.get_file_slices() assert len(file_slices) == 5 assert 
set(f.commit_time for f in file_slices) == { @@ -66,6 +76,13 @@ def test_sample_table(get_sample_table): "sao_paulo/ee915c68-d7f8-44f6-9759-e691add290d8-0_3-11-0_20240402123035233.parquet", } + +def test_read_table_can_read_from_batches(get_sample_table): + table_path = get_sample_table + table = HudiTable(table_path) + + file_slices = table.get_file_slices() + file_slice_paths = [f.base_file_relative_path() for f in file_slices] batch = table.create_file_group_reader().read_file_slice_by_base_file_path( file_slice_paths[0] ) @@ -77,6 +94,11 @@ def test_sample_table(get_sample_table): assert len(next(file_slices_gen)) == 3 assert len(next(file_slices_gen)) == 2 + +def test_read_table_returns_correct_data(get_sample_table): + table_path = get_sample_table + table = HudiTable(table_path) + batches = table.read_snapshot() t = pa.Table.from_batches(batches).select([0, 5, 6, 9]).sort_by("ts") assert t.to_pylist() == [ @@ -112,6 +134,11 @@ def test_sample_table(get_sample_table): }, ] + +def test_read_table_for_partition(get_sample_table): + table_path = get_sample_table + table = HudiTable(table_path) + batches = table.read_snapshot(["city = san_francisco"]) t = pa.Table.from_batches(batches).select([0, 5, 6, 9]).sort_by("ts") assert t.to_pylist() == [ @@ -135,7 +162,13 @@ def test_sample_table(get_sample_table): }, ] - table = HudiTable(table_path, {"hoodie.read.as.of.timestamp": "20240402123035233"}) + +def test_read_table_as_of_timestamp(get_sample_table): + table_path = get_sample_table + table = HudiTable( + table_path, options={"hoodie.read.as.of.timestamp": "20240402123035233"} + ) + batches = table.read_snapshot() t = pa.Table.from_batches(batches).select([0, 5, 6, 9]).sort_by("ts") assert t.to_pylist() == [