0
0
mirror of https://github.com/tursodatabase/libsql.git synced 2025-01-22 12:38:56 +00:00
Lucio Franco 5b1e1c245b Add client version reporting metrics
This commit adds a new `x-libsql-client-version` header emitted by
clients. The server will collect these values and emit them as a
`libsql_client_version{version="libsql-hrana-0.1.11"}`. This also
exposes special doc hidden functions that our other clients that use the
rust one to emit their own metric.

Closes #546
2023-11-07 19:39:01 -05:00

198 lines
7.3 KiB
Rust

//! Module that provides `h2c` server adapters.
//!
//! # What is `h2c`?
//!
//! `h2c` is a http1.1 upgrade token that allows us to accept http2 without
//! going through tls/alpn while also accepting regular http1.1 requests. Since,
//! our server does not do TLS there is no way to negotiate that an incoming
//! connection is going to speak http2 or http1.1 so we must default to http1.1.
//!
//! # How does it work?
//!
//! The `H2c` service gets called on every http request that arrives to the
//! server and checks if the request has an `upgrade` header set. If this
//! header is set to `h2c` then it will start the upgrade process. If this
//! header is not set the request continues normally without any upgrades.
//!
//! The upgrade process is quite simple, if the correct header value is set
//! the server will spawn a background task, return status code `101`
//! (switching protocols) and will set the same upgrade header with `h2c` as
//! the value.
//!
//! The background task will wait for `hyper::upgrade::on` to complete. At this
//! point when `on` completes it returns an `IO` object that we can read/write from.
//! We then pass this into hyper's low level server connection type and force http2.
//! This means from the point that the client gets back the upgrade headers and correct
//! status code the connection will be immediealty speaking http2 and thus the upgrade
//! is complete.
//!
//! ┌───────────────┐ upgrade:h2c ┌──────────────────┐
//! │ http::request ├────────────────────────►│ upgrade to http2 │
//! └─────┬─────────┘ └────────┬─────────┘
//! │ │
//! │ │
//! │ │
//! │ │
//! │ │
//! │ ┌─────────────────┐ │
//! └────────────►│call axum router │◄───────────┘
//! └─────────────────┘
use std::marker::PhantomData;
use std::pin::Pin;
use axum::{body::BoxBody, http::HeaderValue};
use bytes::Bytes;
use hyper::header;
use hyper::Body;
use hyper::{Request, Response};
use tonic::transport::server::TcpConnectInfo;
use tower::Service;
type BoxError = Box<dyn std::error::Error + Send + Sync>;
/// A `MakeService` adapter for [`H2c`] that injects connection
/// info into the request extensions.
#[derive(Debug, Clone)]
pub struct H2cMaker<S, B> {
s: S,
_pd: PhantomData<fn(B)>,
}
impl<S, B> H2cMaker<S, B> {
pub fn new(s: S) -> Self {
Self {
s,
_pd: PhantomData,
}
}
}
impl<S, C, B> Service<&C> for H2cMaker<S, B>
where
S: Service<Request<Body>, Response = Response<B>> + Clone + Send + 'static,
S::Future: Send + 'static,
S::Error: Into<BoxError> + Sync + Send + 'static,
S::Response: Send + 'static,
C: crate::net::Conn,
B: http_body::Body<Data = Bytes> + Send + 'static,
B::Error: Into<BoxError> + Sync + Send + 'static,
{
type Response = H2c<S, B>;
type Error = hyper::Error;
type Future =
Pin<Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>> + Send>>;
fn poll_ready(
&mut self,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
std::task::Poll::Ready(Ok(()))
}
fn call(&mut self, conn: &C) -> Self::Future {
let connect_info = conn.connect_info();
let s = self.s.clone();
Box::pin(async move {
Ok(H2c {
s,
connect_info,
_pd: PhantomData,
})
})
}
}
/// A service that can perform `h2c` upgrades and will
/// delegate calls to the inner service once a protocol
/// has been selected.
#[derive(Debug, Clone)]
pub struct H2c<S, B> {
s: S,
connect_info: TcpConnectInfo,
_pd: PhantomData<fn(B)>,
}
impl<S, B> Service<Request<Body>> for H2c<S, B>
where
S: Service<Request<Body>, Response = Response<B>> + Clone + Send + 'static,
S::Future: Send + 'static,
S::Error: Into<BoxError> + Sync + Send + 'static,
S::Response: Send + 'static,
B: http_body::Body<Data = Bytes> + Send + 'static,
B::Error: Into<BoxError> + Sync + Send + 'static,
{
type Response = hyper::Response<BoxBody>;
type Error = BoxError;
type Future =
Pin<Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>> + Send>>;
fn poll_ready(
&mut self,
_: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
std::task::Poll::Ready(Ok(()))
}
fn call(&mut self, mut req: hyper::Request<Body>) -> Self::Future {
let mut svc = self.s.clone();
let connect_info = self.connect_info.clone();
Box::pin(async move {
req.extensions_mut().insert(connect_info.clone());
// Check if this request is a `h2c` upgrade, if it is not pass
// the request to the inner service, which in our case is the
// axum router.
if req.headers().get(header::UPGRADE) != Some(&HeaderValue::from_static("h2c")) {
return svc
.call(req)
.await
.map(|r| r.map(axum::body::boxed))
.map_err(Into::into);
}
tracing::debug!("Got a h2c upgrade request");
// We got a h2c header so lets spawn a task that will wait for the
// upgrade to complete and start a http2 connection.
tokio::spawn(async move {
let upgraded_io = match hyper::upgrade::on(&mut req).await {
Ok(io) => io,
Err(e) => {
tracing::error!("Failed to upgrade h2c connection: {}", e);
return;
}
};
tracing::debug!("Successfully upgraded the connection, speaking h2 now");
if let Err(e) = hyper::server::conn::Http::new()
.http2_only(true)
.serve_connection(
upgraded_io,
tower::service_fn(move |mut r: hyper::Request<hyper::Body>| {
r.extensions_mut().insert(connect_info.clone());
svc.call(r)
}),
)
.await
{
tracing::error!("http2 connection error: {}", e);
}
});
// Reply that we are switching protocols to h2
let body = axum::body::boxed(axum::body::Empty::new());
let mut res = hyper::Response::new(body);
*res.status_mut() = hyper::StatusCode::SWITCHING_PROTOCOLS;
res.headers_mut()
.insert(header::UPGRADE, HeaderValue::from_static("h2c"));
Ok(res)
})
}
}