Check if data loader is running and make sure cluster has no data loss
johscheuer committed Aug 2, 2023
1 parent 53a0fb5 commit ab7dc95
Showing 5 changed files with 131 additions and 0 deletions.
31 changes: 31 additions & 0 deletions e2e/fixtures/fdb_cluster.go
@@ -24,6 +24,7 @@ import (
ctx "context"
"fmt"
"log"
"math"
"strconv"
"strings"
"sync"
@@ -1221,3 +1222,33 @@ func (fdbCluster *FdbCluster) SetBuggifyBlockRemoval(blockRemovals []fdbv1beta2.
func (fdbCluster *FdbCluster) GetAutomationOptions() fdbv1beta2.FoundationDBClusterAutomationOptions {
	return fdbCluster.cluster.Spec.AutomationOptions
}

// EnsureTeamTrackersAreHealthy will check if the machine-readable status suggests that the team trackers are healthy
// and all data is present.
func (fdbCluster *FdbCluster) EnsureTeamTrackersAreHealthy() {
	gomega.Eventually(func() bool {
		for _, tracker := range fdbCluster.GetStatus().Cluster.Data.TeamTrackers {
			if !tracker.State.Healthy {
				return false
			}
		}

		return true
	}).WithTimeout(1 * time.Minute).WithPolling(1 * time.Second).MustPassRepeatedly(5).Should(gomega.BeTrue())
}

// EnsureTeamTrackersHaveMinReplicas will check if the machine-readable status suggests that the minimum
// min_replicas_remaining across all team trackers is at least the cluster's desired fault tolerance.
func (fdbCluster *FdbCluster) EnsureTeamTrackersHaveMinReplicas() {
	desiredFaultTolerance := fdbCluster.GetCachedCluster().DesiredFaultTolerance()
	gomega.Eventually(func() int {
		minReplicas := math.MaxInt
		for _, tracker := range fdbCluster.GetStatus().Cluster.Data.TeamTrackers {
			if minReplicas > tracker.State.MinReplicasRemaining {
				minReplicas = tracker.State.MinReplicasRemaining
			}
		}

		return minReplicas
	}).WithTimeout(1 * time.Minute).WithPolling(1 * time.Second).MustPassRepeatedly(5).Should(gomega.BeNumerically(">=", desiredFaultTolerance))
}
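
Both helpers poll the machine-readable status with MustPassRepeatedly(5), so the condition has to hold for five consecutive one-second polls rather than a single, possibly flapping, sample while data distribution is still moving shards. The invariant they enforce together can be written as one plain predicate; the following sketch is illustrative only (teamTrackersSafe is not part of this commit) and uses just the status fields referenced above:

// Sketch only: the combined invariant behind EnsureTeamTrackersAreHealthy
// and EnsureTeamTrackersHaveMinReplicas. The status would come from
// fdbCluster.GetStatus() and the fault tolerance from
// fdbCluster.GetCachedCluster().DesiredFaultTolerance().
func teamTrackersSafe(status *fdbv1beta2.FoundationDBStatus, desiredFaultTolerance int) bool {
	for _, tracker := range status.Cluster.Data.TeamTrackers {
		// Every team tracker (the primary plus one per remote region in
		// HA setups) must report a healthy state...
		if !tracker.State.Healthy {
			return false
		}
		// ...and no team may have fewer replicas remaining than the
		// desired fault tolerance, which would mean the cluster lost
		// more redundancy than it is configured to tolerate.
		if tracker.State.MinReplicasRemaining < desiredFaultTolerance {
			return false
		}
	}

	return true
}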
31 changes: 31 additions & 0 deletions e2e/fixtures/fdb_data_loader.go
@@ -22,17 +22,22 @@ package fixtures

import (
"bytes"
"context"
"errors"
"github.com/onsi/gomega"
"io"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer/yaml"
yamlutil "k8s.io/apimachinery/pkg/util/yaml"
"sigs.k8s.io/controller-runtime/pkg/client"
"text/template"
"time"
)

const (
	// For now we only load 2GB into the cluster; we can increase this later if we want.
	dataLoaderJob = `apiVersion: batch/v1
kind: Job
metadata:
@@ -194,4 +199,30 @@ func (factory *Factory) CreateDataLoaderIfAbsent(cluster *FdbCluster) {
			factory.CreateIfAbsent(unstructuredObj),
		).NotTo(gomega.HaveOccurred())
	}

	factory.WaitUntilDataLoaderIsRunning(cluster)
}

// WaitUntilDataLoaderIsRunning will wait until at least one data loader Pod is running.
func (factory *Factory) WaitUntilDataLoaderIsRunning(cluster *FdbCluster) {
	gomega.Eventually(func() int {
		pods := &corev1.PodList{}
		gomega.Expect(
			factory.controllerRuntimeClient.List(
				context.Background(),
				pods,
				client.InNamespace(cluster.Namespace()),
				client.MatchingLabels(map[string]string{"job-name": "fdb-data-loader"}),
			),
		).NotTo(gomega.HaveOccurred())

		var runningPods int
		for _, pod := range pods.Items {
			if pod.Status.Phase == corev1.PodRunning {
				runningPods++
			}
		}

		return runningPods
	}).WithTimeout(5 * time.Minute).WithPolling(5 * time.Second).Should(gomega.BeNumerically(">", 0))
}
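
The Pod selector relies on the job-name label, which the built-in Kubernetes Job controller adds to every Pod a Job creates, so the data loader template needs no extra labels. For context, a rough sketch of the intended call order in a test; the cluster creation call is an assumption based on the surrounding fixtures and its exact signature may differ:

// Sketch only: typical use of the data loader in an e2e test.
// factory.CreateFdbCluster and fixtures.DefaultClusterConfig are assumed
// helpers from the e2e fixtures, not part of this commit.
fdbCluster := factory.CreateFdbCluster(fixtures.DefaultClusterConfig(false))

// Create the data loader Job if it does not exist yet and block until at
// least one of its Pods reaches the Running phase (up to 5 minutes).
factory.CreateDataLoaderIfAbsent(fdbCluster)

// ... disruptive test steps, e.g. an upgrade or Pod deletions ...

// Afterwards, verify that the cluster kept all of its data.
fdbCluster.EnsureTeamTrackersAreHealthy()
fdbCluster.EnsureTeamTrackersHaveMinReplicas()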
3 changes: 3 additions & 0 deletions e2e/test_operator/operator_test.go
@@ -144,6 +144,9 @@ var _ = Describe("Operator", Label("e2e", "pr"), func() {
		}
		Expect(fdbCluster.WaitForReconciliation()).ToNot(HaveOccurred())
		factory.StopInvariantCheck()
		// Make sure all data is present in the cluster
		fdbCluster.EnsureTeamTrackersAreHealthy()
		fdbCluster.EnsureTeamTrackersHaveMinReplicas()
	})

	JustBeforeEach(func() {
18 changes: 18 additions & 0 deletions e2e/test_operator_ha_upgrades/operator_ha_upgrade_test.go
@@ -270,6 +270,9 @@ var _ = Describe("Operator HA Upgrades", Label("e2e", "pr"), func() {
			// may get bounced at different times and also because of cluster controller
			// change, which happens a number of times, during an upgrade).
			Expect(finalGeneration).To(BeNumerically("<=", initialGeneration+80))
			// Make sure the cluster has no data loss
			fdbCluster.GetPrimary().EnsureTeamTrackersAreHealthy()
			fdbCluster.GetPrimary().EnsureTeamTrackersHaveMinReplicas()
		},
		EntryDescription("Upgrade from %[1]s to %[2]s"),
		fixtures.GenerateUpgradeTableEntries(testOptions),
@@ -377,6 +380,9 @@ var _ = Describe("Operator HA Upgrades", Label("e2e", "pr"), func() {
			// Upgrade should make progress now - wait until all processes have upgraded
			// to "targetVersion".
			checkVersion(fdbCluster, targetVersion)

			// Make sure the cluster has no data loss
			fdbCluster.GetPrimary().EnsureTeamTrackersHaveMinReplicas()
		},
		EntryDescription("Upgrade, with a temporary partition, from %s to %s"),
		fixtures.GenerateUpgradeTableEntries(testOptions),
@@ -439,6 +445,9 @@ var _ = Describe("Operator HA Upgrades", Label("e2e", "pr"), func() {
				factory.DeletePod(randomPod)
				return false
			}).WithTimeout(30 * time.Minute).WithPolling(2 * time.Minute).Should(BeTrue())

			// Make sure the cluster has no data loss
			fdbCluster.GetPrimary().EnsureTeamTrackersHaveMinReplicas()
		},
		EntryDescription(
			"Upgrade, with a random pod deleted during the staging phase, from %s to %s",
@@ -478,6 +487,9 @@ var _ = Describe("Operator HA Upgrades", Label("e2e", "pr"), func() {
}, "20")

upgradeAndVerify(fdbCluster, targetVersion)

// Make sure the cluster has no data loss
fdbCluster.GetPrimary().EnsureTeamTrackersHaveMinReplicas()
},
EntryDescription("Upgrade from %[1]s to %[2]s with network link that drops some packets"),
fixtures.GenerateUpgradeTableEntries(testOptions),
@@ -523,6 +535,9 @@ var _ = Describe("Operator HA Upgrades", Label("e2e", "pr"), func() {

			// The cluster should still be able to upgrade.
			Expect(fdbCluster.UpgradeCluster(targetVersion, true)).NotTo(HaveOccurred())

			// Make sure the cluster has no data loss
			fdbCluster.GetPrimary().EnsureTeamTrackersHaveMinReplicas()
		},
		EntryDescription("Upgrade from %[1]s to %[2]s when no remote storage processes are restarted"),
		fixtures.GenerateUpgradeTableEntries(testOptions),
@@ -570,6 +585,9 @@ var _ = Describe("Operator HA Upgrades", Label("e2e", "pr"), func() {

			// The cluster should still be able to upgrade.
			Expect(fdbCluster.UpgradeCluster(targetVersion, true)).NotTo(HaveOccurred())

			// Make sure the cluster has no data loss
			fdbCluster.GetPrimary().EnsureTeamTrackersHaveMinReplicas()
		},
		EntryDescription("Upgrade from %[1]s to %[2]s when no remote processes are restarted"),
		fixtures.GenerateUpgradeTableEntries(testOptions),
48 changes: 48 additions & 0 deletions e2e/test_operator_upgrades/operator_upgrades_test.go
@@ -171,6 +171,9 @@ var _ = Describe("Operator Upgrades", Label("e2e", "pr"), func() {
			// replacements during an upgrade.
			expectedProcessCounts := (processCounts.Total()-processCounts.Storage)*2 + 5
			Expect(len(transactionSystemProcessGroups)).To(BeNumerically("<=", expectedProcessCounts))

			// Make sure the cluster has no data loss.
			fdbCluster.EnsureTeamTrackersHaveMinReplicas()
		},
		EntryDescription("Upgrade from %[1]s to %[2]s"),
		fixtures.GenerateUpgradeTableEntries(testOptions),
@@ -222,6 +225,9 @@ var _ = Describe("Operator Upgrades", Label("e2e", "pr"), func() {
			log.Println("deleting chaos experiment and cluster should upgrade")
			factory.DeleteChaosMeshExperimentSafe(exp)
			verifyVersion(fdbCluster, targetVersion)

			// Make sure the cluster has no data loss.
			fdbCluster.EnsureTeamTrackersHaveMinReplicas()
		},

		EntryDescription("Upgrade from %[1]s to %[2]s with partitioned Pod"),
@@ -288,6 +294,9 @@ var _ = Describe("Operator Upgrades", Label("e2e", "pr"), func() {

			// 3. Upgrade should proceed without removing the partition, as Pod will be replaced.
			verifyVersion(fdbCluster, targetVersion)

			// Make sure the cluster has no data loss.
			fdbCluster.EnsureTeamTrackersHaveMinReplicas()
		},

		EntryDescription(
@@ -358,6 +367,9 @@ var _ = Describe("Operator Upgrades", Label("e2e", "pr"), func() {

			// 5. Verify a final time that the cluster is reconciled; this should be quick.
			Expect(fdbCluster.WaitForReconciliation()).NotTo(HaveOccurred())

			// Make sure the cluster has no data loss.
			fdbCluster.EnsureTeamTrackersHaveMinReplicas()
		},

		EntryDescription("Upgrade from %[1]s to %[2]s with pods deleted during rolling bounce"),
@@ -433,6 +445,9 @@ var _ = Describe("Operator Upgrades", Label("e2e", "pr"), func() {

			// Allow the operator to restart processes and the upgrade should continue and finish.
			fdbCluster.SetKillProcesses(true)

			// Make sure the cluster has no data loss.
			fdbCluster.EnsureTeamTrackersHaveMinReplicas()
		},

		EntryDescription(
@@ -496,6 +511,9 @@ var _ = Describe("Operator Upgrades", Label("e2e", "pr"), func() {

			// 5. Upgrade should proceed after we stop killing the sidecar.
			verifyVersion(fdbCluster, targetVersion)

			// Make sure the cluster has no data loss.
			fdbCluster.EnsureTeamTrackersHaveMinReplicas()
		},
		EntryDescription("Upgrade from %[1]s to %[2]s with crash-looping sidecar"),
		fixtures.GenerateUpgradeTableEntries(testOptions),
@@ -541,6 +559,9 @@ var _ = Describe("Operator Upgrades", Label("e2e", "pr"), func() {
			Eventually(func() []string {
				return fdbCluster.GetStatus().Cluster.IncompatibleConnections
			}).WithTimeout(5 * time.Minute).WithPolling(5 * time.Second).MustPassRepeatedly(5).Should(BeEmpty())

			// Make sure the cluster has no data loss.
			fdbCluster.EnsureTeamTrackersHaveMinReplicas()
		},
		EntryDescription("Upgrade from %[1]s to %[2]s with one coordinator not being restarted"),
		fixtures.GenerateUpgradeTableEntries(testOptions),
@@ -604,6 +625,9 @@ var _ = Describe("Operator Upgrades", Label("e2e", "pr"), func() {

			// The cluster should still be able to upgrade.
			Expect(fdbCluster.UpgradeCluster(targetVersion, true)).NotTo(HaveOccurred())

			// Make sure the cluster has no data loss.
			fdbCluster.EnsureTeamTrackersHaveMinReplicas()
		},
		EntryDescription("Upgrade from %[1]s to %[2]s and multiple processes are not restarted"),
		fixtures.GenerateUpgradeTableEntries(testOptions),
@@ -650,6 +674,9 @@ var _ = Describe("Operator Upgrades", Label("e2e", "pr"), func() {

			// 3. Upgrade should finish.
			verifyVersion(fdbCluster, targetVersion)

			// Make sure the cluster has no data loss.
			fdbCluster.EnsureTeamTrackersHaveMinReplicas()
		},
		EntryDescription("Upgrade from %[1]s to %[2]s with network link that drops some packets"),
		fixtures.GenerateUpgradeTableEntries(testOptions),
@@ -824,6 +851,9 @@ var _ = Describe("Operator Upgrades", Label("e2e", "pr"), func() {

			// Ensure the upgrade proceeds and is able to finish.
			verifyVersion(fdbCluster, targetVersion)

			// Make sure the cluster has no data loss.
			fdbCluster.EnsureTeamTrackersHaveMinReplicas()
		},
		EntryDescription("Upgrade from %[1]s to %[2]s and one process has the fdbmonitor.conf file not ready"),
		fixtures.GenerateUpgradeTableEntries(testOptions),
@@ -930,6 +960,9 @@ var _ = Describe("Operator Upgrades", Label("e2e", "pr"), func() {

			// Ensure the upgrade proceeds and is able to finish.
			verifyVersion(fdbCluster, targetVersion)

			// Make sure the cluster has no data loss.
			fdbCluster.EnsureTeamTrackersHaveMinReplicas()
		},
		EntryDescription("Upgrade from %[1]s to %[2]s and one process is missing the new binary"),
		fixtures.GenerateUpgradeTableEntries(testOptions),
@@ -1010,6 +1043,9 @@ var _ = Describe("Operator Upgrades", Label("e2e", "pr"), func() {

				return processesToUpdate
			}).WithTimeout(30 * time.Minute).WithPolling(5 * time.Second).MustPassRepeatedly(5).Should(BeNumerically("==", 0))

			// Make sure the cluster has no data loss.
			fdbCluster.EnsureTeamTrackersHaveMinReplicas()
		},
		EntryDescription("Upgrade from %[1]s to %[2]s"),
		fixtures.GenerateUpgradeTableEntries(testOptions),
@@ -1060,6 +1096,9 @@ var _ = Describe("Operator Upgrades", Label("e2e", "pr"), func() {

			// Make sure we remove the finalizer so we don't block the cleanup.
			Expect(factory.SetFinalizerForPod(&podMarkedForRemoval, []string{})).ShouldNot(HaveOccurred())

			// Make sure the cluster has no data loss.
			fdbCluster.EnsureTeamTrackersHaveMinReplicas()
		},
		EntryDescription("Upgrade from %[1]s to %[2]s"),
		fixtures.GenerateUpgradeTableEntries(testOptions),
@@ -1134,6 +1173,9 @@ var _ = Describe("Operator Upgrades", Label("e2e", "pr"), func() {
			// replacements during an upgrade.
			expectedProcessCounts := (processCounts.Total()-processCounts.Storage)*2 + 5
			Expect(len(transactionSystemProcessGroups)).To(BeNumerically("<=", expectedProcessCounts))

			// Make sure the cluster has no data loss.
			fdbCluster.EnsureTeamTrackersHaveMinReplicas()
		},
		EntryDescription("Upgrade from %[1]s to %[2]s"),
		fixtures.GenerateUpgradeTableEntries(testOptions),
@@ -1190,6 +1232,9 @@ var _ = Describe("Operator Upgrades", Label("e2e", "pr"), func() {
			Expect(err).NotTo(HaveOccurred())
			expectedProcessCounts := (processCounts.Total()-processCounts.Storage)*2 + 5
			Expect(len(transactionSystemProcessGroups)).To(BeNumerically("<=", expectedProcessCounts))

			// Make sure the cluster has no data loss.
			fdbCluster.EnsureTeamTrackersHaveMinReplicas()
		},
		EntryDescription("Upgrade from %[1]s to %[2]s"),
		fixtures.GenerateUpgradeTableEntries(testOptions),
@@ -1241,6 +1286,9 @@ var _ = Describe("Operator Upgrades", Label("e2e", "pr"), func() {
			}).WithTimeout(5 * time.Minute).WithPolling(1 * time.Second).MustPassRepeatedly(5).Should(Not(BeEmpty()))

			verifyVersion(fdbCluster, targetVersion)

			// Make sure the cluster has no data loss.
			fdbCluster.EnsureTeamTrackersHaveMinReplicas()
		},
		EntryDescription("Upgrade from %[1]s to %[2]s"),
		fixtures.GenerateUpgradeTableEntries(testOptions),