Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: changing health status thresholds #1905

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 36 additions & 11 deletions pkg/daemon/server/service/health_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,13 @@ const (
)

// HealthThresholds are the thresholds used to compute the health of a vertex
const (
var (
// criticalBufferThreshold is the threshold above which the health of a vertex is critical
criticalBufferThreshold = 95
	criticalBufferThreshold = v1alpha1.DefaultBufferUsageLimit * float64(90) // 72 with the default buffer usage limit of 0.8
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The thresholds for different vertices should be different - based on each vertex's limits configuration, is this right? @kohlisid

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes now that we have it relative, it can vary vertex to vertex.
The code will need changes to compare each timeline using the limits for the corresponding vertex limits.

// warningBufferThreshold is the threshold above which the health of a vertex is warning
warningBufferThreshold = 80
	warningBufferThreshold = float64(0.85) * criticalBufferThreshold // 61.2 with the default buffer usage limit of 0.8

BufferToThreshold = make(map[string]uint32)
)

// Dataflow states
Expand Down Expand Up @@ -229,15 +231,20 @@ func (hc *HealthChecker) getPipelineVertexDataCriticality(ctx context.Context) (
if err != nil {
return nil, err
}
// update the usage timeline for all the ISBs used in the pipeline

	// update the usage timeline for all the ISBs used in the pipeline
hc.updateUsageTimeline(buffers.Buffers)
//hc.udpateThresholds()

var vertexState []*vertexState

// iterate over the timeline data for each buffer and calculate the exponential weighted mean average
// for the last HEALTH_WINDOW_SIZE buffer usage entries
for bufferName := range hc.timelineData {
// Extract the buffer usage of the timeline
// temp := hc.pipeline.FindVertexWithBuffer(bufferName)
// hc.udpateThresholds(*temp.Limits.BufferUsageLimit)
// BufferToThreshold[bufferName] = *temp.Limits.BufferUsageLimit
var bufferUsage []float64
for _, entry := range hc.timelineData[bufferName].Items() {
bufferUsage = append(bufferUsage, entry.BufferUsage)
Expand All @@ -246,7 +253,7 @@ func (hc *HealthChecker) getPipelineVertexDataCriticality(ctx context.Context) (
ewmaBufferUsage := calculateEWMAUsage(bufferUsage)
// assign the state to the vertex based on the average buffer usage
// Look back is enabled for the critical state
currentState := assignStateToTimeline(ewmaBufferUsage, enableCriticalLookBack)
currentState := assignStateToTimeline(ewmaBufferUsage, enableCriticalLookBack, BufferToThreshold[bufferName])
// create a new vertex state object
currentVertexState := newVertexState(bufferName, currentState)
// add the vertex state to the list of vertex states
Expand All @@ -255,6 +262,15 @@ func (hc *HealthChecker) getPipelineVertexDataCriticality(ctx context.Context) (
return vertexState, nil
}

// udpateThresholds recomputes the package-level health thresholds from the
// given buffer usage limit (a percentage): the critical threshold is 90% of
// the limit, and the warning threshold is 85% of the critical threshold.
// NOTE(review): this mutates shared package-level state, so the thresholds are
// effectively global rather than per-vertex — confirm whether per-buffer
// thresholds (see BufferToThreshold) should be used instead.
func (hc *HealthChecker) udpateThresholds(bufferLimit uint32) {
	critical := 0.9 * float64(bufferLimit)
	criticalBufferThreshold = critical
	warningBufferThreshold = 0.85 * critical
}

// GetThresholds returns the current package-level buffer usage thresholds:
// the critical threshold first, then the warning threshold.
func (hc *HealthChecker) GetThresholds() (critical, warning float64) {
	critical = criticalBufferThreshold
	warning = warningBufferThreshold
	return critical, warning
}

// updateUsageTimeline is used to update the usage timeline for a given buffer list
// This iterates over all the buffers in the buffer list and updates the usage timeline for each buffer
// The timeline data is represented as a map of buffer name to a list of timelineEntry
Expand All @@ -274,6 +290,15 @@ func (hc *HealthChecker) updateUsageTimeline(bufferList []*daemon.BufferInfo) {
bufferName := buffer.GetBufferName()
timestamp := time.Now().Unix()

// check if the buffer name is present in the pipeline
// `FindVertexWithBuffer` is a method that returns the vertex with the given buffer name
// if the buffer name is present in the pipeline
if hc.pipeline != nil {
vert := hc.pipeline.FindVertexWithBuffer(bufferName)
if vert != nil {
BufferToThreshold[bufferName] = *vert.Limits.BufferUsageLimit
}
}
// if the buffer name is not present in the timeline data, add it
if _, ok := hc.timelineData[bufferName]; !ok {
hc.timelineData[bufferName] = sharedqueue.New[*timelineEntry](int(healthWindowSize))
Expand Down Expand Up @@ -345,12 +370,12 @@ func calculateEWMAUsage(bufferUsage []float64) []float64 {
// - if the buffer usage is above CRITICAL_THRESHOLD, the state is set to CRITICAL
// - if the buffer usage is above WARNING_THRESHOLD, the state is set to WARNING
// - otherwise, the state is set to HEALTHY
func assignStateToBufferUsage(ewmaValue float64) string {
func assignStateToBufferUsage(ewmaValue float64, buFUsageVal uint32) string {
// Assign the state to the buffer usage
var state string
if ewmaValue > criticalBufferThreshold {
if ewmaValue > float64(float64(buFUsageVal)*0.9) {
state = criticalState
} else if ewmaValue > warningBufferThreshold {
} else if ewmaValue > float64(float64(buFUsageVal)*0.9*0.85) {
state = warningState
} else {
state = healthyState
Expand All @@ -363,20 +388,20 @@ func assignStateToBufferUsage(ewmaValue float64) string {
// In this case, we check if the state is CRITICAL at least LOOK_BACK_COUNT times in the last CRITICAL_WINDOW_SIZE entries
// If the state is CRITICAL at least LOOK_BACK_COUNT times in the last CRITICAL_WINDOW_SIZE entries
// Set the state to CRITICAL
func assignStateToTimeline(ewmaValues []float64, lookBack bool) string {
func assignStateToTimeline(ewmaValues []float64, lookBack bool, bufUsageVal uint32) string {
// Extract the last entry of the timeline
ewmaUsage := ewmaValues[len(ewmaValues)-1]

// Assign the state to the buffer usage value
state := assignStateToBufferUsage(ewmaUsage)
state := assignStateToBufferUsage(ewmaUsage, bufUsageVal)

// If the state is CRITICAL, and we have a look back, we need to check we have
// LOOK_BACK_COUNT entries as CRITICAL
if state == criticalState && lookBack {
// Extract the states of the timeline
var states []string
for _, entry := range ewmaValues {
states = append(states, assignStateToBufferUsage(entry))
states = append(states, assignStateToBufferUsage(entry, bufUsageVal))
}
// Count the number of times the state is CRITICAL in the last CRITICAL_WINDOW_SIZE entries
var criticalCount int
Expand Down
87 changes: 72 additions & 15 deletions pkg/daemon/server/service/health_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,25 @@ func TestNewHealthChecker(t *testing.T) {
}
}

// TestHealthThresholds verifies that udpateThresholds recomputes the critical
// threshold as 90% of the configured buffer usage limit, and that GetThresholds
// reflects each update.
func TestHealthThresholds(t *testing.T) {
	hc := NewHealthChecker(&v1alpha1.Pipeline{}, &mockISBService{})

	// Default buffer usage limit (0.8) yields a critical threshold of 72.
	// testify convention: expected value first, actual second.
	critical, _ := hc.GetThresholds()
	assert.Equal(t, float64(72), critical)

	forty := uint32(40)
	eighty := uint32(80)

	// A limit of 40 yields a critical threshold of 36 (and warning = 85% of it).
	hc.pipeline.Spec.Limits = &v1alpha1.PipelineLimits{BufferUsageLimit: &forty}
	hc.udpateThresholds(*hc.pipeline.Spec.Limits.BufferUsageLimit)
	critical, warning := hc.GetThresholds()
	assert.Equal(t, float64(36), critical)
	assert.Equal(t, float64(36)*0.85, warning)

	// Restoring the limit to 80 must restore the default critical threshold.
	// (The original test performed this update but never asserted the result.)
	hc.pipeline.Spec.Limits = &v1alpha1.PipelineLimits{BufferUsageLimit: &eighty}
	hc.udpateThresholds(*hc.pipeline.Spec.Limits.BufferUsageLimit)
	critical, _ = hc.GetThresholds()
	assert.Equal(t, float64(72), critical)
}

type mockISBService struct {
isbsvc.ISBService
}
Expand Down Expand Up @@ -576,30 +595,32 @@ func TestAssignStateToBufferUsage(t *testing.T) {
}{
{
name: "Critical state",
ewmaValue: 96,
ewmaValue: v1alpha1.DefaultBufferUsageLimit * 95,
expected: criticalState,
},
{
name: "Warning state",
ewmaValue: 85,
ewmaValue: v1alpha1.DefaultBufferUsageLimit * 90 * 0.86,
expected: warningState,
},
{
name: "Healthy state",
ewmaValue: 30,
ewmaValue: v1alpha1.DefaultBufferUsageLimit * 30,
expected: healthyState,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := assignStateToBufferUsage(tt.ewmaValue)
result := assignStateToBufferUsage(tt.ewmaValue, 80)
t.Log(tt.ewmaValue, result)
assert.Equal(t, tt.expected, result)
})
}
}

func TestAssignStateToTimeline(t *testing.T) {
n := v1alpha1.DefaultBufferUsageLimit * 100
tests := []struct {
name string
ewmaValues []float64
Expand All @@ -608,74 +629,110 @@ func TestAssignStateToTimeline(t *testing.T) {
}{
{
name: "Single healthy value",
ewmaValues: []float64{30},
ewmaValues: []float64{n * 0.3},
lookBack: false,
expected: healthyState,
},
{
name: "Single warning value",
ewmaValues: []float64{85},
ewmaValues: []float64{n * 0.85},
lookBack: false,
expected: warningState,
},
{
name: "Single critical value without lookback",
ewmaValues: []float64{96},
ewmaValues: []float64{n * 0.95},
lookBack: false,
expected: criticalState,
},
{
name: "Single critical value with lookback",
ewmaValues: []float64{96},
ewmaValues: []float64{n * 0.95},
lookBack: true,
expected: warningState,
},
{
name: "Multiple values ending with critical, no lookback",
ewmaValues: []float64{30, 85, 96},
ewmaValues: []float64{n * 0.3, n * 0.7, n * 0.92},
lookBack: false,
expected: criticalState,
},
{
name: "Multiple values ending with critical, with lookback, insufficient critical count",
ewmaValues: []float64{30, 85, 96, 96},
ewmaValues: []float64{n * 0.3, n * 0.7, n * 0.95, n * 0.95},
lookBack: true,
expected: warningState,
},
{
name: "Multiple values ending with critical, with lookback, sufficient critical count",
ewmaValues: []float64{96, 96, 96, 96, 96},
ewmaValues: []float64{n * 0.95, n * 0.95, n * 0.95, n * 0.95, n * 0.95},
lookBack: true,
expected: criticalState,
},
{
name: "Values fluctuating between warning and critical",
ewmaValues: []float64{85, 96, 85, 96, 85},
ewmaValues: []float64{n * 0.85, n * 0.95, n * 0.85, n * 0.95, n * 0.85},
lookBack: true,
expected: warningState,
},
{
name: "Values increasing from healthy to critical",
ewmaValues: []float64{30, 50, 70, 90, 96},
ewmaValues: []float64{n * 0.3, n * 0.5, n * 0.7, n * 0.9, n * 0.96},
lookBack: true,
expected: warningState,
},
{
name: "Values decreasing from critical to healthy",
ewmaValues: []float64{96, 90, 70, 50, 30},
ewmaValues: []float64{n * 0.96, n * 0.9, n * 0.7, n * 0.5, n * 0.3},
lookBack: true,
expected: healthyState,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := assignStateToTimeline(tt.ewmaValues, tt.lookBack)
result := assignStateToTimeline(tt.ewmaValues, tt.lookBack, 80)
assert.Equal(t, tt.expected, result)
})
}
}

// func Scalability_error_test(t *testing.T) {
// pipelineName := "simple-pipeline"
// namespace := "numaflow-system"
// edges := []v1alpha1.Edge{
// {
// From: "in",
// To: "cat",
// },
// {
// From: "cat",
// To: "out",
// },
// }
// pipeline := &v1alpha1.Pipeline{
// ObjectMeta: metav1.ObjectMeta{
// Name: pipelineName,
// Namespace: namespace,
// },
// Spec: v1alpha1.PipelineSpec{
// Vertices: []v1alpha1.AbstractVertex{
// {Name: "in", Source: &v1alpha1.Source{}},
// {Name: "cat", UDF: &v1alpha1.UDF{}},
// {Name: "out", Sink: &v1alpha1.Sink{}},
// },
// Edges: edges,
// },
// }

// for v := range pipeline.Spec.Vertices {
// pipeline.Spec.Vertices[v].Scale = v1alpha1.Scale{Disabled: true}
// }

// pipeline.

// }

func TestConvertVertexStateToPipelineState(t *testing.T) {
tests := []struct {
name string
Expand Down
Loading