Skip to content

Commit

Permalink
Merge pull request #233 from intelops/newshedule
Browse files Browse the repository at this point in the history
Added Scheduler
  • Loading branch information
vijeyash1 authored Sep 15, 2023
2 parents bf96176 + c37d23b commit d43a090
Show file tree
Hide file tree
Showing 7 changed files with 417 additions and 41 deletions.
11 changes: 9 additions & 2 deletions agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,15 @@ import (
)

type AgentConfigurations struct {
SANamespace string `envconfig:"SA_NAMESPACE" default:"default"`
SAName string `envconfig:"SA_NAME" default:"default"`
SANamespace string `envconfig:"SA_NAMESPACE" default:"default"`
SAName string `envconfig:"SA_NAME" default:"default"`
OutdatedInterval string `envconfig:"OUTDATED_INTERVAL" default:"*/20 * * * *"`
GetAllInterval string `envconfig:"GETALL_INTERVAL" default:"*/30 * * * *"`
KubeScoreInterval string `envconfig:"KUBESCORE_INTERVAL" default:"*/40 * * * *"`
RakkessInterval string `envconfig:"RAKKESS_INTERVAL" default:"*/50 * * * *"`
KubePreUpgradeInterval string `envconfig:"KUBEPREUPGRADE_INTERVAL" default:"*/60 * * * *"`
TrivyInterval string `envconfig:"TRIVY_INTERVAL" default:"*/10 * * * *"`
SchedulerEnable bool `envconfig:"SCHEDULER_ENABLE" default:"false"`
}

