Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: refactoring watch logic for e2e tests #431

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 21 additions & 11 deletions tests/e2e/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,18 +273,8 @@ func watchPods() {
continue
}

file, err := os.OpenFile(fileName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
err := writeToFile(fileName, pd)
if err != nil {
fmt.Printf("Failed to open log file: %v\n", err)
return
}
defer file.Close()

bytes, _ := yaml.Marshal(pd)
updateLog := fmt.Sprintf("%s\n%v\n\n%s\n", LogSpacer, time.Now().Format(time.RFC3339Nano), string(bytes))
_, err = file.WriteString(updateLog)
if err != nil {
fmt.Printf("Failed to write to log file: %v\n", err)
return
}
}
Expand All @@ -294,3 +284,23 @@ func watchPods() {
}
}
}

// helper func to write `kubectl get -o yaml` output to file
func writeToFile(fileName string, resource Output) error {
file, err := os.OpenFile(fileName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
fmt.Printf("Failed to open log file: %v\n", err)
return err
}
defer file.Close()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is interesting that the file is being closed with every write. Might be a little less performant to continually open and close, but I guess it's working. The alternative would be to maintain a list of all open files and then close them all at the end. But since it's working, I'm fine as is. Maybe we can just add a "TODO" comment: "for better performance, don't close file after every write"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's right, this approach is much more cautious just cause I don't want any files to continue to be open for too long or not close to cause an issue.

Keeping a list of every open file would be good, we could automatically close everything when a test fails as well too.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine if you just want to add a "TODO" for now - up to you

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I could add this change to this PR since we're adding the writeToFile func here and the other comment to consolidate further could be its own PR.


bytes, _ := yaml.Marshal(resource)
updateLog := fmt.Sprintf("%s\n%v\n\n%s\n", LogSpacer, time.Now().Format(time.RFC3339Nano), string(bytes))
_, err = file.WriteString(updateLog)
if err != nil {
fmt.Printf("Failed to write to log file: %v\n", err)
return err
}

return nil
}
29 changes: 0 additions & 29 deletions tests/e2e/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,6 @@ var _ = Describe("Functional e2e", Serial, func() {
return err
}, testTimeout, testPollingInterval).Should(Succeed())

if disableTestArtifacts != "true" {
wg.Add(1)
go watchNumaflowControllerRollout()
}

verifyNumaflowControllerRolloutReady()

verifyNumaflowControllerReady(Namespace)
Expand All @@ -233,14 +228,6 @@ var _ = Describe("Functional e2e", Serial, func() {
return err
}, testTimeout, testPollingInterval).Should(Succeed())

if disableTestArtifacts != "true" {
wg.Add(1)
go watchISBServiceRollout()

wg.Add(1)
go watchISBService()
}

verifyISBSvcRolloutReady(isbServiceRolloutName)

verifyISBSvcReady(Namespace, isbServiceRolloutName, 3)
Expand All @@ -259,14 +246,6 @@ var _ = Describe("Functional e2e", Serial, func() {
return err
}, testTimeout, testPollingInterval).Should(Succeed())

if disableTestArtifacts != "true" {
wg.Add(1)
go watchPipelineRollout()

wg.Add(1)
go watchPipeline()
}

