Skip to content

Commit

Permalink
proxy: add unit test for wake_compute (#4819)
Browse files Browse the repository at this point in the history
## Problem

ref #4721, ref
#4709

## Summary of changes

This PR adds unit tests for wake_compute.

The patch adds a new variant `Test` to auth backends. When
`wake_compute` is called, we will verify if it is the exact operation
sequence we are expecting. The operation sequence now contains 3 more
operations: `Wake`, `WakeRetry`, and `WakeFail`.

The unit tests for proxy connects are now complete and I'll continue
work on WebSocket e2e test in future PRs.

---------

Signed-off-by: Alex Chi Z <[email protected]>
  • Loading branch information
skyzh authored Jul 28, 2023
1 parent 4338eed commit a8f3540
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 36 deletions.
15 changes: 15 additions & 0 deletions proxy/src/auth/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ pub enum BackendType<'a, T> {
Postgres(Cow<'a, console::provider::mock::Api>, T),
/// Authentication via a web browser.
Link(Cow<'a, url::ApiUrl>),
/// Test backend.
Test(&'a dyn TestBackend),
}

pub trait TestBackend: Send + Sync + 'static {
fn wake_compute(&self) -> Result<CachedNodeInfo, console::errors::WakeComputeError>;
}

impl std::fmt::Display for BackendType<'_, ()> {
Expand All @@ -62,6 +68,7 @@ impl std::fmt::Display for BackendType<'_, ()> {
Console(endpoint, _) => fmt.debug_tuple("Console").field(&endpoint.url()).finish(),
Postgres(endpoint, _) => fmt.debug_tuple("Postgres").field(&endpoint.url()).finish(),
Link(url) => fmt.debug_tuple("Link").field(&url.as_str()).finish(),
Test(_) => fmt.debug_tuple("Test").finish(),
}
}
}
Expand All @@ -75,6 +82,7 @@ impl<T> BackendType<'_, T> {
Console(c, x) => Console(Cow::Borrowed(c), x),
Postgres(c, x) => Postgres(Cow::Borrowed(c), x),
Link(c) => Link(Cow::Borrowed(c)),
Test(x) => Test(*x),
}
}
}
Expand All @@ -89,6 +97,7 @@ impl<'a, T> BackendType<'a, T> {
Console(c, x) => Console(c, f(x)),
Postgres(c, x) => Postgres(c, f(x)),
Link(c) => Link(c),
Test(x) => Test(x),
}
}
}
Expand All @@ -102,6 +111,7 @@ impl<'a, T, E> BackendType<'a, Result<T, E>> {
Console(c, x) => x.map(|x| Console(c, x)),
Postgres(c, x) => x.map(|x| Postgres(c, x)),
Link(c) => Ok(Link(c)),
Test(x) => Ok(Test(x)),
}
}
}
Expand Down Expand Up @@ -147,6 +157,7 @@ impl BackendType<'_, ClientCredentials<'_>> {
Console(_, creds) => creds.project.clone(),
Postgres(_, creds) => creds.project.clone(),
Link(_) => Some("link".to_owned()),
Test(_) => Some("test".to_owned()),
}
}
/// Authenticate the client via the requested backend, possibly using credentials.
Expand Down Expand Up @@ -188,6 +199,9 @@ impl BackendType<'_, ClientCredentials<'_>> {
.await?
.map(CachedNodeInfo::new_uncached)
}
Test(_) => {
unreachable!("this function should never be called in the test backend")
}
};

