mirror of
https://github.com/tursodatabase/libsql.git
synced 2025-05-13 15:13:08 +00:00
718 lines
26 KiB
Rust
718 lines
26 KiB
Rust
use std::env;
|
|
use std::fs::OpenOptions;
|
|
use std::io::{stdout, Write};
|
|
use std::net::SocketAddr;
|
|
use std::path::{Path, PathBuf};
|
|
use std::sync::Arc;
|
|
|
|
use anyhow::{bail, Context as _, Result};
|
|
use bytesize::ByteSize;
|
|
use clap::Parser;
|
|
use hyper::client::HttpConnector;
|
|
use libsql_server::auth::{parse_http_basic_auth_arg, parse_jwt_key, user_auth_strategies, Auth};
|
|
// use mimalloc::MiMalloc;
|
|
use tokio::sync::Notify;
|
|
use tokio::time::Duration;
|
|
use tracing_subscriber::prelude::*;
|
|
use tracing_subscriber::util::SubscriberInitExt;
|
|
use tracing_subscriber::Layer;
|
|
|
|
use libsql_server::config::{
|
|
AdminApiConfig, BottomlessConfig, DbConfig, HeartbeatConfig, MetaStoreConfig, RpcClientConfig,
|
|
RpcServerConfig, TlsConfig, UserApiConfig,
|
|
};
|
|
use libsql_server::net::AddrIncoming;
|
|
use libsql_server::Server;
|
|
use libsql_server::{connection::dump::exporter::export_dump, version::Version};
|
|
use libsql_sys::{Cipher, EncryptionConfig};
|
|
|
|
// Use system allocator for now, seems like we are getting too much fragmentation.
|
|
// #[global_allocator]
|
|
// static GLOBAL: MiMalloc = MiMalloc;
|
|
|
|
/// SQL daemon
|
|
#[derive(Debug, Parser)]
|
|
#[command(name = "sqld")]
|
|
#[command(about = "SQL daemon", version = Version::default(), long_about = None)]
|
|
struct Cli {
|
|
#[clap(long, short, default_value = "data.sqld", env = "SQLD_DB_PATH")]
|
|
db_path: PathBuf,
|
|
|
|
/// The directory path where trusted extensions can be loaded from.
|
|
/// If not present, extension loading is disabled.
|
|
/// If present, the directory is expected to have a trusted.lst file containing the sha256 and name of each extension, one per line. Example:
|
|
///
|
|
/// 99890762817735984843bf5cf02a4b2ea648018fd05f04df6f9ce7f976841510 math.dylib
|
|
#[clap(long, short)]
|
|
extensions_path: Option<PathBuf>,
|
|
|
|
#[clap(long, default_value = "127.0.0.1:8080", env = "SQLD_HTTP_LISTEN_ADDR")]
|
|
http_listen_addr: SocketAddr,
|
|
#[clap(long)]
|
|
enable_http_console: bool,
|
|
|
|
/// Address and port for the legacy, Web-Socket-only Hrana server.
|
|
#[clap(long, short = 'l', env = "SQLD_HRANA_LISTEN_ADDR")]
|
|
hrana_listen_addr: Option<SocketAddr>,
|
|
|
|
/// The address and port for the admin HTTP API.
|
|
#[clap(long, env = "SQLD_ADMIN_LISTEN_ADDR")]
|
|
admin_listen_addr: Option<SocketAddr>,
|
|
|
|
/// Path to a file with a JWT decoding key used to authenticate clients in the Hrana and HTTP
|
|
/// APIs. The key is either a PKCS#8-encoded Ed25519 public key in PEM, or just plain bytes of
|
|
/// the Ed25519 public key in URL-safe base64.
|
|
///
|
|
/// You can also pass the key directly in the env variable SQLD_AUTH_JWT_KEY.
|
|
#[clap(long, env = "SQLD_AUTH_JWT_KEY_FILE")]
|
|
auth_jwt_key_file: Option<PathBuf>,
|
|
/// Specifies legacy HTTP basic authentication. The argument must be in format "basic:$PARAM",
|
|
/// where $PARAM is base64-encoded string "$USERNAME:$PASSWORD".
|
|
#[clap(long, env = "SQLD_HTTP_AUTH")]
|
|
http_auth: Option<String>,
|
|
/// URL that points to the HTTP API of this server. If set, this is used to implement "sticky
|
|
/// sessions" in Hrana over HTTP.
|
|
#[clap(long, env = "SQLD_HTTP_SELF_URL")]
|
|
http_self_url: Option<String>,
|
|
|
|
/// The address and port the inter-node RPC protocol listens to. Example: `0.0.0.0:5001`.
|
|
#[clap(
|
|
long,
|
|
conflicts_with = "primary_grpc_url",
|
|
env = "SQLD_GRPC_LISTEN_ADDR"
|
|
)]
|
|
grpc_listen_addr: Option<SocketAddr>,
|
|
#[clap(
|
|
long,
|
|
requires = "grpc_cert_file",
|
|
requires = "grpc_key_file",
|
|
requires = "grpc_ca_cert_file"
|
|
)]
|
|
grpc_tls: bool,
|
|
#[clap(long)]
|
|
grpc_cert_file: Option<PathBuf>,
|
|
#[clap(long)]
|
|
grpc_key_file: Option<PathBuf>,
|
|
#[clap(long)]
|
|
grpc_ca_cert_file: Option<PathBuf>,
|
|
|
|
/// The gRPC URL of the primary node to connect to for writes. Example: `http://localhost:5001`.
|
|
#[clap(long, env = "SQLD_PRIMARY_GRPC_URL")]
|
|
primary_grpc_url: Option<String>,
|
|
#[clap(
|
|
long,
|
|
requires = "primary_grpc_cert_file",
|
|
requires = "primary_grpc_key_file",
|
|
requires = "primary_grpc_ca_cert_file"
|
|
)]
|
|
primary_grpc_tls: bool,
|
|
#[clap(long)]
|
|
primary_grpc_cert_file: Option<PathBuf>,
|
|
#[clap(long)]
|
|
primary_grpc_key_file: Option<PathBuf>,
|
|
#[clap(long)]
|
|
primary_grpc_ca_cert_file: Option<PathBuf>,
|
|
|
|
/// Don't display welcome message
|
|
#[clap(long)]
|
|
no_welcome: bool,
|
|
#[clap(long, env = "SQLD_ENABLE_BOTTOMLESS_REPLICATION")]
|
|
enable_bottomless_replication: bool,
|
|
/// The duration, in second, after which to shutdown the server if no request have been
|
|
/// received.
|
|
/// By default, the server doesn't shutdown when idle.
|
|
#[clap(long, env = "SQLD_IDLE_SHUTDOWN_TIMEOUT_S")]
|
|
idle_shutdown_timeout_s: Option<u64>,
|
|
|
|
/// Like idle_shutdown_timeout_s but used only once after the server is started.
|
|
/// After that server either is shut down because it does not receive any requests
|
|
/// or idle_shutdown_timeout_s is used moving forward.
|
|
#[clap(long, env = "SQLD_INITIAL_IDLE_SHUTDOWN_TIMEOUT_S")]
|
|
initial_idle_shutdown_timeout_s: Option<u64>,
|
|
|
|
/// Maximum size the replication log is allowed to grow (in MB).
|
|
/// defaults to 200MB.
|
|
#[clap(long, env = "SQLD_MAX_LOG_SIZE", default_value = "200")]
|
|
max_log_size: u64,
|
|
/// Maximum duration before the replication log is compacted (in seconds).
|
|
/// By default, the log is compacted only if it grows above the limit specified with
|
|
/// `--max-log-size`.
|
|
#[clap(long, env = "SQLD_MAX_LOG_DURATION")]
|
|
max_log_duration: Option<f32>,
|
|
|
|
#[clap(subcommand)]
|
|
utils: Option<UtilsSubcommands>,
|
|
|
|
/// The URL to send a server heartbeat `POST` request to.
|
|
/// By default, the server doesn't send a heartbeat.
|
|
#[clap(long, env = "SQLD_HEARTBEAT_URL")]
|
|
heartbeat_url: Option<String>,
|
|
|
|
/// The HTTP "Authornization" header to include in the a server heartbeat
|
|
/// `POST` request.
|
|
/// By default, the server doesn't send a heartbeat.
|
|
#[clap(long, env = "SQLD_HEARTBEAT_AUTH")]
|
|
heartbeat_auth: Option<String>,
|
|
|
|
/// The heartbeat time period in seconds.
|
|
/// By default, the the period is 30 seconds.
|
|
#[clap(long, env = "SQLD_HEARTBEAT_PERIOD_S", default_value = "30")]
|
|
heartbeat_period_s: u64,
|
|
|
|
/// Soft heap size limit in mebibytes - libSQL will try to not go over this limit with memory usage.
|
|
#[clap(long, env = "SQLD_SOFT_HEAP_LIMIT_MB")]
|
|
soft_heap_limit_mb: Option<usize>,
|
|
|
|
/// Hard heap size limit in mebibytes - libSQL will bail out with SQLITE_NOMEM error
|
|
/// if it goes over this limit with memory usage.
|
|
#[clap(long, env = "SQLD_HARD_HEAP_LIMIT_MB")]
|
|
hard_heap_limit_mb: Option<usize>,
|
|
|
|
/// Set the maximum size for a response. e.g 5KB, 10MB...
|
|
#[clap(long, env = "SQLD_MAX_RESPONSE_SIZE", default_value = "10MB")]
|
|
max_response_size: ByteSize,
|
|
|
|
/// Set the maximum size for all responses. e.g 5KB, 10MB...
|
|
#[clap(long, env = "SQLD_MAX_TOTAL_RESPONSE_SIZE", default_value = "32MB")]
|
|
max_total_response_size: ByteSize,
|
|
|
|
/// Set a command to execute when a snapshot file is generated.
|
|
#[clap(long, env = "SQLD_SNAPSHOT_EXEC")]
|
|
snapshot_exec: Option<String>,
|
|
|
|
/// Interval in seconds, in which WAL checkpoint is being called.
|
|
/// By default, the interval is 1 hour.
|
|
#[clap(long, env = "SQLD_CHECKPOINT_INTERVAL_S")]
|
|
checkpoint_interval_s: Option<u64>,
|
|
|
|
/// By default, all request for which a namespace can't be determined fallaback to the default
|
|
/// namespace `default`. This flag disables that.
|
|
#[clap(long)]
|
|
disable_default_namespace: bool,
|
|
|
|
/// Enable the namespaces features. Namespaces are disabled by default, and all requests target
|
|
/// the default namespace.
|
|
#[clap(long)]
|
|
enable_namespaces: bool,
|
|
|
|
/// Enable snapshot at shutdown
|
|
#[clap(long)]
|
|
snapshot_at_shutdown: bool,
|
|
|
|
/// Max active namespaces kept in-memory
|
|
#[clap(long, env = "SQLD_MAX_ACTIVE_NAMESPACES", default_value = "100")]
|
|
max_active_namespaces: usize,
|
|
|
|
/// Enable backup for the metadata store
|
|
#[clap(long, env = "SQLD_BACKUP_META_STORE")]
|
|
backup_meta_store: bool,
|
|
/// S3 access key ID for the meta store backup
|
|
#[clap(long, env = "SQLD_META_STORE_ACCESS_KEY_ID")]
|
|
meta_store_access_key_id: Option<String>,
|
|
/// S3 secret access key for the meta store backup
|
|
#[clap(long, env = "SQLD_META_STORE_SECRET_ACCESS")]
|
|
meta_store_secret_access_key: Option<String>,
|
|
/// S3 session token for the meta store backup
|
|
#[clap(long, env = "SQLD_META_STORE_SESSION_TOKEN")]
|
|
meta_store_session_token: Option<String>,
|
|
/// S3 region for the metastore backup
|
|
#[clap(long, env = "SQLD_META_STORE_REGION")]
|
|
meta_store_region: Option<String>,
|
|
/// Id for the meta store backup
|
|
#[clap(long, env = "SQLD_META_STORE_BACKUP_ID")]
|
|
meta_store_backup_id: Option<String>,
|
|
/// S3 bucket name for the meta store backup
|
|
#[clap(long, env = "SQLD_META_STORE_BUCKET_NAME")]
|
|
meta_store_bucket_name: Option<String>,
|
|
/// Interval at which to perform backups of the meta store
|
|
#[clap(long, env = "SQLD_META_STORE_BACKUP_INTERVAL_S")]
|
|
meta_store_backup_interval_s: Option<usize>,
|
|
/// S3 endpoint for the meta store backups
|
|
#[clap(long, env = "SQLD_META_STORE_BUCKET_ENDPOINT")]
|
|
meta_store_bucket_endpoint: Option<String>,
|
|
|
|
/// encryption_key for encryption at rest
|
|
#[clap(long, env = "SQLD_ENCRYPTION_KEY")]
|
|
encryption_key: Option<bytes::Bytes>,
|
|
|
|
#[clap(long, default_value = "128", env = "SQLD_MAX_CONCURRENT_CONNECTIONS")]
|
|
max_concurrent_connections: usize,
|
|
// max number of concurrent requests across all connections
|
|
#[clap(long, default_value = "128", env = "SQLD_MAX_CONCURRENT_REQUESTS")]
|
|
max_concurrent_requests: u64,
|
|
|
|
/// Allow meta store to recover config from filesystem from older version, if meta store is
|
|
/// empty on startup
|
|
#[clap(long, env = "SQLD_ALLOW_METASTORE_RECOVERY")]
|
|
allow_metastore_recovery: bool,
|
|
|
|
/// Shutdown timeout duration in seconds, defaults to 30 seconds.
|
|
#[clap(long, env = "SQLD_SHUTDOWN_TIMEOUT")]
|
|
shutdown_timeout: Option<u64>,
|
|
|
|
#[clap(long)]
|
|
use_libsql_wal: bool,
|
|
}
|
|
|
|
#[derive(clap::Subcommand, Debug)]
|
|
enum UtilsSubcommands {
|
|
Dump {
|
|
#[clap(long)]
|
|
/// Path at which to write the dump
|
|
path: Option<PathBuf>,
|
|
#[clap(long)]
|
|
namespace: String,
|
|
},
|
|
}
|
|
|
|
impl Cli {
|
|
#[rustfmt::skip]
|
|
fn print_welcome_message(&self) {
|
|
// no welcome :'(
|
|
if self.no_welcome { return }
|
|
|
|
eprintln!(r#" _ _ "#);
|
|
eprintln!(r#" ___ __ _| | __| |"#);
|
|
eprintln!(r#"/ __|/ _` | |/ _` |"#);
|
|
eprintln!(r#"\__ \ (_| | | (_| |"#);
|
|
eprintln!(r#"|___/\__, |_|\__,_|"#);
|
|
eprintln!(r#" |_| "#);
|
|
|
|
eprintln!();
|
|
eprintln!("Welcome to sqld!");
|
|
eprintln!();
|
|
eprintln!("version: {}", env!("CARGO_PKG_VERSION"));
|
|
if env!("VERGEN_GIT_SHA") != "VERGEN_IDEMPOTENT_OUTPUT" {
|
|
eprintln!("commit SHA: {}", env!("VERGEN_GIT_SHA"));
|
|
}
|
|
eprintln!("build date: {}", env!("VERGEN_BUILD_DATE"));
|
|
eprintln!();
|
|
eprintln!("This software is in BETA version.");
|
|
eprintln!("If you encounter any bug, please open an issue at https://github.com/tursodatabase/libsql/issues");
|
|
eprintln!();
|
|
|
|
eprintln!("config:");
|
|
|
|
eprint!("\t- mode: ");
|
|
match (&self.grpc_listen_addr, &self.primary_grpc_url) {
|
|
(None, None) => eprintln!("standalone"),
|
|
(Some(addr), None) => eprintln!("primary ({addr})"),
|
|
(None, Some(url)) => eprintln!("replica (primary at {url})"),
|
|
_ => unreachable!("invalid configuration!"),
|
|
};
|
|
eprintln!("\t- database path: {}", self.db_path.display());
|
|
let extensions_str = self.extensions_path.clone().map_or("<disabled>".to_string(), |x| x.display().to_string());
|
|
eprintln!("\t- extensions path: {extensions_str}");
|
|
eprintln!("\t- listening for HTTP requests on: {}", self.http_listen_addr);
|
|
eprintln!("\t- grpc_tls: {}", if self.grpc_tls { "yes" } else { "no" });
|
|
#[cfg(feature = "encryption")]
|
|
eprintln!("\t- encryption at rest: {}", if self.encryption_key.is_some() { "enabled" } else { "disabled" });
|
|
}
|
|
}
|
|
|
|
fn perform_dump(dump_path: Option<&Path>, db_path: &Path) -> anyhow::Result<()> {
|
|
let out: Box<dyn Write> = match dump_path {
|
|
Some(path) => {
|
|
let f = OpenOptions::new()
|
|
.create_new(true)
|
|
.write(true)
|
|
.open(path)
|
|
.with_context(|| format!("file `{}` already exists", path.display()))?;
|
|
Box::new(f)
|
|
}
|
|
None => Box::new(stdout()),
|
|
};
|
|
let conn = if cfg!(feature = "unix-excl-vfs") {
|
|
rusqlite::Connection::open_with_flags_and_vfs(
|
|
db_path.join("data"),
|
|
rusqlite::OpenFlags::default(),
|
|
"unix-excl",
|
|
)
|
|
} else {
|
|
rusqlite::Connection::open(db_path.join("data"))
|
|
}?;
|
|
|
|
export_dump(conn, out)?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[cfg(feature = "debug-tools")]
|
|
fn enable_libsql_logging() {
|
|
use std::ffi::c_int;
|
|
use std::sync::Once;
|
|
static ONCE: Once = Once::new();
|
|
|
|
fn libsql_log(code: c_int, msg: &str) {
|
|
tracing::error!("sqlite error {code}: {msg}");
|
|
}
|
|
|
|
ONCE.call_once(|| unsafe {
|
|
rusqlite::trace::config_log(Some(libsql_log)).unwrap();
|
|
});
|
|
}
|
|
|
|
fn make_db_config(config: &Cli) -> anyhow::Result<DbConfig> {
|
|
let encryption_config = config.encryption_key.as_ref().map(|key| EncryptionConfig {
|
|
cipher: Cipher::Aes256Cbc,
|
|
encryption_key: key.clone(),
|
|
});
|
|
let mut bottomless_replication = config
|
|
.enable_bottomless_replication
|
|
.then(bottomless::replicator::Options::from_env)
|
|
.transpose()?;
|
|
// Inherit encryption key for bottomless from the db config, if not specified.
|
|
if let Some(ref mut bottomless_replication) = bottomless_replication {
|
|
if bottomless_replication.encryption_config.is_none() {
|
|
bottomless_replication.encryption_config = encryption_config.clone();
|
|
}
|
|
}
|
|
Ok(DbConfig {
|
|
extensions_path: config.extensions_path.clone().map(Into::into),
|
|
bottomless_replication,
|
|
max_log_size: config.max_log_size,
|
|
max_log_duration: config.max_log_duration,
|
|
soft_heap_limit_mb: config.soft_heap_limit_mb,
|
|
hard_heap_limit_mb: config.hard_heap_limit_mb,
|
|
max_response_size: config.max_response_size.as_u64(),
|
|
max_total_response_size: config.max_total_response_size.as_u64(),
|
|
snapshot_exec: config.snapshot_exec.clone(),
|
|
checkpoint_interval: config.checkpoint_interval_s.map(Duration::from_secs),
|
|
snapshot_at_shutdown: config.snapshot_at_shutdown,
|
|
encryption_config: encryption_config.clone(),
|
|
max_concurrent_requests: config.max_concurrent_requests,
|
|
})
|
|
}
|
|
|
|
async fn make_user_auth_strategy(config: &Cli) -> anyhow::Result<Auth> {
|
|
if let Some(http_auth) = config.http_auth.as_deref() {
|
|
tracing::info!("Using legacy HTTP basic authentication");
|
|
|
|
let credential =
|
|
parse_http_basic_auth_arg(http_auth)?.expect("Invalid HTTP Basic configuration");
|
|
|
|
return Ok(Auth::new(user_auth_strategies::HttpBasic::new(
|
|
credential.into(),
|
|
)));
|
|
}
|
|
|
|
let auth_jwt_key = if let Some(ref file_path) = config.auth_jwt_key_file {
|
|
let data = tokio::fs::read_to_string(file_path)
|
|
.await
|
|
.context("Could not read file with JWT key")?;
|
|
Some(data)
|
|
} else {
|
|
match env::var("SQLD_AUTH_JWT_KEY") {
|
|
Ok(key) => Some(key),
|
|
Err(env::VarError::NotPresent) => None,
|
|
Err(env::VarError::NotUnicode(_)) => {
|
|
bail!("Env variable SQLD_AUTH_JWT_KEY does not contain a valid Unicode value")
|
|
}
|
|
}
|
|
};
|
|
|
|
if let Some(jwt_key) = auth_jwt_key.as_deref() {
|
|
let jwt_key: jsonwebtoken::DecodingKey =
|
|
parse_jwt_key(jwt_key).context("Could not parse JWT decoding key")?;
|
|
tracing::info!("Using JWT-based authentication");
|
|
return Ok(Auth::new(user_auth_strategies::Jwt::new(jwt_key)));
|
|
}
|
|
|
|
Ok(Auth::new(user_auth_strategies::Disabled::new()))
|
|
}
|
|
|
|
async fn make_user_api_config(config: &Cli) -> anyhow::Result<UserApiConfig> {
|
|
let http_acceptor =
|
|
AddrIncoming::new(tokio::net::TcpListener::bind(config.http_listen_addr).await?);
|
|
tracing::info!(
|
|
"listening for incoming user HTTP connection on {}",
|
|
config.http_listen_addr
|
|
);
|
|
|
|
let hrana_ws_acceptor = match config.hrana_listen_addr {
|
|
Some(addr) => {
|
|
let incoming = AddrIncoming::new(tokio::net::TcpListener::bind(addr).await?);
|
|
|
|
tracing::info!(
|
|
"listening for incoming user hrana websocket connection on {}",
|
|
addr
|
|
);
|
|
|
|
Some(incoming)
|
|
}
|
|
None => None,
|
|
};
|
|
|
|
let auth_strategy = make_user_auth_strategy(&config).await?;
|
|
|
|
Ok(UserApiConfig {
|
|
http_acceptor: Some(http_acceptor),
|
|
hrana_ws_acceptor,
|
|
enable_http_console: config.enable_http_console,
|
|
self_url: config.http_self_url.clone(),
|
|
auth_strategy,
|
|
})
|
|
}
|
|
|
|
async fn make_admin_api_config(config: &Cli) -> anyhow::Result<Option<AdminApiConfig>> {
|
|
match config.admin_listen_addr {
|
|
Some(addr) => {
|
|
let acceptor = AddrIncoming::new(tokio::net::TcpListener::bind(addr).await?);
|
|
|
|
tracing::info!("listening for incoming adming HTTP connection on {}", addr);
|
|
let connector = hyper_rustls::HttpsConnectorBuilder::new()
|
|
.with_native_roots()
|
|
.https_or_http()
|
|
.enable_http1()
|
|
.build();
|
|
|
|
Ok(Some(AdminApiConfig {
|
|
acceptor,
|
|
connector,
|
|
disable_metrics: false,
|
|
}))
|
|
}
|
|
None => Ok(None),
|
|
}
|
|
}
|
|
|
|
async fn make_rpc_server_config(config: &Cli) -> anyhow::Result<Option<RpcServerConfig>> {
|
|
match config.grpc_listen_addr {
|
|
Some(addr) => {
|
|
let acceptor = AddrIncoming::new(tokio::net::TcpListener::bind(addr).await?);
|
|
|
|
tracing::info!("listening for incoming gRPC connection on {}", addr);
|
|
|
|
let tls_config = if config.grpc_tls {
|
|
Some(TlsConfig {
|
|
cert: config
|
|
.grpc_cert_file
|
|
.clone()
|
|
.context("server tls is enabled but cert file is missing")?,
|
|
key: config
|
|
.grpc_key_file
|
|
.clone()
|
|
.context("server tls is enabled but key file is missing")?,
|
|
ca_cert: config
|
|
.grpc_ca_cert_file
|
|
.clone()
|
|
.context("server tls is enabled but ca_cert file is missing")?,
|
|
})
|
|
} else {
|
|
None
|
|
};
|
|
|
|
Ok(Some(RpcServerConfig {
|
|
acceptor,
|
|
tls_config,
|
|
}))
|
|
}
|
|
None => Ok(None),
|
|
}
|
|
}
|
|
|
|
async fn make_rpc_client_config(config: &Cli) -> anyhow::Result<Option<RpcClientConfig>> {
|
|
match config.primary_grpc_url {
|
|
Some(ref url) => {
|
|
let mut connector = HttpConnector::new();
|
|
connector.enforce_http(false);
|
|
connector.set_nodelay(true);
|
|
let tls_config = if config.primary_grpc_tls {
|
|
Some(TlsConfig {
|
|
cert: config
|
|
.primary_grpc_cert_file
|
|
.clone()
|
|
.context("client tls is enabled but cert file is missing")?,
|
|
key: config
|
|
.primary_grpc_key_file
|
|
.clone()
|
|
.context("client tls is enabled but key file is missing")?,
|
|
ca_cert: config
|
|
.primary_grpc_ca_cert_file
|
|
.clone()
|
|
.context("client tls is enabled but ca_cert file is missing")?,
|
|
})
|
|
} else {
|
|
None
|
|
};
|
|
|
|
Ok(Some(RpcClientConfig {
|
|
remote_url: url.clone(),
|
|
connector,
|
|
tls_config,
|
|
}))
|
|
}
|
|
None => Ok(None),
|
|
}
|
|
}
|
|
|
|
fn make_hearbeat_config(config: &Cli) -> Option<HeartbeatConfig> {
|
|
Some(HeartbeatConfig {
|
|
heartbeat_url: config.heartbeat_url.clone(),
|
|
heartbeat_period: Duration::from_secs(config.heartbeat_period_s),
|
|
heartbeat_auth: config.heartbeat_auth.clone(),
|
|
})
|
|
}
|
|
|
|
async fn shutdown_signal() -> Result<&'static str> {
|
|
use tokio::signal::unix::{signal, SignalKind};
|
|
|
|
let mut int = signal(SignalKind::interrupt())?;
|
|
let mut term = signal(SignalKind::terminate())?;
|
|
|
|
let signal = tokio::select! {
|
|
_ = int.recv() => "SIGINT",
|
|
_ = term.recv() => "SIGTERM",
|
|
};
|
|
|
|
Ok(signal)
|
|
}
|
|
|
|
fn make_meta_store_config(config: &Cli) -> anyhow::Result<MetaStoreConfig> {
|
|
let bottomless = if config.backup_meta_store {
|
|
Some(BottomlessConfig {
|
|
access_key_id: config
|
|
.meta_store_access_key_id
|
|
.clone()
|
|
.context("missing meta store bucket access key id")?,
|
|
secret_access_key: config
|
|
.meta_store_secret_access_key
|
|
.clone()
|
|
.context("missing meta store bucket secret access key")?,
|
|
session_token: config.meta_store_session_token.clone(),
|
|
region: config
|
|
.meta_store_region
|
|
.clone()
|
|
.context("missing meta store bucket region")?,
|
|
backup_id: config
|
|
.meta_store_backup_id
|
|
.clone()
|
|
.context("missing meta store backup id")?,
|
|
bucket_name: config
|
|
.meta_store_bucket_name
|
|
.clone()
|
|
.context("missing meta store bucket name")?,
|
|
backup_interval: Duration::from_secs(
|
|
config
|
|
.meta_store_backup_interval_s
|
|
.context("missing meta store backup internal")? as _,
|
|
),
|
|
bucket_endpoint: config
|
|
.meta_store_bucket_endpoint
|
|
.clone()
|
|
.context("missing meta store bucket name")?,
|
|
})
|
|
} else {
|
|
None
|
|
};
|
|
|
|
Ok(MetaStoreConfig {
|
|
bottomless,
|
|
allow_recover_from_fs: config.allow_metastore_recovery,
|
|
})
|
|
}
|
|
|
|
async fn build_server(config: &Cli) -> anyhow::Result<Server> {
|
|
let db_config = make_db_config(config)?;
|
|
let user_api_config = make_user_api_config(config).await?;
|
|
let admin_api_config = make_admin_api_config(config).await?;
|
|
let rpc_server_config = make_rpc_server_config(config).await?;
|
|
let rpc_client_config = make_rpc_client_config(config).await?;
|
|
let heartbeat_config = make_hearbeat_config(config);
|
|
let meta_store_config = make_meta_store_config(config)?;
|
|
|
|
let shutdown = Arc::new(Notify::new());
|
|
tokio::spawn({
|
|
let shutdown = shutdown.clone();
|
|
async move {
|
|
loop {
|
|
let signal = shutdown_signal()
|
|
.await
|
|
.expect("Failed to registry shutdown signals");
|
|
|
|
tracing::info!(
|
|
"Got {} shutdown signal, gracefully shutting down...this may take some time",
|
|
signal
|
|
);
|
|
|
|
shutdown.notify_waiters();
|
|
}
|
|
}
|
|
});
|
|
|
|
Ok(Server {
|
|
path: config.db_path.clone().into(),
|
|
db_config,
|
|
user_api_config,
|
|
admin_api_config,
|
|
rpc_server_config,
|
|
rpc_client_config,
|
|
heartbeat_config,
|
|
idle_shutdown_timeout: config.idle_shutdown_timeout_s.map(Duration::from_secs),
|
|
initial_idle_shutdown_timeout: config
|
|
.initial_idle_shutdown_timeout_s
|
|
.map(Duration::from_secs),
|
|
disable_default_namespace: config.disable_default_namespace,
|
|
disable_namespaces: !config.enable_namespaces,
|
|
shutdown,
|
|
max_active_namespaces: config.max_active_namespaces,
|
|
meta_store_config,
|
|
max_concurrent_connections: config.max_concurrent_connections,
|
|
shutdown_timeout: config
|
|
.shutdown_timeout
|
|
.map(Duration::from_secs)
|
|
.unwrap_or(Duration::from_secs(30)),
|
|
use_libsql_wal: config.use_libsql_wal,
|
|
})
|
|
}
|
|
|
|
#[tokio::main]
|
|
async fn main() -> Result<()> {
|
|
if std::env::var("RUST_LOG").is_err() {
|
|
std::env::set_var("RUST_LOG", "info");
|
|
}
|
|
|
|
let registry = tracing_subscriber::registry();
|
|
|
|
#[cfg(feature = "debug-tools")]
|
|
let registry = registry.with(console_subscriber::spawn());
|
|
|
|
#[cfg(feature = "debug-tools")]
|
|
enable_libsql_logging();
|
|
|
|
registry
|
|
.with(
|
|
tracing_subscriber::fmt::layer()
|
|
.with_ansi(false)
|
|
.with_filter(tracing_subscriber::EnvFilter::from_default_env()),
|
|
)
|
|
.init();
|
|
|
|
let args = Cli::parse();
|
|
|
|
match args.utils {
|
|
Some(UtilsSubcommands::Dump { path, namespace }) => {
|
|
if let Some(ref path) = path {
|
|
eprintln!(
|
|
"Dumping database {} to {}",
|
|
args.db_path.display(),
|
|
path.display()
|
|
);
|
|
}
|
|
let db_path = args.db_path.join("dbs").join(&namespace);
|
|
if !db_path.exists() {
|
|
bail!("no database for namespace `{namespace}`");
|
|
}
|
|
|
|
perform_dump(path.as_deref(), &db_path)
|
|
}
|
|
None => {
|
|
args.print_welcome_message();
|
|
let server = build_server(&args).await?;
|
|
server.start().await?;
|
|
|
|
Ok(())
|
|
}
|
|
}
|
|
}
|