Skip to content

Commit

Permalink
fixed the resync (#862)
Browse files Browse the repository at this point in the history
Signed-off-by: myan <[email protected]>
  • Loading branch information
yanmxa authored Apr 1, 2024
1 parent 7424fd0 commit 17da295
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,17 +76,19 @@ func handleCompliance(log logr.Logger, ctx context.Context, evt *cloudevents.Eve
complianceClustersFromDB = NewPolicyClusterSets()
}

allClustersOnDB := complianceClustersFromDB.GetAllClusters()

// handle compliant clusters of the policy
compliantCompliances := newLocalCompliances(leafHub, policyID, database.Compliant,
eventCompliance.CompliantClusters, complianceClustersFromDB.GetClusters(database.Compliant))
eventCompliance.CompliantClusters, allClustersOnDB)

// handle non compliant clusters of the policy
nonCompliantCompliances := newLocalCompliances(leafHub, policyID, database.NonCompliant,
eventCompliance.NonCompliantClusters, complianceClustersFromDB.GetClusters(database.NonCompliant))
eventCompliance.NonCompliantClusters, allClustersOnDB)

// handle unknown compliance clusters of the policy
unknownCompliances := newLocalCompliances(leafHub, policyID, database.Unknown,
eventCompliance.UnknownComplianceClusters, complianceClustersFromDB.GetClusters(database.Unknown))
eventCompliance.UnknownComplianceClusters, allClustersOnDB)

batchLocalCompliances := []models.LocalStatusCompliance{}
batchLocalCompliances = append(batchLocalCompliances, compliantCompliances...)
Expand All @@ -101,11 +103,7 @@ func handleCompliance(log logr.Logger, ctx context.Context, evt *cloudevents.Eve
return err
}

