diff --git a/compiler/base/orchestrator/src/coordinator.rs b/compiler/base/orchestrator/src/coordinator.rs index af96aadc..360b849c 100644 --- a/compiler/base/orchestrator/src/coordinator.rs +++ b/compiler/base/orchestrator/src/coordinator.rs @@ -6,12 +6,12 @@ use futures::{ use serde::Deserialize; use snafu::prelude::*; use std::{ - collections::HashMap, + collections::{BTreeSet, HashMap}, fmt, mem, ops, process::Stdio, sync::{ atomic::{AtomicU64, Ordering}, - Arc, + Arc, LazyLock, Mutex, }, time::Duration, }; @@ -2532,11 +2532,23 @@ pub enum CommanderError { WorkerOperationFailed { source: SerializedError2 }, } +pub static TRACKED_CONTAINERS: LazyLock>>> = + LazyLock::new(Default::default); + #[derive(Debug)] pub struct TerminateContainer(Option<(String, Command)>); impl TerminateContainer { pub fn new(name: String, command: Command) -> Self { + let was_inserted = TRACKED_CONTAINERS + .lock() + .unwrap_or_else(|e| e.into_inner()) + .insert(name.clone().into()); + + if !was_inserted { + error!(%name, "This container was already tracked; duplicates are bad logic"); + } + Self(Some((name, command))) } @@ -2548,6 +2560,7 @@ impl TerminateContainer { use terminate_container_error::*; if let Some((name, mut kill_child)) = self.0.take() { + Self::stop_tracking(&name); let o = kill_child .output() .await @@ -2558,6 +2571,16 @@ impl TerminateContainer { Ok(()) } + fn stop_tracking(name: &str) { + let was_tracked = TRACKED_CONTAINERS + .lock() + .unwrap_or_else(|e| e.into_inner()) + .remove(name); + if !was_tracked { + error!(%name, "Container was not in the tracking set"); + } + } + fn report_failure(name: String, s: std::process::Output) { // We generally don't care if the command itself succeeds or // not; the container may already be dead! However, let's log @@ -2570,6 +2593,9 @@ impl TerminateContainer { let stdout = String::from_utf8_lossy(&s.stdout); let stderr = String::from_utf8_lossy(&s.stderr); + let stdout = stdout.trim(); + let stderr = stderr.trim(); + error!(?code, %stdout, %stderr, %name, "Killing the container failed"); } } @@ -2578,6 +2604,7 @@ impl TerminateContainer { impl Drop for TerminateContainer { fn drop(&mut self) { if let Some((name, mut kill_child)) = self.0.take() { + Self::stop_tracking(&name); match kill_child.as_std_mut().output() { Ok(o) => Self::report_failure(name, o), Err(e) => error!("Unable to kill container {name} while dropping: {e}"), diff --git a/ui/src/server_axum.rs b/ui/src/server_axum.rs index ac235ca5..bbe2cad8 100644 --- a/ui/src/server_axum.rs +++ b/ui/src/server_axum.rs @@ -26,7 +26,9 @@ use axum_extra::{ TypedHeader, }; use futures::{future::BoxFuture, FutureExt}; -use orchestrator::coordinator::{self, CoordinatorFactory, DockerBackend, Versions}; +use orchestrator::coordinator::{ + self, CoordinatorFactory, DockerBackend, Versions, TRACKED_CONTAINERS, +}; use snafu::prelude::*; use std::{ convert::TryInto, @@ -100,7 +102,11 @@ pub(crate) async fn serve(config: Config) { .route("/metrics", get(metrics)) .route("/websocket", get(websocket)) .route("/nowebsocket", post(nowebsocket)) - .route("/whynowebsocket", get(whynowebsocket)) + .route("/internal/debug/whynowebsocket", get(whynowebsocket)) + .route( + "/internal/debug/tracked-containers", + get(tracked_containers), + ) .layer(Extension(factory)) .layer(Extension(db_handle)) .layer(Extension(Arc::new(SandboxCache::default()))) @@ -680,6 +686,16 @@ async fn whynowebsocket() -> String { format!("{:#?}", WS_ERRORS.lock().unwrap_or_else(|e| e.into_inner())) } +async fn tracked_containers() -> String { + let tracked_containers = TRACKED_CONTAINERS + .lock() + .unwrap_or_else(|e| e.into_inner()) + .clone(); + tracked_containers + .iter() + .fold(String::new(), |a, s| a + s + "\n") +} + #[derive(Debug)] struct MetricsAuthorization;