0
0
mirror of https://github.com/tursodatabase/libsql.git synced 2025-01-07 10:09:04 +00:00

311 lines
9.3 KiB
Rust

//! Tests for sqld in cluster mode
#![allow(deprecated)]
use super::common;
use insta::assert_snapshot;
use libsql::{Database, Value};
use libsql_server::config::{AdminApiConfig, RpcClientConfig, RpcServerConfig, UserApiConfig};
use serde_json::json;
use tempfile::tempdir;
use tokio::{task::JoinSet, time::Duration};
use turmoil::{Builder, Sim};
use common::net::{init_tracing, TestServer, TurmoilAcceptor, TurmoilConnector};
use crate::common::{http::Client, net::SimServer, snapshot_metrics};
mod replica_restart;
mod replication;
pub fn make_cluster(sim: &mut Sim, num_replica: usize, disable_namespaces: bool) {
init_tracing();
let tmp = tempdir().unwrap();
sim.host("primary", move || {
let path = tmp.path().to_path_buf();
async move {
let server = TestServer {
path: path.into(),
user_api_config: UserApiConfig {
..Default::default()
},
admin_api_config: Some(AdminApiConfig {
acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await?,
connector: TurmoilConnector,
disable_metrics: true,
}),
rpc_server_config: Some(RpcServerConfig {
acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 4567)).await?,
tls_config: None,
}),
disable_namespaces,
disable_default_namespace: !disable_namespaces,
..Default::default()
};
server.start_sim(8080).await?;
Ok(())
}
});
for i in 0..num_replica {
let tmp = tempdir().unwrap();
sim.host(format!("replica{i}"), move || {
let path = tmp.path().to_path_buf();
async move {
let server = TestServer {
path: path.into(),
user_api_config: UserApiConfig {
..Default::default()
},
admin_api_config: Some(AdminApiConfig {
acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await?,
connector: TurmoilConnector,
disable_metrics: true,
}),
rpc_client_config: Some(RpcClientConfig {
remote_url: "http://primary:4567".into(),
connector: TurmoilConnector,
tls_config: None,
}),
disable_namespaces,
disable_default_namespace: !disable_namespaces,
..Default::default()
};
server.start_sim(8080).await.unwrap();
Ok(())
}
});
}
}
#[test]
fn proxy_write() {
let mut sim = Builder::new()
.simulation_duration(Duration::from_secs(1000))
.build();
make_cluster(&mut sim, 1, true);
sim.client("client", async {
let db =
Database::open_remote_with_connector("http://replica0:8080", "", TurmoilConnector)?;
let conn = db.connect()?;
conn.execute("create table test (x)", ()).await?;
conn.execute("insert into test values (12)", ()).await?;
// assert that the primary got the write
let db = Database::open_remote_with_connector("http://primary:8080", "", TurmoilConnector)?;
let conn = db.connect()?;
let mut rows = conn.query("select count(*) from test", ()).await?;
assert!(matches!(
rows.next().await.unwrap().unwrap().get_value(0).unwrap(),
Value::Integer(1)
));
snapshot_metrics().assert_gauge("libsql_server_current_frame_no", 2.0);
Ok(())
});
sim.run().unwrap();
}
#[test]
#[ignore = "libsql client doesn't reuse the stream yet, so we can't do RYW"]
fn replica_read_write() {
let mut sim = Builder::new()
.simulation_duration(Duration::from_secs(1000))
.build();
make_cluster(&mut sim, 1, true);
sim.client("client", async {
let db =
Database::open_remote_with_connector("http://replica0:8080", "", TurmoilConnector)?;
let conn = db.connect()?;
conn.execute("create table test (x)", ()).await?;
conn.execute("insert into test values (12)", ()).await?;
let mut rows = conn.query("select count(*) from test", ()).await?;
assert!(matches!(
rows.next().await.unwrap().unwrap().get_value(0).unwrap(),
Value::Integer(1)
));
Ok(())
});
sim.run().unwrap();
}
#[test]
fn sync_many_replica() {
const NUM_REPLICA: usize = 10;
let mut sim = Builder::new()
.simulation_duration(Duration::from_secs(1000))
.build();
make_cluster(&mut sim, NUM_REPLICA, true);
sim.client("client", async {
let db = Database::open_remote_with_connector("http://primary:8080", "", TurmoilConnector)?;
let conn = db.connect()?;
conn.execute("create table test (x)", ()).await?;
conn.execute("insert into test values (42)", ()).await?;
async fn get_frame_no(url: &str) -> Option<u64> {
let client = Client::new();
Some(
client
.get(url)
.await
.unwrap()
.json::<serde_json::Value>()
.await
.unwrap()
.get("replication_index")?
.as_u64()
.unwrap(),
)
}
let primary_fno = loop {
if let Some(fno) = get_frame_no("http://primary:9090/v1/namespaces/default/stats").await
{
break fno;
}
};
// wait for all replicas to sync
let mut join_set = JoinSet::new();
for i in 0..NUM_REPLICA {
join_set.spawn(async move {
let uri = format!("http://replica{i}:9090/v1/namespaces/default/stats");
loop {
if let Some(replica_fno) = get_frame_no(&uri).await {
if replica_fno == primary_fno {
break;
}
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
});
}
while join_set.join_next().await.is_some() {}
for i in 0..NUM_REPLICA {
let db = Database::open_remote_with_connector(
format!("http://replica{i}:8080"),
"",
TurmoilConnector,
)?;
let conn = db.connect()?;
let mut rows = conn.query("select count(*) from test", ()).await?;
assert!(matches!(
rows.next().await.unwrap().unwrap().get_value(0).unwrap(),
Value::Integer(1)
));
}
let client = Client::new();
let stats = client
.get("http://primary:9090/v1/namespaces/default/stats")
.await?
.json_value()
.await
.unwrap();
let stat = stats
.get("embedded_replica_frames_replicated")
.unwrap()
.as_u64()
.unwrap();
assert_eq!(stat, 0);
Ok(())
});
sim.run().unwrap();
}
#[test]
fn create_namespace() {
let mut sim = Builder::new()
.simulation_duration(Duration::from_secs(1000))
.build();
make_cluster(&mut sim, 0, false);
sim.client("client", async {
let db =
Database::open_remote_with_connector("http://foo.primary:8080", "", TurmoilConnector)?;
let conn = db.connect()?;
let Err(e) = conn.execute("create table test (x)", ()).await else {
panic!()
};
assert_snapshot!(e.to_string());
let client = Client::new();
let resp = client
.post(
"http://foo.primary:9090/v1/namespaces/foo/create",
json!({}),
)
.await?;
assert_eq!(resp.status(), 200);
conn.execute("create table test (x)", ()).await.unwrap();
let mut rows = conn.query("select count(*) from test", ()).await.unwrap();
assert!(matches!(
rows.next().await.unwrap().unwrap().get_value(0).unwrap(),
Value::Integer(0)
));
Ok(())
});
sim.run().unwrap();
}
#[test]
fn large_proxy_query() {
let mut sim = Builder::new()
.simulation_duration(Duration::from_secs(10000))
.tcp_capacity(100000)
.build();
make_cluster(&mut sim, 1, true);
sim.client("client", async {
let db = Database::open_remote_with_connector("http://primary:8080", "", TurmoilConnector)
.unwrap();
let conn = db.connect().unwrap();
conn.execute("create table test (x)", ()).await.unwrap();
for _ in 0..5000 {
conn.execute("insert into test values (randomblob(1000))", ())
.await
.unwrap();
}
let db = Database::open_remote_with_connector("http://replica0:8080", "", TurmoilConnector)
.unwrap();
let conn = db.connect().unwrap();
conn.execute_batch("begin immediate; select * from test limit (4000)")
.await
.unwrap();
Ok(())
});
sim.run().unwrap();
}