Skip to content

Commit

Permalink
proxy: Exclude private ip errors from recorded metrics (#7389)
Browse files Browse the repository at this point in the history
## Problem

Right now we record errors from internal VPC.

## Summary of changes

* Exclude it from the metrics.
* Simplify pg-sni-router
  • Loading branch information
khanova authored Apr 15, 2024
1 parent f752c40 commit 110282e
Show file tree
Hide file tree
Showing 9 changed files with 57 additions and 24 deletions.
27 changes: 13 additions & 14 deletions proxy/src/bin/pg_sni_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,13 @@ use futures::future::Either;
use itertools::Itertools;
use proxy::config::TlsServerEndPoint;
use proxy::context::RequestMonitoring;
use proxy::proxy::run_until_cancelled;
use proxy::{BranchId, EndpointId, ProjectId};
use proxy::proxy::{copy_bidirectional_client_compute, run_until_cancelled};
use rustls::pki_types::PrivateKeyDer;
use tokio::net::TcpListener;

use anyhow::{anyhow, bail, ensure, Context};
use clap::Arg;
use futures::TryFutureExt;
use proxy::console::messages::MetricsAuxInfo;
use proxy::stream::{PqStream, Stream};

use tokio::io::{AsyncRead, AsyncWrite};
Expand Down Expand Up @@ -204,6 +202,7 @@ async fn task_main(
const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)";

async fn ssl_handshake<S: AsyncRead + AsyncWrite + Unpin>(
ctx: &mut RequestMonitoring,
raw_stream: S,
tls_config: Arc<rustls::ServerConfig>,
tls_server_end_point: TlsServerEndPoint,
Expand Down Expand Up @@ -233,7 +232,10 @@ async fn ssl_handshake<S: AsyncRead + AsyncWrite + Unpin>(
}

Ok(Stream::Tls {
tls: Box::new(raw.upgrade(tls_config).await?),
tls: Box::new(
raw.upgrade(tls_config, !ctx.has_private_peer_addr())
.await?,
),
tls_server_end_point,
})
}
Expand All @@ -256,7 +258,7 @@ async fn handle_client(
tls_server_end_point: TlsServerEndPoint,
stream: impl AsyncRead + AsyncWrite + Unpin,
) -> anyhow::Result<()> {
let tls_stream = ssl_handshake(stream, tls_config, tls_server_end_point).await?;
let mut tls_stream = ssl_handshake(&mut ctx, stream, tls_config, tls_server_end_point).await?;

// Cut off first part of the SNI domain
// We receive required destination details in the format of
Expand All @@ -273,18 +275,15 @@ async fn handle_client(

info!("destination: {}", destination);

let client = tokio::net::TcpStream::connect(destination).await?;

let metrics_aux: MetricsAuxInfo = MetricsAuxInfo {
endpoint_id: (&EndpointId::from("")).into(),
project_id: (&ProjectId::from("")).into(),
branch_id: (&BranchId::from("")).into(),
cold_start_info: proxy::console::messages::ColdStartInfo::Unknown,
};
let mut client = tokio::net::TcpStream::connect(destination).await?;

// doesn't yet matter as pg-sni-router doesn't report analytics logs
ctx.set_success();
ctx.log();

proxy::proxy::passthrough::proxy_pass(tls_stream, client, metrics_aux).await
// Starting from here we only proxy the client's traffic.
info!("performing the proxy pass...");
let _ = copy_bidirectional_client_compute(&mut tls_stream, &mut client).await?;

Ok(())
}
12 changes: 11 additions & 1 deletion proxy/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,18 @@ impl RequestMonitoring {
self.auth_method = Some(auth_method);
}

pub fn has_private_peer_addr(&self) -> bool {
match self.peer_addr {
IpAddr::V4(ip) => ip.is_private(),
_ => false,
}
}

pub fn set_error_kind(&mut self, kind: ErrorKind) {
Metrics::get().proxy.errors_total.inc(kind);
// Do not record errors from the private address to metrics.
if !self.has_private_peer_addr() {
Metrics::get().proxy.errors_total.inc(kind);
}
if let Some(ep) = &self.endpoint_id {
let metric = &Metrics::get().proxy.endpoints_affected_by_errors;
let label = metric.with_labels(kind);
Expand Down
4 changes: 3 additions & 1 deletion proxy/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub mod handshake;
pub mod passthrough;
pub mod retry;
pub mod wake_compute;
pub use copy_bidirectional::copy_bidirectional_client_compute;

use crate::{
auth,
Expand Down Expand Up @@ -256,8 +257,9 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(

let tls = config.tls_config.as_ref();

let record_handshake_error = !ctx.has_private_peer_addr();
let pause = ctx.latency_timer.pause(crate::metrics::Waiting::Client);
let do_handshake = handshake(stream, mode.handshake_tls(tls));
let do_handshake = handshake(stream, mode.handshake_tls(tls), record_handshake_error);
let (mut stream, params) =
match tokio::time::timeout(config.handshake_timeout, do_handshake).await?? {
HandshakeData::Startup(stream, params) => (stream, params),
Expand Down
2 changes: 1 addition & 1 deletion proxy/src/proxy/copy_bidirectional.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ where
}

#[tracing::instrument(skip_all)]
pub(super) async fn copy_bidirectional_client_compute<Client, Compute>(
pub async fn copy_bidirectional_client_compute<Client, Compute>(
client: &mut Client,
compute: &mut Compute,
) -> Result<(u64, u64), std::io::Error>
Expand Down
5 changes: 4 additions & 1 deletion proxy/src/proxy/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ pub enum HandshakeData<S> {
pub async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
stream: S,
mut tls: Option<&TlsConfig>,
record_handshake_error: bool,
) -> Result<HandshakeData<S>, HandshakeError> {
// Client may try upgrading to each protocol only once
let (mut tried_ssl, mut tried_gss) = (false, false);
Expand Down Expand Up @@ -95,7 +96,9 @@ pub async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
if !read_buf.is_empty() {
return Err(HandshakeError::EarlyData);
}
let tls_stream = raw.upgrade(tls.to_server_config()).await?;
let tls_stream = raw
.upgrade(tls.to_server_config(), record_handshake_error)
.await?;

let (_, tls_server_end_point) = tls
.cert_resolver
Expand Down
2 changes: 1 addition & 1 deletion proxy/src/proxy/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ async fn dummy_proxy(
auth: impl TestAuth + Send,
) -> anyhow::Result<()> {
let client = WithClientIp::new(client);
let mut stream = match handshake(client, tls.as_ref()).await? {
let mut stream = match handshake(client, tls.as_ref(), false).await? {
HandshakeData::Startup(stream, _) => stream,
HandshakeData::Cancel(_) => bail!("cancellation not supported"),
};
Expand Down
5 changes: 4 additions & 1 deletion proxy/src/proxy/tests/mitm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ async fn proxy_mitm(
tokio::spawn(async move {
// begin handshake with end_server
let end_server = connect_tls(server2, client_config2.make_tls_connect().unwrap()).await;
let (end_client, startup) = match handshake(client1, Some(&server_config1)).await.unwrap() {
let (end_client, startup) = match handshake(client1, Some(&server_config1), false)
.await
.unwrap()
{
HandshakeData::Startup(stream, params) => (stream, params),
HandshakeData::Cancel(_) => panic!("cancellation not supported"),
};
Expand Down
12 changes: 10 additions & 2 deletions proxy/src/serverless.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ async fn connection_handler(
};

let peer_addr = peer.unwrap_or(peer_addr).ip();
let has_private_peer_addr = match peer_addr {
IpAddr::V4(ip) => ip.is_private(),
_ => false,
};
info!(?session_id, %peer_addr, "accepted new TCP connection");

// try upgrade to TLS, but with a timeout.
Expand All @@ -182,13 +186,17 @@ async fn connection_handler(
}
// The handshake failed
Ok(Err(e)) => {
Metrics::get().proxy.tls_handshake_failures.inc();
if !has_private_peer_addr {
Metrics::get().proxy.tls_handshake_failures.inc();
}
warn!(?session_id, %peer_addr, "failed to accept TLS connection: {e:?}");
return;
}
// The handshake timed out
Err(e) => {
Metrics::get().proxy.tls_handshake_failures.inc();
if !has_private_peer_addr {
Metrics::get().proxy.tls_handshake_failures.inc();
}
warn!(?session_id, %peer_addr, "failed to accept TLS connection: {e:?}");
return;
}
Expand Down
12 changes: 10 additions & 2 deletions proxy/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,12 +223,20 @@ pub enum StreamUpgradeError {

impl<S: AsyncRead + AsyncWrite + Unpin> Stream<S> {
/// If possible, upgrade raw stream into a secure TLS-based stream.
pub async fn upgrade(self, cfg: Arc<ServerConfig>) -> Result<TlsStream<S>, StreamUpgradeError> {
pub async fn upgrade(
self,
cfg: Arc<ServerConfig>,
record_handshake_error: bool,
) -> Result<TlsStream<S>, StreamUpgradeError> {
match self {
Stream::Raw { raw } => Ok(tokio_rustls::TlsAcceptor::from(cfg)
.accept(raw)
.await
.inspect_err(|_| Metrics::get().proxy.tls_handshake_failures.inc())?),
.inspect_err(|_| {
if record_handshake_error {
Metrics::get().proxy.tls_handshake_failures.inc()
}
})?),
Stream::Tls { .. } => Err(StreamUpgradeError::AlreadyTls),
}
}
Expand Down

1 comment on commit 110282e

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2828 tests run: 2698 passed, 0 failed, 130 skipped (full report)


Code coverage* (full report)

  • functions: 28.0% (6429 of 22940 functions)
  • lines: 46.6% (45029 of 96589 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
110282e at 2024-04-15T19:14:26.617Z :recycle:

Please sign in to comment.