Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Vince juliano/sur routing 1070 #1073

Merged
merged 3 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ALTER TABLE schedulers
DROP COLUMN IF EXISTS wallets_to_route,
DROP COLUMN IF EXISTS wallets_only;
3 changes: 3 additions & 0 deletions servers/su/migrations/2024-11-26-200327_wallet_routing/up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ALTER TABLE schedulers
ADD COLUMN wallets_to_route TEXT NULL,
ADD COLUMN wallets_only BOOLEAN NULL;
2 changes: 2 additions & 0 deletions servers/su/src/domain/clients/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ table! {
url -> Varchar,
process_count -> Int4,
no_route -> Nullable<Bool>,
wallets_to_route -> Nullable<Text>,
wallets_only -> Nullable<Bool>,
}
}

Expand Down
14 changes: 14 additions & 0 deletions servers/su/src/domain/clients/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1154,6 +1154,8 @@ impl RouterDataStore for StoreClient {
url: &scheduler.url,
process_count: &scheduler.process_count,
no_route: scheduler.no_route.as_ref(),
wallets_to_route: scheduler.wallets_to_route.as_deref(),
wallets_only: scheduler.wallets_only.as_ref(),
};

match diesel::insert_into(schedulers)
Expand All @@ -1177,6 +1179,8 @@ impl RouterDataStore for StoreClient {
process_count.eq(scheduler.process_count),
url.eq(&scheduler.url),
no_route.eq(&scheduler.no_route),
wallets_to_route.eq(&scheduler.wallets_to_route),
wallets_only.eq(&scheduler.wallets_only)
))
.execute(conn)
{
Expand All @@ -1201,6 +1205,8 @@ impl RouterDataStore for StoreClient {
url: db_scheduler.url,
process_count: db_scheduler.process_count,
no_route: db_scheduler.no_route,
wallets_to_route: db_scheduler.wallets_to_route,
wallets_only: db_scheduler.wallets_only,
};
Ok(scheduler)
}
Expand All @@ -1223,6 +1229,8 @@ impl RouterDataStore for StoreClient {
url: db_scheduler.url,
process_count: db_scheduler.process_count,
no_route: db_scheduler.no_route,
wallets_to_route: db_scheduler.wallets_to_route,
wallets_only: db_scheduler.wallets_only,
};
Ok(scheduler)
}
Expand All @@ -1244,6 +1252,8 @@ impl RouterDataStore for StoreClient {
url: db_scheduler.url,
process_count: db_scheduler.process_count,
no_route: db_scheduler.no_route,
wallets_to_route: db_scheduler.wallets_to_route,
wallets_only: db_scheduler.wallets_only,
})
.collect();
Ok(schedulers_out)
Expand Down Expand Up @@ -1331,6 +1341,8 @@ pub struct DbScheduler {
pub url: String,
pub process_count: i32,
pub no_route: Option<bool>,
pub wallets_to_route: Option<String>,
pub wallets_only: Option<bool>,
}

#[derive(Insertable)]
Expand All @@ -1339,6 +1351,8 @@ pub struct NewScheduler<'a> {
pub url: &'a str,
pub process_count: &'a i32,
pub no_route: Option<&'a bool>,
pub wallets_to_route: Option<&'a str>,
pub wallets_only: Option<&'a bool>,
}

#[derive(Queryable, Selectable)]
Expand Down
79 changes: 75 additions & 4 deletions servers/su/src/domain/core/router.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use super::builder::Builder;
use crate::domain::core::dal::StoreErrorType;
use crate::domain::flows::Deps;
use serde::Deserialize;
use std::{fmt::Debug, sync::Arc};
use tokio::{fs::File, io::AsyncReadExt};
use sha2::{Digest, Sha256};

use super::builder::Builder;
use crate::domain::core::dal::StoreErrorType;
use crate::domain::flows::Deps;

