mirror of
https://github.com/clockworklabs/SpacetimeDB.git
synced 2026-05-06 07:26:43 -04:00
Repair transactionality of st_client removals after disconnect (#2446)
Co-authored-by: Phoebe Goldman <phoebe@clockworklabs.io>
This commit is contained in:
committed by
GitHub
parent
d436b1f9b7
commit
eb380e6a18
@@ -18,6 +18,7 @@ use futures::StreamExt;
|
||||
use http::StatusCode;
|
||||
use serde::Deserialize;
|
||||
use spacetimedb::database_logger::DatabaseLogger;
|
||||
use spacetimedb::host::module_host::ClientConnectedError;
|
||||
use spacetimedb::host::ReducerArgs;
|
||||
use spacetimedb::host::ReducerCallError;
|
||||
use spacetimedb::host::ReducerOutcome;
|
||||
@@ -74,11 +75,34 @@ pub async fn call<S: ControlStateDelegate + NodeDelegate>(
|
||||
// so generate one.
|
||||
let connection_id = generate_random_connection_id();
|
||||
|
||||
if let Err(e) = module
|
||||
.call_identity_connected_disconnected(caller_identity, connection_id, true)
|
||||
.await
|
||||
{
|
||||
return Err((StatusCode::NOT_FOUND, format!("{:#}", anyhow::anyhow!(e))).into());
|
||||
match module.call_identity_connected(caller_identity, connection_id).await {
|
||||
// If `call_identity_connected` returns `Err(Rejected)`, then the `client_connected` reducer errored,
|
||||
// meaning the connection was refused. Return 403 forbidden.
|
||||
Err(ClientConnectedError::Rejected(msg)) => return Err((StatusCode::FORBIDDEN, msg).into()),
|
||||
// If `call_identity_connected` returns `Err(OutOfEnergy)`,
|
||||
// then, well, the database is out of energy.
|
||||
// Return 503 service unavailable.
|
||||
Err(err @ ClientConnectedError::OutOfEnergy) => {
|
||||
return Err((StatusCode::SERVICE_UNAVAILABLE, err.to_string()).into())
|
||||
}
|
||||
// If `call_identity_connected` returns `Err(ReducerCall)`,
|
||||
// something went wrong while invoking the `client_connected` reducer.
|
||||
// I (pgoldman 2025-03-27) am not really sure how this would happen,
|
||||
// but we returned 404 not found in this case prior to my editing this code,
|
||||
// so I guess let's keep doing that.
|
||||
Err(ClientConnectedError::ReducerCall(e)) => {
|
||||
return Err((StatusCode::NOT_FOUND, format!("{:#}", anyhow::anyhow!(e))).into())
|
||||
}
|
||||
// If `call_identity_connected` returns `Err(DBError)`,
|
||||
// then the module didn't define `client_connected`,
|
||||
// but something went wrong when we tried to insert into `st_client`.
|
||||
// That's weird and scary, so return 500 internal error.
|
||||
Err(e @ ClientConnectedError::DBError(_)) => {
|
||||
return Err((StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into())
|
||||
}
|
||||
|
||||
// If `call_identity_connected` returns `Ok`, then we can actually call the reducer we want.
|
||||
Ok(()) => (),
|
||||
}
|
||||
let result = match module
|
||||
.call_reducer(caller_identity, Some(connection_id), None, None, None, &reducer, args)
|
||||
@@ -107,11 +131,12 @@ pub async fn call<S: ControlStateDelegate + NodeDelegate>(
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(e) = module
|
||||
.call_identity_connected_disconnected(caller_identity, connection_id, false)
|
||||
.await
|
||||
{
|
||||
return Err((StatusCode::NOT_FOUND, format!("{:#}", anyhow::anyhow!(e))).into());
|
||||
if let Err(e) = module.call_identity_disconnected(caller_identity, connection_id).await {
|
||||
// If `call_identity_disconnected` errors, something is very wrong:
|
||||
// it means we tried to delete the `st_client` row but failed.
|
||||
// Note that `call_identity_disconnected` swallows errors from the `client_disconnected` reducer.
|
||||
// Slap a 500 on it and pray.
|
||||
return Err((StatusCode::INTERNAL_SERVER_ERROR, format!("{:#}", anyhow::anyhow!(e))).into());
|
||||
}
|
||||
|
||||
match result {
|
||||
|
||||
@@ -14,6 +14,7 @@ use scopeguard::ScopeGuard;
|
||||
use serde::Deserialize;
|
||||
use spacetimedb::client::messages::{serialize, IdentityTokenMessage, SerializableMessage};
|
||||
use spacetimedb::client::{ClientActorId, ClientConfig, ClientConnection, DataMessage, MessageHandleError, Protocol};
|
||||
use spacetimedb::host::module_host::ClientConnectedError;
|
||||
use spacetimedb::host::NoSuchModule;
|
||||
use spacetimedb::util::also_poll;
|
||||
use spacetimedb::worker_metrics::WORKER_METRICS;
|
||||
@@ -148,7 +149,11 @@ where
|
||||
let client = match ClientConnection::spawn(client_id, client_config, leader.replica_id, module_rx, actor).await
|
||||
{
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
Err(e @ (ClientConnectedError::Rejected(_) | ClientConnectedError::OutOfEnergy)) => {
|
||||
log::info!("{e}");
|
||||
return;
|
||||
}
|
||||
Err(e @ (ClientConnectedError::DBError(_) | ClientConnectedError::ReducerCall(_))) => {
|
||||
log::warn!("ModuleHost died while we were connecting: {e:#}");
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ use std::time::Instant;
|
||||
use super::messages::{OneOffQueryResponseMessage, SerializableMessage};
|
||||
use super::{message_handlers, ClientActorId, MessageHandleError};
|
||||
use crate::error::DBError;
|
||||
use crate::host::module_host::ClientConnectedError;
|
||||
use crate::host::{ModuleHost, NoSuchModule, ReducerArgs, ReducerCallError, ReducerCallResult};
|
||||
use crate::messages::websocket::Subscribe;
|
||||
use crate::util::prometheus_handle::IntGaugeExt;
|
||||
@@ -171,7 +172,7 @@ impl ClientConnection {
|
||||
replica_id: u64,
|
||||
mut module_rx: watch::Receiver<ModuleHost>,
|
||||
actor: impl FnOnce(ClientConnection, mpsc::Receiver<SerializableMessage>) -> Fut,
|
||||
) -> Result<ClientConnection, ReducerCallError>
|
||||
) -> Result<ClientConnection, ClientConnectedError>
|
||||
where
|
||||
Fut: Future<Output = ()> + Send + 'static,
|
||||
{
|
||||
@@ -180,9 +181,7 @@ impl ClientConnection {
|
||||
// logically subscribed to the database, not any particular replica. We should handle failover for
|
||||
// them and stuff. Not right now though.
|
||||
let module = module_rx.borrow_and_update().clone();
|
||||
module
|
||||
.call_identity_connected_disconnected(id.identity, id.connection_id, true)
|
||||
.await?;
|
||||
module.call_identity_connected(id.identity, id.connection_id).await?;
|
||||
|
||||
let (sendtx, sendrx) = mpsc::channel::<SerializableMessage>(CLIENT_CHANNEL_CAPACITY);
|
||||
|
||||
|
||||
@@ -9,10 +9,11 @@ use super::{
|
||||
SharedMutexGuard, SharedWriteGuard,
|
||||
};
|
||||
use crate::db::datastore::system_tables::{
|
||||
with_sys_table_buf, StColumnFields, StColumnRow, StConstraintFields, StConstraintRow, StFields as _, StIndexFields,
|
||||
StIndexRow, StRowLevelSecurityFields, StRowLevelSecurityRow, StScheduledFields, StScheduledRow, StSequenceFields,
|
||||
StSequenceRow, StTableFields, StTableRow, SystemTable, ST_COLUMN_ID, ST_CONSTRAINT_ID, ST_INDEX_ID,
|
||||
ST_ROW_LEVEL_SECURITY_ID, ST_SCHEDULED_ID, ST_SEQUENCE_ID, ST_TABLE_ID,
|
||||
with_sys_table_buf, StClientFields, StClientRow, StColumnFields, StColumnRow, StConstraintFields, StConstraintRow,
|
||||
StFields as _, StIndexFields, StIndexRow, StRowLevelSecurityFields, StRowLevelSecurityRow, StScheduledFields,
|
||||
StScheduledRow, StSequenceFields, StSequenceRow, StTableFields, StTableRow, SystemTable, ST_CLIENT_ID,
|
||||
ST_COLUMN_ID, ST_CONSTRAINT_ID, ST_INDEX_ID, ST_ROW_LEVEL_SECURITY_ID, ST_SCHEDULED_ID, ST_SEQUENCE_ID,
|
||||
ST_TABLE_ID,
|
||||
};
|
||||
use crate::db::datastore::traits::{RowTypeForTable, TxData};
|
||||
use crate::db::datastore::{
|
||||
@@ -34,9 +35,14 @@ use core::ops::RangeBounds;
|
||||
use core::{iter, ops::Bound};
|
||||
use smallvec::SmallVec;
|
||||
use spacetimedb_execution::{dml::MutDatastore, Datastore, DeltaStore};
|
||||
use spacetimedb_lib::db::{auth::StAccess, raw_def::SEQUENCE_ALLOCATION_STEP};
|
||||
use spacetimedb_lib::{db::raw_def::v9::RawSql, metrics::ExecutionMetrics};
|
||||
use spacetimedb_primitives::{ColId, ColList, ColSet, ConstraintId, IndexId, ScheduleId, SequenceId, TableId};
|
||||
use spacetimedb_lib::{
|
||||
db::{auth::StAccess, raw_def::SEQUENCE_ALLOCATION_STEP},
|
||||
ConnectionId, Identity,
|
||||
};
|
||||
use spacetimedb_primitives::{
|
||||
col_list, ColId, ColList, ColSet, ConstraintId, IndexId, ScheduleId, SequenceId, TableId,
|
||||
};
|
||||
use spacetimedb_sats::{
|
||||
bsatn::{self, to_writer, DecodeError, Deserializer},
|
||||
de::{DeserializeSeed, WithBound},
|
||||
@@ -1230,6 +1236,31 @@ impl<'a> Iterator for IndexScanFilterDeleted<'a> {
|
||||
}
|
||||
|
||||
impl MutTxId {
|
||||
pub(crate) fn insert_st_client(&mut self, identity: Identity, connection_id: ConnectionId) -> Result<()> {
|
||||
let row = &StClientRow {
|
||||
identity: identity.into(),
|
||||
connection_id: connection_id.into(),
|
||||
};
|
||||
self.insert_via_serialize_bsatn(ST_CLIENT_ID, row).map(|_| ())
|
||||
}
|
||||
|
||||
pub(crate) fn delete_st_client(&mut self, identity: Identity, connection_id: ConnectionId) -> Result<()> {
|
||||
let row = &StClientRow {
|
||||
identity: identity.into(),
|
||||
connection_id: connection_id.into(),
|
||||
};
|
||||
let ptr = self
|
||||
.iter_by_col_eq(
|
||||
ST_CLIENT_ID,
|
||||
col_list![StClientFields::Identity, StClientFields::ConnectionId],
|
||||
&AlgebraicValue::product(row),
|
||||
)?
|
||||
.next()
|
||||
.expect("the client should be connected")
|
||||
.pointer();
|
||||
self.delete(ST_CLIENT_ID, ptr).map(drop)
|
||||
}
|
||||
|
||||
pub(crate) fn insert_via_serialize_bsatn<'a, T: Serialize>(
|
||||
&'a mut self,
|
||||
table_id: TableId,
|
||||
|
||||
@@ -767,7 +767,7 @@ impl Host {
|
||||
// Disconnect dangling clients.
|
||||
for (identity, connection_id) in connected_clients {
|
||||
module_host
|
||||
.call_identity_connected_disconnected(identity, connection_id, false)
|
||||
.call_identity_disconnected(identity, connection_id)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
use super::{ArgsTuple, InvalidReducerArguments, ReducerArgs, ReducerCallResult, ReducerId, Scheduler};
|
||||
use super::{ArgsTuple, InvalidReducerArguments, ReducerArgs, ReducerCallResult, ReducerId, ReducerOutcome, Scheduler};
|
||||
use crate::client::{ClientActorId, ClientConnectionSender};
|
||||
use crate::database_logger::{LogLevel, Record};
|
||||
use crate::db::datastore::locking_tx_datastore::MutTxId;
|
||||
use crate::db::datastore::system_tables::{StClientFields, StClientRow, ST_CLIENT_ID};
|
||||
use crate::db::datastore::traits::{IsolationLevel, Program, TxData};
|
||||
use crate::energy::EnergyQuanta;
|
||||
use crate::error::DBError;
|
||||
@@ -25,7 +24,6 @@ use derive_more::From;
|
||||
use futures::{Future, FutureExt};
|
||||
use indexmap::IndexSet;
|
||||
use itertools::Itertools;
|
||||
use smallvec::SmallVec;
|
||||
use spacetimedb_client_api_messages::websocket::{ByteListLen, Compression, OneOffTable, QueryUpdate, WebsocketFormat};
|
||||
use spacetimedb_data_structures::error_stream::ErrorStream;
|
||||
use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap};
|
||||
@@ -33,9 +31,9 @@ use spacetimedb_lib::db::raw_def::v9::Lifecycle;
|
||||
use spacetimedb_lib::identity::{AuthCtx, RequestId};
|
||||
use spacetimedb_lib::ConnectionId;
|
||||
use spacetimedb_lib::Timestamp;
|
||||
use spacetimedb_primitives::{col_list, TableId};
|
||||
use spacetimedb_primitives::TableId;
|
||||
use spacetimedb_query::compile_subscription;
|
||||
use spacetimedb_sats::{algebraic_value, ProductValue};
|
||||
use spacetimedb_sats::ProductValue;
|
||||
use spacetimedb_schema::auto_migrate::AutoMigrateError;
|
||||
use spacetimedb_schema::def::deserialize::ReducerArgsDeserializeSeed;
|
||||
use spacetimedb_schema::def::{ModuleDef, ReducerDef};
|
||||
@@ -468,6 +466,18 @@ pub enum InitDatabaseError {
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum ClientConnectedError {
|
||||
#[error(transparent)]
|
||||
ReducerCall(#[from] ReducerCallError),
|
||||
#[error("Failed to insert `st_client` row for module without client_connected reducer: {0}")]
|
||||
DBError(#[from] DBError),
|
||||
#[error("Connection rejected by `client_connected` reducer: {0}")]
|
||||
Rejected(String),
|
||||
#[error("Insufficient energy balance to run `client_connected` reducer")]
|
||||
OutOfEnergy,
|
||||
}
|
||||
|
||||
impl ModuleHost {
|
||||
pub fn new(mut module: impl Module, on_panic: impl Fn() + Send + Sync + 'static) -> Self {
|
||||
let info = module.info();
|
||||
@@ -523,119 +533,203 @@ impl ModuleHost {
|
||||
})
|
||||
.await;
|
||||
// ignore NoSuchModule; if the module's already closed, that's fine
|
||||
let _ = self
|
||||
.call_identity_connected_disconnected(client_id.identity, client_id.connection_id, false)
|
||||
.await;
|
||||
if let Err(e) = self
|
||||
.call_identity_disconnected(client_id.identity, client_id.connection_id)
|
||||
.await
|
||||
{
|
||||
log::error!("Error from client_disconnected transaction: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
/// Method is responsible for handling connect/disconnect events.
|
||||
/// Invoke the module's `client_connected` reducer, if it has one,
|
||||
/// and insert a new row into `st_client` for `(caller_identity, caller_connection_id)`.
|
||||
///
|
||||
/// It ensures pairing up those event in commitlogs
|
||||
/// Though It can also create two entries `__identity_disconnect__`.
|
||||
/// One is to actually run the reducer and another one to delete client from `st_clients`
|
||||
pub async fn call_identity_connected_disconnected(
|
||||
/// The host inspects `st_client` when restarting in order to run `client_disconnected` reducers
|
||||
/// for clients that were connected at the time when the host went down.
|
||||
/// This ensures that every client connection eventually has `client_disconnected` invoked.
|
||||
///
|
||||
/// If this method returns `Ok`, then the client connection has been approved,
|
||||
/// and the new row has been inserted into `st_client`.
|
||||
///
|
||||
/// If this method returns `Err`, then the client connection has either failed or been rejected,
|
||||
/// and `st_client` has not been modified.
|
||||
/// In this case, the caller should terminate the connection.
|
||||
pub async fn call_identity_connected(
|
||||
&self,
|
||||
caller_identity: Identity,
|
||||
caller_connection_id: ConnectionId,
|
||||
connected: bool,
|
||||
) -> Result<(), ReducerCallError> {
|
||||
let (lifecycle, fake_name) = if connected {
|
||||
(Lifecycle::OnConnect, "__identity_connected__")
|
||||
) -> Result<(), ClientConnectedError> {
|
||||
let reducer_lookup = self.info.module_def.lifecycle_reducer(Lifecycle::OnConnect);
|
||||
|
||||
if let Some((reducer_id, reducer_def)) = reducer_lookup {
|
||||
// The module defined a lifecycle reducer to handle new connections.
|
||||
// Call this reducer.
|
||||
// If the call fails (as in, something unexpectedly goes wrong with WASM execution),
|
||||
// abort the connection: we can't really recover.
|
||||
let reducer_outcome = self
|
||||
.call_reducer_inner(
|
||||
caller_identity,
|
||||
Some(caller_connection_id),
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
reducer_id,
|
||||
reducer_def,
|
||||
ReducerArgs::Nullary,
|
||||
)
|
||||
.await?;
|
||||
|
||||
match reducer_outcome.outcome {
|
||||
// If the reducer committed successfully, we're done.
|
||||
// `WasmModuleInstance::call_reducer_with_tx` has already ensured
|
||||
// that `st_client` is updated appropriately.
|
||||
//
|
||||
// It's necessary to spread out the responsibility for updating `st_client` in this way
|
||||
// because it's important that `call_identity_connected` commit at most one transaction.
|
||||
// A naive implementation of this method would just run the reducer first,
|
||||
// then insert into `st_client`,
|
||||
// but if we crashed in between, we'd be left in an inconsistent state
|
||||
// where the reducer had run but `st_client` was not yet updated.
|
||||
ReducerOutcome::Committed => Ok(()),
|
||||
|
||||
// If the reducer returned an error or couldn't run due to insufficient energy,
|
||||
// abort the connection: the module code has decided it doesn't want this client.
|
||||
ReducerOutcome::Failed(message) => Err(ClientConnectedError::Rejected(message)),
|
||||
ReducerOutcome::BudgetExceeded => Err(ClientConnectedError::OutOfEnergy),
|
||||
}
|
||||
} else {
|
||||
(Lifecycle::OnDisconnect, "__identity_disconnected__")
|
||||
};
|
||||
// The module doesn't define a client_connected reducer.
|
||||
// Commit a transaction to update `st_clients`
|
||||
// and to ensure we always have those events paired in the commitlog.
|
||||
//
|
||||
// This is necessary to be able to disconnect clients after a server crash.
|
||||
let reducer_name = reducer_lookup
|
||||
.as_ref()
|
||||
.map(|(_, def)| &*def.name)
|
||||
.unwrap_or("__identity_connected__");
|
||||
|
||||
let reducer_lookup = self.info.module_def.lifecycle_reducer(lifecycle);
|
||||
let reducer_name = reducer_lookup.as_ref().map(|(_, def)| &*def.name).unwrap_or(fake_name);
|
||||
|
||||
let db = &self.inner.replica_ctx().relational_db;
|
||||
let workload = || {
|
||||
Workload::Reducer(ReducerContext {
|
||||
let workload = Workload::Reducer(ReducerContext {
|
||||
name: reducer_name.to_owned(),
|
||||
caller_identity,
|
||||
caller_connection_id,
|
||||
timestamp: Timestamp::now(),
|
||||
arg_bsatn: Bytes::new(),
|
||||
})
|
||||
};
|
||||
|
||||
let result = if let Some((reducer_id, reducer_def)) = reducer_lookup {
|
||||
self.call_reducer_inner(
|
||||
caller_identity,
|
||||
Some(caller_connection_id),
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
reducer_id,
|
||||
reducer_def,
|
||||
ReducerArgs::Nullary,
|
||||
)
|
||||
.await
|
||||
.map(drop)
|
||||
} else {
|
||||
// If the module doesn't define connected or disconnected, commit
|
||||
// a transaction to update `st_clients` and to ensure we always have those events
|
||||
// paired in the commitlog.
|
||||
//
|
||||
// This is necessary to be able to disconnect clients after a server
|
||||
// crash.
|
||||
db.with_auto_commit(workload(), |mut_tx| {
|
||||
if connected {
|
||||
self.update_st_clients(mut_tx, caller_identity, caller_connection_id, connected)
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
})
|
||||
.map_err(|err| {
|
||||
InvalidReducerArguments {
|
||||
err: err.into(),
|
||||
reducer: reducer_name.into(),
|
||||
}
|
||||
.into()
|
||||
})
|
||||
};
|
||||
|
||||
// Deleting client from `st_clients`does not depend upon result of disconnect reducer hence done in a separate tx.
|
||||
if !connected {
|
||||
let _ = db
|
||||
.with_auto_commit(workload(), |mut_tx| {
|
||||
self.update_st_clients(mut_tx, caller_identity, caller_connection_id, connected)
|
||||
});
|
||||
self.inner
|
||||
.replica_ctx()
|
||||
.relational_db
|
||||
.with_auto_commit(workload, |mut_tx| {
|
||||
mut_tx.insert_st_client(caller_identity, caller_connection_id)
|
||||
})
|
||||
.map_err(|e| {
|
||||
log::error!("st_clients table update failed with params with error: {:?}", e);
|
||||
});
|
||||
.inspect_err(|e| {
|
||||
log::error!(
|
||||
"`call_identity_connected`: fallback transaction to insert into `st_client` failed: {e:#?}"
|
||||
);
|
||||
})
|
||||
.map_err(Into::into)
|
||||
}
|
||||
result
|
||||
}
|
||||
|
||||
fn update_st_clients(
|
||||
/// Invoke the module's `client_disconnected` reducer, if it has one,
|
||||
/// and delete the client's row from `st_client`, if any.
|
||||
///
|
||||
/// The host inspects `st_client` when restarting in order to run `client_disconnected` reducers
|
||||
/// for clients that were connected at the time when the host went down.
|
||||
/// This ensures that every client connection eventually has `client_disconnected` invoked.
|
||||
///
|
||||
/// Unlike [`Self::call_identity_connected`],
|
||||
/// this method swallows errors returned by the `client_disconnected` reducer.
|
||||
/// The database can't reject a disconnection - the client's gone, whether the database likes it or not.
|
||||
///
|
||||
/// If this method returns an error, the database is likely to wind up in a bad state,
|
||||
/// as that means we've somehow failed to delete from `st_client`.
|
||||
/// We cannot meaningfully handle this.
|
||||
/// Sometimes it just means that the database no longer exists, though, which is fine.
|
||||
pub async fn call_identity_disconnected(
|
||||
&self,
|
||||
mut_tx: &mut MutTxId,
|
||||
caller_identity: Identity,
|
||||
caller_connection_id: ConnectionId,
|
||||
connected: bool,
|
||||
) -> Result<(), DBError> {
|
||||
let db = &*self.inner.replica_ctx().relational_db;
|
||||
) -> Result<(), ReducerCallError> {
|
||||
let reducer_lookup = self.info.module_def.lifecycle_reducer(Lifecycle::OnDisconnect);
|
||||
|
||||
let row = &StClientRow {
|
||||
identity: caller_identity.into(),
|
||||
connection_id: caller_connection_id.into(),
|
||||
// A fallback transaction that deletes the client from `st_client`.
|
||||
let fallback = || {
|
||||
let reducer_name = reducer_lookup
|
||||
.as_ref()
|
||||
.map(|(_, def)| &*def.name)
|
||||
.unwrap_or("__identity_disconnected__");
|
||||
|
||||
let workload = Workload::Reducer(ReducerContext {
|
||||
name: reducer_name.to_owned(),
|
||||
caller_identity,
|
||||
caller_connection_id,
|
||||
timestamp: Timestamp::now(),
|
||||
arg_bsatn: Bytes::new(),
|
||||
});
|
||||
self.inner
|
||||
.replica_ctx()
|
||||
.relational_db
|
||||
.with_auto_commit(workload, |mut_tx| {
|
||||
mut_tx.delete_st_client(caller_identity, caller_connection_id)
|
||||
})
|
||||
.inspect_err(|e| {
|
||||
log::error!(
|
||||
"`call_identity_disconnected`: fallback transaction to delete from `st_client` failed: {e}"
|
||||
);
|
||||
})
|
||||
.map_err(|err| {
|
||||
InvalidReducerArguments {
|
||||
err: err.into(),
|
||||
reducer: reducer_name.into(),
|
||||
}
|
||||
.into()
|
||||
})
|
||||
};
|
||||
|
||||
if connected {
|
||||
mut_tx.insert_via_serialize_bsatn(ST_CLIENT_ID, &row).map(|_| ())
|
||||
if let Some((reducer_id, reducer_def)) = reducer_lookup {
|
||||
// The module defined a lifecycle reducer to handle disconnects. Call it.
|
||||
// If it succeeds, `WasmModuleInstance::call_reducer_with_tx` has already ensured
|
||||
// that `st_client` is updated appropriately.
|
||||
let result = self
|
||||
.call_reducer_inner(
|
||||
caller_identity,
|
||||
Some(caller_connection_id),
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
reducer_id,
|
||||
reducer_def,
|
||||
ReducerArgs::Nullary,
|
||||
)
|
||||
.await;
|
||||
|
||||
// If it failed, we still need to update `st_client`: the client's not coming back.
|
||||
// Commit a separate transaction that just updates `st_client`.
|
||||
//
|
||||
// It's OK for this to not be atomic with the previous transaction,
|
||||
// since that transaction didn't commit. If we crash before committing this one,
|
||||
// we'll run the `client_disconnected` reducer again unnecessarily,
|
||||
// but the commitlog won't contain two invocations of it, which is what we care about.
|
||||
match result {
|
||||
Err(e) => {
|
||||
log::error!("call_reducer_inner of client_disconnected failed: {e:#?}");
|
||||
fallback()
|
||||
}
|
||||
Ok(ReducerCallResult {
|
||||
outcome: ReducerOutcome::Failed(_) | ReducerOutcome::BudgetExceeded,
|
||||
..
|
||||
}) => fallback(),
|
||||
|
||||
// If it succeeded, as mentioend above, `st_client` is already updated.
|
||||
Ok(ReducerCallResult {
|
||||
outcome: ReducerOutcome::Committed,
|
||||
..
|
||||
}) => Ok(()),
|
||||
}
|
||||
} else {
|
||||
let row = db
|
||||
.iter_by_col_eq_mut(
|
||||
mut_tx,
|
||||
ST_CLIENT_ID,
|
||||
col_list![StClientFields::Identity, StClientFields::ConnectionId],
|
||||
&algebraic_value::AlgebraicValue::product(row),
|
||||
)?
|
||||
.map(|row_ref| row_ref.pointer())
|
||||
.collect::<SmallVec<[_; 1]>>();
|
||||
db.delete(mut_tx, ST_CLIENT_ID, row);
|
||||
Ok::<(), DBError>(())
|
||||
// The module doesn't define a `client_disconnected` reducer.
|
||||
// Commit a transaction to update `st_clients`.
|
||||
fallback()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -11,7 +11,6 @@ use std::time::Duration;
|
||||
use super::instrumentation::CallTimes;
|
||||
use crate::database_logger::{self, SystemLogger};
|
||||
use crate::db::datastore::locking_tx_datastore::MutTxId;
|
||||
use crate::db::datastore::system_tables::{StClientRow, ST_CLIENT_ID};
|
||||
use crate::db::datastore::traits::{IsolationLevel, Program};
|
||||
use crate::db::db_metrics::DB_METRICS;
|
||||
use crate::energy::{EnergyMonitor, EnergyQuanta, ReducerBudget, ReducerFingerprint};
|
||||
@@ -551,18 +550,20 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
|
||||
);
|
||||
EventStatus::Failed(errmsg.into())
|
||||
}
|
||||
// we haven't actually comitted yet - `commit_and_broadcast_event` will commit
|
||||
// We haven't actually comitted yet - `commit_and_broadcast_event` will commit
|
||||
// for us and replace this with the actual database update.
|
||||
//
|
||||
// Detecting a new client, and inserting it in `st_clients`
|
||||
// and conversely removing from `st_clients` on disconnect.
|
||||
Ok(Ok(())) => {
|
||||
// Detecing a new client, and inserting it in `st_clients`
|
||||
// Disconnect logic is written in module_host.rs, due to different transacationality requirements.
|
||||
if reducer_def.lifecycle == Some(Lifecycle::OnConnect) {
|
||||
match self.insert_st_client(&mut tx, caller_identity, caller_connection_id) {
|
||||
Ok(_) => EventStatus::Committed(DatabaseUpdate::default()),
|
||||
Err(err) => EventStatus::Failed(err.to_string()),
|
||||
}
|
||||
} else {
|
||||
EventStatus::Committed(DatabaseUpdate::default())
|
||||
let res = match reducer_def.lifecycle {
|
||||
Some(Lifecycle::OnConnect) => tx.insert_st_client(caller_identity, caller_connection_id),
|
||||
Some(Lifecycle::OnDisconnect) => tx.delete_st_client(caller_identity, caller_connection_id),
|
||||
_ => Ok(()),
|
||||
};
|
||||
match res {
|
||||
Ok(()) => EventStatus::Committed(DatabaseUpdate::default()),
|
||||
Err(err) => EventStatus::Failed(err.to_string()),
|
||||
}
|
||||
}
|
||||
};
|
||||
@@ -603,19 +604,6 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
|
||||
fn system_logger(&self) -> &SystemLogger {
|
||||
self.replica_context().logger.system_logger()
|
||||
}
|
||||
|
||||
fn insert_st_client(
|
||||
&self,
|
||||
tx: &mut MutTxId,
|
||||
identity: Identity,
|
||||
connection_id: ConnectionId,
|
||||
) -> Result<(), DBError> {
|
||||
let row = &StClientRow {
|
||||
identity: identity.into(),
|
||||
connection_id: connection_id.into(),
|
||||
};
|
||||
tx.insert_via_serialize_bsatn(ST_CLIENT_ID, row).map(|_| ())
|
||||
}
|
||||
}
|
||||
|
||||
/// Describes a reducer call in a cheaply shareable way.
|
||||
|
||||
@@ -0,0 +1,65 @@
|
||||
from .. import Smoketest
|
||||
|
||||
MODULE_HEADER = """
|
||||
use spacetimedb::{ReducerContext, Table};
|
||||
|
||||
#[spacetimedb::table(name = all_u8s, public)]
|
||||
pub struct AllU8s {
|
||||
number: u8,
|
||||
}
|
||||
|
||||
#[spacetimedb::reducer(init)]
|
||||
pub fn init(ctx: &ReducerContext) {
|
||||
// Here's a bunch of data that no one will be able to subscribe to.
|
||||
for i in u8::MIN..=u8::MAX {
|
||||
ctx.db.all_u8s().insert(AllU8s { number: i });
|
||||
}
|
||||
}
|
||||
"""
|
||||
|
||||
class ClientConnectedErrorRejectsConnection(Smoketest):
|
||||
MODULE_CODE = MODULE_HEADER + """
|
||||
|
||||
#[spacetimedb::reducer(client_connected)]
|
||||
pub fn identity_connected(ctx: &ReducerContext) -> Result<(), String> {
|
||||
Err("Rejecting connection from client".to_string())
|
||||
}
|
||||
|
||||
#[spacetimedb::reducer(client_disconnected)]
|
||||
pub fn identity_disconnected(_ctx: &ReducerContext) {
|
||||
panic!("This should never be called, since we reject all connections!")
|
||||
}
|
||||
"""
|
||||
|
||||
def test_client_connected_error_rejects_connection(self):
|
||||
with self.assertRaises(Exception):
|
||||
self.subscribe("select * from all_u8s", n = 0)()
|
||||
|
||||
logs = self.logs(100)
|
||||
self.assertIn('Rejecting connection from client', logs)
|
||||
self.assertNotIn('This should never be called, since we reject all connections!', logs)
|
||||
|
||||
class ClientDisconnectedErrorStillDeletesStClient(Smoketest):
|
||||
MODULE_CODE = MODULE_HEADER + """
|
||||
#[spacetimedb::reducer(client_connected)]
|
||||
pub fn identity_connected(_ctx: &ReducerContext) -> Result<(), String> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[spacetimedb::reducer(client_disconnected)]
|
||||
pub fn identity_disconnected(_ctx: &ReducerContext) {
|
||||
panic!("This should be called, but the `st_client` row should still be deleted")
|
||||
}
|
||||
"""
|
||||
|
||||
def test_client_disconnected_error_still_deletes_st_client(self):
|
||||
self.subscribe("select * from all_u8s", n = 0)()
|
||||
|
||||
logs = self.logs(100)
|
||||
self.assertIn('This should be called, but the `st_client` row should still be deleted', logs)
|
||||
|
||||
sql_out = self.spacetime("sql", self.database_identity, "select * from st_client")
|
||||
|
||||
self.assertMultiLineEqual(sql_out, """ identity | connection_id
|
||||
----------+---------------
|
||||
""")
|
||||
@@ -8,13 +8,11 @@ use spacetimedb::{log, ReducerContext};
|
||||
#[spacetimedb::reducer(client_connected)]
|
||||
pub fn connected(_ctx: &ReducerContext) {
|
||||
log::info!("_connect called");
|
||||
panic!("Panic on connect");
|
||||
}
|
||||
|
||||
#[spacetimedb::reducer(client_disconnected)]
|
||||
pub fn disconnected(_ctx: &ReducerContext) {
|
||||
log::info!("disconnect called");
|
||||
panic!("Panic on disconnect");
|
||||
}
|
||||
|
||||
#[spacetimedb::reducer]
|
||||
|
||||
Reference in New Issue
Block a user