mirror of
https://github.com/tursodatabase/libsql.git
synced 2024-12-15 07:29:41 +00:00
314 lines
9.4 KiB
Rust
314 lines
9.4 KiB
Rust
//! Tests for sqld in cluster mode
|
|
#![allow(deprecated)]
|
|
|
|
use super::common;
|
|
|
|
use insta::assert_snapshot;
|
|
use libsql::{Database, Value};
|
|
use libsql_server::config::{AdminApiConfig, RpcClientConfig, RpcServerConfig, UserApiConfig};
|
|
use serde_json::json;
|
|
use tempfile::tempdir;
|
|
use tokio::{task::JoinSet, time::Duration};
|
|
use turmoil::{Builder, Sim};
|
|
|
|
use common::net::{init_tracing, TestServer, TurmoilAcceptor, TurmoilConnector};
|
|
|
|
use crate::common::{http::Client, net::SimServer, snapshot_metrics};
|
|
|
|
mod replica_restart;
|
|
mod replication;
|
|
mod schema_dbs;
|
|
|
|
pub fn make_cluster(sim: &mut Sim, num_replica: usize, disable_namespaces: bool) {
|
|
init_tracing();
|
|
let tmp = tempdir().unwrap();
|
|
sim.host("primary", move || {
|
|
let path = tmp.path().to_path_buf();
|
|
async move {
|
|
let server = TestServer {
|
|
path: path.into(),
|
|
user_api_config: UserApiConfig {
|
|
..Default::default()
|
|
},
|
|
admin_api_config: Some(AdminApiConfig {
|
|
acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await?,
|
|
connector: TurmoilConnector,
|
|
disable_metrics: true,
|
|
auth_key: None,
|
|
}),
|
|
rpc_server_config: Some(RpcServerConfig {
|
|
acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 4567)).await?,
|
|
tls_config: None,
|
|
}),
|
|
disable_namespaces,
|
|
disable_default_namespace: !disable_namespaces,
|
|
..Default::default()
|
|
};
|
|
|
|
server.start_sim(8080).await?;
|
|
|
|
Ok(())
|
|
}
|
|
});
|
|
|
|
for i in 0..num_replica {
|
|
let tmp = tempdir().unwrap();
|
|
sim.host(format!("replica{i}"), move || {
|
|
let path = tmp.path().to_path_buf();
|
|
async move {
|
|
let server = TestServer {
|
|
path: path.into(),
|
|
user_api_config: UserApiConfig {
|
|
..Default::default()
|
|
},
|
|
admin_api_config: Some(AdminApiConfig {
|
|
acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await?,
|
|
connector: TurmoilConnector,
|
|
disable_metrics: true,
|
|
auth_key: None,
|
|
}),
|
|
rpc_client_config: Some(RpcClientConfig {
|
|
remote_url: "http://primary:4567".into(),
|
|
connector: TurmoilConnector,
|
|
tls_config: None,
|
|
}),
|
|
disable_namespaces,
|
|
disable_default_namespace: !disable_namespaces,
|
|
..Default::default()
|
|
};
|
|
|
|
server.start_sim(8080).await.unwrap();
|
|
|
|
Ok(())
|
|
}
|
|
});
|
|
}
|
|
}
|
|
|
|
/// Writes issued against a replica must be visible when reading from the
/// primary.
#[test]
fn proxy_write() {
    let mut sim = Builder::new()
        .simulation_duration(Duration::from_secs(1000))
        .build();
    make_cluster(&mut sim, 1, true);

    sim.client("client", async {
        // Issue both writes through the replica.
        let replica_db =
            Database::open_remote_with_connector("http://replica0:8080", "", TurmoilConnector)?;
        let replica_conn = replica_db.connect()?;
        replica_conn.execute("create table test (x)", ()).await?;
        replica_conn
            .execute("insert into test values (12)", ())
            .await?;

        // assert that the primary got the write
        let primary_db =
            Database::open_remote_with_connector("http://primary:8080", "", TurmoilConnector)?;
        let primary_conn = primary_db.connect()?;
        let mut rows = primary_conn
            .query("select count(*) from test", ())
            .await?;

        let first_row = rows.next().await.unwrap().unwrap();
        let count = first_row.get_value(0).unwrap();
        assert!(matches!(count, Value::Integer(1)));

        snapshot_metrics().assert_gauge("libsql_server_current_frame_no", 2.0);

        Ok(())
    });

    sim.run().unwrap();
}
|
|
|
|
/// Writes through a replica and then reads the row count back through the
/// same replica connection. Currently ignored, see the attribute below.
#[test]
#[ignore = "libsql client doesn't reuse the stream yet, so we can't do RYW"]
fn replica_read_write() {
    let mut sim = Builder::new()
        .simulation_duration(Duration::from_secs(1000))
        .build();
    make_cluster(&mut sim, 1, true);

    sim.client("client", async {
        let replica_db =
            Database::open_remote_with_connector("http://replica0:8080", "", TurmoilConnector)?;
        let replica_conn = replica_db.connect()?;

        // Write, then immediately read on the same connection.
        replica_conn.execute("create table test (x)", ()).await?;
        replica_conn
            .execute("insert into test values (12)", ())
            .await?;
        let mut rows = replica_conn
            .query("select count(*) from test", ())
            .await?;

        let first_row = rows.next().await.unwrap().unwrap();
        let count = first_row.get_value(0).unwrap();
        assert!(matches!(count, Value::Integer(1)));

        Ok(())
    });

    sim.run().unwrap();
}
|
|
|
|
/// Writes to the primary, waits until every replica reports the same
/// `replication_index` as the primary, then checks that each replica can see
/// the inserted row.
#[test]
fn sync_many_replica() {
    const NUM_REPLICA: usize = 10;
    let mut sim = Builder::new()
        .simulation_duration(Duration::from_secs(1000))
        .build();
    make_cluster(&mut sim, NUM_REPLICA, true);
    sim.client("client", async {
        let db = Database::open_remote_with_connector("http://primary:8080", "", TurmoilConnector)?;
        let conn = db.connect()?;

        conn.execute("create table test (x)", ()).await?;
        conn.execute("insert into test values (42)", ()).await?;

        /// Fetches the stats document at `url` and returns its
        /// `replication_index` field.
        ///
        /// Returns `None` when the field is missing or is not an unsigned
        /// integer (for instance `null`), so callers can treat "no index yet"
        /// as a retryable state instead of a panic.
        ///
        /// Panics if the request itself or the JSON decoding fails.
        async fn get_frame_no(url: &str) -> Option<u64> {
            let client = Client::new();
            let stats = client
                .get(url)
                .await
                .unwrap()
                .json::<serde_json::Value>()
                .await
                .unwrap();
            let index = stats.get("replication_index")?;
            index.as_u64()
        }

        let primary_stats_url = "http://primary:9090/v1/namespaces/default/stats";

        // Poll until the primary reports an index, pausing between attempts
        // like the replica loop below does.
        let primary_fno = loop {
            if let Some(fno) = get_frame_no(primary_stats_url).await {
                break fno;
            }
            tokio::time::sleep(Duration::from_millis(100)).await;
        };

        // wait for all replicas to sync
        let mut join_set = JoinSet::new();
        for i in 0..NUM_REPLICA {
            join_set.spawn(async move {
                let uri = format!("http://replica{i}:9090/v1/namespaces/default/stats");
                loop {
                    if get_frame_no(&uri).await == Some(primary_fno) {
                        break;
                    }
                    tokio::time::sleep(Duration::from_millis(100)).await;
                }
            });
        }

        // Surface panics from the wait tasks: silently dropping the join
        // result would let the test carry on without the replicas being
        // known to have caught up.
        while let Some(joined) = join_set.join_next().await {
            joined.expect("replica sync task failed");
        }

        for i in 0..NUM_REPLICA {
            let db = Database::open_remote_with_connector(
                format!("http://replica{i}:8080"),
                "",
                TurmoilConnector,
            )?;
            let conn = db.connect()?;
            let mut rows = conn.query("select count(*) from test", ()).await?;
            assert!(matches!(
                rows.next().await.unwrap().unwrap().get_value(0).unwrap(),
                Value::Integer(1)
            ));
        }

        let client = Client::new();

        let stats = client
            .get(primary_stats_url)
            .await?
            .json_value()
            .await
            .unwrap();

        // NOTE(review): presumably this stays at zero because the replicas in
        // this cluster are not embedded replicas — confirm against the stats
        // definition.
        let stat = stats
            .get("embedded_replica_frames_replicated")
            .unwrap()
            .as_u64()
            .unwrap();

        assert_eq!(stat, 0);

        Ok(())
    });

    sim.run().unwrap();
}
|
|
|
|
/// With namespaces enabled, statements against the `foo` namespace fail until
/// it has been created through the admin API, and succeed afterwards.
#[test]
fn create_namespace() {
    let mut sim = Builder::new()
        .simulation_duration(Duration::from_secs(1000))
        .build();
    make_cluster(&mut sim, 0, false);

    sim.client("client", async {
        let db =
            Database::open_remote_with_connector("http://foo.primary:8080", "", TurmoilConnector)?;
        let conn = db.connect()?;

        // The first attempt is expected to fail; the error text is pinned by
        // a snapshot.
        let e = match conn.execute("create table test (x)", ()).await {
            Err(e) => e,
            Ok(_) => panic!(),
        };
        assert_snapshot!(e.to_string());

        // Create the namespace through the admin API.
        let admin = Client::new();
        let create_url = "http://foo.primary:9090/v1/namespaces/foo/create";
        let resp = admin.post(create_url, json!({})).await?;

        assert_eq!(resp.status(), 200);

        // The same statement now succeeds on the same connection.
        conn.execute("create table test (x)", ()).await.unwrap();
        let mut rows = conn.query("select count(*) from test", ()).await.unwrap();
        let first_row = rows.next().await.unwrap().unwrap();
        let count = first_row.get_value(0).unwrap();
        assert!(matches!(count, Value::Integer(0)));

        Ok(())
    });

    sim.run().unwrap();
}
|
|
|
|
/// Fills a table through the primary, then runs a batch on a replica that
/// opens a write transaction and selects a large number of rows.
#[test]
fn large_proxy_query() {
    let mut sim = Builder::new()
        .simulation_duration(Duration::from_secs(10000))
        .tcp_capacity(100000)
        .build();
    make_cluster(&mut sim, 1, true);

    sim.client("client", async {
        // Populate the table via the primary.
        let primary_db =
            Database::open_remote_with_connector("http://primary:8080", "", TurmoilConnector)
                .unwrap();
        let primary_conn = primary_db.connect().unwrap();

        primary_conn
            .execute("create table test (x)", ())
            .await
            .unwrap();
        for _ in 0..5000 {
            let inserted = primary_conn
                .execute("insert into test values (randomblob(1000))", ())
                .await;
            inserted.unwrap();
        }

        // Run the large batch against the replica.
        let replica_db =
            Database::open_remote_with_connector("http://replica0:8080", "", TurmoilConnector)
                .unwrap();
        let replica_conn = replica_db.connect().unwrap();

        let batch = replica_conn
            .execute_batch("begin immediate; select * from test limit (4000)")
            .await;
        batch.unwrap();

        Ok(())
    });

    sim.run().unwrap();
}
|