/*
The code in this file only runs on a su that is
Expand All @@ -20,6 +22,8 @@ pub struct Scheduler {
pub url: String,
pub process_count: i32,
pub no_route: Option<bool>,
pub wallets_to_route: Option<String>,
pub wallets_only: Option<bool>,
}

pub struct ProcessScheduler {
Expand All @@ -32,6 +36,15 @@ pub struct ProcessScheduler {
struct SchedulerEntry {
url: String,
no_route: Option<bool>,
wallets_to_route: Option<String>,
wallets_only: Option<bool>,
}

pub fn hash(data: &[u8]) -> Vec<u8> {
let mut hasher = Sha256::new();
hasher.update(data);
let result = hasher.finalize();
result.to_vec()
}

/*
Expand Down Expand Up @@ -64,6 +77,8 @@ pub async fn init_schedulers(deps: Arc<Deps>) -> Result<String, String> {
url: entry.url.clone(),
process_count: 0,
no_route: entry.no_route,
wallets_to_route: entry.wallets_to_route.clone(),
wallets_only: entry.wallets_only
};
deps.router_data_store.save_scheduler(&scheduler)?;
deps.logger
Expand All @@ -76,6 +91,8 @@ pub async fn init_schedulers(deps: Arc<Deps>) -> Result<String, String> {
*/
let mut sched = deps.router_data_store.get_scheduler_by_url(&entry.url)?;
sched.no_route = entry.no_route;
sched.wallets_to_route = entry.wallets_to_route.clone();
sched.wallets_only = entry.wallets_only;
deps.router_data_store.update_scheduler(&sched)?;
}

Expand Down Expand Up @@ -163,6 +180,14 @@ pub async fn redirect_data_item(
.iter()
.find(|tag| tag.name == "Type")
.ok_or("Cannot redirect data item, invalid Type Tag")?;
let owner = item.owner().clone();

let owner_bytes = match base64_url::decode(&owner) {
Ok(h) => h,
Err(_) => return Err("Failed to parse owner".to_string())
};
let address_hash = hash(&owner_bytes);
let owner_address = base64_url::encode(&address_hash);

match type_tag.value.as_str() {
"Process" => {
Expand All @@ -177,6 +202,52 @@ pub async fn redirect_data_item(
.filter(|scheduler| scheduler.no_route.unwrap_or(false) == false)
.collect::<Vec<_>>();

/*
This logic is added for routing wallet addresses to
specific schedulers. It will find the first scheduler
with a wallet matching the owner and route the new spawn
there.
*/
for scheduler in schedulers.iter_mut() {
match &scheduler.wallets_to_route {
Some(w) => {
let wallets: Vec<String> = w.split(',')
.map(|s| s.trim().to_string())
.collect();

for wallet in wallets {
if owner_address == wallet {
scheduler.process_count += 1;
deps.router_data_store.update_scheduler(scheduler)?;

let scheduler_row_id = if let Some(m_scheduler_row_id) = scheduler.row_id {
m_scheduler_row_id
} else {
/*
this should be unreachable but return an error
just in case so the router doesn't crash
*/
return Err("Missing id on scheduler".to_string());
};

let process_scheduler = ProcessScheduler {
row_id: None,
scheduler_row_id,
process_id: id,
};
deps.router_data_store
.save_process_scheduler(&process_scheduler)?;

return Ok(Some(scheduler.url.clone()));
}
}
}
None => {}
}
}

schedulers.retain(|scheduler| scheduler.wallets_only.unwrap_or(false) == false);

if let Some(min_scheduler) = schedulers.iter_mut().min_by_key(|s| s.process_count) {
min_scheduler.process_count += 1;
deps.router_data_store.update_scheduler(min_scheduler)?;
Expand All @@ -193,7 +264,7 @@ pub async fn redirect_data_item(

let process_scheduler = ProcessScheduler {
row_id: None,
scheduler_row_id: scheduler_row_id,
scheduler_row_id,
process_id: id,
};
deps.router_data_store
Expand Down
9 changes: 9 additions & 0 deletions servers/su/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ diesel::table! {
timestamp -> Int8,
bundle -> Bytea,
hash_chain -> Text,
#[max_length = 255]
assignment_id -> Nullable<Varchar>,
}
}

Expand All @@ -31,6 +33,10 @@ diesel::table! {
process_id -> Varchar,
process_data -> Jsonb,
bundle -> Bytea,
epoch -> Nullable<Int4>,
nonce -> Nullable<Int4>,
hash_chain -> Nullable<Text>,
timestamp -> Nullable<Int8>,
}
}

Expand All @@ -39,6 +45,9 @@ diesel::table! {
row_id -> Int4,
url -> Varchar,
process_count -> Int4,
no_route -> Nullable<Bool>,
wallets_to_route -> Nullable<Text>,
wallets_only -> Nullable<Bool>,
}
}

Expand Down
Binary file modified servers/su/su
Binary file not shown.