From 3fad4c43504c1d074a7d609b77575bf0425bfdb4 Mon Sep 17 00:00:00 2001 From: Yujia Qiao Date: Wed, 19 Jan 2022 10:58:05 +0800 Subject: [PATCH] fix: improve exp backoff for control channels --- src/client.rs | 16 +++++++++++++--- src/constants.rs | 1 + tests/integration_test.rs | 4 ++-- 3 files changed, 16 insertions(+), 5 deletions(-) diff --git a/src/client.rs b/src/client.rs index 23af0d45..0c19519f 100644 --- a/src/client.rs +++ b/src/client.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use tokio::io::{self, copy_bidirectional, AsyncReadExt, AsyncWriteExt}; use tokio::net::{TcpStream, UdpSocket}; use tokio::sync::{broadcast, mpsc, oneshot, RwLock}; -use tokio::time::{self, Duration}; +use tokio::time::{self, Duration, Instant}; use tracing::{debug, error, info, instrument, trace, warn, Instrument, Span}; #[cfg(feature = "noise")] @@ -487,6 +487,8 @@ impl ControlChannelHandle { tokio::spawn( async move { let mut backoff = run_control_chan_backoff(); + let mut start = Instant::now(); + while let Err(err) = s .run() .await @@ -496,12 +498,20 @@ impl ControlChannelHandle { break; } - if let Some(duration) = backoff.next_backoff() { + if start.elapsed() > Duration::from_secs(3) { + // The client runs for at least 3 secs and then disconnects + // Retry immediately + backoff.reset(); + error!("{:#}. Retry...", err); + } else if let Some(duration) = backoff.next_backoff() { error!("{:#}. Retry in {:?}...", err, duration); time::sleep(duration).await; } else { - error!("{:#}. Break", err); + // Should never reach + panic!("{:#}. Break", err); } + + start = Instant::now(); } } .instrument(Span::current()), diff --git a/src/constants.rs b/src/constants.rs index c7c4ad7c..8de4dee0 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -17,6 +17,7 @@ pub fn listen_backoff() -> ExponentialBackoff { pub fn run_control_chan_backoff() -> ExponentialBackoff { ExponentialBackoff { + randomization_factor: 0.1, max_elapsed_time: None, max_interval: Duration::from_secs(1), ..Default::default() diff --git a/tests/integration_test.rs b/tests/integration_test.rs index 18115753..52650578 100644 --- a/tests/integration_test.rs +++ b/tests/integration_test.rs @@ -113,7 +113,7 @@ async fn test(config_path: &'static str, t: Type) -> Result<()> { .await .unwrap(); }); - time::sleep(Duration::from_millis(2000)).await; // Wait for the client to retry + time::sleep(Duration::from_millis(2500)).await; // Wait for the client to retry info!("echo"); echo_hitter(ECHO_SERVER_ADDR_EXPOSED, t).await.unwrap(); @@ -155,7 +155,7 @@ async fn test(config_path: &'static str, t: Type) -> Result<()> { .await .unwrap(); }); - time::sleep(Duration::from_millis(2000)).await; // Wait for the client to retry + time::sleep(Duration::from_millis(2500)).await; // Wait for the client to retry // Simulate heavy load info!("lots of echo and pingpong");