From 5c13a432034763a580f72290d89c430d575f5060 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Sat, 15 Feb 2025 08:37:22 +0800 Subject: [PATCH 1/2] feat(log-query): implement the first part of log query expr Signed-off-by: Ruihang Xia --- src/log-query/src/log_query.rs | 6 +- src/query/src/log_query/error.rs | 26 ++++ src/query/src/log_query/planner.rs | 215 +++++++++++++++++++++++++++-- src/query/src/planner.rs | 2 +- 4 files changed, 234 insertions(+), 15 deletions(-) diff --git a/src/log-query/src/log_query.rs b/src/log-query/src/log_query.rs index 988c9c27a9b4..3d498d5a5969 100644 --- a/src/log-query/src/log_query.rs +++ b/src/log-query/src/log_query.rs @@ -55,7 +55,7 @@ pub struct LogQuery { } /// Expression to calculate on log after filtering. -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub enum LogExpr { NamedIdent(String), PositionalIdent(usize), @@ -289,7 +289,7 @@ pub struct ColumnFilters { pub filters: Vec, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub enum ContentFilter { // Search-based filters /// Only match the exact content. @@ -317,7 +317,7 @@ pub enum ContentFilter { Compound(Vec, BinaryOperator), } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub enum BinaryOperator { And, Or, diff --git a/src/query/src/log_query/error.rs b/src/query/src/log_query/error.rs index 9045d30b6805..6f5088b0241c 100644 --- a/src/query/src/log_query/error.rs +++ b/src/query/src/log_query/error.rs @@ -18,6 +18,7 @@ use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use datafusion::error::DataFusionError; +use log_query::LogExpr; use snafu::{Location, Snafu}; #[derive(Snafu)] @@ -57,6 +58,28 @@ pub enum Error { location: Location, feature: String, }, + + #[snafu(display("Unknown aggregate function: {name}"))] + UnknownAggregateFunction { + name: String, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Unknown scalar function: {name}"))] + UnknownScalarFunction { + name: String, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Unexpected log expression: {expr:?}, expected {expected}"))] + UnexpectedLogExpr { + expr: LogExpr, + expected: String, + #[snafu(implicit)] + location: Location, + }, } impl ErrorExt for Error { @@ -67,6 +90,9 @@ impl ErrorExt for Error { DataFusionPlanning { .. } => StatusCode::External, UnknownTable { .. } | TimeIndexNotFound { .. } => StatusCode::Internal, Unimplemented { .. } => StatusCode::Unsupported, + UnknownAggregateFunction { .. } + | UnknownScalarFunction { .. } + | UnexpectedLogExpr { .. } => StatusCode::InvalidArguments, } } diff --git a/src/query/src/log_query/planner.rs b/src/query/src/log_query/planner.rs index 79474fab53cb..4c832012f770 100644 --- a/src/query/src/log_query/planner.rs +++ b/src/query/src/log_query/planner.rs @@ -15,17 +15,19 @@ use catalog::table_source::DfTableSourceProvider; use common_function::utils::escape_like_pattern; use datafusion::datasource::DefaultTableSource; -use datafusion_common::ScalarValue; +use datafusion::execution::SessionState; +use datafusion_common::{DFSchema, ScalarValue}; use datafusion_expr::utils::conjunction; use datafusion_expr::{col, lit, Expr, LogicalPlan, LogicalPlanBuilder}; use datafusion_sql::TableReference; use datatypes::schema::Schema; -use log_query::{ColumnFilters, LogQuery, TimeFilter}; +use log_query::{ColumnFilters, LogExpr, LogQuery, TimeFilter}; use snafu::{OptionExt, ResultExt}; use table::table::adapter::DfTableProviderAdapter; use crate::log_query::error::{ - CatalogSnafu, DataFusionPlanningSnafu, Result, TimeIndexNotFoundSnafu, UnimplementedSnafu, + CatalogSnafu, DataFusionPlanningSnafu, Result, TimeIndexNotFoundSnafu, UnexpectedLogExprSnafu, + UnimplementedSnafu, UnknownAggregateFunctionSnafu, UnknownScalarFunctionSnafu, UnknownTableSnafu, }; @@ -33,11 +35,15 @@ const DEFAULT_LIMIT: usize = 1000; pub struct LogQueryPlanner { table_provider: DfTableSourceProvider, + session_state: SessionState, } impl LogQueryPlanner { - pub fn new(table_provider: DfTableSourceProvider) -> Self { - Self { table_provider } + pub fn new(table_provider: DfTableSourceProvider, session_state: SessionState) -> Self { + Self { + table_provider, + session_state, + } } pub async fn query_to_plan(&mut self, query: LogQuery) -> Result { @@ -100,6 +106,54 @@ impl LogQueryPlanner { ) .context(DataFusionPlanningSnafu)?; + // Apply log expressions + for expr in &query.exprs { + match expr { + LogExpr::AggrFunc { + name, + args, + by, + range: _range, + } => { + let schema = plan_builder.schema(); + let (group_expr, aggr_exprs) = self.build_aggr_func(schema, name, args, by)?; + plan_builder = plan_builder + .aggregate([group_expr], aggr_exprs) + .context(DataFusionPlanningSnafu)?; + } + LogExpr::Filter { expr, filter } => { + let schema = plan_builder.schema(); + let expr = self.log_expr_to_df_expr(expr, &schema)?; + let col_name = expr.schema_name().to_string(); + let filter = self.build_column_filter(&ColumnFilters { + column_name: col_name, + filters: vec![filter.clone()], + })?; + if let Some(filter) = filter { + plan_builder = plan_builder + .filter(filter) + .context(DataFusionPlanningSnafu)?; + } + } + LogExpr::ScalarFunc { name, args } => { + let schema = plan_builder.schema(); + let expr = self.build_scalar_func(schema, name, args)?; + plan_builder = plan_builder + .project([expr]) + .context(DataFusionPlanningSnafu)?; + } + LogExpr::NamedIdent(_) | LogExpr::PositionalIdent(_) => { + // nothing to do + } + _ => { + UnimplementedSnafu { + feature: "log expression", + } + .fail()?; + } + } + } + // Build the final plan let plan = plan_builder.build().context(DataFusionPlanningSnafu)?; @@ -182,6 +236,61 @@ impl LogQueryPlanner { Ok(conjunction(exprs)) } + + fn build_aggr_func( + &self, + schema: &DFSchema, + fn_name: &str, + args: &[LogExpr], + by: &[LogExpr], + ) -> Result<(Expr, Vec)> { + let aggr_fn = self + .session_state + .aggregate_functions() + .get(fn_name) + .context(UnknownAggregateFunctionSnafu { + name: fn_name.to_string(), + })?; + let args = args + .iter() + .map(|expr| self.log_expr_to_df_expr(expr, schema)) + .try_collect::>()?; + let group_exprs = by + .iter() + .map(|expr| self.log_expr_to_df_expr(expr, schema)) + .try_collect::>()?; + let aggr_expr = aggr_fn.call(args); + + Ok((aggr_expr, group_exprs)) + } + + fn log_expr_to_df_expr(&self, expr: &LogExpr, schema: &DFSchema) -> Result { + match expr { + LogExpr::NamedIdent(name) => Ok(col(name)), + LogExpr::PositionalIdent(index) => Ok(col(schema.field(*index).name())), + LogExpr::Literal(literal) => Ok(lit(ScalarValue::Utf8(Some(literal.clone())))), + _ => UnexpectedLogExprSnafu { + expr: expr.clone(), + expected: "named identifier, positional identifier, or literal", + } + .fail(), + } + } + + fn build_scalar_func(&self, schema: &DFSchema, name: &str, args: &[LogExpr]) -> Result { + let args = args + .iter() + .map(|expr| self.log_expr_to_df_expr(expr, schema)) + .try_collect::>()?; + let func = self.session_state.scalar_functions().get(name).context( + UnknownScalarFunctionSnafu { + name: name.to_string(), + }, + )?; + let expr = func.call(args); + + Ok(expr) + } } #[cfg(test)] @@ -192,6 +301,7 @@ mod tests { use catalog::RegisterTableRequest; use common_catalog::consts::DEFAULT_CATALOG_NAME; use common_query::test_util::DummyDecoder; + use datafusion::execution::SessionStateBuilder; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnSchema, SchemaRef}; use log_query::{ContentFilter, Context, Limit}; @@ -270,7 +380,8 @@ mod tests { async fn test_query_to_plan() { let table_provider = build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await; - let mut planner = LogQueryPlanner::new(table_provider); + let session_state = SessionStateBuilder::new().with_default_features().build(); + let mut planner = LogQueryPlanner::new(table_provider, session_state); let log_query = LogQuery { table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"), @@ -304,7 +415,8 @@ mod tests { async fn test_build_time_filter() { let table_provider = build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await; - let planner = LogQueryPlanner::new(table_provider); + let session_state = SessionStateBuilder::new().with_default_features().build(); + let planner = LogQueryPlanner::new(table_provider, session_state); let time_filter = TimeFilter { start: Some("2021-01-01T00:00:00Z".to_string()), @@ -331,7 +443,8 @@ mod tests { async fn test_build_time_filter_without_end() { let table_provider = build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await; - let planner = LogQueryPlanner::new(table_provider); + let session_state = SessionStateBuilder::new().with_default_features().build(); + let planner = LogQueryPlanner::new(table_provider, session_state); let time_filter = TimeFilter { start: Some("2021-01-01T00:00:00Z".to_string()), @@ -358,7 +471,8 @@ mod tests { async fn test_build_column_filter() { let table_provider = build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await; - let planner = LogQueryPlanner::new(table_provider); + let session_state = SessionStateBuilder::new().with_default_features().build(); + let planner = LogQueryPlanner::new(table_provider, session_state); let column_filter = ColumnFilters { column_name: "message".to_string(), @@ -384,7 +498,8 @@ mod tests { async fn test_query_to_plan_with_only_skip() { let table_provider = build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await; - let mut planner = LogQueryPlanner::new(table_provider); + let session_state = SessionStateBuilder::new().with_default_features().build(); + let mut planner = LogQueryPlanner::new(table_provider, session_state); let log_query = LogQuery { table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"), @@ -418,7 +533,8 @@ mod tests { async fn test_query_to_plan_without_limit() { let table_provider = build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await; - let mut planner = LogQueryPlanner::new(table_provider); + let session_state = SessionStateBuilder::new().with_default_features().build(); + let mut planner = LogQueryPlanner::new(table_provider, session_state); let log_query = LogQuery { table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"), @@ -455,4 +571,81 @@ mod tests { assert_eq!(escape_like_pattern("te_st"), "te\\_st"); assert_eq!(escape_like_pattern("te\\st"), "te\\\\st"); } + + #[tokio::test] + async fn test_query_to_plan_with_aggr_func() { + let table_provider = + build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await; + let session_state = SessionStateBuilder::new().with_default_features().build(); + let mut planner = LogQueryPlanner::new(table_provider, session_state); + + let log_query = LogQuery { + table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"), + time_filter: TimeFilter { + start: Some("2021-01-01T00:00:00Z".to_string()), + end: Some("2021-01-02T00:00:00Z".to_string()), + span: None, + }, + filters: vec![], + limit: Limit { + skip: None, + fetch: Some(100), + }, + context: Context::None, + columns: vec![], + exprs: vec![LogExpr::AggrFunc { + name: "count".to_string(), + args: vec![LogExpr::NamedIdent("message".to_string())], + by: vec![LogExpr::NamedIdent("host".to_string())], + range: None, + }], + }; + + let plan = planner.query_to_plan(log_query).await.unwrap(); + let expected = "Aggregate: groupBy=[[count(greptime.public.test_table.message)]], aggr=[[greptime.public.test_table.host]] [count(greptime.public.test_table.message):Int64, host:Utf8;N]\ + \n Limit: skip=0, fetch=100 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\ + \n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\ + \n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]"; + + assert_eq!(plan.display_indent_schema().to_string(), expected); + } + + #[tokio::test] + async fn test_query_to_plan_with_scalar_func() { + let table_provider = + build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await; + let session_state = SessionStateBuilder::new().with_default_features().build(); + let mut planner = LogQueryPlanner::new(table_provider, session_state); + + let log_query = LogQuery { + table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"), + time_filter: TimeFilter { + start: Some("2021-01-01T00:00:00Z".to_string()), + end: Some("2021-01-02T00:00:00Z".to_string()), + span: None, + }, + filters: vec![], + limit: Limit { + skip: None, + fetch: Some(100), + }, + context: Context::None, + columns: vec![], + exprs: vec![LogExpr::ScalarFunc { + name: "date_trunc".to_string(), + args: vec![ + LogExpr::NamedIdent("timestamp".to_string()), + LogExpr::Literal("day".to_string()), + ], + }], + }; + + let plan = planner.query_to_plan(log_query).await.unwrap(); + let expected = "Projection: date_trunc(greptime.public.test_table.timestamp, Utf8(\"day\")) [date_trunc(greptime.public.test_table.timestamp,Utf8(\"day\")):Timestamp(Nanosecond, None);N]\ + \n Limit: skip=0, fetch=100 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\ + \n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\ + \n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]"; + + assert_eq!(plan.display_indent_schema().to_string(), expected); + } } diff --git a/src/query/src/planner.rs b/src/query/src/planner.rs index a122bbf0085c..58e2bca937a2 100644 --- a/src/query/src/planner.rs +++ b/src/query/src/planner.rs @@ -220,7 +220,7 @@ impl LogicalPlanner for DfLogicalPlanner { .enable_ident_normalization, ); - let mut planner = LogQueryPlanner::new(table_provider); + let mut planner = LogQueryPlanner::new(table_provider, self.session_state.clone()); planner .query_to_plan(query) .await From 03a8a5232dc86fcf01684046811d93901eaa8777 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Sun, 16 Feb 2025 02:32:28 +0800 Subject: [PATCH 2/2] fix clippy Signed-off-by: Ruihang Xia --- src/query/src/log_query/planner.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/src/log_query/planner.rs b/src/query/src/log_query/planner.rs index 4c832012f770..58db2fa76f47 100644 --- a/src/query/src/log_query/planner.rs +++ b/src/query/src/log_query/planner.rs @@ -123,7 +123,7 @@ impl LogQueryPlanner { } LogExpr::Filter { expr, filter } => { let schema = plan_builder.schema(); - let expr = self.log_expr_to_df_expr(expr, &schema)?; + let expr = self.log_expr_to_df_expr(expr, schema)?; let col_name = expr.schema_name().to_string(); let filter = self.build_column_filter(&ColumnFilters { column_name: col_name,