mirror of
https://github.com/tursodatabase/libsql.git
synced 2025-01-07 10:09:04 +00:00
543 lines
18 KiB
Rust
543 lines
18 KiB
Rust
use std::sync::Arc;
|
|
use std::time::Duration;
|
|
|
|
use futures::FutureExt;
|
|
use libsql::Database;
|
|
use libsql_server::config::{AdminApiConfig, RpcClientConfig, RpcServerConfig, UserApiConfig};
|
|
use tempfile::tempdir;
|
|
use tokio::sync::Notify;
|
|
use turmoil::Builder;
|
|
|
|
use crate::common::{
|
|
http::Client,
|
|
net::{init_tracing, SimServer, TestServer, TurmoilAcceptor, TurmoilConnector},
|
|
};
|
|
|
|
/// In this test, we create a primary and a replica, add some data and sync them. when then shut
|
|
/// down and bring back up the replica, and ensure the the replica continue normal mode of
|
|
/// operation.
|
|
#[test]
|
|
fn replica_restart() {
|
|
let mut sim = Builder::new()
|
|
.simulation_duration(Duration::from_secs(1000))
|
|
.build();
|
|
let tmp = tempdir().unwrap();
|
|
sim.host("primary", move || {
|
|
let path = tmp.path().to_path_buf();
|
|
async move {
|
|
let server = TestServer {
|
|
path: path.into(),
|
|
user_api_config: UserApiConfig {
|
|
..Default::default()
|
|
},
|
|
admin_api_config: Some(AdminApiConfig {
|
|
acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await?,
|
|
connector: TurmoilConnector,
|
|
disable_metrics: true,
|
|
}),
|
|
rpc_server_config: Some(RpcServerConfig {
|
|
acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 4567)).await?,
|
|
tls_config: None,
|
|
}),
|
|
..Default::default()
|
|
};
|
|
|
|
server.start_sim(8080).await?;
|
|
|
|
Ok(())
|
|
}
|
|
});
|
|
|
|
let notify = Arc::new(Notify::new());
|
|
let tmp = tempdir().unwrap();
|
|
let notify_clone = notify.clone();
|
|
sim.host("replica", move || {
|
|
let path = tmp.path().to_path_buf();
|
|
let notify = notify_clone.clone();
|
|
async move {
|
|
let make_server = || {
|
|
let path = path.clone();
|
|
async {
|
|
TestServer {
|
|
path: path.into(),
|
|
user_api_config: UserApiConfig {
|
|
..Default::default()
|
|
},
|
|
admin_api_config: Some(AdminApiConfig {
|
|
acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await.unwrap(),
|
|
connector: TurmoilConnector,
|
|
disable_metrics: true,
|
|
}),
|
|
rpc_client_config: Some(RpcClientConfig {
|
|
remote_url: "http://primary:4567".into(),
|
|
connector: TurmoilConnector,
|
|
tls_config: None,
|
|
}),
|
|
..Default::default()
|
|
}
|
|
}
|
|
};
|
|
|
|
let server = make_server().await;
|
|
|
|
tokio::select! {
|
|
res = server.start_sim(8080) => {
|
|
res.unwrap()
|
|
}
|
|
_ = notify.notified() => (),
|
|
}
|
|
|
|
let server = make_server().await;
|
|
server.start_sim(8080).await.unwrap();
|
|
|
|
Ok(())
|
|
}
|
|
});
|
|
|
|
sim.client("client", async move {
|
|
let http = Client::new();
|
|
let db = Database::open_remote_with_connector("http://primary:8080", "", TurmoilConnector)?;
|
|
let conn = db.connect()?;
|
|
|
|
// insert a few valued into the primary
|
|
conn.execute("create table test (x)", ()).await.unwrap();
|
|
for _ in 0..50 {
|
|
conn.execute("insert into test values (42)", ())
|
|
.await
|
|
.unwrap();
|
|
}
|
|
|
|
let primary_index = http
|
|
.get("http://primary:9090/v1/namespaces/default/stats")
|
|
.await
|
|
.unwrap()
|
|
.json_value()
|
|
.await
|
|
.unwrap()["replication_index"]
|
|
.clone()
|
|
.as_i64();
|
|
|
|
loop {
|
|
let replica_index = http
|
|
.get("http://primary:9090/v1/namespaces/default/stats")
|
|
.await
|
|
.unwrap()
|
|
.json_value()
|
|
.await
|
|
.unwrap()["replication_index"]
|
|
.clone()
|
|
.as_i64();
|
|
if primary_index == replica_index {
|
|
break;
|
|
}
|
|
tokio::time::sleep(Duration::from_millis(50)).await;
|
|
}
|
|
|
|
notify.notify_waiters();
|
|
|
|
// make sure that replica is up to date
|
|
loop {
|
|
let replica_index = http
|
|
.get("http://primary:9090/v1/namespaces/default/stats")
|
|
.await
|
|
.unwrap()
|
|
.json_value()
|
|
.await
|
|
.unwrap()["replication_index"]
|
|
.clone()
|
|
.as_i64();
|
|
if primary_index == replica_index {
|
|
break;
|
|
}
|
|
tokio::time::sleep(Duration::from_millis(50)).await;
|
|
}
|
|
|
|
Ok(())
|
|
});
|
|
|
|
sim.run().unwrap();
|
|
}
|
|
|
|
/// In this test, we start a primary and a replica. We add some entries to the primary, and wait
|
|
/// for the replica to be up to date. Then we stop the primary, remove it's wallog, and restart the
|
|
/// primary. This will force the primary to regenerate the log. The replica should catch that, and
|
|
/// self heal. During this process the replica is not shutdown.
|
|
#[test]
|
|
fn primary_regenerate_log_no_replica_restart() {
|
|
let mut sim = Builder::new()
|
|
.simulation_duration(Duration::from_secs(1000))
|
|
.build();
|
|
let tmp = tempdir().unwrap();
|
|
|
|
let notify = Arc::new(Notify::new());
|
|
let notify_clone = notify.clone();
|
|
|
|
init_tracing();
|
|
sim.host("primary", move || {
|
|
let notify = notify_clone.clone();
|
|
let path = tmp.path().to_path_buf();
|
|
async move {
|
|
let make_server = || async {
|
|
TestServer {
|
|
path: path.clone().into(),
|
|
user_api_config: UserApiConfig {
|
|
..Default::default()
|
|
},
|
|
admin_api_config: Some(AdminApiConfig {
|
|
acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await.unwrap(),
|
|
connector: TurmoilConnector,
|
|
disable_metrics: true,
|
|
}),
|
|
rpc_server_config: Some(RpcServerConfig {
|
|
acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 4567)).await.unwrap(),
|
|
tls_config: None,
|
|
}),
|
|
..Default::default()
|
|
}
|
|
};
|
|
let server = make_server().await;
|
|
let shutdown = server.shutdown.clone();
|
|
|
|
let fut = async move { server.start_sim(8080).await };
|
|
|
|
tokio::pin!(fut);
|
|
|
|
loop {
|
|
tokio::select! {
|
|
res = &mut fut => {
|
|
res.unwrap();
|
|
break
|
|
}
|
|
_ = notify.notified() => {
|
|
shutdown.notify_waiters();
|
|
},
|
|
}
|
|
}
|
|
// remove the wallog and start again
|
|
tokio::fs::remove_file(path.join("dbs/default/wallog"))
|
|
.await
|
|
.unwrap();
|
|
notify.notify_waiters();
|
|
let server = make_server().await;
|
|
server.start_sim(8080).await.unwrap();
|
|
|
|
Ok(())
|
|
}
|
|
});
|
|
|
|
let tmp = tempdir().unwrap();
|
|
sim.host("replica", move || {
|
|
let path = tmp.path().to_path_buf();
|
|
async move {
|
|
let make_server = || {
|
|
let path = path.clone();
|
|
async {
|
|
TestServer {
|
|
path: path.into(),
|
|
user_api_config: UserApiConfig {
|
|
..Default::default()
|
|
},
|
|
admin_api_config: Some(AdminApiConfig {
|
|
acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await.unwrap(),
|
|
connector: TurmoilConnector,
|
|
disable_metrics: true,
|
|
}),
|
|
rpc_client_config: Some(RpcClientConfig {
|
|
remote_url: "http://primary:4567".into(),
|
|
connector: TurmoilConnector,
|
|
tls_config: None,
|
|
}),
|
|
..Default::default()
|
|
}
|
|
}
|
|
};
|
|
|
|
let server = make_server().await;
|
|
server.start_sim(8080).await.unwrap();
|
|
|
|
Ok(())
|
|
}
|
|
});
|
|
|
|
sim.client("client", async move {
|
|
let http = Client::new();
|
|
let db = Database::open_remote_with_connector("http://primary:8080", "", TurmoilConnector)?;
|
|
let conn = db.connect()?;
|
|
|
|
// insert a few valued into the primary
|
|
conn.execute("create table test (x)", ()).await.unwrap();
|
|
for _ in 0..50 {
|
|
conn.execute("insert into test values (42)", ())
|
|
.await
|
|
.unwrap();
|
|
}
|
|
|
|
let primary_index = http
|
|
.get("http://primary:9090/v1/namespaces/default/stats")
|
|
.await
|
|
.unwrap()
|
|
.json_value()
|
|
.await
|
|
.unwrap()["replication_index"]
|
|
.clone()
|
|
.as_i64();
|
|
|
|
loop {
|
|
let replica_index = http
|
|
.get("http://primary:9090/v1/namespaces/default/stats")
|
|
.await
|
|
.unwrap()
|
|
.json_value()
|
|
.await
|
|
.unwrap()["replication_index"]
|
|
.clone()
|
|
.as_i64();
|
|
if primary_index == replica_index {
|
|
break;
|
|
}
|
|
tokio::time::sleep(Duration::from_millis(50)).await;
|
|
}
|
|
|
|
notify.notify_waiters();
|
|
notify.notified().await;
|
|
|
|
drop(http);
|
|
let http = Client::new();
|
|
// make sure that replica is up to date
|
|
let new_primary_index = http
|
|
.get("http://primary:9090/v1/namespaces/default/stats")
|
|
.await
|
|
.unwrap()
|
|
.json_value()
|
|
.await
|
|
.unwrap()["replication_index"]
|
|
.clone()
|
|
.as_i64();
|
|
assert_ne!(primary_index, new_primary_index);
|
|
loop {
|
|
let replica_index = http
|
|
.get("http://primary:9090/v1/namespaces/default/stats")
|
|
.await
|
|
.unwrap()
|
|
.json_value()
|
|
.await
|
|
.unwrap()["replication_index"]
|
|
.clone()
|
|
.as_i64();
|
|
if new_primary_index == replica_index {
|
|
break;
|
|
}
|
|
tokio::time::sleep(Duration::from_millis(50)).await;
|
|
}
|
|
|
|
Ok(())
|
|
});
|
|
|
|
sim.run().unwrap();
|
|
}
|
|
|
|
/// This test is very similar to `primary_regenerate_log_no_replica_restart`. The only difference
|
|
/// is that the replica is being shutdown before the primary regenerates their log. When the
|
|
/// replica is brought back up, it will try to load the namespace from a primary with a new log,
|
|
/// and it should self heal.
|
|
#[test]
|
|
fn primary_regenerate_log_with_replica_restart() {
|
|
let mut sim = Builder::new()
|
|
.simulation_duration(Duration::from_secs(1000))
|
|
.build();
|
|
let tmp = tempdir().unwrap();
|
|
|
|
let notify = Arc::new(Notify::new());
|
|
let notify_clone = notify.clone();
|
|
|
|
init_tracing();
|
|
sim.host("primary", move || {
|
|
let notify = notify_clone.clone();
|
|
let path = tmp.path().to_path_buf();
|
|
async move {
|
|
let make_server = || async {
|
|
TestServer {
|
|
path: path.clone().into(),
|
|
user_api_config: UserApiConfig {
|
|
..Default::default()
|
|
},
|
|
admin_api_config: Some(AdminApiConfig {
|
|
acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await.unwrap(),
|
|
connector: TurmoilConnector,
|
|
disable_metrics: true,
|
|
}),
|
|
rpc_server_config: Some(RpcServerConfig {
|
|
acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 4567)).await.unwrap(),
|
|
tls_config: None,
|
|
}),
|
|
..Default::default()
|
|
}
|
|
};
|
|
let server = make_server().await;
|
|
let shutdown = server.shutdown.clone();
|
|
|
|
let fut = async move { server.start_sim(8080).await };
|
|
|
|
tokio::pin!(fut);
|
|
|
|
loop {
|
|
tokio::select! {
|
|
res = &mut fut => {
|
|
res.unwrap();
|
|
break
|
|
}
|
|
_ = notify.notified() => {
|
|
shutdown.notify_waiters();
|
|
},
|
|
}
|
|
}
|
|
// remove the wallog and start again
|
|
tokio::fs::remove_file(path.join("dbs/default/wallog"))
|
|
.await
|
|
.unwrap();
|
|
notify.notify_waiters();
|
|
let server = make_server().await;
|
|
server.start_sim(8080).await.unwrap();
|
|
|
|
Ok(())
|
|
}
|
|
});
|
|
|
|
let tmp = tempdir().unwrap();
|
|
let notify_clone = notify.clone();
|
|
sim.host("replica", move || {
|
|
let path = tmp.path().to_path_buf();
|
|
let notify = notify_clone.clone();
|
|
async move {
|
|
let make_server = || {
|
|
let path = path.clone();
|
|
async {
|
|
TestServer {
|
|
path: path.into(),
|
|
user_api_config: UserApiConfig {
|
|
..Default::default()
|
|
},
|
|
admin_api_config: Some(AdminApiConfig {
|
|
acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await.unwrap(),
|
|
connector: TurmoilConnector,
|
|
disable_metrics: true,
|
|
}),
|
|
rpc_client_config: Some(RpcClientConfig {
|
|
remote_url: "http://primary:4567".into(),
|
|
connector: TurmoilConnector,
|
|
tls_config: None,
|
|
}),
|
|
..Default::default()
|
|
}
|
|
}
|
|
};
|
|
|
|
let server = make_server().await;
|
|
let shutdown = server.shutdown.clone();
|
|
let fut = async {
|
|
server.start_sim(8080).await.unwrap();
|
|
};
|
|
|
|
tokio::pin!(fut);
|
|
let notify_fut = async {
|
|
notify.notified().await;
|
|
}
|
|
.fuse();
|
|
tokio::pin!(notify_fut);
|
|
loop {
|
|
tokio::select! {
|
|
_ = &mut fut => break,
|
|
_ = &mut notify_fut => {
|
|
shutdown.notify_waiters();
|
|
}
|
|
}
|
|
}
|
|
|
|
// we wait for the server to have restarted
|
|
notify.notified().await;
|
|
|
|
// and then restart the replica
|
|
let server = make_server().await;
|
|
server.start_sim(8080).await.unwrap();
|
|
|
|
Ok(())
|
|
}
|
|
});
|
|
|
|
sim.client("client", async move {
|
|
let http = Client::new();
|
|
let db = Database::open_remote_with_connector("http://primary:8080", "", TurmoilConnector)?;
|
|
let conn = db.connect()?;
|
|
|
|
// insert a few valued into the primary
|
|
conn.execute("create table test (x)", ()).await.unwrap();
|
|
for _ in 0..50 {
|
|
conn.execute("insert into test values (42)", ())
|
|
.await
|
|
.unwrap();
|
|
}
|
|
|
|
let primary_index = http
|
|
.get("http://primary:9090/v1/namespaces/default/stats")
|
|
.await
|
|
.unwrap()
|
|
.json_value()
|
|
.await
|
|
.unwrap()["replication_index"]
|
|
.clone()
|
|
.as_i64();
|
|
|
|
loop {
|
|
let replica_index = http
|
|
.get("http://primary:9090/v1/namespaces/default/stats")
|
|
.await
|
|
.unwrap()
|
|
.json_value()
|
|
.await
|
|
.unwrap()["replication_index"]
|
|
.clone()
|
|
.as_i64();
|
|
if primary_index == replica_index {
|
|
break;
|
|
}
|
|
tokio::time::sleep(Duration::from_millis(50)).await;
|
|
}
|
|
|
|
notify.notify_waiters();
|
|
notify.notified().await;
|
|
|
|
drop(http);
|
|
let http = Client::new();
|
|
// make sure that replica is up to date
|
|
let new_primary_index = http
|
|
.get("http://primary:9090/v1/namespaces/default/stats")
|
|
.await
|
|
.unwrap()
|
|
.json_value()
|
|
.await
|
|
.unwrap()["replication_index"]
|
|
.clone()
|
|
.as_i64();
|
|
assert_ne!(primary_index, new_primary_index);
|
|
loop {
|
|
let replica_index = http
|
|
.get("http://primary:9090/v1/namespaces/default/stats")
|
|
.await
|
|
.unwrap()
|
|
.json_value()
|
|
.await
|
|
.unwrap()["replication_index"]
|
|
.clone()
|
|
.as_i64();
|
|
if new_primary_index == replica_index {
|
|
break;
|
|
}
|
|
tokio::time::sleep(Duration::from_millis(50)).await;
|
|
}
|
|
|
|
Ok(())
|
|
});
|
|
|
|
sim.run().unwrap();
|
|
}
|