0
0
mirror of https://github.com/tursodatabase/libsql.git synced 2025-05-13 02:23:25 +00:00
Files
libsql/libsql-wal/src/wal.rs
2024-07-31 00:28:05 +02:00

330 lines
9.8 KiB
Rust

use std::ffi::OsStr;
use std::os::unix::prelude::OsStrExt;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use libsql_sys::name::NamespaceResolver;
use libsql_sys::wal::{Wal, WalManager};
use crate::io::Io;
use crate::registry::WalRegistry;
use crate::segment::sealed::SealedSegment;
use crate::shared_wal::SharedWal;
use crate::storage::Storage;
use crate::transaction::Transaction;
/// Factory for [`LibsqlWal`] connections; this is the type that implements
/// `WalManager` in this file.
///
/// Every field is an `Arc`, so clones of a manager share the same registry,
/// connection-id counter and namespace resolver.
pub struct LibsqlWalManager<IO, S>
where
    IO: Io,
{
    /// Registry from which `open` obtains the shared WAL for a database path.
    registry: Arc<WalRegistry<IO, S>>,
    /// Counter from which each opened connection draws its id.
    next_conn_id: Arc<AtomicU64>,
    /// Maps a database path to the namespace handed to the registry.
    namespace_resolver: Arc<dyn NamespaceResolver>,
}
impl<IO: Io, S> Clone for LibsqlWalManager<IO, S> {
fn clone(&self) -> Self {
Self {
registry: self.registry.clone(),
next_conn_id: self.next_conn_id.clone(),
namespace_resolver: self.namespace_resolver.clone(),
}
}
}
impl<FS: Io, S> LibsqlWalManager<FS, S> {
pub fn new(
registry: Arc<WalRegistry<FS, S>>,
namespace_resolver: Arc<dyn NamespaceResolver>,
) -> Self {
Self {
registry,
next_conn_id: Default::default(),
namespace_resolver,
}
}
}
/// Per-connection WAL handle, created by `LibsqlWalManager::open`.
pub struct LibsqlWal<FS>
where
    FS: Io,
{
    /// Frame number recorded by the last `begin_read_txn` (the transaction's
    /// `max_frame_no`) or by the last `end_write_txn` (`next_frame_no - 1`).
    /// `None` until one of those has run.
    last_read_frame_no: Option<u64>,
    /// Transaction currently held by this connection, if any.
    tx: Option<Transaction<FS::File>>,
    /// Shared WAL obtained from the registry when the connection was opened.
    shared: Arc<SharedWal<FS>>,
    /// Id drawn from the manager's counter when the connection was opened.
    conn_id: u64,
}
/// `WalManager` implementation that hands out [`LibsqlWal`] connections backed
/// by the shared WALs kept in the registry.
impl<IO, S> WalManager for LibsqlWalManager<IO, S>
where
    IO: Io,
    S: Storage<Segment = SealedSegment<IO::File>>,
{
    type Wal = LibsqlWal<IO>;

    /// Always reports `false`.
    fn use_shared_memory(&self) -> bool {
        false
    }

    /// Opens a WAL connection for the database at `db_path`.
    ///
    /// The path is resolved to a namespace, the matching shared WAL is fetched
    /// from the registry, and the connection receives the next id from the
    /// manager's counter. The new connection starts without a transaction and
    /// without a recorded frame number. All other arguments are ignored.
    ///
    /// # Errors
    ///
    /// Returns the registry's error, converted with `Into`, when the shared WAL
    /// cannot be opened.
    fn open(
        &self,
        _vfs: &mut libsql_sys::wal::Vfs,
        _file: &mut libsql_sys::wal::Sqlite3File,
        _no_shm_mode: std::ffi::c_int,
        _max_log_size: i64,
        db_path: &std::ffi::CStr,
    ) -> libsql_sys::wal::Result<Self::Wal> {
        // The C string's bytes (without the trailing NUL) are used as-is.
        let db_path = OsStr::from_bytes(db_path.to_bytes());
        let namespace = self.namespace_resolver.resolve(db_path.as_ref());

        let shared = self
            .registry
            .clone()
            .open(db_path.as_ref(), &namespace)
            .map_err(|e| e.into())?;

        let conn_id = self
            .next_conn_id
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);

        Ok(LibsqlWal {
            last_read_frame_no: None,
            tx: None,
            shared,
            conn_id,
        })
    }

    /// Closes `wal` by ending whatever transaction it still holds.
    ///
    /// The remaining arguments are ignored and the call always returns `Ok(())`.
    fn close(
        &self,
        wal: &mut Self::Wal,
        _db: &mut libsql_sys::wal::Sqlite3Db,
        _sync_flags: std::ffi::c_int,
        _scratch: Option<&mut [u8]>,
    ) -> libsql_sys::wal::Result<()> {
        wal.end_read_txn();
        Ok(())
    }

    /// No-op: nothing is removed and `Ok(())` is always returned.
    fn destroy_log(
        &self,
        _vfs: &mut libsql_sys::wal::Vfs,
        _db_path: &std::ffi::CStr,
    ) -> libsql_sys::wal::Result<()> {
        Ok(())
    }

    /// Always reports `Ok(true)`, whatever the path.
    fn log_exists(
        &self,
        _vfs: &mut libsql_sys::wal::Vfs,
        _db_path: &std::ffi::CStr,
    ) -> libsql_sys::wal::Result<bool> {
        Ok(true)
    }

    /// Consumes the manager; the body performs no explicit teardown.
    fn destroy(self)
    where
        Self: Sized,
    {
    }
}
impl<FS: Io> Wal for LibsqlWal<FS> {
#[tracing::instrument(skip_all, fields(id = self.conn_id))]
fn limit(&mut self, _size: i64) {}
#[tracing::instrument(skip_all, fields(id = self.conn_id, ns = self.shared.namespace().as_str()))]
fn begin_read_txn(&mut self) -> libsql_sys::wal::Result<bool> {
tracing::trace!("begin read");
let tx = self.shared.begin_read(self.conn_id);
let invalidate_cache = self
.last_read_frame_no
.map(|idx| tx.max_frame_no != idx)
.unwrap_or(true);
self.last_read_frame_no = Some(tx.max_frame_no);
self.tx = Some(Transaction::Read(tx));
tracing::debug!(invalidate_cache, "read started");
Ok(invalidate_cache)
}
#[tracing::instrument(skip_all, fields(id = self.conn_id))]
fn end_read_txn(&mut self) {
self.tx.take().map(|tx| tx.end());
tracing::trace!("end read tx");
}
#[tracing::instrument(skip_all, fields(id = self.conn_id))]
fn find_frame(
&mut self,
page_no: std::num::NonZeroU32,
) -> libsql_sys::wal::Result<Option<std::num::NonZeroU32>> {
tracing::trace!(page_no, "find frame");
// this is a trick: we defer the frame read to the `read_frame` method. The read_frame
// method will read from the journal if the page exist, or from the db_file if it doesn't
Ok(Some(page_no))
}
#[tracing::instrument(skip_all, fields(id = self.conn_id))]
fn read_frame(
&mut self,
page_no: std::num::NonZeroU32,
buffer: &mut [u8],
) -> libsql_sys::wal::Result<()> {
tracing::trace!(page_no, "reading frame");
let tx = self.tx.as_mut().unwrap();
self.shared
.read_page(tx, page_no.get(), buffer)
.map_err(Into::into)?;
Ok(())
}
#[tracing::instrument(skip_all, fields(id = self.conn_id))]
fn db_size(&self) -> u32 {
let db_size = match self.tx.as_ref() {
Some(tx) => tx.db_size,
None => 0,
};
tracing::trace!(db_size, "db_size");
db_size
}
#[tracing::instrument(skip_all, fields(id = self.conn_id))]
fn begin_write_txn(&mut self) -> libsql_sys::wal::Result<()> {
tracing::trace!("begin write");
match self.tx.as_mut() {
Some(tx) => {
self.shared.upgrade(tx).map_err(Into::into)?;
tracing::debug!("write lock acquired");
}
None => todo!("should acquire read txn first"),
}
Ok(())
}
#[tracing::instrument(skip_all, fields(id = self.conn_id))]
fn end_write_txn(&mut self) -> libsql_sys::wal::Result<()> {
tracing::trace!("end write");
match self.tx.take() {
Some(Transaction::Write(tx)) => {
self.last_read_frame_no = Some(tx.next_frame_no - 1);
self.tx = Some(Transaction::Read(tx.downgrade()));
}
other => {
self.tx = other;
}
}
Ok(())
}
#[tracing::instrument(skip_all, fields(id = self.conn_id))]
fn undo<U: libsql_sys::wal::UndoHandler>(
&mut self,
handler: Option<&mut U>,
) -> libsql_sys::wal::Result<()> {
match self.tx {
Some(Transaction::Write(ref mut tx)) => {
if tx.is_commited() {
return Ok(());
}
if let Some(handler) = handler {
for page_no in tx.index_page_iter() {
// FIXME: maybe it's not OK to call that callback with duplicated pages_no,
// need to test that
if let Err(e) = handler.handle_undo(page_no) {
tracing::debug!("undo handler error: {e}");
break;
}
}
}
tx.reset(0);
tracing::debug!("rolled back tx");
Ok(())
}
_ => Ok(()),
}
}
#[tracing::instrument(skip_all, fields(id = self.conn_id))]
fn savepoint(&mut self, rollback_data: &mut [u32]) {
match self.tx {
Some(Transaction::Write(ref mut tx)) => {
let id = tx.savepoint() as u32;
rollback_data[0] = id;
}
_ => {
// if we don't have a write tx, we always point to the beginning of the tx
rollback_data[0] = 0;
}
}
}
#[tracing::instrument(skip_all, fields(id = self.conn_id))]
fn savepoint_undo(&mut self, rollback_data: &mut [u32]) -> libsql_sys::wal::Result<()> {
match self.tx {
Some(Transaction::Write(ref mut tx)) => {
tx.reset(rollback_data[0] as usize);
Ok(())
}
_ => Ok(()),
}
}
#[tracing::instrument(skip_all, fields(id = self.conn_id))]
fn insert_frames(
&mut self,
page_size: std::ffi::c_int,
page_headers: &mut libsql_sys::wal::PageHeaders,
size_after: u32,
_is_commit: bool,
_sync_flags: std::ffi::c_int,
) -> libsql_sys::wal::Result<usize> {
assert_eq!(page_size, 4096);
match self.tx.as_mut() {
Some(Transaction::Write(ref mut tx)) => {
self.shared
.insert_frames(
tx,
page_headers.iter(),
(size_after != 0).then_some(size_after),
)
.map_err(Into::into)?;
}
_ => todo!("no write transaction"),
}
Ok(0)
}
#[tracing::instrument(skip_all, fields(id = self.conn_id))]
fn checkpoint(
&mut self,
_db: &mut libsql_sys::wal::Sqlite3Db,
_mode: libsql_sys::wal::CheckpointMode,
_busy_handler: Option<&mut dyn libsql_sys::wal::BusyHandler>,
_sync_flags: u32,
_buf: &mut [u8],
_checkpoint_cb: Option<&mut dyn libsql_sys::wal::CheckpointCallback>,
_in_wal: Option<&mut i32>,
_backfilled: Option<&mut i32>,
) -> libsql_sys::wal::Result<()> {
// self.shared.segments.checkpoint();
Ok(())
}
#[tracing::instrument(skip_all, fields(id = self.conn_id))]
fn exclusive_mode(&mut self, op: std::ffi::c_int) -> libsql_sys::wal::Result<()> {
tracing::trace!(op, "trying to acquire exclusive mode");
Ok(())
}
#[tracing::instrument(skip_all, fields(id = self.conn_id))]
fn uses_heap_memory(&self) -> bool {
true
}
#[tracing::instrument(skip_all, fields(id = self.conn_id))]
fn set_db(&mut self, _db: &mut libsql_sys::wal::Sqlite3Db) {}
#[tracing::instrument(skip_all, fields(id = self.conn_id))]
fn callback(&self) -> i32 {
0
}
#[tracing::instrument(skip_all, fields(id = self.conn_id))]
fn frames_in_wal(&self) -> u32 {
0
}
}