diff --git a/proxy/src/bin/pg_sni_router.rs b/proxy/src/bin/pg_sni_router.rs index 623a0fd3b2c9..9538384b9eca 100644 --- a/proxy/src/bin/pg_sni_router.rs +++ b/proxy/src/bin/pg_sni_router.rs @@ -229,7 +229,7 @@ async fn ssl_handshake( let (raw, read_buf) = stream.into_inner(); // TODO: Normally, client doesn't send any data before - // server says TLS handshake is ok and read_buf is empy. + // server says TLS handshake is ok and read_buf is empty. // However, you could imagine pipelining of postgres // SSLRequest + TLS ClientHello in one hunk similar to // pipelining in our node js driver. We should probably diff --git a/proxy/src/cancellation.rs b/proxy/src/cancellation.rs index dd3edd6abc8f..a58e3961da86 100644 --- a/proxy/src/cancellation.rs +++ b/proxy/src/cancellation.rs @@ -3,8 +3,10 @@ use std::sync::Arc; use dashmap::DashMap; use ipnet::{IpNet, Ipv4Net, Ipv6Net}; -use postgres_client::{CancelToken, NoTls}; +use once_cell::sync::OnceCell; +use postgres_client::{tls::MakeTlsConnect, CancelToken}; use pq_proto::CancelKeyData; +use rustls::crypto::ring; use thiserror::Error; use tokio::net::TcpStream; use tokio::sync::Mutex; @@ -20,6 +22,9 @@ use crate::redis::cancellation_publisher::{ CancellationPublisher, CancellationPublisherMut, RedisPublisherClient, }; +use crate::compute::{load_certs, AcceptEverythingVerifier}; +use crate::postgres_rustls::MakeRustlsConnect; + pub type CancelMap = Arc>>; pub type CancellationHandlerMain = CancellationHandler>>>; pub(crate) type CancellationHandlerMainInternal = Option>>; @@ -174,7 +179,10 @@ impl CancellationHandler

{ source: self.from, kind: crate::metrics::CancellationOutcome::Found, }); - info!("cancelling query per user's request using key {key}"); + info!( + "cancelling query per user's request using key {key}, hostname {}, address: {}", + cancel_closure.hostname, cancel_closure.socket_addr + ); cancel_closure.try_cancel_query().await } @@ -221,6 +229,8 @@ impl CancellationHandler>>> { } } +static TLS_ROOTS: OnceCell> = OnceCell::new(); + /// This should've been a [`std::future::Future`], but /// it's impossible to name a type of an unboxed future /// (we'd need something like `#![feature(type_alias_impl_trait)]`). @@ -229,6 +239,8 @@ pub struct CancelClosure { socket_addr: SocketAddr, cancel_token: CancelToken, ip_allowlist: Vec, + hostname: String, // for pg_sni router + allow_self_signed_compute: bool, } impl CancelClosure { @@ -236,17 +248,60 @@ impl CancelClosure { socket_addr: SocketAddr, cancel_token: CancelToken, ip_allowlist: Vec, + hostname: String, + allow_self_signed_compute: bool, ) -> Self { Self { socket_addr, cancel_token, ip_allowlist, + hostname, + allow_self_signed_compute, } } /// Cancels the query running on user's compute node. pub(crate) async fn try_cancel_query(self) -> Result<(), CancelError> { let socket = TcpStream::connect(self.socket_addr).await?; - self.cancel_token.cancel_query_raw(socket, NoTls).await?; + + let client_config = if self.allow_self_signed_compute { + // Allow all certificates for creating the connection. Used only for tests + let verifier = Arc::new(AcceptEverythingVerifier); + rustls::ClientConfig::builder_with_provider(Arc::new(ring::default_provider())) + .with_safe_default_protocol_versions() + .expect("ring should support the default protocol versions") + .dangerous() + .with_custom_certificate_verifier(verifier) + } else { + let root_store = TLS_ROOTS + .get_or_try_init(load_certs) + .map_err(|_e| { + CancelError::IO(std::io::Error::new( + std::io::ErrorKind::Other, + "TLS root store initialization failed".to_string(), + )) + })? + .clone(); + rustls::ClientConfig::builder_with_provider(Arc::new(ring::default_provider())) + .with_safe_default_protocol_versions() + .expect("ring should support the default protocol versions") + .with_root_certificates(root_store) + }; + + let client_config = client_config.with_no_client_auth(); + + let mut mk_tls = crate::postgres_rustls::MakeRustlsConnect::new(client_config); + let tls = >::make_tls_connect( + &mut mk_tls, + &self.hostname, + ) + .map_err(|e| { + CancelError::IO(std::io::Error::new( + std::io::ErrorKind::Other, + e.to_string(), + )) + })?; + + self.cancel_token.cancel_query_raw(socket, tls).await?; debug!("query was cancelled"); Ok(()) } diff --git a/proxy/src/compute.rs b/proxy/src/compute.rs index 4113b5bb80e3..42df5ff5e3e5 100644 --- a/proxy/src/compute.rs +++ b/proxy/src/compute.rs @@ -319,6 +319,8 @@ impl ConnCfg { secret_key, }, vec![], + host.to_string(), + allow_self_signed_compute, ); let connection = PostgresConnection { @@ -350,7 +352,7 @@ fn filtered_options(options: &str) -> Option { Some(options) } -fn load_certs() -> Result, Vec> { +pub(crate) fn load_certs() -> Result, Vec> { let der_certs = rustls_native_certs::load_native_certs(); if !der_certs.errors.is_empty() { @@ -364,7 +366,7 @@ fn load_certs() -> Result, Vec> = OnceCell::new(); #[derive(Debug)] -struct AcceptEverythingVerifier; +pub(crate) struct AcceptEverythingVerifier; impl ServerCertVerifier for AcceptEverythingVerifier { fn supported_verify_schemes(&self) -> Vec { use rustls::SignatureScheme;