diff --git a/manager/pkg/statussyncer/syncers/local_policy_compliance_handler.go b/manager/pkg/statussyncer/syncers/local_policy_compliance_handler.go index c132cdecd..d14cf7f9e 100644 --- a/manager/pkg/statussyncer/syncers/local_policy_compliance_handler.go +++ b/manager/pkg/statussyncer/syncers/local_policy_compliance_handler.go @@ -76,17 +76,19 @@ func handleCompliance(log logr.Logger, ctx context.Context, evt *cloudevents.Eve complianceClustersFromDB = NewPolicyClusterSets() } + allClustersOnDB := complianceClustersFromDB.GetAllClusters() + // handle compliant clusters of the policy compliantCompliances := newLocalCompliances(leafHub, policyID, database.Compliant, - eventCompliance.CompliantClusters, complianceClustersFromDB.GetClusters(database.Compliant)) + eventCompliance.CompliantClusters, allClustersOnDB) // handle non compliant clusters of the policy nonCompliantCompliances := newLocalCompliances(leafHub, policyID, database.NonCompliant, - eventCompliance.NonCompliantClusters, complianceClustersFromDB.GetClusters(database.NonCompliant)) + eventCompliance.NonCompliantClusters, allClustersOnDB) // handle unknown compliance clusters of the policy unknownCompliances := newLocalCompliances(leafHub, policyID, database.Unknown, - eventCompliance.UnknownComplianceClusters, complianceClustersFromDB.GetClusters(database.Unknown)) + eventCompliance.UnknownComplianceClusters, allClustersOnDB) batchLocalCompliances := []models.LocalStatusCompliance{} batchLocalCompliances = append(batchLocalCompliances, compliantCompliances...) @@ -101,11 +103,7 @@ func handleCompliance(log logr.Logger, ctx context.Context, evt *cloudevents.Eve return err } - // delete compliance status rows in the db that were not sent in the bundle (leaf hub sends only living resources) - allClustersOnDB := complianceClustersFromDB.GetAllClusters() - for _, compliance := range batchLocalCompliances { - allClustersOnDB.Remove(compliance.ClusterName) - } + // delete err = db.Transaction(func(tx *gorm.DB) error { for _, name := range allClustersOnDB.ToSlice() { clusterName, ok := name.(string) @@ -150,23 +148,11 @@ func handleCompliance(log logr.Logger, ctx context.Context, evt *cloudevents.Eve return nil } -func newComplianceClusters(eventComplianceClusters []string, complianceClusters set.Set) []string { - newComplianceClusters := make([]string, 0) - for _, clusterName := range eventComplianceClusters { - if !complianceClusters.Contains(clusterName) { - newComplianceClusters = append(newComplianceClusters, clusterName) - } - } - return newComplianceClusters -} - func newLocalCompliances(leafHub, policyID string, compliance database.ComplianceStatus, - eventComplianceClusters []string, complianceClustersOnDB set.Set, + eventComplianceClusters []string, allClustersOnDB set.Set, ) []models.LocalStatusCompliance { - clusters := newComplianceClusters(eventComplianceClusters, complianceClustersOnDB) - compliances := make([]models.LocalStatusCompliance, 0) - for _, cluster := range clusters { + for _, cluster := range eventComplianceClusters { compliances = append(compliances, models.LocalStatusCompliance{ LeafHubName: leafHub, PolicyID: policyID, @@ -174,6 +160,7 @@ func newLocalCompliances(leafHub, policyID string, compliance database.Complianc Error: database.ErrorNone, Compliance: compliance, }) + allClustersOnDB.Remove(cluster) } return compliances } diff --git a/manager/pkg/statussyncer/syncers/local_policy_compliance_handler_test.go b/manager/pkg/statussyncer/syncers/local_policy_compliance_handler_test.go index ee8ea9729..dd3feee37 100644 --- a/manager/pkg/statussyncer/syncers/local_policy_compliance_handler_test.go +++ b/manager/pkg/statussyncer/syncers/local_policy_compliance_handler_test.go @@ -53,7 +53,6 @@ var _ = Describe("LocalPolicyComplianceHandler", Ordered, func() { By("Build a new policy bundle in the managed hub") complianceVersion = eventversion.NewVersion() complianceVersion.Incr() - complianceVersion.Incr() data := grc.ComplianceBundle{} data = append(data, grc.Compliance{ @@ -68,6 +67,7 @@ var _ = Describe("LocalPolicyComplianceHandler", Ordered, func() { By("Sync message with transport") err = producer.SendEvent(ctx, *evt) Expect(err).Should(Succeed()) + complianceVersion.Next() By("Check the local compliance is created and expired policy is deleted from database") Eventually(func() error { @@ -97,6 +97,81 @@ var _ = Describe("LocalPolicyComplianceHandler", Ordered, func() { }, 30*time.Second, 100*time.Millisecond).ShouldNot(HaveOccurred()) }) + It("should handle the local compliance event with manager resync", func() { + By("Add an expired policy to the database") + db := database.GetGorm() + err := db.Create(&models.LocalStatusCompliance{ + PolicyID: createdPolicyId, + ClusterName: "cluster3", + LeafHubName: leafHubName, + Compliance: database.Compliant, + Error: "none", + }).Error + Expect(err).ToNot(HaveOccurred()) + + By("Check the expired policy is added in database") + Eventually(func() error { + var localCompliances []models.LocalStatusCompliance + err = db.Where("policy_id = ?", createdPolicyId).Find(&localCompliances).Error + if err != nil { + return err + } + + for _, localCompliance := range localCompliances { + if localCompliance.ClusterName == "cluster3" && + localCompliance.Compliance == database.Compliant { + return nil + } + } + return fmt.Errorf("failed to persist data to local compliance of table") + }, 10*time.Second, 100*time.Millisecond).ShouldNot(HaveOccurred()) + + By("Build a new policy bundle in the managed hub") + data := grc.ComplianceBundle{} + data = append(data, grc.Compliance{ + PolicyID: createdPolicyId, + CompliantClusters: []string{"cluster1"}, + NonCompliantClusters: []string{"cluster2"}, + UnknownComplianceClusters: []string{}, + }) + complianceVersion.Incr() + + evt := ToCloudEvent(leafHubName, string(enum.LocalComplianceType), complianceVersion, data) + + By("Sync message with transport") + err = producer.SendEvent(ctx, *evt) + Expect(err).Should(Succeed()) + complianceVersion.Next() + + By("Wait the resync finished") + time.Sleep(5 * time.Second) + + By("Check the local compliance is created and expired policy is deleted from database") + Eventually(func() error { + var localCompliances []models.LocalStatusCompliance + err = db.Where("leaf_hub_name = ?", leafHubName).Find(&localCompliances).Error + if err != nil { + return err + } + + addedCount := 0 + for _, c := range localCompliances { + fmt.Printf("LocalCompliance Resync: ID(%s) %s/%s %s \n", c.PolicyID, c.LeafHubName, c.ClusterName, c.Compliance) + if c.PolicyID == createdPolicyId && c.ClusterName == "cluster1" || c.ClusterName == "cluster2" { + addedCount++ + } + if c.ClusterName == "cluster3" { + return fmt.Errorf("the cluster3 should be removed from database") + } + } + if addedCount == 2 && len(localCompliances) == 2 { + fmt.Println("LocalCompliance(Resync) ========================================================== ") + return nil + } + return fmt.Errorf("failed to sync local compliance") + }, 30*time.Second, 100*time.Millisecond).ShouldNot(HaveOccurred()) + }) + It("shouldn't update the by the local complete compliance event", func() { db := database.GetGorm() By("Create a complete compliance bundle") diff --git a/manager/pkg/statussyncer/syncers/policy_compliance_handler.go b/manager/pkg/statussyncer/syncers/policy_compliance_handler.go index 3d4b6999a..b43b4afdf 100644 --- a/manager/pkg/statussyncer/syncers/policy_compliance_handler.go +++ b/manager/pkg/statussyncer/syncers/policy_compliance_handler.go @@ -84,17 +84,18 @@ func (h *policyComplianceHandler) handleEvent(ctx context.Context, evt *cloudeve complianceClustersFromDB = NewPolicyClusterSets() } + allClustersOnDB := complianceClustersFromDB.GetAllClusters() // handle compliant clusters of the policy compliantCompliances := newCompliances(leafHubName, policyID, database.Compliant, - eventCompliance.CompliantClusters, complianceClustersFromDB.GetClusters(database.Compliant)) + eventCompliance.CompliantClusters, allClustersOnDB) // handle non compliant clusters of the policy nonCompliantCompliances := newCompliances(leafHubName, policyID, database.NonCompliant, - eventCompliance.NonCompliantClusters, complianceClustersFromDB.GetClusters(database.NonCompliant)) + eventCompliance.NonCompliantClusters, allClustersOnDB) // handle unknown compliance clusters of the policy unknownCompliances := newCompliances(leafHubName, policyID, database.Unknown, - eventCompliance.UnknownComplianceClusters, complianceClustersFromDB.GetClusters(database.Unknown)) + eventCompliance.UnknownComplianceClusters, allClustersOnDB) batchCompliances := []models.StatusCompliance{} batchCompliances = append(batchCompliances, compliantCompliances...) @@ -109,11 +110,7 @@ func (h *policyComplianceHandler) handleEvent(ctx context.Context, evt *cloudeve return err } - // delete compliance status rows in the db that were not sent in the bundle (leaf hub sends only living resources) - allClustersOnDB := complianceClustersFromDB.GetAllClusters() - for _, compliance := range batchCompliances { - allClustersOnDB.Remove(compliance.ClusterName) - } + // delete err = db.Transaction(func(tx *gorm.DB) error { for _, name := range allClustersOnDB.ToSlice() { clusterName, ok := name.(string) @@ -159,12 +156,10 @@ func (h *policyComplianceHandler) handleEvent(ctx context.Context, evt *cloudeve } func newCompliances(leafHub, policyID string, compliance database.ComplianceStatus, - eventComplianceClusters []string, complianceClustersOnDB set.Set, + eventComplianceClusters []string, allClusterOnDB set.Set, ) []models.StatusCompliance { - clusters := newComplianceClusters(eventComplianceClusters, complianceClustersOnDB) - compliances := make([]models.StatusCompliance, 0) - for _, cluster := range clusters { + for _, cluster := range eventComplianceClusters { compliances = append(compliances, models.StatusCompliance{ LeafHubName: leafHub, PolicyID: policyID, @@ -172,6 +167,7 @@ func newCompliances(leafHub, policyID string, compliance database.ComplianceStat Error: database.ErrorNone, Compliance: compliance, }) + allClusterOnDB.Remove(cluster) } return compliances }