Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into 0212-oidc
Browse files Browse the repository at this point in the history
  • Loading branch information
HangyuanLiu committed Feb 17, 2025
2 parents f3179bd + e9d315d commit c8ed522
Show file tree
Hide file tree
Showing 139 changed files with 1,888 additions and 936 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ This project is used by the following companies. Learn more about their use case
- [Demandbase](https://starrocks.medium.com/demandbase-ditches-denormalization-by-switching-off-clickhouse-44195d795a83)
- [Shopee](https://celerdata.com/blog/how-shopee-3xed-their-query-performance-with-starrocks)
- [Naver](https://starrocks.medium.com/how-join-changed-how-we-approach-data-infra-at-naver-3a5bb1dac49f)
- [HerdWatch](https://medium.com/p/a7916a7e87bf)
- [TRM Labs](https://www.trmlabs.com/post/from-bigquery-to-lakehouse-how-we-built-a-petabyte-scale-data-analytics-platform-part-1)
- [Verisoul](https://celerdata.com/blog/verisoul-enables-real-time-analytics-by-transitioning-off-bigquery)
- [Trip.com](https://starrocks.medium.com/trip-com-starrocks-efficiently-supports-high-concurrent-queries-dramatically-reduces-labor-and-1e1921dd6bf8)
- [Didi](https://www.starrocks.io/blog/reduced-80-cost-didis-journey-from-multiple-olap-engines-to-starrocks)
5 changes: 2 additions & 3 deletions be/src/bench/get_dict_codes_bench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
#include <benchmark/benchmark.h>

#include <map>
#include <memory>
#include <random>
#include <vector>

Expand Down Expand Up @@ -75,7 +74,7 @@ static void BM_GetDictCodesWithMap(benchmark::State& state) {
if (column->size() == 0) {
continue;
}
const std::vector<uint8_t>& null_data = down_cast<NullableColumn*>(column.get())->immutable_null_column_data();
const auto& null_data = down_cast<NullableColumn*>(column.get())->immutable_null_column_data();
bool has_null = column->has_null();
bool all_null = false;

Expand All @@ -90,7 +89,7 @@ static void BM_GetDictCodesWithMap(benchmark::State& state) {

auto* dict_nullable_column = down_cast<NullableColumn*>(column.get());
auto* dict_value_binary_column = down_cast<BinaryColumn*>(dict_nullable_column->data_column().get());
std::vector<Slice> dict_values_filtered = dict_value_binary_column->get_data();
auto dict_values_filtered = dict_value_binary_column->get_data();
if (!has_null) {
dict_codes.reserve(dict_values_filtered.size());
for (size_t i = 0; i < dict_values_filtered.size(); i++) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/bench/parquet_dict_decode_bench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ static void BM_DictDecoder(benchmark::State& state) {
std::random_device rd;
std::mt19937 rng(rd());
std::uniform_int_distribution<int> dist(0, 99);
std::vector<int32_t> dict_codes;
Buffer<int32_t> dict_codes;
int count = 0;

for (int i = 0; i < kTestChunkSize; i++) {
Expand Down
16 changes: 0 additions & 16 deletions be/src/block_cache/datacache_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,22 +124,6 @@ Status DataCacheUtils::parse_conf_datacache_disk_paths(const std::string& config
return Status::OK();
}

// Builds the list of datacache directories together with their sizes.
//
// The disk paths are resolved from `config_disk_path` via parse_conf_datacache_disk_paths
// (which also receives `ignore_broken_disk`); every resolved path is then paired with the
// size computed by parse_conf_datacache_disk_size from `config_disk_size`.
//
// Returns the error from path parsing if that step fails, InvalidArgument as soon as a
// negative size is computed for any path, and OK otherwise. Entries appended to
// `disk_spaces` before a failure are left in place.
Status DataCacheUtils::parse_conf_datacache_disk_spaces(const std::string& config_disk_path,
                                                        const std::string& config_disk_size, bool ignore_broken_disk,
                                                        std::vector<DirSpace>* disk_spaces) {
    std::vector<std::string> resolved_paths;
    RETURN_IF_ERROR(parse_conf_datacache_disk_paths(config_disk_path, &resolved_paths, ignore_broken_disk));

    for (auto& disk_path : resolved_paths) {
        // NOTE(review): the trailing -1 is presumably a "no limit known" hint for the size
        // parser -- confirm against parse_conf_datacache_disk_size.
        const int64_t parsed_size = parse_conf_datacache_disk_size(disk_path, config_disk_size, -1);
        if (parsed_size >= 0) {
            disk_spaces->push_back({.path = disk_path, .size = static_cast<size_t>(parsed_size)});
            continue;
        }
        // A negative size means the configured value could not be used for this path.
        LOG(ERROR) << "invalid disk size for datacache: " << parsed_size;
        return Status::InvalidArgument("invalid disk size for datacache");
    }
    return Status::OK();
}

void DataCacheUtils::clean_residual_datacache(const std::string& disk_path) {
if (!FileSystem::Default()->path_exists(disk_path).ok()) {
// ignore non-existent disk path
Expand Down
4 changes: 0 additions & 4 deletions be/src/block_cache/datacache_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@ class DataCacheUtils {
static Status parse_conf_datacache_disk_paths(const std::string& config_path, std::vector<std::string>* paths,
bool ignore_broken_disk);

static Status parse_conf_datacache_disk_spaces(const std::string& config_disk_path,
const std::string& config_disk_size, bool ignore_broken_disk,
std::vector<DirSpace>* disk_spaces);

static void clean_residual_datacache(const std::string& disk_path);

static Status change_disk_path(const std::string& old_disk_path, const std::string& new_disk_path);
Expand Down
7 changes: 3 additions & 4 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,6 @@ CONF_Int32(be_service_threads, "64");
// key=value pair of default query options for StarRocks, separated by ','
CONF_String(default_query_options, "");

// If non-zero, StarRocks will output memory usage every log_mem_usage_interval'th fragment completion.
// CONF_Int32(log_mem_usage_interval, "0");

// Controls the number of threads to run work per core. It's common to pick 2x
// or 3x the number of cores. This keeps the cores busy without causing excessive
// thrashing.
Expand Down Expand Up @@ -1183,7 +1180,7 @@ CONF_Double(datacache_scheduler_threads_per_cpu, "0.125");
// If false, the raw data will be written to disk directly and read from disk without promotion.
// Object data that can only be cached in memory, such as parquet footer objects, is not affected
// by this configuration.
CONF_Bool(datacache_tiered_cache_enable, "true");
CONF_Bool(datacache_tiered_cache_enable, "false");
// Whether to persist cached data
CONF_Bool(datacache_persistence_enable, "true");
// DataCache engines, alternatives: starcache.
Expand Down Expand Up @@ -1566,4 +1563,6 @@ CONF_mInt32(json_parse_many_batch_size, "1000000");
CONF_mBool(enable_dynamic_batch_size_for_json_parse_many, "true");
CONF_mInt32(put_combined_txn_log_thread_pool_num_max, "64");
CONF_mBool(enable_put_combinded_txn_log_parallel, "false");
// used to control whether the metrics/ interface collects table metrics
CONF_mBool(enable_collect_table_metrics, "true");
} // namespace starrocks::config
11 changes: 11 additions & 0 deletions be/src/exec/connector_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,17 @@ Status ConnectorScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
}
_estimate_scan_row_bytes();

if (tnode.__isset.lake_scan_node) {
if (tnode.lake_scan_node.__isset.enable_topn_filter_back_pressure &&
tnode.lake_scan_node.enable_topn_filter_back_pressure) {
_enable_topn_filter_back_pressure = true;
_back_pressure_max_rounds = tnode.lake_scan_node.back_pressure_max_rounds;
_back_pressure_num_rows = tnode.lake_scan_node.back_pressure_num_rows;
_back_pressure_throttle_time = tnode.lake_scan_node.back_pressure_throttle_time;
_back_pressure_throttle_time_upper_bound = tnode.lake_scan_node.back_pressure_throttle_time_upper_bound;
}
}

return Status::OK();
}

Expand Down
9 changes: 9 additions & 0 deletions be/src/exec/olap_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,15 @@ Status OlapScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
}
}

if (tnode.olap_scan_node.__isset.enable_topn_filter_back_pressure &&
tnode.olap_scan_node.enable_topn_filter_back_pressure) {
_enable_topn_filter_back_pressure = true;
_back_pressure_max_rounds = tnode.olap_scan_node.back_pressure_max_rounds;
_back_pressure_num_rows = tnode.olap_scan_node.back_pressure_num_rows;
_back_pressure_throttle_time = tnode.olap_scan_node.back_pressure_throttle_time;
_back_pressure_throttle_time_upper_bound = tnode.olap_scan_node.back_pressure_throttle_time_upper_bound;
}

_estimate_scan_and_output_row_bytes();

return Status::OK();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class AggregateStreamingSinkOperator : public Operator {
AggrAutoState _auto_state{};
AggrAutoContext _auto_context;
LimitedMemAggState _limited_mem_state;
DECLARE_ONCE_DETECTOR(_set_finishing_once);
};

class AggregateStreamingSinkOperatorFactory final : public OperatorFactory {
Expand Down
5 changes: 3 additions & 2 deletions be/src/exec/pipeline/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ class Operator {
Status eval_conjuncts(const std::vector<ExprContext*>& conjuncts, Chunk* chunk, FilterPtr* filter = nullptr);

// equal to ExecNode::eval_join_runtime_filters, is used to apply bloom-filters to Operators.
void eval_runtime_bloom_filters(Chunk* chunk);
virtual void eval_runtime_bloom_filters(Chunk* chunk);

// Pseudo plan_node_id for final sink, such as result_sink, table_sink
static const int32_t s_pseudo_plan_node_id_for_final_sink;
Expand Down Expand Up @@ -276,6 +276,8 @@ class Operator {
void set_observer(PipelineObserver* observer) { _observer = observer; }
PipelineObserver* observer() const { return _observer; }

void _init_rf_counters(bool init_bloom);

protected:
OperatorFactory* _factory;
const int32_t _id;
Expand Down Expand Up @@ -334,7 +336,6 @@ class Operator {
PipelineObserver* _observer = nullptr;

private:
void _init_rf_counters(bool init_bloom);
void _init_conjuct_counters();

std::shared_ptr<MemTracker> _mem_tracker;
Expand Down
18 changes: 18 additions & 0 deletions be/src/exec/pipeline/scan/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,19 @@ Status ScanOperator::prepare(RuntimeState* state) {
_prepare_chunk_source_timer = ADD_TIMER(_unique_metrics, "PrepareChunkSourceTime");
_submit_io_task_timer = ADD_TIMER(_unique_metrics, "SubmitTaskTime");

if (_scan_node->is_enable_topn_filter_back_pressure()) {
if (auto* runtime_filters = runtime_bloom_filters(); runtime_filters != nullptr) {
auto has_topn_filters =
std::any_of(runtime_filters->descriptors().begin(), runtime_filters->descriptors().end(),
[](const auto& e) { return e.second->is_topn_filter(); });
if (has_topn_filters) {
_topn_filter_back_pressure = std::make_unique<TopnRfBackPressure>(
0.1, _scan_node->get_back_pressure_throttle_time_upper_bound(),
_scan_node->get_back_pressure_max_rounds(), _scan_node->get_back_pressure_throttle_time(),
_scan_node->get_back_pressure_num_rows());
}
}
}
RETURN_IF_ERROR(do_prepare(state));
return Status::OK();
}
Expand Down Expand Up @@ -139,6 +152,10 @@ bool ScanOperator::has_output() const {
return true;
}

if (!_morsel_queue->empty() && _topn_filter_back_pressure && _topn_filter_back_pressure->should_throttle()) {
return false;
}

// Try to buffer enough chunks for exec thread, to reduce scheduling overhead.
// It's like the Linux Block-Scheduler's Unplug algorithm, so we just name it unplug.
// The default threshold of unplugging is BufferCapacity/DOP/4, and its range is [1, 16]
Expand Down Expand Up @@ -275,6 +292,7 @@ StatusOr<ChunkPtr> ScanOperator::pull_chunk(RuntimeState* state) {
begin_pull_chunk(res);
// for query cache mechanism, we should emit EOS chunk when we receive the last chunk.
auto [owner_id, is_eos] = _should_emit_eos(res);
evaluate_topn_runtime_filters(res.get());
eval_runtime_bloom_filters(res.get());
res->owner_info().set_owner_id(owner_id, is_eos);
}
Expand Down
57 changes: 56 additions & 1 deletion be/src/exec/pipeline/scan/scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@

#pragma once

#include "exec/exec_node.h"
#include "exec/pipeline/pipeline_fwd.h"
#include "exec/pipeline/scan/balanced_chunk_buffer.h"
#include "exec/pipeline/source_operator.h"
#include "exec/pipeline/topn_runtime_filter_back_pressure.h"
#include "exec/query_cache/cache_operator.h"
#include "exec/query_cache/lane_arbiter.h"
#include "exec/workgroup/work_group_fwd.h"
Expand Down Expand Up @@ -171,6 +173,57 @@ class ScanOperator : public SourceOperator {
return _scan_status;
}

// Runs the runtime filters over `chunk` using the dedicated top-n evaluation context and
// reports the outcome to the top-n back-pressure tracker.
//
// Does nothing when `chunk` is null or empty, when back pressure is not enabled for this
// operator (no tracker was created), or when there are no runtime filters.
void evaluate_topn_runtime_filters(Chunk* chunk) {
    const bool nothing_to_do = chunk == nullptr || chunk->is_empty() || !_topn_filter_back_pressure;
    if (nothing_to_do) {
        return;
    }

    auto* rf_collector = runtime_bloom_filters();
    if (rf_collector == nullptr) {
        return;
    }

    // Remember the row count before filtering: evaluate() may shrink the chunk.
    const auto rows_before_filtering = chunk->num_rows();
    _init_topn_runtime_filter_counters();
    rf_collector->evaluate(chunk, _topn_filter_eval_context);

    // The tracker is fed the number of rows remaining in the chunk after evaluation.
    _topn_filter_back_pressure->inc_num_rows(chunk->num_rows());

    const auto& recorded_selectivity = _topn_filter_eval_context.selectivity;
    if (recorded_selectivity.empty()) {
        // No selectivity was recorded by the evaluation; report 1.0.
        _topn_filter_back_pressure->update_selectivity(1.0);
        return;
    }

    // Only chunks with more than 1024 input rows update the selectivity
    // (presumably smaller chunks give too noisy an estimate -- TODO confirm).
    if (rows_before_filtering > 1024) {
        double first_selectivity = recorded_selectivity.begin()->first;
        _topn_filter_back_pressure->update_selectivity(first_selectivity);
    }
}

// Lazily sets up the evaluation context used by evaluate_topn_runtime_filters.
//
// The timer pointer doubles as the "already initialized" flag: once it is non-null the
// function returns immediately. On first use the context is switched to M_ONLY_TOPN mode,
// its timers and counters are registered on `_common_metrics`, and its driver sequence is
// set from `_runtime_filter_probe_sequence`.
void _init_topn_runtime_filter_counters() {
    auto& ctx = _topn_filter_eval_context;
    if (ctx.join_runtime_filter_timer != nullptr) {
        return;
    }

    ctx.mode = RuntimeBloomFilterEvalContext::Mode::M_ONLY_TOPN;

    // Timers.
    ctx.join_runtime_filter_timer = ADD_TIMER(_common_metrics, "TopnRuntimeFilterTime");
    ctx.join_runtime_filter_hash_timer = ADD_TIMER(_common_metrics, "TopnRuntimeFilterHashTime");

    // Row and evaluation counters.
    ctx.join_runtime_filter_input_counter = ADD_COUNTER(_common_metrics, "TopnRuntimeFilterInputRows", TUnit::UNIT);
    ctx.join_runtime_filter_output_counter = ADD_COUNTER(_common_metrics, "TopnRuntimeFilterOutputRows", TUnit::UNIT);
    ctx.join_runtime_filter_eval_counter = ADD_COUNTER(_common_metrics, "TopnRuntimeFilterEvaluate", TUnit::UNIT);

    ctx.driver_sequence = _runtime_filter_probe_sequence;
}

// Scan-operator override of eval_runtime_bloom_filters.
//
// Skips null or empty chunks. Otherwise, if runtime filters are present, it initializes the
// bloom-filter counters and evaluates the filters on `chunk`; afterwards it always applies
// ExecNode::eval_filter_null_values with this operator's filter-null-value columns.
void eval_runtime_bloom_filters(Chunk* chunk) override {
    const bool has_rows = chunk != nullptr && !chunk->is_empty();
    if (!has_rows) {
        return;
    }

    auto* rf_collector = runtime_bloom_filters();
    if (rf_collector != nullptr) {
        _init_rf_counters(true);
        // With top-n back pressure enabled, evaluate in M_WITHOUT_TOPN mode
        // (presumably because top-n filters are handled separately by
        // evaluate_topn_runtime_filters -- TODO confirm).
        if (_topn_filter_back_pressure) {
            _bloom_filter_eval_context.mode = RuntimeBloomFilterEvalContext::Mode::M_WITHOUT_TOPN;
        }
        rf_collector->evaluate(chunk, _bloom_filter_eval_context);
    }

    ExecNode::eval_filter_null_values(chunk, filter_null_value_columns());
}

protected:
ScanNode* _scan_node = nullptr;
const int32_t _dop;
Expand Down Expand Up @@ -234,6 +287,9 @@ class ScanOperator : public SourceOperator {
RuntimeProfile::Counter* _prepare_chunk_source_timer = nullptr;
RuntimeProfile::Counter* _submit_io_task_timer = nullptr;

RuntimeBloomFilterEvalContext _topn_filter_eval_context;
std::unique_ptr<TopnRfBackPressure> _topn_filter_back_pressure = nullptr;

DECLARE_RACE_DETECTOR(race_pull_chunk)
};

Expand Down Expand Up @@ -261,7 +317,6 @@ class ScanOperatorFactory : public SourceOperatorFactory {

protected:
ScanNode* const _scan_node;

std::shared_ptr<workgroup::ScanTaskGroup> _scan_task_group;
};

Expand Down
Loading

0 comments on commit c8ed522

Please sign in to comment.