mirror of
https://github.com/clockworklabs/SpacetimeDB.git
synced 2026-05-13 19:27:46 -04:00
cfd0d4b712
When a new commitlog segment is created, allocate disk space for it up to the maximum segment size. Also do this when resuming writes to an existing segment, such that segments created without preallocation will allocate as well when the database is opened. Preallocation is gated behind the feature "fallocate", because it is not always desirable to preallocate, e.g. for local `standalone` users. The feature can only be enabled on Linux targets, because allocation is done using the Linux-specific `fallocate(2)` system call. Unlike `ftruncate(2)` or the portable `posix_fallocate(3)`, `fallocate(2)` supports allocating disk space without zeroing. This is currently required, because the commitlog format does not handle padding bytes. If not enough space can be allocated, the commitlog refuses writes. For commitlogs that were created without preallocation, this means that the commitlog cannot even be opened in this situation. The local durability impl will crash if it detects that the commitlog is unable to allocate enough space. This means that a database will eventually crash and be unable to start in an out-of-space situation. Allocated space is not included in the reported size of the commitlog. Instead, allocated blocks are reported separately. # Expected complexity level and risk 3 - Disk size monitoring may need to be adjusted. # Testing - [x] Adds a test that demonstrates the crash behavior of [`spacetimedb_durability::Local`] when there is insufficient space. The test performs I/O against a loop device. - [x] Modified the `repo::Memory` impl so that it can run out of space. No test currently utilizes this, but existing tests assuming infinite space still pass.
122 lines · 3.5 KiB · Rust
use std::num::NonZeroU16;
|
|
|
|
use log::info;
|
|
use spacetimedb_commitlog::tests::helpers::enable_logging;
|
|
use spacetimedb_commitlog::{payload, Commitlog, Options};
|
|
use spacetimedb_paths::server::CommitLogDir;
|
|
use spacetimedb_paths::FromPathUnchecked;
|
|
use tempfile::tempdir;
|
|
|
|
pub fn gen_payload() -> [u8; 256] {
|
|
rand::random()
|
|
}
|
|
|
|
/// Basic round-trip: append a batch of transactions, sync, and verify
/// that the reported durable offset and the replayed transaction /
/// commit counts all line up.
#[test]
fn smoke() {
    let root = tempdir().unwrap();
    let options = Options {
        max_segment_size: 8 * 1024,
        max_records_in_commit: NonZeroU16::MIN,
        ..Options::default()
    };
    let clog = Commitlog::open(CommitLogDir::from_path_unchecked(root.path()), options, None).unwrap();

    let n_txs = 500;
    let payload = gen_payload();
    (0..n_txs).for_each(|_| clog.append_maybe_flush(payload).unwrap());
    let committed_offset = clog.flush_and_sync().unwrap();

    // Offsets are zero-based, so the durable offset is one less than the
    // number of appended transactions.
    assert_eq!(n_txs - 1, committed_offset.unwrap() as usize);
    let replayed = clog.transactions(&payload::ArrayDecoder).map(Result::unwrap).count();
    assert_eq!(n_txs, replayed);
    // We set max_records_in_commit to 1, so n_commits == n_txs
    assert_eq!(n_txs, clog.commits().map(Result::unwrap).count());
}
|
|
|
|
/// Repeatedly truncate the log via `reset_to`, walking the offset back
/// one step at a time, and verify after each reset that the last
/// replayable transaction and the total count match the reset point.
#[test]
fn resets() {
    let root = tempdir().unwrap();
    let options = Options {
        // Tiny segments so the 50 appends span several files.
        max_segment_size: 512,
        max_records_in_commit: NonZeroU16::MIN,
        ..Options::default()
    };
    let mut clog = Commitlog::open(CommitLogDir::from_path_unchecked(root.path()), options, None).unwrap();

    let payload = gen_payload();
    for _ in 0..50 {
        clog.append_maybe_flush(payload).unwrap();
    }
    clog.flush_and_sync().unwrap();

    let mut offset = 50;
    while offset > 0 {
        offset -= 1;
        clog = clog.reset_to(offset).unwrap();
        let last = clog
            .transactions(&payload::ArrayDecoder)
            .map(Result::unwrap)
            .last()
            .unwrap();
        assert_eq!(offset, last.offset);
        // We're counting from zero, so offset + 1 is the # of txs.
        let remaining = clog.transactions(&payload::ArrayDecoder).map(Result::unwrap).count() as u64;
        assert_eq!(offset + 1, remaining);
    }
}
|
|
|
|
/// Compress half of the existing segments and verify that (a) the
/// on-disk size shrinks and (b) replaying the log still yields every
/// payload in order, across the compressed/uncompressed boundary.
#[test]
fn compression() {
    enable_logging();

    let root = tempdir().unwrap();
    let options = Options {
        max_segment_size: 8 * 1024,
        max_records_in_commit: NonZeroU16::MIN,
        ..Options::default()
    };
    let clog = Commitlog::open(CommitLogDir::from_path_unchecked(root.path()), options, None).unwrap();

    // try to generate commitlogs that will be amenable to compression -
    // random data doesn't compress well, so try and have there be repetition
    let payloads: Vec<_> = (0..4).map(|_| gen_payload()).cycle().take(1024).collect();
    for payload in payloads.iter() {
        clog.append_maybe_flush(*payload).unwrap();
    }
    clog.flush_and_sync().unwrap();

    let uncompressed_size = clog.size_on_disk().unwrap();

    let segments = clog.existing_segment_offsets().unwrap();
    let (segments_to_compress, _) = segments.split_at(segments.len() / 2);
    info!("segments: {segments:?} compressing: {segments_to_compress:?}");
    clog.compress_segments(segments_to_compress).unwrap();

    let compressed_size = clog.size_on_disk().unwrap();
    assert!(compressed_size.total_bytes < uncompressed_size.total_bytes);

    let all_intact = clog
        .transactions(&payload::ArrayDecoder)
        .map(Result::unwrap)
        .enumerate()
        .all(|(i, tx)| tx.offset == i as u64 && tx.txdata == payloads[i]);
    assert!(all_intact);
}
|