From b3f53d1973aeedf5a916ad0281f8957c3174ef8e Mon Sep 17 00:00:00 2001 From: Dillen Padhiar Date: Mon, 25 Nov 2024 09:30:57 -0800 Subject: [PATCH 1/3] feat: refactoring watch logic for e2e tests Signed-off-by: Dillen Padhiar --- tests/e2e/common.go | 32 +++++++++++++------- tests/e2e/functional_test.go | 29 ------------------ tests/e2e/isbservice.go | 51 ++++++++++--------------------- tests/e2e/monovertex.go | 38 ++++++++--------------- tests/e2e/numaflowcontroller.go | 16 ++-------- tests/e2e/pipeline.go | 53 ++++++++++++--------------------- tests/e2e/suite_test.go | 9 ++++-- 7 files changed, 77 insertions(+), 151 deletions(-) diff --git a/tests/e2e/common.go b/tests/e2e/common.go index a2f56dee..4139e38d 100644 --- a/tests/e2e/common.go +++ b/tests/e2e/common.go @@ -273,18 +273,8 @@ func watchPods() { continue } - file, err := os.OpenFile(fileName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + err := writeToFile(fileName, pd) if err != nil { - fmt.Printf("Failed to open log file: %v\n", err) - return - } - defer file.Close() - - bytes, _ := yaml.Marshal(pd) - updateLog := fmt.Sprintf("%s\n%v\n\n%s\n", LogSpacer, time.Now().Format(time.RFC3339Nano), string(bytes)) - _, err = file.WriteString(updateLog) - if err != nil { - fmt.Printf("Failed to write to log file: %v\n", err) return } } @@ -294,3 +284,23 @@ func watchPods() { } } } + +// helper func to write `kubectl get -o yaml` output to file +func writeToFile(fileName string, resource Output) error { + file, err := os.OpenFile(fileName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + fmt.Printf("Failed to open log file: %v\n", err) + return err + } + defer file.Close() + + bytes, _ := yaml.Marshal(resource) + updateLog := fmt.Sprintf("%s\n%v\n\n%s\n", LogSpacer, time.Now().Format(time.RFC3339Nano), string(bytes)) + _, err = file.WriteString(updateLog) + if err != nil { + fmt.Printf("Failed to write to log file: %v\n", err) + return err + } + + return nil +} diff --git a/tests/e2e/functional_test.go b/tests/e2e/functional_test.go index 4e470b0b..3f0324f8 100644 --- a/tests/e2e/functional_test.go +++ b/tests/e2e/functional_test.go @@ -210,11 +210,6 @@ var _ = Describe("Functional e2e", Serial, func() { return err }, testTimeout, testPollingInterval).Should(Succeed()) - if disableTestArtifacts != "true" { - wg.Add(1) - go watchNumaflowControllerRollout() - } - verifyNumaflowControllerRolloutReady() verifyNumaflowControllerReady(Namespace) @@ -233,14 +228,6 @@ var _ = Describe("Functional e2e", Serial, func() { return err }, testTimeout, testPollingInterval).Should(Succeed()) - if disableTestArtifacts != "true" { - wg.Add(1) - go watchISBServiceRollout() - - wg.Add(1) - go watchISBService() - } - verifyISBSvcRolloutReady(isbServiceRolloutName) verifyISBSvcReady(Namespace, isbServiceRolloutName, 3) @@ -259,14 +246,6 @@ var _ = Describe("Functional e2e", Serial, func() { return err }, testTimeout, testPollingInterval).Should(Succeed()) - if disableTestArtifacts != "true" { - wg.Add(1) - go watchPipelineRollout() - - wg.Add(1) - go watchPipeline() - } - document("Verifying that the Pipeline was created") verifyPipelineSpec(Namespace, pipelineName, func(retrievedPipelineSpec numaflowv1.PipelineSpec) bool { return len(pipelineSpec.Vertices) == 2 // TODO: make less kludgey @@ -300,14 +279,6 @@ var _ = Describe("Functional e2e", Serial, func() { return monoVertexSpec.Source != nil }) - if disableTestArtifacts != "true" { - wg.Add(1) - go watchMonoVertexRollout() - - wg.Add(1) - go watchMonoVertex() - } - verifyMonoVertexRolloutReady(monoVertexRolloutName) verifyMonoVertexReady(Namespace, monoVertexName) diff --git a/tests/e2e/isbservice.go b/tests/e2e/isbservice.go index 37d57e9d..4e1de3b3 100644 --- a/tests/e2e/isbservice.go +++ b/tests/e2e/isbservice.go @@ -3,7 +3,6 @@ package e2e import ( "context" "fmt" - "os" "path/filepath" "strings" "time" @@ -11,7 +10,6 @@ import ( . "github.com/onsi/gomega" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" - "sigs.k8s.io/yaml" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -152,13 +150,6 @@ func watchISBServiceRollout() { } defer watcher.Stop() - file, err := os.OpenFile(filepath.Join(ResourceChangesISBServiceOutputPath, "isbservice_rollout.yaml"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - fmt.Printf("Failed to open log file: %v\n", err) - return - } - defer file.Close() - for { select { case event := <-watcher.ResultChan(): @@ -172,11 +163,9 @@ func watchISBServiceRollout() { Spec: rollout.Spec, Status: rollout.Status, } - bytes, _ := yaml.Marshal(rl) - updateLog := fmt.Sprintf("%s\n%v\n\n%s\n", LogSpacer, time.Now().Format(time.RFC3339Nano), string(bytes)) - _, err = file.WriteString(updateLog) + + err := writeToFile(filepath.Join(ResourceChangesISBServiceOutputPath, "isbservice_rollout.yaml"), rl) if err != nil { - fmt.Printf("Failed to write to log file: %v\n", err) return } } @@ -197,13 +186,6 @@ func watchISBService() { } defer watcher.Stop() - file, err := os.OpenFile(filepath.Join(ResourceChangesISBServiceOutputPath, "isbservice.yaml"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - fmt.Printf("Failed to open log file: %v\n", err) - return - } - defer file.Close() - for { select { case event := <-watcher.ResultChan(): @@ -223,11 +205,9 @@ func watchISBService() { Spec: isbsvc.Spec, Status: isbsvc.Status, } - bytes, _ := yaml.Marshal(output) - updateLog := fmt.Sprintf("%s\n%v\n\n%s\n", LogSpacer, time.Now().Format(time.RFC3339Nano), string(bytes)) - _, err = file.WriteString(updateLog) + + err = writeToFile(filepath.Join(ResourceChangesISBServiceOutputPath, "isbservice.yaml"), output) if err != nil { - fmt.Printf("Failed to write to log file: %v\n", err) return } } @@ -264,18 +244,8 @@ func watchStatefulSet() { Status: sts.Status, } - bytes, _ := yaml.Marshal(output) - updateLog := fmt.Sprintf("%s\n%v\n\n%s\n", LogSpacer, time.Now().Format(time.RFC3339Nano), string(bytes)) - fileName := filepath.Join(ResourceChangesISBServiceOutputPath, "statefulsets", strings.Join([]string{sts.Name, ".yaml"}, "")) - file, err := os.OpenFile(fileName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - fmt.Printf("Failed to open log file: %v\n", err) - return - } - defer file.Close() - _, err = file.WriteString(updateLog) + err := writeToFile(filepath.Join(ResourceChangesISBServiceOutputPath, "statefulsets", strings.Join([]string{sts.Name, ".yaml"}, "")), output) if err != nil { - fmt.Printf("Failed to write to log file: %v\n", err) return } } @@ -285,3 +255,14 @@ func watchStatefulSet() { } } } + +func startISBServiceRolloutWatches() { + wg.Add(1) + go watchISBServiceRollout() + + wg.Add(1) + go watchISBService() + + wg.Add(1) + go watchStatefulSet() +} diff --git a/tests/e2e/monovertex.go b/tests/e2e/monovertex.go index c2dde7d7..94a05641 100644 --- a/tests/e2e/monovertex.go +++ b/tests/e2e/monovertex.go @@ -3,9 +3,7 @@ package e2e import ( "context" "fmt" - "os" "path/filepath" - "time" . "github.com/onsi/gomega" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -13,7 +11,6 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/util/retry" - "sigs.k8s.io/yaml" numaflowv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" "github.com/numaproj/numaplane/internal/util" @@ -142,13 +139,6 @@ func watchMonoVertexRollout() { } defer watcher.Stop() - file, err := os.OpenFile(filepath.Join(ResourceChangesMonoVertexOutputPath, "monovertex_rollout.yaml"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - fmt.Printf("Failed to open log file: %v\n", err) - return - } - defer file.Close() - for { select { case event := <-watcher.ResultChan(): @@ -162,13 +152,12 @@ func watchMonoVertexRollout() { Spec: rollout.Spec, Status: rollout.Status, } - bytes, _ := yaml.Marshal(rl) - updateLog := fmt.Sprintf("%s\n%v\n\n%s\n", LogSpacer, time.Now().Format(time.RFC3339Nano), string(bytes)) - _, err = file.WriteString(updateLog) + + err := writeToFile(filepath.Join(ResourceChangesMonoVertexOutputPath, "monovertex_rollout.yaml"), rl) if err != nil { - fmt.Printf("Failed to write to log file: %v\n", err) return } + } } case <-stopCh: @@ -187,13 +176,6 @@ func watchMonoVertex() { } defer watcher.Stop() - file, err := os.OpenFile(filepath.Join(ResourceChangesMonoVertexOutputPath, "monovertex.yaml"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - fmt.Printf("Failed to open log file: %v\n", err) - return - } - defer file.Close() - for { select { case event := <-watcher.ResultChan(): @@ -213,11 +195,9 @@ func watchMonoVertex() { Spec: mvtx.Spec, Status: mvtx.Status, } - bytes, _ := yaml.Marshal(output) - updateLog := fmt.Sprintf("%s\n%v\n\n%s\n", LogSpacer, time.Now().Format(time.RFC3339Nano), string(bytes)) - _, err = file.WriteString(updateLog) + + err = writeToFile(filepath.Join(ResourceChangesMonoVertexOutputPath, "monovertex.yaml"), output) if err != nil { - fmt.Printf("Failed to write to log file: %v\n", err) return } } @@ -300,3 +280,11 @@ func verifyMonoVertexRolloutDeployed(monoVertexRolloutName string) { }, testTimeout, testPollingInterval).Should(Equal(metav1.ConditionTrue)) } + +func startMonoVertexRolloutWatches() { + wg.Add(1) + go watchMonoVertexRollout() + + wg.Add(1) + go watchMonoVertex() +} diff --git a/tests/e2e/numaflowcontroller.go b/tests/e2e/numaflowcontroller.go index e9ee9b7b..93311e69 100644 --- a/tests/e2e/numaflowcontroller.go +++ b/tests/e2e/numaflowcontroller.go @@ -3,12 +3,9 @@ package e2e import ( "context" "fmt" - "os" "path/filepath" - "time" . "github.com/onsi/gomega" - "sigs.k8s.io/yaml" appsv1 "k8s.io/api/apps/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -98,13 +95,6 @@ func watchNumaflowControllerRollout() { } defer watcher.Stop() - file, err := os.OpenFile(filepath.Join(ResourceChangesNumaflowControllerOutputPath, "numaflowcontroller_rollout.yaml"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - fmt.Printf("Failed to open log file: %v\n", err) - return - } - defer file.Close() - for { select { case event := <-watcher.ResultChan(): @@ -118,11 +108,9 @@ func watchNumaflowControllerRollout() { Spec: rollout.Spec, Status: rollout.Status, } - bytes, _ := yaml.Marshal(rl) - updateLog := fmt.Sprintf("%s\n%v\n\n%s\n", LogSpacer, time.Now().Format(time.RFC3339Nano), string(bytes)) - _, err = file.WriteString(updateLog) + + err := writeToFile(filepath.Join(ResourceChangesNumaflowControllerOutputPath, "numaflowcontroller_rollout.yaml"), rl) if err != nil { - fmt.Printf("Failed to write to log file: %v\n", err) return } } diff --git a/tests/e2e/pipeline.go b/tests/e2e/pipeline.go index 306b011d..bb31898b 100644 --- a/tests/e2e/pipeline.go +++ b/tests/e2e/pipeline.go @@ -3,7 +3,6 @@ package e2e import ( "context" "fmt" - "os" "path/filepath" "strings" "time" @@ -15,7 +14,6 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/util/retry" - "sigs.k8s.io/yaml" numaflowv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" @@ -257,13 +255,6 @@ func watchPipelineRollout() { } defer watcher.Stop() - file, err := os.OpenFile(filepath.Join(ResourceChangesPipelineOutputPath, "pipeline_rollout.yaml"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - fmt.Printf("Failed to open log file: %v\n", err) - return - } - defer file.Close() - for { select { case event := <-watcher.ResultChan(): @@ -277,13 +268,12 @@ func watchPipelineRollout() { Spec: rollout.Spec, Status: rollout.Status, } - bytes, _ := yaml.Marshal(rl) - updateLog := fmt.Sprintf("%s\n%v\n\n%s\n", LogSpacer, time.Now().Format(time.RFC3339Nano), string(bytes)) - _, err = file.WriteString(updateLog) + + err = writeToFile(filepath.Join(ResourceChangesPipelineOutputPath, "pipeline_rollout.yaml"), rl) if err != nil { - fmt.Printf("Failed to write to log file: %v\n", err) return } + } } case <-stopCh: @@ -302,13 +292,6 @@ func watchPipeline() { } defer watcher.Stop() - file, err := os.OpenFile(filepath.Join(ResourceChangesPipelineOutputPath, "pipeline.yaml"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - fmt.Printf("Failed to open log file: %v\n", err) - return - } - defer file.Close() - for { select { case event := <-watcher.ResultChan(): @@ -328,13 +311,12 @@ func watchPipeline() { Spec: pl.Spec, Status: pl.Status, } - bytes, _ := yaml.Marshal(output) - updateLog := fmt.Sprintf("%s\n%v\n\n%s\n", LogSpacer, time.Now().Format(time.RFC3339Nano), string(bytes)) - _, err = file.WriteString(updateLog) + + err = writeToFile(filepath.Join(ResourceChangesPipelineOutputPath, "pipeline.yaml"), output) if err != nil { - fmt.Printf("Failed to write to log file: %v\n", err) return } + } } case <-stopCh: @@ -373,18 +355,10 @@ func watchVertices() { Spec: vtx.Spec, Status: vtx.Status, } - bytes, _ := yaml.Marshal(output) - updateLog := fmt.Sprintf("%s\n%v\n\n%s\n", LogSpacer, time.Now().Format(time.RFC3339Nano), string(bytes)) + fileName := filepath.Join(ResourceChangesPipelineOutputPath, "vertices", strings.Join([]string{vtx.Name, ".yaml"}, "")) - file, err := os.OpenFile(filepath.Join(fileName), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - fmt.Printf("Failed to open log file: %v\n", err) - return - } - defer file.Close() - _, err = file.WriteString(updateLog) + err = writeToFile(fileName, output) if err != nil { - fmt.Printf("Failed to write to log file: %v\n", err) return } } @@ -395,3 +369,14 @@ func watchVertices() { } } + +func startPipelineRolloutWatches() { + wg.Add(1) + go watchPipelineRollout() + + wg.Add(1) + go watchPipeline() + + wg.Add(1) + go watchVertices() +} diff --git a/tests/e2e/suite_test.go b/tests/e2e/suite_test.go index bbd6438e..5429346e 100644 --- a/tests/e2e/suite_test.go +++ b/tests/e2e/suite_test.go @@ -114,10 +114,13 @@ var _ = BeforeSuite(func() { go watchPods() wg.Add(1) - go watchStatefulSet() + go watchNumaflowControllerRollout() - wg.Add(1) - go watchVertices() + startPipelineRolloutWatches() + + startISBServiceRolloutWatches() + + startMonoVertexRolloutWatches() if enablePodLogs == "true" { wg.Add(1) From ec91a69f82bc9c40e0bda091111c3096effc0f19 Mon Sep 17 00:00:00 2001 From: Dillen Padhiar Date: Tue, 26 Nov 2024 12:33:51 -0800 Subject: [PATCH 2/3] feat: close all files at end of test Signed-off-by: Dillen Padhiar --- tests/e2e/common.go | 22 ++++++++++++++++++++-- tests/e2e/suite_test.go | 8 +++++++- 2 files changed, 27 insertions(+), 3 deletions(-) diff --git a/tests/e2e/common.go b/tests/e2e/common.go index 4139e38d..96f0a2bf 100644 --- a/tests/e2e/common.go +++ b/tests/e2e/common.go @@ -48,11 +48,14 @@ var ( kubeClient clientgo.Interface wg sync.WaitGroup + mutex sync.RWMutex stopCh chan struct{} ppnd string disableTestArtifacts string enablePodLogs string + + openFiles map[string]*os.File ) const ( @@ -287,12 +290,16 @@ func watchPods() { // helper func to write `kubectl get -o yaml` output to file func writeToFile(fileName string, resource Output) error { - file, err := os.OpenFile(fileName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + + mutex.Lock() + defer mutex.Unlock() + + file, err := os.Create(fileName) if err != nil { fmt.Printf("Failed to open log file: %v\n", err) return err } - defer file.Close() + openFiles[fileName] = file bytes, _ := yaml.Marshal(resource) updateLog := fmt.Sprintf("%s\n%v\n\n%s\n", LogSpacer, time.Now().Format(time.RFC3339Nano), string(bytes)) @@ -304,3 +311,14 @@ func writeToFile(fileName string, resource Output) error { return nil } + +func closeAllFiles() error { + for _, file := range openFiles { + err := file.Close() + if err != nil { + return err + } + } + + return nil +} diff --git a/tests/e2e/suite_test.go b/tests/e2e/suite_test.go index 5429346e..9742b02f 100644 --- a/tests/e2e/suite_test.go +++ b/tests/e2e/suite_test.go @@ -55,6 +55,8 @@ var _ = BeforeSuite(func() { setupOutputDir() } + openFiles = make(map[string]*os.File) + stopCh = make(chan struct{}) ppnd = os.Getenv("PPND") @@ -139,7 +141,11 @@ var _ = AfterSuite(func() { cancel() By("tearing down test environment") close(stopCh) - err := testEnv.Stop() + + err := closeAllFiles() + Expect(err).NotTo(HaveOccurred()) + + err = testEnv.Stop() Expect(err).NotTo(HaveOccurred()) }) From 60a073f80ca66bbdd7712fe88aa379d50447aeeb Mon Sep 17 00:00:00 2001 From: Dillen Padhiar Date: Tue, 26 Nov 2024 13:37:03 -0800 Subject: [PATCH 3/3] feat: only create file once Signed-off-by: Dillen Padhiar --- tests/e2e/common.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/tests/e2e/common.go b/tests/e2e/common.go index 96f0a2bf..e194ab1c 100644 --- a/tests/e2e/common.go +++ b/tests/e2e/common.go @@ -294,16 +294,19 @@ func writeToFile(fileName string, resource Output) error { mutex.Lock() defer mutex.Unlock() - file, err := os.Create(fileName) - if err != nil { - fmt.Printf("Failed to open log file: %v\n", err) - return err + if _, ok := openFiles[fileName]; !ok { + file, err := os.Create(fileName) + if err != nil { + fmt.Printf("Failed to open log file: %v\n", err) + return err + } + openFiles[fileName] = file } - openFiles[fileName] = file + file := openFiles[fileName] bytes, _ := yaml.Marshal(resource) updateLog := fmt.Sprintf("%s\n%v\n\n%s\n", LogSpacer, time.Now().Format(time.RFC3339Nano), string(bytes)) - _, err = file.WriteString(updateLog) + _, err := file.WriteString(updateLog) if err != nil { fmt.Printf("Failed to write to log file: %v\n", err) return err