diff --git a/modules/sdk-test-procedure-concurrency/src/lib.rs b/modules/sdk-test-procedure-concurrency/src/lib.rs index 48138e522..3a4b4e652 100644 --- a/modules/sdk-test-procedure-concurrency/src/lib.rs +++ b/modules/sdk-test-procedure-concurrency/src/lib.rs @@ -1,4 +1,6 @@ -use spacetimedb::{procedure, reducer, table, DbContext, ProcedureContext, ReducerContext, Table, TxContext}; +use spacetimedb::{ + procedure, reducer, table, DbContext, ProcedureContext, ReducerContext, ScheduleAt, Table, TxContext, +}; use std::time::Duration; #[table(public, accessor = procedure_concurrency_row)] @@ -29,3 +31,32 @@ fn procedure_sleep_between_inserts(ctx: &mut ProcedureContext) { ctx.sleep_until(ctx.timestamp + Duration::from_secs(10)); ctx.with_tx(|ctx| insert_procedure_concurrency_row(ctx, "procedure_after")); } + +#[table(accessor = scheduled_reducer_row, scheduled(insert_scheduled_reducer))] +struct ScheduledReducerRow { + #[primary_key] + #[auto_inc] + scheduled_id: u64, + scheduled_at: ScheduleAt, +} + +#[reducer] +fn insert_scheduled_reducer(ctx: &ReducerContext, _schedule: ScheduledReducerRow) { + ctx.db().procedure_concurrency_row().insert(ProcedureConcurrencyRow { + insertion_order: 0, + insertion_context: "scheduled_reducer".into(), + }); +} + +#[procedure] +fn procedure_schedule_reducer_between_inserts(ctx: &mut ProcedureContext) { + ctx.with_tx(|ctx| { + insert_procedure_concurrency_row(ctx, "procedure_before"); + ctx.db.scheduled_reducer_row().insert(ScheduledReducerRow { + scheduled_id: 0, + scheduled_at: ctx.timestamp.into(), + }); + }); + ctx.sleep_until(ctx.timestamp + Duration::from_secs(10)); + ctx.with_tx(|ctx| insert_procedure_concurrency_row(ctx, "procedure_after")); +} diff --git a/sdks/rust/tests/procedure-concurrency-client/src/module_bindings/insert_scheduled_reducer_reducer.rs b/sdks/rust/tests/procedure-concurrency-client/src/module_bindings/insert_scheduled_reducer_reducer.rs new file mode 100644 index 000000000..c4b4a29e0 --- /dev/null +++ b/sdks/rust/tests/procedure-concurrency-client/src/module_bindings/insert_scheduled_reducer_reducer.rs @@ -0,0 +1,70 @@ +// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE +// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. + +#![allow(unused, clippy::all)] +use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws}; + +use super::scheduled_reducer_row_type::ScheduledReducerRow; + +#[derive(__lib::ser::Serialize, __lib::de::Deserialize, Clone, PartialEq, Debug)] +#[sats(crate = __lib)] +pub(super) struct InsertScheduledReducerArgs { + pub schedule: ScheduledReducerRow, +} + +impl From for super::Reducer { + fn from(args: InsertScheduledReducerArgs) -> Self { + Self::InsertScheduledReducer { + schedule: args.schedule, + } + } +} + +impl __sdk::InModule for InsertScheduledReducerArgs { + type Module = super::RemoteModule; +} + +#[allow(non_camel_case_types)] +/// Extension trait for access to the reducer `insert_scheduled_reducer`. +/// +/// Implemented for [`super::RemoteReducers`]. +pub trait insert_scheduled_reducer { + /// Request that the remote module invoke the reducer `insert_scheduled_reducer` to run as soon as possible. + /// + /// This method returns immediately, and errors only if we are unable to send the request. + /// The reducer will run asynchronously in the future, + /// and this method provides no way to listen for its completion status. + /// /// Use [`insert_scheduled_reducer:insert_scheduled_reducer_then`] to run a callback after the reducer completes. + fn insert_scheduled_reducer(&self, schedule: ScheduledReducerRow) -> __sdk::Result<()> { + self.insert_scheduled_reducer_then(schedule, |_, _| {}) + } + + /// Request that the remote module invoke the reducer `insert_scheduled_reducer` to run as soon as possible, + /// registering `callback` to run when we are notified that the reducer completed. + /// + /// This method returns immediately, and errors only if we are unable to send the request. + /// The reducer will run asynchronously in the future, + /// and its status can be observed with the `callback`. + fn insert_scheduled_reducer_then( + &self, + schedule: ScheduledReducerRow, + + callback: impl FnOnce(&super::ReducerEventContext, Result, __sdk::InternalError>) + + Send + + 'static, + ) -> __sdk::Result<()>; +} + +impl insert_scheduled_reducer for super::RemoteReducers { + fn insert_scheduled_reducer_then( + &self, + schedule: ScheduledReducerRow, + + callback: impl FnOnce(&super::ReducerEventContext, Result, __sdk::InternalError>) + + Send + + 'static, + ) -> __sdk::Result<()> { + self.imp + .invoke_reducer_with_callback(InsertScheduledReducerArgs { schedule }, callback) + } +} diff --git a/sdks/rust/tests/procedure-concurrency-client/src/module_bindings/mod.rs b/sdks/rust/tests/procedure-concurrency-client/src/module_bindings/mod.rs index 6162085b1..88e54eaf9 100644 --- a/sdks/rust/tests/procedure-concurrency-client/src/module_bindings/mod.rs +++ b/sdks/rust/tests/procedure-concurrency-client/src/module_bindings/mod.rs @@ -7,14 +7,22 @@ use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws}; pub mod insert_reducer_row_reducer; +pub mod insert_scheduled_reducer_reducer; pub mod procedure_concurrency_row_table; pub mod procedure_concurrency_row_type; +pub mod procedure_schedule_reducer_between_inserts_procedure; pub mod procedure_sleep_between_inserts_procedure; +pub mod scheduled_reducer_row_table; +pub mod scheduled_reducer_row_type; pub use insert_reducer_row_reducer::insert_reducer_row; +pub use insert_scheduled_reducer_reducer::insert_scheduled_reducer; pub use procedure_concurrency_row_table::*; pub use procedure_concurrency_row_type::ProcedureConcurrencyRow; +pub use procedure_schedule_reducer_between_inserts_procedure::procedure_schedule_reducer_between_inserts; pub use procedure_sleep_between_inserts_procedure::procedure_sleep_between_inserts; +pub use scheduled_reducer_row_table::*; +pub use scheduled_reducer_row_type::ScheduledReducerRow; #[derive(Clone, PartialEq, Debug)] @@ -25,6 +33,7 @@ pub use procedure_sleep_between_inserts_procedure::procedure_sleep_between_inser pub enum Reducer { InsertReducerRow, + InsertScheduledReducer { schedule: ScheduledReducerRow }, } impl __sdk::InModule for Reducer { @@ -35,6 +44,7 @@ impl __sdk::Reducer for Reducer { fn reducer_name(&self) -> &'static str { match self { Reducer::InsertReducerRow => "insert_reducer_row", + Reducer::InsertScheduledReducer { .. } => "insert_scheduled_reducer", _ => unreachable!(), } } @@ -42,6 +52,11 @@ impl __sdk::Reducer for Reducer { fn args_bsatn(&self) -> Result, __sats::bsatn::EncodeError> { match self { Reducer::InsertReducerRow => __sats::bsatn::to_vec(&insert_reducer_row_reducer::InsertReducerRowArgs {}), + Reducer::InsertScheduledReducer { schedule } => { + __sats::bsatn::to_vec(&insert_scheduled_reducer_reducer::InsertScheduledReducerArgs { + schedule: schedule.clone(), + }) + } _ => unreachable!(), } } @@ -52,6 +67,7 @@ impl __sdk::Reducer for Reducer { #[doc(hidden)] pub struct DbUpdate { procedure_concurrency_row: __sdk::TableUpdate, + scheduled_reducer_row: __sdk::TableUpdate, } impl TryFrom<__ws::v2::TransactionUpdate> for DbUpdate { @@ -63,6 +79,9 @@ impl TryFrom<__ws::v2::TransactionUpdate> for DbUpdate { "procedure_concurrency_row" => db_update .procedure_concurrency_row .append(procedure_concurrency_row_table::parse_table_update(table_update)?), + "scheduled_reducer_row" => db_update + .scheduled_reducer_row + .append(scheduled_reducer_row_table::parse_table_update(table_update)?), unknown => { return Err(__sdk::InternalError::unknown_name("table", unknown, "DatabaseUpdate").into()); @@ -85,6 +104,9 @@ impl __sdk::DbUpdate for DbUpdate { "procedure_concurrency_row", &self.procedure_concurrency_row, ); + diff.scheduled_reducer_row = cache + .apply_diff_to_table::("scheduled_reducer_row", &self.scheduled_reducer_row) + .with_updates_by_pk(|row| &row.scheduled_id); diff } @@ -95,6 +117,9 @@ impl __sdk::DbUpdate for DbUpdate { "procedure_concurrency_row" => db_update .procedure_concurrency_row .append(__sdk::parse_row_list_as_inserts(table_rows.rows)?), + "scheduled_reducer_row" => db_update + .scheduled_reducer_row + .append(__sdk::parse_row_list_as_inserts(table_rows.rows)?), unknown => { return Err(__sdk::InternalError::unknown_name("table", unknown, "QueryRows").into()); } @@ -109,6 +134,9 @@ impl __sdk::DbUpdate for DbUpdate { "procedure_concurrency_row" => db_update .procedure_concurrency_row .append(__sdk::parse_row_list_as_deletes(table_rows.rows)?), + "scheduled_reducer_row" => db_update + .scheduled_reducer_row + .append(__sdk::parse_row_list_as_deletes(table_rows.rows)?), unknown => { return Err(__sdk::InternalError::unknown_name("table", unknown, "QueryRows").into()); } @@ -123,6 +151,7 @@ impl __sdk::DbUpdate for DbUpdate { #[doc(hidden)] pub struct AppliedDiff<'r> { procedure_concurrency_row: __sdk::TableAppliedDiff<'r, ProcedureConcurrencyRow>, + scheduled_reducer_row: __sdk::TableAppliedDiff<'r, ScheduledReducerRow>, __unused: std::marker::PhantomData<&'r ()>, } @@ -137,6 +166,11 @@ impl<'r> __sdk::AppliedDiff<'r> for AppliedDiff<'r> { &self.procedure_concurrency_row, event, ); + callbacks.invoke_table_row_callbacks::( + "scheduled_reducer_row", + &self.scheduled_reducer_row, + event, + ); } } @@ -795,6 +829,7 @@ impl __sdk::SpacetimeModule for RemoteModule { fn register_tables(client_cache: &mut __sdk::ClientCache) { procedure_concurrency_row_table::register_table(client_cache); + scheduled_reducer_row_table::register_table(client_cache); } - const ALL_TABLE_NAMES: &'static [&'static str] = &["procedure_concurrency_row"]; + const ALL_TABLE_NAMES: &'static [&'static str] = &["procedure_concurrency_row", "scheduled_reducer_row"]; } diff --git a/sdks/rust/tests/procedure-concurrency-client/src/module_bindings/procedure_schedule_reducer_between_inserts_procedure.rs b/sdks/rust/tests/procedure-concurrency-client/src/module_bindings/procedure_schedule_reducer_between_inserts_procedure.rs new file mode 100644 index 000000000..dbae3c857 --- /dev/null +++ b/sdks/rust/tests/procedure-concurrency-client/src/module_bindings/procedure_schedule_reducer_between_inserts_procedure.rs @@ -0,0 +1,43 @@ +// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE +// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. + +#![allow(unused, clippy::all)] +use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws}; + +#[derive(__lib::ser::Serialize, __lib::de::Deserialize, Clone, PartialEq, Debug)] +#[sats(crate = __lib)] +struct ProcedureScheduleReducerBetweenInsertsArgs {} + +impl __sdk::InModule for ProcedureScheduleReducerBetweenInsertsArgs { + type Module = super::RemoteModule; +} + +#[allow(non_camel_case_types)] +/// Extension trait for access to the procedure `procedure_schedule_reducer_between_inserts`. +/// +/// Implemented for [`super::RemoteProcedures`]. +pub trait procedure_schedule_reducer_between_inserts { + fn procedure_schedule_reducer_between_inserts(&self) { + self.procedure_schedule_reducer_between_inserts_then(|_, _| {}); + } + + fn procedure_schedule_reducer_between_inserts_then( + &self, + + __callback: impl FnOnce(&super::ProcedureEventContext, Result<(), __sdk::InternalError>) + Send + 'static, + ); +} + +impl procedure_schedule_reducer_between_inserts for super::RemoteProcedures { + fn procedure_schedule_reducer_between_inserts_then( + &self, + + __callback: impl FnOnce(&super::ProcedureEventContext, Result<(), __sdk::InternalError>) + Send + 'static, + ) { + self.imp.invoke_procedure_with_callback::<_, ()>( + "procedure_schedule_reducer_between_inserts", + ProcedureScheduleReducerBetweenInsertsArgs {}, + __callback, + ); + } +} diff --git a/sdks/rust/tests/procedure-concurrency-client/src/module_bindings/scheduled_reducer_row_table.rs b/sdks/rust/tests/procedure-concurrency-client/src/module_bindings/scheduled_reducer_row_table.rs new file mode 100644 index 000000000..88a0376c9 --- /dev/null +++ b/sdks/rust/tests/procedure-concurrency-client/src/module_bindings/scheduled_reducer_row_table.rs @@ -0,0 +1,159 @@ +// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE +// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. + +#![allow(unused, clippy::all)] +use super::scheduled_reducer_row_type::ScheduledReducerRow; +use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws}; + +/// Table handle for the table `scheduled_reducer_row`. +/// +/// Obtain a handle from the [`ScheduledReducerRowTableAccess::scheduled_reducer_row`] method on [`super::RemoteTables`], +/// like `ctx.db.scheduled_reducer_row()`. +/// +/// Users are encouraged not to explicitly reference this type, +/// but to directly chain method calls, +/// like `ctx.db.scheduled_reducer_row().on_insert(...)`. +pub struct ScheduledReducerRowTableHandle<'ctx> { + imp: __sdk::TableHandle, + ctx: std::marker::PhantomData<&'ctx super::RemoteTables>, +} + +#[allow(non_camel_case_types)] +/// Extension trait for access to the table `scheduled_reducer_row`. +/// +/// Implemented for [`super::RemoteTables`]. +pub trait ScheduledReducerRowTableAccess { + #[allow(non_snake_case)] + /// Obtain a [`ScheduledReducerRowTableHandle`], which mediates access to the table `scheduled_reducer_row`. + fn scheduled_reducer_row(&self) -> ScheduledReducerRowTableHandle<'_>; +} + +impl ScheduledReducerRowTableAccess for super::RemoteTables { + fn scheduled_reducer_row(&self) -> ScheduledReducerRowTableHandle<'_> { + ScheduledReducerRowTableHandle { + imp: self.imp.get_table::("scheduled_reducer_row"), + ctx: std::marker::PhantomData, + } + } +} + +pub struct ScheduledReducerRowInsertCallbackId(__sdk::CallbackId); +pub struct ScheduledReducerRowDeleteCallbackId(__sdk::CallbackId); + +impl<'ctx> __sdk::Table for ScheduledReducerRowTableHandle<'ctx> { + type Row = ScheduledReducerRow; + type EventContext = super::EventContext; + + fn count(&self) -> u64 { + self.imp.count() + } + fn iter(&self) -> impl Iterator + '_ { + self.imp.iter() + } + + type InsertCallbackId = ScheduledReducerRowInsertCallbackId; + + fn on_insert( + &self, + callback: impl FnMut(&Self::EventContext, &Self::Row) + Send + 'static, + ) -> ScheduledReducerRowInsertCallbackId { + ScheduledReducerRowInsertCallbackId(self.imp.on_insert(Box::new(callback))) + } + + fn remove_on_insert(&self, callback: ScheduledReducerRowInsertCallbackId) { + self.imp.remove_on_insert(callback.0) + } + + type DeleteCallbackId = ScheduledReducerRowDeleteCallbackId; + + fn on_delete( + &self, + callback: impl FnMut(&Self::EventContext, &Self::Row) + Send + 'static, + ) -> ScheduledReducerRowDeleteCallbackId { + ScheduledReducerRowDeleteCallbackId(self.imp.on_delete(Box::new(callback))) + } + + fn remove_on_delete(&self, callback: ScheduledReducerRowDeleteCallbackId) { + self.imp.remove_on_delete(callback.0) + } +} + +pub struct ScheduledReducerRowUpdateCallbackId(__sdk::CallbackId); + +impl<'ctx> __sdk::TableWithPrimaryKey for ScheduledReducerRowTableHandle<'ctx> { + type UpdateCallbackId = ScheduledReducerRowUpdateCallbackId; + + fn on_update( + &self, + callback: impl FnMut(&Self::EventContext, &Self::Row, &Self::Row) + Send + 'static, + ) -> ScheduledReducerRowUpdateCallbackId { + ScheduledReducerRowUpdateCallbackId(self.imp.on_update(Box::new(callback))) + } + + fn remove_on_update(&self, callback: ScheduledReducerRowUpdateCallbackId) { + self.imp.remove_on_update(callback.0) + } +} + +/// Access to the `scheduled_id` unique index on the table `scheduled_reducer_row`, +/// which allows point queries on the field of the same name +/// via the [`ScheduledReducerRowScheduledIdUnique::find`] method. +/// +/// Users are encouraged not to explicitly reference this type, +/// but to directly chain method calls, +/// like `ctx.db.scheduled_reducer_row().scheduled_id().find(...)`. +pub struct ScheduledReducerRowScheduledIdUnique<'ctx> { + imp: __sdk::UniqueConstraintHandle, + phantom: std::marker::PhantomData<&'ctx super::RemoteTables>, +} + +impl<'ctx> ScheduledReducerRowTableHandle<'ctx> { + /// Get a handle on the `scheduled_id` unique index on the table `scheduled_reducer_row`. + pub fn scheduled_id(&self) -> ScheduledReducerRowScheduledIdUnique<'ctx> { + ScheduledReducerRowScheduledIdUnique { + imp: self.imp.get_unique_constraint::("scheduled_id"), + phantom: std::marker::PhantomData, + } + } +} + +impl<'ctx> ScheduledReducerRowScheduledIdUnique<'ctx> { + /// Find the subscribed row whose `scheduled_id` column value is equal to `col_val`, + /// if such a row is present in the client cache. + pub fn find(&self, col_val: &u64) -> Option { + self.imp.find(col_val) + } +} + +#[doc(hidden)] +pub(super) fn register_table(client_cache: &mut __sdk::ClientCache) { + let _table = client_cache.get_or_make_table::("scheduled_reducer_row"); + _table.add_unique_constraint::("scheduled_id", |row| &row.scheduled_id); +} + +#[doc(hidden)] +pub(super) fn parse_table_update( + raw_updates: __ws::v2::TableUpdate, +) -> __sdk::Result<__sdk::TableUpdate> { + __sdk::TableUpdate::parse_table_update(raw_updates).map_err(|e| { + __sdk::InternalError::failed_parse("TableUpdate", "TableUpdate") + .with_cause(e) + .into() + }) +} + +#[allow(non_camel_case_types)] +/// Extension trait for query builder access to the table `ScheduledReducerRow`. +/// +/// Implemented for [`__sdk::QueryTableAccessor`]. +pub trait scheduled_reducer_rowQueryTableAccess { + #[allow(non_snake_case)] + /// Get a query builder for the table `ScheduledReducerRow`. + fn scheduled_reducer_row(&self) -> __sdk::__query_builder::Table; +} + +impl scheduled_reducer_rowQueryTableAccess for __sdk::QueryTableAccessor { + fn scheduled_reducer_row(&self) -> __sdk::__query_builder::Table { + __sdk::__query_builder::Table::new("scheduled_reducer_row") + } +} diff --git a/sdks/rust/tests/procedure-concurrency-client/src/module_bindings/scheduled_reducer_row_type.rs b/sdks/rust/tests/procedure-concurrency-client/src/module_bindings/scheduled_reducer_row_type.rs new file mode 100644 index 000000000..92af01cb5 --- /dev/null +++ b/sdks/rust/tests/procedure-concurrency-client/src/module_bindings/scheduled_reducer_row_type.rs @@ -0,0 +1,52 @@ +// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE +// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. + +#![allow(unused, clippy::all)] +use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws}; + +#[derive(__lib::ser::Serialize, __lib::de::Deserialize, Clone, PartialEq, Debug)] +#[sats(crate = __lib)] +pub struct ScheduledReducerRow { + pub scheduled_id: u64, + pub scheduled_at: __sdk::ScheduleAt, +} + +impl __sdk::InModule for ScheduledReducerRow { + type Module = super::RemoteModule; +} + +/// Column accessor struct for the table `ScheduledReducerRow`. +/// +/// Provides typed access to columns for query building. +pub struct ScheduledReducerRowCols { + pub scheduled_id: __sdk::__query_builder::Col, + pub scheduled_at: __sdk::__query_builder::Col, +} + +impl __sdk::__query_builder::HasCols for ScheduledReducerRow { + type Cols = ScheduledReducerRowCols; + fn cols(table_name: &'static str) -> Self::Cols { + ScheduledReducerRowCols { + scheduled_id: __sdk::__query_builder::Col::new(table_name, "scheduled_id"), + scheduled_at: __sdk::__query_builder::Col::new(table_name, "scheduled_at"), + } + } +} + +/// Indexed column accessor struct for the table `ScheduledReducerRow`. +/// +/// Provides typed access to indexed columns for query building. +pub struct ScheduledReducerRowIxCols { + pub scheduled_id: __sdk::__query_builder::IxCol, +} + +impl __sdk::__query_builder::HasIxCols for ScheduledReducerRow { + type IxCols = ScheduledReducerRowIxCols; + fn ix_cols(table_name: &'static str) -> Self::IxCols { + ScheduledReducerRowIxCols { + scheduled_id: __sdk::__query_builder::IxCol::new(table_name, "scheduled_id"), + } + } +} + +impl __sdk::__query_builder::CanBeLookupTable for ScheduledReducerRow {} diff --git a/sdks/rust/tests/procedure-concurrency-client/src/test_handlers.rs b/sdks/rust/tests/procedure-concurrency-client/src/test_handlers.rs index 99ec3bfe4..9171d30fc 100644 --- a/sdks/rust/tests/procedure-concurrency-client/src/test_handlers.rs +++ b/sdks/rust/tests/procedure-concurrency-client/src/test_handlers.rs @@ -12,6 +12,9 @@ pub async fn dispatch(test: &str, db_name: &str) { "procedure-reducer-same-client-not-interleaved" => { exec_procedure_reducer_same_client_not_interleaved(db_name).await } + "procedure-concurrent-with-scheduled-reducer" => { + exec_procedure_concurrent_with_scheduled_reducer(db_name).await + } _ => panic!("Unknown test: {test}"), } } @@ -88,6 +91,7 @@ fn subscribe_all_then(ctx: &impl RemoteDbContext, callback: impl FnOnce(&Subscri struct ConnectionRowObservation { procedure_before: Option, reducer: Option, + scheduled_reducer: Option, procedure_after: Option, ordering_checked: bool, } @@ -384,3 +388,79 @@ async fn exec_procedure_reducer_same_client_not_interleaved(db_name: &str) { test_counter.wait_for_all().await; } + +async fn exec_procedure_concurrent_with_scheduled_reducer(db_name: &str) { + let test_counter = TestCounter::new(); + let sub_applied_nothing_result = test_counter.add_test("on_subscription_applied_nothing"); + let procedure_callback_result = test_counter.add_test("procedure_schedule_reducer_between_inserts_callback"); + let mut ordering_result = Some(test_counter.add_test("procedure_scheduled_reducer_order")); + let state = Arc::new(Mutex::new(ConnectionRowObservation::default())); + + connect_then(db_name, &test_counter, { + let state = Arc::clone(&state); + move |ctx| { + ctx.db().procedure_concurrency_row().on_insert({ + let state = Arc::clone(&state); + move |_ctx, row| { + let maybe_ordering = { + let mut observation = state.lock().expect("ConnectionRowObservation mutex is poisoned"); + match row.insertion_context.as_str() { + "procedure_before" => { + assert!(observation.procedure_before.replace(row.insertion_order).is_none()); + } + "scheduled_reducer" => { + assert!(observation + .scheduled_reducer + .replace(row.insertion_order) + .is_none()); + } + "procedure_after" => { + assert!(observation.procedure_after.replace(row.insertion_order).is_none()); + } + unexpected => panic!("Unexpected insertion context: {unexpected}"), + } + match ( + observation.procedure_before, + observation.scheduled_reducer, + observation.procedure_after, + ) { + (Some(before), Some(scheduled_reducer), Some(after)) + if !observation.ordering_checked => + { + observation.ordering_checked = true; + Some((before, scheduled_reducer, after)) + } + _ => None, + } + }; + + if let Some((before, scheduled_reducer, after)) = maybe_ordering { + (ordering_result.take().expect("Ordering result should only be reported once"))( + #[allow(clippy::redundant_closure_call)] + (|| { + anyhow::ensure!( + before < scheduled_reducer && scheduled_reducer < after, + "Expected scheduled reducer insertion order procedure_before < scheduled_reducer < procedure_after, got {before} < {scheduled_reducer} < {after}" + ); + Ok(()) + })(), + ); + } + } + }); + + subscribe_all_then(ctx, move |ctx| { + sub_applied_nothing_result(assert_all_tables_empty(ctx)); + ctx.procedures + .procedure_schedule_reducer_between_inserts_then(move |_ctx, res| { + procedure_callback_result( + res.context("procedure_schedule_reducer_between_inserts failed unexpectedly"), + ); + }); + }); + } + }) + .await; + + test_counter.wait_for_all().await; +} diff --git a/sdks/rust/tests/test.rs b/sdks/rust/tests/test.rs index 3f317b8b5..ec28d3a90 100644 --- a/sdks/rust/tests/test.rs +++ b/sdks/rust/tests/test.rs @@ -543,6 +543,11 @@ mod rust_procedure_concurrency { fn procedure_reducer_same_client_not_interleaved() { make_test("procedure-reducer-same-client-not-interleaved").run() } + + #[test] + fn procedure_concurrent_with_scheduled_reducer() { + make_test("procedure-concurrent-with-scheduled-reducer").run() + } } macro_rules! view_tests {