0
0
mirror of https://github.com/tursodatabase/libsql.git synced 2025-05-27 21:22:20 +00:00
This commit is contained in:
ad hoc
2024-09-18 15:21:40 +02:00
parent 5617376025
commit 1df0acbb16
11 changed files with 89 additions and 48 deletions

@ -66,10 +66,7 @@ pub async fn bottomless_migrate(
}
});
let tmp_registry = Arc::new(WalRegistry::new(
NoStorage.into(),
sender,
)?);
let tmp_registry = Arc::new(WalRegistry::new(NoStorage.into(), sender)?);
let mut configurators = NamespaceConfigurators::default();

@ -3,6 +3,7 @@ use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use chrono::NaiveDateTime;
use futures::prelude::Future;
use libsql_sys::name::NamespaceResolver;
use libsql_wal::io::StdIO;
@ -267,13 +268,19 @@ impl ConfigureNamespace for LibsqlPrimaryConfigurator {
_from_config: MetaStoreHandle,
to_ns: NamespaceName,
to_config: MetaStoreHandle,
timestamp: Option<DateTime<Utc>>,
timestamp: Option<NaiveDateTime>,
store: NamespaceStore,
) -> Pin<Box<dyn Future<Output = crate::Result<Namespace>> + Send + 'a>> {
let registry = self.registry.clone();
let base_path = &self.base.base_path;
Box::pin(super::libsql_fork::libsql_wal_fork(
registry, base_path, from_ns, to_ns, to_config, timestamp, store,
registry,
base_path,
from_ns,
to_ns,
to_config,
timestamp.map(|ts| ts.and_utc()),
store,
))
}
}

@ -1,6 +1,7 @@
use std::path::Path;
use std::sync::Arc;
use chrono::NaiveDateTime;
use futures::prelude::Future;
use libsql_sys::name::NamespaceResolver;
use libsql_wal::io::StdIO;
@ -163,13 +164,19 @@ impl ConfigureNamespace for LibsqlSchemaConfigurator {
_from_config: MetaStoreHandle,
to_ns: NamespaceName,
to_config: MetaStoreHandle,
timestamp: Option<DateTime<Utc>>,
timestamp: Option<NaiveDateTime>,
store: NamespaceStore,
) -> std::pin::Pin<Box<dyn Future<Output = crate::Result<Namespace>> + Send + 'a>> {
let registry = self.registry.clone();
let base_path = &self.base.base_path;
Box::pin(super::libsql_fork::libsql_wal_fork(
registry, base_path, from_ns, to_ns, to_config, timestamp, store,
registry,
base_path,
from_ns,
to_ns,
to_config,
timestamp.map(|ts| ts.and_utc()),
store,
))
}
}

@ -20,8 +20,8 @@ use super::{
};
pub mod fork;
mod libsql_fork;
mod helpers;
mod libsql_fork;
mod libsql_primary;
mod libsql_replica;
mod libsql_schema;