func GetAgentConfigurations() (serviceConf *AgentConfigurations, err error) {
Expand Down
105 changes: 97 additions & 8 deletions agent/kubviz/k8smetrics_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@ import (
"encoding/json"
"log"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"

"github.com/intelops/go-common/logging"

"github.com/go-co-op/gocron"
"github.com/nats-io/nats.go"

Expand All @@ -22,6 +26,7 @@ import (

"fmt"

"github.com/intelops/kubviz/agent/config"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
_ "k8s.io/client-go/plugin/pkg/client/auth/azure"
Expand Down Expand Up @@ -77,6 +82,10 @@ func main() {
log.SetFlags(log.LstdFlags | log.Lshortfile)
env := Production
clusterMetricsChan := make(chan error, 1)
cfg, err := config.GetAgentConfigurations()
if err != nil {
log.Fatal("Failed to retrieve agent configurations", err)
}
var (
config *rest.Config
clientset *kubernetes.Clientset
Expand Down Expand Up @@ -126,15 +135,30 @@ func main() {
if schedulingIntervalStr == "" {
schedulingIntervalStr = "20m"
}
schedulingInterval, err := time.ParseDuration(schedulingIntervalStr)
if err != nil {
log.Fatalf("Failed to parse SCHEDULING_INTERVAL: %v", err)
if cfg.SchedulerEnable { // Assuming "cfg.Schedule" is a boolean indicating whether to schedule or not.
scheduler := initScheduler(config, js, *cfg, clientset)

// Start the scheduler
scheduler.Start()
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
<-signals

scheduler.Stop()
} else {
if schedulingIntervalStr == "" {
schedulingIntervalStr = "20m"
}
schedulingInterval, err := time.ParseDuration(schedulingIntervalStr)
if err != nil {
log.Fatalf("Failed to parse SCHEDULING_INTERVAL: %v", err)
}
s := gocron.NewScheduler(time.UTC)
s.Every(schedulingInterval).Do(func() {
collectAndPublishMetrics()
})
s.StartBlocking()
}
s := gocron.NewScheduler(time.UTC)
s.Every(schedulingInterval).Do(func() {
collectAndPublishMetrics()
})
s.StartBlocking()
}

// publishMetrics publishes stream of events
Expand Down Expand Up @@ -272,3 +296,68 @@ func watchK8sEvents(clientset *kubernetes.Clientset, js nats.JetStreamContext) {
time.Sleep(time.Second)
}
}
func initScheduler(config *rest.Config, js nats.JetStreamContext, cfg config.AgentConfigurations, clientset *kubernetes.Clientset) (s *Scheduler) {
log := logging.NewLogger()
s = NewScheduler(log)
if cfg.OutdatedInterval != "" {
sj, err := NewOutDatedImagesJob(config, js, cfg.OutdatedInterval)
if err != nil {
log.Fatal("no time interval", err)
}
err = s.AddJob("Outdated", sj)
if err != nil {
log.Fatal("failed to do job", err)
}
}
if cfg.GetAllInterval != "" {
sj, err := NewKetallJob(config, js, cfg.GetAllInterval)
if err != nil {
log.Fatal("no time interval", err)
}
err = s.AddJob("GetALL", sj)
if err != nil {
log.Fatal("failed to do job", err)
}
}
if cfg.KubeScoreInterval != "" {
sj, err := NewKubescoreJob(clientset, js, cfg.KubeScoreInterval)
if err != nil {
log.Fatal("no time interval", err)
}
err = s.AddJob("KubeScore", sj)
if err != nil {
log.Fatal("failed to do job", err)
}
}
if cfg.RakkessInterval != "" {
sj, err := NewRakkessJob(config, js, cfg.RakkessInterval)
if err != nil {
log.Fatal("no time interval", err)
}
err = s.AddJob("Rakkess", sj)
if err != nil {
log.Fatal("failed to do job", err)
}
}
if cfg.KubePreUpgradeInterval != "" {
sj, err := NewKubePreUpgradeJob(config, js, cfg.KubePreUpgradeInterval)
if err != nil {
log.Fatal("no time interval", err)
}
err = s.AddJob("KubePreUpgrade", sj)
if err != nil {
log.Fatal("failed to do job", err)
}
}
if cfg.TrivyInterval != "" {
sj, err := NewTrivyJob(config, js, cfg.TrivyInterval)
if err != nil {
log.Fatal("no time interval", err)
}
err = s.AddJob("Trivy", sj)
if err != nil {
log.Fatal("failed to do job", err)
}
}
return
}
88 changes: 88 additions & 0 deletions agent/kubviz/scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package main

import (
"sync"

"github.com/pkg/errors"
"github.com/robfig/cron/v3"

"github.com/intelops/go-common/logging"
)

type jobHandler interface {
CronSpec() string
Run()
}

type Scheduler struct {
log logging.Logger
jobs map[string]jobHandler
cronIDs map[string]cron.EntryID
c *cron.Cron
cronMutex *sync.Mutex
}

func NewScheduler(log logging.Logger) *Scheduler {
clog := cron.VerbosePrintfLogger(log.(logging.StdLogger))
return &Scheduler{
log: log,
c: cron.New(cron.WithChain(cron.SkipIfStillRunning(clog), cron.Recover(clog))),
jobs: map[string]jobHandler{},
cronIDs: map[string]cron.EntryID{},
cronMutex: &sync.Mutex{},
}
}

func (t *Scheduler) AddJob(jobName string, job jobHandler) error {
t.cronMutex.Lock()
defer t.cronMutex.Unlock()
_, ok := t.cronIDs[jobName]
if ok {
return errors.Errorf("%s job already exists", jobName)
}
spec := job.CronSpec()
if spec == "" {
return errors.Errorf("%s job has no cron spec", jobName)
}
entryID, err := t.c.AddJob(spec, job)
if err != nil {
return errors.WithMessagef(err, "%s job cron spec not valid", jobName)
}

t.jobs[jobName] = job
t.cronIDs[jobName] = entryID
t.log.Infof("%s job added with cron '%s'", jobName, spec)
return nil
}

// RemoveJob ...
func (t *Scheduler) RemoveJob(jobName string) error {
t.cronMutex.Lock()
defer t.cronMutex.Unlock()
entryID, ok := t.cronIDs[jobName]
if !ok {
return errors.Errorf("%s job not exist", jobName)
}

t.c.Remove(entryID)
delete(t.jobs, jobName)
delete(t.cronIDs, jobName)
t.log.Infof("%s job removed", jobName)
return nil
}

func (t *Scheduler) Start() {
t.c.Start()
t.log.Infof("Job scheduler started")
}

func (t *Scheduler) Stop() {
t.c.Stop()
t.log.Infof("Job scheduler stopped")
}

func (t *Scheduler) GetJobs() map[string]jobHandler {
t.cronMutex.Lock()
defer t.cronMutex.Unlock()
return t.jobs
}
138 changes: 138 additions & 0 deletions agent/kubviz/scheduler_watch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package main

import (
"github.com/nats-io/nats.go"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)

type OutDatedImagesJob struct {
config *rest.Config
js nats.JetStreamContext
frequency string
}

type KetallJob struct {
config *rest.Config
js nats.JetStreamContext
frequency string
}
type TrivyJob struct {
config *rest.Config
js nats.JetStreamContext
frequency string
}
type RakkessJob struct {
config *rest.Config
js nats.JetStreamContext
frequency string
}
type KubePreUpgradeJob struct {
config *rest.Config
js nats.JetStreamContext
frequency string
}
type KubescoreJob struct {
clientset *kubernetes.Clientset
js nats.JetStreamContext
frequency string
}

func NewOutDatedImagesJob(config *rest.Config, js nats.JetStreamContext, frequency string) (*OutDatedImagesJob, error) {
return &OutDatedImagesJob{
config: config,
js: js,
frequency: frequency,
}, nil
}
func (v *OutDatedImagesJob) CronSpec() string {
return v.frequency
}

func (j *OutDatedImagesJob) Run() {
// Call the outDatedImages function with the provided config and js
err := outDatedImages(j.config, j.js)
LogErr(err)
}
func NewKetallJob(config *rest.Config, js nats.JetStreamContext, frequency string) (*KetallJob, error) {
return &KetallJob{
config: config,
js: js,
frequency: frequency,
}, nil
}
func (v *KetallJob) CronSpec() string {
return v.frequency
}

func (j *KetallJob) Run() {
// Call the Ketall function with the provided config and js
err := GetAllResources(j.config, j.js)
LogErr(err)
}

func NewKubePreUpgradeJob(config *rest.Config, js nats.JetStreamContext, frequency string) (*KubePreUpgradeJob, error) {
return &KubePreUpgradeJob{
config: config,
js: js,
frequency: frequency,
}, nil
}
func (v *KubePreUpgradeJob) CronSpec() string {
return v.frequency
}

func (j *KubePreUpgradeJob) Run() {
// Call the Kubepreupgrade function with the provided config and js
err := GetAllResources(j.config, j.js)
LogErr(err)
}

func NewKubescoreJob(clientset *kubernetes.Clientset, js nats.JetStreamContext, frequency string) (*KubescoreJob, error) {
return &KubescoreJob{
clientset: clientset,
js: js,
frequency: frequency,
}, nil
}
func (v *KubescoreJob) CronSpec() string {
return v.frequency
}

func (j *KubescoreJob) Run() {
// Call the Kubescore function with the provided config and js
err := RunKubeScore(j.clientset, j.js)
LogErr(err)
}
func NewRakkessJob(config *rest.Config, js nats.JetStreamContext, frequency string) (*RakkessJob, error) {
return &RakkessJob{
config: config,
js: js,
frequency: frequency,
}, nil
}
func (v *RakkessJob) CronSpec() string {
return v.frequency
}

func (j *RakkessJob) Run() {
// Call the Rakkes function with the provided config and js
err := RakeesOutput(j.config, j.js)
LogErr(err)
}
func NewTrivyJob(config *rest.Config, js nats.JetStreamContext, frequency string) (*TrivyJob, error) {
return &TrivyJob{
config: config,
js: js,
frequency: frequency,
}, nil
}
func (v *TrivyJob) CronSpec() string {
return v.frequency
}

func (j *TrivyJob) Run() {
// Call the Trivy function with the provided config and js
err := runTrivyScans(j.config, j.js)
LogErr(err)
}
Loading

0 comments on commit d43a090

Please sign in to comment.