diff --git a/cmd/otelcontribcol/exporters_test.go b/cmd/otelcontribcol/exporters_test.go deleted file mode 100644 index bbd250507b02..000000000000 --- a/cmd/otelcontribcol/exporters_test.go +++ /dev/null @@ -1,737 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package main - -import ( - "context" - "errors" - "path/filepath" - "runtime" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config/configgrpc" - "go.opentelemetry.io/collector/config/confighttp" - "go.opentelemetry.io/collector/config/configopaque" - "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/exportertest" - "go.opentelemetry.io/collector/exporter/otlpexporter" - "go.opentelemetry.io/collector/exporter/otlphttpexporter" - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/plog" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/pdata/ptrace" - "go.opentelemetry.io/collector/pipeline" - - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/alertmanagerexporter" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/alibabacloudlogserviceexporter" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awscloudwatchlogsexporter" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awskinesisexporter" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsxrayexporter" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/azuredataexplorerexporter" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/azuremonitorexporter" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/carbonexporter" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/cassandraexporter" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/clickhouseexporter" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/coralogixexporter" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datasetexporter" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/fileexporter" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/honeycombmarkerexporter" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/influxdbexporter" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/instanaexporter" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/logicmonitorexporter" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/logzioexporter" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/lokiexporter" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/mezmoexporter" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/opencensusexporter" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/opensearchexporter" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusexporter" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/pulsarexporter" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/rabbitmqexporter" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/sapmexporter" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/sentryexporter" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/splunkhecexporter" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/sumologicexporter" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/syslogexporter" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/tencentcloudlogserviceexporter" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/zipkinexporter" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil" -) - -func TestDefaultExporters(t *testing.T) { - factories, err := components() - assert.NoError(t, err) - - expFactories := factories.Exporters - endpoint := testutil.GetAvailableLocalAddress(t) - - tests := []struct { - getConfigFn getExporterConfigFn - exporter component.Type - skipLifecycle bool - expectConsumeErr bool - }{ - { - exporter: "awscloudwatchlogs", - getConfigFn: func() component.Config { - cfg := expFactories["awscloudwatchlogs"].CreateDefaultConfig().(*awscloudwatchlogsexporter.Config) - cfg.Endpoint = "http://" + endpoint - cfg.Region = "local" - - // disable queue/retry to validate passing the test data synchronously - cfg.QueueSettings.Enabled = false - cfg.BackOffConfig.Enabled = false - return cfg - }, - expectConsumeErr: true, - }, - { - exporter: "awss3", - expectConsumeErr: true, - }, - { - exporter: "file", - getConfigFn: func() component.Config { - cfg := expFactories["file"].CreateDefaultConfig().(*fileexporter.Config) - cfg.Path = filepath.Join(t.TempDir(), "file.exporter.random.file") - return cfg - }, - skipLifecycle: runtime.GOOS == "windows", // On Windows not all handles are closed when the exporter is shutdown. - }, - { - exporter: "kafka", - getConfigFn: func() component.Config { - cfg := expFactories["kafka"].CreateDefaultConfig().(*kafkaexporter.Config) - cfg.Brokers = []string{"invalid:9092"} - // this disables contacting the broker so we can successfully create the exporter - cfg.Metadata.Full = false - // disable queue/retry to validate passing the test data synchronously - cfg.QueueSettings.Enabled = false - cfg.BackOffConfig.Enabled = false - return cfg - }, - expectConsumeErr: true, - }, - { - exporter: "debug", - }, - { - exporter: "opencensus", - getConfigFn: func() component.Config { - cfg := expFactories["opencensus"].CreateDefaultConfig().(*opencensusexporter.Config) - cfg.ClientConfig = configgrpc.ClientConfig{ - Endpoint: endpoint, - } - // disable queue/retry to validate passing the test data synchronously - cfg.QueueSettings.Enabled = false - cfg.BackOffConfig.Enabled = false - return cfg - }, - expectConsumeErr: true, - }, - { - exporter: "opensearch", - getConfigFn: func() component.Config { - cfg := expFactories["opensearch"].CreateDefaultConfig().(*opensearchexporter.Config) - cfg.ClientConfig = confighttp.ClientConfig{ - Endpoint: "http://" + endpoint, - } - return cfg - }, - expectConsumeErr: true, - }, - { - exporter: "otlp", - getConfigFn: func() component.Config { - cfg := expFactories["otlp"].CreateDefaultConfig().(*otlpexporter.Config) - cfg.ClientConfig = configgrpc.ClientConfig{ - Endpoint: endpoint, - } - // disable queue/retry to validate passing the test data synchronously - cfg.QueueConfig.Enabled = false - cfg.RetryConfig.Enabled = false - return cfg - }, - expectConsumeErr: true, - }, - { - exporter: "otlphttp", - getConfigFn: func() component.Config { - cfg := expFactories["otlphttp"].CreateDefaultConfig().(*otlphttpexporter.Config) - cfg.Endpoint = "http://" + endpoint - // disable queue/retry to validate passing the test data synchronously - cfg.QueueConfig.Enabled = false - cfg.RetryConfig.Enabled = false - return cfg - }, - expectConsumeErr: true, - }, - { - exporter: "prometheus", - getConfigFn: func() component.Config { - cfg := expFactories["prometheus"].CreateDefaultConfig().(*prometheusexporter.Config) - cfg.Endpoint = endpoint - return cfg - }, - }, - { - exporter: "prometheusremotewrite", - getConfigFn: func() component.Config { - cfg := expFactories["prometheusremotewrite"].CreateDefaultConfig().(*prometheusremotewriteexporter.Config) - // disable queue/retry to validate passing the test data synchronously - cfg.RemoteWriteQueue.Enabled = false - cfg.BackOffConfig.Enabled = false - return cfg - }, - expectConsumeErr: true, - }, - { - exporter: "pulsar", - getConfigFn: func() component.Config { - cfg := expFactories["pulsar"].CreateDefaultConfig().(*pulsarexporter.Config) - cfg.Endpoint = "http://localhost:6650" - return cfg - }, - skipLifecycle: true, - }, - { - exporter: "rabbitmq", - getConfigFn: func() component.Config { - cfg := expFactories["rabbitmq"].CreateDefaultConfig().(*rabbitmqexporter.Config) - cfg.Connection.Endpoint = "amqp://localhost:5672" - cfg.Connection.Auth.Plain.Username = "user" - return cfg - }, - skipLifecycle: true, - }, - { - exporter: "sapm", - getConfigFn: func() component.Config { - cfg := expFactories["sapm"].CreateDefaultConfig().(*sapmexporter.Config) - cfg.Endpoint = "http://" + endpoint - // disable queue/retry to validate passing the test data synchronously - cfg.QueueSettings.Enabled = false - cfg.BackOffConfig.Enabled = false - return cfg - }, - expectConsumeErr: true, - }, - { - exporter: "signalfx", - getConfigFn: func() component.Config { - cfg := expFactories["signalfx"].CreateDefaultConfig().(*signalfxexporter.Config) - cfg.AccessToken = "my_fake_token" - cfg.IngestURL = "http://" + endpoint - cfg.APIURL = "http://" + endpoint - // disable queue/retry to validate passing the test data synchronously - cfg.QueueSettings.Enabled = false - cfg.BackOffConfig.Enabled = false - return cfg - }, - expectConsumeErr: true, - }, - { - exporter: "splunk_hec", - getConfigFn: func() component.Config { - cfg := expFactories["splunk_hec"].CreateDefaultConfig().(*splunkhecexporter.Config) - cfg.Token = "my_fake_token" - cfg.Endpoint = "http://" + endpoint - // disable queue/retry to validate passing the test data synchronously - cfg.QueueSettings.Enabled = false - cfg.BackOffConfig.Enabled = false - return cfg - }, - expectConsumeErr: true, - }, - { - exporter: "zipkin", - getConfigFn: func() component.Config { - cfg := expFactories["zipkin"].CreateDefaultConfig().(*zipkinexporter.Config) - cfg.Endpoint = endpoint - // disable queue/retry to validate passing the test data synchronously - cfg.QueueSettings.Enabled = false - cfg.BackOffConfig.Enabled = false - return cfg - }, - expectConsumeErr: true, - }, - { - exporter: "awskinesis", - getConfigFn: func() component.Config { - cfg := expFactories["awskinesis"].CreateDefaultConfig().(*awskinesisexporter.Config) - cfg.AWS.KinesisEndpoint = "http://" + endpoint - return cfg - }, - skipLifecycle: true, - }, - { - exporter: "alertmanager", - getConfigFn: func() component.Config { - cfg := expFactories["alertmanager"].CreateDefaultConfig().(*alertmanagerexporter.Config) - cfg.ClientConfig = confighttp.ClientConfig{ - Endpoint: "http://" + endpoint, - } - cfg.GeneratorURL = "opentelemetry-collector" - cfg.DefaultSeverity = "info" - // disable queue/retry to validate passing the test data synchronously - cfg.QueueSettings.Enabled = false - cfg.BackoffConfig.Enabled = false - return cfg - }, - expectConsumeErr: true, - }, - { - exporter: "alibabacloud_logservice", - getConfigFn: func() component.Config { - cfg := expFactories["alibabacloud_logservice"].CreateDefaultConfig().(*alibabacloudlogserviceexporter.Config) - cfg.Endpoint = "http://" + endpoint - cfg.Project = "otel-testing" - cfg.Logstore = "otel-data" - return cfg - }, - }, - { - exporter: "awsemf", - getConfigFn: func() component.Config { - cfg := expFactories["awsemf"].CreateDefaultConfig().(*awsemfexporter.Config) - cfg.Endpoint = "http://" + endpoint - cfg.Region = "local" - return cfg - }, - expectConsumeErr: true, - }, - { - exporter: "awsxray", - getConfigFn: func() component.Config { - cfg := expFactories["awsxray"].CreateDefaultConfig().(*awsxrayexporter.Config) - cfg.Endpoint = "http://" + endpoint - cfg.Region = "local" - return cfg - }, - expectConsumeErr: true, - }, - { - exporter: "azuredataexplorer", - getConfigFn: func() component.Config { - cfg := expFactories["azuredataexplorer"].CreateDefaultConfig().(*azuredataexplorerexporter.Config) - cfg.ClusterURI = "https://" + endpoint - cfg.ApplicationID = "otel-app-id" - cfg.ApplicationKey = "otel-app-key" - cfg.TenantID = "otel-tenant-id" - return cfg - }, - skipLifecycle: true, - }, - { - exporter: "azuremonitor", - getConfigFn: func() component.Config { - cfg := expFactories["azuremonitor"].CreateDefaultConfig().(*azuremonitorexporter.Config) - cfg.Endpoint = "http://" + endpoint - cfg.ConnectionString = configopaque.String("InstrumentationKey=00000000-0000-0000-0000-000000000000;IngestionEndpoint=" + cfg.Endpoint) - - return cfg - }, - }, - { - exporter: "carbon", - getConfigFn: func() component.Config { - cfg := expFactories["carbon"].CreateDefaultConfig().(*carbonexporter.Config) - cfg.Endpoint = "http://" + endpoint - return cfg - }, - expectConsumeErr: true, - }, - { - exporter: "clickhouse", - getConfigFn: func() component.Config { - cfg := expFactories["clickhouse"].CreateDefaultConfig().(*clickhouseexporter.Config) - cfg.Endpoint = "tcp://" + endpoint - return cfg - }, - skipLifecycle: true, - }, - { - exporter: "cassandra", - getConfigFn: func() component.Config { - cfg := expFactories["cassandra"].CreateDefaultConfig().(*cassandraexporter.Config) - cfg.DSN = endpoint - return cfg - }, - skipLifecycle: true, - }, - { - exporter: "coralogix", - getConfigFn: func() component.Config { - cfg := expFactories["coralogix"].CreateDefaultConfig().(*coralogixexporter.Config) - cfg.Traces.Endpoint = endpoint - cfg.Logs.Endpoint = endpoint - cfg.Metrics.Endpoint = endpoint - // disable queue/retry to validate passing the test data synchronously - cfg.QueueSettings.Enabled = false - cfg.BackOffConfig.Enabled = false - return cfg - }, - expectConsumeErr: true, - }, - { - exporter: "datadog", - getConfigFn: func() component.Config { - cfg := expFactories["datadog"].CreateDefaultConfig().(*datadogexporter.Config) - cfg.API.Key = "cutedogsgotoheaven" - // disable queue/retry to validate passing the test data synchronously - cfg.QueueSettings.Enabled = false - cfg.BackOffConfig.Enabled = false - return cfg - }, - expectConsumeErr: true, - }, - { - exporter: "dataset", - getConfigFn: func() component.Config { - cfg := expFactories["dataset"].CreateDefaultConfig().(*datasetexporter.Config) - cfg.DatasetURL = "https://" + endpoint - cfg.APIKey = "secret-key" - // disable queue/retry to validate passing the test data synchronously - cfg.QueueSettings.Enabled = false - cfg.BackOffConfig.Enabled = false - return cfg - }, - expectConsumeErr: true, - skipLifecycle: true, // shutdown fails if there is buffered data - }, - { - exporter: "elasticsearch", - getConfigFn: func() component.Config { - cfg := expFactories["elasticsearch"].CreateDefaultConfig().(*elasticsearchexporter.Config) - cfg.Endpoints = []string{"http://" + endpoint} - // disable queue to validate passing the test data synchronously - cfg.QueueSettings.Enabled = false - return cfg - }, - }, - { - exporter: "googlecloud", - skipLifecycle: true, // Requires credentials to be able to successfully load the exporter - }, - { - exporter: "googlemanagedprometheus", - skipLifecycle: true, // Requires credentials to be able to successfully load the exporter - }, - { - exporter: "googlecloudpubsub", - skipLifecycle: true, - }, - { - exporter: "honeycombmarker", - getConfigFn: func() component.Config { - cfg := expFactories["honeycombmarker"].CreateDefaultConfig().(*honeycombmarkerexporter.Config) - cfg.Endpoint = "http://" + endpoint - // disable queue to validate passing the test data synchronously - cfg.QueueSettings.Enabled = false - cfg.BackOffConfig.Enabled = false - return cfg - }, - expectConsumeErr: true, - }, - { - exporter: "influxdb", - getConfigFn: func() component.Config { - cfg := expFactories["influxdb"].CreateDefaultConfig().(*influxdbexporter.Config) - cfg.Endpoint = "http://" + endpoint - // disable queue to validate passing the test data synchronously - cfg.QueueSettings.Enabled = false - cfg.BackOffConfig.Enabled = false - return cfg - }, - skipLifecycle: true, - }, - { - exporter: "instana", - getConfigFn: func() component.Config { - cfg := expFactories["instana"].CreateDefaultConfig().(*instanaexporter.Config) - cfg.Endpoint = "http://" + endpoint - cfg.AgentKey = "Key1" - return cfg - }, - expectConsumeErr: true, - }, - { - exporter: "loadbalancing", - getConfigFn: func() component.Config { - cfg := expFactories["loadbalancing"].CreateDefaultConfig().(*loadbalancingexporter.Config) - cfg.Resolver = loadbalancingexporter.ResolverSettings{Static: &loadbalancingexporter.StaticResolver{Hostnames: []string{"127.0.0.1"}}} - return cfg - }, - expectConsumeErr: true, // the exporter requires traces with service.name resource attribute - }, - { - exporter: "logicmonitor", - getConfigFn: func() component.Config { - cfg := expFactories["logicmonitor"].CreateDefaultConfig().(*logicmonitorexporter.Config) - // disable queue to validate passing the test data synchronously - cfg.QueueSettings.Enabled = false - cfg.BackOffConfig.Enabled = false - return cfg - }, - skipLifecycle: true, - }, - { - exporter: "logzio", - getConfigFn: func() component.Config { - cfg := expFactories["logzio"].CreateDefaultConfig().(*logzioexporter.Config) - cfg.Endpoint = "http://" + endpoint - // disable queue to validate passing the test data synchronously - cfg.QueueSettings.Enabled = false - cfg.BackOffConfig.Enabled = false - return cfg - }, - expectConsumeErr: true, - }, - { - exporter: "loki", - getConfigFn: func() component.Config { - cfg := expFactories["loki"].CreateDefaultConfig().(*lokiexporter.Config) - cfg.Endpoint = "http://" + endpoint - // disable queue to validate passing the test data synchronously - cfg.QueueSettings.Enabled = false - cfg.BackOffConfig.Enabled = false - return cfg - }, - expectConsumeErr: true, - }, - { - exporter: "mezmo", - getConfigFn: func() component.Config { - cfg := expFactories["mezmo"].CreateDefaultConfig().(*mezmoexporter.Config) - cfg.Endpoint = "http://" + endpoint - // disable queue to validate passing the test data synchronously - cfg.QueueSettings.Enabled = false - cfg.BackOffConfig.Enabled = false - return cfg - }, - }, - { - exporter: "sentry", - getConfigFn: func() component.Config { - cfg := expFactories["sentry"].CreateDefaultConfig().(*sentryexporter.Config) - return cfg - }, - skipLifecycle: true, // causes race detector to fail - }, - { - exporter: "sumologic", - getConfigFn: func() component.Config { - cfg := expFactories["sumologic"].CreateDefaultConfig().(*sumologicexporter.Config) - cfg.Endpoint = "http://" + endpoint - // disable queue to validate passing the test data synchronously - cfg.QueueSettings.Enabled = false - cfg.BackOffConfig.Enabled = false - return cfg - }, - expectConsumeErr: true, - }, - { - exporter: "syslog", - getConfigFn: func() component.Config { - cfg := expFactories["syslog"].CreateDefaultConfig().(*syslogexporter.Config) - cfg.Endpoint = "http://" + endpoint - // disable queue to validate passing the test data synchronously - cfg.QueueSettings.Enabled = false - cfg.BackOffConfig.Enabled = false - return cfg - }, - expectConsumeErr: true, - }, - { - exporter: "tencentcloud_logservice", - getConfigFn: func() component.Config { - cfg := expFactories["tencentcloud_logservice"].CreateDefaultConfig().(*tencentcloudlogserviceexporter.Config) - return cfg - }, - expectConsumeErr: true, - }, - } - - assert.Equal(t, len(expFactories), len(tests), "All user configurable components must be added to the lifecycle test") - for _, tt := range tests { - t.Run(string(tt.exporter), func(t *testing.T) { - factory := expFactories[tt.exporter] - assert.Equal(t, tt.exporter, factory.Type()) - t.Run("shutdown", func(t *testing.T) { - verifyExporterShutdown(t, factory, tt.getConfigFn) - }) - t.Run("lifecycle", func(t *testing.T) { - if tt.skipLifecycle { - t.SkipNow() - } - verifyExporterLifecycle(t, factory, tt.getConfigFn, tt.expectConsumeErr) - }) - }) - } -} - -// GetExporterConfigFn is used customize the configuration passed to the verification. -// This is used to change ports or provide values required but not provided by the -// default configuration. -type getExporterConfigFn func() component.Config - -// verifyExporterLifecycle is used to test if an exporter type can handle the typical -// lifecycle of a component. The getConfigFn parameter only need to be specified if -// the test can't be done with the default configuration for the component. -func verifyExporterLifecycle(t *testing.T, factory exporter.Factory, getConfigFn getExporterConfigFn, expectErr bool) { - ctx := context.Background() - host := newAssertNoErrorHost(t) - expCreateSettings := exportertest.NewNopSettings() - - cfg := factory.CreateDefaultConfig() - if getConfigFn != nil { - cfg = getConfigFn() - } - - createFns := []createExporterFn{ - wrapCreateLogsExp(factory), - wrapCreateTracesExp(factory), - wrapCreateMetricsExp(factory), - } - - for i := 0; i < 2; i++ { - var exps []component.Component - for _, createFn := range createFns { - exp, err := createFn(ctx, expCreateSettings, cfg) - if errors.Is(err, pipeline.ErrSignalNotSupported) { - continue - } - require.NoError(t, err) - require.NoError(t, exp.Start(ctx, host)) - exps = append(exps, exp) - } - for _, exp := range exps { - var err error - assert.NotPanics(t, func() { - switch e := exp.(type) { - case exporter.Logs: - logs := generateTestLogs() - if !e.Capabilities().MutatesData { - logs.MarkReadOnly() - } - err = e.ConsumeLogs(ctx, logs) - case exporter.Metrics: - metrics := generateTestMetrics() - if !e.Capabilities().MutatesData { - metrics.MarkReadOnly() - } - err = e.ConsumeMetrics(ctx, metrics) - case exporter.Traces: - traces := generateTestTraces() - if !e.Capabilities().MutatesData { - traces.MarkReadOnly() - } - err = e.ConsumeTraces(ctx, traces) - } - }) - if !expectErr { - assert.NoError(t, err) - } - assert.NoError(t, exp.Shutdown(ctx)) - } - } -} - -func generateTestLogs() plog.Logs { - logs := plog.NewLogs() - rl := logs.ResourceLogs().AppendEmpty() - rl.Resource().Attributes().PutStr("resource", "R1") - l := rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() - l.Body().SetStr("test log message") - l.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) - return logs -} - -func generateTestMetrics() pmetric.Metrics { - metrics := pmetric.NewMetrics() - rm := metrics.ResourceMetrics().AppendEmpty() - rm.Resource().Attributes().PutStr("resource", "R1") - m := rm.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() - m.SetName("test_metric") - dp := m.Gauge().DataPoints().AppendEmpty() - dp.Attributes().PutStr("test_attr", "value_1") - dp.SetIntValue(123) - dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) - return metrics -} - -func generateTestTraces() ptrace.Traces { - traces := ptrace.NewTraces() - rs := traces.ResourceSpans().AppendEmpty() - rs.Resource().Attributes().PutStr("resource", "R1") - span := rs.ScopeSpans().AppendEmpty().Spans().AppendEmpty() - span.Attributes().PutStr("test_attr", "value_1") - span.SetName("test_span") - span.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Now().Add(-1 * time.Second))) - span.SetEndTimestamp(pcommon.NewTimestampFromTime(time.Now())) - return traces -} - -// verifyExporterShutdown is used to test if an exporter type can be shutdown without being started first. -func verifyExporterShutdown(tb testing.TB, factory exporter.Factory, getConfigFn getExporterConfigFn) { - ctx := context.Background() - expCreateSettings := exportertest.NewNopSettings() - - if getConfigFn == nil { - getConfigFn = factory.CreateDefaultConfig - } - - createFns := []createExporterFn{ - wrapCreateLogsExp(factory), - wrapCreateTracesExp(factory), - wrapCreateMetricsExp(factory), - } - - for _, createFn := range createFns { - r, err := createFn(ctx, expCreateSettings, getConfigFn()) - if errors.Is(err, pipeline.ErrSignalNotSupported) { - continue - } - if r == nil { - continue - } - assert.NotPanics(tb, func() { - assert.NoError(tb, r.Shutdown(ctx)) - }) - } -} - -type createExporterFn func( - ctx context.Context, - set exporter.Settings, - cfg component.Config, -) (component.Component, error) - -func wrapCreateLogsExp(factory exporter.Factory) createExporterFn { - return func(ctx context.Context, set exporter.Settings, cfg component.Config) (component.Component, error) { - return factory.CreateLogsExporter(ctx, set, cfg) - } -} - -func wrapCreateTracesExp(factory exporter.Factory) createExporterFn { - return func(ctx context.Context, set exporter.Settings, cfg component.Config) (component.Component, error) { - return factory.CreateTracesExporter(ctx, set, cfg) - } -} - -func wrapCreateMetricsExp(factory exporter.Factory) createExporterFn { - return func(ctx context.Context, set exporter.Settings, cfg component.Config) (component.Component, error) { - return factory.CreateMetricsExporter(ctx, set, cfg) - } -} diff --git a/cmd/otelcontribcol/extensions_test.go b/cmd/otelcontribcol/extensions_test.go deleted file mode 100644 index 2bf065edaf5c..000000000000 --- a/cmd/otelcontribcol/extensions_test.go +++ /dev/null @@ -1,353 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package main - -import ( - "context" - "errors" - "os" - "path/filepath" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/extension" - "go.opentelemetry.io/collector/extension/extensiontest" - "go.opentelemetry.io/collector/extension/zpagesextension" - "go.opentelemetry.io/collector/pipeline" - - "github.com/open-telemetry/opentelemetry-collector-contrib/extension/ackextension" - "github.com/open-telemetry/opentelemetry-collector-contrib/extension/asapauthextension" - "github.com/open-telemetry/opentelemetry-collector-contrib/extension/basicauthextension" - "github.com/open-telemetry/opentelemetry-collector-contrib/extension/bearertokenauthextension" - "github.com/open-telemetry/opentelemetry-collector-contrib/extension/googleclientauthextension" - "github.com/open-telemetry/opentelemetry-collector-contrib/extension/headerssetterextension" - "github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckextension" - "github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension" - "github.com/open-telemetry/opentelemetry-collector-contrib/extension/httpforwarderextension" - "github.com/open-telemetry/opentelemetry-collector-contrib/extension/jaegerremotesampling" - "github.com/open-telemetry/opentelemetry-collector-contrib/extension/oauth2clientauthextension" - "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/ecstaskobserver" - "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/hostobserver" - "github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampextension" - "github.com/open-telemetry/opentelemetry-collector-contrib/extension/pprofextension" - "github.com/open-telemetry/opentelemetry-collector-contrib/extension/remotetapextension" - "github.com/open-telemetry/opentelemetry-collector-contrib/extension/sigv4authextension" - "github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/dbstorage" - "github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/filestorage" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil" -) - -func TestDefaultExtensions(t *testing.T) { - allFactories, err := components() - require.NoError(t, err) - - extFactories := allFactories.Extensions - endpoint := testutil.GetAvailableLocalAddress(t) - - tests := []struct { - getConfigFn getExtensionConfigFn - extension component.Type - skipLifecycle bool - }{ - { - extension: "health_check", - getConfigFn: func() component.Config { - cfg := extFactories["health_check"].CreateDefaultConfig().(*healthcheckextension.Config) - cfg.Endpoint = endpoint - return cfg - }, - }, - { - extension: "healthcheckv2", - getConfigFn: func() component.Config { - cfg := extFactories["healthcheckv2"].CreateDefaultConfig().(*healthcheckv2extension.Config) - cfg.Endpoint = endpoint - return cfg - }, - }, - { - extension: "pprof", - getConfigFn: func() component.Config { - cfg := extFactories["pprof"].CreateDefaultConfig().(*pprofextension.Config) - cfg.TCPAddr.Endpoint = endpoint - return cfg - }, - }, - { - extension: "sigv4auth", - getConfigFn: func() component.Config { - cfg := extFactories["sigv4auth"].CreateDefaultConfig().(*sigv4authextension.Config) - return cfg - }, - }, - { - extension: "zpages", - getConfigFn: func() component.Config { - cfg := extFactories["zpages"].CreateDefaultConfig().(*zpagesextension.Config) - cfg.TCPAddr.Endpoint = endpoint - return cfg - }, - }, - { - extension: "basicauth", - getConfigFn: func() component.Config { - cfg := extFactories["basicauth"].CreateDefaultConfig().(*basicauthextension.Config) - // No need to clean up, t.TempDir will be deleted entirely. - fileName := filepath.Join(t.TempDir(), "random.file") - require.NoError(t, os.WriteFile(fileName, []byte("username:password"), 0600)) - - cfg.Htpasswd = &basicauthextension.HtpasswdSettings{ - File: fileName, - Inline: "username:password", - } - return cfg - }, - }, - { - extension: "bearertokenauth", - getConfigFn: func() component.Config { - cfg := extFactories["bearertokenauth"].CreateDefaultConfig().(*bearertokenauthextension.Config) - cfg.BearerToken = "sometoken" - return cfg - }, - }, - { - extension: "asapclient", - getConfigFn: func() component.Config { - cfg := extFactories["asapclient"].CreateDefaultConfig().(*asapauthextension.Config) - cfg.KeyID = "test_issuer/test_kid" - cfg.Issuer = "test_issuer" - cfg.Audience = []string{"some_service"} - cfg.TTL = 10 * time.Second - // Valid PEM data required for successful initialisation. Key not actually used anywhere. - cfg.PrivateKey = "data:application/pkcs8;kid=test;base64,MIIBUwIBADANBgkqhkiG9w0BAQEFAASCAT0wggE5AgE" + - "AAkEA0ZPr5JeyVDoB8RyZqQsx6qUD+9gMFg1/0hgdAvmytWBMXQJYdwkK2dFJwwZcWJVhJGcOJBDfB/8tcbdJd34KZQIDAQ" + - "ABAkBZD20tJTHJDSWKGsdJyNIbjqhUu4jXTkFFPK4Hd6jz3gV3fFvGnaolsD5Bt50dTXAiSCpFNSb9M9GY6XUAAdlBAiEA6" + - "MccfdZRfVapxKtAZbjXuAgMvnPtTvkVmwvhWLT5Wy0CIQDmfE8Et/pou0Jl6eM0eniT8/8oRzBWgy9ejDGfj86PGQIgWePq" + - "IL4OofRBgu0O5TlINI0HPtTNo12U9lbUIslgMdECICXT2RQpLcvqj+cyD7wZLZj6vrHZnTFVrnyR/cL2UyxhAiBswe/MCcD" + - "7T7J4QkNrCG+ceQGypc7LsxlIxQuKh5GWYA==" - return cfg - }, - }, - { - extension: "ecs_observer", - skipLifecycle: true, - }, - { - extension: "ecs_task_observer", - getConfigFn: func() component.Config { - cfg := extFactories["ecs_task_observer"].CreateDefaultConfig().(*ecstaskobserver.Config) - cfg.Endpoint = "http://localhost" - return cfg - }, - }, - { - extension: "awsproxy", - skipLifecycle: true, // Requires EC2 metadata service to be running - }, - { - extension: "http_forwarder", - getConfigFn: func() component.Config { - cfg := extFactories["http_forwarder"].CreateDefaultConfig().(*httpforwarderextension.Config) - cfg.Egress.Endpoint = "http://" + endpoint - cfg.Ingress.Endpoint = testutil.GetAvailableLocalAddress(t) - return cfg - }, - }, - { - extension: "oauth2client", - getConfigFn: func() component.Config { - cfg := extFactories["oauth2client"].CreateDefaultConfig().(*oauth2clientauthextension.Config) - cfg.ClientID = "otel-extension" - cfg.ClientSecret = "testsarehard" - cfg.TokenURL = "http://" + endpoint - return cfg - }, - }, - { - extension: "oidc", - skipLifecycle: true, // Requires a running OIDC server in order to complete life cycle testing - }, - { - extension: "db_storage", - getConfigFn: func() component.Config { - cfg := extFactories["db_storage"].CreateDefaultConfig().(*dbstorage.Config) - cfg.DriverName = "sqlite3" - cfg.DataSource = filepath.Join(t.TempDir(), "foo.db") - return cfg - }, - }, - { - extension: "file_storage", - getConfigFn: func() component.Config { - cfg := extFactories["file_storage"].CreateDefaultConfig().(*filestorage.Config) - cfg.Directory = t.TempDir() - return cfg - }, - }, - { - extension: "host_observer", - getConfigFn: func() component.Config { - cfg := extFactories["host_observer"].CreateDefaultConfig().(*hostobserver.Config) - return cfg - }, - }, - { - extension: "k8s_observer", - skipLifecycle: true, // Requires a K8s api to interfact with and validate - }, - { - extension: "docker_observer", - skipLifecycle: true, // Requires a docker api to interface and validate. - }, - { - extension: "headers_setter", - getConfigFn: func() component.Config { - cfg := extFactories["headers_setter"].CreateDefaultConfig().(*headerssetterextension.Config) - return cfg - }, - }, - { - extension: "jaegerremotesampling", - skipLifecycle: true, - getConfigFn: func() component.Config { - return extFactories["jaegerremotesampling"].CreateDefaultConfig().(*jaegerremotesampling.Config) - }, - }, - { - extension: "otlp_encoding", - }, - { - extension: "text_encoding", - }, - { - extension: "jaeger_encoding", - }, - { - extension: "json_log_encoding", - }, - { - extension: "zipkin_encoding", - }, - { - extension: "remotetap", - getConfigFn: func() component.Config { - return extFactories["remotetap"].CreateDefaultConfig().(*remotetapextension.Config) - }, - }, - { - extension: "opamp", - getConfigFn: func() component.Config { - cfg := extFactories["opamp"].CreateDefaultConfig().(*opampextension.Config) - cfg.Server.WS.Endpoint = "wss://" + endpoint - return cfg - }, - }, - { - extension: "solarwindsapmsettings", - skipLifecycle: true, // Requires Solarwinds APM endpoint and token - }, - { - extension: "ackextension", - getConfigFn: func() component.Config { - return extFactories["ackextension"].CreateDefaultConfig().(*ackextension.Config) - }, - }, - { - extension: "googleclientauthextension", - getConfigFn: func() component.Config { - return extFactories["googleclientauthextension"].CreateDefaultConfig().(*googleclientauthextension.Config) - }, - skipLifecycle: true, - }, - } - - extensionCount := 0 - expectedExtensions := map[component.Type]struct{}{} - for k := range extFactories { - expectedExtensions[k] = struct{}{} - } - for _, tt := range tests { - _, ok := extFactories[tt.extension] - if !ok { - // not part of the distro, skipping. - continue - } - delete(expectedExtensions, tt.extension) - extensionCount++ - t.Run(string(tt.extension), func(t *testing.T) { - factory := extFactories[tt.extension] - assert.Equal(t, tt.extension, factory.Type()) - - t.Run("shutdown", func(t *testing.T) { - verifyExtensionShutdown(t, factory, tt.getConfigFn) - }) - t.Run("lifecycle", func(t *testing.T) { - if tt.skipLifecycle { - t.SkipNow() - } - verifyExtensionLifecycle(t, factory, tt.getConfigFn) - }) - - }) - } - assert.Len(t, extFactories, extensionCount, "All extensions must be added to the lifecycle tests", expectedExtensions) -} - -// getExtensionConfigFn is used customize the configuration passed to the verification. -// This is used to change ports or provide values required but not provided by the -// default configuration. -type getExtensionConfigFn func() component.Config - -// verifyExtensionLifecycle is used to test if an extension type can handle the typical -// lifecycle of a component. The getConfigFn parameter only need to be specified if -// the test can't be done with the default configuration for the component. -func verifyExtensionLifecycle(t *testing.T, factory extension.Factory, getConfigFn getExtensionConfigFn) { - ctx := context.Background() - host := componenttest.NewNopHost() - extCreateSet := extensiontest.NewNopSettings() - extCreateSet.ReportStatus = func(event *component.StatusEvent) { - require.NoError(t, event.Err()) - } - - if getConfigFn == nil { - getConfigFn = factory.CreateDefaultConfig - } - - firstExt, err := factory.CreateExtension(ctx, extCreateSet, getConfigFn()) - require.NoError(t, err) - require.NoError(t, firstExt.Start(ctx, host)) - require.NoError(t, firstExt.Shutdown(ctx)) - - secondExt, err := factory.CreateExtension(ctx, extCreateSet, getConfigFn()) - require.NoError(t, err) - require.NoError(t, secondExt.Start(ctx, host)) - require.NoError(t, secondExt.Shutdown(ctx)) -} - -// verifyExtensionShutdown is used to test if an extension type can be shutdown without being started first. -func verifyExtensionShutdown(tb testing.TB, factory extension.Factory, getConfigFn getExtensionConfigFn) { - ctx := context.Background() - extCreateSet := extensiontest.NewNopSettings() - - if getConfigFn == nil { - getConfigFn = factory.CreateDefaultConfig - } - - e, err := factory.CreateExtension(ctx, extCreateSet, getConfigFn()) - if errors.Is(err, pipeline.ErrSignalNotSupported) { - return - } - if e == nil { - return - } - - assert.NotPanics(tb, func() { - assert.NoError(tb, e.Shutdown(ctx)) - }) -} diff --git a/cmd/otelcontribcol/go.mod b/cmd/otelcontribcol/go.mod index 7a08972ab3de..b2f9f4be4662 100644 --- a/cmd/otelcontribcol/go.mod +++ b/cmd/otelcontribcol/go.mod @@ -96,8 +96,6 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/dbstorage v0.110.0 github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/filestorage v0.110.0 github.com/open-telemetry/opentelemetry-collector-contrib/extension/sumologicextension v0.110.0 - github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.110.0 - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.110.0 github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor v0.110.0 github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumulativetodeltaprocessor v0.110.0 github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor v0.110.0 @@ -210,12 +208,8 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/receiver/windowsperfcountersreceiver v0.110.0 github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zipkinreceiver v0.110.0 github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zookeeperreceiver v0.110.0 - github.com/prometheus/prometheus v0.54.1 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.110.0 - go.opentelemetry.io/collector/config/configgrpc v0.110.0 - go.opentelemetry.io/collector/config/confighttp v0.110.0 - go.opentelemetry.io/collector/config/configopaque v1.16.0 go.opentelemetry.io/collector/confmap v1.16.0 go.opentelemetry.io/collector/confmap/provider/envprovider v1.16.0 go.opentelemetry.io/collector/confmap/provider/fileprovider v1.16.0 @@ -224,7 +218,6 @@ require ( go.opentelemetry.io/collector/confmap/provider/yamlprovider v0.110.0 go.opentelemetry.io/collector/connector v0.110.0 go.opentelemetry.io/collector/connector/forwardconnector v0.110.0 - go.opentelemetry.io/collector/consumer/consumertest v0.110.0 go.opentelemetry.io/collector/exporter v0.110.0 go.opentelemetry.io/collector/exporter/debugexporter v0.110.0 go.opentelemetry.io/collector/exporter/nopexporter v0.110.0 @@ -233,8 +226,6 @@ require ( go.opentelemetry.io/collector/extension v0.110.0 go.opentelemetry.io/collector/extension/zpagesextension v0.110.0 go.opentelemetry.io/collector/otelcol v0.110.0 - go.opentelemetry.io/collector/pdata v1.16.0 - go.opentelemetry.io/collector/pipeline v0.110.0 go.opentelemetry.io/collector/processor v0.110.0 go.opentelemetry.io/collector/processor/batchprocessor v0.110.0 go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.110.0 @@ -642,6 +633,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/proxy v0.110.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/xray v0.110.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/collectd v0.110.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.110.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.110.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/docker v0.110.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.110.0 // indirect @@ -666,6 +658,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.110.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry v0.110.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling v0.110.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.110.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/azure v0.110.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.110.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/loki v0.110.0 // indirect @@ -710,6 +703,7 @@ require ( github.com/prometheus/common v0.59.1 // indirect github.com/prometheus/common/sigv4 v0.1.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect + github.com/prometheus/prometheus v0.54.1 // indirect github.com/rabbitmq/amqp091-go v1.10.0 // indirect github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect github.com/redis/go-redis/v9 v9.6.1 // indirect @@ -787,7 +781,10 @@ require ( go.opentelemetry.io/collector/component/componentstatus v0.110.0 // indirect go.opentelemetry.io/collector/config/configauth v0.110.0 // indirect go.opentelemetry.io/collector/config/configcompression v1.16.0 // indirect + go.opentelemetry.io/collector/config/configgrpc v0.110.0 // indirect + go.opentelemetry.io/collector/config/confighttp v0.110.0 // indirect go.opentelemetry.io/collector/config/confignet v1.16.0 // indirect + go.opentelemetry.io/collector/config/configopaque v1.16.0 // indirect go.opentelemetry.io/collector/config/configretry v1.16.0 // indirect go.opentelemetry.io/collector/config/configtelemetry v0.110.0 // indirect go.opentelemetry.io/collector/config/configtls v1.16.0 // indirect @@ -795,6 +792,7 @@ require ( go.opentelemetry.io/collector/connector/connectorprofiles v0.110.0 // indirect go.opentelemetry.io/collector/consumer v0.110.0 // indirect go.opentelemetry.io/collector/consumer/consumerprofiles v0.110.0 // indirect + go.opentelemetry.io/collector/consumer/consumertest v0.110.0 // indirect go.opentelemetry.io/collector/exporter/exporterprofiles v0.110.0 // indirect go.opentelemetry.io/collector/extension/auth v0.110.0 // indirect go.opentelemetry.io/collector/extension/experimental/storage v0.110.0 // indirect @@ -803,8 +801,10 @@ require ( go.opentelemetry.io/collector/filter v0.110.0 // indirect go.opentelemetry.io/collector/internal/globalgates v0.110.0 // indirect go.opentelemetry.io/collector/internal/globalsignal v0.110.0 // indirect + go.opentelemetry.io/collector/pdata v1.16.0 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.110.0 // indirect go.opentelemetry.io/collector/pdata/testdata v0.110.0 // indirect + go.opentelemetry.io/collector/pipeline v0.110.0 // indirect go.opentelemetry.io/collector/processor/processorprofiles v0.110.0 // indirect go.opentelemetry.io/collector/receiver/receiverprofiles v0.110.0 // indirect go.opentelemetry.io/collector/semconv v0.110.0 // indirect diff --git a/cmd/otelcontribcol/receivers_test.go b/cmd/otelcontribcol/receivers_test.go deleted file mode 100644 index b1afd74c2e6a..000000000000 --- a/cmd/otelcontribcol/receivers_test.go +++ /dev/null @@ -1,587 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package main - -import ( - "context" - "errors" - "path/filepath" - "runtime" - "testing" - - promconfig "github.com/prometheus/prometheus/config" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer/consumertest" - "go.opentelemetry.io/collector/pipeline" - "go.opentelemetry.io/collector/receiver" - "go.opentelemetry.io/collector/receiver/receivertest" - - tcpop "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/tcp" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscloudwatchreceiver" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azureblobreceiver" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azureeventhubreceiver" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azuremonitorreceiver" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/chronyreceiver" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/filelogreceiver" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jmxreceiver" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/mongodbatlasreceiver" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/namedpipereceiver" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otlpjsonfilereceiver" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/redisreceiver" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/snmpreceiver" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/syslogreceiver" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/tcplogreceiver" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/udplogreceiver" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/webhookeventreceiver" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/windowseventlogreceiver" -) - -func TestDefaultReceivers(t *testing.T) { - allFactories, err := components() - assert.NoError(t, err) - - rcvrFactories := allFactories.Receivers - - tests := []struct { - getConfigFn getReceiverConfigFn - receiver component.Type - skipLifecycle bool - }{ - { - receiver: "active_directory_ds", - skipLifecycle: true, // Requires a running windows service - }, - { - receiver: "aerospike", - }, - { - receiver: "apache", - }, - { - receiver: "apachespark", - }, - { - receiver: "awscloudwatch", - getConfigFn: func() component.Config { - cfg := rcvrFactories["awscloudwatch"].CreateDefaultConfig().(*awscloudwatchreceiver.Config) - cfg.Region = "us-west-2" - cfg.Logs.Groups = awscloudwatchreceiver.GroupConfig{AutodiscoverConfig: nil} - return cfg - }, - }, - { - receiver: "awscontainerinsightreceiver", - // TODO: skipped since it will only function in a container environment with procfs in expected location. - skipLifecycle: true, - }, - { - receiver: "awsecscontainermetrics", - skipLifecycle: true, // Requires container metaendpoint to be running - }, - { - receiver: "awsfirehose", - }, - { - receiver: "awsxray", - skipLifecycle: true, // Requires AWS endpoint to check identity to run - }, - { - receiver: "azureblob", - getConfigFn: func() component.Config { - cfg := rcvrFactories["azureblob"].CreateDefaultConfig().(*azureblobreceiver.Config) - cfg.ConnectionString = "DefaultEndpointsProtocol=http;AccountName=accountName;AccountKey=accountKey==;BlobEndpoint=test" - cfg.EventHub.EndPoint = "DefaultEndpointsProtocol=http;SharedAccessKeyName=secret;SharedAccessKey=secret;Endpoint=test.test" - return cfg - }, - skipLifecycle: true, // Requires Azure event hub to run - }, - { - receiver: "azureeventhub", - getConfigFn: func() component.Config { - cfg := rcvrFactories["azureeventhub"].CreateDefaultConfig().(*azureeventhubreceiver.Config) - cfg.Connection = "Endpoint=sb://example.com/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=superSecret1234=;EntityPath=hubName" - return cfg - }, - skipLifecycle: true, // Requires Azure event hub to run - }, - { - receiver: "azuremonitor", - getConfigFn: func() component.Config { - cfg := rcvrFactories["azuremonitor"].CreateDefaultConfig().(*azuremonitorreceiver.Config) - cfg.TenantID = "tenant_id" - cfg.SubscriptionID = "subscription_id" - cfg.ClientID = "client_id" - cfg.ClientSecret = "client_secret" - return cfg - }, - skipLifecycle: true, // Requires Azure event hub to run - }, - { - receiver: "bigip", - }, - { - receiver: "carbon", - getConfigFn: func() component.Config { - cfg := rcvrFactories["carbon"].CreateDefaultConfig().(*carbonreceiver.Config) - cfg.Endpoint = "0.0.0.0:0" - return cfg - }, - skipLifecycle: true, // Panics after test have completed, requires a wait group - }, - { - receiver: "cloudflare", - skipLifecycle: true, - }, - { - receiver: "cloudfoundry", - skipLifecycle: true, // Requires UAA (auth) endpoint to run - }, - { - receiver: "chrony", - getConfigFn: func() component.Config { - cfg := rcvrFactories["chrony"].CreateDefaultConfig().(*chronyreceiver.Config) - cfg.Endpoint = "udp://localhost:323" - return cfg - }, - }, - { - receiver: "collectd", - }, - { - receiver: "couchdb", - }, - { - receiver: "datadog", - getConfigFn: func() component.Config { - cfg := rcvrFactories["datadog"].CreateDefaultConfig().(*datadogreceiver.Config) - cfg.Endpoint = "localhost:0" // Using a randomly assigned address - return cfg - }, - }, - { - receiver: "docker_stats", - skipLifecycle: true, - }, - { - receiver: "elasticsearch", - }, - { - receiver: "expvar", - }, - { - receiver: "filelog", - getConfigFn: func() component.Config { - cfg := rcvrFactories["filelog"].CreateDefaultConfig().(*filelogreceiver.FileLogConfig) - cfg.InputConfig.Include = []string{filepath.Join(t.TempDir(), "*")} - return cfg - }, - }, - { - receiver: "file", - skipLifecycle: true, // Requires an existing JSONL file - }, - { - receiver: "filestats", - }, - { - receiver: "flinkmetrics", - }, - { - receiver: "fluentforward", - }, - { - receiver: "googlecloudspanner", - }, - { - receiver: "googlecloudpubsub", - skipLifecycle: true, // Requires a pubsub subscription - }, - { - receiver: "haproxy", - }, - { - receiver: "hostmetrics", - }, - { - receiver: "httpcheck", - }, - { - receiver: "influxdb", - }, - { - receiver: "iis", - skipLifecycle: true, // Requires a running windows process - }, - { - receiver: "jaeger", - }, - { - receiver: "jmx", - skipLifecycle: true, // Requires a running instance with JMX - getConfigFn: func() component.Config { - cfg := jmxreceiver.NewFactory().CreateDefaultConfig().(*jmxreceiver.Config) - cfg.Endpoint = "localhost:1234" - cfg.TargetSystem = "jvm" - return cfg - }, - }, - { - receiver: "journald", - skipLifecycle: runtime.GOOS != "linux", - }, - { - receiver: "k8s_events", - skipLifecycle: true, // need a valid Kubernetes host and port - }, - { - receiver: "k8sobjects", - skipLifecycle: true, // need a valid Kubernetes host and port - }, - { - receiver: "kafka", - skipLifecycle: true, // TODO: It needs access to internals to successful start. - }, - { - receiver: "kafkametrics", - }, - { - receiver: "k8s_cluster", - skipLifecycle: true, // Requires access to the k8s host and port in order to run - }, - { - receiver: "kubeletstats", - skipLifecycle: true, // Requires access to certificates to auth against kubelet - }, - { - receiver: "loki", - }, - { - receiver: "memcached", - }, - { - receiver: "mongodb", - skipLifecycle: true, // Causes tests to timeout - }, - { - receiver: "mongodbatlas", - getConfigFn: func() component.Config { - cfg := rcvrFactories["mongodbatlas"].CreateDefaultConfig().(*mongodbatlasreceiver.Config) - cfg.Logs.Enabled = true - return cfg - }, - }, - { - receiver: "mysql", - }, - { - receiver: "nginx", - }, - { - receiver: "nsxt", - }, - { - receiver: "opencensus", - skipLifecycle: true, // TODO: Usage of CMux doesn't allow proper shutdown. - }, - { - receiver: "oracledb", - }, - { - receiver: "otlp", - }, - { - receiver: "otlpjsonfile", - getConfigFn: func() component.Config { - cfg := rcvrFactories["otlpjsonfile"].CreateDefaultConfig().(*otlpjsonfilereceiver.Config) - cfg.Include = []string{"/tmp/*.log"} - return cfg - }, - }, - { - receiver: "podman_stats", - skipLifecycle: true, // Requires a running podman daemon - }, - { - receiver: "postgresql", - }, - { - receiver: "prometheus", - getConfigFn: func() component.Config { - cfg := rcvrFactories["prometheus"].CreateDefaultConfig().(*prometheusreceiver.Config) - cfg.PrometheusConfig.ScrapeConfigs = []*promconfig.ScrapeConfig{ - {JobName: "test"}, - } - return cfg - }, - }, - { - receiver: "pulsar", - skipLifecycle: true, // TODO It requires a running pulsar instance to start successfully. - }, - { - receiver: "rabbitmq", - }, - { - receiver: "purefa", - }, - { - receiver: "purefb", - }, - { - receiver: "receiver_creator", - }, - { - receiver: "redis", - getConfigFn: func() component.Config { - cfg := rcvrFactories["redis"].CreateDefaultConfig().(*redisreceiver.Config) - cfg.Endpoint = "localhost:6379" - return cfg - }, - }, - { - receiver: "riak", - }, - { - receiver: "sapm", - }, - { - receiver: "signalfx", - }, - { - receiver: "prometheus_simple", - }, - { - receiver: "skywalking", - }, - { - receiver: "snmp", - getConfigFn: func() component.Config { - cfg := rcvrFactories["snmp"].CreateDefaultConfig().(*snmpreceiver.Config) - cfg.Metrics = map[string]*snmpreceiver.MetricConfig{ - "m1": { - Unit: "1", - Gauge: &snmpreceiver.GaugeMetric{ValueType: "int"}, - ScalarOIDs: []snmpreceiver.ScalarOID{{ - OID: ".1", - }}, - }, - } - return cfg - }, - }, - { - receiver: "snowflake", - }, - { - receiver: "splunkenterprise", - }, - { - receiver: "splunk_hec", - }, - { - receiver: "sqlquery", - }, - { - receiver: "sqlserver", - skipLifecycle: true, // Requires a running windows process - }, - { - receiver: "sshcheck", - skipLifecycle: runtime.GOOS == "windows", - }, - - { - receiver: "statsd", - }, - { - receiver: "wavefront", - skipLifecycle: true, // Depends on carbon receiver to be running correctly - }, - { - receiver: "webhookevent", - getConfigFn: func() component.Config { - cfg := rcvrFactories["webhookevent"].CreateDefaultConfig().(*webhookeventreceiver.Config) - cfg.Endpoint = "127.0.0.1:8088" - return cfg - }, - }, - { - receiver: "windowseventlog", - skipLifecycle: runtime.GOOS != "windows", - getConfigFn: func() component.Config { - cfg := rcvrFactories["windowseventlog"].CreateDefaultConfig().(*windowseventlogreceiver.WindowsLogConfig) - cfg.InputConfig.Channel = "Application" - return cfg - }, - }, - { - receiver: "windowsperfcounters", - skipLifecycle: runtime.GOOS != "windows", - }, - { - receiver: "zipkin", - }, - { - receiver: "zookeeper", - }, - { - receiver: "syslog", - getConfigFn: func() component.Config { - cfg := rcvrFactories["syslog"].CreateDefaultConfig().(*syslogreceiver.SysLogConfig) - cfg.InputConfig.TCP = &tcpop.NewConfig().BaseConfig - cfg.InputConfig.TCP.ListenAddress = "0.0.0.0:0" - cfg.InputConfig.Protocol = "rfc5424" - return cfg - }, - }, - { - receiver: "tcplog", - getConfigFn: func() component.Config { - cfg := rcvrFactories["tcplog"].CreateDefaultConfig().(*tcplogreceiver.TCPLogConfig) - cfg.InputConfig.ListenAddress = "0.0.0.0:0" - return cfg - }, - }, - { - receiver: "udplog", - getConfigFn: func() component.Config { - cfg := rcvrFactories["udplog"].CreateDefaultConfig().(*udplogreceiver.UDPLogConfig) - cfg.InputConfig.ListenAddress = "0.0.0.0:0" - return cfg - }, - }, - { - receiver: "vcenter", - }, - { - receiver: "solace", - skipLifecycle: true, // Requires a solace broker to connect to - }, - { - receiver: "namedpipe", - skipLifecycle: runtime.GOOS != "linux", - getConfigFn: func() component.Config { - cfg := rcvrFactories["namedpipe"].CreateDefaultConfig().(*namedpipereceiver.NamedPipeConfig) - cfg.InputConfig.Path = "/tmp/foo" - return cfg - }, - }, - } - - assert.Equal(t, len(rcvrFactories), len(tests), "All receivers must be added to the lifecycle suite") - for _, tt := range tests { - t.Run(string(tt.receiver), func(t *testing.T) { - factory := rcvrFactories[tt.receiver] - assert.Equal(t, tt.receiver, factory.Type()) - - t.Run("shutdown", func(t *testing.T) { - verifyReceiverShutdown(t, factory, tt.getConfigFn) - }) - t.Run("lifecycle", func(t *testing.T) { - if tt.skipLifecycle { - t.SkipNow() - } - verifyReceiverLifecycle(t, factory, tt.getConfigFn) - }) - }) - } -} - -// getReceiverConfigFn is used customize the configuration passed to the verification. -// This is used to change ports or provide values required but not provided by the -// default configuration. -type getReceiverConfigFn func() component.Config - -// verifyReceiverLifecycle is used to test if a receiver type can handle the typical -// lifecycle of a component. The getConfigFn parameter only need to be specified if -// the test can't be done with the default configuration for the component. -func verifyReceiverLifecycle(t *testing.T, factory receiver.Factory, getConfigFn getReceiverConfigFn) { - ctx := context.Background() - host := newAssertNoErrorHost(t) - receiverCreateSet := receivertest.NewNopSettings() - - if getConfigFn == nil { - getConfigFn = factory.CreateDefaultConfig - } - - createFns := []createReceiverFn{ - wrapCreateLogsRcvr(factory), - wrapCreateTracesRcvr(factory), - wrapCreateMetricsRcvr(factory), - } - - for _, createFn := range createFns { - firstRcvr, err := createFn(ctx, receiverCreateSet, getConfigFn()) - if errors.Is(err, pipeline.ErrSignalNotSupported) { - continue - } - require.NoError(t, err) - require.NoError(t, firstRcvr.Start(ctx, host)) - require.NoError(t, firstRcvr.Shutdown(ctx)) - - secondRcvr, err := createFn(ctx, receiverCreateSet, getConfigFn()) - require.NoError(t, err) - require.NoError(t, secondRcvr.Start(ctx, host)) - require.NoError(t, secondRcvr.Shutdown(ctx)) - } -} - -// verifyReceiverShutdown is used to test if a receiver type can be shutdown without being started first. -func verifyReceiverShutdown(tb testing.TB, factory receiver.Factory, getConfigFn getReceiverConfigFn) { - ctx := context.Background() - receiverCreateSet := receivertest.NewNopSettings() - - if getConfigFn == nil { - getConfigFn = factory.CreateDefaultConfig - } - - createFns := []createReceiverFn{ - wrapCreateLogsRcvr(factory), - wrapCreateTracesRcvr(factory), - wrapCreateMetricsRcvr(factory), - } - - for _, createFn := range createFns { - r, err := createFn(ctx, receiverCreateSet, getConfigFn()) - if errors.Is(err, pipeline.ErrSignalNotSupported) { - continue - } - if r == nil { - continue - } - assert.NotPanics(tb, func() { - assert.NoError(tb, r.Shutdown(ctx)) - }) - } -} - -// assertNoErrorHost implements a component.Host that asserts that there were no errors. -type createReceiverFn func( - ctx context.Context, - set receiver.Settings, - cfg component.Config, -) (component.Component, error) - -func wrapCreateLogsRcvr(factory receiver.Factory) createReceiverFn { - return func(ctx context.Context, set receiver.Settings, cfg component.Config) (component.Component, error) { - return factory.CreateLogsReceiver(ctx, set, cfg, consumertest.NewNop()) - } -} - -func wrapCreateMetricsRcvr(factory receiver.Factory) createReceiverFn { - return func(ctx context.Context, set receiver.Settings, cfg component.Config) (component.Component, error) { - return factory.CreateMetricsReceiver(ctx, set, cfg, consumertest.NewNop()) - } -} - -func wrapCreateTracesRcvr(factory receiver.Factory) createReceiverFn { - return func(ctx context.Context, set receiver.Settings, cfg component.Config) (component.Component, error) { - return factory.CreateTracesReceiver(ctx, set, cfg, consumertest.NewNop()) - } -}