use std::env;
use std::fs::OpenOptions;
use std::io::{stdout, Write};
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use anyhow::{bail, Context as _, Result};
use bytesize::ByteSize;
use clap::Parser;
use hyper::client::HttpConnector;
use libsql_server::auth::{parse_http_basic_auth_arg, parse_jwt_key, user_auth_strategies, Auth};
// use mimalloc::MiMalloc;
use tokio::sync::Notify;
use tokio::time::Duration;
use tracing_subscriber::prelude::*;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::Layer;

use libsql_server::config::{
    AdminApiConfig, BottomlessConfig, DbConfig, HeartbeatConfig, MetaStoreConfig, RpcClientConfig,
    RpcServerConfig, TlsConfig, UserApiConfig,
};
use libsql_server::net::AddrIncoming;
use libsql_server::Server;
use libsql_server::{connection::dump::exporter::export_dump, version::Version};
use libsql_sys::{Cipher, EncryptionConfig};

// Use system allocator for now, seems like we are getting too much fragmentation.
// #[global_allocator]
// static GLOBAL: MiMalloc = MiMalloc;

/// SQL daemon
#[derive(Debug, Parser)]
#[command(name = "sqld")]
#[command(about = "SQL daemon", version = Version::default(), long_about = None)]
struct Cli {
    #[clap(long, short, default_value = "data.sqld", env = "SQLD_DB_PATH")]
    db_path: PathBuf,

    /// The directory path where trusted extensions can be loaded from.
    /// If not present, extension loading is disabled.
    /// If present, the directory is expected to contain a trusted.lst file listing the sha256
    /// and name of each trusted extension, one per line. Example:
    ///
    /// 99890762817735984843bf5cf02a4b2ea648018fd05f04df6f9ce7f976841510  math.dylib
    #[clap(long, short)]
    extensions_path: Option<PathBuf>,

    #[clap(long, default_value = "127.0.0.1:8080", env = "SQLD_HTTP_LISTEN_ADDR")]
    http_listen_addr: SocketAddr,
    #[clap(long)]
    enable_http_console: bool,

    /// Address and port for the legacy, WebSocket-only Hrana server.
    #[clap(long, short = 'l', env = "SQLD_HRANA_LISTEN_ADDR")]
    hrana_listen_addr: Option<SocketAddr>,

    /// The address and port for the admin HTTP API.
    #[clap(long, env = "SQLD_ADMIN_LISTEN_ADDR")]
    admin_listen_addr: Option<SocketAddr>,

    /// Path to a file with a JWT decoding key used to authenticate clients in the Hrana and HTTP
    /// APIs. The key is either a PKCS#8-encoded Ed25519 public key in PEM, or just plain bytes of
    /// the Ed25519 public key in URL-safe base64.
    ///
    /// You can also pass the key directly in the env variable SQLD_AUTH_JWT_KEY.
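    ///
    /// For example, a suitable key pair can typically be generated with OpenSSL 1.1.1 or
    /// later (file names are illustrative):
    ///
    ///     openssl genpkey -algorithm ED25519 -out jwt_key.pem
    ///     openssl pkey -in jwt_key.pem -pubout -out jwt_pubkey.pem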
    #[clap(long, env = "SQLD_AUTH_JWT_KEY_FILE")]
    auth_jwt_key_file: Option<PathBuf>,
    /// Specifies legacy HTTP basic authentication. The argument must be in the format
    /// "basic:$PARAM", where $PARAM is the base64-encoded string "$USERNAME:$PASSWORD".
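    /// For example, with the (hypothetical) credentials "admin:password", this would be
    /// "basic:YWRtaW46cGFzc3dvcmQ=".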
    #[clap(long, env = "SQLD_HTTP_AUTH")]
    http_auth: Option<String>,
    /// URL that points to the HTTP API of this server. If set, this is used to implement "sticky
    /// sessions" in Hrana over HTTP.
    #[clap(long, env = "SQLD_HTTP_SELF_URL")]
    http_self_url: Option<String>,

    /// The address and port the inter-node RPC protocol listens on. Example: `0.0.0.0:5001`.
    #[clap(
        long,
        conflicts_with = "primary_grpc_url",
        env = "SQLD_GRPC_LISTEN_ADDR"
    )]
    grpc_listen_addr: Option<SocketAddr>,
    #[clap(
        long,
        requires = "grpc_cert_file",
        requires = "grpc_key_file",
        requires = "grpc_ca_cert_file"
    )]
    grpc_tls: bool,
    #[clap(long)]
    grpc_cert_file: Option<PathBuf>,
    #[clap(long)]
    grpc_key_file: Option<PathBuf>,
    #[clap(long)]
    grpc_ca_cert_file: Option<PathBuf>,

    /// The gRPC URL of the primary node to connect to for writes. Example: `http://localhost:5001`.
    #[clap(long, env = "SQLD_PRIMARY_GRPC_URL")]
    primary_grpc_url: Option<String>,
    #[clap(
        long,
        requires = "primary_grpc_cert_file",
        requires = "primary_grpc_key_file",
        requires = "primary_grpc_ca_cert_file"
    )]
    primary_grpc_tls: bool,
    #[clap(long)]
    primary_grpc_cert_file: Option<PathBuf>,
    #[clap(long)]
    primary_grpc_key_file: Option<PathBuf>,
    #[clap(long)]
    primary_grpc_ca_cert_file: Option<PathBuf>,

    /// Don't display the welcome message
    #[clap(long)]
    no_welcome: bool,
    #[clap(long, env = "SQLD_ENABLE_BOTTOMLESS_REPLICATION")]
    enable_bottomless_replication: bool,
    /// The duration, in seconds, after which to shut down the server if no requests have been
    /// received.
    /// By default, the server doesn't shut down when idle.
    #[clap(long, env = "SQLD_IDLE_SHUTDOWN_TIMEOUT_S")]
    idle_shutdown_timeout_s: Option<u64>,

    /// Like `idle_shutdown_timeout_s`, but applied only once, after the server starts.
    /// After that, the server either shuts down because it received no requests, or
    /// `idle_shutdown_timeout_s` takes over.
    #[clap(long, env = "SQLD_INITIAL_IDLE_SHUTDOWN_TIMEOUT_S")]
    initial_idle_shutdown_timeout_s: Option<u64>,

    /// Maximum size the replication log is allowed to grow to (in MB).
    /// Defaults to 200MB.
    #[clap(long, env = "SQLD_MAX_LOG_SIZE", default_value = "200")]
    max_log_size: u64,
    /// Maximum duration before the replication log is compacted (in seconds).
    /// By default, the log is compacted only if it grows above the limit specified with
    /// `--max-log-size`.
    #[clap(long, env = "SQLD_MAX_LOG_DURATION")]
    max_log_duration: Option<f32>,

    #[clap(subcommand)]
    utils: Option<UtilsSubcommands>,

    /// The URL to send a server heartbeat `POST` request to.
    /// By default, the server doesn't send a heartbeat.
    #[clap(long, env = "SQLD_HEARTBEAT_URL")]
    heartbeat_url: Option<String>,

    /// The HTTP "Authornization" header to include in the a server heartbeat
    /// `POST` request.
    /// By default, the server doesn't send a heartbeat.
    #[clap(long, env = "SQLD_HEARTBEAT_AUTH")]
    heartbeat_auth: Option<String>,

    /// The heartbeat time period in seconds.
    /// By default, the period is 30 seconds.
    #[clap(long, env = "SQLD_HEARTBEAT_PERIOD_S", default_value = "30")]
    heartbeat_period_s: u64,

    /// Soft heap size limit in mebibytes - libSQL will try not to exceed this limit with its memory usage.
    #[clap(long, env = "SQLD_SOFT_HEAP_LIMIT_MB")]
    soft_heap_limit_mb: Option<usize>,

    /// Hard heap size limit in mebibytes - libSQL will bail out with a SQLITE_NOMEM error
    /// if it exceeds this limit with its memory usage.
    #[clap(long, env = "SQLD_HARD_HEAP_LIMIT_MB")]
    hard_heap_limit_mb: Option<usize>,

    /// Set the maximum size for a response, e.g. 5KB, 10MB...
    #[clap(long, env = "SQLD_MAX_RESPONSE_SIZE", default_value = "10MB")]
    max_response_size: ByteSize,

    /// Set the maximum size for all responses, e.g. 5KB, 10MB...
    #[clap(long, env = "SQLD_MAX_TOTAL_RESPONSE_SIZE", default_value = "32MB")]
    max_total_response_size: ByteSize,

    /// Set a command to execute when a snapshot file is generated.
    #[clap(long, env = "SQLD_SNAPSHOT_EXEC")]
    snapshot_exec: Option<String>,

    /// Interval, in seconds, at which the WAL checkpoint is called.
    /// By default, the interval is 1 hour.
    #[clap(long, env = "SQLD_CHECKPOINT_INTERVAL_S")]
    checkpoint_interval_s: Option<u64>,

    /// By default, all requests for which a namespace can't be determined fall back to the
    /// default namespace `default`. This flag disables that behavior.
    #[clap(long)]
    disable_default_namespace: bool,

    /// Enable the namespaces feature. Namespaces are disabled by default, and all requests target
    /// the default namespace.
    #[clap(long)]
    enable_namespaces: bool,

    /// Enable snapshot at shutdown
    #[clap(long)]
    snapshot_at_shutdown: bool,

    /// Max active namespaces kept in memory
    #[clap(long, env = "SQLD_MAX_ACTIVE_NAMESPACES", default_value = "100")]
    max_active_namespaces: usize,

    /// Enable backup for the metadata store
    #[clap(long, env = "SQLD_BACKUP_META_STORE")]
    backup_meta_store: bool,
    /// S3 access key ID for the meta store backup
    #[clap(long, env = "SQLD_META_STORE_ACCESS_KEY_ID")]
    meta_store_access_key_id: Option<String>,
    /// S3 secret access key for the meta store backup
    #[clap(long, env = "SQLD_META_STORE_SECRET_ACCESS")]
    meta_store_secret_access_key: Option<String>,
    /// S3 session token for the meta store backup
    #[clap(long, env = "SQLD_META_STORE_SESSION_TOKEN")]
    meta_store_session_token: Option<String>,
    /// S3 region for the meta store backup
    #[clap(long, env = "SQLD_META_STORE_REGION")]
    meta_store_region: Option<String>,
    /// ID for the meta store backup
    #[clap(long, env = "SQLD_META_STORE_BACKUP_ID")]
    meta_store_backup_id: Option<String>,
    /// S3 bucket name for the meta store backup
    #[clap(long, env = "SQLD_META_STORE_BUCKET_NAME")]
    meta_store_bucket_name: Option<String>,
    /// Interval at which to perform backups of the meta store
    #[clap(long, env = "SQLD_META_STORE_BACKUP_INTERVAL_S")]
    meta_store_backup_interval_s: Option<usize>,
    /// S3 endpoint for the meta store backups
    #[clap(long, env = "SQLD_META_STORE_BUCKET_ENDPOINT")]
    meta_store_bucket_endpoint: Option<String>,

    /// Encryption key for encryption at rest.
    #[clap(long, env = "SQLD_ENCRYPTION_KEY")]
    encryption_key: Option<bytes::Bytes>,

    #[clap(long, default_value = "128", env = "SQLD_MAX_CONCURRENT_CONNECTIONS")]
    max_concurrent_connections: usize,
    // max number of concurrent requests across all connections
    #[clap(long, default_value = "128", env = "SQLD_MAX_CONCURRENT_REQUESTS")]
    max_concurrent_requests: u64,

    /// Allow the meta store to recover its config from the filesystem layout of an older
    /// version, if the meta store is empty on startup
    #[clap(long, env = "SQLD_ALLOW_METASTORE_RECOVERY")]
    allow_metastore_recovery: bool,

    /// Shutdown timeout duration in seconds, defaults to 30 seconds.
    #[clap(long, env = "SQLD_SHUTDOWN_TIMEOUT")]
    shutdown_timeout: Option<u64>,

    #[clap(long)]
    use_libsql_wal: bool,
}

#[derive(clap::Subcommand, Debug)]
enum UtilsSubcommands {
    Dump {
        #[clap(long)]
        /// Path at which to write the dump
        path: Option<PathBuf>,
        #[clap(long)]
        namespace: String,
    },
}
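
// An illustrative invocation of the `dump` utility subcommand (all values are
// hypothetical; `default` is the namespace used when namespaces are disabled):
//
//     sqld --db-path data.sqld dump --namespace default --path dump.sql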

impl Cli {
    #[rustfmt::skip]
    fn print_welcome_message(&self) {
        // no welcome :'(
        if self.no_welcome { return }

        eprintln!(r#"           _     _ "#);
        eprintln!(r#" ___  __ _| | __| |"#);
        eprintln!(r#"/ __|/ _` | |/ _` |"#);
        eprintln!(r#"\__ \ (_| | | (_| |"#);
        eprintln!(r#"|___/\__, |_|\__,_|"#);
        eprintln!(r#"        |_|        "#);

        eprintln!();
        eprintln!("Welcome to sqld!");
        eprintln!();
        eprintln!("version: {}", env!("CARGO_PKG_VERSION"));
        if env!("VERGEN_GIT_SHA") != "VERGEN_IDEMPOTENT_OUTPUT" {
            eprintln!("commit SHA: {}", env!("VERGEN_GIT_SHA"));
        }
        eprintln!("build date: {}", env!("VERGEN_BUILD_DATE"));
        eprintln!();
        eprintln!("This software is in BETA version.");
        eprintln!("If you encounter any bug, please open an issue at https://github.com/tursodatabase/libsql/issues");
        eprintln!();

        eprintln!("config:");

        eprint!("\t- mode: ");
        match (&self.grpc_listen_addr, &self.primary_grpc_url) {
            (None, None) => eprintln!("standalone"),
            (Some(addr), None) => eprintln!("primary ({addr})"),
            (None, Some(url)) => eprintln!("replica (primary at {url})"),
            _ => unreachable!("invalid configuration!"),
        };
        eprintln!("\t- database path: {}", self.db_path.display());
        let extensions_str = self.extensions_path.clone().map_or("<disabled>".to_string(), |x| x.display().to_string());
        eprintln!("\t- extensions path: {extensions_str}");
        eprintln!("\t- listening for HTTP requests on: {}", self.http_listen_addr);
        eprintln!("\t- grpc_tls: {}", if self.grpc_tls { "yes" } else { "no" });
        #[cfg(feature = "encryption")]
        eprintln!("\t- encryption at rest: {}", if self.encryption_key.is_some() { "enabled" } else { "disabled" });
    }
}

fn perform_dump(dump_path: Option<&Path>, db_path: &Path) -> anyhow::Result<()> {
    let out: Box<dyn Write> = match dump_path {
        Some(path) => {
            let f = OpenOptions::new()
                .create_new(true)
                .write(true)
                .open(path)
                .with_context(|| format!("file `{}` already exists", path.display()))?;
            Box::new(f)
        }
        None => Box::new(stdout()),
    };
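    // When built with the (assumed) `unix-excl-vfs` feature, the database is opened
    // through SQLite's "unix-excl" VFS, which obtains and holds an exclusive lock on
    // the database file, presumably so the dump cannot race a live server process.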
    let conn = if cfg!(feature = "unix-excl-vfs") {
        rusqlite::Connection::open_with_flags_and_vfs(
            db_path.join("data"),
            rusqlite::OpenFlags::default(),
            "unix-excl",
        )
    } else {
        rusqlite::Connection::open(db_path.join("data"))
    }?;

    export_dump(conn, out)?;

    Ok(())
}

#[cfg(feature = "debug-tools")]
fn enable_libsql_logging() {
    use std::ffi::c_int;
    use std::sync::Once;
    static ONCE: Once = Once::new();

    fn libsql_log(code: c_int, msg: &str) {
        tracing::error!("sqlite error {code}: {msg}");
    }

    ONCE.call_once(|| unsafe {
        rusqlite::trace::config_log(Some(libsql_log)).unwrap();
    });
}

fn make_db_config(config: &Cli) -> anyhow::Result<DbConfig> {
    let encryption_config = config.encryption_key.as_ref().map(|key| EncryptionConfig {
        cipher: Cipher::Aes256Cbc,
        encryption_key: key.clone(),
    });
    let mut bottomless_replication = config
        .enable_bottomless_replication
        .then(bottomless::replicator::Options::from_env)
        .transpose()?;
    // Inherit encryption key for bottomless from the db config, if not specified.
    if let Some(ref mut bottomless_replication) = bottomless_replication {
        if bottomless_replication.encryption_config.is_none() {
            bottomless_replication.encryption_config = encryption_config.clone();
        }
    }
    Ok(DbConfig {
        extensions_path: config.extensions_path.clone().map(Into::into),
        bottomless_replication,
        max_log_size: config.max_log_size,
        max_log_duration: config.max_log_duration,
        soft_heap_limit_mb: config.soft_heap_limit_mb,
        hard_heap_limit_mb: config.hard_heap_limit_mb,
        max_response_size: config.max_response_size.as_u64(),
        max_total_response_size: config.max_total_response_size.as_u64(),
        snapshot_exec: config.snapshot_exec.clone(),
        checkpoint_interval: config.checkpoint_interval_s.map(Duration::from_secs),
        snapshot_at_shutdown: config.snapshot_at_shutdown,
        encryption_config: encryption_config.clone(),
        max_concurrent_requests: config.max_concurrent_requests,
    })
}

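// Strategy resolution below: legacy HTTP basic auth takes precedence, then a
// JWT key (read from `--auth-jwt-key-file`, falling back to the
// SQLD_AUTH_JWT_KEY env variable), and with neither configured,
// authentication is disabled.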
async fn make_user_auth_strategy(config: &Cli) -> anyhow::Result<Auth> {
    if let Some(http_auth) = config.http_auth.as_deref() {
        tracing::info!("Using legacy HTTP basic authentication");

        let credential =
            parse_http_basic_auth_arg(http_auth)?.expect("Invalid HTTP Basic configuration");

        return Ok(Auth::new(user_auth_strategies::HttpBasic::new(
            credential.into(),
        )));
    }

    let auth_jwt_key = if let Some(ref file_path) = config.auth_jwt_key_file {
        let data = tokio::fs::read_to_string(file_path)
            .await
            .context("Could not read file with JWT key")?;
        Some(data)
    } else {
        match env::var("SQLD_AUTH_JWT_KEY") {
            Ok(key) => Some(key),
            Err(env::VarError::NotPresent) => None,
            Err(env::VarError::NotUnicode(_)) => {
                bail!("Env variable SQLD_AUTH_JWT_KEY does not contain a valid Unicode value")
            }
        }
    };

    if let Some(jwt_key) = auth_jwt_key.as_deref() {
        let jwt_key: jsonwebtoken::DecodingKey =
            parse_jwt_key(jwt_key).context("Could not parse JWT decoding key")?;
        tracing::info!("Using JWT-based authentication");
        return Ok(Auth::new(user_auth_strategies::Jwt::new(jwt_key)));
    }

    Ok(Auth::new(user_auth_strategies::Disabled::new()))
}

async fn make_user_api_config(config: &Cli) -> anyhow::Result<UserApiConfig> {
    let http_acceptor =
        AddrIncoming::new(tokio::net::TcpListener::bind(config.http_listen_addr).await?);
    tracing::info!(
        "listening for incoming user HTTP connections on {}",
        config.http_listen_addr
    );

    let hrana_ws_acceptor = match config.hrana_listen_addr {
        Some(addr) => {
            let incoming = AddrIncoming::new(tokio::net::TcpListener::bind(addr).await?);

            tracing::info!(
                "listening for incoming user Hrana WebSocket connections on {}",
                addr
            );

            Some(incoming)
        }
        None => None,
    };

    let auth_strategy = make_user_auth_strategy(&config).await?;

    Ok(UserApiConfig {
        http_acceptor: Some(http_acceptor),
        hrana_ws_acceptor,
        enable_http_console: config.enable_http_console,
        self_url: config.http_self_url.clone(),
        auth_strategy,
    })
}

async fn make_admin_api_config(config: &Cli) -> anyhow::Result<Option<AdminApiConfig>> {
    match config.admin_listen_addr {
        Some(addr) => {
            let acceptor = AddrIncoming::new(tokio::net::TcpListener::bind(addr).await?);

            tracing::info!("listening for incoming adming HTTP connection on {}", addr);
            let connector = hyper_rustls::HttpsConnectorBuilder::new()
                .with_native_roots()
                .https_or_http()
                .enable_http1()
                .build();

            Ok(Some(AdminApiConfig {
                acceptor,
                connector,
                disable_metrics: false,
            }))
        }
        None => Ok(None),
    }
}

async fn make_rpc_server_config(config: &Cli) -> anyhow::Result<Option<RpcServerConfig>> {
    match config.grpc_listen_addr {
        Some(addr) => {
            let acceptor = AddrIncoming::new(tokio::net::TcpListener::bind(addr).await?);

            tracing::info!("listening for incoming gRPC connection on {}", addr);

            let tls_config = if config.grpc_tls {
                Some(TlsConfig {
                    cert: config
                        .grpc_cert_file
                        .clone()
                        .context("server tls is enabled but cert file is missing")?,
                    key: config
                        .grpc_key_file
                        .clone()
                        .context("server tls is enabled but key file is missing")?,
                    ca_cert: config
                        .grpc_ca_cert_file
                        .clone()
                        .context("server tls is enabled but ca_cert file is missing")?,
                })
            } else {
                None
            };

            Ok(Some(RpcServerConfig {
                acceptor,
                tls_config,
            }))
        }
        None => Ok(None),
    }
}

async fn make_rpc_client_config(config: &Cli) -> anyhow::Result<Option<RpcClientConfig>> {
    match config.primary_grpc_url {
        Some(ref url) => {
            let mut connector = HttpConnector::new();
            connector.enforce_http(false);
            connector.set_nodelay(true);
            let tls_config = if config.primary_grpc_tls {
                Some(TlsConfig {
                    cert: config
                        .primary_grpc_cert_file
                        .clone()
                        .context("client tls is enabled but cert file is missing")?,
                    key: config
                        .primary_grpc_key_file
                        .clone()
                        .context("client tls is enabled but key file is missing")?,
                    ca_cert: config
                        .primary_grpc_ca_cert_file
                        .clone()
                        .context("client tls is enabled but ca_cert file is missing")?,
                })
            } else {
                None
            };

            Ok(Some(RpcClientConfig {
                remote_url: url.clone(),
                connector,
                tls_config,
            }))
        }
        None => Ok(None),
    }
}

fn make_heartbeat_config(config: &Cli) -> Option<HeartbeatConfig> {
    Some(HeartbeatConfig {
        heartbeat_url: config.heartbeat_url.clone(),
        heartbeat_period: Duration::from_secs(config.heartbeat_period_s),
        heartbeat_auth: config.heartbeat_auth.clone(),
    })
}

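// Note: this relies on Unix signal handling (`tokio::signal::unix`), so this
// file is assumed to build for Unix-like targets only.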
async fn shutdown_signal() -> Result<&'static str> {
    use tokio::signal::unix::{signal, SignalKind};

    let mut int = signal(SignalKind::interrupt())?;
    let mut term = signal(SignalKind::terminate())?;

    let signal = tokio::select! {
        _ = int.recv() => "SIGINT",
        _ = term.recv() => "SIGTERM",
    };

    Ok(signal)
}

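// Illustrative environment for enabling meta store backups (all values are
// hypothetical); each variable maps to a `Cli` flag parsed above, and
// SQLD_META_STORE_SESSION_TOKEN is optional:
//
//   SQLD_BACKUP_META_STORE=true
//   SQLD_META_STORE_ACCESS_KEY_ID=<access-key-id>
//   SQLD_META_STORE_SECRET_ACCESS_KEY=<secret-access-key>
//   SQLD_META_STORE_REGION=us-east-1
//   SQLD_META_STORE_BACKUP_ID=<backup-id>
//   SQLD_META_STORE_BUCKET_NAME=<bucket>
//   SQLD_META_STORE_BACKUP_INTERVAL_S=3600
//   SQLD_META_STORE_BUCKET_ENDPOINT=<s3-endpoint-url>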
fn make_meta_store_config(config: &Cli) -> anyhow::Result<MetaStoreConfig> {
    let bottomless = if config.backup_meta_store {
        Some(BottomlessConfig {
            access_key_id: config
                .meta_store_access_key_id
                .clone()
                .context("missing meta store bucket access key id")?,
            secret_access_key: config
                .meta_store_secret_access_key
                .clone()
                .context("missing meta store bucket secret access key")?,
            session_token: config.meta_store_session_token.clone(),
            region: config
                .meta_store_region
                .clone()
                .context("missing meta store bucket region")?,
            backup_id: config
                .meta_store_backup_id
                .clone()
                .context("missing meta store backup id")?,
            bucket_name: config
                .meta_store_bucket_name
                .clone()
                .context("missing meta store bucket name")?,
            backup_interval: Duration::from_secs(
                config
                    .meta_store_backup_interval_s
                    .context("missing meta store backup internal")? as _,
            ),
            bucket_endpoint: config
                .meta_store_bucket_endpoint
                .clone()
                .context("missing meta store bucket name")?,
        })
    } else {
        None
    };

    Ok(MetaStoreConfig {
        bottomless,
        allow_recover_from_fs: config.allow_metastore_recovery,
    })
}

async fn build_server(config: &Cli) -> anyhow::Result<Server> {
    let db_config = make_db_config(config)?;
    let user_api_config = make_user_api_config(config).await?;
    let admin_api_config = make_admin_api_config(config).await?;
    let rpc_server_config = make_rpc_server_config(config).await?;
    let rpc_client_config = make_rpc_client_config(config).await?;
    let heartbeat_config = make_heartbeat_config(config);
    let meta_store_config = make_meta_store_config(config)?;

    let shutdown = Arc::new(Notify::new());
    tokio::spawn({
        let shutdown = shutdown.clone();
        async move {
            loop {
                let signal = shutdown_signal()
                    .await
                    .expect("failed to register shutdown signal handlers");

                tracing::info!(
                    "Got {} shutdown signal, gracefully shutting down... this may take some time",
                    signal
                );

                shutdown.notify_waiters();
            }
        }
    });

    Ok(Server {
        path: config.db_path.clone().into(),
        db_config,
        user_api_config,
        admin_api_config,
        rpc_server_config,
        rpc_client_config,
        heartbeat_config,
        idle_shutdown_timeout: config.idle_shutdown_timeout_s.map(Duration::from_secs),
        initial_idle_shutdown_timeout: config
            .initial_idle_shutdown_timeout_s
            .map(Duration::from_secs),
        disable_default_namespace: config.disable_default_namespace,
        disable_namespaces: !config.enable_namespaces,
        shutdown,
        max_active_namespaces: config.max_active_namespaces,
        meta_store_config,
        max_concurrent_connections: config.max_concurrent_connections,
        shutdown_timeout: config
            .shutdown_timeout
            .map(Duration::from_secs)
            .unwrap_or(Duration::from_secs(30)),
        use_libsql_wal: config.use_libsql_wal,
    })
}

#[tokio::main]
async fn main() -> Result<()> {
    if std::env::var("RUST_LOG").is_err() {
        std::env::set_var("RUST_LOG", "info");
    }

    let registry = tracing_subscriber::registry();

    #[cfg(feature = "debug-tools")]
    let registry = registry.with(console_subscriber::spawn());

    #[cfg(feature = "debug-tools")]
    enable_libsql_logging();

    registry
        .with(
            tracing_subscriber::fmt::layer()
                .with_ansi(false)
                .with_filter(tracing_subscriber::EnvFilter::from_default_env()),
        )
        .init();

    let args = Cli::parse();

    match args.utils {
        Some(UtilsSubcommands::Dump { path, namespace }) => {
            if let Some(ref path) = path {
                eprintln!(
                    "Dumping database {} to {}",
                    args.db_path.display(),
                    path.display()
                );
            }
            let db_path = args.db_path.join("dbs").join(&namespace);
            if !db_path.exists() {
                bail!("no database for namespace `{namespace}`");
            }

            perform_dump(path.as_deref(), &db_path)
        }
        None => {
            args.print_welcome_message();
            let server = build_server(&args).await?;
            server.start().await?;

            Ok(())
        }
    }
}
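
// A minimal test sketch (assuming no SQLD_* environment variables leak into
// the test environment): clap's `Command::debug_assert` checks the argument
// definitions above for consistency, and the second test exercises the
// `conflicts_with` relation declared between primary and replica mode flags.
#[cfg(test)]
mod tests {
    use super::*;
    use clap::CommandFactory;

    #[test]
    fn cli_definition_is_consistent() {
        Cli::command().debug_assert();
    }

    #[test]
    fn grpc_listen_addr_conflicts_with_primary_grpc_url() {
        // `--grpc-listen-addr` (primary mode) and `--primary-grpc-url`
        // (replica mode) are mutually exclusive.
        assert!(Cli::try_parse_from([
            "sqld",
            "--grpc-listen-addr",
            "0.0.0.0:5001",
            "--primary-grpc-url",
            "http://localhost:5001",
        ])
        .is_err());
    }
}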