diff --git a/controllers/sink_group_controller.go b/controllers/sink_group_controller.go index 7ed7ada8d..f455f8b1a 100644 --- a/controllers/sink_group_controller.go +++ b/controllers/sink_group_controller.go @@ -587,7 +587,7 @@ func (s *sinkGroup) topicRealtime( topic string, cache *sync.Map, ) ( - bool, error, + bool, *int64, error, ) { // use cache to prevent calls to kafka var realtimeCache kafkaRealtimeCache @@ -597,16 +597,20 @@ func (s *sinkGroup) topicRealtime( if cacheValid(time.Second*time.Duration(30), realtimeCache.lastCacheRefresh) { klog.V(2).Infof("rsk/%s (realtime cache hit) topic: %s", s.rsk.Name, topic) if realtimeCache.realtime { - return true, nil + return true, realtimeCache.lastCacheRefresh, nil } - return false, nil + return false, realtimeCache.lastCacheRefresh, nil } } + // new cache refresh time so that topics are only checked after an interval + // reduces the request to Kafka by big factor + now := time.Now().UnixNano() + klog.V(2).Infof("rsk/%s (fetching realtime stats) topic: %s", s.rsk.Name, topic) group, ok := s.topicGroups[topic] if !ok { - return false, fmt.Errorf("groupID not found for %s", topic) + return false, &now, fmt.Errorf("groupID not found for %s", topic) } batcherCGID := consumerGroupID(s.rsk.Name, s.rsk.Namespace, group.ID, "-batcher") batcherLag, err := watcher.ConsumerGroupLag( @@ -615,11 +619,11 @@ func (s *sinkGroup) topicRealtime( 0, ) if err != nil { - return false, err + return false, &now, err } if batcherLag == -1 { klog.V(4).Infof("%v: lag=-1, condition unmet", batcherCGID) - return false, nil + return false, &now, nil } loaderCGID := consumerGroupID(s.rsk.Name, s.rsk.Namespace, group.ID, "-loader") @@ -629,21 +633,21 @@ func (s *sinkGroup) topicRealtime( 0, ) if err != nil { - return false, err + return false, &now, err } if loaderLag == -1 { klog.V(4).Infof("%v: lag=-1, condition unmet", loaderCGID) - return false, nil + return false, &now, nil } klog.V(4).Infof("lag=%v for %v", batcherLag, batcherCGID) klog.V(4).Infof("lag=%v for %v", loaderLag, loaderCGID) if s.lagBelowThreshold(topic, batcherLag, loaderLag) { - return true, nil + return true, &now, nil } else { klog.V(2).Infof("%v: waiting to reach realtime", topic) - return false, nil + return false, &now, nil } } @@ -658,8 +662,7 @@ func (s *sinkGroup) realtimeTopics( realtimeTopics := []string{} for _, topic := range s.topics { - now := time.Now().UnixNano() - realtime, err := s.topicRealtime(watcher, topic, cache) + realtime, lastRefresh, err := s.topicRealtime(watcher, topic, cache) if err != nil { klog.Errorf( "rsk/%s Error getting realtime for topic: %s, err: %v", @@ -671,7 +674,7 @@ func (s *sinkGroup) realtimeTopics( // if there is an error in finding lag // and the topic was already in realtime consider it realtime if ok { - cache.Store(topic, kafkaRealtimeCache{lastCacheRefresh: &now, realtime: true}) + cache.Store(topic, kafkaRealtimeCache{lastCacheRefresh: lastRefresh, realtime: true}) realtimeTopics = append(realtimeTopics, topic) continue } @@ -679,7 +682,7 @@ func (s *sinkGroup) realtimeTopics( if realtime { realtimeTopics = append(realtimeTopics, topic) } - cache.Store(topic, kafkaRealtimeCache{lastCacheRefresh: &now, realtime: realtime}) + cache.Store(topic, kafkaRealtimeCache{lastCacheRefresh: lastRefresh, realtime: realtime}) } return realtimeTopics