From e0ac767cdf2e65cc01ac46c19648c2f39369a36a Mon Sep 17 00:00:00 2001 From: Alessandro Asoni Date: Tue, 31 Mar 2026 12:55:13 +0200 Subject: [PATCH] core: allow later waiters to retry wound flow Remove the per-owner wound dedupe from the distributed scheduler. The local wound path and the coordinator wound RPC are both idempotent, so suppressing repeat attempts was only an optimization. In practice it could become a correctness bug: if the first older waiter was cancelled during the grace period, or if the first wound RPC failed, later older waiters would never retry the wound and the younger owner could hold the slot until timeout. Add a regression test that wounds the first waiter during the grace period and verifies a second older waiter can still wound the owner and make progress. --- crates/core/src/host/global_tx.rs | 59 ++++++++++++++++++++++++++++--- 1 file changed, 54 insertions(+), 5 deletions(-) diff --git a/crates/core/src/host/global_tx.rs b/crates/core/src/host/global_tx.rs index 84e3fb17b..96acf740b 100644 --- a/crates/core/src/host/global_tx.rs +++ b/crates/core/src/host/global_tx.rs @@ -1,7 +1,7 @@ use crate::identity::Identity; use spacetimedb_lib::GlobalTxId; use std::cmp::Ordering as CmpOrdering; -use std::collections::{BTreeSet, HashMap, HashSet}; +use std::collections::{BTreeSet, HashMap}; use std::future::Future; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; @@ -107,7 +107,6 @@ struct LockState { // A map from wait_id to the corresponding wait entry, which contains the notify object to wake up the waiter when its turn comes. wait_entries: HashMap, waiter_ids_by_tx: HashMap, - wounded_owners: HashSet, next_wait_id: u64, } @@ -118,7 +117,6 @@ impl Default for LockState { waiting: BTreeSet::new(), wait_entries: HashMap::new(), waiter_ids_by_tx: HashMap::new(), - wounded_owners: HashSet::new(), next_wait_id: 1, } } @@ -385,7 +383,7 @@ impl GlobalTxManager { } else { self.ensure_waiter_locked(&mut state, tx_id) }; - let owner_to_wound = (tx_id < owner && state.wounded_owners.insert(owner)).then_some(owner); + let owner_to_wound = (tx_id < owner).then_some(owner); let new_registration = registration.is_none().then(|| WaitRegistration::new(self, wait_id)); (notify, owner_to_wound, new_registration) } @@ -451,7 +449,6 @@ impl GlobalTxManager { let mut state = self.lock_state.lock().unwrap(); if state.owner.as_ref() == Some(tx_id) { state.owner = None; - state.wounded_owners.remove(tx_id); self.notify_next_waiter_locked(&state); } else { log::warn!("Release a lock that isn't actually held. This should not happen"); @@ -768,4 +765,56 @@ mod tests { assert!(matches!(rt.block_on(waiter_task).expect("task should complete"), true)); } + + #[test] + fn later_older_waiter_can_retry_wound_after_first_waiter_cancels() { + let manager = Arc::new(GlobalTxManager::default()); + let owner = tx_id(30, 3, 0); + let first_waiter = tx_id(10, 1, 0); + let second_waiter = tx_id(15, 2, 0); + manager.ensure_session(owner, super::GlobalTxRole::Participant, owner.creator_db); + manager.ensure_session(first_waiter, super::GlobalTxRole::Participant, first_waiter.creator_db); + manager.ensure_session( + second_waiter, + super::GlobalTxRole::Participant, + second_waiter.creator_db, + ); + + let rt = Runtime::new().unwrap(); + let owner_guard = match rt.block_on(manager.acquire(owner, |_| async {})) { + AcquireDisposition::Acquired(guard) => guard, + AcquireDisposition::Cancelled => panic!("owner should acquire immediately"), + }; + + let manager_for_first_waiter = manager.clone(); + let first_waiter_task = rt.spawn(async move { + matches!( + manager_for_first_waiter.acquire(first_waiter, |_| async {}).await, + AcquireDisposition::Cancelled + ) + }); + + std::thread::sleep(Duration::from_millis(5)); + manager.wound(&first_waiter).expect("first waiter session should exist"); + assert!(rt + .block_on(first_waiter_task) + .expect("first waiter task should complete")); + assert!(!manager.is_wounded(&owner)); + + let manager_for_second_waiter = manager.clone(); + let second_waiter_task = rt.spawn(async move { + matches!( + manager_for_second_waiter.acquire(second_waiter, |_| async {}).await, + AcquireDisposition::Acquired(_) + ) + }); + + std::thread::sleep(Duration::from_millis(25)); + assert!(manager.is_wounded(&owner)); + + drop(owner_guard); + assert!(rt + .block_on(second_waiter_task) + .expect("second waiter task should complete")); + } }