0
0
mirror of https://github.com/tursodatabase/libsql.git synced 2025-05-23 04:48:11 +00:00
Files
libsql/libsql-server/tests/libsql_wal/services/clients.rs
ad hoc 4f19c351d4 wip
2024-10-03 11:24:29 +02:00

176 lines
5.5 KiB
Rust

use std::time::Duration;
use hashbrown::HashMap;
use itertools::Itertools;
use jsonwebtoken::EncodingKey;
use libsql::Database;
use rand::seq::SliceRandom;
use rand::{Rng, RngCore};
use tokio::sync::oneshot;
use uuid::Uuid;
use crate::libsql_wal::config::SimConfig;
use crate::{
auth::make_default_token,
libsql_wal::{dns::Dns, net::TurmoilConnector},
};
use super::SimService;
/// Client-side state kept for a single namespace of the simulated cluster.
#[derive(Debug)]
struct Namespace {
/// Database handle for this namespace. `ClientsService::tick` builds it with
/// `Database::open_remote_with_connector`, pointed at `http://{ns_id}.primary:8080`.
db: Database,
}
/// Phases of the [`ClientsService`] state machine; `tick` moves through them in
/// declaration order.
enum State {
/// Nothing has been started yet; the next `tick` launches the health probe.
Init,
/// waiting for primary healthcheck to pass
WaitingHealth(oneshot::Receiver<()>),
/// waiting for namespace creation
WaitingForNamespace(oneshot::Receiver<HashMap<Uuid, Namespace>>),
/// Every namespace has been created; the map is keyed by namespace id.
Running {
namespaces: HashMap<Uuid, Namespace>,
},
}
/// The shared test HTTP client, specialised to connect through [`TurmoilConnector`].
type HttpClient = crate::common::http::Client<TurmoilConnector>;
/// Simulation service that plays the role of the database clients.
///
/// Driven through its [`SimService`] implementation: it waits for the primary
/// to report healthy, creates the configured namespaces, then enters the
/// running phase.
pub struct ClientsService {
/// Current phase of the client state machine.
state: State,
/// Cloned into every `TurmoilConnector` this service creates.
dns: Dns,
/// Key passed to `make_default_token` to produce the auth token for database handles.
encoding_key: EncodingKey,
}
impl ClientsService {
    /// Builds a service in its initial state.
    ///
    /// `dns` is cloned into each connector the service creates later, and
    /// `encoding_key` is used to mint the auth token for database handles.
    pub fn configure(dns: Dns, encoding_key: EncodingKey) -> Self {
        let state = State::Init;
        Self {
            encoding_key,
            dns,
            state,
        }
    }

