bug: improve schema checking for insert into cases #14572

Conversation
Can you explain the reason for the change in test.slt? Thanks.
datafusion/common/src/dfschema.rs
Outdated
// 1. The len of the schema of the plan and the schema of the table should be the same
// 2. The nullable flag of the schema of the plan and the schema of the table should be the same
// 3. The datatype of the schema of the plan and the schema of the table should be the same
fn logically_equivalent_names_and_types(&self, other: &Self) -> Result<(), String> {
Why not Result<bool>?
Originally I used Result<bool>, but I want to return three different error messages for the different cases, so I changed to Result<(), String>.
You can also define different messages with internal_err!("msg1"), internal_err!("msg2")
Thank you @jayzhan211 for this good suggestion. I changed my code to use Result<()>; it seems sufficient for this case, similar to many other cases, for example:
/// Check if the schema have some fields with the same name
pub fn check_names(&self) -> Result<()> {
}
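For illustration, here is a minimal sketch of the pattern being discussed: return Result<()> and give each failure mode its own error via datafusion_common's plan_err! macro. check_field_count and its message text are illustrative, not the PR's exact code.

use datafusion_common::{plan_err, Result};

// Return Result<()> and produce a distinct error per failure mode,
// instead of collapsing every case into a single bool.
fn check_field_count(expected: usize, actual: usize) -> Result<()> {
    if expected != actual {
        return plan_err!(
            "Inserting query must have the same schema length as the table. \
             Expected {expected} fields, got {actual}"
        );
    }
    Ok(())
}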
datafusion/common/src/dfschema.rs
Outdated
f1.name() == f2.name()
    && DFSchema::datatype_is_logically_equal(
.try_for_each(|(f1, f2)| {
    if f1.is_nullable() != f2.is_nullable() {
If the field is nullable, we can still insert a non-null column. Similar to #14519.
This seems like a regression to me 🤔. Even though the schema of a source is nullable, all of its data can be non-null, and in such cases it can still be inserted into a non-nullable sink. When inserting, we currently validate against the actual data rather than the schema. See check_not_null_constraints.
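For context, a minimal sketch of that idea using arrow's RecordBatch API (check_batch_not_null is a hypothetical helper, not DataFusion's actual check_not_null_constraints):

use arrow::array::Array;
use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;

// Validate the data in each batch rather than the declared schema, so a
// nullable source that happens to contain no nulls can still be inserted
// into a non-nullable sink.
fn check_batch_not_null(batch: &RecordBatch, sink_schema: &Schema) -> Result<(), String> {
    for (i, field) in sink_schema.fields().iter().enumerate() {
        if !field.is_nullable() && batch.column(i).null_count() > 0 {
            return Err(format!(
                "non-null constraint violated for column '{}'",
                field.name()
            ));
        }
    }
    Ok(())
}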
If 'DataSink receiving different schemas' is an issue, we can rewrite the schema of the batches emitted by DataSinkExec.
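A sketch of that alternative using arrow's RecordBatch::try_new (rewrite_batch_schema is a hypothetical helper, not existing DataFusion code):

use std::sync::Arc;
use arrow::datatypes::Schema;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

// Re-wrap each batch with the sink's declared schema; try_new re-validates
// the columns (count, row length, and data types) against the target fields.
fn rewrite_batch_schema(
    batch: RecordBatch,
    sink_schema: Arc<Schema>,
) -> Result<RecordBatch, ArrowError> {
    RecordBatch::try_new(sink_schema, batch.columns().to_vec())
}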
Thank you @jayzhan211 and @jonahgao for the review, this is a good point. I changed it to the only error case for the nullability check:
// only check the case when the table field is not nullable and the insert data field is nullable
@@ -78,7 +104,7 @@ physical_plan
query I
INSERT INTO table_without_values SELECT
SUM(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING),
COUNT(*) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING)
NULLIF(COUNT(*) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING), 0)
Why do we need NULLIF? Does its use indicate a potential issue?
This regression no longer happens after the above code changes.
Force-pushed from 0e7dac6 to af496fb
@@ -81,11 +77,9 @@ STORED AS arrow
LOCATION 'test_files/scratch/insert_to_external/arrow_dict_partitioned/'
PARTITIONED BY (b);

query I
query error DataFusion error: Error during planning: Inserting query must have the same schema nullability as the table\. Expected table field 'b' nullability: false, got field: 'b', nullability: true
This is strange; it means PARTITIONED BY (b) will make the field 'b' nullability: false? This is the only case that differs when PARTITIONED BY is involved.
@@ -228,7 +228,7 @@ CREATE TABLE aggregate_test_100_null (
c11 FLOAT
);

statement ok
statement error DataFusion error: Error during planning: Inserting query must have the same schema nullability as the table\. Expected table field 'c5' nullability: false, got field: 'c5', nullability: true
This is the only regression in the slt, I think. cc @jayzhan211 @jonahgao
# Setup test data table
statement ok
CREATE EXTERNAL TABLE aggregate_test_100 (
c1 VARCHAR NOT NULL,
c2 TINYINT NOT NULL,
c3 SMALLINT NOT NULL,
c4 SMALLINT,
c5 INT,
c6 BIGINT NOT NULL,
c7 SMALLINT NOT NULL,
c8 INT NOT NULL,
c9 INT UNSIGNED NOT NULL,
c10 BIGINT UNSIGNED NOT NULL,
c11 FLOAT NOT NULL,
c12 DOUBLE NOT NULL,
c13 VARCHAR NOT NULL
)
STORED AS CSV
LOCATION '../../testing/data/csv/aggregate_test_100.csv'
OPTIONS ('format.has_header' 'true');
statement ok
CREATE TABLE aggregate_test_100_null (
c2 TINYINT NOT NULL,
c5 INT NOT NULL,
c3 SMALLINT,
c11 FLOAT
);
statement error DataFusion error: Error during planning: Inserting query must have the same schema nullability as the table\. Expected table field 'c5' nullability: false, got field: 'c5', nullability: true
INSERT INTO aggregate_test_100_null
SELECT
c2,
c5,
CASE WHEN c1 = 'e' THEN NULL ELSE c3 END as c3,
CASE WHEN c1 = 'a' THEN NULL ELSE c11 END as c11
FROM aggregate_test_100;
I think the original behaviour is wrong, because the target table column is not nullable.
statement ok
CREATE TABLE aggregate_test_100_null (
c2 TINYINT NOT NULL,
c5 INT,
Noted. I also added the successful case, where the table field c5 is nullable.
Thank you for the review @jayzhan211. I have updated the slt now and added notes for the only 2 different results for the SQL.
Force-pushed from 0071247 to f648838
insert into table_without_values(field2) values(300);
----
This is because we now have the check in the insert-into plan; before this we only had the check at the execution-plan level. The CI error is caused by:
insert into dictionary_encoded_parquet_partitioned
select * from dictionary_encoded_values
----
2
DataFusion error: Error during planning: Inserting query must have the same schema nullability as the table. Expected table field 'b' nullability: false, got field: 'b', nullability: true
This is also expected, because PARTITIONED BY (b) will set b's nullability to false. We shouldn't support inserting nullable values for a partition key, I think.
Error should be placed after query error to pass CI.
"because PARTITIONED BY (b) will make b's nullability false" can be added as a comment.
Added comments in the latest PR, thanks! @jayzhan211
Regarding "Error should be after query error to pass CI": I think it was auto-generated after the PR here, for example:
It was auto-generated a long time ago; we need to move it manually.
But why is CI green 🤔
@jayzhan211 This https://github.com/apache/datafusion/pull/14439/files#diff-51757b2b1d0a07b88551d88eabeba7f74e11b5217e44203ac7c6f613c0221196R273 was merged less than a week ago. I think it starts from there. I also ran the local sqllogictest with -- --complete and got the same result; I guess the CI also uses it to generate and verify?
We may need a follow-up issue to investigate it.
This is the multiline error feature of sqllogictest-rs.
https://github.com/risinglightdb/sqllogictest-rs/blob/7ee44cd995fb65175bb647d07f63b557dbaa22c7/CHANGELOG.md#0180---2023-11-08
datafusion/common/src/dfschema.rs
Outdated
.zip(other.fields().iter())
.try_for_each(|(f1, f2)| {
    // only check the case when the table field is not nullable and the insert data field is nullable
    if !f1.is_nullable() && f2.is_nullable() {
This condition would prevent the following query from executing, but it works on both the main branch and Postgres.
create table t1(a int not null);
create table t2(a int);
insert into t2 values(100);
insert into t1 select * from t2;
As I mentioned earlier, we already have a check during execution called check_not_null_constraints, so I think we should not add this restriction here.
Thanks @jonahgao, got it now; this is a good example to explain it.
I'm not sure if it's necessary to ensure that the schema of output batches has the same nullability. This issue exists not only with inserts but also with other queries like UNION.
DataFusion CLI v45.0.0
> create table t1(a int not null) as values(1);
0 row(s) fetched.
Elapsed 0.009 seconds.
> create table t2(a int) as values(2);
0 row(s) fetched.
Elapsed 0.011 seconds.
> select * from t1 union all select * from t2;
batch schema: Schema { fields: [Field { name: "a", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }
batch schema: Schema { fields: [Field { name: "a", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }
+---+
| a |
+---+
| 1 |
| 2 |
+---+
2 row(s) fetched.
Elapsed 0.007 seconds.
If it is necessary, perhaps we should rewrite the nullability of the output batches instead of restricting the input schemas to have the same nullability, as the latter could prevent some queries from being executed.
DataFusion CLI v45.0.0
> create table t1(a int not null) as values(1);
0 row(s) fetched.
Elapsed 0.012 seconds.
> create table t2(a int) as values(null);
0 row(s) fetched.
Elapsed 0.003 seconds.
> select * from t1 union all select * from t2;
+------+
| a |
+------+
| NULL |
| 1 |
+------+
2 row(s) fetched.
Elapsed 0.004 seconds.
Thanks @jonahgao, got it now. We may need to unify the output schema when any input has a nullable field; for example, in the above case, we need to make sure the output schema is nullable.
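A sketch of that unification, assuming a UNION whose inputs already agree on field count and order (unify_nullability is a hypothetical helper, not an existing DataFusion API):

use arrow::datatypes::{Field, Schema};

// An output field is nullable if the corresponding field of any input is
// nullable, so batches from every input fit the unified schema.
fn unify_nullability(inputs: &[Schema]) -> Schema {
    let fields: Vec<Field> = inputs[0]
        .fields()
        .iter()
        .enumerate()
        .map(|(i, f)| {
            let any_nullable = inputs.iter().any(|s| s.field(i).is_nullable());
            f.as_ref().clone().with_nullable(any_nullable)
        })
        .collect();
    Schema::new(fields)
}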
Updated. It seems several union issues are also related to nullability for UNION:
#14352
.try_for_each(|(f1, f2)| {
    if f1.name() != f2.name() || !DFSchema::datatype_is_logically_equal(f1.data_type(), f2.data_type()) {
        _plan_err!(
            "Inserting query schema mismatch: Expected table field '{}' with type {:?}, \
Based on the function name, it seems that we haven't restricted it to only be used for insertion 🤔.
Checked: the function is only called by insert-into cases.
Perhaps we could update the comments to reflect this
Added comments in latest PR, thanks all.
This looks like an improvement to me -- thank you @zhuqi-lucas and @jonahgao
match write_df
    .write_table("t", DataFrameWriteOptions::new())
    .await
{
    Ok(_) => {}
    Err(e) => {
        assert_contains!(
            e.to_string(),
            "Inserting query must have the same schema length as the table."
        );
    }
}
I think you can write this much more concisely using unwrap_err
Suggested change:
let e = write_df
    .write_table("t", DataFrameWriteOptions::new())
    .await
    .unwrap_err();
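Together with the existing assert_contains!, the test body would then read:

let e = write_df
    .write_table("t", DataFrameWriteOptions::new())
    .await
    .unwrap_err();
assert_contains!(
    e.to_string(),
    "Inserting query must have the same schema length as the table."
);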
The same comment applies to the code below as well
Good idea, thanks @alamb, addressed in the latest PR.
let initial_table = Arc::new(MemTable::try_new(schema.clone(), vec![vec![]])?);
session_ctx.register_table("t", initial_table.clone())?;

// There are three cases we need to check
Suggested change:
// There are two cases we need to check
Good catch! Addressed in the latest PR.
Thanks @alamb for the review; addressed the comments in the latest PR. For the potential union schema checking improvement, #14572 (comment), we may create a new issue to discuss it.
🚀
Which issue does this PR close?
Describe the bug
In #14394, it was reported that, while attempting to implement a DataSink, the record batches being delivered had schemas different from the one declared by the RecordBatchStream.
A fix for the given example, an INSERT INTO ... VALUES query, was merged (#14472). However, this issue likely arises whenever the schema of the source of an INSERT statement contains fields that differ from the table schema in terms of nullability. That is, the problem is not limited to INSERT INTO ... VALUES statements.
What changes are included in this PR?
Add a separate nullability check besides the original check, which only covered the name and datatype.
Improve the error message to include more info about the error.
We improve the checking for the 3 cases and also improve the error message.
There are three cases we need to check; case 2 (the nullable flag of the plan schema and the table schema should be the same) is not needed at plan time, since we already check it during execution.
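Putting the pieces together, a hedged sketch of the plan-time check this PR converges on, written as if inside dfschema.rs (check_insert_schema is an illustrative name; see logically_equivalent_names_and_types for the real code). Nullability is deliberately left to the execution-time check_not_null_constraints:

use datafusion_common::{plan_err, DFSchema, Result};

// Plan-time validation for INSERT: field counts must match, and each field
// must match by name and logically-equal datatype.
fn check_insert_schema(table: &DFSchema, input: &DFSchema) -> Result<()> {
    if table.fields().len() != input.fields().len() {
        return plan_err!(
            "Inserting query must have the same schema length as the table."
        );
    }
    for (f1, f2) in table.fields().iter().zip(input.fields().iter()) {
        if f1.name() != f2.name()
            || !DFSchema::datatype_is_logically_equal(f1.data_type(), f2.data_type())
        {
            return plan_err!(
                "Inserting query schema mismatch: Expected table field '{}' with type {:?}, \
                 but got '{}' with type {:?}",
                f1.name(),
                f1.data_type(),
                f2.name(),
                f2.data_type()
            );
        }
    }
    Ok(())
}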
Are these changes tested?
Yes
Are there any user-facing changes?
No