0
0
mirror of https://github.com/tursodatabase/libsql.git synced 2025-06-17 07:59:55 +00:00

Add storage server RPC client ()

* Add storage server RPC client

* Address review comments

- Use `#[tracing::instrument]` wherever appropriate
- Remove `Mutex` for `client` and `clone` it wherever required
- Avoid reading env variable inside the lib, rather take a config object
- Make the private methods of `DurableWal` async
- Don't try to create a runtime, instead assume it always exists. Let the caller create it
- Update proto:
	- remove `max_frame_no` from `InsertFramesRequest`
	- make `page_no` to `u32` in the proto
- Update storage server to have `page_no` as `u32`
This commit is contained in:
Avinash Sajjanshetty
2024-06-04 16:56:50 +05:30
committed by GitHub
parent a9835eeae1
commit 3460edce12
7 changed files with 461 additions and 21 deletions
Cargo.lock
libsql-storage-server/src
libsql-storage

13
Cargo.lock generated

@ -3539,10 +3539,17 @@ dependencies = [
name = "libsql-storage"
version = "0.0.1"
dependencies = [
"libsql-sys",
"log",
"parking_lot",
"prost",
"prost-build",
"sieve-cache",
"tokio",
"tonic 0.10.2",
"tonic-build 0.10.2",
"tracing",
"uuid",
]
[[package]]
@ -5349,6 +5356,12 @@ version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
[[package]]
name = "sieve-cache"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51bf3a9dccf2c079bf1465d449a485c85b36443caf765f2f127bfec28b180f75"
[[package]]
name = "signal-hook-registry"
version = "1.4.2"

@ -4,7 +4,6 @@ use std::sync::Mutex;
use crate::store::FrameData;
use crate::store::FrameStore;
use async_trait::async_trait;
use bytes::Bytes;
#[derive(Default)]
pub(crate) struct InMemFrameStore {
@ -16,7 +15,7 @@ struct InMemInternal {
// contains a frame data, key is the frame number
frames: BTreeMap<u64, FrameData>,
// pages map contains the page number as a key and the list of frames for the page as a value
pages: BTreeMap<u64, Vec<u64>>,
pages: BTreeMap<u32, Vec<u64>>,
max_frame_no: u64,
}
@ -29,7 +28,7 @@ impl InMemFrameStore {
#[async_trait]
impl FrameStore for InMemFrameStore {
// inserts a new frame for the page number and returns the new frame value
async fn insert_frame(&self, _namespace: &str, page_no: u64, frame: Bytes) -> u64 {
async fn insert_frame(&self, _namespace: &str, page_no: u32, frame: bytes::Bytes) -> u64 {
let mut inner = self.inner.lock().unwrap();
let frame_no = inner.max_frame_no + 1;
inner.max_frame_no = frame_no;
@ -62,7 +61,7 @@ impl FrameStore for InMemFrameStore {
}
// given a page number, return the maximum frame for the page
async fn find_frame(&self, _namespace: &str, page_no: u64) -> Option<u64> {
async fn find_frame(&self, _namespace: &str, page_no: u32) -> Option<u64> {
self.inner
.lock()
.unwrap()
@ -72,7 +71,7 @@ impl FrameStore for InMemFrameStore {
}
// given a frame num, return the page number
async fn frame_page_no(&self, _namespace: &str, frame_no: u64) -> Option<u64> {
async fn frame_page_no(&self, _namespace: &str, frame_no: u64) -> Option<u32> {
self.inner
.lock()
.unwrap()

@ -3,18 +3,18 @@ use bytes::Bytes;
#[async_trait]
pub trait FrameStore: Send + Sync {
async fn insert_frame(&self, namespace: &str, page_no: u64, frame: bytes::Bytes) -> u64;
async fn insert_frame(&self, namespace: &str, page_no: u32, frame: bytes::Bytes) -> u64;
#[allow(dead_code)]
async fn insert_frames(&self, namespace: &str, frames: Vec<FrameData>) -> u64;
async fn read_frame(&self, namespace: &str, frame_no: u64) -> Option<bytes::Bytes>;
async fn find_frame(&self, namespace: &str, page_no: u64) -> Option<u64>;
async fn frame_page_no(&self, namespace: &str, frame_no: u64) -> Option<u64>;
async fn find_frame(&self, namespace: &str, page_no: u32) -> Option<u64>;
async fn frame_page_no(&self, namespace: &str, frame_no: u64) -> Option<u32>;
async fn frames_in_wal(&self, namespace: &str) -> u64;
async fn destroy(&self, namespace: &str);
}
#[derive(Default)]
pub struct FrameData {
pub(crate) page_no: u64,
pub(crate) page_no: u32,
pub(crate) data: Bytes,
}

@ -7,6 +7,13 @@ repository = "https://github.com/tursodatabase/libsql"
license = "MIT"
[dependencies]
libsql-sys = { path = "../libsql-sys", features = ["rusqlite"] }
sieve-cache = "0.1.4"
tokio = { version = "1.22.2", features = ["rt-multi-thread", "net", "io-std", "io-util", "time", "macros", "sync", "fs"] }
tracing = { version = "0.1.37", default-features = false }
uuid = { version = "1.7.0", features = ["v4"] }
log = "0.4.20"
parking_lot = "0.12.1"
prost = "0.12"
tonic = { version = "0.10", features = ["tls"] }

@ -3,14 +3,13 @@ syntax = "proto3";
package storage;
message Frame {
uint64 page_no = 1;
uint32 page_no = 1;
bytes data = 2;
}
message InsertFramesRequest {
string namespace = 1;
repeated Frame frames = 2;
uint64 max_frame_no = 3;
}
message InsertFramesResponse {
@ -19,7 +18,7 @@ message InsertFramesResponse {
message FindFrameRequest {
string namespace = 1;
uint64 page_no = 2;
uint32 page_no = 2;
uint64 max_frame_no = 3;
}
@ -58,7 +57,7 @@ message FramePageNumRequest {
}
message FramePageNumResponse {
uint64 page_no = 1;
uint32 page_no = 1;
}
message DestroyRequest {

@ -2,8 +2,8 @@
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Frame {
#[prost(uint64, tag = "1")]
pub page_no: u64,
#[prost(uint32, tag = "1")]
pub page_no: u32,
#[prost(bytes = "vec", tag = "2")]
pub data: ::prost::alloc::vec::Vec<u8>,
}
@ -14,8 +14,6 @@ pub struct InsertFramesRequest {
pub namespace: ::prost::alloc::string::String,
#[prost(message, repeated, tag = "2")]
pub frames: ::prost::alloc::vec::Vec<Frame>,
#[prost(uint64, tag = "3")]
pub max_frame_no: u64,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
@ -28,8 +26,8 @@ pub struct InsertFramesResponse {
pub struct FindFrameRequest {
#[prost(string, tag = "1")]
pub namespace: ::prost::alloc::string::String,
#[prost(uint64, tag = "2")]
pub page_no: u64,
#[prost(uint32, tag = "2")]
pub page_no: u32,
#[prost(uint64, tag = "3")]
pub max_frame_no: u64,
}
@ -88,8 +86,8 @@ pub struct FramePageNumRequest {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FramePageNumResponse {
#[prost(uint64, tag = "1")]
pub page_no: u64,
#[prost(uint32, tag = "1")]
pub page_no: u32,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]

@ -1,4 +1,428 @@
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};
use libsql_sys::ffi::{SQLITE_ABORT, SQLITE_BUSY};
use libsql_sys::rusqlite;
use libsql_sys::wal::{Result, Vfs, Wal, WalManager};
use rpc::storage_client::StorageClient;
use sieve_cache::SieveCache;
use tonic::transport::Channel;
use tracing::{error, trace};
/// Generated protobuf/tonic bindings for the storage RPC service
/// (compiled into `generated/storage.rs` from the proto definitions).
pub mod rpc {
    #![allow(clippy::all)]
    include!("generated/storage.rs");
}
// What does (not) work:
// - there are no read txn locks nor upgrades
// - no lock stealing
// - write set is kept in mem
// - txns don't use max_frame_no yet
// - no savepoints, yet
// - no multi tenancy, uses `default` namespace
// - txn can read new frames after it started (since there are no read locks)
// - requires a lot of memory, as it assumes all the transaction data fits in memory
/// Configuration handed to every [`DurableWal`] opened by the manager.
#[derive(Clone, Default)]
pub struct DurableWalConfig {
    // gRPC endpoint of the storage server; passed verbatim to
    // `StorageClient::connect`.
    storage_server_address: String,
}
/// WAL manager that opens [`DurableWal`] instances whose frames live on a
/// remote storage server instead of a local `-wal` file.
#[derive(Clone)]
pub struct DurableWalManager {
    // Write-lock table shared by every WAL this manager opens.
    lock_manager: Arc<Mutex<LockManager>>,
    // Connection settings cloned into each opened `DurableWal`.
    config: DurableWalConfig,
}
impl DurableWalManager {
    /// Builds a manager whose WALs talk to the storage server at
    /// `storage_server_address`, sharing `lock_manager` for write locking.
    pub fn new(lock_manager: Arc<Mutex<LockManager>>, storage_server_address: String) -> Self {
        let config = DurableWalConfig {
            storage_server_address,
        };
        Self {
            lock_manager,
            config,
        }
    }
}
impl WalManager for DurableWalManager {
    type Wal = DurableWal;

    // The shared-memory wal-index is not used: frames are looked up on the
    // remote storage server instead.
    fn use_shared_memory(&self) -> bool {
        trace!("DurableWalManager::use_shared_memory()");
        false
    }

    /// Opens a [`DurableWal`] for the given database path.
    ///
    /// Blocks the current thread while the async connect runs.
    /// NOTE(review): `block_in_place` panics on a current-thread tokio
    /// runtime — this assumes the caller provides a multi-thread runtime.
    #[tracing::instrument(skip_all, fields(_db_path))]
    fn open(
        &self,
        _vfs: &mut Vfs,
        _file: &mut libsql_sys::wal::Sqlite3File,
        _no_shm_mode: std::ffi::c_int,
        _max_log_size: i64,
        db_path: &std::ffi::CStr,
    ) -> Result<Self::Wal> {
        // Captured only for the tracing span field above.
        let _db_path = db_path.to_str().unwrap();
        trace!("DurableWalManager::open()");
        // TODO: use the actual namespace uuid from the connection
        let namespace = "default".to_string();
        // Assume a runtime already exists; the caller is responsible for
        // creating it (see commit notes).
        let rt = tokio::runtime::Handle::current();
        let resp = DurableWal::new(namespace, self.config.clone(), self.lock_manager.clone());
        let resp = tokio::task::block_in_place(|| rt.block_on(resp));
        Ok(resp)
    }

    /// Closes the WAL, releasing any lock the connection still holds.
    fn close(
        &self,
        wal: &mut Self::Wal,
        _db: &mut libsql_sys::wal::Sqlite3Db,
        _sync_flags: std::ffi::c_int,
        _scratch: Option<&mut [u8]>,
    ) -> Result<()> {
        trace!("DurableWalManager::close()");
        // `end_read_txn` drops this connection's lock in the lock manager.
        wal.end_read_txn();
        Ok(())
    }

    // Log destruction is a no-op: the log lives on the storage server.
    fn destroy_log(&self, _vfs: &mut Vfs, _db_path: &std::ffi::CStr) -> Result<()> {
        trace!("DurableWalManager::destroy_log()");
        Ok(())
    }

    // Always reports an existing log; the remote WAL is assumed present.
    fn log_exists(&self, _vfs: &mut Vfs, _db_path: &std::ffi::CStr) -> Result<bool> {
        trace!("DurableWalManager::log_exists()");
        Ok(true)
    }

    fn destroy(self)
    where
        Self: Sized,
    {
        trace!("DurableWalManager::destroy()");
    }
}
/// WAL implementation that persists frames on a remote storage server.
pub struct DurableWal {
    // Namespace (tenant) this WAL belongs to; currently always "default".
    namespace: String,
    // Unique id for this connection; used as the owner token in `LockManager`.
    conn_id: String,
    // gRPC client; cloned per call (clones share the underlying channel).
    client: StorageClient<Channel>,
    // Read cache: frame_no -> frame bytes (SIEVE eviction, capacity 1000).
    frames_cache: SieveCache<std::num::NonZeroU64, Vec<u8>>,
    // Uncommitted write set of the current txn: page_no -> frame (kept in
    // memory until commit, see module notes).
    write_cache: BTreeMap<u32, rpc::Frame>,
    // Shared write-lock table.
    lock_manager: Arc<Mutex<LockManager>>,
}
impl DurableWal {
    /// Connects to the storage server and builds a new WAL handle.
    ///
    /// Panics if the connection to `config.storage_server_address` fails or
    /// the cache cannot be created.
    async fn new(
        namespace: String,
        config: DurableWalConfig,
        lock_manager: Arc<Mutex<LockManager>>,
    ) -> Self {
        let client = StorageClient::connect(config.storage_server_address)
            .await
            .unwrap();
        let page_frames = SieveCache::new(1000).unwrap();
        Self {
            namespace,
            conn_id: uuid::Uuid::new_v4().to_string(),
            client,
            frames_cache: page_frames,
            write_cache: BTreeMap::new(),
            lock_manager,
        }
    }

    /// Asks the storage server for the newest frame containing `page_no`.
    /// Returns `None` when the page is not in the WAL.
    #[tracing::instrument(skip(self))]
    async fn find_frame_by_page_no(
        &mut self,
        page_no: std::num::NonZeroU32,
    ) -> Result<Option<std::num::NonZeroU64>> {
        trace!("DurableWal::find_frame_by_page_no()");
        let req = rpc::FindFrameRequest {
            namespace: self.namespace.clone(),
            page_no: page_no.get(),
            // 0 = no upper bound; txns do not track max_frame_no yet.
            max_frame_no: 0,
        };
        // Clone the client to get an owned handle (cheap channel clone).
        let mut binding = self.client.clone();
        let resp = binding.find_frame(req).await.unwrap();
        // Frame numbers start at 1, so `NonZeroU64::new` doubles as the
        // "absent" filter. `and_then` replaces the old `map(..).flatten()`.
        let frame_no = resp.into_inner().frame_no.and_then(std::num::NonZeroU64::new);
        Ok(frame_no)
    }

    /// Total number of frames currently in the WAL for this namespace.
    async fn frames_count(&self) -> u64 {
        let req = rpc::FramesInWalRequest {
            namespace: self.namespace.clone(),
        };
        let mut binding = self.client.clone();
        let resp = binding.frames_in_wal(req).await.unwrap();
        let count = resp.into_inner().count;
        trace!("DurableWal::frames_in_wal() = {}", count);
        count
    }
}
impl Wal for DurableWal {
    fn limit(&mut self, _size: i64) {}

    /// Begins a read transaction. Always succeeds (no read locks yet).
    fn begin_read_txn(&mut self) -> Result<bool> {
        trace!("DurableWal::begin_read_txn()");
        // TODO:
        // - create a read lock
        // - save the current max_frame_no for this txn
        //
        Ok(true)
    }

    /// Ends a read transaction and releases any lock this connection holds.
    fn end_read_txn(&mut self) {
        trace!("DurableWal::end_read_txn()");
        // TODO: drop both read or write lock
        let mut lock_manager = self.lock_manager.lock().unwrap();
        trace!(
            "DurableWal::end_read_txn() id = {}, unlocked = {}",
            self.conn_id,
            lock_manager.unlock(self.namespace.clone(), self.conn_id.clone())
        );
    }

    // find_frame checks if the given page_no exists in the storage server. If so, it returns the
    // same `page_no` back. The WAL interface expects the value to be u32 but the frames can exceed
    // the limit and is set to u64. So, instead of returning the frame no, it returns the page no
    // back and `read_frame` methods reads the frame by page_no
    #[tracing::instrument(skip(self))]
    fn find_frame(
        &mut self,
        page_no: std::num::NonZeroU32,
    ) -> Result<Option<std::num::NonZeroU32>> {
        trace!("DurableWal::find_frame()");
        let rt = tokio::runtime::Handle::current();
        // TODO: find_frame should account for `max_frame_no` of this txn
        let frame_no =
            tokio::task::block_in_place(|| rt.block_on(self.find_frame_by_page_no(page_no)))
                .unwrap();
        // Hand back the page number itself as the "frame" token (see above).
        Ok(frame_no.map(|_| page_no))
    }

    /// Reads the latest content of `page_no` into `buffer`.
    ///
    /// Lookup order: this txn's write cache, then the local read cache,
    /// then the storage server (populating the read cache on the way out).
    #[tracing::instrument(skip_all, fields(page_no))]
    fn read_frame(&mut self, page_no: std::num::NonZeroU32, buffer: &mut [u8]) -> Result<()> {
        trace!("DurableWal::read_frame()");
        let rt = tokio::runtime::Handle::current();
        // 1. Uncommitted writes from this txn take priority.
        if let Some(frame) = self.write_cache.get(&(u32::from(page_no))) {
            trace!(
                "DurableWal::read_frame(page_no: {:?}) -- write cache hit",
                page_no
            );
            buffer.copy_from_slice(&frame.data);
            return Ok(());
        }
        // TODO: this call is unnecessary since `read_frame` is always called after `find_frame`
        let frame_no =
            tokio::task::block_in_place(|| rt.block_on(self.find_frame_by_page_no(page_no)))
                .unwrap()
                .unwrap();
        // 2. Local read cache.
        if let Some(frame) = self.frames_cache.get(&frame_no) {
            trace!(
                "DurableWal::read_frame(page_no: {:?}) -- read cache hit",
                page_no
            );
            buffer.copy_from_slice(&frame);
            return Ok(());
        }
        // 3. Fetch from the storage server and populate the read cache.
        let req = rpc::ReadFrameRequest {
            namespace: self.namespace.clone(),
            frame_no: frame_no.get(),
        };
        let mut binding = self.client.clone();
        let resp = binding.read_frame(req);
        let resp = tokio::task::block_in_place(|| rt.block_on(resp)).unwrap();
        let frame = resp.into_inner().frame.unwrap();
        buffer.copy_from_slice(&frame);
        // `frame_no` is already a NonZeroU64; no need to rebuild it via
        // `NonZeroU64::new(frame_no.get()).unwrap()`.
        self.frames_cache.insert(frame_no, frame);
        Ok(())
    }

    /// Reports the database size in pages as the number of frames in the WAL.
    /// NOTE(review): this equates frame count with page count — confirm once
    /// checkpointing/compaction exists.
    fn db_size(&self) -> u32 {
        let rt = tokio::runtime::Handle::current();
        let size = tokio::task::block_in_place(|| rt.block_on(self.frames_count()))
            .try_into()
            .unwrap();
        trace!("DurableWal::db_size() => {}", size);
        size
    }

    /// Acquires this namespace's write lock; fails with `SQLITE_BUSY` when
    /// another connection holds it (no lock stealing, no queueing).
    fn begin_write_txn(&mut self) -> Result<()> {
        // todo: check if the connection holds a read lock then try to acquire a write lock
        let mut lock_manager = self.lock_manager.lock().unwrap();
        if !lock_manager.lock(self.namespace.clone(), self.conn_id.clone()) {
            trace!(
                "DurableWal::begin_write_txn() lock acquired = false, id = {}",
                self.conn_id
            );
            return Err(rusqlite::ffi::Error::new(SQLITE_BUSY));
        };
        trace!(
            "DurableWal::begin_write_txn() lock acquired = true, id = {}",
            self.conn_id
        );
        Ok(())
    }

    /// Releases this connection's write lock, if held.
    fn end_write_txn(&mut self) -> Result<()> {
        let mut lock_manager = self.lock_manager.lock().unwrap();
        trace!(
            "DurableWal::end_write_txn() id = {}, unlocked = {}",
            self.conn_id,
            lock_manager.unlock(self.namespace.clone(), self.conn_id.clone())
        );
        Ok(())
    }

    fn undo<U: libsql_sys::wal::UndoHandler>(&mut self, _handler: Option<&mut U>) -> Result<()> {
        // TODO: implement undo
        Ok(())
    }

    fn savepoint(&mut self, _rollback_data: &mut [u32]) {
        // TODO: implement savepoint
    }

    fn savepoint_undo(&mut self, _rollback_data: &mut [u32]) -> Result<()> {
        // TODO: implement savepoint_undo
        Ok(())
    }

    /// Buffers the txn's frames in `write_cache` and, on commit
    /// (`size_after > 0`), ships the whole write set to the storage server.
    ///
    /// Returns `SQLITE_ABORT` (and drops the write set) when called without
    /// owning the namespace's write lock.
    #[tracing::instrument(skip(self, page_headers))]
    fn insert_frames(
        &mut self,
        page_size: std::ffi::c_int,
        page_headers: &mut libsql_sys::wal::PageHeaders,
        size_after: u32,
        is_commit: bool,
        sync_flags: std::ffi::c_int,
    ) -> Result<usize> {
        trace!("DurableWal::insert_frames()");
        let rt = tokio::runtime::Handle::current();
        let mut lock_manager = self.lock_manager.lock().unwrap();
        if !lock_manager.is_lock_owner(self.namespace.clone(), self.conn_id.clone()) {
            error!("DurableWal::insert_frames() was called without acquiring lock!",);
            self.write_cache.clear();
            return Err(rusqlite::ffi::Error::new(SQLITE_ABORT));
        };
        // add the updated frames from frame_headers to writeCache
        for (page_no, frame) in page_headers.iter() {
            self.write_cache.insert(
                page_no,
                rpc::Frame {
                    page_no,
                    data: frame.to_vec(),
                },
            );
            // todo: update size after
        }
        // A non-zero `size_after` marks the commit frame; until then just
        // keep buffering. (`size_after` is u32, so the old `<= 0` test was
        // really `== 0`.)
        if size_after == 0 {
            // todo: update new size
            return Ok(0);
        }
        let req = rpc::InsertFramesRequest {
            namespace: self.namespace.clone(),
            frames: self.write_cache.values().cloned().collect(),
        };
        self.write_cache.clear();
        let mut binding = self.client.clone();
        trace!("sending DurableWal::insert_frames() {:?}", req.frames.len());
        let resp = binding.insert_frames(req);
        let resp = tokio::task::block_in_place(|| rt.block_on(resp)).unwrap();
        Ok(resp.into_inner().num_frames as usize)
    }

    fn checkpoint(
        &mut self,
        _db: &mut libsql_sys::wal::Sqlite3Db,
        _mode: libsql_sys::wal::CheckpointMode,
        _busy_handler: Option<&mut dyn libsql_sys::wal::BusyHandler>,
        _sync_flags: u32,
        // temporary scratch buffer
        _buf: &mut [u8],
        _checkpoint_cb: Option<&mut dyn libsql_sys::wal::CheckpointCallback>,
        _in_wal: Option<&mut i32>,
        _backfilled: Option<&mut i32>,
    ) -> Result<()> {
        // checkpoint is a no op
        Ok(())
    }

    fn exclusive_mode(&mut self, op: std::ffi::c_int) -> Result<()> {
        trace!("DurableWal::exclusive_mode(op: {})", op);
        Ok(())
    }

    fn uses_heap_memory(&self) -> bool {
        trace!("DurableWal::uses_heap_memory()");
        true
    }

    fn set_db(&mut self, _db: &mut libsql_sys::wal::Sqlite3Db) {}

    fn callback(&self) -> i32 {
        trace!("DurableWal::callback()");
        0
    }

    // Frames live remotely, so the local-WAL frame count is always 0.
    fn frames_in_wal(&self) -> u32 {
        0
    }
}
/// Tracks, per namespace, which connection currently holds the write lock.
///
/// There is no lock stealing and no queueing: a second connection simply
/// fails to acquire until the owner unlocks.
pub struct LockManager {
    // namespace -> conn_id of the current lock holder
    locks: std::collections::HashMap<String, String>,
}

impl Default for LockManager {
    fn default() -> Self {
        Self::new()
    }
}

impl LockManager {
    /// Creates an empty lock table.
    pub fn new() -> Self {
        Self {
            locks: std::collections::HashMap::new(),
        }
    }

    /// Acquires the lock on `namespace` for `conn_id`.
    ///
    /// Returns `true` when `conn_id` now holds (or already held) the lock,
    /// `false` when a different connection owns it.
    pub fn lock(&mut self, namespace: String, conn_id: String) -> bool {
        // Entry API: single lookup — insert when vacant, otherwise compare
        // the existing owner with the requester (re-entrant acquire).
        *self.locks.entry(namespace).or_insert_with(|| conn_id.clone()) == conn_id
    }

    /// Releases the lock on `namespace` if held by `conn_id`.
    ///
    /// Returns `true` when the lock was released or was never held at all;
    /// `false` when a different connection owns it.
    pub fn unlock(&mut self, namespace: String, conn_id: String) -> bool {
        match self.locks.get(&namespace) {
            Some(owner) if *owner == conn_id => {
                self.locks.remove(&namespace);
                true
            }
            Some(_) => false,
            None => true,
        }
    }

    /// Returns `true` if `conn_id` currently owns the lock on `namespace`.
    /// Takes `&self`: checking ownership does not mutate the table.
    pub fn is_lock_owner(&self, namespace: String, conn_id: String) -> bool {
        self.locks.get(&namespace) == Some(&conn_id)
    }
}