
DataSink::write_all given invalid RecordBatchStream #14394

Closed
Tracked by #14123
gatesn opened this issue Jan 31, 2025 · 11 comments · Fixed by #14472
Labels
bug Something isn't working

Comments

@gatesn
Contributor

gatesn commented Jan 31, 2025

Describe the bug

We were implementing a DataSink and found that the record batches we were given had a different schema than the one reported by the RecordBatchStream.

    STREAM DTYPE Schema { fields: [Field { name: "c1", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "c2", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }
    RB SCHEMA: Schema { fields: [Field { name: "c1", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "c2", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }

    STREAM DTYPE Schema { fields: [Field { name: "c1", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "c2", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }
    RB SCHEMA: Schema { fields: [Field { name: "c1", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "c2", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }

The first (correct) invocation comes from assembling a logical plan with LogicalPlanBuilder::insert_into

The second (incorrect) invocation comes from session.sql("INSERT INTO my_tbl VALUES ('hello', 42::INT);")

I tried adding an assertion to the RecordBatchStreamAdapter, and it looks like ~12 tests fail on main right now with mismatched schemas. I wonder if it's worth adding that as a debug assertion?

https://github.com/gatesn/datafusion/pull/new/ngates/record-batch-stream-schema
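Roughly what such a per-batch debug assertion could look like (a sketch; the branch above has the actual change):

    fn debug_check_batch(
        batch: &arrow::record_batch::RecordBatch,
        stream_schema: &arrow::datatypes::SchemaRef,
    ) {
        // Compiled out in release builds; fires whenever a stream yields a
        // batch whose schema disagrees with the schema the stream advertises.
        debug_assert_eq!(
            batch.schema().as_ref(),
            stream_schema.as_ref(),
            "RecordBatch schema does not match RecordBatchStream schema"
        );
    }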

cc @AdamGS

To Reproduce

No response

Expected behavior

No response

Additional context

No response

gatesn added the bug label on Jan 31, 2025
@ozankabak
Contributor

Is this a bug in DataSink code, or is it a downstream bug that becomes somewhat hard to notice because there is no debug_assert?

@gatesn
Contributor Author

gatesn commented Feb 2, 2025

I imagine it's downstream, and of course the debug assert only catches bad RecordBatchStream impls that use the Adapter.

This specific bug may not even be caught by the debug assert if it doesn't use the Adapter somewhere.

@zhuqi-lucas
Contributor

Thank you @gatesn for the report.

Can you provide the full code or SQL to reproduce it, so we can solve it more quickly?

@gatesn
Contributor Author

gatesn commented Feb 2, 2025

Yes, this code fails the schema assertion:

https://github.com/apache/datafusion/compare/main...gatesn:datafusion:ngates/record-batch-stream-schema?expand=1#diff-9b1672adeba35025e24d21f8d7da2f0e87487231775c5e470edf56886f90c837R811-R833
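For reference, a minimal standalone reproduction along the same lines (a sketch assuming a MemTable-backed table; the table name follows the SQL in the report):

    use std::sync::Arc;
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion::datasource::MemTable;
    use datafusion::prelude::SessionContext;

    #[tokio::main]
    async fn main() -> datafusion::error::Result<()> {
        // Table schema with non-nullable columns, as in the report.
        let schema = Arc::new(Schema::new(vec![
            Field::new("c1", DataType::Utf8, false),
            Field::new("c2", DataType::Int32, false),
        ]));
        let table = MemTable::try_new(schema.clone(), vec![vec![]])?;

        let ctx = SessionContext::new();
        ctx.register_table("my_tbl", Arc::new(table))?;

        // The VALUES source is planned with all-nullable fields, so the
        // batches handed to the sink disagree with the stream's schema.
        ctx.sql("INSERT INTO my_tbl VALUES ('hello', 42)")
            .await?
            .collect()
            .await?;
        Ok(())
    }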

@alamb
Contributor

alamb commented Feb 3, 2025

I wonder if it's worth adding that as a debug assertion?

I agree this would be good to add -- most similar checks in DataFusion raise a DataFusionError::Internal if hit.
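In that style, the check might look something like this (a hedged sketch, not existing DataFusion code):

    use datafusion_common::{internal_err, Result};

    fn check_batch_schema(
        batch: &arrow::record_batch::RecordBatch,
        expected: &arrow::datatypes::SchemaRef,
    ) -> Result<()> {
        // Unlike a debug_assert, this runs in release builds and surfaces
        // the mismatch as DataFusionError::Internal.
        if batch.schema().as_ref() != expected.as_ref() {
            return internal_err!(
                "batch schema {:?} does not match stream schema {:?}",
                batch.schema(),
                expected
            );
        }
        Ok(())
    }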

@ozankabak
Contributor

The only thing I would be worried about is the potential overhead. I get the feeling that this is something that should be done "once", but I'm not sure how we can do it. Maybe we can utilize the invariant and sanity checkers to check the contract somehow. Let's think about this.

@zhuqi-lucas
Contributor

zhuqi-lucas commented Feb 3, 2025

The second (incorrect) invocation comes from session.sql("INSERT INTO my_tbl VALUES ('hello', 42::INT);")

    LogicalPlan::Dml(DmlStatement {
        table_name,
        op: WriteOp::Insert(insert_op),
        ..
    }) => {
        let name = table_name.table();
        let schema = session_state.schema_for_ref(table_name.clone())?;
        if let Some(provider) = schema.table(name).await? {
            let input_exec = children.one()?;
            provider
                .insert_into(session_state, input_exec, *insert_op)
                .await?
        } else {
            return exec_err!("Table '{table_name}' does not exist");
        }
    }

I think we could add a check during physical plan generation: we have the table schema and we have the input_exec to insert, so we could compare them here?
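For illustration, the check might take a shape like this (a hedged sketch against the Dml(Insert) arm above, not actual planner code):

    use datafusion_common::{internal_err, Result};

    fn check_insert_schemas(
        table_schema: &arrow::datatypes::Schema,
        input_schema: &arrow::datatypes::Schema,
    ) -> Result<()> {
        // Compare nullability field-by-field before calling insert_into.
        for (table_field, input_field) in
            table_schema.fields().iter().zip(input_schema.fields())
        {
            if table_field.is_nullable() != input_field.is_nullable() {
                return internal_err!(
                    "field '{}' nullability mismatch: table declares {}, input produces {}",
                    table_field.name(),
                    table_field.is_nullable(),
                    input_field.is_nullable()
                );
            }
        }
        Ok(())
    }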

@gatesn
Contributor Author

gatesn commented Feb 3, 2025

Is this overhead a problem if it's only a debug_assertion? It will be compiled out in release builds. Or are you looking for a release assertion to put somewhere?

@rkrishn7
Contributor

rkrishn7 commented Feb 4, 2025

I poked around and I think the specific issue here is that the schema assigned to LogicalPlan::Values during planning defaults all of its fields to nullable.

I've opened a PR to capture nullability from the table schema for the specified columns.
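One way to observe this directly (a hypothetical snippet, reusing the ctx from the reproduction sketch above):

    // Every field of a bare VALUES plan reports nullable = true, even for
    // plain literals, which is where the mismatched batch schema originates.
    let df = ctx.sql("VALUES ('hello', 42)").await?;
    for field in df.schema().fields() {
        println!("{}: nullable = {}", field.name(), field.is_nullable());
    }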

@jonahgao
Member

jonahgao commented Feb 6, 2025

I am concerned that union may also lead to similar behavior when the nullability of the two inputs differs.
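To make the concern concrete, a hypothetical example (table names invented):

    // If non_nullable_src has non-nullable columns and nullable_src does
    // not, the unioned schema is planned as nullable, so an INSERT into a
    // non-nullable table would hit the same mismatch as the VALUES case.
    ctx.sql(
        "INSERT INTO my_tbl \
         SELECT c1, c2 FROM non_nullable_src \
         UNION ALL \
         SELECT c1, c2 FROM nullable_src",
    )
    .await?;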

@rkrishn7
Contributor

rkrishn7 commented Feb 7, 2025

@jonahgao Thank you for calling this out. I think you're right!

In fact, more generally, this issue arises whenever the source schema of an INSERT statement contains fields that differ from the table schema in nullability.

I think a better approach may be to map the source plan's schema to a new schema whose field nullability matches the table schema. We could do this directly when planning the insert, after constructing the source LogicalPlan, in insert_to_plan. A sketch of that mapping follows below.
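A minimal sketch of that mapping (names hypothetical; with_nullable is the arrow-rs Field builder):

    use std::sync::Arc;
    use arrow::datatypes::{Field, Schema, SchemaRef};

    // Rebuild the source schema so each field inherits the nullability of
    // the corresponding table field; the insert plan can then project/cast
    // its input to this schema.
    fn align_nullability(source: &Schema, table: &Schema) -> SchemaRef {
        let fields: Vec<Field> = source
            .fields()
            .iter()
            .zip(table.fields())
            .map(|(src, tbl)| src.as_ref().clone().with_nullable(tbl.is_nullable()))
            .collect();
        Arc::new(Schema::new(fields))
    }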

I opened a new issue for this here - #14550

Would love a sanity check from you to verify the above makes sense!
