feat: add TableBuilder API for creating Table instances #163

Merged
merged 31 commits into from
Oct 14, 2024
31 commits
adc3959
introduce table builder
kazdy Oct 11, 2024
3ceaeb6
builder prototype works
kazdy Oct 11, 2024
6858ec7
hudi options validation is Table responsibility not the TableBuilder
kazdy Oct 11, 2024
fcdb878
Table uses TableBuilder for plain table creation, does not panic when…
kazdy Oct 11, 2024
29561cb
Table uses TableBuilder for plain table creation, does not panic when…
kazdy Oct 11, 2024
05b3dbe
use TableBuilder in Table::new_with_options, nothing breaks
kazdy Oct 11, 2024
07b1ef3
tidy up
kazdy Oct 11, 2024
5a55f60
python bindings support hudi_options and storage_options, tidy up
kazdy Oct 12, 2024
7c71277
add license to builder.rs
kazdy Oct 12, 2024
991f89c
TableBuilder handles generic options
kazdy Oct 12, 2024
d94e9a9
Table builder supports chaining multiple ::with_*
kazdy Oct 12, 2024
f548a81
adjust according to the reviewer comments
kazdy Oct 12, 2024
77088a9
Merge branch 'main' into storage_opsions
kazdy Oct 12, 2024
e09ab3f
fix build
xushiyan Oct 13, 2024
99a466f
add getters
xushiyan Oct 13, 2024
06a0299
align python and rust apis
xushiyan Oct 13, 2024
2b366ee
add python top level import
xushiyan Oct 13, 2024
f4885d2
improve python HudiTable test, add tests for build_hudi_table
kazdy Oct 13, 2024
91d3086
minimal tests for handling options in table builder
kazdy Oct 13, 2024
b7d902d
tidy up python bindings, fix formatting
kazdy Oct 13, 2024
7b7a767
add docstrings for HudiTableBuilder
kazdy Oct 13, 2024
47dc24e
remove hyphens
kazdy Oct 13, 2024
8add399
add minimal docs to TableBuilder
kazdy Oct 13, 2024
bf439ff
improve logic to resolve options
xushiyan Oct 13, 2024
0c92283
update test case
xushiyan Oct 13, 2024
43cc8db
update python apis
xushiyan Oct 13, 2024
18c7f97
fix python api
xushiyan Oct 13, 2024
797e2ed
move validation logic to builder
xushiyan Oct 13, 2024
f3fcd64
fix style
xushiyan Oct 13, 2024
e372cf3
update macro doc
xushiyan Oct 13, 2024
48821ab
add UTs
xushiyan Oct 14, 2024
1 change: 1 addition & 0 deletions Cargo.toml
@@ -63,6 +63,7 @@ serde_json = { version = "1" }
# "stdlib"
anyhow = { version = "1.0.86" }
bytes = { version = "1" }
paste = { version = "1.0.15" }
once_cell = { version = "1.19.0" }
strum = { version = "0.26.3", features = ["derive"] }
strum_macros = "0.26.4"
1 change: 1 addition & 0 deletions crates/core/Cargo.toml
@@ -50,6 +50,7 @@ serde_json = { workspace = true }
# "stdlib"
anyhow = { workspace = true }
bytes = { workspace = true }
paste = { workspace = true }
strum = { workspace = true }
strum_macros = { workspace = true }
url = { workspace = true }
2 changes: 2 additions & 0 deletions crates/core/src/storage/mod.rs
@@ -53,6 +53,8 @@ pub struct Storage {
}

impl Storage {
pub const CLOUD_STORAGE_PREFIXES: [&'static str; 3] = ["AWS_", "AZURE_", "GOOGLE_"];

pub fn new(
options: Arc<HashMap<String, String>>,
hudi_configs: Arc<HudiConfigs>,
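The constant added in this hunk is consumed by `imbue_cloud_env_vars` in `crates/core/src/table/builder.rs`, which copies matching environment variables into the storage options under lowercased keys unless the caller already set that key. A minimal sketch of that filtering, assuming a hypothetical helper `merge_cloud_vars` that takes the variables as an iterator (so it can be run without touching the real environment):

```rust
use std::collections::HashMap;

/// Same prefixes as `Storage::CLOUD_STORAGE_PREFIXES` in the diff.
const CLOUD_STORAGE_PREFIXES: [&str; 3] = ["AWS_", "AZURE_", "GOOGLE_"];

/// Hypothetical helper (not part of the PR): copies cloud-related variables
/// into `options` under lowercased keys, keeping any value already present.
fn merge_cloud_vars<I>(options: &mut HashMap<String, String>, vars: I)
where
    I: IntoIterator<Item = (String, String)>,
{
    for (key, value) in vars {
        let is_cloud = CLOUD_STORAGE_PREFIXES
            .iter()
            .any(|prefix| key.starts_with(*prefix));
        let lowered = key.to_ascii_lowercase();
        if is_cloud && !options.contains_key(&lowered) {
            options.insert(lowered, value);
        }
    }
}

fn main() {
    let mut options: HashMap<String, String> = HashMap::new();
    options.insert("aws_region".to_string(), "us-east-1".to_string());

    // Real callers would pass `std::env::vars()`; fixed pairs keep this deterministic.
    let vars = vec![
        ("AWS_REGION".to_string(), "eu-west-1".to_string()),
        ("AZURE_STORAGE_ACCOUNT_NAME".to_string(), "demo".to_string()),
        ("HOME".to_string(), "/home/demo".to_string()),
    ];
    merge_cloud_vars(&mut options, vars);

    println!("{:?}", options.get("aws_region"));
    println!("{:?}", options.get("azure_storage_account_name"));
}
```

Note that only the lowercased key is checked for presence, so a user-supplied `AWS_REGION` (uppercase) would not block the environment value from being added as `aws_region`.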
314 changes: 314 additions & 0 deletions crates/core/src/table/builder.rs
@@ -0,0 +1,314 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

use anyhow::{anyhow, Context, Result};
use paste::paste;
use std::collections::HashMap;
use std::env;
use std::hash::Hash;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use strum::IntoEnumIterator;

use crate::config::internal::HudiInternalConfig::SkipConfigValidation;
use crate::config::read::HudiReadConfig;
use crate::config::table::HudiTableConfig::{DropsPartitionFields, TableType, TableVersion};
use crate::config::table::TableTypeValue::CopyOnWrite;
use crate::config::table::{HudiTableConfig, TableTypeValue};
use crate::config::utils::{parse_data_for_options, split_hudi_options_from_others};
use crate::config::{HudiConfigs, HUDI_CONF_DIR};
use crate::storage::Storage;
use crate::table::fs_view::FileSystemView;
use crate::table::timeline::Timeline;
use crate::table::Table;

/// Builder for creating a [Table] instance.
#[derive(Debug, Clone)]
pub struct TableBuilder {
base_uri: String,
hudi_options: HashMap<String, String>,
storage_options: HashMap<String, String>,
options: HashMap<String, String>,
}

macro_rules! impl_with_options {
($struct_name:ident, $($field:ident),+) => {
impl $struct_name {
$(
paste! {
#[doc = "Add " $field " to the builder. Subsequent calls overwrite the previous values if the key already exists."]
pub fn [<with_ $field>]<I, K, V>(mut self, options: I) -> Self
where
I: IntoIterator<Item = (K, V)>,
K: AsRef<str>,
V: Into<String>,
{
self.$field.extend(options.into_iter().map(|(k, v)| (k.as_ref().to_string(), v.into())));
self
}
}
)+
}
};
}

impl_with_options!(TableBuilder, hudi_options, storage_options, options);
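For readers unfamiliar with `paste`, the macro invocation above generates one `with_<field>` method per listed field. The following is a hand-written approximation of a single expansion, on a hypothetical `DemoBuilder` with one `options` field; it is a sketch of the shape, not the macro's literal output:

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for `TableBuilder`, reduced to one option map.
#[derive(Debug, Default)]
struct DemoBuilder {
    options: HashMap<String, String>,
}

impl DemoBuilder {
    /// Accepts any iterable of key/value pairs whose keys can be viewed as
    /// `&str` and whose values convert into `String`. Later calls overwrite
    /// earlier values for the same key because `extend` replaces entries.
    fn with_options<I, K, V>(mut self, options: I) -> Self
    where
        I: IntoIterator<Item = (K, V)>,
        K: AsRef<str>,
        V: Into<String>,
    {
        self.options.extend(
            options
                .into_iter()
                .map(|(k, v)| (k.as_ref().to_string(), v.into())),
        );
        self
    }
}

fn main() {
    let builder = DemoBuilder::default()
        // Array of `(&str, &str)` pairs.
        .with_options([("hoodie.option1", "value1")])
        // Vec of owned `(String, String)` pairs overwriting the same key.
        .with_options(vec![("hoodie.option1".to_string(), "value1-1".to_string())]);

    println!("{:?}", builder.options);
}
```

Taking `self` by value and returning `Self` is what allows the calls to be chained, as the chained test in this file does.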

impl TableBuilder {
/// Create a Hudi table builder from the table's base URI.
pub fn from_base_uri(base_uri: &str) -> Self {
TableBuilder {
base_uri: base_uri.to_string(),
storage_options: HashMap::new(),
hudi_options: HashMap::new(),
options: HashMap::new(),
}
}

pub async fn build(&mut self) -> Result<Table> {
self.resolve_options().await?;

let hudi_configs = HudiConfigs::new(self.hudi_options.iter());

Self::validate_configs(&hudi_configs).expect("Hudi configs are not valid.");

let hudi_configs = Arc::from(hudi_configs);
let storage_options = Arc::from(self.storage_options.clone());

let timeline = Timeline::new(hudi_configs.clone(), storage_options.clone())
.await
.context("Failed to load timeline")?;

let file_system_view = FileSystemView::new(hudi_configs.clone(), storage_options.clone())
.await
.context("Failed to load file system view")?;

Ok(Table {
hudi_configs,
storage_options,
timeline,
file_system_view,
})
}

/// Resolve all options by combining the ones from hoodie.properties, user-provided options,
/// env vars, and global Hudi configs. The precedence order, from highest to lowest, is as follows:
///
/// 1. hoodie.properties
/// 2. Explicit options provided by the user
/// 3. Generic options provided by the user
/// 4. Env vars
/// 5. Global Hudi configs
///
/// [note] Error may occur when 1 and 2 have conflicts.
async fn resolve_options(&mut self) -> Result<()> {
// Insert the base path into hudi options since it is explicitly provided
self.hudi_options.insert(
HudiTableConfig::BasePath.as_ref().to_string(),
self.base_uri.clone(),
);

let (generic_hudi_opts, generic_other_opts) =
split_hudi_options_from_others(self.options.iter());

// Combine generic options (lower precedence) with explicit options.
// Note that we treat all non-Hudi options as storage options
Self::extend_if_absent(&mut self.hudi_options, &generic_hudi_opts);
Self::extend_if_absent(&mut self.storage_options, &generic_other_opts);

// Copy env vars intended for cloud storage (those starting with one of
// `Storage::CLOUD_STORAGE_PREFIXES`) into the storage options under lowercased keys,
// unless the lowercased key is already present. This is to allow `object_store` to pick them up.
Self::imbue_cloud_env_vars(&mut self.storage_options);

// At this point, we have resolved the storage options needed for accessing the storage layer.
// We can now resolve the hudi options
Self::resolve_hudi_options(&self.storage_options, &mut self.hudi_options).await
}

fn imbue_cloud_env_vars(options: &mut HashMap<String, String>) {
for (key, value) in env::vars() {
if Storage::CLOUD_STORAGE_PREFIXES
.iter()
.any(|prefix| key.starts_with(prefix))
&& !options.contains_key(&key.to_ascii_lowercase())
{
options.insert(key.to_ascii_lowercase(), value);
}
}
}

async fn resolve_hudi_options(
storage_options: &HashMap<String, String>,
hudi_options: &mut HashMap<String, String>,
) -> Result<()> {
// create a [Storage] instance to load properties from storage layer.
let storage = Storage::new(
Arc::new(storage_options.clone()),
Arc::new(HudiConfigs::new(hudi_options.iter())),
)?;

Self::imbue_table_properties(hudi_options, storage.clone()).await?;

// TODO load Hudi configs from env vars here before loading global configs

Self::imbue_global_hudi_configs_if_absent(hudi_options, storage.clone()).await
}

async fn imbue_table_properties(
options: &mut HashMap<String, String>,
storage: Arc<Storage>,
) -> Result<()> {
let bytes = storage.get_file_data(".hoodie/hoodie.properties").await?;
let table_properties = parse_data_for_options(&bytes, "=")?;

// We currently treat all table properties as the highest precedence, which is valid for most cases.
// TODO: handle the case where the same key is present in both table properties and options
for (k, v) in table_properties {
options.insert(k.to_string(), v.to_string());
}

Ok(())
}

async fn imbue_global_hudi_configs_if_absent(
options: &mut HashMap<String, String>,
storage: Arc<Storage>,
) -> Result<()> {
let global_config_path = env::var(HUDI_CONF_DIR)
.map(PathBuf::from)
.unwrap_or_else(|_| PathBuf::from("/etc/hudi/conf"))
.join("hudi-defaults.conf");

if let Ok(bytes) = storage
.get_file_data_from_absolute_path(global_config_path.to_str().unwrap())
.await
{
if let Ok(global_configs) = parse_data_for_options(&bytes, " \t=") {
for (key, value) in global_configs {
if key.starts_with("hoodie.") && !options.contains_key(&key) {
options.insert(key.to_string(), value.to_string());
}
}
}
}

Ok(())
}

fn validate_configs(hudi_configs: &HudiConfigs) -> Result<()> {
if hudi_configs
.get_or_default(SkipConfigValidation)
.to::<bool>()
{
return Ok(());
}

for conf in HudiTableConfig::iter() {
hudi_configs.validate(conf)?
}

for conf in HudiReadConfig::iter() {
hudi_configs.validate(conf)?
}

// additional validation
let table_type = hudi_configs.get(TableType)?.to::<String>();
if TableTypeValue::from_str(&table_type)? != CopyOnWrite {
return Err(anyhow!("Only support copy-on-write table."));
}

let table_version = hudi_configs.get(TableVersion)?.to::<isize>();
if !(5..=6).contains(&table_version) {
return Err(anyhow!("Only support table version 5 and 6."));
}

let drops_partition_cols = hudi_configs
.get_or_default(DropsPartitionFields)
.to::<bool>();
if drops_partition_cols {
return Err(anyhow!(
"Only support when `{}` is disabled",
DropsPartitionFields.as_ref()
));
}

Ok(())
}

fn extend_if_absent<K, V>(target: &mut HashMap<K, V>, source: &HashMap<K, V>)
where
K: Eq + Hash + Clone,
V: Clone,
{
for (key, value) in source {
target.entry(key.clone()).or_insert_with(|| value.clone());
}
}
}
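The precedence rules documented on `resolve_options` can be hard to see across the async helpers. Below is a synchronous sketch using plain maps, assuming a hypothetical `resolve` function and `opts` helper; it skips storage access and the Hudi env-var step (still a TODO in the code above) and only mirrors the order in which maps are merged:

```rust
use std::collections::HashMap;

type Opts = HashMap<String, String>;

/// Hypothetical convenience for building a map from string pairs.
fn opts(pairs: &[(&str, &str)]) -> Opts {
    pairs
        .iter()
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect()
}

/// Same behaviour as `TableBuilder::extend_if_absent`, specialised to strings.
fn extend_if_absent(target: &mut Opts, source: &Opts) {
    for (key, value) in source {
        target.entry(key.clone()).or_insert_with(|| value.clone());
    }
}

/// Hypothetical stand-in for the merge order used for Hudi options.
fn resolve(explicit: &Opts, generic: &Opts, table_props: &Opts, global: &Opts) -> Opts {
    // Explicit options win over generic ones.
    let mut resolved = explicit.clone();
    extend_if_absent(&mut resolved, generic);

    // Table properties (hoodie.properties) overwrite whatever is present.
    for (key, value) in table_props {
        resolved.insert(key.clone(), value.clone());
    }

    // Global configs only fill gaps, and only for `hoodie.`-prefixed keys.
    for (key, value) in global {
        if key.starts_with("hoodie.") && !resolved.contains_key(key) {
            resolved.insert(key.clone(), value.clone());
        }
    }
    resolved
}

fn main() {
    let resolved = resolve(
        &opts(&[("hoodie.a", "explicit"), ("hoodie.c", "explicit")]),
        &opts(&[("hoodie.a", "generic"), ("hoodie.b", "generic")]),
        &opts(&[("hoodie.c", "table")]),
        &opts(&[("hoodie.b", "global"), ("hoodie.d", "global")]),
    );
    println!("{:?}", resolved);
}
```

In this sketch a key present in both the table properties and the explicit options silently takes the table value, which matches the unconditional `insert` in `imbue_table_properties`.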

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_build_from_explicit_options() {
let hudi_options = [("hoodie.option1", "value1"), ("hoodie.option3", "value3")];
let storage_options = [
("AWS_REGION", "us-east-1"),
("AWS_ENDPOINT", "s3.us-east-1.amazonaws.com"),
];
let builder = TableBuilder::from_base_uri("/tmp/hudi_data")
.with_hudi_options(hudi_options)
.with_storage_options(storage_options);
let hudi_options = &builder.hudi_options;
let storage_options = &builder.storage_options;
assert_eq!(hudi_options.len(), 2);
assert_eq!(hudi_options["hoodie.option1"], "value1");
assert_eq!(hudi_options["hoodie.option3"], "value3");
assert_eq!(storage_options.len(), 2);
assert_eq!(storage_options["AWS_REGION"], "us-east-1");
assert_eq!(
storage_options["AWS_ENDPOINT"],
"s3.us-east-1.amazonaws.com"
);
}

#[test]
fn test_build_from_explicit_options_chained() {
let builder = TableBuilder::from_base_uri("/tmp/hudi_data")
.with_hudi_options([("hoodie.option1", "value1")])
.with_hudi_options([("hoodie.option1", "value1-1")])
.with_hudi_options([("hoodie.option3", "value3")])
.with_storage_options([("AWS_REGION", "us-east-2")])
.with_storage_options([("AWS_REGION", "us-east-1")])
.with_storage_options([("AWS_ENDPOINT", "s3.us-east-1.amazonaws.com")]);
let hudi_options = &builder.hudi_options.clone();
let storage_options = &builder.storage_options.clone();
assert_eq!(hudi_options.len(), 2);
assert_eq!(hudi_options["hoodie.option1"], "value1-1");
assert_eq!(hudi_options["hoodie.option3"], "value3");
assert_eq!(storage_options.len(), 2);
assert_eq!(storage_options["AWS_REGION"], "us-east-1");
assert_eq!(
storage_options["AWS_ENDPOINT"],
"s3.us-east-1.amazonaws.com"
);
}
}
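The tests in this file cover option handling on the builder. The additional checks at the end of `validate_configs` can also be exercised in isolation; the sketch below uses a hypothetical `validate` function over already-parsed values, with `String` errors instead of `anyhow`. The `"COPY_ON_WRITE"` spelling is an assumption, since the diff does not show how `TableTypeValue` parses its input, and the third error message is reworded for this example.

```rust
/// Hypothetical, simplified mirror of the extra checks in `validate_configs`.
fn validate(
    table_type: &str,
    table_version: isize,
    drops_partition_fields: bool,
) -> Result<(), String> {
    // Assumption: copy-on-write tables are identified by this exact string.
    if table_type != "COPY_ON_WRITE" {
        return Err("Only support copy-on-write table.".to_string());
    }
    // Inclusive range check, as in the diff: versions 5 and 6 pass.
    if !(5..=6).contains(&table_version) {
        return Err("Only support table version 5 and 6.".to_string());
    }
    // Tables that drop partition fields from data files are rejected.
    if drops_partition_fields {
        return Err("Only support tables that keep partition fields.".to_string());
    }
    Ok(())
}

fn main() {
    println!("{:?}", validate("COPY_ON_WRITE", 6, false));
    println!("{:?}", validate("MERGE_ON_READ", 6, false));
    println!("{:?}", validate("COPY_ON_WRITE", 4, false));
}
```

The checks run in order and return on the first failure, so a table that violates several constraints reports only the first one.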