diff --git a/cmd/main.go b/cmd/main.go index 1b28a83..8c21417 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -35,13 +35,16 @@ var ( func runController(ctx context.Context, cluster string, log *logrus.Entry, clientset *kubernetes.Clientset, uploader firehose.Uploader) error { // load nodes nodesInformer := controller.NewNodesInformer() - loaded := nodesInformer.Load(ctx, cluster, clientset) + loaded, err := nodesInformer.Load(ctx, log, cluster, clientset) + if err != nil { + return errors.Wrap(err, "loading nodes") + } // wait for nodes to be loaded <-loaded // create controller and run it scanner := controller.New(log, clientset, uploader, nodesInformer) - err := scanner.Run(ctx) + err = scanner.Run(ctx) if err != nil { return errors.Wrap(err, "running scanner controller") } diff --git a/internal/controller/controller.go b/internal/controller/controller.go index c08e8f4..c9eb6a9 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -17,9 +17,10 @@ import ( ) const ( - syncPeriod = 15 * time.Minute - syncPeriodDebug = 5 * time.Minute - developModeKey contextKey = "develop-mode" + podCacheSyncPeriod = 5 * time.Minute + syncPeriod = 15 * time.Minute + syncPeriodDebug = 5 * time.Minute + developModeKey contextKey = "develop-mode" ) type contextKey string @@ -64,7 +65,7 @@ func (s *scanner) DeletePod(obj interface{}) { // get the node info from the cache node, ok := s.nodeInformer.GetNode(pod.Spec.NodeName) if !ok { - s.log.Warnf("getting node %s from cache", pod.Spec.NodeName) + s.log.Warnf("failed to get node %s from cache", pod.Spec.NodeName) } // convert PodInfo to usage record now := time.Now() @@ -89,7 +90,7 @@ func (s *scanner) Run(ctx context.Context) error { DisableChunking: true, }, &v1.Pod{}, - syncPeriod, + podCacheSyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, ) @@ -116,44 +117,44 @@ func (s *scanner) Run(ctx context.Context) error { tick = syncPeriodDebug } + // upload function + upload := func() { + // get the list of pods from the cache + pods := podInformer.GetStore().List() + // convert PodInfo to usage record + now := time.Now() + beginTime := now.Add(-syncPeriod) + records := make([]*usage.PodInfo, 0, len(pods)) + for _, obj := range pods { + pod := obj.(*v1.Pod) + // get the node info from the cache + node, ok := s.nodeInformer.GetNode(pod.Spec.NodeName) + if !ok { + s.log.Warnf("getting node %s from cache", pod.Spec.NodeName) + } + record := usage.GetPodInfo(s.log, pod, beginTime, now, node) + records = append(records, record) + } + // add deleted pods and clear the list if any + if len(s.deletedPods) > 0 { + s.log.WithField("count", len(s.deletedPods)).Debug("adding deleted pods to the pod records") + records = append(records, s.deletedPods...) + s.deletedPods = make([]*usage.PodInfo, 0) + } + // upload the records to EKS Lens + s.log.WithField("count", len(records)).Debug("uploading pod records to EKS Lens") + err = s.uploader.Upload(ctx, records) + if err != nil { + s.log.WithError(err).Error("uploading pods records to EKS Lens") + } + } + // upload first time + upload() + // get pod list from the cache every "syncPeriod/DevelopMode" minutes ticker := time.NewTicker(tick) defer ticker.Stop() for { - upload := func() { - // get the list of pods from the cache - pods := podInformer.GetStore().List() - // convert PodInfo to usage record - now := time.Now() - beginTime := now.Add(-syncPeriod) - records := make([]*usage.PodInfo, 0, len(pods)) - for _, obj := range pods { - pod := obj.(*v1.Pod) - // get the node info from the cache - node, ok := s.nodeInformer.GetNode(pod.Spec.NodeName) - if !ok { - s.log.Warnf("getting node %s from cache", pod.Spec.NodeName) - } - record := usage.GetPodInfo(s.log, pod, beginTime, now, node) - records = append(records, record) - } - // add deleted pods and clear the list if any - if len(s.deletedPods) > 0 { - s.log.WithField("count", len(s.deletedPods)).Debug("adding deleted pods to the pod records") - records = append(records, s.deletedPods...) - s.deletedPods = make([]*usage.PodInfo, 0) - } - // upload the records to EKS Lens - s.log.WithField("count", len(records)).Debug("uploading pod records to EKS Lens") - err = s.uploader.Upload(ctx, records) - if err != nil { - s.log.WithError(err).Error("uploading pods records to EKS Lens") - } - } - - // upload first time - upload() - select { case <-ctx.Done(): return nil diff --git a/internal/controller/nodes.go b/internal/controller/nodes.go index 410c9d6..875d910 100644 --- a/internal/controller/nodes.go +++ b/internal/controller/nodes.go @@ -2,11 +2,12 @@ package controller import ( "context" - "log" "sync" "time" "github.com/doitintl/eks-lens-agent/internal/usage" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -15,8 +16,18 @@ import ( "k8s.io/client-go/tools/cache" ) +const ( + delayDelete = 1 * time.Minute + nodeCacheSyncPeriod = 5 * time.Minute +) + +var ( + // ErrCacheSync is returned when the cache fails to sync + ErrCacheSync = errors.New("failed to sync cache") +) + type NodesInformer interface { - Load(ctx context.Context, cluster string, clientset kubernetes.Interface) chan bool + Load(ctx context.Context, log *logrus.Entry, cluster string, clientset kubernetes.Interface) (chan bool, error) GetNode(nodeName string) (*usage.NodeInfo, bool) } @@ -39,9 +50,11 @@ func (n *NodesMap) GetNode(nodeName string) (*usage.NodeInfo, bool) { } // Load loads the NodesMap with the current nodes in the cluster return channel to signal when the map is loaded -func (n *NodesMap) Load(ctx context.Context, cluster string, clientset kubernetes.Interface) chan bool { +// +//nolint:funlen +func (n *NodesMap) Load(ctx context.Context, log *logrus.Entry, cluster string, clientset kubernetes.Interface) (chan bool, error) { // Create a new Node informer - nodeInformer := cache.NewSharedIndexInformer( + nodeInformer := cache.NewSharedInformer( &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (object runtime.Object, err error) { return clientset.CoreV1().Nodes().List(context.Background(), options) //nolint:wrapcheck @@ -51,20 +64,19 @@ func (n *NodesMap) Load(ctx context.Context, cluster string, clientset kubernete }, }, &v1.Node{}, - 0, // resyncPeriod - cache.Indexers{}, + nodeCacheSyncPeriod, ) // create stopper channel stopper := make(chan struct{}) - defer close(stopper) // Start the Node informer go nodeInformer.Run(stopper) // Wait for the Node informer to sync + log.Debug("waiting for node informer to sync") if !cache.WaitForCacheSync(make(chan struct{}), nodeInformer.HasSynced) { - log.Panicf("Error syncing node informer cache") + return nil, ErrCacheSync } // Process Node add and delete events @@ -74,6 +86,7 @@ func (n *NodesMap) Load(ctx context.Context, cluster string, clientset kubernete nodeInfo := usage.NodeInfoFromNode(cluster, node) n.mu.Lock() defer n.mu.Unlock() + log.WithField("node", node.Name).Debug("adding node to map") n.data[node.Name] = nodeInfo }, DeleteFunc: func(obj interface{}) { @@ -84,11 +97,18 @@ func (n *NodesMap) Load(ctx context.Context, cluster string, clientset kubernete } n.mu.Lock() defer n.mu.Unlock() - delete(n.data, node.Name) + // non-blocking delete from the map after 5 minutes + go func() { + time.Sleep(delayDelete) + n.mu.Lock() + defer n.mu.Unlock() + log.WithField("node", node.Name).Debug("deleting node from map after delay") + delete(n.data, node.Name) + }() }, }) if err != nil { - log.Panicf("Error adding event handler to node informer: %v", err) + return nil, errors.Wrap(err, "failed to add event handler to node informer") } // Create a channel to signal when the map is loaded @@ -97,10 +117,6 @@ func (n *NodesMap) Load(ctx context.Context, cluster string, clientset kubernete // Update the Node map periodically previousResourceVersion := "0" // the resource version of the nodes at the last sync go func() { - ticker := time.NewTicker(syncPeriod) - // abort if the context is cancelled - defer ticker.Stop() - // refresh the nodes map and send to the loaded channel (if not already sent) refresh := func() { n.mu.Lock() @@ -112,9 +128,14 @@ func (n *NodesMap) Load(ctx context.Context, cluster string, clientset kubernete return } - // If the nodes have been updated, update the nodes map + // clear the nodes map + log.Debug("refreshing nodes map with latest nodes") + n.data = make(map[string]usage.NodeInfo) + + // update the nodes map for _, obj := range nodeInformer.GetStore().List() { node := obj.(*v1.Node) + log.WithField("node", node.Name).Debug("adding node to map") n.data[node.Name] = usage.NodeInfoFromNode(cluster, node) } @@ -133,14 +154,21 @@ func (n *NodesMap) Load(ctx context.Context, cluster string, clientset kubernete // refresh the nodes map once before starting the ticker refresh() - // loop until the context is cancelled - select { - case <-ctx.Done(): - return - case <-ticker.C: - refresh() + // refresh the nodes map periodically + ticker := time.NewTicker(nodeCacheSyncPeriod) + defer ticker.Stop() + for { + // loop until the context is cancelled + select { + case <-ctx.Done(): + // context is cancelled, close the stopper channel to stop the informer + close(stopper) + return + case <-ticker.C: + refresh() + } } }() - return loaded + return loaded, nil } diff --git a/internal/controller/nodes_test.go b/internal/controller/nodes_test.go index 7f6f3f5..515e365 100644 --- a/internal/controller/nodes_test.go +++ b/internal/controller/nodes_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/doitintl/eks-lens-agent/internal/usage" + "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -78,7 +79,7 @@ func TestNodesInformerLoad(t *testing.T) { // Load the nodes using the fake clientset ctx, cancel := context.WithTimeout(context.Background(), 50*time.Second) defer cancel() - loaded := nodesInformer.Load(ctx, "test-cluster", clientset) + loaded, _ := nodesInformer.Load(ctx, logrus.NewEntry(logrus.New()), "test-cluster", clientset) // Check if the nodes are loaded select {