mirror of
https://github.com/clockworklabs/SpacetimeDB.git
synced 2026-05-06 07:26:43 -04:00
# Description of Changes

Fixes #3174. During initialization, entries were added to the `DelayQueue` but not to `key_map`.

### Detailed Explanation:

1. `DelayQueue` is **not set-semantic**, so we track uniqueness with a `key_map: FxHashMap`, but that map wasn't updated during initialization.
2. `Scheduler::schedule` is **not transactional**: it enqueues reducers even if the DB transaction later fails (abort, duplicate row, etc.). On yield, `SchedulerActor` checks the DB before execution.

**Combined Effect**: A transaction that does not actually change a scheduled entry but still calls `Scheduler::schedule` after a module update will cause a duplicate entry in the `DelayQueue`, since `key_map` does not yet contain that entry.

**Why It Didn’t Show Earlier**: When a repeating reducer executes, we re-schedule it by updating both `DelayQueue` and `key_map` correctly. The bug only appears in the window after a module update but before the first execution, and only if a transaction calls `Scheduler::schedule` without actually modifying the DB row. That is what was happening, per the Discord chat:

> but yeah most likely order of event was modue was updated
> and then update_scheduled_timers_from_static_data was called

The window between the module update and the first execution is 1 hour in this case.

## Repro steps:

1. Publish this module. It causes the `send_scheduled_message` reducer to be called every 10 seconds.

   ```rust
   use std::time::Duration;

   use spacetimedb::{ReducerContext, ScheduleAt, Table};

   #[spacetimedb::table(name = scheduled_message, public, scheduled(send_scheduled_message))]
   pub struct ScheduledMessage {
       #[primary_key]
       #[auto_inc]
       scheduled_id: u64,
       scheduled_at: ScheduleAt,
   }

   // Invoked by the scheduler for each row in `scheduled_message`.
   #[spacetimedb::reducer]
   fn send_scheduled_message(ctx: &ReducerContext, _sched: ScheduledMessage) -> Result<(), String> {
       log::info!("Sending scheduled message: {:?}", ctx.timestamp);
       Ok(())
   }

   // Seeds a single timer that repeats every 10 seconds.
   #[spacetimedb::reducer(init)]
   pub fn init(ctx: &ReducerContext) {
       ctx.db.scheduled_message().insert(ScheduledMessage {
           scheduled_id: 0,
           scheduled_at: Duration::from_secs(10).into(),
       });
   }

   // Rewrites every timer with the same interval: the row does not change,
   // but the update still reaches `Scheduler::schedule`.
   #[spacetimedb::reducer]
   pub fn update_timer(ctx: &ReducerContext) {
       for mut timer in ctx.db.scheduled_message().iter() {
           timer.scheduled_at = Duration::from_secs(10).into();
           ctx.db.scheduled_message().scheduled_id().update(timer);
           log::info!("building decay agent timer was updated");
       }
   }
   ```

2. Change the module in a way that automigration supports (for example, add a table) and re-publish it.
3. Call the `update_timer` reducer after the module update, before the first execution of `send_scheduled_message`.
4. Because `update_timer` doesn't change the existing schedule but still calls `Scheduler::schedule`, it causes a duplicate entry in the `DelayQueue`.

# API and ABI breaking changes

N/A

# Expected complexity level and risk

1: pretty obvious fix.

# Testing

Tested manually. The code fix is straightforward, but the issue only becomes visible under specific conditions.
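The mechanism described under "Detailed Explanation" can be illustrated with a small standalone model. This is only a sketch under stated assumptions: `ToyQueue` and `ToyScheduler` are hypothetical stand-ins written with the standard library, not the real `DelayQueue` or `SchedulerActor`, and the model assumes a later schedule call replaces whatever entry it finds in `key_map`.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a delay queue: every insert creates a new
/// entry and returns a fresh key. It never deduplicates by schedule id.
#[derive(Default)]
struct ToyQueue {
    next_key: usize,
    entries: HashMap<usize, u64>, // queue key -> schedule id
}

impl ToyQueue {
    fn insert(&mut self, schedule_id: u64) -> usize {
        let key = self.next_key;
        self.next_key += 1;
        self.entries.insert(key, schedule_id);
        key
    }

    fn remove(&mut self, key: usize) {
        self.entries.remove(&key);
    }

    fn len(&self) -> usize {
        self.entries.len()
    }
}

/// Hypothetical scheduler model: `key_map` is the only thing that lets a
/// later schedule call find and replace an existing queue entry.
#[derive(Default)]
struct ToyScheduler {
    queue: ToyQueue,
    key_map: HashMap<u64, usize>, // schedule id -> queue key
}

impl ToyScheduler {
    /// Loads existing schedules at startup. `track_keys = false` models the
    /// behavior before the fix, where `key_map` was left empty.
    fn load_at_startup(ids: &[u64], track_keys: bool) -> Self {
        let mut scheduler = Self::default();
        for &id in ids {
            let key = scheduler.queue.insert(id);
            if track_keys {
                scheduler.key_map.insert(id, key);
            }
        }
        scheduler
    }

    /// Models a schedule call made after startup for a row that did not change.
    fn schedule(&mut self, id: u64) {
        // Replace the old entry only if `key_map` knows about it.
        if let Some(old_key) = self.key_map.remove(&id) {
            self.queue.remove(old_key);
        }
        let key = self.queue.insert(id);
        self.key_map.insert(id, key);
    }
}

/// Number of queue entries after one startup load plus one re-schedule.
fn queued_after_reschedule(track_keys: bool) -> usize {
    let mut scheduler = ToyScheduler::load_at_startup(&[1], track_keys);
    scheduler.schedule(1);
    scheduler.queue.len()
}

fn main() {
    println!("key_map skipped at startup: {} entries", queued_after_reschedule(false));
    println!("key_map filled at startup:  {} entries", queued_after_reschedule(true));
}
```

In this model, skipping `key_map` at startup leaves two queue entries for the same schedule id after the re-schedule, while filling it leaves one.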
@@ -91,6 +91,7 @@ impl SchedulerStarter {
     // time to make it better right now.
     pub fn start(mut self, module_host: &ModuleHost) -> anyhow::Result<()> {
         let mut queue: DelayQueue<QueueItem> = DelayQueue::new();
+        let mut key_map = FxHashMap::default();
 
         let tx = self.db.begin_tx(Workload::Internal);
 
@@ -126,7 +127,17 @@ impl SchedulerStarter {
                     id_column,
                     at_column,
                 };
-                queue.insert_at(QueueItem::Id { id, at }, now_instant + duration);
+                let key = queue.insert_at(QueueItem::Id { id, at }, now_instant + duration);
+
+                // This should never happen as duplicate entries should be gated by unique
+                // constraint voilation in scheduled tables.
+                if key_map.insert(id, key).is_some() {
+                    return Err(anyhow!(
+                        "Duplicate key found in scheduler queue: table_id {}, schedule_id {}",
+                        id.table_id,
+                        id.schedule_id
+                    ));
+                }
             }
         }
 
@@ -134,7 +145,7 @@ impl SchedulerStarter {
             SchedulerActor {
                 rx: self.rx,
                 queue,
-                key_map: FxHashMap::default(),
+                key_map,
                 module_host: module_host.downgrade(),
             }
             .run(),