mirror of
https://github.com/tursodatabase/libsql.git
synced 2025-05-31 15:12:51 +00:00
determine whether segment should be stored based on durable frame_no
instead of a segment property
This commit is contained in:
libsql-wal/src
@ -119,7 +119,7 @@ fn maybe_store_segment<S: Storage>(
|
||||
durable_frame_no: &Arc<Mutex<u64>>,
|
||||
seg: S::Segment,
|
||||
) {
|
||||
if seg.is_storable() {
|
||||
if seg.last_committed() > *durable_frame_no.lock() {
|
||||
let cb: OnStoreCallback = Box::new({
|
||||
let notifier = notifier.clone();
|
||||
let durable_frame_no = durable_frame_no.clone();
|
||||
@ -134,7 +134,7 @@ fn maybe_store_segment<S: Storage>(
|
||||
} else {
|
||||
// segment can be checkpointed right away.
|
||||
let _ = notifier.blocking_send(CheckpointMessage::Namespace(namespace.clone()));
|
||||
tracing::debug!("segment marked as not storable; skipping");
|
||||
tracing::debug!(segment_end = seg.last_committed(), durable_frame_no = *durable_frame_no.lock(), "segment doesn't contain any new data");
|
||||
}
|
||||
}
|
||||
|
||||
@ -269,7 +269,7 @@ where
|
||||
let file = self.io.open(false, true, true, entry.path())?;
|
||||
|
||||
if let Some(sealed) =
|
||||
SealedSegment::open(file.into(), entry.path().to_path_buf(), Default::default())?
|
||||
SealedSegment::open(file.into(), entry.path().to_path_buf(), Default::default(), self.io.now())?
|
||||
{
|
||||
list.push(sealed.clone());
|
||||
maybe_store_segment(
|
||||
@ -401,6 +401,8 @@ where
|
||||
match self.opened.insert(namespace.clone(), Slot::Tombstone) {
|
||||
Some(Slot::Tombstone) => None,
|
||||
Some(Slot::Building(_, _)) => {
|
||||
// FIXME: that could happen is someone removed it and immediately reopenned the
|
||||
// wal. fix by retrying in a loop
|
||||
unreachable!("already waited for ns to open")
|
||||
}
|
||||
Some(Slot::Wal(wal)) => Some(wal),
|
||||
@ -558,8 +560,8 @@ where
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn storage(&self) -> &S {
|
||||
&self.storage
|
||||
pub fn storage(&self) -> Arc<S> {
|
||||
self.storage.clone()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -163,7 +163,6 @@ pub trait Segment: Send + Sync + 'static {
|
||||
fn start_frame_no(&self) -> u64;
|
||||
fn last_committed(&self) -> u64;
|
||||
fn index(&self) -> &fst::Map<Arc<[u8]>>;
|
||||
fn is_storable(&self) -> bool;
|
||||
fn read_page(&self, page_no: u32, max_frame_no: u64, buf: &mut [u8]) -> io::Result<bool>;
|
||||
/// returns the number of readers currently holding a reference to this log.
|
||||
/// The read count must monotonically decrease.
|
||||
@ -199,10 +198,6 @@ impl<T: Segment> Segment for Arc<T> {
|
||||
self.as_ref().index()
|
||||
}
|
||||
|
||||
fn is_storable(&self) -> bool {
|
||||
self.as_ref().is_storable()
|
||||
}
|
||||
|
||||
fn read_page(&self, page_no: u32, max_frame_no: u64, buf: &mut [u8]) -> io::Result<bool> {
|
||||
self.as_ref().read_page(page_no, max_frame_no, buf)
|
||||
}
|
||||
|
@ -157,17 +157,6 @@ where
|
||||
&self.index
|
||||
}
|
||||
|
||||
fn is_storable(&self) -> bool {
|
||||
// we don't store unordered segments, since they only happen in two cases:
|
||||
// - in a replica: no need for storage
|
||||
// - in a primary, on recovery from storage: we don't want to override remote
|
||||
// segment.
|
||||
!self
|
||||
.header()
|
||||
.flags()
|
||||
.contains(SegmentFlags::FRAME_UNORDERED)
|
||||
}
|
||||
|
||||
fn read_page(&self, page_no: u32, max_frame_no: u64, buf: &mut [u8]) -> std::io::Result<bool> {
|
||||
if self.header().start_frame_no.get() > max_frame_no {
|
||||
return Ok(false);
|
||||
|
@ -176,10 +176,6 @@ mod test {
|
||||
async move { todo!() }
|
||||
}
|
||||
|
||||
fn is_storable(&self) -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
fn timestamp(&self) -> DateTime<Utc> {
|
||||
Utc::now()
|
||||
}
|
||||
|
Reference in New Issue
Block a user