Skip to content

Commit

Permalink
Implement a cumulativeTracker struct.
Browse files Browse the repository at this point in the history
Rearrange aggregates in scraper.
  • Loading branch information
igorpeshansky committed Jul 27, 2024
1 parent cc46305 commit b00e858
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 31 deletions.
116 changes: 85 additions & 31 deletions receiver/dcgmreceiver/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,32 @@ type dcgmScraper struct {
mb *metadata.MetricsBuilder
// Aggregate cumulative values.
aggregates struct {
energyConsumptionFallback rateIntegrator[float64] // ...from power usage rate.
pcieTxTotal rateIntegrator[int64] // ...from pcie tx.
pcieRxTotal rateIntegrator[int64] // ...from pcie rx.
nvlinkTxTotal rateIntegrator[int64] // ...from nvlink tx.
nvlinkRxTotal rateIntegrator[int64] // ...from nvlink rx.
energyConsumption struct {
total cumulativeTracker[int64]
fallback rateIntegrator[float64] // ...from power usage rate.
}
pcieTotal struct {
tx rateIntegrator[int64] // ...from pcie tx.
rx rateIntegrator[int64] // ...from pcie rx.
}
nvlinkTotal struct {
tx rateIntegrator[int64] // ...from nvlink tx.
rx rateIntegrator[int64] // ...from nvlink rx.
}
throttleDuration struct {
powerViolation cumulativeTracker[int64]
thermalViolation cumulativeTracker[int64]
syncBoostViolation cumulativeTracker[int64]
boardLimitViolation cumulativeTracker[int64]
lowUtilViolation cumulativeTracker[int64]
reliabilityViolation cumulativeTracker[int64]
totalAppClocksViolation cumulativeTracker[int64]
totalBaseClocksViolation cumulativeTracker[int64]
}
eccTotal struct {
sbe cumulativeTracker[int64]
dbe cumulativeTracker[int64]
}
}
}

