mirror of
https://github.com/clockworklabs/SpacetimeDB.git
synced 2026-06-28 00:38:30 -04:00
2af138fb8f
`read_exact` doesn't distinguish between hitting EOF and there not being enough bytes to fill the given buffer. We may thus consider a segment valid for resumption while leaving trailing bytes (fewer than the commit header length) in place. That can cause silent data loss: appending more data renders the segment corrupt, so a restart will then start a new segment, leaving everything after the trailing bytes unreachable. To fix this, truncate the segment to the size determined by `Metadata::extract` before resuming writes.

# Expected complexity level and risk

2

# Testing

Added a test
258 lines
8.0 KiB
Rust
258 lines
8.0 KiB
Rust
use std::io::Write;
|
|
|
|
use log::info;
|
|
use spacetimedb_commitlog::repo::{Repo, SegmentLen};
|
|
use spacetimedb_commitlog::tests::helpers::enable_logging;
|
|
use spacetimedb_commitlog::{commit, commitlog, payload, repo, Commitlog, Options};
|
|
use spacetimedb_paths::server::CommitLogDir;
|
|
use spacetimedb_paths::FromPathUnchecked;
|
|
use tempfile::tempdir;
|
|
|
|
pub fn gen_payload() -> [u8; 256] {
|
|
rand::random()
|
|
}
|
|
|
|
/// Writes `n_txs` single-transaction commits and syncs them. Then checks that
/// the reported durable offset, the number of readable transactions and the
/// number of readable commits all agree.
#[test]
fn smoke() {
    let root = tempdir().unwrap();
    let dir = CommitLogDir::from_path_unchecked(root.path());
    let options = Options {
        max_segment_size: 8 * 1024,
        ..Options::default()
    };
    let clog = Commitlog::open(dir, options, None).unwrap();

    let n_txs: u64 = 500;
    let payload = gen_payload();
    for tx_offset in 0..n_txs {
        clog.commit([(tx_offset, payload)]).unwrap();
    }
    let last_durable = clog.flush_and_sync().unwrap().unwrap();
    assert_eq!(n_txs - 1, last_durable);

    let expected = n_txs as usize;
    let tx_count = clog
        .transactions(&payload::ArrayDecoder)
        .map(Result::unwrap)
        .count();
    assert_eq!(expected, tx_count);

    // Every `commit` call above carries exactly one transaction, so the
    // number of commits is expected to match the number of transactions.
    let commit_count = clog.commits().map(Result::unwrap).count();
    assert_eq!(expected, commit_count);
}
|
|
|
|
/// Fills a small-segment commitlog with 50 transactions. It then resets the log
/// to ever-smaller offsets, checking after each reset that exactly the
/// transactions up to and including that offset remain visible.
#[test]
fn resets() {
    let root = tempdir().unwrap();
    let dir = CommitLogDir::from_path_unchecked(root.path());
    let options = Options {
        max_segment_size: 512,
        ..Options::default()
    };
    let mut clog = Commitlog::open(dir, options, None).unwrap();

    let payload = gen_payload();
    for tx_offset in 0..50 {
        clog.commit([(tx_offset, payload)]).unwrap();
    }
    clog.flush_and_sync().unwrap();

    for target in (0..50).rev() {
        clog = clog.reset_to(target).unwrap();

        let last_offset = clog
            .transactions(&payload::ArrayDecoder)
            .map(Result::unwrap)
            .last()
            .unwrap()
            .offset;
        assert_eq!(target, last_offset);

        // Offsets start at zero, so `target + 1` transactions should survive.
        let remaining = clog
            .transactions(&payload::ArrayDecoder)
            .map(Result::unwrap)
            .count() as u64;
        assert_eq!(target + 1, remaining);
    }
}
|
|
|
|
/// Try to generate commitlogs that will be amenable to compression -
|
|
/// random data doesn't compress well, so try and have there be repetition
|
|
fn compressible_payloads() -> impl Iterator<Item = [u8; 256]> {
|
|
(0..4).map(|_| gen_payload()).cycle()
|
|
}
|
|
|
|
/// Writes a compressible commitlog and compresses the first half of its
/// segments. Checks that the log shrank on disk and that every transaction is
/// still readable, in order and unchanged, with none missing.
#[test]
fn compression() {
    enable_logging();

    let root = tempdir().unwrap();
    let clog = Commitlog::open(
        CommitLogDir::from_path_unchecked(root.path()),
        Options {
            max_segment_size: 8 * 1024,
            ..Options::default()
        },
        None,
    )
    .unwrap();

    let payloads = compressible_payloads().take(1024).collect::<Vec<_>>();
    for (i, payload) in payloads.iter().enumerate() {
        clog.commit([(i as u64, *payload)]).unwrap();
    }
    clog.flush_and_sync().unwrap();

    let uncompressed_size = clog.size_on_disk().unwrap();

    let segments = clog.existing_segment_offsets().unwrap();
    let segments_to_compress = &segments[..segments.len() / 2];
    // With fewer than two segments nothing would be compressed. The size
    // comparison below would then fail for an unrelated reason, so fail
    // here with a clear message instead.
    assert!(
        !segments_to_compress.is_empty(),
        "expected at least two segments, got {segments:?}"
    );
    info!("segments: {segments:?} compressing: {segments_to_compress:?}");
    clog.compress_segments(segments_to_compress).unwrap();

    let compressed_size = clog.size_on_disk().unwrap();
    assert!(compressed_size.total_bytes < uncompressed_size.total_bytes);

    // Check every transaction individually *and* the total count.
    // `Iterator::all` alone is vacuously true for a truncated (or empty) log,
    // so transactions lost during compression would go unnoticed.
    let mut read = 0;
    for (i, tx) in clog.transactions(&payload::ArrayDecoder).map(Result::unwrap).enumerate() {
        assert_eq!(i as u64, tx.offset, "unexpected offset at position {i}");
        assert_eq!(payloads[i], tx.txdata, "payload mismatch at offset {i}");
        read += 1;
    }
    assert_eq!(payloads.len(), read, "transactions missing after compression");
}
|
|
|
|
/// When restoring an archived commitlog, all segments are compressed and
/// should remain immutable.
///
/// Verifies that this holds: resuming writes must open a brand-new segment
/// rather than appending to one of the compressed ones.
#[test]
fn all_segments_sealed() {
    enable_logging();

    let root = tempdir().unwrap();
    let repo = repo::Fs::new(CommitLogDir::from_path_unchecked(root.path()), None).unwrap();
    let opts = Options {
        max_segment_size: 64 * 1024,
        ..Options::default()
    };
    let num_commits = 1024;

    // Scope the writer so it is dropped before the segments are compressed.
    {
        let mut clog = commitlog::Generic::open(&repo, opts).unwrap();
        let numbered = compressible_payloads().take(num_commits).enumerate();
        for (offset, payload) in numbered {
            clog.commit([(offset as u64, payload)]).unwrap();
        }
        clog.flush().unwrap();
        clog.sync();
    }

    let sealed = repo.existing_offsets().unwrap();
    let num_sealed = sealed.len();

    // Go through the `repo` directly; the commitlog itself asserts that the
    // head segment is never compressed.
    for offset in sealed {
        repo.compress_segment(offset).unwrap();
    }

    // Re-opening the commitlog should create a fresh segment at offset `num_commits`.
    let _ = commitlog::Generic::<_, [u8; 256]>::open(&repo, opts).unwrap();
    let after = repo.existing_offsets().unwrap();
    assert_eq!(num_sealed + 1, after.len());
    assert_eq!(after.last().copied(), Some(num_commits as u64));
}
|
|
|
|
/// Empties segments one at a time, newest first, re-opening the commitlog
/// after each truncation. While older segments exist, the existing offsets
/// must be exactly those older segments. Once every segment has been emptied,
/// only offset 0 must remain.
#[test]
fn resume_empty_segment() {
    enable_logging();

    let root = tempdir().unwrap();
    let repo = repo::Fs::new(CommitLogDir::from_path_unchecked(root.path()), None).unwrap();
    let opts = Options {
        max_segment_size: 64 * 1024,
        ..Options::default()
    };
    let num_commits = 1024;

    {
        let mut clog = commitlog::Generic::open(&repo, opts).unwrap();
        for (offset, payload) in compressible_payloads().take(num_commits).enumerate() {
            clog.commit([(offset as u64, payload)]).unwrap();
        }
        clog.flush().unwrap();
        clog.sync();
    }

    let initial = repo.existing_offsets().unwrap();
    for (idx, &newest) in initial.iter().enumerate().rev() {
        repo.open_segment_writer(newest).unwrap().set_len(0).unwrap();

        let _ = commitlog::Generic::<_, [u8; 256]>::open(&repo, opts).unwrap();
        let reopened = repo.existing_offsets().unwrap();

        // Segments older than the one just emptied.
        let older = &initial[..idx];
        if older.is_empty() {
            assert_eq!([0], reopened.as_slice());
        } else {
            assert_eq!(older, reopened.as_slice());
        }
    }
}
|
|
|
|
/// Checks that resuming a segment whose tail holds fewer bytes than a
/// commitlog header truncates those trailing bytes away before new commits
/// are appended.
///
/// Regression test for https://github.com/clockworklabs/SpacetimeDB/pull/5116
#[test]
fn resume_small_trailing_garbage() {
    enable_logging();

    let root = tempdir().unwrap();
    let repo = repo::Fs::new(CommitLogDir::from_path_unchecked(root.path()), None).unwrap();
    let batch = 1024;

    // Populate the log, flushing and syncing after every commit.
    {
        let mut clog = commitlog::Generic::open(&repo, <_>::default()).unwrap();
        for (offset, payload) in compressible_payloads().take(batch).enumerate() {
            clog.commit([(offset as u64, payload)]).unwrap();
            clog.flush().unwrap();
            clog.sync();
        }
    }

    // Append garbage one byte shorter than a commit header to the newest segment.
    let garbage = [67u8; commit::Header::LEN - 1];
    let size_with_garbage = {
        let newest = repo.existing_offsets().unwrap().last().copied().unwrap();
        let mut writer = repo.open_segment_writer(newest).unwrap();
        writer.write_all(&garbage).unwrap();
        writer.flush().unwrap();
        writer.sync_all().unwrap();
        writer.segment_len().unwrap()
    };

    {
        let mut clog = commitlog::Generic::open(&repo, <_>::default()).unwrap();

        // Re-opening the log must have truncated the garbage away.
        let newest = repo.existing_offsets().unwrap().last().copied().unwrap();
        let mut writer = repo.open_segment_writer(newest).unwrap();
        assert_eq!(writer.segment_len().unwrap(), size_with_garbage - garbage.len() as u64);

        // Resume writing, with offsets continuing right after the first batch.
        for (offset, payload) in (batch..).zip(compressible_payloads().take(batch)) {
            clog.commit([(offset as u64, payload)]).unwrap();
            clog.flush().unwrap();
            clog.sync();
        }

        assert_eq!(2 * batch, clog.commits_from(0).map(Result::unwrap).count());
    }
}
|