mirror of
https://github.com/clockworklabs/SpacetimeDB.git
synced 2026-05-06 07:26:43 -04:00
Add a mode to the Rust SDK with additional logging to a file (#4566)
# Description of Changes
In the Rust client SDK, with this PR, doing
`.with_debug_to_file("path.txt")` on the connection builder will result
in the SDK logging additional verbose info to `path.txt`.
# API and ABI breaking changes
Adds a new user-facing-ish API to the Rust client SDK.
# Expected complexity level and risk
2? Some possibility of deadlock due to adding a new mutex, but this is
explicitly not for production use.
# Testing
- [x] Ran `chat-console-rs` locally with this enabled and got some debug
logs out of it.
This commit is contained in:
@@ -1308,7 +1308,7 @@ impl __sdk::InModule for Reducer {{
|
||||
}
|
||||
|
||||
fn print_db_update_defn(module: &ModuleDef, visibility: CodegenVisibility, out: &mut Indenter) {
|
||||
writeln!(out, "#[derive(Default)]");
|
||||
writeln!(out, "#[derive(Default, Debug)]");
|
||||
writeln!(out, "#[allow(non_snake_case)]");
|
||||
writeln!(out, "#[doc(hidden)]");
|
||||
out.delimited_block(
|
||||
@@ -1616,6 +1616,7 @@ fn print_const_db_context_types(out: &mut Indenter) {
|
||||
out,
|
||||
"
|
||||
#[doc(hidden)]
|
||||
#[derive(Debug)]
|
||||
pub struct RemoteModule;
|
||||
|
||||
impl __sdk::InModule for RemoteModule {{
|
||||
|
||||
@@ -1230,7 +1230,7 @@ _ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
#[derive(Default, Debug)]
|
||||
#[allow(non_snake_case)]
|
||||
#[doc(hidden)]
|
||||
pub struct DbUpdate {
|
||||
@@ -1346,6 +1346,7 @@ impl<'r> __sdk::AppliedDiff<'r> for AppliedDiff<'r> {
|
||||
|
||||
|
||||
#[doc(hidden)]
|
||||
#[derive(Debug)]
|
||||
pub struct RemoteModule;
|
||||
|
||||
impl __sdk::InModule for RemoteModule {
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
//! This module is internal, and may incompatibly change without warning.
|
||||
|
||||
use crate::callbacks::CallbackId;
|
||||
use crate::db_connection::{PendingMutation, SharedCell};
|
||||
use crate::db_connection::{debug_log, PendingMutation, SharedCell};
|
||||
use crate::spacetime_module::{InModule, SpacetimeModule, TableUpdate, WithBsatn};
|
||||
use anymap::{any::Any, Map};
|
||||
use bytes::Bytes;
|
||||
@@ -11,6 +11,9 @@ use core::any::type_name;
|
||||
use core::hash::Hash;
|
||||
use futures_channel::mpsc;
|
||||
use spacetimedb_data_structures::map::{hash_map::Entry, HashCollectionExt, HashMap};
|
||||
use std::fmt::Debug;
|
||||
use std::fs::File;
|
||||
use std::io::Write;
|
||||
use std::marker::PhantomData;
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -35,6 +38,9 @@ pub struct TableCache<Row> {
|
||||
/// Entries are added to this map during [`crate::DbConnectionBuilder::build`],
|
||||
/// via a `register_table` function autogenerated for each table.
|
||||
pub(crate) unique_indices: HashMap<&'static str, Box<dyn UniqueIndexDyn<Row = Row>>>,
|
||||
|
||||
/// Clone of the [`crate::db_connection::DbContextImpl::extra_logging`].
|
||||
extra_logging: Option<SharedCell<File>>,
|
||||
}
|
||||
|
||||
/// Stores an entry of the typed row value together with its ref count in the table cache.
|
||||
@@ -59,11 +65,12 @@ pub(crate) struct RowEntry<Row> {
|
||||
}
|
||||
|
||||
// Can't derive this because the `Row` generic messes us up.
|
||||
impl<Row> Default for TableCache<Row> {
|
||||
fn default() -> Self {
|
||||
impl<Row> TableCache<Row> {
|
||||
fn new(extra_logging: Option<SharedCell<File>>) -> Self {
|
||||
Self {
|
||||
entries: Default::default(),
|
||||
unique_indices: Default::default(),
|
||||
extra_logging,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -185,7 +192,13 @@ impl<'r, Row> TableAppliedDiff<'r, Row> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<Row: Clone + Send + Sync + 'static> TableCache<Row> {
|
||||
impl<Row> TableCache<Row> {
|
||||
fn debug_log(&self, body: impl FnOnce(&mut File) -> std::result::Result<(), std::io::Error>) {
|
||||
debug_log(&self.extra_logging, body);
|
||||
}
|
||||
}
|
||||
|
||||
impl<Row: Clone + Debug + Send + Sync + 'static> TableCache<Row> {
|
||||
fn handle_delete<'r>(
|
||||
&mut self,
|
||||
inserts: &mut RowEventMap<'_, Row>,
|
||||
@@ -195,8 +208,16 @@ impl<Row: Clone + Send + Sync + 'static> TableCache<Row> {
|
||||
// Extract the entry and decrement the `ref_count`.
|
||||
// Only create a delete event if `ref_count = 0`.
|
||||
let Entry::Occupied(mut entry) = self.entries.entry(delete.bsatn.clone()) else {
|
||||
self.debug_log(|file| {
|
||||
writeln!(file, "`handle_delete` for table with row type {}: a delete update should correspond to an existing row in the table cache, but the row {delete:?} was not present", std::any::type_name::<Row>())?;
|
||||
writeln!(file, "table contents:")?;
|
||||
for (bsatn, RowEntry { row, ref_count }) in self.entries.iter() {
|
||||
writeln!(file, "\t{bsatn:?}\n\t\t{row:?}\n\t\tref_count {ref_count}")?;
|
||||
}
|
||||
Ok(())
|
||||
});
|
||||
// We're guaranteed to never hit this as long as we apply inserts before deletes.
|
||||
unreachable!("a delete update should correspond to an existing row in the table cache");
|
||||
unreachable!("a delete update should correspond to an existing row in the table cache, but the row {delete:?} was not present");
|
||||
};
|
||||
let ref_count = &mut entry.get_mut().ref_count;
|
||||
*ref_count -= 1;
|
||||
@@ -320,19 +341,21 @@ pub struct ClientCache<M: SpacetimeModule + ?Sized> {
|
||||
/// The strings are table names, since we may have multiple tables with the same row type.
|
||||
tables: Map<dyn Any + Send + Sync>,
|
||||
|
||||
/// Clone of the [`crate::db_connection::DbContextImpl::extra_logging`].
|
||||
extra_logging: Option<SharedCell<File>>,
|
||||
|
||||
_module: PhantomData<M>,
|
||||
}
|
||||
|
||||
impl<M: SpacetimeModule> Default for ClientCache<M> {
|
||||
fn default() -> Self {
|
||||
impl<M: SpacetimeModule> ClientCache<M> {
|
||||
pub(crate) fn new(extra_logging: Option<SharedCell<File>>) -> Self {
|
||||
Self {
|
||||
tables: Map::new(),
|
||||
extra_logging,
|
||||
_module: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<M: SpacetimeModule> ClientCache<M> {
|
||||
/// Get a handle on the [`TableCache`] which stores rows of type `Row` for the table `table_name`.
|
||||
pub(crate) fn get_table<Row: InModule<Module = M> + Send + Sync + 'static>(
|
||||
&self,
|
||||
@@ -353,12 +376,12 @@ impl<M: SpacetimeModule> ClientCache<M> {
|
||||
.entry::<HashMap<&'static str, TableCache<Row>>>()
|
||||
.or_insert_with(Default::default)
|
||||
.entry(table_name)
|
||||
.or_default()
|
||||
.or_insert_with(|| TableCache::new(self.extra_logging.clone()))
|
||||
}
|
||||
|
||||
/// Apply all the mutations in `diff`
|
||||
/// to the [`TableCache`] which stores rows of type `Row` for the table `table_name`.
|
||||
pub fn apply_diff_to_table<'r, Row: InModule<Module = M> + Clone + Send + Sync + 'static>(
|
||||
pub fn apply_diff_to_table<'r, Row: InModule<Module = M> + Clone + Debug + Send + Sync + 'static>(
|
||||
&mut self,
|
||||
table_name: &'static str,
|
||||
diff: &'r TableUpdate<Row>,
|
||||
@@ -530,7 +553,7 @@ pub struct UniqueConstraintHandle<Row: InModule, Col> {
|
||||
}
|
||||
|
||||
impl<
|
||||
Row: Clone + InModule + Send + Sync + 'static,
|
||||
Row: Clone + Debug + InModule + Send + Sync + 'static,
|
||||
Col: std::any::Any + Eq + std::hash::Hash + Clone + Send + Sync + std::fmt::Debug + 'static,
|
||||
> UniqueConstraintHandle<Row, Col>
|
||||
{
|
||||
|
||||
@@ -37,7 +37,12 @@ use http::Uri;
|
||||
use spacetimedb_client_api_messages::websocket::{self as ws, common::QuerySetId};
|
||||
use spacetimedb_lib::{bsatn, ser::Serialize, ConnectionId, Identity, Timestamp};
|
||||
use spacetimedb_sats::Deserialize;
|
||||
use std::sync::{atomic::AtomicU32, Arc, Mutex as StdMutex, OnceLock};
|
||||
use std::{
|
||||
fs::{File, OpenOptions},
|
||||
io::Write,
|
||||
path::PathBuf,
|
||||
sync::{atomic::AtomicU32, Arc, Mutex as StdMutex, OnceLock},
|
||||
};
|
||||
use tokio::{
|
||||
runtime::{self, Runtime},
|
||||
sync::Mutex as TokioMutex,
|
||||
@@ -86,6 +91,8 @@ pub struct DbContextImpl<M: SpacetimeModule> {
|
||||
///
|
||||
/// This may be none if we have not yet received the [`ws::v2::InitialConnection`] message.
|
||||
connection_id: SharedCell<Option<ConnectionId>>,
|
||||
|
||||
pub(crate) extra_logging: Option<SharedCell<File>>,
|
||||
}
|
||||
|
||||
impl<M: SpacetimeModule> Clone for DbContextImpl<M> {
|
||||
@@ -103,14 +110,20 @@ impl<M: SpacetimeModule> Clone for DbContextImpl<M> {
|
||||
pending_mutations_recv: Arc::clone(&self.pending_mutations_recv),
|
||||
identity: Arc::clone(&self.identity),
|
||||
connection_id: Arc::clone(&self.connection_id),
|
||||
extra_logging: Option::<Arc<_>>::clone(&self.extra_logging),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<M: SpacetimeModule> DbContextImpl<M> {
|
||||
pub(crate) fn debug_log(&self, body: impl FnOnce(&mut File) -> std::result::Result<(), std::io::Error>) {
|
||||
debug_log(&self.extra_logging, body);
|
||||
}
|
||||
|
||||
/// Process a parsed WebSocket message,
|
||||
/// applying its mutations to the client cache and invoking callbacks.
|
||||
fn process_message(&self, msg: ParsedMessage<M>) -> crate::Result<()> {
|
||||
self.debug_log(|out| writeln!(out, "`process_message`: {msg:?}"));
|
||||
match msg {
|
||||
// Error: treat this as an erroneous disconnect.
|
||||
ParsedMessage::Error(e) => {
|
||||
@@ -315,6 +328,7 @@ impl<M: SpacetimeModule> DbContextImpl<M> {
|
||||
|
||||
/// Apply an individual [`PendingMutation`].
|
||||
fn apply_mutation(&self, mutation: PendingMutation<M>) -> crate::Result<()> {
|
||||
self.debug_log(|out| writeln!(out, "`apply_mutation`: {mutation:?}"));
|
||||
match mutation {
|
||||
// Subscribe: register the subscription in the [`SubscriptionManager`]
|
||||
// and send the `Subscribe` WS message.
|
||||
@@ -763,6 +777,8 @@ pub struct DbConnectionBuilder<M: SpacetimeModule> {
|
||||
on_connect_error: Option<OnConnectErrorCallback<M>>,
|
||||
on_disconnect: Option<OnDisconnectCallback<M>>,
|
||||
|
||||
additional_logging_path: Option<PathBuf>,
|
||||
|
||||
params: WsParams,
|
||||
}
|
||||
|
||||
@@ -798,6 +814,15 @@ pub fn set_connection_id(id: ConnectionId) -> crate::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn debug_log(
|
||||
extra_logging: &Option<SharedCell<File>>,
|
||||
body: impl FnOnce(&mut File) -> std::result::Result<(), std::io::Error>,
|
||||
) {
|
||||
if let Some(file) = extra_logging {
|
||||
body(&mut file.lock().expect("`extra_logging` file Mutex is poisoned")).expect("Writing debug log failed")
|
||||
}
|
||||
}
|
||||
|
||||
impl<M: SpacetimeModule> DbConnectionBuilder<M> {
|
||||
/// Implementation of the generated `DbConnection::builder` method.
|
||||
/// Call that method instead.
|
||||
@@ -810,6 +835,7 @@ impl<M: SpacetimeModule> DbConnectionBuilder<M> {
|
||||
on_connect: None,
|
||||
on_connect_error: None,
|
||||
on_disconnect: None,
|
||||
additional_logging_path: None,
|
||||
params: <_>::default(),
|
||||
}
|
||||
}
|
||||
@@ -847,6 +873,16 @@ but you must call one of them, or else the connection will never progress.
|
||||
/// Open a WebSocket connection, build an empty client cache, &c,
|
||||
/// to construct a [`DbContextImpl`].
|
||||
fn build_impl(self) -> crate::Result<DbContextImpl<M>> {
|
||||
let extra_logging = self
|
||||
.additional_logging_path
|
||||
.map(|path| {
|
||||
OpenOptions::new().append(true).create(true).open(&path).map_err(|e| {
|
||||
InternalError::new(format!("Failed to open file '{path:?}' for additional logging")).with_cause(e)
|
||||
})
|
||||
})
|
||||
.transpose()?
|
||||
.map(|file| Arc::new(StdMutex::new(file)));
|
||||
|
||||
let (runtime, handle) = enter_or_create_runtime()?;
|
||||
let db_callbacks = DbCallbacks::default();
|
||||
let reducer_callbacks = ReducerCallbacks::default();
|
||||
@@ -866,8 +902,10 @@ but you must call one of them, or else the connection will never progress.
|
||||
source: InternalError::new("Failed to initiate WebSocket connection").with_cause(source),
|
||||
})?;
|
||||
|
||||
let (_websocket_loop_handle, raw_msg_recv, raw_msg_send) = ws_connection.spawn_message_loop(&handle);
|
||||
let (_parse_loop_handle, parsed_recv_chan) = spawn_parse_loop::<M>(raw_msg_recv, &handle);
|
||||
let (_websocket_loop_handle, raw_msg_recv, raw_msg_send) =
|
||||
ws_connection.spawn_message_loop(&handle, extra_logging.clone());
|
||||
let (_parse_loop_handle, parsed_recv_chan) =
|
||||
spawn_parse_loop::<M>(raw_msg_recv, &handle, extra_logging.clone());
|
||||
|
||||
let inner = Arc::new(StdMutex::new(DbContextImplInner {
|
||||
runtime,
|
||||
@@ -882,7 +920,7 @@ but you must call one of them, or else the connection will never progress.
|
||||
procedure_callbacks,
|
||||
}));
|
||||
|
||||
let mut cache = ClientCache::default();
|
||||
let mut cache = ClientCache::new(extra_logging.clone());
|
||||
M::register_tables(&mut cache);
|
||||
let cache = Arc::new(StdMutex::new(cache));
|
||||
let send_chan = Arc::new(StdMutex::new(Some(raw_msg_send)));
|
||||
@@ -898,6 +936,7 @@ but you must call one of them, or else the connection will never progress.
|
||||
pending_mutations_recv: Arc::new(TokioMutex::new(pending_mutations_recv)),
|
||||
identity: Arc::new(StdMutex::new(None)),
|
||||
connection_id: Arc::new(StdMutex::new(connection_id_override)),
|
||||
extra_logging,
|
||||
};
|
||||
|
||||
Ok(ctx_imp)
|
||||
@@ -963,6 +1002,23 @@ but you must call one of them, or else the connection will never progress.
|
||||
self
|
||||
}
|
||||
|
||||
/// Set `path` as a path for additional debug logging related to SDK internals.
|
||||
///
|
||||
/// When enabled, the SDK will create or open `path` for write-append and write logs to it.
|
||||
/// This is useful for diagnosing bugs in the SDK,
|
||||
/// but will generate a large volume of text logs and may have performance overhead,
|
||||
/// so it should not be used in production.
|
||||
///
|
||||
/// When running multiple connections in parallel,
|
||||
/// either within the same process or from separate processes,
|
||||
/// prefer giving each its own unique path here;
|
||||
/// multiple `DbConnection`s writing to the same debug file concurrently
|
||||
/// may interleave or corrupt the output.
|
||||
pub fn with_debug_to_file(mut self, path: impl Into<PathBuf>) -> Self {
|
||||
self.additional_logging_path = Some(path.into());
|
||||
self
|
||||
}
|
||||
|
||||
/// Register a callback to run when the connection is successfully initiated.
|
||||
///
|
||||
/// The callback will receive three arguments:
|
||||
@@ -1043,6 +1099,7 @@ fn enter_or_create_runtime() -> crate::Result<(Option<Runtime>, runtime::Handle)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum ParsedMessage<M: SpacetimeModule> {
|
||||
TransactionUpdate(M::DbUpdate),
|
||||
IdentityToken(Identity, Box<str>, ConnectionId),
|
||||
@@ -1073,9 +1130,10 @@ enum ParsedMessage<M: SpacetimeModule> {
|
||||
fn spawn_parse_loop<M: SpacetimeModule>(
|
||||
raw_message_recv: mpsc::UnboundedReceiver<ws::v2::ServerMessage>,
|
||||
handle: &runtime::Handle,
|
||||
extra_logging: Option<SharedCell<File>>,
|
||||
) -> (tokio::task::JoinHandle<()>, mpsc::UnboundedReceiver<ParsedMessage<M>>) {
|
||||
let (parsed_message_send, parsed_message_recv) = mpsc::unbounded();
|
||||
let handle = handle.spawn(parse_loop(raw_message_recv, parsed_message_send));
|
||||
let handle = handle.spawn(parse_loop(raw_message_recv, parsed_message_send, extra_logging));
|
||||
(handle, parsed_message_recv)
|
||||
}
|
||||
|
||||
@@ -1084,9 +1142,13 @@ fn spawn_parse_loop<M: SpacetimeModule>(
|
||||
async fn parse_loop<M: SpacetimeModule>(
|
||||
mut recv: mpsc::UnboundedReceiver<ws::v2::ServerMessage>,
|
||||
send: mpsc::UnboundedSender<ParsedMessage<M>>,
|
||||
extra_logging: Option<SharedCell<File>>,
|
||||
) {
|
||||
while let Some(msg) = recv.next().await {
|
||||
send.unbounded_send(match msg {
|
||||
debug_log(&extra_logging, |file| {
|
||||
writeln!(file, "`parse_loop`: Got raw message: {msg:?}")
|
||||
});
|
||||
let parsed = match msg {
|
||||
ws::v2::ServerMessage::TransactionUpdate(transaction_update) => {
|
||||
match M::DbUpdate::parse_update(transaction_update) {
|
||||
Err(e) => ParsedMessage::Error(
|
||||
@@ -1210,8 +1272,12 @@ async fn parse_loop<M: SpacetimeModule>(
|
||||
ws::v2::ProcedureStatus::Returned(val) => Ok(val),
|
||||
},
|
||||
},
|
||||
})
|
||||
.expect("Failed to send ParsedMessage to main thread");
|
||||
};
|
||||
debug_log(&extra_logging, |file| {
|
||||
writeln!(file, "`parse_loop`: Parsed as: {parsed:?}")
|
||||
});
|
||||
send.unbounded_send(parsed)
|
||||
.expect("Failed to send ParsedMessage to main thread");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1263,6 +1329,62 @@ pub(crate) enum PendingMutation<M: SpacetimeModule> {
|
||||
},
|
||||
}
|
||||
|
||||
// Hand-written `Debug` impl, 'cause `SubscriptionHandleImpl` and callbacks aren't printable.
|
||||
impl<M: SpacetimeModule> std::fmt::Debug for PendingMutation<M> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
match self {
|
||||
PendingMutation::Unsubscribe { query_set_id } => f
|
||||
.debug_struct("PendingMutation::Unsubscribe")
|
||||
.field("query_set_id", query_set_id)
|
||||
.finish(),
|
||||
PendingMutation::Subscribe { query_set_id, .. } => f
|
||||
.debug_struct("PendingMutation::Subscribe")
|
||||
.field("query_set_id", query_set_id)
|
||||
.finish_non_exhaustive(),
|
||||
PendingMutation::AddInsertCallback { table, callback_id, .. } => f
|
||||
.debug_struct("PendingMutation::AddInsertCallback")
|
||||
.field("table", table)
|
||||
.field("callback_id", callback_id)
|
||||
.finish_non_exhaustive(),
|
||||
PendingMutation::RemoveInsertCallback { table, callback_id } => f
|
||||
.debug_struct("PendingMutation::RemoveInsertCallback")
|
||||
.field("table", table)
|
||||
.field("callback_id", callback_id)
|
||||
.finish(),
|
||||
PendingMutation::AddDeleteCallback { table, callback_id, .. } => f
|
||||
.debug_struct("PendingMutation::AddDeleteCallback")
|
||||
.field("table", table)
|
||||
.field("callback_id", callback_id)
|
||||
.finish_non_exhaustive(),
|
||||
PendingMutation::RemoveDeleteCallback { table, callback_id } => f
|
||||
.debug_struct("PendingMutation::RemoveDeleteCallback")
|
||||
.field("table", table)
|
||||
.field("callback_id", callback_id)
|
||||
.finish(),
|
||||
PendingMutation::AddUpdateCallback { table, callback_id, .. } => f
|
||||
.debug_struct("PendingMutation::AddUpdateCallback")
|
||||
.field("table", table)
|
||||
.field("callback_id", callback_id)
|
||||
.finish_non_exhaustive(),
|
||||
PendingMutation::RemoveUpdateCallback { table, callback_id } => f
|
||||
.debug_struct("PendingMutation::RemoveUpdateCallback")
|
||||
.field("table", table)
|
||||
.field("callback_id", callback_id)
|
||||
.finish(),
|
||||
PendingMutation::Disconnect => write!(f, "PendingMutation::Disconnect"),
|
||||
PendingMutation::InvokeReducerWithCallback { reducer, .. } => f
|
||||
.debug_struct("PendingMutation::InvokeReducerWithCallback")
|
||||
.field("reducer", reducer)
|
||||
.finish_non_exhaustive(),
|
||||
PendingMutation::InvokeProcedureWithCallback { procedure, args, .. } => f
|
||||
.debug_struct("PendingMutation::InvokeProcedureWithCallback")
|
||||
.field("procedure", procedure)
|
||||
.field("args", args)
|
||||
.finish_non_exhaustive(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enum Message<M: SpacetimeModule> {
|
||||
Ws(Option<ParsedMessage<M>>),
|
||||
Local(PendingMutation<M>),
|
||||
|
||||
@@ -32,7 +32,7 @@ pub struct QueryTableAccessor;
|
||||
|
||||
/// Each module's codegen will define a unit struct which implements this trait,
|
||||
/// with associated type links to various other generated types.
|
||||
pub trait SpacetimeModule: Send + Sync + 'static {
|
||||
pub trait SpacetimeModule: Debug + Send + Sync + 'static {
|
||||
/// [`crate::DbContext`] implementor which exists in the global scope.
|
||||
type DbConnection: DbConnection<Module = Self>;
|
||||
|
||||
@@ -83,7 +83,7 @@ pub trait SpacetimeModule: Send + Sync + 'static {
|
||||
/// Implemented by the autogenerated `DbUpdate` type,
|
||||
/// which is a parsed and typed analogue of [`crate::ws::v2::TransactionUpdate`].
|
||||
pub trait DbUpdate:
|
||||
TryFrom<ws::v2::TransactionUpdate, Error = crate::Error> + Default + InModule + Send + 'static
|
||||
TryFrom<ws::v2::TransactionUpdate, Error = crate::Error> + Default + Debug + InModule + Send + 'static
|
||||
where
|
||||
Self::Module: SpacetimeModule<DbUpdate = Self>,
|
||||
{
|
||||
@@ -217,11 +217,13 @@ where
|
||||
fn unsubscribe(self) -> crate::Result<()>;
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct WithBsatn<Row> {
|
||||
pub bsatn: Bytes,
|
||||
pub row: Row,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct TableUpdate<Row> {
|
||||
pub inserts: Vec<WithBsatn<Row>>,
|
||||
pub deletes: Vec<WithBsatn<Row>>,
|
||||
|
||||
@@ -2,8 +2,10 @@
|
||||
//!
|
||||
//! This module is internal, and may incompatibly change without warning.
|
||||
|
||||
use std::fs::File;
|
||||
use std::io::Write;
|
||||
use std::mem;
|
||||
use std::sync::Arc;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Duration;
|
||||
|
||||
use bytes::Bytes;
|
||||
@@ -24,6 +26,7 @@ use tokio_tungstenite::{
|
||||
};
|
||||
|
||||
use crate::compression::decompress_server_message;
|
||||
use crate::db_connection::debug_log;
|
||||
use crate::metrics::CLIENT_METRICS;
|
||||
|
||||
#[derive(Error, Debug, Clone)]
|
||||
@@ -207,9 +210,11 @@ fn request_insert_auth_header(req: &mut http::Request<()>, token: Option<&str>)
|
||||
///
|
||||
/// Could be trivially written as a function, but macro-ifying it preserves the source location of the log.
|
||||
macro_rules! maybe_log_error {
|
||||
($cause:expr, $res:expr) => {
|
||||
($extra_logging:expr, $cause:expr, $res:expr) => {
|
||||
if let Err(e) = $res {
|
||||
log::warn!("{}: {:?}", $cause, e);
|
||||
let cause = $cause;
|
||||
debug_log($extra_logging, |file| writeln!(file, "{}: {:?}", cause, e));
|
||||
log::warn!("{}: {:?}", cause, e);
|
||||
}
|
||||
};
|
||||
}
|
||||
@@ -259,6 +264,7 @@ impl WsConnection {
|
||||
mut self,
|
||||
incoming_messages: mpsc::UnboundedSender<ws::v2::ServerMessage>,
|
||||
outgoing_messages: mpsc::UnboundedReceiver<ws::v2::ClientMessage>,
|
||||
extra_logging: Option<Arc<Mutex<File>>>,
|
||||
) {
|
||||
let websocket_received = CLIENT_METRICS.websocket_received.with_label_values(&self.db_name);
|
||||
let websocket_received_msg_size = CLIENT_METRICS
|
||||
@@ -312,6 +318,7 @@ impl WsConnection {
|
||||
|
||||
Err(e) => {
|
||||
maybe_log_error!(
|
||||
&extra_logging,
|
||||
"Error reading message from read WebSocket stream",
|
||||
Result::<(), _>::Err(e)
|
||||
);
|
||||
@@ -323,10 +330,12 @@ impl WsConnection {
|
||||
record_metrics(bytes.len());
|
||||
match Self::parse_response(&bytes) {
|
||||
Err(e) => maybe_log_error!(
|
||||
&extra_logging,
|
||||
"Error decoding WebSocketMessage::Binary payload",
|
||||
Result::<(), _>::Err(e)
|
||||
),
|
||||
Ok(msg) => maybe_log_error!(
|
||||
&extra_logging,
|
||||
"Error sending decoded message to incoming_messages queue",
|
||||
incoming_messages.unbounded_send(msg)
|
||||
),
|
||||
@@ -350,6 +359,7 @@ impl WsConnection {
|
||||
},
|
||||
|
||||
Ok(Some(other)) => {
|
||||
debug_log(&extra_logging, |file| writeln!(file, "Unexpeccted WebSocket message {other:?}"));
|
||||
log::warn!("Unexpected WebSocket message {other:?}");
|
||||
idle = false;
|
||||
record_metrics(other.len());
|
||||
@@ -360,6 +370,7 @@ impl WsConnection {
|
||||
if mem::replace(&mut idle, true) {
|
||||
if want_pong {
|
||||
// Nothing received while we were waiting for a pong.
|
||||
debug_log(&extra_logging, |file| writeln!(file, "Connection timed out"));
|
||||
log::warn!("Connection timed out");
|
||||
break;
|
||||
}
|
||||
@@ -367,6 +378,7 @@ impl WsConnection {
|
||||
log::trace!("sending client ping");
|
||||
let ping = WebSocketMessage::Ping(Bytes::new());
|
||||
if let Err(e) = self.sock.send(ping).await {
|
||||
debug_log(&extra_logging, |file| writeln!(file, "Error sending ping: {e:?}"));
|
||||
log::warn!("Error sending ping: {e:?}");
|
||||
break;
|
||||
}
|
||||
@@ -379,12 +391,13 @@ impl WsConnection {
|
||||
Some(outgoing) => {
|
||||
let msg = Self::encode_message(outgoing);
|
||||
if let Err(e) = self.sock.send(msg).await {
|
||||
debug_log(&extra_logging, |file| writeln!(file, "Error sending outgoing message: {e:?}"));
|
||||
log::warn!("Error sending outgoing message: {e:?}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
None => {
|
||||
maybe_log_error!("Error sending close frame", SinkExt::close(&mut self.sock).await);
|
||||
maybe_log_error!(&extra_logging, "Error sending close frame", SinkExt::close(&mut self.sock).await);
|
||||
outgoing_messages = None;
|
||||
}
|
||||
},
|
||||
@@ -395,6 +408,7 @@ impl WsConnection {
|
||||
pub(crate) fn spawn_message_loop(
|
||||
self,
|
||||
runtime: &runtime::Handle,
|
||||
extra_logging: Option<Arc<Mutex<File>>>,
|
||||
) -> (
|
||||
JoinHandle<()>,
|
||||
mpsc::UnboundedReceiver<ws::v2::ServerMessage>,
|
||||
@@ -402,7 +416,7 @@ impl WsConnection {
|
||||
) {
|
||||
let (outgoing_send, outgoing_recv) = mpsc::unbounded();
|
||||
let (incoming_send, incoming_recv) = mpsc::unbounded();
|
||||
let handle = runtime.spawn(self.message_loop(incoming_send, outgoing_recv));
|
||||
let handle = runtime.spawn(self.message_loop(incoming_send, outgoing_recv, extra_logging));
|
||||
(handle, incoming_recv, outgoing_send)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
|
||||
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
|
||||
|
||||
// This was generated using spacetimedb cli version 2.0.0 (commit 85095cfa85e3addc29ce58bfe670b6003271b288).
|
||||
// This was generated using spacetimedb cli version 2.0.3 (commit c5743cfc8d2fe70b31f43f275313332524a56476).
|
||||
|
||||
#![allow(unused, clippy::all)]
|
||||
use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws};
|
||||
@@ -56,7 +56,7 @@ impl __sdk::Reducer for Reducer {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
#[derive(Default, Debug)]
|
||||
#[allow(non_snake_case)]
|
||||
#[doc(hidden)]
|
||||
pub struct DbUpdate {
|
||||
@@ -156,6 +156,7 @@ impl<'r> __sdk::AppliedDiff<'r> for AppliedDiff<'r> {
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
#[derive(Debug)]
|
||||
pub struct RemoteModule;
|
||||
|
||||
impl __sdk::InModule for RemoteModule {
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
|
||||
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
|
||||
|
||||
// This was generated using spacetimedb cli version 2.0.0 (commit 85095cfa85e3addc29ce58bfe670b6003271b288).
|
||||
// This was generated using spacetimedb cli version 2.0.3 (commit c5743cfc8d2fe70b31f43f275313332524a56476).
|
||||
|
||||
#![allow(unused, clippy::all)]
|
||||
use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws};
|
||||
@@ -62,7 +62,7 @@ impl __sdk::Reducer for Reducer {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
#[derive(Default, Debug)]
|
||||
#[allow(non_snake_case)]
|
||||
#[doc(hidden)]
|
||||
pub struct DbUpdate {
|
||||
@@ -149,6 +149,7 @@ impl<'r> __sdk::AppliedDiff<'r> for AppliedDiff<'r> {
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
#[derive(Debug)]
|
||||
pub struct RemoteModule;
|
||||
|
||||
impl __sdk::InModule for RemoteModule {
|
||||
|
||||
+3
-2
@@ -1,7 +1,7 @@
|
||||
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
|
||||
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
|
||||
|
||||
// This was generated using spacetimedb cli version 2.0.0 (commit 85095cfa85e3addc29ce58bfe670b6003271b288).
|
||||
// This was generated using spacetimedb cli version 2.0.3 (commit c5743cfc8d2fe70b31f43f275313332524a56476).
|
||||
|
||||
#![allow(unused, clippy::all)]
|
||||
use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws};
|
||||
@@ -83,7 +83,7 @@ impl __sdk::Reducer for Reducer {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
#[derive(Default, Debug)]
|
||||
#[allow(non_snake_case)]
|
||||
#[doc(hidden)]
|
||||
pub struct DbUpdate {
|
||||
@@ -216,6 +216,7 @@ impl<'r> __sdk::AppliedDiff<'r> for AppliedDiff<'r> {
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
#[derive(Debug)]
|
||||
pub struct RemoteModule;
|
||||
|
||||
impl __sdk::InModule for RemoteModule {
|
||||
|
||||
+3
-2
@@ -1,7 +1,7 @@
|
||||
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
|
||||
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
|
||||
|
||||
// This was generated using spacetimedb cli version 2.0.0 (commit e528393902d8cc982769e3b1a0f250d7d53edfa1).
|
||||
// This was generated using spacetimedb cli version 2.0.3 (commit c5743cfc8d2fe70b31f43f275313332524a56476).
|
||||
|
||||
#![allow(unused, clippy::all)]
|
||||
use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws};
|
||||
@@ -2648,7 +2648,7 @@ impl __sdk::Reducer for Reducer {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
#[derive(Default, Debug)]
|
||||
#[allow(non_snake_case)]
|
||||
#[doc(hidden)]
|
||||
pub struct DbUpdate {
|
||||
@@ -4259,6 +4259,7 @@ impl<'r> __sdk::AppliedDiff<'r> for AppliedDiff<'r> {
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
#[derive(Debug)]
|
||||
pub struct RemoteModule;
|
||||
|
||||
impl __sdk::InModule for RemoteModule {
|
||||
|
||||
+3
-2
@@ -1,7 +1,7 @@
|
||||
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
|
||||
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
|
||||
|
||||
// This was generated using spacetimedb cli version 2.0.0 (commit 85095cfa85e3addc29ce58bfe670b6003271b288).
|
||||
// This was generated using spacetimedb cli version 2.0.4 (commit fe987f3e3103528cd95cc86e13fcf206196672c7).
|
||||
|
||||
#![allow(unused, clippy::all)]
|
||||
use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws};
|
||||
@@ -83,7 +83,7 @@ impl __sdk::Reducer for Reducer {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
#[derive(Default, Debug)]
|
||||
#[allow(non_snake_case)]
|
||||
#[doc(hidden)]
|
||||
pub struct DbUpdate {
|
||||
@@ -249,6 +249,7 @@ impl<'r> __sdk::AppliedDiff<'r> for AppliedDiff<'r> {
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
#[derive(Debug)]
|
||||
pub struct RemoteModule;
|
||||
|
||||
impl __sdk::InModule for RemoteModule {
|
||||
|
||||
+3
-2
@@ -1,7 +1,7 @@
|
||||
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
|
||||
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
|
||||
|
||||
// This was generated using spacetimedb cli version 2.0.3 (commit 995798d29d314301cb475e2cd499f32a1691ea90).
|
||||
// This was generated using spacetimedb cli version 2.0.4 (commit fe987f3e3103528cd95cc86e13fcf206196672c7).
|
||||
|
||||
#![allow(unused, clippy::all)]
|
||||
use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws};
|
||||
@@ -94,7 +94,7 @@ impl __sdk::Reducer for Reducer {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
#[derive(Default, Debug)]
|
||||
#[allow(non_snake_case)]
|
||||
#[doc(hidden)]
|
||||
pub struct DbUpdate {
|
||||
@@ -273,6 +273,7 @@ impl<'r> __sdk::AppliedDiff<'r> for AppliedDiff<'r> {
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
#[derive(Debug)]
|
||||
pub struct RemoteModule;
|
||||
|
||||
impl __sdk::InModule for RemoteModule {
|
||||
|
||||
+3
-2
@@ -1,7 +1,7 @@
|
||||
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
|
||||
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
|
||||
|
||||
// This was generated using spacetimedb cli version 2.0.0 (commit 5183461758f0c9cb05138a5054c947a7aeb8497e).
|
||||
// This was generated using spacetimedb cli version 2.0.4 (commit 912be0bbc1fedbe62ab968e1a0775235ab7f650d).
|
||||
|
||||
#![allow(unused, clippy::all)]
|
||||
use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws};
|
||||
@@ -50,7 +50,7 @@ impl __sdk::Reducer for Reducer {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
#[derive(Default, Debug)]
|
||||
#[allow(non_snake_case)]
|
||||
#[doc(hidden)]
|
||||
pub struct DbUpdate {
|
||||
@@ -135,6 +135,7 @@ impl<'r> __sdk::AppliedDiff<'r> for AppliedDiff<'r> {
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
#[derive(Debug)]
|
||||
pub struct RemoteModule;
|
||||
|
||||
impl __sdk::InModule for RemoteModule {
|
||||
|
||||
+3
-2
@@ -1,7 +1,7 @@
|
||||
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
|
||||
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
|
||||
|
||||
// This was generated using spacetimedb cli version 2.0.0 (commit 5183461758f0c9cb05138a5054c947a7aeb8497e).
|
||||
// This was generated using spacetimedb cli version 2.0.4 (commit 912be0bbc1fedbe62ab968e1a0775235ab7f650d).
|
||||
|
||||
#![allow(unused, clippy::all)]
|
||||
use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws};
|
||||
@@ -50,7 +50,7 @@ impl __sdk::Reducer for Reducer {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
#[derive(Default, Debug)]
|
||||
#[allow(non_snake_case)]
|
||||
#[doc(hidden)]
|
||||
pub struct DbUpdate {
|
||||
@@ -135,6 +135,7 @@ impl<'r> __sdk::AppliedDiff<'r> for AppliedDiff<'r> {
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
#[derive(Debug)]
|
||||
pub struct RemoteModule;
|
||||
|
||||
impl __sdk::InModule for RemoteModule {
|
||||
|
||||
+3
-2
@@ -1,7 +1,7 @@
|
||||
// THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
|
||||
// WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
|
||||
|
||||
// This was generated using spacetimedb cli version 2.0.0 (commit 9e0e81a6aaec6bf3619cfb9f7916743d86ab7ffc).
|
||||
// This was generated using spacetimedb cli version 2.0.4 (commit 912be0bbc1fedbe62ab968e1a0775235ab7f650d).
|
||||
|
||||
#![allow(unused, clippy::all)]
|
||||
use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws};
|
||||
@@ -56,7 +56,7 @@ impl __sdk::Reducer for Reducer {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
#[derive(Default, Debug)]
|
||||
#[allow(non_snake_case)]
|
||||
#[doc(hidden)]
|
||||
pub struct DbUpdate {
|
||||
@@ -156,6 +156,7 @@ impl<'r> __sdk::AppliedDiff<'r> for AppliedDiff<'r> {
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
#[derive(Debug)]
|
||||
pub struct RemoteModule;
|
||||
|
||||
impl __sdk::InModule for RemoteModule {
|
||||
|
||||
Reference in New Issue
Block a user