Skip to content

Commit

Permalink
Add RedisMessenger::add_streams
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid committed Nov 17, 2023
1 parent a6508b4 commit 32fac47
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 0 deletions.
1 change: 1 addition & 0 deletions plerkle_messenger/src/plerkle_messenger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ pub trait Messenger: Sync + Send {
where
Self: Sized;
fn messenger_type(&self) -> MessengerType;
async fn add_streams(&mut self, streams: &[(&'static str, usize)]) -> Result<(), MessengerError>;
async fn add_stream(&mut self, stream_key: &'static str) -> Result<(), MessengerError>;
async fn set_buffer_size(&mut self, stream_key: &'static str, max_buffer_size: usize);
async fn send(&mut self, stream_key: &'static str, bytes: &[u8]) -> Result<(), MessengerError>;
Expand Down
30 changes: 30 additions & 0 deletions plerkle_messenger/src/redis_messenger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,36 @@ impl Messenger for RedisMessenger {
}
}

async fn add_streams(&mut self, streams: &[(&'static str, usize)]) -> Result<(), MessengerError> {
for (stream_key, max_buffer_size) in streams {
// Add to streams hashmap.
let _result = self.streams.insert(
stream_key,
RedisMessengerStream {
max_len: Some(StreamMaxlen::Approx(*max_buffer_size)),
local_buffer: LinkedList::new(),
local_buffer_total: 0,
local_buffer_last_flush: Instant::now(),
},
);

// Add stream to Redis.
let result: RedisResult<()> = self
.connection
.xgroup_create_mkstream(stream_key, self.consumer_group_name.as_str(), "$")
.await;

if let Err(error) = result {
if !(error.kind() == redis::ErrorKind::ExtensionError && error.code() == Some("BUSYGROUP")) {
return Err(MessengerError::ConfigurationError {
msg: format!("{:?}", error),
});
}
}
}
Ok(())
}

async fn add_stream(&mut self, stream_key: &'static str) -> Result<(), MessengerError> {
// Add to streams hashmap.
let _result = self.streams.insert(
Expand Down

0 comments on commit 32fac47

Please sign in to comment.