Skip to content

Commit

Permalink
fix: Only connect lazily (remove option to connect eagerly) (#49)
Browse files Browse the repository at this point in the history
Resolves: #43

---------

Signed-off-by: Vaibhav Rabber <[email protected]>
  • Loading branch information
vrongmeal authored Nov 14, 2024
1 parent f1c1b15 commit 94e6950
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 125 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ Cargo.lock

# IDE Specific configurations
.idea/
.helix/
8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@ backon = "1.2.0"
bytesize = "1.3.0"
futures = "0.3.31"
http = "1.1.0"
hyper = { version = "1.5.0", optional = true }
hyper-util = { version = "0.1.10", optional = true }
hyper = "1.5.0"
hyper-util = "0.1.10"
prost = "0.13.3"
prost-types = "0.13.3"
secrecy = "0.8.0"
serde = { version = "1.0.214", optional = true, features = ["derive"] }
sync_docs = { path = "sync_docs" }
thiserror = "1.0.67"
tonic = { version = "0.12.3", features = ["tls", "tls-webpki-roots"] }
tower-service = { version = "0.3.3", optional = true }
tower-service = "0.3.3"

[build-dependencies]
tonic-build = { version = "0.12.3", features = ["prost"] }
Expand All @@ -32,4 +32,4 @@ tokio = { version = "*", features = ["full"] }

[features]
serde = ["dep:serde"]
connector = ["dep:hyper", "dep:hyper-util", "dep:tower-service"]
connector = []
4 changes: 2 additions & 2 deletions examples/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ async fn main() {

println!("Connecting with {config:#?}");

let client = Client::connect(config).await.unwrap();
let client = Client::new(config).unwrap();

let basin = "s2-sdk-example-basin";

Expand Down Expand Up @@ -67,7 +67,7 @@ async fn main() {

let create_stream_req = CreateStreamRequest::new(stream);

let basin_client = client.basin_client(basin).await.unwrap();
let basin_client = client.basin_client(basin).unwrap();

match basin_client.create_stream(create_stream_req).await {
Ok(()) => {
Expand Down
186 changes: 67 additions & 119 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::{fmt::Display, str::FromStr, time::Duration};

use backon::{ConstantBuilder, Retryable};
use http::uri::Authority;
use hyper_util::client::legacy::connect::HttpConnector;
use secrecy::SecretString;
use sync_docs::sync_docs;
use tonic::transport::{Channel, ClientTlsConfig, Endpoint};
Expand Down Expand Up @@ -35,6 +36,8 @@ use crate::{
types,
};

const DEFAULT_HTTP_CONNECTOR: Option<HttpConnector> = None;

/// Cloud deployment to be used to connect the client with.
///
/// Can be used to create the client with default hosted URIs:
Expand Down Expand Up @@ -152,15 +155,15 @@ pub struct ClientConfig {
pub token: SecretString,
/// Host URI to connect with.
pub host_endpoint: HostEndpoints,
/// Should the connection be lazy, i.e., only be made when making the very
/// first request.
pub connect_lazily: bool,
/// Timeout for connecting/reconnecting.
pub connection_timeout: Duration,
/// Timeout for a particular request.
pub request_timeout: Duration,
/// User agent to be used for the client.
pub user_agent: String,
/// URI scheme to use to connect.
#[cfg(feature = "connector")]
pub uri_scheme: http::uri::Scheme,
}

impl ClientConfig {
Expand All @@ -170,10 +173,11 @@ impl ClientConfig {
Self {
token: token.into().into(),
host_endpoint: HostEndpoints::default(),
connect_lazily: true,
connection_timeout: Duration::from_secs(3),
request_timeout: Duration::from_secs(5),
user_agent: "s2-sdk-rust".to_string(),
#[cfg(feature = "connector")]
uri_scheme: http::uri::Scheme::HTTPS,
}
}

Expand All @@ -185,15 +189,6 @@ impl ClientConfig {
}
}

/// Construct from an existing configuration with the new `connect_lazily`
/// configuration.
pub fn with_connect_lazily(self, connect_lazily: bool) -> Self {
Self {
connect_lazily,
..self
}
}

/// Construct from an existing configuration with the new connection
/// timeout.
pub fn with_connection_timeout(self, connection_timeout: impl Into<Duration>) -> Self {
Expand All @@ -218,6 +213,15 @@ impl ClientConfig {
..self
}
}

/// Construct from an existing configuration with the new URI scheme.
#[cfg(feature = "connector")]
pub fn with_uri_scheme(self, uri_scheme: impl Into<http::uri::Scheme>) -> Self {
Self {
uri_scheme: uri_scheme.into(),
..self
}
}
}

/// The S2 client to interact with the API.
Expand All @@ -227,46 +231,30 @@ pub struct Client {
}

impl Client {
async fn connect_inner(
config: ClientConfig,
force_lazy_connection: bool,
) -> Result<Self, ConnectError> {
/// Create the client to connect with the S2 API.
pub fn new(config: ClientConfig) -> Result<Self, ClientError> {
Ok(Self {
inner: ClientInner::connect_cell(config, force_lazy_connection).await?,
inner: ClientInner::new_cell(config, DEFAULT_HTTP_CONNECTOR)?,
})
}

/// Connect the client with the S2 API.
pub async fn connect(config: ClientConfig) -> Result<Self, ConnectError> {
Self::connect_inner(config, /* force_lazy_connection = */ false).await
}

#[cfg(feature = "connector")]
pub async fn connect_with_connector<C>(
config: ClientConfig,
connector: C,
) -> Result<Self, ConnectError>
pub fn new_with_connector<C>(config: ClientConfig, connector: C) -> Result<Self, ClientError>
where
C: tower_service::Service<http::Uri> + Send + 'static,
C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
C::Future: Send,
C::Error: std::error::Error + Send + Sync + 'static,
{
Ok(Self {
inner: ClientInner::connect_cell_with_connector(config, connector).await?,
inner: ClientInner::new_cell(config, Some(connector))?,
})
}

/// Get the client to interact with the S2 basin service API.
pub async fn basin_client(
&self,
basin: impl Into<String>,
) -> Result<BasinClient, ConnectError> {
pub fn basin_client(&self, basin: impl Into<String>) -> Result<BasinClient, ClientError> {
Ok(BasinClient {
inner: self
.inner
.connect_basin(basin, /* force_lazy_connection = */ false)
.await?,
inner: self.inner.new_basin(basin)?,
})
}

Expand Down Expand Up @@ -343,34 +331,26 @@ pub struct BasinClient {
}

impl BasinClient {
/// Connect the client with the S2 basin service API.
pub async fn connect(
config: ClientConfig,
basin: impl Into<String>,
) -> Result<Self, ConnectError> {
// Since we're directly trying to connect to the basin, force lazy
// connection with the global client so we don't end up making 2
// connections for connecting with the basin client directly (given the
// cell URI and global URIs are different).
let force_lazy_connection = config.host_endpoint.basin_zone.is_some();
let client = Client::connect_inner(config, force_lazy_connection).await?;
client.basin_client(basin).await
/// Create the client to connect with the S2 basin service API.
pub fn new(config: ClientConfig, basin: impl Into<String>) -> Result<Self, ClientError> {
let client = Client::new(config)?;
client.basin_client(basin)
}

#[cfg(feature = "connector")]
pub async fn connect_with_connector<C>(
pub fn new_with_connector<C>(
config: ClientConfig,
basin: impl Into<String>,
connector: C,
) -> Result<Self, ConnectError>
) -> Result<Self, ClientError>
where
C: tower_service::Service<http::Uri> + Send + 'static,
C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
C::Future: Send,
C::Error: std::error::Error + Send + Sync + 'static,
{
let client = Client::connect_with_connector(config, connector).await?;
client.basin_client(basin).await
let client = Client::new_with_connector(config, connector)?;
client.basin_client(basin)
}

/// Get the client to interact with the S2 stream service API.
Expand Down Expand Up @@ -455,32 +435,29 @@ pub struct StreamClient {
}

impl StreamClient {
/// Connect the client with the S2 stream service API.
pub async fn connect(
/// Create the client to connect with the S2 stream service API.
pub async fn new(
config: ClientConfig,
basin: impl Into<String>,
stream: impl Into<String>,
) -> Result<Self, ConnectError> {
BasinClient::connect(config, basin)
.await
.map(|client| client.stream_client(stream))
) -> Result<Self, ClientError> {
BasinClient::new(config, basin).map(|client| client.stream_client(stream))
}

#[cfg(feature = "connector")]
pub async fn connect_with_connector<C>(
pub fn new_with_connector<C>(
config: ClientConfig,
basin: impl Into<String>,
stream: impl Into<String>,
connector: C,
) -> Result<Self, ConnectError>
) -> Result<Self, ClientError>
where
C: tower_service::Service<http::Uri> + Send + 'static,
C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
C::Future: Send,
C::Error: std::error::Error + Send + Sync + 'static,
{
BasinClient::connect_with_connector(config, basin, connector)
.await
BasinClient::new_with_connector(config, basin, connector)
.map(|client| client.stream_client(stream))
}

Expand Down Expand Up @@ -567,41 +544,24 @@ struct ClientInner {
}

impl ClientInner {
async fn connect_cell(
config: ClientConfig,
force_lazy_connection: bool,
) -> Result<Self, ConnectError> {
let cell_endpoint = config.host_endpoint.cell.clone();
Self::connect(config, cell_endpoint, force_lazy_connection).await
}

#[cfg(feature = "connector")]
async fn connect_cell_with_connector<C>(
config: ClientConfig,
connector: C,
) -> Result<Self, ConnectError>
fn new_cell<C>(config: ClientConfig, connector: Option<C>) -> Result<Self, ClientError>
where
C: tower_service::Service<http::Uri> + Send + 'static,
C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
C::Future: Send,
C::Error: std::error::Error + Send + Sync + 'static,
{
let cell_endpoint = config.host_endpoint.cell.clone();
Self::connect_with_connector(config, cell_endpoint, connector).await
Self::new(config, cell_endpoint, connector)
}

async fn connect_basin(
&self,
basin: impl Into<String>,
force_lazy_connection: bool,
) -> Result<Self, ConnectError> {
fn new_basin(&self, basin: impl Into<String>) -> Result<Self, ClientError> {
let basin = basin.into();

match self.config.host_endpoint.basin_zone.clone() {
Some(endpoint) => {
let basin_endpoint: Authority = format!("{basin}.{endpoint}").parse()?;
ClientInner::connect(self.config.clone(), basin_endpoint, force_lazy_connection)
.await
ClientInner::new(self.config.clone(), basin_endpoint, DEFAULT_HTTP_CONNECTOR)
}
None => Ok(Self {
basin: Some(basin),
Expand All @@ -610,12 +570,23 @@ impl ClientInner {
}
}

async fn connect(
fn new<C>(
config: ClientConfig,
endpoint: Authority,
force_lazy_connection: bool,
) -> Result<Self, ConnectError> {
let endpoint = format!("https://{endpoint}")
connector: Option<C>,
) -> Result<Self, ClientError>
where
C: tower_service::Service<http::Uri> + Send + 'static,
C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
C::Future: Send,
C::Error: std::error::Error + Send + Sync + 'static,
{
#[cfg(not(feature = "connector"))]
let scheme = "https";
#[cfg(feature = "connector")]
let scheme = config.uri_scheme.as_str();

let endpoint = format!("{scheme}://{endpoint}")
.parse::<Endpoint>()?
.user_agent(config.user_agent.clone())?
.http2_adaptive_window(true)
Expand All @@ -626,40 +597,17 @@ impl ClientInner {
)?
.connect_timeout(config.connection_timeout)
.timeout(config.request_timeout);
let channel = if config.connect_lazily || force_lazy_connection {
endpoint.connect_lazy()

let channel = if let Some(connector) = connector {
assert!(
config.host_endpoint.basin_zone.is_none(),
"cannot connect with connector if basin zone is provided"
);
endpoint.connect_with_connector_lazy(connector)
} else {
endpoint.connect().await?
endpoint.connect_lazy()
};
Ok(Self {
channel,
basin: None,
config,
})
}

#[cfg(feature = "connector")]
async fn connect_with_connector<C>(
config: ClientConfig,
endpoint: Authority,
connector: C,
) -> Result<Self, ConnectError>
where
C: tower_service::Service<http::Uri> + Send + 'static,
C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
C::Future: Send,
C::Error: std::error::Error + Send + Sync + 'static,
{
let endpoint = format!("http://{endpoint}")
.parse::<Endpoint>()?
.user_agent(config.user_agent.clone())?
.http2_adaptive_window(true)
.keep_alive_timeout(Duration::from_secs(5))
.http2_keep_alive_interval(Duration::from_secs(5))
.connect_timeout(config.connection_timeout)
.timeout(config.request_timeout);

let channel = endpoint.connect_with_connector(connector).await?;
Ok(Self {
channel,
basin: None,
Expand Down Expand Up @@ -701,7 +649,7 @@ impl ClientInner {

/// Error connecting to S2 endpoint.
#[derive(Debug, thiserror::Error)]
pub enum ConnectError {
pub enum ClientError {
#[error(transparent)]
TonicTransportError(#[from] tonic::transport::Error),
#[error(transparent)]
Expand Down

0 comments on commit 94e6950

Please sign in to comment.