document("Verifying that the Pipeline was created")
verifyPipelineSpec(Namespace, pipelineName, func(retrievedPipelineSpec numaflowv1.PipelineSpec) bool {
return len(pipelineSpec.Vertices) == 2 // TODO: make less kludgey
Expand Down Expand Up @@ -300,14 +279,6 @@ var _ = Describe("Functional e2e", Serial, func() {
return monoVertexSpec.Source != nil
})

if disableTestArtifacts != "true" {
wg.Add(1)
go watchMonoVertexRollout()

wg.Add(1)
go watchMonoVertex()
}

verifyMonoVertexRolloutReady(monoVertexRolloutName)

verifyMonoVertexReady(Namespace, monoVertexName)
Expand Down
51 changes: 16 additions & 35 deletions tests/e2e/isbservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@ package e2e
import (
"context"
"fmt"
"os"
"path/filepath"
"strings"
"time"

. "github.com/onsi/gomega"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/yaml"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand Down Expand Up @@ -152,13 +150,6 @@ func watchISBServiceRollout() {
}
defer watcher.Stop()

file, err := os.OpenFile(filepath.Join(ResourceChangesISBServiceOutputPath, "isbservice_rollout.yaml"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
fmt.Printf("Failed to open log file: %v\n", err)
return
}
defer file.Close()

for {
select {
case event := <-watcher.ResultChan():
Expand All @@ -172,11 +163,9 @@ func watchISBServiceRollout() {
Spec: rollout.Spec,
Status: rollout.Status,
}
bytes, _ := yaml.Marshal(rl)
updateLog := fmt.Sprintf("%s\n%v\n\n%s\n", LogSpacer, time.Now().Format(time.RFC3339Nano), string(bytes))
_, err = file.WriteString(updateLog)

err := writeToFile(filepath.Join(ResourceChangesISBServiceOutputPath, "isbservice_rollout.yaml"), rl)
if err != nil {
fmt.Printf("Failed to write to log file: %v\n", err)
return
}
}
Expand All @@ -197,13 +186,6 @@ func watchISBService() {
}
defer watcher.Stop()

file, err := os.OpenFile(filepath.Join(ResourceChangesISBServiceOutputPath, "isbservice.yaml"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
fmt.Printf("Failed to open log file: %v\n", err)
return
}
defer file.Close()

for {
select {
case event := <-watcher.ResultChan():
Expand All @@ -223,11 +205,9 @@ func watchISBService() {
Spec: isbsvc.Spec,
Status: isbsvc.Status,
}
bytes, _ := yaml.Marshal(output)
updateLog := fmt.Sprintf("%s\n%v\n\n%s\n", LogSpacer, time.Now().Format(time.RFC3339Nano), string(bytes))
_, err = file.WriteString(updateLog)

err = writeToFile(filepath.Join(ResourceChangesISBServiceOutputPath, "isbservice.yaml"), output)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be a future PR, but if I were to imagine how to consolidate all of these functions into a single function, here is one idea:
Have a function watchResourceType() which takes as parameters 2 functions:

  1. a "watch" function which returns a channel
  2. an "extractOutputFromEvent" function which takes a runtime.Object (the event.Object you have above) and returns that Output structure (or alternatively have a "writeOutputFromEvent" function which also does the writing)
  3. may need some other parameters

if err != nil {
fmt.Printf("Failed to write to log file: %v\n", err)
return
}
}
Expand Down Expand Up @@ -264,18 +244,8 @@ func watchStatefulSet() {
Status: sts.Status,
}

bytes, _ := yaml.Marshal(output)
updateLog := fmt.Sprintf("%s\n%v\n\n%s\n", LogSpacer, time.Now().Format(time.RFC3339Nano), string(bytes))
fileName := filepath.Join(ResourceChangesISBServiceOutputPath, "statefulsets", strings.Join([]string{sts.Name, ".yaml"}, ""))
file, err := os.OpenFile(fileName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
fmt.Printf("Failed to open log file: %v\n", err)
return
}
defer file.Close()
_, err = file.WriteString(updateLog)
err := writeToFile(filepath.Join(ResourceChangesISBServiceOutputPath, "statefulsets", strings.Join([]string{sts.Name, ".yaml"}, "")), output)
if err != nil {
fmt.Printf("Failed to write to log file: %v\n", err)
return
}
}
Expand All @@ -285,3 +255,14 @@ func watchStatefulSet() {
}
}
}

func startISBServiceRolloutWatches() {
wg.Add(1)
go watchISBServiceRollout()

wg.Add(1)
go watchISBService()

wg.Add(1)
go watchStatefulSet()
}
38 changes: 13 additions & 25 deletions tests/e2e/monovertex.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,14 @@ package e2e
import (
"context"
"fmt"
"os"
"path/filepath"
"time"

. "github.com/onsi/gomega"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/yaml"

numaflowv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaplane/internal/util"
Expand Down Expand Up @@ -142,13 +139,6 @@ func watchMonoVertexRollout() {
}
defer watcher.Stop()

file, err := os.OpenFile(filepath.Join(ResourceChangesMonoVertexOutputPath, "monovertex_rollout.yaml"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
fmt.Printf("Failed to open log file: %v\n", err)
return
}
defer file.Close()

for {
select {
case event := <-watcher.ResultChan():
Expand All @@ -162,13 +152,12 @@ func watchMonoVertexRollout() {
Spec: rollout.Spec,
Status: rollout.Status,
}
bytes, _ := yaml.Marshal(rl)
updateLog := fmt.Sprintf("%s\n%v\n\n%s\n", LogSpacer, time.Now().Format(time.RFC3339Nano), string(bytes))
_, err = file.WriteString(updateLog)

err := writeToFile(filepath.Join(ResourceChangesMonoVertexOutputPath, "monovertex_rollout.yaml"), rl)
if err != nil {
fmt.Printf("Failed to write to log file: %v\n", err)
return
}

}
}
case <-stopCh:
Expand All @@ -187,13 +176,6 @@ func watchMonoVertex() {
}
defer watcher.Stop()

file, err := os.OpenFile(filepath.Join(ResourceChangesMonoVertexOutputPath, "monovertex.yaml"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
fmt.Printf("Failed to open log file: %v\n", err)
return
}
defer file.Close()

for {
select {
case event := <-watcher.ResultChan():
Expand All @@ -213,11 +195,9 @@ func watchMonoVertex() {
Spec: mvtx.Spec,
Status: mvtx.Status,
}
bytes, _ := yaml.Marshal(output)
updateLog := fmt.Sprintf("%s\n%v\n\n%s\n", LogSpacer, time.Now().Format(time.RFC3339Nano), string(bytes))
_, err = file.WriteString(updateLog)

err = writeToFile(filepath.Join(ResourceChangesMonoVertexOutputPath, "monovertex.yaml"), output)
if err != nil {
fmt.Printf("Failed to write to log file: %v\n", err)
return
}
}
Expand Down Expand Up @@ -300,3 +280,11 @@ func verifyMonoVertexRolloutDeployed(monoVertexRolloutName string) {
}, testTimeout, testPollingInterval).Should(Equal(metav1.ConditionTrue))

}

func startMonoVertexRolloutWatches() {
wg.Add(1)
go watchMonoVertexRollout()

wg.Add(1)
go watchMonoVertex()
}
16 changes: 2 additions & 14 deletions tests/e2e/numaflowcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,9 @@ package e2e
import (
"context"
"fmt"
"os"
"path/filepath"
"time"

. "github.com/onsi/gomega"
"sigs.k8s.io/yaml"

appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -98,13 +95,6 @@ func watchNumaflowControllerRollout() {
}
defer watcher.Stop()

file, err := os.OpenFile(filepath.Join(ResourceChangesNumaflowControllerOutputPath, "numaflowcontroller_rollout.yaml"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
fmt.Printf("Failed to open log file: %v\n", err)
return
}
defer file.Close()

for {
select {
case event := <-watcher.ResultChan():
Expand All @@ -118,11 +108,9 @@ func watchNumaflowControllerRollout() {
Spec: rollout.Spec,
Status: rollout.Status,
}
bytes, _ := yaml.Marshal(rl)
updateLog := fmt.Sprintf("%s\n%v\n\n%s\n", LogSpacer, time.Now().Format(time.RFC3339Nano), string(bytes))
_, err = file.WriteString(updateLog)

err := writeToFile(filepath.Join(ResourceChangesNumaflowControllerOutputPath, "numaflowcontroller_rollout.yaml"), rl)
if err != nil {
fmt.Printf("Failed to write to log file: %v\n", err)
return
}
}
Expand Down
Loading
Loading