mirror of https://github.com/tursodatabase/libsql.git synced 2025-06-01 04:03:10 +00:00

bottomless-cli: add restoring from a directory

Convenience function for manual restoration.
Author: Piotr Sarna
Date: 2023-11-09 11:45:31 +01:00
parent de02b00a7e
commit 4a71b2072a
9 changed files with 238 additions and 16 deletions
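
The change boils down to three new associated functions on the bottomless-cli `Replicator` (restore a local snapshot, replay locally stored WAL frames, read the generation's `.meta` file), wired up behind a new `--from-dir` flag on the restore subcommand. A condensed sketch of how they compose, mirroring the CLI branch added below (the CLI-side `Replicator` type and the surrounding crate setup are assumed to be in scope; error reporting and post-restore verification are omitted):

use anyhow::Result;
use std::path::Path;

// Sketch of the manual-restore flow added in this commit; `Replicator` is the
// bottomless-cli replicator type that the new associated functions extend.
async fn restore_from_dir(from_dir: &Path, db_path: &str) -> Result<()> {
    let mut db_file = tokio::fs::File::create(db_path).await?;
    // Page size and running checksum come from the generation's `.meta` file,
    // falling back to the same defaults the CLI uses when it is absent.
    let (page_size, checksum) = Replicator::get_local_metadata(from_dir)
        .await?
        .unwrap_or((4096, 0));
    // Restore the base snapshot, then replay the locally stored WAL frames.
    Replicator::restore_from_local_snapshot(from_dir, &mut db_file).await?;
    let frames =
        Replicator::apply_wal_from_local_generation(from_dir, &mut db_file, page_size, checksum)
            .await?;
    println!("applied {frames} WAL frames from {}", from_dir.display());
    Ok(())
}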

@ -11,10 +11,12 @@ description = "Command-line interface for bottomless replication for libSQL"
[dependencies]
anyhow = "1.0.66"
async-compression = { version = "0.4.4", features = ["tokio", "gzip", "zstd"] }
aws-config = "0.55"
aws-sdk-s3 = "0.28"
aws-smithy-types = "0.55"
bottomless = { version = "0", path = "../bottomless" }
bytes = "1"
chrono = "0.4.23"
clap = { version = "4.0.29", features = ["derive"] }
tokio = { version = "1.23.0", features = ["macros", "rt", "rt-multi-thread"] }

@ -71,6 +71,8 @@ enum Commands {
long_help = "UTC timestamp which is an upper bound for the transactions to be restored."
)]
utc_time: Option<NaiveDateTime>,
#[clap(long, short, conflicts_with_all = ["generation", "utc_time"], long_help = "Restore from a local directory")]
from_dir: Option<PathBuf>,
},
#[clap(about = "Verify integrity of the database")]
Verify {
@ -107,6 +109,51 @@ async fn run() -> Result<()> {
tracing_subscriber::fmt::init();
let mut options = Cli::parse();
if let Commands::Restore {
generation: _,
utc_time: _,
from_dir: Some(from_dir),
} = options.command
{
let database = match &options.database {
Some(database) => database,
None => {
println!("Please pass the database name with -d option");
return Ok(());
}
};
println!("trying to restore from {}", from_dir.display());
let mut db_file = tokio::fs::File::create(database).await?;
let (page_size, checksum) = match Replicator::get_local_metadata(&from_dir).await {
Ok(Some((page_size, checksum))) => (page_size, checksum),
Ok(None) => {
println!("No local metadata found, continuing anyway");
(4096, 0)
}
Err(e) => {
println!("Failed to get local metadata: {e}, continuing anyway");
(4096, 0)
}
};
println!("Local metadata: page_size={page_size}, checksum={checksum:x}");
Replicator::restore_from_local_snapshot(&from_dir, &mut db_file).await?;
println!("Restored local snapshot to {}", database);
let applied_frames = Replicator::apply_wal_from_local_generation(
&from_dir,
&mut db_file,
page_size,
checksum,
)
.await?;
println!("Applied {applied_frames} frames from local generation");
if let Err(e) = verify_db(&PathBuf::from(database)) {
println!("Verification failed: {e}");
std::process::exit(1)
}
println!("Verification: ok");
return Ok(());
}
if let Some(ep) = options.endpoint.as_deref() {
std::env::set_var("LIBSQL_BOTTOMLESS_ENDPOINT", ep)
} else {
@ -166,6 +213,7 @@ async fn run() -> Result<()> {
Commands::Restore {
generation,
utc_time,
..
} => {
tokio::fs::create_dir_all(&database_dir).await?;
client.restore(generation, utc_time).await?;

@ -264,4 +264,170 @@ impl Replicator {
self.print_snapshot_summary(&generation).await?;
Ok(())
}
pub async fn restore_from_local_snapshot(
from_dir: impl AsRef<std::path::Path>,
db: &mut tokio::fs::File,
) -> Result<bool> {
let from_dir = from_dir.as_ref();
use bottomless::replicator::CompressionKind;
use tokio::io::AsyncWriteExt;
let algos_to_try = &[
CompressionKind::Gzip,
CompressionKind::Zstd,
CompressionKind::None,
];
for algo in algos_to_try {
let main_db_path = match algo {
CompressionKind::None => from_dir.join("db.db"),
CompressionKind::Gzip => from_dir.join("db.gz"),
CompressionKind::Zstd => from_dir.join("db.zstd"),
};
if let Ok(mut db_file) = tokio::fs::File::open(&main_db_path).await {
let db_size = match algo {
CompressionKind::None => tokio::io::copy(&mut db_file, db).await?,
CompressionKind::Gzip => {
let mut decompress_reader =
async_compression::tokio::bufread::GzipDecoder::new(
tokio::io::BufReader::new(db_file),
);
tokio::io::copy(&mut decompress_reader, db).await?
}
CompressionKind::Zstd => {
let mut decompress_reader =
async_compression::tokio::bufread::ZstdDecoder::new(
tokio::io::BufReader::new(db_file),
);
tokio::io::copy(&mut decompress_reader, db).await?
}
};
db.flush().await?;
tracing::info!("Restored the main database file ({} bytes)", db_size);
return Ok(true);
}
}
Ok(false)
}
pub async fn apply_wal_from_local_generation(
from_dir: impl AsRef<std::path::Path>,
db: &mut tokio::fs::File,
page_size: u32,
checksum: u64,
) -> Result<u32> {
use bottomless::transaction_cache::TransactionPageCache;
use tokio::io::AsyncWriteExt;
const SWAP_AFTER: u32 = 65536;
const TMP_RESTORE_DIR: &str = ".bottomless.restore.tmp";
let from_dir = from_dir.as_ref();
// Page buffer; fully overwritten by each next_page() call below.
let mut page_buf = vec![0u8; page_size as usize];
let objs = {
let mut objs = Vec::new();
let mut dir = tokio::fs::read_dir(from_dir).await?;
while let Some(entry) = dir.next_entry().await? {
let path = entry.path();
if let Some(file_name) = path.file_name() {
if let Some(file_name) = file_name.to_str() {
if file_name.ends_with(".gz")
|| file_name.ends_with(".zstd")
|| file_name.ends_with(".raw")
{
objs.push(path);
}
}
}
}
objs.sort();
objs.into_iter()
};
let mut last_received_frame_no = 0;
let mut pending_pages =
TransactionPageCache::new(SWAP_AFTER, page_size, TMP_RESTORE_DIR.into());
let mut checksum: Option<u64> = Some(checksum);
for obj in objs {
let key = obj.file_name().unwrap().to_str().unwrap();
tracing::debug!("Loading {}", key);
let (first_frame_no, _last_frame_no, _timestamp, compression_kind) =
match bottomless::replicator::Replicator::parse_frame_range(&format!("/{key}")) {
Some(result) => result,
None => {
if key != "db.gz" && key != "db.zstd" && key != "db.db" {
tracing::warn!("Failed to parse frame/page from key {}", key);
}
continue;
}
};
if first_frame_no != last_received_frame_no + 1 {
tracing::warn!("Missing series of consecutive frames. Last applied frame: {}, next found: {}. Stopping the restoration process",
last_received_frame_no, first_frame_no);
break;
}
// read the frame batch from the local file at `obj` (a path inside `from_dir`)
let frame = tokio::fs::File::open(&obj).await?;
let mut frameno = first_frame_no;
let mut reader = bottomless::read::BatchReader::new(
frameno,
frame,
page_size as usize,
compression_kind,
);
while let Some(frame) = reader.next_frame_header().await? {
let pgno = frame.pgno();
reader.next_page(&mut page_buf).await?;
if let Some(ck) = checksum {
checksum = match frame.verify(ck, &page_buf) {
Ok(checksum) => Some(checksum),
Err(e) => {
println!("ERROR: failed to verify checksum of page {pgno}: {e}, continuing anyway. Checksum will no longer be validated");
tracing::error!("Failed to verify checksum of page {pgno}: {e}, continuing anyway. Checksum will no longer be validated");
None
}
};
}
pending_pages.insert(pgno, &page_buf).await?;
if frame.is_committed() {
let pending_pages = std::mem::replace(
&mut pending_pages,
TransactionPageCache::new(SWAP_AFTER, page_size, TMP_RESTORE_DIR.into()),
);
pending_pages.flush(db).await?;
}
frameno += 1;
last_received_frame_no += 1;
}
db.flush().await?;
}
Ok(last_received_frame_no)
}
pub async fn get_local_metadata(
from_dir: impl AsRef<std::path::Path>,
) -> Result<Option<(u32, u64)>> {
use bytes::Buf;
if let Ok(data) = tokio::fs::read(from_dir.as_ref().join(".meta")).await {
let mut data = bytes::Bytes::from(data);
let page_size = data.get_u32();
let crc = data.get_u64();
Ok(Some((page_size, crc)))
} else {
Ok(None)
}
}
}
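
For reference, `get_local_metadata` expects the generation directory's `.meta` file to start with a big-endian `u32` page size followed by a big-endian `u64` checksum (that is what `bytes::Buf::get_u32`/`get_u64` consume). A hypothetical helper for producing such a file, e.g. in tests, could look like this (the writer side is not part of this commit):

use bytes::BufMut;
use std::path::Path;

// Hypothetical test helper: write a `.meta` file in the layout that
// Replicator::get_local_metadata reads back (big-endian u32 page size,
// then big-endian u64 checksum).
async fn write_local_metadata(dir: &Path, page_size: u32, checksum: u64) -> std::io::Result<()> {
    let mut buf = Vec::with_capacity(12);
    buf.put_u32(page_size);
    buf.put_u64(checksum);
    tokio::fs::write(dir.join(".meta"), buf).await
}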

@ -23,6 +23,7 @@ arc-swap = "1.6"
chrono = "0.4.23"
uuid = "1.4.1"
rand = "0.8.5"
futures-core = "0.3.29"
[features]
libsql_linked_statically = []

@ -5,11 +5,11 @@
mod ffi;
mod backup;
mod read;
pub mod read;
pub mod replicator;
mod transaction_cache;
pub mod transaction_cache;
pub mod uuid_utils;
mod wal;
pub mod wal;
use crate::ffi::{
bottomless_methods, libsql_wal_methods, sqlite3, sqlite3_file, sqlite3_vfs, PgHdr, Wal,

@ -2,15 +2,13 @@ use crate::replicator::CompressionKind;
use crate::wal::WalFrameHeader;
use anyhow::Result;
use async_compression::tokio::bufread::{GzipDecoder, ZstdDecoder};
use aws_sdk_s3::primitives::ByteStream;
use std::io::ErrorKind;
use std::pin::Pin;
use tokio::io::{AsyncRead, AsyncReadExt, BufReader};
use tokio_util::io::StreamReader;
type AsyncByteReader = dyn AsyncRead + Send + Sync;
pub(crate) struct BatchReader {
pub struct BatchReader {
reader: Pin<Box<AsyncByteReader>>,
next_frame_no: u32,
}
@ -18,12 +16,11 @@ pub(crate) struct BatchReader {
impl BatchReader {
pub fn new(
init_frame_no: u32,
content: ByteStream,
content_stream: impl AsyncRead + Send + Sync + 'static,
page_size: usize,
use_compression: CompressionKind,
) -> Self {
let reader =
BufReader::with_capacity(page_size + WalFrameHeader::SIZE, StreamReader::new(content));
let reader = BufReader::with_capacity(page_size + WalFrameHeader::SIZE, content_stream);
BatchReader {
next_frame_no: init_frame_no,
reader: match use_compression {
@ -41,7 +38,7 @@ impl BatchReader {
}
/// Reads next frame header without frame body (WAL page).
pub(crate) async fn next_frame_header(&mut self) -> Result<Option<WalFrameHeader>> {
pub async fn next_frame_header(&mut self) -> Result<Option<WalFrameHeader>> {
let mut buf = [0u8; WalFrameHeader::SIZE];
let res = self.reader.read_exact(&mut buf).await;
match res {
@ -53,7 +50,7 @@ impl BatchReader {
/// Reads the body (WAL page) of the next frame in the current batch into `page_buf`.
pub(crate) async fn next_page(&mut self, page_buf: &mut [u8]) -> Result<()> {
pub async fn next_page(&mut self, page_buf: &mut [u8]) -> Result<()> {
self.reader.read_exact(page_buf).await?;
self.next_frame_no += 1;
Ok(())
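
With `BatchReader` now public and generic over any `AsyncRead` (instead of the S3 `ByteStream` it previously required), the same reader drives both the S3 path and the new local path. A minimal sketch of replaying one locally stored, gzip-compressed frame batch (file path, initial frame number, and page size are illustrative):

use anyhow::Result;
use bottomless::read::BatchReader;
use bottomless::replicator::CompressionKind;

// Illustrative only: stream WAL pages out of a single local frame-batch file.
async fn dump_local_batch(path: &str, page_size: usize) -> Result<()> {
    let file = tokio::fs::File::open(path).await?;
    let mut reader = BatchReader::new(1, file, page_size, CompressionKind::Gzip);
    let mut page = vec![0u8; page_size];
    while let Some(header) = reader.next_frame_header().await? {
        reader.next_page(&mut page).await?;
        println!("frame for page {}", header.pgno());
    }
    Ok(())
}

The S3 call site keeps the old behavior by wrapping the response body in `tokio_util::io::StreamReader` before handing it to `BatchReader::new`, as shown in the replicator hunk below.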

@ -381,6 +381,10 @@ impl Replicator {
self.last_sent_frame_no.load(Ordering::Acquire)
}
pub fn compression_kind(&self) -> CompressionKind {
self.use_compression
}
pub async fn wait_until_snapshotted(&mut self) -> Result<bool> {
if let Ok(generation) = self.generation() {
if !self.main_db_exists_and_not_empty().await {
@ -963,7 +967,7 @@ impl Replicator {
// Parses the frame and page number from given key.
// Format: <db-name>-<generation>/<first-frame-no>-<last-frame-no>-<timestamp>.<compression-kind>
fn parse_frame_range(key: &str) -> Option<(u32, u32, u64, CompressionKind)> {
pub fn parse_frame_range(key: &str) -> Option<(u32, u32, u64, CompressionKind)> {
let frame_delim = key.rfind('/')?;
let frame_suffix = &key[(frame_delim + 1)..];
let timestamp_delim = frame_suffix.rfind('-')?;
@ -1329,8 +1333,12 @@ impl Replicator {
}
let frame = self.get_object(key.into()).send().await?;
let mut frameno = first_frame_no;
let mut reader =
BatchReader::new(frameno, frame.body, self.page_size, compression_kind);
let mut reader = BatchReader::new(
frameno,
tokio_util::io::StreamReader::new(frame.body),
self.page_size,
compression_kind,
);
while let Some(frame) = reader.next_frame_header().await? {
let pgno = frame.pgno();

@ -7,7 +7,7 @@ use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncSeekExt, AsyncWriteExt};
#[derive(Debug)]
pub(crate) struct TransactionPageCache {
pub struct TransactionPageCache {
/// Threshold (in pages) after which, the cache will start flushing pages on disk.
swap_after_pages: u32,
page_size: u32,

@ -6,7 +6,7 @@ use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWrite};
#[repr(transparent)]
#[derive(Debug, Clone, Eq, PartialEq)]
pub(crate) struct WalFrameHeader([u8; WalFrameHeader::SIZE]);
pub struct WalFrameHeader([u8; WalFrameHeader::SIZE]);
impl WalFrameHeader {
pub const SIZE: usize = 24;