mirror of https://github.com/tursodatabase/libsql.git synced 2025-05-23 21:16:07 +00:00

optimizations

This commit is contained in:
ad hoc
2024-04-08 11:44:40 +02:00
parent 9471fb1405
commit 10f0618e30
7 changed files with 168 additions and 74 deletions

@ -54,6 +54,7 @@ impl FileExt for File {
}
}
#[derive(Debug)]
pub struct Cursor<'a, T> {
file: &'a T,
offset: u64,

@ -1,5 +1,5 @@
use std::fs::File;
use std::io::{IoSlice, Write};
use std::io::{IoSlice, Write, BufWriter};
use std::mem::size_of;
use std::path::{Path, PathBuf};
use std::sync::Arc;
@ -339,7 +339,9 @@ impl Log {
let index_offset = header.last_commited_frame_no.get() - header.start_frame_no.get();
let index_byte_offset = byte_offset(index_offset as u32);
let mut cursor = self.file.cursor(index_byte_offset);
self.index.merge_all(&mut cursor);
let mut writer = BufWriter::new(&mut cursor);
self.index.merge_all(&mut writer);
writer.into_inner().unwrap();
header.index_offset = index_byte_offset.into();
header.index_size = cursor.count().into();
self.file.write_all_at(header.as_bytes(), 0).unwrap();
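For context on the log.rs hunk above: wrapping the cursor in a BufWriter coalesces the many small writes produced by merging the index into a handful of larger ones before they reach the file. A minimal sketch of that effect, where CountingWriter is a hypothetical stand-in for the file cursor rather than a type from this repo:

use std::io::{BufWriter, Write};

// Hypothetical stand-in for the file cursor: it only counts how many times the
// underlying writer is hit.
struct CountingWriter {
    writes: usize,
}

impl Write for CountingWriter {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.writes += 1;
        Ok(buf.len())
    }
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}

fn main() -> std::io::Result<()> {
    let mut unbuffered = CountingWriter { writes: 0 };
    for _ in 0..10_000 {
        unbuffered.write_all(&[0u8; 16])?; // one write per tiny index entry
    }

    let mut inner = CountingWriter { writes: 0 };
    {
        let mut buffered = BufWriter::new(&mut inner);
        for _ in 0..10_000 {
            buffered.write_all(&[0u8; 16])?; // accumulated in BufWriter's 8 KiB buffer
        }
        buffered.flush()?; // the diff uses into_inner().unwrap() instead
    }

    println!("unbuffered: {} writes, buffered: {} writes", unbuffered.writes, inner.writes);
    Ok(())
}

The into_inner().unwrap() in the diff plays the same role as flush() here: it drains the buffer and hands the cursor back so its final count can be recorded in the header.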

@ -1,3 +1,4 @@
use std::ffi::{c_void, c_int};
use std::{path::Path, time::Instant};
use std::sync::Arc;
@ -25,9 +26,9 @@ fn enable_libsql_logging() {
fn main() {
tracing_subscriber::registry()
// .with(fmt::layer())
.with(fmt::layer()
.with_span_events(FmtSpan::CLOSE))
.with(fmt::layer())
// .with(fmt::layer()
// .with_span_events(FmtSpan::CLOSE))
.with(EnvFilter::from_default_env())
.init();
@ -59,7 +60,14 @@ fn main() {
let span = tracing::span!(Level::TRACE, "conn", w);
let _enter = span.enter();
let mut conn = libsql_sys::Connection::open(db_path, OpenFlags::SQLITE_OPEN_CREATE | OpenFlags::SQLITE_OPEN_READ_WRITE, wal_manager, 100000, None).unwrap();
for _i in 0..10_000 {
unsafe {
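// SQLite invokes the busy handler when it cannot acquire a database lock;
// returning non-zero asks it to keep retrying instead of failing with SQLITE_BUSY.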
extern "C" fn do_nothing_handler(_: *mut c_void, _: c_int) -> c_int {
1
}
libsql_sys::ffi::sqlite3_busy_handler(conn.handle(), Some(do_nothing_handler), std::ptr::null_mut());
}
for _i in 0..1000 {
let before = Instant::now();
let tx = conn.transaction().unwrap();
tx.execute("REPLACE INTO t1 VALUES(abs(random() % 5000000), randomblob(16), randomblob(16), randomblob(400));", ()).unwrap();

@ -4,8 +4,8 @@ use std::fs::OpenOptions;
use std::sync::Arc;
use std::path::{PathBuf, Path};
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Instant;
use crossbeam::deque::Injector;
use hashbrown::HashMap;
use libsql_sys::ffi::Sqlite3DbHeader;
use parking_lot::RwLock;
@ -83,10 +83,8 @@ impl WalRegistry {
let shared = Arc::new(SharedWal {
current,
segments: RwLock::new(segments),
tx_id: Default::default(),
next_tx_id: Default::default(),
wal_lock: Default::default(),
db_file,
waiters: Arc::new(Injector::new()),
registry: self.clone(),
namespace: namespace.clone(),
});
@ -100,10 +98,12 @@ impl WalRegistry {
#[tracing::instrument(skip_all)]
pub fn swap_current(&self, shared: &SharedWal, tx: &WriteTransaction) {
let before = Instant::now();
assert!(tx.is_commited());
// at this point we must hold a lock to a committed transaction.
// First, we'll acquire the lock to the current transaction to make sure no one steals it from us:
let lock = shared.tx_id.lock();
let lock = shared.wal_lock.tx_id.lock();
println!("lock_acquired: {}", before.elapsed().as_micros());
// Make sure that we still own the transaction:
if lock.is_none() || lock.unwrap() != tx.id {
return
@ -114,14 +114,19 @@ impl WalRegistry {
let start_frame_no = current.last_commited() + 1;
let path = self.path.join(shared.namespace.as_str()).join(format!("{}:{start_frame_no:020}.log", shared.namespace));
let log = Log::create(&path, start_frame_no, current.db_size());
println!("log_created: {}", before.elapsed().as_micros());
// seal the old log and add it to the list
let sealed = current.seal();
println!("log_sealed: {}", before.elapsed().as_micros());
{
// this lock takes too long to acquire; use a ring buffer
shared.segments.write().push_back(sealed);
println!("segment_written: {}", before.elapsed().as_micros());
}
// place the new log
shared.current.swap(Arc::new(log));
println!("log_swapped: {}", before.elapsed().as_micros());
tracing::debug!("current log swapped");
}
@ -129,7 +134,7 @@ impl WalRegistry {
self.shutdown.store(true, Ordering::SeqCst);
let mut openned = self.openned.write();
for (_, shared) in openned.drain() {
let mut tx = Transaction::Read(shared.begin_read());
let mut tx = Transaction::Read(shared.begin_read(u64::MAX));
shared.upgrade(&mut tx).unwrap();
tx.commit();
self.swap_current(&shared, &mut tx.as_write_mut().unwrap());
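swap_current above seals the old log and publishes the new one through ArcSwap, so readers that already loaded the old log keep using their Arc without blocking. A rough illustration of the load/swap pattern, with Log as a stand-in struct rather than the crate's type:

use std::sync::Arc;

use arc_swap::ArcSwap;

// Illustrative only: `Log` is a stand-in struct, not the crate's type.
struct Log {
    start_frame_no: u64,
}

fn main() {
    let current = ArcSwap::from_pointee(Log { start_frame_no: 1 });

    // A reader pins the log it sees at this instant; later swaps don't affect it.
    let pinned: Arc<Log> = current.load_full();

    // Publishing a new log: the swap returns the Arc that was current before.
    let previous = current.swap(Arc::new(Log { start_frame_no: 1_001 }));

    assert_eq!(pinned.start_frame_no, previous.start_frame_no);
    assert_eq!(current.load().start_frame_no, 1_001);
}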

@ -11,7 +11,7 @@ use fst::Streamer;
use fst::map::OpBuilder;
use libsql_sys::ffi::Sqlite3DbHeader;
use libsql_sys::wal::PageHeaders;
use parking_lot::{Mutex, RwLock};
use parking_lot::{Mutex, RwLock, MutexGuard};
use zerocopy::FromBytes;
use crate::error::Error;
@ -23,14 +23,20 @@ use crate::registry::WalRegistry;
use crate::transaction::Transaction;
use crate::transaction::{ReadTransaction, Savepoint, WriteTransaction};
#[derive(Default)]
pub struct WalLock {
pub tx_id: Mutex<Option<u64>>,
pub reserved: Mutex<Option<u64>>,
pub next_tx_id: AtomicU64,
pub waiters: Injector<(Unparker, u64)>,
}
pub struct SharedWal {
pub current: ArcSwap<Log>,
pub segments: RwLock<VecDeque<SealedLog>>,
pub wal_lock: Arc<WalLock>,
/// Current transaction id
pub tx_id: Arc<Mutex<Option<u64>>>,
pub next_tx_id: AtomicU64,
pub db_file: File,
pub waiters: Arc<Injector<Unparker>>,
pub namespace: NamespaceName,
pub registry: Arc<WalRegistry>,
}
@ -41,7 +47,7 @@ impl SharedWal {
}
#[tracing::instrument(skip_all)]
pub fn begin_read(&self) -> ReadTransaction {
pub fn begin_read(&self, conn_id: u64) -> ReadTransaction {
// FIXME: it is not enough to just increment the counter; we must make sure that the log
// is not sealed. If the log is sealed, retry with the current log
loop {
@ -57,66 +63,103 @@ impl SharedWal {
max_frame_no,
log: current.clone(),
db_size,
created_at: Instant::now()
created_at: Instant::now(),
conn_id,
pages_read: 0,
};
}
}
/// Upgrade a read transaction to a write transaction
pub fn upgrade(&self, tx: &mut Transaction) -> Result<(), Error> {
match tx {
Transaction::Write(_) => todo!("already in a write transaction"),
Transaction::Read(read_tx) => {
loop {
let mut lock = self.tx_id.lock();
let before = Instant::now();
loop {
match tx {
Transaction::Write(_) => todo!("already in a write transaction"),
Transaction::Read(read_tx) => {
{
let mut reserved = self.wal_lock.reserved.lock();
match *reserved {
// we have already reserved the slot, go ahead and try to acquire
Some(id) if id == read_tx.conn_id => {
tracing::trace!("taking reserved slot");
reserved.take();
let lock = self.wal_lock.tx_id.lock();
let write_tx = self.acquire_write(read_tx, lock, reserved)?;
*tx = Transaction::Write(write_tx);
println!("upgraded: {}", before.elapsed().as_micros());
return Ok(())
}
_ => (),
}
}
let lock = self.wal_lock.tx_id.lock();
match *lock {
Some(id) => {
// FIXME: this is not very fair; always enqueue to the wait queue before acquiring
// the lock
None if self.wal_lock.waiters.is_empty() => {
let write_tx = self.acquire_write(read_tx, lock, self.wal_lock.reserved.lock())?;
*tx = Transaction::Write(write_tx);
println!("upgraded: {}", before.elapsed().as_micros());
return Ok(())
}
Some(_) | None => {
tracing::trace!(
"txn currently held by {id}, registering to wait queue"
"txn currently held by another connection, registering to wait queue"
);
let parker = crossbeam::sync::Parker::new();
let unpaker = parker.unparker().clone();
self.waiters.push(unpaker);
self.wal_lock.waiters.push((unpaker, read_tx.conn_id));
drop(lock);
parker.park();
}
None => {
let id = self.next_tx_id.fetch_add(1, Ordering::Relaxed);
// we read two fields in the header. There is no risk that a transaction commit in
// between the two reads because this would require that:
// 1) there would be a running txn
// 2) that transaction held the lock to tx_id (be in a transaction critical section)
let current = self.current.load();
let last_commited = current.last_commited();
if read_tx.max_frame_no != last_commited {
return Err(Error::BusySnapshot);
}
let next_offset = current.frames_in_log() as u32;
*lock = Some(id);
*tx = Transaction::Write(WriteTransaction {
id,
lock: self.tx_id.clone(),
savepoints: vec![Savepoint {
next_offset,
next_frame_no: last_commited + 1,
index: None,
}],
next_frame_no: last_commited + 1,
next_offset,
is_commited: false,
read_tx: read_tx.clone(),
waiters: self.waiters.clone(),
});
return Ok(());
}
}
}
}
}
}
pub fn read_frame(&self, tx: &Transaction, page_no: u32, buffer: &mut [u8]) {
fn acquire_write(
&self,
read_tx: &ReadTransaction,
mut tx_id_lock: MutexGuard<Option<u64>>,
mut reserved: MutexGuard<Option<u64>>,
) -> Result<WriteTransaction, Error> {
let id = self.wal_lock.next_tx_id.fetch_add(1, Ordering::Relaxed);
// we read two fields in the header. There is no risk that a transaction commits in
// between the two reads, because that would require that:
// 1) there would be a running txn
// 2) that transaction held the lock to tx_id (be in a transaction critical section)
let current = self.current.load();
let last_commited = current.last_commited();
if read_tx.max_frame_no != last_commited {
if read_tx.pages_read <= 1 {
// this transaction hasn't read anything yet; it will retry to
// acquire the lock. Reserve the slot so that it can make
// progress quickly
tracing::debug!("reserving tx slot");
reserved.replace(read_tx.conn_id);
}
return Err(Error::BusySnapshot);
}
let next_offset = current.frames_in_log() as u32;
*tx_id_lock = Some(id);
Ok(WriteTransaction {
id,
wal_lock: self.wal_lock.clone(),
savepoints: vec![Savepoint {
next_offset,
next_frame_no: last_commited + 1,
index: None,
}],
next_frame_no: last_commited + 1,
next_offset,
is_commited: false,
read_tx: read_tx.clone(),
})
}
pub fn read_frame(&self, tx: &mut Transaction, page_no: u32, buffer: &mut [u8]) {
match tx.log.find_frame(page_no, tx) {
Some((_, offset)) => tx.log.read_page_offset(offset, buffer),
None => {
@ -130,6 +173,9 @@ impl SharedWal {
}
}
tx.pages_read += 1;
// TODO: debug
if page_no == 1 {
let header = Sqlite3DbHeader::read_from_prefix(&buffer).unwrap();
tracing::info!(db_size = header.db_size.get(), "read page 1");
@ -163,14 +209,19 @@ impl SharedWal {
pages: &mut PageHeaders,
size_after: u32,
) {
let before = Instant::now();
let current = self.current.load();
current.insert_pages(pages.iter(), (size_after != 0).then_some(size_after), tx);
println!("before_swap: {}", before.elapsed().as_micros());
// TODO: use config for max log size
if tx.is_commited() && current.len() > 1000 {
self.registry.swap_current(self, tx);
}
println!("inserted: {}", before.elapsed().as_micros());
// TODO: remove, stupid strategy for tests
// ok, we still hold a write txn
if self.segments.read().len() > 10 {
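Most of this hunk replaces the per-WAL tx_id mutex with the shared WalLock: a connection that cannot upgrade parks itself on the waiter queue, and the transaction that releases the write lock picks one waiter, reserves the slot for it, and unparks it (see downgrade in transaction.rs below). A hypothetical, stripped-down model of that hand-off using the same crossbeam and parking_lot primitives; WriteSlot, acquire and release are illustrative names, not this crate's API:

use std::sync::Arc;
use std::thread;
use std::time::Duration;

use crossbeam::deque::{Injector, Steal};
use crossbeam::sync::{Parker, Unparker};
use parking_lot::Mutex;

// A connection that fails to take the write slot enqueues its (Unparker, id) and
// parks; whoever releases the slot steals one waiter, writes its id into
// `reserved`, and unparks it, so the woken connection wins the next attempt.
struct WriteSlot {
    holder: Mutex<Option<u64>>,
    reserved: Mutex<Option<u64>>,
    waiters: Injector<(Unparker, u64)>,
}

fn acquire(slot: &WriteSlot, id: u64) {
    loop {
        let parker = Parker::new();
        {
            let mut holder = slot.holder.lock();
            let mut reserved = slot.reserved.lock();
            let ours = *reserved == Some(id);
            if holder.is_none() && (ours || (reserved.is_none() && slot.waiters.is_empty())) {
                if ours {
                    reserved.take(); // consume the reservation made for us
                }
                *holder = Some(id);
                return;
            }
            // Enqueue while still holding the locks so the releaser cannot miss us.
            slot.waiters.push((parker.unparker().clone(), id));
        }
        parker.park();
    }
}

fn release(slot: &WriteSlot, id: u64) {
    let mut holder = slot.holder.lock();
    assert_eq!(holder.take(), Some(id));
    loop {
        match slot.waiters.steal() {
            Steal::Empty => break,
            Steal::Retry => continue,
            Steal::Success((unparker, next)) => {
                slot.reserved.lock().replace(next); // earmark the slot for `next`
                unparker.unpark();
                break;
            }
        }
    }
}

fn main() {
    let slot = Arc::new(WriteSlot {
        holder: Mutex::new(None),
        reserved: Mutex::new(None),
        waiters: Injector::new(),
    });
    acquire(&slot, 1);
    let s = slot.clone();
    let waiter = thread::spawn(move || {
        acquire(&s, 2); // parks until connection 1 releases and reserves the slot for us
        release(&s, 2);
    });
    thread::sleep(Duration::from_millis(10));
    release(&slot, 1);
    waiter.join().unwrap();
}

The reserved slot is what lets a connection that hit BusySnapshot before reading anything (pages_read <= 1), or that was woken from the queue, win the next acquisition instead of racing everyone again.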

@ -1,14 +1,12 @@
use std::ops::Deref;
use std::ops::{Deref, DerefMut};
use std::sync::{Arc, atomic::Ordering};
use std::time::Instant;
use crossbeam::deque::Injector;
use crossbeam::sync::Unparker;
use fst::Streamer;
use fst::map::{Map, OpBuilder};
use parking_lot::Mutex;
use crate::log::{Log, index_entry_split};
use crate::shared_wal::WalLock;
pub enum Transaction {
Write(WriteTransaction),
@ -45,6 +43,15 @@ impl Deref for Transaction {
}
}
impl DerefMut for Transaction {
fn deref_mut(&mut self) -> &mut Self::Target {
match self {
Transaction::Write(ref mut tx) => tx,
Transaction::Read(ref mut tx) => tx,
}
}
}
pub struct ReadTransaction {
/// Max frame number that this transaction can read
pub max_frame_no: u64,
@ -52,12 +59,16 @@ pub struct ReadTransaction {
/// The log to which we have a read lock
pub log: Arc<Log>,
pub created_at: Instant,
pub conn_id: u64,
/// number of pages read by this transaction. This is used to determine whether a write lock
/// will be re-acquired.
pub pages_read: usize,
}
impl Clone for ReadTransaction {
fn clone(&self) -> Self {
self.log.read_locks.fetch_add(1, Ordering::SeqCst);
Self { max_frame_no: self.max_frame_no, log: self.log.clone(), db_size: self.db_size, created_at: self.created_at }
Self { max_frame_no: self.max_frame_no, log: self.log.clone(), db_size: self.db_size, created_at: self.created_at, conn_id: self.conn_id, pages_read: self.pages_read }
}
}
@ -77,8 +88,7 @@ pub struct Savepoint {
pub struct WriteTransaction {
pub id: u64,
/// id of the transaction currently holding the lock
pub lock: Arc<Mutex<Option<u64>>>,
pub waiters: Arc<Injector<Unparker>>,
pub wal_lock: Arc<WalLock>,
pub savepoints: Vec<Savepoint>,
pub next_frame_no: u64,
pub next_offset: u32,
@ -94,8 +104,8 @@ impl WriteTransaction {
todo!("txn has already been commited");
}
let lock = self.lock.clone();
let g = lock.lock();
let wal_lock = self.wal_lock.clone();
let g = wal_lock.tx_id.lock();
match *g {
// we still hold the lock, we can proceed
Some(id) if self.id == id => {
@ -141,8 +151,9 @@ impl WriteTransaction {
#[tracing::instrument(skip(self))]
pub fn downgrade(self) -> ReadTransaction {
let Self { id, lock, read_tx, .. } = self;
let mut lock = lock.lock();
tracing::trace!("downgrading write transaction");
let Self { id, wal_lock, read_tx, .. } = self;
let mut lock = wal_lock.tx_id.lock();
match *lock {
Some(lock_id) if lock_id == id => {
lock.take();
@ -150,10 +161,20 @@ impl WriteTransaction {
_ => (),
}
if let Some(id) = *wal_lock.reserved.lock() {
tracing::trace!("tx already reserved by {id}");
return read_tx;
}
loop {
match self.waiters.steal() {
crossbeam::deque::Steal::Empty => break,
crossbeam::deque::Steal::Success(unparker) => {
match wal_lock.waiters.steal() {
crossbeam::deque::Steal::Empty => {
tracing::trace!("no connection waiting");
break
},
crossbeam::deque::Steal::Success((unparker, id)) => {
tracing::trace!("waking up {id}");
wal_lock.reserved.lock().replace(id);
unparker.unpark();
break
},
@ -189,3 +210,9 @@ impl Deref for WriteTransaction {
&self.read_tx
}
}
impl DerefMut for WriteTransaction {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.read_tx
}
}
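The new DerefMut impls complement the existing Deref ones: a Transaction, and a WriteTransaction through its embedded read_tx, expose the read transaction's fields mutably, which is what allows read_frame to bump pages_read without matching on the variant. A tiny, hypothetical illustration of the pattern:

use std::ops::{Deref, DerefMut};

// Hypothetical miniature of the pattern: both variants expose the read
// transaction's fields, so callers can write `tx.pages_read += 1` without
// matching on the variant.
struct Read {
    pages_read: usize,
}

struct Write {
    read: Read,
}

impl Deref for Write {
    type Target = Read;
    fn deref(&self) -> &Read {
        &self.read
    }
}

impl DerefMut for Write {
    fn deref_mut(&mut self) -> &mut Read {
        &mut self.read
    }
}

enum Tx {
    Read(Read),
    Write(Write),
}

impl Deref for Tx {
    type Target = Read;
    fn deref(&self) -> &Read {
        match self {
            Tx::Read(tx) => tx,
            Tx::Write(tx) => tx, // &Write coerces to &Read through Deref
        }
    }
}

impl DerefMut for Tx {
    fn deref_mut(&mut self) -> &mut Read {
        match self {
            Tx::Read(tx) => tx,
            Tx::Write(tx) => tx, // &mut Write coerces to &mut Read through DerefMut
        }
    }
}

fn main() {
    let mut tx = Tx::Write(Write { read: Read { pages_read: 0 } });
    tx.pages_read += 1; // resolved through DerefMut on Tx (and then Write)
    assert_eq!(tx.pages_read, 1);
}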

@ -92,7 +92,7 @@ impl Wal for LibsqlWal {
#[tracing::instrument(skip_all, fields(id = self.conn_id, ns = self.namespace.as_str()))]
fn begin_read_txn(&mut self) -> libsql_sys::wal::Result<bool> {
tracing::trace!("begin read");
let tx = self.shared.begin_read();
let tx = self.shared.begin_read(self.conn_id);
let invalidate_cache = self
.last_read_frame_no
.map(|idx| tx.max_frame_no != idx)
@ -134,7 +134,7 @@ impl Wal for LibsqlWal {
buffer: &mut [u8],
) -> libsql_sys::wal::Result<()> {
tracing::trace!(page_no, "reading frame");
let tx = self.tx.as_ref().unwrap();
let tx = self.tx.as_mut().unwrap();
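// read_frame now takes the transaction mutably so it can bump pages_read.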
self.shared.read_frame(tx, page_no.get(), buffer);
Ok(())
}