Skip to content

Commit

Permalink
Merge pull request #62 from vijeyash1/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
jebinjeb authored Jun 13, 2023
2 parents d648fe0 + 03d6608 commit d82ae7f
Show file tree
Hide file tree
Showing 8 changed files with 947 additions and 28 deletions.
19 changes: 13 additions & 6 deletions agent/k8smetrics_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ import (
// Or uncomment to load specific auth plugins
"fmt"

"github.com/ghodss/yaml"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
_ "k8s.io/client-go/plugin/pkg/client/auth/azure"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
_ "k8s.io/client-go/plugin/pkg/client/auth/oidc"
_ "k8s.io/client-go/plugin/pkg/client/auth/openstack"

"github.com/ghodss/yaml"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
// _ "k8s.io/client-go/plugin/pkg/client/auth/openstack"
"k8s.io/client-go/tools/cache"
)

Expand All @@ -57,9 +57,10 @@ func main() {
kubePreUpgradeChan := make(chan error, 1)
getAllResourceChan := make(chan error, 1)
clusterMetricsChan := make(chan error, 1)
RakeesErrChan := make(chan error, 1)
var wg sync.WaitGroup
// waiting for 4 go routines
wg.Add(4)
wg.Add(5)
// connecting with nats ...
nc, err := nats.Connect(natsurl, nats.Name("K8s Metrics"), nats.Token(token))
checkErr(err)
Expand All @@ -75,14 +76,16 @@ func main() {
go outDatedImages(js, &wg, outdatedErrChan)
go KubePreUpgradeDetector(js, &wg, kubePreUpgradeChan)
go GetAllResources(js, &wg, getAllResourceChan)
getK8sEvents(clientset)
go RakeesOutput(js, &wg, RakeesErrChan)
go getK8sEvents(clientset)
go publishMetrics(clientset, js, &wg, clusterMetricsChan)
wg.Wait()
// once the go routines completes we will close the error channels
close(outdatedErrChan)
close(kubePreUpgradeChan)
close(getAllResourceChan)
close(clusterMetricsChan)
close(RakeesErrChan)
// for loop will wait for the error channels
// logs if any error occurs
for {
Expand All @@ -103,6 +106,10 @@ func main() {
if err != nil {
log.Println(err)
}
case err := <-RakeesErrChan:
if err != nil {
log.Println(err)
}
}
}

Expand Down
123 changes: 123 additions & 0 deletions agent/rakees_agent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package main

import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"os/signal"
"sync"
"syscall"

"github.com/kube-tarian/kubviz/agent/rakkess"
"github.com/kube-tarian/kubviz/model"
"github.com/nats-io/nats.go"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)

const eventSubject_rakees = "METRICS.rakees"

func accessToOutcome(access rakkess.Access) (rakkess.Outcome, error) {
switch access {
case 0:
return rakkess.None, nil
case 1:
return rakkess.Up, nil
case 2:
return rakkess.Down, nil
case 3:
return rakkess.Err, nil
default:
return rakkess.None, fmt.Errorf("unknown access code: %d", access)
}
}

func RakeesOutput(js nats.JetStreamContext, wg *sync.WaitGroup, errCh chan error) {
// var kubeConfig string
// if home := homedir.HomeDir(); home != "" {
// kubeConfig = filepath.Join(home, ".kube", "dev-config")
// }
// Build Kubernetes configuration from flags
// config, err := clientcmd.BuildConfigFromFlags("", kubeConfig)
config, err := rest.InClusterConfig()
if err != nil {
log.Fatal(err)
}

// Create a new Kubernetes client
client, err := kubernetes.NewForConfig(config)
if err != nil {
errCh <- err
}

// Retrieve all available resource types
resourceList, err := client.Discovery().ServerPreferredResources()
if err != nil {
errCh <- err
}
var opts = rakkess.NewRakkessOptions()
opts.Verbs = []string{"list", "create", "update", "delete"}
opts.OutputFormat = "icon-table"
opts.ResourceList = resourceList

ctx, cancel := context.WithCancel(context.Background())
catchCtrlC(cancel)

res, err := rakkess.Resource(ctx, opts)
if err != nil {
fmt.Println("Error")
errCh <- err
}
fmt.Println("Result..")
for resourceType, access := range res {
createOutcome, err := accessToOutcome(access["create"])
if err != nil {
errCh <- err
}
deleteOutcome, err := accessToOutcome(access["delete"])
if err != nil {
errCh <- err
}
listOutcome, err := accessToOutcome(access["list"])
if err != nil {
errCh <- err
}
updateOutcome, err := accessToOutcome(access["update"])
if err != nil {
errCh <- err
}
metrics := model.RakeesMetrics{
ClusterName: ClusterName,
Name: resourceType,
Create: rakkess.HumanreadableAccessCode(createOutcome),
Delete: rakkess.HumanreadableAccessCode(deleteOutcome),
List: rakkess.HumanreadableAccessCode(listOutcome),
Update: rakkess.HumanreadableAccessCode(updateOutcome),
}
metricsJson, _ := json.Marshal(metrics)
_, err = js.Publish(eventSubject_rakees, metricsJson)
if err != nil {
errCh <- err
}
log.Printf("Metrics with resource %s has been published", resourceType)
}
// t := res.Table(opts.Verbs)
// t.Render(opts.Streams.Out, opts.OutputFormat)

}

func catchCtrlC(cancel context.CancelFunc) {
catchSigs(cancel, syscall.SIGINT, syscall.SIGPIPE, syscall.SIGTERM)
}

func catchSigs(cancel context.CancelFunc, sigs ...os.Signal) {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, sigs...)

go func() {
<-sigChan
cancel()
}()
}
Loading

0 comments on commit d82ae7f

Please sign in to comment.