From c9dec84264a8d1c2ec1e71631a3057095a89eeb6 Mon Sep 17 00:00:00 2001 From: Vaibhav Rabber Date: Thu, 5 Dec 2024 08:11:41 +0530 Subject: [PATCH] feat: Stream `fence` and `trim` commands (#46) --- src/error.rs | 22 +++++++++++++--------- src/main.rs | 38 +++++++++++++++++++++++++++++++++++++- src/stream.rs | 24 ++++++++++++++++++++++-- 3 files changed, 72 insertions(+), 12 deletions(-) diff --git a/src/error.rs b/src/error.rs index 1440870..cde23f2 100644 --- a/src/error.rs +++ b/src/error.rs @@ -51,7 +51,7 @@ pub enum S2CliError { Service(#[from] ServiceError), } -#[derive(Debug)] +#[derive(Debug, Clone, Copy)] pub enum ServiceErrorContext { ListBasins, CreateBasin, @@ -63,6 +63,8 @@ pub enum ServiceErrorContext { DeleteStream, GetStreamConfig, CheckTail, + Trim, + Fence, AppendSession, ReadSession, ReconfigureStream, @@ -81,6 +83,8 @@ impl std::fmt::Display for ServiceErrorContext { ServiceErrorContext::DeleteStream => write!(f, "Failed to delete stream"), ServiceErrorContext::GetStreamConfig => write!(f, "Failed to get stream config"), ServiceErrorContext::CheckTail => write!(f, "Failed to check tail"), + ServiceErrorContext::Trim => write!(f, "Failed to trim"), + ServiceErrorContext::Fence => write!(f, "Failed to set fencing token"), ServiceErrorContext::AppendSession => write!(f, "Failed to append session"), ServiceErrorContext::ReadSession => write!(f, "Failed to read session"), ServiceErrorContext::ReconfigureStream => write!(f, "Failed to reconfigure stream"), @@ -90,7 +94,7 @@ impl std::fmt::Display for ServiceErrorContext { /// Error for holding relevant info from `tonic::Status` #[derive(thiserror::Error, Debug, Default)] -#[error("{status}: \n{message}")] +#[error("{status}:\n {message}")] pub struct ServiceStatus { pub message: String, pub status: String, @@ -103,25 +107,25 @@ impl From for ServiceStatus { message: status.message().to_string(), status: status.code().to_string(), }, - _ => Self { - message: error.to_string(), - ..Default::default() + ClientError::Conversion(conv) => Self { + message: conv.to_string(), + status: "Failed to convert SDK type".to_string(), }, } } } #[derive(Debug, thiserror::Error)] -#[error("{kind}:\n {status}")] +#[error("{context}:\n {status}")] pub struct ServiceError { - kind: ServiceErrorContext, + context: ServiceErrorContext, status: ServiceStatus, } impl ServiceError { - pub fn new(kind: ServiceErrorContext, status: impl Into) -> Self { + pub fn new(context: ServiceErrorContext, status: impl Into) -> Self { Self { - kind, + context, status: status.into(), } } diff --git a/src/main.rs b/src/main.rs index 16c383a..bb75f4a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,7 +14,10 @@ use signal_hook_tokio::Signals; use stream::{RecordStream, StreamService}; use streamstore::{ client::{BasinClient, Client, ClientConfig, S2Endpoints, StreamClient}, - types::{BasinInfo, BasinName, FencingToken, MeteredBytes as _, ReadOutput, StreamInfo}, + types::{ + BasinInfo, BasinName, CommandRecord, FencingToken, MeteredBytes as _, ReadOutput, + StreamInfo, + }, HeaderValue, }; use tokio::{ @@ -207,6 +210,25 @@ enum StreamActions { /// Get the next sequence number that will be assigned by a stream. CheckTail, + /// Set the trim point for the stream. + /// Trimming is eventually consistent, and trimmed records may be visible + /// for a brief period. + Trim { + /// Trim point. + /// This sequence number is only allowed to advance, and any regression + /// will be ignored. + trim_point: u64, + }, + + /// Set the fencing token for the stream. + /// Fencing is strongly consistent, and subsequent appends that specify a + /// fencing token will be rejected if it does not match. + Fence { + /// Payload upto 16 bytes to set as the fencing token. + /// An empty payload clears the token. + fencing_token: Option, + }, + /// Append records to a stream. Currently, only newline delimited records are supported. Append { /// Enforce a fencing token which must have been previously set by a @@ -523,6 +545,20 @@ async fn run() -> Result<(), S2CliError> { let next_seq_num = StreamService::new(stream_client).check_tail().await?; println!("{}", next_seq_num); } + StreamActions::Trim { trim_point } => { + let stream_client = StreamClient::new(client_config, basin, stream); + StreamService::new(stream_client) + .append_command_record(CommandRecord::trim(trim_point)) + .await?; + eprintln!("{}", "✓ Trim requested".green().bold()); + } + StreamActions::Fence { fencing_token } => { + let stream_client = StreamClient::new(client_config, basin, stream); + StreamService::new(stream_client) + .append_command_record(CommandRecord::fence(fencing_token)) + .await?; + eprintln!("{}", "✓ Fencing token set".green().bold()); + } StreamActions::Append { input, fencing_token, diff --git a/src/stream.rs b/src/stream.rs index e9099b4..f371bcb 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -1,7 +1,10 @@ use streamstore::{ batching::{AppendRecordsBatchingOpts, AppendRecordsBatchingStream}, - client::StreamClient, - types::{AppendOutput, FencingToken, ReadLimit, ReadOutput, ReadSessionRequest}, + client::{ClientError, StreamClient}, + types::{ + AppendInput, AppendOutput, AppendRecordBatch, CommandRecord, FencingToken, ReadLimit, + ReadOutput, ReadSessionRequest, + }, Streaming, }; use tokio::io::AsyncBufRead; @@ -68,6 +71,23 @@ impl StreamService { .map_err(|e| ServiceError::new(ServiceErrorContext::CheckTail, e)) } + pub async fn append_command_record( + &self, + cmd: CommandRecord, + ) -> Result { + let context = match &cmd { + CommandRecord::Fence { .. } => ServiceErrorContext::Fence, + CommandRecord::Trim { .. } => ServiceErrorContext::Trim, + }; + let record = AppendRecord::try_from(cmd) + .map_err(|e| ServiceError::new(context, ClientError::Conversion(e)))?; + let batch = AppendRecordBatch::try_from_iter([record]).expect("single valid append record"); + self.client + .append(AppendInput::new(batch)) + .await + .map_err(|e| ServiceError::new(context, e)) + } + pub async fn append_session( &self, append_input_stream: RecordStream>,