From 16e2224b6a84a7c70338c35a8146be9298acc2e2 Mon Sep 17 00:00:00 2001 From: Dan Jaglowski Date: Fri, 13 Sep 2024 15:23:57 -0400 Subject: [PATCH] Add 'pipeline' attribute to processor metrics --- .../component_telemetry_test.go.tmpl | 6 +- cmd/mdatagen/templates/component_test.go.tmpl | 6 +- component/componenttest/obsreporttest.go | 16 +- .../componenttest/otelprometheuschecker.go | 25 +- .../otelprometheuschecker_test.go | 15 +- .../testdata/prometheus_response | 24 +- .../batchprocessor/batch_processor_test.go | 91 +++--- processor/batchprocessor/factory_test.go | 9 +- .../generated_component_telemetry_test.go | 4 +- .../generated_component_test.go | 4 +- processor/batchprocessor/go.sum | 2 + processor/batchprocessor/metrics.go | 2 +- processor/go.mod | 1 + processor/go.sum | 2 + processor/internal/obsmetrics.go | 2 +- processor/internal/processor.go | 3 + .../memorylimiterprocessor/factory_test.go | 7 +- .../generated_component_test.go | 2 +- processor/memorylimiterprocessor/go.sum | 2 + .../memorylimiter_test.go | 14 +- .../generated_component_telemetry_test.go | 4 +- processor/processorhelper/logs_test.go | 16 +- processor/processorhelper/metrics_test.go | 16 +- processor/processorhelper/obsreport.go | 1 + processor/processorhelper/obsreport_test.go | 301 +++++++++++------- processor/processorhelper/traces_test.go | 16 +- processor/processortest/nop_processor.go | 3 +- processor/processortest/nop_processor_test.go | 9 +- processor/processortest/shutdown_verifier.go | 6 +- service/internal/builders/processor_test.go | 27 +- service/internal/graph/nodes.go | 2 +- 31 files changed, 364 insertions(+), 274 deletions(-) diff --git a/cmd/mdatagen/templates/component_telemetry_test.go.tmpl b/cmd/mdatagen/templates/component_telemetry_test.go.tmpl index 58c0ab7d6f6..335cc88889a 100644 --- a/cmd/mdatagen/templates/component_telemetry_test.go.tmpl +++ b/cmd/mdatagen/templates/component_telemetry_test.go.tmpl @@ -13,7 +13,7 @@ import ( "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" sdkmetric "go.opentelemetry.io/otel/sdk/metric" - + "go.opentelemetry.io/collector/component" {{- if or isConnector isExporter isExtension isProcessor isReceiver }} "go.opentelemetry.io/collector/config/configtelemetry" @@ -28,8 +28,8 @@ type componentTestTelemetry struct { } {{- if or isConnector isExporter isExtension isProcessor isReceiver }} -func (tt *componentTestTelemetry) NewSettings() {{ .Status.Class }}.Settings { - settings := {{ .Status.Class }}test.NewNopSettings() +func (tt *componentTestTelemetry) NewSettings({{- if isProcessor -}}dt component.DataType{{- end -}}) {{ .Status.Class }}.Settings { + settings := {{ .Status.Class }}test.NewNopSettings({{- if isProcessor -}}dt{{- end -}}) settings.MeterProvider = tt.meterProvider settings.LeveledMeterProvider = func(_ configtelemetry.Level) metric.MeterProvider { return tt.meterProvider diff --git a/cmd/mdatagen/templates/component_test.go.tmpl b/cmd/mdatagen/templates/component_test.go.tmpl index be75cb1bae9..bcec504a2cf 100644 --- a/cmd/mdatagen/templates/component_test.go.tmpl +++ b/cmd/mdatagen/templates/component_test.go.tmpl @@ -125,7 +125,7 @@ func TestComponentLifecycle(t *testing.T) { switch test.name { case "logs": e, ok := c.(exporter.Logs) - require.True(t, ok) + require.True(t, ok) logs := generateLifecycleTestLogs() if !e.Capabilities().MutatesData { logs.MarkReadOnly() @@ -204,7 +204,7 @@ func TestComponentLifecycle(t *testing.T) { for _, test := range tests { {{- if not .Tests.SkipShutdown }} t.Run(test.name + "-shutdown", func(t *testing.T) { - c, err := test.createFn(context.Background(), processortest.NewNopSettings(), cfg) + c, err := test.createFn(context.Background(), processortest.NewNopSettings(component.MustNewType(test.name)), cfg) require.NoError(t, err) err = c.Shutdown(context.Background()) require.NoError(t, err) @@ -213,7 +213,7 @@ func TestComponentLifecycle(t *testing.T) { {{- if not .Tests.SkipLifecycle }} t.Run(test.name + "-lifecycle", func(t *testing.T) { - c, err := test.createFn(context.Background(), processortest.NewNopSettings(), cfg) + c, err := test.createFn(context.Background(), processortest.NewNopSettings(component.MustNewType(test.name)), cfg) require.NoError(t, err) host := {{ .Tests.Host }} err = c.Start(context.Background(), host) diff --git a/component/componenttest/obsreporttest.go b/component/componenttest/obsreporttest.go index 3662c640671..f8dd5594d02 100644 --- a/component/componenttest/obsreporttest.go +++ b/component/componenttest/obsreporttest.go @@ -33,11 +33,13 @@ const ( transportTag = "transport" exporterTag = "exporter" processorTag = "processor" + pipelineTag = "pipeline" ) type TestTelemetry struct { ts component.TelemetrySettings id component.ID + extraAttrs []attribute.KeyValue SpanRecorder *tracetest.SpanRecorder prometheusChecker *prometheusChecker @@ -83,19 +85,22 @@ func (tts *TestTelemetry) CheckExporterMetricGauge(metric string, val int64, ext // CheckProcessorTraces checks that for the current exported values for trace exporter metrics match given values. // Note: SetupTelemetry must be called before this function. func (tts *TestTelemetry) CheckProcessorTraces(acceptedSpans, refusedSpans, droppedSpans int64) error { - return tts.prometheusChecker.checkProcessorTraces(tts.id, acceptedSpans, refusedSpans, droppedSpans) + attrs := attributesForProcessorMetrics(tts.id, tts.extraAttrs) + return tts.prometheusChecker.checkProcessorTraces(attrs, acceptedSpans, refusedSpans, droppedSpans) } // CheckProcessorMetrics checks that for the current exported values for metrics exporter metrics match given values. // Note: SetupTelemetry must be called before this function. func (tts *TestTelemetry) CheckProcessorMetrics(acceptedMetricPoints, refusedMetricPoints, droppedMetricPoints int64) error { - return tts.prometheusChecker.checkProcessorMetrics(tts.id, acceptedMetricPoints, refusedMetricPoints, droppedMetricPoints) + attrs := attributesForProcessorMetrics(tts.id, tts.extraAttrs) + return tts.prometheusChecker.checkProcessorMetrics(attrs, acceptedMetricPoints, refusedMetricPoints, droppedMetricPoints) } // CheckProcessorLogs checks that for the current exported values for logs exporter metrics match given values. // Note: SetupTelemetry must be called before this function. func (tts *TestTelemetry) CheckProcessorLogs(acceptedLogRecords, refusedLogRecords, droppedLogRecords int64) error { - return tts.prometheusChecker.checkProcessorLogs(tts.id, acceptedLogRecords, refusedLogRecords, droppedLogRecords) + attrs := attributesForProcessorMetrics(tts.id, tts.extraAttrs) + return tts.prometheusChecker.checkProcessorLogs(attrs, acceptedLogRecords, refusedLogRecords, droppedLogRecords) } // CheckReceiverTraces checks that for the current exported values for trace receiver metrics match given values. @@ -137,16 +142,17 @@ func (tts *TestTelemetry) TelemetrySettings() component.TelemetrySettings { return tts.ts } -// SetupTelemetry sets up the testing environment to check the metrics recorded by receivers, producers, or exporters. +// SetupTelemetry sets up the testing environment to check the metrics recorded by receivers, or exporters. // The caller must pass the ID of the component being tested. The ID will be used by the CreateSettings and Check methods. // The caller must defer a call to `Shutdown` on the returned TestTelemetry. -func SetupTelemetry(id component.ID) (TestTelemetry, error) { +func SetupTelemetry(id component.ID, extraAttrs ...attribute.KeyValue) (TestTelemetry, error) { sr := new(tracetest.SpanRecorder) tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) settings := TestTelemetry{ ts: NewNopTelemetrySettings(), id: id, + extraAttrs: extraAttrs, SpanRecorder: sr, } settings.ts.TracerProvider = tp diff --git a/component/componenttest/otelprometheuschecker.go b/component/componenttest/otelprometheuschecker.go index 14b357b9704..252e5e6ddb9 100644 --- a/component/componenttest/otelprometheuschecker.go +++ b/component/componenttest/otelprometheuschecker.go @@ -48,24 +48,23 @@ func (pc *prometheusChecker) checkReceiver(receiver component.ID, datatype, prot pc.checkCounter(fmt.Sprintf("receiver_refused_%s", datatype), droppedMetricPoints, receiverAttrs)) } -func (pc *prometheusChecker) checkProcessorTraces(processor component.ID, accepted, refused, dropped int64) error { - return pc.checkProcessor(processor, "spans", accepted, refused, dropped) +func (pc *prometheusChecker) checkProcessorTraces(attrs []attribute.KeyValue, accepted, refused, dropped int64) error { + return pc.checkProcessor(attrs, "spans", accepted, refused, dropped) } -func (pc *prometheusChecker) checkProcessorMetrics(processor component.ID, accepted, refused, dropped int64) error { - return pc.checkProcessor(processor, "metric_points", accepted, refused, dropped) +func (pc *prometheusChecker) checkProcessorMetrics(attrs []attribute.KeyValue, accepted, refused, dropped int64) error { + return pc.checkProcessor(attrs, "metric_points", accepted, refused, dropped) } -func (pc *prometheusChecker) checkProcessorLogs(processor component.ID, accepted, refused, dropped int64) error { - return pc.checkProcessor(processor, "log_records", accepted, refused, dropped) +func (pc *prometheusChecker) checkProcessorLogs(attrs []attribute.KeyValue, accepted, refused, dropped int64) error { + return pc.checkProcessor(attrs, "log_records", accepted, refused, dropped) } -func (pc *prometheusChecker) checkProcessor(processor component.ID, datatype string, accepted, refused, dropped int64) error { - processorAttrs := attributesForProcessorMetrics(processor) +func (pc *prometheusChecker) checkProcessor(attrs []attribute.KeyValue, datatype string, accepted, refused, dropped int64) error { return multierr.Combine( - pc.checkCounter(fmt.Sprintf("processor_accepted_%s", datatype), accepted, processorAttrs), - pc.checkCounter(fmt.Sprintf("processor_refused_%s", datatype), refused, processorAttrs), - pc.checkCounter(fmt.Sprintf("processor_dropped_%s", datatype), dropped, processorAttrs), + pc.checkCounter(fmt.Sprintf("processor_accepted_%s", datatype), accepted, attrs), + pc.checkCounter(fmt.Sprintf("processor_refused_%s", datatype), refused, attrs), + pc.checkCounter(fmt.Sprintf("processor_dropped_%s", datatype), dropped, attrs), ) } @@ -190,8 +189,8 @@ func attributesForReceiverMetrics(receiver component.ID, transport string) []att } } -func attributesForProcessorMetrics(processor component.ID) []attribute.KeyValue { - return []attribute.KeyValue{attribute.String(processorTag, processor.String())} +func attributesForProcessorMetrics(processor component.ID, extraAttrs []attribute.KeyValue) []attribute.KeyValue { + return append(extraAttrs, attribute.String(processorTag, processor.String())) } // attributesForExporterMetrics returns the attributes that are needed for the receiver metrics. diff --git a/component/componenttest/otelprometheuschecker_test.go b/component/componenttest/otelprometheuschecker_test.go index d7176134f54..5ec56031545 100644 --- a/component/componenttest/otelprometheuschecker_test.go +++ b/component/componenttest/otelprometheuschecker_test.go @@ -88,17 +88,26 @@ func TestPromChecker(t *testing.T) { ) assert.NoError(t, - pc.checkProcessorTraces(processor, 42, 13, 7), + pc.checkProcessorTraces([]attribute.KeyValue{ + attribute.String("processor", processor.String()), + attribute.String("pipeline", "traces/fakePipeline"), + }, 42, 13, 7), "metrics from Receiver Traces should be valid", ) assert.NoError(t, - pc.checkProcessorMetrics(processor, 7, 41, 13), + pc.checkProcessorMetrics([]attribute.KeyValue{ + attribute.String("processor", processor.String()), + attribute.String("pipeline", "metrics/fakePipeline"), + }, 7, 41, 13), "metrics from Receiver Metrics should be valid", ) assert.NoError(t, - pc.checkProcessorLogs(processor, 102, 35, 14), + pc.checkProcessorLogs([]attribute.KeyValue{ + attribute.String("processor", processor.String()), + attribute.String("pipeline", "logs/fakePipeline"), + }, 102, 35, 14), "metrics from Receiver Logs should be valid", ) diff --git a/component/componenttest/testdata/prometheus_response b/component/componenttest/testdata/prometheus_response index 9d0eb69ee7f..7bd017aa266 100644 --- a/component/componenttest/testdata/prometheus_response +++ b/component/componenttest/testdata/prometheus_response @@ -18,40 +18,40 @@ otelcol_exporter_send_failed_log_records{exporter="fakeExporter"} 36 otelcol_exporter_sent_log_records{exporter="fakeExporter"} 103 # HELP otelcol_processor_accepted_spans Number of spans successfully pushed into the next component in the pipeline. # TYPE otelcol_processor_accepted_spans counter -otelcol_processor_accepted_spans{processor="fakeProcessor"} 42 +otelcol_processor_accepted_spans{processor="fakeProcessor",pipeline="traces/fakePipeline"} 42 # HELP otelcol_processor_refused_spans Number of spans that were rejected by the next component in the pipeline. # TYPE otelcol_processor_refused_spans counter -otelcol_processor_refused_spans{processor="fakeProcessor"} 13 +otelcol_processor_refused_spans{processor="fakeProcessor",pipeline="traces/fakePipeline"} 13 # HELP otelcol_processor_dropped_spans Number of spans that were dropped. # TYPE otelcol_processor_dropped_spans counter -otelcol_processor_dropped_spans{processor="fakeProcessor"} 7 +otelcol_processor_dropped_spans{processor="fakeProcessor",pipeline="traces/fakePipeline"} 7 # HELP otelcol_processor_inserted_spans Number of spans that were inserted. # TYPE otelcol_processor_inserted_spans counter -otelcol_processor_inserted_spans{processor="fakeProcessor"} 5 +otelcol_processor_inserted_spans{processor="fakeProcessor",pipeline="traces/fakePipeline"} 5 # HELP otelcol_processor_accepted_metric_points Number of metric points successfully pushed into the next component in the pipeline. # TYPE otelcol_processor_accepted_metric_points counter -otelcol_processor_accepted_metric_points{processor="fakeProcessor"} 7 +otelcol_processor_accepted_metric_points{processor="fakeProcessor",pipeline="metrics/fakePipeline"} 7 # HELP otelcol_processor_refused_metric_points Number of metric points that were rejected by the next component in the pipeline. # TYPE otelcol_processor_refused_metric_points counter -otelcol_processor_refused_metric_points{processor="fakeProcessor"} 41 +otelcol_processor_refused_metric_points{processor="fakeProcessor",pipeline="metrics/fakePipeline"} 41 # HELP otelcol_processor_dropped_metric_points Number of metric points that were dropped. # TYPE otelcol_processor_dropped_metric_points counter -otelcol_processor_dropped_metric_points{processor="fakeProcessor"} 13 +otelcol_processor_dropped_metric_points{processor="fakeProcessor",pipeline="metrics/fakePipeline"} 13 # HELP otelcol_processor_inserted_metric_points Number of metric points that were inserted. # TYPE otelcol_processor_inserted_metric_points counter -otelcol_processor_inserted_metric_points{processor="fakeProcessor"} 4 +otelcol_processor_inserted_metric_points{processor="fakeProcessor",pipeline="metrics/fakePipeline"} 4 # HELP otelcol_processor_accepted_log_records Number of log records successfully pushed into the next component in the pipeline. # TYPE otelcol_processor_accepted_log_records counter -otelcol_processor_accepted_log_records{processor="fakeProcessor"} 102 +otelcol_processor_accepted_log_records{processor="fakeProcessor",pipeline="logs/fakePipeline"} 102 # HELP otelcol_processor_refused_log_records Number of log records that were rejected by the next component in the pipeline. # TYPE otelcol_processor_refused_log_records counter -otelcol_processor_refused_log_records{processor="fakeProcessor"} 35 +otelcol_processor_refused_log_records{processor="fakeProcessor",pipeline="logs/fakePipeline"} 35 # HELP otelcol_processor_dropped_log_records Number of log records that were dropped. # TYPE otelcol_processor_dropped_log_records counter -otelcol_processor_dropped_log_records{processor="fakeProcessor"} 14 +otelcol_processor_dropped_log_records{processor="fakeProcessor",pipeline="logs/fakePipeline"} 14 # HELP otelcol_processor_inserted_log_records Number of log records that were inserted. # TYPE otelcol_processor_inserted_log_records counter -otelcol_processor_inserted_log_records{processor="fakeProcessor"} 3 +otelcol_processor_inserted_log_records{processor="fakeProcessor",pipeline="logs/fakePipeline"} 3 # HELP otelcol_receiver_accepted_log_records Number of log records successfully pushed into the pipeline. # TYPE otelcol_receiver_accepted_log_records counter otelcol_receiver_accepted_log_records{receiver="fakeReceiver",transport="fakeTransport"} 102 diff --git a/processor/batchprocessor/batch_processor_test.go b/processor/batchprocessor/batch_processor_test.go index 8647670a3c8..bca60718edb 100644 --- a/processor/batchprocessor/batch_processor_test.go +++ b/processor/batchprocessor/batch_processor_test.go @@ -17,6 +17,7 @@ import ( "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/collector/client" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/consumer" @@ -31,25 +32,22 @@ import ( func TestProcessorShutdown(t *testing.T) { factory := NewFactory() - ctx := context.Background() - processorCreationSet := processortest.NewNopSettings() - for i := 0; i < 5; i++ { require.NotPanics(t, func() { - tProc, err := factory.CreateTracesProcessor(ctx, processorCreationSet, factory.CreateDefaultConfig(), consumertest.NewNop()) + tProc, err := factory.CreateTracesProcessor(ctx, processortest.NewNopSettings(component.DataTypeTraces), factory.CreateDefaultConfig(), consumertest.NewNop()) require.NoError(t, err) _ = tProc.Shutdown(ctx) }) require.NotPanics(t, func() { - mProc, err := factory.CreateMetricsProcessor(ctx, processorCreationSet, factory.CreateDefaultConfig(), consumertest.NewNop()) + mProc, err := factory.CreateMetricsProcessor(ctx, processortest.NewNopSettings(component.DataTypeMetrics), factory.CreateDefaultConfig(), consumertest.NewNop()) require.NoError(t, err) _ = mProc.Shutdown(ctx) }) require.NotPanics(t, func() { - lProc, err := factory.CreateLogsProcessor(ctx, processorCreationSet, factory.CreateDefaultConfig(), consumertest.NewNop()) + lProc, err := factory.CreateLogsProcessor(ctx, processortest.NewNopSettings(component.DataTypeLogs), factory.CreateDefaultConfig(), consumertest.NewNop()) require.NoError(t, err) _ = lProc.Shutdown(ctx) }) @@ -58,22 +56,19 @@ func TestProcessorShutdown(t *testing.T) { func TestProcessorLifecycle(t *testing.T) { factory := NewFactory() - ctx := context.Background() - processorCreationSet := processortest.NewNopSettings() - for i := 0; i < 5; i++ { - tProc, err := factory.CreateTracesProcessor(ctx, processorCreationSet, factory.CreateDefaultConfig(), consumertest.NewNop()) + tProc, err := factory.CreateTracesProcessor(ctx, processortest.NewNopSettings(component.DataTypeTraces), factory.CreateDefaultConfig(), consumertest.NewNop()) require.NoError(t, err) require.NoError(t, tProc.Start(ctx, componenttest.NewNopHost())) require.NoError(t, tProc.Shutdown(ctx)) - mProc, err := factory.CreateMetricsProcessor(ctx, processorCreationSet, factory.CreateDefaultConfig(), consumertest.NewNop()) + mProc, err := factory.CreateMetricsProcessor(ctx, processortest.NewNopSettings(component.DataTypeMetrics), factory.CreateDefaultConfig(), consumertest.NewNop()) require.NoError(t, err) require.NoError(t, mProc.Start(ctx, componenttest.NewNopHost())) require.NoError(t, mProc.Shutdown(ctx)) - lProc, err := factory.CreateLogsProcessor(ctx, processorCreationSet, factory.CreateDefaultConfig(), consumertest.NewNop()) + lProc, err := factory.CreateLogsProcessor(ctx, processortest.NewNopSettings(component.DataTypeLogs), factory.CreateDefaultConfig(), consumertest.NewNop()) require.NoError(t, err) require.NoError(t, lProc.Start(ctx, componenttest.NewNopHost())) require.NoError(t, lProc.Shutdown(ctx)) @@ -84,7 +79,7 @@ func TestBatchProcessorSpansDelivered(t *testing.T) { sink := new(consumertest.TracesSink) cfg := createDefaultConfig().(*Config) cfg.SendBatchSize = 128 - creationSet := processortest.NewNopSettings() + creationSet := processortest.NewNopSettings(component.DataTypeTraces) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchTracesProcessor(creationSet, sink, cfg) require.NoError(t, err) @@ -127,7 +122,7 @@ func TestBatchProcessorSpansDeliveredEnforceBatchSize(t *testing.T) { cfg := createDefaultConfig().(*Config) cfg.SendBatchSize = 128 cfg.SendBatchMaxSize = 130 - creationSet := processortest.NewNopSettings() + creationSet := processortest.NewNopSettings(component.DataTypeTraces) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchTracesProcessor(creationSet, sink, cfg) require.NoError(t, err) @@ -174,7 +169,7 @@ func TestBatchProcessorSentBySize(t *testing.T) { sendBatchSize := 20 cfg.SendBatchSize = uint32(sendBatchSize) cfg.Timeout = 500 * time.Millisecond - creationSet := tel.NewSettings() + creationSet := tel.NewSettings(component.DataTypeTraces) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchTracesProcessor(creationSet, sink, cfg) require.NoError(t, err) @@ -220,7 +215,7 @@ func TestBatchProcessorSentBySize(t *testing.T) { Temporality: metricdata.CumulativeTemporality, DataPoints: []metricdata.HistogramDataPoint[int64]{ { - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "traces")), Count: uint64(expectedBatchesNum), Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100_000, 200_000, 300_000, 400_000, 500_000, 600_000, 700_000, 800_000, 900_000, @@ -241,7 +236,7 @@ func TestBatchProcessorSentBySize(t *testing.T) { Temporality: metricdata.CumulativeTemporality, DataPoints: []metricdata.HistogramDataPoint[int64]{ { - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "traces")), Count: uint64(expectedBatchesNum), Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000}, BucketCounts: []uint64{0, uint64(expectedBatchesNum), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, @@ -262,7 +257,7 @@ func TestBatchProcessorSentBySize(t *testing.T) { DataPoints: []metricdata.DataPoint[int64]{ { Value: int64(expectedBatchesNum), - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "traces")), }, }, }, @@ -277,7 +272,7 @@ func TestBatchProcessorSentBySize(t *testing.T) { DataPoints: []metricdata.DataPoint[int64]{ { Value: 1, - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "traces")), }, }, }, @@ -295,7 +290,7 @@ func TestBatchProcessorSentBySizeWithMaxSize(t *testing.T) { cfg.SendBatchSize = uint32(sendBatchSize) cfg.SendBatchMaxSize = uint32(sendBatchMaxSize) cfg.Timeout = 500 * time.Millisecond - creationSet := tel.NewSettings() + creationSet := tel.NewSettings(component.DataTypeTraces) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchTracesProcessor(creationSet, sink, cfg) require.NoError(t, err) @@ -346,7 +341,7 @@ func TestBatchProcessorSentBySizeWithMaxSize(t *testing.T) { Temporality: metricdata.CumulativeTemporality, DataPoints: []metricdata.HistogramDataPoint[int64]{ { - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "traces")), Count: uint64(expectedBatchesNum), Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100_000, 200_000, 300_000, 400_000, 500_000, 600_000, 700_000, 800_000, 900_000, @@ -367,7 +362,7 @@ func TestBatchProcessorSentBySizeWithMaxSize(t *testing.T) { Temporality: metricdata.CumulativeTemporality, DataPoints: []metricdata.HistogramDataPoint[int64]{ { - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "traces")), Count: uint64(expectedBatchesNum), Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000}, BucketCounts: []uint64{0, 1, uint64(expectedBatchesNum - 1), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, @@ -388,7 +383,7 @@ func TestBatchProcessorSentBySizeWithMaxSize(t *testing.T) { DataPoints: []metricdata.DataPoint[int64]{ { Value: int64(expectedBatchesNum - 1), - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "traces")), }, }, }, @@ -403,7 +398,7 @@ func TestBatchProcessorSentBySizeWithMaxSize(t *testing.T) { DataPoints: []metricdata.DataPoint[int64]{ { Value: 1, - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "traces")), }, }, }, @@ -418,7 +413,7 @@ func TestBatchProcessorSentBySizeWithMaxSize(t *testing.T) { DataPoints: []metricdata.DataPoint[int64]{ { Value: 1, - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "traces")), }, }, }, @@ -437,7 +432,7 @@ func TestBatchProcessorSentByTimeout(t *testing.T) { spansPerRequest := 10 start := time.Now() - creationSet := processortest.NewNopSettings() + creationSet := processortest.NewNopSettings(component.DataTypeTraces) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchTracesProcessor(creationSet, sink, cfg) require.NoError(t, err) @@ -484,7 +479,7 @@ func TestBatchProcessorTraceSendWhenClosing(t *testing.T) { } sink := new(consumertest.TracesSink) - creationSet := processortest.NewNopSettings() + creationSet := processortest.NewNopSettings(component.DataTypeTraces) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchTracesProcessor(creationSet, sink, &cfg) require.NoError(t, err) @@ -515,7 +510,7 @@ func TestBatchMetricProcessor_ReceivingData(t *testing.T) { metricsPerRequest := 5 sink := new(consumertest.MetricsSink) - creationSet := processortest.NewNopSettings() + creationSet := processortest.NewNopSettings(component.DataTypeMetrics) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg) require.NoError(t, err) @@ -569,7 +564,7 @@ func TestBatchMetricProcessorBatchSize(t *testing.T) { dataPointsPerRequest := metricsPerRequest * dataPointsPerMetric sink := new(consumertest.MetricsSink) - creationSet := tel.NewSettings() + creationSet := tel.NewSettings(component.DataTypeMetrics) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg) require.NoError(t, err) @@ -609,7 +604,7 @@ func TestBatchMetricProcessorBatchSize(t *testing.T) { Temporality: metricdata.CumulativeTemporality, DataPoints: []metricdata.HistogramDataPoint[int64]{ { - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "metrics")), Count: uint64(expectedBatchesNum), Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100_000, 200_000, 300_000, 400_000, 500_000, 600_000, 700_000, 800_000, 900_000, @@ -630,7 +625,7 @@ func TestBatchMetricProcessorBatchSize(t *testing.T) { Temporality: metricdata.CumulativeTemporality, DataPoints: []metricdata.HistogramDataPoint[int64]{ { - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "metrics")), Count: uint64(expectedBatchesNum), Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000}, BucketCounts: []uint64{0, 0, uint64(expectedBatchesNum), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, @@ -651,7 +646,7 @@ func TestBatchMetricProcessorBatchSize(t *testing.T) { DataPoints: []metricdata.DataPoint[int64]{ { Value: int64(expectedBatchesNum), - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "metrics")), }, }, }, @@ -666,7 +661,7 @@ func TestBatchMetricProcessorBatchSize(t *testing.T) { DataPoints: []metricdata.DataPoint[int64]{ { Value: 1, - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "metrics")), }, }, }, @@ -702,7 +697,7 @@ func TestBatchMetricsProcessor_Timeout(t *testing.T) { metricsPerRequest := 10 sink := new(consumertest.MetricsSink) - creationSet := processortest.NewNopSettings() + creationSet := processortest.NewNopSettings(component.DataTypeMetrics) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg) require.NoError(t, err) @@ -751,7 +746,7 @@ func TestBatchMetricProcessor_Shutdown(t *testing.T) { metricsPerRequest := 10 sink := new(consumertest.MetricsSink) - creationSet := processortest.NewNopSettings() + creationSet := processortest.NewNopSettings(component.DataTypeMetrics) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg) require.NoError(t, err) @@ -849,7 +844,7 @@ func BenchmarkMultiBatchMetricProcessor(b *testing.B) { func runMetricsProcessorBenchmark(b *testing.B, cfg Config) { ctx := context.Background() sink := new(metricsSink) - creationSet := processortest.NewNopSettings() + creationSet := processortest.NewNopSettings(component.DataTypeMetrics) creationSet.MetricsLevel = configtelemetry.LevelDetailed metricsPerRequest := 1000 batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg) @@ -897,7 +892,7 @@ func TestBatchLogProcessor_ReceivingData(t *testing.T) { logsPerRequest := 5 sink := new(consumertest.LogsSink) - creationSet := processortest.NewNopSettings() + creationSet := processortest.NewNopSettings(component.DataTypeLogs) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg) require.NoError(t, err) @@ -949,7 +944,7 @@ func TestBatchLogProcessor_BatchSize(t *testing.T) { logsPerRequest := 5 sink := new(consumertest.LogsSink) - creationSet := tel.NewSettings() + creationSet := tel.NewSettings(component.DataTypeLogs) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg) require.NoError(t, err) @@ -989,7 +984,7 @@ func TestBatchLogProcessor_BatchSize(t *testing.T) { Temporality: metricdata.CumulativeTemporality, DataPoints: []metricdata.HistogramDataPoint[int64]{ { - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "logs")), Count: uint64(expectedBatchesNum), Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100_000, 200_000, 300_000, 400_000, 500_000, 600_000, 700_000, 800_000, 900_000, @@ -1010,7 +1005,7 @@ func TestBatchLogProcessor_BatchSize(t *testing.T) { Temporality: metricdata.CumulativeTemporality, DataPoints: []metricdata.HistogramDataPoint[int64]{ { - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "logs")), Count: uint64(expectedBatchesNum), Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000}, BucketCounts: []uint64{0, 0, uint64(expectedBatchesNum), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, @@ -1031,7 +1026,7 @@ func TestBatchLogProcessor_BatchSize(t *testing.T) { DataPoints: []metricdata.DataPoint[int64]{ { Value: int64(expectedBatchesNum), - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "logs")), }, }, }, @@ -1046,7 +1041,7 @@ func TestBatchLogProcessor_BatchSize(t *testing.T) { DataPoints: []metricdata.DataPoint[int64]{ { Value: 1, - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "logs")), }, }, }, @@ -1063,7 +1058,7 @@ func TestBatchLogsProcessor_Timeout(t *testing.T) { logsPerRequest := 10 sink := new(consumertest.LogsSink) - creationSet := processortest.NewNopSettings() + creationSet := processortest.NewNopSettings(component.DataTypeLogs) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg) require.NoError(t, err) @@ -1112,7 +1107,7 @@ func TestBatchLogProcessor_Shutdown(t *testing.T) { logsPerRequest := 10 sink := new(consumertest.LogsSink) - creationSet := processortest.NewNopSettings() + creationSet := processortest.NewNopSettings(component.DataTypeLogs) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg) require.NoError(t, err) @@ -1191,7 +1186,7 @@ func TestBatchProcessorSpansBatchedByMetadata(t *testing.T) { cfg.SendBatchSize = 1000 cfg.Timeout = 10 * time.Minute cfg.MetadataKeys = []string{"token1", "token2"} - creationSet := processortest.NewNopSettings() + creationSet := processortest.NewNopSettings(component.DataTypeTraces) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchTracesProcessor(creationSet, sink, cfg) require.NoError(t, err) @@ -1285,7 +1280,7 @@ func TestBatchProcessorMetadataCardinalityLimit(t *testing.T) { cfg := createDefaultConfig().(*Config) cfg.MetadataKeys = []string{"token"} cfg.MetadataCardinalityLimit = cardLimit - creationSet := processortest.NewNopSettings() + creationSet := processortest.NewNopSettings(component.DataTypeTraces) batcher, err := newBatchTracesProcessor(creationSet, sink, cfg) require.NoError(t, err) require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) @@ -1327,7 +1322,7 @@ func TestBatchZeroConfig(t *testing.T) { const requestCount = 5 const logsPerRequest = 10 sink := new(consumertest.LogsSink) - creationSet := processortest.NewNopSettings() + creationSet := processortest.NewNopSettings(component.DataTypeLogs) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg) require.NoError(t, err) @@ -1368,7 +1363,7 @@ func TestBatchSplitOnly(t *testing.T) { require.NoError(t, cfg.Validate()) sink := new(consumertest.LogsSink) - creationSet := processortest.NewNopSettings() + creationSet := processortest.NewNopSettings(component.DataTypeLogs) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg) require.NoError(t, err) diff --git a/processor/batchprocessor/factory_test.go b/processor/batchprocessor/factory_test.go index 6dbc3af13da..8e61c90c9dc 100644 --- a/processor/batchprocessor/factory_test.go +++ b/processor/batchprocessor/factory_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/processor/processortest" ) @@ -23,20 +24,18 @@ func TestCreateDefaultConfig(t *testing.T) { func TestCreateProcessor(t *testing.T) { factory := NewFactory() - cfg := factory.CreateDefaultConfig() - creationSet := processortest.NewNopSettings() - tp, err := factory.CreateTracesProcessor(context.Background(), creationSet, cfg, nil) + tp, err := factory.CreateTracesProcessor(context.Background(), processortest.NewNopSettings(component.DataTypeTraces), cfg, nil) assert.NotNil(t, tp) assert.NoError(t, err, "cannot create trace processor") assert.NoError(t, tp.Shutdown(context.Background())) - mp, err := factory.CreateMetricsProcessor(context.Background(), creationSet, cfg, nil) + mp, err := factory.CreateMetricsProcessor(context.Background(), processortest.NewNopSettings(component.DataTypeMetrics), cfg, nil) assert.NotNil(t, mp) assert.NoError(t, err, "cannot create metric processor") assert.NoError(t, mp.Shutdown(context.Background())) - lp, err := factory.CreateLogsProcessor(context.Background(), creationSet, cfg, nil) + lp, err := factory.CreateLogsProcessor(context.Background(), processortest.NewNopSettings(component.DataTypeLogs), cfg, nil) assert.NotNil(t, lp) assert.NoError(t, err, "cannot create logs processor") assert.NoError(t, lp.Shutdown(context.Background())) diff --git a/processor/batchprocessor/generated_component_telemetry_test.go b/processor/batchprocessor/generated_component_telemetry_test.go index 4747507bcb3..a670bcb63ef 100644 --- a/processor/batchprocessor/generated_component_telemetry_test.go +++ b/processor/batchprocessor/generated_component_telemetry_test.go @@ -23,8 +23,8 @@ type componentTestTelemetry struct { meterProvider *sdkmetric.MeterProvider } -func (tt *componentTestTelemetry) NewSettings() processor.Settings { - settings := processortest.NewNopSettings() +func (tt *componentTestTelemetry) NewSettings(dt component.DataType) processor.Settings { + settings := processortest.NewNopSettings(dt) settings.MeterProvider = tt.meterProvider settings.LeveledMeterProvider = func(_ configtelemetry.Level) metric.MeterProvider { return tt.meterProvider diff --git a/processor/batchprocessor/generated_component_test.go b/processor/batchprocessor/generated_component_test.go index af68a5bf9a8..4d85d8e5975 100644 --- a/processor/batchprocessor/generated_component_test.go +++ b/processor/batchprocessor/generated_component_test.go @@ -68,13 +68,13 @@ func TestComponentLifecycle(t *testing.T) { for _, test := range tests { t.Run(test.name+"-shutdown", func(t *testing.T) { - c, err := test.createFn(context.Background(), processortest.NewNopSettings(), cfg) + c, err := test.createFn(context.Background(), processortest.NewNopSettings(component.MustNewType(test.name)), cfg) require.NoError(t, err) err = c.Shutdown(context.Background()) require.NoError(t, err) }) t.Run(test.name+"-lifecycle", func(t *testing.T) { - c, err := test.createFn(context.Background(), processortest.NewNopSettings(), cfg) + c, err := test.createFn(context.Background(), processortest.NewNopSettings(component.MustNewType(test.name)), cfg) require.NoError(t, err) host := componenttest.NewNopHost() err = c.Start(context.Background(), host) diff --git a/processor/batchprocessor/go.sum b/processor/batchprocessor/go.sum index 923e7d7ba7f..a6ed38acb1d 100644 --- a/processor/batchprocessor/go.sum +++ b/processor/batchprocessor/go.sum @@ -66,6 +66,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.opentelemetry.io/collector/component/componentprofiles v0.109.0 h1:W+IHaK1SdExcp3lmb454Y6v+JArsWHD0gsoBiX+dKNY= +go.opentelemetry.io/collector/component/componentprofiles v0.109.0/go.mod h1:rmD8l1mpJULa3UFi/2c62Mij3QNH00BzQ05ZkfQqNYc= go.opentelemetry.io/otel v1.30.0 h1:F2t8sK4qf1fAmY9ua4ohFS/K+FUuOPemHUIXHtktrts= go.opentelemetry.io/otel v1.30.0/go.mod h1:tFw4Br9b7fOS+uEao81PJjVMjW/5fvNCbpsDIXqP0pc= go.opentelemetry.io/otel/exporters/prometheus v0.52.0 h1:kmU3H0b9ufFSi8IQCcxack+sWUblKkFbqWYs6YiACGQ= diff --git a/processor/batchprocessor/metrics.go b/processor/batchprocessor/metrics.go index 0c98063ceb2..eca517fd33c 100644 --- a/processor/batchprocessor/metrics.go +++ b/processor/batchprocessor/metrics.go @@ -32,7 +32,7 @@ type batchProcessorTelemetry struct { } func newBatchProcessorTelemetry(set processor.Settings, currentMetadataCardinality func() int) (*batchProcessorTelemetry, error) { - attrs := attribute.NewSet(attribute.String(internal.ProcessorKey, set.ID.String())) + attrs := attribute.NewSet(attribute.String(internal.ProcessorKey, set.ID.String()), attribute.String("pipeline", set.PipelineID.String())) telemetryBuilder, err := metadata.NewTelemetryBuilder(set.TelemetrySettings, metadata.WithProcessorBatchMetadataCardinalityCallback(func() int64 { diff --git a/processor/go.mod b/processor/go.mod index 726c6aff708..ee36b7e8d76 100644 --- a/processor/go.mod +++ b/processor/go.mod @@ -6,6 +6,7 @@ require ( github.com/google/uuid v1.6.0 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.109.0 + go.opentelemetry.io/collector/component/componentprofiles v0.109.0 go.opentelemetry.io/collector/component/componentstatus v0.109.0 go.opentelemetry.io/collector/config/configtelemetry v0.109.0 go.opentelemetry.io/collector/consumer v0.109.0 diff --git a/processor/go.sum b/processor/go.sum index 46c53d70967..5f315c12e5d 100644 --- a/processor/go.sum +++ b/processor/go.sum @@ -54,6 +54,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.opentelemetry.io/collector/component/componentprofiles v0.109.0 h1:W+IHaK1SdExcp3lmb454Y6v+JArsWHD0gsoBiX+dKNY= +go.opentelemetry.io/collector/component/componentprofiles v0.109.0/go.mod h1:rmD8l1mpJULa3UFi/2c62Mij3QNH00BzQ05ZkfQqNYc= go.opentelemetry.io/otel v1.30.0 h1:F2t8sK4qf1fAmY9ua4ohFS/K+FUuOPemHUIXHtktrts= go.opentelemetry.io/otel v1.30.0/go.mod h1:tFw4Br9b7fOS+uEao81PJjVMjW/5fvNCbpsDIXqP0pc= go.opentelemetry.io/otel/exporters/prometheus v0.52.0 h1:kmU3H0b9ufFSi8IQCcxack+sWUblKkFbqWYs6YiACGQ= diff --git a/processor/internal/obsmetrics.go b/processor/internal/obsmetrics.go index c96fbe5e9e0..8bb3c705acc 100644 --- a/processor/internal/obsmetrics.go +++ b/processor/internal/obsmetrics.go @@ -6,7 +6,7 @@ package internal // import "go.opentelemetry.io/collector/processor/internal" const ( MetricNameSep = "_" - // ProcessorKey is the key used to identify processors in metrics and traces. + PipelineKey = "pipeline" ProcessorKey = "processor" ProcessorMetricPrefix = ProcessorKey + MetricNameSep diff --git a/processor/internal/processor.go b/processor/internal/processor.go index 4e7a07a18b8..7750183490e 100644 --- a/processor/internal/processor.go +++ b/processor/internal/processor.go @@ -14,4 +14,7 @@ type Settings struct { // BuildInfo can be used by components for informational purposes BuildInfo component.BuildInfo + + // PipelineID indicates which pipeline contains this processor. + PipelineID component.ID } diff --git a/processor/memorylimiterprocessor/factory_test.go b/processor/memorylimiterprocessor/factory_test.go index f63d27a381e..df9eaa9695c 100644 --- a/processor/memorylimiterprocessor/factory_test.go +++ b/processor/memorylimiterprocessor/factory_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/internal/memorylimiter" @@ -38,19 +39,19 @@ func TestCreateProcessor(t *testing.T) { pCfg.MemorySpikeLimitMiB = 1907 pCfg.CheckInterval = 100 * time.Millisecond - tp, err := factory.CreateTracesProcessor(context.Background(), processortest.NewNopSettings(), cfg, consumertest.NewNop()) + tp, err := factory.CreateTracesProcessor(context.Background(), processortest.NewNopSettings(component.DataTypeTraces), cfg, consumertest.NewNop()) assert.NoError(t, err) assert.NotNil(t, tp) // test if we can shutdown a monitoring routine that has not started assert.ErrorIs(t, tp.Shutdown(context.Background()), memorylimiter.ErrShutdownNotStarted) assert.NoError(t, tp.Start(context.Background(), componenttest.NewNopHost())) - mp, err := factory.CreateMetricsProcessor(context.Background(), processortest.NewNopSettings(), cfg, consumertest.NewNop()) + mp, err := factory.CreateMetricsProcessor(context.Background(), processortest.NewNopSettings(component.DataTypeMetrics), cfg, consumertest.NewNop()) assert.NoError(t, err) assert.NotNil(t, mp) assert.NoError(t, mp.Start(context.Background(), componenttest.NewNopHost())) - lp, err := factory.CreateLogsProcessor(context.Background(), processortest.NewNopSettings(), cfg, consumertest.NewNop()) + lp, err := factory.CreateLogsProcessor(context.Background(), processortest.NewNopSettings(component.DataTypeLogs), cfg, consumertest.NewNop()) assert.NoError(t, err) assert.NotNil(t, lp) assert.NoError(t, lp.Start(context.Background(), componenttest.NewNopHost())) diff --git a/processor/memorylimiterprocessor/generated_component_test.go b/processor/memorylimiterprocessor/generated_component_test.go index 1820e3e0a8c..46f43ebaa58 100644 --- a/processor/memorylimiterprocessor/generated_component_test.go +++ b/processor/memorylimiterprocessor/generated_component_test.go @@ -68,7 +68,7 @@ func TestComponentLifecycle(t *testing.T) { for _, test := range tests { t.Run(test.name+"-lifecycle", func(t *testing.T) { - c, err := test.createFn(context.Background(), processortest.NewNopSettings(), cfg) + c, err := test.createFn(context.Background(), processortest.NewNopSettings(component.MustNewType(test.name)), cfg) require.NoError(t, err) host := componenttest.NewNopHost() err = c.Start(context.Background(), host) diff --git a/processor/memorylimiterprocessor/go.sum b/processor/memorylimiterprocessor/go.sum index 556ddb58cb8..35028457e42 100644 --- a/processor/memorylimiterprocessor/go.sum +++ b/processor/memorylimiterprocessor/go.sum @@ -81,6 +81,8 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= +go.opentelemetry.io/collector/component/componentprofiles v0.109.0 h1:W+IHaK1SdExcp3lmb454Y6v+JArsWHD0gsoBiX+dKNY= +go.opentelemetry.io/collector/component/componentprofiles v0.109.0/go.mod h1:rmD8l1mpJULa3UFi/2c62Mij3QNH00BzQ05ZkfQqNYc= go.opentelemetry.io/otel v1.30.0 h1:F2t8sK4qf1fAmY9ua4ohFS/K+FUuOPemHUIXHtktrts= go.opentelemetry.io/otel v1.30.0/go.mod h1:tFw4Br9b7fOS+uEao81PJjVMjW/5fvNCbpsDIXqP0pc= go.opentelemetry.io/otel/exporters/prometheus v0.52.0 h1:kmU3H0b9ufFSi8IQCcxack+sWUblKkFbqWYs6YiACGQ= diff --git a/processor/memorylimiterprocessor/memorylimiter_test.go b/processor/memorylimiterprocessor/memorylimiter_test.go index a09c31e1ff0..8ea274d36c7 100644 --- a/processor/memorylimiterprocessor/memorylimiter_test.go +++ b/processor/memorylimiterprocessor/memorylimiter_test.go @@ -51,7 +51,7 @@ func TestNoDataLoss(t *testing.T) { cfg.MemoryLimitMiB = uint32(ms.Alloc/(1024*1024) + expectedMemoryIncreaseMiB) cfg.MemorySpikeLimitMiB = 1 - set := processortest.NewNopSettings() + set := processortest.NewNopSettings(component.DataTypeLogs) limiter, err := newMemoryLimiterProcessor(set, cfg) require.NoError(t, err) @@ -174,11 +174,11 @@ func TestMetricsMemoryPressureResponse(t *testing.T) { ms.Alloc = tt.memAlloc } - ml, err := newMemoryLimiterProcessor(processortest.NewNopSettings(), tt.mlCfg) + ml, err := newMemoryLimiterProcessor(processortest.NewNopSettings(component.DataTypeMetrics), tt.mlCfg) require.NoError(t, err) mp, err := processorhelper.NewMetricsProcessor( context.Background(), - processortest.NewNopSettings(), + processortest.NewNopSettings(component.DataTypeMetrics), tt.mlCfg, consumertest.NewNop(), ml.processMetrics, @@ -264,11 +264,11 @@ func TestTraceMemoryPressureResponse(t *testing.T) { ms.Alloc = tt.memAlloc } - ml, err := newMemoryLimiterProcessor(processortest.NewNopSettings(), tt.mlCfg) + ml, err := newMemoryLimiterProcessor(processortest.NewNopSettings(component.DataTypeTraces), tt.mlCfg) require.NoError(t, err) tp, err := processorhelper.NewTracesProcessor( context.Background(), - processortest.NewNopSettings(), + processortest.NewNopSettings(component.DataTypeTraces), tt.mlCfg, consumertest.NewNop(), ml.processTraces, @@ -354,11 +354,11 @@ func TestLogMemoryPressureResponse(t *testing.T) { ms.Alloc = tt.memAlloc } - ml, err := newMemoryLimiterProcessor(processortest.NewNopSettings(), tt.mlCfg) + ml, err := newMemoryLimiterProcessor(processortest.NewNopSettings(component.DataTypeLogs), tt.mlCfg) require.NoError(t, err) tp, err := processorhelper.NewLogsProcessor( context.Background(), - processortest.NewNopSettings(), + processortest.NewNopSettings(component.DataTypeLogs), tt.mlCfg, consumertest.NewNop(), ml.processLogs, diff --git a/processor/processorhelper/generated_component_telemetry_test.go b/processor/processorhelper/generated_component_telemetry_test.go index f03fb0ad564..12d31a2b1cd 100644 --- a/processor/processorhelper/generated_component_telemetry_test.go +++ b/processor/processorhelper/generated_component_telemetry_test.go @@ -23,8 +23,8 @@ type componentTestTelemetry struct { meterProvider *sdkmetric.MeterProvider } -func (tt *componentTestTelemetry) NewSettings() processor.Settings { - settings := processortest.NewNopSettings() +func (tt *componentTestTelemetry) NewSettings(dt component.DataType) processor.Settings { + settings := processortest.NewNopSettings(dt) settings.MeterProvider = tt.meterProvider settings.LeveledMeterProvider = func(_ configtelemetry.Level) metric.MeterProvider { return tt.meterProvider diff --git a/processor/processorhelper/logs_test.go b/processor/processorhelper/logs_test.go index b3a220bfb3f..03fde68fa69 100644 --- a/processor/processorhelper/logs_test.go +++ b/processor/processorhelper/logs_test.go @@ -24,7 +24,7 @@ import ( var testLogsCfg = struct{}{} func TestNewLogsProcessor(t *testing.T) { - lp, err := NewLogsProcessor(context.Background(), processortest.NewNopSettings(), &testLogsCfg, consumertest.NewNop(), newTestLProcessor(nil)) + lp, err := NewLogsProcessor(context.Background(), processortest.NewNopSettings(component.DataTypeLogs), &testLogsCfg, consumertest.NewNop(), newTestLProcessor(nil)) require.NoError(t, err) assert.True(t, lp.Capabilities().MutatesData) @@ -35,7 +35,7 @@ func TestNewLogsProcessor(t *testing.T) { func TestNewLogsProcessor_WithOptions(t *testing.T) { want := errors.New("my_error") - lp, err := NewLogsProcessor(context.Background(), processortest.NewNopSettings(), &testLogsCfg, consumertest.NewNop(), newTestLProcessor(nil), + lp, err := NewLogsProcessor(context.Background(), processortest.NewNopSettings(component.DataTypeLogs), &testLogsCfg, consumertest.NewNop(), newTestLProcessor(nil), WithStart(func(context.Context, component.Host) error { return want }), WithShutdown(func(context.Context) error { return want }), WithCapabilities(consumer.Capabilities{MutatesData: false})) @@ -47,19 +47,19 @@ func TestNewLogsProcessor_WithOptions(t *testing.T) { } func TestNewLogsProcessor_NilRequiredFields(t *testing.T) { - _, err := NewLogsProcessor(context.Background(), processortest.NewNopSettings(), &testLogsCfg, consumertest.NewNop(), nil) + _, err := NewLogsProcessor(context.Background(), processortest.NewNopSettings(component.DataTypeLogs), &testLogsCfg, consumertest.NewNop(), nil) assert.Error(t, err) } func TestNewLogsProcessor_ProcessLogError(t *testing.T) { want := errors.New("my_error") - lp, err := NewLogsProcessor(context.Background(), processortest.NewNopSettings(), &testLogsCfg, consumertest.NewNop(), newTestLProcessor(want)) + lp, err := NewLogsProcessor(context.Background(), processortest.NewNopSettings(component.DataTypeLogs), &testLogsCfg, consumertest.NewNop(), newTestLProcessor(want)) require.NoError(t, err) assert.Equal(t, want, lp.ConsumeLogs(context.Background(), plog.NewLogs())) } func TestNewLogsProcessor_ProcessLogsErrSkipProcessingData(t *testing.T) { - lp, err := NewLogsProcessor(context.Background(), processortest.NewNopSettings(), &testLogsCfg, consumertest.NewNop(), newTestLProcessor(ErrSkipProcessingData)) + lp, err := NewLogsProcessor(context.Background(), processortest.NewNopSettings(component.DataTypeLogs), &testLogsCfg, consumertest.NewNop(), newTestLProcessor(ErrSkipProcessingData)) require.NoError(t, err) assert.NoError(t, lp.ConsumeLogs(context.Background(), plog.NewLogs())) } @@ -87,7 +87,7 @@ func TestLogsProcessor_RecordInOut(t *testing.T) { incomingLogRecords.AppendEmpty() testTelemetry := setupTestTelemetry() - lp, err := NewLogsProcessor(context.Background(), testTelemetry.NewSettings(), &testLogsCfg, consumertest.NewNop(), mockAggregate) + lp, err := NewLogsProcessor(context.Background(), testTelemetry.NewSettings(component.DataTypeLogs), &testLogsCfg, consumertest.NewNop(), mockAggregate) require.NoError(t, err) assert.NoError(t, lp.Start(context.Background(), componenttest.NewNopHost())) @@ -105,7 +105,7 @@ func TestLogsProcessor_RecordInOut(t *testing.T) { DataPoints: []metricdata.DataPoint[int64]{ { Value: 3, - Attributes: attribute.NewSet(attribute.String("processor", "processorhelper")), + Attributes: attribute.NewSet(attribute.String("processor", "processorhelper"), attribute.String("pipeline", "logs")), }, }, }, @@ -120,7 +120,7 @@ func TestLogsProcessor_RecordInOut(t *testing.T) { DataPoints: []metricdata.DataPoint[int64]{ { Value: 1, - Attributes: attribute.NewSet(attribute.String("processor", "processorhelper")), + Attributes: attribute.NewSet(attribute.String("processor", "processorhelper"), attribute.String("pipeline", "logs")), }, }, }, diff --git a/processor/processorhelper/metrics_test.go b/processor/processorhelper/metrics_test.go index 110d72fbc18..77655a29c46 100644 --- a/processor/processorhelper/metrics_test.go +++ b/processor/processorhelper/metrics_test.go @@ -24,7 +24,7 @@ import ( var testMetricsCfg = struct{}{} func TestNewMetricsProcessor(t *testing.T) { - mp, err := NewMetricsProcessor(context.Background(), processortest.NewNopSettings(), &testMetricsCfg, consumertest.NewNop(), newTestMProcessor(nil)) + mp, err := NewMetricsProcessor(context.Background(), processortest.NewNopSettings(component.DataTypeMetrics), &testMetricsCfg, consumertest.NewNop(), newTestMProcessor(nil)) require.NoError(t, err) assert.True(t, mp.Capabilities().MutatesData) @@ -35,7 +35,7 @@ func TestNewMetricsProcessor(t *testing.T) { func TestNewMetricsProcessor_WithOptions(t *testing.T) { want := errors.New("my_error") - mp, err := NewMetricsProcessor(context.Background(), processortest.NewNopSettings(), &testMetricsCfg, consumertest.NewNop(), newTestMProcessor(nil), + mp, err := NewMetricsProcessor(context.Background(), processortest.NewNopSettings(component.DataTypeMetrics), &testMetricsCfg, consumertest.NewNop(), newTestMProcessor(nil), WithStart(func(context.Context, component.Host) error { return want }), WithShutdown(func(context.Context) error { return want }), WithCapabilities(consumer.Capabilities{MutatesData: false})) @@ -47,19 +47,19 @@ func TestNewMetricsProcessor_WithOptions(t *testing.T) { } func TestNewMetricsProcessor_NilRequiredFields(t *testing.T) { - _, err := NewMetricsProcessor(context.Background(), processortest.NewNopSettings(), &testMetricsCfg, consumertest.NewNop(), nil) + _, err := NewMetricsProcessor(context.Background(), processortest.NewNopSettings(component.DataTypeMetrics), &testMetricsCfg, consumertest.NewNop(), nil) assert.Error(t, err) } func TestNewMetricsProcessor_ProcessMetricsError(t *testing.T) { want := errors.New("my_error") - mp, err := NewMetricsProcessor(context.Background(), processortest.NewNopSettings(), &testMetricsCfg, consumertest.NewNop(), newTestMProcessor(want)) + mp, err := NewMetricsProcessor(context.Background(), processortest.NewNopSettings(component.DataTypeMetrics), &testMetricsCfg, consumertest.NewNop(), newTestMProcessor(want)) require.NoError(t, err) assert.Equal(t, want, mp.ConsumeMetrics(context.Background(), pmetric.NewMetrics())) } func TestNewMetricsProcessor_ProcessMetricsErrSkipProcessingData(t *testing.T) { - mp, err := NewMetricsProcessor(context.Background(), processortest.NewNopSettings(), &testMetricsCfg, consumertest.NewNop(), newTestMProcessor(ErrSkipProcessingData)) + mp, err := NewMetricsProcessor(context.Background(), processortest.NewNopSettings(component.DataTypeMetrics), &testMetricsCfg, consumertest.NewNop(), newTestMProcessor(ErrSkipProcessingData)) require.NoError(t, err) assert.NoError(t, mp.ConsumeMetrics(context.Background(), pmetric.NewMetrics())) } @@ -88,7 +88,7 @@ func TestMetricsProcessor_RecordInOut(t *testing.T) { dps.AppendEmpty() testTelemetry := setupTestTelemetry() - mp, err := NewMetricsProcessor(context.Background(), testTelemetry.NewSettings(), &testMetricsCfg, consumertest.NewNop(), mockAggregate) + mp, err := NewMetricsProcessor(context.Background(), testTelemetry.NewSettings(component.DataTypeMetrics), &testMetricsCfg, consumertest.NewNop(), mockAggregate) require.NoError(t, err) assert.NoError(t, mp.Start(context.Background(), componenttest.NewNopHost())) @@ -106,7 +106,7 @@ func TestMetricsProcessor_RecordInOut(t *testing.T) { DataPoints: []metricdata.DataPoint[int64]{ { Value: 2, - Attributes: attribute.NewSet(attribute.String("processor", "processorhelper")), + Attributes: attribute.NewSet(attribute.String("processor", "processorhelper"), attribute.String("pipeline", "metrics")), }, }, }, @@ -121,7 +121,7 @@ func TestMetricsProcessor_RecordInOut(t *testing.T) { DataPoints: []metricdata.DataPoint[int64]{ { Value: 3, - Attributes: attribute.NewSet(attribute.String("processor", "processorhelper")), + Attributes: attribute.NewSet(attribute.String("processor", "processorhelper"), attribute.String("pipeline", "metrics")), }, }, }, diff --git a/processor/processorhelper/obsreport.go b/processor/processorhelper/obsreport.go index d8fffafaf94..d786bad2515 100644 --- a/processor/processorhelper/obsreport.go +++ b/processor/processorhelper/obsreport.go @@ -55,6 +55,7 @@ func newObsReport(cfg ObsReportSettings) (*ObsReport, error) { return &ObsReport{ otelAttrs: []attribute.KeyValue{ attribute.String(internal.ProcessorKey, cfg.ProcessorID.String()), + attribute.String(internal.PipelineKey, cfg.ProcessorCreateSettings.PipelineID.String()), }, telemetryBuilder: telemetryBuilder, }, nil diff --git a/processor/processorhelper/obsreport_test.go b/processor/processorhelper/obsreport_test.go index 6b8aa846e8c..5facc4f03e1 100644 --- a/processor/processorhelper/obsreport_test.go +++ b/processor/processorhelper/obsreport_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/noop" @@ -19,45 +20,62 @@ import ( ) var ( - processorID = component.MustNewID("fakeProcessor") + processorID = component.MustNewID("fakeProcessor") + tracesPipeline = component.MustNewIDWithName("traces", "fakePipeline") + metricsPipeline = component.MustNewIDWithName("metrics", "fakePipeline") + logsPipeline = component.MustNewIDWithName("logs", "fakePipeline") ) func TestProcessorTraceData(t *testing.T) { - testTelemetry(t, processorID, func(t *testing.T, tt componenttest.TestTelemetry) { - const acceptedSpans = 27 - const refusedSpans = 19 - const droppedSpans = 13 - - obsrep, err := newObsReport(ObsReportSettings{ - ProcessorID: processorID, - ProcessorCreateSettings: processor.Settings{ID: processorID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, + testTelemetry(t, processorID, + []attribute.KeyValue{attribute.String("pipeline", tracesPipeline.String())}, + func(t *testing.T, tt componenttest.TestTelemetry) { + const acceptedSpans = 27 + const refusedSpans = 19 + const droppedSpans = 13 + + obsrep, err := newObsReport(ObsReportSettings{ + ProcessorID: processorID, + ProcessorCreateSettings: processor.Settings{ + ID: processorID, + TelemetrySettings: tt.TelemetrySettings(), + BuildInfo: component.NewDefaultBuildInfo(), + PipelineID: tracesPipeline, + }, + }) + require.NoError(t, err) + obsrep.TracesAccepted(context.Background(), acceptedSpans) + obsrep.TracesRefused(context.Background(), refusedSpans) + obsrep.TracesDropped(context.Background(), droppedSpans) + + require.NoError(t, tt.CheckProcessorTraces(acceptedSpans, refusedSpans, droppedSpans)) }) - require.NoError(t, err) - obsrep.TracesAccepted(context.Background(), acceptedSpans) - obsrep.TracesRefused(context.Background(), refusedSpans) - obsrep.TracesDropped(context.Background(), droppedSpans) - - require.NoError(t, tt.CheckProcessorTraces(acceptedSpans, refusedSpans, droppedSpans)) - }) } func TestProcessorMetricsData(t *testing.T) { - testTelemetry(t, processorID, func(t *testing.T, tt componenttest.TestTelemetry) { - const acceptedPoints = 29 - const refusedPoints = 11 - const droppedPoints = 17 - - obsrep, err := newObsReport(ObsReportSettings{ - ProcessorID: processorID, - ProcessorCreateSettings: processor.Settings{ID: processorID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, + testTelemetry(t, processorID, + []attribute.KeyValue{attribute.String("pipeline", metricsPipeline.String())}, + func(t *testing.T, tt componenttest.TestTelemetry) { + const acceptedPoints = 29 + const refusedPoints = 11 + const droppedPoints = 17 + + obsrep, err := newObsReport(ObsReportSettings{ + ProcessorID: processorID, + ProcessorCreateSettings: processor.Settings{ + ID: processorID, + TelemetrySettings: tt.TelemetrySettings(), + BuildInfo: component.NewDefaultBuildInfo(), + PipelineID: metricsPipeline, + }, + }) + require.NoError(t, err) + obsrep.MetricsAccepted(context.Background(), acceptedPoints) + obsrep.MetricsRefused(context.Background(), refusedPoints) + obsrep.MetricsDropped(context.Background(), droppedPoints) + + require.NoError(t, tt.CheckProcessorMetrics(acceptedPoints, refusedPoints, droppedPoints)) }) - require.NoError(t, err) - obsrep.MetricsAccepted(context.Background(), acceptedPoints) - obsrep.MetricsRefused(context.Background(), refusedPoints) - obsrep.MetricsDropped(context.Background(), droppedPoints) - - require.NoError(t, tt.CheckProcessorMetrics(acceptedPoints, refusedPoints, droppedPoints)) - }) } func TestBuildProcessorCustomMetricName(t *testing.T) { @@ -83,32 +101,44 @@ func TestBuildProcessorCustomMetricName(t *testing.T) { } func TestProcessorLogRecords(t *testing.T) { - testTelemetry(t, processorID, func(t *testing.T, tt componenttest.TestTelemetry) { - const acceptedRecords = 29 - const refusedRecords = 11 - const droppedRecords = 17 - - obsrep, err := newObsReport(ObsReportSettings{ - ProcessorID: processorID, - ProcessorCreateSettings: processor.Settings{ID: processorID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, + testTelemetry(t, processorID, + []attribute.KeyValue{attribute.String("pipeline", logsPipeline.String())}, + func(t *testing.T, tt componenttest.TestTelemetry) { + const acceptedRecords = 29 + const refusedRecords = 11 + const droppedRecords = 17 + + obsrep, err := newObsReport(ObsReportSettings{ + ProcessorID: processorID, + ProcessorCreateSettings: processor.Settings{ + ID: processorID, + TelemetrySettings: tt.TelemetrySettings(), + BuildInfo: component.NewDefaultBuildInfo(), + PipelineID: logsPipeline, + }, + }) + require.NoError(t, err) + obsrep.LogsAccepted(context.Background(), acceptedRecords) + obsrep.LogsRefused(context.Background(), refusedRecords) + obsrep.LogsDropped(context.Background(), droppedRecords) + + require.NoError(t, tt.CheckProcessorLogs(acceptedRecords, refusedRecords, droppedRecords)) }) - require.NoError(t, err) - obsrep.LogsAccepted(context.Background(), acceptedRecords) - obsrep.LogsRefused(context.Background(), refusedRecords) - obsrep.LogsDropped(context.Background(), droppedRecords) - - require.NoError(t, tt.CheckProcessorLogs(acceptedRecords, refusedRecords, droppedRecords)) - }) } func TestCheckProcessorTracesViews(t *testing.T) { - tt, err := componenttest.SetupTelemetry(processorID) + tt, err := componenttest.SetupTelemetry(processorID, attribute.String("pipeline", tracesPipeline.String())) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) por, err := NewObsReport(ObsReportSettings{ - ProcessorID: processorID, - ProcessorCreateSettings: processor.Settings{ID: processorID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, + ProcessorID: processorID, + ProcessorCreateSettings: processor.Settings{ + ID: processorID, + TelemetrySettings: tt.TelemetrySettings(), + BuildInfo: component.NewDefaultBuildInfo(), + PipelineID: tracesPipeline, + }, }) assert.NoError(t, err) @@ -131,13 +161,18 @@ func TestCheckProcessorTracesViews(t *testing.T) { } func TestCheckProcessorMetricsViews(t *testing.T) { - tt, err := componenttest.SetupTelemetry(processorID) + tt, err := componenttest.SetupTelemetry(processorID, attribute.String("pipeline", metricsPipeline.String())) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) por, err := NewObsReport(ObsReportSettings{ - ProcessorID: processorID, - ProcessorCreateSettings: processor.Settings{ID: processorID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, + ProcessorID: processorID, + ProcessorCreateSettings: processor.Settings{ + ID: processorID, + TelemetrySettings: tt.TelemetrySettings(), + BuildInfo: component.NewDefaultBuildInfo(), + PipelineID: metricsPipeline, + }, }) assert.NoError(t, err) @@ -160,13 +195,18 @@ func TestCheckProcessorMetricsViews(t *testing.T) { } func TestCheckProcessorLogViews(t *testing.T) { - tt, err := componenttest.SetupTelemetry(processorID) + tt, err := componenttest.SetupTelemetry(processorID, attribute.String("pipeline", logsPipeline.String())) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) por, err := NewObsReport(ObsReportSettings{ - ProcessorID: processorID, - ProcessorCreateSettings: processor.Settings{ID: processorID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, + ProcessorID: processorID, + ProcessorCreateSettings: processor.Settings{ + ID: processorID, + TelemetrySettings: tt.TelemetrySettings(), + BuildInfo: component.NewDefaultBuildInfo(), + PipelineID: logsPipeline, + }, }) assert.NoError(t, err) @@ -190,76 +230,97 @@ func TestCheckProcessorLogViews(t *testing.T) { func TestNoMetrics(t *testing.T) { // ensure if LevelNone is configured, no metrics are emitted by the component - testTelemetry(t, processorID, func(t *testing.T, tt componenttest.TestTelemetry) { - const accepted = 29 - const refused = 11 - const dropped = 17 - - set := tt.TelemetrySettings() - set.LeveledMeterProvider = func(_ configtelemetry.Level) metric.MeterProvider { - return noop.MeterProvider{} - } - - por, err := NewObsReport(ObsReportSettings{ - ProcessorID: processorID, - ProcessorCreateSettings: processor.Settings{ID: processorID, TelemetrySettings: set, BuildInfo: component.NewDefaultBuildInfo()}, + testTelemetry(t, processorID, + []attribute.KeyValue{attribute.String("pipeline", tracesPipeline.String())}, + func(t *testing.T, tt componenttest.TestTelemetry) { + const accepted = 29 + const refused = 11 + const dropped = 17 + + set := tt.TelemetrySettings() + set.LeveledMeterProvider = func(_ configtelemetry.Level) metric.MeterProvider { + return noop.MeterProvider{} + } + + por, err := NewObsReport(ObsReportSettings{ + ProcessorID: processorID, + ProcessorCreateSettings: processor.Settings{ + ID: processorID, + TelemetrySettings: set, + BuildInfo: component.NewDefaultBuildInfo(), + PipelineID: tracesPipeline, + }, + }) + assert.NoError(t, err) + + por.TracesAccepted(context.Background(), accepted) + por.TracesRefused(context.Background(), refused) + por.TracesDropped(context.Background(), dropped) + + require.Error(t, tt.CheckProcessorTraces(accepted, refused, dropped)) }) - assert.NoError(t, err) - - por.TracesAccepted(context.Background(), accepted) - por.TracesRefused(context.Background(), refused) - por.TracesDropped(context.Background(), dropped) - - require.Error(t, tt.CheckProcessorTraces(accepted, refused, dropped)) - }) - testTelemetry(t, processorID, func(t *testing.T, tt componenttest.TestTelemetry) { - const accepted = 29 - const refused = 11 - const dropped = 17 - - set := tt.TelemetrySettings() - set.LeveledMeterProvider = func(_ configtelemetry.Level) metric.MeterProvider { - return noop.MeterProvider{} - } - - por, err := NewObsReport(ObsReportSettings{ - ProcessorID: processorID, - ProcessorCreateSettings: processor.Settings{ID: processorID, TelemetrySettings: set, BuildInfo: component.NewDefaultBuildInfo()}, + testTelemetry(t, processorID, + []attribute.KeyValue{attribute.String("pipeline", metricsPipeline.String())}, + func(t *testing.T, tt componenttest.TestTelemetry) { + const accepted = 29 + const refused = 11 + const dropped = 17 + + set := tt.TelemetrySettings() + set.LeveledMeterProvider = func(_ configtelemetry.Level) metric.MeterProvider { + return noop.MeterProvider{} + } + + por, err := NewObsReport(ObsReportSettings{ + ProcessorID: processorID, + ProcessorCreateSettings: processor.Settings{ + ID: processorID, + TelemetrySettings: set, + BuildInfo: component.NewDefaultBuildInfo(), + PipelineID: metricsPipeline, + }, + }) + assert.NoError(t, err) + + por.MetricsAccepted(context.Background(), accepted) + por.MetricsRefused(context.Background(), refused) + por.MetricsDropped(context.Background(), dropped) + + require.Error(t, tt.CheckProcessorMetrics(accepted, refused, dropped)) }) - assert.NoError(t, err) - - por.MetricsAccepted(context.Background(), accepted) - por.MetricsRefused(context.Background(), refused) - por.MetricsDropped(context.Background(), dropped) - - require.Error(t, tt.CheckProcessorMetrics(accepted, refused, dropped)) - }) - testTelemetry(t, processorID, func(t *testing.T, tt componenttest.TestTelemetry) { - const accepted = 29 - const refused = 11 - const dropped = 17 - - set := tt.TelemetrySettings() - set.LeveledMeterProvider = func(_ configtelemetry.Level) metric.MeterProvider { - return noop.MeterProvider{} - } - - por, err := NewObsReport(ObsReportSettings{ - ProcessorID: processorID, - ProcessorCreateSettings: processor.Settings{ID: processorID, TelemetrySettings: set, BuildInfo: component.NewDefaultBuildInfo()}, + testTelemetry(t, processorID, + []attribute.KeyValue{attribute.String("pipeline", logsPipeline.String())}, + func(t *testing.T, tt componenttest.TestTelemetry) { + const accepted = 29 + const refused = 11 + const dropped = 17 + + set := tt.TelemetrySettings() + set.LeveledMeterProvider = func(_ configtelemetry.Level) metric.MeterProvider { + return noop.MeterProvider{} + } + + por, err := NewObsReport(ObsReportSettings{ + ProcessorID: processorID, + ProcessorCreateSettings: processor.Settings{ + ID: processorID, + TelemetrySettings: set, + BuildInfo: component.NewDefaultBuildInfo(), + PipelineID: logsPipeline, + }, + }) + assert.NoError(t, err) + + por.LogsAccepted(context.Background(), accepted) + por.LogsRefused(context.Background(), refused) + por.LogsDropped(context.Background(), dropped) + + require.Error(t, tt.CheckProcessorLogs(accepted, refused, dropped)) }) - assert.NoError(t, err) - - por.LogsAccepted(context.Background(), accepted) - por.LogsRefused(context.Background(), refused) - por.LogsDropped(context.Background(), dropped) - - require.Error(t, tt.CheckProcessorLogs(accepted, refused, dropped)) - }) } -func testTelemetry(t *testing.T, id component.ID, testFunc func(t *testing.T, tt componenttest.TestTelemetry)) { - tt, err := componenttest.SetupTelemetry(id) +func testTelemetry(t *testing.T, id component.ID, extraAttrs []attribute.KeyValue, testFunc func(t *testing.T, tt componenttest.TestTelemetry)) { + tt, err := componenttest.SetupTelemetry(id, extraAttrs...) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) diff --git a/processor/processorhelper/traces_test.go b/processor/processorhelper/traces_test.go index ec45568890b..e0da0d977d8 100644 --- a/processor/processorhelper/traces_test.go +++ b/processor/processorhelper/traces_test.go @@ -24,7 +24,7 @@ import ( var testTracesCfg = struct{}{} func TestNewTracesProcessor(t *testing.T) { - tp, err := NewTracesProcessor(context.Background(), processortest.NewNopSettings(), &testTracesCfg, consumertest.NewNop(), newTestTProcessor(nil)) + tp, err := NewTracesProcessor(context.Background(), processortest.NewNopSettings(component.DataTypeTraces), &testTracesCfg, consumertest.NewNop(), newTestTProcessor(nil)) require.NoError(t, err) assert.True(t, tp.Capabilities().MutatesData) @@ -35,7 +35,7 @@ func TestNewTracesProcessor(t *testing.T) { func TestNewTracesProcessor_WithOptions(t *testing.T) { want := errors.New("my_error") - tp, err := NewTracesProcessor(context.Background(), processortest.NewNopSettings(), &testTracesCfg, consumertest.NewNop(), newTestTProcessor(nil), + tp, err := NewTracesProcessor(context.Background(), processortest.NewNopSettings(component.DataTypeTraces), &testTracesCfg, consumertest.NewNop(), newTestTProcessor(nil), WithStart(func(context.Context, component.Host) error { return want }), WithShutdown(func(context.Context) error { return want }), WithCapabilities(consumer.Capabilities{MutatesData: false})) @@ -47,19 +47,19 @@ func TestNewTracesProcessor_WithOptions(t *testing.T) { } func TestNewTracesProcessor_NilRequiredFields(t *testing.T) { - _, err := NewTracesProcessor(context.Background(), processortest.NewNopSettings(), &testTracesCfg, consumertest.NewNop(), nil) + _, err := NewTracesProcessor(context.Background(), processortest.NewNopSettings(component.DataTypeTraces), &testTracesCfg, consumertest.NewNop(), nil) assert.Error(t, err) } func TestNewTracesProcessor_ProcessTraceError(t *testing.T) { want := errors.New("my_error") - tp, err := NewTracesProcessor(context.Background(), processortest.NewNopSettings(), &testTracesCfg, consumertest.NewNop(), newTestTProcessor(want)) + tp, err := NewTracesProcessor(context.Background(), processortest.NewNopSettings(component.DataTypeTraces), &testTracesCfg, consumertest.NewNop(), newTestTProcessor(want)) require.NoError(t, err) assert.Equal(t, want, tp.ConsumeTraces(context.Background(), ptrace.NewTraces())) } func TestNewTracesProcessor_ProcessTracesErrSkipProcessingData(t *testing.T) { - tp, err := NewTracesProcessor(context.Background(), processortest.NewNopSettings(), &testTracesCfg, consumertest.NewNop(), newTestTProcessor(ErrSkipProcessingData)) + tp, err := NewTracesProcessor(context.Background(), processortest.NewNopSettings(component.DataTypeTraces), &testTracesCfg, consumertest.NewNop(), newTestTProcessor(ErrSkipProcessingData)) require.NoError(t, err) assert.NoError(t, tp.ConsumeTraces(context.Background(), ptrace.NewTraces())) } @@ -88,7 +88,7 @@ func TestTracesProcessor_RecordInOut(t *testing.T) { incomingSpans.AppendEmpty() testTelemetry := setupTestTelemetry() - tp, err := NewTracesProcessor(context.Background(), testTelemetry.NewSettings(), &testLogsCfg, consumertest.NewNop(), mockAggregate) + tp, err := NewTracesProcessor(context.Background(), testTelemetry.NewSettings(component.DataTypeTraces), &testLogsCfg, consumertest.NewNop(), mockAggregate) require.NoError(t, err) assert.NoError(t, tp.Start(context.Background(), componenttest.NewNopHost())) @@ -106,7 +106,7 @@ func TestTracesProcessor_RecordInOut(t *testing.T) { DataPoints: []metricdata.DataPoint[int64]{ { Value: 4, - Attributes: attribute.NewSet(attribute.String("processor", "processorhelper")), + Attributes: attribute.NewSet(attribute.String("processor", "processorhelper"), attribute.String("pipeline", "traces")), }, }, }, @@ -121,7 +121,7 @@ func TestTracesProcessor_RecordInOut(t *testing.T) { DataPoints: []metricdata.DataPoint[int64]{ { Value: 1, - Attributes: attribute.NewSet(attribute.String("processor", "processorhelper")), + Attributes: attribute.NewSet(attribute.String("processor", "processorhelper"), attribute.String("pipeline", "traces")), }, }, }, diff --git a/processor/processortest/nop_processor.go b/processor/processortest/nop_processor.go index f1dd64bfd52..9a6d195dca6 100644 --- a/processor/processortest/nop_processor.go +++ b/processor/processortest/nop_processor.go @@ -20,11 +20,12 @@ import ( var nopType = component.MustNewType("nop") // NewNopSettings returns a new nop settings for Create*Processor functions. -func NewNopSettings() processor.Settings { +func NewNopSettings(dt component.DataType) processor.Settings { return processor.Settings{ ID: component.NewIDWithName(nopType, uuid.NewString()), TelemetrySettings: componenttest.NewNopTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo(), + PipelineID: component.NewID(dt), } } diff --git a/processor/processortest/nop_processor_test.go b/processor/processortest/nop_processor_test.go index fcf902e552d..4bf2d1fe6be 100644 --- a/processor/processortest/nop_processor_test.go +++ b/processor/processortest/nop_processor_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componentprofiles" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" @@ -27,28 +28,28 @@ func TestNewNopFactory(t *testing.T) { cfg := factory.CreateDefaultConfig() assert.Equal(t, &nopConfig{}, cfg) - traces, err := factory.CreateTracesProcessor(context.Background(), NewNopSettings(), cfg, consumertest.NewNop()) + traces, err := factory.CreateTracesProcessor(context.Background(), NewNopSettings(component.DataTypeTraces), cfg, consumertest.NewNop()) require.NoError(t, err) assert.Equal(t, consumer.Capabilities{MutatesData: false}, traces.Capabilities()) assert.NoError(t, traces.Start(context.Background(), componenttest.NewNopHost())) assert.NoError(t, traces.ConsumeTraces(context.Background(), ptrace.NewTraces())) assert.NoError(t, traces.Shutdown(context.Background())) - metrics, err := factory.CreateMetricsProcessor(context.Background(), NewNopSettings(), cfg, consumertest.NewNop()) + metrics, err := factory.CreateMetricsProcessor(context.Background(), NewNopSettings(component.DataTypeMetrics), cfg, consumertest.NewNop()) require.NoError(t, err) assert.Equal(t, consumer.Capabilities{MutatesData: false}, metrics.Capabilities()) assert.NoError(t, metrics.Start(context.Background(), componenttest.NewNopHost())) assert.NoError(t, metrics.ConsumeMetrics(context.Background(), pmetric.NewMetrics())) assert.NoError(t, metrics.Shutdown(context.Background())) - logs, err := factory.CreateLogsProcessor(context.Background(), NewNopSettings(), cfg, consumertest.NewNop()) + logs, err := factory.CreateLogsProcessor(context.Background(), NewNopSettings(component.DataTypeLogs), cfg, consumertest.NewNop()) require.NoError(t, err) assert.Equal(t, consumer.Capabilities{MutatesData: false}, logs.Capabilities()) assert.NoError(t, logs.Start(context.Background(), componenttest.NewNopHost())) assert.NoError(t, logs.ConsumeLogs(context.Background(), plog.NewLogs())) assert.NoError(t, logs.Shutdown(context.Background())) - profiles, err := factory.CreateProfilesProcessor(context.Background(), NewNopSettings(), cfg, consumertest.NewNop()) + profiles, err := factory.CreateProfilesProcessor(context.Background(), NewNopSettings(componentprofiles.DataTypeProfiles), cfg, consumertest.NewNop()) require.NoError(t, err) assert.Equal(t, consumer.Capabilities{MutatesData: false}, profiles.Capabilities()) assert.NoError(t, profiles.Start(context.Background(), componenttest.NewNopHost())) diff --git a/processor/processortest/shutdown_verifier.go b/processor/processortest/shutdown_verifier.go index d020f6e4f8a..11c01ad3ba6 100644 --- a/processor/processortest/shutdown_verifier.go +++ b/processor/processortest/shutdown_verifier.go @@ -21,7 +21,7 @@ import ( func verifyTracesDoesNotProduceAfterShutdown(t *testing.T, factory processor.Factory, cfg component.Config) { // Create a proc and output its produce to a sink. nextSink := new(consumertest.TracesSink) - proc, err := factory.CreateTracesProcessor(context.Background(), NewNopSettings(), cfg, nextSink) + proc, err := factory.CreateTracesProcessor(context.Background(), NewNopSettings(component.DataTypeTraces), cfg, nextSink) if errors.Is(err, component.ErrDataTypeIsNotSupported) { return } @@ -45,7 +45,7 @@ func verifyTracesDoesNotProduceAfterShutdown(t *testing.T, factory processor.Fac func verifyLogsDoesNotProduceAfterShutdown(t *testing.T, factory processor.Factory, cfg component.Config) { // Create a proc and output its produce to a sink. nextSink := new(consumertest.LogsSink) - proc, err := factory.CreateLogsProcessor(context.Background(), NewNopSettings(), cfg, nextSink) + proc, err := factory.CreateLogsProcessor(context.Background(), NewNopSettings(component.DataTypeLogs), cfg, nextSink) if errors.Is(err, component.ErrDataTypeIsNotSupported) { return } @@ -69,7 +69,7 @@ func verifyLogsDoesNotProduceAfterShutdown(t *testing.T, factory processor.Facto func verifyMetricsDoesNotProduceAfterShutdown(t *testing.T, factory processor.Factory, cfg component.Config) { // Create a proc and output its produce to a sink. nextSink := new(consumertest.MetricsSink) - proc, err := factory.CreateMetricsProcessor(context.Background(), NewNopSettings(), cfg, nextSink) + proc, err := factory.CreateMetricsProcessor(context.Background(), NewNopSettings(component.DataTypeMetrics), cfg, nextSink) if errors.Is(err, component.ErrDataTypeIsNotSupported) { return } diff --git a/service/internal/builders/processor_test.go b/service/internal/builders/processor_test.go index cc1ce0d5db2..01e6eadcbf8 100644 --- a/service/internal/builders/processor_test.go +++ b/service/internal/builders/processor_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componentprofiles" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumerprofiles" @@ -186,30 +187,36 @@ func TestNewNopProcessorBuilder(t *testing.T) { factory := processortest.NewNopFactory() cfg := factory.CreateDefaultConfig() - set := processortest.NewNopSettings() - set.ID = component.NewID(nopType) - traces, err := factory.CreateTracesProcessor(context.Background(), set, cfg, consumertest.NewNop()) + tracesSet := processortest.NewNopSettings(component.DataTypeTraces) + tracesSet.ID = component.NewID(nopType) + traces, err := factory.CreateTracesProcessor(context.Background(), tracesSet, cfg, consumertest.NewNop()) require.NoError(t, err) - bTraces, err := builder.CreateTraces(context.Background(), set, consumertest.NewNop()) + bTraces, err := builder.CreateTraces(context.Background(), tracesSet, consumertest.NewNop()) require.NoError(t, err) assert.IsType(t, traces, bTraces) - metrics, err := factory.CreateMetricsProcessor(context.Background(), set, cfg, consumertest.NewNop()) + metricsSet := processortest.NewNopSettings(component.DataTypeMetrics) + metricsSet.ID = component.NewID(nopType) + metrics, err := factory.CreateMetricsProcessor(context.Background(), metricsSet, cfg, consumertest.NewNop()) require.NoError(t, err) - bMetrics, err := builder.CreateMetrics(context.Background(), set, consumertest.NewNop()) + bMetrics, err := builder.CreateMetrics(context.Background(), metricsSet, consumertest.NewNop()) require.NoError(t, err) assert.IsType(t, metrics, bMetrics) - logs, err := factory.CreateLogsProcessor(context.Background(), set, cfg, consumertest.NewNop()) + logsSet := processortest.NewNopSettings(component.DataTypeLogs) + logsSet.ID = component.NewID(nopType) + logs, err := factory.CreateLogsProcessor(context.Background(), logsSet, cfg, consumertest.NewNop()) require.NoError(t, err) - bLogs, err := builder.CreateLogs(context.Background(), set, consumertest.NewNop()) + bLogs, err := builder.CreateLogs(context.Background(), logsSet, consumertest.NewNop()) require.NoError(t, err) assert.IsType(t, logs, bLogs) - profiles, err := factory.CreateProfilesProcessor(context.Background(), set, cfg, consumertest.NewNop()) + profilesSet := processortest.NewNopSettings(componentprofiles.DataTypeProfiles) + profilesSet.ID = component.NewID(nopType) + profiles, err := factory.CreateProfilesProcessor(context.Background(), profilesSet, cfg, consumertest.NewNop()) require.NoError(t, err) - bProfiles, err := builder.CreateProfiles(context.Background(), set, consumertest.NewNop()) + bProfiles, err := builder.CreateProfiles(context.Background(), profilesSet, consumertest.NewNop()) require.NoError(t, err) assert.IsType(t, profiles, bProfiles) } diff --git a/service/internal/graph/nodes.go b/service/internal/graph/nodes.go index f9cc3e6fa7a..f65246b7734 100644 --- a/service/internal/graph/nodes.go +++ b/service/internal/graph/nodes.go @@ -144,7 +144,7 @@ func (n *processorNode) buildComponent(ctx context.Context, next baseConsumer, ) error { tel.Logger = components.ProcessorLogger(tel.Logger, n.componentID, n.pipelineID) - set := processor.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info} + set := processor.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info, PipelineID: n.pipelineID} var err error switch n.pipelineID.Type() { case component.DataTypeTraces: