Skip to content

Commit

Permalink
[internal/rabbitmq] move connection and retry logic into separate pkg (
Browse files Browse the repository at this point in the history
…#34361)

This PR moves the retry logic from amqp publisher to amqp connection.
Connection, client and other utility structures have been moved from
`exporter/rabbitmqexporter/internal/publisher` to `internal/rabbitmq`.

**Link to tracking Issue:** #34242


----

cc @swar8080 @atoulme

---------

Signed-off-by: Benedikt Bongartz <[email protected]>
Co-authored-by: Alex Boten <[email protected]>
  • Loading branch information
frzifus and codeboten authored Aug 14, 2024
1 parent a5096f1 commit 6f2d3b1
Show file tree
Hide file tree
Showing 20 changed files with 524 additions and 242 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ internal/kubelet/ @open-teleme
internal/metadataproviders/ @open-telemetry/collector-contrib-approvers @Aneurysm9 @dashpole
internal/otelarrow/ @open-telemetry/collector-contrib-approvers @jmacd @moh-osman3
internal/pdatautil/ @open-telemetry/collector-contrib-approvers @djaglowski
internal/rabbitmq/ @open-telemetry/collector-contrib-approvers @swar8080 @atoulme
internal/sharedcomponent/ @open-telemetry/collector-contrib-approvers @open-telemetry/collector-approvers
internal/splunk/ @open-telemetry/collector-contrib-approvers @dmitryax
internal/sqlquery/ @open-telemetry/collector-contrib-approvers @crobert-1 @dmitryax
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/bug_report.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ body:
- internal/metadataproviders
- internal/otelarrow
- internal/pdatautil
- internal/rabbitmq
- internal/sharedcomponent
- internal/splunk
- internal/sqlquery
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/feature_request.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ body:
- internal/metadataproviders
- internal/otelarrow
- internal/pdatautil
- internal/rabbitmq
- internal/sharedcomponent
- internal/splunk
- internal/sqlquery
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/other.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ body:
- internal/metadataproviders
- internal/otelarrow
- internal/pdatautil
- internal/rabbitmq
- internal/sharedcomponent
- internal/splunk
- internal/sqlquery
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/unmaintained.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ body:
- internal/metadataproviders
- internal/otelarrow
- internal/pdatautil
- internal/rabbitmq
- internal/sharedcomponent
- internal/splunk
- internal/sqlquery
Expand Down
1 change: 1 addition & 0 deletions cmd/otelcontribcol/builder-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ replaces:
- github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig => ../../internal/k8sconfig
- github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8stest => ../../internal/k8stest
- github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka => ../../internal/kafka
- github.com/open-telemetry/opentelemetry-collector-contrib/internal/rabbitmq => ../../internal/rabbitmq
- github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver => ../../receiver/carbonreceiver
- github.com/open-telemetry/opentelemetry-collector-contrib/exporter/splunkhecexporter => ../../exporter/splunkhecexporter
- github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusexporter => ../../exporter/prometheusexporter
Expand Down
3 changes: 3 additions & 0 deletions cmd/otelcontribcol/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/metadataproviders v0.107.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow v0.107.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/pdatautil v0.107.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/rabbitmq v0.107.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.107.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk v0.107.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/sqlquery v0.107.0 // indirect
Expand Down Expand Up @@ -1073,6 +1074,8 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8ste

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka => ../../internal/kafka

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/rabbitmq => ../../internal/rabbitmq

replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver => ../../receiver/carbonreceiver

replace github.com/open-telemetry/opentelemetry-collector-contrib/exporter/splunkhecexporter => ../../exporter/splunkhecexporter
Expand Down
3 changes: 2 additions & 1 deletion exporter/rabbitmqexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/rabbitmqexporter/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/rabbitmqexporter/internal/publisher"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/rabbitmq"
)

