Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(su): run formatter #888 #956

Merged
merged 1 commit into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
309 changes: 161 additions & 148 deletions servers/su/Cargo.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions servers/su/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jsonwebkey = "0.3.5"
hex = "0.4.3"
ring = "0.16.20"
tokio = "1.34.0"
env_logger = "0.10.1"
env_logger = "0.11.5"
log = "0.4.20"
rsa = "0.6.1"
dashmap = "5.5.3"
Expand All @@ -32,8 +32,8 @@ actix-cors = "0.6.0"
simd-json = "0.13.10"
futures = "0.3.30"
rocksdb = "0.22.0"
prometheus-client = "0.22.3"
sysinfo = "0.30.13"
actix-web-prom = { version = "0.8.0", features = ["process"] }
prometheus = { version = "0.13.4", features = ["process"] }

[[bin]]
name = "su"
Expand Down
4 changes: 2 additions & 2 deletions servers/su/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ Create a .env file with the following variables, or set them in the OS:
- `USE_DISK` whether or not to write and read rocksdb, this is a performance enhancement for the data storage layer
- `SU_DATA_DIR` if `USE_DISK` is `true`, this is where rocksdb will be initialized
- `MIGRATION_BATCH_SIZE` when running the migration binary how many to fetch at once from postgres
- `ENABLE_METRICS` enable prometheus metrics to be available on the `/metrics` endpoint
- `ENABLE_METRICS` enable application level prometheus metrics to be available on the `/metrics` endpoint

> You can also use a `.env` file to set environment variables when running in
> development mode, See the `.env.example` for an example `.env`
Expand Down Expand Up @@ -122,7 +122,7 @@ in the container.
- `USE_DISK` whether or not to write and read rocksdb, this is a performance enhancement for the data storage layer
- `SU_DATA_DIR` if `USE_DISK` is `true`, this is where rocksdb will be initialized
- `MIGRATION_BATCH_SIZE` when running the migration binary how many to fetch at once from postgres
- `ENABLE_METRICS` enable prometheus metrics to be available on the `/metrics` endpoint
- `ENABLE_METRICS` enable application level prometheus metrics to be available on the `/metrics` endpoint



Expand Down
271 changes: 36 additions & 235 deletions servers/su/src/domain/clients/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,285 +1,86 @@
use prometheus_client::encoding::text::encode;
use prometheus_client::encoding::{EncodeLabelSet, EncodeLabelValue};
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family;
use prometheus_client::metrics::histogram::Histogram;
use prometheus_client::metrics::gauge::Gauge;
use prometheus_client::registry::Registry;
use sysinfo::System;

use super::super::config::AoConfig;
use super::super::core::dal::CoreMetrics;

#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
struct HttpLabels {
method: Method,
path: String,
}

#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelValue)]
enum Method {
GET,
POST,
}
use prometheus::{HistogramOpts, HistogramVec, Registry};

/*
Implementation of metrics using prometheus-client
Implementation of metrics

see https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md
see https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md
for information on different models.
*/

pub struct PromMetrics {
enabled: bool,
registry: Registry,
http_requests: Family<HttpLabels, Counter>,
get_process_histogram: Histogram,
get_message_histogram: Histogram,
get_messages_histogram: Histogram,
serialize_json_histogram: Histogram,
read_message_data_histogram: Histogram,
write_item_histogram: Histogram,
write_assignment_histogram: Histogram,
acquire_write_lock_histogram: Histogram,
memory_usage_gauge: Gauge,
memory_available_gauge: Gauge,
cpu_usage_gauge: Gauge,
core_metrics: HistogramVec,
}

