0
0
mirror of https://github.com/tursodatabase/libsql.git synced 2025-05-20 03:18:19 +00:00
Files
libsql/bottomless/src/read.rs
Piotr Sarna 4a71b2072a bottomless-cli: add restoring from a directory
Convenience function for manual restoration.
2023-11-09 12:17:15 +01:00

59 lines
1.9 KiB
Rust

use crate::replicator::CompressionKind;
use crate::wal::WalFrameHeader;
use anyhow::Result;
use async_compression::tokio::bufread::{GzipDecoder, ZstdDecoder};
use std::io::ErrorKind;
use std::pin::Pin;
use tokio::io::{AsyncRead, AsyncReadExt, BufReader};
type AsyncByteReader = dyn AsyncRead + Send + Sync;
pub struct BatchReader {
reader: Pin<Box<AsyncByteReader>>,
next_frame_no: u32,
}
impl BatchReader {
pub fn new(
init_frame_no: u32,
content_stream: impl AsyncRead + Send + Sync + 'static,
page_size: usize,
use_compression: CompressionKind,
) -> Self {
let reader = BufReader::with_capacity(page_size + WalFrameHeader::SIZE, content_stream);
BatchReader {
next_frame_no: init_frame_no,
reader: match use_compression {
CompressionKind::None => Box::pin(reader),
CompressionKind::Gzip => {
let gzip = GzipDecoder::new(reader);
Box::pin(gzip)
}
CompressionKind::Zstd => {
let zstd = ZstdDecoder::new(reader);
Box::pin(zstd)
}
},
}
}
/// Reads next frame header without frame body (WAL page).
pub async fn next_frame_header(&mut self) -> Result<Option<WalFrameHeader>> {
let mut buf = [0u8; WalFrameHeader::SIZE];
let res = self.reader.read_exact(&mut buf).await;
match res {
Ok(_) => Ok(Some(WalFrameHeader::from(buf))),
Err(e) if e.kind() == ErrorKind::UnexpectedEof => Ok(None),
Err(e) => Err(e.into()),
}
}
/// Reads the next frame stored in a current batch.
/// Returns a frame number or `None` if no frame was remaining in the buffer.
pub async fn next_page(&mut self, page_buf: &mut [u8]) -> Result<()> {
self.reader.read_exact(page_buf).await?;
self.next_frame_no += 1;
Ok(())
}
}