Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: correct promql behavior on nonexistent columns #5547

Merged
merged 3 commits into from
Feb 17, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 35 additions & 53 deletions src/query/src/promql/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ use datafusion_expr::utils::conjunction;
use datafusion_expr::SortExpr;
use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit};
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::Schema;
use itertools::Itertools;
use promql::extension_plan::{
build_special_time_expr, EmptyMetric, HistogramFold, InstantManipulate, Millisecond,
Expand Down Expand Up @@ -759,31 +758,26 @@ impl PromPlanner {
label_matchers: Matchers,
is_range_selector: bool,
) -> Result<LogicalPlan> {
// make table scan plan
let table_ref = self.table_ref()?;
let mut table_scan = self.create_table_scan_plan(table_ref.clone()).await?;
let table_schema = table_scan.schema();

// make filter exprs
let offset_duration = match offset {
Some(Offset::Pos(duration)) => duration.as_millis() as Millisecond,
Some(Offset::Neg(duration)) => -(duration.as_millis() as Millisecond),
None => 0,
};

let time_index_filter = self.build_time_index_filter(offset_duration)?;
// make table scan with filter exprs
let table_ref = self.table_ref()?;

let moved_label_matchers = label_matchers.clone();
let mut table_scan = self
.create_table_scan_plan(table_ref.clone(), |schema| {
let mut scan_filters =
PromPlanner::matchers_to_expr(moved_label_matchers, |name| {
schema.column_index_by_name(name).is_some()
})?;
if let Some(time_index_filter) = time_index_filter {
scan_filters.push(time_index_filter);
}

Ok(scan_filters)
})
.await?;
let mut scan_filters = self.matchers_to_expr(label_matchers.clone(), table_schema)?;
if let Some(time_index_filter) = self.build_time_index_filter(offset_duration)? {
scan_filters.push(time_index_filter);
}
table_scan = LogicalPlanBuilder::from(table_scan)
.filter(conjunction(scan_filters).unwrap())
.context(DataFusionPlanningSnafu)?
.build()
.context(DataFusionPlanningSnafu)?;

// make a projection plan if there is any `__field__` matcher
if let Some(field_matchers) = &self.ctx.field_column_matcher {
Expand Down Expand Up @@ -963,20 +957,22 @@ impl PromPlanner {
}
}

/// Convert [`Matchers`] to [`DfExpr`]s.
///
/// This method will filter out the matchers that don't match the filter function.
fn matchers_to_expr<F>(label_matchers: Matchers, filter: F) -> Result<Vec<DfExpr>>
where
F: Fn(&str) -> bool,
{
// TODO(ruihang): ignore `MetricNameLabel` (`__name__`) matcher
fn matchers_to_expr(
&self,
label_matchers: Matchers,
table_schema: &DFSchemaRef,
) -> Result<Vec<DfExpr>> {
let mut exprs = Vec::with_capacity(label_matchers.matchers.len());
for matcher in label_matchers.matchers {
// ignores the matchers that don't match the filter function
if !filter(&matcher.name) {
continue;
}
let col = DfExpr::Column(Column::from_name(matcher.name));
let col = if table_schema
.field_with_unqualified_name(&matcher.name)
.is_err()
{
DfExpr::Literal(ScalarValue::Utf8(Some(String::new()))).alias(matcher.name)
} else {
DfExpr::Column(Column::from_name(matcher.name))
};
let lit = DfExpr::Literal(ScalarValue::Utf8(Some(matcher.value)));
let expr = match matcher.op {
MatchOp::Equal => col.eq(lit),
Expand Down Expand Up @@ -1070,21 +1066,14 @@ impl PromPlanner {
///
/// # Panic
/// If the filter is empty
async fn create_table_scan_plan<F>(
&mut self,
table_ref: TableReference,
filter_builder: F,
) -> Result<LogicalPlan>
where
F: FnOnce(&Schema) -> Result<Vec<DfExpr>>,
{
async fn create_table_scan_plan(&mut self, table_ref: TableReference) -> Result<LogicalPlan> {
let provider = self
.table_provider
.resolve_table(table_ref.clone())
.await
.context(CatalogSnafu)?;

let schema = provider
let is_time_index_ms = provider
.as_any()
.downcast_ref::<DefaultTableSource>()
.context(UnknownTableSnafu)?
Expand All @@ -1093,17 +1082,14 @@ impl PromPlanner {
.downcast_ref::<DfTableProviderAdapter>()
.context(UnknownTableSnafu)?
.table()
.schema();

let is_time_index_ms = schema
.schema()
.timestamp_column()
.with_context(|| TimeIndexNotFoundSnafu {
table: table_ref.to_quoted_string(),
})?
.data_type
== ConcreteDataType::timestamp_millisecond_datatype();

let filter = filter_builder(schema.as_ref())?;
let mut scan_plan = LogicalPlanBuilder::scan(table_ref.clone(), provider, None)
.context(DataFusionPlanningSnafu)?
.build()
Expand Down Expand Up @@ -1140,14 +1126,10 @@ impl PromPlanner {
.context(DataFusionPlanningSnafu)?;
}

let mut builder = LogicalPlanBuilder::from(scan_plan);
if !filter.is_empty() {
// Safety: filter is not empty, checked above
builder = builder
.filter(conjunction(filter).unwrap())
.context(DataFusionPlanningSnafu)?;
}
let result = builder.build().context(DataFusionPlanningSnafu)?;
// Safety: `scan_filters` is not empty.
let result = LogicalPlanBuilder::from(scan_plan)
.build()
.context(DataFusionPlanningSnafu)?;
Ok(result)
}

Expand Down
64 changes: 64 additions & 0 deletions tests/cases/standalone/common/promql/label.result
Original file line number Diff line number Diff line change
Expand Up @@ -197,3 +197,67 @@ DROP TABLE test;

Affected Rows: 0

CREATE TABLE test (
ts timestamp(3) time index,
host STRING,
val BIGINT,
PRIMARY KEY(host),
);

Affected Rows: 0

INSERT INTO TABLE test VALUES
(0, 'host1', 1),
(0, 'host2', 2);

Affected Rows: 2

SELECT * FROM test;

+---------------------+-------+-----+
| ts | host | val |
+---------------------+-------+-----+
| 1970-01-01T00:00:00 | host1 | 1 |
| 1970-01-01T00:00:00 | host2 | 2 |
+---------------------+-------+-----+

-- test the non-existent matchers --
TQL EVAL (0, 1, '5s') test{job=~"host1|host3"};

++
++

-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 1, '5s') test{job=~".*"};

+---------------------+-------+-----+
| ts | host | val |
+---------------------+-------+-----+
| 1970-01-01T00:00:00 | host1 | 1 |
| 1970-01-01T00:00:00 | host2 | 2 |
+---------------------+-------+-----+

TQL EVAL (0, 1, '5s') test{job=~".+"};

++
++

-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 1, '5s') test{job=""};

+---------------------+-------+-----+
| ts | host | val |
+---------------------+-------+-----+
| 1970-01-01T00:00:00 | host1 | 1 |
| 1970-01-01T00:00:00 | host2 | 2 |
+---------------------+-------+-----+

TQL EVAL (0, 1, '5s') test{job!=""};

++
++

DROP TABLE test;

Affected Rows: 0

28 changes: 28 additions & 0 deletions tests/cases/standalone/common/promql/label.sql
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,31 @@ TQL EVAL (0, 15, '5s') label_replace(test{host="host2"}, "idc", "$2", "idc", "(.
TQL EVAL (0, 15, '5s') label_replace(test{host="host2"}, "idc", "", "", "");

DROP TABLE test;

CREATE TABLE test (
ts timestamp(3) time index,
host STRING,
val BIGINT,
PRIMARY KEY(host),
);

INSERT INTO TABLE test VALUES
(0, 'host1', 1),
(0, 'host2', 2);

SELECT * FROM test;

-- test the non-existent matchers --
TQL EVAL (0, 1, '5s') test{job=~"host1|host3"};

-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 1, '5s') test{job=~".*"};

TQL EVAL (0, 1, '5s') test{job=~".+"};

-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 1, '5s') test{job=""};

TQL EVAL (0, 1, '5s') test{job!=""};

DROP TABLE test;

This file was deleted.

17 changes: 0 additions & 17 deletions tests/cases/standalone/common/promql/non_existent_matchers.sql

This file was deleted.

Loading