Skip to content

Commit

Permalink
[chore] update batch processor tests to use generated utility (#10256)
Browse files Browse the repository at this point in the history
This updates the tests for the batch processor. This allows us to remove
a bunch of custom test code.

---------

Signed-off-by: Alex Boten <[email protected]>
  • Loading branch information
codeboten authored May 29, 2024
1 parent 567a175 commit 8da8d0a
Show file tree
Hide file tree
Showing 3 changed files with 256 additions and 239 deletions.
282 changes: 251 additions & 31 deletions processor/batchprocessor/batch_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func TestBatchProcessorSentBySize(t *testing.T) {
sizeSum := 0
for requestNum := 0; requestNum < requestCount; requestNum++ {
td := testdata.GenerateTraces(spansPerRequest)
sizeSum += sizer.TracesSize(td)

assert.NoError(t, batcher.ConsumeTraces(context.Background(), td))
}

Expand All @@ -203,6 +203,7 @@ func TestBatchProcessorSentBySize(t *testing.T) {
receivedTraces := sink.AllTraces()
require.EqualValues(t, expectedBatchesNum, len(receivedTraces))
for _, td := range receivedTraces {
sizeSum += sizer.TracesSize(td)
rss := td.ResourceSpans()
require.Equal(t, expectedBatchingFactor, rss.Len())
for i := 0; i < expectedBatchingFactor; i++ {
Expand Down Expand Up @@ -285,18 +286,16 @@ func TestBatchProcessorSentBySize(t *testing.T) {
}

func TestBatchProcessorSentBySizeWithMaxSize(t *testing.T) {
telemetryTest(t, testBatchProcessorSentBySizeWithMaxSize)
}

func testBatchProcessorSentBySizeWithMaxSize(t *testing.T, tel testTelemetry) {
tel := setupTestTelemetry()
sizer := &ptrace.ProtoMarshaler{}
sink := new(consumertest.TracesSink)
cfg := createDefaultConfig().(*Config)
sendBatchSize := 20
sendBatchMaxSize := 37
cfg.SendBatchSize = uint32(sendBatchSize)
cfg.SendBatchMaxSize = uint32(sendBatchMaxSize)
cfg.Timeout = 500 * time.Millisecond
creationSet := tel.NewProcessorCreateSettings()
creationSet := tel.NewCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
batcher, err := newBatchTracesProcessor(creationSet, sink, cfg)
require.NoError(t, err)
Expand All @@ -307,6 +306,8 @@ func testBatchProcessorSentBySizeWithMaxSize(t *testing.T, tel testTelemetry) {
totalSpans := requestCount * spansPerRequest

start := time.Now()

sizeSum := 0
for requestNum := 0; requestNum < requestCount; requestNum++ {
td := testdata.GenerateTraces(spansPerRequest)
assert.NoError(t, batcher.ConsumeTraces(context.Background(), td))
Expand All @@ -323,12 +324,105 @@ func testBatchProcessorSentBySizeWithMaxSize(t *testing.T, tel testTelemetry) {
require.Equal(t, totalSpans, sink.SpanCount())
receivedTraces := sink.AllTraces()
require.EqualValues(t, expectedBatchesNum, len(receivedTraces))
// we have to count the size after it was processed since splitTraces will cause some
// repeated ResourceSpan data to be sent through the processor
var min, max int
for _, td := range receivedTraces {
if min == 0 || sizer.TracesSize(td) < min {
min = sizer.TracesSize(td)
}
if sizer.TracesSize(td) > max {
max = sizer.TracesSize(td)
}
sizeSum += sizer.TracesSize(td)
}

tel.assertMetrics(t, expectedMetrics{
sendCount: float64(expectedBatchesNum),
sendSizeSum: float64(sink.SpanCount()),
sizeTrigger: math.Floor(float64(totalSpans) / float64(sendBatchMaxSize)),
timeoutTrigger: 1,
tel.assertMetrics(t, []metricdata.Metrics{
{
Name: "processor_batch_batch_send_size_bytes",
Description: "Number of bytes in batch that was sent",
Unit: "By",
Data: metricdata.Histogram[int64]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.HistogramDataPoint[int64]{
{
Attributes: attribute.NewSet(attribute.String("processor", "batch")),
Count: uint64(expectedBatchesNum),
Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000,
100_000, 200_000, 300_000, 400_000, 500_000, 600_000, 700_000, 800_000, 900_000,
1000_000, 2000_000, 3000_000, 4000_000, 5000_000, 6000_000, 7000_000, 8000_000, 9000_000},
BucketCounts: []uint64{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, uint64(expectedBatchesNum - 1), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
Sum: int64(sizeSum),
Min: metricdata.NewExtrema(int64(min)),
Max: metricdata.NewExtrema(int64(max)),
},
},
},
},
{
Name: "processor_batch_batch_send_size",
Description: "Number of units in the batch",
Unit: "1",
Data: metricdata.Histogram[int64]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.HistogramDataPoint[int64]{
{
Attributes: attribute.NewSet(attribute.String("processor", "batch")),
Count: uint64(expectedBatchesNum),
Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000},
BucketCounts: []uint64{0, 1, uint64(expectedBatchesNum - 1), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
Sum: int64(sink.SpanCount()),
Min: metricdata.NewExtrema(int64(sendBatchSize - 1)),
Max: metricdata.NewExtrema(int64(cfg.SendBatchMaxSize)),
},
},
},
},
{
Name: "processor_batch_batch_size_trigger_send",
Description: "Number of times the batch was sent due to a size trigger",
Unit: "1",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: int64(expectedBatchesNum - 1),
Attributes: attribute.NewSet(attribute.String("processor", "batch")),
},
},
},
},
{
Name: "processor_batch_timeout_trigger_send",
Description: "Number of times the batch was sent due to a timeout trigger",
Unit: "1",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 1,
Attributes: attribute.NewSet(attribute.String("processor", "batch")),
},
},
},
},
{
Name: "processor_batch_metadata_cardinality",
Description: "Number of distinct metadata value combinations being processed",
Unit: "1",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: false,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 1,
Attributes: attribute.NewSet(attribute.String("processor", "batch")),
},
},
},
},
})
}

