diff --git a/Makefile b/Makefile index d0c0f147..ffb1794e 100755 --- a/Makefile +++ b/Makefile @@ -410,7 +410,7 @@ e2e-test/teardown: .PHONY: e2e-test/teardown e2e-test: e2e-test/teardown e2e-test/setup - ginkgo --output-dir="${PWD}/test/e2e/report" --json-report=report.json --junit-report=report.xml \ + ginkgo -v --output-dir="${PWD}/test/e2e/report" --json-report=report.json --junit-report=report.xml \ ${PWD}/test/e2e/pkg -- -consumer_name=$(shell cat ${PWD}/test/e2e/.consumer_name) \ -api-server=https://$(shell cat ${PWD}/test/e2e/.external_host_ip):30080 \ -grpc-server=$(shell cat ${PWD}/test/e2e/.external_host_ip):30090 \ diff --git a/pkg/client/cloudevents/source_client.go b/pkg/client/cloudevents/source_client.go index 85588b9f..e05ea23d 100644 --- a/pkg/client/cloudevents/source_client.go +++ b/pkg/client/cloudevents/source_client.go @@ -33,6 +33,7 @@ type SourceClientImpl struct { func NewSourceClient(sourceOptions *ceoptions.CloudEventsSourceOptions, resourceService services.ResourceService) (SourceClient, error) { ctx := context.Background() + log := logger.NewOCMLogger(ctx) codec, bundleCodec := &Codec{sourceID: sourceOptions.SourceID}, &BundleCodec{sourceID: sourceOptions.SourceID} ceSourceClient, err := cegeneric.NewCloudEventSourceClient[*api.Resource](ctx, sourceOptions, resourceService, ResourceStatusHashGetter, codec, bundleCodec) @@ -40,6 +41,21 @@ func NewSourceClient(sourceOptions *ceoptions.CloudEventsSourceOptions, resource return nil, err } + // start a go routine to receive client reconnect signal + go func() { + for { + select { + case <-ctx.Done(): + return + case <-ceSourceClient.ReconnectedChan(): + // when receiving a client reconnected signal, we resync all clusters for this source + if err := ceSourceClient.Resync(ctx, cetypes.ClusterAll); err != nil { + log.Error(fmt.Sprintf("failed to send resync request, %v", err)) + } + } + } + }() + return &SourceClientImpl{ Codec: codec, BundleCodec: bundleCodec, diff --git 
a/test/e2e/pkg/spec_resync_test.go b/test/e2e/pkg/spec_resync_test.go index 8c71017c..2e83edc4 100644 --- a/test/e2e/pkg/spec_resync_test.go +++ b/test/e2e/pkg/spec_resync_test.go @@ -19,7 +19,7 @@ import ( var _ = Describe("Spec resync", Ordered, Label("e2e-tests-spec-resync"), func() { var resource1, resource2, resource3 *openapi.Resource - var mqttReplicas, maestroServerReplicas, maestroAgentReplicas int + var mqttReplicas, maestroAgentReplicas int Context("Resource resync resource spec after maestro agent restarts", func() { @@ -380,61 +380,6 @@ var _ = Describe("Spec resync", Ordered, Label("e2e-tests-spec-resync"), func() }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) }) - It("Rollout the maestro-server", func() { - - deploy, err := kubeClient.AppsV1().Deployments("maestro").Get(context.Background(), "maestro", metav1.GetOptions{}) - Expect(err).ShouldNot(HaveOccurred()) - maestroServerReplicas = int(*deploy.Spec.Replicas) - deploy, err = kubeClient.AppsV1().Deployments("maestro").Patch(context.Background(), "maestro", types.MergePatchType, []byte(`{"spec":{"replicas":0}}`), metav1.PatchOptions{ - FieldManager: "testKubeClient", - }) - Expect(err).ShouldNot(HaveOccurred()) - Expect(*deploy.Spec.Replicas).To(Equal(int32(0))) - - // ensure no running maestro pods - Eventually(func() error { - pods, err := kubeClient.CoreV1().Pods("maestro").List(context.Background(), metav1.ListOptions{ - LabelSelector: "app=maestro", - }) - if err != nil { - return err - } - if len(pods.Items) > 0 { - return fmt.Errorf("maestro pods still running") - } - return nil - }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) - - // patch maestro replicas to maestroServerReplicas - deploy, err = kubeClient.AppsV1().Deployments("maestro").Patch(context.Background(), "maestro", types.MergePatchType, []byte(fmt.Sprintf(`{"spec":{"replicas":%d}}`, maestroServerReplicas)), metav1.PatchOptions{ - FieldManager: "testKubeClient", - }) - Expect(err).ShouldNot(HaveOccurred()) 
- Expect(*deploy.Spec.Replicas).To(Equal(int32(maestroServerReplicas))) - - // ensure maestro pod is up and running - Eventually(func() error { - pods, err := kubeClient.CoreV1().Pods("maestro").List(context.Background(), metav1.ListOptions{ - LabelSelector: "app=maestro", - }) - if err != nil { - return err - } - if len(pods.Items) != maestroServerReplicas { - return fmt.Errorf("unexpected maestro pod count, expected %d, got %d", maestroServerReplicas, len(pods.Items)) - } - for _, pod := range pods.Items { - if pod.Status.Phase != "Running" { - return fmt.Errorf("maestro pod not in running state") - } - if pod.Status.ContainerStatuses[0].State.Running == nil { - return fmt.Errorf("maestro server container not in running state") - } - } - return nil - }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) - }) - It("patch the nginx-1 resource", func() { newRes := helper.NewAPIResourceWithIndex(consumer_name, 2, 1) diff --git a/test/e2e/pkg/status_resync_test.go b/test/e2e/pkg/status_resync_test.go index 8943a22e..9e7ee6d5 100644 --- a/test/e2e/pkg/status_resync_test.go +++ b/test/e2e/pkg/status_resync_test.go @@ -15,12 +15,13 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" ) var _ = Describe("Status resync", Ordered, Label("e2e-tests-status-resync"), func() { var resource *openapi.Resource - var maestroServerReplicas int + var mqttReplicas, maestroServerReplicas int Context("Resource resync resource status after maestro server restarts", func() { @@ -184,4 +185,195 @@ var _ = Describe("Status resync", Ordered, Label("e2e-tests-status-resync"), fun }) }) + + Context("Resource resync resource status after maestro server reconnects", func() { + + It("post the nginx resource with non-default service account to the maestro api", func() { + + res := helper.NewAPIResourceWithSA(consumer_name, 1, "nginx") + var resp *http.Response + var err error + 
resource, resp, err = apiClient.DefaultApi.ApiMaestroV1ResourcesPost(context.Background()).Resource(res).Execute() + Expect(err).ShouldNot(HaveOccurred()) + Expect(resp.StatusCode).To(Equal(http.StatusCreated)) + Expect(*resource.Id).ShouldNot(BeEmpty()) + + Eventually(func() error { + deploy, err := kubeClient.AppsV1().Deployments("default").Get(context.Background(), "nginx", metav1.GetOptions{}) + if err != nil { + return err + } + if *deploy.Spec.Replicas != 1 { + return fmt.Errorf("unexpected replicas, expected 1, got %d", *deploy.Spec.Replicas) + } + return nil + }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) + + gotResource, resp, err := apiClient.DefaultApi.ApiMaestroV1ResourcesIdGet(context.Background(), *resource.Id).Execute() + Expect(err).ShouldNot(HaveOccurred()) + Expect(resp.StatusCode).To(Equal(http.StatusOK)) + Expect(*gotResource.Id).To(Equal(*resource.Id)) + Expect(*gotResource.Version).To(Equal(*resource.Version)) + + Eventually(func() error { + gotResource, _, err := apiClient.DefaultApi.ApiMaestroV1ResourcesIdGet(context.Background(), *resource.Id).Execute() + if err != nil { + return err + } + statusJSON, err := json.Marshal(gotResource.Status) + if err != nil { + return err + } + if !strings.Contains(string(statusJSON), "error looking up service account default/nginx") { + return fmt.Errorf("unexpected status, expected error looking up service account default/nginx, got %s", string(statusJSON)) + } + return nil + }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) + }) + + It("delete the mqtt-broker service for server", func() { + + err := kubeClient.CoreV1().Services("maestro").Delete(context.Background(), "maestro-mqtt-server", metav1.DeleteOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + }) + + It("Rollout the mqtt-broker", func() { + + deploy, err := kubeClient.AppsV1().Deployments("maestro").Get(context.Background(), "maestro-mqtt", metav1.GetOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + mqttReplicas = 
int(*deploy.Spec.Replicas) + deploy, err = kubeClient.AppsV1().Deployments("maestro").Patch(context.Background(), "maestro-mqtt", types.MergePatchType, []byte(`{"spec":{"replicas":0}}`), metav1.PatchOptions{ + FieldManager: "testKubeClient", + }) + Expect(err).ShouldNot(HaveOccurred()) + Expect(*deploy.Spec.Replicas).To(Equal(int32(0))) + + // ensure no running mqtt-broker pods + Eventually(func() error { + pods, err := kubeClient.CoreV1().Pods("maestro").List(context.Background(), metav1.ListOptions{ + LabelSelector: "name=maestro-mqtt", + }) + if err != nil { + return err + } + if len(pods.Items) > 0 { + return fmt.Errorf("maestro-mqtt pods still running") + } + return nil + }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) + + // patch mqtt-broker replicas to mqttReplicas + deploy, err = kubeClient.AppsV1().Deployments("maestro").Patch(context.Background(), "maestro-mqtt", types.MergePatchType, []byte(fmt.Sprintf(`{"spec":{"replicas":%d}}`, mqttReplicas)), metav1.PatchOptions{ + FieldManager: "testKubeClient", + }) + Expect(err).ShouldNot(HaveOccurred()) + Expect(*deploy.Spec.Replicas).To(Equal(int32(mqttReplicas))) + + // ensure mqtt-broker pod is up and running + Eventually(func() error { + pods, err := kubeClient.CoreV1().Pods("maestro").List(context.Background(), metav1.ListOptions{ + LabelSelector: "name=maestro-mqtt", + }) + if err != nil { + return err + } + if len(pods.Items) != mqttReplicas { + return fmt.Errorf("unexpected maestro-mqtt pod count, expected %d, got %d", mqttReplicas, len(pods.Items)) + } + for _, pod := range pods.Items { + if pod.Status.Phase != "Running" { + return fmt.Errorf("maestro-mqtt pod not in running state") + } + if pod.Status.ContainerStatuses[0].State.Running == nil { + return fmt.Errorf("maestro-mqtt container not in running state") + } + } + return nil + }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) + }) + + It("create default/nginx serviceaccount", func() { + + _, err := 
kubeClient.CoreV1().ServiceAccounts("default").Create(context.Background(), &corev1.ServiceAccount{ + ObjectMeta: metav1.ObjectMeta{ + Name: "nginx", + }, + }, metav1.CreateOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + + // delete the nginx deployment to trigger recreation + err = kubeClient.AppsV1().Deployments("default").Delete(context.Background(), "nginx", metav1.DeleteOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + }) + + It("recreate the mqtt-broker service for server", func() { + + mqttServerService := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "maestro-mqtt-server", + Namespace: "maestro", + }, + Spec: corev1.ServiceSpec{ + Selector: map[string]string{ + "name": "maestro-mqtt", + }, + Ports: []corev1.ServicePort{ + { + Name: "mosquitto", + Protocol: corev1.ProtocolTCP, + Port: 1883, + TargetPort: intstr.FromInt(1883), + }, + }, + Type: corev1.ServiceTypeClusterIP, + }, + } + + _, err := kubeClient.CoreV1().Services("maestro").Create(context.Background(), mqttServerService, metav1.CreateOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + }) + + It("ensure the resource status is resynced", func() { + + Eventually(func() error { + gotResource, _, err := apiClient.DefaultApi.ApiMaestroV1ResourcesIdGet(context.Background(), *resource.Id).Execute() + if err != nil { + return err + } + if _, ok := gotResource.Status["ContentStatus"]; !ok { + return fmt.Errorf("unexpected status, expected contains ContentStatus, got %v", gotResource.Status) + } + statusJSON, err := json.Marshal(gotResource.Status) + if err != nil { + return err + } + if strings.Contains(string(statusJSON), "error looking up service account default/nginx") { + return fmt.Errorf("unexpected status, should not contain error looking up service account default/nginx, got %s", string(statusJSON)) + } + return nil + }, 3*time.Minute, 3*time.Second).ShouldNot(HaveOccurred()) + }) + + It("delete the nginx resource", func() { + + resp, err :=
apiClient.DefaultApi.ApiMaestroV1ResourcesIdDelete(context.Background(), *resource.Id).Execute() + Expect(err).ShouldNot(HaveOccurred()) + Expect(resp.StatusCode).To(Equal(http.StatusNoContent)) + + Eventually(func() error { + _, err := kubeClient.AppsV1().Deployments("default").Get(context.Background(), "nginx", metav1.GetOptions{}) + if err != nil { + if errors.IsNotFound(err) { + return nil + } + return err + } + return fmt.Errorf("nginx deployment still exists") + }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) + + err = kubeClient.CoreV1().ServiceAccounts("default").Delete(context.Background(), "nginx", metav1.DeleteOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + }) + + }) })