Upgrade Agent to Collector 0.87 #5529

Merged: 7 commits, Oct 24, 2023
10 changes: 10 additions & 0 deletions CHANGELOG.md
@@ -65,6 +65,11 @@ Main (unreleased)

- Fix validation issue with ServiceMonitors when scrape timeout is greater than interval. (@captncraig)

- Static mode's spanmetrics processor now prunes histograms when the dimension cache is pruned.
  Previously the dimension cache was pruned but the histograms were not, so the metric series
  created by the spanmetrics processor grew unbounded. Only static mode has this issue; Flow mode's
  `otelcol.connector.spanmetrics` does not have this bug. (@nijave)

### Enhancements

- The `loki.write` WAL now has snappy compression enabled by default. (@thepalbi)
@@ -75,6 +80,11 @@ Main (unreleased)

- The `loki.source.docker` component now allows connecting to Docker daemons
over HTTP(S) and setting up TLS credentials. (@tpaschalis)

- Upgrade OpenTelemetry Collector packages to version 0.87 (@ptodev):
- `otelcol.receiver.kafka` has a new `header_extraction` block to extract headers from Kafka records.
- `otelcol.receiver.kafka` has a new `version` argument to set the SASL protocol
  version used for authentication.

v0.37.2 (2023-10-16)
-----------------
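
The changelog entries above describe two new `otelcol.receiver.kafka` options added by this upgrade. The snippet below is a minimal River sketch of how they might be configured; the broker address, credentials, header name, and the `otelcol.exporter.otlp.default.input` target are illustrative placeholders, not values taken from this PR.

```river
otelcol.receiver.kafka "example" {
  brokers          = ["kafka:9092"] // placeholder broker address
  protocol_version = "2.0.0"

  authentication {
    sasl {
      username  = "user"     // placeholder credentials
      password  = "password"
      mechanism = "SCRAM-SHA-512"
      version   = 1 // added in this PR: SASL protocol version
    }
  }

  // Added in this PR: copy the listed Kafka record headers onto received telemetry.
  header_extraction {
    extract_headers = true
    headers         = ["tenant"]
  }

  output {
    traces = [otelcol.exporter.otlp.default.input]
  }
}
```
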
4 changes: 4 additions & 0 deletions component/otelcol/auth/auth.go
@@ -139,6 +139,10 @@ func (a *Auth) Update(args component.Arguments) error {

TracerProvider: a.opts.Tracer,
MeterProvider: metric.NewMeterProvider(metric.WithReader(promExporter)),

ReportComponentStatus: func(*otelcomponent.StatusEvent) error {
return nil
},
},

BuildInfo: otelcomponent.BuildInfo{
4 changes: 4 additions & 0 deletions component/otelcol/connector/connector.go
@@ -149,6 +149,10 @@ func (p *Connector) Update(args component.Arguments) error {

TracerProvider: p.opts.Tracer,
MeterProvider: metric.NewMeterProvider(metric.WithReader(promExporter)),

ReportComponentStatus: func(*otelcomponent.StatusEvent) error {
return nil
},
},

BuildInfo: otelcomponent.BuildInfo{
2 changes: 1 addition & 1 deletion component/otelcol/connector/spanmetrics/types.go
@@ -37,7 +37,7 @@ const (
// The unit is a private type in an internal Otel package,
// so we need to convert it to a map and then back to the internal type.
// ConvertMetricUnit matches the Unit type in this internal package:
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.85.0/connector/spanmetricsconnector/internal/metrics/unit.go
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.87.0/connector/spanmetricsconnector/internal/metrics/unit.go
func ConvertMetricUnit(unit string) (map[string]interface{}, error) {
switch unit {
case MetricsUnitMilliseconds:
4 changes: 4 additions & 0 deletions component/otelcol/exporter/exporter.go
@@ -140,6 +140,10 @@ func (e *Exporter) Update(args component.Arguments) error {

TracerProvider: e.opts.Tracer,
MeterProvider: metric.NewMeterProvider(metricOpts...),

ReportComponentStatus: func(*otelcomponent.StatusEvent) error {
return nil
},
},

BuildInfo: otelcomponent.BuildInfo{
18 changes: 18 additions & 0 deletions component/otelcol/exporter/loadbalancing/loadbalancing.go
@@ -2,6 +2,7 @@
package loadbalancing

import (
"fmt"
"time"

"github.com/alecthomas/units"
@@ -48,6 +49,7 @@ type Arguments struct {
var (
_ exporter.Arguments = Arguments{}
_ river.Defaulter = &Arguments{}
_ river.Validator = &Arguments{}
)

var (
@@ -72,6 +74,22 @@ func (args *Arguments) SetToDefault() {
*args = DefaultArguments
}

// Validate implements river.Validator.
func (args *Arguments) Validate() error {
//TODO(ptodev): Add support for "resource" and "metric" routing keys later.
// The reason we can't add them yet is that otelcol.exporter.loadbalancing
// is labeled as "beta", but those routing keys are experimental.
// We need a way to label otelcol.exporter.loadbalancing as "beta"
// for logs and traces, but "experimental" for metrics.
switch args.RoutingKey {
case "service", "traceID":
// The routing key is valid.
default:
return fmt.Errorf("invalid routing key %q", args.RoutingKey)
}
return nil
}

// Convert implements exporter.Arguments.
func (args Arguments) Convert() (otelcomponent.Config, error) {
return &loadbalancingexporter.Config{
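
The new `Validate` method above only accepts `service` and `traceID` as routing keys; any other value now fails configuration validation. Below is a minimal River sketch of an `otelcol.exporter.loadbalancing` block that passes this check. The resolver hostnames are placeholders, and the `resolver`/`protocol` block layout follows the component's documented schema rather than this diff, so treat it as an assumption.

```river
otelcol.exporter.loadbalancing "default" {
  // Only "service" and "traceID" pass Validate; a value such as "metric"
  // would be rejected with: invalid routing key "metric".
  routing_key = "traceID"

  resolver {
    static {
      hostnames = ["backend-1:4317", "backend-2:4317"] // placeholder endpoints
    }
  }

  protocol {
    otlp {
      client {}
    }
  }
}
```
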
4 changes: 4 additions & 0 deletions component/otelcol/extension/extension.go
@@ -116,6 +116,10 @@ func (e *Extension) Update(args component.Arguments) error {

TracerProvider: e.opts.Tracer,
MeterProvider: metric.NewMeterProvider(metric.WithReader(promExporter)),

ReportComponentStatus: func(*otelcomponent.StatusEvent) error {
return nil
},
},

BuildInfo: otelcomponent.BuildInfo{
4 changes: 4 additions & 0 deletions component/otelcol/processor/processor.go
@@ -135,6 +135,10 @@ func (p *Processor) Update(args component.Arguments) error {

TracerProvider: p.opts.Tracer,
MeterProvider: metric.NewMeterProvider(metric.WithReader(promExporter)),

ReportComponentStatus: func(*otelcomponent.StatusEvent) error {
return nil
},
},

BuildInfo: otelcomponent.BuildInfo{
125 changes: 77 additions & 48 deletions component/otelcol/receiver/kafka/kafka.go
@@ -9,6 +9,7 @@ import (
"github.com/grafana/agent/component/otelcol/receiver"
otel_service "github.com/grafana/agent/service/otel"
"github.com/grafana/river/rivertypes"
"github.com/mitchellh/mapstructure"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver"
otelcomponent "go.opentelemetry.io/collector/component"
@@ -38,10 +39,11 @@ type Arguments struct {
ClientID string `river:"client_id,attr,optional"`
InitialOffset string `river:"initial_offset,attr,optional"`

Authentication AuthenticationArguments `river:"authentication,block,optional"`
Metadata MetadataArguments `river:"metadata,block,optional"`
AutoCommit AutoCommitArguments `river:"autocommit,block,optional"`
MessageMarking MessageMarkingArguments `river:"message_marking,block,optional"`
Authentication AuthenticationArguments `river:"authentication,block,optional"`
Metadata MetadataArguments `river:"metadata,block,optional"`
AutoCommit AutoCommitArguments `river:"autocommit,block,optional"`
MessageMarking MessageMarkingArguments `river:"message_marking,block,optional"`
HeaderExtraction HeaderExtraction `river:"header_extraction,block,optional"`

// DebugMetrics configures component internal metrics. Optional.
DebugMetrics otelcol.DebugMetricsArguments `river:"debug_metrics,block,optional"`
@@ -79,6 +81,10 @@ var DefaultArguments = Arguments{
AfterExecution: false,
IncludeUnsuccessful: false,
},
HeaderExtraction: HeaderExtraction{
ExtractHeaders: false,
Headers: []string{},
},
}

// SetToDefault implements river.Defaulter.
@@ -88,20 +94,28 @@ func (args *Arguments) SetToDefault() {

// Convert implements receiver.Arguments.
func (args Arguments) Convert() (otelcomponent.Config, error) {
return &kafkareceiver.Config{
Brokers: args.Brokers,
ProtocolVersion: args.ProtocolVersion,
Topic: args.Topic,
Encoding: args.Encoding,
GroupID: args.GroupID,
ClientID: args.ClientID,
InitialOffset: args.InitialOffset,

Authentication: args.Authentication.Convert(),
Metadata: args.Metadata.Convert(),
AutoCommit: args.AutoCommit.Convert(),
MessageMarking: args.MessageMarking.Convert(),
}, nil
input := make(map[string]interface{})
input["auth"] = args.Authentication.Convert()

var result kafkareceiver.Config
err := mapstructure.Decode(input, &result)
if err != nil {
return nil, err
}

result.Brokers = args.Brokers
result.ProtocolVersion = args.ProtocolVersion
result.Topic = args.Topic
result.Encoding = args.Encoding
result.GroupID = args.GroupID
result.ClientID = args.ClientID
result.InitialOffset = args.InitialOffset
result.Metadata = args.Metadata.Convert()
result.AutoCommit = args.AutoCommit.Convert()
result.MessageMarking = args.MessageMarking.Convert()
result.HeaderExtraction = args.HeaderExtraction.Convert()

return &result, nil
}

// Extensions implements receiver.Arguments.
@@ -128,26 +142,26 @@ type AuthenticationArguments struct {
}

// Convert converts args into the upstream type.
func (args AuthenticationArguments) Convert() kafkaexporter.Authentication {
var res kafkaexporter.Authentication
func (args AuthenticationArguments) Convert() map[string]interface{} {
auth := make(map[string]interface{})

if args.Plaintext != nil {
conv := args.Plaintext.Convert()
res.PlainText = &conv
auth["plain_text"] = &conv
}
if args.SASL != nil {
conv := args.SASL.Convert()
res.SASL = &conv
auth["sasl"] = &conv
}
if args.TLS != nil {
res.TLS = args.TLS.Convert()
auth["tls"] = args.TLS.Convert()
}
if args.Kerberos != nil {
conv := args.Kerberos.Convert()
res.Kerberos = &conv
auth["kerberos"] = &conv
}

return res
return auth
}

// PlaintextArguments configures plaintext authentication against the Kafka
@@ -158,10 +172,10 @@ type PlaintextArguments struct {
}

// Convert converts args into the upstream type.
func (args PlaintextArguments) Convert() kafkaexporter.PlainTextConfig {
return kafkaexporter.PlainTextConfig{
Username: args.Username,
Password: string(args.Password),
func (args PlaintextArguments) Convert() map[string]interface{} {
return map[string]interface{}{
"username": args.Username,
"password": string(args.Password),
}
}

@@ -170,16 +184,18 @@ type SASLArguments struct {
Username string `river:"username,attr"`
Password rivertypes.Secret `river:"password,attr"`
Mechanism string `river:"mechanism,attr"`
Version int `river:"version,attr,optional"`
AWSMSK AWSMSKArguments `river:"aws_msk,block,optional"`
}

// Convert converts args into the upstream type.
func (args SASLArguments) Convert() kafkaexporter.SASLConfig {
return kafkaexporter.SASLConfig{
Username: args.Username,
Password: string(args.Password),
Mechanism: args.Mechanism,
AWSMSK: args.AWSMSK.Convert(),
func (args SASLArguments) Convert() map[string]interface{} {
return map[string]interface{}{
"username": args.Username,
"password": string(args.Password),
"mechanism": args.Mechanism,
"version": args.Version,
"aws_msk": args.AWSMSK.Convert(),
}
}

@@ -191,10 +207,10 @@ type AWSMSKArguments struct {
}

// Convert converts args into the upstream type.
func (args AWSMSKArguments) Convert() kafkaexporter.AWSMSKConfig {
return kafkaexporter.AWSMSKConfig{
Region: args.Region,
BrokerAddr: args.BrokerAddr,
func (args AWSMSKArguments) Convert() map[string]interface{} {
return map[string]interface{}{
"region": args.Region,
"broker_addr": args.BrokerAddr,
}
}

@@ -211,15 +227,15 @@ type KerberosArguments struct {
}

// Convert converts args into the upstream type.
func (args KerberosArguments) Convert() kafkaexporter.KerberosConfig {
return kafkaexporter.KerberosConfig{
ServiceName: args.ServiceName,
Realm: args.Realm,
UseKeyTab: args.UseKeyTab,
Username: args.Username,
Password: string(args.Password),
ConfigPath: args.ConfigPath,
KeyTabPath: args.KeyTabPath,
func (args KerberosArguments) Convert() map[string]interface{} {
return map[string]interface{}{
"service_name": args.ServiceName,
"realm": args.Realm,
"use_keytab": args.UseKeyTab,
"username": args.Username,
"password": string(args.Password),
"config_file": args.ConfigPath,
"keytab_file": args.KeyTabPath,
}
}

@@ -283,6 +299,19 @@ func (args MessageMarkingArguments) Convert() kafkareceiver.MessageMarking {
}
}

type HeaderExtraction struct {
ExtractHeaders bool `river:"extract_headers,attr,optional"`
Headers []string `river:"headers,attr,optional"`
}

// Convert converts HeaderExtraction into the upstream type.
func (h HeaderExtraction) Convert() kafkareceiver.HeaderExtraction {
return kafkareceiver.HeaderExtraction{
ExtractHeaders: h.ExtractHeaders,
Headers: h.Headers,
}
}

// DebugMetricsConfig implements receiver.Arguments.
func (args Arguments) DebugMetricsConfig() otelcol.DebugMetricsArguments {
return args.DebugMetrics