Skip to content

Commit

Permalink
proxy: add more logging
Browse files Browse the repository at this point in the history
  • Loading branch information
conradludgate committed Feb 12, 2024
1 parent e5daf36 commit e9870c5
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 27 deletions.
5 changes: 3 additions & 2 deletions proxy/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,9 @@ impl LatencyTimer {

pub fn success(&mut self) {
// stop the stopwatch and record the time that we have accumulated
let start = self.start.take().expect("latency timer should be started");
self.accumulated += start.elapsed();
if let Some(start) = self.start.take() {
self.accumulated += start.elapsed();
}

// success
self.outcome = "success";
Expand Down
8 changes: 4 additions & 4 deletions proxy/src/serverless/backend.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{sync::Arc, time::Duration};

use async_trait::async_trait;
use tracing::info;
use tracing::{field::display, info};

use crate::{
auth::{backend::ComputeCredentialKeys, check_peer_addr_is_in_list, AuthError},
Expand All @@ -15,7 +15,7 @@ use crate::{
proxy::connect_compute::ConnectMechanism,
};

use super::conn_pool::{poll_client, Client, ConnInfo, GlobalConnPool, APP_NAME};
use super::conn_pool::{poll_client, Client, ConnInfo, GlobalConnPool};

pub struct PoolingBackend {
pub pool: Arc<GlobalConnPool<tokio_postgres::Client>>,
Expand Down Expand Up @@ -81,8 +81,8 @@ impl PoolingBackend {
return Ok(client);
}
let conn_id = uuid::Uuid::new_v4();
info!(%conn_id, "pool: opening a new connection '{conn_info}'");
ctx.set_application(Some(APP_NAME));
tracing::Span::current().record("conn_id", display(conn_id));
info!("pool: opening a new connection '{conn_info}'");
let backend = self
.config
.auth_backend
Expand Down
22 changes: 6 additions & 16 deletions proxy/src/serverless/conn_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use metrics::IntCounterPairGuard;
use parking_lot::RwLock;
use rand::Rng;
use smallvec::SmallVec;
use smol_str::SmolStr;
use std::{collections::HashMap, pin::pin, sync::Arc, sync::Weak, time::Duration};
use std::{
fmt,
Expand All @@ -31,8 +30,6 @@ use tracing::{info, info_span, Instrument};

use super::backend::HttpConnError;

pub const APP_NAME: SmolStr = SmolStr::new_inline("/sql_over_http");

#[derive(Debug, Clone)]
pub struct ConnInfo {
pub user_info: ComputeUserInfo,
Expand Down Expand Up @@ -379,12 +376,13 @@ impl<C: ClientInnerExt> GlobalConnPool<C> {
info!("pool: cached connection '{conn_info}' is closed, opening a new one");
return Ok(None);
} else {
info!("pool: reusing connection '{conn_info}'");
client.session.send(ctx.session_id)?;
tracing::Span::current().record("conn_id", tracing::field::display(client.conn_id));
tracing::Span::current().record(
"pid",
&tracing::field::display(client.inner.get_process_id()),
);
info!("pool: reusing connection '{conn_info}'");
client.session.send(ctx.session_id)?;
ctx.latency_timer.pool_hit();
ctx.latency_timer.success();
return Ok(Some(Client::new(client, conn_info.clone(), endpoint_pool)));
Expand Down Expand Up @@ -577,7 +575,6 @@ pub struct Client<C: ClientInnerExt> {
}

pub struct Discard<'a, C: ClientInnerExt> {
conn_id: uuid::Uuid,
conn_info: &'a ConnInfo,
pool: &'a mut Weak<RwLock<EndpointConnPool<C>>>,
}
Expand All @@ -603,14 +600,7 @@ impl<C: ClientInnerExt> Client<C> {
span: _,
} = self;
let inner = inner.as_mut().expect("client inner should not be removed");
(
&mut inner.inner,
Discard {
pool,
conn_info,
conn_id: inner.conn_id,
},
)
(&mut inner.inner, Discard { pool, conn_info })
}

pub fn check_idle(&mut self, status: ReadyForQueryStatus) {
Expand All @@ -625,13 +615,13 @@ impl<C: ClientInnerExt> Discard<'_, C> {
pub fn check_idle(&mut self, status: ReadyForQueryStatus) {
let conn_info = &self.conn_info;
if status != ReadyForQueryStatus::Idle && std::mem::take(self.pool).strong_count() > 0 {
info!(conn_id = %self.conn_id, "pool: throwing away connection '{conn_info}' because connection is not idle")
info!("pool: throwing away connection '{conn_info}' because connection is not idle")
}
}
pub fn discard(&mut self) {
let conn_info = &self.conn_info;
if std::mem::take(self.pool).strong_count() > 0 {
info!(conn_id = %self.conn_id, "pool: throwing away connection '{conn_info}' because connection is potentially in a broken state")
info!("pool: throwing away connection '{conn_info}' because connection is potentially in a broken state")
}
}
}
Expand Down
33 changes: 28 additions & 5 deletions proxy/src/serverless/sql_over_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use crate::error::ReportableError;
use crate::metrics::HTTP_CONTENT_LENGTH;
use crate::metrics::NUM_CONNECTION_REQUESTS_GAUGE;
use crate::proxy::NeonOptions;
use crate::serverless::backend::HttpConnError;
use crate::RoleName;

use super::backend::PoolingBackend;
Expand Down Expand Up @@ -300,7 +301,14 @@ pub async fn handle(
Ok(response)
}

#[instrument(name = "sql-over-http", fields(pid = tracing::field::Empty), skip_all)]
#[instrument(
name = "sql-over-http",
skip_all,
fields(
pid = tracing::field::Empty,
conn_id = tracing::field::Empty
)
)]
async fn handle_inner(
config: &'static ProxyConfig,
ctx: &mut RequestMonitoring,
Expand Down Expand Up @@ -354,12 +362,10 @@ async fn handle_inner(
let txn_read_only = headers.get(&TXN_READ_ONLY) == Some(&HEADER_VALUE_TRUE);
let txn_deferrable = headers.get(&TXN_DEFERRABLE) == Some(&HEADER_VALUE_TRUE);

let paused = ctx.latency_timer.pause();
let request_content_length = match request.body().size_hint().upper() {
Some(v) => v,
None => MAX_REQUEST_SIZE + 1,
};
drop(paused);
info!(request_content_length, "request size in bytes");
HTTP_CONTENT_LENGTH.observe(request_content_length as f64);

Expand All @@ -375,15 +381,20 @@ async fn handle_inner(
let body = hyper::body::to_bytes(request.into_body())
.await
.map_err(anyhow::Error::from)?;
info!(length = body.len(), "request payload read");
let payload: Payload = serde_json::from_slice(&body)?;
Ok::<Payload, anyhow::Error>(payload) // Adjust error type accordingly
};

let authenticate_and_connect = async {
let keys = backend.authenticate(ctx, &conn_info).await?;
backend
let client = backend
.connect_to_compute(ctx, conn_info, keys, !allow_pool)
.await
.await?;
// not strictly necessary to mark success here,
// but it's just insurance for if we forget it somewhere else
ctx.latency_timer.success();
Ok::<_, HttpConnError>(client)
};

// Run both operations in parallel
Expand Down Expand Up @@ -415,6 +426,7 @@ async fn handle_inner(
results
}
Payload::Batch(statements) => {
info!("starting transaction");
let (inner, mut discard) = client.inner();
let mut builder = inner.build_transaction();
if let Some(isolation_level) = txn_isolation_level {
Expand Down Expand Up @@ -444,6 +456,7 @@ async fn handle_inner(
.await
{
Ok(results) => {
info!("commit");
let status = transaction.commit().await.map_err(|e| {
// if we cannot commit - for now don't return connection to pool
// TODO: get a query status from the error
Expand All @@ -454,6 +467,7 @@ async fn handle_inner(
results
}
Err(err) => {
info!("rollback");
let status = transaction.rollback().await.map_err(|e| {
// if we cannot rollback - for now don't return connection to pool
// TODO: get a query status from the error
Expand Down Expand Up @@ -528,8 +542,10 @@ async fn query_to_json<T: GenericClient>(
raw_output: bool,
default_array_mode: bool,
) -> anyhow::Result<(ReadyForQueryStatus, Value)> {
info!("executing query");
let query_params = data.params;
let row_stream = client.query_raw_txt(&data.query, query_params).await?;
info!("finished executing query");

// Manually drain the stream into a vector to leave row_stream hanging
// around to get a command tag. Also check that the response is not too
Expand Down Expand Up @@ -564,6 +580,13 @@ async fn query_to_json<T: GenericClient>(
}
.and_then(|s| s.parse::<i64>().ok());

info!(
rows = rows.len(),
?ready,
command_tag,
"finished reading rows"
);

let mut fields = vec![];
let mut columns = vec![];

Expand Down

0 comments on commit e9870c5

Please sign in to comment.