diff --git a/receiver/dcgmreceiver/scraper.go b/receiver/dcgmreceiver/scraper.go index e6daeda28..898564ddf 100644 --- a/receiver/dcgmreceiver/scraper.go +++ b/receiver/dcgmreceiver/scraper.go @@ -40,11 +40,32 @@ type dcgmScraper struct { mb *metadata.MetricsBuilder // Aggregate cumulative values. aggregates struct { - energyConsumptionFallback rateIntegrator[float64] // ...from power usage rate. - pcieTxTotal rateIntegrator[int64] // ...from pcie tx. - pcieRxTotal rateIntegrator[int64] // ...from pcie rx. - nvlinkTxTotal rateIntegrator[int64] // ...from nvlink tx. - nvlinkRxTotal rateIntegrator[int64] // ...from nvlink rx. + energyConsumption struct { + total cumulativeTracker[int64] + fallback rateIntegrator[float64] // ...from power usage rate. + } + pcieTotal struct { + tx rateIntegrator[int64] // ...from pcie tx. + rx rateIntegrator[int64] // ...from pcie rx. + } + nvlinkTotal struct { + tx rateIntegrator[int64] // ...from nvlink tx. + rx rateIntegrator[int64] // ...from nvlink rx. + } + throttleDuration struct { + powerViolation cumulativeTracker[int64] + thermalViolation cumulativeTracker[int64] + syncBoostViolation cumulativeTracker[int64] + boardLimitViolation cumulativeTracker[int64] + lowUtilViolation cumulativeTracker[int64] + reliabilityViolation cumulativeTracker[int64] + totalAppClocksViolation cumulativeTracker[int64] + totalBaseClocksViolation cumulativeTracker[int64] + } + eccTotal struct { + sbe cumulativeTracker[int64] + dbe cumulativeTracker[int64] + } } } @@ -87,11 +108,22 @@ func (s *dcgmScraper) start(_ context.Context, _ component.Host) error { mbConfig.Metrics = s.config.Metrics s.mb = metadata.NewMetricsBuilder( mbConfig, s.settings, metadata.WithStartTime(startTime)) - s.aggregates.energyConsumptionFallback.Reset() - s.aggregates.pcieTxTotal.Reset() - s.aggregates.pcieRxTotal.Reset() - s.aggregates.nvlinkTxTotal.Reset() - s.aggregates.nvlinkRxTotal.Reset() + s.aggregates.energyConsumption.total.Reset() + s.aggregates.energyConsumption.fallback.Reset() + s.aggregates.pcieTotal.tx.Reset() + s.aggregates.pcieTotal.rx.Reset() + s.aggregates.nvlinkTotal.tx.Reset() + s.aggregates.nvlinkTotal.rx.Reset() + s.aggregates.throttleDuration.powerViolation.Reset() + s.aggregates.throttleDuration.thermalViolation.Reset() + s.aggregates.throttleDuration.syncBoostViolation.Reset() + s.aggregates.throttleDuration.boardLimitViolation.Reset() + s.aggregates.throttleDuration.lowUtilViolation.Reset() + s.aggregates.throttleDuration.reliabilityViolation.Reset() + s.aggregates.throttleDuration.totalAppClocksViolation.Reset() + s.aggregates.throttleDuration.totalBaseClocksViolation.Reset() + s.aggregates.eccTotal.sbe.Reset() + s.aggregates.eccTotal.dbe.Reset() return nil } @@ -260,31 +292,33 @@ func (s *dcgmScraper) scrape(_ context.Context) (pmetric.Metrics, error) { s.mb.RecordGpuDcgmMemoryBandwidthUtilizationDataPoint(now, memCopyUtil) } if metric, ok := metrics["DCGM_FI_PROF_PCIE_TX_BYTES"]; ok { - s.aggregates.pcieTxTotal.Update(metric.timestamp, metric.asInt64()) - _, pcieTx := s.aggregates.pcieTxTotal.Value() + s.aggregates.pcieTotal.tx.Update(metric.timestamp, metric.asInt64()) + _, pcieTx := s.aggregates.pcieTotal.tx.Value() s.mb.RecordGpuDcgmPcieIoDataPoint(now, pcieTx, metadata.AttributeNetworkIoDirectionTransmit) } if metric, ok := metrics["DCGM_FI_PROF_PCIE_RX_BYTES"]; ok { - s.aggregates.pcieRxTotal.Update(metric.timestamp, metric.asInt64()) - _, pcieRx := s.aggregates.pcieRxTotal.Value() + s.aggregates.pcieTotal.rx.Update(metric.timestamp, metric.asInt64()) + _, pcieRx := s.aggregates.pcieTotal.rx.Value() s.mb.RecordGpuDcgmPcieIoDataPoint(now, pcieRx, metadata.AttributeNetworkIoDirectionReceive) } if metric, ok := metrics["DCGM_FI_PROF_NVLINK_TX_BYTES"]; ok { - s.aggregates.nvlinkTxTotal.Update(metric.timestamp, metric.asInt64()) - _, nvlinkTx := s.aggregates.nvlinkTxTotal.Value() + s.aggregates.nvlinkTotal.tx.Update(metric.timestamp, metric.asInt64()) + _, nvlinkTx := s.aggregates.nvlinkTotal.tx.Value() s.mb.RecordGpuDcgmNvlinkIoDataPoint(now, nvlinkTx, metadata.AttributeNetworkIoDirectionTransmit) } if metric, ok := metrics["DCGM_FI_PROF_NVLINK_RX_BYTES"]; ok { - s.aggregates.nvlinkRxTotal.Update(metric.timestamp, metric.asInt64()) - _, nvlinkRx := s.aggregates.nvlinkRxTotal.Value() + s.aggregates.nvlinkTotal.rx.Update(metric.timestamp, metric.asInt64()) + _, nvlinkRx := s.aggregates.nvlinkTotal.rx.Value() s.mb.RecordGpuDcgmNvlinkIoDataPoint(now, nvlinkRx, metadata.AttributeNetworkIoDirectionReceive) } if metric, ok := metrics["DCGM_FI_DEV_TOTAL_ENERGY_CONSUMPTION"]; ok { - energyUsed := float64(metric.asInt64()) / 1e3 /* mJ to J */ + s.aggregates.energyConsumption.total.Update(metric.timestamp, metric.asInt64()) + _, value := s.aggregates.energyConsumption.total.Value() + energyUsed := float64(value) / 1e3 /* mJ to J */ s.mb.RecordGpuDcgmEnergyConsumptionDataPoint(now, energyUsed) } else if metric, ok := metrics["DCGM_FI_DEV_POWER_USAGE"]; ok { // fallback - s.aggregates.energyConsumptionFallback.Update(metric.timestamp, metric.asFloat64()) - _, energyUsed := s.aggregates.energyConsumptionFallback.Value() + s.aggregates.energyConsumption.fallback.Update(metric.timestamp, metric.asFloat64()) + _, energyUsed := s.aggregates.energyConsumption.fallback.Value() s.mb.RecordGpuDcgmEnergyConsumptionDataPoint(now, energyUsed) } if metric, ok := metrics["DCGM_FI_DEV_GPU_TEMP"]; ok { @@ -295,42 +329,62 @@ func (s *dcgmScraper) scrape(_ context.Context) (pmetric.Metrics, error) { s.mb.RecordGpuDcgmClockFrequencyDataPoint(now, clockFreq) } if metric, ok := metrics["DCGM_FI_DEV_POWER_VIOLATION"]; ok { - violationTime := float64(metric.asInt64()) / 1e6 /* us to s */ + s.aggregates.throttleDuration.powerViolation.Update(metric.timestamp, metric.asInt64()) + _, value := s.aggregates.throttleDuration.powerViolation.Value() + violationTime := float64(value) / 1e6 /* us to s */ s.mb.RecordGpuDcgmClockThrottleDurationTimeDataPoint(now, violationTime, metadata.AttributeGpuClockViolationPower) } if metric, ok := metrics["DCGM_FI_DEV_THERMAL_VIOLATION"]; ok { - violationTime := float64(metric.asInt64()) / 1e6 /* us to s */ + s.aggregates.throttleDuration.thermalViolation.Update(metric.timestamp, metric.asInt64()) + _, value := s.aggregates.throttleDuration.thermalViolation.Value() + violationTime := float64(value) / 1e6 /* us to s */ s.mb.RecordGpuDcgmClockThrottleDurationTimeDataPoint(now, violationTime, metadata.AttributeGpuClockViolationThermal) } if metric, ok := metrics["DCGM_FI_DEV_SYNC_BOOST_VIOLATION"]; ok { - violationTime := float64(metric.asInt64()) / 1e6 /* us to s */ + s.aggregates.throttleDuration.syncBoostViolation.Update(metric.timestamp, metric.asInt64()) + _, value := s.aggregates.throttleDuration.syncBoostViolation.Value() + violationTime := float64(value) / 1e6 /* us to s */ s.mb.RecordGpuDcgmClockThrottleDurationTimeDataPoint(now, violationTime, metadata.AttributeGpuClockViolationSyncBoost) } if metric, ok := metrics["DCGM_FI_DEV_BOARD_LIMIT_VIOLATION"]; ok { - violationTime := float64(metric.asInt64()) / 1e6 /* us to s */ + s.aggregates.throttleDuration.boardLimitViolation.Update(metric.timestamp, metric.asInt64()) + _, value := s.aggregates.throttleDuration.boardLimitViolation.Value() + violationTime := float64(value) / 1e6 /* us to s */ s.mb.RecordGpuDcgmClockThrottleDurationTimeDataPoint(now, violationTime, metadata.AttributeGpuClockViolationBoardLimit) } if metric, ok := metrics["DCGM_FI_DEV_LOW_UTIL_VIOLATION"]; ok { - violationTime := float64(metric.asInt64()) / 1e6 /* us to s */ + s.aggregates.throttleDuration.lowUtilViolation.Update(metric.timestamp, metric.asInt64()) + _, value := s.aggregates.throttleDuration.lowUtilViolation.Value() + violationTime := float64(value) / 1e6 /* us to s */ s.mb.RecordGpuDcgmClockThrottleDurationTimeDataPoint(now, violationTime, metadata.AttributeGpuClockViolationLowUtil) } if metric, ok := metrics["DCGM_FI_DEV_RELIABILITY_VIOLATION"]; ok { - violationTime := float64(metric.asInt64()) / 1e6 /* us to s */ + s.aggregates.throttleDuration.reliabilityViolation.Update(metric.timestamp, metric.asInt64()) + _, value := s.aggregates.throttleDuration.reliabilityViolation.Value() + violationTime := float64(value) / 1e6 /* us to s */ s.mb.RecordGpuDcgmClockThrottleDurationTimeDataPoint(now, violationTime, metadata.AttributeGpuClockViolationReliability) } if metric, ok := metrics["DCGM_FI_DEV_TOTAL_APP_CLOCKS_VIOLATION"]; ok { - violationTime := float64(metric.asInt64()) / 1e6 /* us to s */ + s.aggregates.throttleDuration.totalAppClocksViolation.Update(metric.timestamp, metric.asInt64()) + _, value := s.aggregates.throttleDuration.totalAppClocksViolation.Value() + violationTime := float64(value) / 1e6 /* us to s */ s.mb.RecordGpuDcgmClockThrottleDurationTimeDataPoint(now, violationTime, metadata.AttributeGpuClockViolationAppClock) } if metric, ok := metrics["DCGM_FI_DEV_TOTAL_BASE_CLOCKS_VIOLATION"]; ok { - violationTime := float64(metric.asInt64()) / 1e6 /* us to s */ + s.aggregates.throttleDuration.totalBaseClocksViolation.Update(metric.timestamp, metric.asInt64()) + _, value := s.aggregates.throttleDuration.totalBaseClocksViolation.Value() + violationTime := float64(value) / 1e6 /* us to s */ s.mb.RecordGpuDcgmClockThrottleDurationTimeDataPoint(now, violationTime, metadata.AttributeGpuClockViolationBaseClock) } if metric, ok := metrics["DCGM_FI_DEV_ECC_SBE_VOL_TOTAL"]; ok { - s.mb.RecordGpuDcgmEccErrorsDataPoint(now, metric.asInt64(), metadata.AttributeGpuErrorTypeSbe) + s.aggregates.eccTotal.sbe.Update(metric.timestamp, metric.asInt64()) + _, sbeErrors := s.aggregates.eccTotal.sbe.Value() + s.mb.RecordGpuDcgmEccErrorsDataPoint(now, sbeErrors, metadata.AttributeGpuErrorTypeSbe) } if metric, ok := metrics["DCGM_FI_DEV_ECC_DBE_VOL_TOTAL"]; ok { - s.mb.RecordGpuDcgmEccErrorsDataPoint(now, metric.asInt64(), metadata.AttributeGpuErrorTypeDbe) + s.aggregates.eccTotal.dbe.Update(metric.timestamp, metric.asInt64()) + _, dbeErrors := s.aggregates.eccTotal.dbe.Value() + s.mb.RecordGpuDcgmEccErrorsDataPoint(now, dbeErrors, metadata.AttributeGpuErrorTypeDbe) } // TODO: XID errors. // s.mb.RecordGpuDcgmXidErrorsDataPoint(now, metric.asInt64(), xid) diff --git a/receiver/dcgmreceiver/util.go b/receiver/dcgmreceiver/util.go index b8033345c..8d3a819a3 100644 --- a/receiver/dcgmreceiver/util.go +++ b/receiver/dcgmreceiver/util.go @@ -54,6 +54,41 @@ func (i *rateIntegrator[V]) Value() (int64, V) { return i.lastTimestamp, i.aggregatedRateUs / V(1e6) } +// cumulativeTracker records cumulative values since last reset. +type cumulativeTracker[V int64 | float64] struct { + baseTimestamp int64 + baseline V // the value seen at baseTimestamp. + lastTimestamp int64 + lastValue V // the value seen at lastTimestamp. +} + +func (i *cumulativeTracker[V]) Reset() { + i.baseTimestamp = 0 + i.lastTimestamp = nowUnixMicro() + i.baseline = V(0) + i.lastValue = V(0) +} + +func (i *cumulativeTracker[V]) Update(ts int64, v V) { + // On first update, record the value as the baseline. + if i.baseTimestamp == 0 { + i.baseTimestamp, i.baseline = ts, v + } + // Drop stale points. + if ts <= i.lastTimestamp { + return + } + i.lastTimestamp, i.lastValue = ts, v +} + +func (i *cumulativeTracker[V]) Value() (int64, V) { + return i.lastTimestamp, i.lastValue - i.baseline +} + +func (i *cumulativeTracker[V]) Baseline() (int64, V) { + return i.baseTimestamp, i.baseline +} + var ( errBlankValue = fmt.Errorf("unspecified blank value") errDataNotFound = fmt.Errorf("data not found") diff --git a/receiver/dcgmreceiver/util_test.go b/receiver/dcgmreceiver/util_test.go index 60e9a8ad0..22a711ba3 100644 --- a/receiver/dcgmreceiver/util_test.go +++ b/receiver/dcgmreceiver/util_test.go @@ -66,3 +66,53 @@ func TestRateIntegratorInt64(t *testing.T) { func TestRateIntegratorFloat64(t *testing.T) { testRateIntegrator[float64](t) } + +func testCumulativeTracker[V int64 | float64](t *testing.T) { + origNowUnixMicro := nowUnixMicro + nowUnixMicro = func() int64 { return 10 } + defer func() { nowUnixMicro = origNowUnixMicro }() + + type P struct { + ts int64 + v V + } + p := func(ts int64, v V) P { return P{ts, v} } + + var ct cumulativeTracker[V] + + ct.Reset() + require.Equal(t, P{0, 0}, p(ct.Baseline())) + require.Equal(t, P{10, 0}, p(ct.Value())) + // Ensure first updates sets the baseline. + ct.Update(15, 50) + require.Equal(t, P{15, 50}, p(ct.Baseline())) + assert.Equal(t, P{15, 0}, p(ct.Value())) + // Ensure updates affect values, but not the baseline. + ct.Update(20, 80) + assert.Equal(t, P{15, 50}, p(ct.Baseline())) + assert.Equal(t, P{20, 30}, p(ct.Value())) + // Ensure stale points are ignored. + ct.Update(18, 1e8) + assert.Equal(t, P{20, 30}, p(ct.Value())) + ct.Update(20, 1e8) + assert.Equal(t, P{20, 30}, p(ct.Value())) + // Ensure updates affect values. + ct.Update(25, 100) + assert.Equal(t, P{25, 50}, p(ct.Value())) + // Ensure same inputs don't affect values. + ct.Update(30, 100) + assert.Equal(t, P{30, 50}, p(ct.Value())) + + // Ensure the value and baseline are cleared on reset. + ct.Reset() + assert.Equal(t, P{0, 0}, p(ct.Baseline())) + assert.Equal(t, P{10, 0}, p(ct.Value())) +} + +func TestCumulativeTrackerInt64(t *testing.T) { + testCumulativeTracker[int64](t) +} + +func TestCumulativeTrackerFloat64(t *testing.T) { + testCumulativeTracker[float64](t) +}