0
0
mirror of https://github.com/tursodatabase/libsql.git synced 2025-05-22 21:48:12 +00:00

report durable frame_no

This commit is contained in:
ad hoc
2024-07-24 15:32:54 +02:00
parent 44507d629d
commit c953114a51
4 changed files with 75 additions and 36 deletions

@ -1,6 +1,7 @@
//! `AsyncStorage` is a `Storage` implementation that defer storage to a background thread. The
//! durable frame_no is notified asynchronously.
use std::future::Future;
use std::sync::Arc;
use chrono::Utc;
@ -8,7 +9,8 @@ use libsql_sys::name::NamespaceName;
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinSet;
use crate::io::{Io, StdIO};
use crate::io::{FileExt, Io, StdIO};
use crate::segment::compacted::CompactedSegment;
use crate::segment::Segment;
use super::backend::Backend;
@ -84,8 +86,8 @@ where
}
msg = self.receiver.recv(), if !shutting_down => {
match msg {
Some(StorageLoopMessage::StoreReq(req)) => {
self.scheduler.register(req);
Some(StorageLoopMessage::StoreReq(req, ret)) => {
self.scheduler.register(req, ret);
}
Some(StorageLoopMessage::DurableFrameNoReq { namespace, ret, config_override }) => {
self.fetch_durable_frame_no_async(namespace, ret, config_override);
@ -132,7 +134,7 @@ pub struct BottomlessConfig<C> {
}
enum StorageLoopMessage<C, S> {
StoreReq(StoreSegmentRequest<C, S>),
StoreReq(StoreSegmentRequest<C, S>, oneshot::Sender<u64>),
DurableFrameNoReq {
namespace: NamespaceName,
config_override: Option<Arc<C>>,
@ -162,7 +164,7 @@ where
namespace: &NamespaceName,
segment: Self::Segment,
config_override: Option<Arc<Self::Config>>,
) {
) -> impl Future<Output = u64> + Send + Sync + 'static{
let req = StoreSegmentRequest {
namespace: namespace.clone(),
segment,
@ -170,9 +172,14 @@ where
storage_config_override: config_override,
};
let (sender, receiver) = oneshot::channel();
self.job_sender
.send(StorageLoopMessage::StoreReq(req))
.send(StorageLoopMessage::StoreReq(req, sender))
.expect("bottomless loop was closed before the handle was dropped");
async move {
receiver.await.unwrap()
}
}
async fn durable_frame_no(

@ -1,5 +1,7 @@
use std::ops::Deref;
use tokio::sync::oneshot;
use super::backend::Backend;
use super::backend::SegmentMeta;
use super::Result;
@ -12,6 +14,7 @@ use crate::segment::Segment;
pub(crate) struct IndexedRequest<C, T> {
pub(crate) request: StoreSegmentRequest<C, T>,
pub(crate) id: u64,
pub(crate) ret: oneshot::Sender<u64>,
}
impl<C, T> Deref for IndexedRequest<C, T> {
@ -484,6 +487,7 @@ mod test {
storage_config_override: None,
},
id: 0,
ret: oneshot::channel().0,
},
};

@ -1,18 +1,26 @@
use std::marker::PhantomData;
use std::collections::BTreeMap;
use std::future::Future;
use std::path::PathBuf;
use std::sync::Arc;
use chrono::{DateTime, Utc};
use fst::Map;
use hashbrown::HashMap;
use libsql_sys::name::NamespaceName;
use parking_lot::Mutex;
use tempfile::{tempdir, TempDir};
use crate::io::FileExt;
use crate::io::{FileExt, Io, StdIO};
use crate::segment::compacted::CompactedSegment;
use crate::segment::{sealed::SealedSegment, Segment};
use self::backend::s3::SegmentKey;
pub use self::error::Error;
mod job;
pub mod async_storage;
pub mod backend;
pub(crate) mod error;
mod job;
mod scheduler;
pub type Result<T, E = self::error::Error> = std::result::Result<T, E>;
@ -27,12 +35,14 @@ pub trait Storage: Send + Sync + 'static {
type Config;
/// store the passed segment for `namespace`. This function is called in a context where
/// blocking is acceptable.
/// returns a future that resolves when the segment is stored
/// The segment should be stored whether or not the future is polled.
fn store(
&self,
namespace: &NamespaceName,
seg: Self::Segment,
config_override: Option<Arc<Self::Config>>,
);
) -> impl Future<Output = u64> + Send + Sync + 'static;
fn durable_frame_no_sync(
&self,
@ -89,7 +99,8 @@ impl Storage for NoStorage {
_namespace: &NamespaceName,
_seg: Self::Segment,
_config: Option<Arc<Self::Config>>,
) {
) -> impl Future<Output = u64> + Send + Sync + 'static{
std::future::ready(u64::MAX)
}
async fn durable_frame_no(

@ -1,7 +1,7 @@
use std::cmp::Reverse;
use std::collections::{HashMap, VecDeque};
use tokio::sync::mpsc;
use tokio::sync::{mpsc, oneshot};
use super::job::{IndexedRequest, Job, JobResult};
use super::StoreSegmentRequest;
@ -50,14 +50,14 @@ impl<C, T> Scheduler<C, T> {
/// Register a new request with the scheduler
#[tracing::instrument(skip_all)]
pub fn register(&mut self, request: StoreSegmentRequest<C, T>) {
pub fn register(&mut self, request: StoreSegmentRequest<C, T>, ret: oneshot::Sender<u64>) {
// invariant: new segment comes immediately after the latest segment for that namespace. This means:
// - immediately after the last registered segment, if there is any
// - immediately after the last durable index
let id = self.next_request_id;
self.next_request_id += 1;
let name = request.namespace.clone();
let slot = IndexedRequest { request, id };
let slot = IndexedRequest { request, id, ret };
let requests = self.requests.entry(name.clone()).or_default();
requests.requests.push_back(slot);
@ -107,6 +107,7 @@ impl<C, T> Scheduler<C, T> {
match result.result {
Ok(durable_index) => {
tracing::debug!("job success registered");
let _ = result.job.request.ret.send(durable_index);
if self
.durable_notifier
.send((name.clone(), durable_index))
@ -165,26 +166,35 @@ mod test {
let ns1 = NamespaceName::from("test1");
let ns2 = NamespaceName::from("test2");
scheduler.register(StoreSegmentRequest {
namespace: ns1.clone(),
segment: (),
created_at: Utc::now(),
storage_config_override: None,
});
scheduler.register(
StoreSegmentRequest {
namespace: ns1.clone(),
segment: (),
created_at: Utc::now(),
storage_config_override: None,
},
oneshot::channel().0,
);
scheduler.register(StoreSegmentRequest {
namespace: ns2.clone(),
segment: (),
created_at: Utc::now(),
storage_config_override: None,
});
scheduler.register(
StoreSegmentRequest {
namespace: ns2.clone(),
segment: (),
created_at: Utc::now(),
storage_config_override: None,
},
oneshot::channel().0,
);
scheduler.register(StoreSegmentRequest {
namespace: ns1.clone(),
segment: (),
created_at: Utc::now(),
storage_config_override: None,
});
scheduler.register(
StoreSegmentRequest {
namespace: ns1.clone(),
segment: (),
created_at: Utc::now(),
storage_config_override: None,
},
oneshot::channel().0,
);
let job1 = scheduler.schedule().unwrap();
assert_eq!(job1.request.request.namespace, ns1);
@ -231,14 +241,18 @@ mod test {
segment: (),
created_at: Utc::now(),
storage_config_override: None,
});
},
oneshot::channel().0,
);
scheduler.register(StoreSegmentRequest {
namespace: ns2.clone(),
segment: (),
created_at: Utc::now(),
storage_config_override: None,
});
},
oneshot::channel().0,
);
let job1 = scheduler.schedule().unwrap();
assert_eq!(job1.request.request.namespace, ns1);
@ -269,7 +283,9 @@ mod test {
segment: (),
created_at: Utc::now(),
storage_config_override: None,
});
},
oneshot::channel().0,
);
let job = scheduler.schedule().unwrap();
assert_eq!(job.request.request.namespace, ns1);
@ -280,7 +296,8 @@ mod test {
segment: (),
created_at: Utc::now(),
storage_config_override: None,
});
}, oneshot::channel().0,
);
assert!(scheduler.schedule().is_none());