0
0
mirror of https://github.com/tursodatabase/libsql.git synced 2025-05-23 18:57:55 +00:00
Files
libsql/libsql-server/tests/embedded_replica/mod.rs
Lucio Franco d1a75ba344 libsql: fix deferred txn with embedded replicas
This fixes an issue where embedded replica based connections would have
issues dispatching executes on deferred statements. This is due to sqld
using `sqlite3_txn_state` to return the connection's state. While this
works for other transactions, in a deferred transaction the connection
does not change its state to be in a transaction. This is due to the
nature of a deferred transaction. The problem is that the connection
does set `is_autocommit` to `false`. This means that sqld is returning
that it is not in a txn state which is used by clients to set its
`is_autocommit` state. For now the workaround is to track when the
client is in a `DEFERRED` transaction and to handle remote state
transitions in such a way that it ignores the value from the server if
it is currently in the deferred state.
2023-11-02 14:44:53 -04:00

217 lines
5.6 KiB
Rust

use std::path::PathBuf;
use crate::common::http::Client;
use crate::common::net::{init_tracing, SimServer, TestServer, TurmoilAcceptor, TurmoilConnector};
use libsql::Database;
use serde_json::json;
use sqld::config::{AdminApiConfig, RpcServerConfig, UserApiConfig};
use tempfile::tempdir;
use turmoil::{Builder, Sim};
/// Registers a SQLite log callback (via `rusqlite::trace::config_log`) that
/// re-emits every reported message as a `tracing` error event.
///
/// Registration is guarded by a `Once`, so calling this function repeatedly
/// installs the callback only the first time.
///
/// # Panics
///
/// Panics if `config_log` returns an error.
fn enable_libsql_logging() {
    /// Callback handed to SQLite: logs the result code and message through `tracing`.
    fn forward_to_tracing(code: std::ffi::c_int, msg: &str) {
        tracing::error!("sqlite error {code}: {msg}");
    }

    static INSTALL_HOOK: std::sync::Once = std::sync::Once::new();

    INSTALL_HOOK.call_once(|| {
        // SAFETY: `config_log` is an `unsafe fn`. The callback above only emits a
        // tracing event and makes no SQLite calls. NOTE(review): rusqlite also
        // requires that no other SQLite call runs concurrently with this one;
        // confirm that holds for every test that reaches this point first.
        let installed = unsafe { rusqlite::trace::config_log(Some(forward_to_tracing)) };
        installed.unwrap();
    });
}
/// Registers a simulated host named `"primary"` on `sim` that runs a
/// `TestServer` whose `path` is set to the given `path`.
///
/// Tracing and SQLite logging are initialised before the host is registered.
/// The server is configured with an admin API acceptor bound to port 9090, an
/// RPC acceptor bound to port 4567 without TLS, namespaces enabled and the
/// default namespace disabled, and is then started with `start_sim(8080)`.
fn make_primary(sim: &mut Sim, path: PathBuf) {
    // Both acceptors bind to the unspecified IPv4 address.
    const ANY_ADDR: [u8; 4] = [0, 0, 0, 0];

    init_tracing();
    enable_libsql_logging();

    sim.host("primary", move || {
        // The closure only borrows its captured `path`, so each invocation hands
        // its own copy to the future it returns.
        let db_path = path.clone();

        async move {
            // Bind in the same order as the configuration below: admin API, then RPC.
            let admin_acceptor = TurmoilAcceptor::bind((ANY_ADDR, 9090)).await?;
            let rpc_acceptor = TurmoilAcceptor::bind((ANY_ADDR, 4567)).await?;

            let server = TestServer {
                path: db_path.into(),
                user_api_config: UserApiConfig {
                    ..Default::default()
                },
                admin_api_config: Some(AdminApiConfig {
                    acceptor: admin_acceptor,
                    connector: TurmoilConnector,
                    disable_metrics: false,
                }),
                rpc_server_config: Some(RpcServerConfig {
                    acceptor: rpc_acceptor,
                    tls_config: None,
                }),
                disable_namespaces: false,
                disable_default_namespace: true,
                ..Default::default()
            };

            server.start_sim(8080).await?;
            Ok(())
        }
    });
}
/// Exercises the basic embedded-replica flow and checks how a failing write is
/// reported.
///
/// Steps performed by the simulated client:
/// 1. create namespace `foo` through the admin API on `primary:9090`;
/// 2. open an embedded replica database pointing at `http://foo.primary:8080`;
/// 3. assert that the first `sync()` returns `None`;
/// 4. create the `user` table and assert that the next `sync()` returns `Some(1)`;
/// 5. insert the same primary key twice and assert that the resulting error is
///    `libsql::Error::RemoteSqliteFailure` with code `3` and extended code `1555`.
#[test]
fn embedded_replica() {
    let mut sim = Builder::new().build();

    // The `TempDir` guards stay bound until the end of the test; only owned
    // copies of their paths are handed to the simulation.
    let tmp_embedded = tempdir().unwrap();
    let tmp_host = tempdir().unwrap();
    let tmp_embedded_path = tmp_embedded.path().to_owned();
    let tmp_host_path = tmp_host.path().to_owned();

    // `tmp_host_path` is not used again, so it is moved instead of cloned.
    make_primary(&mut sim, tmp_host_path);

    sim.client("client", async move {
        let client = Client::new();
        client
            .post("http://primary:9090/v1/namespaces/foo/create", json!({}))
            .await?;

        let path = tmp_embedded_path.join("embedded");
        let db = Database::open_with_remote_sync_connector(
            path.to_str().unwrap(),
            "http://foo.primary:8080",
            "",
            TurmoilConnector,
        )
        .await?;

        // Nothing has been written yet.
        let n = db.sync().await?;
        assert_eq!(n, None);

        let conn = db.connect()?;
        conn.execute("CREATE TABLE user (id INTEGER NOT NULL PRIMARY KEY)", ())
            .await?;

        let n = db.sync().await?;
        assert_eq!(n, Some(1));

        // Inserting the same primary key twice in one statement must fail.
        let err = conn
            .execute("INSERT INTO user(id) VALUES (1), (1)", ())
            .await
            .unwrap_err();

        // Report the actual error when the variant is not the expected one, so a
        // failure for an unrelated reason can be diagnosed from the test output.
        let (code, extended_code) = match err {
            libsql::Error::RemoteSqliteFailure(code, extended_code, _) => (code, extended_code),
            other => panic!("expected `RemoteSqliteFailure`, got: {other:?}"),
        };

        // NOTE(review): the meaning of primary code 3 is defined by the server
        // protocol and is not visible from this file — confirm before changing.
        assert_eq!(code, 3);
        // 1555 is SQLite's extended result code SQLITE_CONSTRAINT_PRIMARYKEY.
        assert_eq!(extended_code, 1555);

        Ok(())
    });

    sim.run().unwrap();
}
/// Sends a multi-statement batch through an embedded-replica connection.
///
/// The simulated client creates namespace `foo`, opens an embedded replica
/// against `http://foo.primary:8080`, creates the `user` table (checking that
/// `sync()` yields `None` before and `Some(1)` after), and finally calls
/// `execute_batch` with a `BEGIN` followed by a single `INSERT`. The test passes
/// when every step returns `Ok`.
#[test]
fn execute_batch() {
    let mut sim = Builder::new().build();

    // Keep the `TempDir` guards bound for the whole test.
    let replica_dir = tempdir().unwrap();
    let primary_dir = tempdir().unwrap();
    let replica_root = replica_dir.path().to_owned();

    make_primary(&mut sim, primary_dir.path().to_owned());

    sim.client("client", async move {
        let admin = Client::new();
        admin
            .post("http://primary:9090/v1/namespaces/foo/create", json!({}))
            .await?;

        let replica_file = replica_root.join("embedded");
        let db = Database::open_with_remote_sync_connector(
            replica_file.to_str().unwrap(),
            "http://foo.primary:8080",
            "",
            TurmoilConnector,
        )
        .await?;

        // Before any write the sync reports nothing.
        assert_eq!(db.sync().await?, None);

        let conn = db.connect()?;
        conn.execute("CREATE TABLE user (id INTEGER NOT NULL PRIMARY KEY)", ())
            .await?;
        assert_eq!(db.sync().await?, Some(1));

        // The batch opens a transaction and inserts one row; no COMMIT is sent.
        // NOTE(review): confirm that leaving the transaction open is deliberate.
        let batch = "BEGIN;\nINSERT INTO user (id) VALUES (2);";
        conn.execute_batch(batch).await?;

        Ok(())
    });

    sim.run().unwrap();
}
/// Checks the autocommit flag around `DEFERRED` transactions on an embedded
/// replica.
///
/// After creating namespace `foo` and opening the replica (first `sync()` must
/// return `None`), the client runs two deferred transactions back to back. In
/// each one `is_autocommit()` must be `false` on the transaction, a trivial
/// `select 1` is executed, and the transaction is committed. After the first
/// commit the connection itself must report `is_autocommit() == true`.
#[test]
fn txn_autocommit() {
    let mut sim = Builder::new().build();

    // Keep the `TempDir` guards bound for the whole test.
    let replica_dir = tempdir().unwrap();
    let primary_dir = tempdir().unwrap();
    let replica_root = replica_dir.path().to_owned();

    make_primary(&mut sim, primary_dir.path().to_owned());

    sim.client("client", async move {
        let admin = Client::new();
        admin
            .post("http://primary:9090/v1/namespaces/foo/create", json!({}))
            .await?;

        let replica_file = replica_root.join("embedded");
        let db = Database::open_with_remote_sync_connector(
            replica_file.to_str().unwrap(),
            "http://foo.primary:8080",
            "",
            TurmoilConnector,
        )
        .await?;
        assert_eq!(db.sync().await?, None);

        let conn = db.connect()?;

        // First deferred transaction: autocommit is off inside it and back on for
        // the connection once it has been committed.
        let first = conn
            .transaction_with_behavior(libsql::TransactionBehavior::Deferred)
            .await?;
        assert!(!first.is_autocommit());
        first.execute("select 1", ()).await?;
        first.commit().await?;
        assert!(conn.is_autocommit());

        // Second deferred transaction on the same connection must behave the same
        // way while it is open.
        let second = conn
            .transaction_with_behavior(libsql::TransactionBehavior::Deferred)
            .await?;
        assert!(!second.is_autocommit());
        second.execute("select 1", ()).await?;
        second.commit().await?;

        Ok(())
    });

    sim.run().unwrap();
}