diff --git a/crates/core/src/host/global_tx.rs b/crates/core/src/host/global_tx.rs index 84e3fb17b..96acf740b 100644 --- a/crates/core/src/host/global_tx.rs +++ b/crates/core/src/host/global_tx.rs @@ -1,7 +1,7 @@ use crate::identity::Identity; use spacetimedb_lib::GlobalTxId; use std::cmp::Ordering as CmpOrdering; -use std::collections::{BTreeSet, HashMap, HashSet}; +use std::collections::{BTreeSet, HashMap}; use std::future::Future; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; @@ -107,7 +107,6 @@ struct LockState { // A map from wait_id to the corresponding wait entry, which contains the notify object to wake up the waiter when its turn comes. wait_entries: HashMap, waiter_ids_by_tx: HashMap, - wounded_owners: HashSet, next_wait_id: u64, } @@ -118,7 +117,6 @@ impl Default for LockState { waiting: BTreeSet::new(), wait_entries: HashMap::new(), waiter_ids_by_tx: HashMap::new(), - wounded_owners: HashSet::new(), next_wait_id: 1, } } @@ -385,7 +383,7 @@ impl GlobalTxManager { } else { self.ensure_waiter_locked(&mut state, tx_id) }; - let owner_to_wound = (tx_id < owner && state.wounded_owners.insert(owner)).then_some(owner); + let owner_to_wound = (tx_id < owner).then_some(owner); let new_registration = registration.is_none().then(|| WaitRegistration::new(self, wait_id)); (notify, owner_to_wound, new_registration) } @@ -451,7 +449,6 @@ impl GlobalTxManager { let mut state = self.lock_state.lock().unwrap(); if state.owner.as_ref() == Some(tx_id) { state.owner = None; - state.wounded_owners.remove(tx_id); self.notify_next_waiter_locked(&state); } else { log::warn!("Release a lock that isn't actually held. This should not happen"); @@ -768,4 +765,56 @@ mod tests { assert!(matches!(rt.block_on(waiter_task).expect("task should complete"), true)); } + + #[test] + fn later_older_waiter_can_retry_wound_after_first_waiter_cancels() { + let manager = Arc::new(GlobalTxManager::default()); + let owner = tx_id(30, 3, 0); + let first_waiter = tx_id(10, 1, 0); + let second_waiter = tx_id(15, 2, 0); + manager.ensure_session(owner, super::GlobalTxRole::Participant, owner.creator_db); + manager.ensure_session(first_waiter, super::GlobalTxRole::Participant, first_waiter.creator_db); + manager.ensure_session( + second_waiter, + super::GlobalTxRole::Participant, + second_waiter.creator_db, + ); + + let rt = Runtime::new().unwrap(); + let owner_guard = match rt.block_on(manager.acquire(owner, |_| async {})) { + AcquireDisposition::Acquired(guard) => guard, + AcquireDisposition::Cancelled => panic!("owner should acquire immediately"), + }; + + let manager_for_first_waiter = manager.clone(); + let first_waiter_task = rt.spawn(async move { + matches!( + manager_for_first_waiter.acquire(first_waiter, |_| async {}).await, + AcquireDisposition::Cancelled + ) + }); + + std::thread::sleep(Duration::from_millis(5)); + manager.wound(&first_waiter).expect("first waiter session should exist"); + assert!(rt + .block_on(first_waiter_task) + .expect("first waiter task should complete")); + assert!(!manager.is_wounded(&owner)); + + let manager_for_second_waiter = manager.clone(); + let second_waiter_task = rt.spawn(async move { + matches!( + manager_for_second_waiter.acquire(second_waiter, |_| async {}).await, + AcquireDisposition::Acquired(_) + ) + }); + + std::thread::sleep(Duration::from_millis(25)); + assert!(manager.is_wounded(&owner)); + + drop(owner_guard); + assert!(rt + .block_on(second_waiter_task) + .expect("second waiter task should complete")); + } }