@ -60,8 +60,7 @@ fn with_libsql_conn(f: impl FnOnce(&mut Connection<LibsqlWal<StdIO>>)) {
let resolver = |_: &Path| NamespaceName::from_string("test".into());
let (sender, _) = tokio::sync::mpsc::channel(12);
let registry =
Arc::new(WalRegistry::new(NoStorage.into(), sender).unwrap());
let registry = Arc::new(WalRegistry::new(NoStorage.into(), sender).unwrap());
let wal_manager = LibsqlWalManager::new(registry.clone(), Arc::new(resolver));
let mut conn = libsql_sys::Connection::open(

@ -110,12 +110,8 @@ pub mod test {
let (sender, receiver) = mpsc::channel(128);
let registry = Arc::new(
WalRegistry::new_with_io(
io.clone(),
TestStorage::new_io(store, io).into(),
sender,
)
.unwrap(),
WalRegistry::new_with_io(io.clone(), TestStorage::new_io(store, io).into(), sender)
.unwrap(),
);
if store {

@ -133,8 +133,26 @@ fn maybe_store_segment<S: Storage>(
storage.store(namespace, seg, None, cb);
} else {
// segment can be checkpointed right away.
let _ = notifier.blocking_send(CheckpointMessage::Namespace(namespace.clone()));
tracing::debug!(segment_end = seg.last_committed(), durable_frame_no = *durable_frame_no.lock(), "segment doesn't contain any new data");
// FIXME: this is only necessary because some tests call this method in an async context.
#[cfg(debug_assertions)]
{
let namespace = namespace.clone();
let notifier = notifier.clone();
tokio::spawn(async move {
let _ = notifier.send(CheckpointMessage::Namespace(namespace)).await;
});
}
#[cfg(not(debug_assertions))]
{
let _ = notifier.blocking_send(CheckpointMessage::Namespace(namespace.clone()));
}
tracing::debug!(
segment_end = seg.last_committed(),
durable_frame_no = *durable_frame_no.lock(),
"segment doesn't contain any new data"
);
}
}
@ -245,10 +263,15 @@ where
// will think that this is a wal file, but it's in fact a directory and it will not like
// it.
let mut wals_path = db_path.to_owned();
wals_path.set_file_name(format!("{}-wal", db_path.file_name().unwrap().to_str().unwrap()));
wals_path.set_file_name(format!(
"{}-wal",
db_path.file_name().unwrap().to_str().unwrap()
));
self.io.create_dir_all(&wals_path)?;
// TODO: handle that with abstract io
let dir = walkdir::WalkDir::new(&wals_path).sort_by_file_name().into_iter();
let dir = walkdir::WalkDir::new(&wals_path)
.sort_by_file_name()
.into_iter();
// we only checkpoint durable frame_no so this is a good first estimate without an actual
// network call.
@ -268,9 +291,12 @@ where
let file = self.io.open(false, true, true, entry.path())?;
if let Some(sealed) =
SealedSegment::open(file.into(), entry.path().to_path_buf(), Default::default(), self.io.now())?
{
if let Some(sealed) = SealedSegment::open(
file.into(),
entry.path().to_path_buf(),
Default::default(),
self.io.now(),
)? {
list.push(sealed.clone());
maybe_store_segment(
self.storage.as_ref(),
@ -526,9 +552,7 @@ where
return Ok(());
}
let start_frame_no = current.next_frame_no();
let path = shared
.wals_path
.join(format!("{start_frame_no:020}.seg"));
let path = shared.wals_path.join(format!("{start_frame_no:020}.seg"));
let segment_file = self.io.open(true, true, true, &path)?;
let salt = self.io.with_rng(|rng| rng.gen());

@ -10,7 +10,7 @@ use zerocopy::FromZeroes;
use crate::io::buf::ZeroCopyBoxIoBuf;
use crate::segment::Frame;
use crate::storage::backend::FindSegmentReq;
use crate::storage::{Storage};
use crate::storage::Storage;
use super::Result;

@ -4,8 +4,8 @@ use std::io::{BufWriter, ErrorKind, Write};
use std::mem::size_of;
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use chrono::prelude::{DateTime, Utc};
use fst::{Map, MapBuilder, Streamer};
@ -210,7 +210,12 @@ where
}
impl<F: FileExt> SealedSegment<F> {
pub fn open(file: Arc<F>, path: PathBuf, read_locks: Arc<AtomicU64>, now: DateTime<Utc>) -> Result<Option<Self>> {
pub fn open(
file: Arc<F>,
path: PathBuf,
read_locks: Arc<AtomicU64>,
now: DateTime<Utc>,
) -> Result<Option<Self>> {
let mut header: SegmentHeader = SegmentHeader::new_zeroed();
file.read_exact_at(header.as_bytes_mut(), 0)?;
@ -246,7 +251,12 @@ impl<F: FileExt> SealedSegment<F> {
}))
}
fn recover(file: Arc<F>, path: PathBuf, mut header: SegmentHeader, now: DateTime<Utc>) -> Result<Self> {
fn recover(
file: Arc<F>,
path: PathBuf,
mut header: SegmentHeader,
now: DateTime<Utc>,
) -> Result<Self> {
assert!(!header.is_empty());
assert_eq!(header.index_size.get(), 0);
assert_eq!(header.index_offset.get(), 0);

@ -211,15 +211,13 @@ async fn flaky_fs() {
};
let (sender, _receiver) = tokio::sync::mpsc::channel(64);
let registry = Arc::new(
WalRegistry::new_with_io(
io.clone(),
TestStorage::new_io(false, io).into(),
sender,
)
.unwrap(),
WalRegistry::new_with_io(io.clone(), TestStorage::new_io(false, io).into(), sender)
.unwrap(),
);
let wal_manager = LibsqlWalManager::new(registry.clone(), Arc::new(resolver));
tokio::fs::create_dir_all(tmp.path().join("test")).await.unwrap();
tokio::fs::create_dir_all(tmp.path().join("test"))
.await
.unwrap();
let conn = libsql_sys::Connection::open(
tmp.path().join("test/data").clone(),
OpenFlags::SQLITE_OPEN_CREATE | OpenFlags::SQLITE_OPEN_READ_WRITE,

@ -67,7 +67,13 @@ async fn run_test_sample(path: &Path) -> Result {
let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(42);
let before = std::time::Instant::now();
let sqlite_results = run_script(&sqlite_conn, &script, &mut rng, Sqlite3WalManager::default()).collect::<Vec<_>>();
let sqlite_results = run_script(
&sqlite_conn,
&script,
&mut rng,
Sqlite3WalManager::default(),
)
.collect::<Vec<_>>();
println!("ran sqlite in {:?}", before.elapsed());
drop(sqlite_conn);
@ -93,15 +99,11 @@ async fn run_test_sample(path: &Path) -> Result {
};
let (sender, _receiver) = tokio::sync::mpsc::channel(64);
let registry = Arc::new(
WalRegistry::new(
TestStorage::new().into(),
sender,
)
.unwrap(),
);
let registry = Arc::new(WalRegistry::new(TestStorage::new().into(), sender).unwrap());
let wal_manager = LibsqlWalManager::new(registry.clone(), Arc::new(resolver));
tokio::fs::create_dir_all(tmp.path().join("test")).await.unwrap();
tokio::fs::create_dir_all(tmp.path().join("test"))
.await
.unwrap();
let db_path = tmp.path().join("test/data").clone();
let libsql_conn = libsql_sys::Connection::open(
&db_path,
@ -114,7 +116,8 @@ async fn run_test_sample(path: &Path) -> Result {
let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(42);
let before = std::time::Instant::now();
let libsql_results = run_script(&libsql_conn, &script, &mut rng, wal_manager.clone()).collect::<Vec<_>>();
let libsql_results =
run_script(&libsql_conn, &script, &mut rng, wal_manager.clone()).collect::<Vec<_>>();
println!("ran libsql in {:?}", before.elapsed());
for ((a, _), (b, _)) in sqlite_results.iter().zip(libsql_results.iter()) {