Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
infiniteregrets committed Nov 21, 2024
1 parent b2a2453 commit 263ed16
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 51 deletions.
12 changes: 6 additions & 6 deletions src/account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use streamstore::{
},
};

use crate::error::{ErrorKind, ServiceError};
use crate::error::{ServiceError, ServiceErrorContext};

pub struct AccountService {
client: Client,
Expand All @@ -31,7 +31,7 @@ impl AccountService {
self.client
.list_basins(list_basins_req)
.await
.map_err(|e| ServiceError::new(ErrorKind::ListBasins, e))
.map_err(|e| ServiceError::new(ServiceErrorContext::ListBasins, e))
}

pub async fn create_basin(
Expand All @@ -56,22 +56,22 @@ impl AccountService {
self.client
.create_basin(create_basin_req)
.await
.map_err(|e| ServiceError::new(ErrorKind::CreateBasin, e))
.map_err(|e| ServiceError::new(ServiceErrorContext::CreateBasin, e))
}

pub async fn delete_basin(&self, basin: BasinName) -> Result<(), ServiceError> {
let delete_basin_req = DeleteBasinRequest::new(basin);
self.client
.delete_basin(delete_basin_req)
.await
.map_err(|e| ServiceError::new(ErrorKind::DeleteBasin, e))
.map_err(|e| ServiceError::new(ServiceErrorContext::DeleteBasin, e))
}

pub async fn get_basin_config(&self, basin: BasinName) -> Result<BasinConfig, ServiceError> {
self.client
.get_basin_config(basin)
.await
.map_err(|e| ServiceError::new(ErrorKind::GetBasinConfig, e))
.map_err(|e| ServiceError::new(ServiceErrorContext::GetBasinConfig, e))
}

pub async fn reconfigure_basin(
Expand All @@ -86,6 +86,6 @@ impl AccountService {
self.client
.reconfigure_basin(reconfigure_basin_req)
.await
.map_err(|e| ServiceError::new(ErrorKind::ReconfigureBasin, e))
.map_err(|e| ServiceError::new(ServiceErrorContext::ReconfigureBasin, e))
}
}
12 changes: 6 additions & 6 deletions src/basin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use streamstore::{
},
};

use crate::error::{ErrorKind, ServiceError};
use crate::error::{ServiceError, ServiceErrorContext};

pub struct BasinService {
client: BasinClient,
Expand All @@ -32,7 +32,7 @@ impl BasinService {
.client
.list_streams(list_streams_req)
.await
.map_err(|e| ServiceError::new(ErrorKind::ListStreams, e))?;
.map_err(|e| ServiceError::new(ServiceErrorContext::ListStreams, e))?;

Ok(streams)
}
Expand All @@ -51,21 +51,21 @@ impl BasinService {
self.client
.create_stream(create_stream_req)
.await
.map_err(|e| ServiceError::new(ErrorKind::CreateStream, e))
.map_err(|e| ServiceError::new(ServiceErrorContext::CreateStream, e))
}

pub async fn delete_stream(&self, stream: String) -> Result<(), ServiceError> {
self.client
.delete_stream(DeleteStreamRequest::new(stream))
.await
.map_err(|e| ServiceError::new(ErrorKind::DeleteStream, e))
.map_err(|e| ServiceError::new(ServiceErrorContext::DeleteStream, e))
}

pub async fn get_stream_config(&self, stream: String) -> Result<StreamConfig, ServiceError> {
self.client
.get_stream_config(stream)
.await
.map_err(|e| ServiceError::new(ErrorKind::GetStreamConfig, e))
.map_err(|e| ServiceError::new(ServiceErrorContext::GetStreamConfig, e))
}

pub async fn reconfigure_stream(
Expand All @@ -81,6 +81,6 @@ impl BasinService {
self.client
.reconfigure_stream(reconfigure_stream_req)
.await
.map_err(|e| ServiceError::new(ErrorKind::ReconfigureStream, e))
.map_err(|e| ServiceError::new(ServiceErrorContext::ReconfigureStream, e))
}
}
71 changes: 39 additions & 32 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,43 @@ pub enum S2CliError {
Service(#[from] ServiceError),
}

#[derive(Debug)]
pub enum ServiceErrorContext {
ListBasins,
CreateBasin,
DeleteBasin,
GetBasinConfig,
ReconfigureBasin,
ListStreams,
CreateStream,
DeleteStream,
GetStreamConfig,
CheckTail,
AppendSession,
ReadSession,
ReconfigureStream,
}

impl std::fmt::Display for ServiceErrorContext {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ServiceErrorContext::ListBasins => write!(f, "Failed to list basins"),
ServiceErrorContext::CreateBasin => write!(f, "Failed to create basin"),
ServiceErrorContext::DeleteBasin => write!(f, "Failed to delete basin"),
ServiceErrorContext::GetBasinConfig => write!(f, "Failed to get basin config"),
ServiceErrorContext::ReconfigureBasin => write!(f, "Failed to reconfigure basin"),
ServiceErrorContext::ListStreams => write!(f, "Failed to list streams"),
ServiceErrorContext::CreateStream => write!(f, "Failed to create stream"),
ServiceErrorContext::DeleteStream => write!(f, "Failed to delete stream"),
ServiceErrorContext::GetStreamConfig => write!(f, "Failed to get stream config"),
ServiceErrorContext::CheckTail => write!(f, "Failed to check tail"),
ServiceErrorContext::AppendSession => write!(f, "Failed to append session"),
ServiceErrorContext::ReadSession => write!(f, "Failed to read session"),
ServiceErrorContext::ReconfigureStream => write!(f, "Failed to reconfigure stream"),
}
}
}

/// Error for holding relevant info from `tonic::Status`
#[derive(thiserror::Error, Debug, Default)]
#[error("{status}: \n{message}")]
Expand All @@ -74,45 +111,15 @@ impl From<ClientError> for ServiceStatus {
}
}

