use std::time::Duration; use crate::common::{net::TurmoilConnector, snapshot_metrics}; use libsql::{params, Database, TransactionBehavior}; #[test] fn transaction_commit_and_rollback() { let mut sim = turmoil::Builder::new() .simulation_duration(Duration::from_secs(1000)) .build(); sim.host("primary", super::make_standalone_server); sim.client("client", async { let db = Database::open_remote_with_connector("http://primary:8080", "", TurmoilConnector)?; let conn = db.connect()?; // initialize tables let tx = conn.transaction().await?; tx.execute_batch(r#"create table t(x text);"#).await?; tx.commit().await?; // transaction with temporary data let tx = conn.transaction().await?; tx.execute("insert into t(x) values('hello');", ()).await?; let mut rows = tx .query("select * from t where x = ?", params!["hello"]) .await?; assert_eq!(rows.column_count(), 1); assert_eq!(rows.column_name(0), Some("x")); assert_eq!(rows.next().await?.unwrap().get::<String>(0)?, "hello"); assert!(rows.next().await?.is_none()); tx.rollback().await?; // confirm that temporary that was not committed let mut rows = conn .query("select * from t where x = ?", params!["hello"]) .await?; assert_eq!(rows.column_count(), 1); assert_eq!(rows.column_name(0), Some("x")); assert!(rows.next().await?.is_none()); Ok(()) }); sim.run().unwrap(); } #[test] fn multiple_concurrent_transactions() { let mut sim = turmoil::Builder::new() .simulation_duration(Duration::from_secs(1000)) .build(); sim.host("primary", super::make_standalone_server); sim.client("client", async { let db = Database::open_remote_with_connector("http://primary:8080", "", TurmoilConnector)?; let conn = db.connect()?; conn.execute_batch(r#"create table t(x text);"#).await?; // open first transaction and alter data let tx1 = conn .transaction_with_behavior(TransactionBehavior::Deferred) .await?; tx1.execute("insert into t(x) values('hello');", ()).await?; // while first transaction is still open open another read-only transaction and try to read let tx2 = conn .transaction_with_behavior(TransactionBehavior::ReadOnly) .await?; let mut rows = tx2 .query("select * from t where x = ?", params!["hello"]) .await?; assert_eq!(rows.column_count(), 1); assert_eq!(rows.column_name(0), Some("x")); assert!(rows.next().await?.is_none()); // commit first transaction - T2 should still read old data tx1.commit().await?; let mut rows = tx2 .query("select * from t where x = ?", params!["hello"]) .await?; assert_eq!(rows.column_count(), 1); assert_eq!(rows.column_name(0), Some("x")); assert!(rows.next().await?.is_none()); tx2.commit().await?; // finally open new transaction - it now should read actual data let tx3 = conn .transaction_with_behavior(TransactionBehavior::ReadOnly) .await?; let mut rows = tx3 .query("select * from t where x = ?", params!["hello"]) .await?; assert_eq!(rows.column_count(), 1); assert_eq!(rows.column_name(0), Some("x")); assert_eq!(rows.next().await?.unwrap().get::<String>(0)?, "hello"); assert!(rows.next().await?.is_none()); Ok(()) }); sim.run().unwrap(); } #[ignore = "FIXME: running a connection on a different runtime causes it to timeout early"] #[test] fn transaction_timeout() { let mut sim = turmoil::Builder::new() .tick_duration(Duration::from_millis(500)) .simulation_duration(Duration::from_secs(3600)) .build(); sim.host("primary", super::make_standalone_server); sim.client("client", async { let db = Database::open_remote_with_connector("http://primary:8080", "", TurmoilConnector)?; let conn = db.connect()?; // initialize tables let tx = conn.transaction().await?; tx.execute_batch(r#"create table t(x text);"#).await?; tx.commit().await?; // transaction with temporary data let tx = conn.transaction().await?; tx.execute("insert into t(x) values('hello');", ()).await?; let mut rows = tx .query("select * from t where x = ?", params!["hello"]) .await?; assert_eq!(rows.column_count(), 1); assert_eq!(rows.column_name(0), Some("x")); assert_eq!(rows.next().await?.unwrap().get::<String>(0)?, "hello"); assert!(rows.next().await?.is_none()); // Sleep to trigger stream expiration tokio::time::sleep(Duration::from_secs(300)).await; tx.rollback().await.unwrap_err(); snapshot_metrics().assert_counter_label( "libsql_server_user_http_response", ("status", "400"), 1, ); Ok(()) }); sim.run().unwrap(); }