
ListingTable cannot handle partition evolution #13270

Open
adriangb opened this issue Nov 6, 2024 · 11 comments
Labels
bug Something isn't working

Comments
@adriangb
Contributor

adriangb commented Nov 6, 2024

Describe the bug

With CSV:

```
# printf rather than `echo` so that \n expands to a real newline in any shell
printf 'a,b\n1,2\n' > data1.csv
mkdir 'a=2'
printf 'b\n3\n' > 'a=2/data2.csv'
datafusion-cli
> SELECT * FROM '**/*.csv';
Arrow error: Csv error: incorrect number of fields for line 1, expected 2 got 1
```

With Parquet:

```python
import os
import polars as pl

pl.DataFrame({'a': [1], 'b': [2]}).write_parquet('data1.parquet')
os.mkdir('a=2')
pl.DataFrame({'b': [3]}).write_parquet('a=2/data2.parquet')
```

```
datafusion-cli
> SELECT * FROM '**/*.parquet';
+---+---+
| b | a |
+---+---+
| 2 | 1 |
| 3 |   |
+---+---+
2 row(s) fetched.
Elapsed 0.055 seconds.
```

To Reproduce

No response

Expected behavior

Partition evolution is handled and both cases return:

```
+---+---+
| b | a |
+---+---+
| 2 | 1 |
| 3 | 2 |
+---+---+
```

Additional context

Having played around quite a bit with ParquetExec and the SchemaAdapter machinery, I think what should happen is:

  • Partition values live on a per-file basis, specifically on each PartitionedFile rather than on the FileScanConfig
  • Partition values are passed into the SchemaAdapter machinery, which decides for each file whether it needs to add a column generated from partition values (see the sketch below)
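
To make this concrete, here is a minimal, self-contained sketch of the proposed shape. The types below are simplified stand-ins (plain strings instead of Arrow schemas and ScalarValues), not DataFusion's actual PartitionedFile or SchemaAdapter APIs:

```rust
/// Stand-in for DataFusion's `PartitionedFile`: partition values live on the
/// file itself rather than on a shared `FileScanConfig`.
struct PartitionedFile {
    path: String,
    /// Column name -> value parsed from the file's path, e.g. `a=2/...` -> ("a", "2").
    partition_values: Vec<(String, String)>,
}

/// Stand-in table schema: just a list of expected output column names.
type Schema = Vec<String>;

/// Per-file adaptation: for each expected output column the file does not
/// physically contain, fall back to that file's own partition values.
fn adapt_file(file: &PartitionedFile, file_columns: &[&str], table_schema: &Schema) {
    for col in table_schema {
        if file_columns.contains(&col.as_str()) {
            println!("{}: read `{}` from the file", file.path, col);
        } else if let Some((_, v)) = file.partition_values.iter().find(|(k, _)| k == col) {
            println!("{}: generate `{}` = {} from the path", file.path, col, v);
        } else {
            println!("{}: `{}` missing, fill with NULL", file.path, col);
        }
    }
}

fn main() {
    let table_schema: Schema = vec!["a".to_string(), "b".to_string()];
    let f1 = PartitionedFile { path: "data1.parquet".into(), partition_values: vec![] };
    let f2 = PartitionedFile {
        path: "a=2/data2.parquet".into(),
        partition_values: vec![("a".into(), "2".into())],
    };
    adapt_file(&f1, &["a", "b"], &table_schema); // has both columns physically
    adapt_file(&f2, &["b"], &table_schema);      // `a` comes from the path
}
```

Because adapt_file consults only the partition values of the file it is currently adapting, files partitioned at different depths (like data1.parquet and a=2/data2.parquet above) can coexist in a single scan.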
@adriangb adriangb added the bug Something isn't working label Nov 6, 2024
@adriangb
Contributor Author

adriangb commented Nov 6, 2024

cc @alamb, I had promised you this a long time ago but only got around to it now

@alamb
Contributor

alamb commented Nov 6, 2024

Thanks @adriangb

@zhuqi-lucas
Contributor

take

@adriangb
Contributor Author

@logan-keede I see you're doing some work on FileScanConfig. Would it be relevant to consider what needs to be changed to fix this?

@logan-keede
Contributor

@adriangb my focus has been on refactoring FileScanConfig to move it out of core. I can't say I understand the internals that well, but I will look into it and mention it here if I find something relevant.

@zhuqi-lucas
Contributor

@adriangb Sorry for the delay, I am starting to investigate this issue this week.

@zhuqi-lucas
Contributor

zhuqi-lucas commented Feb 11, 2025

First round of investigation:

We need to do partition evolution at runtime, and the inferred partition result needs to overwrite the empty table_partition_cols on FileScanConfig. I haven't found a good way to do this so far, because many places in the code use FileScanConfig's table_partition_cols to pass the parameters.

@adriangb Do you have any suggestions for how we can do this in the current architecture?

Updated:

Maybe we can have a runtime cache to store the partition evolution result, so we can use it when FileScanConfig's table_partition_cols is empty? (Sketched below.)
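
A minimal sketch of what such a runtime cache might look like, assuming Hive-style `key=value` path segments; `PartitionCache` is a hypothetical type invented for illustration, not an existing DataFusion API:

```rust
use std::collections::HashMap;

/// Hypothetical cache: file path -> partition columns inferred from that path.
#[derive(Default)]
struct PartitionCache {
    inferred: HashMap<String, Vec<(String, String)>>,
}

impl PartitionCache {
    /// Parse Hive-style `key=value` path segments once per file and memoize,
    /// so repeated scans don't re-infer partition values for the same paths.
    fn partition_values(&mut self, path: &str) -> &Vec<(String, String)> {
        self.inferred.entry(path.to_string()).or_insert_with(|| {
            path.split('/')
                .filter_map(|seg| seg.split_once('='))
                .map(|(k, v)| (k.to_string(), v.to_string()))
                .collect()
        })
    }
}

fn main() {
    let mut cache = PartitionCache::default();
    // Consulted only when FileScanConfig's table_partition_cols is empty.
    println!("{:?}", cache.partition_values("a=2/data2.parquet")); // [("a", "2")]
    println!("{:?}", cache.partition_values("data1.parquet"));     // []
}
```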

@adriangb
Contributor Author

I think the fundamental issue is that the partition columns are specified on a per-exec basis via FileScanConfig. The only solutions I can think of are:

  • Change the APIs to allow multiple FileScanConfigs to be supplied. This brings up issues such as making sure the output schemas all match so they can be unioned, etc.
  • Move partition column generation into SchemaAdapter. The issue with this is that SchemaAdapter exists at a lower level than the concept of partition columns, and it might be inappropriate to put that logic in there directly. At the same time, FileScanConfig and ParquetExec (recently folded into ParquetDataSource?) exist above the level of a single file, while partitioning can be as granular as a single file. I think the solution here would be to add hooks into SchemaAdapter for handling missing columns, so that the exec can inject information on how to generate partition columns from file paths (see the sketch after this list). It could do that fully dynamically on a per-file basis with no config, or we could require passing the union of all columns that might be partition columns along with their field types; a file having only a subset of those would be okay, but we would error or fill in nulls if we encounter a missing column that was not declared as a partition column.
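
A hedged sketch of that hook idea. `MissingColumnGenerator` and `HivePathGenerator` are hypothetical names invented for illustration, not existing DataFusion APIs, and the real SchemaAdapter works with Arrow schemas and RecordBatches rather than plain strings:

```rust
/// Hypothetical hook invoked by the schema-adapter layer whenever a file
/// lacks a column required by the table schema. The exec layer installs an
/// implementation that knows how to derive partition columns from file paths.
trait MissingColumnGenerator {
    /// Return a value for `column` derived from `file_path`, or `None` if this
    /// generator cannot supply it (the adapter then errors or fills NULL,
    /// depending on whether the column was declared as a partition column).
    fn generate(&self, file_path: &str, column: &str) -> Option<String>;
}

/// One possible generator: Hive-style `key=value` path segments.
struct HivePathGenerator;

impl MissingColumnGenerator for HivePathGenerator {
    fn generate(&self, file_path: &str, column: &str) -> Option<String> {
        file_path
            .split('/')
            .filter_map(|seg| seg.split_once('='))
            .find(|(k, _)| *k == column)
            .map(|(_, v)| v.to_string())
    }
}

fn main() {
    let generator = HivePathGenerator;
    assert_eq!(generator.generate("a=2/data2.parquet", "a"), Some("2".to_string()));
    assert_eq!(generator.generate("data1.parquet", "a"), None); // NULL or error, per config
}
```

The design point is that the hook sees one file at a time, which is what lets partitioning be as granular as a single file while keeping SchemaAdapter ignorant of the partitioning scheme itself.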

@TheBuilderJR
Contributor

+1 I'm also blocked on this. It'd be nice if schema evolution could be a first-class citizen in DataFusion. It's been pretty painful/stressful running into schema evolution bugs with https://telemetry.sh. It feels like a ticking time bomb before a schema gets corrupted :(

@zhuqi-lucas
Contributor

zhuqi-lucas commented Feb 18, 2025

Just noticed we have a solution for partition evolution for the dynamic file catalog, see the PR below; maybe we need some improvement based on it?

https://github.com/apache/datafusion/pull/12683/files

I still can't find a good solution on the code side; feel free to take it.

@TheBuilderJR
Contributor

@zhuqi-lucas here's one current failure scenario with evolution: #14755
