use std::sync::Arc;
use std::time::Duration;

use futures::FutureExt;
use libsql::Database;
use libsql_server::config::{AdminApiConfig, RpcClientConfig, RpcServerConfig, UserApiConfig};
use tempfile::tempdir;
use tokio::sync::Notify;
use turmoil::Builder;

use crate::common::{
    http::Client,
    net::{init_tracing, SimServer, TestServer, TurmoilAcceptor, TurmoilConnector},
};

/// In this test, we create a primary and a replica, add some data and sync them. We then shut
/// down and bring back up the replica, and ensure that the replica continues its normal mode of
/// operation.
///
/// The replication progress of each node is read from the `replication_index` field of its admin
/// API stats endpoint (port 9090).
#[test]
fn replica_restart() {
    let mut sim = Builder::new()
        .simulation_duration(Duration::from_secs(1000))
        .build();
    let tmp = tempdir().unwrap();
    sim.host("primary", move || {
        let path = tmp.path().to_path_buf();
        async move {
            let server = TestServer {
                path: path.into(),
                user_api_config: UserApiConfig {
                    ..Default::default()
                },
                admin_api_config: Some(AdminApiConfig {
                    acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await?,
                    connector: TurmoilConnector,
                    disable_metrics: true,
                }),
                // the replica connects to this RPC endpoint (see its `remote_url`)
                rpc_server_config: Some(RpcServerConfig {
                    acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 4567)).await?,
                    tls_config: None,
                }),
                ..Default::default()
            };

            server.start_sim(8080).await?;

            Ok(())
        }
    });

    // Used by the client to tell the replica host when to restart its server.
    let notify = Arc::new(Notify::new());
    let tmp = tempdir().unwrap();
    let notify_clone = notify.clone();
    sim.host("replica", move || {
        let path = tmp.path().to_path_buf();
        let notify = notify_clone.clone();
        async move {
            // Builds a fresh replica server over the same data directory; called once for the
            // initial start and once more for the restart.
            let make_server = || {
                let path = path.clone();
                async {
                    TestServer {
                        path: path.into(),
                        user_api_config: UserApiConfig {
                            ..Default::default()
                        },
                        admin_api_config: Some(AdminApiConfig {
                            acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await.unwrap(),
                            connector: TurmoilConnector,
                            disable_metrics: true,
                        }),
                        rpc_client_config: Some(RpcClientConfig {
                            remote_url: "http://primary:4567".into(),
                            connector: TurmoilConnector,
                            tls_config: None,
                        }),
                        ..Default::default()
                    }
                }
            };

            let server = make_server().await;

            // Run the server until the client notifies us. When the notification wins the
            // select, the still-pending `start_sim` future is dropped, which stops the first
            // server instance.
            tokio::select! {
                res = server.start_sim(8080) => {
                    res.unwrap()
                }
                _ = notify.notified() => (),
            }

            // bring the replica back up on the same data directory
            let server = make_server().await;
            server.start_sim(8080).await.unwrap();

            Ok(())
        }
    });

    sim.client("client", async move {
        let http = Client::new();
        let db = Database::open_remote_with_connector("http://primary:8080", "", TurmoilConnector)?;
        let conn = db.connect()?;

        // insert a few values into the primary
        conn.execute("create table test (x)", ()).await.unwrap();
        for _ in 0..50 {
            conn.execute("insert into test values (42)", ())
                .await
                .unwrap();
        }

        let primary_index = http
            .get("http://primary:9090/v1/namespaces/default/stats")
            .await
            .unwrap()
            .json_value()
            .await
            .unwrap()["replication_index"]
            .clone()
            .as_i64();

        // wait for the replica to catch up with the primary; the index must be read from the
        // replica's own admin API, otherwise the comparison is trivially true
        loop {
            let replica_index = http
                .get("http://replica:9090/v1/namespaces/default/stats")
                .await
                .unwrap()
                .json_value()
                .await
                .unwrap()["replication_index"]
                .clone()
                .as_i64();
            if primary_index == replica_index {
                break;
            }
            tokio::time::sleep(Duration::from_millis(50)).await;
        }

        // trigger the replica restart
        notify.notify_waiters();

        // make sure that replica is up to date
        loop {
            let replica_index = http
                .get("http://replica:9090/v1/namespaces/default/stats")
                .await
                .unwrap()
                .json_value()
                .await
                .unwrap()["replication_index"]
                .clone()
                .as_i64();
            if primary_index == replica_index {
                break;
            }
            tokio::time::sleep(Duration::from_millis(50)).await;
        }

        Ok(())
    });

    sim.run().unwrap();
}

