use std::sync::atomic::AtomicBool;
use std::sync::Arc;

use futures_core::Future;
use parking_lot::Mutex;
use rusqlite::TransactionBehavior;
use tokio::sync::{mpsc, OwnedSemaphorePermit, Semaphore};
use tokio::task;
use tokio::task::JoinSet;

use crate::connection::program::Program;
use crate::connection::{Connection, MakeConnection};
use crate::database::PrimaryConnectionMaker;
use crate::namespace::meta_store::{MetaStore, MetaStoreConnection};
use crate::namespace::{NamespaceName, NamespaceStore};
use crate::query_result_builder::{IgnoreResult, QueryBuilderConfig};
use crate::schema::db::{get_unfinished_task_batch, update_job_status, update_meta_task_status};
use crate::schema::{step_migration_task_run, MigrationJobStatus};

use super::db::{
    get_next_pending_migration_job, get_next_pending_migration_tasks_batch,
    job_step_dry_run_success, register_schema_migration_job, setup_schema,
};
use super::error::Error;
use super::handle::JobHandle;
use super::migration::enqueue_migration_task;
use super::status::{MigrationJob, MigrationTask};
use super::{
    abort_migration_task, perform_migration, step_task, MigrationTaskStatus, SchedulerMessage,
};

const MAX_CONCURRENT: usize = 10;

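/// Scheduler for shared-schema migration jobs.
///
/// It owns the meta store connection used for migration bookkeeping, pulls pending jobs and
/// their per-namespace tasks from it, fans the tasks out to a bounded set of workers, and
/// steps the job state machine as results come back.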
pub struct Scheduler {
    namespace_store: NamespaceStore,
    /// this is a connection to the meta store db, but it's used for migration operations
    migration_db: Arc<Mutex<MetaStoreConnection>>,
    workers: JoinSet<WorkResult>,
    /// A batch of tasks for the current job (if any)
    current_batch: Vec<MigrationTask>,
    /// Currently processing job
    current_job: Option<MigrationJob>,
    has_work: bool,
    permits: Arc<Semaphore>,
    event_notifier: tokio::sync::broadcast::Sender<(i64, MigrationJobStatus)>,
}

impl Scheduler {
    pub(crate) async fn new(
        namespace_store: NamespaceStore,
        mut conn: MetaStoreConnection,
    ) -> crate::Result<Self> {
        let conn = tokio::task::spawn_blocking(move || -> crate::Result<_> {
            setup_schema(&mut conn)?;
            Ok(conn)
        })
        .await
        .unwrap()?;

        Ok(Self {
            namespace_store,
            workers: Default::default(),
            current_batch: Vec::new(),
            current_job: None,
            // initialized to true to kick off the queue
            has_work: true,
            migration_db: Arc::new(Mutex::new(conn)),
            permits: Arc::new(Semaphore::new(MAX_CONCURRENT)),
            event_notifier: tokio::sync::broadcast::Sender::new(32),
        })
    }

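    /// Main scheduler loop: steps the scheduler until every handle is dropped, giving up only
    /// after `MAX_ERROR_RETRIES` consecutive errors.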
    pub async fn run(mut self, mut receiver: mpsc::Receiver<SchedulerMessage>) {
        const MAX_ERROR_RETRIES: usize = 10;
        let mut tries = 0;
        loop {
            match self.step(&mut receiver).await {
                Ok(true) => {
                    tries = 0;
                }
                Ok(false) => {
                    tracing::info!("all scheduler handles dropped: exiting.");
                    break;
                }
                Err(e) => {
                    if tries >= MAX_ERROR_RETRIES {
                        tracing::error!("scheduler could not make progress after {MAX_ERROR_RETRIES} retries, exiting: {e}");
                        break;
                    } else {
                        tracing::error!("an error occurred while stepping the scheduler, {} tries remaining: {e}", MAX_ERROR_RETRIES - tries);
                        tries += 1;
                    }
                }
            }
        }
    }

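    /// Performs a single scheduler step: handle one incoming message, enqueue work when a
    /// worker permit is available, or process one worker result. Returns `Ok(false)` once
    /// there is nothing left to do and all scheduler handles have been dropped.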
    #[inline]
    async fn step(
        &mut self,
        receiver: &mut mpsc::Receiver<SchedulerMessage>,
    ) -> Result<bool, Error> {
        tokio::select! {
            Some(msg) = receiver.recv() => {
                self.handle_msg(msg).await;
            }
            // There is work to do, and a worker slot to perform it
            // TODO: optim: we could try to enqueue more work in one go by try_acquiring more
            // permits here
            Ok(permit) = self.permits.clone().acquire_owned(), if self.has_work => {
                self.enqueue_work(permit).await?;
            }
            Some(res) = self.workers.join_next(), if !self.workers.is_empty() => {
                match res {
                    Ok(WorkResult::Task { old_status, task, error }) => {
                        let new_status = *task.status();
                        let current_job = self.current_job
                            .as_mut()
                            .expect("processing task result, but job is missing");

                        *current_job.progress_mut(old_status) -= 1;
                        *current_job.progress_mut(new_status) += 1;
                        if current_job.task_error.is_none() && error.is_some() {
                            current_job.task_error = error.map(|e| (task.task_id, e, task.namespace()));
                        }

                        // we have more work if:
                        // - the current batch has more tasks to enqueue
                        // - the remaining number of pending tasks is greater than the amount of in-flight tasks: we can enqueue more
                        // - there's no more in-flight nor pending tasks for the job: we need to step the job
                        let in_flight = MAX_CONCURRENT - self.permits.available_permits();
                        let pending_tasks = current_job.count_pending_tasks();
                        self.has_work = !self.current_batch.is_empty() || (pending_tasks == 0 && in_flight == 0) || pending_tasks > in_flight;
                    }
                    Ok(WorkResult::Job { status }) => {
                        let job_id = if status.is_finished() {
                            let job = self.current_job.take().unwrap();
                            job.job_id
                        } else {
                            let current_job = self.current_job
                                .as_mut()
                                .expect("job is missing, but got status update for that job");
                            *current_job.status_mut() = status;
                            current_job.job_id()
                        };

                        let _ = self.event_notifier.send((job_id, status));

                        self.has_work = true;
                    }
                    Err(e) => {
                        todo!("migration task panicked: {e}");
                    }
                }
            }
            else => return Ok(false),
        }

        Ok(true)
    }

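    /// Handles a request coming from a `SchedulerHandle`: either register a new migration job
    /// or report the status of an existing one.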
    async fn handle_msg(&mut self, msg: SchedulerMessage) {
        match msg {
            SchedulerMessage::ScheduleMigration {
                schema,
                migration,
                ret,
            } => {
                let res = self.register_migration_job(schema, migration).await;
                let _ = ret.send(res.map(|id| JobHandle::new(id, self.event_notifier.subscribe())));
                // it is not necessary to raise the flag if we are currently processing a job: it
                // prevents spurious wakeups, and the job will be picked up anyway.
                self.has_work = self.current_job.is_none();
            }
            SchedulerMessage::GetJobStatus { job_id, ret } => {
                let res = self.get_job_status(job_id).await;
                let _ = ret.send(res);
            }
        }
    }

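    /// Tries to advance the current job's state machine, loading the next pending job from the
    /// meta store if there is none. Returns the permit back to the caller when no job-step
    /// worker was spawned, so it can be used to enqueue a task instead.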
    async fn maybe_step_job(
        &mut self,
        permit: OwnedSemaphorePermit,
    ) -> Result<Option<OwnedSemaphorePermit>, Error> {
        let job = match self.current_job {
            Some(ref mut job) => job,
            None => {
                let maybe_next_job = with_conn_async(self.migration_db.clone(), move |conn| {
                    get_next_pending_migration_job(conn)
                })
                .await?;
                match maybe_next_job {
                    Some(job) => self.current_job.insert(job),
                    None => {
                        self.has_work = false;
                        return Ok(None);
                    }
                }
            }
        };

        // try to step the current job
        match *job.status() {
            MigrationJobStatus::WaitingDryRun => {
                // there was a dry run failure, abort the task
                if job.progress(MigrationTaskStatus::DryRunFailure) != 0 {
                    let error = job
                        .task_error
                        .clone()
                        .expect("task error reported, but error is missing");
                    self.workers.spawn(step_job_dry_run_failure(
                        permit,
                        self.migration_db.clone(),
                        job.job_id(),
                        self.namespace_store.clone(),
                        MigrationJobStatus::WaitingDryRun,
                        error,
                    ));
                    *job.status_mut() = MigrationJobStatus::WaitingTransition;
                    self.has_work = false;
                    return Ok(None);
                }

                // all tasks reported a successful dry run, we are ready to step the job state
                if job.progress_all(MigrationTaskStatus::DryRunSuccess) {
                    self.workers.spawn(step_job_dry_run_success(
                        permit,
                        self.migration_db.clone(),
                        job.job_id(),
                        self.namespace_store.clone(),
                    ));
                    *job.status_mut() = MigrationJobStatus::WaitingTransition;
                    self.has_work = false;
                    return Ok(None);
                }
            }
            MigrationJobStatus::DryRunSuccess => {
                self.workers.spawn(step_job_waiting_run(
                    permit,
                    self.migration_db.clone(),
                    job.job_id(),
                    self.namespace_store.clone(),
                ));
                *job.status_mut() = MigrationJobStatus::WaitingTransition;
                self.has_work = false;
                return Ok(None);
            }
            MigrationJobStatus::DryRunFailure => {
                if job.progress_all(MigrationTaskStatus::Failure) {
                    self.workers.spawn(step_job_failure(
                        permit,
                        self.migration_db.clone(),
                        job.job_id(),
                        self.namespace_store.clone(),
                    ));
                    *job.status_mut() = MigrationJobStatus::WaitingTransition;
                    self.has_work = false;
                    return Ok(None);
                }
            }
            MigrationJobStatus::WaitingRun => {
                // a task reported a failure during the run
                if job.progress(MigrationTaskStatus::Failure) != 0 {
                    todo!("that shouldn't happen! retry");
                }

                if job.progress_all(MigrationTaskStatus::Success) {
                    self.workers.spawn(step_job_run_success(
                        permit,
                        job.schema(),
                        job.migration(),
                        job.job_id(),
                        self.namespace_store.clone(),
                        self.migration_db.clone(),
                    ));
                    // do not enqueue anything until the schema migration is complete
                    self.has_work = false;
                    *job.status_mut() = MigrationJobStatus::WaitingTransition;
                    return Ok(None);
                }
            }
            MigrationJobStatus::WaitingTransition => {
                // just wait for schema update to return
                // this is a transient state, and it's not persisted. It's only necessary to make
                // the code more robust when there are spurious wakeups that would cause this
                // function to be called.
                self.has_work = false;
                return Ok(None);
            }
            MigrationJobStatus::RunSuccess => unreachable!(),
            MigrationJobStatus::RunFailure => todo!("handle run failure"),
        }

        Ok(Some(permit))
    }

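    /// Enqueues a single migration task on a worker: refills `current_batch` from the meta
    /// store when it is empty, blocks writes on the target namespace for freshly enqueued
    /// tasks, then spawns `try_step_task` with the permit.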
    async fn enqueue_task(&mut self, permit: OwnedSemaphorePermit) -> Result<(), Error> {
        let Some(ref job) = self.current_job else {
            return Ok(());
        };
        if self.current_batch.is_empty()
            && matches!(
                *job.status(),
                MigrationJobStatus::WaitingDryRun
                    | MigrationJobStatus::WaitingRun
                    | MigrationJobStatus::DryRunFailure
            )
        {
            const MAX_BATCH_SIZE: usize = 50;
            // get a batch of enqueued tasks
            let job_id = job.job_id();
            self.current_batch = match *job.status() {
                MigrationJobStatus::WaitingDryRun => {
                    with_conn_async(self.migration_db.clone(), move |conn| {
                        get_next_pending_migration_tasks_batch(
                            conn,
                            job_id,
                            MigrationTaskStatus::Enqueued,
                            MAX_BATCH_SIZE,
                        )
                    })
                    .await?
                }
                MigrationJobStatus::WaitingRun => {
                    with_conn_async(self.migration_db.clone(), move |conn| {
                        get_next_pending_migration_tasks_batch(
                            conn,
                            job_id,
                            MigrationTaskStatus::DryRunSuccess,
                            MAX_BATCH_SIZE,
                        )
                    })
                    .await?
                }
                MigrationJobStatus::DryRunFailure => {
                    // in case of dry run failure we are failing all the tasks
                    with_conn_async(self.migration_db.clone(), move |conn| {
                        get_unfinished_task_batch(conn, job_id, MAX_BATCH_SIZE)
                    })
                    .await?
                }
                _ => unreachable!(),
            };
        }

        // enqueue some work
        if let Some(task) = self.current_batch.pop() {
            let (connection_maker, block_writes) =
                self.namespace_store
                    .with(task.namespace(), move |ns| {
                        let db = ns.db.as_primary().expect(
                            "attempting to perform schema migration on non-primary database",
                        );
                        (db.connection_maker().clone(), db.block_writes.clone())
                    })
                    .await
                    .map_err(|e| Error::NamespaceLoad(Box::new(e)))?;

            // we block writes before enqueuing the task; it makes testing predictable
            if *task.status() == MigrationTaskStatus::Enqueued {
                block_writes.store(true, std::sync::atomic::Ordering::SeqCst);
            }

            self.workers.spawn(try_step_task(
                permit,
                self.namespace_store.clone(),
                self.migration_db.clone(),
                connection_maker,
                *job.status(),
                job.migration.clone(),
                task,
                block_writes,
            ));
        } else {
            // there is still a job, but the queue is empty: we are waiting for the
            // remaining tasks to report their status. just wait.
            self.has_work = false;
        }

        Ok(())
    }

    // TODO: refactor this function, it's turning into a mess. Not so simple, because of borrow
    // constraints
    async fn enqueue_work(&mut self, permit: OwnedSemaphorePermit) -> Result<(), Error> {
        let Some(permit) = self.maybe_step_job(permit).await? else {
            return Ok(());
        };
        // fill the current batch if necessary
        self.enqueue_task(permit).await?;

        Ok(())
    }

    pub async fn register_migration_job(
        &self,
        schema: NamespaceName,
        migration: Arc<Program>,
    ) -> Result<i64, Error> {
        // acquire an exclusive lock on the schema before enqueueing to ensure that no namespaces
        // are still being created when we register the migration
        let _lock = self
            .namespace_store
            .schema_locks()
            .acquire_exlusive(schema.clone())
            .await;
        with_conn_async(self.migration_db.clone(), move |conn| {
            register_schema_migration_job(conn, &schema, &migration)
        })
        .await
    }

    async fn get_job_status(
        &self,
        job_id: i64,
    ) -> Result<(MigrationJobStatus, Option<String>), Error> {
        with_conn_async(self.migration_db.clone(), move |conn| {
            super::db::get_job_status(conn, job_id)
        })
        .await
    }
}

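/// Worker body for a single migration task: steps the task against its namespace, then
/// persists the new task status to the meta store. If the status update fails, the old status
/// is restored so the task is picked up again later.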
async fn try_step_task(
    _permit: OwnedSemaphorePermit,
    namespace_store: NamespaceStore,
    migration_db: Arc<Mutex<MetaStoreConnection>>,
    connection_maker: Arc<PrimaryConnectionMaker>,
    job_status: MigrationJobStatus,
    migration: Arc<Program>,
    mut task: MigrationTask,
    block_writes: Arc<AtomicBool>,
) -> WorkResult {
    let old_status = *task.status();
    let error = match try_step_task_inner(
        namespace_store,
        connection_maker,
        job_status,
        migration,
        &task,
        block_writes,
    )
    .await
    {
        Ok((status, error)) => {
            *task.status_mut() = status;
            error
        }
        Err(e) => {
            tracing::error!(
                "error processing task {} for {}, rescheduling for later: {e}",
                task.task_id(),
                task.namespace()
            );
            None
        }
    };

    let (task, error) = tokio::task::spawn_blocking(move || {
        let mut conn = migration_db.lock();
        if let Err(e) = update_meta_task_status(&mut conn, &task, error.as_deref()) {
            tracing::error!("failed to update task status, retrying later: {e}");
            *task.status_mut() = old_status;
        }
        (task, error)
    })
    .await
    .unwrap();

    WorkResult::Task {
        old_status,
        task,
        error,
    }
}

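/// Runs one migration step for a task on its namespace database: for freshly enqueued tasks it
/// first opens an immediate transaction to wait out ongoing write transactions, then performs
/// the enqueue / run / abort step inside a transaction, and finally checkpoints the namespace
/// so the new state reaches the backup.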
async fn try_step_task_inner(
    namespace_store: NamespaceStore,
    connection_maker: Arc<PrimaryConnectionMaker>,
    job_status: MigrationJobStatus,
    migration: Arc<Program>,
    task: &MigrationTask,
    block_writes: Arc<AtomicBool>,
) -> Result<(MigrationTaskStatus, Option<String>), Error> {
    let status = *task.status();
    let mut db_connection = connection_maker
        .create()
        .await
        .map_err(|e| Error::FailedToConnect(task.namespace(), Box::new(e)))?;
    if task.status().is_enqueued() {
        // once writes are blocked, we first make sure that
        // there are no ongoing transactions...
        db_connection = task::spawn_blocking(move || -> Result<_, Error> {
            db_connection.with_raw(|conn| -> Result<_, Error> {
                conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
                Ok(())
            })?;
            Ok(db_connection)
        })
        .await
        .expect("task panicked")?;
    }

    let job_id = task.job_id();
    let (status, error) = tokio::task::spawn_blocking(move || -> Result<_, Error> {
        db_connection.with_raw(move |conn| {
            let mut txn = conn.transaction()?;

            match status {
                _ if job_status.is_dry_run_failure() => {
                    abort_migration_task(&txn, job_id)?;
                }
                MigrationTaskStatus::Enqueued => {
                    enqueue_migration_task(&txn, job_id, status, &migration)?;
                }
                MigrationTaskStatus::DryRunSuccess if job_status.is_waiting_run() => {
                    step_migration_task_run(&txn, job_id)?;
                }
                _ => unreachable!("expected task status to be `enqueued` or `run`"),
            }

            let (new_status, error) = step_task(&mut txn, job_id)?;
            txn.commit()?;

            if new_status.is_finished() {
                block_writes.store(false, std::sync::atomic::Ordering::SeqCst);
            }

            Ok((new_status, error))
        })
    })
    .await
    .expect("task panicked")?;

    // ... then we're good to go and make sure that the current database state is
    // in the backup
    backup_namespace(&namespace_store, task.namespace()).await?;

    Ok((status, error))
}

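/// Runs a blocking closure against the shared meta store connection on the blocking thread
/// pool.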
async fn with_conn_async<T: Send + 'static>(
    conn: Arc<Mutex<MetaStoreConnection>>,
    f: impl FnOnce(&mut rusqlite::Connection) -> Result<T, Error> + Send + 'static,
) -> Result<T, Error> {
    tokio::task::spawn_blocking(move || {
        let mut conn = conn.lock();
        f(&mut *conn)
    })
    .await
    .expect("migration db task panicked")
}

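/// Result reported back to the scheduler loop by a worker: either a task finished (with its
/// previous status and an optional error), or a job step finished with the job's new status.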
enum WorkResult {
    Task {
        old_status: MigrationTaskStatus,
        task: MigrationTask,
        error: Option<String>,
    },
    Job {
        status: MigrationJobStatus,
    },
}

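/// Checkpoints the meta store WAL and, when a backup savepoint is available, waits for it to
/// be confirmed so the migration state survives a restore.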
async fn backup_meta_store(
    meta: &MetaStore,
    migration_db: Arc<Mutex<MetaStoreConnection>>,
) -> Result<(), Error> {
    with_conn_async(migration_db, |conn| {
        Ok(conn.query_row("PRAGMA wal_checkpoint(truncate)", (), |_| Ok(()))?)
    })
    .await?;

    if let Some(mut savepoint) = meta.backup_savepoint().await {
        if let Err(e) = savepoint.confirmed().await {
            tracing::error!("failed to backup meta store: {e}");
            // do not step the job, and schedule for retry.
            // TODO: backoff?
            // TODO: this is fine if we don't manage to get a backup here,
            // then we'll restart in the previous state in case of restore,
            // however, in case of restart we may not have a backup.

            return Err(Error::MetaStoreBackupFailure);
        }
    }

    Ok(())
}

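/// Checkpoints a namespace database and, if it has a replicator, waits for the corresponding
/// savepoint to be confirmed.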
async fn backup_namespace(store: &NamespaceStore, ns: NamespaceName) -> Result<(), Error> {
    let (replicator, conn_maker) = store
        .with(ns.clone(), |ns| {
            let replicator = ns.db.replicator();
            let conn_maker = ns.db.connection_maker();
            (replicator, conn_maker)
        })
        .await
        .map_err(|e| Error::NamespaceLoad(Box::new(e)))?;

    let savepoint = match replicator {
        Some(replicator) => {
            let lock = replicator.lock().await;
            match &*lock {
                Some(replicator) => Some(replicator.savepoint()),
                None => None,
            }
        }
        None => None,
    };

    conn_maker
        .create()
        .await
        .map_err(|e| Error::NamespaceBackupFailure(ns.clone(), e.into()))?
        .checkpoint()
        .await
        .map_err(|e| Error::NamespaceBackupFailure(ns.clone(), e.into()))?;

    if let Some(mut savepoint) = savepoint {
        if let Err(e) = savepoint.confirmed().await {
            return Err(Error::NamespaceBackupFailure(ns, e.into()));
        }
    }

    Ok(())
}

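/// Wraps a job-step future: on error the previous (fallback) status is reported so the
/// scheduler will retry the transition on a later step.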
async fn try_step_job(
    fallback_state: MigrationJobStatus,
    f: impl Future<Output = Result<MigrationJobStatus, Error>>,
) -> WorkResult {
    let status = match f.await {
        Ok(status) => status,
        Err(e) => {
            tracing::error!("error while stepping job, falling back to previous state: {e}");
            fallback_state
        }
    };

    WorkResult::Job { status }
}

async fn step_job_failure(
    _permit: OwnedSemaphorePermit,
    migration_db: Arc<Mutex<MetaStoreConnection>>,
    job_id: i64,
    namespace_store: NamespaceStore,
) -> WorkResult {
    try_step_job(MigrationJobStatus::DryRunFailure, async move {
        with_conn_async(migration_db.clone(), move |conn| {
            // TODO ensure here that this transition is valid
            // the error must already be there from when we stepped to DryRunFailure
            update_job_status(conn, job_id, MigrationJobStatus::RunFailure, None)
        })
        .await?;

        backup_meta_store(namespace_store.meta_store(), migration_db).await?;

        Ok(MigrationJobStatus::RunFailure)
    })
    .await
}

async fn step_job_waiting_run(
    _permit: OwnedSemaphorePermit,
    migration_db: Arc<Mutex<MetaStoreConnection>>,
    job_id: i64,
    namespace_store: NamespaceStore,
) -> WorkResult {
    try_step_job(MigrationJobStatus::DryRunSuccess, async move {
        with_conn_async(migration_db.clone(), move |conn| {
            // TODO ensure here that this transition is valid
            update_job_status(conn, job_id, MigrationJobStatus::WaitingRun, None)
        })
        .await?;

        backup_meta_store(namespace_store.meta_store(), migration_db).await?;

        Ok(MigrationJobStatus::WaitingRun)
    })
    .await
}

async fn step_job_dry_run_failure(
    _permit: OwnedSemaphorePermit,
    migration_db: Arc<Mutex<MetaStoreConnection>>,
    job_id: i64,
    namespace_store: NamespaceStore,
    status: MigrationJobStatus,
    (task_id, error, ns): (i64, String, NamespaceName),
) -> WorkResult {
    try_step_job(status, async move {
        with_conn_async(migration_db.clone(), move |conn| {
            let error = format!("task {task_id} for namespace `{ns}` failed with error: {error}");
            update_job_status(
                conn,
                job_id,
                MigrationJobStatus::DryRunFailure,
                Some(&error),
            )
        })
        .await?;

        backup_meta_store(namespace_store.meta_store(), migration_db).await?;
        Ok(MigrationJobStatus::DryRunFailure)
    })
    .await
}

async fn step_job_dry_run_success(
    _permit: OwnedSemaphorePermit,
    migration_db: Arc<Mutex<MetaStoreConnection>>,
    job_id: i64,
    namespace_store: NamespaceStore,
) -> WorkResult {
    try_step_job(MigrationJobStatus::WaitingDryRun, async move {
        with_conn_async(migration_db.clone(), move |conn| {
            job_step_dry_run_success(conn, job_id)
        })
        .await?;

        backup_meta_store(namespace_store.meta_store(), migration_db).await?;

        Ok(MigrationJobStatus::DryRunSuccess)
    })
    .await
}

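/// Final job step: applies the migration to the schema database itself (skipped when
/// `PRAGMA schema_version` already equals the job id, which makes retries idempotent), backs
/// up the schema namespace and the meta store, and marks the job as `RunSuccess`.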
async fn step_job_run_success(
    _permit: OwnedSemaphorePermit,
    schema: NamespaceName,
    migration: Arc<Program>,
    job_id: i64,
    namespace_store: NamespaceStore,
    migration_db: Arc<Mutex<MetaStoreConnection>>,
) -> WorkResult {
    try_step_job(MigrationJobStatus::WaitingRun, async move {
        // TODO: check that all tasks actually reported success before migration
        let connection_maker = namespace_store
            .with(schema.clone(), |ns| {
                ns.db
                    .as_schema()
                    .expect("expected database to be a schema database")
                    .connection_maker()
                    .clone()
            })
            .await
            .map_err(|e| Error::NamespaceLoad(Box::new(e)))?;

        let connection = connection_maker
            .create()
            .await
            .map_err(|e| Error::FailedToConnect(schema.clone(), e.into()))?;
        tokio::task::spawn_blocking(move || -> Result<(), Error> {
            connection
                .connection()
                .with_raw(|conn| -> Result<(), Error> {
                    let mut txn = conn.transaction()?;
                    let schema_version =
                        txn.query_row("PRAGMA schema_version", (), |row| row.get::<_, i64>(0))?;

                    if schema_version != job_id {
                        // todo: use proper builder and collect errors
                        let (ret, _status) = perform_migration(
                            &mut txn,
                            &migration,
                            false,
                            IgnoreResult,
                            &QueryBuilderConfig::default(),
                        );
                        let _error = ret.err().map(|e| e.to_string());
                        txn.pragma_update(None, "schema_version", job_id)?;
                        // update schema version to job_id?
                        txn.commit()?;
                    }

                    Ok(())
                })
        })
        .await
        .expect("task panicked")?;

        // backup the schema
        backup_namespace(&namespace_store, schema).await?;

        with_conn_async(migration_db.clone(), move |conn| {
            update_job_status(conn, job_id, MigrationJobStatus::RunSuccess, None)
        })
        .await?;

        backup_meta_store(namespace_store.meta_store(), migration_db).await?;
        Ok(MigrationJobStatus::RunSuccess)
    })
    .await
}

#[cfg(test)]
mod test {
    use insta::assert_debug_snapshot;
    #[cfg(not(feature = "durable-wal"))]
    use libsql_sys::wal::either::Either as EitherWAL;
    #[cfg(feature = "durable-wal")]
    use libsql_sys::wal::either::Either3 as EitherWAL;
    use libsql_sys::wal::Sqlite3WalManager;
    use std::path::Path;
    use tempfile::tempdir;

    use crate::connection::config::DatabaseConfig;
    use crate::database::DatabaseKind;
    use crate::namespace::meta_store::{metastore_connection_maker, MetaStore};
    use crate::namespace::{NamespaceConfig, RestoreOption};
    use crate::schema::SchedulerHandle;

    use super::super::migration::has_pending_migration_task;
    use super::*;

    // FIXME: lots of coupling here, there should be an easier way to test this.
    #[tokio::test]
    async fn writes_blocked_while_performing_migration() {
        let tmp = tempdir().unwrap();
        let (maker, manager) = metastore_connection_maker(None, tmp.path()).await.unwrap();
        let conn = maker().unwrap();
        let meta_store = MetaStore::new(Default::default(), tmp.path(), conn, manager)
            .await
            .unwrap();
        let (sender, mut receiver) = mpsc::channel(100);
        let config = make_config(sender.clone().into(), tmp.path());
        let store = NamespaceStore::new(false, false, 10, config, meta_store)
            .await
            .unwrap();
        let mut scheduler = Scheduler::new(store.clone(), maker().unwrap())
            .await
            .unwrap();

        store
            .create(
                "schema".into(),
                RestoreOption::Latest,
                DatabaseConfig {
                    is_shared_schema: true,
                    ..Default::default()
                },
            )
            .await
            .unwrap();
        store
            .create(
                "ns".into(),
                RestoreOption::Latest,
                DatabaseConfig {
                    shared_schema_name: Some("schema".into()),
                    ..Default::default()
                },
            )
            .await
            .unwrap();

        let (block_write, ns_conn_maker) = store
            .with("ns".into(), |ns| {
                (
                    ns.db.as_primary().unwrap().block_writes.clone(),
                    ns.db.as_primary().unwrap().connection_maker(),
                )
            })
            .await
            .unwrap();

        let (snd, mut rcv) = tokio::sync::oneshot::channel();
        sender
            .send(SchedulerMessage::ScheduleMigration {
                schema: "schema".into(),
                migration: Program::seq(&["create table test (c)"]).into(),
                ret: snd,
            })
            .await
            .unwrap();

        // step until we get a response
        loop {
            scheduler.step(&mut receiver).await.unwrap();
            if rcv.try_recv().is_ok() {
                break;
            }
        }

        // this is right before the task gets enqueued
        assert!(!block_write.load(std::sync::atomic::Ordering::Relaxed));
        // next step should enqueue the task
        let conn = ns_conn_maker.create().await.unwrap();

        assert!(!block_write.load(std::sync::atomic::Ordering::Relaxed));
        while conn.with_raw(|conn| !has_pending_migration_task(&conn).unwrap()) {
            scheduler.step(&mut receiver).await.unwrap();
        }
        assert!(block_write.load(std::sync::atomic::Ordering::Relaxed));

        while scheduler.current_job.is_some() {
            scheduler.step(&mut receiver).await.unwrap();
        }

        assert!(!block_write.load(std::sync::atomic::Ordering::Relaxed));
    }

    fn make_config(migration_scheduler: SchedulerHandle, path: &Path) -> NamespaceConfig {
        NamespaceConfig {
            db_kind: DatabaseKind::Primary,
            base_path: path.to_path_buf().into(),
            max_log_size: 1000000000,
            max_log_duration: None,
            extensions: Arc::new([]),
            stats_sender: tokio::sync::mpsc::channel(1).0,
            max_response_size: 100000000000000,
            max_total_response_size: 100000000000,
            checkpoint_interval: None,
            max_concurrent_connections: Arc::new(Semaphore::new(10)),
            max_concurrent_requests: 10000,
            encryption_config: None,
            channel: None,
            uri: None,
            bottomless_replication: None,
            scripted_backup: None,
            migration_scheduler,
            make_wal_manager: Arc::new(|| EitherWAL::A(Sqlite3WalManager::default())),
        }
    }

    #[tokio::test]
    async fn ns_loaded_with_pending_tasks_writes_is_blocked() {
        let tmp = tempdir().unwrap();
        {
            let (maker, manager) = metastore_connection_maker(None, tmp.path()).await.unwrap();
            let conn = maker().unwrap();
            let meta_store = MetaStore::new(Default::default(), tmp.path(), conn, manager)
                .await
                .unwrap();
            let (sender, mut receiver) = mpsc::channel(100);
            let config = make_config(sender.clone().into(), tmp.path());
            let store = NamespaceStore::new(false, false, 10, config, meta_store)
                .await
                .unwrap();
            let mut scheduler = Scheduler::new(store.clone(), maker().unwrap())
                .await
                .unwrap();

            store
                .create(
                    "schema".into(),
                    RestoreOption::Latest,
                    DatabaseConfig {
                        is_shared_schema: true,
                        ..Default::default()
                    },
                )
                .await
                .unwrap();
            store
                .create(
                    "ns".into(),
                    RestoreOption::Latest,
                    DatabaseConfig {
                        shared_schema_name: Some("schema".into()),
                        ..Default::default()
                    },
                )
                .await
                .unwrap();

            let (block_write, ns_conn_maker) = store
                .with("ns".into(), |ns| {
                    (
                        ns.db.as_primary().unwrap().block_writes.clone(),
                        ns.db.as_primary().unwrap().connection_maker(),
                    )
                })
                .await
                .unwrap();

            let (snd, mut rcv) = tokio::sync::oneshot::channel();
            sender
                .send(SchedulerMessage::ScheduleMigration {
                    schema: "schema".into(),
                    migration: Program::seq(&["create table test (c)"]).into(),
                    ret: snd,
                })
                .await
                .unwrap();

            // step until we get a response
            loop {
                scheduler.step(&mut receiver).await.unwrap();
                if rcv.try_recv().is_ok() {
                    break;
                }
            }

            let conn = ns_conn_maker.create().await.unwrap();

            assert!(!block_write.load(std::sync::atomic::Ordering::Relaxed));
            while conn.with_raw(|conn| !has_pending_migration_task(&conn).unwrap()) {
                scheduler.step(&mut receiver).await.unwrap();
            }
            assert!(block_write.load(std::sync::atomic::Ordering::Relaxed));

            // at this point we drop everything and recreate the store (simulates a restart mid-task)
        }

        let (maker, manager) = metastore_connection_maker(None, tmp.path()).await.unwrap();
        let conn = maker().unwrap();
        let meta_store = MetaStore::new(Default::default(), tmp.path(), conn, manager)
            .await
            .unwrap();
        let (sender, _receiver) = mpsc::channel(100);
        let config = make_config(sender.clone().into(), tmp.path());
        let store = NamespaceStore::new(false, false, 10, config, meta_store)
            .await
            .unwrap();

        store
            .with("ns".into(), |ns| {
                assert!(ns
                    .db
                    .as_primary()
                    .unwrap()
                    .block_writes
                    .load(std::sync::atomic::Ordering::Relaxed));
            })
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn cant_delete_namespace_while_pending_job() {
        let tmp = tempdir().unwrap();
        let (maker, manager) = metastore_connection_maker(None, tmp.path()).await.unwrap();
        let conn = maker().unwrap();
        let meta_store = MetaStore::new(Default::default(), tmp.path(), conn, manager)
            .await
            .unwrap();
        let (sender, mut receiver) = mpsc::channel(100);
        let config = make_config(sender.clone().into(), tmp.path());
        let store = NamespaceStore::new(false, false, 10, config, meta_store)
            .await
            .unwrap();
        let mut scheduler = Scheduler::new(store.clone(), maker().unwrap())
            .await
            .unwrap();

        store
            .create(
                "schema".into(),
                RestoreOption::Latest,
                DatabaseConfig {
                    is_shared_schema: true,
                    ..Default::default()
                },
            )
            .await
            .unwrap();
        store
            .create(
                "ns".into(),
                RestoreOption::Latest,
                DatabaseConfig {
                    shared_schema_name: Some("schema".into()),
                    ..Default::default()
                },
            )
            .await
            .unwrap();

        let (snd, _rcv) = tokio::sync::oneshot::channel();
        sender
            .send(SchedulerMessage::ScheduleMigration {
                schema: "schema".into(),
                migration: Program::seq(&["create table test (c)"]).into(),
                ret: snd,
            })
            .await
            .unwrap();

        while !super::super::db::has_pending_migration_jobs(
            &scheduler.migration_db.lock(),
            &"schema".into(),
        )
        .unwrap()
        {
            scheduler.step(&mut receiver).await.unwrap();
        }

        assert_debug_snapshot!(store.destroy("ns".into(), true).await.unwrap_err());

        while super::super::db::has_pending_migration_jobs(
            &scheduler.migration_db.lock(),
            &"schema".into(),
        )
        .unwrap()
        {
            scheduler.step(&mut receiver).await.unwrap();
        }

        store.destroy("ns".into(), true).await.unwrap();
    }

    #[tokio::test]
    async fn schema_locks() {
        let tmp = tempdir().unwrap();
        let (maker, manager) = metastore_connection_maker(None, tmp.path()).await.unwrap();
        let conn = maker().unwrap();
        let meta_store = MetaStore::new(Default::default(), tmp.path(), conn, manager)
            .await
            .unwrap();
        let (sender, _receiver) = mpsc::channel(100);
        let config = make_config(sender.clone().into(), tmp.path());
        let store = NamespaceStore::new(false, false, 10, config, meta_store)
            .await
            .unwrap();
        let scheduler = Scheduler::new(store.clone(), maker().unwrap())
            .await
            .unwrap();

        store
            .create(
                "schema".into(),
                RestoreOption::Latest,
                DatabaseConfig {
                    is_shared_schema: true,
                    ..Default::default()
                },
            )
            .await
            .unwrap();

        {
            let _lock = store.schema_locks().acquire_shared("schema".into()).await;
            let fut = scheduler.register_migration_job(
                "schema".into(),
                Program::seq(&["create table test (x)"]).into(),
            );
            // we can't acquire the lock
            assert!(tokio::time::timeout(std::time::Duration::from_secs(1), fut)
                .await
                .is_err());
        }

        {
            // simulate an ongoing migration registration
            let _lock = store.schema_locks().acquire_exlusive("schema".into()).await;

            let fut = store.create(
                "some_namespace".into(),
                RestoreOption::Latest,
                DatabaseConfig {
                    shared_schema_name: Some("schema".into()),
                    ..Default::default()
                },
            );
            // we can't acquire the lock
            assert!(tokio::time::timeout(std::time::Duration::from_secs(1), fut)
                .await
                .is_err());
        }
    }
}