Skip to content

Commit

Permalink
Merge pull request #499 from nick-demianchuk/ndemyanchuk/timelag-func…
Browse files Browse the repository at this point in the history
…-fix

TimeLagSeries function fix
  • Loading branch information
deniszh authored Dec 20, 2023
2 parents e18bfaf + a211739 commit 4030828
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 18 deletions.
24 changes: 9 additions & 15 deletions pkg/expr/functions/timeLag/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,31 +41,25 @@ func MakeTimeLag(consumerMetric, producerMetric *types.MetricData, name string)
r.Values = make([]float64, len(consumerMetric.Values))
r.IsAbsent = make([]bool, len(consumerMetric.Values))

var pIndex int32 = 0
// Set initial producer index to -1
var pIndex int32 = -1
for i, v := range consumerMetric.Values {
// reset producer offset and scan it again if consumer offset decreased
if i > 0 && consumerMetric.Values[i-1] > v {
pIndex = 0
pIndex = -1
}

if consumerMetric.IsAbsent[i] || len(producerMetric.Values) == 0 {
r.IsAbsent[i] = true
continue
}

npIndex := pIndex
// npIndex: find first index in producer metric that is higher than v
for (producerMetric.IsAbsent[npIndex] || producerMetric.Values[npIndex] <= v) && (npIndex+1) < pLen {
npIndex++
for producerMetric.IsAbsent[npIndex] && (npIndex+1) < pLen {
npIndex++
}
// maintain: pIndex is highest index for which producer metric <= v
if !producerMetric.IsAbsent[npIndex] && producerMetric.Values[npIndex] <= v {
pIndex = npIndex
}
// Move Producer index to the right as much as possible
for pIndex < (int32)(i) && (pIndex+1) < pLen && producerMetric.Values[pIndex+1] <= v {
pIndex += 1
}
// we can't compute timeLag for the value that is lower than the smallest data point in producer metric
if producerMetric.IsAbsent[pIndex] || producerMetric.Values[pIndex] > v {

if pIndex == -1 || producerMetric.IsAbsent[pIndex] {
r.IsAbsent[i] = true
continue
}
Expand Down
45 changes: 42 additions & 3 deletions pkg/expr/functions/timeLag/function_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,36 @@ func TestTimeLagSeriesMultiReturn(t *testing.T) {
"timeLagSeries(metric2,metric2)": {types.MakeMetricData("timeLagSeries(metric2,metric2)", []float64{0, 0, 0, 0, 0}, 1, now32)},
},
},
{
"timeLagSeries(metric1,metric2)",
map[parser.MetricRequest][]*types.MetricData{
{"metric1", 0, 1}: {
types.MakeMetricData("metric1", []float64{1, 30, 40, 60, 80, 100, 100, 100, 100, 100, 150, 200, 200, 200}, 1, now32),
},
{"metric2", 0, 1}: {
types.MakeMetricData("metric2", []float64{1, 100, 100, 100, 100, 100, 100, 100, 100, 200, 200, 200, 200, 200}, 1, now32),
},
},
"timeLagSeries",
map[string][]*types.MetricData{
"timeLagSeries(metric1,metric2)": {types.MakeMetricData("timeLagSeries(metric1,metric2)", []float64{0, 1, 2, 3, 4, 0, 0, 0, 0, 1, 2, 0, 0, 0}, 1, now32)},
},
},
{
"timeLagSeries(metric1,metric2)",
map[parser.MetricRequest][]*types.MetricData{
{"metric1", 0, 1}: {
types.MakeMetricData("metric1", []float64{50, 50, 50, 60, 70, 75, 80, 90, 90, 90}, 1, now32),
},
{"metric2", 0, 1}: {
types.MakeMetricData("metric2", []float64{50, 50, 50, 90, 90, 90, 90, 90, 90, 90}, 1, now32),
},
},
"timeLagSeries",
map[string][]*types.MetricData{
"timeLagSeries(metric1,metric2)": {types.MakeMetricData("timeLagSeries(metric1,metric2)", []float64{0, 0, 0, 1, 2, 3, 4, 0, 0, 0}, 1, now32)},
},
},
}

for _, tt := range tests {
Expand All @@ -58,7 +88,6 @@ func TestTimeLagSeriesMultiReturn(t *testing.T) {
th.TestMultiReturnEvalExpr(t, &tt)
})
}

}

func TestTimeLagSeries(t *testing.T) {
Expand All @@ -72,7 +101,16 @@ func TestTimeLagSeries(t *testing.T) {
{"metric2", 0, 1}: {types.MakeMetricData("metric2", []float64{2, math.NaN(), 3, math.NaN(), 5, 12}, 1, now32)},
},
[]*types.MetricData{types.MakeMetricData("timeLagSeries(metric1,metric2)",
[]float64{math.NaN(), math.NaN(), math.NaN(), 1, 2, 0}, 1, now32)},
[]float64{math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN(), 0}, 1, now32)},
},
{
"timeLagSeries(metric1,metric2)",
map[parser.MetricRequest][]*types.MetricData{
{"metric1", 0, 1}: {types.MakeMetricData("metric1", []float64{4857060, 4857060, 4857060, 4857200, 4858101, 4859001, 4859901}, 1, now32)},
{"metric2", 0, 1}: {types.MakeMetricData("metric2", []float64{4857060, 4857060, 4857060, 4884065, 4884065, 4884065, 4884065}, 1, now32)},
},
[]*types.MetricData{types.MakeMetricData("timeLagSeries(metric1,metric2)",
[]float64{0, 0, 0, 1, 2, 3, 4}, 1, now32)},
},
{
"timeLagSeries(metric[12])",
Expand All @@ -83,7 +121,7 @@ func TestTimeLagSeries(t *testing.T) {
},
},
[]*types.MetricData{types.MakeMetricData("timeLagSeries(metric[12])",
[]float64{math.NaN(), math.NaN(), math.NaN(), 1, 2, 0}, 1, now32)},
[]float64{math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN(), 0}, 1, now32)},
},
{
"timeLagSeries(metric1,metric2)",
Expand All @@ -108,6 +146,7 @@ func TestTimeLagSeries(t *testing.T) {
for _, tt := range tests {
tt := tt
testName := tt.Target

t.Run(testName, func(t *testing.T) {
th.TestEvalExpr(t, &tt)
})
Expand Down

0 comments on commit 4030828

Please sign in to comment.