Expand Down Expand Up @@ -459,10 +553,7 @@ func TestBatchMetricProcessor_ReceivingData(t *testing.T) {
}

func TestBatchMetricProcessorBatchSize(t *testing.T) {
telemetryTest(t, testBatchMetricProcessorBatchSize)
}

func testBatchMetricProcessorBatchSize(t *testing.T, tel testTelemetry) {
tel := setupTestTelemetry()
sizer := &pmetric.ProtoMarshaler{}

// Instantiate the batch processor with low config values to test data
Expand All @@ -478,7 +569,7 @@ func testBatchMetricProcessorBatchSize(t *testing.T, tel testTelemetry) {
dataPointsPerRequest := metricsPerRequest * dataPointsPerMetric
sink := new(consumertest.MetricsSink)

creationSet := tel.NewProcessorCreateSettings()
creationSet := tel.NewCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg)
require.NoError(t, err)
Expand Down Expand Up @@ -509,11 +600,77 @@ func testBatchMetricProcessorBatchSize(t *testing.T, tel testTelemetry) {
}
}

tel.assertMetrics(t, expectedMetrics{
sendCount: float64(expectedBatchesNum),
sendSizeSum: float64(sink.DataPointCount()),
sendSizeBytesSum: float64(size),
sizeTrigger: 20,
tel.assertMetrics(t, []metricdata.Metrics{
{
Name: "processor_batch_batch_send_size_bytes",
Description: "Number of bytes in batch that was sent",
Unit: "By",
Data: metricdata.Histogram[int64]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.HistogramDataPoint[int64]{
{
Attributes: attribute.NewSet(attribute.String("processor", "batch")),
Count: uint64(expectedBatchesNum),
Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000,
100_000, 200_000, 300_000, 400_000, 500_000, 600_000, 700_000, 800_000, 900_000,
1000_000, 2000_000, 3000_000, 4000_000, 5000_000, 6000_000, 7000_000, 8000_000, 9000_000},
BucketCounts: []uint64{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, uint64(expectedBatchesNum), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
Sum: int64(size),
Min: metricdata.NewExtrema(int64(size / expectedBatchesNum)),
Max: metricdata.NewExtrema(int64(size / expectedBatchesNum)),
},
},
},
},
{
Name: "processor_batch_batch_send_size",
Description: "Number of units in the batch",
Unit: "1",
Data: metricdata.Histogram[int64]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.HistogramDataPoint[int64]{
{
Attributes: attribute.NewSet(attribute.String("processor", "batch")),
Count: uint64(expectedBatchesNum),
Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000},
BucketCounts: []uint64{0, 0, uint64(expectedBatchesNum), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
Sum: int64(sink.DataPointCount()),
Min: metricdata.NewExtrema(int64(cfg.SendBatchSize)),
Max: metricdata.NewExtrema(int64(cfg.SendBatchSize)),
},
},
},
},
{
Name: "processor_batch_batch_size_trigger_send",
Description: "Number of times the batch was sent due to a size trigger",
Unit: "1",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: int64(expectedBatchesNum),
Attributes: attribute.NewSet(attribute.String("processor", "batch")),
},
},
},
},
{
Name: "processor_batch_metadata_cardinality",
Description: "Number of distinct metadata value combinations being processed",
Unit: "1",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: false,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 1,
Attributes: attribute.NewSet(attribute.String("processor", "batch")),
},
},
},
},
})
}

