Skip to content

Commit

Permalink
Refactor code to make use of the process group directly without passi…
Browse files Browse the repository at this point in the history
…ng the class and id number down
  • Loading branch information
johscheuer committed Sep 21, 2023
1 parent 83bb8cf commit 6970c48
Show file tree
Hide file tree
Showing 15 changed files with 236 additions and 245 deletions.
9 changes: 2 additions & 7 deletions controllers/add_pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,9 @@ func (a addPods) reconcile(ctx context.Context, r *FoundationDBClusterReconciler
continue
}

idNum, err := processGroup.ProcessGroupID.GetIDNumber()
pod, err := internal.GetPod(cluster, processGroup)
if err != nil {
return &requeue{curError: err}
}

pod, err := internal.GetPod(cluster, processGroup.ProcessClass, idNum)
if err != nil {
r.Recorder.Event(cluster, corev1.EventTypeWarning, "GetPod", fmt.Sprintf("failed to get the PodSpec for %s/%d with error: %s", processGroup.ProcessClass, idNum, err))
r.Recorder.Event(cluster, corev1.EventTypeWarning, "GetPod", fmt.Sprintf("failed to get the PodSpec for %s with error: %s", processGroup.ProcessGroupID, err))
return &requeue{curError: err}
}

Expand Down
15 changes: 3 additions & 12 deletions controllers/add_pvcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,7 @@ func (a addPVCs) reconcile(ctx context.Context, r *FoundationDBClusterReconciler
continue
}

idNum, err := processGroup.ProcessGroupID.GetIDNumber()
if err != nil {
return &requeue{curError: err}
}

pvc, err := internal.GetPvc(cluster, processGroup.ProcessClass, idNum)
pvc, err := internal.GetPvc(cluster, processGroup)
if err != nil {
return &requeue{curError: err}
}
Expand All @@ -61,19 +56,15 @@ func (a addPVCs) reconcile(ctx context.Context, r *FoundationDBClusterReconciler
err = r.Get(ctx, client.ObjectKey{Namespace: pvc.Namespace, Name: pvc.Name}, existingPVC)
if err != nil {
if !k8serrors.IsNotFound(err) {
return &requeue{curError: err}
return &requeue{curError: err, delayedRequeue: true}
}

owner := internal.BuildOwnerReference(cluster.TypeMeta, cluster.ObjectMeta)
pvc.ObjectMeta.OwnerReferences = owner
logger.V(1).Info("Creating PVC", "name", pvc.Name)
err = r.Create(ctx, pvc)
if err != nil {
if internal.IsQuotaExceeded(err) {
return &requeue{curError: err, delayedRequeue: true}
}

return &requeue{curError: err}
return &requeue{curError: err, delayedRequeue: true}
}
}
}
Expand Down
36 changes: 13 additions & 23 deletions controllers/add_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,23 +38,23 @@ type addServices struct{}