/// In this test, we start a primary and a replica. We add some entries to the primary, and wait
/// for the replica to be up to date. Then we stop the primary, remove its wallog, and restart the
/// primary. This will force the primary to regenerate the log. The replica should catch that, and
/// self heal. During this process the replica is not shut down.
///
/// The replication progress of each node is read from the `replication_index` field of its admin
/// API stats endpoint (port 9090).
#[test]
fn primary_regenerate_log_no_replica_restart() {
    let mut sim = Builder::new()
        .simulation_duration(Duration::from_secs(1000))
        .build();
    let tmp = tempdir().unwrap();

    // Shared between the client and the primary host: the client uses it to request the primary
    // shutdown, and the primary uses it to signal that the wallog has been removed.
    let notify = Arc::new(Notify::new());
    let notify_clone = notify.clone();

    init_tracing();
    sim.host("primary", move || {
        let notify = notify_clone.clone();
        let path = tmp.path().to_path_buf();
        async move {
            // Builds a fresh primary server over the same data directory; called once for the
            // initial start and once more after the wallog has been removed.
            let make_server = || async {
                TestServer {
                    path: path.clone().into(),
                    user_api_config: UserApiConfig {
                        ..Default::default()
                    },
                    admin_api_config: Some(AdminApiConfig {
                        acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await.unwrap(),
                        connector: TurmoilConnector,
                        disable_metrics: true,
                    }),
                    rpc_server_config: Some(RpcServerConfig {
                        acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 4567)).await.unwrap(),
                        tls_config: None,
                    }),
                    ..Default::default()
                }
            };
            let server = make_server().await;
            // keep a handle on the shutdown notifier before the server is moved into the future
            let shutdown = server.shutdown.clone();

            let fut = async move { server.start_sim(8080).await };

            tokio::pin!(fut);

            // Keep driving the server; when the client notifies us, forward the request to the
            // server's shutdown notifier and keep polling until the server future resolves.
            loop {
                tokio::select! {
                    res =  &mut fut => {
                        res.unwrap();
                        break
                    }
                    _ = notify.notified() => {
                        shutdown.notify_waiters();
                    },
                }
            }
            // remove the wallog and start again
            tokio::fs::remove_file(path.join("dbs/default/wallog"))
                .await
                .unwrap();
            // let the client know that the wallog is gone and the primary is coming back
            notify.notify_waiters();
            let server = make_server().await;
            server.start_sim(8080).await.unwrap();

            Ok(())
        }
    });

    let tmp = tempdir().unwrap();
    sim.host("replica", move || {
        let path = tmp.path().to_path_buf();
        async move {
            let make_server = || {
                let path = path.clone();
                async {
                    TestServer {
                        path: path.into(),
                        user_api_config: UserApiConfig {
                            ..Default::default()
                        },
                        admin_api_config: Some(AdminApiConfig {
                            acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await.unwrap(),
                            connector: TurmoilConnector,
                            disable_metrics: true,
                        }),
                        rpc_client_config: Some(RpcClientConfig {
                            remote_url: "http://primary:4567".into(),
                            connector: TurmoilConnector,
                            tls_config: None,
                        }),
                        ..Default::default()
                    }
                }
            };

            // the replica runs uninterrupted for the whole test
            let server = make_server().await;
            server.start_sim(8080).await.unwrap();

            Ok(())
        }
    });

    sim.client("client", async move {
        let http = Client::new();
        let db = Database::open_remote_with_connector("http://primary:8080", "", TurmoilConnector)?;
        let conn = db.connect()?;

        // insert a few values into the primary
        conn.execute("create table test (x)", ()).await.unwrap();
        for _ in 0..50 {
            conn.execute("insert into test values (42)", ())
                .await
                .unwrap();
        }

        let primary_index = http
            .get("http://primary:9090/v1/namespaces/default/stats")
            .await
            .unwrap()
            .json_value()
            .await
            .unwrap()["replication_index"]
            .clone()
            .as_i64();

        // wait for the replica to catch up with the primary; the index must be read from the
        // replica's own admin API, otherwise the comparison is trivially true
        loop {
            let replica_index = http
                .get("http://replica:9090/v1/namespaces/default/stats")
                .await
                .unwrap()
                .json_value()
                .await
                .unwrap()["replication_index"]
                .clone()
                .as_i64();
            if primary_index == replica_index {
                break;
            }
            tokio::time::sleep(Duration::from_millis(50)).await;
        }

        // ask the primary to shut down, then wait until it reports that the wallog was removed
        notify.notify_waiters();
        notify.notified().await;

        // start over with a fresh HTTP client now that the primary has been restarted
        drop(http);
        let http = Client::new();
        // make sure that replica is up to date
        let new_primary_index = http
            .get("http://primary:9090/v1/namespaces/default/stats")
            .await
            .unwrap()
            .json_value()
            .await
            .unwrap()["replication_index"]
            .clone()
            .as_i64();
        // the regenerated log is expected to report a different index than before the restart
        assert_ne!(primary_index, new_primary_index);
        loop {
            let replica_index = http
                .get("http://replica:9090/v1/namespaces/default/stats")
                .await
                .unwrap()
                .json_value()
                .await
                .unwrap()["replication_index"]
                .clone()
                .as_i64();
            if new_primary_index == replica_index {
                break;
            }
            tokio::time::sleep(Duration::from_millis(50)).await;
        }

        Ok(())
    });

    sim.run().unwrap();
}

