Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore][graph] Remodel node id as attribute sets #11344

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions service/internal/graph/attribute/attribute.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package attribute // import "go.opentelemetry.io/collector/service/internal/graph/attribute"

import (
"fmt"
"hash/fnv"

"go.opentelemetry.io/otel/attribute"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pipeline"
)

const (
componentKindKey = "otelcol.component.kind"
componentIDKey = "otelcol.component.id"
pipelineIDKey = "otelcol.pipeline.id"
signalKey = "otelcol.signal"
signalOutputKey = "otelcol.signal.output"

receiverKind = "receiver"
processorKind = "processor"
exporterKind = "exporter"
connectorKind = "connector"
capabiltiesKind = "capabilities"
fanoutKind = "fanout"
)

type Attributes struct {
set attribute.Set
id int64
}

func newAttributes(attrs ...attribute.KeyValue) *Attributes {
h := fnv.New64a()
for _, kv := range attrs {
h.Write([]byte("(" + string(kv.Key) + "|" + kv.Value.AsString() + ")"))
}
return &Attributes{
set: attribute.NewSet(attrs...),
id: int64(h.Sum64()), // #nosec G115
}
}

func (a Attributes) Attributes() *attribute.Set {
return &a.set
}

func (a Attributes) ID() int64 {
return a.id
}

func Receiver(pipelineType pipeline.Signal, id component.ID) *Attributes {
return newAttributes(
attribute.String(componentKindKey, receiverKind),
attribute.String(signalKey, pipelineType.String()),
attribute.String(componentIDKey, id.String()),
)
}

func Processor(pipelineID pipeline.ID, id component.ID) *Attributes {
return newAttributes(
attribute.String(componentKindKey, processorKind),
attribute.String(signalKey, pipelineID.Signal().String()),
attribute.String(pipelineIDKey, pipelineID.String()),
attribute.String(componentIDKey, id.String()),
)
}

func Exporter(pipelineType pipeline.Signal, id component.ID) *Attributes {
return newAttributes(
attribute.String(componentKindKey, exporterKind),
attribute.String(signalKey, pipelineType.String()),
attribute.String(componentIDKey, id.String()),
)
}

func Connector(exprPipelineType, rcvrPipelineType pipeline.Signal, id component.ID) *Attributes {
return newAttributes(
attribute.String(componentKindKey, connectorKind),
attribute.String(signalKey, fmt.Sprintf("%s_to_%s", exprPipelineType.String(), rcvrPipelineType.String())),
attribute.String(componentIDKey, id.String()),
)
}

func Capabilities(pipelineID pipeline.ID) *Attributes {
return newAttributes(
attribute.String(componentKindKey, capabiltiesKind),
attribute.String(pipelineIDKey, pipelineID.String()),
)
}

func Fanout(pipelineID pipeline.ID) *Attributes {
return newAttributes(
attribute.String(componentKindKey, fanoutKind),
attribute.String(pipelineIDKey, pipelineID.String()),
)
}
104 changes: 104 additions & 0 deletions service/internal/graph/attribute/attribute_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package attribute

import (
"testing"

"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pipeline"
"go.opentelemetry.io/collector/pipeline/pipelineprofiles"
)

var (
signals = []pipeline.Signal{
pipeline.SignalTraces,
pipeline.SignalMetrics,
pipeline.SignalLogs,
pipelineprofiles.SignalProfiles,
}

cIDs = []component.ID{
component.MustNewID("foo"),
component.MustNewID("foo2"),
component.MustNewID("bar"),
}

pIDs = []pipeline.ID{
pipeline.MustNewID("traces"),
pipeline.MustNewIDWithName("traces", "2"),
pipeline.MustNewID("metrics"),
pipeline.MustNewIDWithName("metrics", "2"),
pipeline.MustNewID("logs"),
pipeline.MustNewIDWithName("logs", "2"),
pipeline.MustNewID("profiles"),
pipeline.MustNewIDWithName("profiles", "2"),
}
)

func TestAttributes(t *testing.T) {
// The sets are created independently but should be exactly equivalent.
// We will ensure that corresponding elements are equal and that
// non-corresponding elements are not equal.
setI, setJ := createExampleSets(), createExampleSets()
for i, ei := range setI {
for j, ej := range setJ {
if i == j {
require.Equal(t, ei.ID(), ej.ID())
require.True(t, ei.Attributes().Equals(ej.Attributes()))
} else {
require.NotEqual(t, ei.ID(), ej.ID())
require.False(t, ei.Attributes().Equals(ej.Attributes()))
}
}
}
}

