From 5b949c8089c5a484cb1f692540696df23e8f9e95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natalie=20Klestrup=20R=C3=B6ijezon?= Date: Thu, 28 Nov 2024 16:01:18 +0100 Subject: [PATCH] Initial migration backoff -> backon MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes #1635 Signed-off-by: Natalie Klestrup Röijezon --- Cargo.toml | 2 +- deny.toml | 4 +- examples/Cargo.toml | 2 +- examples/errorbounded_configmap_watcher.rs | 2 + examples/shared_stream_controllers.rs | 4 +- kube-client/src/client/builder.rs | 2 +- kube-client/src/client/client_ext.rs | 7 +- kube-derive/tests/resource.rs | 4 + kube-runtime/Cargo.toml | 2 +- kube-runtime/src/controller/mod.rs | 21 ++-- kube-runtime/src/utils/backoff_reset_timer.rs | 114 +++++++++++------- kube-runtime/src/utils/backoff_resettable.rs | 55 +++++++++ kube-runtime/src/utils/mod.rs | 4 +- kube-runtime/src/utils/stream_backoff.rs | 81 +++++++++---- kube-runtime/src/utils/watch_ext.rs | 25 ++-- kube-runtime/src/watcher.rs | 48 +++++--- 16 files changed, 254 insertions(+), 123 deletions(-) create mode 100644 kube-runtime/src/utils/backoff_resettable.rs diff --git a/Cargo.toml b/Cargo.toml index 14fc4e283..d3a7e8c08 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,7 +37,7 @@ assert-json-diff = "2.0.2" async-broadcast = "0.7.0" async-stream = "0.3.5" async-trait = "0.1.64" -backoff = "0.4.0" +backon = "1.3.0" base64 = "0.22.1" bytes = "1.1.0" chrono = { version = "0.4.34", default-features = false } diff --git a/deny.toml b/deny.toml index 2f1704961..0065d2fce 100644 --- a/deny.toml +++ b/deny.toml @@ -12,9 +12,7 @@ db-urls = ["https://github.com/rustsec/advisory-db"] # remove them when we have to yanked = "warn" -ignore = [ - "RUSTSEC-2024-0384", # instant dep via unmaintained backoff dep -] +ignore = [] [licenses] # See https://spdx.org/licenses/ for list of possible licenses diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 25add42a2..d5f7de5ea 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -52,7 +52,7 @@ tower-http = { workspace = true, features = ["trace", "decompression-gzip"] } hyper = { workspace = true, features = ["client", "http1"] } hyper-util = { workspace = true, features = ["client-legacy", "http1", "tokio"] } thiserror.workspace = true -backoff.workspace = true +backon.workspace = true clap = { version = "4.0", default-features = false, features = ["std", "cargo", "derive"] } edit = "0.1.3" tokio-stream = { version = "0.1.9", features = ["net"] } diff --git a/examples/errorbounded_configmap_watcher.rs b/examples/errorbounded_configmap_watcher.rs index db9e69830..b541a8975 100644 --- a/examples/errorbounded_configmap_watcher.rs +++ b/examples/errorbounded_configmap_watcher.rs @@ -15,12 +15,14 @@ use tracing::*; #[resource(inherit = ConfigMap)] struct CaConfigMap { metadata: ObjectMeta, + #[allow(unused)] data: CaConfigMapData, } #[derive(Deserialize, Debug, Clone)] struct CaConfigMapData { #[serde(rename = "ca.crt")] + #[allow(unused)] ca_crt: String, } diff --git a/examples/shared_stream_controllers.rs b/examples/shared_stream_controllers.rs index f8ff276d6..adf3a9e0e 100644 --- a/examples/shared_stream_controllers.rs +++ b/examples/shared_stream_controllers.rs @@ -1,6 +1,6 @@ use std::{ops::Deref, sync::Arc, time::Duration}; -use futures::{future, StreamExt}; +use futures::StreamExt; use k8s_openapi::api::{apps::v1::Deployment, core::v1::Pod}; use kube::{ runtime::{ @@ -63,7 +63,7 @@ async fn main() -> anyhow::Result<()> { .clone() .map(|r| Ok(r.deref().clone())) .predicate_filter(predicates::resource_version) - .filter_map(|r| future::ready(r.ok().map(Arc::new))); + .filter_map(|r| std::future::ready(r.ok().map(Arc::new))); // Reflect a stream of pod watch events into the store and apply a backoff. For subscribers to // be able to consume updates, the reflector must be shared. diff --git a/kube-client/src/client/builder.rs b/kube-client/src/client/builder.rs index bb2518393..004993325 100644 --- a/kube-client/src/client/builder.rs +++ b/kube-client/src/client/builder.rs @@ -282,7 +282,7 @@ mod tests { let io: TokioIo = TokioIo::new(tcp); tokio::spawn(async move { - let _ = http1::Builder::new() + http1::Builder::new() .timer(TokioTimer::new()) .serve_connection( io, diff --git a/kube-client/src/client/client_ext.rs b/kube-client/src/client/client_ext.rs index ced2df626..051ac2050 100644 --- a/kube-client/src/client/client_ext.rs +++ b/kube-client/src/client/client_ext.rs @@ -487,12 +487,7 @@ mod test { // Fetch using local object reference let svc: Service = client - .fetch( - &LocalObjectReference { - name: svc.name_any().into(), - } - .within(svc.namespace()), - ) + .fetch(&LocalObjectReference { name: svc.name_any() }.within(svc.namespace())) .await?; assert_eq!(svc.name_unchecked(), "kubernetes"); diff --git a/kube-derive/tests/resource.rs b/kube-derive/tests/resource.rs index e784c38cb..5d9951bba 100644 --- a/kube-derive/tests/resource.rs +++ b/kube-derive/tests/resource.rs @@ -9,11 +9,13 @@ use kube_derive::Resource; #[resource(inherit = "ConfigMap")] struct TypedMap { metadata: ObjectMeta, + #[allow(unused)] data: Option, } #[derive(Default)] struct TypedData { + #[allow(unused)] field: String, } @@ -21,11 +23,13 @@ struct TypedData { #[resource(inherit = "Secret")] struct TypedSecret { metadata: ObjectMeta, + #[allow(unused)] data: Option, } #[derive(Default)] struct TypedSecretData { + #[allow(unused)] field: ByteString, } diff --git a/kube-runtime/Cargo.toml b/kube-runtime/Cargo.toml index a89492384..aced8d639 100644 --- a/kube-runtime/Cargo.toml +++ b/kube-runtime/Cargo.toml @@ -43,7 +43,7 @@ json-patch.workspace = true jsonptr.workspace = true serde_json.workspace = true thiserror.workspace = true -backoff.workspace = true +backon.workspace = true async-trait.workspace = true hashbrown.workspace = true k8s-openapi.workspace = true diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs index e701b1d1b..c700fc20d 100644 --- a/kube-runtime/src/controller/mod.rs +++ b/kube-runtime/src/controller/mod.rs @@ -8,10 +8,13 @@ use crate::{ ObjectRef, }, scheduler::{debounced_scheduler, ScheduleRequest}, - utils::{trystream_try_via, CancelableJoinHandle, KubeRuntimeStreamExt, StreamBackoff, WatchStreamExt}, - watcher::{self, metadata_watcher, watcher, DefaultBackoff}, + utils::{ + trystream_try_via, CancelableJoinHandle, KubeRuntimeStreamExt, ResettableBackoff, + ResettableBackoffWrapper, StreamBackoff, WatchStreamExt, + }, + watcher::{self, metadata_watcher, watcher, DefaultBackoffBuilder}, }; -use backoff::backoff::Backoff; +use backon::BackoffBuilder; use educe::Educe; use futures::{ channel, @@ -629,7 +632,7 @@ where { // NB: Need to Unpin for stream::select_all trigger_selector: stream::SelectAll, watcher::Error>>>, - trigger_backoff: Box, + trigger_backoff: Box, /// [`run`](crate::Controller::run) starts a graceful shutdown when any of these [`Future`]s complete, /// refusing to start any new reconciliations but letting any existing ones finish. graceful_shutdown_selector: Vec>, @@ -689,7 +692,7 @@ where trigger_selector.push(self_watcher); Self { trigger_selector, - trigger_backoff: Box::::default(), + trigger_backoff: Box::>::default(), graceful_shutdown_selector: vec![ // Fallback future, ensuring that we never terminate if no additional futures are added to the selector future::pending().boxed(), @@ -775,7 +778,7 @@ where trigger_selector.push(self_watcher); Self { trigger_selector, - trigger_backoff: Box::::default(), + trigger_backoff: Box::>::default(), graceful_shutdown_selector: vec![ // Fallback future, ensuring that we never terminate if no additional futures are added to the selector future::pending().boxed(), @@ -886,7 +889,7 @@ where trigger_selector.push(self_watcher); Self { trigger_selector, - trigger_backoff: Box::::default(), + trigger_backoff: Box::>::default(), graceful_shutdown_selector: vec![ // Fallback future, ensuring that we never terminate if no additional futures are added to the selector future::pending().boxed(), @@ -915,8 +918,8 @@ where /// The [`default_backoff`](crate::watcher::default_backoff) follows client-go conventions, /// but can be overridden by calling this method. #[must_use] - pub fn trigger_backoff(mut self, backoff: impl Backoff + Send + 'static) -> Self { - self.trigger_backoff = Box::new(backoff); + pub fn trigger_backoff(mut self, backoff_builder: impl BackoffBuilder + Clone + 'static) -> Self { + self.trigger_backoff = Box::new(ResettableBackoffWrapper::new(backoff_builder)); self } diff --git a/kube-runtime/src/utils/backoff_reset_timer.rs b/kube-runtime/src/utils/backoff_reset_timer.rs index 1c09a5344..5e99cccfc 100644 --- a/kube-runtime/src/utils/backoff_reset_timer.rs +++ b/kube-runtime/src/utils/backoff_reset_timer.rs @@ -1,34 +1,73 @@ use std::time::{Duration, Instant}; -use backoff::{backoff::Backoff, Clock, SystemClock}; +use backon::BackoffBuilder; -/// A [`Backoff`] wrapper that resets after a fixed duration has elapsed. -pub struct ResetTimerBackoff { - backoff: B, - clock: C, - last_backoff: Option, - reset_duration: Duration, +use super::{ResettableBackoff, ResettableBackoffWrapper}; + +// TODO: do we actually need this or should we just use tokio::time? +pub trait Clock: Send + Sync + Unpin { + fn now(&self) -> Instant; +} + +#[derive(Debug, Clone, Copy)] +pub struct TokioClock; +impl Clock for TokioClock { + fn now(&self) -> Instant { + tokio::time::Instant::now().into_std() + } } -impl ResetTimerBackoff { - pub fn new(backoff: B, reset_duration: Duration) -> Self { - Self::new_with_custom_clock(backoff, reset_duration, SystemClock {}) +impl ResetTimerBackoffBuilder { + pub fn new(inner_backoff_builder: B, reset_duration: Duration) -> Self { + Self::new_with_custom_clock(inner_backoff_builder, reset_duration, TokioClock) } } -impl ResetTimerBackoff { - fn new_with_custom_clock(backoff: B, reset_duration: Duration, clock: C) -> Self { +impl ResetTimerBackoffBuilder { + fn new_with_custom_clock(inner_backoff_builder: B, reset_duration: Duration, clock: C) -> Self { Self { - backoff, + inner_backoff_builder, clock, - last_backoff: None, reset_duration, } } } -impl Backoff for ResetTimerBackoff { - fn next_backoff(&mut self) -> Option { +/// A [`Backoff`] wrapper that resets after a fixed duration has elapsed. +#[derive(Debug, Clone)] +pub struct ResetTimerBackoffBuilder { + inner_backoff_builder: B, + clock: C, + reset_duration: Duration, +} + +impl BackoffBuilder for ResetTimerBackoffBuilder { + type Backoff = ResetTimerBackoff, C>; + + fn build(self) -> Self::Backoff { + ResetTimerBackoff { + inner_backoff: ResettableBackoffWrapper::new(self.inner_backoff_builder), + clock: self.clock, + reset_duration: self.reset_duration, + last_backoff: None, + } + } +} + +/// Constructed by [`ResetTimerBackoffBuilder`]. +#[derive(Debug)] +pub struct ResetTimerBackoff { + inner_backoff: B, + clock: C, + reset_duration: Duration, + last_backoff: Option, +} + +// impl Backoff, which is now effectively an alias for Iterator +impl Iterator for ResetTimerBackoff { + type Item = Duration; + + fn next(&mut self) -> Option { if let Some(last_backoff) = self.last_backoff { if self.clock.now() > last_backoff + self.reset_duration { tracing::debug!( @@ -36,51 +75,42 @@ impl Backoff for ResetTimerBackoff { reset_duration = ?self.reset_duration, "Resetting backoff, since reset duration has expired" ); - self.backoff.reset(); + self.inner_backoff.reset(); } } self.last_backoff = Some(self.clock.now()); - self.backoff.next_backoff() - } - - fn reset(&mut self) { - // Do not even bother trying to reset here, since `next_backoff` will take care of this when the timer expires. + self.inner_backoff.next() } } #[cfg(test)] mod tests { - use backoff::{backoff::Backoff, Clock}; + use backon::BackoffBuilder; use tokio::time::advance; - use super::ResetTimerBackoff; - use crate::utils::stream_backoff::tests::LinearBackoff; - use std::time::{Duration, Instant}; + use crate::utils::{ + backoff_reset_timer::TokioClock, stream_backoff::tests::LinearBackoffBuilder, + ResetTimerBackoffBuilder, + }; + use std::time::Duration; #[tokio::test] async fn should_reset_when_timer_expires() { tokio::time::pause(); - let mut backoff = ResetTimerBackoff::new_with_custom_clock( - LinearBackoff::new(Duration::from_secs(2)), + let mut backoff = ResetTimerBackoffBuilder::new_with_custom_clock( + LinearBackoffBuilder::new(Duration::from_secs(2)), Duration::from_secs(60), TokioClock, - ); - assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(2))); + ) + .build(); + assert_eq!(backoff.next(), Some(Duration::from_secs(2))); advance(Duration::from_secs(40)).await; - assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(4))); + assert_eq!(backoff.next(), Some(Duration::from_secs(4))); advance(Duration::from_secs(40)).await; - assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(6))); + assert_eq!(backoff.next(), Some(Duration::from_secs(6))); advance(Duration::from_secs(80)).await; - assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(2))); + assert_eq!(backoff.next(), Some(Duration::from_secs(2))); advance(Duration::from_secs(80)).await; - assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(2))); - } - - struct TokioClock; - - impl Clock for TokioClock { - fn now(&self) -> Instant { - tokio::time::Instant::now().into_std() - } + assert_eq!(backoff.next(), Some(Duration::from_secs(2))); } } diff --git a/kube-runtime/src/utils/backoff_resettable.rs b/kube-runtime/src/utils/backoff_resettable.rs new file mode 100644 index 000000000..45ac48d61 --- /dev/null +++ b/kube-runtime/src/utils/backoff_resettable.rs @@ -0,0 +1,55 @@ +use std::{ops::DerefMut, time::Duration}; + +use backon::{Backoff, BackoffBuilder}; + +/// A [`Backoff`] that can also be reset. +/// +/// Implemented by [`ResettableBackoffWrapper`]. +// Separated into a trait so that it can be used as a trait object, erasing the backing [`BackoffBuilder`]. +pub trait ResettableBackoff: Backoff { + fn reset(&mut self); +} + +impl ResettableBackoff for Box { + fn reset(&mut self) { + Box::deref_mut(self).reset(); + } +} + +/// Implements [`ResettableBackoff`] by reconstructing the backing [`Backoff`] each time [`Self::reset`] has been called. +#[derive(Debug)] +pub struct ResettableBackoffWrapper { + backoff_builder: B, + current_backoff: Option, +} + +impl ResettableBackoffWrapper { + pub fn new(backoff_builder: B) -> Self { + Self { + backoff_builder, + current_backoff: None, + } + } +} + +impl Default for ResettableBackoffWrapper { + fn default() -> Self { + Self::new(B::default()) + } +} + +impl Iterator for ResettableBackoffWrapper { + type Item = Duration; + + fn next(&mut self) -> Option { + self.current_backoff + .get_or_insert_with(|| self.backoff_builder.clone().build()) + .next() + } +} + +impl ResettableBackoff for ResettableBackoffWrapper { + fn reset(&mut self) { + self.current_backoff = None; + } +} diff --git a/kube-runtime/src/utils/mod.rs b/kube-runtime/src/utils/mod.rs index 74cc7cf2f..1afee89c2 100644 --- a/kube-runtime/src/utils/mod.rs +++ b/kube-runtime/src/utils/mod.rs @@ -1,6 +1,7 @@ //! Helpers for manipulating built-in streams mod backoff_reset_timer; +mod backoff_resettable; pub(crate) mod delayed_init; mod event_decode; mod event_modify; @@ -9,7 +10,8 @@ mod reflect; mod stream_backoff; mod watch_ext; -pub use backoff_reset_timer::ResetTimerBackoff; +pub use backoff_reset_timer::{ResetTimerBackoff, ResetTimerBackoffBuilder}; +pub use backoff_resettable::{ResettableBackoff, ResettableBackoffWrapper}; pub use event_decode::EventDecode; pub use event_modify::EventModify; pub use predicate::{predicates, Predicate, PredicateFilter}; diff --git a/kube-runtime/src/utils/stream_backoff.rs b/kube-runtime/src/utils/stream_backoff.rs index 01c6c4292..2e8802b13 100644 --- a/kube-runtime/src/utils/stream_backoff.rs +++ b/kube-runtime/src/utils/stream_backoff.rs @@ -1,10 +1,11 @@ use std::{future::Future, pin::Pin, task::Poll}; -use backoff::backoff::Backoff; use futures::{Stream, TryStream}; use pin_project::pin_project; use tokio::time::{sleep, Instant, Sleep}; +use super::ResettableBackoff; + /// Applies a [`Backoff`] policy to a [`Stream`] /// /// After any [`Err`] is emitted, the stream is paused for [`Backoff::next_backoff`]. The @@ -30,7 +31,7 @@ enum State { Awake, } -impl StreamBackoff { +impl StreamBackoff { pub fn new(stream: S, backoff: B) -> Self { Self { stream, @@ -40,7 +41,7 @@ impl StreamBackoff { } } -impl Stream for StreamBackoff { +impl Stream for StreamBackoff { type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll> { @@ -71,7 +72,7 @@ impl Stream for StreamBackoff { let next_item = this.stream.try_poll_next(cx); match &next_item { Poll::Ready(Some(Err(_))) => { - if let Some(backoff_duration) = this.backoff.next_backoff() { + if let Some(backoff_duration) = this.backoff.next() { let backoff_sleep = sleep(backoff_duration); tracing::debug!( deadline = ?backoff_sleep.deadline(), @@ -98,8 +99,9 @@ impl Stream for StreamBackoff { pub(crate) mod tests { use std::{pin::pin, task::Poll, time::Duration}; - use super::StreamBackoff; - use backoff::backoff::Backoff; + use crate::WatchStreamExt; + + use backon::BackoffBuilder; use futures::{channel::mpsc, poll, stream, StreamExt}; #[tokio::test] @@ -107,7 +109,11 @@ pub(crate) mod tests { tokio::time::pause(); let tick = Duration::from_secs(1); let rx = stream::iter([Ok(0), Ok(1), Err(2), Ok(3), Ok(4)]); - let mut rx = pin!(StreamBackoff::new(rx, backoff::backoff::Constant::new(tick))); + let mut rx = pin!(rx.backoff( + backon::ConstantBuilder::default() + .with_delay(tick) + .without_max_times() + )); assert_eq!(poll!(rx.next()), Poll::Ready(Some(Ok(0)))); assert_eq!(poll!(rx.next()), Poll::Ready(Some(Ok(1)))); assert_eq!(poll!(rx.next()), Poll::Ready(Some(Err(2)))); @@ -123,7 +129,7 @@ pub(crate) mod tests { tokio::time::pause(); let (tx, rx) = mpsc::unbounded(); // let rx = stream::iter([Ok(0), Ok(1), Err(2), Ok(3)]); - let mut rx = pin!(StreamBackoff::new(rx, LinearBackoff::new(Duration::from_secs(2)))); + let mut rx = pin!(rx.backoff(LinearBackoffBuilder::new(Duration::from_secs(2)))); tx.unbounded_send(Ok(0)).unwrap(); assert_eq!(poll!(rx.next()), Poll::Ready(Some(Ok(0)))); tx.unbounded_send(Ok(1)).unwrap(); @@ -149,39 +155,62 @@ pub(crate) mod tests { #[tokio::test] async fn backoff_should_close_when_requested() { assert_eq!( - StreamBackoff::new( - stream::iter([Ok(0), Ok(1), Err(2), Ok(3)]), - backoff::backoff::Stop {} - ) - .collect::>() - .await, + stream::iter([Ok(0), Ok(1), Err(2), Ok(3)]) + .backoff(StopBackoff) + .collect::>() + .await, vec![Ok(0), Ok(1), Err(2)] ); } + /// Backoff policy that stops immediately + #[derive(Clone)] + // No need for a builder since it has no state anyway. + pub struct StopBackoff; + impl Iterator for StopBackoff { + type Item = Duration; + + fn next(&mut self) -> Option { + None + } + } + + /// Dynamic backoff policy that is still deterministic and testable - pub struct LinearBackoff { + #[derive(Debug, Clone)] + pub struct LinearBackoffBuilder { interval: Duration, - current_duration: Duration, } - impl LinearBackoff { + impl LinearBackoffBuilder { pub fn new(interval: Duration) -> Self { - Self { - interval, + Self { interval } + } + } + + #[derive(Debug)] + pub struct LinearBackoff { + builder: LinearBackoffBuilder, + current_duration: Duration, + } + + impl BackoffBuilder for LinearBackoffBuilder { + type Backoff = LinearBackoff; + + fn build(self) -> Self::Backoff { + LinearBackoff { + builder: self, current_duration: Duration::ZERO, } } } - impl Backoff for LinearBackoff { - fn next_backoff(&mut self) -> Option { - self.current_duration += self.interval; - Some(self.current_duration) - } + impl Iterator for LinearBackoff { + type Item = Duration; - fn reset(&mut self) { - self.current_duration = Duration::ZERO + fn next(&mut self) -> Option { + self.current_duration += self.builder.interval; + Some(self.current_duration) } } } diff --git a/kube-runtime/src/utils/watch_ext.rs b/kube-runtime/src/utils/watch_ext.rs index 7ed636201..a4560de1d 100644 --- a/kube-runtime/src/utils/watch_ext.rs +++ b/kube-runtime/src/utils/watch_ext.rs @@ -1,39 +1,40 @@ use crate::{ + reflector::store::Writer, utils::{ event_decode::EventDecode, event_modify::EventModify, predicate::{Predicate, PredicateFilter}, stream_backoff::StreamBackoff, + Reflect, }, - watcher, + watcher::{self, DefaultBackoffBuilder}, }; +use backon::BackoffBuilder; use kube_client::Resource; -use crate::{reflector::store::Writer, utils::Reflect}; - -use crate::watcher::DefaultBackoff; -use backoff::backoff::Backoff; use futures::{Stream, TryStream}; +use super::ResettableBackoffWrapper; + /// Extension trait for streams returned by [`watcher`](watcher()) or [`reflector`](crate::reflector::reflector) pub trait WatchStreamExt: Stream { /// Apply the [`DefaultBackoff`] watcher [`Backoff`] policy /// /// This is recommended for controllers that want to play nicely with the apiserver. - fn default_backoff(self) -> StreamBackoff + fn default_backoff(self) -> StreamBackoff> where Self: TryStream + Sized, { - StreamBackoff::new(self, DefaultBackoff::default()) + self.backoff(DefaultBackoffBuilder::default()) } - /// Apply a specific [`Backoff`] policy to a [`Stream`] using [`StreamBackoff`] - fn backoff(self, b: B) -> StreamBackoff + /// Apply a specific [`BackoffBuilder`] policy to a [`Stream`] using [`StreamBackoff`] + fn backoff(self, backoff_builder: B) -> StreamBackoff> where - B: Backoff, + B: BackoffBuilder + Clone, Self: TryStream + Sized, { - StreamBackoff::new(self, b) + StreamBackoff::new(self, ResettableBackoffWrapper::new(backoff_builder)) } /// Decode a [`watcher()`] stream into a stream of applied objects @@ -297,7 +298,7 @@ pub(crate) mod tests { // not #[test] because this is only a compile check verification #[allow(dead_code, unused_must_use)] fn test_watcher_stream_type_drift() { - let pred_watch = watcher(compile_type::>(), Default::default()) + let pred_watch = super::watcher::watcher(compile_type::>(), Default::default()) .touched_objects() .predicate_filter(predicates::generation) .boxed(); diff --git a/kube-runtime/src/watcher.rs b/kube-runtime/src/watcher.rs index 8a649ec17..523efa38b 100644 --- a/kube-runtime/src/watcher.rs +++ b/kube-runtime/src/watcher.rs @@ -2,9 +2,9 @@ //! //! See [`watcher`] for the primary entry point. -use crate::utils::ResetTimerBackoff; +use crate::utils::ResetTimerBackoffBuilder; use async_trait::async_trait; -use backoff::{backoff::Backoff, ExponentialBackoff}; +use backon::BackoffBuilder; use educe::Educe; use futures::{stream::BoxStream, Stream, StreamExt}; use kube_client::{ @@ -892,30 +892,42 @@ pub fn watch_object; +#[derive(Debug, Clone)] +pub struct DefaultBackoffBuilder(Strategy); +type Strategy = ResetTimerBackoffBuilder; + +#[derive(Debug)] +pub struct DefaultBackoff(::Backoff); -impl Default for DefaultBackoff { +impl Default for DefaultBackoffBuilder { fn default() -> Self { - Self(ResetTimerBackoff::new( - backoff::ExponentialBackoffBuilder::new() - .with_initial_interval(Duration::from_millis(800)) - .with_max_interval(Duration::from_secs(30)) - .with_randomization_factor(1.0) - .with_multiplier(2.0) - .with_max_elapsed_time(None) - .build(), + Self(ResetTimerBackoffBuilder::new( + backon::ExponentialBuilder::default() + .with_min_delay(Duration::from_millis(800)) + .with_max_delay(Duration::from_secs(30)) + // .with_jitter() isn't quite a 1:1 match to randomization factor, it always *adds* 0..min_delay, vs multiplying + // the final delay by +-factor + .with_jitter() + // .with_randomization_factor(1.0) + .with_factor(2.0) + .without_max_times(), Duration::from_secs(120), )) } } -impl Backoff for DefaultBackoff { - fn next_backoff(&mut self) -> Option { - self.0.next_backoff() +impl BackoffBuilder for DefaultBackoffBuilder { + type Backoff = DefaultBackoff; + + fn build(self) -> Self::Backoff { + DefaultBackoff(self.0.build()) } +} + +impl Iterator for DefaultBackoff { + type Item = Duration; - fn reset(&mut self) { - self.0.reset() + fn next(&mut self) -> Option { + self.0.next() } }