mirror of
https://github.com/clockworklabs/SpacetimeDB.git
synced 2026-05-06 07:26:43 -04:00
commitlog: Introduce epoch (#1851)
This commit is contained in:
@@ -5,3 +5,4 @@
|
||||
# It is recommended to check this file in to source control so that
|
||||
# everyone who runs the test benefits from these saved cases.
|
||||
cc a224c9559a4f825676852b58397b59027a14561c8bd9439b52691234fab848de # shrinks to inputs = Inputs { byte_pos: 354, bit_mask: 205, segment_offset: 30 }
|
||||
cc a62542123f6c7a5c747cdf8d64246d93b1ba55e53f207dd0827d3bc65442cb35 # shrinks to inputs = Inputs { byte_pos: 25, bit_mask: 1, segment_offset: 0 }
|
||||
|
||||
@@ -6,16 +6,27 @@ use std::{
|
||||
use crc32c::{Crc32cReader, Crc32cWriter};
|
||||
use spacetimedb_sats::buffer::{BufReader, Cursor, DecodeError};
|
||||
|
||||
use crate::{error::ChecksumMismatch, payload::Decoder, segment::CHECKSUM_ALGORITHM_CRC32C, Transaction};
|
||||
use crate::{
|
||||
error::ChecksumMismatch, payload::Decoder, segment::CHECKSUM_ALGORITHM_CRC32C, Transaction,
|
||||
DEFAULT_LOG_FORMAT_VERSION,
|
||||
};
|
||||
|
||||
#[derive(Default)]
|
||||
enum Version {
|
||||
V0,
|
||||
#[default]
|
||||
V1,
|
||||
}
|
||||
|
||||
pub struct Header {
|
||||
pub min_tx_offset: u64,
|
||||
pub epoch: u64,
|
||||
pub n: u16,
|
||||
pub len: u32,
|
||||
}
|
||||
|
||||
impl Header {
|
||||
pub const LEN: usize = /* offset */ 8 + /* n */ 2 + /* len */ 4;
|
||||
pub const LEN: usize = /* offset */ 8 + /* epoch */ 8 + /* n */ 2 + /* len */ 4;
|
||||
|
||||
/// Read [`Self::LEN`] bytes from `reader` and interpret them as the
|
||||
/// "header" of a [`Commit`].
|
||||
@@ -30,8 +41,20 @@ impl Header {
|
||||
///
|
||||
/// This is to allow preallocation of segments.
|
||||
///
|
||||
pub fn decode<R: Read>(mut reader: R) -> io::Result<Option<Self>> {
|
||||
let mut hdr = [0; Self::LEN];
|
||||
pub fn decode<R: Read>(reader: R) -> io::Result<Option<Self>> {
|
||||
Self::decode_v1(reader)
|
||||
}
|
||||
|
||||
fn decode_internal<R: Read>(reader: R, v: Version) -> io::Result<Option<Self>> {
|
||||
use Version::*;
|
||||
match v {
|
||||
V0 => Self::decode_v0(reader),
|
||||
V1 => Self::decode_v1(reader),
|
||||
}
|
||||
}
|
||||
|
||||
fn decode_v0<R: Read>(mut reader: R) -> io::Result<Option<Self>> {
|
||||
let mut hdr = [0; Self::LEN - 8];
|
||||
if let Err(e) = reader.read_exact(&mut hdr) {
|
||||
if e.kind() == io::ErrorKind::UnexpectedEof {
|
||||
return Ok(None);
|
||||
@@ -46,7 +69,39 @@ impl Header {
|
||||
let n = buf.get_u16().map_err(decode_error)?;
|
||||
let len = buf.get_u32().map_err(decode_error)?;
|
||||
|
||||
Ok(Some(Self { min_tx_offset, n, len }))
|
||||
Ok(Some(Self {
|
||||
min_tx_offset,
|
||||
epoch: Commit::DEFAULT_EPOCH,
|
||||
n,
|
||||
len,
|
||||
}))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn decode_v1<R: Read>(mut reader: R) -> io::Result<Option<Self>> {
|
||||
let mut hdr = [0; Self::LEN];
|
||||
if let Err(e) = reader.read_exact(&mut hdr) {
|
||||
if e.kind() == io::ErrorKind::UnexpectedEof {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
return Err(e);
|
||||
}
|
||||
match &mut hdr.as_slice() {
|
||||
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] => Ok(None),
|
||||
buf => {
|
||||
let min_tx_offset = buf.get_u64().map_err(decode_error)?;
|
||||
let epoch = buf.get_u64().map_err(decode_error)?;
|
||||
let n = buf.get_u16().map_err(decode_error)?;
|
||||
let len = buf.get_u32().map_err(decode_error)?;
|
||||
|
||||
Ok(Some(Self {
|
||||
min_tx_offset,
|
||||
epoch,
|
||||
n,
|
||||
len,
|
||||
}))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -60,6 +115,18 @@ pub struct Commit {
|
||||
/// The offset starts from zero and is counted from the beginning of the
|
||||
/// entire log.
|
||||
pub min_tx_offset: u64,
|
||||
/// The epoch within which the commit was created.
|
||||
///
|
||||
/// Indicates the monotonically increasing term number of the leader when
|
||||
/// the commitlog is being written to in a distributed deployment.
|
||||
///
|
||||
/// The default epoch is 0 (zero). It should be used when the log is written
|
||||
/// to by a single process.
|
||||
///
|
||||
/// Note, however, that an existing log may have a non-zero epoch.
|
||||
/// It is currently unspecified how a commitlog is transitioned between
|
||||
/// distributed and single-node deployment, wrt the epoch.
|
||||
pub epoch: u64,
|
||||
/// The number of records in the commit.
|
||||
pub n: u16,
|
||||
/// A buffer of all records in the commit in serialized form.
|
||||
@@ -70,6 +137,8 @@ pub struct Commit {
|
||||
}
|
||||
|
||||
impl Commit {
|
||||
pub const DEFAULT_EPOCH: u64 = 0;
|
||||
|
||||
pub const FRAMING_LEN: usize = Header::LEN + /* crc32 */ 4;
|
||||
pub const CHECKSUM_ALGORITHM: u8 = CHECKSUM_ALGORITHM_CRC32C;
|
||||
|
||||
@@ -90,10 +159,12 @@ impl Commit {
|
||||
let mut out = Crc32cWriter::new(out);
|
||||
|
||||
let min_tx_offset = self.min_tx_offset.to_le_bytes();
|
||||
let epoch = self.epoch.to_le_bytes();
|
||||
let n = self.n.to_le_bytes();
|
||||
let len = (self.records.len() as u32).to_le_bytes();
|
||||
|
||||
out.write_all(&min_tx_offset)?;
|
||||
out.write_all(&epoch)?;
|
||||
out.write_all(&n)?;
|
||||
out.write_all(&len)?;
|
||||
out.write_all(&self.records)?;
|
||||
@@ -173,6 +244,7 @@ impl From<StoredCommit> for Commit {
|
||||
fn from(
|
||||
StoredCommit {
|
||||
min_tx_offset,
|
||||
epoch,
|
||||
n,
|
||||
records,
|
||||
checksum: _,
|
||||
@@ -180,6 +252,7 @@ impl From<StoredCommit> for Commit {
|
||||
) -> Self {
|
||||
Self {
|
||||
min_tx_offset,
|
||||
epoch,
|
||||
n,
|
||||
records,
|
||||
}
|
||||
@@ -194,6 +267,8 @@ impl From<StoredCommit> for Commit {
|
||||
pub struct StoredCommit {
|
||||
/// See [`Commit::min_tx_offset`].
|
||||
pub min_tx_offset: u64,
|
||||
/// See [`Commit::epoch`].
|
||||
pub epoch: u64,
|
||||
/// See [`Commit::n`].
|
||||
pub n: u16,
|
||||
/// See [`Commit::records`].
|
||||
@@ -216,9 +291,18 @@ impl StoredCommit {
|
||||
/// kind [`io::ErrorKind::InvalidData`] with an inner error downcastable to
|
||||
/// [`ChecksumMismatch`] is returned.
|
||||
pub fn decode<R: Read>(reader: R) -> io::Result<Option<Self>> {
|
||||
Self::decode_internal(reader, DEFAULT_LOG_FORMAT_VERSION)
|
||||
}
|
||||
|
||||
pub(crate) fn decode_internal<R: Read>(reader: R, log_format_version: u8) -> io::Result<Option<Self>> {
|
||||
let mut reader = Crc32cReader::new(reader);
|
||||
|
||||
let Some(hdr) = Header::decode(&mut reader)? else {
|
||||
let v = if log_format_version == 0 {
|
||||
Version::V0
|
||||
} else {
|
||||
Version::V1
|
||||
};
|
||||
let Some(hdr) = Header::decode_internal(&mut reader, v)? else {
|
||||
return Ok(None);
|
||||
};
|
||||
let mut records = vec![0; hdr.len as usize];
|
||||
@@ -233,6 +317,7 @@ impl StoredCommit {
|
||||
|
||||
Ok(Some(Self {
|
||||
min_tx_offset: hdr.min_tx_offset,
|
||||
epoch: hdr.epoch,
|
||||
n: hdr.n,
|
||||
records,
|
||||
checksum: crc,
|
||||
@@ -258,6 +343,7 @@ impl StoredCommit {
|
||||
pub struct Metadata {
|
||||
pub tx_range: Range<u64>,
|
||||
pub size_in_bytes: u64,
|
||||
pub epoch: u64,
|
||||
}
|
||||
|
||||
impl Metadata {
|
||||
@@ -275,6 +361,7 @@ impl From<Commit> for Metadata {
|
||||
Self {
|
||||
tx_range: commit.tx_range(),
|
||||
size_in_bytes: commit.encoded_len() as u64,
|
||||
epoch: commit.epoch,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -312,6 +399,7 @@ mod tests {
|
||||
min_tx_offset: 0,
|
||||
n: 3,
|
||||
records,
|
||||
epoch: Commit::DEFAULT_EPOCH,
|
||||
};
|
||||
|
||||
let mut buf = Vec::with_capacity(commit.encoded_len());
|
||||
@@ -329,6 +417,7 @@ mod tests {
|
||||
min_tx_offset: 0,
|
||||
n: 4,
|
||||
records: vec![0; 128],
|
||||
epoch: Commit::DEFAULT_EPOCH,
|
||||
};
|
||||
|
||||
let txs = commit
|
||||
@@ -358,6 +447,7 @@ mod tests {
|
||||
min_tx_offset: 42,
|
||||
n: 10,
|
||||
records: vec![1; 512],
|
||||
epoch: Commit::DEFAULT_EPOCH,
|
||||
};
|
||||
|
||||
let mut buf = Vec::with_capacity(commit.encoded_len());
|
||||
|
||||
@@ -55,11 +55,11 @@ impl<R: Repo, T> Generic<R, T> {
|
||||
debug!("resuming last segment: {last}");
|
||||
repo::resume_segment_writer(&repo, opts, last)?.or_else(|meta| {
|
||||
tail.push(meta.tx_range.start);
|
||||
repo::create_segment_writer(&repo, opts, meta.tx_range.end)
|
||||
repo::create_segment_writer(&repo, opts, meta.max_epoch, meta.tx_range.end)
|
||||
})?
|
||||
} else {
|
||||
debug!("starting fresh log");
|
||||
repo::create_segment_writer(&repo, opts, 0)?
|
||||
repo::create_segment_writer(&repo, opts, Commit::DEFAULT_EPOCH, 0)?
|
||||
};
|
||||
|
||||
Ok(Self {
|
||||
@@ -72,6 +72,43 @@ impl<R: Repo, T> Generic<R, T> {
|
||||
})
|
||||
}
|
||||
|
||||
/// Get the current epoch.
|
||||
///
|
||||
/// See also: [`Commit::epoch`].
|
||||
pub fn epoch(&self) -> u64 {
|
||||
self.head.commit.epoch
|
||||
}
|
||||
|
||||
/// Update the current epoch.
|
||||
///
|
||||
/// Calls [`Self::commit`] to flush all data of the previous epoch, and
|
||||
/// returns the result.
|
||||
///
|
||||
/// Does nothing if the given `epoch` is equal to the current epoch.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// If `epoch` is smaller than the current epoch, an error of kind
|
||||
/// [`io::ErrorKind::InvalidInput`] is returned.
|
||||
///
|
||||
/// Also see [`Self::commit`].
|
||||
pub fn set_epoch(&mut self, epoch: u64) -> io::Result<Option<Committed>> {
|
||||
use std::cmp::Ordering::*;
|
||||
|
||||
match self.head.epoch().cmp(&epoch) {
|
||||
Less => Err(io::Error::new(
|
||||
io::ErrorKind::InvalidInput,
|
||||
"new epoch is smaller than current epoch",
|
||||
)),
|
||||
Equal => Ok(None),
|
||||
Greater => {
|
||||
let res = self.commit()?;
|
||||
self.head.set_epoch(epoch);
|
||||
Ok(res)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Write the currently buffered data to storage and rotate segments as
|
||||
/// necessary.
|
||||
///
|
||||
@@ -254,7 +291,7 @@ impl<R: Repo, T> Generic<R, T> {
|
||||
self.head.next_tx_offset(),
|
||||
self.head.min_tx_offset()
|
||||
);
|
||||
let new = repo::create_segment_writer(&self.repo, self.opts, self.head.next_tx_offset())?;
|
||||
let new = repo::create_segment_writer(&self.repo, self.opts, self.head.epoch(), self.head.next_tx_offset())?;
|
||||
let old = mem::replace(&mut self.head, new);
|
||||
self.tail.push(old.min_tx_offset());
|
||||
self.head.commit = old.commit;
|
||||
@@ -821,6 +858,7 @@ mod tests {
|
||||
min_tx_offset: 0,
|
||||
n: 1,
|
||||
records: [43; 32].to_vec(),
|
||||
epoch: 0,
|
||||
};
|
||||
log.commit().unwrap();
|
||||
|
||||
|
||||
@@ -120,6 +120,35 @@ impl<T> Commitlog<T> {
|
||||
self.inner.read().unwrap().max_committed_offset()
|
||||
}
|
||||
|
||||
/// Get the current epoch.
|
||||
///
|
||||
/// See also: [`Commit::epoch`].
|
||||
pub fn epoch(&self) -> u64 {
|
||||
self.inner.read().unwrap().epoch()
|
||||
}
|
||||
|
||||
/// Update the current epoch.
|
||||
///
|
||||
/// Does nothing if the given `epoch` is equal to the current epoch.
|
||||
/// Otherwise flushes outstanding transactions to disk (equivalent to
|
||||
/// [`Self::flush`]) before updating the epoch.
|
||||
///
|
||||
/// Returns the maximum transaction offset written to disk. The offset is
|
||||
/// `None` if the log is empty and no data was pending to be flushed.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// If `epoch` is smaller than the current epoch, an error of kind
|
||||
/// [`io::ErrorKind::InvalidInput`] is returned.
|
||||
///
|
||||
/// Errors from the implicit flush are propagated.
|
||||
pub fn set_epoch(&self, epoch: u64) -> io::Result<Option<u64>> {
|
||||
let mut inner = self.inner.write().unwrap();
|
||||
inner.set_epoch(epoch)?;
|
||||
|
||||
Ok(inner.max_committed_offset())
|
||||
}
|
||||
|
||||
/// Sync all OS-buffered writes to disk.
|
||||
///
|
||||
/// Note that this does **not** write outstanding records to disk.
|
||||
|
||||
@@ -7,7 +7,7 @@ use thiserror::Error;
|
||||
use crate::{
|
||||
error,
|
||||
varint::{decode_varint, encode_varint},
|
||||
Encode, Varchar,
|
||||
Encode, Varchar, DEFAULT_LOG_FORMAT_VERSION,
|
||||
};
|
||||
|
||||
// Re-export so we get a hyperlink in rustdocs by default
|
||||
@@ -116,7 +116,7 @@ impl<T> Txdata<T> {
|
||||
}
|
||||
|
||||
impl<T: Encode> Txdata<T> {
|
||||
pub const VERSION: u8 = 0;
|
||||
pub const VERSION: u8 = DEFAULT_LOG_FORMAT_VERSION;
|
||||
|
||||
pub fn encode(&self, buf: &mut impl BufWriter) {
|
||||
let mut flags = Flags::empty();
|
||||
|
||||
@@ -95,7 +95,12 @@ fn create_offset_index_writer<R: Repo>(repo: &R, offset: u64, opts: Options) ->
|
||||
/// `log_format_version`.
|
||||
///
|
||||
/// If the segment already exists, [`io::ErrorKind::AlreadyExists`] is returned.
|
||||
pub fn create_segment_writer<R: Repo>(repo: &R, opts: Options, offset: u64) -> io::Result<Writer<R::Segment>> {
|
||||
pub fn create_segment_writer<R: Repo>(
|
||||
repo: &R,
|
||||
opts: Options,
|
||||
epoch: u64,
|
||||
offset: u64,
|
||||
) -> io::Result<Writer<R::Segment>> {
|
||||
let mut storage = repo.create_segment(offset)?;
|
||||
Header {
|
||||
log_format_version: opts.log_format_version,
|
||||
@@ -109,6 +114,7 @@ pub fn create_segment_writer<R: Repo>(repo: &R, opts: Options, offset: u64) -> i
|
||||
min_tx_offset: offset,
|
||||
n: 0,
|
||||
records: Vec::new(),
|
||||
epoch,
|
||||
},
|
||||
inner: io::BufWriter::new(storage),
|
||||
|
||||
@@ -146,6 +152,7 @@ pub fn resume_segment_writer<R: Repo>(
|
||||
header,
|
||||
tx_range,
|
||||
size_in_bytes,
|
||||
max_epoch,
|
||||
} = match Metadata::extract(offset, &mut storage) {
|
||||
Err(error::SegmentMetadata::InvalidCommit { sofar, source }) => {
|
||||
warn!("invalid commit in segment {offset}: {source}");
|
||||
@@ -158,12 +165,23 @@ pub fn resume_segment_writer<R: Repo>(
|
||||
header
|
||||
.ensure_compatible(opts.log_format_version, Commit::CHECKSUM_ALGORITHM)
|
||||
.map_err(|msg| io::Error::new(io::ErrorKind::InvalidData, msg))?;
|
||||
// When resuming, the log format version must be equal.
|
||||
if header.log_format_version != opts.log_format_version {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::InvalidData,
|
||||
format!(
|
||||
"log format version mismatch: current={} segment={}",
|
||||
opts.log_format_version, header.log_format_version
|
||||
),
|
||||
));
|
||||
}
|
||||
|
||||
Ok(Ok(Writer {
|
||||
commit: Commit {
|
||||
min_tx_offset: tx_range.end,
|
||||
n: 0,
|
||||
records: Vec::new(),
|
||||
epoch: max_epoch,
|
||||
},
|
||||
inner: io::BufWriter::new(storage),
|
||||
|
||||
|
||||
@@ -18,7 +18,7 @@ use crate::{
|
||||
|
||||
pub const MAGIC: [u8; 6] = [b'(', b'd', b's', b')', b'^', b'2'];
|
||||
|
||||
pub const DEFAULT_LOG_FORMAT_VERSION: u8 = 0;
|
||||
pub const DEFAULT_LOG_FORMAT_VERSION: u8 = 1;
|
||||
pub const DEFAULT_CHECKSUM_ALGORITHM: u8 = CHECKSUM_ALGORITHM_CRC32C;
|
||||
|
||||
pub const CHECKSUM_ALGORITHM_CRC32C: u8 = 0;
|
||||
@@ -156,6 +156,22 @@ impl<W: io::Write> Writer<W> {
|
||||
}))
|
||||
}
|
||||
|
||||
/// Get the current epoch.
|
||||
pub fn epoch(&self) -> u64 {
|
||||
self.commit.epoch
|
||||
}
|
||||
|
||||
/// Update the epoch.
|
||||
///
|
||||
/// The caller must ensure that:
|
||||
///
|
||||
/// - The new epoch is greater than the current epoch.
|
||||
/// - [`Self::commit`] has been called as appropriate.
|
||||
///
|
||||
pub fn set_epoch(&mut self, epoch: u64) {
|
||||
self.commit.epoch = epoch;
|
||||
}
|
||||
|
||||
/// The smallest transaction offset in this segment.
|
||||
pub fn min_tx_offset(&self) -> u64 {
|
||||
self.min_tx_offset
|
||||
@@ -437,7 +453,7 @@ impl<R: io::Read> Iterator for Commits<R> {
|
||||
type Item = io::Result<StoredCommit>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
StoredCommit::decode(&mut self.reader).transpose()
|
||||
StoredCommit::decode_internal(&mut self.reader, self.header.log_format_version).transpose()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -474,13 +490,15 @@ pub struct Metadata {
|
||||
pub header: Header,
|
||||
pub tx_range: Range<u64>,
|
||||
pub size_in_bytes: u64,
|
||||
pub max_epoch: u64,
|
||||
}
|
||||
|
||||
impl Metadata {
|
||||
/// Read and validate metadata from a segment.
|
||||
///
|
||||
/// This traverses the entire segment, consuming thre `reader.
|
||||
/// Doing so is necessary to determine the `max_tx_offset` and `size_in_bytes`.
|
||||
/// Doing so is necessary to determine `max_tx_offset`, `size_in_bytes` and
|
||||
/// `max_epoch`.
|
||||
pub(crate) fn extract<R: io::Read>(min_tx_offset: u64, mut reader: R) -> Result<Self, error::SegmentMetadata> {
|
||||
let header = Header::decode(&mut reader)?;
|
||||
Self::with_header(min_tx_offset, header, reader)
|
||||
@@ -498,6 +516,7 @@ impl Metadata {
|
||||
end: min_tx_offset,
|
||||
},
|
||||
size_in_bytes: Header::LEN as u64,
|
||||
max_epoch: Commit::DEFAULT_EPOCH,
|
||||
};
|
||||
|
||||
fn commit_meta<R: io::Read>(
|
||||
@@ -529,6 +548,8 @@ impl Metadata {
|
||||
}
|
||||
sofar.tx_range.end = commit.tx_range.end;
|
||||
sofar.size_in_bytes += commit.size_in_bytes;
|
||||
// TODO: Should it be an error to encounter an epoch going backwards?
|
||||
sofar.max_epoch = commit.epoch.max(sofar.max_epoch);
|
||||
}
|
||||
|
||||
Ok(sofar)
|
||||
@@ -562,7 +583,7 @@ mod tests {
|
||||
fn write_read_roundtrip() {
|
||||
let repo = repo::Memory::default();
|
||||
|
||||
let mut writer = repo::create_segment_writer(&repo, Options::default(), 0).unwrap();
|
||||
let mut writer = repo::create_segment_writer(&repo, Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap();
|
||||
writer.append([0; 32]).unwrap();
|
||||
writer.append([1; 32]).unwrap();
|
||||
writer.append([2; 32]).unwrap();
|
||||
@@ -591,7 +612,7 @@ mod tests {
|
||||
fn metadata() {
|
||||
let repo = repo::Memory::default();
|
||||
|
||||
let mut writer = repo::create_segment_writer(&repo, Options::default(), 0).unwrap();
|
||||
let mut writer = repo::create_segment_writer(&repo, Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap();
|
||||
writer.append([0; 32]).unwrap();
|
||||
writer.append([0; 32]).unwrap();
|
||||
writer.commit().unwrap();
|
||||
@@ -606,6 +627,7 @@ mod tests {
|
||||
header: _,
|
||||
tx_range,
|
||||
size_in_bytes,
|
||||
max_epoch: _,
|
||||
} = reader.metadata().unwrap();
|
||||
|
||||
assert_eq!(tx_range.start, 0);
|
||||
@@ -621,7 +643,7 @@ mod tests {
|
||||
let repo = repo::Memory::default();
|
||||
let commits = vec![vec![[1; 32], [2; 32]], vec![[3; 32]], vec![[4; 32], [5; 32]]];
|
||||
|
||||
let mut writer = repo::create_segment_writer(&repo, Options::default(), 0).unwrap();
|
||||
let mut writer = repo::create_segment_writer(&repo, Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap();
|
||||
for commit in &commits {
|
||||
for tx in commit {
|
||||
writer.append(*tx).unwrap();
|
||||
@@ -637,6 +659,7 @@ mod tests {
|
||||
min_tx_offset,
|
||||
n: txs.len() as u16,
|
||||
records: txs.concat(),
|
||||
epoch: 0,
|
||||
});
|
||||
min_tx_offset += txs.len() as u64;
|
||||
}
|
||||
@@ -653,7 +676,7 @@ mod tests {
|
||||
let repo = repo::Memory::default();
|
||||
let commits = vec![vec![[1; 32], [2; 32]], vec![[3; 32]], vec![[4; 32], [5; 32]]];
|
||||
|
||||
let mut writer = repo::create_segment_writer(&repo, Options::default(), 0).unwrap();
|
||||
let mut writer = repo::create_segment_writer(&repo, Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap();
|
||||
for commit in &commits {
|
||||
for tx in commit {
|
||||
writer.append(*tx).unwrap();
|
||||
|
||||
Reference in New Issue
Block a user