Add test procedure_concurrent_with_scheduled_reducer

This commit is contained in:
Phoebe Goldman
2026-05-05 12:42:46 -04:00
parent 58252619aa
commit 148ed2784d
8 changed files with 477 additions and 2 deletions
@@ -1,4 +1,6 @@
use spacetimedb::{procedure, reducer, table, DbContext, ProcedureContext, ReducerContext, Table, TxContext};
use spacetimedb::{
procedure, reducer, table, DbContext, ProcedureContext, ReducerContext, ScheduleAt, Table, TxContext,
};
use std::time::Duration;
#[table(public, accessor = procedure_concurrency_row)]
@@ -29,3 +31,32 @@ fn procedure_sleep_between_inserts(ctx: &mut ProcedureContext) {
ctx.sleep_until(ctx.timestamp + Duration::from_secs(10));
ctx.with_tx(|ctx| insert_procedure_concurrency_row(ctx, "procedure_after"));
}
#[table(accessor = scheduled_reducer_row, scheduled(insert_scheduled_reducer))]
struct ScheduledReducerRow {
#[primary_key]
#[auto_inc]
scheduled_id: u64,
scheduled_at: ScheduleAt,
}
#[reducer]
fn insert_scheduled_reducer(ctx: &ReducerContext, _schedule: ScheduledReducerRow) {
ctx.db().procedure_concurrency_row().insert(ProcedureConcurrencyRow {
insertion_order: 0,
insertion_context: "scheduled_reducer".into(),
});
}
#[procedure]
fn procedure_schedule_reducer_between_inserts(ctx: &mut ProcedureContext) {
ctx.with_tx(|ctx| {
insert_procedure_concurrency_row(ctx, "procedure_before");
ctx.db.scheduled_reducer_row().insert(ScheduledReducerRow {
scheduled_id: 0,
scheduled_at: ctx.timestamp.into(),
});
});
ctx.sleep_until(ctx.timestamp + Duration::from_secs(10));
ctx.with_tx(|ctx| insert_procedure_concurrency_row(ctx, "procedure_after"));
}
@@ -0,0 +1,70 @@
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
#![allow(unused, clippy::all)]
use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws};
use super::scheduled_reducer_row_type::ScheduledReducerRow;
#[derive(__lib::ser::Serialize, __lib::de::Deserialize, Clone, PartialEq, Debug)]
#[sats(crate = __lib)]
pub(super) struct InsertScheduledReducerArgs {
pub schedule: ScheduledReducerRow,
}
impl From<InsertScheduledReducerArgs> for super::Reducer {
fn from(args: InsertScheduledReducerArgs) -> Self {
Self::InsertScheduledReducer {
schedule: args.schedule,
}
}
}
impl __sdk::InModule for InsertScheduledReducerArgs {
type Module = super::RemoteModule;
}
#[allow(non_camel_case_types)]
/// Extension trait for access to the reducer `insert_scheduled_reducer`.
///
/// Implemented for [`super::RemoteReducers`].
pub trait insert_scheduled_reducer {
/// Request that the remote module invoke the reducer `insert_scheduled_reducer` to run as soon as possible.
///
/// This method returns immediately, and errors only if we are unable to send the request.
/// The reducer will run asynchronously in the future,
/// and this method provides no way to listen for its completion status.
/// /// Use [`insert_scheduled_reducer:insert_scheduled_reducer_then`] to run a callback after the reducer completes.
fn insert_scheduled_reducer(&self, schedule: ScheduledReducerRow) -> __sdk::Result<()> {
self.insert_scheduled_reducer_then(schedule, |_, _| {})
}
/// Request that the remote module invoke the reducer `insert_scheduled_reducer` to run as soon as possible,
/// registering `callback` to run when we are notified that the reducer completed.
///
/// This method returns immediately, and errors only if we are unable to send the request.
/// The reducer will run asynchronously in the future,
/// and its status can be observed with the `callback`.
fn insert_scheduled_reducer_then(
&self,
schedule: ScheduledReducerRow,
callback: impl FnOnce(&super::ReducerEventContext, Result<Result<(), String>, __sdk::InternalError>)
+ Send
+ 'static,
) -> __sdk::Result<()>;
}
impl insert_scheduled_reducer for super::RemoteReducers {
fn insert_scheduled_reducer_then(
&self,
schedule: ScheduledReducerRow,
callback: impl FnOnce(&super::ReducerEventContext, Result<Result<(), String>, __sdk::InternalError>)
+ Send
+ 'static,
) -> __sdk::Result<()> {
self.imp
.invoke_reducer_with_callback(InsertScheduledReducerArgs { schedule }, callback)
}
}
@@ -7,14 +7,22 @@
use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws};
pub mod insert_reducer_row_reducer;
pub mod insert_scheduled_reducer_reducer;
pub mod procedure_concurrency_row_table;
pub mod procedure_concurrency_row_type;
pub mod procedure_schedule_reducer_between_inserts_procedure;
pub mod procedure_sleep_between_inserts_procedure;
pub mod scheduled_reducer_row_table;
pub mod scheduled_reducer_row_type;
pub use insert_reducer_row_reducer::insert_reducer_row;
pub use insert_scheduled_reducer_reducer::insert_scheduled_reducer;
pub use procedure_concurrency_row_table::*;
pub use procedure_concurrency_row_type::ProcedureConcurrencyRow;
pub use procedure_schedule_reducer_between_inserts_procedure::procedure_schedule_reducer_between_inserts;
pub use procedure_sleep_between_inserts_procedure::procedure_sleep_between_inserts;
pub use scheduled_reducer_row_table::*;
pub use scheduled_reducer_row_type::ScheduledReducerRow;
#[derive(Clone, PartialEq, Debug)]
@@ -25,6 +33,7 @@ pub use procedure_sleep_between_inserts_procedure::procedure_sleep_between_inser
pub enum Reducer {
InsertReducerRow,
InsertScheduledReducer { schedule: ScheduledReducerRow },
}
impl __sdk::InModule for Reducer {
@@ -35,6 +44,7 @@ impl __sdk::Reducer for Reducer {
fn reducer_name(&self) -> &'static str {
match self {
Reducer::InsertReducerRow => "insert_reducer_row",
Reducer::InsertScheduledReducer { .. } => "insert_scheduled_reducer",
_ => unreachable!(),
}
}
@@ -42,6 +52,11 @@ impl __sdk::Reducer for Reducer {
fn args_bsatn(&self) -> Result<Vec<u8>, __sats::bsatn::EncodeError> {
match self {
Reducer::InsertReducerRow => __sats::bsatn::to_vec(&insert_reducer_row_reducer::InsertReducerRowArgs {}),
Reducer::InsertScheduledReducer { schedule } => {
__sats::bsatn::to_vec(&insert_scheduled_reducer_reducer::InsertScheduledReducerArgs {
schedule: schedule.clone(),
})
}
_ => unreachable!(),
}
}
@@ -52,6 +67,7 @@ impl __sdk::Reducer for Reducer {
#[doc(hidden)]
pub struct DbUpdate {
procedure_concurrency_row: __sdk::TableUpdate<ProcedureConcurrencyRow>,
scheduled_reducer_row: __sdk::TableUpdate<ScheduledReducerRow>,
}
impl TryFrom<__ws::v2::TransactionUpdate> for DbUpdate {
@@ -63,6 +79,9 @@ impl TryFrom<__ws::v2::TransactionUpdate> for DbUpdate {
"procedure_concurrency_row" => db_update
.procedure_concurrency_row
.append(procedure_concurrency_row_table::parse_table_update(table_update)?),
"scheduled_reducer_row" => db_update
.scheduled_reducer_row
.append(scheduled_reducer_row_table::parse_table_update(table_update)?),
unknown => {
return Err(__sdk::InternalError::unknown_name("table", unknown, "DatabaseUpdate").into());
@@ -85,6 +104,9 @@ impl __sdk::DbUpdate for DbUpdate {
"procedure_concurrency_row",
&self.procedure_concurrency_row,
);
diff.scheduled_reducer_row = cache
.apply_diff_to_table::<ScheduledReducerRow>("scheduled_reducer_row", &self.scheduled_reducer_row)
.with_updates_by_pk(|row| &row.scheduled_id);
diff
}
@@ -95,6 +117,9 @@ impl __sdk::DbUpdate for DbUpdate {
"procedure_concurrency_row" => db_update
.procedure_concurrency_row
.append(__sdk::parse_row_list_as_inserts(table_rows.rows)?),
"scheduled_reducer_row" => db_update
.scheduled_reducer_row
.append(__sdk::parse_row_list_as_inserts(table_rows.rows)?),
unknown => {
return Err(__sdk::InternalError::unknown_name("table", unknown, "QueryRows").into());
}
@@ -109,6 +134,9 @@ impl __sdk::DbUpdate for DbUpdate {
"procedure_concurrency_row" => db_update
.procedure_concurrency_row
.append(__sdk::parse_row_list_as_deletes(table_rows.rows)?),
"scheduled_reducer_row" => db_update
.scheduled_reducer_row
.append(__sdk::parse_row_list_as_deletes(table_rows.rows)?),
unknown => {
return Err(__sdk::InternalError::unknown_name("table", unknown, "QueryRows").into());
}
@@ -123,6 +151,7 @@ impl __sdk::DbUpdate for DbUpdate {
#[doc(hidden)]
pub struct AppliedDiff<'r> {
procedure_concurrency_row: __sdk::TableAppliedDiff<'r, ProcedureConcurrencyRow>,
scheduled_reducer_row: __sdk::TableAppliedDiff<'r, ScheduledReducerRow>,
__unused: std::marker::PhantomData<&'r ()>,
}
@@ -137,6 +166,11 @@ impl<'r> __sdk::AppliedDiff<'r> for AppliedDiff<'r> {
&self.procedure_concurrency_row,
event,
);
callbacks.invoke_table_row_callbacks::<ScheduledReducerRow>(
"scheduled_reducer_row",
&self.scheduled_reducer_row,
event,
);
}
}
@@ -795,6 +829,7 @@ impl __sdk::SpacetimeModule for RemoteModule {
fn register_tables(client_cache: &mut __sdk::ClientCache<Self>) {
procedure_concurrency_row_table::register_table(client_cache);
scheduled_reducer_row_table::register_table(client_cache);
}
const ALL_TABLE_NAMES: &'static [&'static str] = &["procedure_concurrency_row"];
const ALL_TABLE_NAMES: &'static [&'static str] = &["procedure_concurrency_row", "scheduled_reducer_row"];
}
@@ -0,0 +1,43 @@
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
#![allow(unused, clippy::all)]
use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws};
#[derive(__lib::ser::Serialize, __lib::de::Deserialize, Clone, PartialEq, Debug)]
#[sats(crate = __lib)]
struct ProcedureScheduleReducerBetweenInsertsArgs {}
impl __sdk::InModule for ProcedureScheduleReducerBetweenInsertsArgs {
type Module = super::RemoteModule;
}
#[allow(non_camel_case_types)]
/// Extension trait for access to the procedure `procedure_schedule_reducer_between_inserts`.
///
/// Implemented for [`super::RemoteProcedures`].
pub trait procedure_schedule_reducer_between_inserts {
fn procedure_schedule_reducer_between_inserts(&self) {
self.procedure_schedule_reducer_between_inserts_then(|_, _| {});
}
fn procedure_schedule_reducer_between_inserts_then(
&self,
__callback: impl FnOnce(&super::ProcedureEventContext, Result<(), __sdk::InternalError>) + Send + 'static,
);
}
impl procedure_schedule_reducer_between_inserts for super::RemoteProcedures {
fn procedure_schedule_reducer_between_inserts_then(
&self,
__callback: impl FnOnce(&super::ProcedureEventContext, Result<(), __sdk::InternalError>) + Send + 'static,
) {
self.imp.invoke_procedure_with_callback::<_, ()>(
"procedure_schedule_reducer_between_inserts",
ProcedureScheduleReducerBetweenInsertsArgs {},
__callback,
);
}
}
@@ -0,0 +1,159 @@
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
#![allow(unused, clippy::all)]
use super::scheduled_reducer_row_type::ScheduledReducerRow;
use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws};
/// Table handle for the table `scheduled_reducer_row`.
///
/// Obtain a handle from the [`ScheduledReducerRowTableAccess::scheduled_reducer_row`] method on [`super::RemoteTables`],
/// like `ctx.db.scheduled_reducer_row()`.
///
/// Users are encouraged not to explicitly reference this type,
/// but to directly chain method calls,
/// like `ctx.db.scheduled_reducer_row().on_insert(...)`.
pub struct ScheduledReducerRowTableHandle<'ctx> {
imp: __sdk::TableHandle<ScheduledReducerRow>,
ctx: std::marker::PhantomData<&'ctx super::RemoteTables>,
}
#[allow(non_camel_case_types)]
/// Extension trait for access to the table `scheduled_reducer_row`.
///
/// Implemented for [`super::RemoteTables`].
pub trait ScheduledReducerRowTableAccess {
#[allow(non_snake_case)]
/// Obtain a [`ScheduledReducerRowTableHandle`], which mediates access to the table `scheduled_reducer_row`.
fn scheduled_reducer_row(&self) -> ScheduledReducerRowTableHandle<'_>;
}
impl ScheduledReducerRowTableAccess for super::RemoteTables {
fn scheduled_reducer_row(&self) -> ScheduledReducerRowTableHandle<'_> {
ScheduledReducerRowTableHandle {
imp: self.imp.get_table::<ScheduledReducerRow>("scheduled_reducer_row"),
ctx: std::marker::PhantomData,
}
}
}
pub struct ScheduledReducerRowInsertCallbackId(__sdk::CallbackId);
pub struct ScheduledReducerRowDeleteCallbackId(__sdk::CallbackId);
impl<'ctx> __sdk::Table for ScheduledReducerRowTableHandle<'ctx> {
type Row = ScheduledReducerRow;
type EventContext = super::EventContext;
fn count(&self) -> u64 {
self.imp.count()
}
fn iter(&self) -> impl Iterator<Item = ScheduledReducerRow> + '_ {
self.imp.iter()
}
type InsertCallbackId = ScheduledReducerRowInsertCallbackId;
fn on_insert(
&self,
callback: impl FnMut(&Self::EventContext, &Self::Row) + Send + 'static,
) -> ScheduledReducerRowInsertCallbackId {
ScheduledReducerRowInsertCallbackId(self.imp.on_insert(Box::new(callback)))
}
fn remove_on_insert(&self, callback: ScheduledReducerRowInsertCallbackId) {
self.imp.remove_on_insert(callback.0)
}
type DeleteCallbackId = ScheduledReducerRowDeleteCallbackId;
fn on_delete(
&self,
callback: impl FnMut(&Self::EventContext, &Self::Row) + Send + 'static,
) -> ScheduledReducerRowDeleteCallbackId {
ScheduledReducerRowDeleteCallbackId(self.imp.on_delete(Box::new(callback)))
}
fn remove_on_delete(&self, callback: ScheduledReducerRowDeleteCallbackId) {
self.imp.remove_on_delete(callback.0)
}
}
pub struct ScheduledReducerRowUpdateCallbackId(__sdk::CallbackId);
impl<'ctx> __sdk::TableWithPrimaryKey for ScheduledReducerRowTableHandle<'ctx> {
type UpdateCallbackId = ScheduledReducerRowUpdateCallbackId;
fn on_update(
&self,
callback: impl FnMut(&Self::EventContext, &Self::Row, &Self::Row) + Send + 'static,
) -> ScheduledReducerRowUpdateCallbackId {
ScheduledReducerRowUpdateCallbackId(self.imp.on_update(Box::new(callback)))
}
fn remove_on_update(&self, callback: ScheduledReducerRowUpdateCallbackId) {
self.imp.remove_on_update(callback.0)
}
}
/// Access to the `scheduled_id` unique index on the table `scheduled_reducer_row`,
/// which allows point queries on the field of the same name
/// via the [`ScheduledReducerRowScheduledIdUnique::find`] method.
///
/// Users are encouraged not to explicitly reference this type,
/// but to directly chain method calls,
/// like `ctx.db.scheduled_reducer_row().scheduled_id().find(...)`.
pub struct ScheduledReducerRowScheduledIdUnique<'ctx> {
imp: __sdk::UniqueConstraintHandle<ScheduledReducerRow, u64>,
phantom: std::marker::PhantomData<&'ctx super::RemoteTables>,
}
impl<'ctx> ScheduledReducerRowTableHandle<'ctx> {
/// Get a handle on the `scheduled_id` unique index on the table `scheduled_reducer_row`.
pub fn scheduled_id(&self) -> ScheduledReducerRowScheduledIdUnique<'ctx> {
ScheduledReducerRowScheduledIdUnique {
imp: self.imp.get_unique_constraint::<u64>("scheduled_id"),
phantom: std::marker::PhantomData,
}
}
}
impl<'ctx> ScheduledReducerRowScheduledIdUnique<'ctx> {
/// Find the subscribed row whose `scheduled_id` column value is equal to `col_val`,
/// if such a row is present in the client cache.
pub fn find(&self, col_val: &u64) -> Option<ScheduledReducerRow> {
self.imp.find(col_val)
}
}
#[doc(hidden)]
pub(super) fn register_table(client_cache: &mut __sdk::ClientCache<super::RemoteModule>) {
let _table = client_cache.get_or_make_table::<ScheduledReducerRow>("scheduled_reducer_row");
_table.add_unique_constraint::<u64>("scheduled_id", |row| &row.scheduled_id);
}
#[doc(hidden)]
pub(super) fn parse_table_update(
raw_updates: __ws::v2::TableUpdate,
) -> __sdk::Result<__sdk::TableUpdate<ScheduledReducerRow>> {
__sdk::TableUpdate::parse_table_update(raw_updates).map_err(|e| {
__sdk::InternalError::failed_parse("TableUpdate<ScheduledReducerRow>", "TableUpdate")
.with_cause(e)
.into()
})
}
#[allow(non_camel_case_types)]
/// Extension trait for query builder access to the table `ScheduledReducerRow`.
///
/// Implemented for [`__sdk::QueryTableAccessor`].
pub trait scheduled_reducer_rowQueryTableAccess {
#[allow(non_snake_case)]
/// Get a query builder for the table `ScheduledReducerRow`.
fn scheduled_reducer_row(&self) -> __sdk::__query_builder::Table<ScheduledReducerRow>;
}
impl scheduled_reducer_rowQueryTableAccess for __sdk::QueryTableAccessor {
fn scheduled_reducer_row(&self) -> __sdk::__query_builder::Table<ScheduledReducerRow> {
__sdk::__query_builder::Table::new("scheduled_reducer_row")
}
}
@@ -0,0 +1,52 @@
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
#![allow(unused, clippy::all)]
use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws};
#[derive(__lib::ser::Serialize, __lib::de::Deserialize, Clone, PartialEq, Debug)]
#[sats(crate = __lib)]
pub struct ScheduledReducerRow {
pub scheduled_id: u64,
pub scheduled_at: __sdk::ScheduleAt,
}
impl __sdk::InModule for ScheduledReducerRow {
type Module = super::RemoteModule;
}
/// Column accessor struct for the table `ScheduledReducerRow`.
///
/// Provides typed access to columns for query building.
pub struct ScheduledReducerRowCols {
pub scheduled_id: __sdk::__query_builder::Col<ScheduledReducerRow, u64>,
pub scheduled_at: __sdk::__query_builder::Col<ScheduledReducerRow, __sdk::ScheduleAt>,
}
impl __sdk::__query_builder::HasCols for ScheduledReducerRow {
type Cols = ScheduledReducerRowCols;
fn cols(table_name: &'static str) -> Self::Cols {
ScheduledReducerRowCols {
scheduled_id: __sdk::__query_builder::Col::new(table_name, "scheduled_id"),
scheduled_at: __sdk::__query_builder::Col::new(table_name, "scheduled_at"),
}
}
}
/// Indexed column accessor struct for the table `ScheduledReducerRow`.
///
/// Provides typed access to indexed columns for query building.
pub struct ScheduledReducerRowIxCols {
pub scheduled_id: __sdk::__query_builder::IxCol<ScheduledReducerRow, u64>,
}
impl __sdk::__query_builder::HasIxCols for ScheduledReducerRow {
type IxCols = ScheduledReducerRowIxCols;
fn ix_cols(table_name: &'static str) -> Self::IxCols {
ScheduledReducerRowIxCols {
scheduled_id: __sdk::__query_builder::IxCol::new(table_name, "scheduled_id"),
}
}
}
impl __sdk::__query_builder::CanBeLookupTable for ScheduledReducerRow {}
@@ -12,6 +12,9 @@ pub async fn dispatch(test: &str, db_name: &str) {
"procedure-reducer-same-client-not-interleaved" => {
exec_procedure_reducer_same_client_not_interleaved(db_name).await
}
"procedure-concurrent-with-scheduled-reducer" => {
exec_procedure_concurrent_with_scheduled_reducer(db_name).await
}
_ => panic!("Unknown test: {test}"),
}
}
@@ -88,6 +91,7 @@ fn subscribe_all_then(ctx: &impl RemoteDbContext, callback: impl FnOnce(&Subscri
struct ConnectionRowObservation {
procedure_before: Option<u32>,
reducer: Option<u32>,
scheduled_reducer: Option<u32>,
procedure_after: Option<u32>,
ordering_checked: bool,
}
@@ -384,3 +388,79 @@ async fn exec_procedure_reducer_same_client_not_interleaved(db_name: &str) {
test_counter.wait_for_all().await;
}
async fn exec_procedure_concurrent_with_scheduled_reducer(db_name: &str) {
let test_counter = TestCounter::new();
let sub_applied_nothing_result = test_counter.add_test("on_subscription_applied_nothing");
let procedure_callback_result = test_counter.add_test("procedure_schedule_reducer_between_inserts_callback");
let mut ordering_result = Some(test_counter.add_test("procedure_scheduled_reducer_order"));
let state = Arc::new(Mutex::new(ConnectionRowObservation::default()));
connect_then(db_name, &test_counter, {
let state = Arc::clone(&state);
move |ctx| {
ctx.db().procedure_concurrency_row().on_insert({
let state = Arc::clone(&state);
move |_ctx, row| {
let maybe_ordering = {
let mut observation = state.lock().expect("ConnectionRowObservation mutex is poisoned");
match row.insertion_context.as_str() {
"procedure_before" => {
assert!(observation.procedure_before.replace(row.insertion_order).is_none());
}
"scheduled_reducer" => {
assert!(observation
.scheduled_reducer
.replace(row.insertion_order)
.is_none());
}
"procedure_after" => {
assert!(observation.procedure_after.replace(row.insertion_order).is_none());
}
unexpected => panic!("Unexpected insertion context: {unexpected}"),
}
match (
observation.procedure_before,
observation.scheduled_reducer,
observation.procedure_after,
) {
(Some(before), Some(scheduled_reducer), Some(after))
if !observation.ordering_checked =>
{
observation.ordering_checked = true;
Some((before, scheduled_reducer, after))
}
_ => None,
}
};
if let Some((before, scheduled_reducer, after)) = maybe_ordering {
(ordering_result.take().expect("Ordering result should only be reported once"))(
#[allow(clippy::redundant_closure_call)]
(|| {
anyhow::ensure!(
before < scheduled_reducer && scheduled_reducer < after,
"Expected scheduled reducer insertion order procedure_before < scheduled_reducer < procedure_after, got {before} < {scheduled_reducer} < {after}"
);
Ok(())
})(),
);
}
}
});
subscribe_all_then(ctx, move |ctx| {
sub_applied_nothing_result(assert_all_tables_empty(ctx));
ctx.procedures
.procedure_schedule_reducer_between_inserts_then(move |_ctx, res| {
procedure_callback_result(
res.context("procedure_schedule_reducer_between_inserts failed unexpectedly"),
);
});
});
}
})
.await;
test_counter.wait_for_all().await;
}
+5
View File
@@ -543,6 +543,11 @@ mod rust_procedure_concurrency {
fn procedure_reducer_same_client_not_interleaved() {
make_test("procedure-reducer-same-client-not-interleaved").run()
}
#[test]
fn procedure_concurrent_with_scheduled_reducer() {
make_test("procedure-concurrent-with-scheduled-reducer").run()
}
}
macro_rules! view_tests {