0
0
mirror of https://github.com/tursodatabase/libsql.git synced 2025-05-31 16:22:46 +00:00

Merge pull request from tursodatabase/lucio/improved-embedded-replica-return

libsql: provide more return info for sync
This commit is contained in:
Lucio Franco
2024-08-02 15:21:03 +00:00
committed by GitHub
6 changed files with 258 additions and 36 deletions
libsql-replication/src
libsql-server/tests/embedded_replica
libsql/src

@ -141,6 +141,7 @@ pub struct Replicator<C> {
client: C,
injector: Arc<Mutex<Injector>>,
state: ReplicatorState,
frames_synced: usize,
}
const INJECTOR_BUFFER_CAPACITY: usize = 10;
@ -178,6 +179,7 @@ impl<C: ReplicatorClient> Replicator<C> {
client,
injector: Arc::new(Mutex::new(injector)),
state: ReplicatorState::NeedHandshake,
frames_synced: 0,
})
}
@ -311,6 +313,8 @@ impl<C: ReplicatorClient> Replicator<C> {
}
async fn inject_frame(&mut self, frame: Frame) -> Result<(), Error> {
self.frames_synced += 1;
let injector = self.injector.clone();
match spawn_blocking(move || injector.lock().inject_frame(frame)).await? {
Ok(Some(commit_fno)) => {
@ -335,6 +339,10 @@ impl<C: ReplicatorClient> Replicator<C> {
Ok(())
}
/// Returns the running count of frames handed to `inject_frame` by this replicator.
///
/// The counter is initialised to 0 when the replicator is built and is incremented once at
/// the start of every `inject_frame` call, before the injection result is known. No reset is
/// visible in this excerpt, so callers should treat it as a cumulative total rather than a
/// per-sync figure.
pub fn frames_synced(&self) -> usize {
self.frames_synced
}
}
/// Helper function to convert rpc frames results to replicator frames

@ -98,7 +98,7 @@ fn embedded_replica() {
)
.await?;
let n = db.sync().await?;
let n = db.sync().await?.frame_no();
assert_eq!(n, None);
let conn = db.connect()?;
@ -106,7 +106,7 @@ fn embedded_replica() {
conn.execute("CREATE TABLE user (id INTEGER NOT NULL PRIMARY KEY)", ())
.await?;
let n = db.sync().await?;
let n = db.sync().await?.frame_no();
assert_eq!(n, Some(1));
let err = conn
@ -171,7 +171,7 @@ fn execute_batch() {
)
.await?;
let n = db.sync().await?;
let n = db.sync().await?.frame_no();
assert_eq!(n, None);
let conn = db.connect()?;
@ -179,7 +179,7 @@ fn execute_batch() {
conn.execute("CREATE TABLE user (id INTEGER NOT NULL PRIMARY KEY)", ())
.await?;
let n = db.sync().await?;
let n = db.sync().await?.frame_no();
assert_eq!(n, Some(1));
conn.execute_batch(
@ -224,7 +224,7 @@ fn stream() {
)
.await?;
let n = db.sync().await?;
let n = db.sync().await?.frame_no();
assert_eq!(n, None);
let conn = db.connect()?;
@ -232,7 +232,7 @@ fn stream() {
conn.execute("CREATE TABLE user (id INTEGER NOT NULL PRIMARY KEY)", ())
.await?;
let n = db.sync().await?;
let n = db.sync().await?.frame_no();
assert_eq!(n, Some(1));
conn.execute_batch(
@ -299,7 +299,7 @@ fn embedded_replica_with_encryption() {
)
.await?;
let n = db.sync().await?;
let n = db.sync().await?.frame_no();
assert_eq!(n, None);
let conn = db.connect()?;
@ -307,7 +307,7 @@ fn embedded_replica_with_encryption() {
conn.execute("CREATE TABLE user (id INTEGER NOT NULL PRIMARY KEY)", ())
.await?;
let n = db.sync().await?;
let n = db.sync().await?.frame_no();
assert_eq!(n, Some(1));
let err = conn
@ -461,7 +461,7 @@ fn replica_primary_reset() {
)
.await
.unwrap();
let replica_index = replica.sync().await.unwrap().unwrap();
let replica_index = replica.sync().await.unwrap().frame_no().unwrap();
let primary_index = Client::new()
.get("http://primary:9090/v1/namespaces/default/stats")
.await
@ -520,7 +520,7 @@ fn replica_primary_reset() {
)
.await
.unwrap();
let replica_index = replica.sync().await.unwrap().unwrap();
let replica_index = replica.sync().await.unwrap().frame_no().unwrap();
let primary_index = Client::new()
.get("http://primary:9090/v1/namespaces/default/stats")
.await
@ -625,7 +625,7 @@ fn replica_no_resync_on_restart() {
)
.await
.unwrap();
db.sync().await.unwrap().unwrap()
db.sync().await.unwrap().frame_no().unwrap()
};
let first_sync = before.elapsed();
@ -641,7 +641,7 @@ fn replica_no_resync_on_restart() {
)
.await
.unwrap();
db.sync().await.unwrap().unwrap()
db.sync().await.unwrap().frame_no().unwrap()
};
let second_sync = before.elapsed();
@ -725,7 +725,8 @@ fn replicate_with_snapshots() {
.await
.unwrap();
db.sync().await.unwrap();
let rep = db.sync().await.unwrap();
assert_eq!(rep.frames_synced(), 427);
let conn = db.connect().unwrap();
@ -757,7 +758,8 @@ fn replicate_with_snapshots() {
assert_eq!(stat, 427);
db.sync().await.unwrap();
let rep = db.sync().await.unwrap();
assert_eq!(rep.frames_synced(), 0);
let conn = db.connect().unwrap();
@ -1226,3 +1228,132 @@ fn txn_bug_issue_1283() {
sim.run().unwrap();
}
// End-to-end check of the value returned by `db.sync()` on an embedded replica: both
// `frame_no()` and `frames_synced()` are asserted after an initial sync, after a schema change,
// after a batch of inserts, and after the primary has been restarted.
#[test]
fn replicated_return() {
// Separate temp dirs: one for the embedded replica file, one for the primary's data.
let tmp_embedded = tempdir().unwrap();
let tmp_embedded_path = tmp_embedded.path().to_owned();
let mut sim = Builder::new()
.simulation_duration(Duration::from_secs(1000))
.build();
let tmp = tempdir().unwrap();
// `notify` is the only coordination channel between the client and the primary host: the
// client uses it to request a restart, the primary uses it to signal the restart is under way.
let notify = Arc::new(Notify::new());
let notify_clone = notify.clone();
init_tracing();
sim.host("primary", move || {
let notify = notify_clone.clone();
let path = tmp.path().to_path_buf();
async move {
// Builds a server over the same `path` each time it is called; admin API binds 9090 and
// the RPC server binds 4567.
let make_server = || async {
TestServer {
path: path.clone().into(),
user_api_config: UserApiConfig {
..Default::default()
},
admin_api_config: Some(AdminApiConfig {
acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await.unwrap(),
connector: TurmoilConnector,
disable_metrics: true,
}),
rpc_server_config: Some(RpcServerConfig {
acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 4567)).await.unwrap(),
tls_config: None,
}),
..Default::default()
}
};
let server = make_server().await;
// Keep a handle on the shutdown notifier before `server` is moved into the future.
let shutdown = server.shutdown.clone();
let fut = async move { server.start_sim(8080).await };
tokio::pin!(fut);
// Run the first server until it exits. A notification from the client triggers the
// shutdown; the loop then keeps polling `fut` until the server has actually stopped.
loop {
tokio::select! {
res = &mut fut => {
res.unwrap();
break
}
_ = notify.notified() => {
shutdown.notify_waiters();
},
}
}
drop(fut);
// Drop a `.sentinel` file into the default namespace directory before restarting.
// NOTE(review): presumably this is what makes the restarted primary regenerate its log
// (see the "Regenerate log" step in the client below) — confirm against the server code.
tokio::fs::File::create(path.join("dbs").join("default").join(".sentinel"))
.await
.unwrap();
// Wake the client, which is waiting on `notify.notified()`.
notify.notify_waiters();
// Second server instance over the same data directory.
let server = make_server().await;
server.start_sim(8080).await.unwrap();
Ok(())
}
});
sim.client("client", async move {
let path = tmp_embedded_path.join("embedded");
let db = Database::open_with_remote_sync_connector(
path.to_str().unwrap(),
"http://primary:8080",
"",
TurmoilConnector,
false,
None,
)
.await?;
// Nothing has been written on the primary yet: no frame number, no frames synced.
let rep = db.sync().await.unwrap();
assert_eq!(rep.frame_no(), None);
assert_eq!(rep.frames_synced(), 0);
let conn = db.connect()?;
conn.execute("CREATE TABLE user (id INTEGER)", ())
.await
.unwrap();
// After the table creation the sync reports frame number 1 and 2 frames synced.
let rep = db.sync().await.unwrap();
assert_eq!(rep.frame_no(), Some(1));
assert_eq!(rep.frames_synced(), 2);
conn.execute_batch(
"
INSERT into user(id) values (randomblob(4096));
INSERT into user(id) values (randomblob(4096));
INSERT into user(id) values (randomblob(4096));
",
)
.await
.unwrap();
// The three blob inserts are expected to account for 9 frames in this sync, moving the
// reported frame number to 10.
let rep = db.sync().await.unwrap();
assert_eq!(rep.frame_no(), Some(10));
assert_eq!(rep.frames_synced(), 9);
// Regenerate log
// Ask the primary to restart, wait for its notification, then give the new server time to
// come up before syncing again.
notify.notify_waiters();
notify.notified().await;
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
// After the restart the reported frame number is lower than before (4 instead of 10),
// while `frames_synced` still reports only the frames applied by this call.
let rep = db.sync().await.unwrap();
assert_eq!(rep.frame_no(), Some(4));
assert_eq!(rep.frames_synced(), 3);
// All three inserted rows must still be visible on the replica.
let mut row = conn.query("select count(*) from user", ()).await.unwrap();
let count = row.next().await.unwrap().unwrap().get::<u64>(0).unwrap();
assert_eq!(count, 3);
Ok(())
});
sim.run().unwrap();
}

@ -323,7 +323,7 @@ cfg_replication! {
/// Sync the database from the remote and return the committed frame_no after syncing, if
/// applicable.
pub async fn sync(&self) -> Result<Option<FrameNo>> {
pub async fn sync(&self) -> Result<crate::replication::Replicated> {
if let DbType::Sync { db, encryption_config: _ } = &self.db_type {
db.sync().await
} else {

@ -83,7 +83,7 @@ impl Database {
encryption_config: Option<EncryptionConfig>,
sync_interval: Option<std::time::Duration>,
http_request_callback: Option<crate::util::HttpRequestCallback>,
namespace: Option<String>
namespace: Option<String>,
) -> Result<Database> {
use std::path::PathBuf;
@ -260,8 +260,8 @@ impl Database {
#[cfg(feature = "replication")]
/// Perform a sync step, returning the new replication index, or None if nothing was
/// replicated yet
pub async fn sync_oneshot(&self) -> Result<Option<FrameNo>> {
if let Some(ref ctx) = self.replication_ctx {
pub async fn sync_oneshot(&self) -> Result<crate::replication::Replicated> {
if let Some(ctx) = &self.replication_ctx {
ctx.replicator.sync_oneshot().await
} else {
Err(crate::errors::Error::Misuse(
@ -273,7 +273,7 @@ impl Database {
#[cfg(feature = "replication")]
/// Sync with primary
pub async fn sync(&self) -> Result<Option<FrameNo>> {
pub async fn sync(&self) -> Result<crate::replication::Replicated> {
Ok(self.sync_oneshot().await?)
}

@ -1,6 +1,7 @@
//! Utilities used when using a replicated version of libsql.
use std::path::PathBuf;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::time::Duration;
@ -32,6 +33,28 @@ mod connection;
pub(crate) mod local_client;
pub(crate) mod remote_client;
/// Outcome of a single `sync` call on an embedded replica.
///
/// Bundles the committed frame number reported after the sync with the number of frames that
/// the call applied, so callers can both locate themselves in the log and measure the work done.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Replicated {
    /// Committed frame number after the sync; `None` when no frame has been committed yet.
    frame_no: Option<FrameNo>,
    /// Number of frames applied by the `sync` call that produced this value.
    frames_synced: usize,
}

impl Replicated {
    /// The currently synced frame number. This can be used to track
    /// where in the log you might be. Beware that this value can be reset to a lower value by the
    /// server in certain situations. Please use `frames_synced` if you want to track the amount of
    /// work a sync has done.
    #[must_use]
    pub fn frame_no(&self) -> Option<FrameNo> {
        self.frame_no
    }

    /// The count of frames synced during this call of `sync`. A frame is a 4kB frame from the
    /// libsql write ahead log.
    #[must_use]
    pub fn frames_synced(&self) -> usize {
        self.frames_synced
    }
}
/// A set of frames to be injected via `sync_frames`.
pub enum Frames {
/// A set of frames, in increasing frame_no.
@ -75,10 +98,7 @@ impl Writer {
self.execute_steps(steps).await
}
pub(crate) async fn execute_steps(
&self,
steps: Vec<Step>,
) -> anyhow::Result<ExecuteResults> {
pub(crate) async fn execute_steps(&self, steps: Vec<Step>) -> anyhow::Result<ExecuteResults> {
self.client
.execute_program(ProgramReq {
client_id: self.client.client_id(),
@ -111,6 +131,7 @@ impl Writer {
pub(crate) struct EmbeddedReplicator {
// Shared replicator over either a remote or a local client; `sync_oneshot` takes this lock
// (awaiting it) for the duration of a sync.
replicator: Arc<Mutex<Replicator<Either<RemoteClient, LocalClient>>>>,
// Initialised to `None` by the constructors visible here.
// NOTE(review): presumably holds the abort-on-drop handle of the periodic background sync
// task when one is configured — confirm against the constructor body.
bg_abort: Option<Arc<DropAbort>>,
// Counter starting at 0 that `sync_oneshot` bumps by `replicator.frames_synced()` on each
// completed sync; its previous value is used to derive the per-call `frames_synced` figure.
last_frames_synced: Arc<AtomicUsize>,
}
impl From<libsql_replication::replicator::Error> for errors::Error {
@ -140,6 +161,7 @@ impl EmbeddedReplicator {
let mut replicator = Self {
replicator,
bg_abort: None,
last_frames_synced: Arc::new(AtomicUsize::new(0)),
};
if let Some(sync_duration) = perodic_sync {
@ -183,10 +205,11 @@ impl EmbeddedReplicator {
Ok(Self {
replicator,
bg_abort: None,
last_frames_synced: Arc::new(AtomicUsize::new(0)),
})
}
pub async fn sync_oneshot(&self) -> Result<Option<FrameNo>> {
pub async fn sync_oneshot(&self) -> Result<Replicated> {
use libsql_replication::replicator::ReplicatorClient;
let mut replicator = self.replicator.lock().await;
@ -218,7 +241,10 @@ impl EmbeddedReplicator {
unreachable!()
};
let Some(primary_index) = client.last_handshake_replication_index() else {
return Ok(None);
return Ok(Replicated {
frame_no: None,
frames_synced: 0,
});
};
if let Some(replica_index) = replicator.client_mut().committed_frame_no() {
if replica_index >= primary_index {
@ -229,7 +255,20 @@ impl EmbeddedReplicator {
}
}
Ok(replicator.client_mut().committed_frame_no())
let last_frames_synced = self.last_frames_synced.fetch_add(
replicator.frames_synced(),
std::sync::atomic::Ordering::Relaxed,
);
let frames_synced =
((replicator.frames_synced() as i64 - last_frames_synced as i64).abs()) as usize;
let replicated = Replicated {
frame_no: replicator.client_mut().committed_frame_no(),
frames_synced,
};
Ok(replicated)
}
pub async fn sync_frames(&self, frames: Frames) -> Result<Option<FrameNo>> {

@ -8,7 +8,9 @@ use futures::StreamExt as _;
use libsql_replication::frame::{Frame, FrameHeader, FrameNo};
use libsql_replication::meta::WalIndexMeta;
use libsql_replication::replicator::{map_frame_err, Error, ReplicatorClient};
use libsql_replication::rpc::replication::{verify_session_token, Frames, HelloRequest, LogOffset, SESSION_TOKEN_KEY, HelloResponse};
use libsql_replication::rpc::replication::{
verify_session_token, Frames, HelloRequest, HelloResponse, LogOffset, SESSION_TOKEN_KEY,
};
use tokio_stream::Stream;
use tonic::metadata::AsciiMetadataValue;
use tonic::{Response, Status};
@ -81,7 +83,10 @@ impl RemoteClient {
self.last_handshake_replication_index
}
async fn handle_handshake_response(&mut self, hello:Result<Response<HelloResponse>, Status>) -> Result<bool, Error> {
async fn handle_handshake_response(
&mut self,
hello: Result<Response<HelloResponse>, Status>,
) -> Result<bool, Error> {
let hello = hello?.into_inner();
verify_session_token(&hello.session_token).map_err(Error::Client)?;
let new_session = self.session_token != Some(hello.session_token.clone());
@ -130,7 +135,9 @@ impl RemoteClient {
(hello_fut.await, None)
};
self.prefetched_batch_log_entries = if let Ok(true) = hello.0 {
tracing::warn!("Frames prefetching failed because of new session token returned by handshake");
tracing::warn!(
"Frames prefetching failed because of new session token returned by handshake"
);
None
} else {
frames
@ -139,7 +146,10 @@ impl RemoteClient {
hello
}
async fn handle_next_frames_response(&mut self, frames: Result<Response<Frames>, Status>) -> Result<<Self as ReplicatorClient>::FrameStream, Error> {
async fn handle_next_frames_response(
&mut self,
frames: Result<Response<Frames>, Status>,
) -> Result<<Self as ReplicatorClient>::FrameStream, Error> {
let frames = frames?.into_inner().frames;
if let Some(f) = frames.last() {
@ -157,7 +167,12 @@ impl RemoteClient {
Ok(Box::pin(stream))
}
async fn do_next_frames(&mut self) -> (Result<<Self as ReplicatorClient>::FrameStream, Error>, Duration) {
async fn do_next_frames(
&mut self,
) -> (
Result<<Self as ReplicatorClient>::FrameStream, Error>,
Duration,
) {
let (frames, time) = match self.prefetched_batch_log_entries.take() {
Some((result, time)) => (result, time),
None => {
@ -197,7 +212,13 @@ impl RemoteClient {
}
}
fn maybe_log<T>(time: Duration, sum: &mut Duration, count: &mut u128, result: &Result<T, Error>, op_name: &str) {
fn maybe_log<T>(
time: Duration,
sum: &mut Duration,
count: &mut u128,
result: &Result<T, Error>,
op_name: &str,
) {
if let Err(e) = &result {
tracing::warn!("Failed {} in {} ms: {:?}", op_name, time.as_millis(), e);
} else {
@ -206,7 +227,12 @@ fn maybe_log<T>(time: Duration, sum: &mut Duration, count: &mut u128, result: &R
let avg = (*sum).as_millis() / *count;
let time = time.as_millis();
if *count > 10 && time > 2 * avg {
tracing::warn!("Unusually long {}. Took {} ms, average {} ms", op_name, time, avg);
tracing::warn!(
"Unusually long {}. Took {} ms, average {} ms",
op_name,
time,
avg
);
}
}
}
@ -218,14 +244,26 @@ impl ReplicatorClient for RemoteClient {
/// Perform handshake with remote
async fn handshake(&mut self) -> Result<(), Error> {
let (result, time) = self.do_handshake_with_prefetch().await;
maybe_log(time, &mut self.handshake_latency_sum, &mut self.handshake_latency_count, &result, "handshake");
maybe_log(
time,
&mut self.handshake_latency_sum,
&mut self.handshake_latency_count,
&result,
"handshake",
);
result.map(|_| ())
}
/// Return a stream of frames to apply to the database
async fn next_frames(&mut self) -> Result<Self::FrameStream, Error> {
let (result, time) = self.do_next_frames().await;
maybe_log(time, &mut self.frames_latency_sum, &mut self.frames_latency_count, &result, "frames fetch");
maybe_log(
time,
&mut self.frames_latency_sum,
&mut self.frames_latency_count,
&result,
"frames fetch",
);
result
}
@ -233,7 +271,13 @@ impl ReplicatorClient for RemoteClient {
/// NeedSnapshot error
async fn snapshot(&mut self) -> Result<Self::FrameStream, Error> {
let (snapshot, time) = time(self.do_snapshot()).await;
maybe_log(time, &mut self.snapshot_latency_sum, &mut self.snapshot_latency_count, &snapshot, "snapshot fetch");
maybe_log(
time,
&mut self.snapshot_latency_sum,
&mut self.snapshot_latency_count,
&snapshot,
"snapshot fetch",
);
snapshot
}