From 5b949c8089c5a484cb1f692540696df23e8f9e95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natalie=20Klestrup=20R=C3=B6ijezon?= Date: Thu, 28 Nov 2024 16:01:18 +0100 Subject: [PATCH 1/8] Initial migration backoff -> backon MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes #1635 Signed-off-by: Natalie Klestrup Röijezon --- Cargo.toml | 2 +- deny.toml | 4 +- examples/Cargo.toml | 2 +- examples/errorbounded_configmap_watcher.rs | 2 + examples/shared_stream_controllers.rs | 4 +- kube-client/src/client/builder.rs | 2 +- kube-client/src/client/client_ext.rs | 7 +- kube-derive/tests/resource.rs | 4 + kube-runtime/Cargo.toml | 2 +- kube-runtime/src/controller/mod.rs | 21 ++-- kube-runtime/src/utils/backoff_reset_timer.rs | 114 +++++++++++------- kube-runtime/src/utils/backoff_resettable.rs | 55 +++++++++ kube-runtime/src/utils/mod.rs | 4 +- kube-runtime/src/utils/stream_backoff.rs | 81 +++++++++---- kube-runtime/src/utils/watch_ext.rs | 25 ++-- kube-runtime/src/watcher.rs | 48 +++++--- 16 files changed, 254 insertions(+), 123 deletions(-) create mode 100644 kube-runtime/src/utils/backoff_resettable.rs diff --git a/Cargo.toml b/Cargo.toml index 14fc4e283..d3a7e8c08 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,7 +37,7 @@ assert-json-diff = "2.0.2" async-broadcast = "0.7.0" async-stream = "0.3.5" async-trait = "0.1.64" -backoff = "0.4.0" +backon = "1.3.0" base64 = "0.22.1" bytes = "1.1.0" chrono = { version = "0.4.34", default-features = false } diff --git a/deny.toml b/deny.toml index 2f1704961..0065d2fce 100644 --- a/deny.toml +++ b/deny.toml @@ -12,9 +12,7 @@ db-urls = ["https://github.com/rustsec/advisory-db"] # remove them when we have to yanked = "warn" -ignore = [ - "RUSTSEC-2024-0384", # instant dep via unmaintained backoff dep -] +ignore = [] [licenses] # See https://spdx.org/licenses/ for list of possible licenses diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 25add42a2..d5f7de5ea 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -52,7 +52,7 @@ tower-http = { workspace = true, features = ["trace", "decompression-gzip"] } hyper = { workspace = true, features = ["client", "http1"] } hyper-util = { workspace = true, features = ["client-legacy", "http1", "tokio"] } thiserror.workspace = true -backoff.workspace = true +backon.workspace = true clap = { version = "4.0", default-features = false, features = ["std", "cargo", "derive"] } edit = "0.1.3" tokio-stream = { version = "0.1.9", features = ["net"] } diff --git a/examples/errorbounded_configmap_watcher.rs b/examples/errorbounded_configmap_watcher.rs index db9e69830..b541a8975 100644 --- a/examples/errorbounded_configmap_watcher.rs +++ b/examples/errorbounded_configmap_watcher.rs @@ -15,12 +15,14 @@ use tracing::*; #[resource(inherit = ConfigMap)] struct CaConfigMap { metadata: ObjectMeta, + #[allow(unused)] data: CaConfigMapData, } #[derive(Deserialize, Debug, Clone)] struct CaConfigMapData { #[serde(rename = "ca.crt")] + #[allow(unused)] ca_crt: String, } diff --git a/examples/shared_stream_controllers.rs b/examples/shared_stream_controllers.rs index f8ff276d6..adf3a9e0e 100644 --- a/examples/shared_stream_controllers.rs +++ b/examples/shared_stream_controllers.rs @@ -1,6 +1,6 @@ use std::{ops::Deref, sync::Arc, time::Duration}; -use futures::{future, StreamExt}; +use futures::StreamExt; use k8s_openapi::api::{apps::v1::Deployment, core::v1::Pod}; use kube::{ runtime::{ @@ -63,7 +63,7 @@ async fn main() -> anyhow::Result<()> { .clone() .map(|r| Ok(r.deref().clone())) .predicate_filter(predicates::resource_version) - .filter_map(|r| future::ready(r.ok().map(Arc::new))); + .filter_map(|r| std::future::ready(r.ok().map(Arc::new))); // Reflect a stream of pod watch events into the store and apply a backoff. For subscribers to // be able to consume updates, the reflector must be shared. diff --git a/kube-client/src/client/builder.rs b/kube-client/src/client/builder.rs index bb2518393..004993325 100644 --- a/kube-client/src/client/builder.rs +++ b/kube-client/src/client/builder.rs @@ -282,7 +282,7 @@ mod tests { let io: TokioIo = TokioIo::new(tcp); tokio::spawn(async move { - let _ = http1::Builder::new() + http1::Builder::new() .timer(TokioTimer::new()) .serve_connection( io, diff --git a/kube-client/src/client/client_ext.rs b/kube-client/src/client/client_ext.rs index ced2df626..051ac2050 100644 --- a/kube-client/src/client/client_ext.rs +++ b/kube-client/src/client/client_ext.rs @@ -487,12 +487,7 @@ mod test { // Fetch using local object reference let svc: Service = client - .fetch( - &LocalObjectReference { - name: svc.name_any().into(), - } - .within(svc.namespace()), - ) + .fetch(&LocalObjectReference { name: svc.name_any() }.within(svc.namespace())) .await?; assert_eq!(svc.name_unchecked(), "kubernetes"); diff --git a/kube-derive/tests/resource.rs b/kube-derive/tests/resource.rs index e784c38cb..5d9951bba 100644 --- a/kube-derive/tests/resource.rs +++ b/kube-derive/tests/resource.rs @@ -9,11 +9,13 @@ use kube_derive::Resource; #[resource(inherit = "ConfigMap")] struct TypedMap { metadata: ObjectMeta, + #[allow(unused)] data: Option, } #[derive(Default)] struct TypedData { + #[allow(unused)] field: String, } @@ -21,11 +23,13 @@ struct TypedData { #[resource(inherit = "Secret")] struct TypedSecret { metadata: ObjectMeta, + #[allow(unused)] data: Option, } #[derive(Default)] struct TypedSecretData { + #[allow(unused)] field: ByteString, } diff --git a/kube-runtime/Cargo.toml b/kube-runtime/Cargo.toml index a89492384..aced8d639 100644 --- a/kube-runtime/Cargo.toml +++ b/kube-runtime/Cargo.toml @@ -43,7 +43,7 @@ json-patch.workspace = true jsonptr.workspace = true serde_json.workspace = true thiserror.workspace = true -backoff.workspace = true +backon.workspace = true async-trait.workspace = true hashbrown.workspace = true k8s-openapi.workspace = true diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs index e701b1d1b..c700fc20d 100644 --- a/kube-runtime/src/controller/mod.rs +++ b/kube-runtime/src/controller/mod.rs @@ -8,10 +8,13 @@ use crate::{ ObjectRef, }, scheduler::{debounced_scheduler, ScheduleRequest}, - utils::{trystream_try_via, CancelableJoinHandle, KubeRuntimeStreamExt, StreamBackoff, WatchStreamExt}, - watcher::{self, metadata_watcher, watcher, DefaultBackoff}, + utils::{ + trystream_try_via, CancelableJoinHandle, KubeRuntimeStreamExt, ResettableBackoff, + ResettableBackoffWrapper, StreamBackoff, WatchStreamExt, + }, + watcher::{self, metadata_watcher, watcher, DefaultBackoffBuilder}, }; -use backoff::backoff::Backoff; +use backon::BackoffBuilder; use educe::Educe; use futures::{ channel, @@ -629,7 +632,7 @@ where { // NB: Need to Unpin for stream::select_all trigger_selector: stream::SelectAll, watcher::Error>>>, - trigger_backoff: Box, + trigger_backoff: Box, /// [`run`](crate::Controller::run) starts a graceful shutdown when any of these [`Future`]s complete, /// refusing to start any new reconciliations but letting any existing ones finish. graceful_shutdown_selector: Vec>, @@ -689,7 +692,7 @@ where trigger_selector.push(self_watcher); Self { trigger_selector, - trigger_backoff: Box::::default(), + trigger_backoff: Box::>::default(), graceful_shutdown_selector: vec![ // Fallback future, ensuring that we never terminate if no additional futures are added to the selector future::pending().boxed(), @@ -775,7 +778,7 @@ where trigger_selector.push(self_watcher); Self { trigger_selector, - trigger_backoff: Box::::default(), + trigger_backoff: Box::>::default(), graceful_shutdown_selector: vec![ // Fallback future, ensuring that we never terminate if no additional futures are added to the selector future::pending().boxed(), @@ -886,7 +889,7 @@ where trigger_selector.push(self_watcher); Self { trigger_selector, - trigger_backoff: Box::::default(), + trigger_backoff: Box::>::default(), graceful_shutdown_selector: vec![ // Fallback future, ensuring that we never terminate if no additional futures are added to the selector future::pending().boxed(), @@ -915,8 +918,8 @@ where /// The [`default_backoff`](crate::watcher::default_backoff) follows client-go conventions, /// but can be overridden by calling this method. #[must_use] - pub fn trigger_backoff(mut self, backoff: impl Backoff + Send + 'static) -> Self { - self.trigger_backoff = Box::new(backoff); + pub fn trigger_backoff(mut self, backoff_builder: impl BackoffBuilder + Clone + 'static) -> Self { + self.trigger_backoff = Box::new(ResettableBackoffWrapper::new(backoff_builder)); self } diff --git a/kube-runtime/src/utils/backoff_reset_timer.rs b/kube-runtime/src/utils/backoff_reset_timer.rs index 1c09a5344..5e99cccfc 100644 --- a/kube-runtime/src/utils/backoff_reset_timer.rs +++ b/kube-runtime/src/utils/backoff_reset_timer.rs @@ -1,34 +1,73 @@ use std::time::{Duration, Instant}; -use backoff::{backoff::Backoff, Clock, SystemClock}; +use backon::BackoffBuilder; -/// A [`Backoff`] wrapper that resets after a fixed duration has elapsed. -pub struct ResetTimerBackoff { - backoff: B, - clock: C, - last_backoff: Option, - reset_duration: Duration, +use super::{ResettableBackoff, ResettableBackoffWrapper}; + +// TODO: do we actually need this or should we just use tokio::time? +pub trait Clock: Send + Sync + Unpin { + fn now(&self) -> Instant; +} + +#[derive(Debug, Clone, Copy)] +pub struct TokioClock; +impl Clock for TokioClock { + fn now(&self) -> Instant { + tokio::time::Instant::now().into_std() + } } -impl ResetTimerBackoff { - pub fn new(backoff: B, reset_duration: Duration) -> Self { - Self::new_with_custom_clock(backoff, reset_duration, SystemClock {}) +impl ResetTimerBackoffBuilder { + pub fn new(inner_backoff_builder: B, reset_duration: Duration) -> Self { + Self::new_with_custom_clock(inner_backoff_builder, reset_duration, TokioClock) } } -impl ResetTimerBackoff { - fn new_with_custom_clock(backoff: B, reset_duration: Duration, clock: C) -> Self { +impl ResetTimerBackoffBuilder { + fn new_with_custom_clock(inner_backoff_builder: B, reset_duration: Duration, clock: C) -> Self { Self { - backoff, + inner_backoff_builder, clock, - last_backoff: None, reset_duration, } } } -impl Backoff for ResetTimerBackoff { - fn next_backoff(&mut self) -> Option { +/// A [`Backoff`] wrapper that resets after a fixed duration has elapsed. +#[derive(Debug, Clone)] +pub struct ResetTimerBackoffBuilder { + inner_backoff_builder: B, + clock: C, + reset_duration: Duration, +} + +impl BackoffBuilder for ResetTimerBackoffBuilder { + type Backoff = ResetTimerBackoff, C>; + + fn build(self) -> Self::Backoff { + ResetTimerBackoff { + inner_backoff: ResettableBackoffWrapper::new(self.inner_backoff_builder), + clock: self.clock, + reset_duration: self.reset_duration, + last_backoff: None, + } + } +} + +/// Constructed by [`ResetTimerBackoffBuilder`]. +#[derive(Debug)] +pub struct ResetTimerBackoff { + inner_backoff: B, + clock: C, + reset_duration: Duration, + last_backoff: Option, +} + +// impl Backoff, which is now effectively an alias for Iterator +impl Iterator for ResetTimerBackoff { + type Item = Duration; + + fn next(&mut self) -> Option { if let Some(last_backoff) = self.last_backoff { if self.clock.now() > last_backoff + self.reset_duration { tracing::debug!( @@ -36,51 +75,42 @@ impl Backoff for ResetTimerBackoff { reset_duration = ?self.reset_duration, "Resetting backoff, since reset duration has expired" ); - self.backoff.reset(); + self.inner_backoff.reset(); } } self.last_backoff = Some(self.clock.now()); - self.backoff.next_backoff() - } - - fn reset(&mut self) { - // Do not even bother trying to reset here, since `next_backoff` will take care of this when the timer expires. + self.inner_backoff.next() } } #[cfg(test)] mod tests { - use backoff::{backoff::Backoff, Clock}; + use backon::BackoffBuilder; use tokio::time::advance; - use super::ResetTimerBackoff; - use crate::utils::stream_backoff::tests::LinearBackoff; - use std::time::{Duration, Instant}; + use crate::utils::{ + backoff_reset_timer::TokioClock, stream_backoff::tests::LinearBackoffBuilder, + ResetTimerBackoffBuilder, + }; + use std::time::Duration; #[tokio::test] async fn should_reset_when_timer_expires() { tokio::time::pause(); - let mut backoff = ResetTimerBackoff::new_with_custom_clock( - LinearBackoff::new(Duration::from_secs(2)), + let mut backoff = ResetTimerBackoffBuilder::new_with_custom_clock( + LinearBackoffBuilder::new(Duration::from_secs(2)), Duration::from_secs(60), TokioClock, - ); - assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(2))); + ) + .build(); + assert_eq!(backoff.next(), Some(Duration::from_secs(2))); advance(Duration::from_secs(40)).await; - assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(4))); + assert_eq!(backoff.next(), Some(Duration::from_secs(4))); advance(Duration::from_secs(40)).await; - assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(6))); + assert_eq!(backoff.next(), Some(Duration::from_secs(6))); advance(Duration::from_secs(80)).await; - assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(2))); + assert_eq!(backoff.next(), Some(Duration::from_secs(2))); advance(Duration::from_secs(80)).await; - assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(2))); - } - - struct TokioClock; - - impl Clock for TokioClock { - fn now(&self) -> Instant { - tokio::time::Instant::now().into_std() - } + assert_eq!(backoff.next(), Some(Duration::from_secs(2))); } } diff --git a/kube-runtime/src/utils/backoff_resettable.rs b/kube-runtime/src/utils/backoff_resettable.rs new file mode 100644 index 000000000..45ac48d61 --- /dev/null +++ b/kube-runtime/src/utils/backoff_resettable.rs @@ -0,0 +1,55 @@ +use std::{ops::DerefMut, time::Duration}; + +use backon::{Backoff, BackoffBuilder}; + +/// A [`Backoff`] that can also be reset. +/// +/// Implemented by [`ResettableBackoffWrapper`]. +// Separated into a trait so that it can be used as a trait object, erasing the backing [`BackoffBuilder`]. +pub trait ResettableBackoff: Backoff { + fn reset(&mut self); +} + +impl ResettableBackoff for Box { + fn reset(&mut self) { + Box::deref_mut(self).reset(); + } +} + +/// Implements [`ResettableBackoff`] by reconstructing the backing [`Backoff`] each time [`Self::reset`] has been called. +#[derive(Debug)] +pub struct ResettableBackoffWrapper { + backoff_builder: B, + current_backoff: Option, +} + +impl ResettableBackoffWrapper { + pub fn new(backoff_builder: B) -> Self { + Self { + backoff_builder, + current_backoff: None, + } + } +} + +impl Default for ResettableBackoffWrapper { + fn default() -> Self { + Self::new(B::default()) + } +} + +impl Iterator for ResettableBackoffWrapper { + type Item = Duration; + + fn next(&mut self) -> Option { + self.current_backoff + .get_or_insert_with(|| self.backoff_builder.clone().build()) + .next() + } +} + +impl ResettableBackoff for ResettableBackoffWrapper { + fn reset(&mut self) { + self.current_backoff = None; + } +} diff --git a/kube-runtime/src/utils/mod.rs b/kube-runtime/src/utils/mod.rs index 74cc7cf2f..1afee89c2 100644 --- a/kube-runtime/src/utils/mod.rs +++ b/kube-runtime/src/utils/mod.rs @@ -1,6 +1,7 @@ //! Helpers for manipulating built-in streams mod backoff_reset_timer; +mod backoff_resettable; pub(crate) mod delayed_init; mod event_decode; mod event_modify; @@ -9,7 +10,8 @@ mod reflect; mod stream_backoff; mod watch_ext; -pub use backoff_reset_timer::ResetTimerBackoff; +pub use backoff_reset_timer::{ResetTimerBackoff, ResetTimerBackoffBuilder}; +pub use backoff_resettable::{ResettableBackoff, ResettableBackoffWrapper}; pub use event_decode::EventDecode; pub use event_modify::EventModify; pub use predicate::{predicates, Predicate, PredicateFilter}; diff --git a/kube-runtime/src/utils/stream_backoff.rs b/kube-runtime/src/utils/stream_backoff.rs index 01c6c4292..2e8802b13 100644 --- a/kube-runtime/src/utils/stream_backoff.rs +++ b/kube-runtime/src/utils/stream_backoff.rs @@ -1,10 +1,11 @@ use std::{future::Future, pin::Pin, task::Poll}; -use backoff::backoff::Backoff; use futures::{Stream, TryStream}; use pin_project::pin_project; use tokio::time::{sleep, Instant, Sleep}; +use super::ResettableBackoff; + /// Applies a [`Backoff`] policy to a [`Stream`] /// /// After any [`Err`] is emitted, the stream is paused for [`Backoff::next_backoff`]. The @@ -30,7 +31,7 @@ enum State { Awake, } -impl StreamBackoff { +impl StreamBackoff { pub fn new(stream: S, backoff: B) -> Self { Self { stream, @@ -40,7 +41,7 @@ impl StreamBackoff { } } -impl Stream for StreamBackoff { +impl Stream for StreamBackoff { type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll> { @@ -71,7 +72,7 @@ impl Stream for StreamBackoff { let next_item = this.stream.try_poll_next(cx); match &next_item { Poll::Ready(Some(Err(_))) => { - if let Some(backoff_duration) = this.backoff.next_backoff() { + if let Some(backoff_duration) = this.backoff.next() { let backoff_sleep = sleep(backoff_duration); tracing::debug!( deadline = ?backoff_sleep.deadline(), @@ -98,8 +99,9 @@ impl Stream for StreamBackoff { pub(crate) mod tests { use std::{pin::pin, task::Poll, time::Duration}; - use super::StreamBackoff; - use backoff::backoff::Backoff; + use crate::WatchStreamExt; + + use backon::BackoffBuilder; use futures::{channel::mpsc, poll, stream, StreamExt}; #[tokio::test] @@ -107,7 +109,11 @@ pub(crate) mod tests { tokio::time::pause(); let tick = Duration::from_secs(1); let rx = stream::iter([Ok(0), Ok(1), Err(2), Ok(3), Ok(4)]); - let mut rx = pin!(StreamBackoff::new(rx, backoff::backoff::Constant::new(tick))); + let mut rx = pin!(rx.backoff( + backon::ConstantBuilder::default() + .with_delay(tick) + .without_max_times() + )); assert_eq!(poll!(rx.next()), Poll::Ready(Some(Ok(0)))); assert_eq!(poll!(rx.next()), Poll::Ready(Some(Ok(1)))); assert_eq!(poll!(rx.next()), Poll::Ready(Some(Err(2)))); @@ -123,7 +129,7 @@ pub(crate) mod tests { tokio::time::pause(); let (tx, rx) = mpsc::unbounded(); // let rx = stream::iter([Ok(0), Ok(1), Err(2), Ok(3)]); - let mut rx = pin!(StreamBackoff::new(rx, LinearBackoff::new(Duration::from_secs(2)))); + let mut rx = pin!(rx.backoff(LinearBackoffBuilder::new(Duration::from_secs(2)))); tx.unbounded_send(Ok(0)).unwrap(); assert_eq!(poll!(rx.next()), Poll::Ready(Some(Ok(0)))); tx.unbounded_send(Ok(1)).unwrap(); @@ -149,39 +155,62 @@ pub(crate) mod tests { #[tokio::test] async fn backoff_should_close_when_requested() { assert_eq!( - StreamBackoff::new( - stream::iter([Ok(0), Ok(1), Err(2), Ok(3)]), - backoff::backoff::Stop {} - ) - .collect::>() - .await, + stream::iter([Ok(0), Ok(1), Err(2), Ok(3)]) + .backoff(StopBackoff) + .collect::>() + .await, vec![Ok(0), Ok(1), Err(2)] ); } + /// Backoff policy that stops immediately + #[derive(Clone)] + // No need for a builder since it has no state anyway. + pub struct StopBackoff; + impl Iterator for StopBackoff { + type Item = Duration; + + fn next(&mut self) -> Option { + None + } + } + + /// Dynamic backoff policy that is still deterministic and testable - pub struct LinearBackoff { + #[derive(Debug, Clone)] + pub struct LinearBackoffBuilder { interval: Duration, - current_duration: Duration, } - impl LinearBackoff { + impl LinearBackoffBuilder { pub fn new(interval: Duration) -> Self { - Self { - interval, + Self { interval } + } + } + + #[derive(Debug)] + pub struct LinearBackoff { + builder: LinearBackoffBuilder, + current_duration: Duration, + } + + impl BackoffBuilder for LinearBackoffBuilder { + type Backoff = LinearBackoff; + + fn build(self) -> Self::Backoff { + LinearBackoff { + builder: self, current_duration: Duration::ZERO, } } } - impl Backoff for LinearBackoff { - fn next_backoff(&mut self) -> Option { - self.current_duration += self.interval; - Some(self.current_duration) - } + impl Iterator for LinearBackoff { + type Item = Duration; - fn reset(&mut self) { - self.current_duration = Duration::ZERO + fn next(&mut self) -> Option { + self.current_duration += self.builder.interval; + Some(self.current_duration) } } } diff --git a/kube-runtime/src/utils/watch_ext.rs b/kube-runtime/src/utils/watch_ext.rs index 7ed636201..a4560de1d 100644 --- a/kube-runtime/src/utils/watch_ext.rs +++ b/kube-runtime/src/utils/watch_ext.rs @@ -1,39 +1,40 @@ use crate::{ + reflector::store::Writer, utils::{ event_decode::EventDecode, event_modify::EventModify, predicate::{Predicate, PredicateFilter}, stream_backoff::StreamBackoff, + Reflect, }, - watcher, + watcher::{self, DefaultBackoffBuilder}, }; +use backon::BackoffBuilder; use kube_client::Resource; -use crate::{reflector::store::Writer, utils::Reflect}; - -use crate::watcher::DefaultBackoff; -use backoff::backoff::Backoff; use futures::{Stream, TryStream}; +use super::ResettableBackoffWrapper; + /// Extension trait for streams returned by [`watcher`](watcher()) or [`reflector`](crate::reflector::reflector) pub trait WatchStreamExt: Stream { /// Apply the [`DefaultBackoff`] watcher [`Backoff`] policy /// /// This is recommended for controllers that want to play nicely with the apiserver. - fn default_backoff(self) -> StreamBackoff + fn default_backoff(self) -> StreamBackoff> where Self: TryStream + Sized, { - StreamBackoff::new(self, DefaultBackoff::default()) + self.backoff(DefaultBackoffBuilder::default()) } - /// Apply a specific [`Backoff`] policy to a [`Stream`] using [`StreamBackoff`] - fn backoff(self, b: B) -> StreamBackoff + /// Apply a specific [`BackoffBuilder`] policy to a [`Stream`] using [`StreamBackoff`] + fn backoff(self, backoff_builder: B) -> StreamBackoff> where - B: Backoff, + B: BackoffBuilder + Clone, Self: TryStream + Sized, { - StreamBackoff::new(self, b) + StreamBackoff::new(self, ResettableBackoffWrapper::new(backoff_builder)) } /// Decode a [`watcher()`] stream into a stream of applied objects @@ -297,7 +298,7 @@ pub(crate) mod tests { // not #[test] because this is only a compile check verification #[allow(dead_code, unused_must_use)] fn test_watcher_stream_type_drift() { - let pred_watch = watcher(compile_type::>(), Default::default()) + let pred_watch = super::watcher::watcher(compile_type::>(), Default::default()) .touched_objects() .predicate_filter(predicates::generation) .boxed(); diff --git a/kube-runtime/src/watcher.rs b/kube-runtime/src/watcher.rs index 8a649ec17..523efa38b 100644 --- a/kube-runtime/src/watcher.rs +++ b/kube-runtime/src/watcher.rs @@ -2,9 +2,9 @@ //! //! See [`watcher`] for the primary entry point. -use crate::utils::ResetTimerBackoff; +use crate::utils::ResetTimerBackoffBuilder; use async_trait::async_trait; -use backoff::{backoff::Backoff, ExponentialBackoff}; +use backon::BackoffBuilder; use educe::Educe; use futures::{stream::BoxStream, Stream, StreamExt}; use kube_client::{ @@ -892,30 +892,42 @@ pub fn watch_object; +#[derive(Debug, Clone)] +pub struct DefaultBackoffBuilder(Strategy); +type Strategy = ResetTimerBackoffBuilder; + +#[derive(Debug)] +pub struct DefaultBackoff(::Backoff); -impl Default for DefaultBackoff { +impl Default for DefaultBackoffBuilder { fn default() -> Self { - Self(ResetTimerBackoff::new( - backoff::ExponentialBackoffBuilder::new() - .with_initial_interval(Duration::from_millis(800)) - .with_max_interval(Duration::from_secs(30)) - .with_randomization_factor(1.0) - .with_multiplier(2.0) - .with_max_elapsed_time(None) - .build(), + Self(ResetTimerBackoffBuilder::new( + backon::ExponentialBuilder::default() + .with_min_delay(Duration::from_millis(800)) + .with_max_delay(Duration::from_secs(30)) + // .with_jitter() isn't quite a 1:1 match to randomization factor, it always *adds* 0..min_delay, vs multiplying + // the final delay by +-factor + .with_jitter() + // .with_randomization_factor(1.0) + .with_factor(2.0) + .without_max_times(), Duration::from_secs(120), )) } } -impl Backoff for DefaultBackoff { - fn next_backoff(&mut self) -> Option { - self.0.next_backoff() +impl BackoffBuilder for DefaultBackoffBuilder { + type Backoff = DefaultBackoff; + + fn build(self) -> Self::Backoff { + DefaultBackoff(self.0.build()) } +} + +impl Iterator for DefaultBackoff { + type Item = Duration; - fn reset(&mut self) { - self.0.reset() + fn next(&mut self) -> Option { + self.0.next() } } From 11e9045e14ccbbfc4ff1dbc301fd9e446544b64d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natalie=20Klestrup=20R=C3=B6ijezon?= Date: Thu, 28 Nov 2024 16:11:36 +0100 Subject: [PATCH 2/8] Remove Clock MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Natalie Klestrup Röijezon --- kube-runtime/src/utils/backoff_reset_timer.rs | 57 ++++++------------- 1 file changed, 17 insertions(+), 40 deletions(-) diff --git a/kube-runtime/src/utils/backoff_reset_timer.rs b/kube-runtime/src/utils/backoff_reset_timer.rs index 5e99cccfc..beadf13eb 100644 --- a/kube-runtime/src/utils/backoff_reset_timer.rs +++ b/kube-runtime/src/utils/backoff_reset_timer.rs @@ -4,72 +4,53 @@ use backon::BackoffBuilder; use super::{ResettableBackoff, ResettableBackoffWrapper}; -// TODO: do we actually need this or should we just use tokio::time? -pub trait Clock: Send + Sync + Unpin { - fn now(&self) -> Instant; -} - -#[derive(Debug, Clone, Copy)] -pub struct TokioClock; -impl Clock for TokioClock { - fn now(&self) -> Instant { - tokio::time::Instant::now().into_std() - } +/// Builder for [`ResetTimerBackoff`]. +#[derive(Debug, Clone)] +pub struct ResetTimerBackoffBuilder { + inner_backoff_builder: B, + reset_duration: Duration, } impl ResetTimerBackoffBuilder { pub fn new(inner_backoff_builder: B, reset_duration: Duration) -> Self { - Self::new_with_custom_clock(inner_backoff_builder, reset_duration, TokioClock) - } -} - -impl ResetTimerBackoffBuilder { - fn new_with_custom_clock(inner_backoff_builder: B, reset_duration: Duration, clock: C) -> Self { Self { inner_backoff_builder, - clock, reset_duration, } } } -/// A [`Backoff`] wrapper that resets after a fixed duration has elapsed. -#[derive(Debug, Clone)] -pub struct ResetTimerBackoffBuilder { - inner_backoff_builder: B, - clock: C, - reset_duration: Duration, -} - -impl BackoffBuilder for ResetTimerBackoffBuilder { - type Backoff = ResetTimerBackoff, C>; +impl BackoffBuilder for ResetTimerBackoffBuilder { + type Backoff = ResetTimerBackoff>; fn build(self) -> Self::Backoff { ResetTimerBackoff { inner_backoff: ResettableBackoffWrapper::new(self.inner_backoff_builder), - clock: self.clock, reset_duration: self.reset_duration, last_backoff: None, } } } + +/// Wraps a [`Backoff`] and resets it after a fixed duration of inactivity has elapsed. +/// /// Constructed by [`ResetTimerBackoffBuilder`]. #[derive(Debug)] -pub struct ResetTimerBackoff { +pub struct ResetTimerBackoff { inner_backoff: B, - clock: C, reset_duration: Duration, last_backoff: Option, } // impl Backoff, which is now effectively an alias for Iterator -impl Iterator for ResetTimerBackoff { +impl Iterator for ResetTimerBackoff { type Item = Duration; fn next(&mut self) -> Option { + let now = tokio::time::Instant::now().into_std(); if let Some(last_backoff) = self.last_backoff { - if self.clock.now() > last_backoff + self.reset_duration { + if now > last_backoff + self.reset_duration { tracing::debug!( ?last_backoff, reset_duration = ?self.reset_duration, @@ -78,7 +59,7 @@ impl Iterator for ResetTimerBackoff { self.inner_backoff.reset(); } } - self.last_backoff = Some(self.clock.now()); + self.last_backoff = Some(now); self.inner_backoff.next() } } @@ -88,19 +69,15 @@ mod tests { use backon::BackoffBuilder; use tokio::time::advance; - use crate::utils::{ - backoff_reset_timer::TokioClock, stream_backoff::tests::LinearBackoffBuilder, - ResetTimerBackoffBuilder, - }; + use crate::utils::{stream_backoff::tests::LinearBackoffBuilder, ResetTimerBackoffBuilder}; use std::time::Duration; #[tokio::test] async fn should_reset_when_timer_expires() { tokio::time::pause(); - let mut backoff = ResetTimerBackoffBuilder::new_with_custom_clock( + let mut backoff = ResetTimerBackoffBuilder::new( LinearBackoffBuilder::new(Duration::from_secs(2)), Duration::from_secs(60), - TokioClock, ) .build(); assert_eq!(backoff.next(), Some(Duration::from_secs(2))); From 656c270394c63305fa9f7d73b2567e2694fe2f51 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natalie=20Klestrup=20R=C3=B6ijezon?= Date: Thu, 28 Nov 2024 16:14:33 +0100 Subject: [PATCH 3/8] Minor docs tweaks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Natalie Klestrup Röijezon --- kube-runtime/src/utils/backoff_resettable.rs | 1 + kube-runtime/src/watcher.rs | 1 + 2 files changed, 2 insertions(+) diff --git a/kube-runtime/src/utils/backoff_resettable.rs b/kube-runtime/src/utils/backoff_resettable.rs index 45ac48d61..3958b7826 100644 --- a/kube-runtime/src/utils/backoff_resettable.rs +++ b/kube-runtime/src/utils/backoff_resettable.rs @@ -11,6 +11,7 @@ pub trait ResettableBackoff: Backoff { } impl ResettableBackoff for Box { + /// Reset the [`Backoff`] to its initial state. fn reset(&mut self) { Box::deref_mut(self).reset(); } diff --git a/kube-runtime/src/watcher.rs b/kube-runtime/src/watcher.rs index 523efa38b..aa4072067 100644 --- a/kube-runtime/src/watcher.rs +++ b/kube-runtime/src/watcher.rs @@ -896,6 +896,7 @@ pub fn watch_object; +/// See [`DefaultBackoffBuilder`]. #[derive(Debug)] pub struct DefaultBackoff(::Backoff); From 6a0ee68c8878dcee0c5f56305f43a7ed73ef250a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natalie=20Klestrup=20R=C3=B6ijezon?= Date: Thu, 28 Nov 2024 16:20:13 +0100 Subject: [PATCH 4/8] Remove data from DefaultBackoffBuilder MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Natalie Klestrup Röijezon --- kube-runtime/src/utils/watch_ext.rs | 2 +- kube-runtime/src/watcher.rs | 42 +++++++++++++---------------- 2 files changed, 20 insertions(+), 24 deletions(-) diff --git a/kube-runtime/src/utils/watch_ext.rs b/kube-runtime/src/utils/watch_ext.rs index a4560de1d..c6700ebf7 100644 --- a/kube-runtime/src/utils/watch_ext.rs +++ b/kube-runtime/src/utils/watch_ext.rs @@ -25,7 +25,7 @@ pub trait WatchStreamExt: Stream { where Self: TryStream + Sized, { - self.backoff(DefaultBackoffBuilder::default()) + self.backoff(DefaultBackoffBuilder) } /// Apply a specific [`BackoffBuilder`] policy to a [`Stream`] using [`StreamBackoff`] diff --git a/kube-runtime/src/watcher.rs b/kube-runtime/src/watcher.rs index aa4072067..0163af73d 100644 --- a/kube-runtime/src/watcher.rs +++ b/kube-runtime/src/watcher.rs @@ -890,38 +890,34 @@ pub fn watch_object; +/// when calling `WatchStreamExt::default_backoff`. +#[derive(Debug, Clone, Default)] +pub struct DefaultBackoffBuilder; /// See [`DefaultBackoffBuilder`]. #[derive(Debug)] pub struct DefaultBackoff(::Backoff); - -impl Default for DefaultBackoffBuilder { - fn default() -> Self { - Self(ResetTimerBackoffBuilder::new( - backon::ExponentialBuilder::default() - .with_min_delay(Duration::from_millis(800)) - .with_max_delay(Duration::from_secs(30)) - // .with_jitter() isn't quite a 1:1 match to randomization factor, it always *adds* 0..min_delay, vs multiplying - // the final delay by +-factor - .with_jitter() - // .with_randomization_factor(1.0) - .with_factor(2.0) - .without_max_times(), - Duration::from_secs(120), - )) - } -} +type Strategy = ResetTimerBackoffBuilder; impl BackoffBuilder for DefaultBackoffBuilder { type Backoff = DefaultBackoff; fn build(self) -> Self::Backoff { - DefaultBackoff(self.0.build()) + DefaultBackoff( + ResetTimerBackoffBuilder::new( + backon::ExponentialBuilder::default() + .with_min_delay(Duration::from_millis(800)) + .with_max_delay(Duration::from_secs(30)) + // .with_jitter() isn't quite a 1:1 match to randomization factor, it always *adds* 0..min_delay, vs multiplying + // the final delay by +-factor + .with_jitter() + // .with_randomization_factor(1.0) + .with_factor(2.0) + .without_max_times(), + Duration::from_secs(120), + ) + .build(), + ) } } From c484a25b4e39acbf871eb396bba120322e00bff9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natalie=20Klestrup=20R=C3=B6ijezon?= Date: Thu, 28 Nov 2024 16:53:35 +0100 Subject: [PATCH 5/8] Fix doc warnings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Natalie Klestrup Röijezon --- kube-client/src/api/mod.rs | 2 +- kube-client/src/api/subresource.rs | 2 +- kube-client/src/client/mod.rs | 3 +-- kube-client/src/config/file_config.rs | 4 +-- kube-client/src/discovery/mod.rs | 2 +- kube-client/src/lib.rs | 6 ++--- kube-core/src/object.rs | 8 +++--- kube-core/src/params.rs | 4 +-- kube-derive/src/lib.rs | 4 +-- kube-runtime/src/controller/mod.rs | 2 +- kube-runtime/src/reflector/dispatcher.rs | 10 +++---- kube-runtime/src/reflector/object_ref.rs | 2 +- kube-runtime/src/scheduler.rs | 2 +- kube-runtime/src/utils/backoff_reset_timer.rs | 2 ++ kube-runtime/src/utils/stream_backoff.rs | 8 +++--- kube-runtime/src/utils/watch_ext.rs | 18 ++++++++++--- kube-runtime/src/watcher.rs | 26 +++++++++++-------- 17 files changed, 62 insertions(+), 43 deletions(-) diff --git a/kube-client/src/api/mod.rs b/kube-client/src/api/mod.rs index 020d95337..a69f9785c 100644 --- a/kube-client/src/api/mod.rs +++ b/kube-client/src/api/mod.rs @@ -60,7 +60,7 @@ pub struct Api { /// Api constructors for Resource implementors with custom DynamicTypes /// -/// This generally means resources created via [`DynamicObject`](crate::api::DynamicObject). +/// This generally means resources created via [`DynamicObject`]. impl Api { /// Cluster level resources, or resources viewed across all namespaces /// diff --git a/kube-client/src/api/subresource.rs b/kube-client/src/api/subresource.rs index c62681365..bb4feca0b 100644 --- a/kube-client/src/api/subresource.rs +++ b/kube-client/src/api/subresource.rs @@ -451,7 +451,7 @@ fn evict_path() { /// Marker trait for objects that can be evicted /// -/// See [`Api::evic`] for usage +/// See [`Api::evict`] for usage pub trait Evict {} impl Evict for k8s_openapi::api::core::v1::Pod {} diff --git a/kube-client/src/client/mod.rs b/kube-client/src/client/mod.rs index cd6c9ac9e..283458f01 100644 --- a/kube-client/src/client/mod.rs +++ b/kube-client/src/client/mod.rs @@ -88,8 +88,7 @@ pub struct Client { impl Client { /// Create a [`Client`] using a custom `Service` stack. /// - /// [`ConfigExt`](crate::client::ConfigExt) provides extensions for - /// building a custom stack. + /// [`ConfigExt`] provides extensions for building a custom stack. /// /// To create with the default stack with a [`Config`], use /// [`Client::try_from`]. diff --git a/kube-client/src/config/file_config.rs b/kube-client/src/config/file_config.rs index dd41cc5db..b947db221 100644 --- a/kube-client/src/config/file_config.rs +++ b/kube-client/src/config/file_config.rs @@ -115,7 +115,7 @@ pub struct Cluster { /// `disable_compression` allows client to opt-out of response compression for all requests to the server. /// This is useful to speed up requests (specifically lists) when client-server network bandwidth is ample, /// by saving time on compression (server-side) and decompression (client-side): - /// https://github.com/kubernetes/kubernetes/issues/112296 + /// #[serde(rename = "disable-compression")] #[serde(skip_serializing_if = "Option::is_none")] pub disable_compression: Option, @@ -550,7 +550,7 @@ impl AuthInfo { /// Connection information for auth plugins that have `provideClusterInfo` enabled. /// -/// This is a copy of [`kube::config::Cluster`] with certificate_authority passed as bytes without the path. +/// This is a copy of [`Cluster`] with certificate_authority passed as bytes without the path. /// Taken from [clientauthentication/types.go#Cluster](https://github.com/kubernetes/client-go/blob/477cb782cf024bc70b7239f0dca91e5774811950/pkg/apis/clientauthentication/types.go#L73-L129) #[derive(Clone, Debug, Serialize, Deserialize, Default)] #[serde(rename_all = "kebab-case")] diff --git a/kube-client/src/discovery/mod.rs b/kube-client/src/discovery/mod.rs index 0807e6d30..a7bcd2a97 100644 --- a/kube-client/src/discovery/mod.rs +++ b/kube-client/src/discovery/mod.rs @@ -45,7 +45,7 @@ impl DiscoveryMode { /// To make use of discovered apis, extract one or more [`ApiGroup`]s from it, /// or resolve a precise one using [`Discovery::resolve_gvk`](crate::discovery::Discovery::resolve_gvk). /// -/// If caching of results is __not required__, then a simpler [`oneshot`](crate::discovery::oneshot) discovery system can be used. +/// If caching of results is __not required__, then a simpler [`oneshot`] discovery system can be used. /// /// [`ApiGroup`]: crate::discovery::ApiGroup #[cfg_attr(docsrs, doc(cfg(feature = "client")))] diff --git a/kube-client/src/lib.rs b/kube-client/src/lib.rs index f32875c0e..d847d2df8 100644 --- a/kube-client/src/lib.rs +++ b/kube-client/src/lib.rs @@ -56,9 +56,9 @@ //! //! For more details, see: //! -//! - [`Client`](crate::client) for the extensible Kubernetes client -//! - [`Config`](crate::config) for the Kubernetes config abstraction -//! - [`Api`](crate::Api) for the generic api methods available on Kubernetes resources +//! - [`client`] for the extensible Kubernetes client +//! - [`config`] for the Kubernetes config abstraction +//! - [`Api`] for the generic api methods available on Kubernetes resources //! - [k8s-openapi](https://docs.rs/k8s-openapi) for how to create typed kubernetes objects directly #![cfg_attr(docsrs, feature(doc_cfg))] // Nightly clippy (0.1.64) considers Drop a side effect, see https://github.com/rust-lang/rust-clippy/issues/9608 diff --git a/kube-core/src/object.rs b/kube-core/src/object.rs index 1e81a947b..fb9a6d922 100644 --- a/kube-core/src/object.rs +++ b/kube-core/src/object.rs @@ -13,9 +13,9 @@ use std::borrow::Cow; /// Kubernetes' API [always seem to expose list structs in this manner](https://docs.rs/k8s-openapi/0.10.0/k8s_openapi/apimachinery/pkg/apis/meta/v1/struct.ObjectMeta.html?search=List). /// /// Note that this is only used internally within reflectors and informers, -/// and is generally produced from list/watch/delete collection queries on an [`Resource`](super::Resource). +/// and is generally produced from list/watch/delete collection queries on an [`Resource`]. /// -/// This is almost equivalent to [`k8s_openapi::List`](k8s_openapi::List), but iterable. +/// This is almost equivalent to [`k8s_openapi::List`], but iterable. #[derive(Serialize, Deserialize, Debug, Clone)] pub struct ObjectList where @@ -27,7 +27,7 @@ where /// ListMeta - only really used for its `resourceVersion` /// - /// See [ListMeta](k8s_openapi::apimachinery::pkg::apis::meta::v1::ListMeta) + /// See [ListMeta] #[serde(default)] pub metadata: ListMeta, @@ -312,7 +312,7 @@ where /// Empty struct for when data should be discarded /// /// Not using [`()`](https://doc.rust-lang.org/stable/std/primitive.unit.html), because serde's -/// [`Deserialize`](serde::Deserialize) `impl` is too strict. +/// [`Deserialize`] `impl` is too strict. #[derive(Clone, Deserialize, Serialize, Default, Debug)] pub struct NotUsed {} diff --git a/kube-core/src/params.rs b/kube-core/src/params.rs index fa508dc4e..5d9404e9e 100644 --- a/kube-core/src/params.rs +++ b/kube-core/src/params.rs @@ -168,7 +168,7 @@ impl ListParams { /// Configure typed label selectors /// - /// Configure typed selectors from [`Selector`](crate::Selector) and [`Expression`](crate::Expression) lists. + /// Configure typed selectors from [`Selector`] and [`Expression`](crate::Expression) lists. /// /// ``` /// use kube::core::{Expression, Selector, ParseExpressionError}; @@ -456,7 +456,7 @@ impl WatchParams { /// Configure typed label selectors /// - /// Configure typed selectors from [`Selector`](crate::Selector) and [`Expression`](crate::Expression) lists. + /// Configure typed selectors from [`Selector`] and [`Expression`](crate::Expression) lists. /// /// ``` /// use kube::core::{Expression, Selector, ParseExpressionError}; diff --git a/kube-derive/src/lib.rs b/kube-derive/src/lib.rs index 36b7df07c..cf44f9bf8 100644 --- a/kube-derive/src/lib.rs +++ b/kube-derive/src/lib.rs @@ -365,8 +365,8 @@ pub fn derive_custom_resource(input: proc_macro::TokenStream) -> proc_macro::Tok /// ``` /// /// The example above will generate: -/// ``` -/// // impl kube::Resource for FooMap { .. } +/// ```rust,ignore +/// impl kube::Resource for FooMap { .. } /// ``` /// [`kube`]: https://docs.rs/kube /// [`kube::Api`]: https://docs.rs/kube/*/kube/struct.Api.html diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs index c700fc20d..fa080151a 100644 --- a/kube-runtime/src/controller/mod.rs +++ b/kube-runtime/src/controller/mod.rs @@ -915,7 +915,7 @@ where /// /// This includes the core watch, as well as auxilary watches introduced by [`Self::owns`] and [`Self::watches`]. /// - /// The [`default_backoff`](crate::watcher::default_backoff) follows client-go conventions, + /// The [default backoff policy](crate::WatchStreamExt::default_backoff) follows client-go conventions, /// but can be overridden by calling this method. #[must_use] pub fn trigger_backoff(mut self, backoff_builder: impl BackoffBuilder + Clone + 'static) -> Self { diff --git a/kube-runtime/src/reflector/dispatcher.rs b/kube-runtime/src/reflector/dispatcher.rs index 1060dab2b..cf8a995cc 100644 --- a/kube-runtime/src/reflector/dispatcher.rs +++ b/kube-runtime/src/reflector/dispatcher.rs @@ -14,6 +14,8 @@ use async_broadcast::{InactiveReceiver, Receiver, Sender}; use super::Lookup; +#[cfg(doc)] pub use super::store::Writer; + #[derive(Educe)] #[educe(Debug(bound("K: Debug, K::DynamicType: Debug")), Clone)] // A helper type that holds a broadcast transmitter and a broadcast receiver, @@ -74,9 +76,9 @@ where /// A handle to a shared stream reader /// -/// [`ReflectHandle`]s are created by calling [`subscribe()`] on a [`Writer`], -/// or by calling `clone()` on an already existing [`ReflectHandle`]. Each -/// shared stream reader should be polled independently and driven to readiness +/// [`ReflectHandle`]s are created by calling [`Writer::subscribe`], +/// or by calling [`clone`](Clone::clone) on an already existing [`ReflectHandle`]. +/// Each shared stream reader should be polled independently and driven to readiness /// to avoid deadlocks. When the [`Writer`]'s buffer is filled, backpressure /// will be applied on the root stream side. /// @@ -84,8 +86,6 @@ where /// subscribed to the stream will also terminate after all events yielded by /// the root stream have been observed. This means [`ReflectHandle`] streams /// can still be polled after the root stream has been dropped. -/// -/// [`Writer`]: crate::reflector::Writer #[pin_project] pub struct ReflectHandle where diff --git a/kube-runtime/src/reflector/object_ref.rs b/kube-runtime/src/reflector/object_ref.rs index 9cfc4e028..324e71869 100644 --- a/kube-runtime/src/reflector/object_ref.rs +++ b/kube-runtime/src/reflector/object_ref.rs @@ -28,7 +28,7 @@ pub trait Lookup { /// The [version](Resource::version) for this object. fn version(dyntype: &Self::DynamicType) -> Cow<'_, str>; - /// The [apiVersion](Resource::_version) for this object. + /// The [apiVersion](Resource::api_version) for this object. fn api_version(dyntype: &Self::DynamicType) -> Cow<'_, str> { api_version_from_group_version(Self::group(dyntype), Self::version(dyntype)) } diff --git a/kube-runtime/src/scheduler.rs b/kube-runtime/src/scheduler.rs index a00435f2c..9beb1fa79 100644 --- a/kube-runtime/src/scheduler.rs +++ b/kube-runtime/src/scheduler.rs @@ -1,4 +1,4 @@ -//! Delays and deduplicates [`Stream`](futures::stream::Stream) items +//! Delays and deduplicates [`Stream`] items use futures::{stream::Fuse, Stream, StreamExt}; use hashbrown::{hash_map::RawEntryMut, HashMap}; diff --git a/kube-runtime/src/utils/backoff_reset_timer.rs b/kube-runtime/src/utils/backoff_reset_timer.rs index beadf13eb..d8268fed0 100644 --- a/kube-runtime/src/utils/backoff_reset_timer.rs +++ b/kube-runtime/src/utils/backoff_reset_timer.rs @@ -4,6 +4,8 @@ use backon::BackoffBuilder; use super::{ResettableBackoff, ResettableBackoffWrapper}; +#[cfg(doc)] use backon::Backoff; + /// Builder for [`ResetTimerBackoff`]. #[derive(Debug, Clone)] pub struct ResetTimerBackoffBuilder { diff --git a/kube-runtime/src/utils/stream_backoff.rs b/kube-runtime/src/utils/stream_backoff.rs index 2e8802b13..0946420fc 100644 --- a/kube-runtime/src/utils/stream_backoff.rs +++ b/kube-runtime/src/utils/stream_backoff.rs @@ -6,12 +6,14 @@ use tokio::time::{sleep, Instant, Sleep}; use super::ResettableBackoff; +#[cfg(doc)] use backon::Backoff; + /// Applies a [`Backoff`] policy to a [`Stream`] /// -/// After any [`Err`] is emitted, the stream is paused for [`Backoff::next_backoff`]. The -/// [`Backoff`] is [`reset`](`Backoff::reset`) on any [`Ok`] value. +/// After any [`Err`] is emitted, the stream is paused for [`Backoff::next`](Iterator::next). The +/// [`Backoff`] is [`reset`](`ResettableBackoff::reset`) on any [`Ok`] value. /// -/// If [`Backoff::next_backoff`] returns [`None`] then the backing stream is given up on, and closed. +/// If [`Backoff::next`](Iterator::next) returns [`None`] then the backing stream is given up on, and closed. #[pin_project] pub struct StreamBackoff { #[pin] diff --git a/kube-runtime/src/utils/watch_ext.rs b/kube-runtime/src/utils/watch_ext.rs index c6700ebf7..899aaa9fe 100644 --- a/kube-runtime/src/utils/watch_ext.rs +++ b/kube-runtime/src/utils/watch_ext.rs @@ -16,7 +16,10 @@ use futures::{Stream, TryStream}; use super::ResettableBackoffWrapper; -/// Extension trait for streams returned by [`watcher`](watcher()) or [`reflector`](crate::reflector::reflector) +#[cfg(doc)] use crate::watcher::DefaultBackoff; +#[cfg(doc)] use backon::Backoff; + +/// Extension trait for streams returned by [`watcher`](watcher::watcher) or [`reflector`](crate::reflector::reflector) pub trait WatchStreamExt: Stream { /// Apply the [`DefaultBackoff`] watcher [`Backoff`] policy /// @@ -40,6 +43,8 @@ pub trait WatchStreamExt: Stream { /// Decode a [`watcher()`] stream into a stream of applied objects /// /// All Added/Modified events are passed through, and critical errors bubble up. + /// + /// [`watcher()`]: crate::watcher::watcher fn applied_objects(self) -> EventDecode where Self: Stream, watcher::Error>> + Sized, @@ -50,6 +55,8 @@ pub trait WatchStreamExt: Stream { /// Decode a [`watcher()`] stream into a stream of touched objects /// /// All Added/Modified/Deleted events are passed through, and critical errors bubble up. + /// + /// [`watcher()`]: crate::watcher::watcher fn touched_objects(self) -> EventDecode where Self: Stream, watcher::Error>> + Sized, @@ -84,6 +91,8 @@ pub trait WatchStreamExt: Stream { /// # Ok(()) /// # } /// ``` + + /// [`watcher()`]: crate::watcher::watcher fn modify(self, f: F) -> EventModify where Self: Stream, watcher::Error>> + Sized, @@ -173,6 +182,7 @@ pub trait WatchStreamExt: Stream { /// ``` /// /// [`Store`]: crate::reflector::Store + /// [`watcher()`]: crate::watcher::watcher fn reflect(self, writer: Writer) -> Reflect where Self: Stream>> + Sized, @@ -187,7 +197,7 @@ pub trait WatchStreamExt: Stream { /// Returns the stream unmodified, but passes every [`watcher::Event`] /// through a [`Writer`]. This populates a [`Store`] as the stream is /// polled. When the [`watcher::Event`] is not an error or a - /// [`watcher::Event::Deleted`] then its inner object will also be + /// [`watcher::Event::Delete`] then its inner object will also be /// propagated to subscribers. /// /// Subscribers can be created by calling [`subscribe()`] on a [`Writer`]. @@ -212,7 +222,9 @@ pub trait WatchStreamExt: Stream { /// [`Store`]: crate::reflector::Store /// [`subscribe()`]: crate::reflector::store::Writer::subscribe() /// [`Stream`]: futures::stream::Stream - /// [`ReflectHandle`]: crate::reflector::dispatcher::ReflectHandle + /// [`ReflectHandle`]: crate::reflector::ReflectHandle + /// [`watcher()`]: crate::watcher::watcher + /// /// ## Usage /// ```no_run /// # use futures::StreamExt; diff --git a/kube-runtime/src/watcher.rs b/kube-runtime/src/watcher.rs index 0163af73d..472f0fc2a 100644 --- a/kube-runtime/src/watcher.rs +++ b/kube-runtime/src/watcher.rs @@ -18,6 +18,9 @@ use std::{clone::Clone, collections::VecDeque, fmt::Debug, future, time::Duratio use thiserror::Error; use tracing::{debug, error, warn}; +#[cfg(doc)] use crate::WatchStreamExt; +#[cfg(doc)] use backon::Backoff; + #[derive(Debug, Error)] pub enum Error { #[error("failed to perform initial object list: {0}")] @@ -43,26 +46,27 @@ pub enum Event { /// NOTE: This should not be used for managing persistent state elsewhere, since /// events may be lost if the watcher is unavailable. Use Finalizers instead. Delete(K), + /// The watch stream was restarted. /// - /// A series of `InitApply` events are expected to follow until all matching objects + /// A series of [`InitApply`](Event::InitApply) events will follow until all matching objects /// have been listed. This event can be used to prepare a buffer for `InitApply` events. Init, - /// Received an object during `Init`. + /// Received an object during [`Init`](Event::Init). /// - /// Objects returned here are either from the initial stream using the `StreamingList` strategy, - /// or from pages using the `ListWatch` strategy. + /// Objects returned here are either from the initial stream using the [`InitialListStrategy::StreamingList`] strategy, + /// or from pages using the [`InitialListStrategy::ListWatch`] strategy. /// /// These events can be passed up if having a complete set of objects is not a concern. - /// If you need to wait for a complete set, please buffer these events until an `InitDone`. + /// If you need to wait for a complete set, please buffer these events until an [`InitDone`](Event::InitDone). InitApply(K), /// The initialisation is complete. /// /// This can be used as a signal to replace buffered store contents atomically. - /// No more `InitApply` events will happen until the next `Init` event. + /// No more [`InitApply`](Event::InitApply) events will happen until the next [`Init`](Event::Init) event. /// - /// Any objects that were previously [`Applied`](Event::Applied) but are not listed in any of - /// the `InitApply` events should be assumed to have been [`Deleted`](Event::Deleted). + /// Any objects that were previously [`Apply`ed](Event::Apply) but are not listed in any of + /// the [`InitApply`](Event::InitApply) events should be assumed to have been [`Delete`d](Event::Delete). InitDone, } @@ -339,7 +343,7 @@ impl Config { /// Configure typed label selectors /// - /// Configure typed selectors from [`Selector`](kube_client::core::Selector) and [`Expression`](kube_client::core::Expression) lists. + /// Configure typed selectors from [`Selector`] and [`Expression`](kube_client::core::Expression) lists. /// /// ``` /// use kube_runtime::watcher::Config; @@ -748,7 +752,7 @@ where /// /// The stream will attempt to be recovered on the next poll after an [`Err`] is returned. /// This will normally happen immediately, but you can use [`StreamBackoff`](crate::utils::StreamBackoff) -/// to introduce an artificial delay. [`default_backoff`] returns a suitable default set of parameters. +/// to introduce an artificial delay. [`WatchStreamExt::default_backoff`] returns a suitable default set of parameters. /// /// If the watch connection is interrupted, then `watcher` will attempt to restart the watch using the last /// [resource version](https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes) @@ -811,7 +815,7 @@ pub fn watcher( /// /// The stream will attempt to be recovered on the next poll after an [`Err`] is returned. /// This will normally happen immediately, but you can use [`StreamBackoff`](crate::utils::StreamBackoff) -/// to introduce an artificial delay. [`default_backoff`] returns a suitable default set of parameters. +/// to introduce an artificial delay. [`WatchStreamExt::default_backoff`] returns a suitable default set of parameters. /// /// If the watch connection is interrupted, then `watcher` will attempt to restart the watch using the last /// [resource version](https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes) From ad0b754883032c7ae784801fd2ba96bb73320387 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natalie=20Klestrup=20R=C3=B6ijezon?= Date: Thu, 28 Nov 2024 17:08:42 +0100 Subject: [PATCH 6/8] Make clippy happier MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Natalie Klestrup Röijezon --- kube-runtime/src/utils/watch_ext.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kube-runtime/src/utils/watch_ext.rs b/kube-runtime/src/utils/watch_ext.rs index 899aaa9fe..a5c56abd5 100644 --- a/kube-runtime/src/utils/watch_ext.rs +++ b/kube-runtime/src/utils/watch_ext.rs @@ -91,7 +91,7 @@ pub trait WatchStreamExt: Stream { /// # Ok(()) /// # } /// ``` - + /// /// [`watcher()`]: crate::watcher::watcher fn modify(self, f: F) -> EventModify where From 4cfb119d46bbd1e4a62a262bc6b901d208fe5800 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natalie=20Klestrup=20R=C3=B6ijezon?= Date: Thu, 28 Nov 2024 17:18:25 +0100 Subject: [PATCH 7/8] Add doccomment for test_ui to satisfy tarpaulin MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Natalie Klestrup Röijezon --- kube-derive/tests/test_ui.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/kube-derive/tests/test_ui.rs b/kube-derive/tests/test_ui.rs index 446d88c3d..536f6222f 100644 --- a/kube-derive/tests/test_ui.rs +++ b/kube-derive/tests/test_ui.rs @@ -1,3 +1,5 @@ +/// Runs UI tests using [`trybuild`]. + // Test that `kube-derive` outputs helpful error messages. // If you make a change, remove `tests/ui/*.stderr` and run `cargo test`. // Then copy the files that appear under `wip/` if it's what you expected. From 583ac916075f7bf7cb7b6016abecd2e7bd22d065 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natalie=20Klestrup=20R=C3=B6ijezon?= Date: Thu, 28 Nov 2024 18:19:51 +0100 Subject: [PATCH 8/8] Silence doc errors for tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This seems to have been added in Rust 1.83 that was just released, and is only triggered by tarpaulin? Signed-off-by: Natalie Klestrup Röijezon --- kube-derive/tests/crd_enum_test.rs | 2 ++ kube-derive/tests/crd_schema_test.rs | 1 + kube-derive/tests/resource.rs | 2 ++ kube-derive/tests/test_ui.rs | 2 +- 4 files changed, 6 insertions(+), 1 deletion(-) diff --git a/kube-derive/tests/crd_enum_test.rs b/kube-derive/tests/crd_enum_test.rs index 8c37fb1db..cc14148cb 100644 --- a/kube-derive/tests/crd_enum_test.rs +++ b/kube-derive/tests/crd_enum_test.rs @@ -1,3 +1,5 @@ +#![allow(missing_docs)] + use kube_derive::CustomResource; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; diff --git a/kube-derive/tests/crd_schema_test.rs b/kube-derive/tests/crd_schema_test.rs index e975d8ff3..3affbf8cb 100644 --- a/kube-derive/tests/crd_schema_test.rs +++ b/kube-derive/tests/crd_schema_test.rs @@ -1,3 +1,4 @@ +#![allow(missing_docs)] #![recursion_limit = "256"] use assert_json_diff::assert_json_eq; diff --git a/kube-derive/tests/resource.rs b/kube-derive/tests/resource.rs index 5d9951bba..545e55292 100644 --- a/kube-derive/tests/resource.rs +++ b/kube-derive/tests/resource.rs @@ -1,3 +1,5 @@ +#![allow(missing_docs)] + use k8s_openapi::{ api::core::v1::{ConfigMap, Secret}, ByteString, diff --git a/kube-derive/tests/test_ui.rs b/kube-derive/tests/test_ui.rs index 536f6222f..068191260 100644 --- a/kube-derive/tests/test_ui.rs +++ b/kube-derive/tests/test_ui.rs @@ -1,4 +1,4 @@ -/// Runs UI tests using [`trybuild`]. +#![allow(missing_docs)] // Test that `kube-derive` outputs helpful error messages. // If you make a change, remove `tests/ui/*.stderr` and run `cargo test`.