0
0
mirror of https://github.com/tursodatabase/libsql.git synced 2025-06-12 18:23:01 +00:00
Files
libsql/libsql-server/src/database/mod.rs
ad hoc 833350dae5 handle scheduler errors (#1175)
* handle scheduler errors

* fix query
2024-03-09 11:40:31 +00:00

205 lines
6.1 KiB
Rust

use std::fmt;
use std::sync::Arc;
use bottomless::SavepointTracker;
use crate::connection::{MakeConnection, RequestContext};
pub use self::primary::{PrimaryConnection, PrimaryConnectionMaker, PrimaryDatabase};
pub use self::replica::{ReplicaConnection, ReplicaDatabase};
pub use self::schema::{SchemaConnection, SchemaDatabase};
mod primary;
mod replica;
mod schema;
#[derive(Debug, Clone, serde::Deserialize, Copy)]
#[serde(rename_all = "snake_case")]
pub enum DatabaseKind {
Primary,
Replica,
}
impl DatabaseKind {
/// Returns `true` if the database kind is [`Replica`].
///
/// [`Replica`]: DatabaseKind::Replica
#[must_use]
pub fn is_replica(&self) -> bool {
matches!(self, Self::Replica)
}
/// Returns `true` if the database kind is [`Primary`].
///
/// [`Primary`]: DatabaseKind::Primary
#[must_use]
pub fn is_primary(&self) -> bool {
matches!(self, Self::Primary)
}
}
pub type Result<T> = anyhow::Result<T>;
pub enum Connection {
Primary(PrimaryConnection),
Replica(ReplicaConnection),
Schema(SchemaConnection),
}
impl fmt::Debug for Connection {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Primary(_) => write!(f, "Primary"),
Self::Replica(_) => write!(f, "Replica"),
Self::Schema(_) => write!(f, "Schema"),
}
}
}
impl Connection {
/// Returns `true` if the connection is [`Primary`].
///
/// [`Primary`]: Connection::Primary
#[must_use]
pub fn is_primary(&self) -> bool {
matches!(self, Self::Primary(..))
}
}
#[async_trait::async_trait]
impl crate::connection::Connection for Connection {
async fn execute_program<B: crate::query_result_builder::QueryResultBuilder>(
&self,
pgm: crate::connection::program::Program,
ctx: RequestContext,
response_builder: B,
replication_index: Option<crate::replication::FrameNo>,
) -> crate::Result<B> {
match self {
Connection::Primary(conn) => {
conn.execute_program(pgm, ctx, response_builder, replication_index)
.await
}
Connection::Replica(conn) => {
conn.execute_program(pgm, ctx, response_builder, replication_index)
.await
}
Connection::Schema(conn) => {
conn.execute_program(pgm, ctx, response_builder, replication_index)
.await
}
}
}
async fn describe(
&self,
sql: String,
ctx: RequestContext,
replication_index: Option<crate::replication::FrameNo>,
) -> crate::Result<crate::Result<crate::connection::program::DescribeResponse>> {
match self {
Connection::Primary(conn) => conn.describe(sql, ctx, replication_index).await,
Connection::Replica(conn) => conn.describe(sql, ctx, replication_index).await,
Connection::Schema(conn) => conn.describe(sql, ctx, replication_index).await,
}
}
async fn is_autocommit(&self) -> crate::Result<bool> {
match self {
Connection::Primary(conn) => conn.is_autocommit().await,
Connection::Replica(conn) => conn.is_autocommit().await,
Connection::Schema(conn) => conn.is_autocommit().await,
}
}
async fn checkpoint(&self) -> crate::Result<()> {
match self {
Connection::Primary(conn) => conn.checkpoint().await,
Connection::Replica(conn) => conn.checkpoint().await,
Connection::Schema(conn) => conn.checkpoint().await,
}
}
async fn vacuum_if_needed(&self) -> crate::Result<()> {
match self {
Connection::Primary(conn) => conn.vacuum_if_needed().await,
Connection::Replica(conn) => conn.vacuum_if_needed().await,
Connection::Schema(conn) => conn.vacuum_if_needed().await,
}
}
fn diagnostics(&self) -> String {
match self {
Connection::Primary(conn) => conn.diagnostics(),
Connection::Replica(conn) => conn.diagnostics(),
Connection::Schema(conn) => conn.diagnostics(),
}
}
}
pub enum Database {
Primary(PrimaryDatabase),
Replica(ReplicaDatabase),
Schema(SchemaDatabase),
}
impl fmt::Debug for Database {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Primary(_) => write!(f, "Primary"),
Self::Replica(_) => write!(f, "Replica"),
Database::Schema(_) => write!(f, "Schema"),
}
}
}
impl Database {
pub fn connection_maker(&self) -> Arc<dyn MakeConnection<Connection = Connection>> {
match self {
Database::Primary(db) => Arc::new(db.connection_maker().map(Connection::Primary)),
Database::Replica(db) => Arc::new(db.connection_maker().map(Connection::Replica)),
Database::Schema(db) => Arc::new(db.connection_maker().map(Connection::Schema)),
}
}
pub fn destroy(self) {
match self {
Database::Primary(db) => db.destroy(),
Database::Replica(db) => db.destroy(),
Database::Schema(db) => db.destroy(),
}
}
pub async fn shutdown(self) -> Result<()> {
match self {
Database::Primary(db) => db.shutdown().await,
Database::Replica(db) => db.shutdown().await,
Database::Schema(db) => db.shutdown().await,
}
}
pub fn as_primary(&self) -> Option<&PrimaryDatabase> {
if let Self::Primary(v) = self {
Some(v)
} else {
None
}
}
pub(crate) fn as_schema(&self) -> Option<&SchemaDatabase> {
if let Self::Schema(v) = self {
Some(v)
} else {
None
}
}
pub(crate) fn backup_savepoint(&self) -> Option<SavepointTracker> {
match self {
Database::Primary(db) => db.backup_savepoint(),
Database::Replica(_) => None,
Database::Schema(db) => db.backup_savepoint(),
}
}
}