Skip to content

Commit

Permalink
handle pending policies
Browse files Browse the repository at this point in the history
Signed-off-by: ldpliu <[email protected]>
  • Loading branch information
ldpliu committed Apr 9, 2024
1 parent 02d6159 commit ecbd6d9
Show file tree
Hide file tree
Showing 25 changed files with 1,496 additions and 322 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ func (h *completeComplianceHandler) Update(obj client.Object) bool {
index := getPayloadIndexByUID(originPolicyID, *(h.eventData))
if index == -1 { // object not found, need to add it to the bundle (only in case it contains non-compliant/unknown)
// don't send in the bundle a policy where all clusters are compliant
if len(newComplete.UnknownComplianceClusters) == 0 && len(newComplete.NonCompliantClusters) == 0 {
if len(newComplete.UnknownComplianceClusters) == 0 && len(newComplete.NonCompliantClusters) == 0 &&
len(newComplete.PendingComplianceClusters) == 0 {
return false
}

Expand All @@ -64,16 +65,19 @@ func (h *completeComplianceHandler) Update(obj client.Object) bool {
// if we reached here, policy already exists in the bundle with at least one non compliant or unknown cluster.
oldComplete := (*h.eventData)[index]
if utils.Equal(oldComplete.NonCompliantClusters, newComplete.NonCompliantClusters) &&
utils.Equal(oldComplete.PendingComplianceClusters, newComplete.PendingComplianceClusters) &&
utils.Equal(oldComplete.UnknownComplianceClusters, newComplete.UnknownComplianceClusters) {
return false
}

// the payload is updated
(*h.eventData)[index].NonCompliantClusters = newComplete.NonCompliantClusters
(*h.eventData)[index].UnknownComplianceClusters = newComplete.UnknownComplianceClusters
(*h.eventData)[index].PendingComplianceClusters = newComplete.PendingComplianceClusters

// don't send in the bundle a policy where all clusters are compliant
if len((*h.eventData)[index].NonCompliantClusters) == 0 && len((*h.eventData)[index].UnknownComplianceClusters) == 0 {
if len((*h.eventData)[index].NonCompliantClusters) == 0 && len((*h.eventData)[index].UnknownComplianceClusters) == 0 &&
len((*h.eventData)[index].PendingComplianceClusters) == 0 {
(*h.eventData) = append((*h.eventData)[:index], (*h.eventData)[index+1:]...) // remove from objects
}
return true
Expand All @@ -98,13 +102,16 @@ func (h *completeComplianceHandler) Delete(obj client.Object) bool {
func newCompleteCompliance(originPolicyID string, policy *policiesv1.Policy) *grc.CompleteCompliance {
nonCompliantClusters := make([]string, 0)
unknownComplianceClusters := make([]string, 0)
pendingComplianceClusters := make([]string, 0)

for _, clusterCompliance := range policy.Status.Status {
if clusterCompliance.ComplianceState == policiesv1.Compliant {
continue
}
if clusterCompliance.ComplianceState == policiesv1.NonCompliant {
nonCompliantClusters = append(nonCompliantClusters, clusterCompliance.ClusterName)
} else if clusterCompliance.ComplianceState == policiesv1.Pending {
pendingComplianceClusters = append(pendingComplianceClusters, clusterCompliance.ClusterName)
} else { // not compliant not non compliant -> means unknown
unknownComplianceClusters = append(unknownComplianceClusters, clusterCompliance.ClusterName)
}
Expand All @@ -115,6 +122,7 @@ func newCompleteCompliance(originPolicyID string, policy *policiesv1.Policy) *gr
NamespacedName: policy.Namespace + "/" + policy.Name,
NonCompliantClusters: nonCompliantClusters,
UnknownComplianceClusters: unknownComplianceClusters,
PendingComplianceClusters: pendingComplianceClusters,
}
}

Expand Down
24 changes: 16 additions & 8 deletions agent/pkg/status/controller/policies/compliance_emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (h *complianceHandler) Update(obj client.Object) bool {
if index == -1 { // object not found, need to add it to the bundle
compliance := getNewCompliance(policyID, policy)
if len(compliance.CompliantClusters) == 0 && len(compliance.NonCompliantClusters) == 0 &&
len(compliance.UnknownComplianceClusters) == 0 {
len(compliance.UnknownComplianceClusters) == 0 && len(compliance.PendingComplianceClusters) == 0 {
return false
}
*h.eventData = append(*h.eventData, *compliance)
Expand All @@ -70,25 +70,28 @@ func (h *complianceHandler) Update(obj client.Object) bool {

// returns true if cluster list has changed(added/removed), otherwise returns false (even if cluster statuses changed).
func (h *complianceHandler) updatePayloadIfChanged(objectIndex int, policy *policiesv1.Policy) bool {
newCompliantClusters, newNonCompliantClusters, newUnknownClusters := getClusterStatus(policy)
allClusters := utils.Merge(newCompliantClusters, newNonCompliantClusters, newUnknownClusters)
newCompliantClusters, newNonCompliantClusters, newUnknownClusters, newPendingClusters := getClusterStatus(policy)
allClusters := utils.Merge(newCompliantClusters, newNonCompliantClusters, newUnknownClusters, newPendingClusters)

cachedCompliance := (*h.eventData)[objectIndex]
clusterListChanged := false

// check if any cluster was added or removed
if len(cachedCompliance.CompliantClusters)+len(cachedCompliance.NonCompliantClusters)+
len(cachedCompliance.UnknownComplianceClusters) != len(allClusters) ||
len(cachedCompliance.UnknownComplianceClusters)+len(cachedCompliance.PendingComplianceClusters) != len(allClusters) ||
!utils.ContainSubStrings(allClusters, cachedCompliance.CompliantClusters) ||
!utils.ContainSubStrings(allClusters, cachedCompliance.NonCompliantClusters) ||
!utils.ContainSubStrings(allClusters, cachedCompliance.UnknownComplianceClusters) {
!utils.ContainSubStrings(allClusters, cachedCompliance.UnknownComplianceClusters) ||
!utils.ContainSubStrings(allClusters, cachedCompliance.PendingComplianceClusters) {
clusterListChanged = true // at least one cluster was added/removed
}

// in any case we want to update the internal bundle in case statuses changed
cachedCompliance.CompliantClusters = newCompliantClusters
cachedCompliance.NonCompliantClusters = newNonCompliantClusters
cachedCompliance.UnknownComplianceClusters = newUnknownClusters
cachedCompliance.PendingComplianceClusters = newPendingClusters

(*h.eventData)[objectIndex] = cachedCompliance
return clusterListChanged
}
Expand Down Expand Up @@ -140,29 +143,34 @@ func getIndexByPolicyID(uid string, compliances []grc.Compliance) int {
}

func getNewCompliance(originPolicyID string, policy *policiesv1.Policy) *grc.Compliance {
compliantClusters, nonCompliantClusters, unknownComplianceClusters := getClusterStatus(policy)
compliantClusters, nonCompliantClusters, unknownComplianceClusters, pendingComplianceClusters := getClusterStatus(policy)
return &grc.Compliance{
PolicyID: originPolicyID,
NamespacedName: fmt.Sprintf("%s/%s", policy.GetNamespace(), policy.GetName()),
CompliantClusters: compliantClusters,
NonCompliantClusters: nonCompliantClusters,
UnknownComplianceClusters: unknownComplianceClusters,
PendingComplianceClusters: pendingComplianceClusters,
}
}

// getClusterStatus returns (list of compliant clusters, list of nonCompliant clusters, list of unknown clusters.
func getClusterStatus(policy *policiesv1.Policy) ([]string, []string, []string) {
func getClusterStatus(policy *policiesv1.Policy) ([]string, []string, []string, []string) {
compliantClusters := make([]string, 0)
nonCompliantClusters := make([]string, 0)
unknownComplianceClusters := make([]string, 0)
pendingComplianceClusters := make([]string, 0)

for _, clusterStatus := range policy.Status.Status {
if clusterStatus.ComplianceState == policiesv1.Compliant {
compliantClusters = append(compliantClusters, clusterStatus.ClusterName)
} else if clusterStatus.ComplianceState == policiesv1.NonCompliant {
nonCompliantClusters = append(nonCompliantClusters, clusterStatus.ClusterName)
} else if clusterStatus.ComplianceState == policiesv1.Pending {
pendingComplianceClusters = append(pendingComplianceClusters, clusterStatus.ClusterName)
} else {
unknownComplianceClusters = append(unknownComplianceClusters, clusterStatus.ClusterName)
}
}
return compliantClusters, nonCompliantClusters, unknownComplianceClusters
return compliantClusters, nonCompliantClusters, unknownComplianceClusters, pendingComplianceClusters
}
2 changes: 1 addition & 1 deletion doc/simulation/local-policies/create_local_policies.sql
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ declare
policy_control_random_index int;
all_policy_severities text[] := '{"low","high"}';
policy_severity_random_index int;
all_compliances local_status.compliance_type[] := '{"compliant","non_compliant"}';
all_compliances local_status.compliance_type[] := '{"compliant","unknown","pending","non_compliant"}';
compliance_random_index int;
managed_cluster text;
policy text;
Expand Down
5 changes: 3 additions & 2 deletions manager/pkg/cronjob/task/local_compliance_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,10 @@ func insertToLocalComplianceHistoryByLocalStatus(ctx context.Context, tableName
do
$$
declare
all_compliances local_status.compliance_type[] := '{"compliant","non_compliant"}';
all_compliances local_status.compliance_type[] := '{"compliant","non_compliant","unknown","pending"}';
compliance_random_index int;
begin
SELECT floor(random() * 2 + 1)::int into compliance_random_index;
SELECT floor(random() * 4 + 1)::int into compliance_random_index;
INSERT INTO history.local_compliance (policy_id, cluster_id, leaf_hub_name, compliance, compliance_date)
(
SELECT policy_id,cluster_id,leaf_hub_name,all_compliances[compliance_random_index],
Expand Down Expand Up @@ -251,6 +251,7 @@ func insertToLocalComplianceHistoryByPolicyEvent(ctx context.Context, totalCount
CASE
WHEN bool_or(compliance = 'non_compliant') THEN 'non_compliant'
WHEN bool_or(compliance = 'unknown') THEN 'unknown'
WHEN bool_or(compliance = 'pending') THEN 'pending'
ELSE 'compliant'
END::local_status.compliance_type AS aggregated_compliance
FROM event.local_policies
Expand Down
14 changes: 14 additions & 0 deletions manager/pkg/statussyncer/syncers/local_policy_complete_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,20 @@ func handleCompleteCompliance(log logr.Logger, ctx context.Context, evt *cloudev
allNonComplianceCluster.Remove(eventCluster) // mark cluster as handled
}

// pending: go over the pending clusters from event
for _, eventCluster := range eventCompliance.PendingComplianceClusters {
if !nonComplianceClusterSetsFromDB.GetClusters(database.Pending).Contains(eventCluster) {
batchLocalCompliance = append(batchLocalCompliance, models.LocalStatusCompliance{
PolicyID: policyID,
LeafHubName: leafHub,
ClusterName: eventCluster,
Compliance: database.Pending,
Error: database.ErrorNone,
})
}
allNonComplianceCluster.Remove(eventCluster) // mark cluster as handled
}

// unknown: go over the unknown clusters from event
for _, eventCluster := range eventCompliance.UnknownComplianceClusters {
if !nonComplianceClusterSetsFromDB.GetClusters(database.Unknown).Contains(eventCluster) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,15 @@ func handleCompliance(log logr.Logger, ctx context.Context, evt *cloudevents.Eve
unknownCompliances := newLocalCompliances(leafHub, policyID, database.Unknown,
eventCompliance.UnknownComplianceClusters, allClustersOnDB)

// handle pending compliance clusters of the policy
pendingCompliances := newLocalCompliances(leafHub, policyID, database.Pending,
eventCompliance.PendingComplianceClusters, complianceClustersFromDB.GetClusters(database.Pending))

batchLocalCompliances := []models.LocalStatusCompliance{}
batchLocalCompliances = append(batchLocalCompliances, compliantCompliances...)
batchLocalCompliances = append(batchLocalCompliances, nonCompliantCompliances...)
batchLocalCompliances = append(batchLocalCompliances, unknownCompliances...)
batchLocalCompliances = append(batchLocalCompliances, pendingCompliances...)

// batch upsert
err = db.Clauses(clause.OnConflict{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ var _ = Describe("LocalPolicyComplianceHandler", Ordered, func() {
PolicyID: createdPolicyId,
CompliantClusters: []string{"cluster1"},
NonCompliantClusters: []string{"cluster2"},
PendingComplianceClusters: []string{"cluster4"},
UnknownComplianceClusters: []string{},
})

Expand All @@ -83,13 +84,13 @@ var _ = Describe("LocalPolicyComplianceHandler", Ordered, func() {
if c.PolicyID == expiredPolicyID && c.ClusterName == "cluster1" {
expiredCount++
}
if c.PolicyID == createdPolicyId && c.ClusterName == "cluster1" || c.ClusterName == "cluster2" {
if c.PolicyID == createdPolicyId && c.ClusterName == "cluster1" || c.ClusterName == "cluster2" || c.ClusterName == "cluster4" {
addedCount++
}

fmt.Printf("LocalCompliance: ID(%s) %s/%s %s \n", c.PolicyID, c.LeafHubName, c.ClusterName, c.Compliance)
}
if expiredCount == 0 && addedCount == 2 && len(localCompliances) == 2 {
if expiredCount == 0 && addedCount == 3 && len(localCompliances) == 3 {
fmt.Println("LocalCompliance ========================================================== ")
return nil
}
Expand Down Expand Up @@ -132,6 +133,7 @@ var _ = Describe("LocalPolicyComplianceHandler", Ordered, func() {
PolicyID: createdPolicyId,
CompliantClusters: []string{"cluster1"},
NonCompliantClusters: []string{"cluster2"},
PendingComplianceClusters: []string{"cluster5"},
UnknownComplianceClusters: []string{},
})
complianceVersion.Incr()
Expand All @@ -157,19 +159,19 @@ var _ = Describe("LocalPolicyComplianceHandler", Ordered, func() {
addedCount := 0
for _, c := range localCompliances {
fmt.Printf("LocalCompliance Resync: ID(%s) %s/%s %s \n", c.PolicyID, c.LeafHubName, c.ClusterName, c.Compliance)
if c.PolicyID == createdPolicyId && c.ClusterName == "cluster1" || c.ClusterName == "cluster2" {
if c.PolicyID == createdPolicyId && c.ClusterName == "cluster1" || c.ClusterName == "cluster2" || c.ClusterName == "cluster5" {
addedCount++
}
if c.ClusterName == "cluster3" {
return fmt.Errorf("the cluster3 should be removed from database")
}
}
if addedCount == 2 && len(localCompliances) == 2 {
if addedCount == 3 && len(localCompliances) == 3 {
fmt.Println("LocalCompliance(Resync) ========================================================== ")
return nil
}
return fmt.Errorf("failed to sync local compliance")
}, 30*time.Second, 100*time.Millisecond).ShouldNot(HaveOccurred())
}, 10*time.Second, 3*time.Second).ShouldNot(HaveOccurred())
})

It("shouldn't update the by the local complete compliance event", func() {
Expand Down Expand Up @@ -237,6 +239,7 @@ var _ = Describe("LocalPolicyComplianceHandler", Ordered, func() {
PolicyID: createdPolicyId,
NonCompliantClusters: []string{"cluster1"},
UnknownComplianceClusters: []string{"cluster3"},
PendingComplianceClusters: []string{"cluster5"},
})

evt := ToCloudEvent(leafHubName, string(enum.LocalCompleteComplianceType), version, data)
Expand Down Expand Up @@ -264,13 +267,16 @@ var _ = Describe("LocalPolicyComplianceHandler", Ordered, func() {
if c.ClusterName == "cluster2" && c.Compliance == database.Compliant {
success++
}
if c.ClusterName == "cluster5" && c.Compliance == database.Pending {
success++
}
if c.ClusterName == "cluster3" {
return fmt.Errorf("the cluster3 shouldn't synced by the local compliance bundle")
}
}
}

if len(localCompliances) == 2 && success == 2 {
if len(localCompliances) == 3 && success == 3 {
fmt.Println("LocalComplete update ========================================================== ")
return nil
}
Expand Down
14 changes: 14 additions & 0 deletions manager/pkg/statussyncer/syncers/policy_complete_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,20 @@ func (h *policyCompleteHandler) handleEvent(ctx context.Context, evt *cloudevent
allNonComplianceCluster.Remove(eventCluster) // mark cluster as handled
}

// pending: go over the pending clusters from event
for _, eventCluster := range eventCompliance.PendingComplianceClusters {
if !nonComplianceClusterSetsFromDB.GetClusters(database.Pending).Contains(eventCluster) {
batchCompliance = append(batchCompliance, models.StatusCompliance{
PolicyID: policyID,
LeafHubName: leafHub,
ClusterName: eventCluster,
Compliance: database.Pending,
Error: database.ErrorNone,
})
}
allNonComplianceCluster.Remove(eventCluster) // mark cluster as handled
}

// unknown: go over the unknown clusters from event
for _, eventCluster := range eventCompliance.UnknownComplianceClusters {
if !nonComplianceClusterSetsFromDB.GetClusters(database.Unknown).Contains(eventCluster) {
Expand Down
7 changes: 7 additions & 0 deletions manager/pkg/statussyncer/syncers/policy_compliance_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,15 @@ func (h *policyComplianceHandler) handleEvent(ctx context.Context, evt *cloudeve
unknownCompliances := newCompliances(leafHubName, policyID, database.Unknown,
eventCompliance.UnknownComplianceClusters, allClustersOnDB)

// handle pending compliance clusters of the policy
pendingCompliances := newCompliances(leafHubName, policyID, database.Pending,
eventCompliance.PendingComplianceClusters, allClustersOnDB)

batchCompliances := []models.StatusCompliance{}
batchCompliances = append(batchCompliances, compliantCompliances...)
batchCompliances = append(batchCompliances, nonCompliantCompliances...)
batchCompliances = append(batchCompliances, unknownCompliances...)
batchCompliances = append(batchCompliances, pendingCompliances...)

// batch upsert
err = db.Clauses(clause.OnConflict{
Expand Down Expand Up @@ -199,6 +204,7 @@ func NewPolicyClusterSets() *PolicyClustersSets {
database.Compliant: set.NewSet(),
database.NonCompliant: set.NewSet(),
database.Unknown: set.NewSet(),
database.Pending: set.NewSet(),
},
}
}
Expand All @@ -217,6 +223,7 @@ func (sets *PolicyClustersSets) AddCluster(clusterName string, complianceStatus
func (sets *PolicyClustersSets) GetAllClusters() set.Set {
return sets.complianceToSetMap[database.Compliant].
Union(sets.complianceToSetMap[database.NonCompliant].
Union(sets.complianceToSetMap[database.Pending]).
Union(sets.complianceToSetMap[database.Unknown]))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ var _ = Describe("GlobalPolicyComplianceHandler", Ordered, func() {
PolicyID: createdPolicyId,
CompliantClusters: []string{"cluster1"}, // generate record: createdPolicyId hub1-cluster1 compliant
NonCompliantClusters: []string{"cluster2"}, // generate record: createdPolicyId hub1-cluster2 non_compliant
PendingComplianceClusters: []string{"cluster4"}, // generate record: createdPolicyId hub1-cluster4 pending
UnknownComplianceClusters: []string{},
})

Expand All @@ -86,13 +87,13 @@ var _ = Describe("GlobalPolicyComplianceHandler", Ordered, func() {
if c.PolicyID == expiredPolicyID && c.ClusterName == "cluster1" {
expiredCount++
}
if c.PolicyID == createdPolicyId && c.ClusterName == "cluster1" || c.ClusterName == "cluster2" {
if c.PolicyID == createdPolicyId && c.ClusterName == "cluster1" || c.ClusterName == "cluster2" || c.ClusterName == "cluster4" {
addedCount++
}

fmt.Printf("Compliance: ID(%s) %s/%s %s \n", c.PolicyID, c.LeafHubName, c.ClusterName, c.Compliance)
}
if expiredCount == 0 && addedCount == 2 && len(compliances) == 2 {
if expiredCount == 0 && addedCount == 3 && len(compliances) == 3 {
return nil
}
return fmt.Errorf("failed to sync compliance")
Expand Down Expand Up @@ -163,6 +164,7 @@ var _ = Describe("GlobalPolicyComplianceHandler", Ordered, func() {
PolicyID: createdPolicyId,
NonCompliantClusters: []string{"cluster1"},
UnknownComplianceClusters: []string{"cluster3"},
PendingComplianceClusters: []string{"cluster4"},
})

evt := ToCloudEvent(leafHubName, string(enum.CompleteComplianceType), completeVersion, data)
Expand Down Expand Up @@ -191,13 +193,16 @@ var _ = Describe("GlobalPolicyComplianceHandler", Ordered, func() {
if c.ClusterName == "cluster2" && c.Compliance == database.Compliant {
success++
}
if c.ClusterName == "cluster4" && c.Compliance == database.Pending {
success++
}
if c.ClusterName == "cluster3" {
return fmt.Errorf("the cluster3 shouldn't synced by the compliance bundle")
}
}
}

if len(compliances) == 2 && success == 2 {
if len(compliances) == 3 && success == 3 {
return nil
}
return fmt.Errorf("failed to sync complete compliance")
Expand Down
Loading

0 comments on commit ecbd6d9

Please sign in to comment.