0
0
mirror of https://github.com/tursodatabase/libsql.git synced 2025-07-15 10:45:39 +00:00

Merge pull request #1786 from tursodatabase/lucio/skip-upload-shutdown

bottomless: add upload on shutdown skip option
This commit is contained in:
Lucio Franco
2024-10-09 18:19:57 +00:00
committed by GitHub
2 changed files with 47 additions and 26 deletions

View File

@ -75,6 +75,7 @@ pub struct Replicator {
upload_progress: Arc<Mutex<CompletionProgress>>,
last_uploaded_frame_no: Receiver<u32>,
skip_snapshot: bool,
skip_shutdown_upload: bool,
}
#[derive(Debug)]
@ -122,6 +123,8 @@ pub struct Options {
pub s3_max_retries: u32,
/// Skip snapshot upload per checkpoint.
pub skip_snapshot: bool,
/// Skip uploading snapshots on shutdown
pub skip_shutdown_upload: bool,
}
impl Options {
@ -238,6 +241,10 @@ impl Options {
Some(key) => Some(EncryptionConfig::new(cipher, key)),
None => None,
};
let skip_shutdown_upload =
env_var_or("LIBSQL_BOTTOMLESS_SKIP_SHUTDOWN_UPLOAD", false).parse::<bool>()?;
Ok(Options {
db_id,
create_bucket_if_not_exists: true,
@ -255,6 +262,7 @@ impl Options {
bucket_name,
s3_max_retries,
skip_snapshot,
skip_shutdown_upload,
})
}
}
@ -343,6 +351,12 @@ impl Replicator {
};
tracing::debug!("Database path: '{}', name: '{}'", db_path, db_name);
let skip_shutdown_upload = options.skip_shutdown_upload;
if skip_shutdown_upload {
tracing::warn!("skipping upload on shutdown");
}
let (flush_trigger, mut flush_trigger_rx) = channel(());
let (last_committed_frame_no_sender, last_committed_frame_no) = channel(Ok(0));
@ -498,6 +512,7 @@ impl Replicator {
join_set,
upload_progress,
last_uploaded_frame_no,
skip_shutdown_upload,
})
}
@ -529,33 +544,38 @@ impl Replicator {
}
pub async fn shutdown_gracefully(&mut self) -> Result<()> {
tracing::info!("bottomless replicator: shutting down...");
// 1. wait for all committed WAL frames to be committed locally
let last_frame_no = self.last_known_frame();
// force flush in order to not wait for periodic wake up of local back up process
if let Some(tx) = &self.flush_trigger {
let _ = tx.send(());
}
self.wait_until_committed(last_frame_no).await?;
tracing::info!(
"bottomless replicator: local backup replicated frames until {}",
last_frame_no
);
// 2. wait for snapshot upload to S3 to finish
self.wait_until_snapshotted().await?;
tracing::info!("bottomless replicator: snapshot succesfully uploaded to S3");
// 3. drop flush trigger, which will cause WAL upload loop to close. Since this action will
// close the channel used by wait_until_committed, it must happen after wait_until_committed
// has finished. If trigger won't be dropped, tasks from join_set will never finish.
self.flush_trigger.take();
// 4. drop shutdown trigger which will notify S3 upload process to stop all retry attempts
// and finish upload process
self.shutdown_trigger.take();
while let Some(t) = self.join_set.join_next().await {
// one of the tasks we're waiting for is upload of local WAL segment from pt.1 to S3
// this should ensure that all WAL frames are one S3
t?;
if !self.skip_shutdown_upload {
tracing::info!("bottomless replicator: shutting down...");
// 1. wait for all committed WAL frames to be committed locally
let last_frame_no = self.last_known_frame();
// force flush in order to not wait for periodic wake up of local back up process
if let Some(tx) = &self.flush_trigger {
let _ = tx.send(());
}
self.wait_until_committed(last_frame_no).await?;
tracing::info!(
"bottomless replicator: local backup replicated frames until {}",
last_frame_no
);
// 2. wait for snapshot upload to S3 to finish
self.wait_until_snapshotted().await?;
tracing::info!("bottomless replicator: snapshot succesfully uploaded to S3");
// 3. drop flush trigger, which will cause WAL upload loop to close. Since this action will
// close the channel used by wait_until_committed, it must happen after wait_until_committed
// has finished. If trigger won't be dropped, tasks from join_set will never finish.
self.flush_trigger.take();
// 4. drop shutdown trigger which will notify S3 upload process to stop all retry attempts
// and finish upload process
self.shutdown_trigger.take();
while let Some(t) = self.join_set.join_next().await {
// one of the tasks we're waiting for is upload of local WAL segment from pt.1 to S3
// this should ensure that all WAL frames are one S3
t?;
}
} else {
tracing::warn!("skipping snapshot upload during shutdown");
}
tracing::info!("bottomless replicator: shutdown complete");
Ok(())
}

View File

@ -131,6 +131,7 @@ pub async fn metastore_connection_maker(
s3_max_parallelism: 32,
s3_max_retries: 10,
skip_snapshot: false,
skip_shutdown_upload: false,
};
let mut replicator = bottomless::replicator::Replicator::with_options(
db_path.join("data").to_str().unwrap(),