0
0
mirror of https://github.com/tursodatabase/libsql.git synced 2025-01-23 22:26:48 +00:00
ad hoc 96a977862d libsql wal refactors (#1539)
* introduce IoBuf::map_slice

* rename db_size to size_after

* introduce Segment trait

* implement Segment for Arc<T>

* make SealedSegment clone

* implement Segment for SealedSegment

* rename bottomless to storage

- introduce `Storage` trait
- rename old `Storage` to `Backend`
- introduce `AsyncStorage` (former bottomless)

* add Storage to WalRegistry

remove namespace resolver from registry

* add async_get to registry

* decouple shared wal from registry

* move CompactedSegment headers to own module

* refactor SegmentList

* add durable frame_no to shared_wal

* make shared_wal checkpoint async

* expose TestEnv

create test environment for libsql-wal

* ignore dead_code

stuff still needs to be wired

* let LibsqlWalManager resolve namespace name

* fix tests

* fix libsql-server

* fmt

* fix bench

* fix conflicts

* fmt
2024-07-10 13:20:01 +00:00

821 lines
28 KiB
Rust

#![allow(clippy::type_complexity, clippy::too_many_arguments)]
use std::alloc::Layout;
use std::ffi::c_void;
use std::mem::{align_of, size_of};
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::str::FromStr;
use std::sync::{Arc, Weak};
use crate::connection::{Connection, MakeConnection};
use crate::database::DatabaseKind;
use crate::error::Error;
use crate::migration::maybe_migrate;
use crate::namespace::meta_store::{metastore_connection_maker, MetaStore};
use crate::net::Accept;
use crate::pager::{make_pager, PAGER_CACHE_SIZE};
use crate::rpc::proxy::rpc::proxy_server::Proxy;
use crate::rpc::proxy::ProxyService;
use crate::rpc::replica_proxy::ReplicaProxyService;
use crate::rpc::replication_log::rpc::replication_log_server::ReplicationLog;
use crate::rpc::replication_log::ReplicationLogService;
use crate::rpc::replication_log_proxy::ReplicationLogProxyService;
use crate::rpc::run_rpc_server;
use crate::schema::Scheduler;
use crate::stats::Stats;
use anyhow::Context as AnyhowContext;
use auth::Auth;
use config::{
AdminApiConfig, DbConfig, HeartbeatConfig, RpcClientConfig, RpcServerConfig, UserApiConfig,
};
use futures::future::ready;
use futures::Future;
use http::user::UserApi;
use hyper::client::HttpConnector;
use hyper_rustls::HttpsConnector;
#[cfg(feature = "durable-wal")]
use libsql_storage::{DurableWalManager, LockManager};
#[cfg(not(feature = "durable-wal"))]
use libsql_sys::wal::either::Either as EitherWAL;
#[cfg(feature = "durable-wal")]
use libsql_sys::wal::either::Either3 as EitherWAL;
use libsql_sys::wal::Sqlite3WalManager;
use libsql_wal::registry::WalRegistry;
use libsql_wal::storage::NoStorage;
use libsql_wal::wal::LibsqlWalManager;
use namespace::meta_store::MetaStoreHandle;
use namespace::{NamespaceConfig, NamespaceName};
use net::Connector;
use once_cell::sync::Lazy;
use rusqlite::ffi::SQLITE_CONFIG_MALLOC;
use rusqlite::ffi::{sqlite3_config, SQLITE_CONFIG_PCACHE2};
use tokio::runtime::Runtime;
use tokio::sync::{mpsc, Notify, Semaphore};
use tokio::task::JoinSet;
use tokio::time::Duration;
use url::Url;
use utils::services::idle_shutdown::IdleShutdownKicker;
use self::config::MetaStoreConfig;
use self::connection::connection_manager::InnerWalManager;
use self::namespace::NamespaceStore;
use self::net::AddrIncoming;
use self::replication::script_backup_manager::{CommandHandler, ScriptBackupManager};
pub mod auth;
mod broadcaster;
pub mod config;
pub mod connection;
pub mod net;
pub mod rpc;
pub mod version;
pub use hrana::proto as hrana_proto;
mod database;
mod error;
mod h2c;
mod heartbeat;
mod hrana;
mod http;
mod metrics;
mod migration;
mod namespace;
mod pager;
mod query;
mod query_analysis;
mod query_result_builder;
mod replication;
mod schema;
mod stats;
#[cfg(test)]
mod test;
mod utils;
// Cap on how long a database/namespace creation may take — TODO confirm at call sites.
const DB_CREATE_TIMEOUT: Duration = Duration::from_secs(1);
// Default WAL auto-checkpoint threshold, in pages (matches SQLite's own default of 1000).
const DEFAULT_AUTO_CHECKPOINT: u32 = 1000;
// Database page size, in bytes.
const LIBSQL_PAGE_SIZE: u64 = 4096;

/// Dedicated multi-threaded runtime for blocking work, with a very large
/// blocking-thread pool (50k threads).
pub(crate) static BLOCKING_RT: Lazy<Runtime> = Lazy::new(|| {
    tokio::runtime::Builder::new_multi_thread()
        .max_blocking_threads(50_000)
        .enable_all()
        .build()
        .unwrap()
});

// Crate-local result type defaulting to the crate's `Error`.
type Result<T, E = Error> = std::result::Result<T, E>;
// Channel used to hand per-namespace stats handles to the monitoring tasks.
type StatsSender = mpsc::Sender<(NamespaceName, MetaStoreHandle, Weak<Stats>)>;

// #[global_allocator]
// static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;

// Global allocator: mimalloc wrapped in rheaper (allocation tracking).
#[global_allocator]
static GLOBAL: rheaper::Allocator<mimalloc::MiMalloc> =
    rheaper::Allocator::from_allocator(mimalloc::MiMalloc);
/// Alternative WAL implementations selectable via CLI (`clap::ValueEnum`).
#[derive(clap::ValueEnum, PartialEq, Clone, Copy, Debug)]
pub enum CustomWAL {
    /// The libsql virtual WAL (see `Server::configure_wal_manager`).
    LibsqlWal,
    /// WAL backed by a remote storage server (uses `storage_server_address`).
    #[cfg(feature = "durable-wal")]
    DurableWal,
}
/// A sqld server instance, generic over the RPC client connector `C`, the
/// API acceptor `A`, and the admin API connector `D`.
pub struct Server<C = HttpConnector, A = AddrIncoming, D = HttpsConnector<HttpConnector>> {
    /// Root data directory of the server.
    pub path: Arc<Path>,
    pub db_config: DbConfig,
    pub user_api_config: UserApiConfig<A>,
    pub admin_api_config: Option<AdminApiConfig<A, D>>,
    /// Present on primaries that serve replication/proxy RPC.
    pub rpc_server_config: Option<RpcServerConfig<A>>,
    /// Present on replicas; its presence decides `DatabaseKind::Replica`.
    pub rpc_client_config: Option<RpcClientConfig<C>>,
    pub idle_shutdown_timeout: Option<Duration>,
    pub initial_idle_shutdown_timeout: Option<Duration>,
    pub disable_default_namespace: bool,
    pub heartbeat_config: Option<HeartbeatConfig>,
    pub disable_namespaces: bool,
    /// Notified to trigger a graceful shutdown of the whole server.
    pub shutdown: Arc<Notify>,
    /// Maximum number of namespaces kept resident at once.
    pub max_active_namespaces: usize,
    pub meta_store_config: MetaStoreConfig,
    pub max_concurrent_connections: usize,
    /// How long a graceful shutdown may take before the process force-exits.
    pub shutdown_timeout: std::time::Duration,
    pub use_custom_wal: Option<CustomWAL>,
    /// Address of the storage server used by the durable WAL backend.
    pub storage_server_address: String,
}
impl<C, A, D> Default for Server<C, A, D> {
fn default() -> Self {
Self {
path: PathBuf::from("data.sqld").into(),
db_config: Default::default(),
user_api_config: Default::default(),
admin_api_config: Default::default(),
rpc_server_config: Default::default(),
rpc_client_config: Default::default(),
idle_shutdown_timeout: Default::default(),
initial_idle_shutdown_timeout: Default::default(),
disable_default_namespace: false,
heartbeat_config: Default::default(),
disable_namespaces: true,
shutdown: Default::default(),
max_active_namespaces: 100,
meta_store_config: Default::default(),
max_concurrent_connections: 128,
shutdown_timeout: Duration::from_secs(30),
use_custom_wal: None,
storage_server_address: Default::default(),
}
}
}
/// Everything needed to stand up the HTTP front ends: the user API plus the
/// optional admin API. `A` is the acceptor, `P` the proxy service, `S` the
/// replication log service, `C` the admin connector.
struct Services<A, P, S, C> {
    namespace_store: NamespaceStore,
    idle_shutdown_kicker: Option<IdleShutdownKicker>,
    proxy_service: P,
    replication_service: S,
    user_api_config: UserApiConfig<A>,
    admin_api_config: Option<AdminApiConfig<A, C>>,
    disable_namespaces: bool,
    disable_default_namespace: bool,
    db_config: DbConfig,
    user_auth_strategy: Auth,
    path: Arc<Path>,
    shutdown: Arc<Notify>,
}
impl<A, P, S, C> Services<A, P, S, C>
where
    A: crate::net::Accept,
    P: Proxy,
    S: ReplicationLog,
    C: Connector,
{
    /// Wire the user-facing HTTP API — and, when configured, the admin API —
    /// onto `join_set`.
    fn configure(self, join_set: &mut JoinSet<anyhow::Result<()>>) {
        // Assemble the user API from the collected configuration.
        let user_api = UserApi {
            http_acceptor: self.user_api_config.http_acceptor,
            hrana_ws_acceptor: self.user_api_config.hrana_ws_acceptor,
            user_auth_strategy: self.user_auth_strategy,
            namespaces: self.namespace_store.clone(),
            idle_shutdown_kicker: self.idle_shutdown_kicker.clone(),
            proxy_service: self.proxy_service,
            replication_service: self.replication_service,
            disable_default_namespace: self.disable_default_namespace,
            disable_namespaces: self.disable_namespaces,
            max_response_size: self.db_config.max_response_size,
            enable_console: self.user_api_config.enable_http_console,
            self_url: self.user_api_config.self_url,
            primary_url: self.user_api_config.primary_url,
            path: self.path.clone(),
            shutdown: self.shutdown.clone(),
        };
        let user_service = user_api.configure(join_set);

        // The admin API is optional; it receives a handle to the user service.
        let Some(AdminApiConfig {
            acceptor,
            connector,
            disable_metrics,
        }) = self.admin_api_config
        else {
            return;
        };
        join_set.spawn(http::admin::run(
            acceptor,
            user_service,
            self.namespace_store,
            connector,
            disable_metrics,
            self.shutdown.clone(),
        ));
    }
}
/// Periodically checkpoint the database of `namespace_name`.
///
/// Every `period`, a fresh connection is created, the database is vacuumed if
/// needed, and a WAL checkpoint is run. Any failure (connecting or
/// checkpointing) schedules a retry after `RETRY_INTERVAL` instead of waiting
/// for the next tick.
#[tracing::instrument(skip(connection_maker))]
async fn run_periodic_checkpoint<C>(
    connection_maker: Arc<C>,
    period: Duration,
    namespace_name: NamespaceName,
) -> anyhow::Result<()>
where
    C: MakeConnection,
{
    use tokio::time::{interval, sleep, Instant, MissedTickBehavior};

    const RETRY_INTERVAL: Duration = Duration::from_secs(60);
    tracing::info!("setting checkpoint interval to {:?}", period);
    let mut interval = interval(period);
    // Delay rather than burst if a tick is missed (e.g. after a long checkpoint).
    interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
    let mut retry: Option<Duration> = None;
    loop {
        // A pending retry takes priority over the regular schedule.
        if let Some(retry) = retry.take() {
            // NOTE(review): `retry` is only ever set to RETRY_INTERVAL in this
            // function, so this zero-duration branch looks unreachable from
            // here — confirm whether it is a leftover of an older protocol.
            if retry.is_zero() {
                tracing::warn!("database was not set in WAL journal mode");
                return Ok(());
            }
            sleep(retry).await;
        } else {
            interval.tick().await;
        }
        retry = match connection_maker.create().await {
            Ok(conn) => {
                // A vacuum failure is logged but does not abort the checkpoint.
                if let Err(e) = conn.vacuum_if_needed().await {
                    tracing::warn!("vacuum failed: {}", e);
                }
                tracing::info!("database checkpoint starts");
                let start = Instant::now();
                match conn.checkpoint().await {
                    Ok(_) => {
                        let elapsed = Instant::now() - start;
                        // Escalate to a warning when the checkpoint is slow.
                        if elapsed >= Duration::from_secs(10) {
                            tracing::warn!("database checkpoint finished (took: {:?})", elapsed);
                        } else {
                            tracing::info!("database checkpoint finished (took: {:?})", elapsed);
                        }
                        None
                    }
                    Err(err) => {
                        tracing::warn!("failed to execute checkpoint: {}", err);
                        Some(RETRY_INTERVAL)
                    }
                }
            }
            Err(err) => {
                tracing::warn!("couldn't connect: {}", err);
                Some(RETRY_INTERVAL)
            }
        }
    }
}
/// Write a `.version` marker file into `db_path` for fresh installs.
///
/// A `data` file at the root of `db_path` indicates a pre-0.18.0 layout that
/// still needs migration, in which case no marker is written.
fn init_version_file(db_path: &Path) -> anyhow::Result<()> {
    // Legacy (pre-0.18.0) database detected: leave it for the migration path.
    let legacy_data_file = db_path.join("data");
    if legacy_data_file.exists() {
        return Ok(());
    }
    let version_file = db_path.join(".version");
    if version_file.exists() {
        return Ok(());
    }
    // First run on this directory: create it and record the current version.
    std::fs::create_dir_all(db_path)?;
    std::fs::write(&version_file, env!("CARGO_PKG_VERSION"))?;
    Ok(())
}
impl<C, A, D> Server<C, A, D>
where
C: Connector,
A: Accept,
D: Connector,
{
/// Setup sqlite global environment
fn init_sqlite_globals(&self) {
if let Some(soft_limit_mb) = self.db_config.soft_heap_limit_mb {
tracing::warn!("Setting soft heap limit to {soft_limit_mb}MiB");
unsafe {
libsql_sys::ffi::sqlite3_soft_heap_limit64(soft_limit_mb as i64 * 1024 * 1024)
};
}
if let Some(hard_limit_mb) = self.db_config.hard_heap_limit_mb {
tracing::warn!("Setting hard heap limit to {hard_limit_mb}MiB");
unsafe {
libsql_sys::ffi::sqlite3_hard_heap_limit64(hard_limit_mb as i64 * 1024 * 1024)
};
}
}
    /// Spawn background monitoring tasks onto `join_set`.
    ///
    /// When a heartbeat is configured, a task periodically posts the stats
    /// received on `stats_receiver` to the configured URL; otherwise a warning
    /// is logged and nothing is spawned.
    fn spawn_monitoring_tasks(
        &self,
        join_set: &mut JoinSet<anyhow::Result<()>>,
        stats_receiver: mpsc::Receiver<(NamespaceName, MetaStoreHandle, Weak<Stats>)>,
    ) -> anyhow::Result<()> {
        match self.heartbeat_config {
            Some(ref config) => {
                tracing::info!(
                    "Server sending heartbeat to URL {} every {:?}",
                    config.heartbeat_url.as_deref().unwrap_or("<not supplied>"),
                    config.heartbeat_period,
                );
                join_set.spawn({
                    let heartbeat_auth = config.heartbeat_auth.clone();
                    let heartbeat_period = config.heartbeat_period;
                    // NOTE: this block is evaluated before `spawn` is called,
                    // so the `?` below makes an invalid URL an error of
                    // `spawn_monitoring_tasks` itself — no task is spawned.
                    let heartbeat_url = if let Some(url) = &config.heartbeat_url {
                        Some(Url::from_str(url).context("invalid heartbeat URL")?)
                    } else {
                        None
                    };
                    async move {
                        heartbeat::server_heartbeat(
                            heartbeat_url,
                            heartbeat_auth,
                            heartbeat_period,
                            stats_receiver,
                        )
                        .await;
                        Ok(())
                    }
                });

                // join_set.spawn(run_storage_monitor(self.path.clone(), stats));
            }
            None => {
                tracing::warn!("No server heartbeat configured")
            }
        }

        Ok(())
    }
fn make_services<P: Proxy, L: ReplicationLog>(
self,
namespace_store: NamespaceStore,
idle_shutdown_kicker: Option<IdleShutdownKicker>,
proxy_service: P,
replication_service: L,
user_auth_strategy: Auth,
shutdown: Arc<Notify>,
) -> Services<A, P, L, D> {
Services {
namespace_store,
idle_shutdown_kicker,
proxy_service,
replication_service,
user_api_config: self.user_api_config,
admin_api_config: self.admin_api_config,
disable_namespaces: self.disable_namespaces,
disable_default_namespace: self.disable_default_namespace,
db_config: self.db_config,
user_auth_strategy,
path: self.path.clone(),
shutdown,
}
}
    /// Run the server until shutdown is requested.
    ///
    /// Startup order matters: global sqlite state, on-disk migration, WAL
    /// backend selection, meta store, namespace store, schedulers and
    /// monitoring tasks, then the RPC and HTTP front ends. The function then
    /// parks on a `select!` until either shutdown is notified or a spawned
    /// task fails.
    pub async fn start(mut self) -> anyhow::Result<()> {
        static INIT: std::sync::Once = std::sync::Once::new();

        let mut join_set = JoinSet::new();

        // Opt-in: route sqlite's allocations through the process allocator.
        if std::env::var("LIBSQL_SQLITE_MIMALLOC").is_ok() {
            setup_sqlite_alloc();
        }

        // Global sqlite pager configuration must happen exactly once per process.
        INIT.call_once(|| {
            if let Ok(size) = std::env::var("LIBSQL_EXPERIMENTAL_PAGER") {
                let size = size.parse().unwrap();
                PAGER_CACHE_SIZE.store(size, std::sync::atomic::Ordering::SeqCst);
                unsafe {
                    let rc = sqlite3_config(SQLITE_CONFIG_PCACHE2, &make_pager());
                    if rc != 0 {
                        // necessary because in some tests there is race between client and server
                        // to initialize global state.
                        tracing::error!(
                            "failed to setup sqld pager, using sqlite3 default instead"
                        );
                    }
                }
            }
        });

        init_version_file(&self.path)?;
        maybe_migrate(&self.path)?;
        self.init_sqlite_globals();
        let idle_shutdown_kicker = self.setup_shutdown();

        let extensions = self.db_config.validate_extensions()?;
        let user_auth_strategy = self.user_api_config.auth_strategy.clone();

        let service_shutdown = Arc::new(Notify::new());
        // A replica is identified by having an RPC client config (its primary).
        let db_kind = if self.rpc_client_config.is_some() {
            DatabaseKind::Replica
        } else {
            DatabaseKind::Primary
        };

        // Optional scripted snapshot backups run as their own task.
        let scripted_backup = match self.db_config.snapshot_exec {
            Some(ref command) => {
                let (scripted_backup, script_backup_task) =
                    ScriptBackupManager::new(&self.path, CommandHandler::new(command.to_string()))
                        .await?;
                join_set.spawn(script_backup_task.run());
                Some(scripted_backup)
            }
            None => None,
        };

        // Channel/URI to the primary (replicas only).
        let (channel, uri) = match self.rpc_client_config {
            Some(ref config) => {
                let (channel, uri) = config.configure().await?;
                (Some(channel), Some(uri))
            }
            None => (None, None),
        };

        let (scheduler_sender, scheduler_receiver) = mpsc::channel(128);
        let (stats_sender, stats_receiver) = mpsc::channel(1024);

        // chose the wal backend
        let (make_wal_manager, registry_shutdown) = self.configure_wal_manager()?;

        let ns_config = NamespaceConfig {
            db_kind,
            base_path: self.path.clone(),
            max_log_size: self.db_config.max_log_size,
            max_log_duration: self.db_config.max_log_duration.map(Duration::from_secs_f32),
            bottomless_replication: self.db_config.bottomless_replication.clone(),
            extensions,
            stats_sender: stats_sender.clone(),
            max_response_size: self.db_config.max_response_size,
            max_total_response_size: self.db_config.max_total_response_size,
            checkpoint_interval: self.db_config.checkpoint_interval,
            encryption_config: self.db_config.encryption_config.clone(),
            max_concurrent_connections: Arc::new(Semaphore::new(self.max_concurrent_connections)),
            scripted_backup,
            max_concurrent_requests: self.db_config.max_concurrent_requests,
            channel: channel.clone(),
            uri: uri.clone(),
            migration_scheduler: scheduler_sender.into(),
            make_wal_manager,
        };

        // Meta store first: the namespace store is built on top of it.
        let (metastore_conn_maker, meta_store_wal_manager) =
            metastore_connection_maker(self.meta_store_config.bottomless.clone(), &self.path)
                .await?;
        let meta_conn = metastore_conn_maker()?;
        let meta_store = MetaStore::new(
            self.meta_store_config.clone(),
            &self.path,
            meta_conn,
            meta_store_wal_manager,
        )
        .await?;

        let namespace_store: NamespaceStore = NamespaceStore::new(
            db_kind.is_replica(),
            self.db_config.snapshot_at_shutdown,
            self.max_active_namespaces,
            ns_config,
            meta_store,
        )
        .await?;

        // Schema-migration scheduler consumes jobs from `scheduler_receiver`.
        let meta_conn = metastore_conn_maker()?;
        let scheduler = Scheduler::new(namespace_store.clone(), meta_conn).await?;
        join_set.spawn(async move {
            scheduler.run(scheduler_receiver).await;
            Ok(())
        });

        self.spawn_monitoring_tasks(&mut join_set, stats_receiver)?;

        // eagerly load the default namespace when namespaces are disabled
        if self.disable_namespaces && db_kind.is_primary() {
            namespace_store
                .create(
                    NamespaceName::default(),
                    namespace::RestoreOption::Latest,
                    Default::default(),
                )
                .await?;
        }

        // if namespaces are enabled, then bottomless must have set DB ID
        if !self.disable_namespaces {
            if let Some(bottomless) = &self.db_config.bottomless_replication {
                if bottomless.db_id.is_none() {
                    anyhow::bail!("bottomless replication with namespaces requires a DB ID");
                }
            }
        }

        // configure rpc server
        if let Some(config) = self.rpc_server_config.take() {
            let proxy_service =
                ProxyService::new(namespace_store.clone(), None, self.disable_namespaces);
            // Garbage collect proxy clients every 30 seconds
            join_set.spawn({
                let clients = proxy_service.clients();
                async move {
                    loop {
                        tokio::time::sleep(Duration::from_secs(30)).await;
                        rpc::proxy::garbage_collect(&mut *clients.write().await).await;
                    }
                }
            });
            join_set.spawn(run_rpc_server(
                proxy_service,
                config.acceptor,
                config.tls_config,
                idle_shutdown_kicker.clone(),
                namespace_store.clone(),
                self.disable_namespaces,
            ));
        }

        // NOTE(review): `Duration` is `Copy`; the `.clone()` is redundant.
        let shutdown_timeout = self.shutdown_timeout.clone();
        let shutdown = self.shutdown.clone();
        // setup user-facing rpc services
        match db_kind {
            DatabaseKind::Primary => {
                let replication_svc = ReplicationLogService::new(
                    namespace_store.clone(),
                    idle_shutdown_kicker.clone(),
                    Some(user_auth_strategy.clone()),
                    self.disable_namespaces,
                    true,
                );

                let proxy_svc = ProxyService::new(
                    namespace_store.clone(),
                    Some(user_auth_strategy.clone()),
                    self.disable_namespaces,
                );

                // Garbage collect proxy clients every 30 seconds
                join_set.spawn({
                    let clients = proxy_svc.clients();
                    async move {
                        loop {
                            tokio::time::sleep(Duration::from_secs(30)).await;
                            rpc::proxy::garbage_collect(&mut *clients.write().await).await;
                        }
                    }
                });

                self.make_services(
                    namespace_store.clone(),
                    idle_shutdown_kicker,
                    proxy_svc,
                    replication_svc,
                    user_auth_strategy.clone(),
                    service_shutdown.clone(),
                )
                .configure(&mut join_set);
            }
            DatabaseKind::Replica => {
                // Replicas proxy both replication and writes to the primary.
                let replication_svc =
                    ReplicationLogProxyService::new(channel.clone().unwrap(), uri.clone().unwrap());
                let proxy_svc = ReplicaProxyService::new(
                    channel.clone().unwrap(),
                    uri.clone().unwrap(),
                    namespace_store.clone(),
                    user_auth_strategy.clone(),
                    self.disable_namespaces,
                );

                self.make_services(
                    namespace_store.clone(),
                    idle_shutdown_kicker,
                    proxy_svc,
                    replication_svc,
                    user_auth_strategy,
                    service_shutdown.clone(),
                )
                .configure(&mut join_set);
            }
        };

        tokio::select! {
            // Graceful shutdown path, bounded by `shutdown_timeout`.
            _ = shutdown.notified() => {
                let shutdown = async {
                    join_set.shutdown().await;
                    service_shutdown.notify_waiters();
                    namespace_store.shutdown().await?;
                    registry_shutdown.await?;

                    Ok::<_, crate::Error>(())
                };

                match tokio::time::timeout(shutdown_timeout, shutdown).await {
                    Ok(Ok(())) => {
                        tracing::info!("sqld was shutdown gracefully. Bye!");
                    }
                    Ok(Err(e)) => {
                        tracing::error!("failed to shutdown gracefully: {}", e);
                        std::process::exit(1);
                    },
                    Err(_) => {
                        tracing::error!("shutdown timeout hit, forcefully shutting down");
                        std::process::exit(1);
                    },
                }
            }
            // Any spawned task ending (or panicking) surfaces its error here.
            Some(res) = join_set.join_next() => {
                res??;
            },
            else => (),
        }

        Ok(())
    }
fn setup_shutdown(&self) -> Option<IdleShutdownKicker> {
let shutdown_notify = self.shutdown.clone();
self.idle_shutdown_timeout.map(|d| {
IdleShutdownKicker::new(d, self.initial_idle_shutdown_timeout, shutdown_notify)
})
}
    /// Choose and construct the WAL backend.
    ///
    /// Returns a factory producing an `InnerWalManager` per connection, plus a
    /// future that performs the backend's shutdown once the server's shutdown
    /// notify fires.
    ///
    /// # Errors
    /// - a `wals` directory exists but libsql-wal was not selected (the
    ///   database was previously set up with libsql-wal);
    /// - a custom WAL is combined with bottomless replication or replica mode.
    fn configure_wal_manager(
        &self,
    ) -> anyhow::Result<(
        Arc<dyn Fn() -> InnerWalManager + Sync + Send + 'static>,
        Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + Sync + 'static>>,
    )> {
        let wal_path = self.path.join("wals");
        // The LIBSQL_WAL_TEST env var enables libsql-wal, but only on
        // primaries (i.e. when an RPC server config is present).
        let enable_libsql_wal_test = {
            let is_primary = self.rpc_server_config.is_some();
            let is_libsql_wal_test = std::env::var("LIBSQL_WAL_TEST").is_ok();
            is_primary && is_libsql_wal_test
        };
        let use_libsql_wal =
            self.use_custom_wal == Some(CustomWAL::LibsqlWal) || enable_libsql_wal_test;
        // Once a `wals` directory exists, switching back is not supported.
        if !use_libsql_wal {
            if wal_path.try_exists()? {
                anyhow::bail!("database was previously setup to use libsql-wal");
            }
        }

        if self.use_custom_wal.is_some() {
            if self.db_config.bottomless_replication.is_some() {
                anyhow::bail!("bottomless not supported with custom WAL");
            }
            if self.rpc_client_config.is_some() {
                anyhow::bail!("custom WAL not supported in replica mode");
            }
        }

        // Namespace name is derived from the database path: it is the file
        // name of the path's parent directory.
        let namespace_resolver = |path: &Path| {
            NamespaceName::from_string(
                path.parent()
                    .unwrap()
                    .file_name()
                    .unwrap()
                    .to_str()
                    .unwrap()
                    .to_string(),
            )
            .unwrap()
            .into()
        };

        // NOTE(review): the match below keys only on `use_custom_wal`, so when
        // `enable_libsql_wal_test` is true but no custom WAL was requested,
        // the sqlite3 WAL is still selected — confirm this is intended.
        match self.use_custom_wal {
            Some(CustomWAL::LibsqlWal) => {
                let registry = Arc::new(WalRegistry::new(wal_path, NoStorage)?);
                let wal = LibsqlWalManager::new(registry.clone(), Arc::new(namespace_resolver));
                let shutdown_notify = self.shutdown.clone();
                // Registry shutdown is blocking; run it off the async executor.
                let shutdown_fut = Box::pin(async move {
                    shutdown_notify.notified().await;
                    tokio::task::spawn_blocking(move || registry.shutdown())
                        .await
                        .unwrap()?;
                    Ok(())
                });
                tracing::info!("using libsql wal");
                Ok((Arc::new(move || EitherWAL::B(wal.clone())), shutdown_fut))
            }
            #[cfg(feature = "durable-wal")]
            Some(CustomWAL::DurableWal) => {
                tracing::info!("using durable wal");
                let lock_manager = Arc::new(std::sync::Mutex::new(LockManager::new()));
                let wal = DurableWalManager::new(
                    lock_manager,
                    namespace_resolver,
                    self.storage_server_address.clone(),
                );
                // No local shutdown work needed for the durable WAL.
                Ok((
                    Arc::new(move || EitherWAL::C(wal.clone())),
                    Box::pin(ready(Ok(()))),
                ))
            }
            None => {
                tracing::info!("using sqlite3 wal");
                Ok((
                    Arc::new(|| EitherWAL::A(Sqlite3WalManager::default())),
                    Box::pin(ready(Ok(()))),
                ))
            }
        }
    }
}
/// Setup sqlite to use the same allocator as sqld.
///
/// Every allocation is prefixed with its payload size, stored as a `usize`
/// right before the pointer handed to sqlite. An `i32` would be sufficient to
/// hold the size, but a full `usize` keeps the returned pointer aligned to 8.
fn setup_sqlite_alloc() {
    use std::alloc::GlobalAlloc;

    // Layout of an allocation of `payload` bytes: header (usize) + payload.
    // Centralized so malloc/free/realloc always agree on the exact layout, as
    // required by the `GlobalAlloc` contract.
    fn layout_for(payload: usize) -> Layout {
        Layout::from_size_align(payload + size_of::<usize>(), align_of::<usize>()).unwrap()
    }

    /// Allocate `size` payload bytes; returns the pointer just past the header.
    unsafe extern "C" fn malloc(size: i32) -> *mut c_void {
        let ptr = GLOBAL.alloc(layout_for(size as usize));
        if ptr.is_null() {
            return std::ptr::null_mut();
        }
        // Record the payload size so free/realloc/size can recover the layout.
        *(ptr as *mut usize) = size as usize;
        ptr.offset(size_of::<usize>() as _) as *mut _
    }

    unsafe extern "C" fn free(ptr: *mut c_void) {
        let orig_ptr = ptr.offset(-(size_of::<usize>() as isize));
        let size = *(orig_ptr as *mut usize);
        // Fix: dealloc with the same layout malloc used (payload + header).
        // The previous code passed only the payload size, violating the
        // GlobalAlloc requirement that dealloc receive the allocation layout.
        GLOBAL.dealloc(orig_ptr as *mut _, layout_for(size));
    }

    unsafe extern "C" fn realloc(ptr: *mut c_void, new_size: i32) -> *mut c_void {
        let orig_ptr = ptr.offset(-(size_of::<usize>() as isize));
        let orig_size = *(orig_ptr as *mut usize);
        let new_ptr = GLOBAL.realloc(
            orig_ptr as *mut _,
            layout_for(orig_size),
            new_size as usize + size_of::<usize>(),
        );
        // Fix: check the *new* pointer. On failure `realloc` returns null and
        // leaves the original allocation intact; the previous code checked the
        // old `ptr` and would then write through a null `new_ptr`.
        if new_ptr.is_null() {
            return std::ptr::null_mut();
        }
        *(new_ptr as *mut usize) = new_size as usize;
        new_ptr.offset(size_of::<usize>() as _) as *mut _
    }

    /// Report the payload size stored in the header.
    unsafe extern "C" fn size(ptr: *mut c_void) -> i32 {
        let orig_ptr = ptr.offset(-(size_of::<usize>() as isize));
        *(orig_ptr as *mut usize) as i32
    }

    unsafe extern "C" fn init(_: *mut c_void) -> i32 {
        0
    }

    unsafe extern "C" fn shutdown(_: *mut c_void) {}

    /// Round a request up to the header alignment, as sqlite expects.
    unsafe extern "C" fn roundup(n: i32) -> i32 {
        (n as usize).next_multiple_of(align_of::<usize>()) as i32
    }

    let mem = rusqlite::ffi::sqlite3_mem_methods {
        xMalloc: Some(malloc),
        xFree: Some(free),
        xRealloc: Some(realloc),
        xSize: Some(size),
        xRoundup: Some(roundup),
        xInit: Some(init),
        xShutdown: Some(shutdown),
        pAppData: std::ptr::null_mut(),
    };

    unsafe {
        sqlite3_config(SQLITE_CONFIG_MALLOC, &mem as *const _);
    }
}