Skip to content

Commit

Permalink
feat(sur): add no_route ability to router #895
Browse files Browse the repository at this point in the history
  • Loading branch information
VinceJuliano committed Aug 8, 2024
1 parent 56494da commit 5f5f951
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE schedulers DROP COLUMN no_route;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE schedulers ADD COLUMN no_route BOOLEAN DEFAULT FALSE;
1 change: 1 addition & 0 deletions servers/su/src/domain/clients/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ table! {
row_id -> Int4,
url -> Varchar,
process_count -> Int4,
no_route -> Nullable<Bool>,
}
}

Expand Down
7 changes: 7 additions & 0 deletions servers/su/src/domain/clients/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -842,6 +842,7 @@ impl DataStore for StoreClient {
let new_scheduler = NewScheduler {
url: &scheduler.url,
process_count: &scheduler.process_count,
no_route: scheduler.no_route.as_ref(),
};

match diesel::insert_into(schedulers)
Expand All @@ -864,6 +865,7 @@ impl DataStore for StoreClient {
.set((
process_count.eq(scheduler.process_count),
url.eq(&scheduler.url),
no_route.eq(&scheduler.no_route)
))
.execute(conn)
{
Expand All @@ -887,6 +889,7 @@ impl DataStore for StoreClient {
row_id: Some(db_scheduler.row_id),
url: db_scheduler.url,
process_count: db_scheduler.process_count,
no_route: db_scheduler.no_route,
};
Ok(scheduler)
}
Expand All @@ -908,6 +911,7 @@ impl DataStore for StoreClient {
row_id: Some(db_scheduler.row_id),
url: db_scheduler.url,
process_count: db_scheduler.process_count,
no_route: db_scheduler.no_route,
};
Ok(scheduler)
}
Expand All @@ -928,6 +932,7 @@ impl DataStore for StoreClient {
row_id: Some(db_scheduler.row_id),
url: db_scheduler.url,
process_count: db_scheduler.process_count,
no_route: db_scheduler.no_route
})
.collect();
Ok(schedulers_out)
Expand Down Expand Up @@ -1006,13 +1011,15 @@ pub struct DbScheduler {
pub row_id: i32,
pub url: String,
pub process_count: i32,
pub no_route: Option<bool>,
}

#[derive(Insertable)]
#[diesel(table_name = super::schema::schedulers)]
pub struct NewScheduler<'a> {
pub url: &'a str,
pub process_count: &'a i32,
pub no_route: Option<&'a bool>,
}

#[derive(Queryable, Selectable)]
Expand Down
20 changes: 19 additions & 1 deletion servers/su/src/domain/core/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ use tokio::{fs::File, io::AsyncReadExt};
a file. It is a basic load balancer implementation
*/

#[derive(Debug)]
pub struct Scheduler {
pub row_id: Option<i32>,
pub url: String,
pub process_count: i32,
pub no_route: Option<bool>,
}

pub struct ProcessScheduler {
Expand All @@ -28,6 +30,7 @@ pub struct ProcessScheduler {
#[derive(Deserialize, Debug)]
struct SchedulerEntry {
url: String,
no_route: Option<bool>
}

/*
Expand Down Expand Up @@ -57,11 +60,20 @@ pub async fn init_schedulers(deps: Arc<Deps>) -> Result<String, String> {
row_id: None,
url: entry.url.clone(),
process_count: 0,
no_route: entry.no_route,
};
deps.data_store.save_scheduler(&scheduler)?;
deps.logger
.log(format!("saved new scheduler: {}", entry.url));
}

/*
If we no longer what to route any process to this su
we can set no_route to true.
*/
let mut sched = deps.data_store.get_scheduler_by_url(&entry.url)?;
sched.no_route = entry.no_route;
deps.data_store.update_scheduler(&sched)?;
}

Ok("schedulers initialized".to_string())
Expand Down Expand Up @@ -154,7 +166,13 @@ pub async fn redirect_data_item(
new process so we need to generate a
process_schedulers record and return the url
*/
let mut schedulers = deps.data_store.get_all_schedulers()?;
let mut schedulers = deps
.data_store
.get_all_schedulers()?
.into_iter()
.filter(|scheduler| scheduler.no_route.unwrap_or(false) == false)
.collect::<Vec<_>>();

if let Some(min_scheduler) = schedulers.iter_mut().min_by_key(|s| s.process_count) {
min_scheduler.process_count += 1;
deps.data_store.update_scheduler(min_scheduler)?;
Expand Down
2 changes: 1 addition & 1 deletion servers/su/src/domain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub async fn init_deps(mode: Option<String>) -> (Arc<Deps>, Arc<PromMetrics>) {

let config = Arc::new(AoConfig::new(mode.clone()).expect("Failed to read configuration"));

if config.use_disk {
if config.use_disk && config.mode != "router" {
let logger_clone = logger.clone();
/*
sync_bytestore is a blocking routine so we must
Expand Down
Binary file modified servers/su/su
Binary file not shown.

0 comments on commit 5f5f951

Please sign in to comment.