Skip to content

Commit

Permalink
feat(promql): supports sort, sort_desc etc. functions (#5542)
Browse files Browse the repository at this point in the history
* feat(promql): supports sort, sort_desc etc. functions

* chore: fix toml format and tests

* chore: update deps

Co-authored-by: Weny Xu <[email protected]>

* chore: remove fixme

* fix: cargo lock

* chore: style

---------

Co-authored-by: Weny Xu <[email protected]>
  • Loading branch information
killme2008 and WenyXu authored Feb 19, 2025
1 parent c8bdeaa commit 62a8b8b
Show file tree
Hide file tree
Showing 5 changed files with 270 additions and 10 deletions.
3 changes: 1 addition & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,9 @@ parquet = { version = "53.0.0", default-features = false, features = ["arrow", "
paste = "1.0"
pin-project = "1.0"
prometheus = { version = "0.13.3", features = ["process"] }
promql-parser = { version = "0.4.3", features = ["ser"] }
promql-parser = { git = "https://github.com/GreptimeTeam/promql-parser.git", features = [
"ser",
], rev = "27abb8e16003a50c720f00d6c85f41f5fa2a2a8e" }
prost = "0.13"
raft-engine = { version = "0.4.1", default-features = false }
rand = "0.8"
Expand Down
81 changes: 74 additions & 7 deletions src/query/src/promql/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -612,8 +612,8 @@ impl PromPlanner {

// transform function arguments
let args = self.create_function_args(&args.args)?;
let input = if let Some(prom_expr) = args.input {
self.prom_expr_to_plan(&prom_expr, session_state).await?
let input = if let Some(prom_expr) = &args.input {
self.prom_expr_to_plan(prom_expr, session_state).await?
} else {
self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
self.ctx.reset_table_name_and_schema();
Expand All @@ -631,17 +631,43 @@ impl PromPlanner {
),
})
};
let mut func_exprs = self.create_function_expr(func, args.literals, session_state)?;
let mut func_exprs =
self.create_function_expr(func, args.literals.clone(), session_state)?;
func_exprs.insert(0, self.create_time_index_column_expr()?);
func_exprs.extend_from_slice(&self.create_tag_column_exprs()?);

LogicalPlanBuilder::from(input)
let builder = LogicalPlanBuilder::from(input)
.project(func_exprs)
.context(DataFusionPlanningSnafu)?
.filter(self.create_empty_values_filter_expr()?)
.context(DataFusionPlanningSnafu)?
.build()
.context(DataFusionPlanningSnafu)
.context(DataFusionPlanningSnafu)?;

let builder = match func.name {
"sort" => builder
.sort(self.create_field_columns_sort_exprs(true))
.context(DataFusionPlanningSnafu)?,
"sort_desc" => builder
.sort(self.create_field_columns_sort_exprs(false))
.context(DataFusionPlanningSnafu)?,
"sort_by_label" => builder
.sort(Self::create_sort_exprs_by_tags(
func.name,
args.literals,
true,
)?)
.context(DataFusionPlanningSnafu)?,
"sort_by_label_desc" => builder
.sort(Self::create_sort_exprs_by_tags(
func.name,
args.literals,
false,
)?)
.context(DataFusionPlanningSnafu)?,

_ => builder,
};

builder.build().context(DataFusionPlanningSnafu)
}

async fn prom_ext_expr_to_plan(
Expand Down Expand Up @@ -1432,6 +1458,16 @@ impl PromPlanner {

ScalarFunc::GeneratedExpr
}
"sort" | "sort_desc" | "sort_by_label" | "sort_by_label_desc" => {
// These functions are not expression but a part of plan,
// they are processed by `prom_call_expr_to_plan`.
for value in &self.ctx.field_columns {
let expr = DfExpr::Column(Column::from_name(value));
exprs.push(expr);
}

ScalarFunc::GeneratedExpr
}
_ => {
if let Some(f) = session_state.scalar_functions().get(func.name) {
ScalarFunc::DataFusionBuiltin(f.clone())
Expand Down Expand Up @@ -1691,6 +1727,37 @@ impl PromPlanner {
Ok(result)
}

fn create_field_columns_sort_exprs(&self, asc: bool) -> Vec<SortExpr> {
self.ctx
.field_columns
.iter()
.map(|col| DfExpr::Column(Column::from_name(col)).sort(asc, false))
.collect::<Vec<_>>()
}

fn create_sort_exprs_by_tags(
func: &str,
tags: Vec<DfExpr>,
asc: bool,
) -> Result<Vec<SortExpr>> {
ensure!(
!tags.is_empty(),
FunctionInvalidArgumentSnafu { fn_name: func }
);

tags.iter()
.map(|col| match col {
DfExpr::Literal(ScalarValue::Utf8(Some(label))) => {
Ok(DfExpr::Column(Column::from_name(label)).sort(asc, false))
}
other => UnexpectedPlanExprSnafu {
desc: format!("expected label string literal, but found {:?}", other),
}
.fail(),
})
.collect::<Result<Vec<_>>>()
}

fn create_empty_values_filter_expr(&self) -> Result<DfExpr> {
let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
for value in &self.ctx.field_columns {
Expand Down
154 changes: 154 additions & 0 deletions tests/cases/standalone/common/promql/sort.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
CREATE TABLE test (
ts timestamp(3) time index,
host STRING,
idc STRING,
val BIGINT,
PRIMARY KEY(host, idc),
);

Affected Rows: 0

INSERT INTO TABLE test VALUES
(0, 'host1', 'idc1', 1),
(0, 'host2', 'idc1', 2),
(5000, 'host1', 'idc2', 3),
(5000, 'host2', 'idc2', 4),
(10000, 'host1', 'idc3', 5),
(10000, 'host2', 'idc3', 6),
(15000, 'host1', 'idc4', 7),
(15000, 'host2', 'idc4', 8);

Affected Rows: 8

TQL EVAL (0, 15, '5s') sort(test{host="host1"});

+---------------------+-----+-------+------+
| ts | val | host | idc |
+---------------------+-----+-------+------+
| 1970-01-01T00:00:00 | 1 | host1 | idc1 |
| 1970-01-01T00:00:05 | 1 | host1 | idc1 |
| 1970-01-01T00:00:10 | 1 | host1 | idc1 |
| 1970-01-01T00:00:15 | 1 | host1 | idc1 |
| 1970-01-01T00:00:05 | 3 | host1 | idc2 |
| 1970-01-01T00:00:10 | 3 | host1 | idc2 |
| 1970-01-01T00:00:15 | 3 | host1 | idc2 |
| 1970-01-01T00:00:10 | 5 | host1 | idc3 |
| 1970-01-01T00:00:15 | 5 | host1 | idc3 |
| 1970-01-01T00:00:15 | 7 | host1 | idc4 |
+---------------------+-----+-------+------+

TQL EVAL (0, 15, '5s') sort_desc(test{host="host1"});

+---------------------+-----+-------+------+
| ts | val | host | idc |
+---------------------+-----+-------+------+
| 1970-01-01T00:00:15 | 7 | host1 | idc4 |
| 1970-01-01T00:00:10 | 5 | host1 | idc3 |
| 1970-01-01T00:00:15 | 5 | host1 | idc3 |
| 1970-01-01T00:00:05 | 3 | host1 | idc2 |
| 1970-01-01T00:00:10 | 3 | host1 | idc2 |
| 1970-01-01T00:00:15 | 3 | host1 | idc2 |
| 1970-01-01T00:00:00 | 1 | host1 | idc1 |
| 1970-01-01T00:00:05 | 1 | host1 | idc1 |
| 1970-01-01T00:00:10 | 1 | host1 | idc1 |
| 1970-01-01T00:00:15 | 1 | host1 | idc1 |
+---------------------+-----+-------+------+

-- SQLNESS REPLACE (\s1970-01-01T\d\d:\d\d:\d\d) timestamp
TQL EVAL (0, 15, '5s') sort(sum(test{host="host2"}) by (idc));

+---------------------+---------------+------+
| ts | sum(test.val) | idc |
+---------------------+---------------+------+
|timestamp | 2 | idc1 |
|timestamp | 2 | idc1 |
|timestamp | 2 | idc1 |
|timestamp | 2 | idc1 |
|timestamp | 4 | idc2 |
|timestamp | 4 | idc2 |
|timestamp | 4 | idc2 |
|timestamp | 6 | idc3 |
|timestamp | 6 | idc3 |
|timestamp | 8 | idc4 |
+---------------------+---------------+------+

-- SQLNESS REPLACE (\s1970-01-01T\d\d:\d\d:\d\d) timestamp
TQL EVAL (0, 15, '5s') sort_desc(sum(test{host="host2"}) by (idc));

+---------------------+---------------+------+
| ts | sum(test.val) | idc |
+---------------------+---------------+------+
|timestamp | 8 | idc4 |
|timestamp | 6 | idc3 |
|timestamp | 6 | idc3 |
|timestamp | 4 | idc2 |
|timestamp | 4 | idc2 |
|timestamp | 4 | idc2 |
|timestamp | 2 | idc1 |
|timestamp | 2 | idc1 |
|timestamp | 2 | idc1 |
|timestamp | 2 | idc1 |
+---------------------+---------------+------+

-- SQLNESS REPLACE (\s1970-01-01T\d\d:\d\d:\d\d) timestamp
-- SQLNESS REPLACE (\s\d\s) val
TQL EVAL (0, 15, '5s') sort_by_label(sum(test) by (idc, host), "idc", "host");

+---------------------+---------------+------+-------+
| ts | sum(test.val) | idc | host |
+---------------------+---------------+------+-------+
|timestamp |val | idc1 | host1 |
|timestamp |val | idc1 | host1 |
|timestamp |val | idc1 | host1 |
|timestamp |val | idc1 | host1 |
|timestamp |val | idc1 | host2 |
|timestamp |val | idc1 | host2 |
|timestamp |val | idc1 | host2 |
|timestamp |val | idc1 | host2 |
|timestamp |val | idc2 | host1 |
|timestamp |val | idc2 | host1 |
|timestamp |val | idc2 | host1 |
|timestamp |val | idc2 | host2 |
|timestamp |val | idc2 | host2 |
|timestamp |val | idc2 | host2 |
|timestamp |val | idc3 | host1 |
|timestamp |val | idc3 | host1 |
|timestamp |val | idc3 | host2 |
|timestamp |val | idc3 | host2 |
|timestamp |val | idc4 | host1 |
|timestamp |val | idc4 | host2 |
+---------------------+---------------+------+-------+

-- SQLNESS REPLACE (\s1970-01-01T\d\d:\d\d:\d\d) timestamp
-- SQLNESS REPLACE (\s\d\s) val
TQL EVAL (0, 15, '5s') sort_by_label_desc(sum(test) by (idc, host), "idc", "host");

+---------------------+---------------+------+-------+
| ts | sum(test.val) | idc | host |
+---------------------+---------------+------+-------+
|timestamp |val | idc4 | host2 |
|timestamp |val | idc4 | host1 |
|timestamp |val | idc3 | host2 |
|timestamp |val | idc3 | host2 |
|timestamp |val | idc3 | host1 |
|timestamp |val | idc3 | host1 |
|timestamp |val | idc2 | host2 |
|timestamp |val | idc2 | host2 |
|timestamp |val | idc2 | host2 |
|timestamp |val | idc2 | host1 |
|timestamp |val | idc2 | host1 |
|timestamp |val | idc2 | host1 |
|timestamp |val | idc1 | host2 |
|timestamp |val | idc1 | host2 |
|timestamp |val | idc1 | host2 |
|timestamp |val | idc1 | host2 |
|timestamp |val | idc1 | host1 |
|timestamp |val | idc1 | host1 |
|timestamp |val | idc1 | host1 |
|timestamp |val | idc1 | host1 |
+---------------------+---------------+------+-------+

drop table test;

Affected Rows: 0

38 changes: 38 additions & 0 deletions tests/cases/standalone/common/promql/sort.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
CREATE TABLE test (
ts timestamp(3) time index,
host STRING,
idc STRING,
val BIGINT,
PRIMARY KEY(host, idc),
);

INSERT INTO TABLE test VALUES
(0, 'host1', 'idc1', 1),
(0, 'host2', 'idc1', 2),
(5000, 'host1', 'idc2', 3),
(5000, 'host2', 'idc2', 4),
(10000, 'host1', 'idc3', 5),
(10000, 'host2', 'idc3', 6),
(15000, 'host1', 'idc4', 7),
(15000, 'host2', 'idc4', 8);


TQL EVAL (0, 15, '5s') sort(test{host="host1"});

TQL EVAL (0, 15, '5s') sort_desc(test{host="host1"});

-- SQLNESS REPLACE (\s1970-01-01T\d\d:\d\d:\d\d) timestamp
TQL EVAL (0, 15, '5s') sort(sum(test{host="host2"}) by (idc));

-- SQLNESS REPLACE (\s1970-01-01T\d\d:\d\d:\d\d) timestamp
TQL EVAL (0, 15, '5s') sort_desc(sum(test{host="host2"}) by (idc));

-- SQLNESS REPLACE (\s1970-01-01T\d\d:\d\d:\d\d) timestamp
-- SQLNESS REPLACE (\s\d\s) val
TQL EVAL (0, 15, '5s') sort_by_label(sum(test) by (idc, host), "idc", "host");

-- SQLNESS REPLACE (\s1970-01-01T\d\d:\d\d:\d\d) timestamp
-- SQLNESS REPLACE (\s\d\s) val
TQL EVAL (0, 15, '5s') sort_by_label_desc(sum(test) by (idc, host), "idc", "host");

drop table test;

0 comments on commit 62a8b8b

Please sign in to comment.