Skip to content

Commit

Permalink
Add annotations and cargo clippy
Browse files Browse the repository at this point in the history
  • Loading branch information
rmqtt committed Sep 7, 2024
1 parent f399e46 commit b9d5d74
Show file tree
Hide file tree
Showing 8 changed files with 558 additions and 42 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "rmqtt-raft"
version = "0.4.0"
version = "0.4.1"
authors = ["rmqtt <[email protected]>"]
edition = "2021"
license = "MIT OR Apache-2.0"
Expand Down
37 changes: 33 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use std::time::Duration;

// Re-exporting necessary types and modules for external use.
pub use crate::error::{Error, Result};
pub use crate::message::Status;
pub use crate::raft::{Mailbox, Raft, Store};
pub use tikv_raft::ReadOnlyOption;

// Importing modules for internal use.
mod error;
mod message;
mod raft;
Expand All @@ -13,43 +15,70 @@ mod raft_server;
mod raft_service;
mod storage;

/// Configuration options for the Raft-based system.
#[derive(Clone)]
pub struct Config {
#[cfg(feature = "reuseaddr")]
/// Whether to reuse local addresses. This option is enabled only if the `reuseaddr` feature is active.
pub reuseaddr: bool,

#[cfg(all(
feature = "reuseport",
not(any(target_os = "solaris", target_os = "illumos"))
))]
/// Whether to reuse local ports. This option is enabled only if the `reuseport` feature is active
/// and the target OS is not Solaris or Illumos.
pub reuseport: bool,

/// The timeout duration for gRPC calls.
pub grpc_timeout: Duration,

/// The maximum number of concurrent gRPC calls.
pub grpc_concurrency_limit: usize,

/// The maximum size of gRPC messages in bytes.
pub grpc_message_size: usize,
//GRPC failed to fuse threshold

/// The threshold for the gRPC circuit breaker. If the number of failed requests exceeds this threshold,
/// the circuit breaker will trip.
pub grpc_breaker_threshold: u64,

/// The interval at which the gRPC circuit breaker will retry after tripping.
pub grpc_breaker_retry_interval: Duration,
//Proposal batchs

/// The maximum number of proposals to batch together before processing.
pub proposal_batch_size: usize,

/// The timeout duration for collecting proposals into a batch. If this timeout is reached,
/// the collected proposals will be processed regardless of the batch size.
pub proposal_batch_timeout: Duration,
//Snapshot generation interval

/// The interval at which snapshots are generated.
pub snapshot_interval: Duration,

/// The interval at which heartbeat messages are sent to maintain leader election and cluster health.
pub heartbeat: Duration,

/// Configuration options for the Raft protocol.
pub raft_cfg: tikv_raft::Config,
}

impl Default for Config {
/// Provides default values for the `Config` struct.
fn default() -> Self {
Self {
#[cfg(feature = "reuseaddr")]
reuseaddr: false,

#[cfg(all(
feature = "reuseport",
not(any(target_os = "solaris", target_os = "illumos"))
))]
reuseport: false,

grpc_timeout: Duration::from_secs(6),
grpc_concurrency_limit: 200,
grpc_message_size: 50 * 1024 * 1024,
grpc_message_size: 50 * 1024 * 1024, // 50 MB
grpc_breaker_threshold: 4,
grpc_breaker_retry_interval: Duration::from_millis(2500),
proposal_batch_size: 50,
Expand Down
67 changes: 52 additions & 15 deletions src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,53 +5,60 @@ use futures::channel::oneshot::Sender;
use serde::{Deserialize, Serialize};
use tikv_raft::eraftpb::{ConfChange, Message as RaftMessage};

/// Enumeration representing various types of responses that can be sent back to clients.
#[derive(Serialize, Deserialize, Debug)]
pub enum RaftResponse {
/// Indicates that the request was sent to the wrong leader.
WrongLeader {
leader_id: u64,
leader_addr: Option<String>,
},
/// Indicates that a join request was successful.
JoinSuccess {
assigned_id: u64,
peer_addrs: HashMap<u64, String>,
},
RequestId {
leader_id: u64,
},
/// Contains the leader ID in response to a request for ID.
RequestId { leader_id: u64 },
/// Represents an error with a message.
Error(String),
Response {
data: Vec<u8>,
},
/// Contains arbitrary response data.
Response { data: Vec<u8> },
/// Represents the status of the system.
Status(Status),
/// Represents a successful operation.
Ok,
}

