From 8dcc71f47a2ba5afd6410a7fb6f91a5b76b7b9b0 Mon Sep 17 00:00:00 2001
From: Pekka Enberg <penberg@turso.tech>
Date: Fri, 21 Mar 2025 08:48:39 +0200
Subject: [PATCH] libsql: Check integrity during WAL pull

The check is expensive, but let's add it to catch errors early.
---
 libsql/src/local/connection.rs | 13 +++++++++++++
 libsql/src/sync.rs             |  3 +++
 2 files changed, 16 insertions(+)

diff --git a/libsql/src/local/connection.rs b/libsql/src/local/connection.rs
index b9a2e20150..c81050c50e 100644
--- a/libsql/src/local/connection.rs
+++ b/libsql/src/local/connection.rs
@@ -3,6 +3,7 @@
 use crate::local::rows::BatchedRows;
 use crate::params::Params;
 use crate::{connection::BatchRows, errors};
+use crate::value::Value;
 use std::time::Duration;
 
 use super::{Database, Error, Result, Rows, RowsFuture, Statement, Transaction};
@@ -568,6 +569,18 @@ impl Connection {
         self.wal_insert_begin()?;
         Ok(WalInsertHandle { conn: self, in_session: RefCell::new(true) })
     }
+
+    pub(crate) fn check_integrity(&self) -> Result<bool> {
+        let stmt = self.prepare("PRAGMA integrity_check")?;
+        let rows = stmt.query(&Params::None)?;
+        let mut result = true;
+        if let Some(row) = rows.next()? {
+            if row.get_value(0)? != Value::Text("ok".to_string()) {
+                result = false;
+            }
+        }
+        Ok(result)
+    }
 }
 
 pub(crate) struct WalInsertHandle<'a> {
diff --git a/libsql/src/sync.rs b/libsql/src/sync.rs
index a1d6b742f9..798730b9bf 100644
--- a/libsql/src/sync.rs
+++ b/libsql/src/sync.rs
@@ -656,6 +656,7 @@ async fn try_pull(
         match sync_ctx.pull_one_frame(generation, frame_no).await {
             Ok(PullResult::Frame(frame)) => {
                 insert_handle.insert(&frame)?;
+                assert!(conn.check_integrity()?);
                 sync_ctx.durable_frame_num = frame_no;
             }
             Ok(PullResult::EndOfGeneration { max_generation }) => {
@@ -666,8 +667,10 @@ async fn try_pull(
                 insert_handle.end()?;
                 sync_ctx.write_metadata().await?;
 
+                assert!(conn.check_integrity()?);
                 // TODO: Make this crash-proof.
                 conn.wal_checkpoint(true)?;
+                assert!(conn.check_integrity()?);
 
                 sync_ctx.next_generation();
                 sync_ctx.write_metadata().await?;