// delete compliance status rows in the db that were not sent in the bundle (leaf hub sends only living resources)
allClustersOnDB := complianceClustersFromDB.GetAllClusters()
for _, compliance := range batchLocalCompliances {
allClustersOnDB.Remove(compliance.ClusterName)
}
// delete
err = db.Transaction(func(tx *gorm.DB) error {
for _, name := range allClustersOnDB.ToSlice() {
clusterName, ok := name.(string)
Expand Down Expand Up @@ -150,30 +148,19 @@ func handleCompliance(log logr.Logger, ctx context.Context, evt *cloudevents.Eve
return nil
}

func newComplianceClusters(eventComplianceClusters []string, complianceClusters set.Set) []string {
newComplianceClusters := make([]string, 0)
for _, clusterName := range eventComplianceClusters {
if !complianceClusters.Contains(clusterName) {
newComplianceClusters = append(newComplianceClusters, clusterName)
}
}
return newComplianceClusters
}

func newLocalCompliances(leafHub, policyID string, compliance database.ComplianceStatus,
eventComplianceClusters []string, complianceClustersOnDB set.Set,
eventComplianceClusters []string, allClustersOnDB set.Set,
) []models.LocalStatusCompliance {
clusters := newComplianceClusters(eventComplianceClusters, complianceClustersOnDB)

compliances := make([]models.LocalStatusCompliance, 0)
for _, cluster := range clusters {
for _, cluster := range eventComplianceClusters {
compliances = append(compliances, models.LocalStatusCompliance{
LeafHubName: leafHub,
PolicyID: policyID,
ClusterName: cluster,
Error: database.ErrorNone,
Compliance: compliance,
})
allClustersOnDB.Remove(cluster)
}
return compliances
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ var _ = Describe("LocalPolicyComplianceHandler", Ordered, func() {
By("Build a new policy bundle in the managed hub")
complianceVersion = eventversion.NewVersion()
complianceVersion.Incr()
complianceVersion.Incr()

data := grc.ComplianceBundle{}
data = append(data, grc.Compliance{
Expand All @@ -68,6 +67,7 @@ var _ = Describe("LocalPolicyComplianceHandler", Ordered, func() {
By("Sync message with transport")
err = producer.SendEvent(ctx, *evt)
Expect(err).Should(Succeed())
complianceVersion.Next()

By("Check the local compliance is created and expired policy is deleted from database")
Eventually(func() error {
Expand Down Expand Up @@ -97,6 +97,81 @@ var _ = Describe("LocalPolicyComplianceHandler", Ordered, func() {
}, 30*time.Second, 100*time.Millisecond).ShouldNot(HaveOccurred())
})

It("should handle the local compliance event with manager resync", func() {
By("Add an expired policy to the database")
db := database.GetGorm()
err := db.Create(&models.LocalStatusCompliance{
PolicyID: createdPolicyId,
ClusterName: "cluster3",
LeafHubName: leafHubName,
Compliance: database.Compliant,
Error: "none",
}).Error
Expect(err).ToNot(HaveOccurred())

By("Check the expired policy is added in database")
Eventually(func() error {
var localCompliances []models.LocalStatusCompliance
err = db.Where("policy_id = ?", createdPolicyId).Find(&localCompliances).Error
if err != nil {
return err
}

for _, localCompliance := range localCompliances {
if localCompliance.ClusterName == "cluster3" &&
localCompliance.Compliance == database.Compliant {
return nil
}
}
return fmt.Errorf("failed to persist data to local compliance of table")
}, 10*time.Second, 100*time.Millisecond).ShouldNot(HaveOccurred())

By("Build a new policy bundle in the managed hub")
data := grc.ComplianceBundle{}
data = append(data, grc.Compliance{
PolicyID: createdPolicyId,
CompliantClusters: []string{"cluster1"},
NonCompliantClusters: []string{"cluster2"},
UnknownComplianceClusters: []string{},
})
complianceVersion.Incr()

evt := ToCloudEvent(leafHubName, string(enum.LocalComplianceType), complianceVersion, data)

By("Sync message with transport")
err = producer.SendEvent(ctx, *evt)
Expect(err).Should(Succeed())
complianceVersion.Next()

By("Wait the resync finished")
time.Sleep(5 * time.Second)

By("Check the local compliance is created and expired policy is deleted from database")
Eventually(func() error {
var localCompliances []models.LocalStatusCompliance
err = db.Where("leaf_hub_name = ?", leafHubName).Find(&localCompliances).Error
if err != nil {
return err
}

addedCount := 0
for _, c := range localCompliances {
fmt.Printf("LocalCompliance Resync: ID(%s) %s/%s %s \n", c.PolicyID, c.LeafHubName, c.ClusterName, c.Compliance)
if c.PolicyID == createdPolicyId && c.ClusterName == "cluster1" || c.ClusterName == "cluster2" {
addedCount++
}
if c.ClusterName == "cluster3" {
return fmt.Errorf("the cluster3 should be removed from database")
}
}
if addedCount == 2 && len(localCompliances) == 2 {
fmt.Println("LocalCompliance(Resync) ========================================================== ")
return nil
}
return fmt.Errorf("failed to sync local compliance")
}, 30*time.Second, 100*time.Millisecond).ShouldNot(HaveOccurred())
})

It("shouldn't update the by the local complete compliance event", func() {
db := database.GetGorm()
By("Create a complete compliance bundle")
Expand Down
20 changes: 8 additions & 12 deletions manager/pkg/statussyncer/syncers/policy_compliance_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,17 +84,18 @@ func (h *policyComplianceHandler) handleEvent(ctx context.Context, evt *cloudeve
complianceClustersFromDB = NewPolicyClusterSets()
}

allClustersOnDB := complianceClustersFromDB.GetAllClusters()
// handle compliant clusters of the policy
compliantCompliances := newCompliances(leafHubName, policyID, database.Compliant,
eventCompliance.CompliantClusters, complianceClustersFromDB.GetClusters(database.Compliant))
eventCompliance.CompliantClusters, allClustersOnDB)

// handle non compliant clusters of the policy
nonCompliantCompliances := newCompliances(leafHubName, policyID, database.NonCompliant,
eventCompliance.NonCompliantClusters, complianceClustersFromDB.GetClusters(database.NonCompliant))
eventCompliance.NonCompliantClusters, allClustersOnDB)

// handle unknown compliance clusters of the policy
unknownCompliances := newCompliances(leafHubName, policyID, database.Unknown,
eventCompliance.UnknownComplianceClusters, complianceClustersFromDB.GetClusters(database.Unknown))
eventCompliance.UnknownComplianceClusters, allClustersOnDB)

batchCompliances := []models.StatusCompliance{}
batchCompliances = append(batchCompliances, compliantCompliances...)
Expand All @@ -109,11 +110,7 @@ func (h *policyComplianceHandler) handleEvent(ctx context.Context, evt *cloudeve
return err
}

// delete compliance status rows in the db that were not sent in the bundle (leaf hub sends only living resources)
allClustersOnDB := complianceClustersFromDB.GetAllClusters()
for _, compliance := range batchCompliances {
allClustersOnDB.Remove(compliance.ClusterName)
}
// delete
err = db.Transaction(func(tx *gorm.DB) error {
for _, name := range allClustersOnDB.ToSlice() {
clusterName, ok := name.(string)
Expand Down Expand Up @@ -159,19 +156,18 @@ func (h *policyComplianceHandler) handleEvent(ctx context.Context, evt *cloudeve
}

func newCompliances(leafHub, policyID string, compliance database.ComplianceStatus,
eventComplianceClusters []string, complianceClustersOnDB set.Set,
eventComplianceClusters []string, allClusterOnDB set.Set,
) []models.StatusCompliance {
clusters := newComplianceClusters(eventComplianceClusters, complianceClustersOnDB)

compliances := make([]models.StatusCompliance, 0)
for _, cluster := range clusters {
for _, cluster := range eventComplianceClusters {
compliances = append(compliances, models.StatusCompliance{
LeafHubName: leafHub,
PolicyID: policyID,
ClusterName: cluster,
Error: database.ErrorNone,
Compliance: compliance,
})
allClusterOnDB.Remove(cluster)
}
return compliances
}
Expand Down

0 comments on commit 17da295

Please sign in to comment.