Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Stream fence and trim commands #46

Merged
merged 4 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 13 additions & 9 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub enum S2CliError {
Service(#[from] ServiceError),
}

#[derive(Debug)]
#[derive(Debug, Clone, Copy)]
pub enum ServiceErrorContext {
ListBasins,
CreateBasin,
Expand All @@ -63,6 +63,8 @@ pub enum ServiceErrorContext {
DeleteStream,
GetStreamConfig,
CheckTail,
Trim,
Fence,
AppendSession,
ReadSession,
ReconfigureStream,
Expand All @@ -81,6 +83,8 @@ impl std::fmt::Display for ServiceErrorContext {
ServiceErrorContext::DeleteStream => write!(f, "Failed to delete stream"),
ServiceErrorContext::GetStreamConfig => write!(f, "Failed to get stream config"),
ServiceErrorContext::CheckTail => write!(f, "Failed to check tail"),
ServiceErrorContext::Trim => write!(f, "Failed to trim"),
ServiceErrorContext::Fence => write!(f, "Failed to set fencing token"),
ServiceErrorContext::AppendSession => write!(f, "Failed to append session"),
ServiceErrorContext::ReadSession => write!(f, "Failed to read session"),
ServiceErrorContext::ReconfigureStream => write!(f, "Failed to reconfigure stream"),
Expand All @@ -90,7 +94,7 @@ impl std::fmt::Display for ServiceErrorContext {

/// Error for holding relevant info from `tonic::Status`
#[derive(thiserror::Error, Debug, Default)]
#[error("{status}: \n{message}")]
#[error("{status}:\n {message}")]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the space there helps look like an inline, does it mess up with the output

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The space was in the wrong place, this should fix it :)

pub struct ServiceStatus {
pub message: String,
pub status: String,
Expand All @@ -103,25 +107,25 @@ impl From<ClientError> for ServiceStatus {
message: status.message().to_string(),
status: status.code().to_string(),
},
_ => Self {
message: error.to_string(),
..Default::default()
ClientError::Conversion(conv) => Self {
message: conv.to_string(),
status: "Failed to convert SDK type".to_string(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I meant to also include the underlying message conv, we shouldn't throw away the context.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

conv is the message here

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh doh, my original complaint is not valid ;P

},
}
}
}

#[derive(Debug, thiserror::Error)]
#[error("{kind}:\n {status}")]
#[error("{context}:\n {status}")]
pub struct ServiceError {
kind: ServiceErrorContext,
context: ServiceErrorContext,
status: ServiceStatus,
}

impl ServiceError {
pub fn new(kind: ServiceErrorContext, status: impl Into<ServiceStatus>) -> Self {
pub fn new(context: ServiceErrorContext, status: impl Into<ServiceStatus>) -> Self {
Self {
kind,
context,
status: status.into(),
}
}
Expand Down
38 changes: 37 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ use signal_hook_tokio::Signals;
use stream::{RecordStream, StreamService};
use streamstore::{
client::{BasinClient, Client, ClientConfig, S2Endpoints, StreamClient},
types::{BasinInfo, BasinName, FencingToken, MeteredBytes as _, ReadOutput, StreamInfo},
types::{
BasinInfo, BasinName, CommandRecord, FencingToken, MeteredBytes as _, ReadOutput,
StreamInfo,
},
HeaderValue,
};
use tokio::{
Expand Down Expand Up @@ -207,6 +210,25 @@ enum StreamActions {
/// Get the next sequence number that will be assigned by a stream.
CheckTail,

/// Set the trim point for the stream.
/// Trimming is eventually consistent, and trimmed records may be visible
/// for a brief period.
Trim {
/// Trim point.
/// This sequence number is only allowed to advance, and any regression
/// will be ignored.
trim_point: u64,
},

/// Set the fencing token for the stream.
/// Fencing is strongly consistent, and subsequent appends that specify a
/// fencing token will be rejected if it does not match.
Fence {
/// Payload upto 16 bytes to set as the fencing token.
/// An empty payload clears the token.
fencing_token: Option<FencingToken>,
},

/// Append records to a stream. Currently, only newline delimited records are supported.
Append {
/// Enforce a fencing token which must have been previously set by a
Expand Down Expand Up @@ -523,6 +545,20 @@ async fn run() -> Result<(), S2CliError> {
let next_seq_num = StreamService::new(stream_client).check_tail().await?;
println!("{}", next_seq_num);
}
StreamActions::Trim { trim_point } => {
let stream_client = StreamClient::new(client_config, basin, stream);
StreamService::new(stream_client)
.append_command_record(CommandRecord::trim(trim_point))
.await?;
eprintln!("{}", "✓ Trim point set".green().bold());
shikhar marked this conversation as resolved.
Show resolved Hide resolved
}
StreamActions::Fence { fencing_token } => {
let stream_client = StreamClient::new(client_config, basin, stream);
StreamService::new(stream_client)
.append_command_record(CommandRecord::fence(fencing_token))
.await?;
eprintln!("{}", "✓ Fencing token set".green().bold());
}
StreamActions::Append {
input,
fencing_token,
Expand Down
24 changes: 22 additions & 2 deletions src/stream.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use streamstore::{
batching::{AppendRecordsBatchingOpts, AppendRecordsBatchingStream},
client::StreamClient,
types::{AppendOutput, FencingToken, ReadLimit, ReadOutput, ReadSessionRequest},
client::{ClientError, StreamClient},
types::{
AppendInput, AppendOutput, AppendRecordBatch, CommandRecord, FencingToken, ReadLimit,
ReadOutput, ReadSessionRequest,
},
Streaming,
};
use tokio::io::AsyncBufRead;
Expand Down Expand Up @@ -68,6 +71,23 @@ impl StreamService {
.map_err(|e| ServiceError::new(ServiceErrorContext::CheckTail, e))
}

pub async fn append_command_record(
&self,
cmd: CommandRecord,
) -> Result<AppendOutput, ServiceError> {
let context = match &cmd {
CommandRecord::Fence { .. } => ServiceErrorContext::Fence,
CommandRecord::Trim { .. } => ServiceErrorContext::Trim,
};
let record = AppendRecord::try_from(cmd)
.map_err(|e| ServiceError::new(context, ClientError::Conversion(e)))?;
let batch = AppendRecordBatch::try_from_iter([record]).expect("single valid append record");
self.client
.append(AppendInput::new(batch))
.await
.map_err(|e| ServiceError::new(context, e))
}

pub async fn append_session(
&self,
append_input_stream: RecordStream<Box<dyn AsyncBufRead + Send + Unpin>>,
Expand Down
Loading