Expand Down Expand Up @@ -87,11 +108,22 @@ func (s *dcgmScraper) start(_ context.Context, _ component.Host) error {
mbConfig.Metrics = s.config.Metrics
s.mb = metadata.NewMetricsBuilder(
mbConfig, s.settings, metadata.WithStartTime(startTime))
s.aggregates.energyConsumptionFallback.Reset()
s.aggregates.pcieTxTotal.Reset()
s.aggregates.pcieRxTotal.Reset()
s.aggregates.nvlinkTxTotal.Reset()
s.aggregates.nvlinkRxTotal.Reset()
s.aggregates.energyConsumption.total.Reset()
s.aggregates.energyConsumption.fallback.Reset()
s.aggregates.pcieTotal.tx.Reset()
s.aggregates.pcieTotal.rx.Reset()
s.aggregates.nvlinkTotal.tx.Reset()
s.aggregates.nvlinkTotal.rx.Reset()
s.aggregates.throttleDuration.powerViolation.Reset()
s.aggregates.throttleDuration.thermalViolation.Reset()
s.aggregates.throttleDuration.syncBoostViolation.Reset()
s.aggregates.throttleDuration.boardLimitViolation.Reset()
s.aggregates.throttleDuration.lowUtilViolation.Reset()
s.aggregates.throttleDuration.reliabilityViolation.Reset()
s.aggregates.throttleDuration.totalAppClocksViolation.Reset()
s.aggregates.throttleDuration.totalBaseClocksViolation.Reset()
s.aggregates.eccTotal.sbe.Reset()
s.aggregates.eccTotal.dbe.Reset()

return nil
}
Expand Down Expand Up @@ -260,31 +292,33 @@ func (s *dcgmScraper) scrape(_ context.Context) (pmetric.Metrics, error) {
s.mb.RecordGpuDcgmMemoryBandwidthUtilizationDataPoint(now, memCopyUtil)
}
if metric, ok := metrics["DCGM_FI_PROF_PCIE_TX_BYTES"]; ok {
s.aggregates.pcieTxTotal.Update(metric.timestamp, metric.asInt64())
_, pcieTx := s.aggregates.pcieTxTotal.Value()
s.aggregates.pcieTotal.tx.Update(metric.timestamp, metric.asInt64())
_, pcieTx := s.aggregates.pcieTotal.tx.Value()
s.mb.RecordGpuDcgmPcieIoDataPoint(now, pcieTx, metadata.AttributeNetworkIoDirectionTransmit)
}
if metric, ok := metrics["DCGM_FI_PROF_PCIE_RX_BYTES"]; ok {
s.aggregates.pcieRxTotal.Update(metric.timestamp, metric.asInt64())
_, pcieRx := s.aggregates.pcieRxTotal.Value()
s.aggregates.pcieTotal.rx.Update(metric.timestamp, metric.asInt64())
_, pcieRx := s.aggregates.pcieTotal.rx.Value()
s.mb.RecordGpuDcgmPcieIoDataPoint(now, pcieRx, metadata.AttributeNetworkIoDirectionReceive)
}
if metric, ok := metrics["DCGM_FI_PROF_NVLINK_TX_BYTES"]; ok {
s.aggregates.nvlinkTxTotal.Update(metric.timestamp, metric.asInt64())
_, nvlinkTx := s.aggregates.nvlinkTxTotal.Value()
s.aggregates.nvlinkTotal.tx.Update(metric.timestamp, metric.asInt64())
_, nvlinkTx := s.aggregates.nvlinkTotal.tx.Value()
s.mb.RecordGpuDcgmNvlinkIoDataPoint(now, nvlinkTx, metadata.AttributeNetworkIoDirectionTransmit)
}
if metric, ok := metrics["DCGM_FI_PROF_NVLINK_RX_BYTES"]; ok {
s.aggregates.nvlinkRxTotal.Update(metric.timestamp, metric.asInt64())
_, nvlinkRx := s.aggregates.nvlinkRxTotal.Value()
s.aggregates.nvlinkTotal.rx.Update(metric.timestamp, metric.asInt64())
_, nvlinkRx := s.aggregates.nvlinkTotal.rx.Value()
s.mb.RecordGpuDcgmNvlinkIoDataPoint(now, nvlinkRx, metadata.AttributeNetworkIoDirectionReceive)
}
if metric, ok := metrics["DCGM_FI_DEV_TOTAL_ENERGY_CONSUMPTION"]; ok {
energyUsed := float64(metric.asInt64()) / 1e3 /* mJ to J */
s.aggregates.energyConsumption.total.Update(metric.timestamp, metric.asInt64())
_, value := s.aggregates.energyConsumption.total.Value()
energyUsed := float64(value) / 1e3 /* mJ to J */
s.mb.RecordGpuDcgmEnergyConsumptionDataPoint(now, energyUsed)
} else if metric, ok := metrics["DCGM_FI_DEV_POWER_USAGE"]; ok { // fallback
s.aggregates.energyConsumptionFallback.Update(metric.timestamp, metric.asFloat64())
_, energyUsed := s.aggregates.energyConsumptionFallback.Value()
s.aggregates.energyConsumption.fallback.Update(metric.timestamp, metric.asFloat64())
_, energyUsed := s.aggregates.energyConsumption.fallback.Value()
s.mb.RecordGpuDcgmEnergyConsumptionDataPoint(now, energyUsed)
}
if metric, ok := metrics["DCGM_FI_DEV_GPU_TEMP"]; ok {
Expand All @@ -295,42 +329,62 @@ func (s *dcgmScraper) scrape(_ context.Context) (pmetric.Metrics, error) {
s.mb.RecordGpuDcgmClockFrequencyDataPoint(now, clockFreq)
}
if metric, ok := metrics["DCGM_FI_DEV_POWER_VIOLATION"]; ok {
violationTime := float64(metric.asInt64()) / 1e6 /* us to s */
s.aggregates.throttleDuration.powerViolation.Update(metric.timestamp, metric.asInt64())
_, value := s.aggregates.throttleDuration.powerViolation.Value()
violationTime := float64(value) / 1e6 /* us to s */
s.mb.RecordGpuDcgmClockThrottleDurationTimeDataPoint(now, violationTime, metadata.AttributeGpuClockViolationPower)
}
if metric, ok := metrics["DCGM_FI_DEV_THERMAL_VIOLATION"]; ok {
violationTime := float64(metric.asInt64()) / 1e6 /* us to s */
s.aggregates.throttleDuration.thermalViolation.Update(metric.timestamp, metric.asInt64())
_, value := s.aggregates.throttleDuration.thermalViolation.Value()
violationTime := float64(value) / 1e6 /* us to s */
s.mb.RecordGpuDcgmClockThrottleDurationTimeDataPoint(now, violationTime, metadata.AttributeGpuClockViolationThermal)
}
if metric, ok := metrics["DCGM_FI_DEV_SYNC_BOOST_VIOLATION"]; ok {
violationTime := float64(metric.asInt64()) / 1e6 /* us to s */
s.aggregates.throttleDuration.syncBoostViolation.Update(metric.timestamp, metric.asInt64())
_, value := s.aggregates.throttleDuration.syncBoostViolation.Value()
violationTime := float64(value) / 1e6 /* us to s */
s.mb.RecordGpuDcgmClockThrottleDurationTimeDataPoint(now, violationTime, metadata.AttributeGpuClockViolationSyncBoost)
}
if metric, ok := metrics["DCGM_FI_DEV_BOARD_LIMIT_VIOLATION"]; ok {
violationTime := float64(metric.asInt64()) / 1e6 /* us to s */
s.aggregates.throttleDuration.boardLimitViolation.Update(metric.timestamp, metric.asInt64())
_, value := s.aggregates.throttleDuration.boardLimitViolation.Value()
violationTime := float64(value) / 1e6 /* us to s */
s.mb.RecordGpuDcgmClockThrottleDurationTimeDataPoint(now, violationTime, metadata.AttributeGpuClockViolationBoardLimit)
}
if metric, ok := metrics["DCGM_FI_DEV_LOW_UTIL_VIOLATION"]; ok {
violationTime := float64(metric.asInt64()) / 1e6 /* us to s */
s.aggregates.throttleDuration.lowUtilViolation.Update(metric.timestamp, metric.asInt64())
_, value := s.aggregates.throttleDuration.lowUtilViolation.Value()
violationTime := float64(value) / 1e6 /* us to s */
s.mb.RecordGpuDcgmClockThrottleDurationTimeDataPoint(now, violationTime, metadata.AttributeGpuClockViolationLowUtil)
}
if metric, ok := metrics["DCGM_FI_DEV_RELIABILITY_VIOLATION"]; ok {
violationTime := float64(metric.asInt64()) / 1e6 /* us to s */
s.aggregates.throttleDuration.reliabilityViolation.Update(metric.timestamp, metric.asInt64())
_, value := s.aggregates.throttleDuration.reliabilityViolation.Value()
violationTime := float64(value) / 1e6 /* us to s */
s.mb.RecordGpuDcgmClockThrottleDurationTimeDataPoint(now, violationTime, metadata.AttributeGpuClockViolationReliability)
}
if metric, ok := metrics["DCGM_FI_DEV_TOTAL_APP_CLOCKS_VIOLATION"]; ok {
violationTime := float64(metric.asInt64()) / 1e6 /* us to s */
s.aggregates.throttleDuration.totalAppClocksViolation.Update(metric.timestamp, metric.asInt64())
_, value := s.aggregates.throttleDuration.totalAppClocksViolation.Value()
violationTime := float64(value) / 1e6 /* us to s */
s.mb.RecordGpuDcgmClockThrottleDurationTimeDataPoint(now, violationTime, metadata.AttributeGpuClockViolationAppClock)
}
if metric, ok := metrics["DCGM_FI_DEV_TOTAL_BASE_CLOCKS_VIOLATION"]; ok {
violationTime := float64(metric.asInt64()) / 1e6 /* us to s */
s.aggregates.throttleDuration.totalBaseClocksViolation.Update(metric.timestamp, metric.asInt64())
_, value := s.aggregates.throttleDuration.totalBaseClocksViolation.Value()
violationTime := float64(value) / 1e6 /* us to s */
s.mb.RecordGpuDcgmClockThrottleDurationTimeDataPoint(now, violationTime, metadata.AttributeGpuClockViolationBaseClock)
}
if metric, ok := metrics["DCGM_FI_DEV_ECC_SBE_VOL_TOTAL"]; ok {
s.mb.RecordGpuDcgmEccErrorsDataPoint(now, metric.asInt64(), metadata.AttributeGpuErrorTypeSbe)
s.aggregates.eccTotal.sbe.Update(metric.timestamp, metric.asInt64())
_, sbeErrors := s.aggregates.eccTotal.sbe.Value()
s.mb.RecordGpuDcgmEccErrorsDataPoint(now, sbeErrors, metadata.AttributeGpuErrorTypeSbe)
}
if metric, ok := metrics["DCGM_FI_DEV_ECC_DBE_VOL_TOTAL"]; ok {
s.mb.RecordGpuDcgmEccErrorsDataPoint(now, metric.asInt64(), metadata.AttributeGpuErrorTypeDbe)
s.aggregates.eccTotal.dbe.Update(metric.timestamp, metric.asInt64())
_, dbeErrors := s.aggregates.eccTotal.dbe.Value()
s.mb.RecordGpuDcgmEccErrorsDataPoint(now, dbeErrors, metadata.AttributeGpuErrorTypeDbe)
}
// TODO: XID errors.
// s.mb.RecordGpuDcgmXidErrorsDataPoint(now, metric.asInt64(), xid)
Expand Down
35 changes: 35 additions & 0 deletions receiver/dcgmreceiver/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,41 @@ func (i *rateIntegrator[V]) Value() (int64, V) {
return i.lastTimestamp, i.aggregatedRateUs / V(1e6)
}

// cumulativeTracker reports a cumulative value relative to a baseline captured
// at the first Update after a Reset: Value() returns the delta between the
// most recent accepted sample and that baseline, so counter totals that
// predate the Reset are not leaked into the reported stream.
type cumulativeTracker[V int64 | float64] struct {
	baseTimestamp int64 // timestamp (Unix µs, per nowUnixMicro) of the first sample after Reset; 0 until then.
	baseline      V     // the value seen at baseTimestamp.
	lastTimestamp int64 // timestamp of the most recent accepted sample.
	lastValue     V     // the value seen at lastTimestamp.
}

// Reset clears the tracker: the baseline is dropped (it will be re-established
// by the next Update) and the last-seen point is re-anchored at the current
// time with a zero value.
func (i *cumulativeTracker[V]) Reset() {
	*i = cumulativeTracker[V]{lastTimestamp: nowUnixMicro()}
}

// Update records a new sample of the underlying cumulative counter.
//
// The first sample after a Reset establishes the baseline that Value()
// subtracts from later readings, and also becomes the last-seen point.
// Subsequent samples whose timestamp does not advance past the last accepted
// one are dropped as stale.
func (i *cumulativeTracker[V]) Update(ts int64, v V) {
	// On first update, record the value as both the baseline and the last
	// seen point. Recording only the baseline (as before) let a first sample
	// with ts <= the Reset timestamp be dropped by the staleness check below,
	// after which Value() reported -baseline instead of zero.
	if i.baseTimestamp == 0 {
		i.baseTimestamp, i.baseline = ts, v
		i.lastTimestamp, i.lastValue = ts, v
		return
	}
	// Drop stale points.
	if ts <= i.lastTimestamp {
		return
	}
	i.lastTimestamp, i.lastValue = ts, v
}

// Value returns the timestamp of the most recent accepted sample together
// with how much the tracked counter has grown since the baseline.
func (i *cumulativeTracker[V]) Value() (int64, V) {
	delta := i.lastValue - i.baseline
	return i.lastTimestamp, delta
}

// Baseline returns the timestamp and value of the first sample recorded after
// the last Reset; both are zero if no sample has been recorded yet.
func (i *cumulativeTracker[V]) Baseline() (int64, V) {
	return i.baseTimestamp, i.baseline
}

var (
errBlankValue = fmt.Errorf("unspecified blank value")
errDataNotFound = fmt.Errorf("data not found")
Expand Down
50 changes: 50 additions & 0 deletions receiver/dcgmreceiver/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,53 @@ func TestRateIntegratorInt64(t *testing.T) {
func TestRateIntegratorFloat64(t *testing.T) {
testRateIntegrator[float64](t)
}

// testCumulativeTracker exercises cumulativeTracker for a single value type:
// baseline capture on the first update, delta reporting, stale-point
// rejection, and state clearing on Reset.
func testCumulativeTracker[V int64 | float64](t *testing.T) {
	// Pin the clock so Reset anchors lastTimestamp at a known value.
	origNowUnixMicro := nowUnixMicro
	nowUnixMicro = func() int64 { return 10 }
	defer func() { nowUnixMicro = origNowUnixMicro }()

	// point pairs a timestamp with a value for compact equality checks.
	type point struct {
		ts int64
		v  V
	}
	at := func(ts int64, v V) point { return point{ts, v} }

	var tracker cumulativeTracker[V]

	tracker.Reset()
	require.Equal(t, point{0, 0}, at(tracker.Baseline()))
	require.Equal(t, point{10, 0}, at(tracker.Value()))
	// The first update sets the baseline.
	tracker.Update(15, 50)
	require.Equal(t, point{15, 50}, at(tracker.Baseline()))
	assert.Equal(t, point{15, 0}, at(tracker.Value()))
	// Later updates move the value but leave the baseline alone.
	tracker.Update(20, 80)
	assert.Equal(t, point{15, 50}, at(tracker.Baseline()))
	assert.Equal(t, point{20, 30}, at(tracker.Value()))
	// Points at or before the last accepted timestamp are ignored.
	tracker.Update(18, 1e8)
	assert.Equal(t, point{20, 30}, at(tracker.Value()))
	tracker.Update(20, 1e8)
	assert.Equal(t, point{20, 30}, at(tracker.Value()))
	// Fresh points keep advancing the value.
	tracker.Update(25, 100)
	assert.Equal(t, point{25, 50}, at(tracker.Value()))
	// A repeated reading advances the timestamp but not the delta.
	tracker.Update(30, 100)
	assert.Equal(t, point{30, 50}, at(tracker.Value()))

	// Reset clears both the baseline and the accumulated value.
	tracker.Reset()
	assert.Equal(t, point{0, 0}, at(tracker.Baseline()))
	assert.Equal(t, point{10, 0}, at(tracker.Value()))
}

// TestCumulativeTrackerInt64 runs the shared cumulativeTracker checks with
// int64 values.
func TestCumulativeTrackerInt64(t *testing.T) {
	testCumulativeTracker[int64](t)
}

// TestCumulativeTrackerFloat64 runs the shared cumulativeTracker checks with
// float64 values.
func TestCumulativeTrackerFloat64(t *testing.T) {
	testCumulativeTracker[float64](t)
}

0 comments on commit b00e858

Please sign in to comment.