use std::sync::Arc;
use std::time::Duration;

use insta::assert_debug_snapshot;
use libsql::Database;
use libsql_server::config::{AdminApiConfig, DbConfig, RpcClientConfig, RpcServerConfig};
use tokio::sync::Notify;

use crate::common::{
    http::Client,
    net::{SimServer, TestServer, TurmoilAcceptor, TurmoilConnector},
};

/// In this test, we first create a primary with a very small max_log_size, and then add a good
/// amount of data to it. This will cause the primary to create a bunch of snaphots a large enough
/// to prevent the replica from applying them all at once. We then start the replica, and check
/// that it replicates correctly to the primary's replicaton index. #[test]
#[test]
fn apply_partial_snapshot() {
    let mut sim = turmoil::Builder::new()
        .tcp_capacity(4096 * 30)
        .simulation_duration(Duration::from_secs(3600))
        .build();

    let prim_tmp = tempfile::tempdir().unwrap();
    let notify = Arc::new(Notify::new());

    sim.host("primary", {
        let prim_path = prim_tmp.path().to_path_buf();
        move || {
            let prim_path = prim_path.clone();
            async move {
                let primary = TestServer {
                    path: prim_path.into(),
                    db_config: DbConfig {
                        max_log_size: 1,
                        ..Default::default()
                    },
                    admin_api_config: Some(AdminApiConfig {
                        acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await.unwrap(),
                        connector: TurmoilConnector,
                        disable_metrics: true,
                    }),
                    rpc_server_config: Some(RpcServerConfig {
                        acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 5050)).await.unwrap(),
                        tls_config: None,
                    }),
                    ..Default::default()
                };

                primary.start_sim(8080).await.unwrap();

                Ok(())
            }
        }
    });

    sim.host("replica", {
        let notify = notify.clone();
        move || {
            let notify = notify.clone();
            async move {
                let tmp = tempfile::tempdir().unwrap();
                let replica = TestServer {
                    path: tmp.path().to_path_buf().into(),
                    db_config: DbConfig {
                        max_log_size: 1,
                        ..Default::default()
                    },
                    admin_api_config: Some(AdminApiConfig {
                        acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await.unwrap(),
                        connector: TurmoilConnector,
                        disable_metrics: true,
                    }),
                    rpc_client_config: Some(RpcClientConfig {
                        remote_url: "http://primary:5050".into(),
                        tls_config: None,
                        connector: TurmoilConnector,
                    }),
                    ..Default::default()
                };

                notify.notified().await;
                replica.start_sim(8080).await.unwrap();

                Ok(())
            }
        }
    });

    sim.client("client", async move {
        let primary = libsql::Database::open_remote_with_connector(
            "http://primary:8080",
            "",
            TurmoilConnector,
        )
        .unwrap();
        let conn = primary.connect().unwrap();
        conn.execute("CREATE TABLE TEST (x)", ()).await.unwrap();
        // we need a sufficiently large snapshot for the test. Before the fix, 5000 insert would
        // trigger an infinite loop.
        for _ in 0..5000 {
            conn.execute("INSERT INTO TEST VALUES (randomblob(6000))", ())
                .await
                .unwrap();
        }

        let client = Client::new();
        let resp = client
            .get("http://primary:9090/v1/namespaces/default/stats")
            .await
            .unwrap();
        let stats = resp.json_value().await.unwrap();
        let primary_replication_index = stats["replication_index"].as_i64().unwrap();

        // primary is setup, time to start replica
        notify.notify_waiters();

        let client = Client::new();

        // wait for replica to start up
        while client.get("http://replica:8080/").await.is_err() {
            tokio::time::sleep(Duration::from_millis(100)).await;
        }

        loop {
            let resp = client
                .get("http://replica:9090/v1/namespaces/default/stats")
                .await
                .unwrap();
            let stats = resp.json_value().await.unwrap();
            let replication_index = &stats["replication_index"];
            if !replication_index.is_null()
                && replication_index.as_i64().unwrap() == primary_replication_index
            {
                break;
            }
            tokio::time::sleep(Duration::from_millis(1000)).await;
        }

        Ok(())
    });

    sim.run().unwrap();
}

#[test]
fn replica_lazy_creation() {
    let mut sim = turmoil::Builder::new()
        .simulation_duration(Duration::from_secs(1000))
        .build();

    let prim_tmp = tempfile::tempdir().unwrap();

    sim.host("primary", {
        let prim_path = prim_tmp.path().to_path_buf();
        move || {
            let prim_path = prim_path.clone();
            async move {
                let primary = TestServer {
                    path: prim_path.into(),
                    db_config: DbConfig {
                        max_log_size: 1,
                        ..Default::default()
                    },
                    admin_api_config: Some(AdminApiConfig {
                        acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await.unwrap(),
                        connector: TurmoilConnector,
                        disable_metrics: true,
                    }),
                    rpc_server_config: Some(RpcServerConfig {
                        acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 5050)).await.unwrap(),
                        tls_config: None,
                    }),
                    disable_namespaces: false,
                    disable_default_namespace: true,
                    ..Default::default()
                };

                primary.start_sim(8080).await.unwrap();

                Ok(())
            }
        }
    });

    sim.host("replica", {
        move || async move {
            let tmp = tempfile::tempdir().unwrap();
            let replica = TestServer {
                path: tmp.path().to_path_buf().into(),
                db_config: DbConfig {
                    max_log_size: 1,
                    ..Default::default()
                },
                admin_api_config: Some(AdminApiConfig {
                    acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await.unwrap(),
                    connector: TurmoilConnector,
                    disable_metrics: true,
                }),
                rpc_client_config: Some(RpcClientConfig {
                    remote_url: "http://primary:5050".into(),
                    tls_config: None,
                    connector: TurmoilConnector,
                }),
                disable_namespaces: false,
                disable_default_namespace: true,
                ..Default::default()
            };

            replica.start_sim(8080).await.unwrap();

            Ok(())
        }
    });

    sim.client("client", async move {
        let db =
            Database::open_remote_with_connector("http://test.replica:8080", "", TurmoilConnector)
                .unwrap();
        let conn = db.connect().unwrap();
        assert_debug_snapshot!(conn.execute("create table test (x)", ()).await.unwrap_err());
        let primary_http = Client::new();
        primary_http
            .post(
                "http://primary:9090/v1/namespaces/test/create",
                serde_json::json!({}),
            )
            .await
            .unwrap();

        // try again
        conn.execute("create table test (x)", ()).await.unwrap();

        Ok(())
    });

    sim.run().unwrap();
}