diff --git a/tests/e2e/common.go b/tests/e2e/common.go index a2f56de..4139e38 100644 --- a/tests/e2e/common.go +++ b/tests/e2e/common.go @@ -273,18 +273,8 @@ func watchPods() { continue } - file, err := os.OpenFile(fileName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + err := writeToFile(fileName, pd) if err != nil { - fmt.Printf("Failed to open log file: %v\n", err) - return - } - defer file.Close() - - bytes, _ := yaml.Marshal(pd) - updateLog := fmt.Sprintf("%s\n%v\n\n%s\n", LogSpacer, time.Now().Format(time.RFC3339Nano), string(bytes)) - _, err = file.WriteString(updateLog) - if err != nil { - fmt.Printf("Failed to write to log file: %v\n", err) return } } @@ -294,3 +284,23 @@ func watchPods() { } } } + +// helper func to write `kubectl get -o yaml` output to file +func writeToFile(fileName string, resource Output) error { + file, err := os.OpenFile(fileName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + fmt.Printf("Failed to open log file: %v\n", err) + return err + } + defer file.Close() + + bytes, _ := yaml.Marshal(resource) + updateLog := fmt.Sprintf("%s\n%v\n\n%s\n", LogSpacer, time.Now().Format(time.RFC3339Nano), string(bytes)) + _, err = file.WriteString(updateLog) + if err != nil { + fmt.Printf("Failed to write to log file: %v\n", err) + return err + } + + return nil +} diff --git a/tests/e2e/functional_test.go b/tests/e2e/functional_test.go index 4e470b0..3f0324f 100644 --- a/tests/e2e/functional_test.go +++ b/tests/e2e/functional_test.go @@ -210,11 +210,6 @@ var _ = Describe("Functional e2e", Serial, func() { return err }, testTimeout, testPollingInterval).Should(Succeed()) - if disableTestArtifacts != "true" { - wg.Add(1) - go watchNumaflowControllerRollout() - } - verifyNumaflowControllerRolloutReady() verifyNumaflowControllerReady(Namespace) @@ -233,14 +228,6 @@ var _ = Describe("Functional e2e", Serial, func() { return err }, testTimeout, testPollingInterval).Should(Succeed()) - if disableTestArtifacts != "true" { - wg.Add(1) - go watchISBServiceRollout() - - wg.Add(1) - go watchISBService() - } - verifyISBSvcRolloutReady(isbServiceRolloutName) verifyISBSvcReady(Namespace, isbServiceRolloutName, 3) @@ -259,14 +246,6 @@ var _ = Describe("Functional e2e", Serial, func() { return err }, testTimeout, testPollingInterval).Should(Succeed()) - if disableTestArtifacts != "true" { - wg.Add(1) - go watchPipelineRollout() - - wg.Add(1) - go watchPipeline() - } - document("Verifying that the Pipeline was created") verifyPipelineSpec(Namespace, pipelineName, func(retrievedPipelineSpec numaflowv1.PipelineSpec) bool { return len(pipelineSpec.Vertices) == 2 // TODO: make less kludgey @@ -300,14 +279,6 @@ var _ = Describe("Functional e2e", Serial, func() { return monoVertexSpec.Source != nil }) - if disableTestArtifacts != "true" { - wg.Add(1) - go watchMonoVertexRollout() - - wg.Add(1) - go watchMonoVertex() - } - verifyMonoVertexRolloutReady(monoVertexRolloutName) verifyMonoVertexReady(Namespace, monoVertexName) diff --git a/tests/e2e/isbservice.go b/tests/e2e/isbservice.go index 37d57e9..4e1de3b 100644 --- a/tests/e2e/isbservice.go +++ b/tests/e2e/isbservice.go @@ -3,7 +3,6 @@ package e2e import ( "context" "fmt" - "os" "path/filepath" "strings" "time" @@ -11,7 +10,6 @@ import ( . "github.com/onsi/gomega" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" - "sigs.k8s.io/yaml" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -152,13 +150,6 @@ func watchISBServiceRollout() { } defer watcher.Stop() - file, err := os.OpenFile(filepath.Join(ResourceChangesISBServiceOutputPath, "isbservice_rollout.yaml"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - fmt.Printf("Failed to open log file: %v\n", err) - return - } - defer file.Close() - for { select { case event := <-watcher.ResultChan(): @@ -172,11 +163,9 @@ func watchISBServiceRollout() { Spec: rollout.Spec, Status: rollout.Status, } - bytes, _ := yaml.Marshal(rl) - updateLog := fmt.Sprintf("%s\n%v\n\n%s\n", LogSpacer, time.Now().Format(time.RFC3339Nano), string(bytes)) - _, err = file.WriteString(updateLog) + + err := writeToFile(filepath.Join(ResourceChangesISBServiceOutputPath, "isbservice_rollout.yaml"), rl) if err != nil { - fmt.Printf("Failed to write to log file: %v\n", err) return } } @@ -197,13 +186,6 @@ func watchISBService() { } defer watcher.Stop() - file, err := os.OpenFile(filepath.Join(ResourceChangesISBServiceOutputPath, "isbservice.yaml"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - fmt.Printf("Failed to open log file: %v\n", err) - return - } - defer file.Close() - for { select { case event := <-watcher.ResultChan(): @@ -223,11 +205,9 @@ func watchISBService() { Spec: isbsvc.Spec, Status: isbsvc.Status, } - bytes, _ := yaml.Marshal(output) - updateLog := fmt.Sprintf("%s\n%v\n\n%s\n", LogSpacer, time.Now().Format(time.RFC3339Nano), string(bytes)) - _, err = file.WriteString(updateLog) + + err = writeToFile(filepath.Join(ResourceChangesISBServiceOutputPath, "isbservice.yaml"), output) if err != nil { - fmt.Printf("Failed to write to log file: %v\n", err) return } } @@ -264,18 +244,8 @@ func watchStatefulSet() { Status: sts.Status, } - bytes, _ := yaml.Marshal(output) - updateLog := fmt.Sprintf("%s\n%v\n\n%s\n", LogSpacer, time.Now().Format(time.RFC3339Nano), string(bytes)) - fileName := filepath.Join(ResourceChangesISBServiceOutputPath, "statefulsets", strings.Join([]string{sts.Name, ".yaml"}, "")) - file, err := os.OpenFile(fileName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - fmt.Printf("Failed to open log file: %v\n", err) - return - } - defer file.Close() - _, err = file.WriteString(updateLog) + err := writeToFile(filepath.Join(ResourceChangesISBServiceOutputPath, "statefulsets", strings.Join([]string{sts.Name, ".yaml"}, "")), output) if err != nil { - fmt.Printf("Failed to write to log file: %v\n", err) return } } @@ -285,3 +255,14 @@ func watchStatefulSet() { } } } + +func startISBServiceRolloutWatches() { + wg.Add(1) + go watchISBServiceRollout() + + wg.Add(1) + go watchISBService() + + wg.Add(1) + go watchStatefulSet() +} diff --git a/tests/e2e/monovertex.go b/tests/e2e/monovertex.go index c2dde7d..94a0564 100644 --- a/tests/e2e/monovertex.go +++ b/tests/e2e/monovertex.go @@ -3,9 +3,7 @@ package e2e import ( "context" "fmt" - "os" "path/filepath" - "time" . "github.com/onsi/gomega" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -13,7 +11,6 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/util/retry" - "sigs.k8s.io/yaml" numaflowv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" "github.com/numaproj/numaplane/internal/util" @@ -142,13 +139,6 @@ func watchMonoVertexRollout() { } defer watcher.Stop() - file, err := os.OpenFile(filepath.Join(ResourceChangesMonoVertexOutputPath, "monovertex_rollout.yaml"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - fmt.Printf("Failed to open log file: %v\n", err) - return - } - defer file.Close() - for { select { case event := <-watcher.ResultChan(): @@ -162,13 +152,12 @@ func watchMonoVertexRollout() { Spec: rollout.Spec, Status: rollout.Status, } - bytes, _ := yaml.Marshal(rl) - updateLog := fmt.Sprintf("%s\n%v\n\n%s\n", LogSpacer, time.Now().Format(time.RFC3339Nano), string(bytes)) - _, err = file.WriteString(updateLog) + + err := writeToFile(filepath.Join(ResourceChangesMonoVertexOutputPath, "monovertex_rollout.yaml"), rl) if err != nil { - fmt.Printf("Failed to write to log file: %v\n", err) return } + } } case <-stopCh: @@ -187,13 +176,6 @@ func watchMonoVertex() { } defer watcher.Stop() - file, err := os.OpenFile(filepath.Join(ResourceChangesMonoVertexOutputPath, "monovertex.yaml"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - fmt.Printf("Failed to open log file: %v\n", err) - return - } - defer file.Close() - for { select { case event := <-watcher.ResultChan(): @@ -213,11 +195,9 @@ func watchMonoVertex() { Spec: mvtx.Spec, Status: mvtx.Status, } - bytes, _ := yaml.Marshal(output) - updateLog := fmt.Sprintf("%s\n%v\n\n%s\n", LogSpacer, time.Now().Format(time.RFC3339Nano), string(bytes)) - _, err = file.WriteString(updateLog) + + err = writeToFile(filepath.Join(ResourceChangesMonoVertexOutputPath, "monovertex.yaml"), output) if err != nil { - fmt.Printf("Failed to write to log file: %v\n", err) return } } @@ -300,3 +280,11 @@ func verifyMonoVertexRolloutDeployed(monoVertexRolloutName string) { }, testTimeout, testPollingInterval).Should(Equal(metav1.ConditionTrue)) } + +func startMonoVertexRolloutWatches() { + wg.Add(1) + go watchMonoVertexRollout() + + wg.Add(1) + go watchMonoVertex() +} diff --git a/tests/e2e/numaflowcontroller.go b/tests/e2e/numaflowcontroller.go index e9ee9b7..93311e6 100644 --- a/tests/e2e/numaflowcontroller.go +++ b/tests/e2e/numaflowcontroller.go @@ -3,12 +3,9 @@ package e2e import ( "context" "fmt" - "os" "path/filepath" - "time" . "github.com/onsi/gomega" - "sigs.k8s.io/yaml" appsv1 "k8s.io/api/apps/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -98,13 +95,6 @@ func watchNumaflowControllerRollout() { } defer watcher.Stop() - file, err := os.OpenFile(filepath.Join(ResourceChangesNumaflowControllerOutputPath, "numaflowcontroller_rollout.yaml"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - fmt.Printf("Failed to open log file: %v\n", err) - return - } - defer file.Close() - for { select { case event := <-watcher.ResultChan(): @@ -118,11 +108,9 @@ func watchNumaflowControllerRollout() { Spec: rollout.Spec, Status: rollout.Status, } - bytes, _ := yaml.Marshal(rl) - updateLog := fmt.Sprintf("%s\n%v\n\n%s\n", LogSpacer, time.Now().Format(time.RFC3339Nano), string(bytes)) - _, err = file.WriteString(updateLog) + + err := writeToFile(filepath.Join(ResourceChangesNumaflowControllerOutputPath, "numaflowcontroller_rollout.yaml"), rl) if err != nil { - fmt.Printf("Failed to write to log file: %v\n", err) return } } diff --git a/tests/e2e/pipeline.go b/tests/e2e/pipeline.go index 306b011..bb31898 100644 --- a/tests/e2e/pipeline.go +++ b/tests/e2e/pipeline.go @@ -3,7 +3,6 @@ package e2e import ( "context" "fmt" - "os" "path/filepath" "strings" "time" @@ -15,7 +14,6 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/util/retry" - "sigs.k8s.io/yaml" numaflowv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" @@ -257,13 +255,6 @@ func watchPipelineRollout() { } defer watcher.Stop() - file, err := os.OpenFile(filepath.Join(ResourceChangesPipelineOutputPath, "pipeline_rollout.yaml"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - fmt.Printf("Failed to open log file: %v\n", err) - return - } - defer file.Close() - for { select { case event := <-watcher.ResultChan(): @@ -277,13 +268,12 @@ func watchPipelineRollout() { Spec: rollout.Spec, Status: rollout.Status, } - bytes, _ := yaml.Marshal(rl) - updateLog := fmt.Sprintf("%s\n%v\n\n%s\n", LogSpacer, time.Now().Format(time.RFC3339Nano), string(bytes)) - _, err = file.WriteString(updateLog) + + err = writeToFile(filepath.Join(ResourceChangesPipelineOutputPath, "pipeline_rollout.yaml"), rl) if err != nil { - fmt.Printf("Failed to write to log file: %v\n", err) return } + } } case <-stopCh: @@ -302,13 +292,6 @@ func watchPipeline() { } defer watcher.Stop() - file, err := os.OpenFile(filepath.Join(ResourceChangesPipelineOutputPath, "pipeline.yaml"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - fmt.Printf("Failed to open log file: %v\n", err) - return - } - defer file.Close() - for { select { case event := <-watcher.ResultChan(): @@ -328,13 +311,12 @@ func watchPipeline() { Spec: pl.Spec, Status: pl.Status, } - bytes, _ := yaml.Marshal(output) - updateLog := fmt.Sprintf("%s\n%v\n\n%s\n", LogSpacer, time.Now().Format(time.RFC3339Nano), string(bytes)) - _, err = file.WriteString(updateLog) + + err = writeToFile(filepath.Join(ResourceChangesPipelineOutputPath, "pipeline.yaml"), output) if err != nil { - fmt.Printf("Failed to write to log file: %v\n", err) return } + } } case <-stopCh: @@ -373,18 +355,10 @@ func watchVertices() { Spec: vtx.Spec, Status: vtx.Status, } - bytes, _ := yaml.Marshal(output) - updateLog := fmt.Sprintf("%s\n%v\n\n%s\n", LogSpacer, time.Now().Format(time.RFC3339Nano), string(bytes)) + fileName := filepath.Join(ResourceChangesPipelineOutputPath, "vertices", strings.Join([]string{vtx.Name, ".yaml"}, "")) - file, err := os.OpenFile(filepath.Join(fileName), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - fmt.Printf("Failed to open log file: %v\n", err) - return - } - defer file.Close() - _, err = file.WriteString(updateLog) + err = writeToFile(fileName, output) if err != nil { - fmt.Printf("Failed to write to log file: %v\n", err) return } } @@ -395,3 +369,14 @@ func watchVertices() { } } + +func startPipelineRolloutWatches() { + wg.Add(1) + go watchPipelineRollout() + + wg.Add(1) + go watchPipeline() + + wg.Add(1) + go watchVertices() +} diff --git a/tests/e2e/suite_test.go b/tests/e2e/suite_test.go index bbd6438..5429346 100644 --- a/tests/e2e/suite_test.go +++ b/tests/e2e/suite_test.go @@ -114,10 +114,13 @@ var _ = BeforeSuite(func() { go watchPods() wg.Add(1) - go watchStatefulSet() + go watchNumaflowControllerRollout() - wg.Add(1) - go watchVertices() + startPipelineRolloutWatches() + + startISBServiceRolloutWatches() + + startMonoVertexRolloutWatches() if enablePodLogs == "true" { wg.Add(1)