func createExampleSets() []*Attributes {
sets := []*Attributes{}

// Receiver examples.
for _, sig := range signals {
for _, id := range cIDs {
sets = append(sets, Receiver(sig, id))
}
}

// Processor examples.
for _, pID := range pIDs {
for _, cID := range cIDs {
sets = append(sets, Processor(pID, cID))
}
}

// Exporter examples.
for _, sig := range signals {
for _, id := range cIDs {
sets = append(sets, Exporter(sig, id))
}
}

// Connector examples.
for _, exprSig := range signals {
for _, rcvrSig := range signals {
for _, id := range cIDs {
sets = append(sets, Connector(exprSig, rcvrSig, id))
}
}
}

// Capabilities examples.
for _, pID := range pIDs {
sets = append(sets, Capabilities(pID))
}

// Fanout examples.
for _, pID := range pIDs {
sets = append(sets, Fanout(pID))
}

return sets
}
7 changes: 3 additions & 4 deletions service/internal/graph/capabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/xconsumer"
"go.opentelemetry.io/collector/pipeline"
"go.opentelemetry.io/collector/service/internal/graph/attribute"
)

const capabilitiesSeed = "capabilities"

var _ consumerNode = (*capabilitiesNode)(nil)

// Every pipeline has a "virtual" capabilities node immediately after the receiver(s).
Expand All @@ -19,7 +18,7 @@ var _ consumerNode = (*capabilitiesNode)(nil)
// 2. Present a consistent "first consumer" for each pipeline.
// The nodeID is derived from "pipeline ID".
type capabilitiesNode struct {
nodeID
*attribute.Attributes
pipelineID pipeline.ID
baseConsumer
consumer.ConsumeTracesFunc
Expand All @@ -30,7 +29,7 @@ type capabilitiesNode struct {

func newCapabilitiesNode(pipelineID pipeline.ID) *capabilitiesNode {
return &capabilitiesNode{
nodeID: newNodeID(capabilitiesSeed, pipelineID.String()),
Attributes: attribute.Capabilities(pipelineID),
pipelineID: pipelineID,
}
}
Expand Down
7 changes: 3 additions & 4 deletions service/internal/graph/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,15 @@ import (
"go.opentelemetry.io/collector/service/internal/builders"
"go.opentelemetry.io/collector/service/internal/capabilityconsumer"
"go.opentelemetry.io/collector/service/internal/components"
"go.opentelemetry.io/collector/service/internal/graph/attribute"
)

const connectorSeed = "connector"

var _ consumerNode = (*connectorNode)(nil)

// A connector instance connects one pipeline type to one other pipeline type.
// Therefore, nodeID is derived from "exporter pipeline type", "receiver pipeline type", and "component ID".
type connectorNode struct {
nodeID
*attribute.Attributes
componentID component.ID
exprPipelineType pipeline.Signal
rcvrPipelineType pipeline.Signal
Expand All @@ -34,7 +33,7 @@ type connectorNode struct {

func newConnectorNode(exprPipelineType, rcvrPipelineType pipeline.Signal, connID component.ID) *connectorNode {
return &connectorNode{
nodeID: newNodeID(connectorSeed, connID.String(), exprPipelineType.String(), rcvrPipelineType.String()),
Attributes: attribute.Connector(exprPipelineType, rcvrPipelineType, connID),
componentID: connID,
exprPipelineType: exprPipelineType,
rcvrPipelineType: rcvrPipelineType,
Expand Down
7 changes: 3 additions & 4 deletions service/internal/graph/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,23 @@ import (
"go.opentelemetry.io/collector/pipeline/pipelineprofiles"
"go.opentelemetry.io/collector/service/internal/builders"
"go.opentelemetry.io/collector/service/internal/components"
"go.opentelemetry.io/collector/service/internal/graph/attribute"
)

const exporterSeed = "exporter"

var _ consumerNode = (*exporterNode)(nil)

// An exporter instance can be shared by multiple pipelines of the same type.
// Therefore, nodeID is derived from "pipeline type" and "component ID".
type exporterNode struct {
nodeID
*attribute.Attributes
componentID component.ID
pipelineType pipeline.Signal
component.Component
}

func newExporterNode(pipelineType pipeline.Signal, exprID component.ID) *exporterNode {
return &exporterNode{
nodeID: newNodeID(exporterSeed, pipelineType.String(), exprID.String()),
Attributes: attribute.Exporter(pipelineType, exprID),
componentID: exprID,
pipelineType: pipelineType,
}
Expand Down
7 changes: 3 additions & 4 deletions service/internal/graph/fanout.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,22 @@ package graph // import "go.opentelemetry.io/collector/service/internal/graph"

import (
"go.opentelemetry.io/collector/pipeline"
"go.opentelemetry.io/collector/service/internal/graph/attribute"
)

const fanOutToExporters = "fanout_to_exporters"

var _ consumerNode = (*fanOutNode)(nil)

// Each pipeline has one fan-out node before exporters.
// Therefore, nodeID is derived from "pipeline ID".
type fanOutNode struct {
nodeID
*attribute.Attributes
pipelineID pipeline.ID
baseConsumer
}

func newFanOutNode(pipelineID pipeline.ID) *fanOutNode {
return &fanOutNode{
nodeID: newNodeID(fanOutToExporters, pipelineID.String()),
Attributes: attribute.Fanout(pipelineID),
pipelineID: pipelineID,
}
}
Expand Down
18 changes: 9 additions & 9 deletions service/internal/graph/graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2067,11 +2067,11 @@ func TestGraphBuildErrors(t *testing.T) {
},
},
expected: `cycle detected: ` +
`connector "nop/conn1" (traces to traces) -> ` +
`processor "nop" in pipeline "traces/2" -> ` +
`connector "nop/conn" (traces to traces) -> ` +
`processor "nop" in pipeline "traces/1" -> ` +
`connector "nop/conn1" (traces to traces)`,
`connector "nop/conn1" (traces to traces) -> ` +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the change in test here?

Copy link
Member Author

@djaglowski djaglowski Oct 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's really just a change in where the start of the cycle is reported, but it still reports the same cycle deterministically.

What it comes down to is that this PR changes the way node IDs are generated. Previously, we built a particualr string for each kind of component. Now we build the ID from the attribute set. Both methods are deterministic, but the graph presumably contains some logic where the node ids are considered in an order that is sensitive to their values (e.g. sort the ids, then for each one, follow the edges, etc). The logical graph that is produced is the same, but when a cycle is encountered, the order in which it is reported is affected by the order in which nodes were considered.

`processor "nop" in pipeline "traces/2" -> ` +
`connector "nop/conn" (traces to traces)`,
},
{
name: "not_allowed_deep_cycle_metrics.yaml",
Expand Down Expand Up @@ -2157,11 +2157,11 @@ func TestGraphBuildErrors(t *testing.T) {
},
},
expected: `cycle detected: ` +
`connector "nop/conn1" (logs to logs) -> ` +
`processor "nop" in pipeline "logs/2" -> ` +
`connector "nop/conn" (logs to logs) -> ` +
`processor "nop" in pipeline "logs/1" -> ` +
`connector "nop/conn1" (logs to logs)`,
`connector "nop/conn1" (logs to logs) -> ` +
`processor "nop" in pipeline "logs/2" -> ` +
`connector "nop/conn" (logs to logs)`,
},
{
name: "not_allowed_deep_cycle_profiles.yaml",
Expand Down Expand Up @@ -2263,13 +2263,13 @@ func TestGraphBuildErrors(t *testing.T) {
},
},
expected: `cycle detected: ` +
`connector "nop/forkagain" (traces to traces) -> ` +
`processor "nop" in pipeline "traces/copy2b" -> ` +
`connector "nop/rawlog" (traces to logs) -> ` +
`processor "nop" in pipeline "logs/raw" -> ` +
`connector "nop/fork" (logs to traces) -> ` +
`processor "nop" in pipeline "traces/copy2" -> ` +
`connector "nop/forkagain" (traces to traces) -> ` +
`processor "nop" in pipeline "traces/copy2b" -> ` +
`connector "nop/rawlog" (traces to logs)`,
`connector "nop/forkagain" (traces to traces)`,
},
{
name: "unknown_exporter_config",
Expand Down
22 changes: 0 additions & 22 deletions service/internal/graph/node.go

This file was deleted.

Loading
Loading