0
0
mirror of https://github.com/tursodatabase/libsql.git synced 2024-11-22 03:26:15 +00:00
libsql/libsql-server/tests/embedded_replica/local.rs
Piotr Sarna cd82068edf
libsql_server,bottomless: add encryption support (#928)
* namespace,replication: add LogFile encryption

Anything that uses our LogFile format can now be encrypted
on-disk.
Tested locally by seeing that `wallog` file contains garbage
and no sensible plaintext strings can be extracted from it.

* test fixups

* libsql-ffi: add libsql_generate_initial_vector and...

... libsql_generate_aes256_key to make them reachable from Rust.

* connection: expose additional encryption symbols

* libsql-server: derive aes256 from user passphrase properly

And by properly, I mean calling back to SQLite3MultipleCiphers' code.

* replication: rename Encryptor to FrameEncryptor

Encryptor sounds a little too generic for this specific use case.

* replication: add snapshot encryption

It uses the same mechanism as wallog encryption, now abstracted
away to libsql-replication crate to be reused.

* replication: add an encryption feature for compilation

* cargo fmt pass

* fix remaining SnapshotFile::open calls in tests

* logger: add an encryption test

* replication: use a single buffer for encryption

Ideally we could even encrypt in place, but WalPage is also
used in snapshots and it's buffered, and that makes it exceptionally
annoying to explain to the borrow checker.

* bottomless: restore with libsql_replication::injector

... instead of the transaction page cache. That gives us free
encryption, since the injector is encryption-aware.

This patch doesn't hook encryption_key parameter yet, it will
come in the next patch.

* bottomless: pass the encryption key in options

For WAL restoration, but also to be able to encrypt data that gets
sent to S3.

* bottomless: inherit encryption key from db config if not specified

* libsql-sys: add db_change_counter()

The helper function calls the underlying C API to extract
4 bytes from offset 24 of the database header and return it.
It's the database change counter, which we can use to compare
two databases and decide which one is newer than the other.

* bottomless: use sqlite API to read database metadata

With encryption enabled, we can no longer just go ahead and read data
from given offsets, we must go through the VFS layer instead.
Fortunately, we can just open a database connection and ask for all
the metadata we need.

* libsql-sys: make db change counter actually read from the db file

* bottomless: treat change counter == 1 as a new database

... which it is, after setting the journal mode. Otherwise we decide
too eagerly that the local database is the source of truth.

* libsql-server: fix a local embedded replica test

rebase conflict with encryption

* bottomless-cli: allow passing the encryption key

* replication: rebase new test to the new api

* snapshots: do not try to decrypt headers

They are not encrypted, so we shouldn't attempt to decrypt the data.

* logger: restore encrypted frames during recovery

Instead of decrypting and encrypting back, we just copy encrypted
frames as is during the recovery process, saves IO.

* compaction: clear unused encryption_key parameter

It wasn't used since for compaction we only need headers,
which are unencrypted.

* replication: switch to FrameBorrowed::new_zeroed

Following MarinPostma's suggestion.

Co-authored-by: Marin Postma <postma.marin@protonmail.com>

* replication: rebase chores, fixing parameters

* libsql-replication: use page_mut() to decrypt data in-place

* rustfmt

* bottomless: use 0 for disabling autocheckpoint

... instead of u32::MAX. Effectively it's similar, but 0 is the correct
choice.

* rustfmt

* libsql-server: make cbc, aes optional for encryption only

* post-rebase fixes

* libsql-replication: suppress warnings when no encryption

* libsql: add encryption support for local databases

* libsql: add bytes dependency for encryption

* libsql-ffi: build libsqlite3mc without debug symbols

Technically it should just depend on cargo build mode,
but that's left for a follow-up.

* bindings: an attempt to compile bindings with releasemode

... partially to save space, but also to make them faster.

---------

Co-authored-by: Marin Postma <postma.marin@protonmail.com>
2024-02-09 14:27:39 +00:00

159 lines
4.5 KiB
Rust

use std::time::Duration;
use libsql::{replication::Frames, Database};
use libsql_replication::snapshot::SnapshotFile;
use serde_json::json;
use tempfile::tempdir;
use turmoil::Builder;
use crate::common::{http::Client, net::TurmoilConnector};
use super::make_primary;
#[test]
fn local_sync_with_writes() {
let mut sim = Builder::new()
.simulation_duration(Duration::from_secs(120))
.build();
let tmp_embedded = tempdir().unwrap();
let tmp_host = tempdir().unwrap();
let tmp_embedded_path = tmp_embedded.path().to_owned();
let tmp_host_path = tmp_host.path().to_owned();
make_primary(&mut sim, tmp_host_path.clone());
sim.client("client", async move {
let client = Client::new();
client
.post("http://primary:9090/v1/namespaces/foo/create", json!({}))
.await?;
println!("{:?}", tmp_host_path);
let _path = tmp_embedded_path.join("embedded");
let primary =
Database::open_remote_with_connector("http://foo.primary:8080", "", TurmoilConnector)?;
let conn = primary.connect()?;
// Do enough writes to ensure that we can force the server to write some snapshots
conn.execute("create table test (x)", ()).await.unwrap();
for _ in 0..233 {
conn.execute("insert into test values (randomblob(4092))", ())
.await
.unwrap();
}
let snapshots_path = tmp_host_path.join("dbs").join("foo").join("snapshots");
let mut dir = tokio::fs::read_dir(snapshots_path).await.unwrap();
let mut snapshots = Vec::new();
while let Some(snapshot) = dir.next_entry().await.unwrap() {
let snap = SnapshotFile::open(snapshot.path(), None).await.unwrap();
snapshots.push(snap);
}
snapshots.sort_by(|a, b| {
a.header()
.start_frame_no
.get()
.cmp(&b.header().start_frame_no.get())
});
let db = Database::open_with_local_sync_remote_writes_connector(
tmp_host_path.join("embedded").to_str().unwrap(),
"http://foo.primary:8080".to_string(),
"".to_string(),
TurmoilConnector,
None,
)
.await?;
for snapshot in snapshots {
println!("snapshots: {:?}", snapshot.header().end_frame_no.get());
db.sync_frames(Frames::Snapshot(snapshot)).await.unwrap();
}
let conn = db.connect()?;
let row = conn
.query("select count(*) from test", ())
.await
.unwrap()
.next()
.await
.unwrap()
.unwrap();
let count = row.get::<u64>(0).unwrap();
assert_eq!(count, 233);
tracing::info!("executing write delegated inserts");
// Attempt to write and ensure it writes only to the primary
for _ in 0..300 {
conn.execute("insert into test values (randomblob(4092))", ())
.await
.unwrap();
}
// Verify no new writes were done locally
let row = conn
.query("select count(*) from test", ())
.await
.unwrap()
.next()
.await
.unwrap()
.unwrap();
let count = row.get::<u64>(0).unwrap();
assert_eq!(count, 233);
let snapshots_path = tmp_host_path.join("dbs").join("foo").join("snapshots");
let mut dir = tokio::fs::read_dir(snapshots_path).await.unwrap();
let mut snapshots = Vec::new();
while let Some(snapshot) = dir.next_entry().await.unwrap() {
let snap = SnapshotFile::open(snapshot.path(), None).await.unwrap();
snapshots.push(snap);
}
snapshots.sort_by(|a, b| {
a.header()
.start_frame_no
.get()
.cmp(&b.header().start_frame_no.get())
});
for snapshot in snapshots.into_iter() {
println!("snapshots: {:?}", snapshot.header().end_frame_no.get());
db.sync_frames(Frames::Snapshot(snapshot)).await.unwrap();
}
let conn = db.connect()?;
let row = conn
.query("select count(*) from test", ())
.await
.unwrap()
.next()
.await
.unwrap()
.unwrap();
let count = row.get::<u64>(0).unwrap();
assert_eq!(count, 467);
Ok(())
});
sim.run().unwrap();
}