Skip to content

Commit

Permalink
[proxy]: Use TLS for cancellation queries (#10152)
Browse files Browse the repository at this point in the history
## Problem
pg_sni_router assumes that all the streams are upgradable to TLS.
Cancellation requests were declined because of using NoTls config.

## Summary of changes
Provide TLS client config for cancellation requests.

Fixes
[#21789](https://github.com/orgs/neondatabase/projects/65/views/1?pane=issue&itemId=90911361&issue=neondatabase%7Ccloud%7C21789)
  • Loading branch information
awarus authored Dec 17, 2024
1 parent 7dddbb9 commit 93e9583
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 6 deletions.
2 changes: 1 addition & 1 deletion proxy/src/bin/pg_sni_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ async fn ssl_handshake<S: AsyncRead + AsyncWrite + Unpin>(

let (raw, read_buf) = stream.into_inner();
// TODO: Normally, client doesn't send any data before
// server says TLS handshake is ok and read_buf is empy.
// server says TLS handshake is ok and read_buf is empty.
// However, you could imagine pipelining of postgres
// SSLRequest + TLS ClientHello in one hunk similar to
// pipelining in our node js driver. We should probably
Expand Down
61 changes: 58 additions & 3 deletions proxy/src/cancellation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ use std::sync::Arc;

use dashmap::DashMap;
use ipnet::{IpNet, Ipv4Net, Ipv6Net};
use postgres_client::{CancelToken, NoTls};
use once_cell::sync::OnceCell;
use postgres_client::{tls::MakeTlsConnect, CancelToken};
use pq_proto::CancelKeyData;
use rustls::crypto::ring;
use thiserror::Error;
use tokio::net::TcpStream;
use tokio::sync::Mutex;
Expand All @@ -20,6 +22,9 @@ use crate::redis::cancellation_publisher::{
CancellationPublisher, CancellationPublisherMut, RedisPublisherClient,
};

use crate::compute::{load_certs, AcceptEverythingVerifier};
use crate::postgres_rustls::MakeRustlsConnect;

pub type CancelMap = Arc<DashMap<CancelKeyData, Option<CancelClosure>>>;
pub type CancellationHandlerMain = CancellationHandler<Option<Arc<Mutex<RedisPublisherClient>>>>;
pub(crate) type CancellationHandlerMainInternal = Option<Arc<Mutex<RedisPublisherClient>>>;
Expand Down Expand Up @@ -174,7 +179,10 @@ impl<P: CancellationPublisher> CancellationHandler<P> {
source: self.from,
kind: crate::metrics::CancellationOutcome::Found,
});
info!("cancelling query per user's request using key {key}");
info!(
"cancelling query per user's request using key {key}, hostname {}, address: {}",
cancel_closure.hostname, cancel_closure.socket_addr
);
cancel_closure.try_cancel_query().await
}

Expand Down Expand Up @@ -221,6 +229,8 @@ impl<P: CancellationPublisherMut> CancellationHandler<Option<Arc<Mutex<P>>>> {
}
}

static TLS_ROOTS: OnceCell<Arc<rustls::RootCertStore>> = OnceCell::new();

/// This should've been a [`std::future::Future`], but
/// it's impossible to name a type of an unboxed future
/// (we'd need something like `#![feature(type_alias_impl_trait)]`).
Expand All @@ -229,24 +239,69 @@ pub struct CancelClosure {
socket_addr: SocketAddr,
cancel_token: CancelToken,
ip_allowlist: Vec<IpPattern>,
hostname: String, // for pg_sni router
allow_self_signed_compute: bool,
}

impl CancelClosure {
pub(crate) fn new(
socket_addr: SocketAddr,
cancel_token: CancelToken,
ip_allowlist: Vec<IpPattern>,
hostname: String,
allow_self_signed_compute: bool,
) -> Self {
Self {
socket_addr,
cancel_token,
ip_allowlist,
hostname,
allow_self_signed_compute,
}
}
/// Cancels the query running on user's compute node.
pub(crate) async fn try_cancel_query(self) -> Result<(), CancelError> {
let socket = TcpStream::connect(self.socket_addr).await?;
self.cancel_token.cancel_query_raw(socket, NoTls).await?;

let client_config = if self.allow_self_signed_compute {
// Allow all certificates for creating the connection. Used only for tests
let verifier = Arc::new(AcceptEverythingVerifier);
rustls::ClientConfig::builder_with_provider(Arc::new(ring::default_provider()))
.with_safe_default_protocol_versions()
.expect("ring should support the default protocol versions")
.dangerous()
.with_custom_certificate_verifier(verifier)
} else {
let root_store = TLS_ROOTS
.get_or_try_init(load_certs)
.map_err(|_e| {
CancelError::IO(std::io::Error::new(
std::io::ErrorKind::Other,
"TLS root store initialization failed".to_string(),
))
})?
.clone();
rustls::ClientConfig::builder_with_provider(Arc::new(ring::default_provider()))
.with_safe_default_protocol_versions()
.expect("ring should support the default protocol versions")
.with_root_certificates(root_store)
};

let client_config = client_config.with_no_client_auth();

let mut mk_tls = crate::postgres_rustls::MakeRustlsConnect::new(client_config);
let tls = <MakeRustlsConnect as MakeTlsConnect<tokio::net::TcpStream>>::make_tls_connect(
&mut mk_tls,
&self.hostname,
)
.map_err(|e| {
CancelError::IO(std::io::Error::new(
std::io::ErrorKind::Other,
e.to_string(),
))
})?;

self.cancel_token.cancel_query_raw(socket, tls).await?;
debug!("query was cancelled");
Ok(())
}
Expand Down
6 changes: 4 additions & 2 deletions proxy/src/compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,8 @@ impl ConnCfg {
secret_key,
},
vec![],
host.to_string(),
allow_self_signed_compute,
);

let connection = PostgresConnection {
Expand Down Expand Up @@ -350,7 +352,7 @@ fn filtered_options(options: &str) -> Option<String> {
Some(options)
}

fn load_certs() -> Result<Arc<rustls::RootCertStore>, Vec<rustls_native_certs::Error>> {
pub(crate) fn load_certs() -> Result<Arc<rustls::RootCertStore>, Vec<rustls_native_certs::Error>> {
let der_certs = rustls_native_certs::load_native_certs();

if !der_certs.errors.is_empty() {
Expand All @@ -364,7 +366,7 @@ fn load_certs() -> Result<Arc<rustls::RootCertStore>, Vec<rustls_native_certs::E
static TLS_ROOTS: OnceCell<Arc<rustls::RootCertStore>> = OnceCell::new();

#[derive(Debug)]
struct AcceptEverythingVerifier;
pub(crate) struct AcceptEverythingVerifier;
impl ServerCertVerifier for AcceptEverythingVerifier {
fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
use rustls::SignatureScheme;
Expand Down

1 comment on commit 93e9583

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

7245 tests run: 6936 passed, 1 failed, 308 skipped (full report)


Failures on Postgres 16

  • test_storage_controller_many_tenants[github-actions-selfhosted]: release-x86-64
# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_storage_controller_many_tenants[release-pg16-github-actions-selfhosted]"
Flaky tests (2)

Postgres 17

Postgres 16

  • test_pgdata_import_smoke[None-1024-RelBlockSize.MULTIPLE_RELATION_SEGMENTS]: release-arm64

Code coverage* (full report)

  • functions: 31.3% (8395 of 26811 functions)
  • lines: 48.0% (66613 of 138869 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
93e9583 at 2024-12-17T22:44:07.051Z :recycle:

Please sign in to comment.