#![allow(clippy::type_complexity, clippy::too_many_arguments)]
use std::path::{Path, PathBuf};
use std::process::Command;
use std::str::FromStr;
use std::sync::{Arc, Weak};
use crate::auth::Auth;
use crate::connection::{Connection, MakeConnection};
use crate::error::Error;
use crate::migration::maybe_migrate;
use crate::net::Accept;
use crate::rpc::proxy::rpc::proxy_server::Proxy;
use crate::rpc::proxy::ProxyService;
use crate::rpc::replica_proxy::ReplicaProxyService;
use crate::rpc::replication_log::rpc::replication_log_server::ReplicationLog;
use crate::rpc::replication_log::ReplicationLogService;
use crate::rpc::replication_log_proxy::ReplicationLogProxyService;
use crate::rpc::run_rpc_server;
use crate::stats::Stats;
use anyhow::Context as AnyhowContext;
use config::{
    AdminApiConfig, DbConfig, HeartbeatConfig, RpcClientConfig, RpcServerConfig, UserApiConfig,
};
use http::user::UserApi;
use hyper::client::HttpConnector;
use hyper_rustls::HttpsConnector;
use namespace::{
    MakeNamespace, NamespaceName, NamespaceStore, PrimaryNamespaceConfig, PrimaryNamespaceMaker,
    ReplicaNamespaceConfig, ReplicaNamespaceMaker,
};
use net::Connector;
use once_cell::sync::Lazy;
use replication::NamespacedSnapshotCallback;
pub use sqld_libsql_bindings as libsql_bindings;
use tokio::runtime::Runtime;
use tokio::sync::{mpsc, Notify};
use tokio::task::JoinSet;
use tokio::time::Duration;
use url::Url;
use utils::services::idle_shutdown::IdleShutdownKicker;
use self::net::AddrIncoming;
pub mod config;
pub mod connection;
pub mod net;
pub mod rpc;
pub mod version;
mod auth;
mod database;
mod error;
mod h2c;
mod heartbeat;
mod hrana;
mod http;
mod metrics;
mod migration;
mod namespace;
mod query;
mod query_analysis;
mod query_result_builder;
mod replication;
mod stats;
#[cfg(test)]
mod test;
mod utils;

/// Maximum number of databases being created or opened concurrently.
const MAX_CONCURRENT_DBS: usize = 128;
/// How long to wait when creating a database connection before timing out.
const DB_CREATE_TIMEOUT: Duration = Duration::from_secs(1);
/// Default WAL auto-checkpoint threshold, in pages.
const DEFAULT_AUTO_CHECKPOINT: u32 = 1000;
/// Page size used by libsql databases, in bytes.
const LIBSQL_PAGE_SIZE: u64 = 4096;
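
// Dedicated multi-threaded runtime with a very large blocking-thread cap,
// used to offload blocking work (notably SQLite calls) so it doesn't starve
// the main runtime.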
pub(crate) static BLOCKING_RT: Lazy<Runtime> = Lazy::new(|| {
    tokio::runtime::Builder::new_multi_thread()
        .max_blocking_threads(50_000)
        .enable_all()
        .build()
        .unwrap()
});

type Result<T, E = Error> = std::result::Result<T, E>;
type StatsSender = mpsc::Sender<(NamespaceName, Weak<Stats>)>;
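
/// Top-level server configuration and entry point: populate the public
/// fields, then call [`Server::start`].
///
/// A minimal usage sketch with the default configuration; the ctrl-c wiring
/// below is illustrative, not part of this crate:
///
/// ```ignore
/// let server: Server = Server::default();
/// let shutdown = server.shutdown.clone();
/// tokio::spawn(async move {
///     tokio::signal::ctrl_c().await.unwrap();
///     shutdown.notify_waiters();
/// });
/// server.start().await?;
/// ```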
pub struct Server<C = HttpConnector, A = AddrIncoming, D = HttpsConnector<HttpConnector>> {
    pub path: Arc<Path>,
    pub db_config: DbConfig,
    pub user_api_config: UserApiConfig<A>,
    pub admin_api_config: Option<AdminApiConfig<A, D>>,
    pub rpc_server_config: Option<RpcServerConfig<A>>,
    pub rpc_client_config: Option<RpcClientConfig<C>>,
    pub idle_shutdown_timeout: Option<Duration>,
    pub initial_idle_shutdown_timeout: Option<Duration>,
    pub disable_default_namespace: bool,
    pub heartbeat_config: Option<HeartbeatConfig>,
    pub disable_namespaces: bool,
    pub shutdown: Arc<Notify>,
}

impl<C, A, D> Default for Server<C, A, D> {
    fn default() -> Self {
        Self {
            path: PathBuf::from("data.sqld").into(),
            db_config: Default::default(),
            user_api_config: Default::default(),
            admin_api_config: Default::default(),
            rpc_server_config: Default::default(),
            rpc_client_config: Default::default(),
            idle_shutdown_timeout: Default::default(),
            initial_idle_shutdown_timeout: Default::default(),
            disable_default_namespace: false,
            heartbeat_config: Default::default(),
            disable_namespaces: true,
            shutdown: Default::default(),
        }
    }
}

struct Services<M: MakeNamespace, A, P, S, C> {
    namespaces: NamespaceStore<M>,
    idle_shutdown_kicker: Option<IdleShutdownKicker>,
    proxy_service: P,
    replication_service: S,
    user_api_config: UserApiConfig<A>,
    admin_api_config: Option<AdminApiConfig<A, C>>,
    disable_namespaces: bool,
    disable_default_namespace: bool,
    db_config: DbConfig,
    auth: Arc<Auth>,
    path: Arc<Path>,
}
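
// Bundles the fully configured services; `configure` spawns the user-facing
// HTTP/Hrana API and, when configured, the admin API onto the provided
// `JoinSet`.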
impl<M, A, P, S, C> Services<M, A, P, S, C>
where
    M: MakeNamespace,
    A: crate::net::Accept,
    P: Proxy,
    S: ReplicationLog,
    C: Connector,
{
    fn configure(self, join_set: &mut JoinSet<anyhow::Result<()>>) {
        let user_http = UserApi {
            http_acceptor: self.user_api_config.http_acceptor,
            hrana_ws_acceptor: self.user_api_config.hrana_ws_acceptor,
            auth: self.auth,
            namespaces: self.namespaces.clone(),
            idle_shutdown_kicker: self.idle_shutdown_kicker.clone(),
            proxy_service: self.proxy_service,
            replication_service: self.replication_service,
            disable_default_namespace: self.disable_default_namespace,
            disable_namespaces: self.disable_namespaces,
            max_response_size: self.db_config.max_response_size,
            enable_console: self.user_api_config.enable_http_console,
            self_url: self.user_api_config.self_url,
            path: self.path.clone(),
        };

        let user_http_service = user_http.configure(join_set);

        if let Some(AdminApiConfig {
            acceptor,
            connector,
            disable_metrics,
        }) = self.admin_api_config
        {
            join_set.spawn(http::admin::run(
                acceptor,
                user_http_service,
                self.namespaces,
                connector,
                disable_metrics,
            ));
        }
    }
}
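
// Periodically checkpoints the database WAL. Vacuuming is attempted before
// each checkpoint, and on a failed attempt the task retries after
// `RETRY_INTERVAL` rather than waiting for the next tick.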
async fn run_periodic_checkpoint<C>(
    connection_maker: Arc<C>,
    period: Duration,
) -> anyhow::Result<()>
where
    C: MakeConnection,
{
    use tokio::time::{interval, sleep, Instant, MissedTickBehavior};

    const RETRY_INTERVAL: Duration = Duration::from_secs(60);
    tracing::info!("setting checkpoint interval to {:?}", period);
    let mut interval = interval(period);
    interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
    let mut retry: Option<Duration> = None;
    loop {
        if let Some(retry) = retry.take() {
            if retry.is_zero() {
                tracing::warn!("database was not set in WAL journal mode");
                return Ok(());
            }
            sleep(retry).await;
        } else {
            interval.tick().await;
        }
        retry = match connection_maker.create().await {
            Ok(conn) => {
                if let Err(e) = conn.vacuum_if_needed().await {
                    tracing::warn!("vacuum failed: {}", e);
                }
                tracing::info!("database checkpoint starts");
                let start = Instant::now();
                match conn.checkpoint().await {
                    Ok(_) => {
                        let elapsed = Instant::now() - start;
                        if elapsed >= Duration::from_secs(10) {
                            tracing::warn!("database checkpoint finished (took: {:?})", elapsed);
                        } else {
                            tracing::info!("database checkpoint finished (took: {:?})", elapsed);
                        }
                        None
                    }
                    Err(err) => {
                        tracing::warn!("failed to execute checkpoint: {}", err);
                        Some(RETRY_INTERVAL)
                    }
                }
            }
            Err(err) => {
                tracing::warn!("couldn't connect: {}", err);
                Some(RETRY_INTERVAL)
            }
        };
    }
}

fn sentinel_file_path(path: &Path) -> PathBuf {
    path.join(".sentinel")
}

/// Initialize the sentinel file. This file is created at the beginning of the process and is
/// deleted at the end, on a clean exit. If the file is already present when the process starts,
/// the database was not shut down properly and might need repair. This function returns `true`
/// if the database is dirty and needs repair.
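///
/// A sketch of the intended lifecycle (`db_path` is a hypothetical caller
/// variable; the repair step is not part of this function):
///
/// ```ignore
/// let dirty = init_sentinel_file(db_path)?; // created on startup
/// if dirty {
///     // previous run did not exit cleanly; repair may be needed
/// }
/// // ... on clean shutdown:
/// std::fs::remove_file(sentinel_file_path(db_path))?;
/// ```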
fn init_sentinel_file(path: &Path) -> anyhow::Result<bool> {
    let path = sentinel_file_path(path);
    if path.try_exists()? {
        return Ok(true);
    }
    std::fs::File::create(path)?;
    Ok(false)
}

fn init_version_file(db_path: &Path) -> anyhow::Result<()> {
    // Try to detect the presence of the data file at the root of db_path. If it's there, this is
    // a pre-0.18.0 database that still needs to be migrated, so don't write a version file yet.
    if db_path.join("data").exists() {
        return Ok(());
    }
    let version_path = db_path.join(".version");
    if !version_path.exists() {
        std::fs::create_dir_all(db_path)?;
        std::fs::write(version_path, env!("CARGO_PKG_VERSION"))?;
    }
    Ok(())
}

impl<C, A, D> Server<C, A, D>
where
    C: Connector,
    A: Accept,
    D: Connector,
{
    /// Set up the SQLite global environment (bottomless WAL methods and heap limits).
    fn init_sqlite_globals(&self) {
        if self.db_config.bottomless_replication.is_some() {
            bottomless::static_init::register_bottomless_methods();
        }
        if let Some(soft_limit_mb) = self.db_config.soft_heap_limit_mb {
            tracing::warn!("Setting soft heap limit to {soft_limit_mb}MiB");
            unsafe {
                sqld_libsql_bindings::ffi::sqlite3_soft_heap_limit64(
                    soft_limit_mb as i64 * 1024 * 1024,
                )
            };
        }
        if let Some(hard_limit_mb) = self.db_config.hard_heap_limit_mb {
            tracing::warn!("Setting hard heap limit to {hard_limit_mb}MiB");
            unsafe {
                sqld_libsql_bindings::ffi::sqlite3_hard_heap_limit64(
                    hard_limit_mb as i64 * 1024 * 1024,
                )
            };
        }
    }
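
    /// Build the callback invoked when a new snapshot is produced. When
    /// `snapshot_exec` is configured, the callback runs that program with the
    /// snapshot file path and the namespace name as its two arguments, and
    /// fails if the process exits with a non-zero status.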
    pub fn make_snapshot_callback(&self) -> NamespacedSnapshotCallback {
        let snapshot_exec = self.db_config.snapshot_exec.clone();
        Arc::new(move |snapshot_file: &Path, namespace: &NamespaceName| {
            if let Some(exec) = snapshot_exec.as_ref() {
                let status = Command::new(exec)
                    .arg(snapshot_file)
                    .arg(namespace.as_str())
                    .status()?;
                anyhow::ensure!(
                    status.success(),
                    "Snapshot exec process failed with status {status}"
                );
            }
            Ok(())
        })
    }

    fn spawn_monitoring_tasks(
        &self,
        join_set: &mut JoinSet<anyhow::Result<()>>,
        stats_receiver: mpsc::Receiver<(NamespaceName, Weak<Stats>)>,
    ) -> anyhow::Result<()> {
        match self.heartbeat_config {
            Some(ref config) => {
                tracing::info!(
                    "Server sending heartbeat to URL {} every {:?}",
                    config.heartbeat_url,
                    config.heartbeat_period,
                );
                join_set.spawn({
                    let heartbeat_auth = config.heartbeat_auth.clone();
                    let heartbeat_period = config.heartbeat_period;
                    let heartbeat_url =
                        Url::from_str(&config.heartbeat_url).context("invalid heartbeat URL")?;
                    async move {
                        heartbeat::server_heartbeat(
                            heartbeat_url,
                            heartbeat_auth,
                            heartbeat_period,
                            stats_receiver,
                        )
                        .await;
                        Ok(())
                    }
                });
                // join_set.spawn(run_storage_monitor(self.path.clone(), stats));
            }
            None => {
                tracing::warn!("No server heartbeat configured")
            }
        }
        Ok(())
    }
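
    /// Start the server: initialize the version and sentinel files, run any
    /// pending migration, then configure either replica mode (when an RPC
    /// client is configured) or primary mode, and finally wait for shutdown
    /// or for one of the spawned tasks to exit.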
    pub async fn start(self) -> anyhow::Result<()> {
        let mut join_set = JoinSet::new();

        init_version_file(&self.path)?;
        maybe_migrate(&self.path)?;

        let (stats_sender, stats_receiver) = mpsc::channel(8);
        self.spawn_monitoring_tasks(&mut join_set, stats_receiver)?;

        self.init_sqlite_globals();
        let db_is_dirty = init_sentinel_file(&self.path)?;
        let idle_shutdown_kicker = self.setup_shutdown();
        let snapshot_callback = self.make_snapshot_callback();
        let auth = self.user_api_config.get_auth().map(Arc::new)?;
        let extensions = self.db_config.validate_extensions()?;

        match self.rpc_client_config {
            Some(rpc_config) => {
                let replica = Replica {
                    rpc_config,
                    stats_sender,
                    extensions,
                    db_config: self.db_config.clone(),
                    base_path: self.path.clone(),
                    auth: auth.clone(),
                };
                let (namespaces, proxy_service, replication_service) = replica.configure().await?;
                let services = Services {
                    namespaces,
                    idle_shutdown_kicker,
                    proxy_service,
                    replication_service,
                    user_api_config: self.user_api_config,
                    admin_api_config: self.admin_api_config,
                    disable_namespaces: self.disable_namespaces,
                    disable_default_namespace: self.disable_default_namespace,
                    db_config: self.db_config,
                    auth,
                    path: self.path.clone(),
                };
                services.configure(&mut join_set);
            }
            None => {
                let primary = Primary {
                    rpc_config: self.rpc_server_config,
                    db_config: self.db_config.clone(),
                    idle_shutdown_kicker: idle_shutdown_kicker.clone(),
                    stats_sender,
                    db_is_dirty,
                    snapshot_callback,
                    extensions,
                    base_path: self.path.clone(),
                    disable_namespaces: self.disable_namespaces,
                    join_set: &mut join_set,
                    auth: auth.clone(),
                };
                let (namespaces, proxy_service, replication_service) = primary.configure().await?;
                let services = Services {
                    namespaces,
                    idle_shutdown_kicker,
                    proxy_service,
                    replication_service,
                    user_api_config: self.user_api_config,
                    admin_api_config: self.admin_api_config,
                    disable_namespaces: self.disable_namespaces,
                    disable_default_namespace: self.disable_default_namespace,
                    db_config: self.db_config,
                    auth,
                    path: self.path.clone(),
                };
                services.configure(&mut join_set);
            }
        }

        tokio::select! {
            _ = self.shutdown.notified() => {
                join_set.shutdown().await;
                // clean shutdown, remove sentinel file
                std::fs::remove_file(sentinel_file_path(&self.path))?;
            }
            Some(res) = join_set.join_next() => {
                res??;
            },
            else => (),
        }

        Ok(())
    }

    fn setup_shutdown(&self) -> Option<IdleShutdownKicker> {
        let shutdown_notify = self.shutdown.clone();
        self.idle_shutdown_timeout.map(|d| {
            IdleShutdownKicker::new(d, self.initial_idle_shutdown_timeout, shutdown_notify)
        })
    }
}
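
// Builder for primary-mode services: a primary owns the replication log and
// serves both the proxy and replication-log RPC services that replicas
// connect to.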
struct Primary<'a, A> {
    rpc_config: Option<RpcServerConfig<A>>,
    db_config: DbConfig,
    idle_shutdown_kicker: Option<IdleShutdownKicker>,
    stats_sender: StatsSender,
    db_is_dirty: bool,
    snapshot_callback: NamespacedSnapshotCallback,
    extensions: Arc<[PathBuf]>,
    base_path: Arc<Path>,
    disable_namespaces: bool,
    auth: Arc<Auth>,
    join_set: &'a mut JoinSet<anyhow::Result<()>>,
}

impl<A> Primary<'_, A>
where
    A: Accept,
{
    async fn configure(
        mut self,
    ) -> anyhow::Result<(
        NamespaceStore<PrimaryNamespaceMaker>,
        ProxyService,
        ReplicationLogService,
    )> {
        let conf = PrimaryNamespaceConfig {
            base_path: self.base_path,
            max_log_size: self.db_config.max_log_size,
            db_is_dirty: self.db_is_dirty,
            max_log_duration: self.db_config.max_log_duration.map(Duration::from_secs_f32),
            snapshot_callback: self.snapshot_callback,
            bottomless_replication: self.db_config.bottomless_replication,
            extensions: self.extensions,
            stats_sender: self.stats_sender.clone(),
            max_response_size: self.db_config.max_response_size,
            max_total_response_size: self.db_config.max_total_response_size,
            checkpoint_interval: self.db_config.checkpoint_interval,
            disable_namespace: self.disable_namespaces,
        };
        let factory = PrimaryNamespaceMaker::new(conf);
        let namespaces = NamespaceStore::new(factory, false);

        // eagerly load the default namespace when namespaces are disabled
        if self.disable_namespaces {
            namespaces
                .create(NamespaceName::default(), namespace::RestoreOption::Latest)
                .await?;
        }

        if let Some(config) = self.rpc_config.take() {
            let proxy_service =
                ProxyService::new(namespaces.clone(), None, self.disable_namespaces);
            // Garbage collect proxy clients every 30 seconds
            self.join_set.spawn({
                let clients = proxy_service.clients();
                async move {
                    loop {
                        tokio::time::sleep(Duration::from_secs(30)).await;
                        rpc::proxy::garbage_collect(&mut *clients.write().await).await;
                    }
                }
            });
            self.join_set.spawn(run_rpc_server(
                proxy_service,
                config.acceptor,
                config.tls_config,
                self.idle_shutdown_kicker.clone(),
                namespaces.clone(),
                self.disable_namespaces,
            ));
        }

        let logger_service = ReplicationLogService::new(
            namespaces.clone(),
            self.idle_shutdown_kicker,
            Some(self.auth.clone()),
            self.disable_namespaces,
        );
        let proxy_service =
            ProxyService::new(namespaces.clone(), Some(self.auth), self.disable_namespaces);
        // Garbage collect proxy clients every 30 seconds
        self.join_set.spawn({
            let clients = proxy_service.clients();
            async move {
                loop {
                    tokio::time::sleep(Duration::from_secs(30)).await;
                    rpc::proxy::garbage_collect(&mut *clients.write().await).await;
                }
            }
        });

        Ok((namespaces, proxy_service, logger_service))
    }
}
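
// Builder for replica-mode services: a replica replicates namespaces from the
// primary over the configured RPC channel and proxies writes and
// replication-log requests upstream.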
struct Replica<C> {
    rpc_config: RpcClientConfig<C>,
    stats_sender: StatsSender,
    extensions: Arc<[PathBuf]>,
    db_config: DbConfig,
    base_path: Arc<Path>,
    auth: Arc<Auth>,
}

impl<C: Connector> Replica<C> {
    async fn configure(
        self,
    ) -> anyhow::Result<(
        NamespaceStore<impl MakeNamespace>,
        impl Proxy,
        impl ReplicationLog,
    )> {
        let (channel, uri) = self.rpc_config.configure().await?;

        let conf = ReplicaNamespaceConfig {
            channel: channel.clone(),
            uri: uri.clone(),
            extensions: self.extensions.clone(),
            stats_sender: self.stats_sender.clone(),
            base_path: self.base_path,
            max_response_size: self.db_config.max_response_size,
            max_total_response_size: self.db_config.max_total_response_size,
        };

        let factory = ReplicaNamespaceMaker::new(conf);
        let namespaces = NamespaceStore::new(factory, true);
        let replication_service = ReplicationLogProxyService::new(channel.clone(), uri.clone());
        let proxy_service = ReplicaProxyService::new(channel, uri, self.auth.clone());

        Ok((namespaces, proxy_service, replication_service))
    }
}