Skip to content

Commit

Permalink
fix redis lock_key missing the name (#1986)
Browse files Browse the repository at this point in the history
  • Loading branch information
phiSgr authored Jan 7, 2025
1 parent a873fcd commit ac20acc
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 15 deletions.
1 change: 0 additions & 1 deletion apps/framework-cli/src/cli/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,6 @@ impl FileWatcher {

let mut syncing_process_registry = syncing_process_registry;
let mut project_registry = project_registries;
let redis_client = redis_client.clone();

tokio::spawn(async move {
watch(
Expand Down
34 changes: 20 additions & 14 deletions apps/framework-cli/src/infrastructure/redis/redis_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ impl RedisClient {

pub async fn register_lock(&mut self, name: &str, ttl: i64) -> Result<()> {
info!("<RedisClient> Registering lock {}", name);
let lock_key = self.service_prefix(&["lock"]);
let lock_key = self.service_prefix(&[name, "lock"]);
let lock = RedisLock { key: lock_key, ttl };
self.locks.insert(name.to_string(), lock);
Ok(())
Expand Down Expand Up @@ -222,7 +222,7 @@ impl RedisClient {
}
}

pub async fn release_lock(&self, name: &str) -> Result<()> {
pub async fn release_lock(&mut self, name: &str) -> Result<()> {
info!("<RedisClient> Releasing lock {}", name);
if let Some(lock) = self.locks.get(name) {
let script = Script::new(
Expand All @@ -241,7 +241,7 @@ impl RedisClient {
.invoke_async(&mut *self.connection.lock().await)
.await
.context("Failed to release lock")?;

self.locks.remove(name);
Ok(())
} else {
info!("<RedisClient> Unable to release {} lock", name);
Expand Down Expand Up @@ -537,6 +537,10 @@ impl RedisClient {
}

impl Clone for RedisClient {
// FIXME: some state (the stateful connection) is shared,
// some state (the locks map) is not,
// some state (the listener task) is gone
// this is not a good abstraction for cloning
fn clone(&self) -> Self {
Self {
connection: Arc::clone(&self.connection),
Expand All @@ -556,18 +560,20 @@ impl Drop for RedisClient {
fn drop(&mut self) {
info!("RedisClient is being dropped");
if let Ok(rt) = tokio::runtime::Handle::try_current() {
let mut self_clone = self.clone();
rt.spawn(async move {
if let Err(e) = self_clone.stop_periodic_tasks() {
error!("Error stopping periodic tasks: {}", e);
}
let lock_names: Vec<_> = self_clone.locks.keys().cloned().collect();
for name in lock_names {
if let Err(e) = self_clone.release_lock(&name).await {
error!("Error releasing lock {}: {}", name, e);
if let Err(e) = self.stop_periodic_tasks() {
error!("Error stopping periodic tasks: {}", e);
}
if !self.locks.is_empty() {
let mut self_clone = self.clone();
rt.spawn(async move {
let lock_names: Vec<_> = self_clone.locks.keys().cloned().collect();
for name in lock_names {
if let Err(e) = self_clone.release_lock(&name).await {
error!("Error releasing lock {}: {}", name, e);
}
}
}
});
});
}
} else {
error!("Failed to get current runtime handle in RedisClient::drop");
}
Expand Down

0 comments on commit ac20acc

Please sign in to comment.