#[derive(Debug, thiserror::Error)]
pub enum ErrorKind {
#[error("Failed to list basins")]
ListBasins,
#[error("Failed to create basin")]
CreateBasin,
#[error("Failed to delete basin")]
DeleteBasin,
#[error("Failed to get basin config")]
GetBasinConfig,
#[error("Failed to reconfigure basin")]
ReconfigureBasin,
#[error("Failed to list streams")]
ListStreams,
#[error("Failed to create stream")]
CreateStream,
#[error("Failed to delete stream")]
DeleteStream,
#[error("Failed to get stream config")]
GetStreamConfig,
#[error("Failed to check tail")]
CheckTail,
#[error("Failed to append session")]
AppendSession,
#[error("Failed to read session")]
ReadSession,
#[error("Failed to write session")]
ReconfigureStream,
}

#[derive(Debug, thiserror::Error)]
#[error("{kind}:\n {status}")]
pub struct ServiceError {
kind: ErrorKind,
kind: ServiceErrorContext,
status: ServiceStatus,
}

impl ServiceError {
pub fn new(kind: ErrorKind, status: impl Into<ServiceStatus>) -> Self {
pub fn new(kind: ServiceErrorContext, status: impl Into<ServiceStatus>) -> Self {
Self {
kind,
status: status.into(),
Expand Down
8 changes: 5 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use basin::BasinService;
use clap::{builder::styling, Parser, Subcommand};
use colored::*;
use config::{config_path, create_config};
use error::{ErrorKind, S2CliError, ServiceError};
use error::{S2CliError, ServiceError, ServiceErrorContext};
use stream::{RecordStream, StreamService};
use streamstore::{
bytesize::ByteSize,
Expand Down Expand Up @@ -518,7 +518,9 @@ async fn run() -> Result<(), S2CliError> {
.bold()
);
})
.map_err(|e| ServiceError::new(ErrorKind::AppendSession, e))?;
.map_err(|e| {
ServiceError::new(ServiceErrorContext::AppendSession, e)
})?;
}
}
StreamActions::Read {
Expand All @@ -542,7 +544,7 @@ async fn run() -> Result<(), S2CliError> {
}

let read_result = read_result
.map_err(|e| ServiceError::new(ErrorKind::ReadSession, e))?;
.map_err(|e| ServiceError::new(ServiceErrorContext::ReadSession, e))?;

match read_result {
ReadOutput::Batch(sequenced_record_batch) => {
Expand Down
8 changes: 4 additions & 4 deletions src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use streamstore::types::AppendRecord;
use tokio::io::Lines;
use tokio_stream::Stream;

use crate::error::{ErrorKind, ServiceError};
use crate::error::{ServiceError, ServiceErrorContext};
use crate::ByteSize;

pin_project! {
Expand Down Expand Up @@ -60,7 +60,7 @@ impl StreamService {
self.client
.check_tail()
.await
.map_err(|e| ServiceError::new(ErrorKind::CheckTail, e))
.map_err(|e| ServiceError::new(ServiceErrorContext::CheckTail, e))
}

pub async fn append_session(
Expand All @@ -73,7 +73,7 @@ impl StreamService {
self.client
.append_session(append_record_stream)
.await
.map_err(|e| ServiceError::new(ErrorKind::AppendSession, e))
.map_err(|e| ServiceError::new(ServiceErrorContext::AppendSession, e))
}

pub async fn read_session(
Expand All @@ -95,6 +95,6 @@ impl StreamService {
self.client
.read_session(read_session_req)
.await
.map_err(|e| ServiceError::new(ErrorKind::ReadSession, e))
.map_err(|e| ServiceError::new(ServiceErrorContext::ReadSession, e))
}
}

0 comments on commit 263ed16

Please sign in to comment.