0
0
mirror of https://github.com/tursodatabase/libsql.git synced 2025-06-30 00:09:22 +00:00
Files

341 lines
11 KiB
Rust
Raw Permalink Normal View History

2023-11-19 18:24:54 +01:00
use std::sync::Arc;
2023-11-19 21:01:44 +01:00
use std::time::Duration;
2023-11-19 18:24:54 +01:00
2024-01-08 15:50:49 +01:00
use insta::assert_debug_snapshot;
use libsql::Database;
2023-11-20 16:05:59 +01:00
use libsql_server::config::{AdminApiConfig, DbConfig, RpcClientConfig, RpcServerConfig};
2023-11-19 18:24:54 +01:00
use tokio::sync::Notify;
2023-11-19 21:01:44 +01:00
use crate::common::{
http::Client,
net::{SimServer, TestServer, TurmoilAcceptor, TurmoilConnector},
};
2023-11-19 18:24:54 +01:00
/// Checks that a replica can catch up with a primary whose snapshots are too large to be applied
/// in one go.
///
/// In this test, we first create a primary with a very small `max_log_size`, and then add a good
/// amount of data to it. This causes the primary to create a bunch of snapshots, large enough to
/// prevent the replica from applying them all at once. We then start the replica, and check that
/// it replicates correctly up to the primary's replication index.
#[test]
fn apply_partial_snapshot() {
    let mut sim = turmoil::Builder::new()
        .tcp_capacity(4096 * 30)
        .simulation_duration(Duration::from_secs(3600))
        .build();

    let prim_tmp = tempfile::tempdir().unwrap();
    // Signalled by the client once the primary has been populated; the replica host waits on it
    // before starting its server.
    let notify = Arc::new(Notify::new());

    sim.host("primary", {
        let prim_path = prim_tmp.path().to_path_buf();
        move || {
            let prim_path = prim_path.clone();
            async move {
                let primary = TestServer {
                    path: prim_path.into(),
                    db_config: DbConfig {
                        // Very small log size, so that the inserts below produce many snapshots.
                        max_log_size: 1,
                        ..Default::default()
                    },
                    admin_api_config: Some(AdminApiConfig {
                        acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await.unwrap(),
                        connector: TurmoilConnector,
                        disable_metrics: true,
                        auth_key: None,
                    }),
                    rpc_server_config: Some(RpcServerConfig {
                        acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 5050)).await.unwrap(),
                        tls_config: None,
                    }),
                    ..Default::default()
                };

                primary.start_sim(8080).await.unwrap();

                Ok(())
            }
        }
    });

    sim.host("replica", {
        let notify = notify.clone();
        move || {
            let notify = notify.clone();
            async move {
                let tmp = tempfile::tempdir().unwrap();
                let replica = TestServer {
                    path: tmp.path().to_path_buf().into(),
                    db_config: DbConfig {
                        max_log_size: 1,
                        ..Default::default()
                    },
                    admin_api_config: Some(AdminApiConfig {
                        acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await.unwrap(),
                        connector: TurmoilConnector,
                        disable_metrics: true,
                        auth_key: None,
                    }),
                    rpc_client_config: Some(RpcClientConfig {
                        remote_url: "http://primary:5050".into(),
                        tls_config: None,
                        connector: TurmoilConnector,
                    }),
                    ..Default::default()
                };

                // Do not start replicating until the client says the primary is ready.
                notify.notified().await;
                replica.start_sim(8080).await.unwrap();

                Ok(())
            }
        }
    });

    sim.client("client", async move {
        let primary = libsql::Database::open_remote_with_connector(
            "http://primary:8080",
            "",
            TurmoilConnector,
        )
        .unwrap();
        let conn = primary.connect().unwrap();
        conn.execute("CREATE TABLE TEST (x)", ()).await.unwrap();

        // we need a sufficiently large snapshot for the test. Before the fix, 5000 insert would
        // trigger an infinite loop.
        for _ in 0..5000 {
            conn.execute("INSERT INTO TEST VALUES (randomblob(6000))", ())
                .await
                .unwrap();
        }

        // A single HTTP client is enough for both the primary and the replica requests.
        let client = Client::new();

        let resp = client
            .get("http://primary:9090/v1/namespaces/default/stats")
            .await
            .unwrap();
        let stats = resp.json_value().await.unwrap();
        let primary_replication_index = stats["replication_index"].as_i64().unwrap();

        // primary is setup, time to start replica.
        // `notify_one` stores a permit when nobody is waiting yet, so the wakeup cannot be lost
        // even if the replica host has not reached `notified()` (`notify_waiters` only wakes
        // tasks that are already waiting).
        notify.notify_one();

        // wait for replica to start up
        while client.get("http://replica:8080/").await.is_err() {
            tokio::time::sleep(Duration::from_millis(100)).await;
        }

        // Poll the replica's stats until it reports the same replication index as the primary.
        loop {
            let resp = client
                .get("http://replica:9090/v1/namespaces/default/stats")
                .await
                .unwrap();
            let stats = resp.json_value().await.unwrap();
            let replication_index = &stats["replication_index"];
            if !replication_index.is_null()
                && replication_index.as_i64().unwrap() == primary_replication_index
            {
                break;
            }
            tokio::time::sleep(Duration::from_millis(1000)).await;
        }

        Ok(())
    });

    sim.run().unwrap();
}
2024-01-08 15:50:49 +01:00
/// Exercises on-demand creation of a namespace as seen from a replica.
///
/// Primary and replica both run with `disable_default_namespace: true`. The client connects to
/// the replica as `test.replica` (presumably the host-name prefix selects the `test` namespace —
/// confirm against the server's routing). The first statement must fail, and its error is
/// snapshotted; the namespace is then created through the primary's admin API on port 9090, and
/// the very same statement must succeed.
#[test]
fn replica_lazy_creation() {
    let mut sim = turmoil::Builder::new()
        .simulation_duration(Duration::from_secs(1000))
        .build();

    let prim_tmp = tempfile::tempdir().unwrap();
    let primary_dir = prim_tmp.path().to_path_buf();

    sim.host("primary", move || {
        let db_path = primary_dir.clone();
        async move {
            // Bind the admin listener first, then the replication RPC listener.
            let admin_acceptor = TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await.unwrap();
            let rpc_acceptor = TurmoilAcceptor::bind(([0, 0, 0, 0], 5050)).await.unwrap();

            let server = TestServer {
                path: db_path.into(),
                db_config: DbConfig {
                    max_log_size: 1,
                    ..Default::default()
                },
                admin_api_config: Some(AdminApiConfig {
                    acceptor: admin_acceptor,
                    connector: TurmoilConnector,
                    disable_metrics: true,
                    auth_key: None,
                }),
                rpc_server_config: Some(RpcServerConfig {
                    acceptor: rpc_acceptor,
                    tls_config: None,
                }),
                disable_namespaces: false,
                disable_default_namespace: true,
                ..Default::default()
            };

            server.start_sim(8080).await.unwrap();
            Ok(())
        }
    });

    sim.host("replica", move || async move {
        // The replica keeps its data in a fresh temporary directory.
        let scratch = tempfile::tempdir().unwrap();
        let admin_acceptor = TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await.unwrap();

        let server = TestServer {
            path: scratch.path().to_path_buf().into(),
            db_config: DbConfig {
                max_log_size: 1,
                ..Default::default()
            },
            admin_api_config: Some(AdminApiConfig {
                acceptor: admin_acceptor,
                connector: TurmoilConnector,
                disable_metrics: true,
                auth_key: None,
            }),
            rpc_client_config: Some(RpcClientConfig {
                remote_url: "http://primary:5050".into(),
                tls_config: None,
                connector: TurmoilConnector,
            }),
            disable_namespaces: false,
            disable_default_namespace: true,
            ..Default::default()
        };

        server.start_sim(8080).await.unwrap();
        Ok(())
    });

    sim.client("client", async move {
        let replica_db =
            Database::open_remote_with_connector("http://test.replica:8080", "", TurmoilConnector)
                .unwrap();
        let conn = replica_db.connect().unwrap();

        // Expected to fail at this point; the error itself is pinned by the snapshot.
        assert_debug_snapshot!(conn.execute("create table test (x)", ()).await.unwrap_err());

        // Create the `test` namespace through the primary's admin endpoint.
        let admin = Client::new();
        let create_url = "http://primary:9090/v1/namespaces/test/create";
        admin
            .post(create_url, serde_json::json!({}))
            .await
            .unwrap();

        // Same statement again: this time it has to go through.
        conn.execute("create table test (x)", ()).await.unwrap();

        Ok(())
    });

    sim.run().unwrap();
}
/// Runs an interactive (`Immediate`) transaction against a replica and checks the result.
///
/// The transaction creates a table and inserts two rows through the replica's endpoint. After
/// committing, the client waits a second and then verifies, again through the replica, that the
/// table holds exactly two rows.
#[test]
fn replica_interactive_transaction() {
    let mut sim = turmoil::Builder::new()
        .simulation_duration(Duration::from_secs(1000))
        .build();

    let prim_tmp = tempfile::tempdir().unwrap();
    let primary_dir = prim_tmp.path().to_path_buf();

    sim.host("primary", move || {
        let db_path = primary_dir.clone();
        async move {
            // Listener for the replication RPC the replica connects to.
            let rpc_acceptor = TurmoilAcceptor::bind(([0, 0, 0, 0], 5050)).await.unwrap();

            let server = TestServer {
                path: db_path.into(),
                db_config: DbConfig {
                    max_log_size: 1,
                    ..Default::default()
                },
                rpc_server_config: Some(RpcServerConfig {
                    acceptor: rpc_acceptor,
                    tls_config: None,
                }),
                ..Default::default()
            };

            server.start_sim(8080).await.unwrap();
            Ok(())
        }
    });

    sim.host("replica", move || async move {
        let scratch = tempfile::tempdir().unwrap();

        let server = TestServer {
            path: scratch.path().to_path_buf().into(),
            db_config: DbConfig {
                max_log_size: 1,
                ..Default::default()
            },
            rpc_client_config: Some(RpcClientConfig {
                remote_url: "http://primary:5050".into(),
                tls_config: None,
                connector: TurmoilConnector,
            }),
            ..Default::default()
        };

        server.start_sim(8080).await.unwrap();
        Ok(())
    });

    sim.client("client", async move {
        let replica_db =
            Database::open_remote_with_connector("http://replica:8080", "", TurmoilConnector)
                .unwrap();
        let conn = replica_db.connect().unwrap();

        let tx = conn
            .transaction_with_behavior(libsql::TransactionBehavior::Immediate)
            .await
            .unwrap();
        tx.execute("create table test (x)", ()).await.unwrap();
        // Two identical rows are inserted inside the same transaction.
        for _ in 0..2 {
            tx.execute("insert into test values (12)", ())
                .await
                .unwrap();
        }
        tx.commit().await.unwrap();

        // libsql-client doesn't support read your writes yet, so wait a bit before reading.
        tokio::time::sleep(Duration::from_secs(1)).await;

        let mut rows = conn
            .query("select count(0) from test", ())
            .await
            .unwrap();
        let first_row = rows.next().await.unwrap().unwrap();
        let row_count = first_row.get::<u32>(0).unwrap();
        assert_eq!(row_count, 2);

        Ok(())
    });

    sim.run().unwrap();
}