fix: make partition loading more efficient #152

Merged · 1 commit · Oct 7, 2024
81 changes: 45 additions & 36 deletions crates/core/src/table/fs_view.rs
@@ -25,9 +25,10 @@ use crate::file_group::{BaseFile, FileGroup, FileSlice};
 use crate::storage::file_info::FileInfo;
 use crate::storage::{get_leaf_dirs, Storage};
 use crate::table::partition::PartitionPruner;
-use anyhow::{anyhow, Result};
+use anyhow::Result;
 use arrow::record_batch::RecordBatch;
 use dashmap::DashMap;
+use futures::stream::{self, StreamExt, TryStreamExt};
 use url::Url;
 
 #[derive(Clone, Debug)]
@@ -53,6 +54,10 @@ impl FileSystemView {
         })
     }
 
+    async fn load_all_partition_paths(storage: &Storage) -> Result<Vec<String>> {
+        Self::load_partition_paths(storage, &PartitionPruner::empty()).await
+    }
+
     async fn load_partition_paths(
         storage: &Storage,
         partition_pruner: &PartitionPruner,
@@ -80,22 +85,6 @@ impl FileSystemView {
             .collect())
     }
 
-    async fn load_file_groups_for_partitions(
-        storage: &Storage,
-        partition_paths: Vec<String>,
-    ) -> Result<HashMap<String, Vec<FileGroup>>> {
-        let mut partition_to_file_groups = HashMap::new();
-        for p in partition_paths {
-            match Self::load_file_groups_for_partition(storage, p.as_str()).await {
-                Ok(file_groups) => {
-                    partition_to_file_groups.insert(p, file_groups);
-                }
-                Err(e) => return Err(anyhow!("Failed to load partitions: {}", e)),
-            }
-        }
-        Ok(partition_to_file_groups)
-    }
-
     async fn load_file_groups_for_partition(
         storage: &Storage,
         partition_path: &str,
@@ -133,32 +122,52 @@ impl FileSystemView {
         timestamp: &str,
         partition_pruner: &PartitionPruner,
         excluding_file_groups: &HashSet<FileGroup>,
     ) -> Result<Vec<FileSlice>> {
+        let all_partition_paths = Self::load_all_partition_paths(&self.storage).await?;
+
+        let partition_paths_to_load = all_partition_paths
+            .into_iter()
+            .filter(|p| !self.partition_to_file_groups.contains_key(p))
+            .filter(|p| partition_pruner.should_include(p))
+            .collect::<HashSet<_>>();
+
+        stream::iter(partition_paths_to_load)
+            .map(|path| async move {
+                let file_groups =
+                    Self::load_file_groups_for_partition(&self.storage, &path).await?;
+                Ok::<_, anyhow::Error>((path, file_groups))
+            })
+            // TODO parameterize the parallelism for partition loading
+            .buffer_unordered(10)
+            .try_for_each(|(path, file_groups)| async move {
+                self.partition_to_file_groups.insert(path, file_groups);
+                Ok(())
+            })
+            .await?;
+
+        self.collect_file_slices_as_of(timestamp, partition_pruner, excluding_file_groups)
+            .await
+    }
+
+    async fn collect_file_slices_as_of(
+        &self,
+        timestamp: &str,
+        partition_pruner: &PartitionPruner,
+        excluding_file_groups: &HashSet<FileGroup>,
+    ) -> Result<Vec<FileSlice>> {
         let mut file_slices = Vec::new();
-        if self.partition_to_file_groups.is_empty() {
-            let partition_paths =
-                Self::load_partition_paths(&self.storage, partition_pruner).await?;
-            let partition_to_file_groups =
-                Self::load_file_groups_for_partitions(&self.storage, partition_paths).await?;
-            partition_to_file_groups.into_iter().for_each(|pair| {
-                self.partition_to_file_groups.insert(pair.0, pair.1);
-            });
-        }
-        for mut fgs in self
-            .partition_to_file_groups
-            .iter_mut()
-            .filter(|item| partition_pruner.should_include(item.key()))
-        {
-            let fgs_ref = fgs.value_mut();
-            for fg in fgs_ref {
+        for mut partition_entry in self.partition_to_file_groups.iter_mut() {
+            if !partition_pruner.should_include(partition_entry.key()) {
+                continue;
+            }
+            let file_groups = partition_entry.value_mut();
+            for fg in file_groups.iter_mut() {
                 if excluding_file_groups.contains(fg) {
                     continue;
                 }
                 if let Some(fsl) = fg.get_file_slice_mut_as_of(timestamp) {
-                    // TODO: pass ref instead of copying
                     fsl.load_stats(&self.storage).await?;
-                    let immut_fsl: &FileSlice = fsl;
-                    file_slices.push(immut_fsl.clone());
+                    file_slices.push(fsl.clone());
                 }
            }
         }
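The core change above replaces the sequential per-partition loop with a bounded-concurrency stream: up to 10 partitions load in parallel, and the first error still aborts the whole operation. Below is a minimal, self-contained sketch of that buffer_unordered/try_for_each pattern; the load_partition helper and the partition names are hypothetical stand-ins for the crate's Storage-backed loading, and it assumes the tokio, futures, dashmap, and anyhow crates are available.

use anyhow::Result;
use dashmap::DashMap;
use futures::stream::{self, StreamExt, TryStreamExt};

// Hypothetical stand-in for loading one partition's file groups from storage.
async fn load_partition(path: String) -> Result<(String, Vec<String>)> {
    Ok((path.clone(), vec![format!("{path}/file-0.parquet")]))
}

#[tokio::main]
async fn main() -> Result<()> {
    let partition_to_files: DashMap<String, Vec<String>> = DashMap::new();
    let paths = vec!["10".to_string(), "20".to_string(), "30".to_string()];

    stream::iter(paths)
        // Turn each path into a future that loads that partition.
        .map(load_partition)
        // Run at most 10 loads at a time; results arrive as they complete.
        .buffer_unordered(10)
        // Insert each result into the concurrent map; the first error
        // short-circuits the stream, mirroring `?` in a sequential loop.
        .try_for_each(|(path, files)| {
            partition_to_files.insert(path, files);
            futures::future::ready(Ok(()))
        })
        .await?;

    assert_eq!(partition_to_files.len(), 3);
    Ok(())
}

DashMap accepts inserts through a shared reference, which is why the concurrently completing loads can all write into the same map without extra locking.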
118 changes: 68 additions & 50 deletions crates/core/src/table/mod.rs
@@ -793,88 +793,106 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn hudi_table_get_file_paths_for_simple_key_and_non_hive_style() {
+    async fn hudi_table_get_file_paths_for_simple_keygen_non_hive_style() {
         let base_url = TestTable::V6SimplekeygenNonhivestyle.url();
         let hudi_table = Table::new(base_url.path()).await.unwrap();
         assert_eq!(hudi_table.timeline.instants.len(), 2);
 
         let partition_filters = &[];
-        let actual: HashSet<String> = HashSet::from_iter(
-            hudi_table
-                .get_file_paths_with_filters(partition_filters)
-                .await
-                .unwrap(),
-        );
-        let expected: HashSet<String> = HashSet::from_iter(
-            vec![
-                "10/97de74b1-2a8e-4bb7-874c-0a74e1f42a77-0_0-119-166_20240418172804498.parquet",
-                "20/76e0556b-390d-4249-b7ad-9059e2bc2cbd-0_0-98-141_20240418172802262.parquet",
-                "30/6db57019-98ee-480e-8eb1-fb3de48e1c24-0_1-119-167_20240418172804498.parquet",
-            ]
-            .into_iter()
-            .map(|f| join_url_segments(&base_url, &[f]).unwrap().to_string())
-            .collect::<Vec<_>>(),
-        );
+        let actual = hudi_table
+            .get_file_paths_with_filters(partition_filters)
+            .await
+            .unwrap()
+            .into_iter()
+            .collect::<HashSet<_>>();
+        let expected = [
+            "10/97de74b1-2a8e-4bb7-874c-0a74e1f42a77-0_0-119-166_20240418172804498.parquet",
+            "20/76e0556b-390d-4249-b7ad-9059e2bc2cbd-0_0-98-141_20240418172802262.parquet",
+            "30/6db57019-98ee-480e-8eb1-fb3de48e1c24-0_1-119-167_20240418172804498.parquet",
+        ]
+        .map(|f| join_url_segments(&base_url, &[f]).unwrap().to_string())
+        .into_iter()
+        .collect::<HashSet<_>>();
         assert_eq!(actual, expected);
 
         let partition_filters = &["byteField >= 10", "byteField < 30"];
-        let actual: HashSet<String> = HashSet::from_iter(
-            hudi_table
-                .get_file_paths_with_filters(partition_filters)
-                .await
-                .unwrap(),
-        );
-        let expected: HashSet<String> = HashSet::from_iter(
-            vec![
-                "10/97de74b1-2a8e-4bb7-874c-0a74e1f42a77-0_0-119-166_20240418172804498.parquet",
-                "20/76e0556b-390d-4249-b7ad-9059e2bc2cbd-0_0-98-141_20240418172802262.parquet",
-            ]
-            .into_iter()
-            .map(|f| join_url_segments(&base_url, &[f]).unwrap().to_string())
-            .collect::<Vec<_>>(),
-        );
+        let actual = hudi_table
+            .get_file_paths_with_filters(partition_filters)
+            .await
+            .unwrap()
+            .into_iter()
+            .collect::<HashSet<_>>();
+        let expected = [
+            "10/97de74b1-2a8e-4bb7-874c-0a74e1f42a77-0_0-119-166_20240418172804498.parquet",
+            "20/76e0556b-390d-4249-b7ad-9059e2bc2cbd-0_0-98-141_20240418172802262.parquet",
+        ]
+        .map(|f| join_url_segments(&base_url, &[f]).unwrap().to_string())
+        .into_iter()
+        .collect::<HashSet<_>>();
         assert_eq!(actual, expected);
+
+        let partition_filters = &["byteField > 30"];
+        let actual = hudi_table
+            .get_file_paths_with_filters(partition_filters)
+            .await
+            .unwrap()
+            .into_iter()
+            .collect::<HashSet<_>>();
+        let expected = HashSet::new();
+        assert_eq!(actual, expected);
     }

     #[tokio::test]
-    async fn hudi_table_get_file_paths_for_complex_keygen_and_hive_style() {
+    async fn hudi_table_get_file_paths_for_complex_keygen_hive_style() {
         let base_url = TestTable::V6ComplexkeygenHivestyle.url();
         let hudi_table = Table::new(base_url.path()).await.unwrap();
         assert_eq!(hudi_table.timeline.instants.len(), 2);
 
         let partition_filters = &[];
-        let actual: HashSet<String> = HashSet::from_iter(
-            hudi_table
-                .get_file_paths_with_filters(partition_filters)
-                .await
-                .unwrap(),
-        );
-        let expected: HashSet<String> = HashSet::from_iter(vec![
+        let actual = hudi_table
+            .get_file_paths_with_filters(partition_filters)
+            .await
+            .unwrap()
+            .into_iter()
+            .collect::<HashSet<_>>();
+        let expected = [
             "byteField=10/shortField=300/a22e8257-e249-45e9-ba46-115bc85adcba-0_0-161-223_20240418173235694.parquet",
             "byteField=20/shortField=100/bb7c3a45-387f-490d-aab2-981c3f1a8ada-0_0-140-198_20240418173213674.parquet",
             "byteField=30/shortField=100/4668e35e-bff8-4be9-9ff2-e7fb17ecb1a7-0_1-161-224_20240418173235694.parquet",
         ]
-        .into_iter().map(|f| { join_url_segments(&base_url, &[f]).unwrap().to_string() })
-        .collect::<Vec<_>>());
+        .map(|f| { join_url_segments(&base_url, &[f]).unwrap().to_string() })
+        .into_iter()
+        .collect::<HashSet<_>>();
         assert_eq!(actual, expected);
 
         let partition_filters = &["byteField >= 10", "byteField < 20", "shortField != 100"];
-        let actual: HashSet<String> = HashSet::from_iter(
-            hudi_table
-                .get_file_paths_with_filters(partition_filters)
-                .await
-                .unwrap(),
-        );
-        let expected: HashSet<String> = HashSet::from_iter(vec![
+        let actual = hudi_table
+            .get_file_paths_with_filters(partition_filters)
+            .await
+            .unwrap()
+            .into_iter()
+            .collect::<HashSet<_>>();
+        let expected = [
             "byteField=10/shortField=300/a22e8257-e249-45e9-ba46-115bc85adcba-0_0-161-223_20240418173235694.parquet",
         ]
-        .into_iter().map(|f| { join_url_segments(&base_url, &[f]).unwrap().to_string() })
-        .collect::<Vec<_>>());
+        .map(|f| { join_url_segments(&base_url, &[f]).unwrap().to_string() })
+        .into_iter()
+        .collect::<HashSet<_>>();
         assert_eq!(actual, expected);
+
+        let partition_filters = &["byteField > 20", "shortField = 300"];
+        let actual = hudi_table
+            .get_file_paths_with_filters(partition_filters)
+            .await
+            .unwrap()
+            .into_iter()
+            .collect::<HashSet<_>>();
+        let expected = HashSet::new();
+        assert_eq!(actual, expected);
     }

     #[tokio::test]
-    async fn hudi_table_read_snapshot_for_complex_keygen_and_hive_style() {
+    async fn hudi_table_read_snapshot_for_complex_keygen_hive_style() {
         let base_url = TestTable::V6ComplexkeygenHivestyle.url();
         let hudi_table = Table::new(base_url.path()).await.unwrap();
         let partition_filters = &["byteField >= 10", "byteField < 20", "shortField != 100"];
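The test rewrite above also relies on a small comparison idiom: both sides are collected into a HashSet so the assertion is insensitive to the order in which file paths are listed, and the expected side uses array map before into_iter, as the new tests do. A minimal sketch of that idiom with hypothetical paths, standard library only:

use std::collections::HashSet;

fn main() {
    // Expected values: array `map` transforms each relative path,
    // mirroring the join-onto-base-URL step in the real tests.
    let expected = ["10/a.parquet", "20/b.parquet"]
        .map(|f| format!("/base/{f}"))
        .into_iter()
        .collect::<HashSet<_>>();

    // Actual values may arrive in any order; set equality ignores ordering.
    let actual = vec!["/base/20/b.parquet".to_string(), "/base/10/a.parquet".to_string()]
        .into_iter()
        .collect::<HashSet<_>>();

    assert_eq!(actual, expected);
}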