Skip to content

Commit

Permalink
testing
Browse files Browse the repository at this point in the history
  • Loading branch information
Nithunikzz committed Sep 12, 2023
1 parent 4665e32 commit 70efca9
Show file tree
Hide file tree
Showing 7 changed files with 202 additions and 34 deletions.
5 changes: 3 additions & 2 deletions agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import (
)

type AgentConfigurations struct {
SANamespace string `envconfig:"SA_NAMESPACE" default:"default"`
SAName string `envconfig:"SA_NAME" default:"default"`
SANamespace string `envconfig:"SA_NAMESPACE" default:"default"`
SAName string `envconfig:"SA_NAME" default:"default"`
OutdatedInterval string `envconfig:"OUTDATED_INTERVAL" default:"20m"`
}

func GetAgentConfigurations() (serviceConf *AgentConfigurations, err error) {
Expand Down
30 changes: 29 additions & 1 deletion agent/kubviz/k8smetrics_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"context"

"github.com/intelops/go-common/logging"
"github.com/intelops/kubviz/constants"
"github.com/intelops/kubviz/model"

Expand All @@ -29,6 +30,7 @@ import (
_ "k8s.io/client-go/plugin/pkg/client/auth/oidc"

// _ "k8s.io/client-go/plugin/pkg/client/auth/openstack"
"github.com/intelops/kubviz/agent/config"
"github.com/intelops/kubviz/agent/server"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
Expand Down Expand Up @@ -77,10 +79,15 @@ func main() {
log.SetFlags(log.LstdFlags | log.Lshortfile)
env := Production
clusterMetricsChan := make(chan error, 1)
cfg, err := config.GetAgentConfigurations()
if err != nil {
log.Fatal("Failed to retrieve agent configurations", err)
}
var (
config *rest.Config
clientset *kubernetes.Clientset
)

// connecting with nats ...
nc, err := nats.Connect(natsurl, nats.Name("K8s Metrics"), nats.Token(token))
checkErr(err)
Expand All @@ -106,8 +113,13 @@ func main() {
}
go publishMetrics(clientset, js, clusterMetricsChan)
go server.StartServer()
scheduler := initScheduler(config, js, *cfg)

// Start the scheduler
scheduler.Start()

collectAndPublishMetrics := func() {
err := outDatedImages(config, js)
err := OutDatedImages(config, js)
LogErr(err)
err = KubePreUpgradeDetector(config, js)
LogErr(err)
Expand Down Expand Up @@ -274,3 +286,19 @@ func watchK8sEvents(clientset *kubernetes.Clientset, js nats.JetStreamContext) {
time.Sleep(time.Second)
}
}
func initScheduler(config *rest.Config, js nats.JetStreamContext, cfg config.AgentConfigurations) (s *Scheduler) {
log := logging.NewLogger()
s = NewScheduler(log)
if cfg.OutdatedInterval != "" {
sj, err := NewOutDatedImagesJob(config, js, cfg.OutdatedInterval)
if err != nil {
log.Fatal("no time interval", err)
}
err = s.AddJob("Outdated", sj)
if err != nil {
log.Fatal("failed to do job", err)
}
}

return
}
5 changes: 3 additions & 2 deletions agent/kubviz/outdated.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/base64"
"encoding/json"
"fmt"
"github.com/intelops/kubviz/constants"
"log"
"os"
"regexp"
Expand All @@ -15,6 +14,8 @@ import (
"sync"
"time"

"github.com/intelops/kubviz/constants"

"github.com/intelops/kubviz/model"
"github.com/nats-io/nats.go"

Expand Down Expand Up @@ -66,7 +67,7 @@ func PublishOutdatedImages(out model.CheckResultfinal, js nats.JetStreamContext)
return nil
}

func outDatedImages(config *rest.Config, js nats.JetStreamContext) error {
func OutDatedImages(config *rest.Config, js nats.JetStreamContext) error {
images, err := ListImages(config)
if err != nil {
log.Println("unable to list images")
Expand Down
29 changes: 29 additions & 0 deletions agent/kubviz/outdated_watch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package main

import (
"github.com/nats-io/nats.go"
"k8s.io/client-go/rest"
)

type OutDatedImagesJob struct {
config *rest.Config
js nats.JetStreamContext
frequency string
}

func NewOutDatedImagesJob(config *rest.Config, js nats.JetStreamContext, frequency string) (*OutDatedImagesJob, error) {
return &OutDatedImagesJob{
config: config,
js: js,
frequency: frequency,
}, nil
}
func (v *OutDatedImagesJob) CronSpec() string {
return v.frequency
}

func (j *OutDatedImagesJob) Run() {
// Call the outDatedImages function with the provided config and js
err := OutDatedImages(j.config, j.js)
LogErr(err)
}
88 changes: 88 additions & 0 deletions agent/kubviz/scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package main

import (
"sync"

"github.com/pkg/errors"
"github.com/robfig/cron/v3"

"github.com/intelops/go-common/logging"
)

type jobHandler interface {
CronSpec() string
Run()
}

type Scheduler struct {
log logging.Logger
jobs map[string]jobHandler
cronIDs map[string]cron.EntryID
c *cron.Cron
cronMutex *sync.Mutex
}

func NewScheduler(log logging.Logger) *Scheduler {
clog := cron.VerbosePrintfLogger(log.(logging.StdLogger))
return &Scheduler{
log: log,
c: cron.New(cron.WithChain(cron.SkipIfStillRunning(clog), cron.Recover(clog))),
jobs: map[string]jobHandler{},
cronIDs: map[string]cron.EntryID{},
cronMutex: &sync.Mutex{},
}
}

func (t *Scheduler) AddJob(jobName string, job jobHandler) error {
t.cronMutex.Lock()
defer t.cronMutex.Unlock()
_, ok := t.cronIDs[jobName]
if ok {
return errors.Errorf("%s job already exists", jobName)
}
spec := job.CronSpec()
if spec == "" {
return errors.Errorf("%s job has no cron spec", jobName)
}
entryID, err := t.c.AddJob(spec, job)
if err != nil {
return errors.WithMessagef(err, "%s job cron spec not valid", jobName)
}

t.jobs[jobName] = job
t.cronIDs[jobName] = entryID
t.log.Infof("%s job added with cron '%s'", jobName, spec)
return nil
}

// RemoveJob ...
func (t *Scheduler) RemoveJob(jobName string) error {
t.cronMutex.Lock()
defer t.cronMutex.Unlock()
entryID, ok := t.cronIDs[jobName]
if !ok {
return errors.Errorf("%s job not exist", jobName)
}

t.c.Remove(entryID)
delete(t.jobs, jobName)
delete(t.cronIDs, jobName)
t.log.Infof("%s job removed", jobName)
return nil
}

func (t *Scheduler) Start() {
t.c.Start()
t.log.Infof("Job scheduler started")
}

func (t *Scheduler) Stop() {
t.c.Stop()
t.log.Infof("Job scheduler stopped")
}

func (t *Scheduler) GetJobs() map[string]jobHandler {
t.cronMutex.Lock()
defer t.cronMutex.Unlock()
return t.jobs
}
24 changes: 16 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,18 @@ require (
github.com/docker/docker v24.0.4+incompatible
github.com/genuinetools/reg v0.16.1
github.com/getkin/kin-openapi v0.118.0
github.com/ghodss/yaml v1.0.0
github.com/gin-gonic/gin v1.9.1
github.com/go-co-op/gocron v1.30.1
github.com/go-playground/webhooks/v6 v6.2.0
github.com/google/uuid v1.3.0
github.com/hashicorp/go-version v1.6.0
github.com/intelops/go-common v1.0.19
github.com/kelseyhightower/envconfig v1.4.0
github.com/nats-io/nats.go v1.27.1
github.com/pkg/errors v0.9.1
github.com/robfig/cron/v3 v3.0.1
github.com/sirupsen/logrus v1.9.3
golang.org/x/term v0.10.0
golang.org/x/term v0.11.0
k8s.io/api v0.27.3
k8s.io/apimachinery v0.27.3
k8s.io/cli-runtime v0.27.3
Expand All @@ -30,6 +31,9 @@ require (
)

require (
cloud.google.com/go v0.110.6 // indirect
cloud.google.com/go/iam v1.1.1 // indirect
cloud.google.com/go/storage v1.30.1 // indirect
github.com/ClickHouse/ch-go v0.52.1 // indirect
github.com/CycloneDX/cyclonedx-go v0.7.2-0.20230625092137-07e2f29defc3 // indirect
github.com/anchore/go-struct-converter v0.0.0-20221118182256-c68fdcfa2092 // indirect
Expand Down Expand Up @@ -69,6 +73,7 @@ require (
github.com/google/go-containerregistry v0.15.2 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/googleapis/gax-go/v2 v2.11.0 // indirect
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7 // indirect
github.com/imdario/mergo v0.3.15 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
Expand Down Expand Up @@ -105,10 +110,10 @@ require (
github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/samber/lo v1.38.1 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/shopspring/decimal v1.3.1 // indirect
github.com/showa-93/go-mask v0.6.0 // indirect
github.com/spdx/tools-golang v0.5.0 // indirect
github.com/spf13/cobra v1.7.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
Expand All @@ -126,15 +131,18 @@ require (
go.uber.org/multierr v1.9.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/arch v0.3.0 // indirect
golang.org/x/crypto v0.10.0 // indirect
golang.org/x/crypto v0.12.0 // indirect
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 // indirect
golang.org/x/net v0.11.0 // indirect
golang.org/x/oauth2 v0.7.0 // indirect
golang.org/x/sys v0.10.0 // indirect
golang.org/x/text v0.10.0 // indirect
golang.org/x/net v0.14.0 // indirect
golang.org/x/oauth2 v0.11.0 // indirect
golang.org/x/sys v0.11.0 // indirect
golang.org/x/text v0.12.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/api v0.126.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230803162519-f966b187b2e5 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230911183012-2d3300fd4832 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
Expand Down
Loading

0 comments on commit 70efca9

Please sign in to comment.