From e5148b97ea92f1556597fe4ebec79eda4e33121c Mon Sep 17 00:00:00 2001 From: Mazdak Farrokhzad Date: Mon, 23 Sep 2024 20:00:04 +0200 Subject: [PATCH] box not vec & extract brotli compression --- crates/core/src/client/messages.rs | 67 ++++++++++--------- crates/core/src/host/module_host.rs | 6 +- .../core/src/subscription/execution_unit.rs | 3 + crates/core/src/subscription/query.rs | 16 ++--- crates/core/src/subscription/subscription.rs | 4 ++ 5 files changed, 54 insertions(+), 42 deletions(-) diff --git a/crates/core/src/client/messages.rs b/crates/core/src/client/messages.rs index c7e255c2ed..5fb97dccde 100644 --- a/crates/core/src/client/messages.rs +++ b/crates/core/src/client/messages.rs @@ -33,41 +33,46 @@ pub fn serialize(msg: impl ToProtocol, protocol FormatSwitch::Json(msg) => serde_json::to_string(&SerializeWrapper::new(msg)).unwrap().into(), FormatSwitch::Bsatn(msg) => { let msg_bytes = bsatn::to_vec(&msg).unwrap(); - let reader = &mut &msg_bytes[..]; - - // TODO(perf): Compression should depend on message size and type. - // - // SubscriptionUpdate messages will typically be quite large, - // while TransactionUpdate messages will typically be quite small. - // - // If we are optimizing for SubscriptionUpdates, - // we want a large buffer. - // But if we are optimizing for TransactionUpdates, - // we probably want to skip compression altogether. - // - // For now we choose a reasonable middle ground, - // which is to compress everything using a 32KB buffer. - const BUFFER_SIZE: usize = 32 * 1024; - // Again we are optimizing for compression speed, - // so we choose the lowest (fastest) level of compression. - // Experiments on internal workloads have shown compression ratios between 7:1 and 10:1 - // for large `SubscriptionUpdate` messages at this level. - const COMPRESSION_LEVEL: u32 = 1; - // The default value for an internal compression parameter. - // See `BrotliEncoderParams` for more details. - const LG_WIN: u32 = 22; - - let mut encoder = CompressorReader::new(reader, BUFFER_SIZE, COMPRESSION_LEVEL, LG_WIN); - - let mut out = Vec::new(); - encoder - .read_to_end(&mut out) - .expect("Failed to Brotli compress `SubscriptionUpdateMessage`"); - out.into() + let msg_bytes = brotli_compress(&msg_bytes); + msg_bytes.into() } } } +fn brotli_compress(bytes: &[u8]) -> Vec { + let reader = &mut &bytes[..]; + + // TODO(perf): Compression should depend on message size and type. + // + // SubscriptionUpdate messages will typically be quite large, + // while TransactionUpdate messages will typically be quite small. + // + // If we are optimizing for SubscriptionUpdates, + // we want a large buffer. + // But if we are optimizing for TransactionUpdates, + // we probably want to skip compression altogether. + // + // For now we choose a reasonable middle ground, + // which is to compress everything using a 32KB buffer. + const BUFFER_SIZE: usize = 32 * 1024; + // Again we are optimizing for compression speed, + // so we choose the lowest (fastest) level of compression. + // Experiments on internal workloads have shown compression ratios between 7:1 and 10:1 + // for large `SubscriptionUpdate` messages at this level. + const COMPRESSION_LEVEL: u32 = 1; + // The default value for an internal compression parameter. + // See `BrotliEncoderParams` for more details. + const LG_WIN: u32 = 22; + + let mut encoder = CompressorReader::new(reader, BUFFER_SIZE, COMPRESSION_LEVEL, LG_WIN); + + let mut out = Vec::new(); + encoder + .read_to_end(&mut out) + .expect("Failed to Brotli compress `SubscriptionUpdateMessage`"); + out +} + #[derive(Debug, From)] pub enum SerializableMessage { Query(OneOffQueryResponseMessage), diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 276a2292ac..23fadaf150 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -102,7 +102,7 @@ pub struct DatabaseTableUpdate { #[derive(Debug)] pub struct DatabaseUpdateRelValue<'a> { - pub tables: Vec>, + pub tables: Box<[DatabaseTableUpdateRelValue<'a>]>, } #[derive(PartialEq, Debug)] @@ -114,8 +114,8 @@ pub struct DatabaseTableUpdateRelValue<'a> { #[derive(Default, PartialEq, Debug)] pub struct UpdatesRelValue<'a> { - pub deletes: Vec>, - pub inserts: Vec>, + pub deletes: Box<[RelValue<'a>]>, + pub inserts: Box<[RelValue<'a>]>, } impl UpdatesRelValue<'_> { diff --git a/crates/core/src/subscription/execution_unit.rs b/crates/core/src/subscription/execution_unit.rs index 48c50c3e1d..c15b9f688f 100644 --- a/crates/core/src/subscription/execution_unit.rs +++ b/crates/core/src/subscription/execution_unit.rs @@ -295,6 +295,9 @@ impl ExecutionUnit { Self::collect_rows(&mut deletes, query); } } + + let deletes = deletes.into(); + let inserts = inserts.into(); UpdatesRelValue { deletes, inserts } } diff --git a/crates/core/src/subscription/query.rs b/crates/core/src/subscription/query.rs index 86b4de2247..a0090980b7 100644 --- a/crates/core/src/subscription/query.rs +++ b/crates/core/src/subscription/query.rs @@ -275,12 +275,12 @@ mod tests { let result = result .tables - .into_iter() - .map(|u| u.updates) + .iter() + .map(|u| &u.updates) .flat_map(|u| { u.deletes .iter() - .chain(&u.inserts) + .chain(&*u.inserts) .map(|rv| rv.clone().into_product_value()) .collect::>() }) @@ -729,14 +729,14 @@ mod tests { let result = query.eval_incr_for_test(ctx, db, &tx, &update, None); let tables = result .tables - .into_iter() + .iter() .map(|update| { - let convert = |rvs: Vec<_>| rvs.into_iter().map(RelValue::into_product_value).collect(); + let convert = |rvs: &[_]| rvs.iter().cloned().map(RelValue::into_product_value).collect(); DatabaseTableUpdate { table_id: update.table_id, - table_name: update.table_name, - deletes: convert(update.updates.deletes), - inserts: convert(update.updates.inserts), + table_name: update.table_name.clone(), + deletes: convert(&update.updates.deletes), + inserts: convert(&update.updates.inserts), } }) .collect(); diff --git a/crates/core/src/subscription/subscription.rs b/crates/core/src/subscription/subscription.rs index 666aa55fe4..1cdcf9607d 100644 --- a/crates/core/src/subscription/subscription.rs +++ b/crates/core/src/subscription/subscription.rs @@ -481,6 +481,8 @@ impl IncrementalJoin { } inserts.extend(join_5); + let deletes = deletes.into(); + let inserts = inserts.into(); UpdatesRelValue { deletes, inserts } } } @@ -554,6 +556,8 @@ impl ExecutionSet { tables.push(table); } } + + let tables = tables.into(); DatabaseUpdateRelValue { tables } }