From e63f40f17797cccdb68d7b3668b99601d68fb226 Mon Sep 17 00:00:00 2001
From: Morven Cao
Date: Tue, 25 Jun 2024 11:45:38 +0800
Subject: [PATCH] fix maestro server resync after reconnecting. (#139)

Signed-off-by: morvencao
---
 pkg/client/cloudevents/source_client.go      |   5 +
 pkg/client/cloudevents/source_client_mock.go |   4 +
 pkg/dispatcher/hash_dispatcher.go            |  20 ++
 pkg/dispatcher/noop_dispatcher.go            |  31 +++
 test/e2e/pkg/status_resync_test.go           | 194 ++++++++++++++++++-
 5 files changed, 253 insertions(+), 1 deletion(-)

diff --git a/pkg/client/cloudevents/source_client.go b/pkg/client/cloudevents/source_client.go
index 85588b9f..2245f4f3 100644
--- a/pkg/client/cloudevents/source_client.go
+++ b/pkg/client/cloudevents/source_client.go
@@ -22,6 +22,7 @@ type SourceClient interface {
 	OnDelete(ctx context.Context, id string) error
 	Subscribe(ctx context.Context, handlers ...cegeneric.ResourceHandler[*api.Resource])
 	Resync(ctx context.Context, consumers []string) error
+	ReconnectedChan() <-chan struct{}
 }
 
 type SourceClientImpl struct {
@@ -145,6 +146,10 @@ func (s *SourceClientImpl) Resync(ctx context.Context, consumers []string) error
 	return nil
 }
 
+func (s *SourceClientImpl) ReconnectedChan() <-chan struct{} {
+	return s.CloudEventSourceClient.ReconnectedChan()
+}
+
 func ResourceStatusHashGetter(res *api.Resource) (string, error) {
 	status, err := api.DecodeStatus(res.Status)
 	if err != nil {
diff --git a/pkg/client/cloudevents/source_client_mock.go b/pkg/client/cloudevents/source_client_mock.go
index 42324cba..d8566099 100644
--- a/pkg/client/cloudevents/source_client_mock.go
+++ b/pkg/client/cloudevents/source_client_mock.go
@@ -160,3 +160,7 @@ func (s *SourceClientMock) Subscribe(ctx context.Context, handlers ...cegeneric.
 func (s *SourceClientMock) Resync(ctx context.Context, consumers []string) error {
 	return nil
 }
+
+func (s *SourceClientMock) ReconnectedChan() <-chan struct{} {
+	return nil
+}
diff --git a/pkg/dispatcher/hash_dispatcher.go b/pkg/dispatcher/hash_dispatcher.go
index ae3b3f69..d618dfcf 100644
--- a/pkg/dispatcher/hash_dispatcher.go
+++ b/pkg/dispatcher/hash_dispatcher.go
@@ -57,11 +57,31 @@ func (d *HashDispatcher) Start(ctx context.Context) {
 	// start a goroutine to periodically check the instances and consumers.
 	go wait.UntilWithContext(ctx, d.check, 5*time.Second)
 
+	// start a goroutine to resync current consumers for this source when the client is reconnected
+	go d.resyncOnReconnect(ctx)
+
 	// wait until context is canceled
 	<-ctx.Done()
 	d.workQueue.ShutDown()
 }
 
+// resyncOnReconnect listens for the client reconnected signal and resyncs current consumers for this source.
+func (d *HashDispatcher) resyncOnReconnect(ctx context.Context) {
+	log := logger.NewOCMLogger(ctx)
+	// receive client reconnect signal and resync current consumers for this source
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-d.sourceClient.ReconnectedChan():
+			// when receiving a client reconnected signal, we resync current consumers for this source
+			if err := d.sourceClient.Resync(ctx, d.consumerSet.ToSlice()); err != nil {
+				log.Error(fmt.Sprintf("failed to resync resource status for consumers (%s), %v", d.consumerSet.ToSlice(), err))
+			}
+		}
+	}
+}
+
 // Dispatch checks if the provided consumer ID is owned by the current maestro instance.
 // It returns true if the consumer is part of the current instance's consumer set;
 // otherwise, it returns false.
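
For illustration (not part of the patch): the loop added above blocks until the context ends or the source client signals a reconnect, then triggers a resync of the owned consumers. A minimal, self-contained sketch of exercising that pattern against a fake client; fakeSourceClient and its channels are hypothetical stand-ins for the real SourceClient.

package main

import (
	"context"
	"fmt"
	"time"
)

// fakeSourceClient is a hypothetical stand-in for the cloudevents SourceClient:
// ReconnectedChan fires once per re-established connection, and Resync records
// which consumers were resynced.
type fakeSourceClient struct {
	reconnected chan struct{}
	resynced    chan []string
}

func (f *fakeSourceClient) ReconnectedChan() <-chan struct{} { return f.reconnected }

func (f *fakeSourceClient) Resync(ctx context.Context, consumers []string) error {
	f.resynced <- consumers
	return nil
}

// resyncOnReconnect mirrors the dispatcher loop above: block until the context
// ends or a reconnect signal arrives, then resync the owned consumers.
func resyncOnReconnect(ctx context.Context, client *fakeSourceClient, consumers []string) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-client.ReconnectedChan():
			if err := client.Resync(ctx, consumers); err != nil {
				fmt.Printf("failed to resync resource status for consumers (%s), %v\n", consumers, err)
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	client := &fakeSourceClient{
		reconnected: make(chan struct{}, 1),
		resynced:    make(chan []string, 1),
	}
	go resyncOnReconnect(ctx, client, []string{"consumer-a"})

	client.reconnected <- struct{}{} // simulate an MQTT reconnect
	fmt.Println("resynced consumers:", <-client.resynced)
}
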
diff --git a/pkg/dispatcher/noop_dispatcher.go b/pkg/dispatcher/noop_dispatcher.go
index 1de98fa2..f1dd0f66 100644
--- a/pkg/dispatcher/noop_dispatcher.go
+++ b/pkg/dispatcher/noop_dispatcher.go
@@ -6,6 +6,7 @@ import (
 
 	"github.com/openshift-online/maestro/pkg/client/cloudevents"
 	"github.com/openshift-online/maestro/pkg/dao"
+	"github.com/openshift-online/maestro/pkg/logger"
 )
 
 var _ Dispatcher = &NoopDispatcher{}
@@ -28,6 +29,36 @@ func NewNoopDispatcher(consumerDao dao.ConsumerDao, sourceClient cloudevents.Sou
 
 // Start is a no-op implementation.
 func (d *NoopDispatcher) Start(ctx context.Context) {
+	// handle the client reconnected signal and resync status from consumers for this source
+	d.resyncOnReconnect(ctx)
+}
+
+// resyncOnReconnect listens for the client reconnected signal and resyncs all consumers for this source.
+func (d *NoopDispatcher) resyncOnReconnect(ctx context.Context) {
+	log := logger.NewOCMLogger(ctx)
+	// receive client reconnect signal and resync all consumers for this source
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-d.sourceClient.ReconnectedChan():
+			// when receiving a client reconnected signal, we resync all consumers for this source
+			// TODO: optimize this to only resync resource status for necessary consumers
+			consumerIDs := []string{}
+			consumers, err := d.consumerDao.All(ctx)
+			if err != nil {
+				log.Error(fmt.Sprintf("failed to get all consumers: %v", err))
+				continue
+			}
+
+			for _, c := range consumers {
+				consumerIDs = append(consumerIDs, c.ID)
+			}
+			if err := d.sourceClient.Resync(ctx, consumerIDs); err != nil {
+				log.Error(fmt.Sprintf("failed to resync resource status for consumers (%s), %v", consumerIDs, err))
+			}
+		}
+	}
 }
 
 // Dispatch always returns true, indicating that the current maestro instance should process the resource status update.
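
Both dispatchers consume ReconnectedChan the same way, so the channel must never block its sender. A common way to feed such a channel is a buffered channel with a non-blocking, coalescing send; the sketch below is an assumption about that pattern, not the actual cloudevents sdk-go implementation.

package main

import (
	"fmt"
	"time"
)

// reconnectNotifier sketches the producer side of ReconnectedChan: a buffered
// channel plus a non-blocking send, so a slow receiver never stalls the
// connection loop and rapid reconnects coalesce into one pending signal.
type reconnectNotifier struct {
	ch chan struct{}
}

func newReconnectNotifier() *reconnectNotifier {
	return &reconnectNotifier{ch: make(chan struct{}, 1)}
}

// notify signals a reconnect; if a signal is already pending, drop this one.
func (n *reconnectNotifier) notify() {
	select {
	case n.ch <- struct{}{}:
	default:
	}
}

func (n *reconnectNotifier) ReconnectedChan() <-chan struct{} { return n.ch }

func main() {
	n := newReconnectNotifier()

	// simulate two rapid reconnects; the second coalesces with the first
	n.notify()
	n.notify()

	select {
	case <-n.ReconnectedChan():
		fmt.Println("received reconnect signal; a resync would start here")
	case <-time.After(time.Second):
		fmt.Println("no reconnect signal")
	}
}
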
diff --git a/test/e2e/pkg/status_resync_test.go b/test/e2e/pkg/status_resync_test.go
index 8943a22e..afae02e3 100644
--- a/test/e2e/pkg/status_resync_test.go
+++ b/test/e2e/pkg/status_resync_test.go
@@ -15,12 +15,13 @@ import (
 
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/intstr"
 )
 
 var _ = Describe("Status resync", Ordered, Label("e2e-tests-status-resync"), func() {
 	var resource *openapi.Resource
-	var maestroServerReplicas int
+	var mqttReplicas, maestroServerReplicas int
 
 	Context("Resource resync resource status after maestro server restarts", func() {
 
@@ -184,4 +185,195 @@ var _ = Describe("Status resync", Ordered, Label("e2e-tests-status-resync"), fun
 		})
 	})
+
+	Context("Resource resync resource status after maestro server reconnects", func() {
+
+		It("post the nginx resource with non-default service account to the maestro api", func() {
+
+			res := helper.NewAPIResourceWithSA(consumer_name, 1, "nginx")
+			var resp *http.Response
+			var err error
+			resource, resp, err = apiClient.DefaultApi.ApiMaestroV1ResourcesPost(context.Background()).Resource(res).Execute()
+			Expect(err).ShouldNot(HaveOccurred())
+			Expect(resp.StatusCode).To(Equal(http.StatusCreated))
+			Expect(*resource.Id).ShouldNot(BeEmpty())
+
+			Eventually(func() error {
+				deploy, err := kubeClient.AppsV1().Deployments("default").Get(context.Background(), "nginx", metav1.GetOptions{})
+				if err != nil {
+					return err
+				}
+				if *deploy.Spec.Replicas != 1 {
+					return fmt.Errorf("unexpected replicas, expected 1, got %d", *deploy.Spec.Replicas)
+				}
+				return nil
+			}, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred())
+
+			gotResource, resp, err := apiClient.DefaultApi.ApiMaestroV1ResourcesIdGet(context.Background(), *resource.Id).Execute()
+			Expect(err).ShouldNot(HaveOccurred())
+			Expect(resp.StatusCode).To(Equal(http.StatusOK))
+			Expect(*gotResource.Id).To(Equal(*resource.Id))
+			Expect(*gotResource.Version).To(Equal(*resource.Version))
+
+			Eventually(func() error {
+				gotResource, _, err := apiClient.DefaultApi.ApiMaestroV1ResourcesIdGet(context.Background(), *resource.Id).Execute()
+				if err != nil {
+					return err
+				}
+				statusJSON, err := json.Marshal(gotResource.Status)
+				if err != nil {
+					return err
+				}
+				if !strings.Contains(string(statusJSON), "error looking up service account default/nginx") {
+					return fmt.Errorf("unexpected status, expected error looking up service account default/nginx, got %s", string(statusJSON))
+				}
+				return nil
+			}, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred())
+		})
+
+		It("delete the mqtt-broker service for server", func() {
+
+			err := kubeClient.CoreV1().Services("maestro").Delete(context.Background(), "maestro-mqtt-server", metav1.DeleteOptions{})
+			Expect(err).ShouldNot(HaveOccurred())
+		})
+
+		It("create default/nginx serviceaccount", func() {
+
+			_, err := kubeClient.CoreV1().ServiceAccounts("default").Create(context.Background(), &corev1.ServiceAccount{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: "nginx",
+				},
+			}, metav1.CreateOptions{})
+			Expect(err).ShouldNot(HaveOccurred())
+
+			// delete the nginx deployment to trigger recreating
+			err = kubeClient.AppsV1().Deployments("default").Delete(context.Background(), "nginx", metav1.DeleteOptions{})
+			Expect(err).ShouldNot(HaveOccurred())
+		})
+
+		It("Rollout the mqtt-broker", func() {
+
+			deploy, err := kubeClient.AppsV1().Deployments("maestro").Get(context.Background(), "maestro-mqtt", metav1.GetOptions{})
+			Expect(err).ShouldNot(HaveOccurred())
+			mqttReplicas = int(*deploy.Spec.Replicas)
+			deploy, err = kubeClient.AppsV1().Deployments("maestro").Patch(context.Background(), "maestro-mqtt", types.MergePatchType, []byte(`{"spec":{"replicas":0}}`), metav1.PatchOptions{
+				FieldManager: "testKubeClient",
+			})
+			Expect(err).ShouldNot(HaveOccurred())
+			Expect(*deploy.Spec.Replicas).To(Equal(int32(0)))
+
+			// ensure no running mqtt-broker pods
+			Eventually(func() error {
+				pods, err := kubeClient.CoreV1().Pods("maestro").List(context.Background(), metav1.ListOptions{
+					LabelSelector: "name=maestro-mqtt",
+				})
+				if err != nil {
+					return err
+				}
+				if len(pods.Items) > 0 {
+					return fmt.Errorf("maestro-mqtt pods still running")
+				}
+				return nil
+			}, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred())
+
+			// patch mqtt-broker replicas back to mqttReplicas
+			deploy, err = kubeClient.AppsV1().Deployments("maestro").Patch(context.Background(), "maestro-mqtt", types.MergePatchType, []byte(fmt.Sprintf(`{"spec":{"replicas":%d}}`, mqttReplicas)), metav1.PatchOptions{
+				FieldManager: "testKubeClient",
+			})
+			Expect(err).ShouldNot(HaveOccurred())
+			Expect(*deploy.Spec.Replicas).To(Equal(int32(mqttReplicas)))
+
+			// ensure mqtt-broker pod is up and running
+			Eventually(func() error {
+				pods, err := kubeClient.CoreV1().Pods("maestro").List(context.Background(), metav1.ListOptions{
+					LabelSelector: "name=maestro-mqtt",
+				})
+				if err != nil {
+					return err
+				}
+				if len(pods.Items) != mqttReplicas {
+					return fmt.Errorf("unexpected maestro-mqtt pod count, expected %d, got %d", mqttReplicas, len(pods.Items))
+				}
+				for _, pod := range pods.Items {
+					if pod.Status.Phase != "Running" {
+						return fmt.Errorf("maestro-mqtt pod not in running state")
+					}
+					if pod.Status.ContainerStatuses[0].State.Running == nil {
+						return fmt.Errorf("maestro-mqtt container not in running state")
+					}
+				}
+				return nil
+			}, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred())
+		})
+
+		It("recreate the mqtt-broker service for server", func() {
+
+			mqttServerService := &corev1.Service{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      "maestro-mqtt-server",
+					Namespace: "maestro",
+				},
+				Spec: corev1.ServiceSpec{
+					Selector: map[string]string{
+						"name": "maestro-mqtt",
+					},
+					Ports: []corev1.ServicePort{
+						{
+							Name:       "mosquitto",
+							Protocol:   corev1.ProtocolTCP,
+							Port:       1883,
+							TargetPort: intstr.FromInt(1883),
+						},
+					},
+					Type: corev1.ServiceTypeClusterIP,
+				},
+			}
+
+			_, err := kubeClient.CoreV1().Services("maestro").Create(context.Background(), mqttServerService, metav1.CreateOptions{})
+			Expect(err).ShouldNot(HaveOccurred())
+		})
+
+		It("ensure the resource status is resynced", func() {
+
+			Eventually(func() error {
+				gotResource, _, err := apiClient.DefaultApi.ApiMaestroV1ResourcesIdGet(context.Background(), *resource.Id).Execute()
+				if err != nil {
+					return err
+				}
+				if _, ok := gotResource.Status["ContentStatus"]; !ok {
+					return fmt.Errorf("unexpected status, expected to contain ContentStatus, got %v", gotResource.Status)
+				}
+				statusJSON, err := json.Marshal(gotResource.Status)
+				if err != nil {
+					return err
+				}
+				if strings.Contains(string(statusJSON), "error looking up service account default/nginx") {
+					return fmt.Errorf("unexpected status, should not contain error looking up service account default/nginx, got %s", string(statusJSON))
+				}
+				return nil
+			}, 3*time.Minute, 3*time.Second).ShouldNot(HaveOccurred())
+		})
+
+		It("delete the nginx resource", func() {
+
+			resp, err := apiClient.DefaultApi.ApiMaestroV1ResourcesIdDelete(context.Background(), *resource.Id).Execute()
+			Expect(err).ShouldNot(HaveOccurred())
+			Expect(resp.StatusCode).To(Equal(http.StatusNoContent))
+
+			Eventually(func() error {
+				_, err := kubeClient.AppsV1().Deployments("default").Get(context.Background(), "nginx", metav1.GetOptions{})
+				if err != nil {
+					if errors.IsNotFound(err) {
+						return nil
+					}
+					return err
+				}
+				return fmt.Errorf("nginx deployment still exists")
+			}, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred())
+
+			err = kubeClient.CoreV1().ServiceAccounts("default").Delete(context.Background(), "nginx", metav1.DeleteOptions{})
+			Expect(err).ShouldNot(HaveOccurred())
+		})
+
+	})
 
 })
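
For manual verification outside the e2e suite, the broker bounce the test performs above can be reproduced with a small client-go program. The namespace and deployment names come from the test; the kubeconfig path is an assumption, and the restore step simply scales back to one replica for brevity.

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

// scaleTo patches the deployment's replica count with the same merge patch
// the e2e test uses.
func scaleTo(ctx context.Context, client kubernetes.Interface, ns, name string, replicas int32) error {
	patch := []byte(fmt.Sprintf(`{"spec":{"replicas":%d}}`, replicas))
	_, err := client.AppsV1().Deployments(ns).Patch(ctx, name, types.MergePatchType, patch, metav1.PatchOptions{
		FieldManager: "testKubeClient",
	})
	return err
}

func main() {
	// the kubeconfig path is an assumption for this sketch
	config, err := clientcmd.BuildConfigFromFlags("", "/tmp/kubeconfig")
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(config)

	ctx := context.Background()
	// take the broker down, then bring it back to force a client reconnect
	if err := scaleTo(ctx, client, "maestro", "maestro-mqtt", 0); err != nil {
		panic(err)
	}
	if err := scaleTo(ctx, client, "maestro", "maestro-mqtt", 1); err != nil {
		panic(err)
	}
	fmt.Println("mqtt broker bounced; maestro should resync resource status on reconnect")
}
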