Skip to content

Commit

Permalink
feat: allow custom tonic connectors & expose more errors (#54)
Browse files Browse the repository at this point in the history
  • Loading branch information
infiniteregrets authored Nov 12, 2024
1 parent 3b3de6d commit f1c1b15
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 1 deletion.
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@ backon = "1.2.0"
bytesize = "1.3.0"
futures = "0.3.31"
http = "1.1.0"
hyper = { version = "1.5.0", optional = true }
hyper-util = { version = "0.1.10", optional = true }
prost = "0.13.3"
prost-types = "0.13.3"
secrecy = "0.8.0"
serde = { version = "1.0.214", optional = true, features = ["derive"] }
sync_docs = { path = "sync_docs" }
thiserror = "1.0.67"
tonic = { version = "0.12.3", features = ["tls", "tls-webpki-roots"] }
tower-service = { version = "0.3.3", optional = true }

[build-dependencies]
tonic-build = { version = "0.12.3", features = ["prost"] }
Expand All @@ -29,3 +32,4 @@ tokio = { version = "*", features = ["full"] }

[features]
serde = ["dep:serde"]
connector = ["dep:hyper", "dep:hyper-util", "dep:tower-service"]
94 changes: 94 additions & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,22 @@ impl Client {
Self::connect_inner(config, /* force_lazy_connection = */ false).await
}

#[cfg(feature = "connector")]
pub async fn connect_with_connector<C>(
config: ClientConfig,
connector: C,
) -> Result<Self, ConnectError>
where
C: tower_service::Service<http::Uri> + Send + 'static,
C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
C::Future: Send,
C::Error: std::error::Error + Send + Sync + 'static,
{
Ok(Self {
inner: ClientInner::connect_cell_with_connector(config, connector).await?,
})
}

/// Get the client to interact with the S2 basin service API.
pub async fn basin_client(
&self,
Expand Down Expand Up @@ -341,6 +357,22 @@ impl BasinClient {
client.basin_client(basin).await
}

#[cfg(feature = "connector")]
pub async fn connect_with_connector<C>(
config: ClientConfig,
basin: impl Into<String>,
connector: C,
) -> Result<Self, ConnectError>
where
C: tower_service::Service<http::Uri> + Send + 'static,
C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
C::Future: Send,
C::Error: std::error::Error + Send + Sync + 'static,
{
let client = Client::connect_with_connector(config, connector).await?;
client.basin_client(basin).await
}

/// Get the client to interact with the S2 stream service API.
pub fn stream_client(&self, stream: impl Into<String>) -> StreamClient {
StreamClient {
Expand Down Expand Up @@ -434,6 +466,24 @@ impl StreamClient {
.map(|client| client.stream_client(stream))
}

#[cfg(feature = "connector")]
pub async fn connect_with_connector<C>(
config: ClientConfig,
basin: impl Into<String>,
stream: impl Into<String>,
connector: C,
) -> Result<Self, ConnectError>
where
C: tower_service::Service<http::Uri> + Send + 'static,
C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
C::Future: Send,
C::Error: std::error::Error + Send + Sync + 'static,
{
BasinClient::connect_with_connector(config, basin, connector)
.await
.map(|client| client.stream_client(stream))
}

#[sync_docs]
pub async fn check_tail(&self) -> Result<u64, ServiceError<CheckTailError>> {
self.inner
Expand Down Expand Up @@ -525,6 +575,21 @@ impl ClientInner {
Self::connect(config, cell_endpoint, force_lazy_connection).await
}

#[cfg(feature = "connector")]
async fn connect_cell_with_connector<C>(
config: ClientConfig,
connector: C,
) -> Result<Self, ConnectError>
where
C: tower_service::Service<http::Uri> + Send + 'static,
C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
C::Future: Send,
C::Error: std::error::Error + Send + Sync + 'static,
{
let cell_endpoint = config.host_endpoint.cell.clone();
Self::connect_with_connector(config, cell_endpoint, connector).await
}

async fn connect_basin(
&self,
basin: impl Into<String>,
Expand Down Expand Up @@ -573,6 +638,35 @@ impl ClientInner {
})
}

#[cfg(feature = "connector")]
async fn connect_with_connector<C>(
config: ClientConfig,
endpoint: Authority,
connector: C,
) -> Result<Self, ConnectError>
where
C: tower_service::Service<http::Uri> + Send + 'static,
C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
C::Future: Send,
C::Error: std::error::Error + Send + Sync + 'static,
{
let endpoint = format!("http://{endpoint}")
.parse::<Endpoint>()?
.user_agent(config.user_agent.clone())?
.http2_adaptive_window(true)
.keep_alive_timeout(Duration::from_secs(5))
.http2_keep_alive_interval(Duration::from_secs(5))
.connect_timeout(config.connection_timeout)
.timeout(config.request_timeout);

let channel = endpoint.connect_with_connector(connector).await?;
Ok(Self {
channel,
basin: None,
config,
})
}

async fn send<T: ServiceRequest>(
&self,
service_req: T,
Expand Down
7 changes: 6 additions & 1 deletion src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ pub enum ServiceError<T: std::error::Error> {
Unauthenticated(String),
#[error("Unavailable: {0}")]
Unavailable(String),
#[error("Aborted: {0}")]
Aborted(String),
#[error("Cancelled: {0}")]
Cancelled(String),
#[error("{0}")]
Unknown(String),
#[error(transparent)]
Expand All @@ -37,7 +41,6 @@ pub async fn send_request<T: ServiceRequest>(
basin: Option<&str>,
) -> Result<T::Response, ServiceError<T::Error>> {
let req = prepare_request(&mut service, token, basin).map_err(ServiceError::Convert)?;

match service.send(req).await {
Ok(resp) => service.parse_response(resp).map_err(ServiceError::Convert),
Err(status) => match status.code() {
Expand All @@ -51,6 +54,8 @@ pub async fn send_request<T: ServiceRequest>(
tonic::Code::Unavailable => {
Err(ServiceError::Unavailable(status.message().to_string()))
}
tonic::Code::Aborted => Err(ServiceError::Aborted(status.message().to_string())),
tonic::Code::Cancelled => Err(ServiceError::Cancelled(status.message().to_string())),
_ => match service.parse_status(&status) {
Ok(resp) => Ok(resp),
Err(None) => Err(ServiceError::Unknown(status.message().to_string())),
Expand Down
26 changes: 26 additions & 0 deletions src/service/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ impl ServiceRequest for ReadServiceRequest {
tonic::Code::InvalidArgument => {
Some(ReadError::InvalidArgument(status.message().to_string()))
}
tonic::Code::DeadlineExceeded => {
Some(ReadError::DeadlineExceeded(status.message().to_string()))
}
_ => None,
})
}
Expand All @@ -141,6 +144,8 @@ pub enum ReadError {
NotFound(String),
#[error("Invalid argument: {0}")]
InvalidArgument(String),
#[error("Deadline exceeded: {0}")]
DeadlineExceeded(String),
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -191,6 +196,9 @@ impl ServiceRequest for ReadSessionServiceRequest {
tonic::Code::InvalidArgument => Some(ReadSessionError::InvalidArgument(
status.message().to_string(),
)),
tonic::Code::DeadlineExceeded => Some(ReadSessionError::DeadlineExceeded(
status.message().to_string(),
)),
_ => None,
})
}
Expand Down Expand Up @@ -230,6 +238,9 @@ impl StreamingResponse for ReadSessionStreamingResponse {
tonic::Code::InvalidArgument => Some(ReadSessionError::InvalidArgument(
status.message().to_string(),
)),
tonic::Code::DeadlineExceeded => Some(ReadSessionError::DeadlineExceeded(
status.message().to_string(),
)),
_ => None,
})
}
Expand All @@ -241,6 +252,8 @@ pub enum ReadSessionError {
NotFound(String),
#[error("Invalid argument: {0}")]
InvalidArgument(String),
#[error("Deadline exceeded: {0}")]
DeadlineExceeded(String),
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -291,6 +304,9 @@ impl ServiceRequest for AppendServiceRequest {
Some(AppendError::InvalidArgument(status.message().to_string()))
}
tonic::Code::Aborted => Some(AppendError::Aborted(status.message().to_string())),
tonic::Code::FailedPrecondition => Some(AppendError::FailedPrecondition(
status.message().to_string(),
)),
_ => None,
})
}
Expand All @@ -311,6 +327,8 @@ pub enum AppendError {
InvalidArgument(String),
#[error("Aborted: {0}")]
Aborted(String),
#[error("Failed precondition: {0}")]
FailedPrecondition(String),
}

pub struct AppendSessionServiceRequest<S>
Expand Down Expand Up @@ -370,6 +388,9 @@ where
tonic::Code::InvalidArgument => Some(AppendSessionError::InvalidArgument(
status.message().to_string(),
)),
tonic::Code::DeadlineExceeded => Some(AppendSessionError::DeadlineExceeded(
status.message().to_string(),
)),
_ => None,
})
}
Expand Down Expand Up @@ -430,6 +451,9 @@ impl StreamingResponse for AppendSessionStreamingResponse {
tonic::Code::InvalidArgument => Some(AppendSessionError::InvalidArgument(
status.message().to_string(),
)),
tonic::Code::DeadlineExceeded => Some(AppendSessionError::DeadlineExceeded(
status.message().to_string(),
)),
_ => None,
})
}
Expand All @@ -441,4 +465,6 @@ pub enum AppendSessionError {
NotFound(String),
#[error("Invalid argument: {0}")]
InvalidArgument(String),
#[error("Deadline exceeded: {0}")]
DeadlineExceeded(String),
}

0 comments on commit f1c1b15

Please sign in to comment.