const (
Expand Down Expand Up @@ -133,7 +134,7 @@ func getRoutingKeyOrDefault(config *Config, fallback string) string {

func newPublisherFactory(set exporter.Settings) publisherFactory {
return func(dialConfig publisher.DialConfig) (publisher.Publisher, error) {
return publisher.NewConnection(set.Logger, publisher.NewAmqpClient(), dialConfig)
return publisher.NewConnection(set.Logger, rabbitmq.NewAmqpClient(set.Logger), dialConfig)
}
}

Expand Down
3 changes: 3 additions & 0 deletions exporter/rabbitmqexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.22.0

require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.107.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/rabbitmq v0.107.0
github.com/rabbitmq/amqp091-go v1.10.0
github.com/stretchr/testify v1.9.0
github.com/testcontainers/testcontainers-go v0.31.0
Expand Down Expand Up @@ -110,6 +111,8 @@ require (

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal => ../../internal/coreinternal

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/rabbitmq => ../../internal/rabbitmq

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest
Expand Down
110 changes: 0 additions & 110 deletions exporter/rabbitmqexporter/internal/publisher/client.go

This file was deleted.

114 changes: 22 additions & 92 deletions exporter/rabbitmqexporter/internal/publisher/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,20 @@ package publisher // import "github.com/open-telemetry/opentelemetry-collector-c

import (
"context"
"crypto/tls"
"errors"
"fmt"
"sync"
"time"

amqp "github.com/rabbitmq/amqp091-go"
"go.uber.org/zap"

otelrabbitmq "github.com/open-telemetry/opentelemetry-collector-contrib/internal/rabbitmq"
)

type DialConfig struct {
URL string
otelrabbitmq.DialConfig
Durable bool
Vhost string
Auth amqp.Authentication
ConnectionTimeout time.Duration
Heartbeat time.Duration
PublishConfirmationTimeout time.Duration
TLS *tls.Config
ConnectionName string
}

type Message struct {
Expand All @@ -33,20 +27,20 @@ type Message struct {
Body []byte
}

func NewConnection(logger *zap.Logger, client AmqpClient, config DialConfig) (Publisher, error) {
func NewConnection(logger *zap.Logger, client otelrabbitmq.AmqpClient, config DialConfig) (Publisher, error) {
p := publisher{
logger: logger,
client: client,
config: config,
connLock: &sync.Mutex{},
connectionErrors: make(chan *amqp.Error, 1),
logger: logger,
client: client,
config: config,
}

p.connLock.Lock()
defer p.connLock.Unlock()
err := p.connect()
conn, err := p.client.DialConfig(p.config.DialConfig)
if err != nil {
return &p, err
}
p.connection = conn

return &p, err
return &p, nil
}

type Publisher interface {
Expand All @@ -55,16 +49,14 @@ type Publisher interface {
}

type publisher struct {
logger *zap.Logger
client AmqpClient
config DialConfig
connLock *sync.Mutex
connection Connection
connectionErrors chan *amqp.Error
logger *zap.Logger
client otelrabbitmq.AmqpClient
config DialConfig
connection otelrabbitmq.Connection
}

func (p *publisher) Publish(ctx context.Context, message Message) error {
err := p.reconnectIfUnhealthy()
err := p.connection.ReconnectIfUnhealthy()
if err != nil {
return err
}
Expand All @@ -73,7 +65,7 @@ func (p *publisher) Publish(ctx context.Context, message Message) error {
// This could later be optimized to re-use channels which avoids repeated network calls to create and close them.
// Concurrency-control through something like a resource pool would be necessary since aqmp channels are not thread safe.
channel, err := p.connection.Channel()
defer func(channel Channel) {
defer func(channel otelrabbitmq.Channel) {
if channel != nil {
err2 := channel.Close()
if err2 != nil {
Expand Down Expand Up @@ -125,71 +117,9 @@ func (p *publisher) Publish(ctx context.Context, message Message) error {
}
}

func (p *publisher) reconnectIfUnhealthy() error {
p.connLock.Lock()
defer p.connLock.Unlock()

hasConnectionError := false
select {
case err := <-p.connectionErrors:
hasConnectionError = true
p.logger.Info("Received connection error, will retry restoring unhealthy connection", zap.Error(err))
default:
break
}

if hasConnectionError || !p.isConnected() {
if p.isConnected() {
err := p.connection.Close()
if err != nil {
p.logger.Warn("Error closing unhealthy connection", zap.Error(err))
}
}

if err := p.connect(); err != nil {
return errors.Join(errors.New("failed attempt at restoring unhealthy connection"), err)
}
p.logger.Info("Successfully restored unhealthy rabbitmq connection")
}

return nil
}

func (p *publisher) connect() error {
p.logger.Debug("Connecting to rabbitmq")

properties := amqp.Table{}
properties.SetClientConnectionName(p.config.ConnectionName)

connection, err := p.client.DialConfig(p.config.URL, amqp.Config{
SASL: []amqp.Authentication{p.config.Auth},
Vhost: p.config.Vhost,
Heartbeat: p.config.Heartbeat,
Dial: amqp.DefaultDial(p.config.ConnectionTimeout),
Properties: properties,
TLSClientConfig: p.config.TLS,
})
if connection != nil {
p.connection = connection
}
if err != nil {
return err
}

// Goal is to lazily restore the connection so this needs to be buffered to avoid blocking on asynchronous amqp errors.
// Also re-create this channel each time because apparently the amqp library can close it
p.connectionErrors = make(chan *amqp.Error, 1)
p.connection.NotifyClose(p.connectionErrors)
return nil
}

func (p *publisher) Close() error {
if p.isConnected() {
return p.connection.Close()
if p.connection == nil {
return nil
}
return nil
}

func (p *publisher) isConnected() bool {
return p.connection != nil && !p.connection.IsClosed()
return p.connection.Close()
}
Loading

0 comments on commit 6f2d3b1

Please sign in to comment.