Skip to content

Commit

Permalink
DCGM: Enable the receiver to retry dcgm initialization at each collec…
Browse files Browse the repository at this point in the history
…tion time (#178)
  • Loading branch information
LujieDuan authored Aug 14, 2023
1 parent ebf9c13 commit 2c2416e
Show file tree
Hide file tree
Showing 8 changed files with 213 additions and 38 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.20
require (
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/collector v0.42.0
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/collector/googlemanagedprometheus v0.42.0
github.com/NVIDIA/go-dcgm v0.0.0-20221107203308-b6ed78cdc8d3
github.com/NVIDIA/go-dcgm v0.0.0-20230811193702-90bac724c747
github.com/NVIDIA/go-nvml v0.11.6-0
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/fileexporter v0.81.0
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/googlecloudexporter v0.81.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ github.com/Microsoft/go-winio v0.6.0/go.mod h1:cTAf44im0RAYeL23bpB+fzCyDH2MJiz2B
github.com/Mottl/ctimefmt v0.0.0-20190803144728-fd2ac23a585a/go.mod h1:eyj2WSIdoPMPs2eNTLpSmM6Nzqo4V80/d6jHpnJ1SAI=
github.com/NVIDIA/go-dcgm v0.0.0-20221107203308-b6ed78cdc8d3 h1:qk/qQaBY+918SBhcJsqYwZdDaS3MRtUH2SUX53SC7Cs=
github.com/NVIDIA/go-dcgm v0.0.0-20221107203308-b6ed78cdc8d3/go.mod h1:atKWXstYFllLarJucBxnt9ivFIPe26Y6S4JQvzqeSr8=
github.com/NVIDIA/go-dcgm v0.0.0-20230811193702-90bac724c747 h1:inukNBh/RJl4CsxPL3rwSAUwKI90YC3Vf8uaJ0V/J3o=
github.com/NVIDIA/go-dcgm v0.0.0-20230811193702-90bac724c747/go.mod h1:atKWXstYFllLarJucBxnt9ivFIPe26Y6S4JQvzqeSr8=
github.com/NVIDIA/go-nvml v0.11.6-0 h1:tugQzmaX84Y/6+03wZ/MAgcpfSKDkvkAWeuxFNLHmxY=
github.com/NVIDIA/go-nvml v0.11.6-0/go.mod h1:hy7HYeQy335x6nEss0Ne3PYqleRa6Ct+VKD9RQ4nyFs=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
Expand Down
52 changes: 26 additions & 26 deletions receiver/dcgmreceiver/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ import (

const maxWarningsForFailedDeviceMetricQuery = 5

var ErrDcgmInitialization = errors.New("error initializing DCGM")

type dcgmClient struct {
logger *zap.SugaredLogger
disable bool
handleCleanup func()
enabledFieldIDs []dcgm.Short
enabledFieldGroup dcgm.FieldHandle
Expand All @@ -57,21 +58,13 @@ var dcgmGetLatestValuesForFields = dcgm.GetLatestValuesForFields

func newClient(config *Config, logger *zap.Logger) (*dcgmClient, error) {
dcgmCleanup, err := initializeDcgm(config, logger)
// When DCGM is not installed or not running, return empty client
if err != nil {
return &dcgmClient{logger: logger.Sugar(), disable: true}, nil
}

deviceIndices, names, UUIDs, err := discoverDevices(logger)
if err != nil {
return nil, err
return nil, errors.Join(ErrDcgmInitialization, err)
}

deviceGroup, err := createDeviceGroup(logger, deviceIndices)
if err != nil {
return nil, err
}

deviceIndices := make([]uint, 0)
names := make([]string, 0)
UUIDs := make([]string, 0)
enabledFieldGroup := dcgm.FieldHandle{}
requestedFieldIDs := discoverRequestedFieldIDs(config)
supportedFieldIDs, err := getAllSupportedFields()
if err != nil {
Expand All @@ -83,11 +76,20 @@ func newClient(config *Config, logger *zap.Logger) (*dcgmClient, error) {
for _, f := range unavailableFields {
logger.Sugar().Warnf("Field '%s' is not supported. Metric '%s' will not be collected", dcgmIDToName[f], dcgmNameToMetricName[dcgmIDToName[f]])
}
enabledFieldGroup, err := setWatchesOnEnabledFields(config, logger, deviceGroup, enabledFields)
if err != nil {
return nil, fmt.Errorf("Unable to set field watches on %w", err)
if len(enabledFields) != 0 {
deviceIndices, names, UUIDs, err = discoverDevices(logger)
if err != nil {
return nil, err
}
deviceGroup, err := createDeviceGroup(logger, deviceIndices)
if err != nil {
return nil, err
}
enabledFieldGroup, err = setWatchesOnEnabledFields(config, logger, deviceGroup, enabledFields)
if err != nil {
return nil, fmt.Errorf("Unable to set field watches on %w", err)
}
}

return &dcgmClient{
logger: logger.Sugar(),
handleCleanup: dcgmCleanup,
Expand All @@ -100,15 +102,19 @@ func newClient(config *Config, logger *zap.Logger) (*dcgmClient, error) {
}, nil
}

// initializeDcgm tries to open a connection to the DCGM daemon at
// config.TCPAddr.Endpoint.
//
// On success it returns the cleanup func handed back by dcgmInit and a nil
// error. On failure it logs a warning, runs any cleanup func that dcgmInit
// returned alongside the error, and returns a nil cleanup func together with
// an error that wraps the dcgmInit error (so errors.Is / errors.As can still
// reach the underlying cause).
func initializeDcgm(config *Config, logger *zap.Logger) (func(), error) {
	// NOTE(review): "0" presumably tells dcgmInit that the endpoint is a TCP
	// address rather than a Unix socket — confirm against the go-dcgm API.
	isSocket := "0"
	dcgmCleanup, err := dcgmInit(config.TCPAddr.Endpoint, isSocket)
	if err != nil {
		// %w renders exactly like %v, so the message text is unchanged while
		// the original error stays in the chain.
		connErr := fmt.Errorf("Unable to connect to DCGM daemon at %s on %w; Is the DCGM daemon running?", config.TCPAddr.Endpoint, err)
		logger.Sugar().Warn(connErr.Error())
		// dcgmInit may hand back a cleanup func even when it fails; run it so
		// nothing from the partial initialization is left behind.
		if dcgmCleanup != nil {
			dcgmCleanup()
		}
		return nil, connErr
	}
	logger.Sugar().Infof("Connected to DCGM daemon at %s", config.TCPAddr.Endpoint)
	return dcgmCleanup, nil
}
Expand Down Expand Up @@ -285,9 +291,7 @@ func (client *dcgmClient) cleanup() {
client.handleCleanup()
}

if !client.disable {
client.logger.Info("Shutdown DCGM")
}
client.logger.Info("Shutdown DCGM")
}

func (client *dcgmClient) getDeviceModelName(gpuIndex uint) string {
Expand All @@ -299,10 +303,6 @@ func (client *dcgmClient) getDeviceUUID(gpuIndex uint) string {
}

func (client *dcgmClient) collectDeviceMetrics() ([]dcgmMetric, error) {
if client.disable {
return nil, nil
}

var err scrapererror.ScrapeErrors
gpuMetrics := make([]dcgmMetric, 0, len(client.enabledFieldIDs)*len(client.deviceIndices))
for _, gpuIndex := range client.deviceIndices {
Expand Down
9 changes: 6 additions & 3 deletions receiver/dcgmreceiver/client_gpu_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type modelSupportedFields struct {
func TestSupportedFieldsWithGolden(t *testing.T) {
config := createDefaultConfig().(*Config)
client, err := newClient(config, zaptest.NewLogger(t))
require.Nil(t, err)
require.Nil(t, err, "cannot initialize DCGM. Install and run DCGM before running tests.")

assert.NotEmpty(t, client.devicesModelName)
gpuModel := client.getDeviceModelName(0)
Expand Down Expand Up @@ -86,6 +86,7 @@ func TestSupportedFieldsWithGolden(t *testing.T) {
assert.Equal(t, len(dcgmNameToMetricName), len(client.enabledFieldIDs)+len(unavailableFieldsString))
goldenPath := getModelGoldenFilePath(t, gpuModel)
golden.Assert(t, string(actual), goldenPath)
client.cleanup()
}

// LoadExpectedMetrics read the supported metrics of a GPU model from the golden
Expand Down Expand Up @@ -137,7 +138,7 @@ func getModelGoldenFilePath(t *testing.T, model string) string {

func TestNewDcgmClientWithGpuPresent(t *testing.T) {
client, err := newClient(createDefaultConfig().(*Config), zaptest.NewLogger(t))
require.Nil(t, err)
require.Nil(t, err, "cannot initialize DCGM. Install and run DCGM before running tests.")

assert.NotNil(t, client)
assert.NotNil(t, client.handleCleanup)
Expand All @@ -146,11 +147,12 @@ func TestNewDcgmClientWithGpuPresent(t *testing.T) {
assert.Greater(t, len(client.devicesModelName[gpuIndex]), 0)
assert.Greater(t, len(client.devicesUUID[gpuIndex]), 0)
}
client.cleanup()
}

func TestCollectGpuProfilingMetrics(t *testing.T) {
client, err := newClient(createDefaultConfig().(*Config), zaptest.NewLogger(t))
require.Nil(t, err)
require.Nil(t, err, "cannot initialize DCGM. Install and run DCGM before running tests.")
expectedMetrics := LoadExpectedMetrics(t, client.devicesModelName[0])
var maxCollectionInterval = 60 * time.Second
before := time.Now().UnixMicro() - maxCollectionInterval.Microseconds()
Expand Down Expand Up @@ -213,4 +215,5 @@ func TestCollectGpuProfilingMetrics(t *testing.T) {
assert.Equal(t, seenMetric[fmt.Sprintf("gpu{%d}.metric{%s}", gpuIndex, metric)], true)
}
}
client.cleanup()
}
8 changes: 4 additions & 4 deletions receiver/dcgmreceiver/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
package dcgmreceiver

import (
"errors"
"fmt"
"strings"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest"
Expand All @@ -46,7 +46,7 @@ func TestNewDcgmClientOnInitializationError(t *testing.T) {

client, err := newClient(createDefaultConfig().(*Config), logger)
assert.Equal(t, seenDcgmConnectionWarning, true)
require.NotNil(t, client)
require.Equal(t, client.disable, true)
require.Nil(t, err)
assert.True(t, errors.Is(err, ErrDcgmInitialization))
assert.Regexp(t, ".*Unable to connect.*", err)
assert.Nil(t, client)
}
28 changes: 25 additions & 3 deletions receiver/dcgmreceiver/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package dcgmreceiver

import (
"context"
"errors"
"fmt"
"time"

Expand All @@ -41,13 +42,29 @@ func newDcgmScraper(config *Config, settings receiver.CreateSettings) *dcgmScrap
return &dcgmScraper{config: config, settings: settings}
}

func (s *dcgmScraper) start(_ context.Context, _ component.Host) error {
var err error
s.client, err = newClient(s.config, s.settings.Logger)
// initClient makes sure the scraper has a dcgmClient, creating one with
// newClient when s.client is still nil.
//
// Any error from newClient is logged as a warning. An error that matches
// ErrDcgmInitialization is then swallowed: s.client stays nil and nil is
// returned, so a later call can try again. Every other error is returned to
// the caller.
func (s *dcgmScraper) initClient() error {
	// A client already exists; nothing to do.
	if s.client != nil {
		return nil
	}

	created, initErr := newClient(s.config, s.settings.Logger)
	if initErr == nil {
		s.client = created
		return nil
	}

	s.settings.Logger.Sugar().Warn(initErr)
	if !errors.Is(initErr, ErrDcgmInitialization) {
		return initErr
	}
	// Could not connect to DCGM: report no error and leave s.client unset so
	// the next collection retries.
	return nil
}

func (s *dcgmScraper) start(_ context.Context, _ component.Host) error {
startTime := pcommon.NewTimestampFromTime(time.Now())
mbConfig := metadata.DefaultMetricsBuilderConfig()
mbConfig.Metrics = s.config.Metrics
Expand All @@ -65,6 +82,11 @@ func (s *dcgmScraper) stop(_ context.Context) error {
}

func (s *dcgmScraper) scrape(_ context.Context) (pmetric.Metrics, error) {
err := s.initClient()
if err != nil || s.client == nil {
return s.mb.Emit(), err
}

deviceMetrics, err := s.client.collectDeviceMetrics()

now := pcommon.NewTimestampFromTime(time.Now())
Expand Down
87 changes: 86 additions & 1 deletion receiver/dcgmreceiver/scraper_gpu_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/scraperhelper"
"go.uber.org/zap/zaptest"

"github.com/GoogleCloudPlatform/opentelemetry-operations-collector/receiver/dcgmreceiver/internal/metadata"
"github.com/GoogleCloudPlatform/opentelemetry-operations-collector/receiver/dcgmreceiver/testprofilepause"
)

Expand All @@ -49,6 +52,88 @@ func TestScrapeWithGpuPresent(t *testing.T) {
validateScraperResult(t, metrics, expectedMetrics)
}

// TestScrapeWithDelayedDcgmService checks that the scraper tolerates DCGM
// being unavailable at start and during early scrapes (no error, no metrics),
// and that it begins producing metrics once dcgmInit starts succeeding.
func TestScrapeWithDelayedDcgmService(t *testing.T) {
	// Swap in a dcgmInit that always fails; restore the real one on exit.
	realDcgmInit := dcgmInit
	defer func() { dcgmInit = realDcgmInit }()
	dcgmInit = func(args ...string) (func(), error) {
		return nil, fmt.Errorf("No DCGM client library *OR* No DCGM connection")
	}

	var settings receiver.CreateSettings
	settings.Logger = zaptest.NewLogger(t)

	scraper := newDcgmScraper(createDefaultConfig().(*Config), settings)
	require.NotNil(t, scraper)

	err := scraper.start(context.Background(), componenttest.NewNopHost())
	require.NoError(t, err)

	metrics, err := scraper.scrape(context.Background())
	assert.NoError(t, err) // If failed to init DCGM, should have no error
	assert.Equal(t, 0, metrics.MetricCount())

	// Scrape again with DCGM not available
	metrics, err = scraper.scrape(context.Background())
	assert.NoError(t, err)
	assert.Equal(t, 0, metrics.MetricCount())

	// Simulate DCGM becomes available
	dcgmInit = realDcgmInit
	metrics, err = scraper.scrape(context.Background())
	// Use require here: the next statements dereference scraper.client, which
	// is nil when initialization did not succeed, and a nil dereference would
	// panic instead of reporting a test failure.
	require.NoError(t, err)
	require.NotNil(t, scraper.client)
	expectedMetrics := loadExpectedScraperMetrics(t, scraper.client.getDeviceModelName(0))
	validateScraperResult(t, metrics, expectedMetrics)
}

// TestScrapeWithEmptyMetricsConfig checks that a scraper whose configuration
// disables every metric starts without error and that a scrape returns no
// error and zero metrics.
func TestScrapeWithEmptyMetricsConfig(t *testing.T) {
	var settings receiver.CreateSettings
	settings.Logger = zaptest.NewLogger(t)
	// Every metric is explicitly disabled so the intent of the test is visible
	// in the config itself.
	emptyConfig := &Config{
		ScraperControllerSettings: scraperhelper.ScraperControllerSettings{
			CollectionInterval: defaultCollectionInterval,
		},
		TCPAddr: confignet.TCPAddr{
			Endpoint: defaultEndpoint,
		},
		Metrics: metadata.MetricsSettings{
			DcgmGpuMemoryBytesUsed: metadata.MetricSettings{
				Enabled: false,
			},
			DcgmGpuProfilingDramUtilization: metadata.MetricSettings{
				Enabled: false,
			},
			DcgmGpuProfilingNvlinkTrafficRate: metadata.MetricSettings{
				Enabled: false,
			},
			DcgmGpuProfilingPcieTrafficRate: metadata.MetricSettings{
				Enabled: false,
			},
			DcgmGpuProfilingPipeUtilization: metadata.MetricSettings{
				Enabled: false,
			},
			DcgmGpuProfilingSmOccupancy: metadata.MetricSettings{
				Enabled: false,
			},
			DcgmGpuProfilingSmUtilization: metadata.MetricSettings{
				Enabled: false,
			},
			DcgmGpuUtilization: metadata.MetricSettings{
				Enabled: false,
			},
		},
	}

	scraper := newDcgmScraper(emptyConfig, settings)
	require.NotNil(t, scraper)

	err := scraper.start(context.Background(), componenttest.NewNopHost())
	require.NoError(t, err)

	metrics, err := scraper.scrape(context.Background())
	assert.NoError(t, err)
	// testify expects (expected, actual); this order keeps failure output
	// labelled correctly.
	assert.Equal(t, 0, metrics.MetricCount())
}

func TestScrapeOnPollingError(t *testing.T) {
realDcgmGetLatestValuesForFields := dcgmGetLatestValuesForFields
defer func() { dcgmGetLatestValuesForFields = realDcgmGetLatestValuesForFields }()
Expand Down Expand Up @@ -111,7 +196,7 @@ func TestScrapeOnProfilingPaused(t *testing.T) {

// loadExpectedScraperMetrics calls LoadExpectedMetrics to read the supported
// metrics from the golden file given a GPU model, and then convert the name
// from how they are definied in the dcgm client to scraper naming
// from how they are defined in the dcgm client to scraper naming
func loadExpectedScraperMetrics(t *testing.T, model string) map[string]int {
t.Helper()
expectedMetrics := make(map[string]int)
Expand Down
Loading

0 comments on commit 2c2416e

Please sign in to comment.