diff --git a/agent/Cargo.lock b/agent/Cargo.lock index 3548be11c3f..5b9f3a59d95 100644 --- a/agent/Cargo.lock +++ b/agent/Cargo.lock @@ -26,17 +26,6 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" -[[package]] -name = "ahash" -version = "0.7.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47" -dependencies = [ - "getrandom", - "once_cell", - "version_check", -] - [[package]] name = "ahash" version = "0.8.3" @@ -417,7 +406,7 @@ version = "2.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "58da0ae1e701ea752cc46c1bb9f39d5ecefc7395c3ecd526261a566d4f16e0c2" dependencies = [ - "ahash 0.8.3", + "ahash", "base64 0.13.1", "bitvec", "hex", @@ -1005,7 +994,7 @@ dependencies = [ name = "deepflow-agent" version = "0.1.0" dependencies = [ - "ahash 0.8.3", + "ahash", "anyhow", "arc-swap", "base64 0.21.2", @@ -1042,9 +1031,9 @@ dependencies = [ "integration_skywalking", "ipnet", "ipnetwork", - "k8s-openapi", + "k8s-openapi 0.16.0", "kube", - "kube-derive 0.74.0 (registry+https://github.com/rust-lang/crates.io-index)", + "kube-derive", "l7", "lazy_static", "libc", @@ -1667,7 +1656,7 @@ version = "0.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e" dependencies = [ - "ahash 0.8.3", + "ahash", ] [[package]] @@ -2181,12 +2170,13 @@ dependencies = [ [[package]] name = "json-patch" -version = "0.2.7" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb3fa5a61630976fc4c353c70297f2e93f1930e3ccee574d59d618ccbd5154ce" +checksum = "e712e62827c382a77b87f590532febb1f8b2fdbc3eefa1ee37fe7281687075ef" dependencies = [ "serde", "serde_json", + "thiserror", "treediff", ] @@ -2213,9 +2203,26 @@ dependencies = [ [[package]] name = "k8s-openapi" -version = "0.15.0" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bcc1f973542059e6d5a6d63de6a9539d0ec784f82b2327f3c1915d33200bc6a4" +dependencies = [ + "base64 0.13.1", + "bytes 1.4.0", + "chrono", + "http 0.2.9", + "percent-encoding", + "serde", + "serde-value", + "serde_json", + "url", +] + +[[package]] +name = "k8s-openapi" +version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2ae2c04fcee6b01b04e3aadd56bb418932c8e0a9d8a93f48bc68c6bdcdb559d" +checksum = "6d9455388f4977de4d0934efa9f7d36296295537d774574113a20f6082de03da" dependencies = [ "base64 0.13.1", "bytes 1.4.0", @@ -2231,20 +2238,22 @@ dependencies = [ [[package]] name = "kube" -version = "0.74.0" -source = "git+https://github.com/deepflowio/kube?tag=0.74.2#c423734b258393beb1019bd6896ea5b9f7df5e36" +version = "0.77.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ba77b857a9581e3d1cb1165f9cb1d1732d65ce52642498addae8fa2c6d5e037" dependencies = [ - "k8s-openapi", + "k8s-openapi 0.16.0", "kube-client", "kube-core", - "kube-derive 0.74.0 (git+https://github.com/deepflowio/kube?tag=0.74.2)", + "kube-derive", "kube-runtime", ] [[package]] name = "kube-client" -version = "0.74.0" -source = "git+https://github.com/deepflowio/kube?tag=0.74.2#c423734b258393beb1019bd6896ea5b9f7df5e36" +version = "0.77.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e80db3ca107e89da5f7eae37ea5274e06cefdcf9689d0ebd5ec3575a6f983e4e" dependencies = [ "base64 0.13.1", "bytes 1.4.0", @@ -2258,7 +2267,7 @@ dependencies = [ "hyper-rustls 0.23.2", "hyper-timeout", "jsonpath_lib", - "k8s-openapi", + "k8s-openapi 0.16.0", "kube-core", "pem", "pin-project", @@ -2278,14 +2287,15 @@ dependencies = [ [[package]] name = "kube-core" -version = "0.74.0" -source = "git+https://github.com/deepflowio/kube?tag=0.74.2#c423734b258393beb1019bd6896ea5b9f7df5e36" +version = "0.77.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fce686d2fbdaf6cb18d19cdb0692e9485dd9945f79f944b8772bdb2a07e8d39d" dependencies = [ "chrono", "form_urlencoded", "http 0.2.9", "json-patch", - "k8s-openapi", + "k8s-openapi 0.16.0", "once_cell", "schemars", "serde", @@ -2295,21 +2305,9 @@ dependencies = [ [[package]] name = "kube-derive" -version = "0.74.0" +version = "0.77.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "66d74121eb41af4480052901f31142d8d9bbdf1b7c6b856da43bcb02f5b1b177" -dependencies = [ - "darling", - "proc-macro2", - "quote", - "serde_json", - "syn 1.0.109", -] - -[[package]] -name = "kube-derive" -version = "0.74.0" -source = "git+https://github.com/deepflowio/kube?tag=0.74.2#c423734b258393beb1019bd6896ea5b9f7df5e36" +checksum = "93ef49d30d03c5de8041e2ab5dc421d671d6225ffd53975571d4a5b18d5e50fb" dependencies = [ "darling", "proc-macro2", @@ -2320,15 +2318,16 @@ dependencies = [ [[package]] name = "kube-runtime" -version = "0.74.0" -source = "git+https://github.com/deepflowio/kube?tag=0.74.2#c423734b258393beb1019bd6896ea5b9f7df5e36" +version = "0.77.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acc59ede459fd8e944ab1e6ff798aca83188b08aeb44e8c3d6f028db2d74233c" dependencies = [ - "ahash 0.7.6", + "ahash", "backoff", "derivative", "futures", "json-patch", - "k8s-openapi", + "k8s-openapi 0.16.0", "kube-client", "parking_lot 0.12.1", "pin-project", @@ -2738,13 +2737,14 @@ checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" [[package]] name = "openshift-openapi" version = "0.3.1" -source = "git+https://github.com/deepflowio/openshift-openapi.git#6498131f5f5ef80411a0aee2e7e9ba632bfdb406" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "405ddcb6c72b1becd95b576a9d136e7ad9ef400f7c464aaf38271c1637edc526" dependencies = [ "base64 0.12.3", "bytes 0.5.6", "chrono", "http 0.2.9", - "k8s-openapi", + "k8s-openapi 0.11.0", "percent-encoding", "serde", "serde-value", @@ -3235,7 +3235,7 @@ dependencies = [ "flate2", "futures", "ipnet", - "k8s-openapi", + "k8s-openapi 0.16.0", "kube", "libc", "log 0.4.22", @@ -4387,7 +4387,7 @@ checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" name = "trace-utils" version = "0.1.0" dependencies = [ - "ahash 0.8.3", + "ahash", "btf-rs", "cc", "env_logger 0.11.5", @@ -4444,9 +4444,9 @@ dependencies = [ [[package]] name = "treediff" -version = "3.0.2" +version = "4.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "761e8d5ad7ce14bb82b7e61ccc0ca961005a275a060b9644a2431aa11553c2ff" +checksum = "4d127780145176e2b5d16611cc25a900150e86e9fd79d3bde6ff3a37359c9cb5" dependencies = [ "serde_json", ] diff --git a/agent/Cargo.toml b/agent/Cargo.toml index 7f18178e94a..7cb5bcf39f2 100644 --- a/agent/Cargo.toml +++ b/agent/Cargo.toml @@ -14,6 +14,8 @@ exclude = [ ] [workspace.dependencies] +k8s-openapi = { version = "0.16", features = ["v1_19", "schemars"] } +kube = { version = "0.77", default-features = false } prost = "0.11" [dependencies] @@ -108,22 +110,18 @@ procfs = { git = "https://github.com/deepflowio/procfs/" } reorder = { path = "plugins/reorder" } [target.'cfg(target_os = "linux")'.dependencies] -k8s-openapi = { version = "^0.15", features = ["v1_19", "schemars"] } -kube = { version = "0.74", default-features = false, features = [ +k8s-openapi.workspace = true +kube = { workspace = true, features = [ "client", "derive", "runtime", "rustls-tls", ] } -kube-derive = "0.74" -openshift-openapi = { version = "0.3.1", features = ["v4_6"] } +kube-derive = "0.77" +openshift-openapi = { version = "0.3.1", features = ["v4_5"] } schemars = "0.8" trace-utils = { path = "crates/trace-utils" } -[patch.crates-io] -kube = { git = "https://github.com/deepflowio/kube", tag = "0.74.2" } -openshift-openapi = { git = "https://github.com/deepflowio/openshift-openapi.git" } - [target.'cfg(target_os = "windows")'.dependencies] pcap = "0.10.1" winapi = { version = "0.3.9", features = [ diff --git a/agent/crates/public/Cargo.toml b/agent/crates/public/Cargo.toml index 7ec1d92b370..016eeebb16c 100644 --- a/agent/crates/public/Cargo.toml +++ b/agent/crates/public/Cargo.toml @@ -28,8 +28,8 @@ thiserror = "1.0" tonic = "0.8.1" [target.'cfg(any(target_os = "linux", target_os = "android"))'.dependencies] -k8s-openapi = { version = "^0.15", features = ["v1_19", "schemars"] } -kube = { version = "0.74", default-features = false, features = ["client"] } +k8s-openapi.workspace = true +kube = { workspace = true, features = ["client"] } neli = "0.6.4" nix = "0.23" diff --git a/agent/src/platform/kubernetes/resource_watcher.rs b/agent/src/platform/kubernetes/resource_watcher.rs index 222618018d7..ab51cb69e86 100644 --- a/agent/src/platform/kubernetes/resource_watcher.rs +++ b/agent/src/platform/kubernetes/resource_watcher.rs @@ -41,7 +41,10 @@ use k8s_openapi::{ }, extensions, networking, }, - apimachinery::pkg::apis::meta::v1::ObjectMeta, + apimachinery::pkg::apis::meta::v1::{ + FieldsV1, ManagedFieldsEntry, ObjectMeta, OwnerReference, Time, + }, + Metadata, }; use kube::{ api::{ListParams, WatchEvent}, @@ -49,7 +52,6 @@ use kube::{ Api, Client, Error as ClientErr, Resource as KubeResource, ResourceExt, }; use log::{debug, info, trace, warn}; -use openshift_openapi::api::route::v1::Route; use serde::de::DeserializeOwned; use serde::ser::Serialize; use tokio::{runtime::Handle, sync::Mutex, task::JoinHandle, time}; @@ -78,6 +80,121 @@ pub trait Watcher { fn ready(&self) -> bool; } +#[derive(Clone, Debug)] +pub struct Route { + metadata: ObjectMeta, + inner: openshift_openapi::api::route::v1::Route, +} + +impl Metadata for Route { + type Ty = ObjectMeta; + + fn metadata(&self) -> &::Ty { + &self.metadata + } + + fn metadata_mut(&mut self) -> &mut ::Ty { + &mut self.metadata + } +} + +impl k8s_openapi::Resource for Route { + type Scope = k8s_openapi::NamespaceResourceScope; + + const API_VERSION: &'static str = "route.openshift.io/v1"; + const GROUP: &'static str = "route.openshift.io"; + const KIND: &'static str = "Route"; + const VERSION: &'static str = "v1"; + const URL_PATH_SEGMENT: &'static str = "routes"; +} + +impl serde::Serialize for Route { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut state = serializer.serialize_struct(::KIND, 5)?; + serde::ser::SerializeStruct::serialize_field( + &mut state, + "apiVersion", + ::API_VERSION, + )?; + serde::ser::SerializeStruct::serialize_field( + &mut state, + "kind", + ::KIND, + )?; + serde::ser::SerializeStruct::serialize_field(&mut state, "metadata", &self.metadata)?; + serde::ser::SerializeStruct::serialize_field(&mut state, "spec", &self.inner.spec)?; + serde::ser::SerializeStruct::serialize_field(&mut state, "status", &self.inner.status)?; + serde::ser::SerializeStruct::end(state) + } +} + +impl<'de> serde::Deserialize<'de> for Route { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let mut os_route = openshift_openapi::api::route::v1::Route::deserialize(deserializer)?; + Ok(Route { + metadata: ObjectMeta { + annotations: os_route.metadata.annotations.take(), + creation_timestamp: os_route + .metadata + .creation_timestamp + .take() + .map(|t| Time(t.0)), + deletion_grace_period_seconds: os_route + .metadata + .deletion_grace_period_seconds + .take(), + deletion_timestamp: os_route + .metadata + .deletion_timestamp + .take() + .map(|t| Time(t.0)), + finalizers: os_route.metadata.finalizers.take(), + generate_name: os_route.metadata.generate_name.take(), + generation: os_route.metadata.generation.take(), + labels: os_route.metadata.labels.take(), + managed_fields: os_route.metadata.managed_fields.take().map(|fs| { + fs.into_iter() + .map(|f| ManagedFieldsEntry { + api_version: f.api_version, + fields_type: f.fields_type, + fields_v1: f.fields_v1.map(|f| FieldsV1(f.0)), + manager: f.manager, + operation: f.operation, + time: f.time.map(|t| Time(t.0)), + ..Default::default() + }) + .collect() + }), + name: os_route.metadata.name.take(), + namespace: os_route.metadata.namespace.take(), + owner_references: os_route.metadata.owner_references.take().map(|rs| { + rs.into_iter() + .map(|r| OwnerReference { + api_version: r.api_version, + block_owner_deletion: r.block_owner_deletion, + controller: r.controller, + kind: r.kind, + name: r.name, + uid: r.uid, + }) + .collect() + }), + resource_version: os_route.metadata.resource_version.take(), + self_link: os_route.metadata.self_link.take(), + uid: os_route.metadata.uid.take(), + ..Default::default() + }, + inner: os_route, + }) + } +} + #[enum_dispatch(Watcher)] #[derive(Clone)] pub enum GenericResourceWatcher { @@ -1089,7 +1206,7 @@ impl Trimmable for Route { namespace: self.metadata.namespace.take(), ..Default::default() }; - self.status = Default::default(); + self.inner.status = Default::default(); self } } @@ -1223,22 +1340,53 @@ impl ResourceWatcherFactory { } } - fn new_watcher_inner( + fn new_cluster_resource( &self, kind: Resource, stats_collector: &stats::Collector, - namespace: Option<&str>, config: &WatcherConfig, ) -> ResourceWatcher where - K: Clone + Debug + DeserializeOwned + KubeResource + Serialize + Trimmable, + K: Clone + + Debug + + DeserializeOwned + + KubeResource + + Serialize + + Trimmable, ::DynamicType: Default, { let watcher = ResourceWatcher::new( - match namespace { - Some(namespace) => Api::namespaced(self.client.clone(), namespace), - None => Api::all(self.client.clone()), - }, + Api::all(self.client.clone()), + kind, + self.runtime.clone(), + config, + self.listing.clone(), + ); + stats_collector.register_countable( + &stats::SingleTagModule("resource_watcher", "kind", &watcher.kind), + Countable::Ref(Arc::downgrade(&watcher.stats_counter) as Weak), + ); + watcher + } + + fn new_namespace_resource( + &self, + kind: Resource, + stats_collector: &stats::Collector, + namespace: &str, + config: &WatcherConfig, + ) -> ResourceWatcher + where + K: Clone + + Debug + + DeserializeOwned + + KubeResource + + Serialize + + Trimmable, + ::DynamicType: Default, + { + let watcher = ResourceWatcher::new( + Api::namespaced(self.client.clone(), namespace), kind, self.runtime.clone(), config, @@ -1258,33 +1406,32 @@ impl ResourceWatcherFactory { stats_collector: &stats::Collector, config: &WatcherConfig, ) -> Option { + let namespace = namespace.unwrap_or(""); let watcher = match resource.name { // 特定namespace不支持Node/Namespace资源 - "nodes" => GenericResourceWatcher::Node(self.new_watcher_inner( + "nodes" => GenericResourceWatcher::Node(self.new_cluster_resource( resource, stats_collector, - None, config, )), - "namespaces" => GenericResourceWatcher::Namespace(self.new_watcher_inner( + "namespaces" => GenericResourceWatcher::Namespace(self.new_cluster_resource( resource, stats_collector, - None, config, )), - "services" => GenericResourceWatcher::Service(self.new_watcher_inner( + "services" => GenericResourceWatcher::Service(self.new_namespace_resource( resource, stats_collector, namespace, config, )), - "deployments" => GenericResourceWatcher::Deployment(self.new_watcher_inner( + "deployments" => GenericResourceWatcher::Deployment(self.new_namespace_resource( resource, stats_collector, namespace, config, )), - "pods" => GenericResourceWatcher::Pod(self.new_watcher_inner( + "pods" => GenericResourceWatcher::Pod(self.new_namespace_resource( resource, stats_collector, namespace, @@ -1294,7 +1441,7 @@ impl ResourceWatcherFactory { GroupVersion { group: "apps.kruise.io", version: "v1beta1", - } => GenericResourceWatcher::KruiseStatefulSet(self.new_watcher_inner( + } => GenericResourceWatcher::KruiseStatefulSet(self.new_namespace_resource( resource, stats_collector, namespace, @@ -1303,7 +1450,7 @@ impl ResourceWatcherFactory { GroupVersion { group: "apps", version: "v1", - } => GenericResourceWatcher::StatefulSet(self.new_watcher_inner( + } => GenericResourceWatcher::StatefulSet(self.new_namespace_resource( resource, stats_collector, namespace, @@ -1318,16 +1465,16 @@ impl ResourceWatcherFactory { return None; } }, - "daemonsets" => GenericResourceWatcher::DaemonSet(self.new_watcher_inner( + "daemonsets" => GenericResourceWatcher::DaemonSet(self.new_namespace_resource( resource, stats_collector, namespace, config, )), "replicationcontrollers" => GenericResourceWatcher::ReplicationController( - self.new_watcher_inner(resource, stats_collector, namespace, config), + self.new_namespace_resource(resource, stats_collector, namespace, config), ), - "replicasets" => GenericResourceWatcher::ReplicaSet(self.new_watcher_inner( + "replicasets" => GenericResourceWatcher::ReplicaSet(self.new_namespace_resource( resource, stats_collector, namespace, @@ -1337,7 +1484,7 @@ impl ResourceWatcherFactory { GroupVersion { group: "networking.k8s.io", version: "v1", - } => GenericResourceWatcher::V1Ingress(self.new_watcher_inner( + } => GenericResourceWatcher::V1Ingress(self.new_namespace_resource( resource, stats_collector, namespace, @@ -1346,7 +1493,7 @@ impl ResourceWatcherFactory { GroupVersion { group: "networking.k8s.io", version: "v1beta1", - } => GenericResourceWatcher::V1beta1Ingress(self.new_watcher_inner( + } => GenericResourceWatcher::V1beta1Ingress(self.new_namespace_resource( resource, stats_collector, namespace, @@ -1355,7 +1502,7 @@ impl ResourceWatcherFactory { GroupVersion { group: "extensions", version: "v1beta1", - } => GenericResourceWatcher::ExtV1beta1Ingress(self.new_watcher_inner( + } => GenericResourceWatcher::ExtV1beta1Ingress(self.new_namespace_resource( resource, stats_collector, namespace, @@ -1370,32 +1517,32 @@ impl ResourceWatcherFactory { return None; } }, - "routes" => GenericResourceWatcher::Route(self.new_watcher_inner( + "routes" => GenericResourceWatcher::Route(self.new_namespace_resource( resource, stats_collector, namespace, config, )), - "servicerules" => GenericResourceWatcher::ServiceRule(self.new_watcher_inner( + "servicerules" => GenericResourceWatcher::ServiceRule(self.new_namespace_resource( resource, stats_collector, namespace, config, )), - "clonesets" => GenericResourceWatcher::CloneSet(self.new_watcher_inner( + "clonesets" => GenericResourceWatcher::CloneSet(self.new_namespace_resource( resource, stats_collector, namespace, config, )), - "ippools" => GenericResourceWatcher::IpPool(self.new_watcher_inner( + "ippools" => GenericResourceWatcher::IpPool(self.new_namespace_resource( resource, stats_collector, namespace, config, )), "opengaussclusters" => GenericResourceWatcher::OpenGaussCluster( - self.new_watcher_inner(resource, stats_collector, namespace, config), + self.new_namespace_resource(resource, stats_collector, namespace, config), ), _ => { warn!("unsupported resource {}", resource.name);