// reconcile runs the reconciler's work.
func (a addServices) reconcile(ctx context.Context, r *FoundationDBClusterReconciler, cluster *fdbv1beta2.FoundationDBCluster, _ *fdbv1beta2.FoundationDBStatus, logger logr.Logger) *requeue {
service := internal.GetHeadlessService(cluster)
if service != nil {
headlessService := internal.GetHeadlessService(cluster)
if headlessService != nil {
existingService := &corev1.Service{}
err := r.Get(ctx, client.ObjectKey{Namespace: cluster.Namespace, Name: cluster.Name}, existingService)
if err == nil {
err = updateService(ctx, logger, cluster, r, existingService, service)
err = updateService(ctx, logger, cluster, r, existingService, headlessService)
if err != nil {
return &requeue{curError: err}
return &requeue{curError: err, delayedRequeue: true}
}
} else {
if !k8serrors.IsNotFound(err) {
return &requeue{curError: err}
}
owner := internal.BuildOwnerReference(cluster.TypeMeta, cluster.ObjectMeta)
service.ObjectMeta.OwnerReferences = owner
logger.V(1).Info("Creating service", "name", service.Name)
err = r.Create(ctx, service)
headlessService.ObjectMeta.OwnerReferences = owner
logger.V(1).Info("Creating service", "name", headlessService.Name)
err = r.Create(ctx, headlessService)
if err != nil {
return &requeue{curError: err, delayedRequeue: true}
}
Expand All @@ -67,36 +67,26 @@ func (a addServices) reconcile(ctx context.Context, r *FoundationDBClusterReconc
continue
}

idNum, err := processGroup.ProcessGroupID.GetIDNumber()
if err != nil {
return &requeue{curError: err}
}

serviceName, _ := cluster.GetProcessGroupID(processGroup.ProcessClass, idNum)
service, err := internal.GetService(cluster, processGroup.ProcessClass, idNum)
service, err := internal.GetService(cluster, processGroup)
if err != nil {
return &requeue{curError: err}
return &requeue{curError: err, delayedRequeue: true}
}

existingService := &corev1.Service{}
err = r.Get(ctx, client.ObjectKey{Namespace: cluster.Namespace, Name: serviceName}, existingService)
err = r.Get(ctx, client.ObjectKey{Namespace: cluster.Namespace, Name: service.Name}, existingService)
if err == nil {
err = updateService(ctx, logger, cluster, r, existingService, service)
if err != nil {
return &requeue{curError: err}
return &requeue{curError: err, delayedRequeue: true}
}
} else if k8serrors.IsNotFound(err) {
logger.V(1).Info("Creating service", "name", service.Name)
err = r.Create(ctx, service)
if err != nil {
if internal.IsQuotaExceeded(err) {
return &requeue{curError: err, delayedRequeue: true}
}

return &requeue{curError: err}
return &requeue{curError: err, delayedRequeue: true}
}
} else {
return &requeue{curError: err}
return &requeue{curError: err, delayedRequeue: true}
}
}
}
Expand Down
44 changes: 28 additions & 16 deletions controllers/cluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1336,10 +1336,10 @@ var _ = Describe("cluster_controller", func() {
Expect(err).NotTo(HaveOccurred())

for _, item := range pods.Items {
id, err := fdbv1beta2.ProcessGroupID(item.Labels[fdbv1beta2.FDBProcessGroupIDLabel]).GetIDNumber()
Expect(err).NotTo(HaveOccurred())

hash, err := internal.GetPodSpecHash(cluster, internal.ProcessClassFromLabels(cluster, item.Labels), id, nil)
hash, err := internal.GetPodSpecHash(cluster, &fdbv1beta2.ProcessGroupStatus{
ProcessGroupID: fdbv1beta2.ProcessGroupID(item.Labels[fdbv1beta2.FDBProcessGroupIDLabel]),
ProcessClass: internal.ProcessClassFromLabels(cluster, item.Labels),
}, nil)
Expect(err).NotTo(HaveOccurred())

configMapHash, err := getConfigMapHash(cluster, internal.GetProcessClassFromMeta(cluster, item.ObjectMeta), &item)
Expand Down Expand Up @@ -1477,10 +1477,10 @@ var _ = Describe("cluster_controller", func() {
err = k8sClient.List(context.TODO(), pods, getListOptions(cluster)...)
Expect(err).NotTo(HaveOccurred())
for _, item := range pods.Items {
id, err := fdbv1beta2.ProcessGroupID(item.Labels[fdbv1beta2.FDBProcessGroupIDLabel]).GetIDNumber()
Expect(err).NotTo(HaveOccurred())

hash, err := internal.GetPodSpecHash(cluster, internal.ProcessClassFromLabels(cluster, item.Labels), id, nil)
hash, err := internal.GetPodSpecHash(cluster, &fdbv1beta2.ProcessGroupStatus{
ProcessGroupID: fdbv1beta2.ProcessGroupID(item.Labels[fdbv1beta2.FDBProcessGroupIDLabel]),
ProcessClass: internal.ProcessClassFromLabels(cluster, item.Labels),
}, nil)
Expect(err).NotTo(HaveOccurred())

configMapHash, err := getConfigMapHash(cluster, internal.GetProcessClassFromMeta(cluster, item.ObjectMeta), &item)
Expand Down Expand Up @@ -1586,10 +1586,10 @@ var _ = Describe("cluster_controller", func() {
Expect(err).NotTo(HaveOccurred())

for _, item := range pods.Items {
id, err := fdbv1beta2.ProcessGroupID(item.Labels[fdbv1beta2.FDBProcessGroupIDLabel]).GetIDNumber()
Expect(err).NotTo(HaveOccurred())

hash, err := internal.GetPodSpecHash(cluster, internal.ProcessClassFromLabels(cluster, item.Labels), id, nil)
hash, err := internal.GetPodSpecHash(cluster, &fdbv1beta2.ProcessGroupStatus{
ProcessGroupID: fdbv1beta2.ProcessGroupID(item.Labels[fdbv1beta2.FDBProcessGroupIDLabel]),
ProcessClass: internal.ProcessClassFromLabels(cluster, item.Labels),
}, nil)
Expect(err).NotTo(HaveOccurred())

configMapHash, err := getConfigMapHash(cluster, internal.GetProcessClassFromMeta(cluster, item.ObjectMeta), &item)
Expand Down Expand Up @@ -3671,7 +3671,10 @@ var _ = Describe("cluster_controller", func() {
Context("with a default pod", func() {
BeforeEach(func() {
var err error
pod, err = internal.GetPod(cluster, "storage", 1)
pod, err = internal.GetPod(cluster, &fdbv1beta2.ProcessGroupStatus{
ProcessClass: fdbv1beta2.ProcessClassStorage,
ProcessGroupID: "storage-1",
})
Expect(err).NotTo(HaveOccurred())
pod.Status.PodIP = "1.1.1.1"
pod.Status.PodIPs = []corev1.PodIP{
Expand All @@ -3690,7 +3693,10 @@ var _ = Describe("cluster_controller", func() {
BeforeEach(func() {
var err error
cluster.Spec.Routing.PodIPFamily = pointer.Int(6)
pod, err = internal.GetPod(cluster, "storage", 1)
pod, err = internal.GetPod(cluster, &fdbv1beta2.ProcessGroupStatus{
ProcessClass: fdbv1beta2.ProcessClassStorage,
ProcessGroupID: "storage-1",
})
Expect(err).NotTo(HaveOccurred())
pod.Status.PodIP = "1.1.1.1"
pod.Status.PodIPs = []corev1.PodIP{
Expand All @@ -3707,7 +3713,10 @@ var _ = Describe("cluster_controller", func() {
Context("with no matching IPs in the Pod IP list", func() {
BeforeEach(func() {
var err error
pod, err = internal.GetPod(cluster, "storage", 1)
pod, err = internal.GetPod(cluster, &fdbv1beta2.ProcessGroupStatus{
ProcessClass: fdbv1beta2.ProcessClassStorage,
ProcessGroupID: "storage-1",
})
Expect(err).NotTo(HaveOccurred())
pod.Status.PodIPs = []corev1.PodIP{
{IP: "1.1.1.2"},
Expand All @@ -3725,7 +3734,10 @@ var _ = Describe("cluster_controller", func() {
BeforeEach(func() {
var err error
cluster.Spec.Routing.PodIPFamily = pointer.Int(4)
pod, err = internal.GetPod(cluster, "storage", 1)
pod, err = internal.GetPod(cluster, &fdbv1beta2.ProcessGroupStatus{
ProcessClass: fdbv1beta2.ProcessClassStorage,
ProcessGroupID: "storage-1",
})
Expect(err).NotTo(HaveOccurred())
pod.Status.PodIP = "1.1.1.2"
pod.Status.PodIPs = []corev1.PodIP{
Expand Down
10 changes: 1 addition & 9 deletions controllers/update_pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,15 +175,7 @@ func getPodsToUpdate(ctx context.Context, logger logr.Logger, reconciler *Founda
return nil, fmt.Errorf("cluster has Pod %s that is pending deletion", pod.Name)
}

idNum, err := processGroup.ProcessGroupID.GetIDNumber()
if err != nil {
logger.Info("Skipping Pod due to error parsing Process Group ID",
"processGroupID", processGroup.ProcessGroupID,
"error", err.Error())
continue
}

specHash, err := internal.GetPodSpecHash(cluster, processGroup.ProcessClass, idNum, nil)
specHash, err := internal.GetPodSpecHash(cluster, processGroup, nil)
if err != nil {
logger.Info("Skipping Pod due to error generating spec hash",
"processGroupID", processGroup.ProcessGroupID,
Expand Down
9 changes: 2 additions & 7 deletions controllers/update_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,12 +544,7 @@ func validateProcessGroup(ctx context.Context, r *FoundationDBClusterReconciler,
return nil
}

idNum, err := processGroupStatus.ProcessGroupID.GetIDNumber()
if err != nil {
return err
}

specHash, err := internal.GetPodSpecHash(cluster, processGroupStatus.ProcessClass, idNum, nil)
specHash, err := internal.GetPodSpecHash(cluster, processGroupStatus, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -582,7 +577,7 @@ func validateProcessGroup(ctx context.Context, r *FoundationDBClusterReconciler,

processGroupStatus.UpdateCondition(fdbv1beta2.IncorrectConfigMap, !synced)

desiredPvc, err := internal.GetPvc(cluster, processGroupStatus.ProcessClass, idNum)
desiredPvc, err := internal.GetPvc(cluster, processGroupStatus)
if err != nil {
return err
}
Expand Down
5 changes: 4 additions & 1 deletion internal/monitor_conf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,10 @@ var _ = Describe("monitor_conf", func() {
var processGroupID = "storage-1"

BeforeEach(func() {
pod, err = GetPod(cluster, fdbv1beta2.ProcessClassStorage, 1)
pod, err = GetPod(cluster, &fdbv1beta2.ProcessGroupStatus{
ProcessClass: processClass,
ProcessGroupID: fdbv1beta2.ProcessGroupID(processGroupID),
})
Expect(err).NotTo(HaveOccurred())
address = pod.Status.PodIP
})
Expand Down
4 changes: 2 additions & 2 deletions internal/pod_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ var _ = Describe("pod_client", func() {
})

It("should not have TLS sidecar TLS", func() {
pod, err := GetPod(cluster, fdbv1beta2.ProcessClassStorage, 1)
pod, err := GetPod(cluster, GetProcessGroup(cluster, fdbv1beta2.ProcessClassStorage, 1))
Expect(err).NotTo(HaveOccurred())
Expect(podHasSidecarTLS(pod)).To(BeFalse())
})
Expand All @@ -58,7 +58,7 @@ var _ = Describe("pod_client", func() {
})

It("should have TLS sidecar TLS", func() {
pod, err := GetPod(cluster, fdbv1beta2.ProcessClassStorage, 1)
pod, err := GetPod(cluster, GetProcessGroup(cluster, fdbv1beta2.ProcessClassStorage, 1))
Expect(err).NotTo(HaveOccurred())
Expect(podHasSidecarTLS(pod)).To(BeTrue())
})
Expand Down
5 changes: 3 additions & 2 deletions internal/pod_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,10 @@ func GetProcessGroupIDFromMeta(cluster *fdbv1beta2.FoundationDBCluster, metadata
}

// GetPodSpecHash builds the hash of the expected spec for a pod.
func GetPodSpecHash(cluster *fdbv1beta2.FoundationDBCluster, processClass fdbv1beta2.ProcessClass, id int, spec *corev1.PodSpec) (string, error) {
func GetPodSpecHash(cluster *fdbv1beta2.FoundationDBCluster, processGroup *fdbv1beta2.ProcessGroupStatus, spec *corev1.PodSpec) (string, error) {
var err error
if spec == nil {
spec, err = GetPodSpec(cluster, processClass, id)
spec, err = GetPodSpec(cluster, processGroup)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -197,6 +197,7 @@ func GetPvcMetadata(cluster *fdbv1beta2.FoundationDBCluster, processClass fdbv1b
} else {
customMetadata = nil
}

return GetObjectMetadata(cluster, customMetadata, processClass, id)
}

Expand Down
Loading

0 comments on commit 6970c48

Please sign in to comment.