Modify grpc_breaker_retry_interval type
rmqtt committed Oct 8, 2022
1 parent 860a4e8 commit 0fa03d8
Showing 3 changed files with 6 additions and 67 deletions.
5 changes: 3 additions & 2 deletions src/lib.rs
@@ -2,6 +2,7 @@ use std::time::Duration;
 
 pub use crate::error::{Error, Result};
 pub use crate::raft::{Mailbox, Raft, Store};
+pub use tikv_raft::ReadOnlyOption;
 
 mod error;
 mod message;
@@ -17,7 +18,7 @@ pub struct Config {
     pub grpc_concurrency_limit: usize,
     //GRPC failed to fuse threshold
     pub grpc_breaker_threshold: u64,
-    pub grpc_breaker_retry_interval: i64,
+    pub grpc_breaker_retry_interval: Duration,
     //Proposal batchs
     pub proposal_batch_size: usize,
     pub proposal_batch_timeout: Duration,
@@ -33,7 +34,7 @@ impl Default for Config {
             grpc_timeout: Duration::from_secs(6),
             grpc_concurrency_limit: 200,
             grpc_breaker_threshold: 4,
-            grpc_breaker_retry_interval: 2500,
+            grpc_breaker_retry_interval: Duration::from_millis(2500),
            proposal_batch_size: 50,
             proposal_batch_timeout: Duration::from_millis(200),
             snapshot_interval: Duration::from_secs(600),
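With this change, callers set the breaker retry interval as a Duration instead of a raw millisecond count. A minimal usage sketch, assuming the crate is imported as rmqtt_raft and that the remaining Config fields can be filled via the Default impl shown above (both assumptions, not confirmed by this diff):

    use std::time::Duration;

    fn main() {
        // Hypothetical caller-side construction after this commit: the retry
        // interval is now a Duration rather than an i64 millisecond count.
        let cfg = rmqtt_raft::Config {
            grpc_breaker_retry_interval: Duration::from_secs(5),
            ..Default::default()
        };
        assert_eq!(cfg.grpc_breaker_retry_interval.as_millis(), 5000);
    }

The conversion back to milliseconds now happens once, at the gRPC peer layer, as the src/raft.rs and src/raft_node.rs hunks below show.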
4 changes: 2 additions & 2 deletions src/raft.rs
@@ -50,7 +50,7 @@ impl ProposalSender {
     }
 }
 
-/// A mailbox to send messages to a ruung raft node.
+/// A mailbox to send messages to a running raft node.
 #[derive(Clone)]
 pub struct Mailbox {
     peers: Arc<DashMap<(u64, String), Peer>>,
@@ -274,7 +274,7 @@ impl<S: Store + Send + Sync + 'static> Raft<S> {
             grpc_timeout: self.cfg.grpc_timeout,
             grpc_concurrency_limit: self.cfg.grpc_concurrency_limit,
             grpc_breaker_threshold: self.cfg.grpc_breaker_threshold,
-            grpc_breaker_retry_interval: self.cfg.grpc_breaker_retry_interval,
+            grpc_breaker_retry_interval: self.cfg.grpc_breaker_retry_interval.as_millis() as i64,
         }
     }
 
64 changes: 1 addition & 63 deletions src/raft_node.rs
@@ -429,68 +429,6 @@ impl<S: Store + 'static> RaftNode<S> {
         })
     }
 
-    // fn start_message_sender() -> mpsc::Sender<MessageSender> {
-    //     let (tx, mut rx): (mpsc::Sender<MessageSender>, mpsc::Receiver<MessageSender>) =
-    //         mpsc::channel(1000);
-    //
-    //     tokio::spawn(async move {
-    //         use std::sync::atomic::AtomicBool;
-    //         type Queues =
-    //             HashMap<u64, (Arc<AtomicBool>, VecDeque<MessageSender>), ahash::RandomState>;
-    //         let mut queues: Queues = Queues::default();
-    //
-    //         let sends = |queues: &mut Queues| {
-    //             for (to, (sending, q)) in queues.iter_mut() {
-    //                 if sending.load(Ordering::SeqCst) {
-    //                     continue;
-    //                 }
-    //                 if !q.is_empty() {
-    //                     log::debug!(
-    //                         "to: {}, sending: {}, q.len: {}",
-    //                         to,
-    //                         sending.load(Ordering::SeqCst),
-    //                         q.len()
-    //                     );
-    //                 }
-    //                 if let Some(msg) = q.pop_front() {
-    //                     let sending = sending.clone();
-    //                     sending.store(true, Ordering::SeqCst);
-    //                     tokio::spawn(async move {
-    //                         msg.send().await;
-    //                         sending.store(false, Ordering::SeqCst);
-    //                     });
-    //                 }
-    //             }
-    //         };
-    //
-    //         loop {
-    //             match timeout(Duration::from_millis(10), rx.next()).await {
-    //                 //@TODO configurable
-    //                 Ok(Some(msg)) => {
-    //                     let (_, q) = queues
-    //                         .entry(msg.client_id)
-    //                         .or_insert((Arc::new(AtomicBool::new(false)), VecDeque::new()));
-    //                     q.push_back(msg);
-    //                     if q.len() > 300 {
-    //                         //@TODO configurable
-    //                         warn!("There is too much backlog of unsent messages, {}", q.len())
-    //                     }
-    //                     sends(&mut queues);
-    //                 }
-    //                 Ok(None) => {
-    //                     log::error!("start_message_sender, recv None");
-    //                     break;
-    //                 }
-    //                 Err(_) => {
-    //                     sends(&mut queues);
-    //                 }
-    //             }
-    //         }
-    //     });
-    //
-    //     tx
-    // }
-
     #[inline]
     fn new_config(id: u64, cfg: &RaftConfig) -> RaftConfig {
         let mut cfg = cfg.clone();
@@ -523,7 +461,7 @@ impl<S: Store + 'static> RaftNode<S> {
             self.cfg.grpc_timeout,
             self.cfg.grpc_concurrency_limit,
             self.cfg.grpc_breaker_threshold,
-            self.cfg.grpc_breaker_retry_interval,
+            self.cfg.grpc_breaker_retry_interval.as_millis() as i64,
         );
         self.peers.insert(id, Some(peer.clone()));
         peer
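For context on the two breaker settings passed into the peer above: grpc_breaker_threshold and grpc_breaker_retry_interval configure a circuit breaker ("fuse") for gRPC calls, where sending stops after repeated failures and resumes with a probe once the retry interval elapses. A rough sketch of that behavior follows; it is an illustration of the general technique only, not the crate's actual Peer implementation, and every name in it is hypothetical:

    use std::time::{Duration, Instant};

    // Hypothetical circuit breaker driven by a failure threshold and a retry
    // interval; illustrates the role of grpc_breaker_threshold and
    // grpc_breaker_retry_interval, not the crate's real implementation.
    struct Breaker {
        failures: u64,
        threshold: u64,
        retry_interval: Duration,
        opened_at: Option<Instant>,
    }

    impl Breaker {
        // Allow calls while the breaker is closed; once it has opened, allow
        // a single probe each time the retry interval elapses.
        fn allow(&mut self) -> bool {
            match self.opened_at {
                None => true,
                Some(t) if t.elapsed() >= self.retry_interval => {
                    self.opened_at = Some(Instant::now());
                    true
                }
                Some(_) => false,
            }
        }

        // Open the breaker once consecutive failures reach the threshold.
        fn record_failure(&mut self) {
            self.failures += 1;
            if self.failures >= self.threshold && self.opened_at.is_none() {
                self.opened_at = Some(Instant::now());
            }
        }

        // A success closes the breaker and resets the failure count.
        fn record_success(&mut self) {
            self.failures = 0;
            self.opened_at = None;
        }
    }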
