0
0
mirror of https://github.com/tursodatabase/libsql.git synced 2025-08-13 05:32:56 +00:00

Abstract Replicator

This commit is contained in:
ad hoc
2023-10-20 15:48:59 +02:00
parent f493f3f054
commit ade217c5eb
16 changed files with 328 additions and 18 deletions

3
Cargo.lock generated
View File

@@ -2284,6 +2284,7 @@ name = "libsql-replication"
version = "0.1.0"
dependencies = [
"arbitrary",
"async-trait",
"bytemuck",
"bytes",
"parking_lot",
@@ -2293,6 +2294,8 @@ dependencies = [
"serde",
"sqld-libsql-bindings",
"thiserror",
"tokio",
"tokio-stream",
"tonic",
"tonic-build",
"tracing",

View File

@@ -15,6 +15,9 @@ bytes = { version = "1.5.0", features = ["serde"] }
serde = { version = "1.0.189", features = ["derive"] }
thiserror = "1.0.49"
tracing = "0.1.40"
tokio = { version = "1.33.0", features = ["full"] }
tokio-stream = "0.1.14"
async-trait = "0.1.74"
[dev-dependencies]
arbitrary = { version = "1.3.0", features = ["derive_arbitrary"] }

View File

@@ -1,5 +1,5 @@
#[derive(Debug, thiserror::Error)]
pub enum Error {
pub enum Error {
#[error("Invalid frame length")]
InvalidFrameLen,
}

View File

@@ -6,8 +6,8 @@ use std::ops::{Deref, DerefMut};
use bytemuck::{bytes_of, from_bytes, Pod, Zeroable};
use bytes::Bytes;
use crate::LIBSQL_PAGE_SIZE;
use crate::error::Error;
use crate::LIBSQL_PAGE_SIZE;
pub type FrameNo = u64;
@@ -62,8 +62,7 @@ impl TryFrom<&[u8]> for FrameMut {
fn try_from(data: &[u8]) -> Result<Self, Self::Error> {
if data.len() != size_of::<FrameBorrowed>() {
return Err(Error::InvalidFrameLen)
return Err(Error::InvalidFrameLen);
}
// frames are relatively large (~4ko), we want to avoid allocating them on the stack and
// then copying them to the heap, and instead copy them to the heap directly.

View File

@@ -1,8 +1,8 @@
use std::ffi::{c_int, CStr};
use sqld_libsql_bindings::rusqlite::ffi::{libsql_wal as Wal, PgHdr};
use sqld_libsql_bindings::ffi::types::XWalFrameFn;
use sqld_libsql_bindings::init_static_wal_method;
use sqld_libsql_bindings::rusqlite::ffi::{libsql_wal as Wal, PgHdr};
use sqld_libsql_bindings::wal_hook::WalHook;
use crate::frame::FrameBorrowed;
@@ -64,7 +64,7 @@ impl InjectorHookCtx {
Ok(())
} else {
tracing::error!("fatal replication error: failed to apply pages");
return Err(())
return Err(());
}
}
}

View File

@@ -3,19 +3,20 @@ use std::path::Path;
use std::sync::Arc;
use parking_lot::Mutex;
use sqld_libsql_bindings::rusqlite::{OpenFlags, self};
use sqld_libsql_bindings::rusqlite::{self, OpenFlags};
use sqld_libsql_bindings::wal_hook::TRANSPARENT_METHODS;
use crate::frame::{Frame, FrameNo};
use hook::{
InjectorHookCtx, INJECTOR_METHODS, LIBSQL_INJECT_FATAL, LIBSQL_INJECT_OK, LIBSQL_INJECT_OK_TXN, InjectorHook
};
pub use error::Error;
use hook::{
InjectorHook, InjectorHookCtx, INJECTOR_METHODS, LIBSQL_INJECT_FATAL, LIBSQL_INJECT_OK,
LIBSQL_INJECT_OK_TXN,
};
mod error;
mod headers;
mod hook;
mod error;
#[derive(Debug)]
pub enum InjectError {}

View File

@@ -1,5 +1,6 @@
pub mod frame;
pub mod injector;
pub mod replicator;
pub mod rpc;
mod error;

View File

@@ -0,0 +1,158 @@
use std::path::PathBuf;
use std::sync::Arc;
use parking_lot::Mutex;
use tokio::task::spawn_blocking;
use tokio::time::Duration;
use tokio_stream::{Stream, StreamExt};
use crate::frame::{Frame, FrameNo};
use crate::injector::Injector;
const HANDSHAKE_MAX_RETRIES: usize = 100;
type BoxError = Box<dyn std::error::Error + Sync + Send + 'static>;
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("Internal error: {0}")]
Internal(BoxError),
#[error("Injector error: {0}")]
Injector(#[from] crate::injector::Error),
#[error("Replicator client error: {0}")]
Client(BoxError),
#[error("Fatal replicator error: {0}")]
Fatal(BoxError),
#[error("Timeout performing handshake with primary")]
PrimaryHandshakeTimeout,
#[error("Replicator needs to load from snapshot")]
NeedSnapshot,
}
impl From<tokio::task::JoinError> for Error {
fn from(value: tokio::task::JoinError) -> Self {
Self::Internal(value.into())
}
}
#[async_trait::async_trait]
pub trait ReplicatorClient {
type FrameStream: Stream<Item = Result<Frame, Error>> + Unpin;
/// Perform handshake with remote
async fn handshake(&mut self) -> Result<(), Error>;
/// Return a stream of frames to apply to the database
async fn next_frames(&mut self) -> Result<Self::FrameStream, Error>;
/// Return a snapshot for the current replication index. Called after next_frame has returned a
/// NeedSnapshot error
async fn snapshot(&mut self) -> Result<Self::FrameStream, Error>;
/// set the new commit frame_no
async fn commit_frame_no(&mut self, frame_no: FrameNo) -> Result<(), Error>;
}
/// The `Replicator`'s duty is to download frames from the primary, and pass them to the injector at
/// transaction boundaries.
pub struct Replicator<C> {
client: C,
injector: Arc<Mutex<Injector>>,
has_handshake: bool,
}
const INJECTOR_BUFFER_CAPACITY: usize = 10;
impl<C: ReplicatorClient> Replicator<C> {
pub async fn new(client: C, db_path: PathBuf, auto_checkpoint: u32) -> Result<Self, Error> {
let injector = {
let db_path = db_path.clone();
spawn_blocking(move || {
Injector::new(&db_path, INJECTOR_BUFFER_CAPACITY, auto_checkpoint)
})
.await??
};
Ok(Self {
client,
injector: Arc::new(Mutex::new(injector)),
has_handshake: false,
})
}
pub async fn run(&mut self) -> Result<(), Error> {
loop {
if let Err(e) = self.replicate().await {
// Replication encountered an error. We log the error, and then shut down the
// injector and propagate a potential panic from there.
self.has_handshake = false;
return Err(e);
}
}
}
async fn try_perform_handshake(&mut self) -> Result<(), Error> {
let mut error_printed = false;
for _ in 0..HANDSHAKE_MAX_RETRIES {
tracing::info!("Attempting to perform handshake with primary.");
match self.client.handshake().await {
Ok(_) => {
self.has_handshake = true;
return Ok(());
}
Err(e @ Error::Fatal(_)) => return Err(e),
Err(e) if !error_printed => {
tracing::error!("error connecting to primary. retrying. error: {e}");
error_printed = true;
}
_ => (),
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
Err(Error::PrimaryHandshakeTimeout)
}
async fn replicate(&mut self) -> Result<(), Error> {
if !self.has_handshake {
self.try_perform_handshake().await?;
}
let mut stream = self.client.next_frames().await?;
loop {
match stream.next().await {
Some(Ok(frame)) => {
self.inject_frame(frame).await?;
}
Some(Err(Error::NeedSnapshot)) => {
tracing::debug!("loading snapshot");
// remove any outstanding frames in the buffer that are not part of a
// transaction: they are now part of the snapshot.
self.load_snapshot().await?;
}
Some(Err(e)) => return Err(e.into()),
None => return Ok(()),
}
}
}
async fn load_snapshot(&mut self) -> Result<(), Error> {
self.injector.lock().clear_buffer();
let mut stream = self.client.snapshot().await?;
while let Some(frame) = stream.next().await {
let frame = frame?;
self.inject_frame(frame).await?;
}
Ok(())
}
async fn inject_frame(&mut self, frame: Frame) -> Result<(), Error> {
let injector = self.injector.clone();
match spawn_blocking(move || injector.lock().inject_frame(frame)).await? {
Ok(Some(commit_fno)) => {
self.client.commit_frame_no(commit_fno).await?;
}
Ok(None) => (),
Err(e) => Err(e)?,
}
Ok(())
}
}

View File

@@ -4,7 +4,7 @@ use std::{pin::Pin, task::Context};
use futures::future::BoxFuture;
use futures::{FutureExt, Stream};
use libsql_replication::frame::{FrameNo, Frame};
use libsql_replication::frame::{Frame, FrameNo};
use crate::replication::{LogReadError, ReplicationLogger};
use crate::BLOCKING_RT;

View File

@@ -9,7 +9,7 @@ use std::sync::Arc;
use anyhow::{bail, ensure};
use bytemuck::{bytes_of, pod_read_unaligned, Pod, Zeroable};
use bytes::{Bytes, BytesMut};
use libsql_replication::frame::{FrameHeader, Frame, FrameMut};
use libsql_replication::frame::{Frame, FrameHeader, FrameMut};
use parking_lot::RwLock;
use rusqlite::ffi::SQLITE_BUSY;
use sqld_libsql_bindings::init_static_wal_method;

View File

@@ -0,0 +1,129 @@
use std::pin::Pin;
use libsql_replication::frame::Frame;
use libsql_replication::replicator::{Error, ReplicatorClient};
use libsql_replication::rpc::replication::replication_log_client::ReplicationLogClient;
use libsql_replication::rpc::replication::{HelloRequest, LogOffset, Frame as RpcFrame};
use tokio::sync::watch;
use tokio_stream::{Stream, StreamExt};
use tonic::metadata::BinaryMetadataValue;
use tonic::transport::Channel;
use tonic::{Code, Request};
use crate::namespace::NamespaceName;
use crate::replication::FrameNo;
use crate::rpc::replication_log::NEED_SNAPSHOT_ERROR_MSG;
use crate::rpc::{NAMESPACE_DOESNT_EXIST, NAMESPACE_METADATA_KEY};
use super::error::ReplicationError;
use super::meta::WalIndexMeta;
pub struct Client {
client: ReplicationLogClient<Channel>,
meta: WalIndexMeta,
current_frame_no_notifier: watch::Sender<Option<FrameNo>>,
namespace: NamespaceName,
}
impl From<ReplicationError> for Error {
fn from(error: ReplicationError) -> Self {
match error {
ReplicationError::LogIncompatible
| ReplicationError::NamespaceDoesntExist(_)
| ReplicationError::FailedToCommit(_) => Error::Fatal(error.into()),
_ => Error::Client(error.into()),
}
}
}
impl Client {
fn make_request<T>(&self, msg: T) -> Request<T> {
let mut req = Request::new(msg);
req.metadata_mut().insert_bin(
NAMESPACE_METADATA_KEY,
BinaryMetadataValue::from_bytes(self.namespace.as_slice()),
);
req
}
fn next_frame_no(&self) -> FrameNo {
match *self.current_frame_no_notifier.borrow() {
Some(fno) => fno + 1,
None => 0,
}
}
}
#[async_trait::async_trait]
impl ReplicatorClient for Client {
type FrameStream = Pin<Box<dyn Stream<Item = Result<Frame, Error>>>>;
async fn handshake(&mut self) -> Result<(), Error> {
tracing::info!("Attempting to perform handshake with primary.");
let req = self.make_request(HelloRequest {});
match self.client.hello(req).await {
Ok(resp) => {
let hello = resp.into_inner();
self.meta.merge_hello(hello)?;
self.current_frame_no_notifier
.send_replace(self.meta.current_frame_no());
Ok(())
}
Err(e)
if e.code() == Code::FailedPrecondition
&& e.message() == NAMESPACE_DOESNT_EXIST =>
{
Err(ReplicationError::NamespaceDoesntExist(
self.namespace.clone(),
))?
}
Err(e) => Err(ReplicationError::Other(e.into()))?,
}
}
async fn next_frames(&mut self) -> Result<Self::FrameStream, Error> {
let offset = LogOffset {
next_offset: self.next_frame_no(),
};
let req = self.make_request(offset);
let stream = self.client
.log_entries(req)
.await
.map_err(ReplicationError::Rpc)?
.into_inner()
.map(map_frame_err);
Ok(Box::pin(stream))
}
async fn snapshot(&mut self) -> Result<Self::FrameStream, Error> {
let offset = LogOffset {
next_offset: self.next_frame_no(),
};
let req = self.make_request(offset);
let stream = self.client.snapshot(req).await
.map_err(ReplicationError::Rpc)?
.into_inner().map(map_frame_err);
Ok(Box::pin(stream))
}
async fn commit_frame_no(&mut self, frame_no: libsql_replication::frame::FrameNo) -> Result<(), Error> {
self.current_frame_no_notifier.send_replace(Some(frame_no));
self.meta.set_commit_frame_no(frame_no).await?;
Ok(())
}
}
fn map_frame_err(f: Result<RpcFrame, tonic::Status>) -> Result<Frame, Error> {
match f {
Ok(frame) => Ok(Frame::try_from(&*frame.data).map_err(|_| ReplicationError::InvalidFrame)?),
Err(err) if err.code() == tonic::Code::FailedPrecondition
&& err.message() == NEED_SNAPSHOT_ERROR_MSG => {
Err(Error::NeedSnapshot)
}
Err(err) => Err(ReplicationError::Rpc(err))?
}
}

View File

@@ -1,7 +1,17 @@
use crate::namespace::NamespaceName;
#[derive(Debug, thiserror::Error)]
pub enum ReplicationError {
#[error("Primary has incompatible log")]
LogIncompatible,
#[error("{0}")]
Other(#[from] anyhow::Error),
#[error("namespace {0} doesn't exist")]
NamespaceDoesntExist(NamespaceName),
#[error("Failed to commit current replication index")]
FailedToCommit(std::io::Error),
#[error("Rpc error: {0}")]
Rpc(tonic::Status),
#[error("Received invalid frame")]
InvalidFrame,
}

View File

@@ -90,7 +90,7 @@ impl WalIndexMeta {
}
}
pub async fn flush(&mut self) -> crate::Result<()> {
pub async fn flush(&mut self) -> std::io::Result<()> {
if let Some(data) = self.data {
// FIXME: we can save a syscall by calling read_exact_at, but let's use tokio API for now
self.file.seek(SeekFrom::Start(0)).await?;
@@ -107,7 +107,10 @@ impl WalIndexMeta {
/// Apply the last commit frame no to the meta file.
/// This function must be called after each injection, because it's idempotent to re-apply the
/// last transaction, but not idempotent if we lose track of more than one.
pub async fn set_commit_frame_no(&mut self, commit_fno: FrameNo) -> crate::Result<()> {
pub async fn set_commit_frame_no(
&mut self,
commit_fno: FrameNo,
) -> Result<(), ReplicationError> {
{
let data = self
.data
@@ -116,7 +119,9 @@ impl WalIndexMeta {
data.committed_frame_no = commit_fno;
}
self.flush().await?;
if let Err(e) = self.flush().await {
return Err(ReplicationError::FailedToCommit(e));
}
Ok(())
}

View File

@@ -1,5 +1,6 @@
pub mod error;
mod meta;
mod client;
mod replicator;
pub use replicator::Replicator;

View File

@@ -4,11 +4,11 @@ use std::pin::Pin;
use std::sync::{Arc, RwLock};
use futures::stream::BoxStream;
pub use libsql_replication::rpc::replication as rpc;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;
use tonic::Status;
pub use libsql_replication::rpc::replication as rpc;
use crate::auth::Auth;
use crate::namespace::{NamespaceName, NamespaceStore, PrimaryNamespaceMaker};

View File

@@ -7,9 +7,9 @@ use std::{ffi::CString, ops::Deref, time::Duration};
pub use crate::wal_hook::WalMethodsHook;
pub use once_cell::sync::Lazy;
pub use rusqlite;
use rusqlite::ffi::sqlite3;
use wal_hook::TransparentMethods;
pub use rusqlite;
use self::{
ffi::{libsql_wal_methods, libsql_wal_methods_find},