Skip to content

Commit

Permalink
- bridge.rs:
Browse files Browse the repository at this point in the history
  - Add a `shutdown_rx` parameter to the `start` function to receive shutdown signals.
  - Handle the shutdown signal in the main loop and break out cleanly.

- console.rs:
  - Add a `shutdown_rx` parameter to the `start` function to receive shutdown signals.
  - Use `axum::serve` with `with_graceful_shutdown` to handle shutdown signals.

- timer.rs:
  - Add a `shutdown_rx` parameter to the `start` function to receive shutdown signals.
  - Handle the shutdown signal in the main loop and break out cleanly.

- broker.rs:
  - Add a `shutdown_rx` and `shutdown_tx` to the `Broker` struct for shutdown handling.
  - Pass the `shutdown_rx` to various components like bridge, console, timer, and servers.
  - Handle shutdown signals in the server's main loop and break out cleanly.
  - Add a `ShutdownHandler` and `ShutdownDropGuard` for managing shutdown signals.

These changes allow for graceful shutdown of the rumqtt broker and its components, ensuring clean termination and resource cleanup when a shutdown signal is received.
  • Loading branch information
silvestrpredko committed Jun 13, 2024
1 parent 819d4e3 commit 74ce72d
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 31 deletions.
6 changes: 6 additions & 0 deletions rumqttd/src/link/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::{io, net::AddrParseError, time::Duration};

use tokio::{
net::TcpStream,
sync::watch,
time::{sleep, sleep_until, Instant},
};

Expand Down Expand Up @@ -48,6 +49,7 @@ pub async fn start<P>(
config: BridgeConfig,
router_tx: Sender<(ConnectionId, Event)>,
protocol: P,
mut shutdown_rx: watch::Receiver<()>,
) -> Result<(), BridgeError>
where
P: Protocol + Clone + Send + 'static,
Expand Down Expand Up @@ -154,6 +156,10 @@ where
// resetting timeout because tokio::select! consumes the old timeout future
timeout = sleep_until(ping_time + Duration::from_secs(config.ping_delay));
}
_ = shutdown_rx.changed() => {
debug!("Shutting down bridge");
break 'outer Ok(());
}
}
}
}
Expand Down
14 changes: 10 additions & 4 deletions rumqttd/src/link/console.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use axum::Json;
use axum::{routing::get, Router};
use flume::Sender;
use std::sync::Arc;
use tokio::net::TcpListener;
use tracing::info;
use tokio::{net::TcpListener, sync::watch};
use tracing::{debug, info};

#[derive(Debug)]
pub struct ConsoleLink {
Expand Down Expand Up @@ -39,7 +39,7 @@ impl ConsoleLink {
}

#[tracing::instrument]
pub async fn start(console: Arc<ConsoleLink>) {
pub async fn start(console: Arc<ConsoleLink>, mut shutdown_rx: watch::Receiver<()>) {
let listener = TcpListener::bind(console.config.listen.clone())
.await
.unwrap();
Expand All @@ -56,7 +56,13 @@ pub async fn start(console: Arc<ConsoleLink>) {
.route("/logs", post(logs))
.with_state(console);

axum::serve(listener, app).await.unwrap();
axum::serve(listener, app)
.with_graceful_shutdown(async move {
debug!("Shutting down console");
let _ = shutdown_rx.changed().await;
})
.await
.unwrap();
}

async fn root(State(console): State<Arc<ConsoleLink>>) -> impl IntoResponse {
Expand Down
8 changes: 7 additions & 1 deletion rumqttd/src/link/timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use crate::{router::Event, MetricType};
use crate::{ConnectionId, MetricSettings};
use flume::{SendError, Sender};
use tokio::select;
use tracing::error;
use tokio::sync::watch;
use tracing::{debug, error};

#[derive(Debug, thiserror::Error)]
pub enum Error {
Expand All @@ -18,6 +19,7 @@ pub enum Error {
pub async fn start(
config: HashMap<MetricType, MetricSettings>,
router_tx: Sender<(ConnectionId, Event)>,
mut shutdown_rx: watch::Receiver<()>,
) {
let span = tracing::info_span!("metrics_timer");
let _guard = span.enter();
Expand All @@ -42,6 +44,10 @@ pub async fn start(
error!("Failed to push alerts: {e}");
}
}
_ = shutdown_rx.changed() => {
debug!("Shutting down metrics timer");
break;
}
}
}
}
Loading

0 comments on commit 74ce72d

Please sign in to comment.