//! Tests for sqld in cluster mode #![allow(deprecated)] use super::common; use insta::assert_snapshot; use libsql::{Database, Value}; use libsql_server::config::{AdminApiConfig, RpcClientConfig, RpcServerConfig, UserApiConfig}; use serde_json::json; use tempfile::tempdir; use tokio::{task::JoinSet, time::Duration}; use turmoil::{Builder, Sim}; use common::net::{init_tracing, TestServer, TurmoilAcceptor, TurmoilConnector}; use crate::common::{http::Client, net::SimServer, snapshot_metrics}; mod replica_restart; mod replication; pub fn make_cluster(sim: &mut Sim, num_replica: usize, disable_namespaces: bool) { init_tracing(); let tmp = tempdir().unwrap(); sim.host("primary", move || { let path = tmp.path().to_path_buf(); async move { let server = TestServer { path: path.into(), user_api_config: UserApiConfig { ..Default::default() }, admin_api_config: Some(AdminApiConfig { acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await?, connector: TurmoilConnector, disable_metrics: true, }), rpc_server_config: Some(RpcServerConfig { acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 4567)).await?, tls_config: None, }), disable_namespaces, disable_default_namespace: !disable_namespaces, ..Default::default() }; server.start_sim(8080).await?; Ok(()) } }); for i in 0..num_replica { let tmp = tempdir().unwrap(); sim.host(format!("replica{i}"), move || { let path = tmp.path().to_path_buf(); async move { let server = TestServer { path: path.into(), user_api_config: UserApiConfig { ..Default::default() }, admin_api_config: Some(AdminApiConfig { acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await?, connector: TurmoilConnector, disable_metrics: true, }), rpc_client_config: Some(RpcClientConfig { remote_url: "http://primary:4567".into(), connector: TurmoilConnector, tls_config: None, }), disable_namespaces, disable_default_namespace: !disable_namespaces, ..Default::default() }; server.start_sim(8080).await.unwrap(); Ok(()) } }); } } #[test] fn proxy_write() { let mut sim = Builder::new() .simulation_duration(Duration::from_secs(1000)) .build(); make_cluster(&mut sim, 1, true); sim.client("client", async { let db = Database::open_remote_with_connector("http://replica0:8080", "", TurmoilConnector)?; let conn = db.connect()?; conn.execute("create table test (x)", ()).await?; conn.execute("insert into test values (12)", ()).await?; // assert that the primary got the write let db = Database::open_remote_with_connector("http://primary:8080", "", TurmoilConnector)?; let conn = db.connect()?; let mut rows = conn.query("select count(*) from test", ()).await?; assert!(matches!( rows.next().await.unwrap().unwrap().get_value(0).unwrap(), Value::Integer(1) )); snapshot_metrics().assert_gauge("libsql_server_current_frame_no", 2.0); Ok(()) }); sim.run().unwrap(); } #[test] #[ignore = "libsql client doesn't reuse the stream yet, so we can't do RYW"] fn replica_read_write() { let mut sim = Builder::new() .simulation_duration(Duration::from_secs(1000)) .build(); make_cluster(&mut sim, 1, true); sim.client("client", async { let db = Database::open_remote_with_connector("http://replica0:8080", "", TurmoilConnector)?; let conn = db.connect()?; conn.execute("create table test (x)", ()).await?; conn.execute("insert into test values (12)", ()).await?; let mut rows = conn.query("select count(*) from test", ()).await?; assert!(matches!( rows.next().await.unwrap().unwrap().get_value(0).unwrap(), Value::Integer(1) )); Ok(()) }); sim.run().unwrap(); } #[test] fn sync_many_replica() { const NUM_REPLICA: usize = 10; let mut sim = Builder::new() .simulation_duration(Duration::from_secs(1000)) .build(); make_cluster(&mut sim, NUM_REPLICA, true); sim.client("client", async { let db = Database::open_remote_with_connector("http://primary:8080", "", TurmoilConnector)?; let conn = db.connect()?; conn.execute("create table test (x)", ()).await?; conn.execute("insert into test values (42)", ()).await?; async fn get_frame_no(url: &str) -> Option<u64> { let client = Client::new(); Some( client .get(url) .await .unwrap() .json::<serde_json::Value>() .await .unwrap() .get("replication_index")? .as_u64() .unwrap(), ) } let primary_fno = loop { if let Some(fno) = get_frame_no("http://primary:9090/v1/namespaces/default/stats").await { break fno; } }; // wait for all replicas to sync let mut join_set = JoinSet::new(); for i in 0..NUM_REPLICA { join_set.spawn(async move { let uri = format!("http://replica{i}:9090/v1/namespaces/default/stats"); loop { if let Some(replica_fno) = get_frame_no(&uri).await { if replica_fno == primary_fno { break; } } tokio::time::sleep(Duration::from_millis(100)).await; } }); } while join_set.join_next().await.is_some() {} for i in 0..NUM_REPLICA { let db = Database::open_remote_with_connector( format!("http://replica{i}:8080"), "", TurmoilConnector, )?; let conn = db.connect()?; let mut rows = conn.query("select count(*) from test", ()).await?; assert!(matches!( rows.next().await.unwrap().unwrap().get_value(0).unwrap(), Value::Integer(1) )); } let client = Client::new(); let stats = client .get("http://primary:9090/v1/namespaces/default/stats") .await? .json_value() .await .unwrap(); let stat = stats .get("embedded_replica_frames_replicated") .unwrap() .as_u64() .unwrap(); assert_eq!(stat, 0); Ok(()) }); sim.run().unwrap(); } #[test] fn create_namespace() { let mut sim = Builder::new() .simulation_duration(Duration::from_secs(1000)) .build(); make_cluster(&mut sim, 0, false); sim.client("client", async { let db = Database::open_remote_with_connector("http://foo.primary:8080", "", TurmoilConnector)?; let conn = db.connect()?; let Err(e) = conn.execute("create table test (x)", ()).await else { panic!() }; assert_snapshot!(e.to_string()); let client = Client::new(); let resp = client .post( "http://foo.primary:9090/v1/namespaces/foo/create", json!({}), ) .await?; assert_eq!(resp.status(), 200); conn.execute("create table test (x)", ()).await.unwrap(); let mut rows = conn.query("select count(*) from test", ()).await.unwrap(); assert!(matches!( rows.next().await.unwrap().unwrap().get_value(0).unwrap(), Value::Integer(0) )); Ok(()) }); sim.run().unwrap(); } #[test] fn large_proxy_query() { let mut sim = Builder::new() .simulation_duration(Duration::from_secs(10000)) .tcp_capacity(100000) .build(); make_cluster(&mut sim, 1, true); sim.client("client", async { let db = Database::open_remote_with_connector("http://primary:8080", "", TurmoilConnector) .unwrap(); let conn = db.connect().unwrap(); conn.execute("create table test (x)", ()).await.unwrap(); for _ in 0..5000 { conn.execute("insert into test values (randomblob(1000))", ()) .await .unwrap(); } let db = Database::open_remote_with_connector("http://replica0:8080", "", TurmoilConnector) .unwrap(); let conn = db.connect().unwrap(); conn.execute_batch("begin immediate; select * from test limit (4000)") .await .unwrap(); Ok(()) }); sim.run().unwrap(); }