Skip to content

Commit

Permalink
feat: Stream fence and trim commands
Browse files Browse the repository at this point in the history
Resolves: #33

Signed-off-by: Vaibhav Rabber <[email protected]>
  • Loading branch information
vrongmeal committed Dec 4, 2024
1 parent df94374 commit 84fbe5b
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 8 deletions.
14 changes: 9 additions & 5 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub enum S2CliError {
Service(#[from] ServiceError),
}

#[derive(Debug)]
#[derive(Debug, Clone, Copy)]
pub enum ServiceErrorContext {
ListBasins,
CreateBasin,
Expand All @@ -63,6 +63,8 @@ pub enum ServiceErrorContext {
DeleteStream,
GetStreamConfig,
CheckTail,
Trim,
Fence,
AppendSession,
ReadSession,
ReconfigureStream,
Expand All @@ -81,6 +83,8 @@ impl std::fmt::Display for ServiceErrorContext {
ServiceErrorContext::DeleteStream => write!(f, "Failed to delete stream"),
ServiceErrorContext::GetStreamConfig => write!(f, "Failed to get stream config"),
ServiceErrorContext::CheckTail => write!(f, "Failed to check tail"),
ServiceErrorContext::Trim => write!(f, "Failed to trim"),
ServiceErrorContext::Fence => write!(f, "Failed to set fencing token"),
ServiceErrorContext::AppendSession => write!(f, "Failed to append session"),
ServiceErrorContext::ReadSession => write!(f, "Failed to read session"),
ServiceErrorContext::ReconfigureStream => write!(f, "Failed to reconfigure stream"),
Expand All @@ -90,7 +94,7 @@ impl std::fmt::Display for ServiceErrorContext {

/// Error for holding relevant info from `tonic::Status`
#[derive(thiserror::Error, Debug, Default)]
#[error("{status}: \n{message}")]
#[error("{status}:\n {message}")]
pub struct ServiceStatus {
pub message: String,
pub status: String,
Expand All @@ -103,9 +107,9 @@ impl From<ClientError> for ServiceStatus {
message: status.message().to_string(),
status: status.code().to_string(),
},
_ => Self {
message: error.to_string(),
..Default::default()
ClientError::Conversion(conv) => Self {
message: conv.to_string(),
status: "Internal error".to_string(),
},
}
}
Expand Down
38 changes: 37 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ use signal_hook_tokio::Signals;
use stream::{RecordStream, StreamService};
use streamstore::{
client::{BasinClient, Client, ClientConfig, S2Endpoints, StreamClient},
types::{BasinInfo, BasinName, FencingToken, MeteredBytes as _, ReadOutput, StreamInfo},
types::{
BasinInfo, BasinName, CommandRecord, FencingToken, MeteredBytes as _, ReadOutput,
StreamInfo,
},
HeaderValue,
};
use tokio::{
Expand Down Expand Up @@ -207,6 +210,25 @@ enum StreamActions {
/// Get the next sequence number that will be assigned by a stream.
CheckTail,

/// Set the trim point for the stream.
/// Trimming is eventually consistent, and trimmed records may be visible
/// for a brief period.
Trim {
/// Trim point.
/// This sequence number is only allowed to advance, and any regression
/// will be ignored.
trim_point: u64,
},

/// Set the fencing token for the stream.
/// Fencing is strongly consistent, and subsequent appends that specify a
/// fencing token will be rejected if it does not match.
Fence {
/// Payload upto 16 bytes to set as the fencing token.
/// An empty payload clears the token.
fencing_token: Option<FencingToken>,
},

/// Append records to a stream. Currently, only newline delimited records are supported.
Append {
/// Enforce a fencing token which must have been previously set by a
Expand Down Expand Up @@ -523,6 +545,20 @@ async fn run() -> Result<(), S2CliError> {
let next_seq_num = StreamService::new(stream_client).check_tail().await?;
println!("{}", next_seq_num);
}
StreamActions::Trim { trim_point } => {
let stream_client = StreamClient::new(client_config, basin, stream);
StreamService::new(stream_client)
.append_command_record(CommandRecord::trim(trim_point))
.await?;
eprintln!("{}", "✓ Trim point set".green().bold());
}
StreamActions::Fence { fencing_token } => {
let stream_client = StreamClient::new(client_config, basin, stream);
StreamService::new(stream_client)
.append_command_record(CommandRecord::fence(fencing_token))
.await?;
eprintln!("{}", "✓ Fencing token set".green().bold());
}
StreamActions::Append {
input,
fencing_token,
Expand Down
24 changes: 22 additions & 2 deletions src/stream.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use streamstore::{
batching::{AppendRecordsBatchingOpts, AppendRecordsBatchingStream},
client::StreamClient,
types::{AppendOutput, FencingToken, ReadLimit, ReadOutput, ReadSessionRequest},
client::{ClientError, StreamClient},
types::{
AppendInput, AppendOutput, AppendRecordBatch, CommandRecord, FencingToken, ReadLimit,
ReadOutput, ReadSessionRequest,
},
Streaming,
};
use tokio::io::AsyncBufRead;
Expand Down Expand Up @@ -68,6 +71,23 @@ impl StreamService {
.map_err(|e| ServiceError::new(ServiceErrorContext::CheckTail, e))
}

pub async fn append_command_record(
&self,
cmd: CommandRecord,
) -> Result<AppendOutput, ServiceError> {
let context = match &cmd {
CommandRecord::Fence { .. } => ServiceErrorContext::Fence,
CommandRecord::Trim { .. } => ServiceErrorContext::Trim,
};
let record = AppendRecord::try_from(cmd)
.map_err(|e| ServiceError::new(context, ClientError::Conversion(e)))?;
let batch = AppendRecordBatch::try_from_iter([record]).expect("single valid append record");
self.client
.append(AppendInput::new(batch))
.await
.map_err(|e| ServiceError::new(context, e))
}

pub async fn append_session(
&self,
append_input_stream: RecordStream<Box<dyn AsyncBufRead + Send + Unpin>>,
Expand Down

0 comments on commit 84fbe5b

Please sign in to comment.