0
0
mirror of https://github.com/tursodatabase/libsql.git synced 2025-09-06 07:10:34 +00:00

feat: add columns information for Hrana Statements

This commit is contained in:
Levy A.
2025-05-12 19:39:14 -03:00
parent db0cdf8eb8
commit 064092cfaf
7 changed files with 53 additions and 24 deletions

View File

@@ -7,7 +7,6 @@ pub use builder::Builder;
#[cfg(feature = "core")]
pub use libsql_sys::{Cipher, EncryptionConfig};
use crate::sync::DropAbort;
use crate::{Connection, Result};
use std::fmt;
use std::sync::atomic::AtomicU64;
@@ -101,7 +100,7 @@ enum DbType {
url: String,
auth_token: String,
connector: crate::util::ConnectorService,
_bg_abort: Option<Arc<DropAbort>>,
_bg_abort: Option<Arc<crate::sync::DropAbort>>,
},
#[cfg(feature = "remote")]
Remote {
@@ -679,10 +678,8 @@ impl Database {
..
} => {
use crate::{
hrana::connection::HttpConnection,
local::impls::LibsqlConnection,
replication::connection::State,
sync::connection::SyncedConnection,
hrana::connection::HttpConnection, local::impls::LibsqlConnection,
replication::connection::State, sync::connection::SyncedConnection,
};
use tokio::sync::Mutex;

View File

@@ -6,7 +6,7 @@ use std::sync::Arc;
use tracing::Instrument as _;
use crate::{sync::DropAbort, Database, Result};
use crate::{Database, Result};
use super::DbType;
@@ -645,7 +645,7 @@ cfg_sync! {
db.sync_ctx.as_ref().unwrap().lock().await.set_push_batch_size(push_batch_size);
}
let mut bg_abort: Option<Arc<DropAbort>> = None;
let mut bg_abort: Option<Arc<crate::sync::DropAbort>> = None;
let conn = db.connect()?;
let sync_ctx = db.sync_ctx.as_ref().unwrap().clone();
@@ -654,6 +654,7 @@ cfg_sync! {
let jh = tokio::spawn(
async move {
loop {
tokio::time::sleep(sync_interval).await;
tracing::trace!("trying to sync");
let mut ctx = sync_ctx.lock().await;
if remote_writes {
@@ -665,13 +666,12 @@ cfg_sync! {
tracing::error!("sync error: {}", e);
}
}
tokio::time::sleep(sync_interval).await;
}
}
.instrument(tracing::info_span!("sync_interval")),
);
bg_abort.replace(Arc::new(DropAbort(jh.abort_handle())));
bg_abort.replace(Arc::new(crate::sync::DropAbort(jh.abort_handle())));
}
Ok(Database {

View File

@@ -80,9 +80,9 @@ where
)
}
pub fn prepare(&self, sql: &str) -> crate::Result<Statement<T>> {
pub async fn prepare(&self, sql: &str) -> crate::Result<Statement<T>> {
let stream = self.current_stream().clone();
Statement::new(stream, sql.to_string(), true)
Statement::new(stream, sql.to_string(), true).await
}
}

View File

@@ -131,7 +131,7 @@ impl Conn for HttpConnection<HttpSender> {
async fn prepare(&self, sql: &str) -> crate::Result<Statement> {
let stream = self.current_stream().clone();
let stmt = crate::hrana::Statement::new(stream, sql.to_string(), true)?;
let stmt = crate::hrana::Statement::new(stream, sql.to_string(), true).await?;
Ok(Statement {
inner: Box::new(stmt),
})
@@ -241,7 +241,16 @@ impl crate::statement::Stmt for crate::hrana::Statement<HttpSender> {
// 2. Even if we do execute query, Hrana doesn't return all info that Column exposes.
// 3. Even if we would like to return some of the column info ie. column [ValueType], this information is not
// present in Hrana [Col] but rather inferred from the row cell type.
vec![]
self.cols
.iter()
.map(|name| crate::Column {
name,
origin_name: None,
table_name: None,
database_name: None,
decl_type: None,
})
.collect()
}
}
@@ -350,7 +359,7 @@ impl Conn for HranaStream<HttpSender> {
}
async fn prepare(&self, sql: &str) -> crate::Result<Statement> {
let stmt = crate::hrana::Statement::new(self.clone(), sql.to_string(), true)?;
let stmt = crate::hrana::Statement::new(self.clone(), sql.to_string(), true).await?;
Ok(Statement {
inner: Box::new(stmt),
})

View File

@@ -119,13 +119,22 @@ where
stream: HranaStream<T>,
close_stream: bool,
inner: Stmt,
cols: Vec<String>,
}
impl<T> Statement<T>
where
T: HttpSend + Send + Sync + 'static,
{
pub(crate) fn new(stream: HranaStream<T>, sql: String, want_rows: bool) -> crate::Result<Self> {
pub(crate) async fn new(
stream: HranaStream<T>,
sql: String,
want_rows: bool,
) -> crate::Result<Self> {
let desc = stream.describe(&sql).await?;
let cols: Vec<_> = desc.cols.into_iter().map(|col| col.name).collect();
// in SQLite when a multiple statements are glued together into one string, only the first one is
// executed and then a handle to continue execution is returned. However Hrana API doesn't allow
// passing multi-statement strings, so we just pick first one.
@@ -147,6 +156,7 @@ where
stream,
close_stream,
inner,
cols,
})
}
}

View File

@@ -1,14 +1,14 @@
// TODO(lucio): Move this to `remote/mod.rs`
use std::time::Duration;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use libsql_replication::rpc::proxy::{
describe_result, query_result::RowResult, Cond, DescribeResult, ExecuteResults, NotCond,
OkCond, Positional, Query, ResultRows, State as RemoteState, Step,
};
use parking_lot::Mutex;
use std::str::FromStr;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::time::Duration;
use crate::parser;
use crate::parser::StmtKind;
@@ -168,7 +168,11 @@ impl From<RemoteState> for State {
}
impl RemoteConnection {
pub(crate) fn new(local: LibsqlConnection, writer: Option<Writer>, max_write_replication_index: Arc<AtomicU64>) -> Self {
pub(crate) fn new(
local: LibsqlConnection,
writer: Option<Writer>,
max_write_replication_index: Arc<AtomicU64>,
) -> Self {
let state = Arc::new(Mutex::new(Inner::default()));
Self {
local,
@@ -180,9 +184,16 @@ impl RemoteConnection {
fn update_max_write_replication_index(&self, index: Option<u64>) {
if let Some(index) = index {
let mut current = self.max_write_replication_index.load(std::sync::atomic::Ordering::SeqCst);
let mut current = self
.max_write_replication_index
.load(std::sync::atomic::Ordering::SeqCst);
while index > current {
match self.max_write_replication_index.compare_exchange(current, index, std::sync::atomic::Ordering::SeqCst, std::sync::atomic::Ordering::SeqCst) {
match self.max_write_replication_index.compare_exchange(
current,
index,
std::sync::atomic::Ordering::SeqCst,
std::sync::atomic::Ordering::SeqCst,
) {
Ok(_) => break,
Err(new_current) => current = new_current,
}

View File

@@ -128,9 +128,11 @@ impl Conn for SyncedConnection {
})
} else {
let stmt = Statement {
inner: Box::new(self.remote.prepare(sql)?),
inner: Box::new(self.remote.prepare(sql).await?),
};
dbg!(stmt.columns().iter().map(|c| c.name()).collect::<Vec<_>>());
if self.read_your_writes {
Ok(Statement {
inner: Box::new(SyncedStatement {