diff --git a/crates/client-api/src/routes/subscribe.rs b/crates/client-api/src/routes/subscribe.rs index f1c7a73219..432110f2ef 100644 --- a/crates/client-api/src/routes/subscribe.rs +++ b/crates/client-api/src/routes/subscribe.rs @@ -26,7 +26,7 @@ use spacetimedb::client::{ use spacetimedb::execution_context::WorkloadType; use spacetimedb::host::module_host::ClientConnectedError; use spacetimedb::host::NoSuchModule; -use spacetimedb::util::asyncify; +use spacetimedb::util::spawn_rayon; use spacetimedb::worker_metrics::WORKER_METRICS; use spacetimedb::Identity; use spacetimedb_client_api_messages::websocket::{self as ws_api, Compression}; @@ -527,22 +527,20 @@ async fn send_message( message: impl ToProtocol + Send + 'static, ) -> (SerializeBuffer, Result<(), WsError>) { let (workload, num_rows) = metrics_metadata.unzip(); - let start_serialize = Instant::now(); - // Move large messages to a blocking thread, + // Move large messages to a rayon thread, // as serialization and compression can take a long time. // The threshold of 1024 rows is arbitrary, and may need to be refined. - let (msg_alloc, msg_data) = if num_rows.is_some_and(|n| n > 1024) { - asyncify(move || serialize(serialize_buf, message, config)).await - } else { - serialize(serialize_buf, message, config) + let serialize_and_compress = |serialize_buf, message, config| { + let start = Instant::now(); + let (msg_alloc, msg_data) = serialize(serialize_buf, message, config); + (start.elapsed(), msg_alloc, msg_data) }; - report_ws_sent_metrics( - database_identity, - workload, - num_rows, - start_serialize.elapsed(), - &msg_data, - ); + let (timing, msg_alloc, msg_data) = if num_rows.is_some_and(|n| n > 1024) { + spawn_rayon(move || serialize_and_compress(serialize_buf, message, config)).await + } else { + serialize_and_compress(serialize_buf, message, config) + }; + report_ws_sent_metrics(database_identity, workload, num_rows, timing, &msg_data); let res = async { ws.feed(datamsg_to_wsmsg(msg_data)).await?;