box not vec & extract brotli compression

This commit is contained in:
Mazdak Farrokhzad
2024-09-23 20:00:04 +02:00
parent f357563c2a
commit e5148b97ea
5 changed files with 54 additions and 42 deletions
+36 -31
View File
@@ -33,41 +33,46 @@ pub fn serialize(msg: impl ToProtocol<Encoded = SwitchedServerMessage>, protocol
FormatSwitch::Json(msg) => serde_json::to_string(&SerializeWrapper::new(msg)).unwrap().into(),
FormatSwitch::Bsatn(msg) => {
let msg_bytes = bsatn::to_vec(&msg).unwrap();
let reader = &mut &msg_bytes[..];
// TODO(perf): Compression should depend on message size and type.
//
// SubscriptionUpdate messages will typically be quite large,
// while TransactionUpdate messages will typically be quite small.
//
// If we are optimizing for SubscriptionUpdates,
// we want a large buffer.
// But if we are optimizing for TransactionUpdates,
// we probably want to skip compression altogether.
//
// For now we choose a reasonable middle ground,
// which is to compress everything using a 32KB buffer.
const BUFFER_SIZE: usize = 32 * 1024;
// Again we are optimizing for compression speed,
// so we choose the lowest (fastest) level of compression.
// Experiments on internal workloads have shown compression ratios between 7:1 and 10:1
// for large `SubscriptionUpdate` messages at this level.
const COMPRESSION_LEVEL: u32 = 1;
// The default value for an internal compression parameter.
// See `BrotliEncoderParams` for more details.
const LG_WIN: u32 = 22;
let mut encoder = CompressorReader::new(reader, BUFFER_SIZE, COMPRESSION_LEVEL, LG_WIN);
let mut out = Vec::new();
encoder
.read_to_end(&mut out)
.expect("Failed to Brotli compress `SubscriptionUpdateMessage`");
out.into()
let msg_bytes = brotli_compress(&msg_bytes);
msg_bytes.into()
}
}
}
fn brotli_compress(bytes: &[u8]) -> Vec<u8> {
let reader = &mut &bytes[..];
// TODO(perf): Compression should depend on message size and type.
//
// SubscriptionUpdate messages will typically be quite large,
// while TransactionUpdate messages will typically be quite small.
//
// If we are optimizing for SubscriptionUpdates,
// we want a large buffer.
// But if we are optimizing for TransactionUpdates,
// we probably want to skip compression altogether.
//
// For now we choose a reasonable middle ground,
// which is to compress everything using a 32KB buffer.
const BUFFER_SIZE: usize = 32 * 1024;
// Again we are optimizing for compression speed,
// so we choose the lowest (fastest) level of compression.
// Experiments on internal workloads have shown compression ratios between 7:1 and 10:1
// for large `SubscriptionUpdate` messages at this level.
const COMPRESSION_LEVEL: u32 = 1;
// The default value for an internal compression parameter.
// See `BrotliEncoderParams` for more details.
const LG_WIN: u32 = 22;
let mut encoder = CompressorReader::new(reader, BUFFER_SIZE, COMPRESSION_LEVEL, LG_WIN);
let mut out = Vec::new();
encoder
.read_to_end(&mut out)
.expect("Failed to Brotli compress `SubscriptionUpdateMessage`");
out
}
#[derive(Debug, From)]
pub enum SerializableMessage {
Query(OneOffQueryResponseMessage),
+3 -3
View File
@@ -102,7 +102,7 @@ pub struct DatabaseTableUpdate {
#[derive(Debug)]
pub struct DatabaseUpdateRelValue<'a> {
pub tables: Vec<DatabaseTableUpdateRelValue<'a>>,
pub tables: Box<[DatabaseTableUpdateRelValue<'a>]>,
}
#[derive(PartialEq, Debug)]
@@ -114,8 +114,8 @@ pub struct DatabaseTableUpdateRelValue<'a> {
#[derive(Default, PartialEq, Debug)]
pub struct UpdatesRelValue<'a> {
pub deletes: Vec<RelValue<'a>>,
pub inserts: Vec<RelValue<'a>>,
pub deletes: Box<[RelValue<'a>]>,
pub inserts: Box<[RelValue<'a>]>,
}
impl UpdatesRelValue<'_> {
@@ -295,6 +295,9 @@ impl ExecutionUnit {
Self::collect_rows(&mut deletes, query);
}
}
let deletes = deletes.into();
let inserts = inserts.into();
UpdatesRelValue { deletes, inserts }
}
+8 -8
View File
@@ -275,12 +275,12 @@ mod tests {
let result = result
.tables
.into_iter()
.map(|u| u.updates)
.iter()
.map(|u| &u.updates)
.flat_map(|u| {
u.deletes
.iter()
.chain(&u.inserts)
.chain(&*u.inserts)
.map(|rv| rv.clone().into_product_value())
.collect::<Vec<_>>()
})
@@ -729,14 +729,14 @@ mod tests {
let result = query.eval_incr_for_test(ctx, db, &tx, &update, None);
let tables = result
.tables
.into_iter()
.iter()
.map(|update| {
let convert = |rvs: Vec<_>| rvs.into_iter().map(RelValue::into_product_value).collect();
let convert = |rvs: &[_]| rvs.iter().cloned().map(RelValue::into_product_value).collect();
DatabaseTableUpdate {
table_id: update.table_id,
table_name: update.table_name,
deletes: convert(update.updates.deletes),
inserts: convert(update.updates.inserts),
table_name: update.table_name.clone(),
deletes: convert(&update.updates.deletes),
inserts: convert(&update.updates.inserts),
}
})
.collect();
@@ -481,6 +481,8 @@ impl IncrementalJoin {
}
inserts.extend(join_5);
let deletes = deletes.into();
let inserts = inserts.into();
UpdatesRelValue { deletes, inserts }
}
}
@@ -554,6 +556,8 @@ impl ExecutionSet {
tables.push(table);
}
}
let tables = tables.into();
DatabaseUpdateRelValue { tables }
}