From 29b2b5f66f058bec3b8374fec6eb548b1f2ce472 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C3=ABl=20Catanzariti?= Date: Fri, 29 Dec 2023 01:48:46 +0100 Subject: [PATCH] fix infinite loop in cluster mode when loosing connection to a node --- src/commands/cluster_commands.rs | 3 ++- src/network/cluster_connection.rs | 9 ++++++++- src/tests/cluster.rs | 29 ++++++++++++++++++++++------- 3 files changed, 32 insertions(+), 9 deletions(-) diff --git a/src/commands/cluster_commands.rs b/src/commands/cluster_commands.rs index 442ee04..ed8b7dd 100644 --- a/src/commands/cluster_commands.rs +++ b/src/commands/cluster_commands.rs @@ -761,11 +761,12 @@ pub struct ClusterNodeResult { } /// Cluster health status for the [`cluster_shards`](ClusterCommands::cluster_shards) command. -#[derive(Debug, Deserialize)] +#[derive(Debug, Deserialize, PartialEq, Eq)] #[serde(rename_all = "lowercase")] pub enum ClusterHealthStatus { Online, Failed, + Fail, Loading, } diff --git a/src/network/cluster_connection.rs b/src/network/cluster_connection.rs index ad25e5e..0ebd8fd 100644 --- a/src/network/cluster_connection.rs +++ b/src/network/cluster_connection.rs @@ -509,6 +509,10 @@ impl ClusterConnection { let read_futures = self.nodes.iter_mut().map(|n| n.connection.read().boxed()); let (result, node_idx, _) = future::select_all(read_futures).await; + if result.is_none() { + return None; + } + if let Some(Ok(bytes)) = &result { if bytes.is_push_message() { return result; @@ -940,7 +944,10 @@ impl ClusterConnection { let mut slot_ranges = Vec::::new(); for shard_info in shard_info_list.into_iter() { - let Some(master_info) = shard_info.nodes.into_iter().find(|n| n.role == "master") + let Some(master_info) = shard_info + .nodes + .into_iter() + .find(|n| n.role == "master" && n.health == ClusterHealthStatus::Online) else { return Err(Error::Client("Cluster misconfiguration".to_owned())); }; diff --git a/src/tests/cluster.rs b/src/tests/cluster.rs index fa80005..21698c7 100644 --- a/src/tests/cluster.rs +++ b/src/tests/cluster.rs @@ -6,17 +6,14 @@ use crate::{ ClusterShardResult, ConnectionCommands, FlushingMode, GenericCommands, HelloOptions, MigrateOptions, ScriptingCommands, ServerCommands, StringCommands, }, - network::{Version, ClusterConnection}, + network::{ClusterConnection, Version}, sleep, spawn, tests::{get_cluster_test_client, get_cluster_test_client_with_command_timeout}, Error, RedisError, RedisErrorKind, Result, }; -use serial_test::serial; -use std::{ - collections::HashSet, - future::IntoFuture, -}; use futures_util::try_join; +use serial_test::serial; +use std::{collections::HashSet, future::IntoFuture, time::Duration}; #[cfg_attr(feature = "tokio-runtime", tokio::test)] #[cfg_attr(feature = "async-std-runtime", async_std::test)] @@ -464,4 +461,22 @@ async fn commands_to_different_nodes() -> Result<()> { assert_eq!("1", val1); assert_eq!("2", val2); Ok(()) -} \ No newline at end of file +} + +/// test reconnection to replica when master is stopped +/// master stop is not automated but must be done manually +#[cfg_attr(feature = "tokio-runtime", tokio::test)] +#[cfg_attr(feature = "async-std-runtime", async_std::test)] +#[serial] +#[ignore] +async fn get_loop() -> Result<()> { + let client = get_cluster_test_client().await?; + client.set("key", "value").await?; + + for _ in 0..1000 { + let _value: Result = client.get("key").await; + sleep(Duration::from_secs(1)).await; + } + + Ok(()) +}