From 8d99ded238202cfc086af84dc5bc6cf779d64397 Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Tue, 19 Aug 2025 22:56:42 +0530 Subject: [PATCH] fix #3174 (#3179) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # Description of Changes fixes #3174 . During initialization, entries were added to the `DelayQueue` but not to `key_map`. ### Detailed Explanation: 1. `DelayQueue` is **not set-semantic**, so we track uniqueness with a `key_map: FxHashMap` but that wasn't updated during initiliaziation. 2. `Scheduler::schedule` is **not transactional**: it enqueues reducers even if the DB transaction later fails (abort, duplicate row, etc.). On yield, `SchedulerActor` checks the DB before execution. **Combined Effect**: A transaction that does not actually change a scheduled entry but still calls `Scheduler::schedule` after a module update will cause a duplicate entry in the `DelayQueue`, since `key_map` does not yet contain that entry. **Why It Didn’t Show Earlier**: When a repeating reducer executes, we re-schedule it by updating both `DelayQueue` and `key_map` correctly. The bug only appears in the window after updating module but before the first execution, if a transaction calls schedule without actually modifying the DB row. Which was indeed happening as per discord chat: > but yeah most likely order of event was modue was updated > and then update_scheduled_timers_from_static_data was called window between update module and first execution is 1 hour for this case. ## Repo steps: 1. publish this module, it makes `send_scheduled_message` reducer to be called every 10 secs. ```rust #[spacetimedb::table(name = scheduled_message, public, scheduled(send_scheduled_message))] pub struct ScheduledMessage { #[primary_key] #[auto_inc] scheduled_id: u64, scheduled_at: ScheduleAt, } #[spacetimedb::reducer] fn send_scheduled_message(ctx: &ReducerContext, sched: ScheduledMessage) -> Result<(), String> { info!("Sending scheduled message: {:?}", ctx.timestamp); Ok(()) } #[spacetimedb::reducer(init)] pub fn init(ctx: &ReducerContext) { ctx.db.scheduled_message().insert(ScheduledMessage { scheduled_id: 0, scheduled_at: Duration::from_secs(10).into(), }); } #[spacetimedb::reducer] pub fn update_timer(ctx: &ReducerContext) { for mut timer in ctx.db.scheduled_message().iter() { timer.scheduled_at = Duration::from_secs(10).into(); ctx.db.scheduled_message().scheduled_id().update(timer); log::info!("building decay agent timer was updated"); } } ``` 2. Update module to support automigration (add a table) and re-publish it. 3. Call reducer `update_timer` and do it before first execution of `send_scheduled_message` after updating module. 4. As `update_timer` doesn't change the existing scheduler but calls `Scheduler::schedule` it will cause duplicate entry in `DelayQueue`. # API and ABI breaking changes N/A # Expected complexity level and risk 1, pretty obvious fix. # Testing manually. The code fix is straightforward, but the issue only becomes visible under specific conditions. --- crates/core/src/host/scheduler.rs | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/crates/core/src/host/scheduler.rs b/crates/core/src/host/scheduler.rs index 8e6de9809..685298210 100644 --- a/crates/core/src/host/scheduler.rs +++ b/crates/core/src/host/scheduler.rs @@ -91,6 +91,7 @@ impl SchedulerStarter { // time to make it better right now. pub fn start(mut self, module_host: &ModuleHost) -> anyhow::Result<()> { let mut queue: DelayQueue = DelayQueue::new(); + let mut key_map = FxHashMap::default(); let tx = self.db.begin_tx(Workload::Internal); @@ -126,7 +127,17 @@ impl SchedulerStarter { id_column, at_column, }; - queue.insert_at(QueueItem::Id { id, at }, now_instant + duration); + let key = queue.insert_at(QueueItem::Id { id, at }, now_instant + duration); + + // This should never happen as duplicate entries should be gated by unique + // constraint voilation in scheduled tables. + if key_map.insert(id, key).is_some() { + return Err(anyhow!( + "Duplicate key found in scheduler queue: table_id {}, schedule_id {}", + id.table_id, + id.schedule_id + )); + } } } @@ -134,7 +145,7 @@ impl SchedulerStarter { SchedulerActor { rx: self.rx, queue, - key_map: FxHashMap::default(), + key_map, module_host: module_host.downgrade(), } .run(),