Skip to content

Commit

Permalink
proxy refactor tls listener (#7056)
Browse files Browse the repository at this point in the history
## Problem

Now that we have tls-listener vendored, we can refactor and remove a lot
of bloated code and make the whole flow a bit simpler

## Summary of changes

1. Remove dead code
2. Move the error handling to inside the `TlsListener` accept() function
3. Extract the peer_addr from the PROXY protocol header and log it with
errors
  • Loading branch information
conradludgate authored Mar 12, 2024
1 parent 580e136 commit 1f7d54f
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 255 deletions.
8 changes: 4 additions & 4 deletions proxy/src/protocol2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use pin_project_lite::pin_project;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, ReadBuf};
use uuid::Uuid;

use crate::{metrics::NUM_CLIENT_CONNECTION_GAUGE, serverless::tls_listener::AsyncAccept};
use crate::metrics::NUM_CLIENT_CONNECTION_GAUGE;

pub struct ProxyProtocolAccept {
pub incoming: AddrIncoming,
Expand Down Expand Up @@ -331,15 +331,15 @@ impl<T: AsyncRead> AsyncRead for WithClientIp<T> {
}
}

impl AsyncAccept for ProxyProtocolAccept {
type Connection = WithConnectionGuard<WithClientIp<AddrStream>>;
impl Accept for ProxyProtocolAccept {
type Conn = WithConnectionGuard<WithClientIp<AddrStream>>;

type Error = io::Error;

fn poll_accept(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Connection, Self::Error>>> {
) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
let conn = ready!(Pin::new(&mut self.incoming).poll_accept(cx)?);
tracing::info!(protocol = self.protocol, "accepted new TCP connection");
let Some(conn) = conn else {
Expand Down
30 changes: 9 additions & 21 deletions proxy/src/serverless.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,19 @@ pub use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
use tokio_util::task::TaskTracker;

use crate::context::RequestMonitoring;
use crate::metrics::TLS_HANDSHAKE_FAILURES;
use crate::protocol2::{ProxyProtocolAccept, WithClientIp, WithConnectionGuard};
use crate::rate_limiter::EndpointRateLimiter;
use crate::serverless::backend::PoolingBackend;
use crate::{cancellation::CancellationHandler, config::ProxyConfig};
use futures::StreamExt;
use hyper::{
server::{
accept,
conn::{AddrIncoming, AddrStream},
},
server::conn::{AddrIncoming, AddrStream},
Body, Method, Request, Response,
};

use std::convert::Infallible;
use std::net::IpAddr;
use std::sync::Arc;
use std::task::Poll;
use std::{future::ready, sync::Arc};
use tls_listener::TlsListener;
use tokio::net::TcpListener;
use tokio_util::sync::CancellationToken;
Expand Down Expand Up @@ -105,19 +100,12 @@ pub async fn task_main(
let ws_connections = tokio_util::task::task_tracker::TaskTracker::new();
ws_connections.close(); // allows `ws_connections.wait to complete`

let tls_listener = TlsListener::new(tls_acceptor, addr_incoming).filter(|conn| {
if let Err(err) = conn {
error!(
protocol = "http",
"failed to accept TLS connection: {err:?}"
);
TLS_HANDSHAKE_FAILURES.inc();
ready(false)
} else {
info!(protocol = "http", "accepted new TLS connection");
ready(true)
}
});
let tls_listener = TlsListener::new(
tls_acceptor,
addr_incoming,
"http",
config.handshake_timeout,
);

let make_svc = hyper::service::make_service_fn(
|stream: &tokio_rustls::server::TlsStream<
Expand Down Expand Up @@ -174,7 +162,7 @@ pub async fn task_main(
},
);

hyper::Server::builder(accept::from_stream(tls_listener))
hyper::Server::builder(tls_listener)
.serve(make_svc)
.with_graceful_shutdown(cancellation_token.cancelled())
.await?;
Expand Down
Loading

1 comment on commit 1f7d54f

@github-actions
Copy link

@github-actions github-actions bot commented on 1f7d54f Mar 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2588 tests run: 2448 passed, 1 failed, 139 skipped (full report)


Failures on Postgres 14

  • test_bulk_insert[neon-github-actions-selfhosted]: release
# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_bulk_insert[neon-release-pg14-github-actions-selfhosted]"

Code coverage* (full report)

  • functions: 28.7% (7026 of 24442 functions)
  • lines: 47.5% (43415 of 91362 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
1f7d54f at 2024-03-12T14:23:46.416Z :recycle:

Please sign in to comment.