diff --git a/agent/kubePreUpgrade.go b/agent/kubePreUpgrade.go index 5968ef96..ebf49ffa 100644 --- a/agent/kubePreUpgrade.go +++ b/agent/kubePreUpgrade.go @@ -25,12 +25,13 @@ import ( ) const ( - baseURL = "https://raw.githubusercontent.com/kubernetes/kubernetes" - fileURL = "api/openapi-spec/swagger.json" - crdGroup = "apiextensions.k8s.io" - apiRegistration = "apiregistration.k8s.io" - kubeConfigFileName = "dev-config" - eventSubject_depricated_deleted = "METRICS.kubepug" + baseURL = "https://raw.githubusercontent.com/kubernetes/kubernetes" + fileURL = "api/openapi-spec/swagger.json" + crdGroup = "apiextensions.k8s.io" + apiRegistration = "apiregistration.k8s.io" + kubeConfigFileName = "dev-config" + eventSubject_deleted = "METRICS.deletedAPI" + eventSubject_depricated = "METRICS.deprecatedAPI" ) type ignoreStruct map[string]struct{} @@ -56,16 +57,30 @@ var ( var result *model.Result func publishK8sDepricated_Deleted_Api(result *model.Result, js nats.JetStreamContext) error { - metrics := result - metrics.ClusterName = ClusterName - metricsJson, _ := json.Marshal(metrics) - _, err := js.Publish(eventSubject_depricated_deleted, metricsJson) - if err != nil { - return err + for _, deprecatedAPI := range result.DeprecatedAPIs { + deprecatedAPI.ClusterName = ClusterName + fmt.Println("deprecatedAPI", deprecatedAPI) + deprecatedAPIJson, _ := json.Marshal(deprecatedAPI) + _, err := js.Publish(eventSubject_depricated, deprecatedAPIJson) + if err != nil { + return err + } } + + for _, deletedAPI := range result.DeletedAPIs { + deletedAPI.ClusterName = ClusterName + fmt.Println("deletedAPI", deletedAPI) + deletedAPIJson, _ := json.Marshal(deletedAPI) + _, err := js.Publish(eventSubject_deleted, deletedAPIJson) + if err != nil { + return err + } + } + log.Printf("Metrics with Deletedapi and depricated api has been published") return nil } + func KubePreUpgradeDetector(js nats.JetStreamContext, wg *sync.WaitGroup, errCh chan error) { defer wg.Done() swaggerdir, err := os.MkdirTemp("", "kubepug") diff --git a/clickhouse/clickhouse_client.go b/clickhouse/clickhouse_client.go index d27f3741..944c88b1 100644 --- a/clickhouse/clickhouse_client.go +++ b/clickhouse/clickhouse_client.go @@ -55,11 +55,31 @@ func CreateSchema(connect *sql.DB) { func CreateKubePugSchema(connect *sql.DB) { _, err := connect.Exec(` - CREATE TABLE IF NOT EXISTS deprecatedAPIs_and_deletedAPIs ( - result String, - cluster_name String + CREATE TABLE IF NOT EXISTS DeprecatedAPIs ( + Description String, + Kind String, + Deprecated UInt8, + ClusterName String, + Scope String, + ObjectName String ) engine=File(TabSeparated) - `) + `) + if err != nil { + log.Fatal(err) + } + + _, err = connect.Exec(` + CREATE TABLE IF NOT EXISTS DeletedAPIs ( + Group String, + Kind String, + Version String, + Name String, + Deleted UInt8, + ClusterName String, + Scope String, + ObjectName String + ) engine=File(TabSeparated) + `) if err != nil { log.Fatal(err) } @@ -133,19 +153,63 @@ func InsertOutdatedEvent(connect *sql.DB, metrics model.CheckResultfinal) { } } -func InsertKubepugEvent(connect *sql.DB, metrics model.Result) { +func InsertDeprecatedAPI(connect *sql.DB, deprecatedAPI model.DeprecatedAPI) { var ( tx, _ = connect.Begin() - stmt, _ = tx.Prepare("INSERT INTO deprecatedAPIs_and_deletedAPIs (result, cluster_name) VALUES (?, ?)") + stmt, _ = tx.Prepare("INSERT INTO DeprecatedAPIs (Description, Kind, Deprecated, ClusterName, Scope, ObjectName) VALUES (?, ?, ?, ?, ?, ?)") ) defer stmt.Close() - eventJson, _ := json.Marshal(metrics) - if _, err := stmt.Exec( - string(eventJson), - metrics.ClusterName, - ); err != nil { + + deprecated := uint8(0) + if deprecatedAPI.Deprecated { + deprecated = 1 + } + + for _, item := range deprecatedAPI.Items { + if _, err := stmt.Exec( + deprecatedAPI.Description, + deprecatedAPI.Kind, + deprecated, + deprecatedAPI.ClusterName, + item.Scope, + item.ObjectName, + ); err != nil { + log.Fatal(err) + } + } + + if err := tx.Commit(); err != nil { log.Fatal(err) } +} + +func InsertDeletedAPI(connect *sql.DB, deletedAPI model.DeletedAPI) { + var ( + tx, _ = connect.Begin() + stmt, _ = tx.Prepare("INSERT INTO DeletedAPIs (Group, Kind, Version, Name, Deleted, ClusterName, Scope, ObjectName) VALUES (?, ?, ?, ?, ?, ?, ?, ?)") + ) + defer stmt.Close() + + deleted := uint8(0) + if deletedAPI.Deleted { + deleted = 1 + } + + for _, item := range deletedAPI.Items { + if _, err := stmt.Exec( + deletedAPI.Group, + deletedAPI.Kind, + deletedAPI.Version, + deletedAPI.Name, + deleted, + deletedAPI.ClusterName, + item.Scope, + item.ObjectName, + ); err != nil { + log.Fatal(err) + } + } + if err := tx.Commit(); err != nil { log.Fatal(err) } diff --git a/client/k8smetrics_client.go b/client/k8smetrics_client.go index ed254e65..51b55d3f 100644 --- a/client/k8smetrics_client.go +++ b/client/k8smetrics_client.go @@ -91,17 +91,29 @@ func main() { }, nats.Durable("OUTDATED_EVENTS_CONSUMER"), nats.ManualAck()) // consume kubepug result and insert in clickhouse - js.Subscribe("METRICS.kubepug", func(msg *nats.Msg) { + js.Subscribe("METRICS.deprecatedAPI", func(msg *nats.Msg) { msg.Ack() - var metrics model.Result - err := json.Unmarshal(msg.Data, &metrics) + var deprecatedAPI model.DeprecatedAPI + err := json.Unmarshal(msg.Data, &deprecatedAPI) + if err != nil { + log.Fatal(err) + } + log.Printf("Deprecated API Received: %#v, clustername: %s", deprecatedAPI, deprecatedAPI.ClusterName) + clickhouse.InsertDeprecatedAPI(connection, deprecatedAPI) + log.Println() + }, nats.Durable("DEPRECATED_API_CONSUMER"), nats.ManualAck()) + + js.Subscribe("METRICS.deletedAPI", func(msg *nats.Msg) { + msg.Ack() + var deletedAPI model.DeletedAPI + err := json.Unmarshal(msg.Data, &deletedAPI) if err != nil { log.Fatal(err) } - log.Printf("Kubepug Metrics Received: %#v, clustername: %s", metrics, metrics.ClusterName) - clickhouse.InsertKubepugEvent(connection, metrics) + log.Printf("Deleted API Received: %#v, clustername: %s", deletedAPI, deletedAPI.ClusterName) + clickhouse.InsertDeletedAPI(connection, deletedAPI) log.Println() - }, nats.Durable("KUBEPUG_EVENTS_CONSUMER"), nats.ManualAck()) + }, nats.Durable("DELETED_API_CONSUMER"), nats.ManualAck()) js.Subscribe("METRICS.event", func(msg *nats.Msg) { msg.Ack() diff --git a/model/depricatedapi.go b/model/depricatedapi.go index 0f36747f..6c5dbcf8 100644 --- a/model/depricatedapi.go +++ b/model/depricatedapi.go @@ -21,8 +21,9 @@ type DeprecatedAPI struct { Version string `json,yaml:"version,omitempty"` Name string `json,yaml:"name,omitempty"` // TODO: What is this boolean for? All APIs here aren't already marked as Deprecated? - Deprecated bool `json,yaml:"deprecated,omitempty"` - Items []Item `json,yaml:"deprecated_items,omitempty"` + Deprecated bool `json,yaml:"deprecated,omitempty"` + Items []Item `json,yaml:"deprecated_items,omitempty"` + ClusterName string `json,yaml:"clustername,omitempty"` } // Item definition of the Items inside a deprecated API @@ -40,8 +41,9 @@ type DeletedAPI struct { Version string `json,yaml:"version,omitempty"` Name string `json,yaml:"name,omitempty"` // TODO: What is this boolean for? All APIs here aren't already marked as Deleted? - Deleted bool `json,yaml:"deleted,omitempty"` - Items []Item `json,yaml:"deleted_items,omitempty"` + Deleted bool `json,yaml:"deleted,omitempty"` + Items []Item `json,yaml:"deleted_items,omitempty"` + ClusterName string `json,yaml:"clustername,omitempty"` } // Result to show final user