Skip to content

Commit

Permalink
feat: collect stager metrics (#5553)
Browse files Browse the repository at this point in the history
  • Loading branch information
evenyag authored Feb 17, 2025
1 parent f359eeb commit 01492a9
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 5 deletions.
71 changes: 71 additions & 0 deletions src/mito2/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::Duration;

use lazy_static::lazy_static;
use prometheus::*;
use puffin::puffin_manager::stager::StagerNotifier;

/// Stage label.
pub const STAGE_LABEL: &str = "stage";
Expand All @@ -28,6 +31,10 @@ pub const FILE_TYPE_LABEL: &str = "file_type";
pub const WORKER_LABEL: &str = "worker";
/// Partition label.
pub const PARTITION_LABEL: &str = "partition";
/// Staging dir type label.
pub const STAGING_TYPE: &str = "index_staging";
/// Recycle bin type label.
pub const RECYCLE_TYPE: &str = "recycle_bin";

lazy_static! {
/// Global write buffer size in bytes.
Expand Down Expand Up @@ -381,3 +388,67 @@ lazy_static! {
exponential_buckets(0.01, 10.0, 6).unwrap(),
).unwrap();
}

/// Stager notifier to collect metrics.
pub struct StagerMetrics {
cache_hit: IntCounter,
cache_miss: IntCounter,
staging_cache_bytes: IntGauge,
recycle_cache_bytes: IntGauge,
cache_eviction: IntCounter,
staging_miss_read: Histogram,
}

impl StagerMetrics {
/// Creates a new stager notifier.
pub fn new() -> Self {
Self {
cache_hit: CACHE_HIT.with_label_values(&[STAGING_TYPE]),
cache_miss: CACHE_MISS.with_label_values(&[STAGING_TYPE]),
staging_cache_bytes: CACHE_BYTES.with_label_values(&[STAGING_TYPE]),
recycle_cache_bytes: CACHE_BYTES.with_label_values(&[RECYCLE_TYPE]),
cache_eviction: CACHE_EVICTION.with_label_values(&[STAGING_TYPE, "size"]),
staging_miss_read: READ_STAGE_ELAPSED.with_label_values(&["staging_miss_read"]),
}
}
}

impl Default for StagerMetrics {
fn default() -> Self {
Self::new()
}
}

impl StagerNotifier for StagerMetrics {
fn on_cache_hit(&self, _size: u64) {
self.cache_hit.inc();
}

fn on_cache_miss(&self, _size: u64) {
self.cache_miss.inc();
}

fn on_cache_insert(&self, size: u64) {
self.staging_cache_bytes.add(size as i64);
}

fn on_load_dir(&self, duration: Duration) {
self.staging_miss_read.observe(duration.as_secs_f64());
}

fn on_load_blob(&self, duration: Duration) {
self.staging_miss_read.observe(duration.as_secs_f64());
}

fn on_cache_evict(&self, _size: u64) {
self.cache_eviction.inc();
}

fn on_recycle_insert(&self, size: u64) {
self.recycle_cache_bytes.add(size as i64);
}

fn on_recycle_clear(&self, size: u64) {
self.recycle_cache_bytes.sub(size as i64);
}
}
14 changes: 9 additions & 5 deletions src/mito2/src/sst/index/puffin_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ use snafu::ResultExt;

use crate::error::{PuffinInitStagerSnafu, Result};
use crate::metrics::{
INDEX_PUFFIN_FLUSH_OP_TOTAL, INDEX_PUFFIN_READ_BYTES_TOTAL, INDEX_PUFFIN_READ_OP_TOTAL,
INDEX_PUFFIN_WRITE_BYTES_TOTAL, INDEX_PUFFIN_WRITE_OP_TOTAL,
StagerMetrics, INDEX_PUFFIN_FLUSH_OP_TOTAL, INDEX_PUFFIN_READ_BYTES_TOTAL,
INDEX_PUFFIN_READ_OP_TOTAL, INDEX_PUFFIN_WRITE_BYTES_TOTAL, INDEX_PUFFIN_WRITE_OP_TOTAL,
};
use crate::sst::index::store::{self, InstrumentedStore};

Expand Down Expand Up @@ -63,9 +63,13 @@ impl PuffinManagerFactory {
write_buffer_size: Option<usize>,
) -> Result<Self> {
let staging_dir = aux_path.as_ref().join(STAGING_DIR);
let stager = BoundedStager::new(staging_dir, staging_capacity, None)
.await
.context(PuffinInitStagerSnafu)?;
let stager = BoundedStager::new(
staging_dir,
staging_capacity,
Some(Arc::new(StagerMetrics::default())),
)
.await
.context(PuffinInitStagerSnafu)?;
Ok(Self {
stager: Arc::new(stager),
write_buffer_size,
Expand Down

0 comments on commit 01492a9

Please sign in to comment.