diff --git a/servers/su/migrations/2024-11-26-200327_wallet_routing/down.sql b/servers/su/migrations/2024-11-26-200327_wallet_routing/down.sql
new file mode 100644
index 000000000..c701684c2
--- /dev/null
+++ b/servers/su/migrations/2024-11-26-200327_wallet_routing/down.sql
@@ -0,0 +1,3 @@
+ALTER TABLE schedulers
+DROP COLUMN IF EXISTS wallets_to_route,
+DROP COLUMN IF EXISTS wallets_only;
\ No newline at end of file
diff --git a/servers/su/migrations/2024-11-26-200327_wallet_routing/up.sql b/servers/su/migrations/2024-11-26-200327_wallet_routing/up.sql
new file mode 100644
index 000000000..b8069760d
--- /dev/null
+++ b/servers/su/migrations/2024-11-26-200327_wallet_routing/up.sql
@@ -0,0 +1,3 @@
+ALTER TABLE schedulers
+ADD COLUMN wallets_to_route TEXT NULL,
+ADD COLUMN wallets_only BOOLEAN NULL;
\ No newline at end of file
diff --git a/servers/su/src/domain/clients/schema.rs b/servers/su/src/domain/clients/schema.rs
index 42b9a8760..5b107bfc4 100644
--- a/servers/su/src/domain/clients/schema.rs
+++ b/servers/su/src/domain/clients/schema.rs
@@ -34,6 +34,8 @@ table! {
         url -> Varchar,
         process_count -> Int4,
         no_route -> Nullable<Bool>,
+        wallets_to_route -> Nullable<Text>,
+        wallets_only -> Nullable<Bool>,
     }
 }
 
diff --git a/servers/su/src/domain/clients/store.rs b/servers/su/src/domain/clients/store.rs
index c4b8505de..197c81cc8 100644
--- a/servers/su/src/domain/clients/store.rs
+++ b/servers/su/src/domain/clients/store.rs
@@ -1154,6 +1154,8 @@ impl RouterDataStore for StoreClient {
             url: &scheduler.url,
             process_count: &scheduler.process_count,
             no_route: scheduler.no_route.as_ref(),
+            wallets_to_route: scheduler.wallets_to_route.as_deref(),
+            wallets_only: scheduler.wallets_only.as_ref(),
         };
 
         match diesel::insert_into(schedulers)
@@ -1177,6 +1179,8 @@ impl RouterDataStore for StoreClient {
                 process_count.eq(scheduler.process_count),
                 url.eq(&scheduler.url),
                 no_route.eq(&scheduler.no_route),
+                wallets_to_route.eq(&scheduler.wallets_to_route),
+                wallets_only.eq(&scheduler.wallets_only)
             ))
             .execute(conn)
         {
@@ -1201,6 +1205,8 @@ impl RouterDataStore for StoreClient {
                     url: db_scheduler.url,
                     process_count: db_scheduler.process_count,
                     no_route: db_scheduler.no_route,
+                    wallets_to_route: db_scheduler.wallets_to_route,
+                    wallets_only: db_scheduler.wallets_only,
                 };
                 Ok(scheduler)
             }
@@ -1223,6 +1229,8 @@ impl RouterDataStore for StoreClient {
                     url: db_scheduler.url,
                     process_count: db_scheduler.process_count,
                     no_route: db_scheduler.no_route,
+                    wallets_to_route: db_scheduler.wallets_to_route,
+                    wallets_only: db_scheduler.wallets_only,
                 };
                 Ok(scheduler)
             }
@@ -1244,6 +1252,8 @@ impl RouterDataStore for StoreClient {
                 url: db_scheduler.url,
                 process_count: db_scheduler.process_count,
                 no_route: db_scheduler.no_route,
+                wallets_to_route: db_scheduler.wallets_to_route,
+                wallets_only: db_scheduler.wallets_only,
             })
             .collect();
         Ok(schedulers_out)
@@ -1331,6 +1341,8 @@ pub struct DbScheduler {
     pub url: String,
     pub process_count: i32,
     pub no_route: Option<bool>,
+    pub wallets_to_route: Option<String>,
+    pub wallets_only: Option<bool>,
 }
 
 #[derive(Insertable)]
@@ -1339,6 +1351,8 @@ pub struct NewScheduler<'a> {
     pub url: &'a str,
     pub process_count: &'a i32,
     pub no_route: Option<&'a bool>,
+    pub wallets_to_route: Option<&'a str>,
+    pub wallets_only: Option<&'a bool>,
 }
 
 #[derive(Queryable, Selectable)]
diff --git a/servers/su/src/domain/core/router.rs b/servers/su/src/domain/core/router.rs
index 3bd492196..d4730534b 100644
--- a/servers/su/src/domain/core/router.rs
+++ b/servers/su/src/domain/core/router.rs
@@ -1,9 +1,11 @@
-use super::builder::Builder;
-use crate::domain::core::dal::StoreErrorType;
-use crate::domain::flows::Deps;
 use serde::Deserialize;
 use std::{fmt::Debug, sync::Arc};
 use tokio::{fs::File, io::AsyncReadExt};
+use sha2::{Digest, Sha256};
+
+use super::builder::Builder;
+use crate::domain::core::dal::StoreErrorType;
+use crate::domain::flows::Deps;
 
 /*
 The code in this file only runs on a su that is
@@ -20,6 +22,8 @@ pub struct Scheduler {
     pub url: String,
     pub process_count: i32,
     pub no_route: Option<bool>,
+    pub wallets_to_route: Option<String>,
+    pub wallets_only: Option<bool>,
 }
 
 pub struct ProcessScheduler {
@@ -32,6 +36,15 @@ pub struct ProcessScheduler {
 struct SchedulerEntry {
     url: String,
     no_route: Option<bool>,
+    wallets_to_route: Option<String>,
+    wallets_only: Option<bool>,
+}
+
+pub fn hash(data: &[u8]) -> Vec<u8> {
+    let mut hasher = Sha256::new();
+    hasher.update(data);
+    let result = hasher.finalize();
+    result.to_vec()
 }
 
 /*
@@ -64,6 +77,8 @@ pub async fn init_schedulers(deps: Arc<Deps>) -> Result<String, String> {
                 url: entry.url.clone(),
                 process_count: 0,
                 no_route: entry.no_route,
+                wallets_to_route: entry.wallets_to_route.clone(),
+                wallets_only: entry.wallets_only
             };
             deps.router_data_store.save_scheduler(&scheduler)?;
             deps.logger
@@ -76,6 +91,8 @@ pub async fn init_schedulers(deps: Arc<Deps>) -> Result<String, String> {
             */
             let mut sched = deps.router_data_store.get_scheduler_by_url(&entry.url)?;
             sched.no_route = entry.no_route;
+            sched.wallets_to_route = entry.wallets_to_route.clone();
+            sched.wallets_only = entry.wallets_only;
             deps.router_data_store.update_scheduler(&sched)?;
         }
 
@@ -163,6 +180,14 @@ pub async fn redirect_data_item(
         .iter()
         .find(|tag| tag.name == "Type")
         .ok_or("Cannot redirect data item, invalid Type Tag")?;
+    let owner = item.owner().clone();
+
+    let owner_bytes = match base64_url::decode(&owner) {
+        Ok(h) => h,
+        Err(_) => return Err("Failed to parse owner".to_string())
+    };
+    let address_hash = hash(&owner_bytes);
+    let owner_address = base64_url::encode(&address_hash);
 
     match type_tag.value.as_str() {
         "Process" => {
@@ -177,6 +202,52 @@ pub async fn redirect_data_item(
                 .filter(|scheduler| scheduler.no_route.unwrap_or(false) == false)
                 .collect::<Vec<Scheduler>>();
 
+            /*
+                This logic is added for routing wallet addresses to
+                specific schedulers. It will find the first scheduler
+                with a wallet matching the owner and route the new spawn
+                there.
+            */
+            for scheduler in schedulers.iter_mut() {
+                match &scheduler.wallets_to_route {
+                    Some(w) => {
+                        let wallets: Vec<String> = w.split(',')
+                            .map(|s| s.trim().to_string())
+                            .collect();
+
+                        for wallet in wallets {
+                            if owner_address == wallet {
+                                scheduler.process_count += 1;
+                                deps.router_data_store.update_scheduler(scheduler)?;
+
+                                let scheduler_row_id = if let Some(m_scheduler_row_id) = scheduler.row_id {
+                                    m_scheduler_row_id
+                                } else {
+                                    /*
+                                        this should be unreachable but return an error
+                                        just in case so the router doesn't crash
+                                    */
+                                    return Err("Missing id on scheduler".to_string());
+                                };
+
+                                let process_scheduler = ProcessScheduler {
+                                    row_id: None,
+                                    scheduler_row_id,
+                                    process_id: id,
+                                };
+                                deps.router_data_store
+                                    .save_process_scheduler(&process_scheduler)?;
+
+                                return Ok(Some(scheduler.url.clone()));
+                            }
+                        }
+                    }
+                    None => {}
+                }
+            }
+
+            schedulers.retain(|scheduler| scheduler.wallets_only.unwrap_or(false) == false);
+
             if let Some(min_scheduler) = schedulers.iter_mut().min_by_key(|s| s.process_count) {
                 min_scheduler.process_count += 1;
                 deps.router_data_store.update_scheduler(min_scheduler)?;
@@ -193,7 +264,7 @@
 
             let process_scheduler = ProcessScheduler {
                 row_id: None,
-                scheduler_row_id: scheduler_row_id,
+                scheduler_row_id,
                 process_id: id,
             };
             deps.router_data_store
diff --git a/servers/su/src/schema.rs b/servers/su/src/schema.rs
index 3da5abfdf..638cb40ad 100644
--- a/servers/su/src/schema.rs
+++ b/servers/su/src/schema.rs
@@ -13,6 +13,8 @@ diesel::table! {
         timestamp -> Int8,
         bundle -> Bytea,
         hash_chain -> Text,
+        #[max_length = 255]
+        assignment_id -> Nullable<Varchar>,
     }
 }
 
@@ -31,6 +33,10 @@ diesel::table! {
         process_id -> Varchar,
         process_data -> Jsonb,
         bundle -> Bytea,
+        epoch -> Nullable<Int4>,
+        nonce -> Nullable<Int4>,
+        hash_chain -> Nullable<Text>,
+        timestamp -> Nullable<Int8>,
     }
 }
 
@@ -39,6 +45,9 @@ diesel::table! {
         row_id -> Int4,
         url -> Varchar,
         process_count -> Int4,
+        no_route -> Nullable<Bool>,
+        wallets_to_route -> Nullable<Text>,
+        wallets_only -> Nullable<Bool>,
     }
 }
 
diff --git a/servers/su/su b/servers/su/su
index 31beb82c7..65b1a9b60 100644
Binary files a/servers/su/su and b/servers/su/su differ
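Below is a minimal standalone sketch of the wallet-routing check that the router.rs change introduces: derive the owner address by SHA-256-hashing the base64url-decoded owner key, then compare it against the comma-separated `wallets_to_route` value. The helper names (`owner_to_address`, `routes_owner`) and the sample key are illustrative only, not part of the patch; the sketch assumes only the `sha2` and `base64_url` crates already used in the diff.

```rust
use sha2::{Digest, Sha256};

// Derive an Arweave-style address: base64url-encode the SHA-256 digest of the
// base64url-decoded owner public key (the same steps as in redirect_data_item).
fn owner_to_address(owner_b64url: &str) -> Result<String, String> {
    let owner_bytes =
        base64_url::decode(owner_b64url).map_err(|_| "Failed to parse owner".to_string())?;
    let mut hasher = Sha256::new();
    hasher.update(&owner_bytes);
    Ok(base64_url::encode(&hasher.finalize().to_vec()))
}

// `wallets_to_route` is stored as a comma-separated TEXT column; a scheduler
// claims the spawn when any entry matches the derived owner address.
fn routes_owner(owner_address: &str, wallets_to_route: Option<&str>) -> bool {
    wallets_to_route
        .map(|w| w.split(',').any(|wallet| wallet.trim() == owner_address))
        .unwrap_or(false)
}

fn main() -> Result<(), String> {
    // Hypothetical owner key bytes, for illustration only.
    let owner = base64_url::encode(b"example-owner-public-key");
    let address = owner_to_address(&owner)?;
    println!("routes to this scheduler: {}", routes_owner(&address, Some(address.as_str())));
    Ok(())
}
```

Schedulers whose `wallets_to_route` matches take the spawn before the load-balancing step; `wallets_only` schedulers are then dropped from the pool so they never receive spawns from other owners.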