From 2bc2b3e3d0ea7c606b4c33bf0f693526f6b03877 Mon Sep 17 00:00:00 2001 From: parmesant Date: Wed, 30 Oct 2024 09:08:18 +0530 Subject: [PATCH] fix: Windows failing build and querying (#978) fixes #824 --- server/src/handlers/http/health_check.rs | 59 +++++++++++++--------- server/src/query/stream_schema_provider.rs | 24 ++++++--- 2 files changed, 53 insertions(+), 30 deletions(-) diff --git a/server/src/handlers/http/health_check.rs b/server/src/handlers/http/health_check.rs index 957139657..af7de7762 100644 --- a/server/src/handlers/http/health_check.rs +++ b/server/src/handlers/http/health_check.rs @@ -25,7 +25,8 @@ use actix_web::middleware::Next; use actix_web::{Error, HttpResponse}; use lazy_static::lazy_static; use std::sync::Arc; -use tokio::signal::unix::{signal, SignalKind}; +use tokio::signal::ctrl_c; + use tokio::sync::{oneshot, Mutex}; // Create a global variable to store signal status @@ -52,35 +53,45 @@ pub async fn check_shutdown_middleware( } pub async fn handle_signals(shutdown_signal: Arc>>>) { - let mut sigterm = - signal(SignalKind::terminate()).expect("Failed to set up SIGTERM signal handler"); - log::info!("Signal handler task started"); - - // Block until SIGTERM is received - match sigterm.recv().await { - Some(_) => { - log::info!("Received SIGTERM signal at Readiness Probe Handler"); - - // Set the shutdown flag to true - let mut shutdown_flag = SIGNAL_RECEIVED.lock().await; - *shutdown_flag = true; - - // Sync to local - crate::event::STREAM_WRITERS.unset_all(); - - // Trigger graceful shutdown - if let Some(shutdown_sender) = shutdown_signal.lock().await.take() { - let _ = shutdown_sender.send(()); + #[cfg(windows)] + { + tokio::select! { + _ = ctrl_c() => { + log::info!("Received SIGINT signal at Readiness Probe Handler"); + shutdown(shutdown_signal).await; } } - None => { - log::info!("Signal handler received None, indicating an error or end of stream"); + } + #[cfg(unix)] + { + use tokio::signal::unix::{signal, SignalKind}; + let mut sigterm = signal(SignalKind::terminate()).unwrap(); + tokio::select! { + _ = ctrl_c() => { + log::info!("Received SIGINT signal at Readiness Probe Handler"); + shutdown(shutdown_signal).await; + }, + _ = sigterm.recv() => { + log::info!("Received SIGTERM signal at Readiness Probe Handler"); + shutdown(shutdown_signal).await; + } } } - - log::info!("Signal handler task completed"); } +async fn shutdown(shutdown_signal: Arc>>>) { + // Set the shutdown flag to true + let mut shutdown_flag = SIGNAL_RECEIVED.lock().await; + *shutdown_flag = true; + + // Sync to local + crate::event::STREAM_WRITERS.unset_all(); + + // Trigger graceful shutdown + if let Some(shutdown_sender) = shutdown_signal.lock().await.take() { + let _ = shutdown_sender.send(()); + } +} pub async fn readiness() -> HttpResponse { // Check the object store connection if CONFIG.storage().get_object_store().check().await.is_ok() { diff --git a/server/src/query/stream_schema_provider.rs b/server/src/query/stream_schema_provider.rs index 86a848483..6f1ceecf4 100644 --- a/server/src/query/stream_schema_provider.rs +++ b/server/src/query/stream_schema_provider.rs @@ -237,12 +237,24 @@ fn partitioned_files( // object_store::path::Path doesn't automatically deal with Windows path separators // to do that, we are using from_absolute_path() which takes into consideration the underlying filesystem // before sending the file path to PartitionedFile - let pf = if CONFIG.storage_name.eq("drive") { - let file_path = object_store::path::Path::from_absolute_path(file_path).unwrap(); - PartitionedFile::new(file_path, file.file_size) - } else { - PartitionedFile::new(file_path, file.file_size) - }; + // the github issue- https://github.com/parseablehq/parseable/issues/824 + // For some reason, the `from_absolute_path()` doesn't work for macos, hence the ugly solution + // TODO: figure out an elegant solution to this + let pf; + + #[cfg(unix)] + { + pf = PartitionedFile::new(file_path, file.file_size); + } + #[cfg(windows)] + { + pf = if CONFIG.storage_name.eq("drive") { + let file_path = object_store::path::Path::from_absolute_path(file_path).unwrap(); + PartitionedFile::new(file_path, file.file_size) + } else { + PartitionedFile::new(file_path, file.file_size) + }; + } partitioned_files[index].push(pf); columns.into_iter().for_each(|col| {