0
0
mirror of https://github.com/tursodatabase/libsql.git synced 2025-05-13 15:13:08 +00:00
Files
libsql/libsql-server/src/main.rs
ad hoc 5129bdb2a3 libsq wal integration (#1427)
* integrate libsql-wal

* fix wal bug

* shutdown registry on shutdown

* fmt
2024-06-03 09:30:40 +00:00

718 lines
26 KiB
Rust

use std::env;
use std::fs::OpenOptions;
use std::io::{stdout, Write};
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use anyhow::{bail, Context as _, Result};
use bytesize::ByteSize;
use clap::Parser;
use hyper::client::HttpConnector;
use libsql_server::auth::{parse_http_basic_auth_arg, parse_jwt_key, user_auth_strategies, Auth};
// use mimalloc::MiMalloc;
use tokio::sync::Notify;
use tokio::time::Duration;
use tracing_subscriber::prelude::*;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::Layer;
use libsql_server::config::{
AdminApiConfig, BottomlessConfig, DbConfig, HeartbeatConfig, MetaStoreConfig, RpcClientConfig,
RpcServerConfig, TlsConfig, UserApiConfig,
};
use libsql_server::net::AddrIncoming;
use libsql_server::Server;
use libsql_server::{connection::dump::exporter::export_dump, version::Version};
use libsql_sys::{Cipher, EncryptionConfig};
// Use system allocator for now, seems like we are getting too much fragmentation.
// #[global_allocator]
// static GLOBAL: MiMalloc = MiMalloc;
/// SQL daemon
// NOTE: the `///` comments on the fields below are user-facing: clap's derive uses them as the
// `--help` text of each option. Maintainer-only notes are therefore written as plain `//`.
#[derive(Debug, Parser)]
#[command(name = "sqld")]
#[command(about = "SQL daemon", version = Version::default(), long_about = None)]
struct Cli {
// Root data directory; `main` resolves a namespace's database to `<db_path>/dbs/<namespace>`.
#[clap(long, short, default_value = "data.sqld", env = "SQLD_DB_PATH")]
db_path: PathBuf,
/// The directory path where trusted extensions can be loaded from.
/// If not present, extension loading is disabled.
/// If present, the directory is expected to have a trusted.lst file containing the sha256 and name of each extension, one per line. Example:
///
/// 99890762817735984843bf5cf02a4b2ea648018fd05f04df6f9ce7f976841510 math.dylib
#[clap(long, short)]
extensions_path: Option<PathBuf>,
#[clap(long, default_value = "127.0.0.1:8080", env = "SQLD_HTTP_LISTEN_ADDR")]
http_listen_addr: SocketAddr,
#[clap(long)]
enable_http_console: bool,
/// Address and port for the legacy, Web-Socket-only Hrana server.
#[clap(long, short = 'l', env = "SQLD_HRANA_LISTEN_ADDR")]
hrana_listen_addr: Option<SocketAddr>,
/// The address and port for the admin HTTP API.
#[clap(long, env = "SQLD_ADMIN_LISTEN_ADDR")]
admin_listen_addr: Option<SocketAddr>,
/// Path to a file with a JWT decoding key used to authenticate clients in the Hrana and HTTP
/// APIs. The key is either a PKCS#8-encoded Ed25519 public key in PEM, or just plain bytes of
/// the Ed25519 public key in URL-safe base64.
///
/// You can also pass the key directly in the env variable SQLD_AUTH_JWT_KEY.
#[clap(long, env = "SQLD_AUTH_JWT_KEY_FILE")]
auth_jwt_key_file: Option<PathBuf>,
/// Specifies legacy HTTP basic authentication. The argument must be in format "basic:$PARAM",
/// where $PARAM is base64-encoded string "$USERNAME:$PASSWORD".
// When set, `make_user_auth_strategy` uses it in preference to any JWT key.
#[clap(long, env = "SQLD_HTTP_AUTH")]
http_auth: Option<String>,
/// URL that points to the HTTP API of this server. If set, this is used to implement "sticky
/// sessions" in Hrana over HTTP.
#[clap(long, env = "SQLD_HTTP_SELF_URL")]
http_self_url: Option<String>,
/// The address and port the inter-node RPC protocol listens to. Example: `0.0.0.0:5001`.
// Mutually exclusive with `--primary-grpc-url`: a node is either a primary or a replica.
#[clap(
long,
conflicts_with = "primary_grpc_url",
env = "SQLD_GRPC_LISTEN_ADDR"
)]
grpc_listen_addr: Option<SocketAddr>,
// Turns on TLS for the gRPC listener; clap requires the three `grpc_*_file` options with it.
#[clap(
long,
requires = "grpc_cert_file",
requires = "grpc_key_file",
requires = "grpc_ca_cert_file"
)]
grpc_tls: bool,
#[clap(long)]
grpc_cert_file: Option<PathBuf>,
#[clap(long)]
grpc_key_file: Option<PathBuf>,
#[clap(long)]
grpc_ca_cert_file: Option<PathBuf>,
/// The gRPC URL of the primary node to connect to for writes. Example: `http://localhost:5001`.
#[clap(long, env = "SQLD_PRIMARY_GRPC_URL")]
primary_grpc_url: Option<String>,
// Turns on TLS for the connection to the primary; clap requires the three
// `primary_grpc_*_file` options with it.
#[clap(
long,
requires = "primary_grpc_cert_file",
requires = "primary_grpc_key_file",
requires = "primary_grpc_ca_cert_file"
)]
primary_grpc_tls: bool,
#[clap(long)]
primary_grpc_cert_file: Option<PathBuf>,
#[clap(long)]
primary_grpc_key_file: Option<PathBuf>,
#[clap(long)]
primary_grpc_ca_cert_file: Option<PathBuf>,
/// Don't display welcome message
#[clap(long)]
no_welcome: bool,
// When set, `make_db_config` loads the bottomless replicator options from the environment.
#[clap(long, env = "SQLD_ENABLE_BOTTOMLESS_REPLICATION")]
enable_bottomless_replication: bool,
/// The duration, in seconds, after which to shut down the server if no requests have been
/// received.
/// By default, the server doesn't shut down when idle.
#[clap(long, env = "SQLD_IDLE_SHUTDOWN_TIMEOUT_S")]
idle_shutdown_timeout_s: Option<u64>,
/// Like idle_shutdown_timeout_s but used only once after the server is started.
/// After that server either is shut down because it does not receive any requests
/// or idle_shutdown_timeout_s is used moving forward.
#[clap(long, env = "SQLD_INITIAL_IDLE_SHUTDOWN_TIMEOUT_S")]
initial_idle_shutdown_timeout_s: Option<u64>,
/// Maximum size the replication log is allowed to grow (in MB).
/// Defaults to 200MB.
#[clap(long, env = "SQLD_MAX_LOG_SIZE", default_value = "200")]
max_log_size: u64,
/// Maximum duration before the replication log is compacted (in seconds).
/// By default, the log is compacted only if it grows above the limit specified with
/// `--max-log-size`.
#[clap(long, env = "SQLD_MAX_LOG_DURATION")]
max_log_duration: Option<f32>,
// Optional utility subcommand; when present, `main` runs it instead of starting the server.
#[clap(subcommand)]
utils: Option<UtilsSubcommands>,
/// The URL to send a server heartbeat `POST` request to.
/// By default, the server doesn't send a heartbeat.
#[clap(long, env = "SQLD_HEARTBEAT_URL")]
heartbeat_url: Option<String>,
/// The HTTP "Authorization" header to include in the server heartbeat
/// `POST` request.
/// By default, the server doesn't send a heartbeat.
#[clap(long, env = "SQLD_HEARTBEAT_AUTH")]
heartbeat_auth: Option<String>,
/// The heartbeat time period in seconds.
/// By default, the period is 30 seconds.
#[clap(long, env = "SQLD_HEARTBEAT_PERIOD_S", default_value = "30")]
heartbeat_period_s: u64,
/// Soft heap size limit in mebibytes - libSQL will try to not go over this limit with memory usage.
#[clap(long, env = "SQLD_SOFT_HEAP_LIMIT_MB")]
soft_heap_limit_mb: Option<usize>,
/// Hard heap size limit in mebibytes - libSQL will bail out with SQLITE_NOMEM error
/// if it goes over this limit with memory usage.
#[clap(long, env = "SQLD_HARD_HEAP_LIMIT_MB")]
hard_heap_limit_mb: Option<usize>,
/// Set the maximum size for a response. e.g 5KB, 10MB...
#[clap(long, env = "SQLD_MAX_RESPONSE_SIZE", default_value = "10MB")]
max_response_size: ByteSize,
/// Set the maximum size for all responses. e.g 5KB, 10MB...
#[clap(long, env = "SQLD_MAX_TOTAL_RESPONSE_SIZE", default_value = "32MB")]
max_total_response_size: ByteSize,
/// Set a command to execute when a snapshot file is generated.
#[clap(long, env = "SQLD_SNAPSHOT_EXEC")]
snapshot_exec: Option<String>,
/// Interval in seconds, in which WAL checkpoint is being called.
/// By default, the interval is 1 hour.
#[clap(long, env = "SQLD_CHECKPOINT_INTERVAL_S")]
checkpoint_interval_s: Option<u64>,
/// By default, all requests for which a namespace can't be determined fall back to the default
/// namespace `default`. This flag disables that.
#[clap(long)]
disable_default_namespace: bool,
/// Enable the namespaces features. Namespaces are disabled by default, and all requests target
/// the default namespace.
// `build_server` passes the negation of this flag as `Server::disable_namespaces`.
#[clap(long)]
enable_namespaces: bool,
/// Enable snapshot at shutdown
#[clap(long)]
snapshot_at_shutdown: bool,
/// Max active namespaces kept in-memory
#[clap(long, env = "SQLD_MAX_ACTIVE_NAMESPACES", default_value = "100")]
max_active_namespaces: usize,
/// Enable backup for the metadata store
// When set, `make_meta_store_config` requires every `meta_store_*` option below except the
// session token, and fails with an error naming the first one that is missing.
#[clap(long, env = "SQLD_BACKUP_META_STORE")]
backup_meta_store: bool,
/// S3 access key ID for the meta store backup
#[clap(long, env = "SQLD_META_STORE_ACCESS_KEY_ID")]
meta_store_access_key_id: Option<String>,
/// S3 secret access key for the meta store backup
// NOTE(review): the env var name lacks the `_KEY` suffix the field has; renaming it would
// break existing deployments, so it is left as is.
#[clap(long, env = "SQLD_META_STORE_SECRET_ACCESS")]
meta_store_secret_access_key: Option<String>,
/// S3 session token for the meta store backup
#[clap(long, env = "SQLD_META_STORE_SESSION_TOKEN")]
meta_store_session_token: Option<String>,
/// S3 region for the metastore backup
#[clap(long, env = "SQLD_META_STORE_REGION")]
meta_store_region: Option<String>,
/// Id for the meta store backup
#[clap(long, env = "SQLD_META_STORE_BACKUP_ID")]
meta_store_backup_id: Option<String>,
/// S3 bucket name for the meta store backup
#[clap(long, env = "SQLD_META_STORE_BUCKET_NAME")]
meta_store_bucket_name: Option<String>,
/// Interval at which to perform backups of the meta store
// Interpreted as a number of seconds by `make_meta_store_config`.
#[clap(long, env = "SQLD_META_STORE_BACKUP_INTERVAL_S")]
meta_store_backup_interval_s: Option<usize>,
/// S3 endpoint for the meta store backups
#[clap(long, env = "SQLD_META_STORE_BUCKET_ENDPOINT")]
meta_store_bucket_endpoint: Option<String>,
/// Encryption key used for encryption at rest
#[clap(long, env = "SQLD_ENCRYPTION_KEY")]
encryption_key: Option<bytes::Bytes>,
#[clap(long, default_value = "128", env = "SQLD_MAX_CONCURRENT_CONNECTIONS")]
max_concurrent_connections: usize,
// Max number of concurrent requests across all connections.
#[clap(long, default_value = "128", env = "SQLD_MAX_CONCURRENT_REQUESTS")]
max_concurrent_requests: u64,
/// Allow meta store to recover config from filesystem from older version, if meta store is
/// empty on startup
#[clap(long, env = "SQLD_ALLOW_METASTORE_RECOVERY")]
allow_metastore_recovery: bool,
/// Shutdown timeout duration in seconds, defaults to 30 seconds.
// The 30 second default is applied in `build_server`, not by clap.
#[clap(long, env = "SQLD_SHUTDOWN_TIMEOUT")]
shutdown_timeout: Option<u64>,
// Forwarded unchanged to `Server::use_libsql_wal` by `build_server`.
#[clap(long)]
use_libsql_wal: bool,
}
// Utility subcommands of the CLI. When one is given, `main` runs it and returns instead of
// starting the server.
#[derive(clap::Subcommand, Debug)]
enum UtilsSubcommands {
// Dump the database of a single namespace; carried out by `perform_dump`.
Dump {
#[clap(long)]
/// Path at which to write the dump
// When omitted, `perform_dump` writes the dump to stdout.
path: Option<PathBuf>,
// Namespace to dump; `main` resolves it to `<db_path>/dbs/<namespace>` and fails if that
// path does not exist.
#[clap(long)]
namespace: String,
},
}
impl Cli {
    /// Prints the startup banner, build information and a short summary of the effective
    /// configuration to stderr.
    ///
    /// Prints nothing when `no_welcome` is set.
    #[rustfmt::skip]
    fn print_welcome_message(&self) {
        if self.no_welcome {
            // no welcome :'(
            return;
        }

        // ASCII-art logo, one entry per output line.
        const BANNER: [&str; 6] = [
            r#" _ _ "#,
            r#" ___ __ _| | __| |"#,
            r#"/ __|/ _` | |/ _` |"#,
            r#"\__ \ (_| | | (_| |"#,
            r#"|___/\__, |_|\__,_|"#,
            r#" |_| "#,
        ];
        for row in BANNER.iter() {
            eprintln!("{row}");
        }
        eprintln!();
        eprintln!("Welcome to sqld!");
        eprintln!();

        // Build information baked in at compile time.
        eprintln!("version: {}", env!("CARGO_PKG_VERSION"));
        let commit = env!("VERGEN_GIT_SHA");
        if commit != "VERGEN_IDEMPOTENT_OUTPUT" {
            eprintln!("commit SHA: {commit}");
        }
        eprintln!("build date: {}", env!("VERGEN_BUILD_DATE"));
        eprintln!();
        eprintln!("This software is in BETA version.");
        eprintln!("If you encounter any bug, please open an issue at https://github.com/tursodatabase/libsql/issues");
        eprintln!();

        eprintln!("config:");
        eprint!("\t- mode: ");
        let mode = match (&self.grpc_listen_addr, &self.primary_grpc_url) {
            (None, None) => "standalone".to_string(),
            (Some(addr), None) => format!("primary ({addr})"),
            (None, Some(url)) => format!("replica (primary at {url})"),
            // `grpc_listen_addr` is declared with `conflicts_with = "primary_grpc_url"`,
            // so clap rejects the command line before both can be set.
            _ => unreachable!("invalid configuration!"),
        };
        eprintln!("{mode}");

        eprintln!("\t- database path: {}", self.db_path.display());
        let extensions = match &self.extensions_path {
            Some(dir) => dir.display().to_string(),
            None => "<disabled>".to_string(),
        };
        eprintln!("\t- extensions path: {extensions}");
        eprintln!("\t- listening for HTTP requests on: {}", self.http_listen_addr);
        let tls = if self.grpc_tls { "yes" } else { "no" };
        eprintln!("\t- grpc_tls: {tls}");
        #[cfg(feature = "encryption")]
        eprintln!(
            "\t- encryption at rest: {}",
            if self.encryption_key.is_some() { "enabled" } else { "disabled" }
        );
    }
}
fn perform_dump(dump_path: Option<&Path>, db_path: &Path) -> anyhow::Result<()> {
let out: Box<dyn Write> = match dump_path {
Some(path) => {
let f = OpenOptions::new()
.create_new(true)
.write(true)
.open(path)
.with_context(|| format!("file `{}` already exists", path.display()))?;
Box::new(f)
}
None => Box::new(stdout()),
};
let conn = if cfg!(feature = "unix-excl-vfs") {
rusqlite::Connection::open_with_flags_and_vfs(
db_path.join("data"),
rusqlite::OpenFlags::default(),
"unix-excl",
)
} else {
rusqlite::Connection::open(db_path.join("data"))
}?;
export_dump(conn, out)?;
Ok(())
}
/// Registers a SQLite log callback that forwards every message to `tracing` at error level.
///
/// The callback is installed at most once per process, however often this is called.
#[cfg(feature = "debug-tools")]
fn enable_libsql_logging() {
    use std::ffi::c_int;
    use std::sync::Once;

    /// Callback handed to SQLite: logs the error code and message through `tracing`.
    fn forward_to_tracing(code: c_int, msg: &str) {
        tracing::error!("sqlite error {code}: {msg}");
    }

    static INSTALL: Once = Once::new();
    INSTALL.call_once(|| {
        // SAFETY: NOTE(review): `config_log` is an `unsafe fn`; presumably it must not run
        // concurrently with other SQLite calls. `main` calls this during start-up, before the
        // server is built, and `Once` rules out concurrent registration from here. Confirm
        // against the rusqlite documentation.
        let outcome = unsafe { rusqlite::trace::config_log(Some(forward_to_tracing)) };
        outcome.unwrap();
    });
}
/// Builds the database configuration from the command-line options.
///
/// # Errors
///
/// Fails when bottomless replication is enabled and its options cannot be loaded from the
/// environment.
fn make_db_config(config: &Cli) -> anyhow::Result<DbConfig> {
    // Encryption at rest is configured only when a key was supplied.
    let encryption_config = config
        .encryption_key
        .clone()
        .map(|encryption_key| EncryptionConfig {
            cipher: Cipher::Aes256Cbc,
            encryption_key,
        });

    let mut bottomless_replication = if config.enable_bottomless_replication {
        Some(bottomless::replicator::Options::from_env()?)
    } else {
        None
    };

    // Inherit encryption key for bottomless from the db config, if not specified.
    if let Some(options) = bottomless_replication
        .as_mut()
        .filter(|options| options.encryption_config.is_none())
    {
        options.encryption_config = encryption_config.clone();
    }

    let db_config = DbConfig {
        extensions_path: config.extensions_path.clone().map(Into::into),
        bottomless_replication,
        max_log_size: config.max_log_size,
        max_log_duration: config.max_log_duration,
        soft_heap_limit_mb: config.soft_heap_limit_mb,
        hard_heap_limit_mb: config.hard_heap_limit_mb,
        max_response_size: config.max_response_size.as_u64(),
        max_total_response_size: config.max_total_response_size.as_u64(),
        snapshot_exec: config.snapshot_exec.clone(),
        checkpoint_interval: config.checkpoint_interval_s.map(Duration::from_secs),
        snapshot_at_shutdown: config.snapshot_at_shutdown,
        encryption_config,
        max_concurrent_requests: config.max_concurrent_requests,
    };
    Ok(db_config)
}
/// Selects the authentication strategy for the user-facing APIs.
///
/// Sources are tried in this order, and the first one that is configured wins:
/// 1. `--http-auth`: legacy HTTP basic authentication;
/// 2. `--auth-jwt-key-file`: JWT decoding key read from that file;
/// 3. the `SQLD_AUTH_JWT_KEY` environment variable: JWT decoding key passed directly;
/// 4. otherwise authentication is disabled.
///
/// # Errors
///
/// Fails if the HTTP basic argument is invalid, if the key file cannot be read, if
/// `SQLD_AUTH_JWT_KEY` is not valid Unicode, or if the JWT key cannot be parsed.
async fn make_user_auth_strategy(config: &Cli) -> anyhow::Result<Auth> {
    if let Some(http_auth) = config.http_auth.as_deref() {
        tracing::info!("Using legacy HTTP basic authentication");
        // The argument comes straight from the command line / environment, so a value the
        // parser does not turn into a credential is reported as an error rather than a panic.
        let credential = parse_http_basic_auth_arg(http_auth)?
            .context("Invalid HTTP Basic configuration")?;
        return Ok(Auth::new(user_auth_strategies::HttpBasic::new(
            credential.into(),
        )));
    }

    let auth_jwt_key = if let Some(ref file_path) = config.auth_jwt_key_file {
        let data = tokio::fs::read_to_string(file_path)
            .await
            .context("Could not read file with JWT key")?;
        Some(data)
    } else {
        // No key file: fall back to a key passed directly through the environment.
        match env::var("SQLD_AUTH_JWT_KEY") {
            Ok(key) => Some(key),
            Err(env::VarError::NotPresent) => None,
            Err(env::VarError::NotUnicode(_)) => {
                bail!("Env variable SQLD_AUTH_JWT_KEY does not contain a valid Unicode value")
            }
        }
    };

    if let Some(jwt_key) = auth_jwt_key.as_deref() {
        let jwt_key: jsonwebtoken::DecodingKey =
            parse_jwt_key(jwt_key).context("Could not parse JWT decoding key")?;
        tracing::info!("Using JWT-based authentication");
        return Ok(Auth::new(user_auth_strategies::Jwt::new(jwt_key)));
    }

    Ok(Auth::new(user_auth_strategies::Disabled::new()))
}
/// Binds the user-facing listeners and assembles the user API configuration.
///
/// The HTTP listener is always bound; the Hrana WebSocket listener only when
/// `hrana_listen_addr` is set.
///
/// # Errors
///
/// Fails if a listener cannot be bound or if the authentication strategy cannot be built.
async fn make_user_api_config(config: &Cli) -> anyhow::Result<UserApiConfig> {
    let http_listener = tokio::net::TcpListener::bind(config.http_listen_addr).await?;
    let http_acceptor = AddrIncoming::new(http_listener);
    tracing::info!(
        "listening for incoming user HTTP connection on {}",
        config.http_listen_addr
    );

    let hrana_ws_acceptor = if let Some(addr) = config.hrana_listen_addr {
        let ws_listener = tokio::net::TcpListener::bind(addr).await?;
        let incoming = AddrIncoming::new(ws_listener);
        tracing::info!(
            "listening for incoming user hrana websocket connection on {}",
            addr
        );
        Some(incoming)
    } else {
        None
    };

    let auth_strategy = make_user_auth_strategy(config).await?;

    Ok(UserApiConfig {
        http_acceptor: Some(http_acceptor),
        hrana_ws_acceptor,
        enable_http_console: config.enable_http_console,
        self_url: config.http_self_url.clone(),
        auth_strategy,
    })
}
/// Builds the admin API configuration, or returns `None` when no admin listen address was
/// given.
///
/// # Errors
///
/// Fails if the admin listener cannot be bound.
async fn make_admin_api_config(config: &Cli) -> anyhow::Result<Option<AdminApiConfig>> {
    // Without an address the admin API is simply not configured.
    let addr = match config.admin_listen_addr {
        Some(addr) => addr,
        None => return Ok(None),
    };

    let listener = tokio::net::TcpListener::bind(addr).await?;
    let acceptor = AddrIncoming::new(listener);
    tracing::info!("listening for incoming adming HTTP connection on {}", addr);

    // Outgoing connector handed to the admin API; accepts both https and plain http URLs.
    let connector = hyper_rustls::HttpsConnectorBuilder::new()
        .with_native_roots()
        .https_or_http()
        .enable_http1()
        .build();

    let admin = AdminApiConfig {
        acceptor,
        connector,
        disable_metrics: false,
    };
    Ok(Some(admin))
}
async fn make_rpc_server_config(config: &Cli) -> anyhow::Result<Option<RpcServerConfig>> {
match config.grpc_listen_addr {
Some(addr) => {
let acceptor = AddrIncoming::new(tokio::net::TcpListener::bind(addr).await?);
tracing::info!("listening for incoming gRPC connection on {}", addr);
let tls_config = if config.grpc_tls {
Some(TlsConfig {
cert: config
.grpc_cert_file
.clone()
.context("server tls is enabled but cert file is missing")?,
key: config
.grpc_key_file
.clone()
.context("server tls is enabled but key file is missing")?,
ca_cert: config
.grpc_ca_cert_file
.clone()
.context("server tls is enabled but ca_cert file is missing")?,
})
} else {
None
};
Ok(Some(RpcServerConfig {
acceptor,
tls_config,
}))
}
None => Ok(None),
}
}
async fn make_rpc_client_config(config: &Cli) -> anyhow::Result<Option<RpcClientConfig>> {
match config.primary_grpc_url {
Some(ref url) => {
let mut connector = HttpConnector::new();
connector.enforce_http(false);
connector.set_nodelay(true);
let tls_config = if config.primary_grpc_tls {
Some(TlsConfig {
cert: config
.primary_grpc_cert_file
.clone()
.context("client tls is enabled but cert file is missing")?,
key: config
.primary_grpc_key_file
.clone()
.context("client tls is enabled but key file is missing")?,
ca_cert: config
.primary_grpc_ca_cert_file
.clone()
.context("client tls is enabled but ca_cert file is missing")?,
})
} else {
None
};
Ok(Some(RpcClientConfig {
remote_url: url.clone(),
connector,
tls_config,
}))
}
None => Ok(None),
}
}
/// Collects the heartbeat options into a `HeartbeatConfig`.
///
/// Always returns `Some`; the URL and auth header inside may themselves be `None`.
fn make_hearbeat_config(config: &Cli) -> Option<HeartbeatConfig> {
    let heartbeat_period = Duration::from_secs(config.heartbeat_period_s);
    let heartbeat = HeartbeatConfig {
        heartbeat_url: config.heartbeat_url.clone(),
        heartbeat_period,
        heartbeat_auth: config.heartbeat_auth.clone(),
    };
    Some(heartbeat)
}
/// Waits for the process to receive SIGINT or SIGTERM and returns the name of the signal that
/// arrived.
///
/// # Errors
///
/// Fails if either signal handler cannot be registered.
async fn shutdown_signal() -> Result<&'static str> {
    use tokio::signal::unix::{signal, SignalKind};

    let mut sigint = signal(SignalKind::interrupt())?;
    let mut sigterm = signal(SignalKind::terminate())?;

    // Whichever signal is delivered first decides the returned name.
    Ok(tokio::select! {
        _ = sigint.recv() => "SIGINT",
        _ = sigterm.recv() => "SIGTERM",
    })
}
/// Builds the meta store configuration from the command-line options.
///
/// When `backup_meta_store` is set, every `meta_store_*` option except the session token is
/// mandatory.
///
/// # Errors
///
/// Fails with a message naming the first mandatory backup option that is missing.
fn make_meta_store_config(config: &Cli) -> anyhow::Result<MetaStoreConfig> {
    let bottomless = if config.backup_meta_store {
        Some(BottomlessConfig {
            access_key_id: config
                .meta_store_access_key_id
                .clone()
                .context("missing meta store bucket access key id")?,
            secret_access_key: config
                .meta_store_secret_access_key
                .clone()
                .context("missing meta store bucket secret access key")?,
            // The session token is the only optional backup setting.
            session_token: config.meta_store_session_token.clone(),
            region: config
                .meta_store_region
                .clone()
                .context("missing meta store bucket region")?,
            backup_id: config
                .meta_store_backup_id
                .clone()
                .context("missing meta store backup id")?,
            bucket_name: config
                .meta_store_bucket_name
                .clone()
                .context("missing meta store bucket name")?,
            backup_interval: Duration::from_secs(
                // `usize` is at most 64 bits wide, so widening to `u64` is lossless.
                config
                    .meta_store_backup_interval_s
                    .context("missing meta store backup interval")? as u64,
            ),
            bucket_endpoint: config
                .meta_store_bucket_endpoint
                .clone()
                .context("missing meta store bucket endpoint")?,
        })
    } else {
        None
    };

    Ok(MetaStoreConfig {
        bottomless,
        allow_recover_from_fs: config.allow_metastore_recovery,
    })
}
async fn build_server(config: &Cli) -> anyhow::Result<Server> {
let db_config = make_db_config(config)?;
let user_api_config = make_user_api_config(config).await?;
let admin_api_config = make_admin_api_config(config).await?;
let rpc_server_config = make_rpc_server_config(config).await?;
let rpc_client_config = make_rpc_client_config(config).await?;
let heartbeat_config = make_hearbeat_config(config);
let meta_store_config = make_meta_store_config(config)?;
let shutdown = Arc::new(Notify::new());
tokio::spawn({
let shutdown = shutdown.clone();
async move {
loop {
let signal = shutdown_signal()
.await
.expect("Failed to registry shutdown signals");
tracing::info!(
"Got {} shutdown signal, gracefully shutting down...this may take some time",
signal
);
shutdown.notify_waiters();
}
}
});
Ok(Server {
path: config.db_path.clone().into(),
db_config,
user_api_config,
admin_api_config,
rpc_server_config,
rpc_client_config,
heartbeat_config,
idle_shutdown_timeout: config.idle_shutdown_timeout_s.map(Duration::from_secs),
initial_idle_shutdown_timeout: config
.initial_idle_shutdown_timeout_s
.map(Duration::from_secs),
disable_default_namespace: config.disable_default_namespace,
disable_namespaces: !config.enable_namespaces,
shutdown,
max_active_namespaces: config.max_active_namespaces,
meta_store_config,
max_concurrent_connections: config.max_concurrent_connections,
shutdown_timeout: config
.shutdown_timeout
.map(Duration::from_secs)
.unwrap_or(Duration::from_secs(30)),
use_libsql_wal: config.use_libsql_wal,
})
}
#[tokio::main]
async fn main() -> Result<()> {
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "info");
}
let registry = tracing_subscriber::registry();
#[cfg(feature = "debug-tools")]
let registry = registry.with(console_subscriber::spawn());
#[cfg(feature = "debug-tools")]
enable_libsql_logging();
registry
.with(
tracing_subscriber::fmt::layer()
.with_ansi(false)
.with_filter(tracing_subscriber::EnvFilter::from_default_env()),
)
.init();
let args = Cli::parse();
match args.utils {
Some(UtilsSubcommands::Dump { path, namespace }) => {
if let Some(ref path) = path {
eprintln!(
"Dumping database {} to {}",
args.db_path.display(),
path.display()
);
}
let db_path = args.db_path.join("dbs").join(&namespace);
if !db_path.exists() {
bail!("no database for namespace `{namespace}`");
}
perform_dump(path.as_deref(), &db_path)
}
None => {
args.print_welcome_message();
let server = build_server(&args).await?;
server.start().await?;
Ok(())
}
}
}