Skip to content
This repository has been archived by the owner on Mar 4, 2022. It is now read-only.

Commit

Permalink
apply clippy + rustfmt
Browse files Browse the repository at this point in the history
  • Loading branch information
auguwu committed Dec 4, 2021
1 parent 7aa7a44 commit c2a3809
Showing 1 changed file with 34 additions and 16 deletions.
50 changes: 34 additions & 16 deletions src/kube.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{config::KanataConfig, etcd::Etcd};
use k8s_openapi::api::core::v1::Pod;
use kube::{api::ListParams, api::WatchEvent, Api, Client, ResourceExt, Resource};
use kube::{api::ListParams, api::WatchEvent, Api, Client, Resource, ResourceExt};
use rocket::futures::{StreamExt, TryStreamExt};
use std::{collections::HashMap, ops::Deref};

Expand All @@ -26,7 +26,7 @@ pub struct Kubernetes {
/// from Etcd. This can be easily overrided with the `KANATA_DISABLE_ETCD_RUN` environment
/// variable if you wish to keep refreshing **pod states** with possible garbage data that
/// isn't accurate.
first_run: bool
first_run: bool,
}

impl Kubernetes {
Expand All @@ -41,13 +41,20 @@ impl Kubernetes {
})
}

pub async fn update_pod_state(&mut self, name: &String, phase: String, config: Box<&KanataConfig>) {
pub async fn update_pod_state(
&mut self,
name: &String,
phase: String,
config: &KanataConfig,
) {
let old_phase = self.pod_states.get(name);
let config = Box::clone(&config);

if let Some(old) = old_phase {
if &phase != old {
info!("Pod {} has updated from phase \"{}\" => \"{}\"", name, old, phase);
info!(
"Pod {} has updated from phase \"{}\" => \"{}\"",
name, old, phase
);

// update in-memory cache
self.pod_states.insert(name.deref().to_string(), phase);
Expand All @@ -59,7 +66,11 @@ impl Kubernetes {
}
}
} else {
info!("Pod {} was not cached in-memory, phase is now {}.", name, phase);
info!(
"Pod {} was not cached in-memory, phase is now {}.",
name, phase
);

self.pod_states.insert(name.deref().to_string(), phase);
}
}
Expand Down Expand Up @@ -94,11 +105,7 @@ impl Kubernetes {
if kube.first_run {
info!("this is the first run, populating in-memory cache...");

let pod_states = etcd_client
.clone()
.get("kanata/pods", None)
.await
.ok();
let pod_states = etcd_client.clone().get("kanata/pods", None).await.ok();

if let Some(current_state) = pod_states {
let header = current_state.header();
Expand All @@ -119,12 +126,18 @@ impl Kubernetes {
if let Some(name) = &metadata.name {
info!("found pod {} from iter index #{}", name, index);
} else {
info!("skipping on unknown pod (no name available) | index from iter: {}", index);
info!(
"skipping on unknown pod (no name available) | index from iter: {}",
index
);
}
}
} else {
warn!("missing etcd pod table (assuming first installation), creating...");
etcd_client.put("kanata/pods", "{}", None).await.expect("unable to create pod table.");
etcd_client
.put("kanata/pods", "{}", None)
.await
.expect("unable to create pod table.");
}

// Check the `first_run` property in Kubernetes struct
Expand All @@ -145,7 +158,8 @@ impl Kubernetes {
let phase = status.phase.clone().unwrap_or_default();
let pod_name = pod.name();

self.update_pod_state(&pod_name, phase, Box::new(config)).await;
self.update_pod_state(&pod_name, phase, config)
.await;
}

WatchEvent::Deleted(pod) => {
Expand All @@ -161,8 +175,12 @@ impl Kubernetes {

info!("stream closed, storing old state in etcd...");

let serialized_state = serde_json::to_string(&kube.pod_states).expect("unable to serialize pod state");
etcd_client.put("kanata/pods", serialized_state, None).await.expect("unable to put kanata/pods into etcd");
let serialized_state =
serde_json::to_string(&kube.pod_states).expect("unable to serialize pod state");
etcd_client
.put("kanata/pods", serialized_state, None)
.await
.expect("unable to put kanata/pods into etcd");

Ok(())
}
Expand Down

0 comments on commit c2a3809

Please sign in to comment.