mirror of
https://github.com/tursodatabase/libsql.git
synced 2025-01-22 12:38:56 +00:00
7ce212604e
* Add SQLD_HTTP_PRIMARY_URL config and pass it all the way down to user routes * Add broadcaster struct to namespaces to notify updates * Add user route that allows listening to updates * Aggregate events before broadcasting them * Unsubscribe when stream is dropped and other minor improvements * Add BrodcasterRegistry and BrodcasterHandle to manage broadcaster lifecycle * Simplify code * Update hooks during statement execution Connection can live arbitrarily long, so we can't only register hooks when they are created. * Always inline should serialize checker of broadcaster message Co-authored-by: Lucio Franco <luciofranco14@gmail.com> * Replace HashMap with BTreeMap on broadcaster state It improves performance on the case where we have thousands of updates to the same table. * Add counters to events sent and dropped --------- Co-authored-by: Lucio Franco <luciofranco14@gmail.com>
196 lines
6.2 KiB
Rust
196 lines
6.2 KiB
Rust
use std::path::{Path, PathBuf};
|
|
use std::sync::Arc;
|
|
|
|
use anyhow::Context;
|
|
use hyper::client::HttpConnector;
|
|
use hyper_rustls::HttpsConnector;
|
|
use libsql_sys::EncryptionConfig;
|
|
use sha256::try_digest;
|
|
use tokio::time::Duration;
|
|
use tonic::transport::Channel;
|
|
use tower::ServiceExt;
|
|
|
|
use crate::auth::{Auth, Disabled};
|
|
use crate::net::{AddrIncoming, Connector};
|
|
|
|
pub struct RpcClientConfig<C = HttpConnector> {
|
|
pub remote_url: String,
|
|
pub tls_config: Option<TlsConfig>,
|
|
pub connector: C,
|
|
}
|
|
|
|
impl<C: Connector> RpcClientConfig<C> {
|
|
pub(crate) async fn configure(&self) -> anyhow::Result<(Channel, tonic::transport::Uri)> {
|
|
let uri = tonic::transport::Uri::from_maybe_shared(self.remote_url.clone())?;
|
|
let mut builder = Channel::builder(uri.clone());
|
|
if let Some(ref tls_config) = self.tls_config {
|
|
let cert_pem = std::fs::read_to_string(&tls_config.cert)?;
|
|
let key_pem = std::fs::read_to_string(&tls_config.key)?;
|
|
let identity = tonic::transport::Identity::from_pem(cert_pem, key_pem);
|
|
|
|
let ca_cert_pem = std::fs::read_to_string(&tls_config.ca_cert)?;
|
|
let ca_cert = tonic::transport::Certificate::from_pem(ca_cert_pem);
|
|
|
|
let tls_config = tonic::transport::ClientTlsConfig::new()
|
|
.identity(identity)
|
|
.ca_certificate(ca_cert)
|
|
.domain_name("sqld");
|
|
builder = builder.tls_config(tls_config)?;
|
|
}
|
|
|
|
let channel =
|
|
builder.connect_with_connector_lazy(self.connector.clone().map_err(Into::into));
|
|
|
|
Ok((channel, uri))
|
|
}
|
|
}
|
|
|
|
/// Filesystem locations of the PEM files used to set up TLS.
///
/// Only paths are stored here; the files are read by whoever consumes the
/// config (e.g. `RpcClientConfig::configure`).
#[derive(Clone)]
pub struct TlsConfig {
    /// Path to the certificate PEM file.
    pub cert: PathBuf,
    /// Path to the private key PEM file that pairs with `cert`.
    pub key: PathBuf,
    /// Path to the CA certificate PEM file.
    pub ca_cert: PathBuf,
}
|
|
|
|
pub struct RpcServerConfig<A = AddrIncoming> {
|
|
pub acceptor: A,
|
|
pub tls_config: Option<TlsConfig>,
|
|
}
|
|
|
|
pub struct UserApiConfig<A = AddrIncoming> {
|
|
pub hrana_ws_acceptor: Option<A>,
|
|
pub http_acceptor: Option<A>,
|
|
pub enable_http_console: bool,
|
|
pub self_url: Option<String>,
|
|
pub primary_url: Option<String>,
|
|
pub auth_strategy: Auth,
|
|
}
|
|
|
|
impl<A> Default for UserApiConfig<A> {
|
|
fn default() -> Self {
|
|
Self {
|
|
hrana_ws_acceptor: Default::default(),
|
|
http_acceptor: Default::default(),
|
|
enable_http_console: Default::default(),
|
|
self_url: Default::default(),
|
|
primary_url: Default::default(),
|
|
auth_strategy: Auth::new(Disabled::new()),
|
|
}
|
|
}
|
|
}
|
|
|
|
pub struct AdminApiConfig<A = AddrIncoming, C = HttpsConnector<HttpConnector>> {
|
|
pub acceptor: A,
|
|
pub connector: C,
|
|
pub disable_metrics: bool,
|
|
}
|
|
|
|
#[derive(Clone)]
|
|
pub struct DbConfig {
|
|
pub extensions_path: Option<Arc<Path>>,
|
|
pub bottomless_replication: Option<bottomless::replicator::Options>,
|
|
pub max_log_size: u64,
|
|
pub max_log_duration: Option<f32>,
|
|
pub soft_heap_limit_mb: Option<usize>,
|
|
pub hard_heap_limit_mb: Option<usize>,
|
|
pub max_response_size: u64,
|
|
pub max_total_response_size: u64,
|
|
pub snapshot_exec: Option<String>,
|
|
pub checkpoint_interval: Option<Duration>,
|
|
pub snapshot_at_shutdown: bool,
|
|
pub encryption_config: Option<EncryptionConfig>,
|
|
pub max_concurrent_requests: u64,
|
|
}
|
|
|
|
impl Default for DbConfig {
|
|
fn default() -> Self {
|
|
Self {
|
|
extensions_path: None,
|
|
bottomless_replication: None,
|
|
max_log_size: bytesize::mb(200u64),
|
|
max_log_duration: None,
|
|
soft_heap_limit_mb: None,
|
|
hard_heap_limit_mb: None,
|
|
max_response_size: bytesize::mb(10u64),
|
|
max_total_response_size: bytesize::mb(10u64),
|
|
snapshot_exec: None,
|
|
checkpoint_interval: None,
|
|
snapshot_at_shutdown: false,
|
|
encryption_config: None,
|
|
max_concurrent_requests: 128,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl DbConfig {
|
|
pub fn validate_extensions(&self) -> anyhow::Result<Arc<[PathBuf]>> {
|
|
let mut valid_extensions = vec![];
|
|
if let Some(ext_dir) = &self.extensions_path {
|
|
let extensions_list = ext_dir.join("trusted.lst");
|
|
|
|
let file_contents = std::fs::read_to_string(&extensions_list)
|
|
.with_context(|| format!("can't read {}", &extensions_list.display()))?;
|
|
|
|
let extensions = file_contents.lines().filter(|c| !c.is_empty());
|
|
|
|
for line in extensions {
|
|
let mut ext_info = line.trim().split_ascii_whitespace();
|
|
|
|
let ext_sha = ext_info.next().ok_or_else(|| {
|
|
anyhow::anyhow!("invalid line on {}: {}", &extensions_list.display(), line)
|
|
})?;
|
|
let ext_fname = ext_info.next().ok_or_else(|| {
|
|
anyhow::anyhow!("invalid line on {}: {}", &extensions_list.display(), line)
|
|
})?;
|
|
|
|
anyhow::ensure!(
|
|
ext_info.next().is_none(),
|
|
"extension list seem to contain a filename with whitespaces. Rejected"
|
|
);
|
|
|
|
let extension_full_path = ext_dir.join(ext_fname);
|
|
let digest = try_digest(extension_full_path.as_path()).with_context(|| {
|
|
format!(
|
|
"Failed to get sha256 digest, while trying to read {}",
|
|
extension_full_path.display()
|
|
)
|
|
})?;
|
|
|
|
anyhow::ensure!(
|
|
digest == ext_sha,
|
|
"sha256 differs for {}. Got {}",
|
|
ext_fname,
|
|
digest
|
|
);
|
|
valid_extensions.push(extension_full_path);
|
|
}
|
|
}
|
|
|
|
Ok(valid_extensions.into())
|
|
}
|
|
}
|
|
|
|
/// Settings for the heartbeat.
pub struct HeartbeatConfig {
    /// URL the heartbeat is addressed to, if any.
    /// NOTE(review): `None` presumably disables the heartbeat — confirm at the use site.
    pub heartbeat_url: Option<String>,
    /// Period of the heartbeat.
    pub heartbeat_period: Duration,
    /// Optional auth value for the heartbeat.
    /// NOTE(review): presumably sent as an authorization header — confirm.
    pub heartbeat_auth: Option<String>,
}
|
|
|
|
#[derive(Debug, Clone, Default)]
|
|
pub struct MetaStoreConfig {
|
|
pub bottomless: Option<BottomlessConfig>,
|
|
pub allow_recover_from_fs: bool,
|
|
}
|
|
|
|
/// Connection and scheduling settings for a bottomless backup target.
///
/// `Debug` is implemented by hand rather than derived so that
/// `secret_access_key` and `session_token` are never written to logs or
/// error messages; every other field is printed as usual.
#[derive(Clone)]
pub struct BottomlessConfig {
    /// Access key id used to authenticate against the bucket.
    pub access_key_id: String,
    /// Secret access key paired with `access_key_id`. Redacted in `Debug` output.
    pub secret_access_key: String,
    /// Optional session token. Redacted in `Debug` output.
    pub session_token: Option<String>,
    /// Region of the bucket.
    pub region: String,
    /// Identifier of the backup.
    pub backup_id: String,
    /// Name of the bucket.
    pub bucket_name: String,
    /// Interval between backups.
    pub backup_interval: Duration,
    /// Endpoint of the bucket service.
    pub bucket_endpoint: String,
}

impl std::fmt::Debug for BottomlessConfig {
    /// Formats the config like a derived `Debug` would, with credentials masked.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        const REDACTED: &str = "<redacted>";

        f.debug_struct("BottomlessConfig")
            .field("access_key_id", &self.access_key_id)
            .field("secret_access_key", &REDACTED)
            // Keep the Some/None shape visible without exposing the token itself.
            .field("session_token", &self.session_token.as_ref().map(|_| REDACTED))
            .field("region", &self.region)
            .field("backup_id", &self.backup_id)
            .field("bucket_name", &self.bucket_name)
            .field("backup_interval", &self.backup_interval)
            .field("bucket_endpoint", &self.bucket_endpoint)
            .finish()
    }
}
|