From 97ab7b7b6a09fecf7669a51fedad8b1bf12b0ab1 Mon Sep 17 00:00:00 2001 From: Samhith Kakarla Date: Fri, 2 Aug 2024 15:01:56 -0700 Subject: [PATCH 1/5] inital Signed-off-by: Samhith Kakarla --- pkg/daemon/server/service/health_status.go | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/pkg/daemon/server/service/health_status.go b/pkg/daemon/server/service/health_status.go index 3b00668be8..0b8a0063e8 100644 --- a/pkg/daemon/server/service/health_status.go +++ b/pkg/daemon/server/service/health_status.go @@ -59,11 +59,11 @@ const ( ) // HealthThresholds are the thresholds used to compute the health of a vertex -const ( +var ( // criticalBufferThreshold is the threshold above which the health of a vertex is critical - criticalBufferThreshold = 95 + criticalBufferThreshold = v1alpha1.DefaultBufferUsageLimit * 90 // 95 // warningBufferThreshold is the threshold above which the health of a vertex is warning - warningBufferThreshold = 80 + warningBufferThreshold = (0.85) * criticalBufferThreshold // 80 ) // Dataflow states @@ -231,6 +231,7 @@ func (hc *HealthChecker) getPipelineVertexDataCriticality(ctx context.Context) ( } // update the usage timeline for all the ISBs used in the pipeline hc.updateUsageTimeline(buffers.Buffers) + hc.udpateThresholds() var vertexState []*vertexState @@ -255,6 +256,11 @@ func (hc *HealthChecker) getPipelineVertexDataCriticality(ctx context.Context) ( return vertexState, nil } +func (hc *HealthChecker) udpateThresholds() { + criticalBufferThreshold = float64(*hc.pipeline.Spec.Limits.BufferUsageLimit) * 0.9 + warningBufferThreshold = criticalBufferThreshold * 0.85 +} + // updateUsageTimeline is used to update the usage timeline for a given buffer list // This iterates over all the buffers in the buffer list and updates the usage timeline for each buffer // The timeline data is represented as a map of buffer name to a list of timelineEntry @@ -348,9 +354,9 @@ func calculateEWMAUsage(bufferUsage []float64) []float64 { func assignStateToBufferUsage(ewmaValue float64) string { // Assign the state to the buffer usage var state string - if ewmaValue > criticalBufferThreshold { + if ewmaValue > float64(criticalBufferThreshold) { state = criticalState - } else if ewmaValue > warningBufferThreshold { + } else if ewmaValue > float64(warningBufferThreshold) { state = warningState } else { state = healthyState From dc02db486d5264d335c80eca600e586cc892415c Mon Sep 17 00:00:00 2001 From: Samhith Kakarla Date: Tue, 6 Aug 2024 16:59:22 -0700 Subject: [PATCH 2/5] unit tests Signed-off-by: Samhith Kakarla --- pkg/daemon/server/service/health_status.go | 8 ++- .../server/service/health_status_test.go | 51 +++++++++++++++++++ 2 files changed, 57 insertions(+), 2 deletions(-) diff --git a/pkg/daemon/server/service/health_status.go b/pkg/daemon/server/service/health_status.go index 0b8a0063e8..241a83c8b0 100644 --- a/pkg/daemon/server/service/health_status.go +++ b/pkg/daemon/server/service/health_status.go @@ -61,9 +61,9 @@ const ( // HealthThresholds are the thresholds used to compute the health of a vertex var ( // criticalBufferThreshold is the threshold above which the health of a vertex is critical - criticalBufferThreshold = v1alpha1.DefaultBufferUsageLimit * 90 // 95 + criticalBufferThreshold = v1alpha1.DefaultBufferUsageLimit * float64(90) // 95 // warningBufferThreshold is the threshold above which the health of a vertex is warning - warningBufferThreshold = (0.85) * criticalBufferThreshold // 80 + warningBufferThreshold = float64(0.85) * criticalBufferThreshold // 80 ) // Dataflow states @@ -261,6 +261,10 @@ func (hc *HealthChecker) udpateThresholds() { warningBufferThreshold = criticalBufferThreshold * 0.85 } +func (hc *HealthChecker) GetThresholds() (float64, float64) { + return criticalBufferThreshold, warningBufferThreshold +} + // updateUsageTimeline is used to update the usage timeline for a given buffer list // This iterates over all the buffers in the buffer list and updates the usage timeline for each buffer // The timeline data is represented as a map of buffer name to a list of timelineEntry diff --git a/pkg/daemon/server/service/health_status_test.go b/pkg/daemon/server/service/health_status_test.go index ec636096be..ced9612928 100644 --- a/pkg/daemon/server/service/health_status_test.go +++ b/pkg/daemon/server/service/health_status_test.go @@ -107,6 +107,21 @@ func TestNewHealthChecker(t *testing.T) { } } +func TestHealthThresholds(t *testing.T) { + + hc := NewHealthChecker(&v1alpha1.Pipeline{}, &mockISBService{}) + a, _ := hc.GetThresholds() + assert.Equal(t, a, float64(72)) + //assert.Equal(t, b, float64(61.2)) + forty := uint32(40) + hc.pipeline.Spec.Limits = &v1alpha1.PipelineLimits{BufferUsageLimit: &forty} + hc.udpateThresholds() + c, _ := hc.GetThresholds() + assert.Equal(t, c, float64(36)) + //assert.Equal(t, d, float64(30.6)) + +} + type mockISBService struct { isbsvc.ISBService } @@ -676,6 +691,42 @@ func TestAssignStateToTimeline(t *testing.T) { } } +// func Scalalbility_error_test(t *testing.T) { +// pipelineName := "simple-pipeline" +// namespace := "numaflow-system" +// edges := []v1alpha1.Edge{ +// { +// From: "in", +// To: "cat", +// }, +// { +// From: "cat", +// To: "out", +// }, +// } +// pipeline := &v1alpha1.Pipeline{ +// ObjectMeta: metav1.ObjectMeta{ +// Name: pipelineName, +// Namespace: namespace, +// }, +// Spec: v1alpha1.PipelineSpec{ +// Vertices: []v1alpha1.AbstractVertex{ +// {Name: "in", Source: &v1alpha1.Source{}}, +// {Name: "cat", UDF: &v1alpha1.UDF{}}, +// {Name: "out", Sink: &v1alpha1.Sink{}}, +// }, +// Edges: edges, +// }, +// } + +// for v := range pipeline.Spec.Vertices { +// pipeline.Spec.Vertices[v].Scale = v1alpha1.Scale{Disabled: true} +// } + +// pipeline. + +// } + func TestConvertVertexStateToPipelineState(t *testing.T) { tests := []struct { name string From 052f8ef2877b1229fc3320793850e0fa4ff3eea7 Mon Sep 17 00:00:00 2001 From: Samhith Kakarla Date: Wed, 7 Aug 2024 12:04:57 -0700 Subject: [PATCH 3/5] testing Signed-off-by: Samhith Kakarla --- .../server/service/health_status_test.go | 27 ++++++++++--------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/pkg/daemon/server/service/health_status_test.go b/pkg/daemon/server/service/health_status_test.go index ced9612928..a4b89cdc27 100644 --- a/pkg/daemon/server/service/health_status_test.go +++ b/pkg/daemon/server/service/health_status_test.go @@ -591,17 +591,17 @@ func TestAssignStateToBufferUsage(t *testing.T) { }{ { name: "Critical state", - ewmaValue: 96, + ewmaValue: v1alpha1.DefaultBufferUsageLimit * 95, expected: criticalState, }, { name: "Warning state", - ewmaValue: 85, + ewmaValue: v1alpha1.DefaultBufferUsageLimit * 90 * 0.9, expected: warningState, }, { name: "Healthy state", - ewmaValue: 30, + ewmaValue: v1alpha1.DefaultBufferUsageLimit * 30, expected: healthyState, }, } @@ -615,6 +615,7 @@ func TestAssignStateToBufferUsage(t *testing.T) { } func TestAssignStateToTimeline(t *testing.T) { + n := v1alpha1.DefaultBufferUsageLimit * 100 tests := []struct { name string ewmaValues []float64 @@ -623,61 +624,61 @@ func TestAssignStateToTimeline(t *testing.T) { }{ { name: "Single healthy value", - ewmaValues: []float64{30}, + ewmaValues: []float64{n * 0.3}, lookBack: false, expected: healthyState, }, { name: "Single warning value", - ewmaValues: []float64{85}, + ewmaValues: []float64{n * 0.85}, lookBack: false, expected: warningState, }, { name: "Single critical value without lookback", - ewmaValues: []float64{96}, + ewmaValues: []float64{n * 0.95}, lookBack: false, expected: criticalState, }, { name: "Single critical value with lookback", - ewmaValues: []float64{96}, + ewmaValues: []float64{n * 0.95}, lookBack: true, expected: warningState, }, { name: "Multiple values ending with critical, no lookback", - ewmaValues: []float64{30, 85, 96}, + ewmaValues: []float64{n * 0.3, n * 0.7, n * 0.92}, lookBack: false, expected: criticalState, }, { name: "Multiple values ending with critical, with lookback, insufficient critical count", - ewmaValues: []float64{30, 85, 96, 96}, + ewmaValues: []float64{n * 0.3, n * 0.7, n * 0.95, n * 0.95}, lookBack: true, expected: warningState, }, { name: "Multiple values ending with critical, with lookback, sufficient critical count", - ewmaValues: []float64{96, 96, 96, 96, 96}, + ewmaValues: []float64{n * 0.95, n * 0.95, n * 0.95, n * 0.95, n * 0.95}, lookBack: true, expected: criticalState, }, { name: "Values fluctuating between warning and critical", - ewmaValues: []float64{85, 96, 85, 96, 85}, + ewmaValues: []float64{n * 0.85, n * 0.95, n * 0.85, n * 0.95, n * 0.85}, lookBack: true, expected: warningState, }, { name: "Values increasing from healthy to critical", - ewmaValues: []float64{30, 50, 70, 90, 96}, + ewmaValues: []float64{n * 0.3, n * 0.5, n * 0.7, n * 0.9, n * 0.96}, lookBack: true, expected: warningState, }, { name: "Values decreasing from critical to healthy", - ewmaValues: []float64{96, 90, 70, 50, 30}, + ewmaValues: []float64{n * 0.96, n * 0.9, n * 0.7, n * 0.5, n * 0.3}, lookBack: true, expected: healthyState, }, From 973436c3bb6e82025a72a3482681f9bcd83faa05 Mon Sep 17 00:00:00 2001 From: Samhith Kakarla Date: Wed, 7 Aug 2024 14:33:33 -0700 Subject: [PATCH 4/5] testing-fix Signed-off-by: Samhith Kakarla --- pkg/daemon/server/service/health_status_test.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/pkg/daemon/server/service/health_status_test.go b/pkg/daemon/server/service/health_status_test.go index a4b89cdc27..437de56871 100644 --- a/pkg/daemon/server/service/health_status_test.go +++ b/pkg/daemon/server/service/health_status_test.go @@ -112,13 +112,17 @@ func TestHealthThresholds(t *testing.T) { hc := NewHealthChecker(&v1alpha1.Pipeline{}, &mockISBService{}) a, _ := hc.GetThresholds() assert.Equal(t, a, float64(72)) - //assert.Equal(t, b, float64(61.2)) + forty := uint32(40) + eighty := uint32(80) + hc.pipeline.Spec.Limits = &v1alpha1.PipelineLimits{BufferUsageLimit: &forty} hc.udpateThresholds() c, _ := hc.GetThresholds() assert.Equal(t, c, float64(36)) - //assert.Equal(t, d, float64(30.6)) + + hc.pipeline.Spec.Limits = &v1alpha1.PipelineLimits{BufferUsageLimit: &eighty} + hc.udpateThresholds() } @@ -596,7 +600,7 @@ func TestAssignStateToBufferUsage(t *testing.T) { }, { name: "Warning state", - ewmaValue: v1alpha1.DefaultBufferUsageLimit * 90 * 0.9, + ewmaValue: v1alpha1.DefaultBufferUsageLimit * 90 * 0.86, expected: warningState, }, { @@ -609,6 +613,7 @@ func TestAssignStateToBufferUsage(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { result := assignStateToBufferUsage(tt.ewmaValue) + t.Log(tt.ewmaValue, result) assert.Equal(t, tt.expected, result) }) } From 7626eb0191a980bd1c02a1f173c1953385630f40 Mon Sep 17 00:00:00 2001 From: Samhith Kakarla Date: Fri, 9 Aug 2024 11:15:29 -0700 Subject: [PATCH 5/5] testing Signed-off-by: Samhith Kakarla --- pkg/daemon/server/service/health_status.go | 37 +++++++++++++------ .../server/service/health_status_test.go | 8 ++-- 2 files changed, 30 insertions(+), 15 deletions(-) diff --git a/pkg/daemon/server/service/health_status.go b/pkg/daemon/server/service/health_status.go index 241a83c8b0..6494fb9f50 100644 --- a/pkg/daemon/server/service/health_status.go +++ b/pkg/daemon/server/service/health_status.go @@ -64,6 +64,8 @@ var ( criticalBufferThreshold = v1alpha1.DefaultBufferUsageLimit * float64(90) // 95 // warningBufferThreshold is the threshold above which the health of a vertex is warning warningBufferThreshold = float64(0.85) * criticalBufferThreshold // 80 + + BufferToThreshold = make(map[string]uint32) ) // Dataflow states @@ -229,9 +231,10 @@ func (hc *HealthChecker) getPipelineVertexDataCriticality(ctx context.Context) ( if err != nil { return nil, err } - // update the usage timeline for all the ISBs used in the pipeline + + // update the usage timeline for all the ISBs used in the pipelin hc.updateUsageTimeline(buffers.Buffers) - hc.udpateThresholds() + //hc.udpateThresholds() var vertexState []*vertexState @@ -239,6 +242,9 @@ func (hc *HealthChecker) getPipelineVertexDataCriticality(ctx context.Context) ( // for the last HEALTH_WINDOW_SIZE buffer usage entries for bufferName := range hc.timelineData { // Extract the buffer usage of the timeline + // temp := hc.pipeline.FindVertexWithBuffer(bufferName) + // hc.udpateThresholds(*temp.Limits.BufferUsageLimit) + // BufferToThreshold[bufferName] = *temp.Limits.BufferUsageLimit var bufferUsage []float64 for _, entry := range hc.timelineData[bufferName].Items() { bufferUsage = append(bufferUsage, entry.BufferUsage) @@ -247,7 +253,7 @@ func (hc *HealthChecker) getPipelineVertexDataCriticality(ctx context.Context) ( ewmaBufferUsage := calculateEWMAUsage(bufferUsage) // assign the state to the vertex based on the average buffer usage // Look back is enabled for the critical state - currentState := assignStateToTimeline(ewmaBufferUsage, enableCriticalLookBack) + currentState := assignStateToTimeline(ewmaBufferUsage, enableCriticalLookBack, BufferToThreshold[bufferName]) // create a new vertex state object currentVertexState := newVertexState(bufferName, currentState) // add the vertex state to the list of vertex states @@ -256,8 +262,8 @@ func (hc *HealthChecker) getPipelineVertexDataCriticality(ctx context.Context) ( return vertexState, nil } -func (hc *HealthChecker) udpateThresholds() { - criticalBufferThreshold = float64(*hc.pipeline.Spec.Limits.BufferUsageLimit) * 0.9 +func (hc *HealthChecker) udpateThresholds(bufferLimit uint32) { + criticalBufferThreshold = float64(bufferLimit) * 0.9 warningBufferThreshold = criticalBufferThreshold * 0.85 } @@ -284,6 +290,15 @@ func (hc *HealthChecker) updateUsageTimeline(bufferList []*daemon.BufferInfo) { bufferName := buffer.GetBufferName() timestamp := time.Now().Unix() + // check if the buffer name is present in the pipeline + // `FindVertexWithBuffer` is a method that returns the vertex with the given buffer name + // if the buffer name is present in the pipeline + if hc.pipeline != nil { + vert := hc.pipeline.FindVertexWithBuffer(bufferName) + if vert != nil { + BufferToThreshold[bufferName] = *vert.Limits.BufferUsageLimit + } + } // if the buffer name is not present in the timeline data, add it if _, ok := hc.timelineData[bufferName]; !ok { hc.timelineData[bufferName] = sharedqueue.New[*timelineEntry](int(healthWindowSize)) @@ -355,12 +370,12 @@ func calculateEWMAUsage(bufferUsage []float64) []float64 { // - if the buffer usage is above CRITICAL_THRESHOLD, the state is set to CRITICAL // - if the buffer usage is above WARNING_THRESHOLD, the state is set to WARNING // - otherwise, the state is set to HEALTHY -func assignStateToBufferUsage(ewmaValue float64) string { +func assignStateToBufferUsage(ewmaValue float64, buFUsageVal uint32) string { // Assign the state to the buffer usage var state string - if ewmaValue > float64(criticalBufferThreshold) { + if ewmaValue > float64(float64(buFUsageVal)*0.9) { state = criticalState - } else if ewmaValue > float64(warningBufferThreshold) { + } else if ewmaValue > float64(float64(buFUsageVal)*0.9*0.85) { state = warningState } else { state = healthyState @@ -373,12 +388,12 @@ func assignStateToBufferUsage(ewmaValue float64) string { // In this case, we check if the state is CRITICAL at least LOOK_BACK_COUNT times in the last CRITICAL_WINDOW_SIZE entries // If the state is CRITICAL at least LOOK_BACK_COUNT times in the last CRITICAL_WINDOW_SIZE entries // Set the state to CRITICAL -func assignStateToTimeline(ewmaValues []float64, lookBack bool) string { +func assignStateToTimeline(ewmaValues []float64, lookBack bool, bufUsageVal uint32) string { // Extract the last entry of the timeline ewmaUsage := ewmaValues[len(ewmaValues)-1] // Assign the state to the buffer usage value - state := assignStateToBufferUsage(ewmaUsage) + state := assignStateToBufferUsage(ewmaUsage, bufUsageVal) // If the state is CRITICAL, and we have a look back, we need to check we have // LOOK_BACK_COUNT entries as CRITICAL @@ -386,7 +401,7 @@ func assignStateToTimeline(ewmaValues []float64, lookBack bool) string { // Extract the states of the timeline var states []string for _, entry := range ewmaValues { - states = append(states, assignStateToBufferUsage(entry)) + states = append(states, assignStateToBufferUsage(entry, bufUsageVal)) } // Count the number of times the state is CRITICAL in the last CRITICAL_WINDOW_SIZE entries var criticalCount int diff --git a/pkg/daemon/server/service/health_status_test.go b/pkg/daemon/server/service/health_status_test.go index 437de56871..38b0e47b5f 100644 --- a/pkg/daemon/server/service/health_status_test.go +++ b/pkg/daemon/server/service/health_status_test.go @@ -117,12 +117,12 @@ func TestHealthThresholds(t *testing.T) { eighty := uint32(80) hc.pipeline.Spec.Limits = &v1alpha1.PipelineLimits{BufferUsageLimit: &forty} - hc.udpateThresholds() + hc.udpateThresholds(uint32(*hc.pipeline.Spec.Limits.BufferUsageLimit)) c, _ := hc.GetThresholds() assert.Equal(t, c, float64(36)) hc.pipeline.Spec.Limits = &v1alpha1.PipelineLimits{BufferUsageLimit: &eighty} - hc.udpateThresholds() + hc.udpateThresholds(uint32(*hc.pipeline.Spec.Limits.BufferUsageLimit)) } @@ -612,7 +612,7 @@ func TestAssignStateToBufferUsage(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - result := assignStateToBufferUsage(tt.ewmaValue) + result := assignStateToBufferUsage(tt.ewmaValue, 80) t.Log(tt.ewmaValue, result) assert.Equal(t, tt.expected, result) }) @@ -691,7 +691,7 @@ func TestAssignStateToTimeline(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - result := assignStateToTimeline(tt.ewmaValues, tt.lookBack) + result := assignStateToTimeline(tt.ewmaValues, tt.lookBack, 80) assert.Equal(t, tt.expected, result) }) }