#![allow(deprecated)] mod local; use std::path::PathBuf; use std::sync::Arc; use std::time::{Duration, Instant}; use crate::common::auth::encode; use crate::common::http::Client; use crate::common::net::{init_tracing, SimServer, TestServer, TurmoilAcceptor, TurmoilConnector}; use crate::common::{self, snapshot_metrics}; use libsql::Database; use libsql_server::auth::{user_auth_strategies, Auth}; use libsql_server::config::{ AdminApiConfig, DbConfig, RpcClientConfig, RpcServerConfig, UserApiConfig, }; use serde_json::json; use tempfile::tempdir; use tokio::sync::Notify; use tokio_stream::StreamExt; use turmoil::{Builder, Sim}; fn enable_libsql_logging() { use std::ffi::c_int; use std::sync::Once; static ONCE: Once = Once::new(); fn libsql_log(code: c_int, msg: &str) { tracing::error!("sqlite error {code}: {msg}"); } ONCE.call_once(|| unsafe { rusqlite::trace::config_log(Some(libsql_log)).unwrap(); }); } fn make_primary(sim: &mut Sim, path: PathBuf) { init_tracing(); enable_libsql_logging(); sim.host("primary", move || { let path = path.clone(); async move { let server = TestServer { path: path.into(), db_config: DbConfig { max_log_size: 1, max_log_duration: Some(5.0), ..Default::default() }, user_api_config: UserApiConfig { ..Default::default() }, admin_api_config: Some(AdminApiConfig { acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await?, connector: TurmoilConnector, disable_metrics: false, auth_key: None, }), rpc_server_config: Some(RpcServerConfig { acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 4567)).await?, tls_config: None, }), disable_namespaces: false, disable_default_namespace: true, ..Default::default() }; server.start_sim(8080).await?; Ok(()) } }); } #[test] fn embedded_replica() { let tmp_embedded = tempdir().unwrap(); let tmp_host = tempdir().unwrap(); let tmp_embedded_path = tmp_embedded.path().to_owned(); let tmp_host_path = tmp_host.path().to_owned(); let mut sim = Builder::new() .simulation_duration(Duration::from_secs(1000)) .build(); make_primary(&mut sim, tmp_host_path.clone()); sim.client("client", async move { let client = Client::new(); client .post("http://primary:9090/v1/namespaces/foo/create", json!({})) .await?; let path = tmp_embedded_path.join("embedded"); let db = Database::open_with_remote_sync_connector( path.to_str().unwrap(), "http://foo.primary:8080", "", TurmoilConnector, false, None, ) .await?; let n = db.sync().await?.frame_no(); assert_eq!(n, None); let conn = db.connect()?; conn.execute("CREATE TABLE user (id INTEGER NOT NULL PRIMARY KEY)", ()) .await?; let n = db.sync().await?.frame_no(); assert_eq!(n, Some(1)); let err = conn .execute("INSERT INTO user(id) VALUES (1), (1)", ()) .await .unwrap_err(); let libsql::Error::RemoteSqliteFailure(code, extended_code, _) = err else { panic!() }; assert_eq!(code, 3); assert_eq!(extended_code, 1555); let snapshot = snapshot_metrics(); for (key, (_, _, val)) in snapshot.snapshot() { if key.kind() == metrics_util::MetricKind::Counter && key.key().name() == "libsql_client_version" { assert_eq!(val, &metrics_util::debugging::DebugValue::Counter(6)); let label = key.key().labels().next().unwrap(); assert!(label.value().starts_with("libsql-rpc-")); } } snapshot.assert_counter("libsql_server_user_http_response", 6); Ok(()) }); sim.run().unwrap(); } #[test] fn execute_batch() { let mut sim = Builder::new() .simulation_duration(Duration::from_secs(1000)) .build(); let tmp_embedded = tempdir().unwrap(); let tmp_host = tempdir().unwrap(); let tmp_embedded_path = tmp_embedded.path().to_owned(); let tmp_host_path = tmp_host.path().to_owned(); make_primary(&mut sim, tmp_host_path.clone()); sim.client("client", async move { let client = Client::new(); client .post("http://primary:9090/v1/namespaces/foo/create", json!({})) .await?; let path = tmp_embedded_path.join("embedded"); let db = Database::open_with_remote_sync_connector( path.to_str().unwrap(), "http://foo.primary:8080", "", TurmoilConnector, false, None, ) .await?; let n = db.sync().await?.frame_no(); assert_eq!(n, None); let conn = db.connect()?; conn.execute("CREATE TABLE user (id INTEGER NOT NULL PRIMARY KEY)", ()) .await?; assert_eq!(db.max_write_replication_index(), Some(1)); let n = db.sync().await?.frame_no(); assert_eq!(n, Some(1)); conn.execute_batch( "BEGIN; INSERT INTO user (id) VALUES (2);", // COMMIT;", ) .await?; Ok(()) }); sim.run().unwrap(); } #[test] fn stream() { let mut sim = Builder::new() .simulation_duration(Duration::from_secs(1000)) .build(); let tmp_embedded = tempdir().unwrap(); let tmp_host = tempdir().unwrap(); let tmp_embedded_path = tmp_embedded.path().to_owned(); let tmp_host_path = tmp_host.path().to_owned(); make_primary(&mut sim, tmp_host_path.clone()); sim.client("client", async move { let client = Client::new(); client .post("http://primary:9090/v1/namespaces/foo/create", json!({})) .await?; let path = tmp_embedded_path.join("embedded"); let db = Database::open_with_remote_sync_connector( path.to_str().unwrap(), "http://foo.primary:8080", "", TurmoilConnector, false, None, ) .await?; let n = db.sync().await?.frame_no(); assert_eq!(n, None); let conn = db.connect()?; conn.execute("CREATE TABLE user (id INTEGER NOT NULL PRIMARY KEY)", ()) .await?; assert_eq!(db.max_write_replication_index(), Some(1)); let n = db.sync().await?.frame_no(); assert_eq!(n, Some(1)); conn.execute_batch( " INSERT INTO user (id) VALUES (2); INSERT INTO user (id) VALUES (3); INSERT INTO user (id) VALUES (4); INSERT INTO user (id) VALUES (5); ", ) .await?; let replication_index = db.max_write_replication_index(); let synced_replication_index = db.sync().await.unwrap().frame_no(); assert_eq!(synced_replication_index, replication_index); let rows = conn.query("select * from user", ()).await.unwrap(); let rows = rows .into_stream() .map(|r| r.unwrap().get::<u64>(0).unwrap()) .collect::<Vec<_>>() .await; assert_eq!(rows.len(), 4); Ok(()) }); sim.run().unwrap(); } #[test] #[cfg(feature = "test-encryption")] fn embedded_replica_with_encryption() { use bytes::Bytes; let mut sim = Builder::new() .simulation_duration(Duration::from_secs(1000)) .build(); let tmp_embedded = tempdir().unwrap(); let tmp_host = tempdir().unwrap(); let tmp_embedded_path = tmp_embedded.path().to_owned(); let tmp_host_path = tmp_host.path().to_owned(); make_primary(&mut sim, tmp_host_path.clone()); sim.client("client", async move { let client = Client::new(); client .post("http://primary:9090/v1/namespaces/foo/create", json!({})) .await?; let path = tmp_embedded_path.join("embedded"); let db = Database::open_with_remote_sync_connector( path.to_str().unwrap(), "http://foo.primary:8080", "", TurmoilConnector, false, Some(libsql::EncryptionConfig::new( libsql::Cipher::Aes256Cbc, Bytes::from_static(b"SecretKey"), )), ) .await?; let n = db.sync().await?.frame_no(); assert_eq!(n, None); let conn = db.connect()?; conn.execute("CREATE TABLE user (id INTEGER NOT NULL PRIMARY KEY)", ()) .await?; let n = db.sync().await?.frame_no(); assert_eq!(n, Some(1)); let err = conn .execute("INSERT INTO user(id) VALUES (1), (1)", ()) .await .unwrap_err(); let libsql::Error::RemoteSqliteFailure(code, extended_code, _) = err else { panic!() }; assert_eq!(code, 3); assert_eq!(extended_code, 1555); let snapshot = snapshot_metrics(); for (key, (_, _, val)) in snapshot.snapshot() { if key.kind() == metrics_util::MetricKind::Counter && key.key().name() == "libsql_client_version" { assert_eq!(val, &metrics_util::debugging::DebugValue::Counter(6)); let label = key.key().labels().next().unwrap(); assert!(label.value().starts_with("libsql-rpc-")); } } snapshot.assert_counter("libsql_server_user_http_response", 6); conn.execute("INSERT INTO user(id) VALUES (1)", ()) .await .unwrap(); drop(conn); drop(db); let db = Database::open_with_remote_sync_connector( path.to_str().unwrap(), "http://foo.primary:8080", "", TurmoilConnector, false, Some(libsql::EncryptionConfig::new( libsql::Cipher::Aes256Cbc, Bytes::from_static(b"SecretKey"), )), ) .await?; db.sync().await.unwrap(); let conn = db.connect()?; let mut res = conn.query("SELECT id FROM user", ()).await?; let row = res.next().await?; assert!(row.is_some()); assert_eq!(1, row.unwrap().get::<i32>(0)?); let row = res.next().await?; assert!(row.is_none()); Ok(()) }); sim.run().unwrap(); } #[test] fn replica_primary_reset() { let mut sim = Builder::new() .simulation_duration(Duration::from_secs(1000)) .build(); let tmp = tempdir().unwrap(); let notify = Arc::new(Notify::new()); let notify_clone = notify.clone(); init_tracing(); sim.host("primary", move || { let notify = notify_clone.clone(); let path = tmp.path().to_path_buf(); async move { let make_server = || async { TestServer { path: path.clone().into(), user_api_config: UserApiConfig { ..Default::default() }, admin_api_config: Some(AdminApiConfig { acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await.unwrap(), connector: TurmoilConnector, disable_metrics: true, auth_key: None, }), rpc_server_config: Some(RpcServerConfig { acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 4567)).await.unwrap(), tls_config: None, }), ..Default::default() } }; let server = make_server().await; let shutdown = server.shutdown.clone(); let fut = async move { server.start_sim(8080).await }; tokio::pin!(fut); loop { tokio::select! { res = &mut fut => { res.unwrap(); break } _ = notify.notified() => { shutdown.notify_waiters(); }, } } // remove the wallog and start again tokio::fs::remove_file(path.join("dbs/default/wallog")) .await .unwrap(); notify.notify_waiters(); let server = make_server().await; server.start_sim(8080).await.unwrap(); Ok(()) } }); sim.client("client", async move { let primary = Database::open_remote_with_connector("http://primary:8080", "", TurmoilConnector)?; let conn = primary.connect()?; // insert a few valued into the primary conn.execute("create table test (x)", ()).await.unwrap(); for _ in 0..50 { conn.execute("insert into test values (42)", ()) .await .unwrap(); } let tmp = tempdir().unwrap(); let replica = Database::open_with_remote_sync_connector( tmp.path().join("data").display().to_string(), "http://primary:8080", "", TurmoilConnector, false, None, ) .await .unwrap(); let replica_index = replica.sync().await.unwrap().frame_no().unwrap(); let primary_index = Client::new() .get("http://primary:9090/v1/namespaces/default/stats") .await .unwrap() .json_value() .await .unwrap()["replication_index"] .clone() .as_u64() .unwrap(); assert_eq!(replica_index, primary_index); let replica_count = *replica .connect() .unwrap() .query("select count(*) from test", ()) .await .unwrap() .next() .await .unwrap() .unwrap() .get_value(0) .unwrap() .as_integer() .unwrap(); let primary_count = *primary .connect() .unwrap() .query("select count(*) from test", ()) .await .unwrap() .next() .await .unwrap() .unwrap() .get_value(0) .unwrap() .as_integer() .unwrap(); assert_eq!(primary_count, replica_count); notify.notify_waiters(); notify.notified().await; // drop the replica here, to make sure not to reuse an open connection. drop(replica); let replica = Database::open_with_remote_sync_connector( tmp.path().join("data").display().to_string(), "http://primary:8080", "", TurmoilConnector, false, None, ) .await .unwrap(); let replica_index = replica.sync().await.unwrap().frame_no().unwrap(); let primary_index = Client::new() .get("http://primary:9090/v1/namespaces/default/stats") .await .unwrap() .json_value() .await .unwrap()["replication_index"] .clone() .as_u64() .unwrap(); assert_eq!(replica_index, primary_index); let replica_count = *replica .connect() .unwrap() .query("select count(*) from test", ()) .await .unwrap() .next() .await .unwrap() .unwrap() .get_value(0) .unwrap() .as_integer() .unwrap(); let primary_count = *primary .connect() .unwrap() .query("select count(*) from test", ()) .await .unwrap() .next() .await .unwrap() .unwrap() .get_value(0) .unwrap() .as_integer() .unwrap(); assert_eq!(primary_count, replica_count); Ok(()) }); sim.run().unwrap(); } #[test] fn replica_no_resync_on_restart() { let mut sim = Builder::new() .simulation_duration(Duration::from_secs(600)) .build(); let tmp = tempdir().unwrap(); init_tracing(); sim.host("primary", move || { let path = tmp.path().to_path_buf(); async move { let make_server = || async { TestServer { path: path.clone().into(), user_api_config: UserApiConfig { ..Default::default() }, ..Default::default() } }; let server = make_server().await; server.start_sim(8080).await.unwrap(); Ok(()) } }); sim.client("client", async { // seed database { let db = Database::open_remote_with_connector("http://primary:8080", "", TurmoilConnector) .unwrap(); let conn = db.connect().unwrap(); conn.execute("create table test (x)", ()).await.unwrap(); for _ in 0..500 { conn.execute("insert into test values (42)", ()) .await .unwrap(); } } let tmp = tempdir().unwrap(); let db_path = tmp.path().join("data"); let before = Instant::now(); let first_sync_index = { let db = Database::open_with_remote_sync_connector( db_path.display().to_string(), "http://primary:8080", "", TurmoilConnector, false, None, ) .await .unwrap(); db.sync().await.unwrap().frame_no().unwrap() }; let first_sync = before.elapsed(); let before = Instant::now(); let second_sync_index = { let db = Database::open_with_remote_sync_connector( db_path.display().to_string(), "http://primary:8080", "", TurmoilConnector, false, None, ) .await .unwrap(); db.sync().await.unwrap().frame_no().unwrap() }; let second_sync = before.elapsed(); assert_eq!(first_sync_index, second_sync_index); // very sketchy way of checking the the second sync was very fast, because it performed // only a handshake. assert!(second_sync.as_secs_f64() / first_sync.as_secs_f64() < 0.10); Ok(()) }); sim.run().unwrap() } #[test] fn replicate_with_snapshots() { let mut sim = Builder::new() .simulation_duration(Duration::from_secs(1000)) .tcp_capacity(200) .build(); const ROW_COUNT: i64 = 200; let tmp = tempdir().unwrap(); init_tracing(); sim.host("primary", move || { let path = tmp.path().to_path_buf(); async move { let server = TestServer { path: path.clone().into(), user_api_config: UserApiConfig { ..Default::default() }, db_config: DbConfig { max_log_size: 1, // very small log size to force snapshot creation ..Default::default() }, admin_api_config: Some(AdminApiConfig { acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await.unwrap(), connector: TurmoilConnector, disable_metrics: true, auth_key: None, }), rpc_server_config: Some(RpcServerConfig { acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 4567)).await.unwrap(), tls_config: None, }), ..Default::default() }; server.start_sim(8080).await.unwrap(); Ok(()) } }); sim.client("client", async { let client = Client::new(); client .post("http://primary:9090/v1/namespaces/foo/create", json!({})) .await?; let db = Database::open_remote_with_connector("http://primary:8080", "", TurmoilConnector) .unwrap(); let conn = db.connect().unwrap(); conn.execute("create table test (x)", ()).await.unwrap(); // insert enough to trigger snapshot creation. for _ in 0..ROW_COUNT { conn.execute("INSERT INTO test values (randomblob(6000))", ()) .await .unwrap(); } let tmp = tempdir().unwrap(); let db = Database::open_with_remote_sync_connector( tmp.path().join("data").display().to_string(), "http://primary:8080", "", TurmoilConnector, false, None, ) .await .unwrap(); let rep = db.sync().await.unwrap(); assert_eq!(rep.frames_synced(), 427); let conn = db.connect().unwrap(); let mut res = conn.query("select count(*) from test", ()).await.unwrap(); assert_eq!( *res.next() .await .unwrap() .unwrap() .get_value(0) .unwrap() .as_integer() .unwrap(), ROW_COUNT ); let stats = client .get("http://primary:9090/v1/namespaces/default/stats") .await? .json_value() .await .unwrap(); let stat = stats .get("embedded_replica_frames_replicated") .unwrap() .as_u64() .unwrap(); assert_eq!(stat, 427); let rep = db.sync().await.unwrap(); assert_eq!(rep.frames_synced(), 0); let conn = db.connect().unwrap(); let mut res = conn.query("select count(*) from test", ()).await.unwrap(); assert_eq!( *res.next() .await .unwrap() .unwrap() .get_value(0) .unwrap() .as_integer() .unwrap(), ROW_COUNT ); let stats = client .get("http://primary:9090/v1/namespaces/default/stats") .await? .json_value() .await .unwrap(); let stat = stats .get("embedded_replica_frames_replicated") .unwrap() .as_u64() .unwrap(); assert_eq!(stat, 427); Ok(()) }); sim.run().unwrap(); } #[test] fn read_your_writes() { let mut sim = Builder::new() .simulation_duration(Duration::from_secs(1000)) .build(); let tmp_embedded = tempdir().unwrap(); let tmp_host = tempdir().unwrap(); let tmp_embedded_path = tmp_embedded.path().to_owned(); let tmp_host_path = tmp_host.path().to_owned(); make_primary(&mut sim, tmp_host_path.clone()); sim.client("client", async move { let client = Client::new(); client .post("http://primary:9090/v1/namespaces/foo/create", json!({})) .await?; let path = tmp_embedded_path.join("embedded"); let db = Database::open_with_remote_sync_connector( path.to_str().unwrap(), "http://foo.primary:8080", "", TurmoilConnector, true, None, ) .await?; let conn = db.connect()?; conn.execute("CREATE TABLE user (id INTEGER NOT NULL PRIMARY KEY)", ()) .await?; conn.execute("INSERT INTO user(id) VALUES (1)", ()) .await .unwrap(); Ok(()) }); sim.run().unwrap(); } #[test] fn proxy_write_returning_row() { let mut sim = Builder::new() .simulation_duration(Duration::from_secs(1000)) .build(); let tmp_embedded = tempdir().unwrap(); let tmp_host = tempdir().unwrap(); let tmp_embedded_path = tmp_embedded.path().to_owned(); let tmp_host_path = tmp_host.path().to_owned(); make_primary(&mut sim, tmp_host_path.clone()); sim.client("client", async move { let client = Client::new(); client .post("http://primary:9090/v1/namespaces/foo/create", json!({})) .await?; let path = tmp_embedded_path.join("embedded"); let db = Database::open_with_remote_sync_connector( path.to_str().unwrap(), "http://foo.primary:8080", "", TurmoilConnector, true, None, ) .await?; let conn = db.connect()?; conn.execute("create table test (x)", ()).await?; let mut rows = conn .query("insert into test values (12) returning rowid as id", ()) .await .unwrap(); rows.next().await.unwrap().unwrap(); Ok(()) }); sim.run().unwrap(); } #[test] fn freeze() { let mut sim = Builder::new() .simulation_duration(Duration::from_secs(u64::MAX)) .build(); let tmp_embedded = tempdir().unwrap(); let tmp_host = tempdir().unwrap(); let tmp_embedded_path = tmp_embedded.path().to_owned(); let tmp_host_path = tmp_host.path().to_owned(); make_primary(&mut sim, tmp_host_path.clone()); sim.client("client", async move { let client = Client::new(); client .post("http://primary:9090/v1/namespaces/foo/create", json!({})) .await?; let path = tmp_embedded_path.join("embedded"); let db = Database::open_with_remote_sync_connector( path.to_str().unwrap(), "http://foo.primary:8080", "", TurmoilConnector, true, None, ) .await?; let conn = db.connect()?; conn.execute("create table test (x)", ()).await?; for _ in 0..50 { conn.execute("insert into test values (12)", ()) .await .unwrap(); } drop(conn); drop(db); let db = Database::open_with_remote_sync_connector( path.to_str().unwrap(), "http://foo.primary:8080", "", TurmoilConnector, true, None, ) .await?; db.sync().await.unwrap(); let db = db.freeze().unwrap(); let conn = db.connect().unwrap(); let mut rows = conn.query("select count(*) from test", ()).await.unwrap(); let row = rows.next().await.unwrap().unwrap(); let count = row.get::<u64>(0).unwrap(); assert_eq!(count, 50); Ok(()) }); sim.run().unwrap(); } #[test] fn sync_interval() { let mut sim = Builder::new() .simulation_duration(Duration::from_secs(1000)) .build(); let tmp_embedded = tempdir().unwrap(); let tmp_host = tempdir().unwrap(); let tmp_embedded_path = tmp_embedded.path().to_owned(); let tmp_host_path = tmp_host.path().to_owned(); make_primary(&mut sim, tmp_host_path.clone()); sim.client("client", async move { let client = Client::new(); client .post("http://primary:9090/v1/namespaces/foo/create", json!({})) .await?; let path = tmp_embedded_path.join("embedded"); let db = libsql::Builder::new_remote_replica( path.to_str().unwrap(), "http://foo.primary:8080".to_string(), "".to_string(), ) .connector(TurmoilConnector) .sync_interval(Duration::from_millis(100)) .build() .await?; let conn = db.connect()?; conn.execute("create table test (x)", ()).await?; conn.execute("insert into test values (12)", ()) .await .unwrap(); tokio::time::sleep(Duration::from_millis(500)).await; let mut rows = conn.query("select * from test", ()).await.unwrap(); let row = rows.next().await.unwrap().unwrap(); assert_eq!(row.get::<u64>(0).unwrap(), 12); Ok(()) }); sim.run().unwrap(); } #[test] fn errors_on_bad_replica() { let mut sim = Builder::new() .simulation_duration(Duration::from_secs(u64::MAX)) .build(); let tmp_embedded = tempdir().unwrap(); let tmp_host = tempdir().unwrap(); let tmp_embedded_path = tmp_embedded.path().to_owned(); let tmp_host_path = tmp_host.path().to_owned(); make_primary(&mut sim, tmp_host_path.clone()); sim.client("client", async move { let client = Client::new(); client .post("http://primary:9090/v1/namespaces/foo/create", json!({})) .await?; let path = tmp_embedded_path.join("embedded"); let db = libsql::Builder::new_remote_replica( path.to_str().unwrap(), "http://foo.primary:8080".to_string(), "".to_string(), ) .connector(TurmoilConnector) .build() .await?; let conn = db.connect()?; conn.execute("create table test (x)", ()).await?; conn.execute("insert into test values (12)", ()) .await .unwrap(); db.sync().await.unwrap(); drop(conn); drop(db); let wal_index_file = format!("{}-client_wal_index", path.to_str().unwrap()); std::fs::remove_file(wal_index_file).unwrap(); libsql::Builder::new_remote_replica( path.to_str().unwrap(), "http://foo.primary:8080".to_string(), "".to_string(), ) .connector(TurmoilConnector) .build() .await .unwrap_err(); std::fs::remove_file(&path).unwrap(); libsql::Builder::new_remote_replica( path.to_str().unwrap(), "http://foo.primary:8080".to_string(), "".to_string(), ) .connector(TurmoilConnector) .build() .await .unwrap(); Ok(()) }); sim.run().unwrap(); } #[test] fn malformed_database() { let mut sim = Builder::new() .simulation_duration(Duration::from_secs(u64::MAX)) .build(); let tmp_embedded = tempdir().unwrap(); let tmp_host = tempdir().unwrap(); let tmp_embedded_path = tmp_embedded.path().to_owned(); let tmp_host_path = tmp_host.path().to_owned(); make_primary(&mut sim, tmp_host_path.clone()); sim.client("client", async move { let client = Client::new(); client .post("http://primary:9090/v1/namespaces/foo/create", json!({})) .await?; let path = tmp_embedded_path.join("embedded"); let db = libsql::Builder::new_remote_replica( path.to_str().unwrap(), "http://foo.primary:8080".to_string(), "".to_string(), ) .read_your_writes(true) .connector(TurmoilConnector) .build() .await?; let conn = db.connect()?; let dir = env!("CARGO_MANIFEST_DIR").to_string(); let file = std::fs::read_to_string(dir + "/output.sql").unwrap(); let sqls = file.lines(); for sql in sqls { if !sql.starts_with("--") { conn.execute(sql, ()).await.unwrap(); } } db.sync().await.unwrap(); Ok(()) }); sim.run().unwrap(); } #[test] fn txn_bug_issue_1283() { let mut sim = Builder::new() .simulation_duration(Duration::from_secs(u64::MAX)) .build(); let tmp_embedded = tempdir().unwrap(); let tmp_host = tempdir().unwrap(); let tmp_embedded_path = tmp_embedded.path().to_owned(); let tmp_host_path = tmp_host.path().to_owned(); make_primary(&mut sim, tmp_host_path.clone()); sim.client("client", async move { let path = tmp_embedded_path.join("embedded"); let client = Client::new(); client .post("http://primary:9090/v1/namespaces/foo/create", json!({})) .await?; let db_url = "http://foo.primary:8080"; let replica = libsql::Builder::new_remote_replica( path.to_str().unwrap(), db_url.to_string(), String::new(), ) .connector(TurmoilConnector) .build() .await .unwrap(); let remote = libsql::Builder::new_remote(db_url.to_string(), String::new()) .connector(TurmoilConnector) .build() .await .unwrap(); let replica_conn_1 = replica.connect().unwrap(); let replica_conn_2 = replica.connect().unwrap(); // Not really an embedded replica test but good to check this for remote connections too let remote_conn_1 = remote.connect().unwrap(); let remote_conn_2 = remote.connect().unwrap(); let remote_task_1 = tokio::task::spawn(async move { db_work(remote_conn_1).await }); let remote_task_2 = tokio::task::spawn(async move { db_work(remote_conn_2).await }); let (task_1_res, task_2_res) = tokio::join!(remote_task_1, remote_task_2); let remote_task_1_res = task_1_res.unwrap(); let remote_task_2_res = task_2_res.unwrap(); // Everything works as expected in case of remote connections. assert!(remote_task_1_res.is_ok()); assert!(remote_task_2_res.is_ok()); let replica_task_1 = tokio::task::spawn(async move { db_work(replica_conn_1).await }); let replica_task_2 = tokio::task::spawn(async move { db_work(replica_conn_2).await }); let (task_1_res, task_2_res) = tokio::join!(replica_task_1, replica_task_2); let replica_task_1_res = task_1_res.unwrap(); let replica_task_2_res = task_2_res.unwrap(); if replica_task_1_res.is_err() { panic!("Task 1 failed: {:?}", replica_task_1_res); } if replica_task_2_res.is_err() { panic!("Task 2 failed: {:?}", replica_task_2_res); } // One of these concurrent tasks fail currently. Both tasks should succeed. assert!(replica_task_1_res.is_ok()); assert!(replica_task_2_res.is_ok()); Ok(()) }); async fn db_work(conn: libsql::Connection) -> Result<(), anyhow::Error> { let tx = conn.transaction().await?; // Some business logic here... tokio::time::sleep(std::time::Duration::from_secs(2)).await; tx.execute("SELECT 1", ()).await?; tx.commit().await?; Ok(()) } sim.run().unwrap(); } #[test] fn replicated_return() { let tmp_embedded = tempdir().unwrap(); let tmp_embedded_path = tmp_embedded.path().to_owned(); let mut sim = Builder::new() .simulation_duration(Duration::from_secs(1000)) .build(); let tmp = tempdir().unwrap(); let notify = Arc::new(Notify::new()); let notify_clone = notify.clone(); init_tracing(); sim.host("primary", move || { let notify = notify_clone.clone(); let path = tmp.path().to_path_buf(); async move { let make_server = || async { TestServer { path: path.clone().into(), user_api_config: UserApiConfig { ..Default::default() }, admin_api_config: Some(AdminApiConfig { acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await.unwrap(), connector: TurmoilConnector, disable_metrics: true, auth_key: None, }), rpc_server_config: Some(RpcServerConfig { acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 4567)).await.unwrap(), tls_config: None, }), ..Default::default() } }; let server = make_server().await; let shutdown = server.shutdown.clone(); let fut = async move { server.start_sim(8080).await }; tokio::pin!(fut); loop { tokio::select! { res = &mut fut => { res.unwrap(); break } _ = notify.notified() => { shutdown.notify_waiters(); }, } } drop(fut); tokio::fs::File::create(path.join("dbs").join("default").join(".sentinel")) .await .unwrap(); notify.notify_waiters(); let server = make_server().await; server.start_sim(8080).await.unwrap(); Ok(()) } }); sim.client("client", async move { let path = tmp_embedded_path.join("embedded"); let db = Database::open_with_remote_sync_connector( path.to_str().unwrap(), "http://primary:8080", "", TurmoilConnector, false, None, ) .await?; let rep = db.sync().await.unwrap(); assert_eq!(rep.frame_no(), None); assert_eq!(rep.frames_synced(), 0); let conn = db.connect()?; conn.execute("CREATE TABLE user (id INTEGER)", ()) .await .unwrap(); let rep = db.sync().await.unwrap(); assert_eq!(rep.frame_no(), Some(1)); assert_eq!(rep.frames_synced(), 2); conn.execute_batch( " INSERT into user(id) values (randomblob(4096)); INSERT into user(id) values (randomblob(4096)); INSERT into user(id) values (randomblob(4096)); ", ) .await .unwrap(); let rep = db.sync().await.unwrap(); assert_eq!(rep.frame_no(), Some(10)); assert_eq!(rep.frames_synced(), 9); // Regenerate log notify.notify_waiters(); notify.notified().await; tokio::time::sleep(std::time::Duration::from_secs(5)).await; let rep = db.sync().await.unwrap(); assert_eq!(rep.frame_no(), Some(4)); assert_eq!(rep.frames_synced(), 5); let mut row = conn.query("select count(*) from user", ()).await.unwrap(); let count = row.next().await.unwrap().unwrap().get::<u64>(0).unwrap(); assert_eq!(count, 3); Ok(()) }); sim.run().unwrap(); } #[test] fn replicate_auth() { init_tracing(); let mut sim = Builder::new() .simulation_duration(Duration::from_secs(1000)) .build(); let (encoding, decoding) = common::auth::key_pair(); sim.host("primary", { let decoding = decoding.clone(); move || { let decoding = decoding.clone(); async move { let tmp = tempdir()?; let jwt_keys = vec![jsonwebtoken::DecodingKey::from_ed_components(&decoding).unwrap()]; let auth = Auth::new(user_auth_strategies::Jwt::new(jwt_keys)); let server = TestServer { path: tmp.path().to_owned().into(), user_api_config: UserApiConfig { hrana_ws_acceptor: None, auth_strategy: auth, ..Default::default() }, admin_api_config: Some(AdminApiConfig { acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await?, connector: TurmoilConnector, disable_metrics: true, auth_key: None, }), rpc_server_config: Some(RpcServerConfig { acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 4567)).await?, tls_config: None, }), ..Default::default() }; server.start_sim(8080).await?; Ok(()) } } }); sim.host("replica", { let decoding = decoding.clone(); move || { let decoding = decoding.clone(); async move { let tmp = tempdir()?; let jwt_keys = vec![jsonwebtoken::DecodingKey::from_ed_components(&decoding).unwrap()]; let auth = Auth::new(user_auth_strategies::Jwt::new(jwt_keys)); let server = TestServer { path: tmp.path().to_owned().into(), user_api_config: UserApiConfig { hrana_ws_acceptor: None, auth_strategy: auth, ..Default::default() }, admin_api_config: Some(AdminApiConfig { acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await?, connector: TurmoilConnector, disable_metrics: true, auth_key: None, }), rpc_client_config: Some(RpcClientConfig { remote_url: "http://primary:4567".into(), connector: TurmoilConnector, tls_config: None, }), ..Default::default() }; server.start_sim(8080).await?; Ok(()) } } }); sim.client("client", async move { let token = encode( &serde_json::json!({ "id": "default", }), &encoding, ); // no auth let tmp = tempdir().unwrap(); let db = Database::open_with_remote_sync_connector( tmp.path().join("embedded").to_str().unwrap(), "http://primary:8080", "", TurmoilConnector, false, None, ) .await?; assert!(db.sync().await.is_err()); let tmp = tempdir().unwrap(); let db = Database::open_with_remote_sync_connector( tmp.path().join("embedded").to_str().unwrap(), "http://replica:8080", "", TurmoilConnector, false, None, ) .await?; assert!(db.sync().await.is_err()); // auth let tmp = tempdir().unwrap(); let db = Database::open_with_remote_sync_connector( tmp.path().join("embedded").to_str().unwrap(), "http://primary:8080", token.clone(), TurmoilConnector, false, None, ) .await?; assert!(db.sync().await.is_ok()); let tmp = tempdir().unwrap(); let db = Database::open_with_remote_sync_connector( tmp.path().join("embedded").to_str().unwrap(), "http://replica:8080", token.clone(), TurmoilConnector, false, None, ) .await?; assert!(db.sync().await.is_ok()); Ok(()) }); sim.run().unwrap(); } #[test] fn replicated_synced_frames_zero_when_no_data_synced() { let tmp_embedded = tempdir().unwrap(); let tmp_embedded_path = tmp_embedded.path().to_owned(); let mut sim = Builder::new() .simulation_duration(Duration::from_secs(1000)) .build(); let tmp = tempdir().unwrap(); let notify = Arc::new(Notify::new()); let notify_clone = notify.clone(); init_tracing(); sim.host("primary", move || { let notify = notify_clone.clone(); let path = tmp.path().to_path_buf(); async move { let make_server = || async { TestServer { path: path.clone().into(), user_api_config: UserApiConfig { ..Default::default() }, admin_api_config: Some(AdminApiConfig { acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await.unwrap(), connector: TurmoilConnector, disable_metrics: true, auth_key: None, }), rpc_server_config: Some(RpcServerConfig { acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 4567)).await.unwrap(), tls_config: None, }), ..Default::default() } }; let server = make_server().await; let shutdown = server.shutdown.clone(); let fut = async move { server.start_sim(8080).await }; tokio::pin!(fut); loop { tokio::select! { res = &mut fut => { res.unwrap(); break } _ = notify.notified() => { shutdown.notify_waiters(); }, } } drop(fut); tokio::fs::File::create(path.join("dbs").join("default").join(".sentinel")) .await .unwrap(); notify.notify_waiters(); let server = make_server().await; server.start_sim(8080).await.unwrap(); Ok(()) } }); sim.client("client", async move { let path = tmp_embedded_path.join("embedded"); let db = Database::open_with_remote_sync_connector( path.to_str().unwrap(), "http://primary:8080", "", TurmoilConnector, false, None, ) .await?; let rep = db.sync().await.unwrap(); assert_eq!(rep.frame_no(), None); assert_eq!(rep.frames_synced(), 0); let conn = db.connect()?; conn.execute("CREATE TABLE user (id INTEGER)", ()) .await .unwrap(); let rep = db.sync().await.unwrap(); assert_eq!(rep.frame_no(), Some(1)); assert_eq!(rep.frames_synced(), 2); conn.execute("INSERT into user(id) values (randomblob(4096));", ()) .await .unwrap(); let rep = db.sync().await.unwrap(); assert_eq!(rep.frame_no(), Some(4)); assert_eq!(rep.frames_synced(), 3); let rep = db.sync().await.unwrap(); assert_eq!(rep.frame_no(), Some(4)); assert_eq!(rep.frames_synced(), 0); let rep = db.sync().await.unwrap(); assert_eq!(rep.frame_no(), Some(4)); assert_eq!(rep.frames_synced(), 0); Ok(()) }); sim.run().unwrap(); } #[test] fn schema_db() { let tmp_embedded = tempdir().unwrap(); let tmp_host = tempdir().unwrap(); let tmp_embedded_path = tmp_embedded.path().to_owned(); let tmp_host_path = tmp_host.path().to_owned(); let mut sim = Builder::new() .simulation_duration(Duration::from_secs(1000)) .build(); make_primary(&mut sim, tmp_host_path.clone()); sim.client("client", async move { let http = Client::new(); assert!(http .post( "http://primary:9090/v1/namespaces/schema/create", json!({ "shared_schema": true }) ) .await .unwrap() .status() .is_success()); assert!(http .post( "http://primary:9090/v1/namespaces/foo/create", json!({ "shared_schema_name": "schema" }) ) .await .unwrap() .status() .is_success()); let path = tmp_embedded_path.join("embedded"); let db = Database::open_with_remote_sync_connector( path.to_str().unwrap(), "http://schema.primary:8080", "", TurmoilConnector, false, None, ) .await .unwrap(); db.sync().await.unwrap_err(); let conn = db.connect().unwrap(); conn.execute("create table test (x)", ()).await.unwrap_err(); Ok(()) }); sim.run().unwrap(); }