Fix the issues with scheduling procedures (#3816)

# Description of Changes

This reapplies the patch from #3704, and fixes the issues that were
causing it to deadlock.

The reason it was deadlocking was that it allowed for the following
sequence of events:
* `SchedulerActor::handle_queued()` begins mutable tx
* `ModuleHost::disconnect_client()` submits call to `call_reducer(tx:
None)`
* scheduler submits call to `call_reducer(tx: Some)`
* `WasmModuleInstance::disconnect_client` now has to try to take tx
lock, but the scheduler's call_reducer already holds it and is behind it
in the queue

So, I moved most of the logic from `handle_queued` back to being
executed in the module worker thread, but kept the code in
`scheduler.rs` so that it can all be reasoned about locally.

Fixes #3645. Should I uncomment the implementation of
`ExportFunctionForScheduledTable for F: Procedure` now?

# Expected complexity level and risk

2 - there's a chance that this patch hasn't fully fixed the deadlock
issue from #3704, but I'm quite confident.

# Testing

- [x] Manually verified that deadlock no longer occurs - previously,
`while true; do python -m smoketests schedule_reducer -k
test_scheduled_table_subscription; done` would freeze up in only 2 or 3
iterations, but now it can run for 10 minutes without issues.
This commit is contained in:
Noa
2025-12-05 16:27:30 -06:00
committed by GitHub
parent e0dc9fb261
commit afe169ac4a
219 changed files with 1659 additions and 390 deletions
+9 -10
View File
@@ -731,17 +731,16 @@ pub use spacetimedb_bindings_macro::reducer;
// TODO(procedure-http): add example with an HTTP request.
// TODO(procedure-transaction): document obtaining and using a transaction within a procedure.
///
// TODO(scheduled-procedures): Uncomment below docs.
// /// # Scheduled procedures
/// # Scheduled procedures
// TODO(docs): after moving scheduled reducer docs into table secion, link there.
// ///
// /// Like [reducer]s, procedures can be made **scheduled**.
// /// This allows calling procedures at a particular time, or in a loop.
// /// It also allows reducers to enqueue procedure runs.
// ///
// /// Scheduled procedures are called on a best-effort basis and may be slightly delayed in their execution
// /// when a database is under heavy load.
// ///
///
/// Like [reducer]s, procedures can be made **scheduled**.
/// This allows calling procedures at a particular time, or in a loop.
/// It also allows reducers to enqueue procedure runs.
///
/// Scheduled procedures are called on a best-effort basis and may be slightly delayed in their execution
/// when a database is under heavy load.
///
/// [clients]: https://spacetimedb.com/docs/#client
// TODO(procedure-async): update docs and examples with `async`-ness.
#[doc(inline)]
+10 -13
View File
@@ -444,9 +444,7 @@ pub struct FnKindView {
/// See <https://willcrichton.net/notes/defeating-coherence-rust/> for details on this technique.
#[cfg_attr(
feature = "unstable",
// TODO(scheduled-procedures): uncomment this, delete other line
// doc = "It will be one of [`FnKindReducer`] or [`FnKindProcedure`] in modules that compile successfully."
doc = "It will be [`FnKindReducer`] in modules that compile successfully."
doc = "It will be one of [`FnKindReducer`] or [`FnKindProcedure`] in modules that compile successfully."
)]
#[cfg_attr(
not(feature = "unstable"),
@@ -467,16 +465,15 @@ impl<'de, TableRow: SpacetimeType + Serialize + Deserialize<'de>, F: Reducer<'de
{
}
// TODO(scheduled-procedures): uncomment this to syntactically allow scheduled procedures.
// #[cfg(feature = "unstable")]
// impl<
// 'de,
// TableRow: SpacetimeType + Serialize + Deserialize<'de>,
// Ret: SpacetimeType + Serialize + Deserialize<'de>,
// F: Procedure<'de, (TableRow,), Ret>,
// > ExportFunctionForScheduledTable<'de, TableRow, FnKindProcedure<Ret>> for F
// {
// }
#[cfg(feature = "unstable")]
impl<
'de,
TableRow: SpacetimeType + Serialize + Deserialize<'de>,
Ret: SpacetimeType + Serialize + Deserialize<'de>,
F: Procedure<'de, (TableRow,), Ret>,
> ExportFunctionForScheduledTable<'de, TableRow, FnKindProcedure<Ret>> for F
{
}
// the macro generates <T as SpacetimeType>::make_type::<DummyTypespace>
pub struct DummyTypespace;
-26
View File
@@ -361,29 +361,3 @@ error[E0277]: the trait bound `NotSpacetimeType: SpacetimeType` is not satisfied
ColId
and $N others
= note: required for `Option<NotSpacetimeType>` to implement `SpacetimeType`
error[E0631]: type mismatch in function arguments
--> tests/ui/views.rs:142:56
|
142 | #[spacetimedb::table(name = scheduled_table, scheduled(scheduled_table_view))]
| -------------------------------------------------------^^^^^^^^^^^^^^^^^^^^---
| | |
| | expected due to this
| required by a bound introduced by this call
...
154 | fn scheduled_table_view(_: &ViewContext, _args: ScheduledTable) -> Vec<Player> {
| ------------------------------------------------------------------------------ found signature defined here
|
= note: expected function signature `for<'a> fn(&'a ReducerContext, ScheduledTable) -> _`
found function signature `fn(&ViewContext, ScheduledTable) -> _`
= note: required for `for<'a> fn(&'a ViewContext, ScheduledTable) -> Vec<Player> {scheduled_table_view}` to implement `Reducer<'_, (ScheduledTable,)>`
= note: required for `for<'a> fn(&'a ViewContext, ScheduledTable) -> Vec<Player> {scheduled_table_view}` to implement `ExportFunctionForScheduledTable<'_, ScheduledTable, {type error}>`
note: required by a bound in `scheduled_typecheck`
--> src/rt.rs
|
| pub const fn scheduled_typecheck<'de, Row, FnKind>(_x: impl ExportFunctionForScheduledTable<'de, Row, FnKind>)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ required by this bound in `scheduled_typecheck`
help: consider wrapping the function in a closure
|
142 | #[spacetimedb::table(name = scheduled_table, scheduled(|arg0: &ReducerContext, arg1: ScheduledTable| scheduled_table_view(/* &ViewContext */, arg1)))]
| +++++++++++++++++++++++++++++++++++++++++++++ ++++++++++++++++++++++++++
+1
View File
@@ -423,6 +423,7 @@ impl {func_name} for super::RemoteReducers {{
{callback_id}(self.imp.on_reducer(
{reducer_name:?},
Box::new(move |ctx: &super::ReducerEventContext| {{
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {{
event: __sdk::ReducerEvent {{
reducer: super::Reducer::{enum_variant_name} {{
@@ -72,6 +72,7 @@ impl add_player for super::RemoteReducers {
AddPlayerCallbackId(self.imp.on_reducer(
"add_player",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event: __sdk::ReducerEvent {
reducer: super::Reducer::AddPlayer {
@@ -181,6 +182,7 @@ impl add_private for super::RemoteReducers {
AddPrivateCallbackId(self.imp.on_reducer(
"add_private",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event: __sdk::ReducerEvent {
reducer: super::Reducer::AddPrivate {
@@ -294,6 +296,7 @@ age: u8,
AddCallbackId(self.imp.on_reducer(
"add",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event: __sdk::ReducerEvent {
reducer: super::Reducer::Add {
@@ -398,6 +401,7 @@ impl assert_caller_identity_is_module_identity for super::RemoteReducers {
AssertCallerIdentityIsModuleIdentityCallbackId(self.imp.on_reducer(
"assert_caller_identity_is_module_identity",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event: __sdk::ReducerEvent {
reducer: super::Reducer::AssertCallerIdentityIsModuleIdentity {
@@ -527,6 +531,7 @@ impl client_connected for super::RemoteReducers {
ClientConnectedCallbackId(self.imp.on_reducer(
"client_connected",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event: __sdk::ReducerEvent {
reducer: super::Reducer::ClientConnected {
@@ -636,6 +641,7 @@ impl delete_player for super::RemoteReducers {
DeletePlayerCallbackId(self.imp.on_reducer(
"delete_player",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event: __sdk::ReducerEvent {
reducer: super::Reducer::DeletePlayer {
@@ -745,6 +751,7 @@ impl delete_players_by_name for super::RemoteReducers {
DeletePlayersByNameCallbackId(self.imp.on_reducer(
"delete_players_by_name",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event: __sdk::ReducerEvent {
reducer: super::Reducer::DeletePlayersByName {
@@ -1066,6 +1073,7 @@ impl list_over_age for super::RemoteReducers {
ListOverAgeCallbackId(self.imp.on_reducer(
"list_over_age",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event: __sdk::ReducerEvent {
reducer: super::Reducer::ListOverAge {
@@ -1170,6 +1178,7 @@ impl log_module_identity for super::RemoteReducers {
LogModuleIdentityCallbackId(self.imp.on_reducer(
"log_module_identity",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event: __sdk::ReducerEvent {
reducer: super::Reducer::LogModuleIdentity {
@@ -3577,6 +3586,7 @@ impl query_private for super::RemoteReducers {
QueryPrivateCallbackId(self.imp.on_reducer(
"query_private",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event: __sdk::ReducerEvent {
reducer: super::Reducer::QueryPrivate {
@@ -3887,6 +3897,7 @@ impl repeating_test for super::RemoteReducers {
RepeatingTestCallbackId(self.imp.on_reducer(
"repeating_test",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event: __sdk::ReducerEvent {
reducer: super::Reducer::RepeatingTest {
@@ -4050,6 +4061,7 @@ impl say_hello for super::RemoteReducers {
SayHelloCallbackId(self.imp.on_reducer(
"say_hello",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event: __sdk::ReducerEvent {
reducer: super::Reducer::SayHello {
@@ -4460,6 +4472,7 @@ impl test_btree_index_args for super::RemoteReducers {
TestBtreeIndexArgsCallbackId(self.imp.on_reducer(
"test_btree_index_args",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event: __sdk::ReducerEvent {
reducer: super::Reducer::TestBtreeIndexArgs {
@@ -5013,6 +5026,7 @@ arg_4: NamespaceTestF,
TestCallbackId(self.imp.on_reducer(
"test",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event: __sdk::ReducerEvent {
reducer: super::Reducer::Test {
+9
View File
@@ -8,6 +8,7 @@ use spacetimedb_lib::bsatn;
use spacetimedb_lib::de::{serde::SeedWrapper, DeserializeSeed};
use spacetimedb_lib::ProductValue;
use spacetimedb_schema::def::deserialize::{ArgsSeed, FunctionDef};
use spacetimedb_schema::def::ModuleDef;
mod disk_storage;
mod host_controller;
@@ -41,6 +42,14 @@ pub enum FunctionArgs {
}
impl FunctionArgs {
fn into_tuple_for_def<Def: FunctionDef>(
self,
module: &ModuleDef,
def: &Def,
) -> Result<ArgsTuple, InvalidFunctionArguments> {
self.into_tuple(module.arg_seed_for(def))
}
fn into_tuple<Def: FunctionDef>(self, seed: ArgsSeed<'_, Def>) -> Result<ArgsTuple, InvalidFunctionArguments> {
self._into_tuple(seed).map_err(|err| InvalidFunctionArguments {
err,
+98 -84
View File
@@ -11,7 +11,7 @@ use crate::error::DBError;
use crate::estimation::estimate_rows_scanned;
use crate::hash::Hash;
use crate::host::host_controller::CallProcedureReturn;
use crate::host::scheduler::{handle_queued_call_reducer_params, QueueItem};
use crate::host::scheduler::{CallScheduledFunctionResult, ScheduledFunctionParams};
use crate::host::v8::JsInstance;
use crate::host::wasmtime::ModuleInstance;
use crate::host::{InvalidFunctionArguments, InvalidViewArguments};
@@ -41,7 +41,7 @@ use spacetimedb_client_api_messages::websocket::{ByteListLen, Compression, OneOf
use spacetimedb_data_structures::error_stream::ErrorStream;
use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap};
use spacetimedb_datastore::error::DatastoreError;
use spacetimedb_datastore::execution_context::{ExecutionContext, ReducerContext, Workload, WorkloadType};
use spacetimedb_datastore::execution_context::{Workload, WorkloadType};
use spacetimedb_datastore::locking_tx_datastore::{MutTxId, ViewCallInfo};
use spacetimedb_datastore::traits::{IsolationLevel, Program, TxData};
use spacetimedb_durability::DurableOffset;
@@ -56,7 +56,6 @@ use spacetimedb_primitives::{ArgId, ProcedureId, TableId, ViewFnPtr, ViewId};
use spacetimedb_query::compile_subscription;
use spacetimedb_sats::{AlgebraicTypeRef, ProductValue};
use spacetimedb_schema::auto_migrate::{AutoMigrateError, MigrationPolicy};
use spacetimedb_schema::def::deserialize::ArgsSeed;
use spacetimedb_schema::def::{ModuleDef, ProcedureDef, ReducerDef, TableDef, ViewDef};
use spacetimedb_schema::schema::{Schema, TableSchema};
use spacetimedb_vm::relation::RelValue;
@@ -608,59 +607,6 @@ pub fn call_identity_connected(
}
}
// Only for logging purposes.
const SCHEDULED_REDUCER: &str = "scheduled_reducer";
pub(crate) fn call_scheduled_reducer(
module: &ModuleInfo,
queue_item: QueueItem,
call_reducer: impl FnOnce(Option<MutTxId>, CallReducerParams) -> (ReducerCallResult, bool),
) -> (Result<(ReducerCallResult, Timestamp), ReducerCallError>, bool) {
extract_trapped(call_scheduled_reducer_inner(module, queue_item, call_reducer))
}
fn call_scheduled_reducer_inner(
module: &ModuleInfo,
item: QueueItem,
call_reducer: impl FnOnce(Option<MutTxId>, CallReducerParams) -> (ReducerCallResult, bool),
) -> Result<((ReducerCallResult, Timestamp), bool), ReducerCallError> {
let db = &module.relational_db();
let mut tx = db.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal);
match handle_queued_call_reducer_params(&tx, module, db, item) {
Ok(Some(params)) => {
// Is necessary to patch the context with the actual calling reducer
let reducer_def = module
.module_def
.get_reducer_by_id(params.reducer_id)
.ok_or(ReducerCallError::ScheduleReducerNotFound)?;
let reducer = &*reducer_def.name;
tx.ctx = ExecutionContext::with_workload(
tx.ctx.database_identity(),
Workload::Reducer(ReducerContext {
name: reducer.into(),
caller_identity: params.caller_identity,
caller_connection_id: params.caller_connection_id,
timestamp: Timestamp::now(),
arg_bsatn: params.args.get_bsatn().clone(),
}),
);
let timestamp = params.timestamp;
let (res, trapped) = call_reducer(Some(tx), params);
Ok(((res, timestamp), trapped))
}
Ok(None) => Err(ReducerCallError::ScheduleReducerNotFound),
Err(err) => Err(ReducerCallError::Args(InvalidReducerArguments(
InvalidFunctionArguments {
err,
function_name: SCHEDULED_REDUCER.into(),
},
))),
}
}
pub struct CallReducerParams {
pub timestamp: Timestamp,
pub caller_identity: Identity,
@@ -671,9 +617,10 @@ pub struct CallReducerParams {
pub reducer_id: ReducerId,
pub args: ArgsTuple,
}
impl CallReducerParams {
/// Returns a set of parameters for a call that came from within
/// and without a client/caller/request_id.
/// Returns a set of parameters for an internal call
/// without a client/caller/request_id.
pub fn from_system(
timestamp: Timestamp,
caller_identity: Identity,
@@ -718,6 +665,26 @@ pub struct CallProcedureParams {
pub args: ArgsTuple,
}
impl CallProcedureParams {
/// Returns a set of parameters for an internal call
/// without a client/caller/request_id.
pub fn from_system(
timestamp: Timestamp,
caller_identity: Identity,
procedure_id: ProcedureId,
args: ArgsTuple,
) -> Self {
Self {
timestamp,
caller_identity,
caller_connection_id: ConnectionId::ZERO,
timer: None,
procedure_id,
args,
}
}
}
/// Holds a [`Module`] and a set of [`Instance`]s from it,
/// and allocates the [`Instance`]s to be used for function calls.
///
@@ -1442,8 +1409,9 @@ impl ModuleHost {
reducer_def: &ReducerDef,
args: FunctionArgs,
) -> Result<CallReducerParams, InvalidReducerArguments> {
let reducer_seed = ArgsSeed(module.module_def.typespace().with_type(reducer_def));
let args = args.into_tuple(reducer_seed).map_err(InvalidReducerArguments)?;
let args = args
.into_tuple_for_def(&module.module_def, reducer_def)
.map_err(InvalidReducerArguments)?;
let caller_connection_id = caller_connection_id.unwrap_or(ConnectionId::ZERO);
Ok(CallReducerParams {
timestamp: Timestamp::now(),
@@ -1457,6 +1425,21 @@ impl ModuleHost {
})
}
pub async fn call_reducer_with_params(
&self,
reducer_name: &str,
tx: Option<MutTxId>,
params: CallReducerParams,
) -> Result<ReducerCallResult, NoSuchModule> {
self.call(
reducer_name,
(tx, params),
|(tx, p), inst| inst.call_reducer(tx, p),
|(tx, p), inst| inst.call_reducer(tx, p),
)
.await
}
async fn call_reducer_inner(
&self,
caller_identity: Identity,
@@ -1468,8 +1451,9 @@ impl ModuleHost {
reducer_def: &ReducerDef,
args: FunctionArgs,
) -> Result<ReducerCallResult, ReducerCallError> {
let reducer_seed = ArgsSeed(self.info.module_def.typespace().with_type(reducer_def));
let args = args.into_tuple(reducer_seed).map_err(InvalidReducerArguments)?;
let args = args
.into_tuple_for_def(&self.info.module_def, reducer_def)
.map_err(InvalidReducerArguments)?;
let caller_connection_id = caller_connection_id.unwrap_or(ConnectionId::ZERO);
let call_reducer_params = CallReducerParams {
timestamp: Timestamp::now(),
@@ -1485,9 +1469,9 @@ impl ModuleHost {
Ok(self
.call(
&reducer_def.name,
call_reducer_params,
|p, inst| inst.call_reducer(None, p),
|p, inst| inst.call_reducer(None, p),
(None, call_reducer_params),
|(tx, p), inst| inst.call_reducer(tx, p),
|(tx, p), inst| inst.call_reducer(tx, p),
)
.await?)
}
@@ -1593,9 +1577,10 @@ impl ModuleHost {
procedure_def: &ProcedureDef,
args: FunctionArgs,
) -> Result<CallProcedureReturn, ProcedureCallError> {
let procedure_seed = ArgsSeed(self.info.module_def.typespace().with_type(procedure_def));
let args = args
.into_tuple_for_def(&self.info.module_def, procedure_def)
.map_err(InvalidProcedureArguments)?;
let caller_connection_id = caller_connection_id.unwrap_or(ConnectionId::ZERO);
let args = args.into_tuple(procedure_seed).map_err(InvalidProcedureArguments)?;
let params = CallProcedureParams {
timestamp: Timestamp::now(),
@@ -1605,7 +1590,26 @@ impl ModuleHost {
procedure_id,
args,
};
self.call_async_with_instance(&procedure_def.name, async move |inst| match inst {
Ok(self
.call_async_with_instance(&procedure_def.name, async move |inst| match inst {
Instance::Wasm(mut inst) => (inst.call_procedure(params).await, Instance::Wasm(inst)),
Instance::Js(inst) => {
let (r, s) = inst.call_procedure(params).await;
(r, Instance::Js(s))
}
})
.await?)
}
// This is not reused in `call_procedure_inner`
// due to concerns re. `Timestamp::now`.
pub async fn call_procedure_with_params(
&self,
name: &str,
params: CallProcedureParams,
) -> Result<CallProcedureReturn, NoSuchModule> {
self.call_async_with_instance(name, async move |inst| match inst {
Instance::Wasm(mut inst) => (inst.call_procedure(params).await, Instance::Wasm(inst)),
Instance::Js(inst) => {
let (r, s) = inst.call_procedure(params).await;
@@ -1613,23 +1617,33 @@ impl ModuleHost {
}
})
.await
.map_err(Into::into)
}
// Scheduled reducers require a different function here to call their reducer
// because their reducer arguments are stored in the database and need to be fetched
// within the same transaction as the reducer call.
pub(crate) async fn call_scheduled_reducer(
pub(super) async fn call_scheduled_function(
&self,
item: QueueItem,
) -> Result<(ReducerCallResult, Timestamp), ReducerCallError> {
self.call(
SCHEDULED_REDUCER,
item,
|item, inst| inst.call_scheduled_reducer(item),
|item, inst| inst.call_scheduled_reducer(item),
params: ScheduledFunctionParams,
) -> Result<CallScheduledFunctionResult, NoSuchModule> {
self.with_instance(
"scheduled function",
"reducer or procedure",
|l| self.start_call_timer(l),
async move |timer_guard, executor, inst| match inst {
Instance::Wasm(mut inst) => {
executor
.run_job(async move {
drop(timer_guard);
(inst.call_scheduled_function(params).await, Instance::Wasm(inst))
})
.await
}
Instance::Js(inst) => {
drop(timer_guard);
let (r, s) = inst.call_scheduled_function(params).await;
(r, Instance::Js(s))
}
},
)
.await?
.await
}
/// Materializes the views return by the `view_collector`, if not already materialized,
@@ -1722,9 +1736,9 @@ impl ModuleHost {
let view_def = module_def.view(view_name).ok_or(ViewCallError::NoSuchView)?;
let fn_ptr = view_def.fn_ptr;
let row_type = view_def.product_type_ref;
let typespace = module_def.typespace().with_type(view_def);
let view_seed = ArgsSeed(typespace);
let args = args.into_tuple(view_seed).map_err(InvalidViewArguments)?;
let args = args
.into_tuple_for_def(module_def, view_def)
.map_err(InvalidViewArguments)?;
match self
.call_view_inner(tx, view_name, view_id, table_id, fn_ptr, caller, sender, args, row_type)
+269 -196
View File
@@ -1,38 +1,38 @@
use std::sync::Arc;
use std::time::Duration;
use super::module_host::{
CallReducerParams, DatabaseUpdate, EventStatus, ModuleEvent, ModuleFunctionCall, WeakModuleHost,
};
use super::{FunctionArgs, ModuleHost};
use crate::db::relational_db::RelationalDB;
use crate::host::module_host::{CallProcedureParams, ModuleInfo};
use crate::host::wasm_common::module_host_actor::{InstanceCommon, WasmInstance};
use crate::host::{InvalidProcedureArguments, InvalidReducerArguments, NoSuchModule};
use anyhow::anyhow;
use futures::StreamExt;
use core::time::Duration;
use futures::{FutureExt, StreamExt};
use rustc_hash::FxHashMap;
use spacetimedb_client_api_messages::energy::EnergyQuanta;
use spacetimedb_lib::scheduler::ScheduleAt;
use spacetimedb_lib::Timestamp;
use spacetimedb_primitives::{ColId, TableId};
use spacetimedb_sats::{bsatn::ToBsatn as _, AlgebraicValue};
use spacetimedb_table::table::RowRef;
use tokio::sync::mpsc;
use tokio::time::Instant;
use tokio_util::time::delay_queue::{self, DelayQueue, Expired};
use crate::db::relational_db::RelationalDB;
use crate::host::module_host::ModuleInfo;
use super::module_host::ModuleEvent;
use super::module_host::ModuleFunctionCall;
use super::module_host::{CallReducerParams, WeakModuleHost};
use super::module_host::{DatabaseUpdate, EventStatus};
use super::{FunctionArgs, ModuleHost, ReducerCallError};
use spacetimedb_datastore::execution_context::Workload;
use spacetimedb_datastore::locking_tx_datastore::MutTxId;
use spacetimedb_datastore::system_tables::{StFields, StScheduledFields, ST_SCHEDULED_ID};
use spacetimedb_datastore::traits::IsolationLevel;
use spacetimedb_lib::scheduler::ScheduleAt;
use spacetimedb_lib::Timestamp;
use spacetimedb_primitives::{ColId, FunctionId, TableId};
use spacetimedb_sats::bsatn::ToBsatn as _;
use spacetimedb_sats::AlgebraicValue;
use spacetimedb_table::table::RowRef;
use std::panic;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::time::Instant;
use tokio_util::time::delay_queue::{self, DelayQueue, Expired};
#[derive(Copy, Clone, Eq, PartialEq, Hash)]
pub struct ScheduledReducerId {
/// The ID of the table whose rows hold the scheduled reducers.
/// This table should have a entry in `ST_SCHEDULED`.
pub struct ScheduledFunctionId {
/// The ID of the table whose rows hold the scheduled reducers or procedures.
/// This table should have an entry in `ST_SCHEDULED`.
table_id: TableId,
/// The particular schedule row in the reducer scheduling table referred to by `self.table_id`.
/// The particular schedule row in the scheduling table referred to by `self.table_id`.
schedule_id: u64,
// These may seem redundant, but they're actually free - they fit in the struct padding.
// `scheduled_id: u64, table_id: u32, id_column: u16, at_column: u16` == 16 bytes, same as
@@ -43,7 +43,7 @@ pub struct ScheduledReducerId {
at_column: ColId,
}
spacetimedb_table::static_assert_size!(ScheduledReducerId, 16);
spacetimedb_table::static_assert_size!(ScheduledFunctionId, 16);
enum MsgOrExit<T> {
Msg(T),
@@ -52,20 +52,20 @@ enum MsgOrExit<T> {
enum SchedulerMessage {
Schedule {
id: ScheduledReducerId,
id: ScheduledFunctionId,
/// The timestamp we'll tell the reducer it is.
effective_at: Timestamp,
/// The actual instant we're scheduling for.
real_at: Instant,
},
ScheduleImmediate {
reducer_name: String,
function_name: String,
args: FunctionArgs,
},
}
pub struct ScheduledReducer {
reducer: Box<str>,
pub struct ScheduledFunction {
function: Box<str>,
bsatn_args: Vec<u8>,
}
@@ -121,7 +121,7 @@ impl SchedulerStarter {
// calculate duration left to call the scheduled reducer
let duration = schedule_at.to_duration_from(now_ts);
let at = schedule_at.to_timestamp_from(now_ts);
let id = ScheduledReducerId {
let id = ScheduledFunctionId {
table_id,
schedule_id,
id_column,
@@ -130,7 +130,7 @@ impl SchedulerStarter {
let key = queue.insert_at(QueueItem::Id { id, at }, now_instant + duration);
// This should never happen as duplicate entries should be gated by unique
// constraint voilation in scheduled tables.
// constraint violation in scheduled tables.
if key_map.insert(id, key).is_some() {
return Err(anyhow!(
"Duplicate key found in scheduler queue: table_id {}, schedule_id {}",
@@ -195,9 +195,9 @@ pub enum ScheduleError {
}
impl Scheduler {
/// Schedule a reducer to run from a scheduled table.
/// Schedule a reducer/procedure to run from a scheduled table.
///
/// `reducer_start` is the timestamp of the start of the current reducer.
/// `fn_start` is the timestamp of the start of the current reducer/procedure.
pub(super) fn schedule(
&self,
table_id: TableId,
@@ -205,11 +205,14 @@ impl Scheduler {
schedule_at: ScheduleAt,
id_column: ColId,
at_column: ColId,
reducer_start: Timestamp,
fn_start: Timestamp,
) -> Result<(), ScheduleError> {
// if `Timestamp::now()` is properly monotonic, use it; otherwise, use
// the start of the reducer run as "now" for purposes of scheduling
let now = reducer_start.max(Timestamp::now());
// TODO(procedure-tx): when we do `with_tx` in a procedure,
// it inherits the timestamp of the procedure,
// which could become a problem here for long running procedures.
let now = fn_start.max(Timestamp::now());
// Check that `at` is within `tokio_utils::time::DelayQueue`'s
// accepted time-range.
@@ -229,7 +232,7 @@ impl Scheduler {
// if the actor has exited, it's fine to ignore; it means that the host actor calling
// schedule will exit soon as well, and it'll be scheduled to run when the module host restarts
let _ = self.tx.send(MsgOrExit::Msg(SchedulerMessage::Schedule {
id: ScheduledReducerId {
id: ScheduledFunctionId {
table_id,
schedule_id,
id_column,
@@ -242,9 +245,9 @@ impl Scheduler {
Ok(())
}
pub fn volatile_nonatomic_schedule_immediate(&self, reducer_name: String, args: FunctionArgs) {
pub fn volatile_nonatomic_schedule_immediate(&self, function_name: String, args: FunctionArgs) {
let _ = self.tx.send(MsgOrExit::Msg(SchedulerMessage::ScheduleImmediate {
reducer_name,
function_name,
args,
}));
}
@@ -261,15 +264,17 @@ impl Scheduler {
struct SchedulerActor {
rx: mpsc::UnboundedReceiver<MsgOrExit<SchedulerMessage>>,
queue: DelayQueue<QueueItem>,
key_map: FxHashMap<ScheduledReducerId, delay_queue::Key>,
key_map: FxHashMap<ScheduledFunctionId, delay_queue::Key>,
module_host: WeakModuleHost,
}
pub(crate) enum QueueItem {
Id { id: ScheduledReducerId, at: Timestamp },
VolatileNonatomicImmediate { reducer_name: String, args: FunctionArgs },
enum QueueItem {
Id { id: ScheduledFunctionId, at: Timestamp },
VolatileNonatomicImmediate { function_name: String, args: FunctionArgs },
}
pub(super) struct ScheduledFunctionParams(QueueItem);
#[cfg(target_pointer_width = "64")]
spacetimedb_table::static_assert_size!(QueueItem, 64);
@@ -304,9 +309,9 @@ impl SchedulerActor {
let key = self.queue.insert_at(QueueItem::Id { id, at: effective_at }, real_at);
self.key_map.insert(id, key);
}
SchedulerMessage::ScheduleImmediate { reducer_name, args } => {
SchedulerMessage::ScheduleImmediate { function_name, args } => {
self.queue.insert(
QueueItem::VolatileNonatomicImmediate { reducer_name, args },
QueueItem::VolatileNonatomicImmediate { function_name, args },
Duration::ZERO,
);
}
@@ -315,7 +320,7 @@ impl SchedulerActor {
async fn handle_queued(&mut self, id: Expired<QueueItem>) {
let item = id.into_inner();
let id = match item {
let id: Option<ScheduledFunctionId> = match item {
QueueItem::Id { id, .. } => Some(id),
QueueItem::VolatileNonatomicImmediate { .. } => None,
};
@@ -326,156 +331,159 @@ impl SchedulerActor {
let Some(module_host) = self.module_host.upgrade() else {
return;
};
let db = module_host.replica_ctx().relational_db.clone();
let module_host_clone = module_host.clone();
let res = tokio::spawn(async move { module_host.call_scheduled_reducer(item).await }).await;
let result = module_host.call_scheduled_function(ScheduledFunctionParams(item)).await;
match res {
// if we didn't actually call the reducer because the module exited or it was already deleted, leave
// the ScheduledReducer in the database for when the module restarts
Ok(Err(ReducerCallError::NoSuchModule(_)) | Err(ReducerCallError::ScheduleReducerNotFound)) => {}
Ok(Ok((_, ts))) => {
if let Some(id) = id {
let _ = self.delete_scheduled_reducer_row(&db, id, module_host_clone, ts).await;
}
match result {
// If the module already exited, leave the `ScheduledFunction` in
// the database for when the module restarts.
Err(NoSuchModule) => {}
Ok(CallScheduledFunctionResult { reschedule: None }) => {
// nothing to do
}
// delete the scheduled reducer row if its not repeated reducer
Ok(_) | Err(_) => {
Ok(CallScheduledFunctionResult {
reschedule: Some(Reschedule { at_ts, at_real }),
}) => {
if let Some(id) = id {
// TODO: Handle errors here?
let _ = self
.delete_scheduled_reducer_row(&db, id, module_host_clone, Timestamp::now())
.await;
// If this was repeated, we need to add it back to the queue.
let key = self.queue.insert_at(QueueItem::Id { id, at: at_ts }, at_real);
self.key_map.insert(id, key);
}
}
}
if let Err(e) = res {
log::error!("invoking scheduled reducer failed: {e:#}");
};
}
async fn delete_scheduled_reducer_row(
&mut self,
db: &RelationalDB,
id: ScheduledReducerId,
module_host: ModuleHost,
ts: Timestamp,
) -> anyhow::Result<()> {
let host_clone = module_host.clone();
let db = db.clone();
let schedule_at = host_clone
.on_module_thread("delete_scheduled_reducer_row", move || {
let mut tx = db.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal);
match get_schedule_row_mut(&tx, &db, id) {
Ok(schedule_row) => {
if let Ok(schedule_at) = read_schedule_at(&schedule_row, id.at_column) {
// If the schedule is an interval, we handle it as a repeated schedule
if let ScheduleAt::Interval(_) = schedule_at {
return Some(schedule_at);
}
let row_ptr = schedule_row.pointer();
db.delete(&mut tx, id.table_id, [row_ptr]);
commit_and_broadcast_deletion_event(tx, module_host);
} else {
log::debug!(
"Failed to read 'scheduled_at' from row: table_id {}, schedule_id {}",
id.table_id,
id.schedule_id
);
}
}
Err(_) => {
log::debug!(
"Table row corresponding to yield scheduler ID not found: table_id {}, scheduler_id {}",
id.table_id,
id.schedule_id
);
}
}
None
})
.await?;
// If this was repeated, we need to add it back to the queue.
if let Some(ScheduleAt::Interval(dur)) = schedule_at {
let key = self.queue.insert(
QueueItem::Id { id, at: ts + dur },
dur.to_duration().unwrap_or(Duration::ZERO),
);
self.key_map.insert(id, key);
}
Ok(())
}
}
pub(crate) fn handle_queued_call_reducer_params(
tx: &MutTxId,
#[derive(Debug)]
pub(super) struct CallScheduledFunctionResult {
reschedule: Option<Reschedule>,
}
#[derive(Debug)]
struct Reschedule {
at_ts: Timestamp,
at_real: Instant,
}
pub(super) async fn call_scheduled_function(
module_info: &ModuleInfo,
params: ScheduledFunctionParams,
inst_common: &mut InstanceCommon,
inst: &mut impl WasmInstance,
) -> (CallScheduledFunctionResult, bool) {
let ScheduledFunctionParams(item) = params;
let id: Option<ScheduledFunctionId> = match item {
QueueItem::Id { id, .. } => Some(id),
QueueItem::VolatileNonatomicImmediate { .. } => None,
};
let db = &**module_info.relational_db();
let delete_scheduled_function_row = |tx: Option<MutTxId>, timestamp: Option<_>| {
id.and_then(|id| {
let (timestamp, instant) = timestamp.unwrap_or_else(|| (Timestamp::now(), Instant::now()));
let tx = tx.unwrap_or_else(|| db.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal));
let schedule_at = delete_scheduled_function_row_with_tx(module_info, db, tx, id)?;
let ScheduleAt::Interval(dur) = schedule_at else {
return None;
};
Some(Reschedule {
at_ts: schedule_at.to_timestamp_from(timestamp),
at_real: instant + dur.to_duration_abs(),
})
})
};
let tx = db.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal);
// Determine the call params.
// This also lets us know whether to call a reducer or procedure.
let params = call_params_for_queued_item(module_info, db, &tx, item);
let (timestamp, instant, params) = match params {
// If the function was already deleted, leave the `ScheduledFunction`
// in the database for when the module restarts.
Ok(None) => return (CallScheduledFunctionResult { reschedule: None }, false),
Ok(Some(params)) => params,
Err(err) => {
// All we can do here is log an error.
log::error!("could not determine scheduled function or its parameters: {err:#}");
let reschedule = delete_scheduled_function_row(Some(tx), None);
return (CallScheduledFunctionResult { reschedule }, false);
}
};
// We've determined whether we have a reducer or procedure.
// The logic between them will now split,
// as for scheduled procedures, it's incorrect to retry them if execution aborts midway,
// so we must remove the schedule row before executing.
match params {
CallParams::Reducer(params) => {
// We don't want a panic in the module host to affect the scheduler, as unlikely
// as it might be, so catch it so we can handle it "gracefully". Panics will
// print their message and backtrace when they occur, so we don't need to do
// anything with the error payload.
let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
inst_common.call_reducer_with_tx(Some(tx), params, inst)
}));
let reschedule = delete_scheduled_function_row(None, None);
// Currently, we drop the return value from the function call. In the future,
// we might want to handle it somehow.
let trapped = match result {
Ok((_res, trapped)) => trapped,
Err(_err) => true,
};
(CallScheduledFunctionResult { reschedule }, trapped)
}
CallParams::Procedure(params) => {
// Delete scheduled row.
let reschedule = delete_scheduled_function_row(Some(tx), Some((timestamp, instant)));
// Execute the procedure. See above for commentary on `catch_unwind()`.
let result = panic::AssertUnwindSafe(inst_common.call_procedure(params, inst))
.catch_unwind()
.await;
// Currently, we drop the return value from the function call. In the future,
// we might want to handle it somehow.
let trapped = match result {
Ok((_res, trapped)) => trapped,
Err(_err) => true,
};
(CallScheduledFunctionResult { reschedule }, trapped)
}
}
}
fn delete_scheduled_function_row_with_tx(
module_info: &ModuleInfo,
db: &RelationalDB,
item: QueueItem,
) -> anyhow::Result<Option<CallReducerParams>> {
let caller_identity = module_info.database_identity;
mut tx: MutTxId,
id: ScheduledFunctionId,
) -> Option<ScheduleAt> {
if let Ok(Some(schedule_row)) = get_schedule_row_mut(&tx, db, id) {
if let Ok(schedule_at) = read_schedule_at(&schedule_row, id.at_column) {
// If the schedule is an interval, we handle it as a repeated schedule
if let ScheduleAt::Interval(_) = schedule_at {
return Some(schedule_at);
}
let row_ptr = schedule_row.pointer();
db.delete(&mut tx, id.table_id, [row_ptr]);
match item {
QueueItem::Id { id, at } => {
let Ok(schedule_row) = get_schedule_row_mut(tx, db, id) else {
// if the row is not found, it means the schedule is cancelled by the user
log::debug!(
"table row corresponding to yield scheduler id not found: tableid {}, schedulerId {}",
id.table_id,
id.schedule_id
);
return Ok(None);
};
let ScheduledReducer { reducer, bsatn_args } = process_schedule(tx, db, id.table_id, &schedule_row)?;
let (reducer_id, reducer_seed) = module_info
.module_def
.reducer_arg_deserialize_seed(&reducer[..])
.ok_or_else(|| anyhow!("Reducer not found: {reducer}"))?;
let reducer_args = FunctionArgs::Bsatn(bsatn_args.into()).into_tuple(reducer_seed)?;
// the timestamp we tell the reducer it's running at will be
// at least the timestamp it was scheduled to run at.
let timestamp = at.max(Timestamp::now());
Ok(Some(CallReducerParams::from_system(
timestamp,
caller_identity,
reducer_id,
reducer_args,
)))
}
QueueItem::VolatileNonatomicImmediate { reducer_name, args } => {
let (reducer_id, reducer_seed) = module_info
.module_def
.reducer_arg_deserialize_seed(&reducer_name[..])
.ok_or_else(|| anyhow!("Reducer not found: {reducer_name}"))?;
let reducer_args = args.into_tuple(reducer_seed)?;
Ok(Some(CallReducerParams::from_system(
Timestamp::now(),
caller_identity,
reducer_id,
reducer_args,
)))
commit_and_broadcast_deletion_event(tx, module_info);
} else {
log::debug!(
"Failed to read 'scheduled_at' from row: table_id {}, schedule_id {}",
id.table_id,
id.schedule_id
);
}
}
None
}
fn commit_and_broadcast_deletion_event(tx: MutTxId, module_host: ModuleHost) {
let caller_identity = module_host.info().database_identity;
fn commit_and_broadcast_deletion_event(tx: MutTxId, module_info: &ModuleInfo) {
let event = ModuleEvent {
timestamp: Timestamp::now(),
caller_identity,
caller_identity: module_info.database_identity,
caller_connection_id: None,
function_call: ModuleFunctionCall::default(),
status: EventStatus::Committed(DatabaseUpdate::default()),
@@ -486,49 +494,114 @@ fn commit_and_broadcast_deletion_event(tx: MutTxId, module_host: ModuleHost) {
timer: None,
};
if let Err(e) = module_host
.info()
.subscriptions
.commit_and_broadcast_event(None, event, tx)
{
if let Err(e) = module_info.subscriptions.commit_and_broadcast_event(None, event, tx) {
log::error!("Failed to broadcast deletion event: {e:#}");
}
}
/// Generate `ScheduledReducer` for given `ScheduledReducerId`
fn call_params_for_queued_item(
module: &ModuleInfo,
db: &RelationalDB,
tx: &MutTxId,
item: QueueItem,
) -> anyhow::Result<Option<(Timestamp, Instant, CallParams)>> {
Ok(Some(match item {
QueueItem::Id { id, at } => {
let Some(schedule_row) = get_schedule_row_mut(tx, db, id)? else {
// If the row is not found, it means the schedule is cancelled by the user.
return Ok(None);
};
let ScheduledFunction { function, bsatn_args } = process_schedule(tx, db, id.table_id, &schedule_row)?;
let fun_args = FunctionArgs::Bsatn(bsatn_args.into());
function_to_call_params(module, &function, fun_args, Some(at))?
}
QueueItem::VolatileNonatomicImmediate { function_name, args } => {
function_to_call_params(module, &function_name, args, None)?
}
}))
}
enum CallParams {
Reducer(CallReducerParams),
Procedure(CallProcedureParams),
}
/// Finds the function for `name`
/// and returns the appropriate call parameters
/// to call the function with `args`.
fn function_to_call_params(
module: &ModuleInfo,
name: &str,
args: FunctionArgs,
at: Option<Timestamp>,
) -> anyhow::Result<(Timestamp, Instant, CallParams)> {
let identity = module.database_identity;
// Find the function and deserialize the arguments.
let module = &module.module_def;
let (id, args) = if let Some((id, def)) = module.reducer_full(name) {
let args = args.into_tuple_for_def(module, def).map_err(InvalidReducerArguments)?;
(FunctionId::Reducer(id), args)
} else if let Some((id, def)) = module.procedure_full(name) {
let args = args
.into_tuple_for_def(module, def)
.map_err(InvalidProcedureArguments)?;
(FunctionId::Procedure(id), args)
} else {
// This should be impossible, but let's still return an error to log.
return Err(anyhow!("Reducer or procedure `{name}` not found"));
};
// The timestamp we tell the function it's running at will be
// at least the timestamp it was scheduled to run at.
let now = Timestamp::now();
let ts = at.unwrap_or(now).max(now);
let instant = Instant::now() + ts.duration_since(now).unwrap_or(Duration::ZERO);
let params = match id {
FunctionId::Reducer(id) => CallParams::Reducer(CallReducerParams::from_system(ts, identity, id, args)),
FunctionId::Procedure(id) => CallParams::Procedure(CallProcedureParams::from_system(ts, identity, id, args)),
};
Ok((ts, instant, params))
}
/// Generate [`ScheduledFunction`] for given [`ScheduledFunctionId`].
fn process_schedule(
tx: &MutTxId,
db: &RelationalDB,
table_id: TableId,
schedule_row: &RowRef<'_>,
) -> Result<ScheduledReducer, anyhow::Error> {
// get reducer name from `ST_SCHEDULED` table
) -> Result<ScheduledFunction, anyhow::Error> {
// Get reducer name from `ST_SCHEDULED` table.
let table_id_col = StScheduledFields::TableId.col_id();
let reducer_name_col = StScheduledFields::ReducerName.col_id();
let function_name_col = StScheduledFields::ReducerName.col_id();
let st_scheduled_row = db
.iter_by_col_eq_mut(tx, ST_SCHEDULED_ID, table_id_col, &table_id.into())?
.next()
.ok_or_else(|| anyhow!("Scheduled table with id {table_id} entry does not exist in `st_scheduled`"))?;
let reducer = st_scheduled_row.read_col::<Box<str>>(reducer_name_col)?;
let function = st_scheduled_row.read_col::<Box<str>>(function_name_col)?;
Ok(ScheduledReducer {
reducer,
Ok(ScheduledFunction {
function,
bsatn_args: schedule_row.to_bsatn_vec()?,
})
}
/// Helper to get schedule_row with `MutTxId`
/// Returns the `schedule_row` for `id`.
fn get_schedule_row_mut<'a>(
tx: &'a MutTxId,
db: &'a RelationalDB,
id: ScheduledReducerId,
) -> anyhow::Result<RowRef<'a>> {
db.iter_by_col_eq_mut(tx, id.table_id, id.id_column, &id.schedule_id.into())?
.next()
.ok_or_else(|| anyhow!("Schedule with ID {} not found in table {}", id.schedule_id, id.table_id))
id: ScheduledFunctionId,
) -> anyhow::Result<Option<RowRef<'a>>> {
Ok(db
.iter_by_col_eq_mut(tx, id.table_id, id.id_column, &id.schedule_id.into())?
.next())
}
/// Helper to get schedule_id and schedule_at from schedule_row product value
/// Helper to get `schedule_id` and `schedule_at`
/// from `schedule_row` product value.
pub fn get_schedule_from_row(
row: &RowRef<'_>,
id_column: ColId,
+28 -21
View File
@@ -16,10 +16,9 @@ use crate::client::ClientActorId;
use crate::host::host_controller::CallProcedureReturn;
use crate::host::instance_env::{ChunkPool, InstanceEnv, TxSlot};
use crate::host::module_host::{
call_identity_connected, call_scheduled_reducer, init_database, CallViewParams, ClientConnectedError, Instance,
ViewCallResult,
call_identity_connected, init_database, CallViewParams, ClientConnectedError, Instance, ViewCallResult,
};
use crate::host::scheduler::QueueItem;
use crate::host::scheduler::{CallScheduledFunctionResult, ScheduledFunctionParams};
use crate::host::wasm_common::instrumentation::CallTimes;
use crate::host::wasm_common::module_host_actor::{
AnonymousViewOp, DescribeError, ExecutionError, ExecutionResult, ExecutionStats, ExecutionTimings, InstanceCommon,
@@ -363,17 +362,6 @@ impl JsInstance {
.await
}
pub(crate) async fn call_scheduled_reducer(
self: Box<Self>,
item: QueueItem,
) -> (Result<(ReducerCallResult, Timestamp), ReducerCallError>, Box<Self>) {
self.send_recv(
JsWorkerReply::into_call_scheduled_reducer,
JsWorkerRequest::CallScheduledReducer(item),
)
.await
}
pub async fn init_database(
self: Box<Self>,
program: Program,
@@ -404,6 +392,20 @@ impl JsInstance {
.await;
(*r, s)
}
pub(in crate::host) async fn call_scheduled_function(
self: Box<Self>,
params: ScheduledFunctionParams,
) -> (CallScheduledFunctionResult, Box<Self>) {
// Get a handle to the current tokio runtime, and pass it to the worker
// so that it can execute futures.
let rt = tokio::runtime::Handle::current();
self.send_recv(
JsWorkerReply::into_call_scheduled_function,
JsWorkerRequest::CallScheduledFunction(params, rt),
)
.await
}
}
/// A reply from the worker in [`spawn_instance_worker`].
@@ -417,8 +419,8 @@ enum JsWorkerReply {
CallIdentityConnected(Result<(), ClientConnectedError>),
CallIdentityDisconnected(Result<(), ReducerCallError>),
DisconnectClient(Result<(), ReducerCallError>),
CallScheduledReducer(Result<(ReducerCallResult, Timestamp), ReducerCallError>),
InitDatabase(anyhow::Result<Option<ReducerCallResult>>),
CallScheduledFunction(CallScheduledFunctionResult),
}
/// A request for the worker in [`spawn_instance_worker`].
@@ -451,10 +453,10 @@ enum JsWorkerRequest {
CallIdentityDisconnected(Identity, ConnectionId, bool),
/// See [`JsInstance::disconnect_client`].
DisconnectClient(ClientActorId),
/// See [`JsInstance::call_scheduled_reducer`].
CallScheduledReducer(QueueItem),
/// See [`JsInstance::init_database`].
InitDatabase(Program),
/// See [`JsInstance::call_scheduled_function`].
CallScheduledFunction(ScheduledFunctionParams, tokio::runtime::Handle),
}
/// Performs some of the startup work of [`spawn_instance_worker`].
@@ -642,15 +644,20 @@ fn spawn_instance_worker(
let res = ModuleHost::disconnect_client_inner(client_id, info, call_reducer, &mut trapped);
reply("disconnect_client", DisconnectClient(res), trapped);
}
JsWorkerRequest::CallScheduledReducer(queue_item) => {
let (res, trapped) = call_scheduled_reducer(info, queue_item, call_reducer);
reply("call_scheduled_reducer", CallScheduledReducer(res), trapped);
}
JsWorkerRequest::InitDatabase(program) => {
let (res, trapped): (Result<Option<ReducerCallResult>, anyhow::Error>, bool) =
init_database(replica_ctx, &module_common.info().module_def, program, call_reducer);
reply("init_database", InitDatabase(res), trapped);
}
JsWorkerRequest::CallScheduledFunction(params, rt) => {
let _guard = rt.enter();
let (res, trapped) = instance_common
.call_scheduled_function(params, &mut inst)
.now_or_never()
.expect("our call_procedure implementation is not actually async");
reply("call_scheduled_function", CallScheduledFunction(res), trapped);
}
}
}
});
@@ -8,11 +8,11 @@ use crate::host::host_controller::CallProcedureReturn;
use crate::host::instance_env::{InstanceEnv, TxSlot};
use crate::host::module_common::{build_common_module_from_raw, ModuleCommon};
use crate::host::module_host::{
call_identity_connected, call_scheduled_reducer, init_database, CallProcedureParams, CallReducerParams,
CallViewParams, ClientConnectedError, DatabaseUpdate, EventStatus, ModuleEvent, ModuleFunctionCall, ModuleInfo,
ViewCallResult, ViewOutcome,
call_identity_connected, init_database, CallProcedureParams, CallReducerParams, CallViewParams,
ClientConnectedError, DatabaseUpdate, EventStatus, ModuleEvent, ModuleFunctionCall, ModuleInfo, ViewCallResult,
ViewOutcome,
};
use crate::host::scheduler::QueueItem;
use crate::host::scheduler::{CallScheduledFunctionResult, ScheduledFunctionParams};
use crate::host::{
ArgsTuple, ModuleHost, ProcedureCallError, ProcedureCallResult, ReducerCallError, ReducerCallResult, ReducerId,
ReducerOutcome, Scheduler, UpdateDatabaseResult,
@@ -446,17 +446,6 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
res
}
pub(crate) fn call_scheduled_reducer(
&mut self,
item: QueueItem,
) -> Result<(ReducerCallResult, Timestamp), ReducerCallError> {
let module = &self.common.info.clone();
let call_reducer = |tx, params| self.call_reducer_with_tx(tx, params);
let (res, trapped) = call_scheduled_reducer(module, item, call_reducer);
self.trapped = trapped;
res
}
pub fn init_database(&mut self, program: Program) -> anyhow::Result<Option<ReducerCallResult>> {
let module_def = &self.common.info.clone().module_def;
let replica_ctx = &self.instance.replica_ctx().clone();
@@ -471,6 +460,15 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
self.trapped = trapped;
res
}
pub(in crate::host) async fn call_scheduled_function(
&mut self,
params: ScheduledFunctionParams,
) -> CallScheduledFunctionResult {
let (res, trapped) = self.common.call_scheduled_function(params, &mut self.instance).await;
self.trapped = trapped;
res
}
}
impl<T: WasmInstance> WasmModuleInstance<T> {
@@ -1246,6 +1244,14 @@ impl InstanceCommon {
pub(crate) fn clear_all_clients(&self) -> anyhow::Result<()> {
self.info.relational_db().clear_all_clients().map_err(Into::into)
}
pub(crate) async fn call_scheduled_function<I: WasmInstance>(
&mut self,
params: ScheduledFunctionParams,
inst: &mut I,
) -> (CallScheduledFunctionResult, bool) {
crate::host::scheduler::call_scheduled_function(&self.info.clone(), params, self, inst).await
}
}
/// VM-related metrics for reducer execution.
struct VmMetrics {
+3 -8
View File
@@ -308,14 +308,9 @@ impl ModuleDef {
self.lifecycle_reducers[lifecycle].map(|i| (i, &self.reducers[i.idx()]))
}
/// Get a `DeserializeSeed` that can pull data from a `Deserializer` and format it into a `ProductType`
/// at the parameter type of the reducer named `name`.
pub fn reducer_arg_deserialize_seed<K: ?Sized + Hash + Equivalent<Identifier>>(
&self,
name: &K,
) -> Option<(ReducerId, ArgsSeed<'_, ReducerDef>)> {
let (id, reducer) = self.reducer_full(name)?;
Some((id, ArgsSeed(self.typespace.with_type(reducer))))
/// Returns a `DeserializeSeed` that can pull data from a `Deserializer` for `def`.
pub fn arg_seed_for<'a, T>(&'a self, def: &'a T) -> ArgsSeed<'a, T> {
ArgsSeed(self.typespace.with_type(def))
}
/// Look up the name corresponding to an `AlgebraicTypeRef`.
+5 -8
View File
@@ -1197,7 +1197,7 @@ fn identifier(name: Box<str>) -> Result<Identifier> {
fn check_scheduled_functions_exist(
tables: &mut IdentifierMap<TableDef>,
reducers: &IndexMap<Identifier, ReducerDef>,
_procedures: &IndexMap<Identifier, ProcedureDef>,
procedures: &IndexMap<Identifier, ProcedureDef>,
) -> Result<()> {
let validate_params =
|params_from_function: &ProductType, table_row_type_ref: AlgebraicTypeRef, function_name: &str| {
@@ -1222,13 +1222,10 @@ fn check_scheduled_functions_exist(
if let Some(reducer) = reducers.get(&schedule.function_name) {
schedule.function_kind = FunctionKind::Reducer;
validate_params(&reducer.params, table.product_type_ref, &reducer.name).map_err(Into::into)
} else
// TODO(scheduled-procedures): Uncomment this
// if let Some(procedure) = procedures.get(&schedule.function_name) {
// schedule.function_kind = FunctionKind::Procedure;
// validate_params(&procedure.params, table.product_type_ref, &procedure.name).map_err(Into::into)
// } else
{
} else if let Some(procedure) = procedures.get(&schedule.function_name) {
schedule.function_kind = FunctionKind::Procedure;
validate_params(&procedure.params, table.product_type_ref, &procedure.name).map_err(Into::into)
} else {
Err(ValidationError::MissingScheduledFunction {
schedule: schedule.name.clone(),
function: schedule.function_name.clone(),
+51 -1
View File
@@ -1,6 +1,7 @@
// ─────────────────────────────────────────────────────────────────────────────
// IMPORTS
// ─────────────────────────────────────────────────────────────────────────────
import { ScheduleAt } from 'spacetimedb';
import {
errors,
schema,
@@ -25,7 +26,30 @@ const MyTable = table(
{ field: ReturnStruct }
);
const spacetimedb = schema(MyTable);
const ScheduledProcTable = t.row({
scheduled_id: t.u64().primaryKey().autoInc(),
scheduled_at: t.scheduleAt(),
reducer_ts: t.timestamp(),
x: t.u8(),
y: t.u8(),
});
const ScheduledProcTableTable = table(
{ name: 'scheduled_proc_table', scheduled: 'scheduled_proc' },
ScheduledProcTable
);
const ProcInsertsInto = t.row({
reducer_ts: t.timestamp(),
procedure_ts: t.timestamp(),
x: t.u8(),
y: t.u8(),
});
const ProcInsertsIntoTable = table(
{ name: 'proc_inserts_into', public: true },
ProcInsertsInto
);
const spacetimedb = schema(MyTable, ScheduledProcTableTable, ProcInsertsIntoTable);
spacetimedb.procedure(
'return_primitive',
@@ -119,3 +143,29 @@ spacetimedb.procedure('insert_with_tx_rollback', t.unit(), ctx => {
assertRowCount(ctx, 0);
return {};
});
spacetimedb.reducer('schedule_proc', {}, ctx => {
ctx.db.scheduledProcTable.insert({
scheduled_id: 0n,
scheduled_at: ScheduleAt.interval(1000000n),
reducer_ts: ctx.timestamp,
x: 42,
y: 24,
})
});
spacetimedb.procedure('scheduled_proc', { data: ScheduledProcTable }, t.unit(), (ctx, { data }) => {
const reducer_ts = data.reducer_ts;
const x = data.x;
const y = data.y;
const procedure_ts = ctx.timestamp;
ctx.withTx(ctx => {
ctx.db.procInsertsInto.insert({
reducer_ts,
procedure_ts,
x,
y
});
});
return {};
});
+53 -1
View File
@@ -1,4 +1,7 @@
use spacetimedb::{procedure, table, ProcedureContext, SpacetimeType, Table, TxContext};
use spacetimedb::{
duration, procedure, reducer, table, DbContext, ProcedureContext, ReducerContext, ScheduleAt, SpacetimeType, Table,
Timestamp, TxContext,
};
#[derive(SpacetimeType)]
struct ReturnStruct {
@@ -98,3 +101,52 @@ fn insert_with_tx_rollback(ctx: &mut ProcedureContext) {
// Assert that there's not a row.
assert_row_count(ctx, 0);
}
/// A reducer that schedules [`scheduled_proc`] via `ScheduledProcTable`.
#[reducer]
fn schedule_proc(ctx: &ReducerContext) {
// Schedule the procedure to run in 1s.
ctx.db().scheduled_proc_table().insert(ScheduledProcTable {
scheduled_id: 0,
scheduled_at: duration!("1000ms").into(),
// Store the timestamp at which this reducer was called.
// In tests, we'll compare this with the timestamp the procedure was called.
reducer_ts: ctx.timestamp,
x: 42,
y: 24,
});
}
#[table(name = scheduled_proc_table, scheduled(scheduled_proc))]
struct ScheduledProcTable {
#[primary_key]
#[auto_inc]
scheduled_id: u64,
scheduled_at: ScheduleAt,
reducer_ts: Timestamp,
x: u8,
y: u8,
}
/// A procedure that should be called 1s after `schedule_proc`.
#[procedure]
fn scheduled_proc(ctx: &mut ProcedureContext, data: ScheduledProcTable) {
let ScheduledProcTable { reducer_ts, x, y, .. } = data;
let procedure_ts = ctx.timestamp;
ctx.with_tx(|ctx| {
ctx.db().proc_inserts_into().insert(ProcInsertsInto {
reducer_ts,
procedure_ts,
x,
y,
})
});
}
#[table(name = proc_inserts_into, public)]
struct ProcInsertsInto {
reducer_ts: Timestamp,
procedure_ts: Timestamp,
x: u8,
y: u8,
}
@@ -0,0 +1,77 @@
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
#nullable enable
using System;
using SpacetimeDB.ClientApi;
using System.Collections.Generic;
using System.Runtime.Serialization;
namespace SpacetimeDB.Types
{
public sealed partial class RemoteProcedures : RemoteBase
{
public void ScheduledProc(SpacetimeDB.Types.ScheduledProcTable data, ProcedureCallback<SpacetimeDB.Unit> callback)
{
// Convert the clean callback to the wrapper callback
InternalScheduledProc(data, (ctx, result) =>
{
if (result.IsSuccess && result.Value != null)
{
callback(ctx, ProcedureCallbackResult<SpacetimeDB.Unit>.Success(result.Value.Value));
}
else
{
callback(ctx, ProcedureCallbackResult<SpacetimeDB.Unit>.Failure(result.Error!));
}
});
}
private void InternalScheduledProc(SpacetimeDB.Types.ScheduledProcTable data, ProcedureCallback<Procedure.ScheduledProc> callback)
{
conn.InternalCallProcedure(new Procedure.ScheduledProcArgs(data), callback);
}
}
public abstract partial class Procedure
{
[SpacetimeDB.Type]
[DataContract]
public sealed partial class ScheduledProc
{
[DataMember(Name = "Value")]
public SpacetimeDB.Unit Value;
public ScheduledProc(SpacetimeDB.Unit Value)
{
this.Value = Value;
}
public ScheduledProc()
{
}
}
[SpacetimeDB.Type]
[DataContract]
public sealed partial class ScheduledProcArgs : Procedure, IProcedureArgs
{
[DataMember(Name = "data")]
public ScheduledProcTable Data;
public ScheduledProcArgs(ScheduledProcTable Data)
{
this.Data = Data;
}
public ScheduledProcArgs()
{
this.Data = new();
}
string IProcedureArgs.ProcedureName => "scheduled_proc";
}
}
}
@@ -0,0 +1,59 @@
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
#nullable enable
using System;
using SpacetimeDB.ClientApi;
using System.Collections.Generic;
using System.Runtime.Serialization;
namespace SpacetimeDB.Types
{
public sealed partial class RemoteReducers : RemoteBase
{
public delegate void ScheduleProcHandler(ReducerEventContext ctx);
public event ScheduleProcHandler? OnScheduleProc;
public void ScheduleProc()
{
conn.InternalCallReducer(new Reducer.ScheduleProc(), this.SetCallReducerFlags.ScheduleProcFlags);
}
public bool InvokeScheduleProc(ReducerEventContext ctx, Reducer.ScheduleProc args)
{
if (OnScheduleProc == null)
{
if (InternalOnUnhandledReducerError != null)
{
switch (ctx.Event.Status)
{
case Status.Failed(var reason): InternalOnUnhandledReducerError(ctx, new Exception(reason)); break;
case Status.OutOfEnergy(var _): InternalOnUnhandledReducerError(ctx, new Exception("out of energy")); break;
}
}
return false;
}
OnScheduleProc(
ctx
);
return true;
}
}
public abstract partial class Reducer
{
[SpacetimeDB.Type]
[DataContract]
public sealed partial class ScheduleProc : Reducer, IReducerArgs
{
string IReducerArgs.ReducerName => "schedule_proc";
}
}
public sealed partial class SetReducerFlags
{
internal CallReducerFlags ScheduleProcFlags;
public void ScheduleProc(CallReducerFlags flags) => ScheduleProcFlags = flags;
}
}
@@ -1,7 +1,7 @@
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
// This was generated using spacetimedb cli version 1.8.0 (commit 6cbedef7ca453ea48801d9192779fcee7a465a10).
// This was generated using spacetimedb cli version 1.9.0 (commit a722cabfdf0a59d6fd152cf316511a514475c0ca).
#nullable enable
@@ -29,6 +29,8 @@ namespace SpacetimeDB.Types
public RemoteTables(DbConnection conn)
{
AddTable(MyTable = new(conn));
AddTable(ProcInsertsInto = new(conn));
AddTable(ScheduledProcTable = new(conn));
}
}
@@ -589,6 +591,7 @@ namespace SpacetimeDB.Types
var encodedArgs = update.ReducerCall.Args;
return update.ReducerCall.ReducerName switch
{
"schedule_proc" => BSATNHelpers.Decode<Reducer.ScheduleProc>(encodedArgs),
"" => throw new SpacetimeDBEmptyReducerNameException("Reducer name is empty"),
var reducer => throw new ArgumentOutOfRangeException("Reducer", $"Unknown reducer {reducer}")
};
@@ -614,6 +617,7 @@ namespace SpacetimeDB.Types
var eventContext = (ReducerEventContext)context;
return reducer switch
{
Reducer.ScheduleProc args => Reducers.InvokeScheduleProc(eventContext, args),
_ => throw new ArgumentOutOfRangeException("Reducer", $"Unknown reducer {reducer}")
};
}
@@ -0,0 +1,27 @@
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
#nullable enable
using System;
using SpacetimeDB.BSATN;
using SpacetimeDB.ClientApi;
using System.Collections.Generic;
using System.Runtime.Serialization;
namespace SpacetimeDB.Types
{
public sealed partial class RemoteTables
{
public sealed class ProcInsertsIntoHandle : RemoteTableHandle<EventContext, ProcInsertsInto>
{
protected override string RemoteTableName => "proc_inserts_into";
internal ProcInsertsIntoHandle(DbConnection conn) : base(conn)
{
}
}
public readonly ProcInsertsIntoHandle ProcInsertsInto;
}
}
@@ -0,0 +1,39 @@
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
#nullable enable
using System;
using SpacetimeDB.BSATN;
using SpacetimeDB.ClientApi;
using System.Collections.Generic;
using System.Runtime.Serialization;
namespace SpacetimeDB.Types
{
public sealed partial class RemoteTables
{
public sealed class ScheduledProcTableHandle : RemoteTableHandle<EventContext, ScheduledProcTable>
{
protected override string RemoteTableName => "scheduled_proc_table";
public sealed class ScheduledIdUniqueIndex : UniqueIndexBase<ulong>
{
protected override ulong GetKey(ScheduledProcTable row) => row.ScheduledId;
public ScheduledIdUniqueIndex(ScheduledProcTableHandle table) : base(table) { }
}
public readonly ScheduledIdUniqueIndex ScheduledId;
internal ScheduledProcTableHandle(DbConnection conn) : base(conn)
{
ScheduledId = new(this);
}
protected override object GetPrimaryKey(ScheduledProcTable row) => row.ScheduledId;
}
public readonly ScheduledProcTableHandle ScheduledProcTable;
}
}
@@ -0,0 +1,42 @@
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
#nullable enable
using System;
using System.Collections.Generic;
using System.Runtime.Serialization;
namespace SpacetimeDB.Types
{
[SpacetimeDB.Type]
[DataContract]
public sealed partial class ProcInsertsInto
{
[DataMember(Name = "reducer_ts")]
public SpacetimeDB.Timestamp ReducerTs;
[DataMember(Name = "procedure_ts")]
public SpacetimeDB.Timestamp ProcedureTs;
[DataMember(Name = "x")]
public byte X;
[DataMember(Name = "y")]
public byte Y;
public ProcInsertsInto(
SpacetimeDB.Timestamp ReducerTs,
SpacetimeDB.Timestamp ProcedureTs,
byte X,
byte Y
)
{
this.ReducerTs = ReducerTs;
this.ProcedureTs = ProcedureTs;
this.X = X;
this.Y = Y;
}
public ProcInsertsInto()
{
}
}
}
@@ -0,0 +1,47 @@
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
#nullable enable
using System;
using System.Collections.Generic;
using System.Runtime.Serialization;
namespace SpacetimeDB.Types
{
[SpacetimeDB.Type]
[DataContract]
public sealed partial class ScheduledProcTable
{
[DataMember(Name = "scheduled_id")]
public ulong ScheduledId;
[DataMember(Name = "scheduled_at")]
public SpacetimeDB.ScheduleAt ScheduledAt;
[DataMember(Name = "reducer_ts")]
public SpacetimeDB.Timestamp ReducerTs;
[DataMember(Name = "x")]
public byte X;
[DataMember(Name = "y")]
public byte Y;
public ScheduledProcTable(
ulong ScheduledId,
SpacetimeDB.ScheduleAt ScheduledAt,
SpacetimeDB.Timestamp ReducerTs,
byte X,
byte Y
)
{
this.ScheduledId = ScheduledId;
this.ScheduledAt = ScheduledAt;
this.ReducerTs = ReducerTs;
this.X = X;
this.Y = Y;
}
public ScheduledProcTable()
{
this.ScheduledAt = null!;
}
}
}
@@ -1,7 +1,7 @@
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
// This was generated using spacetimedb cli version 1.8.0 (commit 6cbedef7ca453ea48801d9192779fcee7a465a10).
// This was generated using spacetimedb cli version 1.9.0 (commit a722cabfdf0a59d6fd152cf316511a514475c0ca).
#nullable enable
@@ -58,6 +58,7 @@ impl identity_connected for super::RemoteReducers {
IdentityConnectedCallbackId(self.imp.on_reducer(
"identity_connected",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -59,6 +59,7 @@ impl identity_disconnected for super::RemoteReducers {
IdentityDisconnectedCallbackId(self.imp.on_reducer(
"identity_disconnected",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -1,7 +1,7 @@
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
// This was generated using spacetimedb cli version 1.6.0 (commit 3f1de9e09651bc412de3cb9daf49cc553ebb81e8).
// This was generated using spacetimedb cli version 1.9.0 (commit d694065b403f116daeb90e7cccc0920ce23dfc71).
#![allow(unused, clippy::all)]
use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws};
+43 -1
View File
@@ -1,5 +1,7 @@
mod module_bindings;
use core::time::Duration;
use anyhow::Context;
use module_bindings::*;
use spacetimedb_lib::db::raw_def::v9::{RawMiscModuleExportV9, RawModuleDefV9};
@@ -43,6 +45,7 @@ fn main() {
"procedure-http-err" => exec_procedure_http_err(),
"insert-with-tx-commit" => exec_insert_with_tx_commit(),
"insert-with-tx-rollback" => exec_insert_with_tx_rollback(),
"schedule-procedure" => exec_schedule_procedure(),
_ => panic!("Unknown test: {test}"),
}
}
@@ -97,7 +100,7 @@ fn connect_then(
}
/// A query that subscribes to all rows from all tables.
const SUBSCRIBE_ALL: &[&str] = &["SELECT * FROM my_table;"];
const SUBSCRIBE_ALL: &[&str] = &["SELECT * FROM my_table;", "SELECT * FROM proc_inserts_into;"];
fn subscribe_all_then(ctx: &impl RemoteDbContext, callback: impl FnOnce(&SubscriptionEventContext) + Send + 'static) {
subscribe_these_then(ctx, SUBSCRIBE_ALL, callback)
@@ -310,5 +313,44 @@ fn exec_procedure_http_err() {
})
}
});
test_counter.wait_for_all();
}
fn exec_schedule_procedure() {
let test_counter = TestCounter::new();
let sub_applied_nothing_result = test_counter.add_test("on_subscription_applied_nothing");
let mut callback_result = Some(test_counter.add_test("insert_with_tx_commit_callback"));
connect_then(&test_counter, {
move |ctx| {
ctx.db().proc_inserts_into().on_insert(move |_, row| {
assert_eq!(row.x, 42);
assert_eq!(row.y, 24);
// Ensure that the elapsed time
// between the reducer and procedure
// is at least 1 second
// but no more than 2 seconds.
let elapsed = row
.procedure_ts
.duration_since(row.reducer_ts)
.expect("procedure ts > reducer ts");
const MS_1000: Duration = Duration::from_secs(1);
const MS_2000: Duration = Duration::from_secs(2);
assert!(elapsed >= MS_1000);
assert!(elapsed <= MS_2000);
(callback_result.take().unwrap())(Ok(()));
});
subscribe_all_then(ctx, move |ctx| {
sub_applied_nothing_result(assert_all_tables_empty(ctx));
ctx.reducers.schedule_proc().unwrap();
});
}
});
test_counter.wait_for_all();
}
+45 -2
View File
@@ -1,7 +1,7 @@
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
// This was generated using spacetimedb cli version 1.8.0 (commit fd75c7fbf57e943785c09dc91df0697844ff4dad).
// This was generated using spacetimedb cli version 1.9.0 (commit d694065b403f116daeb90e7cccc0920ce23dfc71).
#![allow(unused, clippy::all)]
use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws};
@@ -11,6 +11,8 @@ pub mod insert_with_tx_rollback_procedure;
pub mod invalid_request_procedure;
pub mod my_table_table;
pub mod my_table_type;
pub mod proc_inserts_into_table;
pub mod proc_inserts_into_type;
pub mod read_my_schema_procedure;
pub mod return_enum_a_procedure;
pub mod return_enum_b_procedure;
@@ -18,6 +20,10 @@ pub mod return_enum_type;
pub mod return_primitive_procedure;
pub mod return_struct_procedure;
pub mod return_struct_type;
pub mod schedule_proc_reducer;
pub mod scheduled_proc_procedure;
pub mod scheduled_proc_table_table;
pub mod scheduled_proc_table_type;
pub mod will_panic_procedure;
pub use insert_with_tx_commit_procedure::insert_with_tx_commit;
@@ -25,6 +31,8 @@ pub use insert_with_tx_rollback_procedure::insert_with_tx_rollback;
pub use invalid_request_procedure::invalid_request;
pub use my_table_table::*;
pub use my_table_type::MyTable;
pub use proc_inserts_into_table::*;
pub use proc_inserts_into_type::ProcInsertsInto;
pub use read_my_schema_procedure::read_my_schema;
pub use return_enum_a_procedure::return_enum_a;
pub use return_enum_b_procedure::return_enum_b;
@@ -32,6 +40,10 @@ pub use return_enum_type::ReturnEnum;
pub use return_primitive_procedure::return_primitive;
pub use return_struct_procedure::return_struct;
pub use return_struct_type::ReturnStruct;
pub use schedule_proc_reducer::{schedule_proc, set_flags_for_schedule_proc, ScheduleProcCallbackId};
pub use scheduled_proc_procedure::scheduled_proc;
pub use scheduled_proc_table_table::*;
pub use scheduled_proc_table_type::ScheduledProcTable;
pub use will_panic_procedure::will_panic;
#[derive(Clone, PartialEq, Debug)]
@@ -41,7 +53,9 @@ pub use will_panic_procedure::will_panic;
/// Contained within a [`__sdk::ReducerEvent`] in [`EventContext`]s for reducer events
/// to indicate which reducer caused the event.
pub enum Reducer {}
pub enum Reducer {
ScheduleProc,
}
impl __sdk::InModule for Reducer {
type Module = RemoteModule;
@@ -50,6 +64,7 @@ impl __sdk::InModule for Reducer {
impl __sdk::Reducer for Reducer {
fn reducer_name(&self) -> &'static str {
match self {
Reducer::ScheduleProc => "schedule_proc",
_ => unreachable!(),
}
}
@@ -58,6 +73,11 @@ impl TryFrom<__ws::ReducerCallInfo<__ws::BsatnFormat>> for Reducer {
type Error = __sdk::Error;
fn try_from(value: __ws::ReducerCallInfo<__ws::BsatnFormat>) -> __sdk::Result<Self> {
match &value.reducer_name[..] {
"schedule_proc" => Ok(__sdk::parse_reducer_args::<schedule_proc_reducer::ScheduleProcArgs>(
"schedule_proc",
&value.args,
)?
.into()),
unknown => Err(__sdk::InternalError::unknown_name("reducer", unknown, "ReducerCallInfo").into()),
}
}
@@ -68,6 +88,8 @@ impl TryFrom<__ws::ReducerCallInfo<__ws::BsatnFormat>> for Reducer {
#[doc(hidden)]
pub struct DbUpdate {
my_table: __sdk::TableUpdate<MyTable>,
proc_inserts_into: __sdk::TableUpdate<ProcInsertsInto>,
scheduled_proc_table: __sdk::TableUpdate<ScheduledProcTable>,
}
impl TryFrom<__ws::DatabaseUpdate<__ws::BsatnFormat>> for DbUpdate {
@@ -79,6 +101,12 @@ impl TryFrom<__ws::DatabaseUpdate<__ws::BsatnFormat>> for DbUpdate {
"my_table" => db_update
.my_table
.append(my_table_table::parse_table_update(table_update)?),
"proc_inserts_into" => db_update
.proc_inserts_into
.append(proc_inserts_into_table::parse_table_update(table_update)?),
"scheduled_proc_table" => db_update
.scheduled_proc_table
.append(scheduled_proc_table_table::parse_table_update(table_update)?),
unknown => {
return Err(__sdk::InternalError::unknown_name("table", unknown, "DatabaseUpdate").into());
@@ -98,6 +126,11 @@ impl __sdk::DbUpdate for DbUpdate {
let mut diff = AppliedDiff::default();
diff.my_table = cache.apply_diff_to_table::<MyTable>("my_table", &self.my_table);
diff.proc_inserts_into =
cache.apply_diff_to_table::<ProcInsertsInto>("proc_inserts_into", &self.proc_inserts_into);
diff.scheduled_proc_table = cache
.apply_diff_to_table::<ScheduledProcTable>("scheduled_proc_table", &self.scheduled_proc_table)
.with_updates_by_pk(|row| &row.scheduled_id);
diff
}
@@ -108,6 +141,8 @@ impl __sdk::DbUpdate for DbUpdate {
#[doc(hidden)]
pub struct AppliedDiff<'r> {
my_table: __sdk::TableAppliedDiff<'r, MyTable>,
proc_inserts_into: __sdk::TableAppliedDiff<'r, ProcInsertsInto>,
scheduled_proc_table: __sdk::TableAppliedDiff<'r, ScheduledProcTable>,
__unused: std::marker::PhantomData<&'r ()>,
}
@@ -118,6 +153,12 @@ impl __sdk::InModule for AppliedDiff<'_> {
impl<'r> __sdk::AppliedDiff<'r> for AppliedDiff<'r> {
fn invoke_row_callbacks(&self, event: &EventContext, callbacks: &mut __sdk::DbCallbacks<RemoteModule>) {
callbacks.invoke_table_row_callbacks::<MyTable>("my_table", &self.my_table, event);
callbacks.invoke_table_row_callbacks::<ProcInsertsInto>("proc_inserts_into", &self.proc_inserts_into, event);
callbacks.invoke_table_row_callbacks::<ScheduledProcTable>(
"scheduled_proc_table",
&self.scheduled_proc_table,
event,
);
}
}
@@ -838,5 +879,7 @@ impl __sdk::SpacetimeModule for RemoteModule {
fn register_tables(client_cache: &mut __sdk::ClientCache<Self>) {
my_table_table::register_table(client_cache);
proc_inserts_into_table::register_table(client_cache);
scheduled_proc_table_table::register_table(client_cache);
}
}
@@ -0,0 +1,95 @@
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
#![allow(unused, clippy::all)]
use super::proc_inserts_into_type::ProcInsertsInto;
use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws};
/// Table handle for the table `proc_inserts_into`.
///
/// Obtain a handle from the [`ProcInsertsIntoTableAccess::proc_inserts_into`] method on [`super::RemoteTables`],
/// like `ctx.db.proc_inserts_into()`.
///
/// Users are encouraged not to explicitly reference this type,
/// but to directly chain method calls,
/// like `ctx.db.proc_inserts_into().on_insert(...)`.
pub struct ProcInsertsIntoTableHandle<'ctx> {
imp: __sdk::TableHandle<ProcInsertsInto>,
ctx: std::marker::PhantomData<&'ctx super::RemoteTables>,
}
#[allow(non_camel_case_types)]
/// Extension trait for access to the table `proc_inserts_into`.
///
/// Implemented for [`super::RemoteTables`].
pub trait ProcInsertsIntoTableAccess {
#[allow(non_snake_case)]
/// Obtain a [`ProcInsertsIntoTableHandle`], which mediates access to the table `proc_inserts_into`.
fn proc_inserts_into(&self) -> ProcInsertsIntoTableHandle<'_>;
}
impl ProcInsertsIntoTableAccess for super::RemoteTables {
fn proc_inserts_into(&self) -> ProcInsertsIntoTableHandle<'_> {
ProcInsertsIntoTableHandle {
imp: self.imp.get_table::<ProcInsertsInto>("proc_inserts_into"),
ctx: std::marker::PhantomData,
}
}
}
pub struct ProcInsertsIntoInsertCallbackId(__sdk::CallbackId);
pub struct ProcInsertsIntoDeleteCallbackId(__sdk::CallbackId);
impl<'ctx> __sdk::Table for ProcInsertsIntoTableHandle<'ctx> {
type Row = ProcInsertsInto;
type EventContext = super::EventContext;
fn count(&self) -> u64 {
self.imp.count()
}
fn iter(&self) -> impl Iterator<Item = ProcInsertsInto> + '_ {
self.imp.iter()
}
type InsertCallbackId = ProcInsertsIntoInsertCallbackId;
fn on_insert(
&self,
callback: impl FnMut(&Self::EventContext, &Self::Row) + Send + 'static,
) -> ProcInsertsIntoInsertCallbackId {
ProcInsertsIntoInsertCallbackId(self.imp.on_insert(Box::new(callback)))
}
fn remove_on_insert(&self, callback: ProcInsertsIntoInsertCallbackId) {
self.imp.remove_on_insert(callback.0)
}
type DeleteCallbackId = ProcInsertsIntoDeleteCallbackId;
fn on_delete(
&self,
callback: impl FnMut(&Self::EventContext, &Self::Row) + Send + 'static,
) -> ProcInsertsIntoDeleteCallbackId {
ProcInsertsIntoDeleteCallbackId(self.imp.on_delete(Box::new(callback)))
}
fn remove_on_delete(&self, callback: ProcInsertsIntoDeleteCallbackId) {
self.imp.remove_on_delete(callback.0)
}
}
#[doc(hidden)]
pub(super) fn register_table(client_cache: &mut __sdk::ClientCache<super::RemoteModule>) {
let _table = client_cache.get_or_make_table::<ProcInsertsInto>("proc_inserts_into");
}
#[doc(hidden)]
pub(super) fn parse_table_update(
raw_updates: __ws::TableUpdate<__ws::BsatnFormat>,
) -> __sdk::Result<__sdk::TableUpdate<ProcInsertsInto>> {
__sdk::TableUpdate::parse_table_update(raw_updates).map_err(|e| {
__sdk::InternalError::failed_parse("TableUpdate<ProcInsertsInto>", "TableUpdate")
.with_cause(e)
.into()
})
}
@@ -0,0 +1,18 @@
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
#![allow(unused, clippy::all)]
use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws};
#[derive(__lib::ser::Serialize, __lib::de::Deserialize, Clone, PartialEq, Debug)]
#[sats(crate = __lib)]
pub struct ProcInsertsInto {
pub reducer_ts: __sdk::Timestamp,
pub procedure_ts: __sdk::Timestamp,
pub x: u8,
pub y: u8,
}
impl __sdk::InModule for ProcInsertsInto {
type Module = super::RemoteModule;
}
@@ -0,0 +1,100 @@
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
#![allow(unused, clippy::all)]
use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws};
#[derive(__lib::ser::Serialize, __lib::de::Deserialize, Clone, PartialEq, Debug)]
#[sats(crate = __lib)]
pub(super) struct ScheduleProcArgs {}
impl From<ScheduleProcArgs> for super::Reducer {
fn from(args: ScheduleProcArgs) -> Self {
Self::ScheduleProc
}
}
impl __sdk::InModule for ScheduleProcArgs {
type Module = super::RemoteModule;
}
pub struct ScheduleProcCallbackId(__sdk::CallbackId);
#[allow(non_camel_case_types)]
/// Extension trait for access to the reducer `schedule_proc`.
///
/// Implemented for [`super::RemoteReducers`].
pub trait schedule_proc {
/// Request that the remote module invoke the reducer `schedule_proc` to run as soon as possible.
///
/// This method returns immediately, and errors only if we are unable to send the request.
/// The reducer will run asynchronously in the future,
/// and its status can be observed by listening for [`Self::on_schedule_proc`] callbacks.
fn schedule_proc(&self) -> __sdk::Result<()>;
/// Register a callback to run whenever we are notified of an invocation of the reducer `schedule_proc`.
///
/// Callbacks should inspect the [`__sdk::ReducerEvent`] contained in the [`super::ReducerEventContext`]
/// to determine the reducer's status.
///
/// The returned [`ScheduleProcCallbackId`] can be passed to [`Self::remove_on_schedule_proc`]
/// to cancel the callback.
fn on_schedule_proc(
&self,
callback: impl FnMut(&super::ReducerEventContext) + Send + 'static,
) -> ScheduleProcCallbackId;
/// Cancel a callback previously registered by [`Self::on_schedule_proc`],
/// causing it not to run in the future.
fn remove_on_schedule_proc(&self, callback: ScheduleProcCallbackId);
}
impl schedule_proc for super::RemoteReducers {
fn schedule_proc(&self) -> __sdk::Result<()> {
self.imp.call_reducer("schedule_proc", ScheduleProcArgs {})
}
fn on_schedule_proc(
&self,
mut callback: impl FnMut(&super::ReducerEventContext) + Send + 'static,
) -> ScheduleProcCallbackId {
ScheduleProcCallbackId(self.imp.on_reducer(
"schedule_proc",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
reducer: super::Reducer::ScheduleProc {},
..
},
..
} = ctx
else {
unreachable!()
};
callback(ctx)
}),
))
}
fn remove_on_schedule_proc(&self, callback: ScheduleProcCallbackId) {
self.imp.remove_on_reducer("schedule_proc", callback.0)
}
}
#[allow(non_camel_case_types)]
#[doc(hidden)]
/// Extension trait for setting the call-flags for the reducer `schedule_proc`.
///
/// Implemented for [`super::SetReducerFlags`].
///
/// This type is currently unstable and may be removed without a major version bump.
pub trait set_flags_for_schedule_proc {
/// Set the call-reducer flags for the reducer `schedule_proc` to `flags`.
///
/// This type is currently unstable and may be removed without a major version bump.
fn schedule_proc(&self, flags: __ws::CallReducerFlags);
}
impl set_flags_for_schedule_proc for super::SetReducerFlags {
fn schedule_proc(&self, flags: __ws::CallReducerFlags) {
self.imp.set_call_reducer_flags("schedule_proc", flags);
}
}
@@ -0,0 +1,46 @@
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
#![allow(unused, clippy::all)]
use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws};
use super::scheduled_proc_table_type::ScheduledProcTable;
#[derive(__lib::ser::Serialize, __lib::de::Deserialize, Clone, PartialEq, Debug)]
#[sats(crate = __lib)]
struct ScheduledProcArgs {
pub data: ScheduledProcTable,
}
impl __sdk::InModule for ScheduledProcArgs {
type Module = super::RemoteModule;
}
#[allow(non_camel_case_types)]
/// Extension trait for access to the procedure `scheduled_proc`.
///
/// Implemented for [`super::RemoteProcedures`].
pub trait scheduled_proc {
fn scheduled_proc(&self, data: ScheduledProcTable) {
self.scheduled_proc_then(data, |_, _| {});
}
fn scheduled_proc_then(
&self,
data: ScheduledProcTable,
__callback: impl FnOnce(&super::ProcedureEventContext, Result<(), __sdk::InternalError>) + Send + 'static,
);
}
impl scheduled_proc for super::RemoteProcedures {
fn scheduled_proc_then(
&self,
data: ScheduledProcTable,
__callback: impl FnOnce(&super::ProcedureEventContext, Result<(), __sdk::InternalError>) + Send + 'static,
) {
self.imp
.invoke_procedure_with_callback::<_, ()>("scheduled_proc", ScheduledProcArgs { data }, __callback);
}
}
@@ -0,0 +1,142 @@
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
#![allow(unused, clippy::all)]
use super::scheduled_proc_table_type::ScheduledProcTable;
use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws};
/// Table handle for the table `scheduled_proc_table`.
///
/// Obtain a handle from the [`ScheduledProcTableTableAccess::scheduled_proc_table`] method on [`super::RemoteTables`],
/// like `ctx.db.scheduled_proc_table()`.
///
/// Users are encouraged not to explicitly reference this type,
/// but to directly chain method calls,
/// like `ctx.db.scheduled_proc_table().on_insert(...)`.
pub struct ScheduledProcTableTableHandle<'ctx> {
imp: __sdk::TableHandle<ScheduledProcTable>,
ctx: std::marker::PhantomData<&'ctx super::RemoteTables>,
}
#[allow(non_camel_case_types)]
/// Extension trait for access to the table `scheduled_proc_table`.
///
/// Implemented for [`super::RemoteTables`].
pub trait ScheduledProcTableTableAccess {
#[allow(non_snake_case)]
/// Obtain a [`ScheduledProcTableTableHandle`], which mediates access to the table `scheduled_proc_table`.
fn scheduled_proc_table(&self) -> ScheduledProcTableTableHandle<'_>;
}
impl ScheduledProcTableTableAccess for super::RemoteTables {
fn scheduled_proc_table(&self) -> ScheduledProcTableTableHandle<'_> {
ScheduledProcTableTableHandle {
imp: self.imp.get_table::<ScheduledProcTable>("scheduled_proc_table"),
ctx: std::marker::PhantomData,
}
}
}
pub struct ScheduledProcTableInsertCallbackId(__sdk::CallbackId);
pub struct ScheduledProcTableDeleteCallbackId(__sdk::CallbackId);
impl<'ctx> __sdk::Table for ScheduledProcTableTableHandle<'ctx> {
type Row = ScheduledProcTable;
type EventContext = super::EventContext;
fn count(&self) -> u64 {
self.imp.count()
}
fn iter(&self) -> impl Iterator<Item = ScheduledProcTable> + '_ {
self.imp.iter()
}
type InsertCallbackId = ScheduledProcTableInsertCallbackId;
fn on_insert(
&self,
callback: impl FnMut(&Self::EventContext, &Self::Row) + Send + 'static,
) -> ScheduledProcTableInsertCallbackId {
ScheduledProcTableInsertCallbackId(self.imp.on_insert(Box::new(callback)))
}
fn remove_on_insert(&self, callback: ScheduledProcTableInsertCallbackId) {
self.imp.remove_on_insert(callback.0)
}
type DeleteCallbackId = ScheduledProcTableDeleteCallbackId;
fn on_delete(
&self,
callback: impl FnMut(&Self::EventContext, &Self::Row) + Send + 'static,
) -> ScheduledProcTableDeleteCallbackId {
ScheduledProcTableDeleteCallbackId(self.imp.on_delete(Box::new(callback)))
}
fn remove_on_delete(&self, callback: ScheduledProcTableDeleteCallbackId) {
self.imp.remove_on_delete(callback.0)
}
}
#[doc(hidden)]
pub(super) fn register_table(client_cache: &mut __sdk::ClientCache<super::RemoteModule>) {
let _table = client_cache.get_or_make_table::<ScheduledProcTable>("scheduled_proc_table");
_table.add_unique_constraint::<u64>("scheduled_id", |row| &row.scheduled_id);
}
pub struct ScheduledProcTableUpdateCallbackId(__sdk::CallbackId);
impl<'ctx> __sdk::TableWithPrimaryKey for ScheduledProcTableTableHandle<'ctx> {
type UpdateCallbackId = ScheduledProcTableUpdateCallbackId;
fn on_update(
&self,
callback: impl FnMut(&Self::EventContext, &Self::Row, &Self::Row) + Send + 'static,
) -> ScheduledProcTableUpdateCallbackId {
ScheduledProcTableUpdateCallbackId(self.imp.on_update(Box::new(callback)))
}
fn remove_on_update(&self, callback: ScheduledProcTableUpdateCallbackId) {
self.imp.remove_on_update(callback.0)
}
}
#[doc(hidden)]
pub(super) fn parse_table_update(
raw_updates: __ws::TableUpdate<__ws::BsatnFormat>,
) -> __sdk::Result<__sdk::TableUpdate<ScheduledProcTable>> {
__sdk::TableUpdate::parse_table_update(raw_updates).map_err(|e| {
__sdk::InternalError::failed_parse("TableUpdate<ScheduledProcTable>", "TableUpdate")
.with_cause(e)
.into()
})
}
/// Access to the `scheduled_id` unique index on the table `scheduled_proc_table`,
/// which allows point queries on the field of the same name
/// via the [`ScheduledProcTableScheduledIdUnique::find`] method.
///
/// Users are encouraged not to explicitly reference this type,
/// but to directly chain method calls,
/// like `ctx.db.scheduled_proc_table().scheduled_id().find(...)`.
pub struct ScheduledProcTableScheduledIdUnique<'ctx> {
imp: __sdk::UniqueConstraintHandle<ScheduledProcTable, u64>,
phantom: std::marker::PhantomData<&'ctx super::RemoteTables>,
}
impl<'ctx> ScheduledProcTableTableHandle<'ctx> {
/// Get a handle on the `scheduled_id` unique index on the table `scheduled_proc_table`.
pub fn scheduled_id(&self) -> ScheduledProcTableScheduledIdUnique<'ctx> {
ScheduledProcTableScheduledIdUnique {
imp: self.imp.get_unique_constraint::<u64>("scheduled_id"),
phantom: std::marker::PhantomData,
}
}
}
impl<'ctx> ScheduledProcTableScheduledIdUnique<'ctx> {
/// Find the subscribed row whose `scheduled_id` column value is equal to `col_val`,
/// if such a row is present in the client cache.
pub fn find(&self, col_val: &u64) -> Option<ScheduledProcTable> {
self.imp.find(col_val)
}
}
@@ -0,0 +1,19 @@
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
#![allow(unused, clippy::all)]
use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws};
#[derive(__lib::ser::Serialize, __lib::de::Deserialize, Clone, PartialEq, Debug)]
#[sats(crate = __lib)]
pub struct ScheduledProcTable {
pub scheduled_id: u64,
pub scheduled_at: __sdk::ScheduleAt,
pub reducer_ts: __sdk::Timestamp,
pub x: u8,
pub y: u8,
}
impl __sdk::InModule for ScheduledProcTable {
type Module = super::RemoteModule;
}
@@ -63,6 +63,7 @@ impl delete_from_btree_u_32 for super::RemoteReducers {
DeleteFromBtreeU32CallbackId(self.imp.on_reducer(
"delete_from_btree_u32",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -235,6 +235,7 @@ impl delete_large_table for super::RemoteReducers {
DeleteLargeTableCallbackId(self.imp.on_reducer(
"delete_large_table",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -60,6 +60,7 @@ impl delete_pk_bool for super::RemoteReducers {
DeletePkBoolCallbackId(self.imp.on_reducer(
"delete_pk_bool",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -61,6 +61,7 @@ impl delete_pk_connection_id for super::RemoteReducers {
DeletePkConnectionIdCallbackId(self.imp.on_reducer(
"delete_pk_connection_id",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -60,6 +60,7 @@ impl delete_pk_i_128 for super::RemoteReducers {
DeletePkI128CallbackId(self.imp.on_reducer(
"delete_pk_i128",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -60,6 +60,7 @@ impl delete_pk_i_16 for super::RemoteReducers {
DeletePkI16CallbackId(self.imp.on_reducer(
"delete_pk_i16",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -60,6 +60,7 @@ impl delete_pk_i_256 for super::RemoteReducers {
DeletePkI256CallbackId(self.imp.on_reducer(
"delete_pk_i256",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -60,6 +60,7 @@ impl delete_pk_i_32 for super::RemoteReducers {
DeletePkI32CallbackId(self.imp.on_reducer(
"delete_pk_i32",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -60,6 +60,7 @@ impl delete_pk_i_64 for super::RemoteReducers {
DeletePkI64CallbackId(self.imp.on_reducer(
"delete_pk_i64",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -60,6 +60,7 @@ impl delete_pk_i_8 for super::RemoteReducers {
DeletePkI8CallbackId(self.imp.on_reducer(
"delete_pk_i8",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -60,6 +60,7 @@ impl delete_pk_identity for super::RemoteReducers {
DeletePkIdentityCallbackId(self.imp.on_reducer(
"delete_pk_identity",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -60,6 +60,7 @@ impl delete_pk_string for super::RemoteReducers {
DeletePkStringCallbackId(self.imp.on_reducer(
"delete_pk_string",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -60,6 +60,7 @@ impl delete_pk_u_128 for super::RemoteReducers {
DeletePkU128CallbackId(self.imp.on_reducer(
"delete_pk_u128",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -60,6 +60,7 @@ impl delete_pk_u_16 for super::RemoteReducers {
DeletePkU16CallbackId(self.imp.on_reducer(
"delete_pk_u16",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -60,6 +60,7 @@ impl delete_pk_u_256 for super::RemoteReducers {
DeletePkU256CallbackId(self.imp.on_reducer(
"delete_pk_u256",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -67,6 +67,7 @@ impl delete_pk_u_32_insert_pk_u_32_two for super::RemoteReducers {
DeletePkU32InsertPkU32TwoCallbackId(self.imp.on_reducer(
"delete_pk_u32_insert_pk_u32_two",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -60,6 +60,7 @@ impl delete_pk_u_32 for super::RemoteReducers {
DeletePkU32CallbackId(self.imp.on_reducer(
"delete_pk_u32",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -60,6 +60,7 @@ impl delete_pk_u_32_two for super::RemoteReducers {
DeletePkU32TwoCallbackId(self.imp.on_reducer(
"delete_pk_u32_two",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -60,6 +60,7 @@ impl delete_pk_u_64 for super::RemoteReducers {
DeletePkU64CallbackId(self.imp.on_reducer(
"delete_pk_u64",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -60,6 +60,7 @@ impl delete_pk_u_8 for super::RemoteReducers {
DeletePkU8CallbackId(self.imp.on_reducer(
"delete_pk_u8",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -60,6 +60,7 @@ impl delete_unique_bool for super::RemoteReducers {
DeleteUniqueBoolCallbackId(self.imp.on_reducer(
"delete_unique_bool",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -61,6 +61,7 @@ impl delete_unique_connection_id for super::RemoteReducers {
DeleteUniqueConnectionIdCallbackId(self.imp.on_reducer(
"delete_unique_connection_id",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -60,6 +60,7 @@ impl delete_unique_i_128 for super::RemoteReducers {
DeleteUniqueI128CallbackId(self.imp.on_reducer(
"delete_unique_i128",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -60,6 +60,7 @@ impl delete_unique_i_16 for super::RemoteReducers {
DeleteUniqueI16CallbackId(self.imp.on_reducer(
"delete_unique_i16",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -60,6 +60,7 @@ impl delete_unique_i_256 for super::RemoteReducers {
DeleteUniqueI256CallbackId(self.imp.on_reducer(
"delete_unique_i256",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -60,6 +60,7 @@ impl delete_unique_i_32 for super::RemoteReducers {
DeleteUniqueI32CallbackId(self.imp.on_reducer(
"delete_unique_i32",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -60,6 +60,7 @@ impl delete_unique_i_64 for super::RemoteReducers {
DeleteUniqueI64CallbackId(self.imp.on_reducer(
"delete_unique_i64",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -60,6 +60,7 @@ impl delete_unique_i_8 for super::RemoteReducers {
DeleteUniqueI8CallbackId(self.imp.on_reducer(
"delete_unique_i8",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -61,6 +61,7 @@ impl delete_unique_identity for super::RemoteReducers {
DeleteUniqueIdentityCallbackId(self.imp.on_reducer(
"delete_unique_identity",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -61,6 +61,7 @@ impl delete_unique_string for super::RemoteReducers {
DeleteUniqueStringCallbackId(self.imp.on_reducer(
"delete_unique_string",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -60,6 +60,7 @@ impl delete_unique_u_128 for super::RemoteReducers {
DeleteUniqueU128CallbackId(self.imp.on_reducer(
"delete_unique_u128",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -60,6 +60,7 @@ impl delete_unique_u_16 for super::RemoteReducers {
DeleteUniqueU16CallbackId(self.imp.on_reducer(
"delete_unique_u16",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -60,6 +60,7 @@ impl delete_unique_u_256 for super::RemoteReducers {
DeleteUniqueU256CallbackId(self.imp.on_reducer(
"delete_unique_u256",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -60,6 +60,7 @@ impl delete_unique_u_32 for super::RemoteReducers {
DeleteUniqueU32CallbackId(self.imp.on_reducer(
"delete_unique_u32",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -60,6 +60,7 @@ impl delete_unique_u_64 for super::RemoteReducers {
DeleteUniqueU64CallbackId(self.imp.on_reducer(
"delete_unique_u64",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -60,6 +60,7 @@ impl delete_unique_u_8 for super::RemoteReducers {
DeleteUniqueU8CallbackId(self.imp.on_reducer(
"delete_unique_u8",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -59,6 +59,7 @@ impl insert_call_timestamp for super::RemoteReducers {
InsertCallTimestampCallbackId(self.imp.on_reducer(
"insert_call_timestamp",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -59,6 +59,7 @@ impl insert_caller_one_connection_id for super::RemoteReducers {
InsertCallerOneConnectionIdCallbackId(self.imp.on_reducer(
"insert_caller_one_connection_id",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -59,6 +59,7 @@ impl insert_caller_one_identity for super::RemoteReducers {
InsertCallerOneIdentityCallbackId(self.imp.on_reducer(
"insert_caller_one_identity",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -63,6 +63,7 @@ impl insert_caller_pk_connection_id for super::RemoteReducers {
InsertCallerPkConnectionIdCallbackId(self.imp.on_reducer(
"insert_caller_pk_connection_id",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -61,6 +61,7 @@ impl insert_caller_pk_identity for super::RemoteReducers {
InsertCallerPkIdentityCallbackId(self.imp.on_reducer(
"insert_caller_pk_identity",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -63,6 +63,7 @@ impl insert_caller_unique_connection_id for super::RemoteReducers {
InsertCallerUniqueConnectionIdCallbackId(self.imp.on_reducer(
"insert_caller_unique_connection_id",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -61,6 +61,7 @@ impl insert_caller_unique_identity for super::RemoteReducers {
InsertCallerUniqueIdentityCallbackId(self.imp.on_reducer(
"insert_caller_unique_identity",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -59,6 +59,7 @@ impl insert_caller_vec_connection_id for super::RemoteReducers {
InsertCallerVecConnectionIdCallbackId(self.imp.on_reducer(
"insert_caller_vec_connection_id",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -59,6 +59,7 @@ impl insert_caller_vec_identity for super::RemoteReducers {
InsertCallerVecIdentityCallbackId(self.imp.on_reducer(
"insert_caller_vec_identity",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -63,6 +63,7 @@ impl insert_into_btree_u_32 for super::RemoteReducers {
InsertIntoBtreeU32CallbackId(self.imp.on_reducer(
"insert_into_btree_u32",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -63,6 +63,7 @@ impl insert_into_indexed_simple_enum for super::RemoteReducers {
InsertIntoIndexedSimpleEnumCallbackId(self.imp.on_reducer(
"insert_into_indexed_simple_enum",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -70,6 +70,7 @@ impl insert_into_pk_btree_u_32 for super::RemoteReducers {
InsertIntoPkBtreeU32CallbackId(self.imp.on_reducer(
"insert_into_pk_btree_u32",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -235,6 +235,7 @@ impl insert_large_table for super::RemoteReducers {
InsertLargeTableCallbackId(self.imp.on_reducer(
"insert_large_table",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -60,6 +60,7 @@ impl insert_one_bool for super::RemoteReducers {
InsertOneBoolCallbackId(self.imp.on_reducer(
"insert_one_bool",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -63,6 +63,7 @@ impl insert_one_byte_struct for super::RemoteReducers {
InsertOneByteStructCallbackId(self.imp.on_reducer(
"insert_one_byte_struct",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -61,6 +61,7 @@ impl insert_one_connection_id for super::RemoteReducers {
InsertOneConnectionIdCallbackId(self.imp.on_reducer(
"insert_one_connection_id",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -63,6 +63,7 @@ impl insert_one_enum_with_payload for super::RemoteReducers {
InsertOneEnumWithPayloadCallbackId(self.imp.on_reducer(
"insert_one_enum_with_payload",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -65,6 +65,7 @@ impl insert_one_every_primitive_struct for super::RemoteReducers {
InsertOneEveryPrimitiveStructCallbackId(self.imp.on_reducer(
"insert_one_every_primitive_struct",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -63,6 +63,7 @@ impl insert_one_every_vec_struct for super::RemoteReducers {
InsertOneEveryVecStructCallbackId(self.imp.on_reducer(
"insert_one_every_vec_struct",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -60,6 +60,7 @@ impl insert_one_f_32 for super::RemoteReducers {
InsertOneF32CallbackId(self.imp.on_reducer(
"insert_one_f32",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -60,6 +60,7 @@ impl insert_one_f_64 for super::RemoteReducers {
InsertOneF64CallbackId(self.imp.on_reducer(
"insert_one_f64",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -60,6 +60,7 @@ impl insert_one_i_128 for super::RemoteReducers {
InsertOneI128CallbackId(self.imp.on_reducer(
"insert_one_i128",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -60,6 +60,7 @@ impl insert_one_i_16 for super::RemoteReducers {
InsertOneI16CallbackId(self.imp.on_reducer(
"insert_one_i16",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -60,6 +60,7 @@ impl insert_one_i_256 for super::RemoteReducers {
InsertOneI256CallbackId(self.imp.on_reducer(
"insert_one_i256",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -60,6 +60,7 @@ impl insert_one_i_32 for super::RemoteReducers {
InsertOneI32CallbackId(self.imp.on_reducer(
"insert_one_i32",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -60,6 +60,7 @@ impl insert_one_i_64 for super::RemoteReducers {
InsertOneI64CallbackId(self.imp.on_reducer(
"insert_one_i64",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -60,6 +60,7 @@ impl insert_one_i_8 for super::RemoteReducers {
InsertOneI8CallbackId(self.imp.on_reducer(
"insert_one_i8",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -61,6 +61,7 @@ impl insert_one_identity for super::RemoteReducers {
InsertOneIdentityCallbackId(self.imp.on_reducer(
"insert_one_identity",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -63,6 +63,7 @@ impl insert_one_simple_enum for super::RemoteReducers {
InsertOneSimpleEnumCallbackId(self.imp.on_reducer(
"insert_one_simple_enum",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -60,6 +60,7 @@ impl insert_one_string for super::RemoteReducers {
InsertOneStringCallbackId(self.imp.on_reducer(
"insert_one_string",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {
@@ -61,6 +61,7 @@ impl insert_one_timestamp for super::RemoteReducers {
InsertOneTimestampCallbackId(self.imp.on_reducer(
"insert_one_timestamp",
Box::new(move |ctx: &super::ReducerEventContext| {
#[allow(irrefutable_let_patterns)]
let super::ReducerEventContext {
event:
__sdk::ReducerEvent {

Some files were not shown because too many files have changed in this diff Show More