Skip to content

Commit

Permalink
Merge pull request #57 from vijeyash1/develop
Browse files Browse the repository at this point in the history
modified the kubepug agent, client to split the single json data into…
  • Loading branch information
jebinjeb authored May 29, 2023
2 parents c208c63 + 2baa7d0 commit 9fd3bb5
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 33 deletions.
39 changes: 27 additions & 12 deletions agent/kubePreUpgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ import (
)

const (
baseURL = "https://raw.githubusercontent.com/kubernetes/kubernetes"
fileURL = "api/openapi-spec/swagger.json"
crdGroup = "apiextensions.k8s.io"
apiRegistration = "apiregistration.k8s.io"
kubeConfigFileName = "dev-config"
eventSubject_depricated_deleted = "METRICS.kubepug"
baseURL = "https://raw.githubusercontent.com/kubernetes/kubernetes"
fileURL = "api/openapi-spec/swagger.json"
crdGroup = "apiextensions.k8s.io"
apiRegistration = "apiregistration.k8s.io"
kubeConfigFileName = "dev-config"
eventSubject_deleted = "METRICS.deletedAPI"
eventSubject_depricated = "METRICS.deprecatedAPI"
)

type ignoreStruct map[string]struct{}
Expand All @@ -56,16 +57,30 @@ var (
var result *model.Result

func publishK8sDepricated_Deleted_Api(result *model.Result, js nats.JetStreamContext) error {
metrics := result
metrics.ClusterName = ClusterName
metricsJson, _ := json.Marshal(metrics)
_, err := js.Publish(eventSubject_depricated_deleted, metricsJson)
if err != nil {
return err
for _, deprecatedAPI := range result.DeprecatedAPIs {
deprecatedAPI.ClusterName = ClusterName
fmt.Println("deprecatedAPI", deprecatedAPI)
deprecatedAPIJson, _ := json.Marshal(deprecatedAPI)
_, err := js.Publish(eventSubject_depricated, deprecatedAPIJson)
if err != nil {
return err
}
}

for _, deletedAPI := range result.DeletedAPIs {
deletedAPI.ClusterName = ClusterName
fmt.Println("deletedAPI", deletedAPI)
deletedAPIJson, _ := json.Marshal(deletedAPI)
_, err := js.Publish(eventSubject_deleted, deletedAPIJson)
if err != nil {
return err
}
}

log.Printf("Metrics with Deletedapi and depricated api has been published")
return nil
}

func KubePreUpgradeDetector(js nats.JetStreamContext, wg *sync.WaitGroup, errCh chan error) {
defer wg.Done()
swaggerdir, err := os.MkdirTemp("", "kubepug")
Expand Down
86 changes: 75 additions & 11 deletions clickhouse/clickhouse_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,31 @@ func CreateSchema(connect *sql.DB) {

func CreateKubePugSchema(connect *sql.DB) {
_, err := connect.Exec(`
CREATE TABLE IF NOT EXISTS deprecatedAPIs_and_deletedAPIs (
result String,
cluster_name String
CREATE TABLE IF NOT EXISTS DeprecatedAPIs (
Description String,
Kind String,
Deprecated UInt8,
ClusterName String,
Scope String,
ObjectName String
) engine=File(TabSeparated)
`)
`)
if err != nil {
log.Fatal(err)
}

_, err = connect.Exec(`
CREATE TABLE IF NOT EXISTS DeletedAPIs (
Group String,
Kind String,
Version String,
Name String,
Deleted UInt8,
ClusterName String,
Scope String,
ObjectName String
) engine=File(TabSeparated)
`)
if err != nil {
log.Fatal(err)
}
Expand Down Expand Up @@ -133,19 +153,63 @@ func InsertOutdatedEvent(connect *sql.DB, metrics model.CheckResultfinal) {
}
}

func InsertKubepugEvent(connect *sql.DB, metrics model.Result) {
func InsertDeprecatedAPI(connect *sql.DB, deprecatedAPI model.DeprecatedAPI) {
var (
tx, _ = connect.Begin()
stmt, _ = tx.Prepare("INSERT INTO deprecatedAPIs_and_deletedAPIs (result, cluster_name) VALUES (?, ?)")
stmt, _ = tx.Prepare("INSERT INTO DeprecatedAPIs (Description, Kind, Deprecated, ClusterName, Scope, ObjectName) VALUES (?, ?, ?, ?, ?, ?)")
)
defer stmt.Close()
eventJson, _ := json.Marshal(metrics)
if _, err := stmt.Exec(
string(eventJson),
metrics.ClusterName,
); err != nil {

deprecated := uint8(0)
if deprecatedAPI.Deprecated {
deprecated = 1
}

for _, item := range deprecatedAPI.Items {
if _, err := stmt.Exec(
deprecatedAPI.Description,
deprecatedAPI.Kind,
deprecated,
deprecatedAPI.ClusterName,
item.Scope,
item.ObjectName,
); err != nil {
log.Fatal(err)
}
}

if err := tx.Commit(); err != nil {
log.Fatal(err)
}
}

func InsertDeletedAPI(connect *sql.DB, deletedAPI model.DeletedAPI) {
var (
tx, _ = connect.Begin()
stmt, _ = tx.Prepare("INSERT INTO DeletedAPIs (Group, Kind, Version, Name, Deleted, ClusterName, Scope, ObjectName) VALUES (?, ?, ?, ?, ?, ?, ?, ?)")
)
defer stmt.Close()

deleted := uint8(0)
if deletedAPI.Deleted {
deleted = 1
}

for _, item := range deletedAPI.Items {
if _, err := stmt.Exec(
deletedAPI.Group,
deletedAPI.Kind,
deletedAPI.Version,
deletedAPI.Name,
deleted,
deletedAPI.ClusterName,
item.Scope,
item.ObjectName,
); err != nil {
log.Fatal(err)
}
}

if err := tx.Commit(); err != nil {
log.Fatal(err)
}
Expand Down
24 changes: 18 additions & 6 deletions client/k8smetrics_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,17 +91,29 @@ func main() {

}, nats.Durable("OUTDATED_EVENTS_CONSUMER"), nats.ManualAck())
// consume kubepug result and insert in clickhouse
js.Subscribe("METRICS.kubepug", func(msg *nats.Msg) {
js.Subscribe("METRICS.deprecatedAPI", func(msg *nats.Msg) {
msg.Ack()
var metrics model.Result
err := json.Unmarshal(msg.Data, &metrics)
var deprecatedAPI model.DeprecatedAPI
err := json.Unmarshal(msg.Data, &deprecatedAPI)
if err != nil {
log.Fatal(err)
}
log.Printf("Deprecated API Received: %#v, clustername: %s", deprecatedAPI, deprecatedAPI.ClusterName)
clickhouse.InsertDeprecatedAPI(connection, deprecatedAPI)
log.Println()
}, nats.Durable("DEPRECATED_API_CONSUMER"), nats.ManualAck())

js.Subscribe("METRICS.deletedAPI", func(msg *nats.Msg) {
msg.Ack()
var deletedAPI model.DeletedAPI
err := json.Unmarshal(msg.Data, &deletedAPI)
if err != nil {
log.Fatal(err)
}
log.Printf("Kubepug Metrics Received: %#v, clustername: %s", metrics, metrics.ClusterName)
clickhouse.InsertKubepugEvent(connection, metrics)
log.Printf("Deleted API Received: %#v, clustername: %s", deletedAPI, deletedAPI.ClusterName)
clickhouse.InsertDeletedAPI(connection, deletedAPI)
log.Println()
}, nats.Durable("KUBEPUG_EVENTS_CONSUMER"), nats.ManualAck())
}, nats.Durable("DELETED_API_CONSUMER"), nats.ManualAck())

js.Subscribe("METRICS.event", func(msg *nats.Msg) {
msg.Ack()
Expand Down
10 changes: 6 additions & 4 deletions model/depricatedapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ type DeprecatedAPI struct {
Version string `json,yaml:"version,omitempty"`
Name string `json,yaml:"name,omitempty"`
// TODO: What is this boolean for? All APIs here aren't already marked as Deprecated?
Deprecated bool `json,yaml:"deprecated,omitempty"`
Items []Item `json,yaml:"deprecated_items,omitempty"`
Deprecated bool `json,yaml:"deprecated,omitempty"`
Items []Item `json,yaml:"deprecated_items,omitempty"`
ClusterName string `json,yaml:"clustername,omitempty"`
}

// Item definition of the Items inside a deprecated API
Expand All @@ -40,8 +41,9 @@ type DeletedAPI struct {
Version string `json,yaml:"version,omitempty"`
Name string `json,yaml:"name,omitempty"`
// TODO: What is this boolean for? All APIs here aren't already marked as Deleted?
Deleted bool `json,yaml:"deleted,omitempty"`
Items []Item `json,yaml:"deleted_items,omitempty"`
Deleted bool `json,yaml:"deleted,omitempty"`
Items []Item `json,yaml:"deleted_items,omitempty"`
ClusterName string `json,yaml:"clustername,omitempty"`
}

// Result to show final user
Expand Down

0 comments on commit 9fd3bb5

Please sign in to comment.