Skip to content

Commit

Permalink
Add first set of tests for locality based exclusions and fix bug in l…
Browse files Browse the repository at this point in the history
…ocality based exclusions (#1811)

* Add first set of tests for locality based exclusions
  • Loading branch information
johscheuer authored Sep 26, 2023
1 parent d4d3bda commit 67e5df5
Show file tree
Hide file tree
Showing 10 changed files with 160 additions and 18 deletions.
3 changes: 3 additions & 0 deletions api/v1beta2/foundationdb_labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,7 @@ const (
// FDBLocalityProcessIDKey represents the key in the locality map that
// holds the process ID.
FDBLocalityProcessIDKey = "process_id"

// FDBLocalityExclusionPrefix represents the exclusion prefix for locality based exclusions.
FDBLocalityExclusionPrefix = "locality_instance_id"
)
10 changes: 8 additions & 2 deletions api/v1beta2/foundationdbcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ func (processGroupID ProcessGroupID) GetIDNumber() (int, error) {

// GetExclusionString returns the exclusion string
func (processGroupStatus *ProcessGroupStatus) GetExclusionString() string {
return fmt.Sprintf("locality_instance_id:%s", processGroupStatus.ProcessGroupID)
return fmt.Sprintf("%s:%s", FDBLocalityExclusionPrefix, processGroupStatus.ProcessGroupID)
}

// IsExcluded returns if a process group is excluded
Expand Down Expand Up @@ -564,11 +564,17 @@ func cleanAddressList(addresses []string) []string {

// AllAddressesExcluded checks if the process group is excluded or if there are still addresses included in the remainingMap.
// This will return true if the process group skips exclusion or has no remaining addresses.
func (processGroupStatus *ProcessGroupStatus) AllAddressesExcluded(remainingMap map[string]bool) (bool, error) {
func (processGroupStatus *ProcessGroupStatus) AllAddressesExcluded(logger logr.Logger, remainingMap map[string]bool) (bool, error) {
if processGroupStatus.IsExcluded() {
return true, nil
}

localityExclusionString := processGroupStatus.GetExclusionString()
if isRemaining, isPresent := remainingMap[localityExclusionString]; isPresent && !isRemaining {
logger.V(1).Info("process group is fully excluded based on locality based exclusions", "processGroupID", processGroupStatus.ProcessGroupID, "exclusionString", localityExclusionString)
return true, nil
}

// If the process group has no addresses assigned we cannot remove it safely and we have to set the skip exclusion.
if len(processGroupStatus.Addresses) == 0 {
return false, fmt.Errorf("process has no addresses, cannot safely determine if process can be removed")
Expand Down
2 changes: 1 addition & 1 deletion api/v1beta2/foundationdbcluster_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5407,7 +5407,7 @@ var _ = Describe("[api] FoundationDBCluster", func() {
})

DescribeTable("when checking if all addresses are excluded for a process group", func(processGroupStatus *ProcessGroupStatus, remainingMap map[string]bool, expected bool, expectedErr error) {
excluded, err := processGroupStatus.AllAddressesExcluded(remainingMap)
excluded, err := processGroupStatus.AllAddressesExcluded(log, remainingMap)
Expect(excluded).To(Equal(expected))

if expectedErr != nil {
Expand Down
3 changes: 2 additions & 1 deletion controllers/remove_process_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ func (r *FoundationDBClusterReconciler) getProcessGroupsToRemove(logger logr.Log
allExcluded := true
newExclusions := false
processGroupsToRemove := make([]*fdbv1beta2.ProcessGroupStatus, 0, len(cluster.Status.ProcessGroups))
logger.V(1).Info("Get ProcessGroups to be removed.", "remainingMap", remainingMap)

for _, processGroup := range cluster.Status.ProcessGroups {
if !processGroup.IsMarkedForRemoval() {
Expand All @@ -320,7 +321,7 @@ func (r *FoundationDBClusterReconciler) getProcessGroupsToRemove(logger logr.Log
continue
}

excluded, err := processGroup.AllAddressesExcluded(remainingMap)
excluded, err := processGroup.AllAddressesExcluded(logger, remainingMap)
if !excluded || err != nil {
logger.Info("Incomplete exclusion still present in removeProcessGroups step", "processGroupID", processGroup.ProcessGroupID, "error", err)
allExcluded = false
Expand Down
2 changes: 2 additions & 0 deletions e2e/fixtures/cluster_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ type ClusterConfig struct {
DebugSymbols bool
// UseMaintenanceMode if enabled the FoundationDBCluster resource will enable the maintenance mode.
UseMaintenanceMode bool
// UseLocalityBasedExclusions if enabled the FoundationDBCluster resource will enable the locality based exclusions.
UseLocalityBasedExclusions bool
// CreationTracker if specified will be used to log the time between the creations steps.
CreationTracker CreationTrackerLogger
// Number of machines, this is used for calculating the number of Pods and is not correlated to the actual number
Expand Down
1 change: 1 addition & 0 deletions e2e/fixtures/fdb_cluster_specs.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func (factory *Factory) createFDBClusterSpec(
IgnoreLogGroupsForUpgrade: []fdbv1beta2.LogGroup{
"fdb-kubernetes-operator",
},
UseLocalitiesForExclusion: pointer.Bool(config.UseLocalityBasedExclusions),
},
Routing: fdbv1beta2.RoutingConfig{
UseDNSInClusterFile: pointer.Bool(false),
Expand Down
38 changes: 34 additions & 4 deletions e2e/test_operator/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,25 +353,55 @@ var _ = Describe("Operator", Label("e2e", "pr"), func() {
Expect(processGroup.FaultDomain).NotTo(BeEmpty())
}

// TODO (johscheuer): We should check here further fields in the FoundationDBCluter resource to make sure the
// TODO (johscheuer): We should check here further fields in the FoundationDBCluster resource to make sure the
// fields that we expect are actually set.
})

When("replacing a Pod", func() {
var replacedPod corev1.Pod
var useLocalitiesForExclusion bool

BeforeEach(func() {
JustBeforeEach(func() {
initialPods := fdbCluster.GetStatelessPods()
replacedPod = fixtures.RandomPickOnePod(initialPods.Items)
fdbCluster.ReplacePod(replacedPod, true)
})

BeforeEach(func() {
useLocalitiesForExclusion = pointer.BoolDeref(fdbCluster.GetCluster().Spec.AutomationOptions.UseLocalitiesForExclusion, false)
})

AfterEach(func() {
Expect(fdbCluster.ClearProcessGroupsToRemove()).NotTo(HaveOccurred())
// Make sure we reset the previous behaviour.
spec := fdbCluster.GetCluster().Spec.DeepCopy()
spec.AutomationOptions.UseLocalitiesForExclusion = pointer.Bool(useLocalitiesForExclusion)
fdbCluster.UpdateClusterSpecWithSpec(spec)
})

It("should remove the targeted Pod", func() {
fdbCluster.EnsurePodIsDeleted(replacedPod.Name)
When("IP addresses are used for exclusion", func() {
BeforeEach(func() {
spec := fdbCluster.GetCluster().Spec.DeepCopy()
spec.AutomationOptions.UseLocalitiesForExclusion = pointer.Bool(false)
fdbCluster.UpdateClusterSpecWithSpec(spec)
})

It("should remove the targeted Pod", func() {
fdbCluster.EnsurePodIsDeleted(replacedPod.Name)
})
})

When("localities are used for exclusion", func() {
BeforeEach(func() {
spec := fdbCluster.GetCluster().Spec.DeepCopy()
spec.AutomationOptions.UseLocalitiesForExclusion = pointer.Bool(true)
fdbCluster.UpdateClusterSpecWithSpec(spec)
})

It("should remove the targeted Pod", func() {
Expect(pointer.BoolDeref(fdbCluster.GetCluster().Spec.AutomationOptions.UseLocalitiesForExclusion, false)).To(BeTrue())
fdbCluster.EnsurePodIsDeleted(replacedPod.Name)
})
})
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ Each test will create a new FoundationDB cluster which will be upgraded.
*/

import (
"k8s.io/utils/pointer"
"log"
"time"

Expand Down Expand Up @@ -313,4 +314,59 @@ var _ = Describe("Operator Upgrades", Label("e2e", "pr"), func() {
EntryDescription("Upgrade from %[1]s to %[2]s"),
fixtures.GenerateUpgradeTableEntries(testOptions),
)

DescribeTable(
"with locality based exclusions",
func(beforeVersion string, targetVersion string) {
clusterSetupWithConfig(beforeVersion, true, &fixtures.ClusterConfig{
DebugSymbols: false,
UseLocalityBasedExclusions: true,
})

Expect(fdbCluster.UpgradeCluster(targetVersion, false)).NotTo(HaveOccurred())
Expect(pointer.BoolDeref(fdbCluster.GetCluster().Spec.AutomationOptions.UseLocalitiesForExclusion, false)).To(BeTrue())

if !fixtures.VersionsAreProtocolCompatible(beforeVersion, targetVersion) {
// Ensure that the operator is setting the IncorrectConfigMap and IncorrectCommandLine conditions during the upgrade
// process.
expectedConditions := map[fdbv1beta2.ProcessGroupConditionType]bool{
fdbv1beta2.IncorrectConfigMap: true,
fdbv1beta2.IncorrectCommandLine: true,
}
Eventually(func() bool {
cluster := fdbCluster.GetCluster()

for _, processGroup := range cluster.Status.ProcessGroups {
if !processGroup.MatchesConditions(expectedConditions) {
return false
}
}

return true
}).WithTimeout(10 * time.Minute).WithPolling(5 * time.Second).Should(BeTrue())
}

transactionSystemProcessGroups := make(map[fdbv1beta2.ProcessGroupID]fdbv1beta2.None)
// Wait until the cluster is upgraded and fully reconciled.
Expect(fdbCluster.WaitUntilWithForceReconcile(2, 600, func(cluster *fdbv1beta2.FoundationDBCluster) bool {
for _, processGroup := range cluster.Status.ProcessGroups {
if processGroup.ProcessClass == fdbv1beta2.ProcessClassStorage {
continue
}
transactionSystemProcessGroups[processGroup.ProcessGroupID] = fdbv1beta2.None{}
}

// Allow soft reconciliation and make sure the running version was updated
return cluster.Status.Generations.Reconciled == cluster.Generation && cluster.Status.RunningVersion == targetVersion
})).NotTo(HaveOccurred())

// Get the desired process counts based on the current cluster configuration
processCounts, err := fdbCluster.GetProcessCounts()
Expect(err).NotTo(HaveOccurred())
expectedProcessCounts := (processCounts.Total()-processCounts.Storage)*2 + 5
Expect(len(transactionSystemProcessGroups)).To(BeNumerically("<=", expectedProcessCounts))
},
EntryDescription("Upgrade from %[1]s to %[2]s"),
fixtures.GenerateUpgradeTableEntries(testOptions),
)
})
34 changes: 25 additions & 9 deletions pkg/fdbstatus/status_checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/FoundationDB/fdb-kubernetes-operator/pkg/fdbadminclient"
"github.com/go-logr/logr"
"math"
"strings"
)

// forbiddenStatusMessages represents messages that could be part of the machine-readable status. Those messages can represent
Expand Down Expand Up @@ -62,6 +63,7 @@ type exclusionStatus struct {

// getRemainingAndExcludedFromStatus checks which processes of the input address list are excluded in the cluster and which are not.
func getRemainingAndExcludedFromStatus(logger logr.Logger, status *fdbv1beta2.FoundationDBStatus, addresses []fdbv1beta2.ProcessAddress) exclusionStatus {
logger.V(1).Info("Verify if exclusions are done", "addresses", addresses)
notExcludedAddresses := map[string]fdbv1beta2.None{}
fullyExcludedAddresses := map[string]int{}
visitedAddresses := map[string]int{}
Expand Down Expand Up @@ -107,25 +109,39 @@ func getRemainingAndExcludedFromStatus(logger logr.Logger, status *fdbv1beta2.Fo
}

addressesToVerify := map[string]fdbv1beta2.None{}
useLocalities := false
for _, addr := range addresses {
addressesToVerify[addr.MachineAddress()] = fdbv1beta2.None{}
address := addr.MachineAddress()
addressesToVerify[address] = fdbv1beta2.None{}

if !useLocalities {
useLocalities = strings.HasPrefix(address, fdbv1beta2.FDBLocalityExclusionPrefix)
}
}

// Check in the status output which processes are already marked for exclusion in the cluster
for _, process := range status.Cluster.Processes {
if _, ok := addressesToVerify[process.Address.MachineAddress()]; !ok {
var address string

if useLocalities {
address = fmt.Sprintf("%s:%s", fdbv1beta2.FDBLocalityExclusionPrefix, process.Locality[fdbv1beta2.FDBLocalityInstanceIDKey])
} else {
address = process.Address.MachineAddress()
}

if _, ok := addressesToVerify[address]; !ok {
continue
}

visitedAddresses[process.Address.MachineAddress()]++
visitedAddresses[address]++
if !process.Excluded {
notExcludedAddresses[process.Address.MachineAddress()] = fdbv1beta2.None{}
notExcludedAddresses[address] = fdbv1beta2.None{}
continue
}

if len(process.Roles) == 0 {
logger.Info("found fully excluded process without any roles", "process", process)
fullyExcludedAddresses[process.Address.MachineAddress()]++
fullyExcludedAddresses[address]++
}
}

Expand All @@ -137,17 +153,17 @@ func getRemainingAndExcludedFromStatus(logger logr.Logger, status *fdbv1beta2.Fo
}

for _, addr := range addresses {
machine := addr.MachineAddress()
address := addr.MachineAddress()
// If we didn't visit that address (absent in the cluster status) we assume it's safe to run the exclude command against it.
// We have to run the exclude command against those addresses, to make sure they are not serving any roles.
visitedCount, visited := visitedAddresses[machine]
visitedCount, visited := visitedAddresses[address]
if !visited {
exclusions.missingInStatus = append(exclusions.missingInStatus, addr)
continue
}

// Those addresses are not excluded, so it's not safe to start the exclude command to check if they are fully excluded.
if _, ok := notExcludedAddresses[machine]; ok {
if _, ok := notExcludedAddresses[address]; ok {
exclusions.notExcluded = append(exclusions.notExcluded, addr)
continue
}
Expand All @@ -163,7 +179,7 @@ func getRemainingAndExcludedFromStatus(logger logr.Logger, status *fdbv1beta2.Fo
exclusions.fullyExcluded = append(exclusions.fullyExcluded, addr)
continue
}
logger.Info("found excluded addresses for machine, but not all processes are fully excluded", "visitedCount", visitedCount, "excludedCount", excludedCount, "machine", machine)
logger.Info("found excluded addresses for machine, but not all processes are fully excluded", "visitedCount", visitedCount, "excludedCount", excludedCount, "address", address)
}

// Those are the processes that are marked as excluded but still serve at least one role.
Expand Down
29 changes: 28 additions & 1 deletion pkg/fdbstatus/status_checks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,21 @@ var _ = Describe("status_checks", func() {
"1": {
Address: addr1,
Excluded: true,
Locality: map[string]string{
fdbv1beta2.FDBLocalityInstanceIDKey: "1",
},
},
"2": {
Address: addr2,
Locality: map[string]string{
fdbv1beta2.FDBLocalityInstanceIDKey: "2",
},
},
"3": {
Address: addr3,
Locality: map[string]string{
fdbv1beta2.FDBLocalityInstanceIDKey: "3",
},
},
"4": {
Address: addr4,
Expand All @@ -63,11 +72,13 @@ var _ = Describe("status_checks", func() {
Role: "tester",
},
},
Locality: map[string]string{
fdbv1beta2.FDBLocalityInstanceIDKey: "4",
},
},
},
},
}

DescribeTable("fetching the excluded and remaining processes from the status",
func(status *fdbv1beta2.FoundationDBStatus,
addresses []fdbv1beta2.ProcessAddress,
Expand Down Expand Up @@ -556,6 +567,22 @@ var _ = Describe("status_checks", func() {
nil,
nil,
),
Entry("when the process is excluded and locality based exclusions are used",
status,
[]fdbv1beta2.ProcessAddress{fdbv1beta2.NewProcessAddress(net.IP{}, "locality_instance_id:4", 0, nil)},
[]fdbv1beta2.ProcessAddress{fdbv1beta2.NewProcessAddress(net.IP{}, "locality_instance_id:4", 0, nil)},
nil,
nil,
nil,
),
Entry("when the process is not excluded and locality based exclusions are used",
status,
[]fdbv1beta2.ProcessAddress{fdbv1beta2.NewProcessAddress(net.IP{}, "locality_instance_id:3", 0, nil)},
nil,
[]fdbv1beta2.ProcessAddress{fdbv1beta2.NewProcessAddress(net.IP{}, "locality_instance_id:3", 0, nil)},
nil,
nil,
),
)
})

Expand Down

0 comments on commit 67e5df5

Please sign in to comment.