use std::time::Duration;

use libsql::{replication::Frames, Database};
use libsql_replication::snapshot::SnapshotFile;
use serde_json::json;
use tempfile::tempdir;
use turmoil::Builder;

use crate::common::{http::Client, net::TurmoilConnector};

use super::make_primary;

#[test]
fn local_sync_with_writes() {
    let mut sim = Builder::new()
        .simulation_duration(Duration::from_secs(120))
        .build();

    let tmp_embedded = tempdir().unwrap();
    let tmp_host = tempdir().unwrap();
    let tmp_embedded_path = tmp_embedded.path().to_owned();
    let tmp_host_path = tmp_host.path().to_owned();

    make_primary(&mut sim, tmp_host_path.clone());

    sim.client("client", async move {
        let client = Client::new();
        client
            .post("http://primary:9090/v1/namespaces/foo/create", json!({}))
            .await?;

        println!("{:?}", tmp_host_path);

        let _path = tmp_embedded_path.join("embedded");

        let primary =
            Database::open_remote_with_connector("http://foo.primary:8080", "", TurmoilConnector)?;
        let conn = primary.connect()?;

        // Do enough writes to ensure that we can force the server to write some snapshots
        conn.execute("create table test (x)", ()).await.unwrap();
        for _ in 0..233 {
            conn.execute("insert into test values (randomblob(4092))", ())
                .await
                .unwrap();
        }

        let snapshots_path = tmp_host_path.join("dbs").join("foo").join("snapshots");

        let mut dir = tokio::fs::read_dir(snapshots_path).await.unwrap();

        let mut snapshots = Vec::new();

        while let Some(snapshot) = dir.next_entry().await.unwrap() {
            let snap = SnapshotFile::open(snapshot.path(), None).await.unwrap();

            snapshots.push(snap);
        }

        snapshots.sort_by(|a, b| {
            a.header()
                .start_frame_no
                .get()
                .cmp(&b.header().start_frame_no.get())
        });

        let db = Database::open_with_local_sync_remote_writes_connector(
            tmp_host_path.join("embedded").to_str().unwrap(),
            "http://foo.primary:8080".to_string(),
            "".to_string(),
            TurmoilConnector,
            None,
        )
        .await?;

        for snapshot in snapshots {
            println!("snapshots: {:?}", snapshot.header().end_frame_no.get());
            db.sync_frames(Frames::Snapshot(snapshot)).await.unwrap();
        }

        let conn = db.connect()?;

        let row = conn
            .query("select count(*) from test", ())
            .await
            .unwrap()
            .next()
            .await
            .unwrap()
            .unwrap();
        let count = row.get::<u64>(0).unwrap();

        assert_eq!(count, 233);

        tracing::info!("executing write delegated inserts");

        // Attempt to write and ensure it writes only to the primary
        for _ in 0..300 {
            conn.execute("insert into test values (randomblob(4092))", ())
                .await
                .unwrap();
        }

        // Verify no new writes were done locally
        let row = conn
            .query("select count(*) from test", ())
            .await
            .unwrap()
            .next()
            .await
            .unwrap()
            .unwrap();
        let count = row.get::<u64>(0).unwrap();
        assert_eq!(count, 233);

        let snapshots_path = tmp_host_path.join("dbs").join("foo").join("snapshots");

        let mut dir = tokio::fs::read_dir(snapshots_path).await.unwrap();

        let mut snapshots = Vec::new();

        while let Some(snapshot) = dir.next_entry().await.unwrap() {
            let snap = SnapshotFile::open(snapshot.path(), None).await.unwrap();

            snapshots.push(snap);
        }

        snapshots.sort_by(|a, b| {
            a.header()
                .start_frame_no
                .get()
                .cmp(&b.header().start_frame_no.get())
        });

        for snapshot in snapshots.into_iter() {
            println!("snapshots: {:?}", snapshot.header().end_frame_no.get());
            db.sync_frames(Frames::Snapshot(snapshot)).await.unwrap();
        }

        let conn = db.connect()?;

        let row = conn
            .query("select count(*) from test", ())
            .await
            .unwrap()
            .next()
            .await
            .unwrap()
            .unwrap();
        let count = row.get::<u64>(0).unwrap();

        assert_eq!(count, 467);

        Ok(())
    });

    sim.run().unwrap();
}