mirror of
https://github.com/tursodatabase/libsql.git
synced 2024-11-22 02:16:15 +00:00
341 lines
11 KiB
Rust
341 lines
11 KiB
Rust
use std::sync::Arc;
|
|
use std::time::Duration;
|
|
|
|
use insta::assert_debug_snapshot;
|
|
use libsql::Database;
|
|
use libsql_server::config::{AdminApiConfig, DbConfig, RpcClientConfig, RpcServerConfig};
|
|
use tokio::sync::Notify;
|
|
|
|
use crate::common::{
|
|
http::Client,
|
|
net::{SimServer, TestServer, TurmoilAcceptor, TurmoilConnector},
|
|
};
|
|
|
|
/// Checks that a replica can catch up with a primary whose snapshots cannot all be applied in
/// one pass.
///
/// We first create a primary with a very small `max_log_size`, and then add a good amount of
/// data to it. This causes the primary to create a bunch of snapshots, large enough to prevent
/// the replica from applying them all at once. We then start the replica, and check that it
/// replicates correctly up to the primary's replication index.
#[test]
fn apply_partial_snapshot() {
    let mut sim = turmoil::Builder::new()
        .tcp_capacity(4096 * 30)
        .simulation_duration(Duration::from_secs(3600))
        .build();

    let prim_tmp = tempfile::tempdir().unwrap();
    // Used by the client to release the replica once the primary holds all the test data.
    let notify = Arc::new(Notify::new());

    sim.host("primary", {
        let prim_path = prim_tmp.path().to_path_buf();
        move || {
            let prim_path = prim_path.clone();
            async move {
                let primary = TestServer {
                    path: prim_path.into(),
                    db_config: DbConfig {
                        max_log_size: 1,
                        ..Default::default()
                    },
                    admin_api_config: Some(AdminApiConfig {
                        acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await.unwrap(),
                        connector: TurmoilConnector,
                        disable_metrics: true,
                        auth_key: None,
                    }),
                    rpc_server_config: Some(RpcServerConfig {
                        acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 5050)).await.unwrap(),
                        tls_config: None,
                    }),
                    ..Default::default()
                };

                primary.start_sim(8080).await.unwrap();

                Ok(())
            }
        }
    });

    sim.host("replica", {
        let notify = notify.clone();
        move || {
            let notify = notify.clone();
            async move {
                let tmp = tempfile::tempdir().unwrap();
                let replica = TestServer {
                    path: tmp.path().to_path_buf().into(),
                    db_config: DbConfig {
                        max_log_size: 1,
                        ..Default::default()
                    },
                    admin_api_config: Some(AdminApiConfig {
                        acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await.unwrap(),
                        connector: TurmoilConnector,
                        disable_metrics: true,
                        auth_key: None,
                    }),
                    rpc_client_config: Some(RpcClientConfig {
                        remote_url: "http://primary:5050".into(),
                        tls_config: None,
                        connector: TurmoilConnector,
                    }),
                    ..Default::default()
                };

                // Do not start replicating until the client says the primary is fully loaded.
                notify.notified().await;
                replica.start_sim(8080).await.unwrap();

                Ok(())
            }
        }
    });

    sim.client("client", async move {
        let primary = libsql::Database::open_remote_with_connector(
            "http://primary:8080",
            "",
            TurmoilConnector,
        )
        .unwrap();
        let conn = primary.connect().unwrap();
        conn.execute("CREATE TABLE TEST (x)", ()).await.unwrap();
        // we need a sufficiently large snapshot for the test. Before the fix, 5000 insert would
        // trigger an infinite loop.
        for _ in 0..5000 {
            conn.execute("INSERT INTO TEST VALUES (randomblob(6000))", ())
                .await
                .unwrap();
        }

        // Record the index the replica is expected to reach.
        let primary_http = Client::new();
        let resp = primary_http
            .get("http://primary:9090/v1/namespaces/default/stats")
            .await
            .unwrap();
        let stats = resp.json_value().await.unwrap();
        let primary_replication_index = stats["replication_index"].as_i64().unwrap();

        // primary is setup, time to start replica.
        // `notify_one` (rather than `notify_waiters`) stores a permit when no task is waiting
        // yet, so the wake-up cannot be lost if the replica host has not reached
        // `notified().await` at this point.
        notify.notify_one();

        let replica_http = Client::new();

        // wait for replica to start up
        while replica_http.get("http://replica:8080/").await.is_err() {
            tokio::time::sleep(Duration::from_millis(100)).await;
        }

        // Poll the replica's stats until it reports the primary's replication index. A stalled
        // replica keeps this loop spinning until the simulation duration is exhausted.
        loop {
            let resp = replica_http
                .get("http://replica:9090/v1/namespaces/default/stats")
                .await
                .unwrap();
            let stats = resp.json_value().await.unwrap();
            let replication_index = &stats["replication_index"];
            if !replication_index.is_null()
                && replication_index.as_i64().unwrap() == primary_replication_index
            {
                break;
            }
            tokio::time::sleep(Duration::from_millis(1000)).await;
        }

        Ok(())
    });

    sim.run().unwrap();
}
|
|
|
|
/// Runs a primary and a replica with namespaces enabled and the default namespace disabled,
/// then talks to the `test` namespace through the replica.
///
/// The first statement is expected to fail (its error is snapshotted), presumably because the
/// `test` namespace does not exist anywhere yet. After the namespace is created through the
/// primary's admin API, the very same statement sent to the replica must succeed.
#[test]
fn replica_lazy_creation() {
    let mut sim = turmoil::Builder::new()
        .simulation_duration(Duration::from_secs(1000))
        .build();

    let primary_tmp = tempfile::tempdir().unwrap();
    let primary_dir = primary_tmp.path().to_path_buf();

    sim.host("primary", move || {
        let data_dir = primary_dir.clone();
        async move {
            let db_config = DbConfig {
                max_log_size: 1,
                ..Default::default()
            };
            // Bind the admin listener first, then the replication RPC listener.
            let admin_acceptor = TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await.unwrap();
            let rpc_acceptor = TurmoilAcceptor::bind(([0, 0, 0, 0], 5050)).await.unwrap();

            let server = TestServer {
                path: data_dir.into(),
                db_config,
                admin_api_config: Some(AdminApiConfig {
                    acceptor: admin_acceptor,
                    connector: TurmoilConnector,
                    disable_metrics: true,
                    auth_key: None,
                }),
                rpc_server_config: Some(RpcServerConfig {
                    acceptor: rpc_acceptor,
                    tls_config: None,
                }),
                disable_namespaces: false,
                disable_default_namespace: true,
                ..Default::default()
            };

            server.start_sim(8080).await.unwrap();

            Ok(())
        }
    });

    sim.host("replica", move || async move {
        let replica_tmp = tempfile::tempdir().unwrap();
        let db_config = DbConfig {
            max_log_size: 1,
            ..Default::default()
        };
        let admin_acceptor = TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await.unwrap();

        let server = TestServer {
            path: replica_tmp.path().to_path_buf().into(),
            db_config,
            admin_api_config: Some(AdminApiConfig {
                acceptor: admin_acceptor,
                connector: TurmoilConnector,
                disable_metrics: true,
                auth_key: None,
            }),
            // Replicate from the primary's RPC endpoint.
            rpc_client_config: Some(RpcClientConfig {
                remote_url: "http://primary:5050".into(),
                tls_config: None,
                connector: TurmoilConnector,
            }),
            disable_namespaces: false,
            disable_default_namespace: true,
            ..Default::default()
        };

        server.start_sim(8080).await.unwrap();

        Ok(())
    });

    sim.client("client", async move {
        let replica_db =
            Database::open_remote_with_connector("http://test.replica:8080", "", TurmoilConnector)
                .unwrap();
        let conn = replica_db.connect().unwrap();

        // First attempt: expected to be rejected; the error is pinned by a snapshot.
        assert_debug_snapshot!(conn.execute("create table test (x)", ()).await.unwrap_err());

        // Create the `test` namespace through the primary's admin API.
        let admin_client = Client::new();
        let create_url = "http://primary:9090/v1/namespaces/test/create";
        let create_body = serde_json::json!({});
        admin_client.post(create_url, create_body).await.unwrap();

        // try again
        conn.execute("create table test (x)", ()).await.unwrap();

        Ok(())
    });

    sim.run().unwrap();
}
|
|
|
|
/// Runs an interactive (`Immediate`) transaction against a replica and checks that its writes
/// can be read back through the same connection.
///
/// The transaction creates a table and inserts two rows; after the commit and a short pause
/// the test expects `count(0)` on that table to be 2.
#[test]
fn replica_interactive_transaction() {
    let mut sim = turmoil::Builder::new()
        .simulation_duration(Duration::from_secs(1000))
        .build();

    let primary_tmp = tempfile::tempdir().unwrap();
    let primary_dir = primary_tmp.path().to_path_buf();

    sim.host("primary", move || {
        let data_dir = primary_dir.clone();
        async move {
            let db_config = DbConfig {
                max_log_size: 1,
                ..Default::default()
            };
            let rpc_acceptor = TurmoilAcceptor::bind(([0, 0, 0, 0], 5050)).await.unwrap();

            let server = TestServer {
                path: data_dir.into(),
                db_config,
                rpc_server_config: Some(RpcServerConfig {
                    acceptor: rpc_acceptor,
                    tls_config: None,
                }),
                ..Default::default()
            };

            server.start_sim(8080).await.unwrap();

            Ok(())
        }
    });

    sim.host("replica", move || async move {
        let replica_tmp = tempfile::tempdir().unwrap();
        let db_config = DbConfig {
            max_log_size: 1,
            ..Default::default()
        };
        // Replicate from the primary's RPC endpoint.
        let rpc_client_config = RpcClientConfig {
            remote_url: "http://primary:5050".into(),
            tls_config: None,
            connector: TurmoilConnector,
        };

        let server = TestServer {
            path: replica_tmp.path().to_path_buf().into(),
            db_config,
            rpc_client_config: Some(rpc_client_config),
            ..Default::default()
        };

        server.start_sim(8080).await.unwrap();

        Ok(())
    });

    sim.client("client", async move {
        let replica_db =
            Database::open_remote_with_connector("http://replica:8080", "", TurmoilConnector)
                .unwrap();
        let conn = replica_db.connect().unwrap();

        let tx = conn
            .transaction_with_behavior(libsql::TransactionBehavior::Immediate)
            .await
            .unwrap();

        tx.execute("create table test (x)", ()).await.unwrap();
        // Two identical rows; only the row count matters below.
        for _ in 0..2 {
            tx.execute("insert into test values (12)", ())
                .await
                .unwrap();
        }
        tx.commit().await.unwrap();

        // libsql-client doesn't support read your writes yet
        tokio::time::sleep(Duration::from_secs(1)).await;

        let mut rows = conn
            .query("select count(0) from test", ())
            .await
            .unwrap();
        let first_row = rows.next().await.unwrap().unwrap();
        let count: u32 = first_row.get(0).unwrap();

        assert_eq!(count, 2);

        Ok(())
    });

    sim.run().unwrap();
}
|