0
0
mirror of https://github.com/tursodatabase/libsql.git synced 2024-12-15 17:59:41 +00:00
libsql/libsql-server/tests/hrana/transaction.rs
2024-07-17 19:37:24 +00:00

151 lines
5.0 KiB
Rust

use std::time::Duration;
use crate::common::{net::TurmoilConnector, snapshot_metrics};
use libsql::{params, Database, TransactionBehavior};
/// Checks that a row written inside a transaction is visible to that same
/// transaction and is gone again once the transaction has been rolled back.
#[test]
fn transaction_commit_and_rollback() {
    let mut simulation = turmoil::Builder::new()
        .simulation_duration(Duration::from_secs(1000))
        .build();
    simulation.host("primary", super::make_standalone_server);

    simulation.client("client", async {
        let database =
            Database::open_remote_with_connector("http://primary:8080", "", TurmoilConnector)?;
        let connection = database.connect()?;

        // Create the schema in its own transaction and commit it.
        let setup = connection.transaction().await?;
        setup.execute_batch(r#"create table t(x text);"#).await?;
        setup.commit().await?;

        // Write a row in a second transaction that will never be committed.
        let scratch = connection.transaction().await?;
        scratch
            .execute("insert into t(x) values('hello');", ())
            .await?;

        // Inside the open transaction the uncommitted row must be readable.
        let mut pending = scratch
            .query("select * from t where x = ?", params!["hello"])
            .await?;
        assert_eq!(pending.column_count(), 1);
        assert_eq!(pending.column_name(0), Some("x"));
        let value = pending.next().await?.unwrap().get::<String>(0)?;
        assert_eq!(value, "hello");
        assert!(pending.next().await?.is_none());

        scratch.rollback().await?;

        // After the rollback the same query, issued directly on the
        // connection, must return the column layout but no rows.
        let mut after_rollback = connection
            .query("select * from t where x = ?", params!["hello"])
            .await?;
        assert_eq!(after_rollback.column_count(), 1);
        assert_eq!(after_rollback.column_name(0), Some("x"));
        assert!(after_rollback.next().await?.is_none());

        Ok(())
    });

    simulation.run().unwrap();
}
/// Checks isolation between overlapping transactions: a read-only transaction
/// opened while a writer is still in flight must not observe the writer's row,
/// neither before nor after the writer commits; a transaction opened
/// afterwards must observe it.
#[test]
fn multiple_concurrent_transactions() {
    let mut simulation = turmoil::Builder::new()
        .simulation_duration(Duration::from_secs(1000))
        .build();
    simulation.host("primary", super::make_standalone_server);

    simulation.client("client", async {
        let database =
            Database::open_remote_with_connector("http://primary:8080", "", TurmoilConnector)?;
        let connection = database.connect()?;
        connection
            .execute_batch(r#"create table t(x text);"#)
            .await?;

        // Writer: open a deferred transaction and insert a row, leaving it uncommitted.
        let writer = connection
            .transaction_with_behavior(TransactionBehavior::Deferred)
            .await?;
        writer
            .execute("insert into t(x) values('hello');", ())
            .await?;

        // Reader: opened while the writer is still open; it must not see the pending row.
        let snapshot_reader = connection
            .transaction_with_behavior(TransactionBehavior::ReadOnly)
            .await?;
        let mut before_commit = snapshot_reader
            .query("select * from t where x = ?", params!["hello"])
            .await?;
        assert_eq!(before_commit.column_count(), 1);
        assert_eq!(before_commit.column_name(0), Some("x"));
        assert!(before_commit.next().await?.is_none());

        // Committing the writer must not change what the already-open reader sees.
        writer.commit().await?;
        let mut after_commit = snapshot_reader
            .query("select * from t where x = ?", params!["hello"])
            .await?;
        assert_eq!(after_commit.column_count(), 1);
        assert_eq!(after_commit.column_name(0), Some("x"));
        assert!(after_commit.next().await?.is_none());
        snapshot_reader.commit().await?;

        // A transaction started after the commit must read the committed row.
        let late_reader = connection
            .transaction_with_behavior(TransactionBehavior::ReadOnly)
            .await?;
        let mut committed = late_reader
            .query("select * from t where x = ?", params!["hello"])
            .await?;
        assert_eq!(committed.column_count(), 1);
        assert_eq!(committed.column_name(0), Some("x"));
        let value = committed.next().await?.unwrap().get::<String>(0)?;
        assert_eq!(value, "hello");
        assert!(committed.next().await?.is_none());

        Ok(())
    });

    simulation.run().unwrap();
}
/// Checks that a transaction left idle for 11 simulated seconds can no longer
/// be rolled back, and that the failure shows up in the server's HTTP
/// response metrics.
#[test]
fn transaction_timeout() {
    let mut simulation = turmoil::Builder::new()
        .tick_duration(Duration::from_millis(500))
        .simulation_duration(Duration::from_secs(3600))
        .build();
    simulation.host("primary", super::make_standalone_server);

    simulation.client("client", async {
        let database =
            Database::open_remote_with_connector("http://primary:8080", "", TurmoilConnector)?;
        let connection = database.connect()?;

        // Create the schema in its own transaction and commit it.
        let setup = connection.transaction().await?;
        setup.execute_batch(r#"create table t(x text);"#).await?;
        setup.commit().await?;

        // Open a transaction, write a row and confirm it is readable inside it.
        let idle_tx = connection.transaction().await?;
        idle_tx
            .execute("insert into t(x) values('hello');", ())
            .await?;
        let mut pending = idle_tx
            .query("select * from t where x = ?", params!["hello"])
            .await?;
        assert_eq!(pending.column_count(), 1);
        assert_eq!(pending.column_name(0), Some("x"));
        let value = pending.next().await?.unwrap().get::<String>(0)?;
        assert_eq!(value, "hello");
        assert!(pending.next().await?.is_none());

        // Stay idle long enough for the stream to expire.
        // NOTE(review): the expiry threshold is configured server-side and is
        // not visible in this file; 11s is presumably just above it — confirm.
        tokio::time::sleep(Duration::from_secs(11)).await;

        // The rollback is required to fail; a success panics here.
        idle_tx.rollback().await.unwrap_err();

        // NOTE(review): presumably asserts that exactly one response with
        // status 400 was counted — confirm against `assert_counter_label`.
        let metrics = snapshot_metrics();
        metrics.assert_counter_label(
            "libsql_server_user_http_response",
            ("status", "400"),
            1,
        );

        Ok(())
    });

    simulation.run().unwrap();
}