diff --git a/crates/client-api/src/routes/database.rs b/crates/client-api/src/routes/database.rs index 8535236ff..a57e771bf 100644 --- a/crates/client-api/src/routes/database.rs +++ b/crates/client-api/src/routes/database.rs @@ -18,6 +18,7 @@ use futures::StreamExt; use http::StatusCode; use serde::Deserialize; use spacetimedb::database_logger::DatabaseLogger; +use spacetimedb::host::module_host::ClientConnectedError; use spacetimedb::host::ReducerArgs; use spacetimedb::host::ReducerCallError; use spacetimedb::host::ReducerOutcome; @@ -74,11 +75,34 @@ pub async fn call( // so generate one. let connection_id = generate_random_connection_id(); - if let Err(e) = module - .call_identity_connected_disconnected(caller_identity, connection_id, true) - .await - { - return Err((StatusCode::NOT_FOUND, format!("{:#}", anyhow::anyhow!(e))).into()); + match module.call_identity_connected(caller_identity, connection_id).await { + // If `call_identity_connected` returns `Err(Rejected)`, then the `client_connected` reducer errored, + // meaning the connection was refused. Return 403 forbidden. + Err(ClientConnectedError::Rejected(msg)) => return Err((StatusCode::FORBIDDEN, msg).into()), + // If `call_identity_connected` returns `Err(OutOfEnergy)`, + // then, well, the database is out of energy. + // Return 503 service unavailable. + Err(err @ ClientConnectedError::OutOfEnergy) => { + return Err((StatusCode::SERVICE_UNAVAILABLE, err.to_string()).into()) + } + // If `call_identity_connected` returns `Err(ReducerCall)`, + // something went wrong while invoking the `client_connected` reducer. + // I (pgoldman 2025-03-27) am not really sure how this would happen, + // but we returned 404 not found in this case prior to my editing this code, + // so I guess let's keep doing that. + Err(ClientConnectedError::ReducerCall(e)) => { + return Err((StatusCode::NOT_FOUND, format!("{:#}", anyhow::anyhow!(e))).into()) + } + // If `call_identity_connected` returns `Err(DBError)`, + // then the module didn't define `client_connected`, + // but something went wrong when we tried to insert into `st_client`. + // That's weird and scary, so return 500 internal error. + Err(e @ ClientConnectedError::DBError(_)) => { + return Err((StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into()) + } + + // If `call_identity_connected` returns `Ok`, then we can actually call the reducer we want. + Ok(()) => (), } let result = match module .call_reducer(caller_identity, Some(connection_id), None, None, None, &reducer, args) @@ -107,11 +131,12 @@ pub async fn call( } }; - if let Err(e) = module - .call_identity_connected_disconnected(caller_identity, connection_id, false) - .await - { - return Err((StatusCode::NOT_FOUND, format!("{:#}", anyhow::anyhow!(e))).into()); + if let Err(e) = module.call_identity_disconnected(caller_identity, connection_id).await { + // If `call_identity_disconnected` errors, something is very wrong: + // it means we tried to delete the `st_client` row but failed. + // Note that `call_identity_disconnected` swallows errors from the `client_disconnected` reducer. + // Slap a 500 on it and pray. + return Err((StatusCode::INTERNAL_SERVER_ERROR, format!("{:#}", anyhow::anyhow!(e))).into()); } match result { diff --git a/crates/client-api/src/routes/subscribe.rs b/crates/client-api/src/routes/subscribe.rs index af2dd264d..c23c32d8f 100644 --- a/crates/client-api/src/routes/subscribe.rs +++ b/crates/client-api/src/routes/subscribe.rs @@ -14,6 +14,7 @@ use scopeguard::ScopeGuard; use serde::Deserialize; use spacetimedb::client::messages::{serialize, IdentityTokenMessage, SerializableMessage}; use spacetimedb::client::{ClientActorId, ClientConfig, ClientConnection, DataMessage, MessageHandleError, Protocol}; +use spacetimedb::host::module_host::ClientConnectedError; use spacetimedb::host::NoSuchModule; use spacetimedb::util::also_poll; use spacetimedb::worker_metrics::WORKER_METRICS; @@ -148,7 +149,11 @@ where let client = match ClientConnection::spawn(client_id, client_config, leader.replica_id, module_rx, actor).await { Ok(s) => s, - Err(e) => { + Err(e @ (ClientConnectedError::Rejected(_) | ClientConnectedError::OutOfEnergy)) => { + log::info!("{e}"); + return; + } + Err(e @ (ClientConnectedError::DBError(_) | ClientConnectedError::ReducerCall(_))) => { log::warn!("ModuleHost died while we were connecting: {e:#}"); return; } diff --git a/crates/core/src/client/client_connection.rs b/crates/core/src/client/client_connection.rs index e4a5d21a8..6346516d0 100644 --- a/crates/core/src/client/client_connection.rs +++ b/crates/core/src/client/client_connection.rs @@ -6,6 +6,7 @@ use std::time::Instant; use super::messages::{OneOffQueryResponseMessage, SerializableMessage}; use super::{message_handlers, ClientActorId, MessageHandleError}; use crate::error::DBError; +use crate::host::module_host::ClientConnectedError; use crate::host::{ModuleHost, NoSuchModule, ReducerArgs, ReducerCallError, ReducerCallResult}; use crate::messages::websocket::Subscribe; use crate::util::prometheus_handle::IntGaugeExt; @@ -171,7 +172,7 @@ impl ClientConnection { replica_id: u64, mut module_rx: watch::Receiver, actor: impl FnOnce(ClientConnection, mpsc::Receiver) -> Fut, - ) -> Result + ) -> Result where Fut: Future + Send + 'static, { @@ -180,9 +181,7 @@ impl ClientConnection { // logically subscribed to the database, not any particular replica. We should handle failover for // them and stuff. Not right now though. let module = module_rx.borrow_and_update().clone(); - module - .call_identity_connected_disconnected(id.identity, id.connection_id, true) - .await?; + module.call_identity_connected(id.identity, id.connection_id).await?; let (sendtx, sendrx) = mpsc::channel::(CLIENT_CHANNEL_CAPACITY); diff --git a/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs b/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs index be2bffede..d700ddea7 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs @@ -9,10 +9,11 @@ use super::{ SharedMutexGuard, SharedWriteGuard, }; use crate::db::datastore::system_tables::{ - with_sys_table_buf, StColumnFields, StColumnRow, StConstraintFields, StConstraintRow, StFields as _, StIndexFields, - StIndexRow, StRowLevelSecurityFields, StRowLevelSecurityRow, StScheduledFields, StScheduledRow, StSequenceFields, - StSequenceRow, StTableFields, StTableRow, SystemTable, ST_COLUMN_ID, ST_CONSTRAINT_ID, ST_INDEX_ID, - ST_ROW_LEVEL_SECURITY_ID, ST_SCHEDULED_ID, ST_SEQUENCE_ID, ST_TABLE_ID, + with_sys_table_buf, StClientFields, StClientRow, StColumnFields, StColumnRow, StConstraintFields, StConstraintRow, + StFields as _, StIndexFields, StIndexRow, StRowLevelSecurityFields, StRowLevelSecurityRow, StScheduledFields, + StScheduledRow, StSequenceFields, StSequenceRow, StTableFields, StTableRow, SystemTable, ST_CLIENT_ID, + ST_COLUMN_ID, ST_CONSTRAINT_ID, ST_INDEX_ID, ST_ROW_LEVEL_SECURITY_ID, ST_SCHEDULED_ID, ST_SEQUENCE_ID, + ST_TABLE_ID, }; use crate::db::datastore::traits::{RowTypeForTable, TxData}; use crate::db::datastore::{ @@ -34,9 +35,14 @@ use core::ops::RangeBounds; use core::{iter, ops::Bound}; use smallvec::SmallVec; use spacetimedb_execution::{dml::MutDatastore, Datastore, DeltaStore}; -use spacetimedb_lib::db::{auth::StAccess, raw_def::SEQUENCE_ALLOCATION_STEP}; use spacetimedb_lib::{db::raw_def::v9::RawSql, metrics::ExecutionMetrics}; -use spacetimedb_primitives::{ColId, ColList, ColSet, ConstraintId, IndexId, ScheduleId, SequenceId, TableId}; +use spacetimedb_lib::{ + db::{auth::StAccess, raw_def::SEQUENCE_ALLOCATION_STEP}, + ConnectionId, Identity, +}; +use spacetimedb_primitives::{ + col_list, ColId, ColList, ColSet, ConstraintId, IndexId, ScheduleId, SequenceId, TableId, +}; use spacetimedb_sats::{ bsatn::{self, to_writer, DecodeError, Deserializer}, de::{DeserializeSeed, WithBound}, @@ -1230,6 +1236,31 @@ impl<'a> Iterator for IndexScanFilterDeleted<'a> { } impl MutTxId { + pub(crate) fn insert_st_client(&mut self, identity: Identity, connection_id: ConnectionId) -> Result<()> { + let row = &StClientRow { + identity: identity.into(), + connection_id: connection_id.into(), + }; + self.insert_via_serialize_bsatn(ST_CLIENT_ID, row).map(|_| ()) + } + + pub(crate) fn delete_st_client(&mut self, identity: Identity, connection_id: ConnectionId) -> Result<()> { + let row = &StClientRow { + identity: identity.into(), + connection_id: connection_id.into(), + }; + let ptr = self + .iter_by_col_eq( + ST_CLIENT_ID, + col_list![StClientFields::Identity, StClientFields::ConnectionId], + &AlgebraicValue::product(row), + )? + .next() + .expect("the client should be connected") + .pointer(); + self.delete(ST_CLIENT_ID, ptr).map(drop) + } + pub(crate) fn insert_via_serialize_bsatn<'a, T: Serialize>( &'a mut self, table_id: TableId, diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index c07a90a81..e3952edaa 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -767,7 +767,7 @@ impl Host { // Disconnect dangling clients. for (identity, connection_id) in connected_clients { module_host - .call_identity_connected_disconnected(identity, connection_id, false) + .call_identity_disconnected(identity, connection_id) .await .with_context(|| { format!( diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 697eaefe5..8d2c8ef39 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -1,8 +1,7 @@ -use super::{ArgsTuple, InvalidReducerArguments, ReducerArgs, ReducerCallResult, ReducerId, Scheduler}; +use super::{ArgsTuple, InvalidReducerArguments, ReducerArgs, ReducerCallResult, ReducerId, ReducerOutcome, Scheduler}; use crate::client::{ClientActorId, ClientConnectionSender}; use crate::database_logger::{LogLevel, Record}; use crate::db::datastore::locking_tx_datastore::MutTxId; -use crate::db::datastore::system_tables::{StClientFields, StClientRow, ST_CLIENT_ID}; use crate::db::datastore::traits::{IsolationLevel, Program, TxData}; use crate::energy::EnergyQuanta; use crate::error::DBError; @@ -25,7 +24,6 @@ use derive_more::From; use futures::{Future, FutureExt}; use indexmap::IndexSet; use itertools::Itertools; -use smallvec::SmallVec; use spacetimedb_client_api_messages::websocket::{ByteListLen, Compression, OneOffTable, QueryUpdate, WebsocketFormat}; use spacetimedb_data_structures::error_stream::ErrorStream; use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap}; @@ -33,9 +31,9 @@ use spacetimedb_lib::db::raw_def::v9::Lifecycle; use spacetimedb_lib::identity::{AuthCtx, RequestId}; use spacetimedb_lib::ConnectionId; use spacetimedb_lib::Timestamp; -use spacetimedb_primitives::{col_list, TableId}; +use spacetimedb_primitives::TableId; use spacetimedb_query::compile_subscription; -use spacetimedb_sats::{algebraic_value, ProductValue}; +use spacetimedb_sats::ProductValue; use spacetimedb_schema::auto_migrate::AutoMigrateError; use spacetimedb_schema::def::deserialize::ReducerArgsDeserializeSeed; use spacetimedb_schema::def::{ModuleDef, ReducerDef}; @@ -468,6 +466,18 @@ pub enum InitDatabaseError { Other(anyhow::Error), } +#[derive(thiserror::Error, Debug)] +pub enum ClientConnectedError { + #[error(transparent)] + ReducerCall(#[from] ReducerCallError), + #[error("Failed to insert `st_client` row for module without client_connected reducer: {0}")] + DBError(#[from] DBError), + #[error("Connection rejected by `client_connected` reducer: {0}")] + Rejected(String), + #[error("Insufficient energy balance to run `client_connected` reducer")] + OutOfEnergy, +} + impl ModuleHost { pub fn new(mut module: impl Module, on_panic: impl Fn() + Send + Sync + 'static) -> Self { let info = module.info(); @@ -523,119 +533,203 @@ impl ModuleHost { }) .await; // ignore NoSuchModule; if the module's already closed, that's fine - let _ = self - .call_identity_connected_disconnected(client_id.identity, client_id.connection_id, false) - .await; + if let Err(e) = self + .call_identity_disconnected(client_id.identity, client_id.connection_id) + .await + { + log::error!("Error from client_disconnected transaction: {e}"); + } } - /// Method is responsible for handling connect/disconnect events. + /// Invoke the module's `client_connected` reducer, if it has one, + /// and insert a new row into `st_client` for `(caller_identity, caller_connection_id)`. /// - /// It ensures pairing up those event in commitlogs - /// Though It can also create two entries `__identity_disconnect__`. - /// One is to actually run the reducer and another one to delete client from `st_clients` - pub async fn call_identity_connected_disconnected( + /// The host inspects `st_client` when restarting in order to run `client_disconnected` reducers + /// for clients that were connected at the time when the host went down. + /// This ensures that every client connection eventually has `client_disconnected` invoked. + /// + /// If this method returns `Ok`, then the client connection has been approved, + /// and the new row has been inserted into `st_client`. + /// + /// If this method returns `Err`, then the client connection has either failed or been rejected, + /// and `st_client` has not been modified. + /// In this case, the caller should terminate the connection. + pub async fn call_identity_connected( &self, caller_identity: Identity, caller_connection_id: ConnectionId, - connected: bool, - ) -> Result<(), ReducerCallError> { - let (lifecycle, fake_name) = if connected { - (Lifecycle::OnConnect, "__identity_connected__") + ) -> Result<(), ClientConnectedError> { + let reducer_lookup = self.info.module_def.lifecycle_reducer(Lifecycle::OnConnect); + + if let Some((reducer_id, reducer_def)) = reducer_lookup { + // The module defined a lifecycle reducer to handle new connections. + // Call this reducer. + // If the call fails (as in, something unexpectedly goes wrong with WASM execution), + // abort the connection: we can't really recover. + let reducer_outcome = self + .call_reducer_inner( + caller_identity, + Some(caller_connection_id), + None, + None, + None, + reducer_id, + reducer_def, + ReducerArgs::Nullary, + ) + .await?; + + match reducer_outcome.outcome { + // If the reducer committed successfully, we're done. + // `WasmModuleInstance::call_reducer_with_tx` has already ensured + // that `st_client` is updated appropriately. + // + // It's necessary to spread out the responsibility for updating `st_client` in this way + // because it's important that `call_identity_connected` commit at most one transaction. + // A naive implementation of this method would just run the reducer first, + // then insert into `st_client`, + // but if we crashed in between, we'd be left in an inconsistent state + // where the reducer had run but `st_client` was not yet updated. + ReducerOutcome::Committed => Ok(()), + + // If the reducer returned an error or couldn't run due to insufficient energy, + // abort the connection: the module code has decided it doesn't want this client. + ReducerOutcome::Failed(message) => Err(ClientConnectedError::Rejected(message)), + ReducerOutcome::BudgetExceeded => Err(ClientConnectedError::OutOfEnergy), + } } else { - (Lifecycle::OnDisconnect, "__identity_disconnected__") - }; + // The module doesn't define a client_connected reducer. + // Commit a transaction to update `st_clients` + // and to ensure we always have those events paired in the commitlog. + // + // This is necessary to be able to disconnect clients after a server crash. + let reducer_name = reducer_lookup + .as_ref() + .map(|(_, def)| &*def.name) + .unwrap_or("__identity_connected__"); - let reducer_lookup = self.info.module_def.lifecycle_reducer(lifecycle); - let reducer_name = reducer_lookup.as_ref().map(|(_, def)| &*def.name).unwrap_or(fake_name); - - let db = &self.inner.replica_ctx().relational_db; - let workload = || { - Workload::Reducer(ReducerContext { + let workload = Workload::Reducer(ReducerContext { name: reducer_name.to_owned(), caller_identity, caller_connection_id, timestamp: Timestamp::now(), arg_bsatn: Bytes::new(), - }) - }; - - let result = if let Some((reducer_id, reducer_def)) = reducer_lookup { - self.call_reducer_inner( - caller_identity, - Some(caller_connection_id), - None, - None, - None, - reducer_id, - reducer_def, - ReducerArgs::Nullary, - ) - .await - .map(drop) - } else { - // If the module doesn't define connected or disconnected, commit - // a transaction to update `st_clients` and to ensure we always have those events - // paired in the commitlog. - // - // This is necessary to be able to disconnect clients after a server - // crash. - db.with_auto_commit(workload(), |mut_tx| { - if connected { - self.update_st_clients(mut_tx, caller_identity, caller_connection_id, connected) - } else { - Ok(()) - } - }) - .map_err(|err| { - InvalidReducerArguments { - err: err.into(), - reducer: reducer_name.into(), - } - .into() - }) - }; - - // Deleting client from `st_clients`does not depend upon result of disconnect reducer hence done in a separate tx. - if !connected { - let _ = db - .with_auto_commit(workload(), |mut_tx| { - self.update_st_clients(mut_tx, caller_identity, caller_connection_id, connected) + }); + self.inner + .replica_ctx() + .relational_db + .with_auto_commit(workload, |mut_tx| { + mut_tx.insert_st_client(caller_identity, caller_connection_id) }) - .map_err(|e| { - log::error!("st_clients table update failed with params with error: {:?}", e); - }); + .inspect_err(|e| { + log::error!( + "`call_identity_connected`: fallback transaction to insert into `st_client` failed: {e:#?}" + ); + }) + .map_err(Into::into) } - result } - fn update_st_clients( + /// Invoke the module's `client_disconnected` reducer, if it has one, + /// and delete the client's row from `st_client`, if any. + /// + /// The host inspects `st_client` when restarting in order to run `client_disconnected` reducers + /// for clients that were connected at the time when the host went down. + /// This ensures that every client connection eventually has `client_disconnected` invoked. + /// + /// Unlike [`Self::call_identity_connected`], + /// this method swallows errors returned by the `client_disconnected` reducer. + /// The database can't reject a disconnection - the client's gone, whether the database likes it or not. + /// + /// If this method returns an error, the database is likely to wind up in a bad state, + /// as that means we've somehow failed to delete from `st_client`. + /// We cannot meaningfully handle this. + /// Sometimes it just means that the database no longer exists, though, which is fine. + pub async fn call_identity_disconnected( &self, - mut_tx: &mut MutTxId, caller_identity: Identity, caller_connection_id: ConnectionId, - connected: bool, - ) -> Result<(), DBError> { - let db = &*self.inner.replica_ctx().relational_db; + ) -> Result<(), ReducerCallError> { + let reducer_lookup = self.info.module_def.lifecycle_reducer(Lifecycle::OnDisconnect); - let row = &StClientRow { - identity: caller_identity.into(), - connection_id: caller_connection_id.into(), + // A fallback transaction that deletes the client from `st_client`. + let fallback = || { + let reducer_name = reducer_lookup + .as_ref() + .map(|(_, def)| &*def.name) + .unwrap_or("__identity_disconnected__"); + + let workload = Workload::Reducer(ReducerContext { + name: reducer_name.to_owned(), + caller_identity, + caller_connection_id, + timestamp: Timestamp::now(), + arg_bsatn: Bytes::new(), + }); + self.inner + .replica_ctx() + .relational_db + .with_auto_commit(workload, |mut_tx| { + mut_tx.delete_st_client(caller_identity, caller_connection_id) + }) + .inspect_err(|e| { + log::error!( + "`call_identity_disconnected`: fallback transaction to delete from `st_client` failed: {e}" + ); + }) + .map_err(|err| { + InvalidReducerArguments { + err: err.into(), + reducer: reducer_name.into(), + } + .into() + }) }; - if connected { - mut_tx.insert_via_serialize_bsatn(ST_CLIENT_ID, &row).map(|_| ()) + if let Some((reducer_id, reducer_def)) = reducer_lookup { + // The module defined a lifecycle reducer to handle disconnects. Call it. + // If it succeeds, `WasmModuleInstance::call_reducer_with_tx` has already ensured + // that `st_client` is updated appropriately. + let result = self + .call_reducer_inner( + caller_identity, + Some(caller_connection_id), + None, + None, + None, + reducer_id, + reducer_def, + ReducerArgs::Nullary, + ) + .await; + + // If it failed, we still need to update `st_client`: the client's not coming back. + // Commit a separate transaction that just updates `st_client`. + // + // It's OK for this to not be atomic with the previous transaction, + // since that transaction didn't commit. If we crash before committing this one, + // we'll run the `client_disconnected` reducer again unnecessarily, + // but the commitlog won't contain two invocations of it, which is what we care about. + match result { + Err(e) => { + log::error!("call_reducer_inner of client_disconnected failed: {e:#?}"); + fallback() + } + Ok(ReducerCallResult { + outcome: ReducerOutcome::Failed(_) | ReducerOutcome::BudgetExceeded, + .. + }) => fallback(), + + // If it succeeded, as mentioend above, `st_client` is already updated. + Ok(ReducerCallResult { + outcome: ReducerOutcome::Committed, + .. + }) => Ok(()), + } } else { - let row = db - .iter_by_col_eq_mut( - mut_tx, - ST_CLIENT_ID, - col_list![StClientFields::Identity, StClientFields::ConnectionId], - &algebraic_value::AlgebraicValue::product(row), - )? - .map(|row_ref| row_ref.pointer()) - .collect::>(); - db.delete(mut_tx, ST_CLIENT_ID, row); - Ok::<(), DBError>(()) + // The module doesn't define a `client_disconnected` reducer. + // Commit a transaction to update `st_clients`. + fallback() } } diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index 15e36922b..6bb641fb3 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -11,7 +11,6 @@ use std::time::Duration; use super::instrumentation::CallTimes; use crate::database_logger::{self, SystemLogger}; use crate::db::datastore::locking_tx_datastore::MutTxId; -use crate::db::datastore::system_tables::{StClientRow, ST_CLIENT_ID}; use crate::db::datastore::traits::{IsolationLevel, Program}; use crate::db::db_metrics::DB_METRICS; use crate::energy::{EnergyMonitor, EnergyQuanta, ReducerBudget, ReducerFingerprint}; @@ -551,18 +550,20 @@ impl WasmModuleInstance { ); EventStatus::Failed(errmsg.into()) } - // we haven't actually comitted yet - `commit_and_broadcast_event` will commit + // We haven't actually comitted yet - `commit_and_broadcast_event` will commit // for us and replace this with the actual database update. + // + // Detecting a new client, and inserting it in `st_clients` + // and conversely removing from `st_clients` on disconnect. Ok(Ok(())) => { - // Detecing a new client, and inserting it in `st_clients` - // Disconnect logic is written in module_host.rs, due to different transacationality requirements. - if reducer_def.lifecycle == Some(Lifecycle::OnConnect) { - match self.insert_st_client(&mut tx, caller_identity, caller_connection_id) { - Ok(_) => EventStatus::Committed(DatabaseUpdate::default()), - Err(err) => EventStatus::Failed(err.to_string()), - } - } else { - EventStatus::Committed(DatabaseUpdate::default()) + let res = match reducer_def.lifecycle { + Some(Lifecycle::OnConnect) => tx.insert_st_client(caller_identity, caller_connection_id), + Some(Lifecycle::OnDisconnect) => tx.delete_st_client(caller_identity, caller_connection_id), + _ => Ok(()), + }; + match res { + Ok(()) => EventStatus::Committed(DatabaseUpdate::default()), + Err(err) => EventStatus::Failed(err.to_string()), } } }; @@ -603,19 +604,6 @@ impl WasmModuleInstance { fn system_logger(&self) -> &SystemLogger { self.replica_context().logger.system_logger() } - - fn insert_st_client( - &self, - tx: &mut MutTxId, - identity: Identity, - connection_id: ConnectionId, - ) -> Result<(), DBError> { - let row = &StClientRow { - identity: identity.into(), - connection_id: connection_id.into(), - }; - tx.insert_via_serialize_bsatn(ST_CLIENT_ID, row).map(|_| ()) - } } /// Describes a reducer call in a cheaply shareable way. diff --git a/smoketests/tests/client_connected_error_rejects_connection.py b/smoketests/tests/client_connected_error_rejects_connection.py new file mode 100644 index 000000000..8654643ad --- /dev/null +++ b/smoketests/tests/client_connected_error_rejects_connection.py @@ -0,0 +1,65 @@ +from .. import Smoketest + +MODULE_HEADER = """ +use spacetimedb::{ReducerContext, Table}; + +#[spacetimedb::table(name = all_u8s, public)] +pub struct AllU8s { + number: u8, +} + +#[spacetimedb::reducer(init)] +pub fn init(ctx: &ReducerContext) { + // Here's a bunch of data that no one will be able to subscribe to. + for i in u8::MIN..=u8::MAX { + ctx.db.all_u8s().insert(AllU8s { number: i }); + } +} +""" + +class ClientConnectedErrorRejectsConnection(Smoketest): + MODULE_CODE = MODULE_HEADER + """ + +#[spacetimedb::reducer(client_connected)] +pub fn identity_connected(ctx: &ReducerContext) -> Result<(), String> { + Err("Rejecting connection from client".to_string()) +} + +#[spacetimedb::reducer(client_disconnected)] +pub fn identity_disconnected(_ctx: &ReducerContext) { + panic!("This should never be called, since we reject all connections!") +} +""" + + def test_client_connected_error_rejects_connection(self): + with self.assertRaises(Exception): + self.subscribe("select * from all_u8s", n = 0)() + + logs = self.logs(100) + self.assertIn('Rejecting connection from client', logs) + self.assertNotIn('This should never be called, since we reject all connections!', logs) + +class ClientDisconnectedErrorStillDeletesStClient(Smoketest): + MODULE_CODE = MODULE_HEADER + """ +#[spacetimedb::reducer(client_connected)] +pub fn identity_connected(_ctx: &ReducerContext) -> Result<(), String> { + Ok(()) +} + +#[spacetimedb::reducer(client_disconnected)] +pub fn identity_disconnected(_ctx: &ReducerContext) { + panic!("This should be called, but the `st_client` row should still be deleted") +} +""" + + def test_client_disconnected_error_still_deletes_st_client(self): + self.subscribe("select * from all_u8s", n = 0)() + + logs = self.logs(100) + self.assertIn('This should be called, but the `st_client` row should still be deleted', logs) + + sql_out = self.spacetime("sql", self.database_identity, "select * from st_client") + + self.assertMultiLineEqual(sql_out, """ identity | connection_id +----------+--------------- +""") diff --git a/smoketests/tests/connect_disconnect_from_cli.py b/smoketests/tests/connect_disconnect_from_cli.py index dc13d8756..a2f46ac88 100644 --- a/smoketests/tests/connect_disconnect_from_cli.py +++ b/smoketests/tests/connect_disconnect_from_cli.py @@ -8,13 +8,11 @@ use spacetimedb::{log, ReducerContext}; #[spacetimedb::reducer(client_connected)] pub fn connected(_ctx: &ReducerContext) { log::info!("_connect called"); - panic!("Panic on connect"); } #[spacetimedb::reducer(client_disconnected)] pub fn disconnected(_ctx: &ReducerContext) { log::info!("disconnect called"); - panic!("Panic on disconnect"); } #[spacetimedb::reducer]