Skip to content

Commit

Permalink
Request realtime topic only after 30 seconds
Browse files Browse the repository at this point in the history
Bug fixes

Fixes #140
  • Loading branch information
alok87 committed Feb 19, 2021
1 parent 4edce3f commit a231442
Showing 1 changed file with 17 additions and 14 deletions.
31 changes: 17 additions & 14 deletions controllers/sink_group_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@ func (s *sinkGroup) topicRealtime(
topic string,
cache *sync.Map,
) (
bool, error,
bool, *int64, error,
) {
// use cache to prevent calls to kafka
var realtimeCache kafkaRealtimeCache
Expand All @@ -597,16 +597,20 @@ func (s *sinkGroup) topicRealtime(
if cacheValid(time.Second*time.Duration(30), realtimeCache.lastCacheRefresh) {
klog.V(2).Infof("rsk/%s (realtime cache hit) topic: %s", s.rsk.Name, topic)
if realtimeCache.realtime {
return true, nil
return true, realtimeCache.lastCacheRefresh, nil
}
return false, nil
return false, realtimeCache.lastCacheRefresh, nil
}
}

// new cache refresh time so that topics are only checked after an interval
// reduces the request to Kafka by big factor
now := time.Now().UnixNano()

klog.V(2).Infof("rsk/%s (fetching realtime stats) topic: %s", s.rsk.Name, topic)
group, ok := s.topicGroups[topic]
if !ok {
return false, fmt.Errorf("groupID not found for %s", topic)
return false, &now, fmt.Errorf("groupID not found for %s", topic)
}
batcherCGID := consumerGroupID(s.rsk.Name, s.rsk.Namespace, group.ID, "-batcher")
batcherLag, err := watcher.ConsumerGroupLag(
Expand All @@ -615,11 +619,11 @@ func (s *sinkGroup) topicRealtime(
0,
)
if err != nil {
return false, err
return false, &now, err
}
if batcherLag == -1 {
klog.V(4).Infof("%v: lag=-1, condition unmet", batcherCGID)
return false, nil
return false, &now, nil
}

loaderCGID := consumerGroupID(s.rsk.Name, s.rsk.Namespace, group.ID, "-loader")
Expand All @@ -629,21 +633,21 @@ func (s *sinkGroup) topicRealtime(
0,
)
if err != nil {
return false, err
return false, &now, err
}
if loaderLag == -1 {
klog.V(4).Infof("%v: lag=-1, condition unmet", loaderCGID)
return false, nil
return false, &now, nil
}

klog.V(4).Infof("lag=%v for %v", batcherLag, batcherCGID)
klog.V(4).Infof("lag=%v for %v", loaderLag, loaderCGID)

if s.lagBelowThreshold(topic, batcherLag, loaderLag) {
return true, nil
return true, &now, nil
} else {
klog.V(2).Infof("%v: waiting to reach realtime", topic)
return false, nil
return false, &now, nil
}
}

Expand All @@ -658,8 +662,7 @@ func (s *sinkGroup) realtimeTopics(
realtimeTopics := []string{}

for _, topic := range s.topics {
now := time.Now().UnixNano()
realtime, err := s.topicRealtime(watcher, topic, cache)
realtime, lastRefresh, err := s.topicRealtime(watcher, topic, cache)
if err != nil {
klog.Errorf(
"rsk/%s Error getting realtime for topic: %s, err: %v",
Expand All @@ -671,15 +674,15 @@ func (s *sinkGroup) realtimeTopics(
// if there is an error in finding lag
// and the topic was already in realtime consider it realtime
if ok {
cache.Store(topic, kafkaRealtimeCache{lastCacheRefresh: &now, realtime: true})
cache.Store(topic, kafkaRealtimeCache{lastCacheRefresh: lastRefresh, realtime: true})
realtimeTopics = append(realtimeTopics, topic)
continue
}
}
if realtime {
realtimeTopics = append(realtimeTopics, topic)
}
cache.Store(topic, kafkaRealtimeCache{lastCacheRefresh: &now, realtime: realtime})
cache.Store(topic, kafkaRealtimeCache{lastCacheRefresh: lastRefresh, realtime: realtime})
}

return realtimeTopics
Expand Down

0 comments on commit a231442

Please sign in to comment.