Skip to content

Commit

Permalink
[otelcol] Add a custom zapcore.Core for confmap logging (open-telemet…
Browse files Browse the repository at this point in the history
…ry#10056)

<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
#### Description

Provides a logger to confmap that buffers logs in memory until the
primary logger can be used. Once the primary logger exists, places that
used the original logger are given the updated Core.

If an error occurs that would shut down the collector before the primary
logger could be created, the logs are written to stdout/err using a
fallback logger.

Alternative to
open-telemetry#10008

I've pushed the testing I did to show how the logger successfully
updates. Before config resolution the debug log in confmap is not
printed, but afterwards it is.

test config:
```yaml
receivers:
  nop:

exporters:
  otlphttp:
    endpoint: http://0.0.0.0:4317
    headers:
      # Not set
      x-test: ${env:TEMP3}
  debug:
    # set to "detailed"
    verbosity: $TEMP

service:
  telemetry:
    logs:
      level: debug
  pipelines:
    traces:
      receivers:
        - nop
      exporters:
        - debug
```


![image](https://github.com/open-telemetry/opentelemetry-collector/assets/12352919/6a17993f-1f97-4c54-9165-5c34dd58d108)

<!-- Issue number if applicable -->
#### Link to tracking issue
Related to
open-telemetry#9162
Related to
open-telemetry#5615

<!--Describe what testing was performed and which tests were added.-->
#### Testing
If we like this approach I'll add tests

<!--Describe the documentation added.-->
#### Documentation

---------

Co-authored-by: Dan Jaglowski <[email protected]>
Co-authored-by: Pablo Baeyens <[email protected]>
  • Loading branch information
3 people authored and steves-canva committed Jun 13, 2024
1 parent aff7525 commit 74c2cf3
Show file tree
Hide file tree
Showing 7 changed files with 415 additions and 6 deletions.
25 changes: 25 additions & 0 deletions .chloggen/confmap-logger-wrapper-idea.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: otelcol

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Enable logging during configuration resolution

# One or more tracking issues or pull requests related to the change
issues: [10056]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
81 changes: 81 additions & 0 deletions otelcol/buffered_core.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// This logger implements zapcore.Core and is based on zaptest/observer.

package otelcol // import "go.opentelemetry.io/collector/otelcol"

import (
"fmt"
"sync"

"go.uber.org/zap/zapcore"
)

type loggedEntry struct {
zapcore.Entry
Context []zapcore.Field
}

func newBufferedCore(enab zapcore.LevelEnabler) *bufferedCore {
return &bufferedCore{LevelEnabler: enab}
}

var _ zapcore.Core = (*bufferedCore)(nil)

type bufferedCore struct {
zapcore.LevelEnabler
mu sync.RWMutex
logs []loggedEntry
context []zapcore.Field
logsTaken bool
}

func (bc *bufferedCore) Level() zapcore.Level {
return zapcore.LevelOf(bc.LevelEnabler)
}

func (bc *bufferedCore) Check(ent zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry {
if bc.Enabled(ent.Level) {
return ce.AddCore(ent, bc)
}
return ce
}

func (bc *bufferedCore) With(fields []zapcore.Field) zapcore.Core {
return &bufferedCore{
LevelEnabler: bc.LevelEnabler,
logs: bc.logs,
logsTaken: bc.logsTaken,
context: append(bc.context, fields...),
}
}

func (bc *bufferedCore) Write(ent zapcore.Entry, fields []zapcore.Field) error {
bc.mu.Lock()
defer bc.mu.Unlock()
if bc.logsTaken {
return fmt.Errorf("the buffered logs have already been taken so writing is no longer supported")
}
all := make([]zapcore.Field, 0, len(fields)+len(bc.context))
all = append(all, bc.context...)
all = append(all, fields...)
bc.logs = append(bc.logs, loggedEntry{ent, all})
return nil
}

func (bc *bufferedCore) Sync() error {
return nil
}

func (bc *bufferedCore) TakeLogs() []loggedEntry {
if !bc.logsTaken {
bc.mu.Lock()
defer bc.mu.Unlock()
logs := bc.logs
bc.logs = nil
bc.logsTaken = true
return logs
}
return nil
}
107 changes: 107 additions & 0 deletions otelcol/buffered_core_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package otelcol

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"
)

func Test_bufferedCore_Level(t *testing.T) {
bc := newBufferedCore(zapcore.InfoLevel)
assert.Equal(t, zapcore.InfoLevel, bc.Level())
}

func Test_bufferedCore_Check(t *testing.T) {
t.Run("check passed", func(t *testing.T) {
bc := newBufferedCore(zapcore.InfoLevel)
e := zapcore.Entry{
Level: zapcore.InfoLevel,
}
expected := &zapcore.CheckedEntry{}
expected = expected.AddCore(e, bc)
ce := bc.Check(e, nil)
assert.Equal(t, expected, ce)
})

t.Run("check did not pass", func(t *testing.T) {
bc := newBufferedCore(zapcore.InfoLevel)
e := zapcore.Entry{
Level: zapcore.DebugLevel,
}
ce := bc.Check(e, nil)
assert.Nil(t, ce)
})
}

func Test_bufferedCore_With(t *testing.T) {
bc := newBufferedCore(zapcore.InfoLevel)
bc.logsTaken = true
bc.context = []zapcore.Field{
{Key: "original", String: "context"},
}
inputs := []zapcore.Field{
{Key: "test", String: "passed"},
}
expected := []zapcore.Field{
{Key: "original", String: "context"},
{Key: "test", String: "passed"},
}
newBC := bc.With(inputs)
assert.Equal(t, expected, newBC.(*bufferedCore).context)
assert.True(t, newBC.(*bufferedCore).logsTaken)
}

func Test_bufferedCore_Write(t *testing.T) {
bc := newBufferedCore(zapcore.InfoLevel)
e := zapcore.Entry{
Level: zapcore.DebugLevel,
Message: "test",
}
fields := []zapcore.Field{
{Key: "field1", String: "value1"},
}
err := bc.Write(e, fields)
require.NoError(t, err)

expected := loggedEntry{
e,
fields,
}
require.Equal(t, 1, len(bc.logs))
require.Equal(t, expected, bc.logs[0])
}

func Test_bufferedCore_Sync(t *testing.T) {
bc := newBufferedCore(zapcore.InfoLevel)
assert.NoError(t, bc.Sync())
}

func Test_bufferedCore_TakeLogs(t *testing.T) {
bc := newBufferedCore(zapcore.InfoLevel)
e := zapcore.Entry{
Level: zapcore.DebugLevel,
Message: "test",
}
fields := []zapcore.Field{
{Key: "field1", String: "value1"},
}
err := bc.Write(e, fields)
require.NoError(t, err)

expected := []loggedEntry{
{
e,
fields,
},
}
assert.Equal(t, expected, bc.TakeLogs())
assert.Nil(t, bc.logs)

assert.Error(t, bc.Write(e, fields))
assert.Nil(t, bc.TakeLogs())
}
59 changes: 54 additions & 5 deletions otelcol/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package otelcol // import "go.opentelemetry.io/collector/otelcol"

import (
"context"
"errors"
"fmt"
"os"
"os/signal"
Expand All @@ -15,6 +16,7 @@ import (

"go.uber.org/multierr"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
Expand Down Expand Up @@ -108,15 +110,20 @@ type Collector struct {
// signalsChannel is used to receive termination signals from the OS.
signalsChannel chan os.Signal
// asyncErrorChannel is used to signal a fatal error from any component.
asyncErrorChannel chan error
asyncErrorChannel chan error
bc *bufferedCore
updateConfigProviderLogger func(core zapcore.Core)
}

// NewCollector creates and returns a new instance of Collector.
func NewCollector(set CollectorSettings) (*Collector, error) {
var err error
configProvider := set.ConfigProvider

set.ConfigProviderSettings.ResolverSettings.ProviderSettings = confmap.ProviderSettings{Logger: zap.NewNop()}
bc := newBufferedCore(zapcore.DebugLevel)
cc := &collectorCore{core: bc}
options := append([]zap.Option{zap.WithCaller(true)}, set.LoggingOptions...)
set.ConfigProviderSettings.ResolverSettings.ProviderSettings = confmap.ProviderSettings{Logger: zap.New(cc, options...)}
set.ConfigProviderSettings.ResolverSettings.ConverterSettings = confmap.ConverterSettings{}

if configProvider == nil {
Expand All @@ -134,9 +141,11 @@ func NewCollector(set CollectorSettings) (*Collector, error) {
shutdownChan: make(chan struct{}),
// Per signal.Notify documentation, a size of the channel equaled with
// the number of signals getting notified on is recommended.
signalsChannel: make(chan os.Signal, 3),
asyncErrorChannel: make(chan error),
configProvider: configProvider,
signalsChannel: make(chan os.Signal, 3),
asyncErrorChannel: make(chan error),
configProvider: configProvider,
bc: bc,
updateConfigProviderLogger: cc.SetCore,
}, nil
}

Expand Down Expand Up @@ -202,6 +211,18 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error {
if err != nil {
return err
}
if col.updateConfigProviderLogger != nil {
col.updateConfigProviderLogger(col.service.Logger().Core())
}
if col.bc != nil {
x := col.bc.TakeLogs()
for _, log := range x {
ce := col.service.Logger().Core().Check(log.Entry, nil)
if ce != nil {
ce.Write(log.Context...)
}
}
}

if !col.set.SkipSettingGRPCLogger {
grpclog.SetLogger(col.service.Logger(), cfg.Service.Telemetry.Logs.Level)
Expand Down Expand Up @@ -243,12 +264,40 @@ func (col *Collector) DryRun(ctx context.Context) error {
return cfg.Validate()
}

func newFallbackLogger(options []zap.Option) (*zap.Logger, error) {
ec := zap.NewProductionEncoderConfig()
ec.EncodeTime = zapcore.ISO8601TimeEncoder
zapCfg := &zap.Config{
Level: zap.NewAtomicLevelAt(zapcore.DebugLevel),
Encoding: "console",
EncoderConfig: ec,
OutputPaths: []string{"stderr"},
ErrorOutputPaths: []string{"stderr"},
}
return zapCfg.Build(options...)
}

// Run starts the collector according to the given configuration, and waits for it to complete.
// Consecutive calls to Run are not allowed, Run shouldn't be called once a collector is shut down.
// Sets up the control logic for config reloading and shutdown.
func (col *Collector) Run(ctx context.Context) error {
if err := col.setupConfigurationComponents(ctx); err != nil {
col.setCollectorState(StateClosed)
logger, loggerErr := newFallbackLogger(col.set.LoggingOptions)
if loggerErr != nil {
return errors.Join(err, fmt.Errorf("unable to create fallback logger: %w", loggerErr))
}

if col.bc != nil {
x := col.bc.TakeLogs()
for _, log := range x {
ce := logger.Core().Check(log.Entry, nil)
if ce != nil {
ce.Write(log.Context...)
}
}
}

return err
}

Expand Down
58 changes: 58 additions & 0 deletions otelcol/collector_core.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package otelcol // import "go.opentelemetry.io/collector/otelcol"

import (
"sync"

"go.uber.org/zap/zapcore"
)

var _ zapcore.Core = (*collectorCore)(nil)

type collectorCore struct {
core zapcore.Core
rw sync.RWMutex
}

func (c *collectorCore) Enabled(l zapcore.Level) bool {
c.rw.RLock()
defer c.rw.RUnlock()
return c.core.Enabled(l)
}

func (c *collectorCore) With(f []zapcore.Field) zapcore.Core {
c.rw.RLock()
defer c.rw.RUnlock()
return &collectorCore{
core: c.core.With(f),
}
}

func (c *collectorCore) Check(e zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry {
c.rw.RLock()
defer c.rw.RUnlock()
if c.core.Enabled(e.Level) {
return ce.AddCore(e, c)
}
return ce
}

func (c *collectorCore) Write(e zapcore.Entry, f []zapcore.Field) error {
c.rw.RLock()
defer c.rw.RUnlock()
return c.core.Write(e, f)
}

func (c *collectorCore) Sync() error {
c.rw.RLock()
defer c.rw.RUnlock()
return c.core.Sync()
}

func (c *collectorCore) SetCore(core zapcore.Core) {
c.rw.Lock()
defer c.rw.Unlock()
c.core = core
}
Loading

0 comments on commit 74c2cf3

Please sign in to comment.