Skip to content

Commit

Permalink
Initial migration backoff -> backon
Browse files Browse the repository at this point in the history
Fixes kube-rs#1635

Signed-off-by: Natalie Klestrup Röijezon <[email protected]>
  • Loading branch information
nightkr committed Nov 28, 2024
1 parent 3ee4ae5 commit 5b949c8
Show file tree
Hide file tree
Showing 16 changed files with 254 additions and 123 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ assert-json-diff = "2.0.2"
async-broadcast = "0.7.0"
async-stream = "0.3.5"
async-trait = "0.1.64"
backoff = "0.4.0"
backon = "1.3.0"
base64 = "0.22.1"
bytes = "1.1.0"
chrono = { version = "0.4.34", default-features = false }
Expand Down
4 changes: 1 addition & 3 deletions deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@ db-urls = ["https://github.com/rustsec/advisory-db"]
# remove them when we have to
yanked = "warn"

ignore = [
"RUSTSEC-2024-0384", # instant dep via unmaintained backoff dep
]
ignore = []

[licenses]
# See https://spdx.org/licenses/ for list of possible licenses
Expand Down
2 changes: 1 addition & 1 deletion examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ tower-http = { workspace = true, features = ["trace", "decompression-gzip"] }
hyper = { workspace = true, features = ["client", "http1"] }
hyper-util = { workspace = true, features = ["client-legacy", "http1", "tokio"] }
thiserror.workspace = true
backoff.workspace = true
backon.workspace = true
clap = { version = "4.0", default-features = false, features = ["std", "cargo", "derive"] }
edit = "0.1.3"
tokio-stream = { version = "0.1.9", features = ["net"] }
Expand Down
2 changes: 2 additions & 0 deletions examples/errorbounded_configmap_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ use tracing::*;
#[resource(inherit = ConfigMap)]
struct CaConfigMap {
metadata: ObjectMeta,
#[allow(unused)]
data: CaConfigMapData,
}

#[derive(Deserialize, Debug, Clone)]
struct CaConfigMapData {
#[serde(rename = "ca.crt")]
#[allow(unused)]
ca_crt: String,
}

Expand Down
4 changes: 2 additions & 2 deletions examples/shared_stream_controllers.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{ops::Deref, sync::Arc, time::Duration};

