mirror of
https://github.com/tursodatabase/libsql.git
synced 2025-05-25 19:10:39 +00:00
write queue cleanup (#1242)
This commit is contained in:
bottomless/src
libsql-server/src
@ -193,4 +193,16 @@ impl<T: Wal> WrapWal<T> for BottomlessWalWrapper {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn close<M: libsql_sys::wal::WalManager<Wal = T>>(
|
||||
&mut self,
|
||||
manager: &M,
|
||||
wrapped: &mut T,
|
||||
db: &mut libsql_sys::wal::Sqlite3Db,
|
||||
sync_flags: c_int,
|
||||
_scratch: Option<&mut [u8]>,
|
||||
) -> libsql_sys::wal::Result<()> {
|
||||
// prevent unmonitored checkpoints
|
||||
manager.close(wrapped, db, sync_flags, None)
|
||||
}
|
||||
}
|
||||
|
@ -209,6 +209,33 @@ impl ManagedConnectionWalWrapper {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
fn release(&self) {
|
||||
let mut current = self.manager.current.lock();
|
||||
let Some((id, started_at, _)) = current.take() else {
|
||||
unreachable!("no lock to release")
|
||||
};
|
||||
|
||||
assert_eq!(id, self.id);
|
||||
|
||||
tracing::debug!("transaction finished after {:?}", started_at.elapsed());
|
||||
let next = loop {
|
||||
match self.manager.write_queue.steal() {
|
||||
Steal::Empty => break None,
|
||||
Steal::Success(item) => break Some(item),
|
||||
Steal::Retry => (),
|
||||
}
|
||||
};
|
||||
|
||||
if let Some((id, unparker)) = next {
|
||||
tracing::debug!(line = line!(), "unparking id={id}");
|
||||
*current = Some((id, Instant::now(), false));
|
||||
unparker.unpark()
|
||||
} else {
|
||||
*current = None;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl WrapWal<Sqlite3Wal> for ManagedConnectionWalWrapper {
|
||||
@ -227,8 +254,8 @@ impl WrapWal<Sqlite3Wal> for ManagedConnectionWalWrapper {
|
||||
Err(e) => {
|
||||
if !matches!(e.code, ErrorCode::DatabaseBusy) {
|
||||
// this is not a retriable error
|
||||
tracing::debug!("error acquiring lock, dropping slot: {e}");
|
||||
self.manager.current.lock().take();
|
||||
tracing::debug!("error acquiring lock, releasing: {e}");
|
||||
self.release();
|
||||
} else {
|
||||
tracing::debug!("error acquiring lock: {e}");
|
||||
}
|
||||
@ -270,26 +297,10 @@ impl WrapWal<Sqlite3Wal> for ManagedConnectionWalWrapper {
|
||||
backfilled,
|
||||
);
|
||||
|
||||
let mut current = self.manager.current.lock();
|
||||
let next = loop {
|
||||
match self.manager.write_queue.steal() {
|
||||
Steal::Empty => break None,
|
||||
Steal::Success(item) => break Some(item),
|
||||
Steal::Retry => (),
|
||||
}
|
||||
};
|
||||
|
||||
if let Some((id, unparker)) = next {
|
||||
tracing::debug!(line = line!(), "unparking id={id}");
|
||||
*current = Some((id, Instant::now(), false));
|
||||
unparker.unpark()
|
||||
} else {
|
||||
*current = None;
|
||||
}
|
||||
self.release();
|
||||
|
||||
tracing::debug!("checkpoint called: {:?}", before.elapsed());
|
||||
ret
|
||||
// Ok(())
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all, fields(id = self.id))]
|
||||
@ -302,24 +313,12 @@ impl WrapWal<Sqlite3Wal> for ManagedConnectionWalWrapper {
|
||||
fn end_read_txn(&mut self, wrapped: &mut Sqlite3Wal) {
|
||||
wrapped.end_read_txn();
|
||||
{
|
||||
let mut current = self.manager.current.lock();
|
||||
let current = self.manager.current.lock();
|
||||
if let Some((id, _, true)) = *current {
|
||||
// releasing read transaction releases the write lock (see wal.c)
|
||||
if id == self.id {
|
||||
let next = loop {
|
||||
match self.manager.write_queue.steal() {
|
||||
Steal::Empty => break None,
|
||||
Steal::Success(item) => break Some(item),
|
||||
Steal::Retry => (),
|
||||
}
|
||||
};
|
||||
|
||||
if let Some((id, unparker)) = next {
|
||||
tracing::debug!(line = line!(), "unparking id={id}");
|
||||
*current = Some((id, Instant::now(), false));
|
||||
unparker.unpark()
|
||||
} else {
|
||||
*current = None;
|
||||
}
|
||||
drop(current);
|
||||
self.release();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -330,27 +329,7 @@ impl WrapWal<Sqlite3Wal> for ManagedConnectionWalWrapper {
|
||||
fn end_write_txn(&mut self, wrapped: &mut Sqlite3Wal) -> libsql_sys::wal::Result<()> {
|
||||
wrapped.end_write_txn()?;
|
||||
tracing::debug!("end write txn");
|
||||
let mut current = self.manager.current.lock();
|
||||
let before = Instant::now();
|
||||
let Some((id, started_at, true)) = current.take() else {
|
||||
unreachable!()
|
||||
};
|
||||
tracing::debug!("lock acquired in {:?}", before.elapsed());
|
||||
tracing::debug!("transaction finished after {:?}", started_at.elapsed());
|
||||
assert_eq!(id, self.id, "multiple write transaction at the same time??");
|
||||
let next = loop {
|
||||
match self.manager.write_queue.steal() {
|
||||
Steal::Empty => break None,
|
||||
Steal::Success(item) => break Some(item),
|
||||
Steal::Retry => (),
|
||||
}
|
||||
};
|
||||
|
||||
if let Some((id, unparker)) = next {
|
||||
tracing::debug!(line = line!(), "unparking id={id}");
|
||||
*current = Some((id, Instant::now(), false));
|
||||
unparker.unpark()
|
||||
}
|
||||
self.release();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@ -368,24 +347,12 @@ impl WrapWal<Sqlite3Wal> for ManagedConnectionWalWrapper {
|
||||
let ret = manager.close(wrapped, db, sync_flags, scratch);
|
||||
{
|
||||
tracing::debug!(line = line!(), "unparked");
|
||||
let mut current = self.manager.current.lock();
|
||||
let current = self.manager.current.lock();
|
||||
if let Some((id, _, _)) = *current {
|
||||
if id == self.id {
|
||||
let next = loop {
|
||||
match self.manager.write_queue.steal() {
|
||||
Steal::Empty => break None,
|
||||
Steal::Success(item) => break Some(item),
|
||||
Steal::Retry => (),
|
||||
}
|
||||
};
|
||||
|
||||
if let Some((id, unparker)) = next {
|
||||
tracing::debug!(line = line!(), "unparking id={id}");
|
||||
*current = Some((id, Instant::now(), false));
|
||||
unparker.unpark()
|
||||
} else {
|
||||
*current = None;
|
||||
}
|
||||
tracing::error!("connection closed without releasing lock");
|
||||
drop(current);
|
||||
self.release()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -10,9 +10,6 @@ use crate::replication::ReplicationLogger;
|
||||
|
||||
pub type ReplicationWalWrapper =
|
||||
Then<ReplicationLoggerWalWrapper, Option<BottomlessWalWrapper>, ManagedConnectionWal>;
|
||||
// pub type ReplicationWalManager =
|
||||
// WalWrapper<Option<BottomlessWalWrapper>, ReplicationLoggerWalWrapper>;
|
||||
// pub type ReplicationWal = WrappedWal<Option<BottomlessWalWrapper>, ReplicationLoggerWal>;
|
||||
|
||||
pub fn make_replication_wal_wrapper(
|
||||
bottomless: Option<Replicator>,
|
||||
|
Reference in New Issue
Block a user