mirror of
https://github.com/tursodatabase/libsql.git
synced 2025-08-17 08:23:22 +00:00
* Add SQLD_HTTP_PRIMARY_URL config and pass it all the way down to user routes * Add broadcaster struct to namespaces to notify updates * Add user route that allows listening to updates * Aggregate events before broadcasting them * Unsubscribe when stream is dropped and other minor improvements * Add BrodcasterRegistry and BrodcasterHandle to manage broadcaster lifecycle * Simplify code * Update hooks during statement execution Connection can live arbitrarily long, so we can't only register hooks when they are created. * Always inline should serialize checker of broadcaster message Co-authored-by: Lucio Franco <luciofranco14@gmail.com> * Replace HashMap with BTreeMap on broadcaster state It improves performance on the case where we have thousands of updates to the same table. * Add counters to events sent and dropped --------- Co-authored-by: Lucio Franco <luciofranco14@gmail.com>
105 lines
3.0 KiB
Rust
105 lines
3.0 KiB
Rust
use std::{
|
|
collections::{hash_map::Entry, BTreeMap, HashMap},
|
|
mem,
|
|
sync::Arc,
|
|
};
|
|
|
|
use parking_lot::Mutex;
|
|
use serde::Serialize;
|
|
use tokio::sync::broadcast::{self};
|
|
use tokio_stream::wrappers::BroadcastStream;
|
|
|
|
#[derive(Debug, Copy, Clone, Serialize, Default)]
|
|
pub struct BroadcastMsg {
|
|
#[serde(skip_serializing_if = "is_zero")]
|
|
pub unknown: u64,
|
|
#[serde(skip_serializing_if = "is_zero")]
|
|
pub delete: u64,
|
|
#[serde(skip_serializing_if = "is_zero")]
|
|
pub insert: u64,
|
|
#[serde(skip_serializing_if = "is_zero")]
|
|
pub update: u64,
|
|
}
|
|
|
|
#[inline(always)]
|
|
fn is_zero(num: &u64) -> bool {
|
|
*num == 0
|
|
}
|
|
|
|
#[derive(Debug, Default)]
|
|
pub struct BroadcasterInner {
|
|
state: Mutex<BTreeMap<String, BroadcastMsg>>,
|
|
senders: Mutex<HashMap<String, broadcast::Sender<BroadcastMsg>>>,
|
|
}
|
|
|
|
#[derive(Debug, Default, Clone)]
|
|
pub struct Broadcaster {
|
|
inner: Arc<BroadcasterInner>,
|
|
}
|
|
|
|
impl Broadcaster {
|
|
const BROADCAST_CAP: usize = 1024;
|
|
|
|
pub fn notify(&self, table: &str, action: rusqlite::hooks::Action) {
|
|
let mut state = self.inner.state.lock();
|
|
if let Some(entry) = state.get_mut(table) {
|
|
Self::increment(entry, action);
|
|
} else {
|
|
let mut entry = BroadcastMsg::default();
|
|
Self::increment(&mut entry, action);
|
|
state.insert(table.into(), entry);
|
|
}
|
|
}
|
|
|
|
fn increment(value: &mut BroadcastMsg, action: rusqlite::hooks::Action) {
|
|
match action {
|
|
rusqlite::hooks::Action::SQLITE_DELETE => value.delete += 1,
|
|
rusqlite::hooks::Action::SQLITE_INSERT => value.insert += 1,
|
|
rusqlite::hooks::Action::SQLITE_UPDATE => value.update += 1,
|
|
_ => value.unknown += 1,
|
|
}
|
|
}
|
|
|
|
pub fn commit(&self) {
|
|
let senders = self.inner.senders.lock();
|
|
let mut state = self.inner.state.lock();
|
|
let state = mem::take(&mut *state);
|
|
|
|
state.into_iter().for_each(|(table, entry)| {
|
|
if let Some(sender) = senders.get(&table) {
|
|
let _ = sender.send(entry);
|
|
}
|
|
});
|
|
}
|
|
|
|
pub fn rollback(&self) {
|
|
self.inner.state.lock().clear();
|
|
}
|
|
|
|
pub fn subscribe(&self, table: String) -> BroadcastStream<BroadcastMsg> {
|
|
let receiver = match self.inner.senders.lock().entry(table) {
|
|
Entry::Occupied(entry) => entry.get().subscribe(),
|
|
Entry::Vacant(entry) => {
|
|
let (sender, receiver) = broadcast::channel(Self::BROADCAST_CAP);
|
|
entry.insert(sender);
|
|
receiver
|
|
}
|
|
};
|
|
|
|
BroadcastStream::new(receiver)
|
|
}
|
|
|
|
pub fn unsubscribe(&self, table: &String) -> bool {
|
|
let mut tables = self.inner.senders.lock();
|
|
if let Some(sender) = tables.get(table) {
|
|
if sender.receiver_count() == 0 {
|
|
tables.remove(table);
|
|
if tables.is_empty() {
|
|
return false;
|
|
}
|
|
}
|
|
}
|
|
return true;
|
|
}
|
|
}
|