use futures::{future, StreamExt};
use futures::StreamExt;
use k8s_openapi::api::{apps::v1::Deployment, core::v1::Pod};
use kube::{
runtime::{
Expand Down Expand Up @@ -63,7 +63,7 @@ async fn main() -> anyhow::Result<()> {
.clone()
.map(|r| Ok(r.deref().clone()))
.predicate_filter(predicates::resource_version)
.filter_map(|r| future::ready(r.ok().map(Arc::new)));
.filter_map(|r| std::future::ready(r.ok().map(Arc::new)));

// Reflect a stream of pod watch events into the store and apply a backoff. For subscribers to
// be able to consume updates, the reflector must be shared.
Expand Down
2 changes: 1 addition & 1 deletion kube-client/src/client/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ mod tests {
let io: TokioIo<TcpStream> = TokioIo::new(tcp);

tokio::spawn(async move {
let _ = http1::Builder::new()
http1::Builder::new()
.timer(TokioTimer::new())
.serve_connection(
io,
Expand Down
7 changes: 1 addition & 6 deletions kube-client/src/client/client_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,12 +487,7 @@ mod test {

// Fetch using local object reference
let svc: Service = client
.fetch(
&LocalObjectReference {
name: svc.name_any().into(),
}
.within(svc.namespace()),
)
.fetch(&LocalObjectReference { name: svc.name_any() }.within(svc.namespace()))
.await?;
assert_eq!(svc.name_unchecked(), "kubernetes");

Expand Down
4 changes: 4 additions & 0 deletions kube-derive/tests/resource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,27 @@ use kube_derive::Resource;
#[resource(inherit = "ConfigMap")]
struct TypedMap {
metadata: ObjectMeta,
#[allow(unused)]
data: Option<TypedData>,
}

#[derive(Default)]
struct TypedData {
#[allow(unused)]
field: String,
}

#[derive(Resource, Default)]
#[resource(inherit = "Secret")]
struct TypedSecret {
metadata: ObjectMeta,
#[allow(unused)]
data: Option<TypedSecretData>,
}

#[derive(Default)]
struct TypedSecretData {
#[allow(unused)]
field: ByteString,
}

Expand Down
2 changes: 1 addition & 1 deletion kube-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ json-patch.workspace = true
jsonptr.workspace = true
serde_json.workspace = true
thiserror.workspace = true
backoff.workspace = true
backon.workspace = true
async-trait.workspace = true
hashbrown.workspace = true
k8s-openapi.workspace = true
Expand Down
21 changes: 12 additions & 9 deletions kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ use crate::{
ObjectRef,
},
scheduler::{debounced_scheduler, ScheduleRequest},
utils::{trystream_try_via, CancelableJoinHandle, KubeRuntimeStreamExt, StreamBackoff, WatchStreamExt},
watcher::{self, metadata_watcher, watcher, DefaultBackoff},
utils::{
trystream_try_via, CancelableJoinHandle, KubeRuntimeStreamExt, ResettableBackoff,
ResettableBackoffWrapper, StreamBackoff, WatchStreamExt,
},
watcher::{self, metadata_watcher, watcher, DefaultBackoffBuilder},
};
use backoff::backoff::Backoff;
use backon::BackoffBuilder;
use educe::Educe;
use futures::{
channel,
Expand Down Expand Up @@ -629,7 +632,7 @@ where
{
// NB: Need to Unpin for stream::select_all
trigger_selector: stream::SelectAll<BoxStream<'static, Result<ReconcileRequest<K>, watcher::Error>>>,
trigger_backoff: Box<dyn Backoff + Send>,
trigger_backoff: Box<dyn ResettableBackoff + Send>,
/// [`run`](crate::Controller::run) starts a graceful shutdown when any of these [`Future`]s complete,
/// refusing to start any new reconciliations but letting any existing ones finish.
graceful_shutdown_selector: Vec<BoxFuture<'static, ()>>,
Expand Down Expand Up @@ -689,7 +692,7 @@ where
trigger_selector.push(self_watcher);
Self {
trigger_selector,
trigger_backoff: Box::<DefaultBackoff>::default(),
trigger_backoff: Box::<ResettableBackoffWrapper<DefaultBackoffBuilder>>::default(),
graceful_shutdown_selector: vec![
// Fallback future, ensuring that we never terminate if no additional futures are added to the selector
future::pending().boxed(),
Expand Down Expand Up @@ -775,7 +778,7 @@ where
trigger_selector.push(self_watcher);
Self {
trigger_selector,
trigger_backoff: Box::<DefaultBackoff>::default(),
trigger_backoff: Box::<ResettableBackoffWrapper<DefaultBackoffBuilder>>::default(),
graceful_shutdown_selector: vec![
// Fallback future, ensuring that we never terminate if no additional futures are added to the selector
future::pending().boxed(),
Expand Down Expand Up @@ -886,7 +889,7 @@ where
trigger_selector.push(self_watcher);
Self {
trigger_selector,
trigger_backoff: Box::<DefaultBackoff>::default(),
trigger_backoff: Box::<ResettableBackoffWrapper<DefaultBackoffBuilder>>::default(),
graceful_shutdown_selector: vec![
// Fallback future, ensuring that we never terminate if no additional futures are added to the selector
future::pending().boxed(),
Expand Down Expand Up @@ -915,8 +918,8 @@ where
/// The [`default_backoff`](crate::watcher::default_backoff) follows client-go conventions,
/// but can be overridden by calling this method.
#[must_use]
pub fn trigger_backoff(mut self, backoff: impl Backoff + Send + 'static) -> Self {
self.trigger_backoff = Box::new(backoff);
pub fn trigger_backoff(mut self, backoff_builder: impl BackoffBuilder + Clone + 'static) -> Self {
self.trigger_backoff = Box::new(ResettableBackoffWrapper::new(backoff_builder));
self
}

Expand Down
114 changes: 72 additions & 42 deletions kube-runtime/src/utils/backoff_reset_timer.rs
Original file line number Diff line number Diff line change
@@ -1,86 +1,116 @@
use std::time::{Duration, Instant};

use backoff::{backoff::Backoff, Clock, SystemClock};
use backon::BackoffBuilder;

/// A [`Backoff`] wrapper that resets after a fixed duration has elapsed.
pub struct ResetTimerBackoff<B, C = SystemClock> {
backoff: B,
clock: C,
last_backoff: Option<Instant>,
reset_duration: Duration,
use super::{ResettableBackoff, ResettableBackoffWrapper};

// TODO: do we actually need this or should we just use tokio::time?
pub trait Clock: Send + Sync + Unpin {
fn now(&self) -> Instant;
}

#[derive(Debug, Clone, Copy)]
pub struct TokioClock;
impl Clock for TokioClock {
fn now(&self) -> Instant {
tokio::time::Instant::now().into_std()
}
}

impl<B: Backoff> ResetTimerBackoff<B> {
pub fn new(backoff: B, reset_duration: Duration) -> Self {
Self::new_with_custom_clock(backoff, reset_duration, SystemClock {})
impl<B: BackoffBuilder> ResetTimerBackoffBuilder<B> {
pub fn new(inner_backoff_builder: B, reset_duration: Duration) -> Self {
Self::new_with_custom_clock(inner_backoff_builder, reset_duration, TokioClock)
}
}

impl<B: Backoff, C: Clock> ResetTimerBackoff<B, C> {
fn new_with_custom_clock(backoff: B, reset_duration: Duration, clock: C) -> Self {
impl<B: BackoffBuilder, C: Clock> ResetTimerBackoffBuilder<B, C> {
fn new_with_custom_clock(inner_backoff_builder: B, reset_duration: Duration, clock: C) -> Self {
Self {
backoff,
inner_backoff_builder,
clock,
last_backoff: None,
reset_duration,
}
}
}

impl<B: Backoff, C: Clock> Backoff for ResetTimerBackoff<B, C> {
fn next_backoff(&mut self) -> Option<Duration> {
/// A [`Backoff`] wrapper that resets after a fixed duration has elapsed.
#[derive(Debug, Clone)]
pub struct ResetTimerBackoffBuilder<B, C = TokioClock> {
inner_backoff_builder: B,
clock: C,
reset_duration: Duration,
}

impl<B: BackoffBuilder + Clone, C: Clock> BackoffBuilder for ResetTimerBackoffBuilder<B, C> {
type Backoff = ResetTimerBackoff<ResettableBackoffWrapper<B>, C>;

fn build(self) -> Self::Backoff {
ResetTimerBackoff {
inner_backoff: ResettableBackoffWrapper::new(self.inner_backoff_builder),
clock: self.clock,
reset_duration: self.reset_duration,
last_backoff: None,
}
}
}

/// Constructed by [`ResetTimerBackoffBuilder`].
#[derive(Debug)]
pub struct ResetTimerBackoff<B, C = TokioClock> {
inner_backoff: B,
clock: C,
reset_duration: Duration,
last_backoff: Option<Instant>,
}

// impl Backoff, which is now effectively an alias for Iterator<Item = Duration>
impl<B: ResettableBackoff, C: Clock> Iterator for ResetTimerBackoff<B, C> {
type Item = Duration;

fn next(&mut self) -> Option<Duration> {
if let Some(last_backoff) = self.last_backoff {
if self.clock.now() > last_backoff + self.reset_duration {
tracing::debug!(
?last_backoff,
reset_duration = ?self.reset_duration,
"Resetting backoff, since reset duration has expired"
);
self.backoff.reset();
self.inner_backoff.reset();
}
}
self.last_backoff = Some(self.clock.now());
self.backoff.next_backoff()
}

fn reset(&mut self) {
// Do not even bother trying to reset here, since `next_backoff` will take care of this when the timer expires.
self.inner_backoff.next()
}
}

#[cfg(test)]
mod tests {
use backoff::{backoff::Backoff, Clock};
use backon::BackoffBuilder;
use tokio::time::advance;

use super::ResetTimerBackoff;
use crate::utils::stream_backoff::tests::LinearBackoff;
use std::time::{Duration, Instant};
use crate::utils::{
backoff_reset_timer::TokioClock, stream_backoff::tests::LinearBackoffBuilder,
ResetTimerBackoffBuilder,
};
use std::time::Duration;

#[tokio::test]
async fn should_reset_when_timer_expires() {
tokio::time::pause();
let mut backoff = ResetTimerBackoff::new_with_custom_clock(
LinearBackoff::new(Duration::from_secs(2)),
let mut backoff = ResetTimerBackoffBuilder::new_with_custom_clock(
LinearBackoffBuilder::new(Duration::from_secs(2)),
Duration::from_secs(60),
TokioClock,
);
assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(2)));
)
.build();
assert_eq!(backoff.next(), Some(Duration::from_secs(2)));
advance(Duration::from_secs(40)).await;
assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(4)));
assert_eq!(backoff.next(), Some(Duration::from_secs(4)));
advance(Duration::from_secs(40)).await;
assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(6)));
assert_eq!(backoff.next(), Some(Duration::from_secs(6)));
advance(Duration::from_secs(80)).await;
assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(2)));
assert_eq!(backoff.next(), Some(Duration::from_secs(2)));
advance(Duration::from_secs(80)).await;
assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(2)));
}

struct TokioClock;

impl Clock for TokioClock {
fn now(&self) -> Instant {
tokio::time::Instant::now().into_std()
}
assert_eq!(backoff.next(), Some(Duration::from_secs(2)));
}
}
55 changes: 55 additions & 0 deletions kube-runtime/src/utils/backoff_resettable.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use std::{ops::DerefMut, time::Duration};

use backon::{Backoff, BackoffBuilder};

/// A [`Backoff`] that can also be reset.
///
/// Implemented by [`ResettableBackoffWrapper`].
// Separated into a trait so that it can be used as a trait object, erasing the backing [`BackoffBuilder`].
pub trait ResettableBackoff: Backoff {
fn reset(&mut self);
}

impl ResettableBackoff for Box<dyn ResettableBackoff + Send> {
fn reset(&mut self) {
Box::deref_mut(self).reset();
}
}

/// Implements [`ResettableBackoff`] by reconstructing the backing [`Backoff`] each time [`Self::reset`] has been called.
#[derive(Debug)]
pub struct ResettableBackoffWrapper<B: BackoffBuilder> {
backoff_builder: B,
current_backoff: Option<B::Backoff>,
}

impl<B: BackoffBuilder> ResettableBackoffWrapper<B> {
pub fn new(backoff_builder: B) -> Self {
Self {
backoff_builder,
current_backoff: None,
}
}
}

impl<B: BackoffBuilder + Default> Default for ResettableBackoffWrapper<B> {
fn default() -> Self {
Self::new(B::default())
}
}

impl<B: BackoffBuilder + Clone> Iterator for ResettableBackoffWrapper<B> {
type Item = Duration;

fn next(&mut self) -> Option<Self::Item> {
self.current_backoff
.get_or_insert_with(|| self.backoff_builder.clone().build())
.next()
}
}

impl<B: BackoffBuilder + Clone> ResettableBackoff for ResettableBackoffWrapper<B> {
fn reset(&mut self) {
self.current_backoff = None;
}
}
Loading

0 comments on commit 5b949c8

Please sign in to comment.