/// Enumeration representing different types of messages that can be sent within the system.
#[allow(dead_code)]
pub enum Message {
/// A proposal message to be processed.
Propose {
proposal: Vec<u8>,
chan: Sender<RaftResponse>,
},
/// A query message to be processed.
Query {
query: Vec<u8>,
chan: Sender<RaftResponse>,
},
/// A configuration change message to be processed.
ConfigChange {
change: ConfChange,
chan: Sender<RaftResponse>,
},
RequestId {
chan: Sender<RaftResponse>,
},
ReportUnreachable {
node_id: u64,
},
/// A request for the leader's ID.
RequestId { chan: Sender<RaftResponse> },
/// Report that a node is unreachable.
ReportUnreachable { node_id: u64 },
/// A Raft message to be processed.
Raft(Box<RaftMessage>),
Status {
chan: Sender<RaftResponse>,
},
/// A request for the status of the system.
Status { chan: Sender<RaftResponse> },
}

/// Struct representing the status of the system.
#[derive(Serialize, Deserialize, Debug)]
pub struct Status {
pub id: u64,
Expand All @@ -63,28 +70,37 @@ pub struct Status {
}

impl Status {
/// Checks if the node has started.
#[inline]
pub fn is_started(&self) -> bool {
self.leader_id > 0
}

/// Checks if this node is the leader.
#[inline]
pub fn is_leader(&self) -> bool {
self.leader_id == self.id
}
}

/// Enumeration for reply channels which could be single or multiple.
pub(crate) enum ReplyChan {
/// Single reply channel with its timestamp.
One((Sender<RaftResponse>, Instant)),
/// Multiple reply channels with their timestamps.
More(Vec<(Sender<RaftResponse>, Instant)>),
}

/// Enumeration for proposals which could be a single proposal or multiple proposals.
#[derive(Serialize, Deserialize)]
pub(crate) enum Proposals {
/// A single proposal.
One(Vec<u8>),
/// Multiple proposals.
More(Vec<Vec<u8>>),
}

/// A struct to manage proposal batching and sending.
pub(crate) struct Merger {
proposals: Vec<Vec<u8>>,
chans: Vec<(Sender<RaftResponse>, Instant)>,
Expand All @@ -94,6 +110,14 @@ pub(crate) struct Merger {
}

impl Merger {
/// Creates a new `Merger` instance with the specified batch size and timeout.
///
/// # Parameters
/// - `proposal_batch_size`: The maximum number of proposals to include in a batch.
/// - `proposal_batch_timeout`: The timeout duration for collecting proposals.
///
/// # Returns
/// A new `Merger` instance.
pub fn new(proposal_batch_size: usize, proposal_batch_timeout: Duration) -> Self {
Self {
proposals: Vec::new(),
Expand All @@ -104,17 +128,30 @@ impl Merger {
}
}

/// Adds a new proposal and its corresponding reply channel to the merger.
///
/// # Parameters
/// - `proposal`: The proposal data to be added.
/// - `chan`: The reply channel for the proposal.
#[inline]
pub fn add(&mut self, proposal: Vec<u8>, chan: Sender<RaftResponse>) {
self.proposals.push(proposal);
self.chans.push((chan, Instant::now()));
}

/// Returns the number of proposals currently held by the merger.
///
/// # Returns
/// The number of proposals.
#[inline]
pub fn len(&self) -> usize {
self.proposals.len()
}

/// Retrieves a batch of proposals and their corresponding reply channels if the batch size or timeout criteria are met.
///
/// # Returns
/// An `Option` containing the proposals and reply channels, or `None` if no batch is ready.
#[inline]
pub fn take(&mut self) -> Option<(Proposals, ReplyChan)> {
let max = self.proposal_batch_size;
Expand Down
Loading

0 comments on commit b9d5d74

Please sign in to comment.