Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(su): reimplement app level metrics #1088

Merged
merged 1 commit into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions servers/su/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions servers/su/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ actix-cors = "0.6.0"
simd-json = "0.13.10"
futures = "0.3.30"
rocksdb = "0.22.0"
prometheus = { version = "0.13.4", features = ["process"] }
lru = "0.12.4"
lazy_static = "1.5.0"
avro-rs = "0.13.0"
Expand Down
81 changes: 50 additions & 31 deletions servers/su/src/domain/clients/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::super::config::AoConfig;
use super::super::core::dal::CoreMetrics;
// use prometheus::{HistogramOpts, HistogramVec, IntCounter, Registry};
use prometheus::{HistogramOpts, HistogramVec, IntCounter, Registry, TextEncoder};

/*
Implementation of metrics
Expand All @@ -11,55 +11,74 @@ use super::super::core::dal::CoreMetrics;

pub struct PromMetrics {
enabled: bool,
// core_metrics: HistogramVec,
// message_save_failures: IntCounter,
core_metrics: HistogramVec,
message_save_failures: IntCounter,
registry: Registry
}

impl PromMetrics {
pub fn new(config: AoConfig) -> Self {
// let registry = Registry::new();
let registry = Registry::new();

// // Define the options for the histogram, with buckets in milliseconds
// let histogram_opts = HistogramOpts::new(
// "core_metrics_duration_milliseconds",
// "Histogram of durations for core metrics functions in milliseconds",
// )
// .buckets(vec![
// 0.0, 1.0, 5.0, 10.0, 25.0, 50.0, 100.0, 250.0, 500.0, 1000.0, 2500.0, 5000.0, 5500.0,
// 6000.0, 6500.0, 7000.0, 7500.0, 8000.0, 8500.0, 9000.0, 9500.0, 10000.0,
// ])
// .namespace("su");
// Define the options for the histogram, with buckets in milliseconds
let histogram_opts = HistogramOpts::new(
"core_metrics_duration_milliseconds",
"Histogram of durations for core metrics functions in milliseconds",
)
.buckets(vec![
0.0, 1.0, 5.0, 10.0, 25.0, 50.0, 100.0, 250.0, 500.0, 1000.0, 2500.0, 5000.0, 5500.0,
6000.0, 6500.0, 7000.0, 7500.0, 8000.0, 8500.0, 9000.0, 9500.0, 10000.0,
])
.namespace("su");

// // Create a HistogramVec with labels for the different core metric functions
// let core_metrics = HistogramVec::new(histogram_opts, &["function_name"]).unwrap();
// Create a HistogramVec with labels for the different core metric functions
let core_metrics = HistogramVec::new(histogram_opts, &["function_name"]).unwrap();

// // Register the HistogramVec with the provided registry
// registry.register(Box::new(core_metrics.clone())).unwrap();
// Register the HistogramVec with the provided registry
registry.register(Box::new(core_metrics.clone())).unwrap();

// let message_save_failures: IntCounter =
// IntCounter::new("message_save_failures", "message save failure count").unwrap();
let message_save_failures: IntCounter =
IntCounter::new("message_save_failures", "message save failure count").unwrap();

// // Register the IntCounter with the provided registry
// registry
// .register(Box::new(message_save_failures.clone()))
// .unwrap();
// Register the IntCounter with the provided registry
registry
.register(Box::new(message_save_failures.clone()))
.unwrap();

PromMetrics {
enabled: config.enable_metrics,
// core_metrics,
// message_save_failures,
core_metrics,
message_save_failures,
registry,
}
}

fn observe_duration(&self, _function_name: &str, _duration: u128) {
fn observe_duration(&self, function_name: &str, duration: u128) {
if !self.enabled {
return;
}

// Observe the duration in milliseconds directly
// self.core_metrics
// .with_label_values(&[function_name])
// .observe(duration as f64);
self.core_metrics
.with_label_values(&[function_name])
.observe(duration as f64);
}

pub fn emit_metrics(&self) -> Result<String, String> {
if !self.enabled {
return Err("Metrics not enabled".to_string());
}

let encoder = TextEncoder::new();

let metric_families = self.registry.gather();

let mut buffer = String::new();
if let Err(err) = encoder.encode_utf8(&metric_families, &mut buffer) {
return Err(format!("Failed to encode metrics: {}", err));
}

Ok(buffer)
}
}

Expand Down Expand Up @@ -93,6 +112,6 @@ impl CoreMetrics for PromMetrics {
}

fn failed_message_save(&self) {
// self.message_save_failures.inc();
self.message_save_failures.inc();
}
}
7 changes: 4 additions & 3 deletions servers/su/src/domain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub use flows::Deps;
pub use local_store::migration::migrate_to_local;
pub use store::migrate_to_disk;

pub async fn init_deps(mode: Option<String>) -> Arc<Deps> {
pub async fn init_deps(mode: Option<String>) -> (Arc<Deps>, Arc<PromMetrics>) {
let logger: Arc<dyn Log> = SuLog::init();

let config = Arc::new(AoConfig::new(mode.clone()).expect("Failed to read configuration"));
Expand Down Expand Up @@ -100,8 +100,9 @@ pub async fn init_deps(mode: Option<String>) -> Arc<Deps> {
let metrics = Arc::new(PromMetrics::new(
AoConfig::new(mode).expect("Failed to read configuration"),
));
let metrics_clone = metrics.clone();

Arc::new(Deps {
(Arc::new(Deps {
data_store: main_data_store,
router_data_store,
logger,
Expand All @@ -112,5 +113,5 @@ pub async fn init_deps(mode: Option<String>) -> Arc<Deps> {
wallet,
uploader,
metrics,
})
}), metrics_clone)
}
20 changes: 17 additions & 3 deletions servers/su/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use actix_web::{
use serde::Deserialize;
use serde_json::json;

use su::domain::{flows, init_deps, router, Deps};
use su::domain::{flows, init_deps, router, Deps, PromMetrics};

#[derive(Deserialize)]
struct FromTo {
Expand Down Expand Up @@ -213,8 +213,21 @@ async fn health_check() -> impl Responder {
HttpResponse::Ok()
}

async fn metrics_route(data: web::Data<AppState>) -> impl Responder {
let result = data.metrics.emit_metrics();
match result {
Ok(metrics_str) => HttpResponse::Ok()
.content_type("application/openmetrics-text; version=1.0.0; charset=utf-8")
.body(metrics_str),
Err(err) => HttpResponse::BadRequest()
.content_type("text/plain")
.body(err),
}
}

struct AppState {
deps: Arc<Deps>,
metrics: Arc<PromMetrics>,
}

#[actix_web::main]
Expand All @@ -239,8 +252,8 @@ async fn main() -> io::Result<()> {
}
};

let deps = init_deps(mode).await;
let app_state = web::Data::new(AppState { deps });
let (deps, metrics) = init_deps(mode).await;
let app_state = web::Data::new(AppState { deps, metrics });

let run_deps = app_state.deps.clone();

Expand All @@ -266,6 +279,7 @@ async fn main() -> io::Result<()> {
.route("/", web::post().to(main_post_route))
.route("/timestamp", web::get().to(timestamp_route))
.route("/health", web::get().to(health_check))
.route("/metrics", web::get().to(metrics_route))
.route("/{tx_id}", web::get().to(main_get_route))
.route("/processes/{process_id}", web::get().to(read_process_route))
})
Expand Down
Binary file modified servers/su/su
Binary file not shown.