/*
Functions exposed by this impl are used outside
the business logic in clients and initialization.
*/
impl PromMetrics {
pub fn new(config: AoConfig) -> Self {
let mut registry = <Registry>::default();
let http_requests = Family::<HttpLabels, Counter>::default();
registry.register(
"ao_su_http_requests",
"Number of HTTP requests received",
http_requests.clone(),
);
let individual_object_retrieval = [
1.0, 5.0, 10.0, 25.0, 50.0, 100.0
];
let message_list_retrieval = [
1.0, 5.0, 10.0, 25.0, 50.0, 100.0, 250.0, 500.0, 1000.0, 2000.0, 3000.0, 4000.0, 5000.0,
6000.0, 7000.0, 8000.0, 9000.0, 10000.0
];
let serialization = [
1.0, 5.0, 10.0, 25.0, 50.0, 100.0, 250.0, 500.0, 1000.0, 2000.0, 3000.0
];
let writes = [
1.0, 5.0, 10.0, 25.0, 50.0, 100.0, 250.0, 500.0, 1000.0, 2500.0, 5000.0
];
let locks = [
1.0, 5.0, 10.0, 25.0, 50.0, 100.0, 200.0, 300.0, 400.0, 500.0
];
let get_process_histogram = Histogram::new(individual_object_retrieval.into_iter());
let get_message_histogram = Histogram::new(individual_object_retrieval.into_iter());
let get_messages_histogram = Histogram::new(message_list_retrieval.into_iter());
let serialize_json_histogram = Histogram::new(serialization.into_iter());
let read_message_data_histogram = Histogram::new(message_list_retrieval.into_iter());
let write_item_histogram = Histogram::new(writes.into_iter());
let write_assignment_histogram = Histogram::new(writes.into_iter());
let acquire_write_lock_histogram = Histogram::new(locks.into_iter());
registry.register(
"ao_su_get_process_histogram",
"Process retrieval runtime",
get_process_histogram.clone(),
);
registry.register(
"ao_su_get_message_histogram",
"Histogram of get_message_observe durations",
get_message_histogram.clone(),
);
registry.register(
"ao_su_get_messages_histogram",
"Histogram of get_messages_observe durations",
get_messages_histogram.clone(),
);
registry.register(
"ao_su_serialize_json_histogram",
"Histogram of serialize_json_observe durations",
serialize_json_histogram.clone(),
);
registry.register(
"ao_su_read_message_data_histogram",
"Histogram of read_message_data_observe durations",
read_message_data_histogram.clone(),
);
registry.register(
"ao_su_write_item_histogram",
"Histogram of write_item_observe durations",
write_item_histogram.clone(),
);
registry.register(
"ao_su_write_assignment_histogram",
"Histogram of write_assignment_observe durations",
write_assignment_histogram.clone(),
);
registry.register(
"ao_su_acquire_write_lock_histogram",
"Histogram of acquire_write_lock_histogram durations",
acquire_write_lock_histogram.clone(),
);

let memory_usage_gauge = Gauge::default();
let memory_available_gauge = Gauge::default();
let cpu_usage_gauge = Gauge::default();
pub fn new(config: AoConfig, registry: Registry) -> Self {
// Define the options for the histogram, with buckets in milliseconds
let histogram_opts = HistogramOpts::new(
"core_metrics_duration_milliseconds",
"Histogram of durations for core metrics functions in milliseconds",
)
.buckets(vec![
0.0, 1.0, 5.0, 10.0, 25.0, 50.0, 100.0, 250.0, 500.0, 1000.0, 2500.0, 5000.0, 5500.0,
6000.0, 6500.0, 7000.0, 7500.0, 8000.0, 8500.0, 9000.0, 9500.0, 10000.0,
])
.namespace("su");

// Create a HistogramVec with labels for the different core metric functions
let core_metrics = HistogramVec::new(histogram_opts, &["function_name"]).unwrap();

// Register the HistogramVec with the provided registry
registry.register(Box::new(core_metrics.clone())).unwrap();

registry.register(
"ao_su_memory_usage_bytes",
"Current memory usage in bytes",
memory_usage_gauge.clone(),
);
registry.register(
"ao_su_memory_available_bytes",
"Current memory available in bytes",
memory_available_gauge.clone(),
);
registry.register(
"ao_su_cpu_usage_percentage",
"Current CPU usage as a percentage",
cpu_usage_gauge.clone(),
);

PromMetrics {
enabled: config.enable_metrics,
registry,
http_requests,
get_process_histogram,
get_message_histogram,
get_messages_histogram,
serialize_json_histogram,
read_message_data_histogram,
write_item_histogram,
write_assignment_histogram,
acquire_write_lock_histogram,
memory_usage_gauge,
memory_available_gauge,
cpu_usage_gauge
core_metrics,
}
}

pub fn get_request(&self, route: String) {
if !self.enabled {
return;
};
self.http_requests
.get_or_create(&HttpLabels {
method: Method::GET,
path: route,
})
.inc();
}

pub fn post_request(&self) {
fn observe_duration(&self, function_name: &str, duration: u128) {
if !self.enabled {
return;
};
self.http_requests
.get_or_create(&HttpLabels {
method: Method::POST,
path: "/".to_string(),
})
.inc();
}

pub fn emit_metrics(&self) -> Result<String, String> {
if !self.enabled {
return Err("Metrics not enabled".to_string());
};

/*
Refresh twice as directed by the documentation

https://docs.rs/sysinfo/latest/sysinfo/struct.Cpu.html
*/
let mut s = System::new();
s.refresh_memory();
s.refresh_cpu();
s.refresh_memory();
s.refresh_cpu();

let used_memory = s.used_memory();
self.memory_usage_gauge.set(used_memory as i64);

let available_memory = s.available_memory();
self.memory_available_gauge.set(available_memory as i64);

let mut total_cpu_usage = 0.0;
let cpu_count = s.cpus().len() as f32;
for cpu in s.cpus() {
total_cpu_usage += cpu.cpu_usage();
}
let average_cpu_usage = (total_cpu_usage / cpu_count).round() as i64;
self.cpu_usage_gauge.set(average_cpu_usage);

let mut buffer = String::new();
match encode(&mut buffer, &self.registry) {
Ok(_) => Ok(buffer),
Err(e) => Err(format!("{:?}", e)),
}
// Observe the duration in milliseconds directly
self.core_metrics
.with_label_values(&[function_name])
.observe(duration as f64);
}
}

/*
These methods are used by the business logic
*/
impl CoreMetrics for PromMetrics {
fn get_process_observe(&self, duration: u128) {
if !self.enabled {
return;
}
self.get_process_histogram.observe(duration as f64);
self.observe_duration("get_process", duration);
}

fn get_message_observe(&self, duration: u128) {
if !self.enabled {
return;
}
self.get_message_histogram.observe(duration as f64);
self.observe_duration("get_message", duration);
}

fn get_messages_observe(&self, duration: u128) {
if !self.enabled {
return;
}
self.get_messages_histogram.observe(duration as f64);
self.observe_duration("get_messages", duration);
}

fn serialize_json_observe(&self, duration: u128) {
if !self.enabled {
return;
}
self.serialize_json_histogram.observe(duration as f64);
self.observe_duration("serialize_json", duration);
}

fn read_message_data_observe(&self, duration: u128) {
if !self.enabled {
return;
}
self.read_message_data_histogram.observe(duration as f64);
self.observe_duration("read_message_data", duration);
}

fn write_item_observe(&self, duration: u128) {
if !self.enabled {
return;
}
self.write_item_histogram.observe(duration as f64);
self.observe_duration("write_item", duration);
}

fn write_assignment_observe(&self, duration: u128) {
if !self.enabled {
return;
}
self.write_assignment_histogram.observe(duration as f64);
self.observe_duration("write_assignment", duration);
}

fn acquire_write_lock_observe(&self, duration: u128) {
if !self.enabled {
return;
}
self.acquire_write_lock_histogram.observe(duration as f64);
self.observe_duration("acquire_write_lock", duration);
}
}
8 changes: 4 additions & 4 deletions servers/su/src/domain/clients/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -865,7 +865,7 @@ impl DataStore for StoreClient {
.set((
process_count.eq(scheduler.process_count),
url.eq(&scheduler.url),
no_route.eq(&scheduler.no_route)
no_route.eq(&scheduler.no_route),
))
.execute(conn)
{
Expand Down Expand Up @@ -932,7 +932,7 @@ impl DataStore for StoreClient {
row_id: Some(db_scheduler.row_id),
url: db_scheduler.url,
process_count: db_scheduler.process_count,
no_route: db_scheduler.no_route
no_route: db_scheduler.no_route,
})
.collect();
Ok(schedulers_out)
Expand Down Expand Up @@ -1011,15 +1011,15 @@ pub struct DbScheduler {
pub row_id: i32,
pub url: String,
pub process_count: i32,
pub no_route: Option<bool>,
pub no_route: Option<bool>,
}

#[derive(Insertable)]
#[diesel(table_name = super::schema::schedulers)]
pub struct NewScheduler<'a> {
pub url: &'a str,
pub process_count: &'a i32,
pub no_route: Option<&'a bool>,
pub no_route: Option<&'a bool>,
}

#[derive(Queryable, Selectable)]
Expand Down
Loading
Loading