Expand Down Expand Up @@ -778,10 +935,7 @@ func TestBatchLogProcessor_ReceivingData(t *testing.T) {
}

func TestBatchLogProcessor_BatchSize(t *testing.T) {
telemetryTest(t, testBatchLogProcessorBatchSize)
}

func testBatchLogProcessorBatchSize(t *testing.T, tel testTelemetry) {
tel := setupTestTelemetry()
sizer := &plog.ProtoMarshaler{}

// Instantiate the batch processor with low config values to test data
Expand All @@ -795,7 +949,7 @@ func testBatchLogProcessorBatchSize(t *testing.T, tel testTelemetry) {
logsPerRequest := 5
sink := new(consumertest.LogsSink)

creationSet := tel.NewProcessorCreateSettings()
creationSet := tel.NewCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg)
require.NoError(t, err)
Expand Down Expand Up @@ -826,11 +980,77 @@ func testBatchLogProcessorBatchSize(t *testing.T, tel testTelemetry) {
}
}

tel.assertMetrics(t, expectedMetrics{
sendCount: float64(expectedBatchesNum),
sendSizeSum: float64(sink.LogRecordCount()),
sendSizeBytesSum: float64(size),
sizeTrigger: float64(expectedBatchesNum),
tel.assertMetrics(t, []metricdata.Metrics{
{
Name: "processor_batch_batch_send_size_bytes",
Description: "Number of bytes in batch that was sent",
Unit: "By",
Data: metricdata.Histogram[int64]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.HistogramDataPoint[int64]{
{
Attributes: attribute.NewSet(attribute.String("processor", "batch")),
Count: uint64(expectedBatchesNum),
Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000,
100_000, 200_000, 300_000, 400_000, 500_000, 600_000, 700_000, 800_000, 900_000,
1000_000, 2000_000, 3000_000, 4000_000, 5000_000, 6000_000, 7000_000, 8000_000, 9000_000},
BucketCounts: []uint64{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, uint64(expectedBatchesNum), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
Sum: int64(size),
Min: metricdata.NewExtrema(int64(size / expectedBatchesNum)),
Max: metricdata.NewExtrema(int64(size / expectedBatchesNum)),
},
},
},
},
{
Name: "processor_batch_batch_send_size",
Description: "Number of units in the batch",
Unit: "1",
Data: metricdata.Histogram[int64]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.HistogramDataPoint[int64]{
{
Attributes: attribute.NewSet(attribute.String("processor", "batch")),
Count: uint64(expectedBatchesNum),
Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000},
BucketCounts: []uint64{0, 0, uint64(expectedBatchesNum), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
Sum: int64(sink.LogRecordCount()),
Min: metricdata.NewExtrema(int64(cfg.SendBatchSize)),
Max: metricdata.NewExtrema(int64(cfg.SendBatchSize)),
},
},
},
},
{
Name: "processor_batch_batch_size_trigger_send",
Description: "Number of times the batch was sent due to a size trigger",
Unit: "1",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: int64(expectedBatchesNum),
Attributes: attribute.NewSet(attribute.String("processor", "batch")),
},
},
},
},
{
Name: "processor_batch_metadata_cardinality",
Description: "Number of distinct metadata value combinations being processed",
Unit: "1",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: false,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 1,
Attributes: attribute.NewSet(attribute.String("processor", "batch")),
},
},
},
},
})
}

Expand Down
10 changes: 5 additions & 5 deletions processor/batchprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@ module go.opentelemetry.io/collector/processor/batchprocessor
go 1.21.0

require (
github.com/prometheus/client_golang v1.19.1
github.com/prometheus/client_model v0.6.1
github.com/prometheus/common v0.53.0
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector v0.101.0
go.opentelemetry.io/collector/component v0.101.0
Expand All @@ -16,9 +13,7 @@ require (
go.opentelemetry.io/collector/pdata/testdata v0.101.0
go.opentelemetry.io/collector/processor v0.101.0
go.opentelemetry.io/otel v1.27.0
go.opentelemetry.io/otel/exporters/prometheus v0.49.0
go.opentelemetry.io/otel/metric v1.27.0
go.opentelemetry.io/otel/sdk v1.27.0
go.opentelemetry.io/otel/sdk/metric v1.27.0
go.opentelemetry.io/otel/trace v1.27.0
go.uber.org/goleak v1.3.0
Expand All @@ -43,7 +38,12 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.19.1 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.53.0 // indirect
github.com/prometheus/procfs v0.15.0 // indirect
go.opentelemetry.io/otel/exporters/prometheus v0.49.0 // indirect
go.opentelemetry.io/otel/sdk v1.27.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/sys v0.20.0 // indirect
Expand Down
Loading

0 comments on commit 8da8d0a

Please sign in to comment.