From 2c2416e3c7599e7ceee8835451deb41accd5441a Mon Sep 17 00:00:00 2001 From: Lujie Duan Date: Mon, 14 Aug 2023 10:39:43 -0400 Subject: [PATCH] DCGM: Enable the receiver to retry dcgm initialization at each collection time (#178) --- go.mod | 2 +- go.sum | 2 + receiver/dcgmreceiver/client.go | 52 +++++++------- receiver/dcgmreceiver/client_gpu_test.go | 9 ++- receiver/dcgmreceiver/client_test.go | 8 +-- receiver/dcgmreceiver/scraper.go | 28 +++++++- receiver/dcgmreceiver/scraper_gpu_test.go | 87 ++++++++++++++++++++++- receiver/dcgmreceiver/scraper_test.go | 63 ++++++++++++++++ 8 files changed, 213 insertions(+), 38 deletions(-) create mode 100644 receiver/dcgmreceiver/scraper_test.go diff --git a/go.mod b/go.mod index 5b1a67c20..bf686733c 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.20 require ( github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/collector v0.42.0 github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/collector/googlemanagedprometheus v0.42.0 - github.com/NVIDIA/go-dcgm v0.0.0-20221107203308-b6ed78cdc8d3 + github.com/NVIDIA/go-dcgm v0.0.0-20230811193702-90bac724c747 github.com/NVIDIA/go-nvml v0.11.6-0 github.com/open-telemetry/opentelemetry-collector-contrib/exporter/fileexporter v0.81.0 github.com/open-telemetry/opentelemetry-collector-contrib/exporter/googlecloudexporter v0.81.0 diff --git a/go.sum b/go.sum index 57e48a311..bcab758e9 100644 --- a/go.sum +++ b/go.sum @@ -109,6 +109,8 @@ github.com/Microsoft/go-winio v0.6.0/go.mod h1:cTAf44im0RAYeL23bpB+fzCyDH2MJiz2B github.com/Mottl/ctimefmt v0.0.0-20190803144728-fd2ac23a585a/go.mod h1:eyj2WSIdoPMPs2eNTLpSmM6Nzqo4V80/d6jHpnJ1SAI= github.com/NVIDIA/go-dcgm v0.0.0-20221107203308-b6ed78cdc8d3 h1:qk/qQaBY+918SBhcJsqYwZdDaS3MRtUH2SUX53SC7Cs= github.com/NVIDIA/go-dcgm v0.0.0-20221107203308-b6ed78cdc8d3/go.mod h1:atKWXstYFllLarJucBxnt9ivFIPe26Y6S4JQvzqeSr8= +github.com/NVIDIA/go-dcgm v0.0.0-20230811193702-90bac724c747 h1:inukNBh/RJl4CsxPL3rwSAUwKI90YC3Vf8uaJ0V/J3o= +github.com/NVIDIA/go-dcgm v0.0.0-20230811193702-90bac724c747/go.mod h1:atKWXstYFllLarJucBxnt9ivFIPe26Y6S4JQvzqeSr8= github.com/NVIDIA/go-nvml v0.11.6-0 h1:tugQzmaX84Y/6+03wZ/MAgcpfSKDkvkAWeuxFNLHmxY= github.com/NVIDIA/go-nvml v0.11.6-0/go.mod h1:hy7HYeQy335x6nEss0Ne3PYqleRa6Ct+VKD9RQ4nyFs= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= diff --git a/receiver/dcgmreceiver/client.go b/receiver/dcgmreceiver/client.go index f3e5407eb..04b592db2 100644 --- a/receiver/dcgmreceiver/client.go +++ b/receiver/dcgmreceiver/client.go @@ -29,9 +29,10 @@ import ( const maxWarningsForFailedDeviceMetricQuery = 5 +var ErrDcgmInitialization = errors.New("error initializing DCGM") + type dcgmClient struct { logger *zap.SugaredLogger - disable bool handleCleanup func() enabledFieldIDs []dcgm.Short enabledFieldGroup dcgm.FieldHandle @@ -57,21 +58,13 @@ var dcgmGetLatestValuesForFields = dcgm.GetLatestValuesForFields func newClient(config *Config, logger *zap.Logger) (*dcgmClient, error) { dcgmCleanup, err := initializeDcgm(config, logger) - // When DCGM is not installed or not running, return empty client - if err != nil { - return &dcgmClient{logger: logger.Sugar(), disable: true}, nil - } - - deviceIndices, names, UUIDs, err := discoverDevices(logger) if err != nil { - return nil, err + return nil, errors.Join(ErrDcgmInitialization, err) } - - deviceGroup, err := createDeviceGroup(logger, deviceIndices) - if err != nil { - return nil, err - } - + deviceIndices := make([]uint, 0) + names := make([]string, 0) + UUIDs := make([]string, 0) + enabledFieldGroup := dcgm.FieldHandle{} requestedFieldIDs := discoverRequestedFieldIDs(config) supportedFieldIDs, err := getAllSupportedFields() if err != nil { @@ -83,11 +76,20 @@ func newClient(config *Config, logger *zap.Logger) (*dcgmClient, error) { for _, f := range unavailableFields { logger.Sugar().Warnf("Field '%s' is not supported. Metric '%s' will not be collected", dcgmIDToName[f], dcgmNameToMetricName[dcgmIDToName[f]]) } - enabledFieldGroup, err := setWatchesOnEnabledFields(config, logger, deviceGroup, enabledFields) - if err != nil { - return nil, fmt.Errorf("Unable to set field watches on %w", err) + if len(enabledFields) != 0 { + deviceIndices, names, UUIDs, err = discoverDevices(logger) + if err != nil { + return nil, err + } + deviceGroup, err := createDeviceGroup(logger, deviceIndices) + if err != nil { + return nil, err + } + enabledFieldGroup, err = setWatchesOnEnabledFields(config, logger, deviceGroup, enabledFields) + if err != nil { + return nil, fmt.Errorf("Unable to set field watches on %w", err) + } } - return &dcgmClient{ logger: logger.Sugar(), handleCleanup: dcgmCleanup, @@ -100,15 +102,19 @@ func newClient(config *Config, logger *zap.Logger) (*dcgmClient, error) { }, nil } +// initializeDcgm tries to initialize a DCGM connection; returns a cleanup func +// only if the connection is initialized successfully without error func initializeDcgm(config *Config, logger *zap.Logger) (func(), error) { isSocket := "0" dcgmCleanup, err := dcgmInit(config.TCPAddr.Endpoint, isSocket) if err != nil { msg := fmt.Sprintf("Unable to connect to DCGM daemon at %s on %v; Is the DCGM daemon running?", config.TCPAddr.Endpoint, err) logger.Sugar().Warn(msg) + if dcgmCleanup != nil { + dcgmCleanup() + } return nil, fmt.Errorf("%s", msg) } - logger.Sugar().Infof("Connected to DCGM daemon at %s", config.TCPAddr.Endpoint) return dcgmCleanup, nil } @@ -285,9 +291,7 @@ func (client *dcgmClient) cleanup() { client.handleCleanup() } - if !client.disable { - client.logger.Info("Shutdown DCGM") - } + client.logger.Info("Shutdown DCGM") } func (client *dcgmClient) getDeviceModelName(gpuIndex uint) string { @@ -299,10 +303,6 @@ func (client *dcgmClient) getDeviceUUID(gpuIndex uint) string { } func (client *dcgmClient) collectDeviceMetrics() ([]dcgmMetric, error) { - if client.disable { - return nil, nil - } - var err scrapererror.ScrapeErrors gpuMetrics := make([]dcgmMetric, 0, len(client.enabledFieldIDs)*len(client.deviceIndices)) for _, gpuIndex := range client.deviceIndices { diff --git a/receiver/dcgmreceiver/client_gpu_test.go b/receiver/dcgmreceiver/client_gpu_test.go index debcbf678..76ad622a7 100644 --- a/receiver/dcgmreceiver/client_gpu_test.go +++ b/receiver/dcgmreceiver/client_gpu_test.go @@ -53,7 +53,7 @@ type modelSupportedFields struct { func TestSupportedFieldsWithGolden(t *testing.T) { config := createDefaultConfig().(*Config) client, err := newClient(config, zaptest.NewLogger(t)) - require.Nil(t, err) + require.Nil(t, err, "cannot initialize DCGM. Install and run DCGM before running tests.") assert.NotEmpty(t, client.devicesModelName) gpuModel := client.getDeviceModelName(0) @@ -86,6 +86,7 @@ func TestSupportedFieldsWithGolden(t *testing.T) { assert.Equal(t, len(dcgmNameToMetricName), len(client.enabledFieldIDs)+len(unavailableFieldsString)) goldenPath := getModelGoldenFilePath(t, gpuModel) golden.Assert(t, string(actual), goldenPath) + client.cleanup() } // LoadExpectedMetrics read the supported metrics of a GPU model from the golden @@ -137,7 +138,7 @@ func getModelGoldenFilePath(t *testing.T, model string) string { func TestNewDcgmClientWithGpuPresent(t *testing.T) { client, err := newClient(createDefaultConfig().(*Config), zaptest.NewLogger(t)) - require.Nil(t, err) + require.Nil(t, err, "cannot initialize DCGM. Install and run DCGM before running tests.") assert.NotNil(t, client) assert.NotNil(t, client.handleCleanup) @@ -146,11 +147,12 @@ func TestNewDcgmClientWithGpuPresent(t *testing.T) { assert.Greater(t, len(client.devicesModelName[gpuIndex]), 0) assert.Greater(t, len(client.devicesUUID[gpuIndex]), 0) } + client.cleanup() } func TestCollectGpuProfilingMetrics(t *testing.T) { client, err := newClient(createDefaultConfig().(*Config), zaptest.NewLogger(t)) - require.Nil(t, err) + require.Nil(t, err, "cannot initialize DCGM. Install and run DCGM before running tests.") expectedMetrics := LoadExpectedMetrics(t, client.devicesModelName[0]) var maxCollectionInterval = 60 * time.Second before := time.Now().UnixMicro() - maxCollectionInterval.Microseconds() @@ -213,4 +215,5 @@ func TestCollectGpuProfilingMetrics(t *testing.T) { assert.Equal(t, seenMetric[fmt.Sprintf("gpu{%d}.metric{%s}", gpuIndex, metric)], true) } } + client.cleanup() } diff --git a/receiver/dcgmreceiver/client_test.go b/receiver/dcgmreceiver/client_test.go index 66d29357a..6d390aa60 100644 --- a/receiver/dcgmreceiver/client_test.go +++ b/receiver/dcgmreceiver/client_test.go @@ -18,12 +18,12 @@ package dcgmreceiver import ( + "errors" "fmt" "strings" "testing" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" "go.uber.org/zap" "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest" @@ -46,7 +46,7 @@ func TestNewDcgmClientOnInitializationError(t *testing.T) { client, err := newClient(createDefaultConfig().(*Config), logger) assert.Equal(t, seenDcgmConnectionWarning, true) - require.NotNil(t, client) - require.Equal(t, client.disable, true) - require.Nil(t, err) + assert.True(t, errors.Is(err, ErrDcgmInitialization)) + assert.Regexp(t, ".*Unable to connect.*", err) + assert.Nil(t, client) } diff --git a/receiver/dcgmreceiver/scraper.go b/receiver/dcgmreceiver/scraper.go index 1eb5ad5a1..2768e50d9 100644 --- a/receiver/dcgmreceiver/scraper.go +++ b/receiver/dcgmreceiver/scraper.go @@ -19,6 +19,7 @@ package dcgmreceiver import ( "context" + "errors" "fmt" "time" @@ -41,13 +42,29 @@ func newDcgmScraper(config *Config, settings receiver.CreateSettings) *dcgmScrap return &dcgmScraper{config: config, settings: settings} } -func (s *dcgmScraper) start(_ context.Context, _ component.Host) error { - var err error - s.client, err = newClient(s.config, s.settings.Logger) +// initClient will try to create a new dcgmClient if currently has no client; +// it will try to initialize the communication with the DCGM service; if +// success, create a client; only return errors if DCGM service is available but +// failed to create client. +func (s *dcgmScraper) initClient() error { + if s.client != nil { + return nil + } + client, err := newClient(s.config, s.settings.Logger) if err != nil { + s.settings.Logger.Sugar().Warn(err) + if errors.Is(err, ErrDcgmInitialization) { + // If cannot connect to DCGM, return no error and retry at next + // collection time + return nil + } return err } + s.client = client + return nil +} +func (s *dcgmScraper) start(_ context.Context, _ component.Host) error { startTime := pcommon.NewTimestampFromTime(time.Now()) mbConfig := metadata.DefaultMetricsBuilderConfig() mbConfig.Metrics = s.config.Metrics @@ -65,6 +82,11 @@ func (s *dcgmScraper) stop(_ context.Context) error { } func (s *dcgmScraper) scrape(_ context.Context) (pmetric.Metrics, error) { + err := s.initClient() + if err != nil || s.client == nil { + return s.mb.Emit(), err + } + deviceMetrics, err := s.client.collectDeviceMetrics() now := pcommon.NewTimestampFromTime(time.Now()) diff --git a/receiver/dcgmreceiver/scraper_gpu_test.go b/receiver/dcgmreceiver/scraper_gpu_test.go index dedcb0ebb..439c11eae 100644 --- a/receiver/dcgmreceiver/scraper_gpu_test.go +++ b/receiver/dcgmreceiver/scraper_gpu_test.go @@ -27,10 +27,13 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/confignet" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/receiver/scraperhelper" "go.uber.org/zap/zaptest" + "github.com/GoogleCloudPlatform/opentelemetry-operations-collector/receiver/dcgmreceiver/internal/metadata" "github.com/GoogleCloudPlatform/opentelemetry-operations-collector/receiver/dcgmreceiver/testprofilepause" ) @@ -49,6 +52,88 @@ func TestScrapeWithGpuPresent(t *testing.T) { validateScraperResult(t, metrics, expectedMetrics) } +func TestScrapeWithDelayedDcgmService(t *testing.T) { + realDcgmInit := dcgmInit + defer func() { dcgmInit = realDcgmInit }() + dcgmInit = func(args ...string) (func(), error) { + return nil, fmt.Errorf("No DCGM client library *OR* No DCGM connection") + } + + var settings receiver.CreateSettings + settings.Logger = zaptest.NewLogger(t) + + scraper := newDcgmScraper(createDefaultConfig().(*Config), settings) + require.NotNil(t, scraper) + + err := scraper.start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + + metrics, err := scraper.scrape(context.Background()) + assert.NoError(t, err) // If failed to init DCGM, should have no error + assert.Equal(t, metrics.MetricCount(), 0) + + // Scrape again with DCGM not available + metrics, err = scraper.scrape(context.Background()) + assert.NoError(t, err) + assert.Equal(t, metrics.MetricCount(), 0) + + // Simulate DCGM becomes available + dcgmInit = realDcgmInit + metrics, err = scraper.scrape(context.Background()) + assert.NoError(t, err) + expectedMetrics := loadExpectedScraperMetrics(t, scraper.client.getDeviceModelName(0)) + validateScraperResult(t, metrics, expectedMetrics) +} + +func TestScrapeWithEmptyMetricsConfig(t *testing.T) { + var settings receiver.CreateSettings + settings.Logger = zaptest.NewLogger(t) + emptyConfig := &Config{ + ScraperControllerSettings: scraperhelper.ScraperControllerSettings{ + CollectionInterval: defaultCollectionInterval, + }, + TCPAddr: confignet.TCPAddr{ + Endpoint: defaultEndpoint, + }, + Metrics: metadata.MetricsSettings{ + DcgmGpuMemoryBytesUsed: metadata.MetricSettings{ + Enabled: false, + }, + DcgmGpuProfilingDramUtilization: metadata.MetricSettings{ + Enabled: false, + }, + DcgmGpuProfilingNvlinkTrafficRate: metadata.MetricSettings{ + Enabled: false, + }, + DcgmGpuProfilingPcieTrafficRate: metadata.MetricSettings{ + Enabled: false, + }, + DcgmGpuProfilingPipeUtilization: metadata.MetricSettings{ + Enabled: false, + }, + DcgmGpuProfilingSmOccupancy: metadata.MetricSettings{ + Enabled: false, + }, + DcgmGpuProfilingSmUtilization: metadata.MetricSettings{ + Enabled: false, + }, + DcgmGpuUtilization: metadata.MetricSettings{ + Enabled: false, + }, + }, + } + + scraper := newDcgmScraper(emptyConfig, settings) + require.NotNil(t, scraper) + + err := scraper.start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + + metrics, err := scraper.scrape(context.Background()) + assert.NoError(t, err) + assert.Equal(t, metrics.MetricCount(), 0) +} + func TestScrapeOnPollingError(t *testing.T) { realDcgmGetLatestValuesForFields := dcgmGetLatestValuesForFields defer func() { dcgmGetLatestValuesForFields = realDcgmGetLatestValuesForFields }() @@ -111,7 +196,7 @@ func TestScrapeOnProfilingPaused(t *testing.T) { // loadExpectedScraperMetrics calls LoadExpectedMetrics to read the supported // metrics from the golden file given a GPU model, and then convert the name -// from how they are definied in the dcgm client to scraper naming +// from how they are defined in the dcgm client to scraper naming func loadExpectedScraperMetrics(t *testing.T, model string) map[string]int { t.Helper() expectedMetrics := make(map[string]int) diff --git a/receiver/dcgmreceiver/scraper_test.go b/receiver/dcgmreceiver/scraper_test.go new file mode 100644 index 000000000..b4900aaa1 --- /dev/null +++ b/receiver/dcgmreceiver/scraper_test.go @@ -0,0 +1,63 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build gpu && !has_gpu +// +build gpu,!has_gpu + +package dcgmreceiver + +import ( + "context" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/receiver" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest" +) + +func TestScraperWithoutDcgm(t *testing.T) { + var settings receiver.CreateSettings + seenDcgmNotInstalledWarning := false + settings.Logger = zaptest.NewLogger(t, zaptest.WrapOptions(zap.Hooks(func(e zapcore.Entry) error { + if e.Level == zap.WarnLevel && strings.Contains(e.Message, "Unable to connect to DCGM daemon at localhost:5555 on libdcgm.so not Found; Is the DCGM daemon running") { + seenDcgmNotInstalledWarning = true + } + return nil + }))) + + scraper := newDcgmScraper(createDefaultConfig().(*Config), settings) + require.NotNil(t, scraper) + + err := scraper.start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + + metrics, err := scraper.scrape(context.Background()) + assert.Equal(t, true, seenDcgmNotInstalledWarning) + assert.NoError(t, err) // If failed to init DCGM, should have no error + assert.Equal(t, 0, metrics.MetricCount()) + + // Scrape again with DCGM not available + metrics, err = scraper.scrape(context.Background()) + assert.Equal(t, true, seenDcgmNotInstalledWarning) + assert.NoError(t, err) + assert.Equal(t, 0, metrics.MetricCount()) + + err = scraper.stop(context.Background()) + assert.NoError(t, err) +}