mirror of
https://github.com/clockworklabs/SpacetimeDB.git
synced 2026-05-16 12:48:20 -04:00
Use rayon instead of tokio blocking thread for serialize
This commit is contained in:
@@ -26,7 +26,7 @@ use spacetimedb::client::{
|
||||
use spacetimedb::execution_context::WorkloadType;
|
||||
use spacetimedb::host::module_host::ClientConnectedError;
|
||||
use spacetimedb::host::NoSuchModule;
|
||||
use spacetimedb::util::asyncify;
|
||||
use spacetimedb::util::spawn_rayon;
|
||||
use spacetimedb::worker_metrics::WORKER_METRICS;
|
||||
use spacetimedb::Identity;
|
||||
use spacetimedb_client_api_messages::websocket::{self as ws_api, Compression};
|
||||
@@ -527,22 +527,20 @@ async fn send_message(
|
||||
message: impl ToProtocol<Encoded = SwitchedServerMessage> + Send + 'static,
|
||||
) -> (SerializeBuffer, Result<(), WsError>) {
|
||||
let (workload, num_rows) = metrics_metadata.unzip();
|
||||
let start_serialize = Instant::now();
|
||||
// Move large messages to a blocking thread,
|
||||
// Move large messages to a rayon thread,
|
||||
// as serialization and compression can take a long time.
|
||||
// The threshold of 1024 rows is arbitrary, and may need to be refined.
|
||||
let (msg_alloc, msg_data) = if num_rows.is_some_and(|n| n > 1024) {
|
||||
asyncify(move || serialize(serialize_buf, message, config)).await
|
||||
} else {
|
||||
serialize(serialize_buf, message, config)
|
||||
let serialize_and_compress = |serialize_buf, message, config| {
|
||||
let start = Instant::now();
|
||||
let (msg_alloc, msg_data) = serialize(serialize_buf, message, config);
|
||||
(start.elapsed(), msg_alloc, msg_data)
|
||||
};
|
||||
report_ws_sent_metrics(
|
||||
database_identity,
|
||||
workload,
|
||||
num_rows,
|
||||
start_serialize.elapsed(),
|
||||
&msg_data,
|
||||
);
|
||||
let (timing, msg_alloc, msg_data) = if num_rows.is_some_and(|n| n > 1024) {
|
||||
spawn_rayon(move || serialize_and_compress(serialize_buf, message, config)).await
|
||||
} else {
|
||||
serialize_and_compress(serialize_buf, message, config)
|
||||
};
|
||||
report_ws_sent_metrics(database_identity, workload, num_rows, timing, &msg_data);
|
||||
|
||||
let res = async {
|
||||
ws.feed(datamsg_to_wsmsg(msg_data)).await?;
|
||||
|
||||
Reference in New Issue
Block a user