0
0
mirror of https://github.com/tursodatabase/libsql.git synced 2025-08-17 08:23:22 +00:00
Files
libsql/libsql-server/src/broadcaster.rs
Athos 7ce212604e Proof of concept of CDC on libsql-server (#1388)
* Add SQLD_HTTP_PRIMARY_URL config and pass it all the way down to user routes

* Add broadcaster struct to namespaces to notify updates

* Add user route that allows listening to updates

* Aggregate events before broadcasting them

* Unsubscribe when stream is dropped and other minor improvements

* Add BroadcasterRegistry and BroadcasterHandle to manage broadcaster lifecycle

* Simplify code

* Update hooks during statement execution

Connection can live arbitrarily long, so we can't only register hooks
when they are created.

* Always inline the should-serialize checker of the broadcaster message

Co-authored-by: Lucio Franco <luciofranco14@gmail.com>

* Replace HashMap with BTreeMap on broadcaster state

It improves performance in the case where we have thousands of updates
to the same table.

* Add counters to events sent and dropped

---------

Co-authored-by: Lucio Franco <luciofranco14@gmail.com>
2024-06-13 13:28:06 +00:00

105 lines
3.0 KiB
Rust

use std::{
collections::{hash_map::Entry, BTreeMap, HashMap},
mem,
sync::Arc,
};
use parking_lot::Mutex;
use serde::Serialize;
use tokio::sync::broadcast::{self};
use tokio_stream::wrappers::BroadcastStream;
/// Per-table change counters that are sent to subscribers.
///
/// Each field is skipped during serialization when `is_zero` reports it as
/// zero, so only non-zero counters appear in the serialized form.
#[derive(Debug, Copy, Clone, Serialize, Default)]
pub struct BroadcastMsg {
/// Number of changes whose action was not delete, insert, or update.
#[serde(skip_serializing_if = "is_zero")]
pub unknown: u64,
/// Number of `SQLITE_DELETE` actions recorded.
#[serde(skip_serializing_if = "is_zero")]
pub delete: u64,
/// Number of `SQLITE_INSERT` actions recorded.
#[serde(skip_serializing_if = "is_zero")]
pub insert: u64,
/// Number of `SQLITE_UPDATE` actions recorded.
#[serde(skip_serializing_if = "is_zero")]
pub update: u64,
}
/// Reports whether a counter is zero.
///
/// Used as the `skip_serializing_if` predicate for the `BroadcastMsg`
/// counters, which is why it takes its argument by reference.
#[inline(always)]
fn is_zero(num: &u64) -> bool {
    matches!(*num, 0)
}
/// Shared state behind a `Broadcaster`: pending counters and per-table senders.
#[derive(Debug, Default)]
pub struct BroadcasterInner {
// Counters accumulated by `notify`, keyed by table name, until `commit`
// sends them or `rollback` clears them.
state: Mutex<BTreeMap<String, BroadcastMsg>>,
// One broadcast sender per table name, created on first `subscribe` and
// removed by `unsubscribe` once it has no receivers.
senders: Mutex<HashMap<String, broadcast::Sender<BroadcastMsg>>>,
}
/// Handle used to record table changes and deliver them to subscribers.
///
/// The state lives behind an `Arc`, so clones of a `Broadcaster` share the
/// same pending counters and the same set of senders.
#[derive(Debug, Default, Clone)]
pub struct Broadcaster {
// Shared pending counters and per-table senders.
inner: Arc<BroadcasterInner>,
}
impl Broadcaster {
    /// Capacity passed to `broadcast::channel` for every per-table channel.
    const BROADCAST_CAP: usize = 1024;

    /// Records one change of kind `action` against `table`.
    ///
    /// Nothing is sent to subscribers here. The counters accumulate in the
    /// pending state until [`Broadcaster::commit`] sends them or
    /// [`Broadcaster::rollback`] discards them.
    pub fn notify(&self, table: &str, action: rusqlite::hooks::Action) {
        let mut pending = self.inner.state.lock();
        // Look the table up by `&str` first so the name is only copied into
        // an owned `String` the first time it shows up in the pending state.
        match pending.get_mut(table) {
            Some(counters) => Self::increment(counters, action),
            None => {
                let mut counters = BroadcastMsg::default();
                Self::increment(&mut counters, action);
                pending.insert(table.into(), counters);
            }
        }
    }

    /// Bumps the counter of `value` that corresponds to `action`.
    ///
    /// Any action other than delete, insert, or update is counted as
    /// `unknown`.
    fn increment(value: &mut BroadcastMsg, action: rusqlite::hooks::Action) {
        match action {
            rusqlite::hooks::Action::SQLITE_DELETE => value.delete += 1,
            rusqlite::hooks::Action::SQLITE_INSERT => value.insert += 1,
            rusqlite::hooks::Action::SQLITE_UPDATE => value.update += 1,
            _ => value.unknown += 1,
        }
    }

    /// Sends the pending counters to the subscribers of each table and
    /// leaves the pending state empty.
    ///
    /// Counters for tables that have no sender registered are dropped.
    pub fn commit(&self) {
        let senders = self.inner.senders.lock();
        // The state guard is a temporary that is released at the end of this
        // statement, so `notify` is not blocked while messages are being sent.
        let pending = mem::take(&mut *self.inner.state.lock());
        for (table, counters) in pending {
            if let Some(sender) = senders.get(&table) {
                // A failed send is deliberately ignored: per tokio's docs,
                // `send` errors when the channel currently has no receivers.
                let _ = sender.send(counters);
            }
        }
    }

    /// Discards every pending counter without sending anything.
    pub fn rollback(&self) {
        self.inner.state.lock().clear();
    }

    /// Returns a stream of the messages sent for `table`.
    ///
    /// An existing channel for the table is reused; otherwise a new one with
    /// capacity [`Self::BROADCAST_CAP`] is created and registered.
    pub fn subscribe(&self, table: String) -> BroadcastStream<BroadcastMsg> {
        let receiver = match self.inner.senders.lock().entry(table) {
            Entry::Occupied(slot) => slot.get().subscribe(),
            Entry::Vacant(slot) => {
                let (sender, receiver) = broadcast::channel(Self::BROADCAST_CAP);
                slot.insert(sender);
                receiver
            }
        };
        BroadcastStream::new(receiver)
    }

    /// Removes the channel for `table` if it no longer has any receivers.
    ///
    /// Returns `false` only when this call removed the channel and no other
    /// channels remain. It returns `true` in every other case, including when
    /// `table` has no channel or its channel still has receivers.
    ///
    /// NOTE(review): presumably called after the subscriber's stream has been
    /// dropped, so that the receiver count already reflects it; confirm
    /// against the caller.
    pub fn unsubscribe(&self, table: &str) -> bool {
        let mut senders = self.inner.senders.lock();
        let idle = senders
            .get(table)
            .map_or(false, |sender| sender.receiver_count() == 0);
        if !idle {
            return true;
        }
        senders.remove(table);
        !senders.is_empty()
    }
}