    /// Creates a fresh HTTP client whose connections go through a
    /// [`TurmoilConnector`] built from this service's `Dns`.
    fn http_client(&self) -> HttpClient {
        let connector = TurmoilConnector::new(self.dns.clone());
        let hyper_client = hyper::client::Client::builder().build(connector);
        crate::common::http::Client::from(hyper_client)
    }
}
impl SimService for ClientsService {
    /// Advances the client state machine by one step.
    ///
    /// * `Init`: registers a simulation client that polls
    ///   `http://primary:8080/health` once per second until a request
    ///   succeeds, then moves to `WaitingHealth`.
    /// * `WaitingHealth`: once the probe has signalled, opens one remote
    ///   [`Database`] handle per configured namespace and registers a client
    ///   that creates each namespace through the admin API and sets up the
    ///   `t1` table with its two indexes; then moves to `WaitingForNamespace`.
    /// * `WaitingForNamespace`: once the setup client hands the namespaces
    ///   back, moves to `Running`.
    /// * `Running`: not implemented yet.
    ///
    /// Always returns `true`.
    ///
    /// # Panics
    ///
    /// Panics if a database handle cannot be opened, and unconditionally once
    /// the `Running` state is reached (`todo!()`).
    fn tick(
        &mut self,
        sim: &mut turmoil::Sim,
        config: &crate::libsql_wal::config::SimConfig,
        rng: &mut dyn rand::RngCore,
    ) -> bool {
        match self.state {
            State::Init => {
                let (snd, rcv) = oneshot::channel();
                let id = Uuid::from_u128(rng.gen());
                let client = self.http_client();
                sim.client(id.to_string(), async move {
                    loop {
                        if client.get("http://primary:8080/health").await.is_ok() {
                            // The receiver may already be gone; that is not an error here.
                            let _ = snd.send(());
                            break;
                        }
                        tokio::time::sleep(Duration::from_secs(1)).await;
                    }
                    Ok(())
                });
                self.state = State::WaitingHealth(rcv);
            }
            State::WaitingHealth(ref mut rcv) => {
                if rcv.try_recv().is_ok() {
                    let (snd, rcv) = oneshot::channel();
                    let id = Uuid::from_u128(rng.gen());
                    let token = make_default_token(&self.encoding_key);

                    let mut namespaces = HashMap::new();
                    for _ in 0..config.n_namespaces {
                        let ns_id = Uuid::from_u128(rng.gen());
                        #[allow(deprecated)] // the builder uses async for no reason
                        let db = Database::open_remote_with_connector(
                            format!("http://{ns_id}.primary:8080"),
                            &token,
                            TurmoilConnector::new(self.dns.clone()),
                        )
                        .unwrap();
                        namespaces.insert(ns_id, Namespace { db });
                    }

                    let client = self.http_client();
                    sim.client(id.to_string(), async move {
                        for (id, ns) in namespaces.iter() {
                            // Create the namespace through the admin API first.
                            let resp = client
                                .post(
                                    &format!("http://primary:9090/v1/namespaces/{id}/create"),
                                    serde_json::json!({}),
                                )
                                .await
                                .unwrap();
                            assert!(resp.status().is_success());

                            // Then create the table and indexes in a single transaction.
                            let conn = ns.db.connect().unwrap();
                            let tx = conn.transaction().await.unwrap();
                            tx.execute("CREATE TABLE IF NOT EXISTS t1(a INTEGER PRIMARY KEY, b BLOB(16), c BLOB(16), d BLOB(400))", ()).await?;
                            tx.execute("CREATE INDEX IF NOT EXISTS i1 ON t1(b)", ()).await?;
                            tx.execute("CREATE INDEX IF NOT EXISTS i2 ON t1(c)", ()).await?;
                            tx.commit().await.unwrap();
                        }
                        // Hand the initialised namespaces back to the service.
                        snd.send(namespaces).unwrap();
                        Ok(())
                    });
                    self.state = State::WaitingForNamespace(rcv);
                }
            }
            State::WaitingForNamespace(ref mut s) => {
                if let Ok(namespaces) = s.try_recv() {
                    self.state = State::Running { namespaces };
                }
            }
            State::Running { .. } => {
                todo!()
            }
        }
        true
    }
}
/// A unit of client work.
///
/// `Batch` and `InteractiveTxn` each carry a list of nested workloads.
#[derive(Debug, Clone, PartialEq, Eq)]
enum Workload {
    /// A read.
    Read,
    /// A write.
    Write,
    /// A batch made of the contained workloads.
    Batch(Vec<Self>),
    /// An interactive transaction made of the contained workloads.
    InteractiveTxn(Vec<Self>),
}
/// Payload-free tag with one variant per [`Workload`] variant.
///
/// `Workload::generate` builds an array of these to pick from.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Op {
    Read,
    Write,
    Batch,
    InteractiveTxn,
}
impl Workload {
/// Judging by the signature, this is meant to build a `Workload` from `config`
/// using randomness drawn from `rng`.
///
/// NOTE(review): the body is unfinished and does not compile as written; see
/// the notes below. No `Self` value is produced on any path yet.
fn generate(config: &SimConfig, rng: &mut impl Rng) -> Self {
// Step count is drawn from the half-open range `1..config.max_steps`.
// NOTE(review): rand's `gen_range` panics on an empty range, so this
// presumably requires `config.max_steps > 1` — confirm.
let mut steps = rng.gen_range(1..config.max_steps);
// NOTE(review): `steps` is never decremented inside the loop.
while steps != 0 {
// NOTE(review): `weight` is not defined anywhere visible in this file, the
// statement has no terminating `;`, and the chosen `Op` is not used yet.
let opts = [(Op::Read), (Op::Write), (Op::Batch), (Op::InteractiveTxn)].choose_weighted(rng, weight)
}
}
}