mirror of
https://github.com/clockworklabs/SpacetimeDB.git
synced 2026-05-15 12:18:01 -04:00
e77b62f475
# Description of Changes We've run into a problem on Maincloud caused by a database that was writing a relatively small number of very large transactions. This was accruing many commitlog segments consuming hundreds of gigabytes of disk, but had never taken a snapshot, or compressed or archived any data, as the database had not progressed past one million transactions. With this PR, we take a snapshot every time the commitlog segment rotates. We still also snapshot every million transactions. One BitCraft database we looked at had 2.5 million transactions per commitlog segment, meaning that this change will not meaningfully affect the frequency of snapshots. The offending Maincloud database, however, had only 50 transactions per segment! # API and ABI breaking changes N/A # Expected complexity level and risk 3: Hastily made changes to finicky code across several crates. # Testing I am unsure how to test these changes. - [ ] <!-- maybe a test you want to do --> - [ ] <!-- maybe a test you want a reviewer to do, so they can check it off when they're satisfied. -->
116 lines
3.2 KiB
Rust
116 lines
3.2 KiB
Rust
use std::num::NonZeroU16;
|
|
|
|
use spacetimedb_commitlog::{payload, Commitlog, Options};
|
|
use spacetimedb_paths::server::CommitLogDir;
|
|
use spacetimedb_paths::FromPathUnchecked;
|
|
use tempfile::tempdir;
|
|
|
|
pub fn gen_payload() -> [u8; 256] {
|
|
rand::random()
|
|
}
|
|
|
|
#[test]
fn smoke() {
    // Basic round-trip: append a batch of transactions, sync, and verify
    // that offsets and counts line up.
    let tmp = tempdir().unwrap();
    let opts = Options {
        max_segment_size: 8 * 1024,
        max_records_in_commit: NonZeroU16::MIN,
        ..Options::default()
    };
    let log = Commitlog::open(CommitLogDir::from_path_unchecked(tmp.path()), opts, None).unwrap();

    let total = 500;
    let payload = gen_payload();
    (0..total).for_each(|_| log.append_maybe_flush(payload).unwrap());
    let committed = log.flush_and_sync().unwrap();

    // Offsets are zero-based, so the last committed offset is total - 1.
    assert_eq!(total - 1, committed.unwrap() as usize);
    // Every appended transaction must be readable back out of the log.
    assert_eq!(
        total,
        log.transactions(&payload::ArrayDecoder).map(Result::unwrap).count()
    );
    // max_records_in_commit is 1, so the commit count equals the tx count.
    assert_eq!(total, log.commits().map(Result::unwrap).count());
}
|
|
|
|
#[test]
fn resets() {
    // Repeatedly truncate the log to ever-smaller offsets and check that
    // the visible range of transactions shrinks accordingly.
    let tmp = tempdir().unwrap();
    let mut log = Commitlog::open(
        CommitLogDir::from_path_unchecked(tmp.path()),
        Options {
            max_segment_size: 512,
            max_records_in_commit: NonZeroU16::MIN,
            ..Options::default()
        },
        None,
    )
    .unwrap();

    let payload = gen_payload();
    for _ in 0..50 {
        log.append_maybe_flush(payload).unwrap();
    }
    log.flush_and_sync().unwrap();

    for target in (0..50).rev() {
        log = log.reset_to(target).unwrap();
        let last_tx = log
            .transactions(&payload::ArrayDecoder)
            .map(Result::unwrap)
            .last()
            .unwrap();
        // After resetting, the highest surviving offset is exactly `target`.
        assert_eq!(target, last_tx.offset);
        // Offsets count from zero, so `target + 1` transactions remain.
        let remaining = log.transactions(&payload::ArrayDecoder).map(Result::unwrap).count() as u64;
        assert_eq!(target + 1, remaining);
    }
}
|
|
|
|
#[test]
fn compression() {
    // Verify that compressing segments shrinks on-disk size without
    // affecting what can be read back.
    let tmp = tempdir().unwrap();
    let log = Commitlog::open(
        CommitLogDir::from_path_unchecked(tmp.path()),
        Options {
            max_segment_size: 8 * 1024,
            max_records_in_commit: NonZeroU16::MIN,
            ..Options::default()
        },
        None,
    )
    .unwrap();

    // Random bytes barely compress, so cycle a small set of payloads to
    // guarantee repetition the compressor can exploit.
    let payloads: Vec<_> = (0..4).map(|_| gen_payload()).cycle().take(1024).collect();
    for p in &payloads {
        log.append_maybe_flush(*p).unwrap();
    }
    log.flush_and_sync().unwrap();

    let size_before = log.size_on_disk().unwrap();

    // Compress only the early segments (those starting below offset 20).
    let mut targets = log.existing_segment_offsets().unwrap();
    targets.retain(|&off| off < 20);
    log.compress_segments(&targets).unwrap();

    // Disk usage must drop once segments are compressed.
    assert!(log.size_on_disk().unwrap() < size_before);

    // Reads spanning compressed segments must still yield every payload,
    // in order, with correct offsets.
    assert!(log
        .transactions(&payload::ArrayDecoder)
        .map(Result::unwrap)
        .enumerate()
        .all(|(i, tx)| tx.offset == i as u64 && tx.txdata == payloads[i]));
}
|