Skip to content

Commit

Permalink
fix: correct promql behavior on nonexistent columns (#5547)
Browse files Browse the repository at this point in the history
* Revert "fix(promql): ignore filters for non-existent labels (#5519)"

This reverts commit 33a2485.

* reimplement

Signed-off-by: Ruihang Xia <[email protected]>

* state safety

Signed-off-by: Ruihang Xia <[email protected]>

---------

Signed-off-by: Ruihang Xia <[email protected]>
  • Loading branch information
waynexia authored Feb 17, 2025
1 parent deb9520 commit 4ef038d
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 114 deletions.
87 changes: 34 additions & 53 deletions src/query/src/promql/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ use datafusion_expr::utils::conjunction;
use datafusion_expr::SortExpr;
use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit};
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::Schema;
use itertools::Itertools;
use promql::extension_plan::{
build_special_time_expr, EmptyMetric, HistogramFold, InstantManipulate, Millisecond,
Expand Down Expand Up @@ -759,31 +758,26 @@ impl PromPlanner {
label_matchers: Matchers,
is_range_selector: bool,
) -> Result<LogicalPlan> {
// make table scan plan
let table_ref = self.table_ref()?;
let mut table_scan = self.create_table_scan_plan(table_ref.clone()).await?;
let table_schema = table_scan.schema();

// make filter exprs
let offset_duration = match offset {
Some(Offset::Pos(duration)) => duration.as_millis() as Millisecond,
Some(Offset::Neg(duration)) => -(duration.as_millis() as Millisecond),
None => 0,
};

let time_index_filter = self.build_time_index_filter(offset_duration)?;
// make table scan with filter exprs
let table_ref = self.table_ref()?;

let moved_label_matchers = label_matchers.clone();
let mut table_scan = self
.create_table_scan_plan(table_ref.clone(), |schema| {
let mut scan_filters =
PromPlanner::matchers_to_expr(moved_label_matchers, |name| {
schema.column_index_by_name(name).is_some()
})?;
if let Some(time_index_filter) = time_index_filter {
scan_filters.push(time_index_filter);
}

Ok(scan_filters)
})
.await?;
let mut scan_filters = self.matchers_to_expr(label_matchers.clone(), table_schema)?;
if let Some(time_index_filter) = self.build_time_index_filter(offset_duration)? {
scan_filters.push(time_index_filter);
}
table_scan = LogicalPlanBuilder::from(table_scan)
.filter(conjunction(scan_filters).unwrap()) // Safety: `scan_filters` is not empty.
.context(DataFusionPlanningSnafu)?
.build()
.context(DataFusionPlanningSnafu)?;

// make a projection plan if there is any `__field__` matcher
if let Some(field_matchers) = &self.ctx.field_column_matcher {
Expand Down Expand Up @@ -963,20 +957,22 @@ impl PromPlanner {
}
}

/// Convert [`Matchers`] to [`DfExpr`]s.
///
/// This method will filter out the matchers that don't match the filter function.
fn matchers_to_expr<F>(label_matchers: Matchers, filter: F) -> Result<Vec<DfExpr>>
where
F: Fn(&str) -> bool,
{
// TODO(ruihang): ignore `MetricNameLabel` (`__name__`) matcher
fn matchers_to_expr(
&self,
label_matchers: Matchers,
table_schema: &DFSchemaRef,
) -> Result<Vec<DfExpr>> {
let mut exprs = Vec::with_capacity(label_matchers.matchers.len());
for matcher in label_matchers.matchers {
// ignores the matchers that don't match the filter function
if !filter(&matcher.name) {
continue;
}
let col = DfExpr::Column(Column::from_name(matcher.name));
let col = if table_schema
.field_with_unqualified_name(&matcher.name)
.is_err()
{
DfExpr::Literal(ScalarValue::Utf8(Some(String::new()))).alias(matcher.name)
} else {
DfExpr::Column(Column::from_name(matcher.name))
};
let lit = DfExpr::Literal(ScalarValue::Utf8(Some(matcher.value)));
let expr = match matcher.op {
MatchOp::Equal => col.eq(lit),
Expand Down Expand Up @@ -1076,21 +1072,14 @@ impl PromPlanner {
///
/// # Panics
/// If the filter is empty
async fn create_table_scan_plan<F>(
&mut self,
table_ref: TableReference,
filter_builder: F,
) -> Result<LogicalPlan>
where
F: FnOnce(&Schema) -> Result<Vec<DfExpr>>,
{
async fn create_table_scan_plan(&mut self, table_ref: TableReference) -> Result<LogicalPlan> {
let provider = self
.table_provider
.resolve_table(table_ref.clone())
.await
.context(CatalogSnafu)?;

let schema = provider
let is_time_index_ms = provider
.as_any()
.downcast_ref::<DefaultTableSource>()
.context(UnknownTableSnafu)?
Expand All @@ -1099,17 +1088,14 @@ impl PromPlanner {
.downcast_ref::<DfTableProviderAdapter>()
.context(UnknownTableSnafu)?
.table()
.schema();

let is_time_index_ms = schema
.schema()
.timestamp_column()
.with_context(|| TimeIndexNotFoundSnafu {
table: table_ref.to_quoted_string(),
})?
.data_type
== ConcreteDataType::timestamp_millisecond_datatype();

let filter = filter_builder(schema.as_ref())?;
let mut scan_plan = LogicalPlanBuilder::scan(table_ref.clone(), provider, None)
.context(DataFusionPlanningSnafu)?
.build()
Expand Down Expand Up @@ -1146,14 +1132,9 @@ impl PromPlanner {
.context(DataFusionPlanningSnafu)?;
}

let mut builder = LogicalPlanBuilder::from(scan_plan);
if !filter.is_empty() {
// Safety: filter is not empty, checked above
builder = builder
.filter(conjunction(filter).unwrap())
.context(DataFusionPlanningSnafu)?;
}
let result = builder.build().context(DataFusionPlanningSnafu)?;
let result = LogicalPlanBuilder::from(scan_plan)
.build()
.context(DataFusionPlanningSnafu)?;
Ok(result)
}

Expand Down
64 changes: 64 additions & 0 deletions tests/cases/standalone/common/promql/label.result
Original file line number Diff line number Diff line change
Expand Up @@ -197,3 +197,67 @@ DROP TABLE test;

Affected Rows: 0

CREATE TABLE test (
ts timestamp(3) time index,
host STRING,
val BIGINT,
PRIMARY KEY(host),
);

Affected Rows: 0

INSERT INTO TABLE test VALUES
(0, 'host1', 1),
(0, 'host2', 2);

Affected Rows: 2

SELECT * FROM test;

+---------------------+-------+-----+
| ts | host | val |
+---------------------+-------+-----+
| 1970-01-01T00:00:00 | host1 | 1 |
| 1970-01-01T00:00:00 | host2 | 2 |
+---------------------+-------+-----+

-- test the non-existent matchers --
TQL EVAL (0, 1, '5s') test{job=~"host1|host3"};

++
++

-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 1, '5s') test{job=~".*"};

+---------------------+-------+-----+
| ts | host | val |
+---------------------+-------+-----+
| 1970-01-01T00:00:00 | host1 | 1 |
| 1970-01-01T00:00:00 | host2 | 2 |
+---------------------+-------+-----+

TQL EVAL (0, 1, '5s') test{job=~".+"};

++
++

-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 1, '5s') test{job=""};

+---------------------+-------+-----+
| ts | host | val |
+---------------------+-------+-----+
| 1970-01-01T00:00:00 | host1 | 1 |
| 1970-01-01T00:00:00 | host2 | 2 |
+---------------------+-------+-----+

TQL EVAL (0, 1, '5s') test{job!=""};

++
++

DROP TABLE test;

Affected Rows: 0

28 changes: 28 additions & 0 deletions tests/cases/standalone/common/promql/label.sql
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,31 @@ TQL EVAL (0, 15, '5s') label_replace(test{host="host2"}, "idc", "$2", "idc", "(.
TQL EVAL (0, 15, '5s') label_replace(test{host="host2"}, "idc", "", "", "");

DROP TABLE test;

CREATE TABLE test (
ts timestamp(3) time index,
host STRING,
val BIGINT,
PRIMARY KEY(host),
);

INSERT INTO TABLE test VALUES
(0, 'host1', 1),
(0, 'host2', 2);

SELECT * FROM test;

-- test the non-existent matchers --
TQL EVAL (0, 1, '5s') test{job=~"host1|host3"};

-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 1, '5s') test{job=~".*"};

TQL EVAL (0, 1, '5s') test{job=~".+"};

-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 1, '5s') test{job=""};

TQL EVAL (0, 1, '5s') test{job!=""};

DROP TABLE test;
44 changes: 0 additions & 44 deletions tests/cases/standalone/common/promql/non_existent_matchers.result

This file was deleted.

17 changes: 0 additions & 17 deletions tests/cases/standalone/common/promql/non_existent_matchers.sql

This file was deleted.

0 comments on commit 4ef038d

Please sign in to comment.