Skip to content

Commit

Permalink
proxy: Present new auth backend cplane_proxy_v1
Browse files Browse the repository at this point in the history
Implement a new auth backend based on the current Neon
backend to switch over to the new Proxy V1 cplane API.
  • Loading branch information
awarus committed Dec 4, 2024
1 parent dec2e2f commit dea3988
Show file tree
Hide file tree
Showing 6 changed files with 602 additions and 11 deletions.
4 changes: 4 additions & 0 deletions proxy/src/auth/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ impl std::fmt::Display for Backend<'_, ()> {
fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::ControlPlane(api, ()) => match &**api {
ControlPlaneClient::ProxyV1(endpoint) => fmt
.debug_tuple("ControlPlane::ProxyV1")
.field(&endpoint.url())
.finish(),
ControlPlaneClient::Neon(endpoint) => fmt
.debug_tuple("ControlPlane::Neon")
.field(&endpoint.url())
Expand Down
76 changes: 66 additions & 10 deletions proxy/src/bin/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,39 @@ async fn main() -> anyhow::Result<()> {
.instrument(span),
);
}
} else if let proxy::control_plane::client::ControlPlaneClient::ProxyV1(api) = &**api {
match (redis_notifications_client, regional_redis_client.clone()) {
(None, None) => {}
(client1, client2) => {
let cache = api.caches.project_info.clone();
if let Some(client) = client1 {
maintenance_tasks.spawn(notifications::task_main(
client,
cache.clone(),
cancel_map.clone(),
args.region.clone(),
));
}
if let Some(client) = client2 {
maintenance_tasks.spawn(notifications::task_main(
client,
cache.clone(),
cancel_map.clone(),
args.region.clone(),
));
}
maintenance_tasks.spawn(async move { cache.clone().gc_worker().await });
}
}
if let Some(regional_redis_client) = regional_redis_client {
let cache = api.caches.endpoints_cache.clone();
let con = regional_redis_client;
let span = tracing::info_span!("endpoints_cache");
maintenance_tasks.spawn(
async move { cache.do_read(con, cancellation_token.clone()).await }
.instrument(span),
);
}
}
}

Expand Down Expand Up @@ -697,22 +730,45 @@ fn build_auth_backend(
)?));
tokio::spawn(locks.garbage_collect_worker());

let url = args.auth_endpoint.parse()?;
let url: ApiUrl = args.auth_endpoint.parse()?;
let mut proxy_v1 = true;
if url.as_str().contains("management/api/v2") {
proxy_v1 = false;
}

let endpoint = http::Endpoint::new(url, http::new_client());

let mut wake_compute_rps_limit = args.wake_compute_limit.clone();
RateBucketInfo::validate(&mut wake_compute_rps_limit)?;
let wake_compute_endpoint_rate_limiter =
Arc::new(WakeComputeRateLimiter::new(wake_compute_rps_limit));
let api = control_plane::client::neon::NeonControlPlaneClient::new(
endpoint,
args.control_plane_token.clone(),
caches,
locks,
wake_compute_endpoint_rate_limiter,
);
let api = control_plane::client::ControlPlaneClient::Neon(api);
let auth_backend = auth::Backend::ControlPlane(MaybeOwned::Owned(api), ());

let auth_backend: auth::Backend<'_, ()>;
match proxy_v1 {
true => {
let api = control_plane::client::cplane_proxy_v1::NeonControlPlaneClient::new(
endpoint,
args.control_plane_token.clone(),
caches,
locks,
wake_compute_endpoint_rate_limiter,
);

let api = control_plane::client::ControlPlaneClient::ProxyV1(api);
auth_backend = auth::Backend::ControlPlane(MaybeOwned::Owned(api), ());
}
false => {
let api = control_plane::client::neon::NeonControlPlaneClient::new(
endpoint,
args.control_plane_token.clone(),
caches,
locks,
wake_compute_endpoint_rate_limiter,
);
let api = control_plane::client::ControlPlaneClient::Neon(api);
auth_backend = auth::Backend::ControlPlane(MaybeOwned::Owned(api), ());
}
};

let config = Box::leak(Box::new(auth_backend));

Expand Down
Loading

0 comments on commit dea3988

Please sign in to comment.