Skip to content

Commit

Permalink
feat: use cancellation token to stop the broker
Browse files Browse the repository at this point in the history
  • Loading branch information
swanandx authored and baxterjo committed Dec 5, 2024
1 parent 2377e4e commit bb9a826
Showing 1 changed file with 87 additions and 23 deletions.
110 changes: 87 additions & 23 deletions rumqttd/src/server/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use flume::{RecvError, SendError, Sender};
use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::{Arc, Mutex};
use tokio_util::sync::CancellationToken;
use tracing::{error, field, info, warn, Instrument};
use uuid::Uuid;

Expand All @@ -40,7 +41,7 @@ use crate::{Config, ConnectionId, ServerSettings};

use tokio::net::{TcpListener, TcpStream};
use tokio::time::error::Elapsed;
use tokio::{task, time};
use tokio::{select, task, time};

#[derive(Debug, thiserror::Error)]
#[error("Acceptor error")]
Expand Down Expand Up @@ -69,6 +70,16 @@ pub struct Broker {
router_tx: Sender<(ConnectionId, Event)>,
}

pub struct BrokerHandle {
token: CancellationToken,
}

impl BrokerHandle {
pub fn stop(self) {
self.token.cancel();
}
}

impl Broker {
pub fn new(config: Config) -> Broker {
let config = Arc::new(config);
Expand Down Expand Up @@ -156,7 +167,7 @@ impl Broker {
}

#[tracing::instrument(skip(self))]
pub fn start(&mut self) -> Result<(), Error> {
pub fn start(&mut self) -> Result<BrokerHandle, Error> {
if self.config.v4.is_none()
&& self.config.v5.is_none()
&& (cfg!(not(feature = "websocket")) || self.config.ws.is_none())
Expand All @@ -172,15 +183,23 @@ impl Broker {
// so we collect handles for all of the spawned servers
let mut server_thread_handles = Vec::new();

let cancel_token = CancellationToken::new();

if let Some(metrics_config) = self.config.metrics.clone() {
let timer_thread = thread::Builder::new().name("timer".to_owned());
let router_tx = self.router_tx.clone();
let token = cancel_token.clone();
timer_thread.spawn(move || {
let mut runtime = tokio::runtime::Builder::new_current_thread();
let runtime = runtime.enable_all().build().unwrap();

runtime.block_on(async move {
timer::start(metrics_config, router_tx).await;
select! {
_ = timer::start(metrics_config, router_tx) => {}
_ = token.cancelled() => {
info!("shutting down timer");
}
}
});
})?;
}
Expand All @@ -189,14 +208,21 @@ impl Broker {
if let Some(bridge_config) = self.config.bridge.clone() {
let bridge_thread = thread::Builder::new().name(bridge_config.name.clone());
let router_tx = self.router_tx.clone();
let token = cancel_token.clone();
bridge_thread.spawn(move || {
let mut runtime = tokio::runtime::Builder::new_current_thread();
let runtime = runtime.enable_all().build().unwrap();

runtime.block_on(async move {
if let Err(e) = bridge::start(bridge_config, router_tx, V4).await {
error!(error=?e, "Bridge Link error");
};
select! {
val = bridge::start(bridge_config, router_tx, V4) => {
if let Err(e) = val { error!(error=?e, "Bridge Link error") };
}
_ = token.cancelled() => {
info!("shutting down bridge");
}

}
});
})?;
}
Expand All @@ -206,13 +232,20 @@ impl Broker {
for (_, config) in v4_config.clone() {
let server_thread = thread::Builder::new().name(config.name.clone());
let mut server = Server::new(config, self.router_tx.clone(), V4);
let token = cancel_token.clone();
let handle = server_thread.spawn(move || {
let mut runtime = tokio::runtime::Builder::new_current_thread();
let runtime = runtime.enable_all().build().unwrap();

runtime.block_on(async {
if let Err(e) = server.start(LinkType::Remote).await {
error!(error=?e, "Server error - V4");
select! {
val = server.start(LinkType::Remote) => {
if let Err(e) = val { error!(error=?e, "Server error - V4") };
}
_ = token.cancelled() => {
info!("shutting down V4 server");
}

}
});
})?;
Expand All @@ -224,13 +257,20 @@ impl Broker {
for (_, config) in v5_config.clone() {
let server_thread = thread::Builder::new().name(config.name.clone());
let mut server = Server::new(config, self.router_tx.clone(), V5);
let token = cancel_token.clone();
let handle = server_thread.spawn(move || {
let mut runtime = tokio::runtime::Builder::new_current_thread();
let runtime = runtime.enable_all().build().unwrap();

runtime.block_on(async {
if let Err(e) = server.start(LinkType::Remote).await {
error!(error=?e, "Server error - V5");
select! {
val = server.start(LinkType::Remote) => {
if let Err(e) = val { error!(error=?e, "Server error - V5") };
}
_ = token.cancelled() => {
info!("shutting down V5 server");
}

}
});
})?;
Expand All @@ -249,13 +289,20 @@ impl Broker {
let server_thread = thread::Builder::new().name(config.name.clone());
//TODO: Add support for V5 procotol with websockets. Registered in config or on ServerSettings
let mut server = Server::new(config, self.router_tx.clone(), V4);
let token = cancel_token.clone();
let handle = server_thread.spawn(move || {
let mut runtime = tokio::runtime::Builder::new_current_thread();
let runtime = runtime.enable_all().build().unwrap();

runtime.block_on(async {
if let Err(e) = server.start(LinkType::Websocket).await {
error!(error=?e, "Server error - WS");
select! {
val = server.start(LinkType::Websocket) => {
if let Err(e) = val { error!(error=?e, "Server error - WS") };
}
_ = token.cancelled() => {
info!("shutting down WS server");
}

}
});
})?;
Expand All @@ -280,6 +327,7 @@ impl Broker {
};
let metrics_thread = thread::Builder::new().name("Metrics".to_owned());
let meter_link = self.meters().unwrap();
let token = cancel_token.clone();
metrics_thread.spawn(move || {
let builder = PrometheusBuilder::new().with_http_listener(addr);
builder.install().unwrap();
Expand All @@ -301,6 +349,11 @@ impl Broker {
}
}

if token.is_cancelled() {
info!("shutting down prometheus");
break;
}

std::thread::sleep(Duration::from_secs(timeout));
}
})?;
Expand All @@ -311,23 +364,34 @@ impl Broker {

let console_link = Arc::new(console_link);
let console_thread = thread::Builder::new().name("Console".to_string());
let token = cancel_token.clone();
console_thread.spawn(move || {
let mut runtime = tokio::runtime::Builder::new_current_thread();
let runtime = runtime.enable_all().build().unwrap();
runtime.block_on(console::start(console_link));

runtime.block_on(async move {
select! {
_ = console::start(console_link) => {}
_ = token.cancelled() => {
info!("shutting down console");
}
}
});
})?;
}

// in ideal case, where server doesn't crash, join() will never resolve
// we still try to join threads so that we don't return from function
// unless everything crashes.
server_thread_handles.into_iter().for_each(|handle| {
// join() might panic in case the thread panics
// we just ignore it
let _ = handle.join();
});

Ok(())
// // in ideal case, where server doesn't crash, join() will never resolve
// // we still try to join threads so that we don't return from function
// // unless everything crashes.
// server_thread_handles.into_iter().for_each(|handle| {
// // join() might panic in case the thread panics
// // we just ignore it
// let _ = handle.join();
// });

Ok(BrokerHandle {
token: cancel_token,
})
}
}

Expand Down

0 comments on commit bb9a826

Please sign in to comment.