Skip to content

Commit

Permalink
fix: window sort support alias time index (#5543)
Browse files Browse the repository at this point in the history
* fix: use alias expr to check commutativity

* chore: debug sort

* feat: consider alias in window sort optimizer

* test: sqlness test

* test: update sqlness result
  • Loading branch information
evenyag authored Feb 18, 2025
1 parent 4ef038d commit 77223a0
Show file tree
Hide file tree
Showing 7 changed files with 240 additions and 15 deletions.
5 changes: 3 additions & 2 deletions src/query/src/dist_plan/commutativity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,9 @@ impl Categorizer {
| Expr::ScalarSubquery(_)
| Expr::Wildcard { .. } => Commutativity::Unimplemented,

Expr::Alias(_)
| Expr::Unnest(_)
Expr::Alias(alias) => Self::check_expr(&alias.expr),

Expr::Unnest(_)
| Expr::GroupingSet(_)
| Expr::Placeholder(_)
| Expr::OuterReferenceColumn(_, _) => Commutativity::Unimplemented,
Expand Down
24 changes: 19 additions & 5 deletions src/query/src/optimizer/windowed_sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::sync::Arc;

use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
Expand Down Expand Up @@ -88,7 +90,7 @@ impl WindowedSortPhysicalRule {
.expr
.as_any()
.downcast_ref::<PhysicalColumn>()
&& column_expr.name() == scanner_info.time_index
&& scanner_info.time_index.contains(column_expr.name())
{
} else {
return Ok(Transformed::no(plan));
Expand Down Expand Up @@ -148,13 +150,13 @@ impl WindowedSortPhysicalRule {
#[derive(Debug)]
struct ScannerInfo {
partition_ranges: Vec<Vec<PartitionRange>>,
time_index: String,
time_index: HashSet<String>,
tag_columns: Vec<String>,
}

fn fetch_partition_range(input: Arc<dyn ExecutionPlan>) -> DataFusionResult<Option<ScannerInfo>> {
let mut partition_ranges = None;
let mut time_index = None;
let mut time_index = HashSet::new();
let mut tag_columns = None;
let mut is_batch_coalesced = false;

Expand All @@ -172,9 +174,21 @@ fn fetch_partition_range(input: Arc<dyn ExecutionPlan>) -> DataFusionResult<Opti
is_batch_coalesced = true;
}

// Collects alias of the time index column.
if let Some(projection) = plan.as_any().downcast_ref::<ProjectionExec>() {
for (expr, output_name) in projection.expr() {
if let Some(column_expr) = expr.as_any().downcast_ref::<PhysicalColumn>() {
if time_index.contains(column_expr.name()) {
time_index.insert(output_name.clone());
}
}
}
}

if let Some(region_scan_exec) = plan.as_any().downcast_ref::<RegionScanExec>() {
partition_ranges = Some(region_scan_exec.get_uncollapsed_partition_ranges());
time_index = Some(region_scan_exec.time_index());
// Reset time index column.
time_index = HashSet::from([region_scan_exec.time_index()]);
tag_columns = Some(region_scan_exec.tag_columns());

// set distinguish_partition_ranges to true, this is an incorrect workaround
Expand All @@ -189,7 +203,7 @@ fn fetch_partition_range(input: Arc<dyn ExecutionPlan>) -> DataFusionResult<Opti
let result = try {
ScannerInfo {
partition_ranges: partition_ranges?,
time_index: time_index?,
time_index,
tag_columns: tag_columns?,
}
};
Expand Down
81 changes: 81 additions & 0 deletions tests/cases/distributed/explain/order_by.result
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,84 @@ DROP TABLE test;

Affected Rows: 0

CREATE TABLE test_pk(pk INTEGER PRIMARY KEY, i INTEGER, t TIMESTAMP TIME INDEX) WITH('compaction.type'='twcs', 'compaction.twcs.max_inactive_window_files'='4');

Affected Rows: 0

INSERT INTO test_pk VALUES (1, 1, 1), (2, NULL, 2), (3, 1, 3), (4, 2, 4), (5, 2, 5), (6, NULL, 6);

Affected Rows: 6

-- Test aliasing.
SELECT i, t AS alias_ts FROM test_pk ORDER BY t DESC LIMIT 5;

+---+-------------------------+
| i | alias_ts |
+---+-------------------------+
| | 1970-01-01T00:00:00.006 |
| 2 | 1970-01-01T00:00:00.005 |
| 2 | 1970-01-01T00:00:00.004 |
| 1 | 1970-01-01T00:00:00.003 |
| | 1970-01-01T00:00:00.002 |
+---+-------------------------+

-- Test aliasing.
SELECT i, t AS alias_ts FROM test_pk ORDER BY alias_ts DESC LIMIT 5;

+---+-------------------------+
| i | alias_ts |
+---+-------------------------+
| | 1970-01-01T00:00:00.006 |
| 2 | 1970-01-01T00:00:00.005 |
| 2 | 1970-01-01T00:00:00.004 |
| 1 | 1970-01-01T00:00:00.003 |
| | 1970-01-01T00:00:00.002 |
+---+-------------------------+

-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
EXPLAIN ANALYZE SELECT i, t AS alias_ts FROM test_pk ORDER BY t DESC LIMIT 5;

+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_ProjectionExec: expr=[i@0 as i, t@1 as alias_ts] REDACTED
|_|_|_SortPreservingMergeExec: [test_pk.t__temp__0@2 DESC] REDACTED
|_|_|_WindowedSortExec: expr=test_pk.t__temp__0@2 DESC num_ranges=1 fetch=5 REDACTED
|_|_|_PartSortExec: expr=test_pk.t__temp__0@2 DESC num_ranges=1 limit=5 REDACTED
|_|_|_ProjectionExec: expr=[i@0 as i, t@1 as t, t@1 as test_pk.t__temp__0] REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED
|_|_|_|
|_|_| Total rows: 5_|
+-+-+-+

-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
EXPLAIN ANALYZE SELECT i, t AS alias_ts FROM test_pk ORDER BY alias_ts DESC LIMIT 5;

+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_ProjectionExec: expr=[i@0 as i, t@1 as alias_ts] REDACTED
|_|_|_SortPreservingMergeExec: [t@1 DESC] REDACTED
|_|_|_WindowedSortExec: expr=t@1 DESC num_ranges=1 fetch=5 REDACTED
|_|_|_PartSortExec: expr=t@1 DESC num_ranges=1 limit=5 REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED
|_|_|_|
|_|_| Total rows: 5_|
+-+-+-+

DROP TABLE test_pk;

Affected Rows: 0

26 changes: 26 additions & 0 deletions tests/cases/distributed/explain/order_by.sql
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,29 @@ EXPLAIN SELECT a, b FROM test ORDER BY a, b;
EXPLAIN SELECT DISTINCT a, b FROM test ORDER BY a, b;

DROP TABLE test;

CREATE TABLE test_pk(pk INTEGER PRIMARY KEY, i INTEGER, t TIMESTAMP TIME INDEX) WITH('compaction.type'='twcs', 'compaction.twcs.max_inactive_window_files'='4');

INSERT INTO test_pk VALUES (1, 1, 1), (2, NULL, 2), (3, 1, 3), (4, 2, 4), (5, 2, 5), (6, NULL, 6);

-- Test aliasing.
SELECT i, t AS alias_ts FROM test_pk ORDER BY t DESC LIMIT 5;

-- Test aliasing.
SELECT i, t AS alias_ts FROM test_pk ORDER BY alias_ts DESC LIMIT 5;

-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
EXPLAIN ANALYZE SELECT i, t AS alias_ts FROM test_pk ORDER BY t DESC LIMIT 5;

-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
EXPLAIN ANALYZE SELECT i, t AS alias_ts FROM test_pk ORDER BY alias_ts DESC LIMIT 5;

DROP TABLE test_pk;
12 changes: 4 additions & 8 deletions tests/cases/standalone/common/order/order_by_exceptions.result
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,16 @@ EXPLAIN SELECT a % 2, b FROM test UNION SELECT a % 2 AS k, b FROM test ORDER BY
| logical_plan | Sort: Int64(-1) ASC NULLS LAST |
| | Aggregate: groupBy=[[test.a % Int64(2), test.b]], aggr=[[]] |
| | Union |
| | Projection: CAST(test.a AS Int64) % Int64(2) AS test.a % Int64(2), test.b |
| | MergeScan [is_placeholder=false] |
| | Projection: CAST(test.a AS Int64) % Int64(2) AS test.a % Int64(2), test.b |
| | MergeScan [is_placeholder=false] |
| | MergeScan [is_placeholder=false] |
| | MergeScan [is_placeholder=false] |
| physical_plan | CoalescePartitionsExec |
| | AggregateExec: mode=FinalPartitioned, gby=[test.a % Int64(2)@0 as test.a % Int64(2), b@1 as b], aggr=[] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | RepartitionExec: REDACTED
| | AggregateExec: mode=Partial, gby=[test.a % Int64(2)@0 as test.a % Int64(2), b@1 as b], aggr=[] |
| | UnionExec |
| | ProjectionExec: expr=[CAST(a@0 AS Int64) % 2 as test.a % Int64(2), b@1 as b] |
| | MergeScanExec: REDACTED
| | ProjectionExec: expr=[CAST(a@0 AS Int64) % 2 as test.a % Int64(2), b@1 as b] |
| | MergeScanExec: REDACTED
| | MergeScanExec: REDACTED
| | MergeScanExec: REDACTED
| | |
+---------------+-----------------------------------------------------------------------------------------------------------+

Expand Down
81 changes: 81 additions & 0 deletions tests/cases/standalone/optimizer/order_by.result
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,84 @@ explain select * from numbers order by number asc limit 10;
| | |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

CREATE TABLE test_pk(pk INTEGER PRIMARY KEY, i INTEGER, t TIMESTAMP TIME INDEX) WITH('compaction.type'='twcs', 'compaction.twcs.max_inactive_window_files'='4');

Affected Rows: 0

INSERT INTO test_pk VALUES (1, 1, 1), (2, NULL, 2), (3, 1, 3), (4, 2, 4), (5, 2, 5), (6, NULL, 6);

Affected Rows: 6

-- Test aliasing.
SELECT i, t AS alias_ts FROM test_pk ORDER BY t DESC LIMIT 5;

+---+-------------------------+
| i | alias_ts |
+---+-------------------------+
| | 1970-01-01T00:00:00.006 |
| 2 | 1970-01-01T00:00:00.005 |
| 2 | 1970-01-01T00:00:00.004 |
| 1 | 1970-01-01T00:00:00.003 |
| | 1970-01-01T00:00:00.002 |
+---+-------------------------+

-- Test aliasing.
SELECT i, t AS alias_ts FROM test_pk ORDER BY alias_ts DESC LIMIT 5;

+---+-------------------------+
| i | alias_ts |
+---+-------------------------+
| | 1970-01-01T00:00:00.006 |
| 2 | 1970-01-01T00:00:00.005 |
| 2 | 1970-01-01T00:00:00.004 |
| 1 | 1970-01-01T00:00:00.003 |
| | 1970-01-01T00:00:00.002 |
+---+-------------------------+

-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
EXPLAIN ANALYZE SELECT i, t AS alias_ts FROM test_pk ORDER BY t DESC LIMIT 5;

+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_ProjectionExec: expr=[i@0 as i, alias_ts@1 as alias_ts] REDACTED
|_|_|_SortPreservingMergeExec: [t@2 DESC] REDACTED
|_|_|_WindowedSortExec: expr=t@2 DESC num_ranges=1 fetch=5 REDACTED
|_|_|_PartSortExec: expr=t@2 DESC num_ranges=1 limit=5 REDACTED
|_|_|_ProjectionExec: expr=[i@0 as i, t@1 as alias_ts, t@1 as t] REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED
|_|_|_|
|_|_| Total rows: 5_|
+-+-+-+

-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
EXPLAIN ANALYZE SELECT i, t AS alias_ts FROM test_pk ORDER BY alias_ts DESC LIMIT 5;

+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SortPreservingMergeExec: [alias_ts@1 DESC] REDACTED
|_|_|_WindowedSortExec: expr=alias_ts@1 DESC num_ranges=1 fetch=5 REDACTED
|_|_|_PartSortExec: expr=alias_ts@1 DESC num_ranges=1 limit=5 REDACTED
|_|_|_ProjectionExec: expr=[i@0 as i, t@1 as alias_ts] REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED
|_|_|_|
|_|_| Total rows: 5_|
+-+-+-+

DROP TABLE test_pk;

Affected Rows: 0

26 changes: 26 additions & 0 deletions tests/cases/standalone/optimizer/order_by.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,29 @@ explain select * from numbers order by number asc;
explain select * from numbers order by number desc limit 10;

explain select * from numbers order by number asc limit 10;

CREATE TABLE test_pk(pk INTEGER PRIMARY KEY, i INTEGER, t TIMESTAMP TIME INDEX) WITH('compaction.type'='twcs', 'compaction.twcs.max_inactive_window_files'='4');

INSERT INTO test_pk VALUES (1, 1, 1), (2, NULL, 2), (3, 1, 3), (4, 2, 4), (5, 2, 5), (6, NULL, 6);

-- Test aliasing.
SELECT i, t AS alias_ts FROM test_pk ORDER BY t DESC LIMIT 5;

-- Test aliasing.
SELECT i, t AS alias_ts FROM test_pk ORDER BY alias_ts DESC LIMIT 5;

-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
EXPLAIN ANALYZE SELECT i, t AS alias_ts FROM test_pk ORDER BY t DESC LIMIT 5;

-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
EXPLAIN ANALYZE SELECT i, t AS alias_ts FROM test_pk ORDER BY alias_ts DESC LIMIT 5;

DROP TABLE test_pk;

0 comments on commit 77223a0

Please sign in to comment.