0
0
mirror of https://github.com/tursodatabase/libsql.git synced 2024-11-22 02:16:15 +00:00
libsql/libsql-server/tests/cluster/replication.rs
2024-10-09 16:50:59 +02:00

341 lines
11 KiB
Rust

use std::sync::Arc;
use std::time::Duration;
use insta::assert_debug_snapshot;
use libsql::Database;
use libsql_server::config::{AdminApiConfig, DbConfig, RpcClientConfig, RpcServerConfig};
use tokio::sync::Notify;
use crate::common::{
http::Client,
net::{SimServer, TestServer, TurmoilAcceptor, TurmoilConnector},
};
/// Replica catch-up when the primary's snapshots are too large to be applied all at once.
///
/// In this test, we first create a primary with a very small `max_log_size`, and then add a
/// good amount of data to it. This causes the primary to create a bunch of snapshots, large
/// enough to prevent the replica from applying them all at once. We then start the replica, and
/// check that it replicates correctly up to the primary's replication index.
#[test]
fn apply_partial_snapshot() {
    let mut sim = turmoil::Builder::new()
        .tcp_capacity(4096 * 30)
        .simulation_duration(Duration::from_secs(3600))
        .build();

    // Owned by the test function so the primary's directory outlives `sim.run()`.
    let prim_tmp = tempfile::tempdir().unwrap();
    // Signalled by the client once the primary is populated; the replica waits on it before
    // starting.
    let notify = Arc::new(Notify::new());

    sim.host("primary", {
        let prim_path = prim_tmp.path().to_path_buf();
        move || {
            // The host closure may be invoked more than once, so hand each run its own copy.
            let prim_path = prim_path.clone();
            async move {
                let primary = TestServer {
                    path: prim_path.into(),
                    db_config: DbConfig {
                        max_log_size: 1,
                        ..Default::default()
                    },
                    admin_api_config: Some(AdminApiConfig {
                        acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await.unwrap(),
                        connector: TurmoilConnector,
                        disable_metrics: true,
                        auth_key: None,
                    }),
                    rpc_server_config: Some(RpcServerConfig {
                        acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 5050)).await.unwrap(),
                        tls_config: None,
                    }),
                    ..Default::default()
                };

                primary.start_sim(8080).await.unwrap();

                Ok(())
            }
        }
    });

    sim.host("replica", {
        let notify = notify.clone();
        move || {
            let notify = notify.clone();
            async move {
                let tmp = tempfile::tempdir().unwrap();
                let replica = TestServer {
                    path: tmp.path().to_path_buf().into(),
                    db_config: DbConfig {
                        max_log_size: 1,
                        ..Default::default()
                    },
                    admin_api_config: Some(AdminApiConfig {
                        acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await.unwrap(),
                        connector: TurmoilConnector,
                        disable_metrics: true,
                        auth_key: None,
                    }),
                    rpc_client_config: Some(RpcClientConfig {
                        remote_url: "http://primary:5050".into(),
                        tls_config: None,
                        connector: TurmoilConnector,
                    }),
                    ..Default::default()
                };

                // Do not start replicating until the client has finished loading the primary.
                notify.notified().await;
                replica.start_sim(8080).await.unwrap();

                Ok(())
            }
        }
    });

    sim.client("client", async move {
        let primary = libsql::Database::open_remote_with_connector(
            "http://primary:8080",
            "",
            TurmoilConnector,
        )
        .unwrap();
        let conn = primary.connect().unwrap();
        conn.execute("CREATE TABLE TEST (x)", ()).await.unwrap();

        // We need a sufficiently large snapshot for the test. Before the fix, 5000 inserts
        // would trigger an infinite loop.
        for _ in 0..5000 {
            conn.execute("INSERT INTO TEST VALUES (randomblob(6000))", ())
                .await
                .unwrap();
        }

        // Record the replication index the replica has to reach.
        let primary_http = Client::new();
        let resp = primary_http
            .get("http://primary:9090/v1/namespaces/default/stats")
            .await
            .unwrap();
        let stats = resp.json_value().await.unwrap();
        let primary_replication_index = stats["replication_index"].as_i64().unwrap();

        // Primary is set up, time to start the replica. `notify_one` stores a permit when
        // nobody is waiting yet, so the wake-up cannot be lost if the replica host has not
        // reached `notified()` at this point; `notify_waiters` would silently drop it.
        notify.notify_one();

        let replica_http = Client::new();

        // Wait for the replica to start up.
        while replica_http.get("http://replica:8080/").await.is_err() {
            tokio::time::sleep(Duration::from_millis(100)).await;
        }

        // Poll the replica's stats until it has caught up with the primary. A missing (null)
        // index yields `None` and simply keeps polling.
        loop {
            let resp = replica_http
                .get("http://replica:9090/v1/namespaces/default/stats")
                .await
                .unwrap();
            let stats = resp.json_value().await.unwrap();
            if stats["replication_index"].as_i64() == Some(primary_replication_index) {
                break;
            }
            tokio::time::sleep(Duration::from_millis(1000)).await;
        }

        Ok(())
    });

    sim.run().unwrap();
}
/// A statement sent through the replica fails while the namespace is missing, and succeeds
/// once the namespace has been created on the primary.
///
/// Both servers run with namespaces enabled and without a default namespace. The client
/// connects to `test.replica` (the `test.` host prefix presumably selects the namespace --
/// confirm against the server's routing), runs a statement that is expected to fail, creates
/// the `test` namespace through the primary's admin endpoint on port 9090, and then runs the
/// same statement again, this time expecting success.
#[test]
fn replica_lazy_creation() {
    let mut sim = turmoil::Builder::new()
        .simulation_duration(Duration::from_secs(1000))
        .build();

    // The directory handle stays in the test function so it outlives the simulation; only the
    // path is moved into the host closure.
    let primary_dir = tempfile::tempdir().unwrap();
    let primary_db_path = primary_dir.path().to_path_buf();

    sim.host("primary", move || {
        let db_path = primary_db_path.clone();
        async move {
            let admin_acceptor = TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await.unwrap();
            let rpc_acceptor = TurmoilAcceptor::bind(([0, 0, 0, 0], 5050)).await.unwrap();

            let server = TestServer {
                path: db_path.into(),
                db_config: DbConfig {
                    max_log_size: 1,
                    ..Default::default()
                },
                admin_api_config: Some(AdminApiConfig {
                    acceptor: admin_acceptor,
                    connector: TurmoilConnector,
                    disable_metrics: true,
                    auth_key: None,
                }),
                rpc_server_config: Some(RpcServerConfig {
                    acceptor: rpc_acceptor,
                    tls_config: None,
                }),
                disable_namespaces: false,
                disable_default_namespace: true,
                ..Default::default()
            };

            server.start_sim(8080).await.unwrap();
            Ok(())
        }
    });

    sim.host("replica", move || async move {
        // Kept alive until the server future completes.
        let replica_dir = tempfile::tempdir().unwrap();
        let admin_acceptor = TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await.unwrap();

        let server = TestServer {
            path: replica_dir.path().to_path_buf().into(),
            db_config: DbConfig {
                max_log_size: 1,
                ..Default::default()
            },
            admin_api_config: Some(AdminApiConfig {
                acceptor: admin_acceptor,
                connector: TurmoilConnector,
                disable_metrics: true,
                auth_key: None,
            }),
            rpc_client_config: Some(RpcClientConfig {
                remote_url: "http://primary:5050".into(),
                tls_config: None,
                connector: TurmoilConnector,
            }),
            disable_namespaces: false,
            disable_default_namespace: true,
            ..Default::default()
        };

        server.start_sim(8080).await.unwrap();
        Ok(())
    });

    sim.client("client", async move {
        let replica_db =
            Database::open_remote_with_connector("http://test.replica:8080", "", TurmoilConnector)
                .unwrap();
        let conn = replica_db.connect().unwrap();

        // First attempt: expected to fail; the error is pinned by the snapshot.
        assert_debug_snapshot!(conn.execute("create table test (x)", ()).await.unwrap_err());

        // Create the namespace through the primary's admin API.
        let admin_client = Client::new();
        admin_client
            .post(
                "http://primary:9090/v1/namespaces/test/create",
                serde_json::json!({}),
            )
            .await
            .unwrap();

        // Second attempt: the same statement must now go through.
        conn.execute("create table test (x)", ()).await.unwrap();

        Ok(())
    });

    sim.run().unwrap();
}
/// Runs an interactive (`Immediate`) transaction against the replica's endpoint and checks
/// that the committed rows can be read back through the same connection.
///
/// The transaction creates a table and inserts two rows; after the commit the test waits one
/// second and then expects `count(0)` over the table to be 2.
#[test]
fn replica_interactive_transaction() {
    let mut sim = turmoil::Builder::new()
        .simulation_duration(Duration::from_secs(1000))
        .build();

    // The directory handle stays in the test function so it outlives the simulation; only the
    // path is moved into the host closure.
    let primary_dir = tempfile::tempdir().unwrap();
    let primary_db_path = primary_dir.path().to_path_buf();

    sim.host("primary", move || {
        let db_path = primary_db_path.clone();
        async move {
            let rpc_acceptor = TurmoilAcceptor::bind(([0, 0, 0, 0], 5050)).await.unwrap();

            let server = TestServer {
                path: db_path.into(),
                db_config: DbConfig {
                    max_log_size: 1,
                    ..Default::default()
                },
                rpc_server_config: Some(RpcServerConfig {
                    acceptor: rpc_acceptor,
                    tls_config: None,
                }),
                ..Default::default()
            };

            server.start_sim(8080).await.unwrap();
            Ok(())
        }
    });

    sim.host("replica", move || async move {
        // Kept alive until the server future completes.
        let replica_dir = tempfile::tempdir().unwrap();

        let server = TestServer {
            path: replica_dir.path().to_path_buf().into(),
            db_config: DbConfig {
                max_log_size: 1,
                ..Default::default()
            },
            rpc_client_config: Some(RpcClientConfig {
                remote_url: "http://primary:5050".into(),
                tls_config: None,
                connector: TurmoilConnector,
            }),
            ..Default::default()
        };

        server.start_sim(8080).await.unwrap();
        Ok(())
    });

    sim.client("client", async move {
        let replica_db =
            Database::open_remote_with_connector("http://replica:8080", "", TurmoilConnector)
                .unwrap();
        let conn = replica_db.connect().unwrap();

        let tx = conn
            .transaction_with_behavior(libsql::TransactionBehavior::Immediate)
            .await
            .unwrap();
        tx.execute("create table test (x)", ()).await.unwrap();
        // Two identical rows, inserted one statement at a time.
        for _ in 0..2 {
            tx.execute("insert into test values (12)", ())
                .await
                .unwrap();
        }
        tx.commit().await.unwrap();

        // libsql-client doesn't support read-your-writes yet, so wait a moment before reading
        // the data back.
        tokio::time::sleep(Duration::from_secs(1)).await;

        let mut rows = conn
            .query("select count(0) from test", ())
            .await
            .unwrap();
        let first_row = rows.next().await.unwrap().unwrap();
        let count = first_row.get::<u32>(0).unwrap();
        assert_eq!(count, 2);

        Ok(())
    });

    sim.run().unwrap();
}