diff --git a/libsql-storage/src/lib.rs b/libsql-storage/src/lib.rs index 833b55c676..85e63923c4 100644 --- a/libsql-storage/src/lib.rs +++ b/libsql-storage/src/lib.rs @@ -13,7 +13,7 @@ use libsql_sys::rusqlite; use libsql_sys::wal::{Result, Vfs, Wal, WalManager}; use rpc::storage_client::StorageClient; use tonic::transport::Channel; -use tracing::{error, trace}; +use tracing::{error, trace, warn}; pub mod rpc { #![allow(clippy::all)] @@ -196,8 +196,13 @@ impl Wal for DurableWal { // - save the current max_frame_no for this txn trace!("DurableWal::begin_read_txn()"); let rt = tokio::runtime::Handle::current(); - let frame_no = tokio::task::block_in_place(|| rt.block_on(self.frames_count())); - self.max_frame_no = frame_no; + let remote_frame_no = tokio::task::block_in_place(|| rt.block_on(self.frames_count())); + let local_frame_no = self.local_cache.get_max_frame_num().unwrap(); + if local_frame_no < remote_frame_no { + warn!("cache is lagging behind the remote!") + // TODO: replicate data from source + } + self.max_frame_no = local_frame_no; Ok(true) } diff --git a/libsql-storage/src/local_cache.rs b/libsql-storage/src/local_cache.rs index 521e308db8..8d771b34c7 100644 --- a/libsql-storage/src/local_cache.rs +++ b/libsql-storage/src/local_cache.rs @@ -77,6 +77,18 @@ impl LocalCache { } } + pub fn get_max_frame_num(&self) -> Result<u64> { + match self + .conn + .query_row("select MAX(frame_no) from frames", params![], |row| { + row.get(0) + }) { + Ok(frame_no) => Ok(frame_no), + Err(Error::QueryReturnedNoRows) => Ok(0), + Err(e) => Err(e), + } + } + pub fn insert_page(&self, txn_id: &str, page_no: u32, frame_data: &[u8]) -> Result<()> { self.conn.execute( "INSERT INTO transactions (txn_id, page_no, data) VALUES (?1, ?2, ?3)