diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index f321c07fca..dab36e8751 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -357,7 +357,7 @@ impl HostController { }); let db = module.replica_ctx().relational_db.clone(); - let result = module.on_module_thread("using_database", move || f(db)).await?.await; + let result = module.on_module_thread_async("using_database", move || f(db)).await?; Ok(result) } /// Update the [`ModuleHost`] identified by `replica_id` to the given diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 7308b2412d..fc9ca19529 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -543,8 +543,10 @@ pub struct CallViewParams { pub caller_connection_id: Option, pub view_id: ViewId, pub args: ArgsTuple, - /// The expected return type of the view, used for deserialization. - /// This type information is obtained from the [`ModuleDef`]. + + /// The return type of the view, used for deserializing the view call result. + /// Either Option`, or `Vec` where `T` is a `ProductType`. + /// This type information is obtained from the [`ModuleDef`] pub return_type: AlgebraicType, /// Whether the view is being called anonymously (i.e., without a client identity). pub is_anonymous: bool, @@ -843,6 +845,29 @@ impl ModuleHost { Ok(res) } + /// Run an async function on the JobThread for this module. + /// Similar to `on_module_thread`, but for async functions. + pub async fn on_module_thread_async(&self, label: &str, f: Fun) -> Result + where + Fun: (FnOnce() -> Fut) + Send + 'static, + Fut: Future + Send + 'static, + R: Send + 'static, + { + self.guard_closed()?; + + let timer_guard = self.start_call_timer(label); + + let res = self + .executor + .run_job(async move { + drop(timer_guard); + f().await + }) + .await; + + Ok(res) + } + fn start_call_timer(&self, label: &str) -> ScopeGuard<(), impl FnOnce(())> { // Record the time until our function starts running. let queue_timer = WORKER_METRICS @@ -869,7 +894,7 @@ impl ModuleHost { }) } - async fn call_async_with_instance(&self, label: &str, f: Fun) -> Result + pub async fn call_async_with_instance(&self, label: &str, f: Fun) -> Result where Fun: (FnOnce(Instance) -> Fut) + Send + 'static, Fut: Future + Send + 'static, diff --git a/crates/core/src/sql/execute.rs b/crates/core/src/sql/execute.rs index 74101fca87..23d9cf74b2 100644 --- a/crates/core/src/sql/execute.rs +++ b/crates/core/src/sql/execute.rs @@ -204,12 +204,12 @@ pub async fn run( let mut metrics = ExecutionMetrics::default(); for (view_name, args) in stmt.views() { - let (is_memoized, args) = tx + let (is_materialized, args) = tx .is_materialized(view_name, args, caller_identity) .map_err(|e| DBError::Other(anyhow!("Failed to check memoized view: {e}")))?; // Skip if already memoized - if is_memoized { + if is_materialized { continue; } diff --git a/crates/datastore/src/locking_tx_datastore/committed_state.rs b/crates/datastore/src/locking_tx_datastore/committed_state.rs index 3f4be9964d..ea04875513 100644 --- a/crates/datastore/src/locking_tx_datastore/committed_state.rs +++ b/crates/datastore/src/locking_tx_datastore/committed_state.rs @@ -738,11 +738,11 @@ impl CommittedState { } } - fn merge_read_sets(&mut self, read_sets: ViewReadSets, tables: impl IntoIterator) { + fn merge_read_sets(&mut self, read_sets: ViewReadSets, updated_tables: impl IntoIterator) { for (view, read_set) in read_sets { self.merge_read_set(view, read_set); } - for table_id in tables { + for table_id in updated_tables { self.read_sets.clear_views_for_table(table_id); } } diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs index 7116aefd69..15d1a6f7e6 100644 --- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs +++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs @@ -8,16 +8,13 @@ use super::{ tx_state::{IndexIdMap, PendingSchemaChange, TxState, TxTableForInsertion}, SharedMutexGuard, SharedWriteGuard, }; -use crate::traits::{InsertFlags, RowTypeForTable, TxData, UpdateFlags}; -use crate::{ - error::ViewError, - system_tables::{ - system_tables, ConnectionIdViaU128, IdentityViaU256, StConnectionCredentialsFields, StConnectionCredentialsRow, - StViewArgFields, StViewArgRow, StViewClientRow, StViewColumnFields, StViewFields, StViewParamFields, - StViewParamRow, ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_ARG_ID, ST_VIEW_CLIENT_ID, ST_VIEW_COLUMN_ID, ST_VIEW_ID, - ST_VIEW_PARAM_ID, - }, +use crate::system_tables::{ + system_tables, ConnectionIdViaU128, IdentityViaU256, StConnectionCredentialsFields, StConnectionCredentialsRow, + StViewArgFields, StViewArgRow, StViewColumnFields, StViewFields, StViewParamFields, StViewParamRow, + StViewSubFields, StViewSubRow, ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_ARG_ID, ST_VIEW_COLUMN_ID, ST_VIEW_ID, + ST_VIEW_PARAM_ID, ST_VIEW_SUB_ID, }; +use crate::traits::{InsertFlags, RowTypeForTable, TxData, UpdateFlags}; use crate::{ error::{IndexError, SequenceError, TableError}, system_tables::{ @@ -740,7 +737,7 @@ impl MutTxId { Ok((tx, commit)) } - /// Checks whether a memoized view exists for the given view name, arguments, and sender identity. + /// Checks whether a materialized view exists for the given view name, arguments, and sender identity. /// /// If view is not materialized, [`RelationalDB::evaluate_view`] should be called to compute and store it. ///