diff --git a/servers/su/.env.example b/servers/su/.env.example index 1c9df8a8a..9a2d011bf 100644 --- a/servers/su/.env.example +++ b/servers/su/.env.example @@ -9,4 +9,5 @@ SU_DATA_DIR=/home/rocksdb USE_DISK=true MIGRATION_BATCH_SIZE=10 DB_WRITE_CONNECTIONS=1 -DB_READ_CONNECTIONS=40 \ No newline at end of file +DB_READ_CONNECTIONS=40 +ENABLE_METRICS=true \ No newline at end of file diff --git a/servers/su/Cargo.lock b/servers/su/Cargo.lock index c4df3282b..4edfcb259 100644 --- a/servers/su/Cargo.lock +++ b/servers/su/Cargo.lock @@ -802,14 +802,30 @@ dependencies = [ ] [[package]] -name = "crossbeam-utils" -version = "0.8.16" +name = "crossbeam-deque" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a22b2d63d4d1dc0b7f1b6b2747dd0088008a9be28b6ddf0b1e7d335e3037294" +checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d" dependencies = [ - "cfg-if", + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", ] +[[package]] +name = "crossbeam-utils" +version = "0.8.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" + [[package]] name = "crunchy" version = "0.2.2" @@ -1036,6 +1052,12 @@ version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f" +[[package]] +name = "dtoa" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dcbb2bf8e87535c23f7a8a321e364ce21462d0ff10cb6407820e8e96dfff6653" + [[package]] name = "ed25519" version = "1.5.3" @@ -1580,7 +1602,7 @@ dependencies = [ "iana-time-zone-haiku", "js-sys", "wasm-bindgen", - "windows", + "windows 0.48.0", ] [[package]] @@ -1859,9 +1881,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.150" +version = "0.2.155" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89d92a4743f9a61002fae18374ed11e7973f530cb3a3255fb354818118b2203c" +checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" [[package]] name = "libflate" @@ -2105,6 +2127,15 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "ntapi" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8a3895c6391c39d7fe7ebc444a87eb2991b2a0bc718fdabd071eec617fc68e4" +dependencies = [ + "winapi", +] + [[package]] name = "num" version = "0.4.1" @@ -2565,6 +2596,29 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prometheus-client" +version = "0.22.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "504ee9ff529add891127c4827eb481bd69dc0ebc72e9a682e187db4caa60c3ca" +dependencies = [ + "dtoa", + "itoa", + "parking_lot", + "prometheus-client-derive-encode", +] + +[[package]] +name = "prometheus-client-derive-encode" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "440f724eba9f6996b75d63681b0a92b06947f1457076d503a4d2e2c8f56442b8" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.38", +] + [[package]] name = "quote" version = "1.0.33" @@ -2662,6 +2716,26 @@ dependencies = [ "rand_core 0.5.1", ] +[[package]] +name = "rayon" +version = "1.10.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", +] + [[package]] name = "redox_syscall" version = "0.3.5" @@ -3287,6 +3361,7 @@ dependencies = [ "hex", "jsonwebkey", "log", + "prometheus-client", "reqwest", "ring", "rocksdb", @@ -3296,6 +3371,7 @@ dependencies = [ "serde_json", "sha2 0.10.8", "simd-json", + "sysinfo", "tokio", ] @@ -3339,6 +3415,21 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "sysinfo" +version = "0.30.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a5b4ddaee55fb2bea2bf0e5000747e5f5c0de765e5a5ff87f4cd106439f4bb3" +dependencies = [ + "cfg-if", + "core-foundation-sys", + "libc", + "ntapi", + "once_cell", + "rayon", + "windows 0.52.0", +] + [[package]] name = "system-configuration" version = "0.5.1" @@ -3958,6 +4049,25 @@ dependencies = [ "windows-targets 0.48.5", ] +[[package]] +name = "windows" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be" +dependencies = [ + "windows-core", + "windows-targets 0.52.5", +] + +[[package]] +name = "windows-core" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" +dependencies = [ + "windows-targets 0.52.5", +] + [[package]] name = "windows-sys" version = "0.48.0" diff --git a/servers/su/Cargo.toml b/servers/su/Cargo.toml index 097b2c177..224246b14 100644 --- a/servers/su/Cargo.toml +++ b/servers/su/Cargo.toml @@ -32,6 +32,8 @@ actix-cors = "0.6.0" simd-json = "0.13.10" futures = "0.3.30" rocksdb = "0.22.0" +prometheus-client = "0.22.3" +sysinfo = "0.30.13" [[bin]] name = "su" diff --git a/servers/su/README.md b/servers/su/README.md index 2a8d93022..d20a113d0 100644 --- a/servers/su/README.md +++ b/servers/su/README.md @@ -47,6 +47,7 @@ Create a .env file with the following variables, or set them in the OS: - `USE_DISK` whether or not to write and read rocksdb, this is a performance enhancement for the data storage layer - `SU_DATA_DIR` if `USE_DISK` is `true`, this is where rocksdb will be initialized - `MIGRATION_BATCH_SIZE` when running the migration binary how many to fetch at once from postgres +- `ENABLE_METRICS` enable prometheus metrics to be available on the `/metrics` endpoint > You can also use a `.env` file to set environment variables when running in > development mode, See the `.env.example` for an example `.env` @@ -121,6 +122,7 @@ in the container. 
 - `USE_DISK` whether or not to write and read rocksdb, this is a performance enhancement for the data storage layer
 - `SU_DATA_DIR` if `USE_DISK` is `true`, this is where rocksdb will be initialized
 - `MIGRATION_BATCH_SIZE` when running the migration binary how many to fetch at once from postgres
+- `ENABLE_METRICS` enable prometheus metrics to be available on the `/metrics` endpoint
diff --git a/servers/su/src/domain/clients/metrics.rs b/servers/su/src/domain/clients/metrics.rs
new file mode 100644
index 000000000..5808750ea
--- /dev/null
+++ b/servers/su/src/domain/clients/metrics.rs
@@ -0,0 +1,217 @@
+use prometheus_client::encoding::text::encode;
+use prometheus_client::encoding::{EncodeLabelSet, EncodeLabelValue};
+use prometheus_client::metrics::counter::Counter;
+use prometheus_client::metrics::family::Family;
+use prometheus_client::metrics::histogram::Histogram;
+use prometheus_client::registry::Registry;
+
+use super::super::config::AoConfig;
+use super::super::core::dal::CoreMetrics;
+
+#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
+struct HttpLabels {
+    method: Method,
+    path: String,
+}
+
+#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelValue)]
+enum Method {
+    GET,
+    POST,
+}
+
+pub struct PromMetrics {
+    enabled: bool,
+    registry: Registry,
+    http_requests: Family<HttpLabels, Counter>,
+    get_process_histogram: Histogram,
+    get_message_histogram: Histogram,
+    get_messages_histogram: Histogram,
+    serialize_json_histogram: Histogram,
+    read_message_data_histogram: Histogram,
+    write_item_histogram: Histogram,
+    write_assignment_histogram: Histogram,
+    acquire_write_lock_histogram: Histogram,
+}
+
+impl PromMetrics {
+    pub fn new(config: AoConfig) -> Self {
+        let mut registry = <Registry>::default();
+        let http_requests = Family::<HttpLabels, Counter>::default();
+        registry.register(
+            "ao_su_http_requests",
+            "Number of HTTP requests received",
+            http_requests.clone(),
+        );
+        let individual_object_retrieval = [
+            1.0, 5.0, 10.0, 25.0, 50.0, 100.0
+        ];
+        let message_list_retrieval = [
+            1.0, 5.0, 10.0, 25.0, 50.0, 100.0, 250.0, 500.0, 1000.0, 2000.0, 3000.0, 4000.0, 5000.0,
+            6000.0, 7000.0, 8000.0, 9000.0, 10000.0
+        ];
+        let serialization = [
+            1.0, 5.0, 10.0, 25.0, 50.0, 100.0, 250.0, 500.0, 1000.0, 2000.0, 3000.0
+        ];
+        let writes = [
+            1.0, 5.0, 10.0, 25.0, 50.0, 100.0, 250.0, 500.0, 1000.0, 2500.0, 5000.0
+        ];
+        let locks = [
+            1.0, 5.0, 10.0, 25.0, 50.0, 100.0, 200.0, 300.0, 400.0, 500.0
+        ];
+        let get_process_histogram = Histogram::new(individual_object_retrieval.into_iter());
+        let get_message_histogram = Histogram::new(individual_object_retrieval.into_iter());
+        let get_messages_histogram = Histogram::new(message_list_retrieval.into_iter());
+        let serialize_json_histogram = Histogram::new(serialization.into_iter());
+        let read_message_data_histogram = Histogram::new(message_list_retrieval.into_iter());
+        let write_item_histogram = Histogram::new(writes.into_iter());
+        let write_assignment_histogram = Histogram::new(writes.into_iter());
+        let acquire_write_lock_histogram = Histogram::new(locks.into_iter());
+        registry.register(
+            "ao_su_get_process_histogram",
+            "Process retrieval runtime",
+            get_process_histogram.clone(),
+        );
+        registry.register(
+            "ao_su_get_message_histogram",
+            "Histogram of get_message_observe durations",
+            get_message_histogram.clone(),
+        );
+        registry.register(
+            "ao_su_get_messages_histogram",
+            "Histogram of get_messages_observe durations",
+            get_messages_histogram.clone(),
+        );
+        registry.register(
+            "ao_su_serialize_json_histogram",
+            "Histogram of serialize_json_observe durations",
+            serialize_json_histogram.clone(),
+        );
+        registry.register(
+            "ao_su_read_message_data_histogram",
+            "Histogram of read_message_data_observe durations",
+            read_message_data_histogram.clone(),
+        );
+        registry.register(
+            "ao_su_write_item_histogram",
+            "Histogram of write_item_observe durations",
+            write_item_histogram.clone(),
+        );
+        registry.register(
+            "ao_su_write_assignment_histogram",
+            "Histogram of write_assignment_observe durations",
+            write_assignment_histogram.clone(),
+        );
+        registry.register(
+            "ao_su_acquire_write_lock_histogram",
+            "Histogram of acquire_write_lock_histogram durations",
+            acquire_write_lock_histogram.clone(),
+        );
+        PromMetrics {
+            enabled: config.enable_metrics,
+            registry,
+            http_requests,
+            get_process_histogram,
+            get_message_histogram,
+            get_messages_histogram,
+            serialize_json_histogram,
+            read_message_data_histogram,
+            write_item_histogram,
+            write_assignment_histogram,
+            acquire_write_lock_histogram,
+        }
+    }
+
+    pub fn get_request(&self, route: String) {
+        if !self.enabled {
+            return;
+        };
+        self.http_requests
+            .get_or_create(&HttpLabels {
+                method: Method::GET,
+                path: route,
+            })
+            .inc();
+    }
+
+    pub fn post_request(&self) {
+        if !self.enabled {
+            return;
+        };
+        self.http_requests
+            .get_or_create(&HttpLabels {
+                method: Method::POST,
+                path: "/".to_string(),
+            })
+            .inc();
+    }
+
+    pub fn emit_metrics(&self) -> Result<String, String> {
+        if !self.enabled {
+            return Err("Metrics not enabled".to_string());
+        };
+        let mut buffer = String::new();
+        match encode(&mut buffer, &self.registry) {
+            Ok(_) => Ok(buffer),
+            Err(e) => Err(format!("{:?}", e)),
+        }
+    }
+}
+
+impl CoreMetrics for PromMetrics {
+    fn get_process_observe(&self, duration: u128) {
+        if !self.enabled {
+            return;
+        }
+        self.get_process_histogram.observe(duration as f64);
+    }
+
+    fn get_message_observe(&self, duration: u128) {
+        if !self.enabled {
+            return;
+        }
+        self.get_message_histogram.observe(duration as f64);
+    }
+
+    fn get_messages_observe(&self, duration: u128) {
+        if !self.enabled {
+            return;
+        }
+        self.get_messages_histogram.observe(duration as f64);
+    }
+
+    fn serialize_json_observe(&self, duration: u128) {
+        if !self.enabled {
+            return;
+        }
+        self.serialize_json_histogram.observe(duration as f64);
+    }
+
+    fn read_message_data_observe(&self, duration: u128) {
+        if !self.enabled {
+            return;
+        }
+        self.read_message_data_histogram.observe(duration as f64);
+    }
+
+    fn write_item_observe(&self, duration: u128) {
+        if !self.enabled {
+            return;
+        }
+        self.write_item_histogram.observe(duration as f64);
+    }
+
+    fn write_assignment_observe(&self, duration: u128) {
+        if !self.enabled {
+            return;
+        }
+        self.write_assignment_histogram.observe(duration as f64);
+    }
+
+    fn acquire_write_lock_observe(&self, duration: u128) {
+        if !self.enabled {
+            return;
+        }
+        self.acquire_write_lock_histogram.observe(duration as f64);
+    }
+}
diff --git a/servers/su/src/domain/clients/mod.rs b/servers/su/src/domain/clients/mod.rs
index 9e82f1ce9..92e77921f 100644
--- a/servers/su/src/domain/clients/mod.rs
+++ b/servers/su/src/domain/clients/mod.rs
@@ -22,3 +22,6 @@ used to sign transactions, required here because the arweave sdk reads a wallet
 from the file system
 */
 pub mod signer;
+
+// metrics client
+pub mod metrics;
diff --git a/servers/su/src/domain/clients/store.rs b/servers/su/src/domain/clients/store.rs
index 08b3eda21..9622bafee 100644
--- a/servers/su/src/domain/clients/store.rs
+++ b/servers/su/src/domain/clients/store.rs
@@ -3,22 +3,22 @@ use std::sync::atomic::{AtomicUsize, Ordering};
 use
std::sync::Arc; use std::{env, io}; -use dotenv::dotenv; -use futures::future::join_all; -use tokio::task::JoinHandle; -use tokio::time::interval; +use async_trait::async_trait; use diesel::pg::PgConnection; use diesel::prelude::*; use diesel::r2d2::ConnectionManager; use diesel::r2d2::Pool; use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness}; -use async_trait::async_trait; +use dotenv::dotenv; +use futures::future::join_all; +use tokio::task::JoinHandle; +use tokio::time::interval; use super::super::SuLog; use super::super::core::dal::{ - DataStore, JsonErrorType, Message, PaginatedMessages, Process, ProcessScheduler, Scheduler, - StoreErrorType, Log + DataStore, JsonErrorType, Log, Message, PaginatedMessages, Process, ProcessScheduler, + Scheduler, StoreErrorType, }; use crate::domain::config::AoConfig; @@ -92,10 +92,10 @@ pub struct StoreClient { It currently uses postgresql, and if the environmnent variable USE_DISK is set to true, it will initialize (if not already initialized) a rocksdb instance in - SU_DATA_DIR to store the message binary data for - performance. + SU_DATA_DIR to store the message binary data for + performance. - USE_DISK should be set after the migration function + USE_DISK should be set after the migration function migrate_to_disk is run, which is built into its own binary in the build process. Things will not speed up unless the data is already migrated. @@ -138,8 +138,8 @@ impl StoreClient { /* Get a connection to the writer database using - the connection pool initialized in r2d2. This - should be used in functions that write data + the connection pool initialized in r2d2. This + should be used in functions that write data or critically require the most up to date data. */ pub fn get_conn( @@ -152,7 +152,7 @@ impl StoreClient { } /* - Get a connection to the reader instance. If + Get a connection to the reader instance. If no DATABASE_READ_URL is set, this will default to the DATABASE_URL. This should be used in functions that only read data. @@ -168,7 +168,7 @@ impl StoreClient { /* Run at server startup to modify the database as needed. - Migrations are embedded directly into the binary that + Migrations are embedded directly into the binary that get built. */ pub fn run_migrations(&self) -> Result { @@ -186,7 +186,7 @@ impl StoreClient { Method to get the total number of messages in the database, this is important for the migration and sync functions. - */ + */ pub fn get_message_count(&self) -> Result { use super::schema::messages::dsl::*; let conn = &mut self.get_read_conn()?; @@ -359,16 +359,15 @@ impl StoreClient { sync the bytestore if USE_DISK is true. */ pub fn sync_bytestore(&self) -> Result<(), ()> { - /* - if self.bytestore.clone().try_connect() is never + if self.bytestore.clone().try_connect() is never called, the is_ready method on the byte store will never return true, and the rest of the StoreClient - will not read or write bytestore. + will not read or write bytestore. We call it here because this runs the in background. So the server can operate normally without bytestore - until bytestore can be initialized. This is in case + until bytestore can be initialized. This is in case another program is still using the same embedded db. 
*/ loop { @@ -377,12 +376,14 @@ impl StoreClient { break; } Err(_) => { - self.logger.log("Bytestore not ready, waiting...".to_string()); + self.logger + .log("Bytestore not ready, waiting...".to_string()); std::thread::sleep(std::time::Duration::from_secs(5)); } } } - self.logger.log("Syncing the tail of the messages table".to_string()); + self.logger + .log("Syncing the tail of the messages table".to_string()); use std::time::Instant; let start = Instant::now(); @@ -414,8 +415,10 @@ impl StoreClient { ) { // Stop the migration if message is already in byte store let duration = start.elapsed(); - self.logger.log(format!("Time elapsed in sync is: {:?}", duration)); - self.logger.log(format!("Number of messages synced: {}", synced_count)); + self.logger + .log(format!("Time elapsed in sync is: {:?}", duration)); + self.logger + .log(format!("Number of messages synced: {}", synced_count)); return Ok(()); } @@ -437,21 +440,24 @@ impl StoreClient { break; } Err(e) => { - self.logger.error(format!("Error fetching messages: {:?}", e)); + self.logger + .error(format!("Error fetching messages: {:?}", e)); } } } let duration = start.elapsed(); - self.logger.log(format!("Time elapsed in sync is: {:?}", duration)); - self.logger.log(format!("Number of messages synced: {}", synced_count)); + self.logger + .log(format!("Time elapsed in sync is: {:?}", duration)); + self.logger + .log(format!("Number of messages synced: {}", synced_count)); Ok(()) } } /* - The DataStore trait is what the business logic uses + The DataStore trait is what the business logic uses to interact with the data storage layer. The implementations can change here but the function definitions cannot unless the business logic needs them to. @@ -567,14 +573,13 @@ impl DataStore for StoreClient { } else { let bytestore = self.bytestore.clone(); if bytestore.is_ready() { - bytestore - .save_binary( - message.message_id()?, - Some(message.assignment_id()?), - message.process_id()?, - message.timestamp()?.to_string(), - bundle_in.to_vec(), - )?; + bytestore.save_binary( + message.message_id()?, + Some(message.assignment_id()?), + message.process_id()?, + message.timestamp()?.to_string(), + bundle_in.to_vec(), + )?; } Ok("saved".to_string()) } @@ -651,11 +656,7 @@ impl DataStore for StoreClient { }) .collect(); - let binaries = self - .bytestore - .clone() - .read_binaries(message_ids) - .await?; + let binaries = self.bytestore.clone().read_binaries(message_ids).await?; let mut messages_mapped: Vec = vec![]; @@ -1038,134 +1039,137 @@ pub struct NewProcessScheduler<'a> { See https://rocksdb.org/blog/2021/05/26/integrated-blob-db.html */ mod bytestore { - use super::super::super::config::AoConfig; - use dashmap::DashMap; - use rocksdb::{Options, DB}; - use std::sync::Arc; - use std::sync::RwLock; - - pub struct ByteStore { - db: RwLock>, - config: AoConfig, - } - - impl ByteStore { - pub fn new(config: AoConfig) -> Self { - ByteStore { db: RwLock::new(None), config } - } - - pub fn try_connect(&self) -> Result<(), String> { - let mut opts = Options::default(); - opts.create_if_missing(true); - opts.set_enable_blob_files(true); // Enable blob files - opts.set_blob_file_size(5 * 1024 * 1024 * 1024); // 5GB max for now - opts.set_min_blob_size(1024); // low value ensures it is used - - let new_db = DB::open(&opts, &self.config.su_data_dir).map_err(|e| format!("Failed to open RocksDB: {:?}", e))?; - - let mut db_write = self.db.write().unwrap(); - *db_write = Some(new_db); - - Ok(()) - } - - pub fn is_ready(&self) -> bool { - match 
self.db.read() { - Ok(r) => r.is_some(), - Err(_) => false, + use super::super::super::config::AoConfig; + use dashmap::DashMap; + use rocksdb::{Options, DB}; + use std::sync::Arc; + use std::sync::RwLock; + + pub struct ByteStore { + db: RwLock>, + config: AoConfig, + } + + impl ByteStore { + pub fn new(config: AoConfig) -> Self { + ByteStore { + db: RwLock::new(None), + config, + } + } + + pub fn try_connect(&self) -> Result<(), String> { + let mut opts = Options::default(); + opts.create_if_missing(true); + opts.set_enable_blob_files(true); // Enable blob files + opts.set_blob_file_size(5 * 1024 * 1024 * 1024); // 5GB max for now + opts.set_min_blob_size(1024); // low value ensures it is used + + let new_db = DB::open(&opts, &self.config.su_data_dir) + .map_err(|e| format!("Failed to open RocksDB: {:?}", e))?; + + let mut db_write = self.db.write().unwrap(); + *db_write = Some(new_db); + + Ok(()) + } + + pub fn is_ready(&self) -> bool { + match self.db.read() { + Ok(r) => r.is_some(), + Err(_) => false, + } + } + + pub async fn read_binaries( + &self, + ids: Vec<(String, Option, String, String)>, + ) -> Result, String, String), Vec>, String> { + let binaries = Arc::new(DashMap::new()); + let db = match self.db.read() { + Ok(r) => r, + Err(_) => return Err("Failed to acquire read lock".into()), + }; + + if let Some(ref db) = *db { + for id in ids { + let binaries = binaries.clone(); + + let key = ByteStore::create_key(&id.0, &id.1, &id.2, &id.3); + if let Ok(Some(value)) = db.get(&key) { + binaries.insert(id.clone(), value); + } + } + Ok(Arc::try_unwrap(binaries).map_err(|_| "Failed to unwrap Arc")?) + } else { + Err("Database is not initialized".into()) + } } - } - - pub async fn read_binaries( - &self, - ids: Vec<(String, Option, String, String)>, - ) -> Result, String, String), Vec>, String> { - let binaries = Arc::new(DashMap::new()); - let db = match self.db.read() { - Ok(r) => r, - Err(_) => return Err("Failed to acquire read lock".into()), - }; - - if let Some(ref db) = *db { - for id in ids { - let binaries = binaries.clone(); - - let key = ByteStore::create_key(&id.0, &id.1, &id.2, &id.3); - if let Ok(Some(value)) = db.get(&key) { - binaries.insert(id.clone(), value); - } - } - Ok(Arc::try_unwrap(binaries).map_err(|_| "Failed to unwrap Arc")?) 
- } else { - Err("Database is not initialized".into()) - } - } - - pub fn save_binary( - &self, - message_id: String, - assignment_id: Option, - process_id: String, - timestamp: String, - binary: Vec, - ) -> Result<(), String> { - let key = ByteStore::create_key(&message_id, &assignment_id, &process_id, ×tamp); - let db = match self.db.read() { - Ok(r) => r, - Err(_) => return Err("Failed to acquire read lock".into()), - }; - - if let Some(ref db) = *db { - db.put(key, binary) - .map_err(|e| format!("Failed to write to RocksDB: {:?}", e))?; - Ok(()) - } else { - Err("Database is not initialized".into()) - } - } - - fn create_key( - message_id: &str, - assignment_id: &Option, - process_id: &str, - timestamp: &str, - ) -> Vec { - match assignment_id { - Some(assignment_id) => format!( - "message___{}___{}___{}___{}", - process_id, timestamp, message_id, assignment_id - ) - .into_bytes(), - None => format!("message___{}___{}___{}", process_id, timestamp, message_id) - .into_bytes(), - } - } - - pub fn exists( - &self, - message_id: &str, - assignment_id: &Option, - process_id: &str, - timestamp: &str, - ) -> bool { - let key = ByteStore::create_key(message_id, assignment_id, process_id, timestamp); - let db = match self.db.read() { - Ok(r) => r, - Err(_) => return false, - }; - - if let Some(ref db) = *db { - match db.get(&key) { - Ok(Some(_)) => true, - _ => false, - } - } else { - false - } - } - } -} + pub fn save_binary( + &self, + message_id: String, + assignment_id: Option, + process_id: String, + timestamp: String, + binary: Vec, + ) -> Result<(), String> { + let key = ByteStore::create_key(&message_id, &assignment_id, &process_id, ×tamp); + let db = match self.db.read() { + Ok(r) => r, + Err(_) => return Err("Failed to acquire read lock".into()), + }; + + if let Some(ref db) = *db { + db.put(key, binary) + .map_err(|e| format!("Failed to write to RocksDB: {:?}", e))?; + Ok(()) + } else { + Err("Database is not initialized".into()) + } + } + + fn create_key( + message_id: &str, + assignment_id: &Option, + process_id: &str, + timestamp: &str, + ) -> Vec { + match assignment_id { + Some(assignment_id) => format!( + "message___{}___{}___{}___{}", + process_id, timestamp, message_id, assignment_id + ) + .into_bytes(), + None => format!("message___{}___{}___{}", process_id, timestamp, message_id) + .into_bytes(), + } + } + + pub fn exists( + &self, + message_id: &str, + assignment_id: &Option, + process_id: &str, + timestamp: &str, + ) -> bool { + let key = ByteStore::create_key(message_id, assignment_id, process_id, timestamp); + let db = match self.db.read() { + Ok(r) => r, + Err(_) => return false, + }; + + if let Some(ref db) = *db { + match db.get(&key) { + Ok(Some(_)) => true, + _ => false, + } + } else { + false + } + } + } +} /* This function is the migation program will @@ -1179,7 +1183,10 @@ pub async fn migrate_to_disk() -> io::Result<()> { dotenv().ok(); let data_store = Arc::new(StoreClient::new().expect("Failed to create StoreClient")); - data_store.bytestore.try_connect().expect("Failed to connect to bytestore"); + data_store + .bytestore + .try_connect() + .expect("Failed to connect to bytestore"); let args: Vec = env::args().collect(); let range: &String = args.get(1).expect("Range argument not provided"); @@ -1224,12 +1231,10 @@ pub async fn migrate_to_disk() -> io::Result<()> { let mut interval = interval(Duration::from_secs(10)); loop { interval.tick().await; - data_store_c.logger.log( - format!( + data_store_c.logger.log(format!( "Messages processed update: {}", 
processed_count_clone.load(Ordering::SeqCst) - ) - ); + )); if processed_count_clone.load(Ordering::SeqCst) >= total_count as usize { break; } @@ -1270,7 +1275,8 @@ pub async fn migrate_to_disk() -> io::Result<()> { process_id.clone(), timestamp.clone(), bundle, - ).unwrap(); + ) + .unwrap(); processed_count.fetch_add(1, Ordering::SeqCst); }); @@ -1279,13 +1285,17 @@ pub async fn migrate_to_disk() -> io::Result<()> { join_all(save_handles).await; } Err(e) => { - data_store.logger.error(format!("Error fetching messages: {:?}", e)); + data_store + .logger + .error(format!("Error fetching messages: {:?}", e)); } } } let duration = start.elapsed(); - data_store.logger.log(format!("Time elapsed in data migration is: {:?}", duration)); + data_store + .logger + .log(format!("Time elapsed in data migration is: {:?}", duration)); Ok(()) } diff --git a/servers/su/src/domain/config.rs b/servers/su/src/domain/config.rs index 711a39098..a45a8c1cc 100644 --- a/servers/su/src/domain/config.rs +++ b/servers/su/src/domain/config.rs @@ -19,6 +19,7 @@ pub struct AoConfig { pub migration_batch_size: i64, pub db_write_connections: u32, pub db_read_connections: u32, + pub enable_metrics: bool, } impl AoConfig { @@ -37,8 +38,8 @@ impl AoConfig { Err(_e) => false, }; let su_data_dir = match use_disk { - true => env::var("SU_DATA_DIR")?, - false => "".to_string() + true => env::var("SU_DATA_DIR")?, + false => "".to_string(), }; let migration_batch_size = match env::var("MIGRATION_BATCH_SIZE") { Ok(val) => val.parse().unwrap(), @@ -53,12 +54,16 @@ impl AoConfig { Err(_e) => 10, }; let graphql_url = match env::var("GRAPHQL_URL") { - Ok(val) => val, - Err(_e) => env::var("GATEWAY_URL")? + Ok(val) => val, + Err(_e) => env::var("GATEWAY_URL")?, }; let arweave_url = match env::var("ARWEAVE_URL") { - Ok(val) => val, - Err(_e) => env::var("GATEWAY_URL")? 
+            Ok(val) => val,
+            Err(_e) => env::var("GATEWAY_URL")?,
+        };
+        let enable_metrics = match env::var("ENABLE_METRICS") {
+            Ok(val) => val == "true",
+            Err(_e) => false,
         };
         Ok(AoConfig {
             database_url: env::var("DATABASE_URL")?,
@@ -74,6 +79,7 @@
             migration_batch_size,
             db_write_connections,
             db_read_connections,
+            enable_metrics,
         })
     }
 }
diff --git a/servers/su/src/domain/core/dal.rs b/servers/su/src/domain/core/dal.rs
index c986be47c..ffc5af510 100644
--- a/servers/su/src/domain/core/dal.rs
+++ b/servers/su/src/domain/core/dal.rs
@@ -120,3 +120,14 @@ pub trait DataStore: Send + Sync {
     fn get_all_schedulers(&self) -> Result<Vec<Scheduler>, StoreErrorType>;
     fn check_existing_message(&self, message: &Message) -> Result<(), StoreErrorType>;
 }
+
+pub trait CoreMetrics: Send + Sync {
+    fn get_process_observe(&self, duration: u128);
+    fn get_message_observe(&self, duration: u128);
+    fn get_messages_observe(&self, duration: u128);
+    fn serialize_json_observe(&self, duration: u128);
+    fn read_message_data_observe(&self, duration: u128);
+    fn write_item_observe(&self, duration: u128);
+    fn write_assignment_observe(&self, duration: u128);
+    fn acquire_write_lock_observe(&self, duration: u128);
+}
diff --git a/servers/su/src/domain/core/flows.rs b/servers/su/src/domain/core/flows.rs
index b3a31c046..36099162c 100644
--- a/servers/su/src/domain/core/flows.rs
+++ b/servers/su/src/domain/core/flows.rs
@@ -10,7 +10,7 @@ use super::builder::Builder;
 use super::json::{Message, Process};
 use super::scheduler;
 
-use super::dal::{Config, DataStore, Gateway, Log, Signer, Uploader, Wallet};
+use super::dal::{Config, DataStore, Gateway, Log, Signer, Uploader, Wallet, CoreMetrics};
 
 pub struct Deps {
     pub data_store: Arc<dyn DataStore>,
@@ -20,6 +20,7 @@ pub struct Deps {
     pub signer: Arc<dyn Signer>,
     pub wallet: Arc<dyn Wallet>,
     pub uploader: Arc<dyn Uploader>,
+    pub metrics: Arc<dyn CoreMetrics>,
 
     /*
     scheduler is part of the core but we initialize
@@ -56,10 +57,14 @@ async fn assignment_only(
     base_layer: Option<String>,
     exclude: Option<String>,
 ) -> Result<String, String> {
+    let start_top_level = Instant::now();
     let builder = init_builder(&deps)?;
 
+    let start_acquire_lock = Instant::now();
     let locked_schedule_info = deps.scheduler.acquire_lock(process_id.clone()).await?;
     let mut schedule_info = locked_schedule_info.lock().await;
+    let elapsed_acquire_lock = start_acquire_lock.elapsed();
+    deps.metrics.acquire_write_lock_observe(elapsed_acquire_lock.as_millis());
     let updated_info = deps
         .scheduler
         .update_schedule_info(&mut *schedule_info, process_id.clone())
@@ -88,6 +93,8 @@ async fn assignment_only(
         Ok(timestamp) => {
             let response_json =
                 json!({ "timestamp": timestamp, "id": message.assignment.id.clone() });
+            let elapsed_top_level = start_top_level.elapsed();
+            deps.metrics.write_assignment_observe(elapsed_top_level.as_millis());
             Ok(response_json.to_string())
         }
         Err(e) => Err(format!("{:?}", e)),
@@ -109,6 +116,7 @@ pub async fn write_item(
     base_layer: Option<String>,
     exclude: Option<String>,
 ) -> Result<String, String> {
+    let start_top_level = Instant::now();
     // XOR, if we have one of these, we must have both.
     if process_id.is_some() ^ assign.is_some() {
         return Err("If sending assign or process-id, you must send both.".to_string());
     }
@@ -143,8 +151,11 @@ pub async fn write_item(
     process we are creating.
So if a message is written while the process is still being created it will wait */ + let start_acquire_lock = Instant::now(); let locked_schedule_info = deps.scheduler.acquire_lock(data_item.id()).await?; let mut schedule_info = locked_schedule_info.lock().await; + let elapsed_acquire_lock = start_acquire_lock.elapsed(); + deps.metrics.acquire_write_lock_observe(elapsed_acquire_lock.as_millis()); let updated_info = deps .scheduler .update_schedule_info(&mut *schedule_info, data_item.id()) @@ -161,6 +172,8 @@ pub async fn write_item( Ok(timestamp) => { let response_json = json!({ "timestamp": timestamp, "id": process.process_id.clone() }); + let elapsed_top_level = start_top_level.elapsed(); + deps.metrics.write_item_observe(elapsed_top_level.as_millis()); Ok(response_json.to_string()) } Err(e) => Err(format!("{:?}", e)), @@ -171,8 +184,11 @@ pub async fn write_item( process we are writing a message to. this ensures no conflicts in the schedule */ + let start_acquire_lock = Instant::now(); let locked_schedule_info = deps.scheduler.acquire_lock(data_item.target()).await?; let mut schedule_info = locked_schedule_info.lock().await; + let elapsed_acquire_lock = start_acquire_lock.elapsed(); + deps.metrics.acquire_write_lock_observe(elapsed_acquire_lock.as_millis()); let updated_info = deps .scheduler .update_schedule_info(&mut *schedule_info, data_item.target()) @@ -190,6 +206,8 @@ pub async fn write_item( Ok(timestamp) => { let response_json = json!({ "timestamp": timestamp, "id": message.message_id()? }); + let elapsed_top_level = start_top_level.elapsed(); + deps.metrics.write_item_observe(elapsed_top_level.as_millis()); Ok(response_json.to_string()) } Err(e) => Err(format!("{:?}", e)), @@ -209,11 +227,15 @@ pub async fn read_message_data( to: Option, limit: Option, ) -> Result { + let start_top_level = Instant::now(); + let start_get_message = Instant::now(); if let Ok(message) = deps.data_store.get_message(&tx_id) { if message.message.is_some() || ((message.message_id()? != message.process_id()?) && (message.assignment_id()? 
== tx_id)) { + let elapsed_get_message = start_get_message.elapsed(); + deps.metrics.get_message_observe(elapsed_get_message.as_millis()); return serde_json::to_string(&message).map_err(|e| format!("{:?}", e)); } } @@ -227,12 +249,17 @@ pub async fn read_message_data( let duration = start.elapsed(); deps.logger .log(format!("Time elapsed in get_messages() is: {:?}", duration)); + deps.metrics.get_messages_observe(duration.as_millis()); let startj = Instant::now(); let result = simd_to_string(&messages).map_err(|e| format!("{:?}", e))?; let durationj = startj.elapsed(); deps.logger .log(format!("Time elapsed in json mapping is: {:?}", durationj)); + deps.metrics.serialize_json_observe(durationj.as_millis()); + + let elapsed_top_level = start_top_level.elapsed(); + deps.metrics.read_message_data_observe(elapsed_top_level.as_millis()); return Ok(result); } @@ -241,7 +268,10 @@ pub async fn read_message_data( } pub async fn read_process(deps: Arc, process_id: String) -> Result { + let start = Instant::now(); let process = deps.data_store.get_process(&process_id)?; + let elapsed = start.elapsed(); + deps.metrics.get_process_observe(elapsed.as_millis()); let result = match serde_json::to_string(&process) { Ok(r) => r, Err(e) => return Err(format!("{:?}", e)), @@ -295,4 +325,4 @@ pub async fn health(deps: Arc) -> Result { } Err(e) => Err(format!("{:?}", e)), } -} +} \ No newline at end of file diff --git a/servers/su/src/domain/mod.rs b/servers/su/src/domain/mod.rs index db82bcde5..8d80360ad 100644 --- a/servers/su/src/domain/mod.rs +++ b/servers/su/src/domain/mod.rs @@ -15,12 +15,13 @@ use config::AoConfig; use core::dal::{Config, Gateway, Log}; use logger::SuLog; +pub use clients::metrics::PromMetrics; pub use core::flows; pub use core::router; pub use flows::Deps; pub use store::migrate_to_disk; -pub async fn init_deps(mode: Option) -> Arc { +pub async fn init_deps(mode: Option) -> (Arc, Arc) { let logger: Arc = SuLog::init(); let data_store = Arc::new(store::StoreClient::new().expect("Failed to create StoreClient")); @@ -31,7 +32,7 @@ pub async fn init_deps(mode: Option) -> Arc { Err(e) => logger.log(format!("{:?}", e)), } - let config = Arc::new(AoConfig::new(mode).expect("Failed to read configuration")); + let config = Arc::new(AoConfig::new(mode.clone()).expect("Failed to read configuration")); if config.use_disk { let logger_clone = logger.clone(); @@ -70,14 +71,23 @@ pub async fn init_deps(mode: Option) -> Arc { UploaderClient::new(&config.upload_node_url, logger.clone()).expect("Invalid uploader url"), ); - Arc::new(Deps { - data_store, - logger, - config, - scheduler, - gateway, - signer, - wallet, - uploader, - }) + let metrics = Arc::new(PromMetrics::new( + AoConfig::new(mode).expect("Failed to read configuration"), + )); + let metrics_clone = metrics.clone(); + + ( + Arc::new(Deps { + data_store, + logger, + config, + scheduler, + gateway, + signer, + wallet, + uploader, + metrics, + }), + metrics_clone, + ) } diff --git a/servers/su/src/main.rs b/servers/su/src/main.rs index a1e841721..6af408828 100644 --- a/servers/su/src/main.rs +++ b/servers/su/src/main.rs @@ -11,7 +11,7 @@ use actix_web::{ use serde::Deserialize; use serde_json::json; -use su::domain::{flows, init_deps, router, Deps}; +use su::domain::{flows, init_deps, router, Deps, PromMetrics}; #[derive(Deserialize)] struct FromTo { @@ -57,13 +57,13 @@ fn err_response(err: String) -> HttpResponse { } async fn base( - deps: web::Data>, + data: web::Data, query_params: web::Query, req: HttpRequest, ) -> impl Responder { let 
process_id = query_params.process_id.clone(); - match router::redirect_process_id(deps.get_ref().clone(), process_id).await { + match router::redirect_process_id(data.deps.clone(), process_id).await { Ok(Some(redirect_url)) => { let target_url = format!("{}{}", redirect_url, req.uri()); return HttpResponse::TemporaryRedirect() @@ -74,7 +74,7 @@ async fn base( Err(err) => return err_response(err.to_string()), } - match flows::health(deps.get_ref().clone()).await { + match flows::health(data.deps.clone()).await { Ok(processed_str) => HttpResponse::Ok() .content_type("application/json") .body(processed_str), @@ -83,13 +83,14 @@ async fn base( } async fn timestamp_route( - deps: web::Data>, + data: web::Data, query_params: web::Query, req: HttpRequest, ) -> impl Responder { + data.metrics.get_request("/timestamp".to_string()); let process_id = query_params.process_id.clone(); - match router::redirect_process_id(deps.get_ref().clone(), process_id).await { + match router::redirect_process_id(data.deps.clone(), process_id).await { Ok(Some(redirect_url)) => { let target_url = format!("{}{}", redirect_url, req.uri()); return HttpResponse::TemporaryRedirect() @@ -100,7 +101,7 @@ async fn timestamp_route( Err(err) => return err_response(err.to_string()), } - match flows::timestamp(deps.get_ref().clone()).await { + match flows::timestamp(data.deps.clone()).await { Ok(processed_str) => HttpResponse::Ok() .content_type("application/json") .body(processed_str), @@ -109,13 +110,14 @@ async fn timestamp_route( } async fn main_post_route( - deps: web::Data>, + data: web::Data, req_body: web::Bytes, req: HttpRequest, query_params: web::Query, ) -> impl Responder { + data.metrics.post_request(); match router::redirect_data_item( - deps.get_ref().clone(), + data.deps.clone(), req_body.to_vec(), query_params.process_id.clone(), query_params.assign.clone(), @@ -133,7 +135,7 @@ async fn main_post_route( } match flows::write_item( - deps.get_ref().clone(), + data.deps.clone(), req_body.to_vec(), query_params.process_id.clone(), query_params.assign.clone(), @@ -150,18 +152,19 @@ async fn main_post_route( } async fn main_get_route( - deps: web::Data>, + data: web::Data, req: HttpRequest, path: web::Path, query_params: web::Query, ) -> impl Responder { + data.metrics.get_request("/".to_string()); let tx_id = path.tx_id.clone(); let from_sort_key = query_params.from.clone(); let to_sort_key = query_params.to.clone(); let limit = query_params.limit.clone(); let process_id = query_params.process_id.clone(); - match router::redirect_tx_id(deps.get_ref().clone(), tx_id.clone(), process_id.clone()).await { + match router::redirect_tx_id(data.deps.clone(), tx_id.clone(), process_id.clone()).await { Ok(Some(redirect_url)) => { let target_url = format!("{}{}", redirect_url, req.uri()); return HttpResponse::TemporaryRedirect() @@ -172,14 +175,8 @@ async fn main_get_route( Err(err) => return err_response(err.to_string()), } - let result = flows::read_message_data( - deps.get_ref().clone(), - tx_id, - from_sort_key, - to_sort_key, - limit, - ) - .await; + let result = + flows::read_message_data(data.deps.clone(), tx_id, from_sort_key, to_sort_key, limit).await; match result { Ok(processed_str) => HttpResponse::Ok() @@ -190,13 +187,14 @@ async fn main_get_route( } async fn read_process_route( - deps: web::Data>, + data: web::Data, req: HttpRequest, path: web::Path, ) -> impl Responder { + data.metrics.get_request("/processes".to_string()); let process_id = path.process_id.clone(); - match 
router::redirect_process_id(deps.get_ref().clone(), Some(process_id.clone())).await { + match router::redirect_process_id(data.deps.clone(), Some(process_id.clone())).await { Ok(Some(redirect_url)) => { let target_url = format!("{}{}", redirect_url, req.uri()); return HttpResponse::TemporaryRedirect() @@ -207,7 +205,7 @@ async fn read_process_route( Err(err) => return err_response(err.to_string()), } - match flows::read_process(deps.get_ref().clone(), process_id).await { + match flows::read_process(data.deps.clone(), process_id).await { Ok(processed_str) => HttpResponse::Ok() .content_type("application/json") .body(processed_str), @@ -219,6 +217,23 @@ async fn health_check() -> impl Responder { HttpResponse::Ok() } +async fn metrics_route(data: web::Data) -> impl Responder { + let result = data.metrics.emit_metrics(); + match result { + Ok(metrics_str) => HttpResponse::Ok() + .content_type("application/openmetrics-text; version=1.0.0; charset=utf-8") + .body(metrics_str), + Err(err) => HttpResponse::BadRequest() + .content_type("text/plain") + .body(err), + } +} + +struct AppState { + deps: Arc, + metrics: Arc, +} + #[actix_web::main] async fn main() -> io::Result<()> { let args: Vec = env::args().collect(); @@ -241,9 +256,10 @@ async fn main() -> io::Result<()> { } }; - let wrapped = web::Data::new(init_deps(mode).await); + let (deps, metrics) = init_deps(mode).await; + let app_state = web::Data::new(AppState { deps, metrics }); - let run_deps = wrapped.get_ref().clone(); + let run_deps = app_state.deps.clone(); if run_deps.config.mode() == "router" { match router::init_schedulers(run_deps.clone()).await { @@ -261,12 +277,13 @@ async fn main() -> io::Result<()> { .allow_any_header(), ) .wrap(Logger::default()) - .app_data(wrapped.clone()) + .app_data(app_state.clone()) .app_data(web::PayloadConfig::new(10485760)) .route("/", web::get().to(base)) .route("/", web::post().to(main_post_route)) .route("/timestamp", web::get().to(timestamp_route)) .route("/health", web::get().to(health_check)) + .route("/metrics", web::get().to(metrics_route)) .route("/{tx_id}", web::get().to(main_get_route)) .route("/processes/{process_id}", web::get().to(read_process_route)) }) diff --git a/servers/su/su b/servers/su/su index f126ef8df..307a5fa8b 100755 Binary files a/servers/su/su and b/servers/su/su differ