use std::convert::Infallible; use std::time::Duration; use hyper::{service::make_service_fn, Body, Response, StatusCode}; use insta::{assert_json_snapshot, assert_snapshot}; use libsql::{Database, Value}; use serde_json::json; use tempfile::tempdir; use tower::service_fn; use turmoil::Builder; use crate::common::http::Client; use crate::common::net::{TurmoilAcceptor, TurmoilConnector}; use crate::namespaces::make_primary; #[test] fn load_namespace_from_dump_from_url() { const DUMP: &str = r#" PRAGMA foreign_keys=OFF; BEGIN TRANSACTION; CREATE TABLE test (x); INSERT INTO test VALUES(42); COMMIT;"#; let mut sim = Builder::new() .simulation_duration(Duration::from_secs(1000)) .build(); let tmp = tempdir().unwrap(); make_primary(&mut sim, tmp.path().to_path_buf()); sim.host("dump-store", || async { let incoming = TurmoilAcceptor::bind(([0, 0, 0, 0], 8080)).await?; let server = hyper::server::Server::builder(incoming).serve(make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(|_req| async { Ok::<_, Infallible>(Response::new(Body::from(DUMP))) })) })); server.await.unwrap(); Ok(()) }); sim.client("client", async { let client = Client::new(); let resp = client .post( "http://primary:9090/v1/namespaces/foo/create", json!({ "dump_url": "http://dump-store:8080/"}), ) .await .unwrap(); assert_eq!(resp.status(), 200); assert_snapshot!(resp.body_string().await.unwrap()); let foo = Database::open_remote_with_connector("http://foo.primary:8080", "", TurmoilConnector)?; let foo_conn = foo.connect()?; let mut rows = foo_conn.query("select count(*) from test", ()).await?; assert!(matches!( rows.next().await.unwrap().unwrap().get_value(0)?, Value::Integer(1) )); Ok(()) }); sim.run().unwrap(); } #[test] fn load_namespace_from_dump_from_file() { const DUMP: &str = r#" PRAGMA foreign_keys=OFF; BEGIN TRANSACTION; CREATE TABLE test (x); INSERT INTO test VALUES(42); COMMIT;"#; let mut sim = Builder::new() .simulation_duration(Duration::from_secs(1000)) .build(); let tmp = tempdir().unwrap(); let tmp_path = tmp.path().to_path_buf(); std::fs::write(tmp_path.join("dump.sql"), DUMP).unwrap(); make_primary(&mut sim, tmp.path().to_path_buf()); sim.client("client", async move { let client = Client::new(); // path is not absolute is an error let resp = client .post( "http://primary:9090/v1/namespaces/foo/create", json!({ "dump_url": "file:dump.sql"}), ) .await .unwrap(); assert_eq!(resp.status(), StatusCode::BAD_REQUEST); // path doesn't exist is an error let resp = client .post( "http://primary:9090/v1/namespaces/foo/create", json!({ "dump_url": "file:/dump.sql"}), ) .await .unwrap(); assert_eq!(resp.status(), StatusCode::BAD_REQUEST); let resp = client .post( "http://primary:9090/v1/namespaces/foo/create", json!({ "dump_url": format!("file:{}", tmp_path.join("dump.sql").display())}), ) .await .unwrap(); assert_eq!( resp.status(), StatusCode::OK, "{}", resp.json::<serde_json::Value>().await.unwrap_or_default() ); let foo = Database::open_remote_with_connector("http://foo.primary:8080", "", TurmoilConnector)?; let foo_conn = foo.connect()?; let mut rows = foo_conn.query("select count(*) from test", ()).await?; assert!(matches!( rows.next().await.unwrap().unwrap().get_value(0)?, Value::Integer(1) )); Ok(()) }); sim.run().unwrap(); } #[test] fn load_namespace_from_no_commit() { const DUMP: &str = r#" PRAGMA foreign_keys=OFF; BEGIN TRANSACTION; CREATE TABLE test (x); INSERT INTO test VALUES(42); "#; let mut sim = Builder::new() .simulation_duration(Duration::from_secs(1000)) .build(); let tmp = tempdir().unwrap(); let tmp_path = tmp.path().to_path_buf(); std::fs::write(tmp_path.join("dump.sql"), DUMP).unwrap(); make_primary(&mut sim, tmp.path().to_path_buf()); sim.client("client", async move { let client = Client::new(); let resp = client .post( "http://primary:9090/v1/namespaces/foo/create", json!({ "dump_url": format!("file:{}", tmp_path.join("dump.sql").display())}), ) .await .unwrap(); // the dump is malformed assert_eq!( resp.status(), StatusCode::BAD_REQUEST, "{}", resp.json::<serde_json::Value>().await.unwrap_or_default() ); // namespace doesn't exist let foo = Database::open_remote_with_connector("http://foo.primary:8080", "", TurmoilConnector)?; let foo_conn = foo.connect()?; assert!(foo_conn .query("select count(*) from test", ()) .await .is_err()); Ok(()) }); sim.run().unwrap(); } #[test] fn load_namespace_from_no_txn() { const DUMP: &str = r#" PRAGMA foreign_keys=OFF; CREATE TABLE test (x); INSERT INTO test VALUES(42); COMMIT; "#; let mut sim = Builder::new() .simulation_duration(Duration::from_secs(1000)) .build(); let tmp = tempdir().unwrap(); let tmp_path = tmp.path().to_path_buf(); std::fs::write(tmp_path.join("dump.sql"), DUMP).unwrap(); make_primary(&mut sim, tmp.path().to_path_buf()); sim.client("client", async move { let client = Client::new(); let resp = client .post( "http://primary:9090/v1/namespaces/foo/create", json!({ "dump_url": format!("file:{}", tmp_path.join("dump.sql").display())}), ) .await?; // the dump is malformed assert_eq!( resp.status(), StatusCode::BAD_REQUEST, "{}", resp.json::<serde_json::Value>().await.unwrap_or_default() ); assert_json_snapshot!(resp.json_value().await.unwrap()); // namespace doesn't exist let foo = Database::open_remote_with_connector("http://foo.primary:8080", "", TurmoilConnector)?; let foo_conn = foo.connect()?; assert!(foo_conn .query("select count(*) from test", ()) .await .is_err()); Ok(()) }); sim.run().unwrap(); } #[test] fn export_dump() { let mut sim = Builder::new() .simulation_duration(Duration::from_secs(1000)) .build(); let tmp = tempdir().unwrap(); make_primary(&mut sim, tmp.path().to_path_buf()); sim.client("client", async move { let client = Client::new(); let resp = client .post("http://primary:9090/v1/namespaces/foo/create", json!({})) .await?; assert_eq!(resp.status(), StatusCode::OK); let foo = Database::open_remote_with_connector("http://foo.primary:8080", "", TurmoilConnector)?; let foo_conn = foo.connect()?; foo_conn.execute("create table test (x)", ()).await?; foo_conn.execute("insert into test values (42)", ()).await?; foo_conn .execute("insert into test values ('foo')", ()) .await?; foo_conn .execute("insert into test values ('bar')", ()) .await?; let resp = client.get("http://foo.primary:8080/dump").await?; assert_eq!(resp.status(), StatusCode::OK); assert_snapshot!(resp.body_string().await?); Ok(()) }); sim.run().unwrap(); }