/// This test is very similar to `primary_regenerate_log_no_replica_restart`. The only difference
/// is that the replica is being shut down before the primary regenerates its log. When the
/// replica is brought back up, it will try to load the namespace from a primary with a new log,
/// and it should self heal.
///
/// The replication progress of each node is read from the `replication_index` field of its admin
/// API stats endpoint (port 9090).
#[test]
fn primary_regenerate_log_with_replica_restart() {
    let mut sim = Builder::new()
        .simulation_duration(Duration::from_secs(1000))
        .build();
    let tmp = tempdir().unwrap();

    // Shared between the client and both hosts: the client uses it to request the shutdowns, and
    // the primary uses it to signal that the wallog has been removed.
    let notify = Arc::new(Notify::new());
    let notify_clone = notify.clone();

    init_tracing();
    sim.host("primary", move || {
        let notify = notify_clone.clone();
        let path = tmp.path().to_path_buf();
        async move {
            // Builds a fresh primary server over the same data directory; called once for the
            // initial start and once more after the wallog has been removed.
            let make_server = || async {
                TestServer {
                    path: path.clone().into(),
                    user_api_config: UserApiConfig {
                        ..Default::default()
                    },
                    admin_api_config: Some(AdminApiConfig {
                        acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await.unwrap(),
                        connector: TurmoilConnector,
                        disable_metrics: true,
                    }),
                    rpc_server_config: Some(RpcServerConfig {
                        acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 4567)).await.unwrap(),
                        tls_config: None,
                    }),
                    ..Default::default()
                }
            };
            let server = make_server().await;
            // keep a handle on the shutdown notifier before the server is moved into the future
            let shutdown = server.shutdown.clone();

            let fut = async move { server.start_sim(8080).await };

            tokio::pin!(fut);

            // Keep driving the server; when the client notifies us, forward the request to the
            // server's shutdown notifier and keep polling until the server future resolves.
            loop {
                tokio::select! {
                    res =  &mut fut => {
                        res.unwrap();
                        break
                    }
                    _ = notify.notified() => {
                        shutdown.notify_waiters();
                    },
                }
            }
            // remove the wallog and start again
            tokio::fs::remove_file(path.join("dbs/default/wallog"))
                .await
                .unwrap();
            // let the client and the replica know that the wallog is gone
            notify.notify_waiters();
            let server = make_server().await;
            server.start_sim(8080).await.unwrap();

            Ok(())
        }
    });

    let tmp = tempdir().unwrap();
    let notify_clone = notify.clone();
    sim.host("replica", move || {
        let path = tmp.path().to_path_buf();
        let notify = notify_clone.clone();
        async move {
            // Builds a fresh replica server over the same data directory; called once for the
            // initial start and once more for the restart.
            let make_server = || {
                let path = path.clone();
                async {
                    TestServer {
                        path: path.into(),
                        user_api_config: UserApiConfig {
                            ..Default::default()
                        },
                        admin_api_config: Some(AdminApiConfig {
                            acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await.unwrap(),
                            connector: TurmoilConnector,
                            disable_metrics: true,
                        }),
                        rpc_client_config: Some(RpcClientConfig {
                            remote_url: "http://primary:4567".into(),
                            connector: TurmoilConnector,
                            tls_config: None,
                        }),
                        ..Default::default()
                    }
                }
            };

            let server = make_server().await;
            let shutdown = server.shutdown.clone();
            let fut = async {
                server.start_sim(8080).await.unwrap();
            };

            tokio::pin!(fut);
            // Fused so that the select loop below can keep polling it after it has completed:
            // once the notification has fired, this branch simply stays pending.
            let notify_fut = async {
                notify.notified().await;
            }
            .fuse();
            tokio::pin!(notify_fut);
            // Run the replica until the client's notification arrives, then request a shutdown
            // and keep polling until the server future resolves.
            loop {
                tokio::select! {
                    _ = &mut fut => break,
                    _ = &mut notify_fut => {
                        shutdown.notify_waiters();
                    }
                }
            }

            // we wait for the server to have restarted
            // NOTE(review): `notify_waiters` only wakes `notified()` futures that already exist,
            // so this presumably relies on the replica getting here before the primary has
            // removed its wallog and notified — confirm the ordering is guaranteed.
            notify.notified().await;

            // and then restart the replica
            let server = make_server().await;
            server.start_sim(8080).await.unwrap();

            Ok(())
        }
    });

    sim.client("client", async move {
        let http = Client::new();
        let db = Database::open_remote_with_connector("http://primary:8080", "", TurmoilConnector)?;
        let conn = db.connect()?;

        // insert a few values into the primary
        conn.execute("create table test (x)", ()).await.unwrap();
        for _ in 0..50 {
            conn.execute("insert into test values (42)", ())
                .await
                .unwrap();
        }

        let primary_index = http
            .get("http://primary:9090/v1/namespaces/default/stats")
            .await
            .unwrap()
            .json_value()
            .await
            .unwrap()["replication_index"]
            .clone()
            .as_i64();

        // wait for the replica to catch up with the primary; the index must be read from the
        // replica's own admin API, otherwise the comparison is trivially true
        loop {
            let replica_index = http
                .get("http://replica:9090/v1/namespaces/default/stats")
                .await
                .unwrap()
                .json_value()
                .await
                .unwrap()["replication_index"]
                .clone()
                .as_i64();
            if primary_index == replica_index {
                break;
            }
            tokio::time::sleep(Duration::from_millis(50)).await;
        }

        // ask both servers to shut down, then wait until the primary reports that the wallog
        // was removed
        notify.notify_waiters();
        notify.notified().await;

        // start over with a fresh HTTP client now that both servers have been restarted
        drop(http);
        let http = Client::new();
        // make sure that replica is up to date
        let new_primary_index = http
            .get("http://primary:9090/v1/namespaces/default/stats")
            .await
            .unwrap()
            .json_value()
            .await
            .unwrap()["replication_index"]
            .clone()
            .as_i64();
        // the regenerated log is expected to report a different index than before the restart
        assert_ne!(primary_index, new_primary_index);
        loop {
            let replica_index = http
                .get("http://replica:9090/v1/namespaces/default/stats")
                .await
                .unwrap()
                .json_value()
                .await
                .unwrap()["replication_index"]
                .clone()
                .as_i64();
            if new_primary_index == replica_index {
                break;
            }
            tokio::time::sleep(Duration::from_millis(50)).await;
        }

        Ok(())
    });

    sim.run().unwrap();
}