Skip to content

Commit

Permalink
removed pause and resume for events
Browse files Browse the repository at this point in the history
  • Loading branch information
vijeyash1 committed Sep 7, 2023
1 parent 826a036 commit 0cb54e5
Showing 1 changed file with 6 additions and 18 deletions.
24 changes: 6 additions & 18 deletions agent/kubviz/k8smetrics_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@ func main() {
}
clientset = getK8sClient(config)
}
controlChan := make(chan bool)
go publishMetrics(clientset, js, clusterMetricsChan, controlChan)
go publishMetrics(clientset, js, clusterMetricsChan)

collectAndPublishMetrics := func() {
err := outDatedImages(config, js)
Expand All @@ -123,9 +122,7 @@ func main() {
LogErr(err)
}

controlChan <- true
collectAndPublishMetrics()
controlChan <- true
if schedulingIntervalStr == "" {
schedulingIntervalStr = "20m"
}
Expand All @@ -135,17 +132,15 @@ func main() {
}
s := gocron.NewScheduler(time.UTC)
s.Every(schedulingInterval).Do(func() {
controlChan <- true
collectAndPublishMetrics()
controlChan <- true
})
s.StartBlocking()
}

// publishMetrics publishes stream of events
// with subject "METRICS.created"
func publishMetrics(clientset *kubernetes.Clientset, js nats.JetStreamContext, errCh chan error, controlChan <-chan bool) {
watchK8sEvents(clientset, js, controlChan)
func publishMetrics(clientset *kubernetes.Clientset, js nats.JetStreamContext, errCh chan error) {
watchK8sEvents(clientset, js)
errCh <- nil
}

Expand Down Expand Up @@ -243,7 +238,7 @@ func LogErr(err error) {
log.Println(err)
}
}
func watchK8sEvents(clientset *kubernetes.Clientset, js nats.JetStreamContext, controlChan <-chan bool) {
func watchK8sEvents(clientset *kubernetes.Clientset, js nats.JetStreamContext) {
watchlist := cache.NewListWatchFromClient(
clientset.CoreV1().RESTClient(),
"events",
Expand All @@ -270,17 +265,10 @@ func watchK8sEvents(clientset *kubernetes.Clientset, js nats.JetStreamContext, c
},
)
stop := make(chan struct{})
defer close(stop)
go controller.Run(stop)

for {
select {
case <-controlChan:
close(stop)
<-controlChan
stop = make(chan struct{})
go controller.Run(stop)
default:
time.Sleep(time.Second)
}
time.Sleep(time.Second)
}
}

0 comments on commit 0cb54e5

Please sign in to comment.