info!("user successfully authenticated");
Expand All @@ -206,6 +220,7 @@ impl BackendType<'_, ClientCredentials<'_>> {
Console(api, creds) => api.wake_compute(extra, creds).map_ok(Some).await,
Postgres(api, creds) => api.wake_compute(extra, creds).map_ok(Some).await,
Link(_) => Ok(None),
Test(x) => x.wake_compute().map(Some),
}
}
}
35 changes: 21 additions & 14 deletions proxy/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
cancellation::{self, CancelMap},
compute::{self, PostgresConnection},
config::{ProxyConfig, TlsConfig},
console::{self, errors::WakeComputeError, messages::MetricsAuxInfo},
console::{self, errors::WakeComputeError, messages::MetricsAuxInfo, Api},
stream::{PqStream, Stream},
};
use anyhow::{bail, Context};
Expand Down Expand Up @@ -413,18 +413,18 @@ where
loop {
match state {
ConnectionState::Invalid(config, err) => {
info!("compute node's state has likely changed; requesting a wake-up");

let wake_res = match creds {
auth::BackendType::Console(api, creds) => {
try_wake(api.as_ref(), extra, creds).await
}
auth::BackendType::Postgres(api, creds) => {
try_wake(api.as_ref(), extra, creds).await
}
auth::BackendType::Console(api, creds) => api.wake_compute(extra, creds).await,
auth::BackendType::Postgres(api, creds) => api.wake_compute(extra, creds).await,
// nothing to do?
auth::BackendType::Link(_) => return Err(err.into()),
// test backend
auth::BackendType::Test(x) => x.wake_compute(),
};

match wake_res {
match handle_try_wake(wake_res) {
// there was an error communicating with the control plane
Err(e) => return Err(e.into()),
// failed to wake up but we can continue to retry
Expand Down Expand Up @@ -478,13 +478,10 @@ where
/// * Returns Ok(Continue(e)) if there was an error waking but retries are acceptable
/// * Returns Ok(Break(node)) if the wakeup succeeded
/// * Returns Err(e) if there was an error
pub async fn try_wake(
api: &impl console::Api,
extra: &console::ConsoleReqExtra<'_>,
creds: &auth::ClientCredentials<'_>,
fn handle_try_wake(
result: Result<console::CachedNodeInfo, WakeComputeError>,
) -> Result<ControlFlow<console::CachedNodeInfo, WakeComputeError>, WakeComputeError> {
info!("compute node's state has likely changed; requesting a wake-up");
match api.wake_compute(extra, creds).await {
match result {
Err(err) => match &err {
WakeComputeError::ApiError(api) if api.could_retry() => Ok(ControlFlow::Continue(err)),
_ => Err(err),
Expand All @@ -494,6 +491,16 @@ pub async fn try_wake(
}
}

/// Attempts to wake up the compute node.
pub async fn try_wake(
api: &impl console::Api,
extra: &console::ConsoleReqExtra<'_>,
creds: &auth::ClientCredentials<'_>,
) -> Result<ControlFlow<console::CachedNodeInfo, WakeComputeError>, WakeComputeError> {
info!("compute node's state has likely changed; requesting a wake-up");
handle_try_wake(api.wake_compute(extra, creds).await)
}

pub trait ShouldRetry {
fn could_retry(&self) -> bool;
fn should_retry(&self, num_retries: u32) -> bool {
Expand Down
120 changes: 98 additions & 22 deletions proxy/src/proxy/tests.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
//! A group of high-level tests for connection establishing logic and auth.
use std::borrow::Cow;

//!
use super::*;
use crate::auth::backend::TestBackend;
use crate::auth::ClientCredentials;
use crate::console::{CachedNodeInfo, NodeInfo};
use crate::{auth, sasl, scram};
use crate::{auth, http, sasl, scram};
use async_trait::async_trait;
use rstest::rstest;
use tokio_postgres::config::SslMode;
Expand Down Expand Up @@ -309,8 +309,11 @@ fn connect_compute_total_wait() {
assert!(total_wait > tokio::time::Duration::from_secs(10));
}

#[derive(Clone, Copy)]
#[derive(Clone, Copy, Debug)]
enum ConnectAction {
Wake,
WakeFail,
WakeRetry,
Connect,
Retry,
Fail,
Expand All @@ -321,6 +324,17 @@ struct TestConnectMechanism {
sequence: Vec<ConnectAction>,
}

impl TestConnectMechanism {
fn verify(&self) {
let counter = self.counter.lock().unwrap();
assert_eq!(
*counter,
self.sequence.len(),
"sequence does not proceed to the end"
);
}
}

impl TestConnectMechanism {
fn new(sequence: Vec<ConnectAction>) -> Self {
Self {
Expand Down Expand Up @@ -370,73 +384,110 @@ impl ConnectMechanism for TestConnectMechanism {
ConnectAction::Connect => Ok(TestConnection),
ConnectAction::Retry => Err(TestConnectError { retryable: true }),
ConnectAction::Fail => Err(TestConnectError { retryable: false }),
x => panic!("expecting action {:?}, connect is called instead", x),
}
}

fn update_connect_config(&self, _conf: &mut compute::ConnCfg) {}
}

fn helper_create_connect_info() -> (
CachedNodeInfo,
console::ConsoleReqExtra<'static>,
auth::BackendType<'static, ClientCredentials<'static>>,
) {
impl TestBackend for TestConnectMechanism {
fn wake_compute(&self) -> Result<CachedNodeInfo, console::errors::WakeComputeError> {
let mut counter = self.counter.lock().unwrap();
let action = self.sequence[*counter];
*counter += 1;
match action {
ConnectAction::Wake => Ok(helper_create_cached_node_info()),
ConnectAction::WakeFail => {
let err = console::errors::ApiError::Console {
status: http::StatusCode::FORBIDDEN,
text: "TEST".into(),
};
assert!(!err.could_retry());
Err(console::errors::WakeComputeError::ApiError(err))
}
ConnectAction::WakeRetry => {
let err = console::errors::ApiError::Console {
status: http::StatusCode::INTERNAL_SERVER_ERROR,
text: "TEST".into(),
};
assert!(err.could_retry());
Err(console::errors::WakeComputeError::ApiError(err))
}
x => panic!("expecting action {:?}, wake_compute is called instead", x),
}
}
}

fn helper_create_cached_node_info() -> CachedNodeInfo {
let node = NodeInfo {
config: compute::ConnCfg::new(),
aux: Default::default(),
allow_self_signed_compute: false,
};
let cache = CachedNodeInfo::new_uncached(node);
CachedNodeInfo::new_uncached(node)
}

fn helper_create_connect_info(
mechanism: &TestConnectMechanism,
) -> (
CachedNodeInfo,
console::ConsoleReqExtra<'static>,
auth::BackendType<'_, ClientCredentials<'static>>,
) {
let cache = helper_create_cached_node_info();
let extra = console::ConsoleReqExtra {
session_id: uuid::Uuid::new_v4(),
application_name: Some("TEST"),
};
let url = "https://TEST_URL".parse().unwrap();
let api = console::provider::mock::Api::new(url);
let creds = auth::BackendType::Postgres(Cow::Owned(api), ClientCredentials::new_noop());
let creds = auth::BackendType::Test(mechanism);
(cache, extra, creds)
}

#[tokio::test]
async fn connect_to_compute_success() {
use ConnectAction::*;
let mechanism = TestConnectMechanism::new(vec![Connect]);
let (cache, extra, creds) = helper_create_connect_info();
let (cache, extra, creds) = helper_create_connect_info(&mechanism);
connect_to_compute(&mechanism, cache, &extra, &creds)
.await
.unwrap();
mechanism.verify();
}

#[tokio::test]
async fn connect_to_compute_retry() {
use ConnectAction::*;
let mechanism = TestConnectMechanism::new(vec![Retry, Retry, Connect]);
let (cache, extra, creds) = helper_create_connect_info();
let mechanism = TestConnectMechanism::new(vec![Retry, Wake, Retry, Connect]);
let (cache, extra, creds) = helper_create_connect_info(&mechanism);
connect_to_compute(&mechanism, cache, &extra, &creds)
.await
.unwrap();
mechanism.verify();
}

/// Test that we don't retry if the error is not retryable.
#[tokio::test]
async fn connect_to_compute_non_retry_1() {
use ConnectAction::*;
let mechanism = TestConnectMechanism::new(vec![Retry, Retry, Fail]);
let (cache, extra, creds) = helper_create_connect_info();
let mechanism = TestConnectMechanism::new(vec![Retry, Wake, Retry, Fail]);
let (cache, extra, creds) = helper_create_connect_info(&mechanism);
connect_to_compute(&mechanism, cache, &extra, &creds)
.await
.unwrap_err();
mechanism.verify();
}

/// Even for non-retryable errors, we should retry at least once.
#[tokio::test]
async fn connect_to_compute_non_retry_2() {
use ConnectAction::*;
let mechanism = TestConnectMechanism::new(vec![Fail, Retry, Connect]);
let (cache, extra, creds) = helper_create_connect_info();
let mechanism = TestConnectMechanism::new(vec![Fail, Wake, Retry, Connect]);
let (cache, extra, creds) = helper_create_connect_info(&mechanism);
connect_to_compute(&mechanism, cache, &extra, &creds)
.await
.unwrap();
mechanism.verify();
}

/// Retry for at most `NUM_RETRIES_CONNECT` times.
Expand All @@ -445,11 +496,36 @@ async fn connect_to_compute_non_retry_3() {
assert_eq!(NUM_RETRIES_CONNECT, 10);
use ConnectAction::*;
let mechanism = TestConnectMechanism::new(vec![
Retry, Retry, Retry, Retry, Retry, Retry, Retry, Retry, Retry, Retry,
Retry, Wake, Retry, Retry, Retry, Retry, Retry, Retry, Retry, Retry, Retry,
/* the 11th time */ Retry,
]);
let (cache, extra, creds) = helper_create_connect_info();
let (cache, extra, creds) = helper_create_connect_info(&mechanism);
connect_to_compute(&mechanism, cache, &extra, &creds)
.await
.unwrap_err();
mechanism.verify();
}

/// Should retry wake compute.
#[tokio::test]
async fn wake_retry() {
use ConnectAction::*;
let mechanism = TestConnectMechanism::new(vec![Retry, WakeRetry, Wake, Connect]);
let (cache, extra, creds) = helper_create_connect_info(&mechanism);
connect_to_compute(&mechanism, cache, &extra, &creds)
.await
.unwrap();
mechanism.verify();
}

/// Wake failed with a non-retryable error.
#[tokio::test]
async fn wake_non_retry() {
use ConnectAction::*;
let mechanism = TestConnectMechanism::new(vec![Retry, WakeFail]);
let (cache, extra, creds) = helper_create_connect_info(&mechanism);
connect_to_compute(&mechanism, cache, &extra, &creds)
.await
.unwrap_err();
mechanism.verify();
}

1 comment on commit a8f3540

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1308 tests run: 1250 passed, 0 failed, 58 skipped (full report)


Flaky tests (1)

Postgres 15

  • test_remote_timeline_client_calls_started_metric[local_fs]: release

Please sign in to comment.