From b299e6837734edaa75647a7b4103e562072cc4f9 Mon Sep 17 00:00:00 2001 From: Dan Jaglowski Date: Thu, 3 Oct 2024 16:24:58 -0400 Subject: [PATCH] [chore][graph] Remodel node id as attribute sets --- service/internal/graph/attribute/attribute.go | 112 +++++++++++++++++++ .../graph/attribute/attribute_test.go | 104 ++++++++++++++++++ service/internal/graph/capabilities.go | 7 +- service/internal/graph/connector.go | 7 +- service/internal/graph/exporter.go | 7 +- service/internal/graph/fanout.go | 7 +- service/internal/graph/graph_test.go | 18 +-- service/internal/graph/node.go | 21 ---- service/internal/graph/processor.go | 7 +- service/internal/graph/reciever.go | 7 +- service/internal/graph/util_test.go | 5 +- 11 files changed, 247 insertions(+), 55 deletions(-) create mode 100644 service/internal/graph/attribute/attribute.go create mode 100644 service/internal/graph/attribute/attribute_test.go delete mode 100644 service/internal/graph/node.go diff --git a/service/internal/graph/attribute/attribute.go b/service/internal/graph/attribute/attribute.go new file mode 100644 index 00000000000..7a0b423aaf3 --- /dev/null +++ b/service/internal/graph/attribute/attribute.go @@ -0,0 +1,112 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package attribute // import "go.opentelemetry.io/collector/service/internal/graph/attribute" + +import ( + "fmt" + "hash/fnv" + + "go.opentelemetry.io/otel/attribute" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pipeline" +) + +const ( + signalKey = "otel.signal" + componentIDKey = "otel.component.id" + pipelineIDKey = "otel.pipeline.id" + componentKindKey = "otel.component.kind" + + receiverKind = "receiver" + processorKind = "processor" + exporterKind = "exporter" + connectorKind = "connector" + capabilitiesKind = "capabilities" + fanoutKind = "fanout" +) + +// Attributes pairs an attribute set describing a graph node with an int64 ID +// derived from an FNV-1a hash of the same key/value pairs. +type Attributes struct { + set attribute.Set + id int64 +} + +// newAttributes builds the set from attrs and derives the ID by hashing each +// pair, in argument order, in the form "(key|value)". +func newAttributes(attrs ...attribute.KeyValue)
*Attributes { + h := fnv.New64a() + for _, kv := range attrs { + // hash.Hash's Write never returns an error, so the result is not checked. + h.Write([]byte("(" + string(kv.Key) + "|" + kv.Value.AsString() + ")")) + } + return &Attributes{ + set: attribute.NewSet(attrs...), + id: int64(h.Sum64()), + } +} + +// Attributes returns the attribute set held by a, without copying it. +func (a *Attributes) Attributes() *attribute.Set { + return &a.set +} + +// ID returns the identifier computed when a was constructed. +func (a *Attributes) ID() int64 { + return a.id +} + +// Receiver returns the attributes for a receiver node: component kind, signal, and component ID. +func Receiver(pipelineType pipeline.Signal, id component.ID) *Attributes { + return newAttributes( + attribute.String(componentKindKey, receiverKind), + attribute.String(signalKey, pipelineType.String()), + attribute.String(componentIDKey, id.String()), + ) +} + +// Processor returns the attributes for a processor node: component kind, signal, pipeline ID, and component ID. +func Processor(pipelineID pipeline.ID, id component.ID) *Attributes { + return newAttributes( + attribute.String(componentKindKey, processorKind), + attribute.String(signalKey, pipelineID.Signal().String()), + attribute.String(pipelineIDKey, pipelineID.String()), + attribute.String(componentIDKey, id.String()), + ) +} + +// Exporter returns the attributes for an exporter node: component kind, signal, and component ID. +func Exporter(pipelineType pipeline.Signal, id component.ID) *Attributes { + return newAttributes( + attribute.String(componentKindKey, exporterKind), + attribute.String(signalKey, pipelineType.String()), + attribute.String(componentIDKey, id.String()), + ) +} + +// Connector returns the attributes for a connector node; the signal value is "<exporter signal>_to_<receiver signal>". +func Connector(exprPipelineType, rcvrPipelineType pipeline.Signal, id component.ID) *Attributes { + return newAttributes( + attribute.String(componentKindKey, connectorKind), + attribute.String(signalKey, fmt.Sprintf("%s_to_%s", exprPipelineType.String(), rcvrPipelineType.String())), + attribute.String(componentIDKey, id.String()), + ) +} + +// Capabilities returns the attributes for a capabilities node: component kind and pipeline ID. +func Capabilities(pipelineID pipeline.ID) *Attributes { + return newAttributes( + attribute.String(componentKindKey, capabilitiesKind), + attribute.String(pipelineIDKey, pipelineID.String()), + ) +} + +// Fanout returns the attributes for a fan-out node: component kind and pipeline ID. +func Fanout(pipelineID pipeline.ID) *Attributes { + return newAttributes( + attribute.String(componentKindKey, fanoutKind), + attribute.String(pipelineIDKey, pipelineID.String()), + ) +} diff --git
a/service/internal/graph/attribute/attribute_test.go b/service/internal/graph/attribute/attribute_test.go new file mode 100644 index 00000000000..7310e97e41a --- /dev/null +++ b/service/internal/graph/attribute/attribute_test.go @@ -0,0 +1,104 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package attribute + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componentprofiles" + "go.opentelemetry.io/collector/pipeline" +) + +var ( + signals = []pipeline.Signal{ + pipeline.SignalTraces, + pipeline.SignalMetrics, + pipeline.SignalLogs, + componentprofiles.SignalProfiles, + } + + cIDs = []component.ID{ + component.MustNewID("foo"), + component.MustNewID("foo2"), + component.MustNewID("bar"), + } + + pIDs = []pipeline.ID{ + pipeline.MustNewID("traces"), + pipeline.MustNewIDWithName("traces", "2"), + pipeline.MustNewID("metrics"), + pipeline.MustNewIDWithName("metrics", "2"), + pipeline.MustNewID("logs"), + pipeline.MustNewIDWithName("logs", "2"), + pipeline.MustNewID("profiles"), + pipeline.MustNewIDWithName("profiles", "2"), + } +) + +func TestAttributes(t *testing.T) { + // The sets are created independently but should be exactly equivalent. + // We will ensure that corresponding elements are equal and that + // non-corresponding elements are not equal. + setI, setJ := createExampleSets(), createExampleSets() + for i, ei := range setI { + for j, ej := range setJ { + if i == j { + require.Equal(t, ei.ID(), ej.ID()) + require.True(t, ei.Attributes().Equals(ej.Attributes())) + } else { + require.NotEqual(t, ei.ID(), ej.ID()) + require.False(t, ei.Attributes().Equals(ej.Attributes())) + } + } + } +} + +func createExampleSets() []*Attributes { + sets := []*Attributes{} + + // Receiver examples. + for _, sig := range signals { + for _, id := range cIDs { + sets = append(sets, Receiver(sig, id)) + } + } + + // Processor examples. 
+ for _, pID := range pIDs { + for _, cID := range cIDs { + sets = append(sets, Processor(pID, cID)) + } + } + + // Exporter examples. + for _, sig := range signals { + for _, id := range cIDs { + sets = append(sets, Exporter(sig, id)) + } + } + + // Connector examples. + for _, exprSig := range signals { + for _, rcvrSig := range signals { + for _, id := range cIDs { + sets = append(sets, Connector(exprSig, rcvrSig, id)) + } + } + } + + // Capabilities examples. + for _, pID := range pIDs { + sets = append(sets, Capabilities(pID)) + } + + // Fanout examples. + for _, pID := range pIDs { + sets = append(sets, Fanout(pID)) + } + + return sets +} diff --git a/service/internal/graph/capabilities.go b/service/internal/graph/capabilities.go index 8a16ae67853..2f31e0c0d10 100644 --- a/service/internal/graph/capabilities.go +++ b/service/internal/graph/capabilities.go @@ -7,10 +7,9 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumerprofiles" "go.opentelemetry.io/collector/pipeline" + "go.opentelemetry.io/collector/service/internal/graph/attribute" ) -const capabilitiesSeed = "capabilities" - var _ consumerNode = (*capabilitiesNode)(nil) // Every pipeline has a "virtual" capabilities node immediately after the receiver(s). @@ -19,7 +18,7 @@ var _ consumerNode = (*capabilitiesNode)(nil) // 2. Present a consistent "first consumer" for each pipeline. // The nodeID is derived from "pipeline ID". 
type capabilitiesNode struct { - nodeID + *attribute.Attributes pipelineID pipeline.ID baseConsumer consumer.ConsumeTracesFunc @@ -30,7 +29,7 @@ type capabilitiesNode struct { func newCapabilitiesNode(pipelineID pipeline.ID) *capabilitiesNode { return &capabilitiesNode{ - nodeID: newNodeID(capabilitiesSeed, pipelineID.String()), + Attributes: attribute.Capabilities(pipelineID), pipelineID: pipelineID, } } diff --git a/service/internal/graph/connector.go b/service/internal/graph/connector.go index d9ad44f885c..dc3ec040fa9 100644 --- a/service/internal/graph/connector.go +++ b/service/internal/graph/connector.go @@ -16,16 +16,15 @@ import ( "go.opentelemetry.io/collector/service/internal/builders" "go.opentelemetry.io/collector/service/internal/capabilityconsumer" "go.opentelemetry.io/collector/service/internal/components" + "go.opentelemetry.io/collector/service/internal/graph/attribute" ) -const connectorSeed = "connector" - var _ consumerNode = (*connectorNode)(nil) // A connector instance connects one pipeline type to one other pipeline type. // Therefore, nodeID is derived from "exporter pipeline type", "receiver pipeline type", and "component ID". 
type connectorNode struct { - nodeID + *attribute.Attributes componentID component.ID exprPipelineType pipeline.Signal rcvrPipelineType pipeline.Signal @@ -35,7 +34,7 @@ type connectorNode struct { func newConnectorNode(exprPipelineType, rcvrPipelineType pipeline.Signal, connID component.ID) *connectorNode { return &connectorNode{ - nodeID: newNodeID(connectorSeed, connID.String(), exprPipelineType.String(), rcvrPipelineType.String()), + Attributes: attribute.Connector(exprPipelineType, rcvrPipelineType, connID), componentID: connID, exprPipelineType: exprPipelineType, rcvrPipelineType: rcvrPipelineType, diff --git a/service/internal/graph/exporter.go b/service/internal/graph/exporter.go index 04532a81992..1213173856c 100644 --- a/service/internal/graph/exporter.go +++ b/service/internal/graph/exporter.go @@ -13,16 +13,15 @@ import ( "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/service/internal/builders" "go.opentelemetry.io/collector/service/internal/components" + "go.opentelemetry.io/collector/service/internal/graph/attribute" ) -const exporterSeed = "exporter" - var _ consumerNode = (*exporterNode)(nil) // An exporter instance can be shared by multiple pipelines of the same type. // Therefore, nodeID is derived from "pipeline type" and "component ID". 
type exporterNode struct { - nodeID + *attribute.Attributes componentID component.ID pipelineType pipeline.Signal component.Component @@ -30,7 +29,7 @@ type exporterNode struct { func newExporterNode(pipelineType pipeline.Signal, exprID component.ID) *exporterNode { return &exporterNode{ - nodeID: newNodeID(exporterSeed, pipelineType.String(), exprID.String()), + Attributes: attribute.Exporter(pipelineType, exprID), componentID: exprID, pipelineType: pipelineType, } diff --git a/service/internal/graph/fanout.go b/service/internal/graph/fanout.go index 13c8d4ad1c5..7d1a76f086f 100644 --- a/service/internal/graph/fanout.go +++ b/service/internal/graph/fanout.go @@ -5,23 +5,22 @@ package graph // import "go.opentelemetry.io/collector/service/internal/graph" import ( "go.opentelemetry.io/collector/pipeline" + "go.opentelemetry.io/collector/service/internal/graph/attribute" ) -const fanOutToExporters = "fanout_to_exporters" - var _ consumerNode = (*fanOutNode)(nil) // Each pipeline has one fan-out node before exporters. // Therefore, nodeID is derived from "pipeline ID". 
type fanOutNode struct { - nodeID + *attribute.Attributes pipelineID pipeline.ID baseConsumer } func newFanOutNode(pipelineID pipeline.ID) *fanOutNode { return &fanOutNode{ - nodeID: newNodeID(fanOutToExporters, pipelineID.String()), + Attributes: attribute.Fanout(pipelineID), pipelineID: pipelineID, } } diff --git a/service/internal/graph/graph_test.go b/service/internal/graph/graph_test.go index 6c4dc61307a..fd995f31f0a 100644 --- a/service/internal/graph/graph_test.go +++ b/service/internal/graph/graph_test.go @@ -2101,11 +2101,11 @@ func TestGraphBuildErrors(t *testing.T) { }, }, expected: `cycle detected: ` + - `connector "nop/conn1" (traces to traces) -> ` + - `processor "nop" in pipeline "traces/2" -> ` + `connector "nop/conn" (traces to traces) -> ` + `processor "nop" in pipeline "traces/1" -> ` + - `connector "nop/conn1" (traces to traces)`, + `connector "nop/conn1" (traces to traces) -> ` + + `processor "nop" in pipeline "traces/2" -> ` + + `connector "nop/conn" (traces to traces)`, }, { name: "not_allowed_deep_cycle_metrics.yaml", @@ -2191,11 +2191,11 @@ func TestGraphBuildErrors(t *testing.T) { }, }, expected: `cycle detected: ` + - `connector "nop/conn1" (logs to logs) -> ` + - `processor "nop" in pipeline "logs/2" -> ` + `connector "nop/conn" (logs to logs) -> ` + `processor "nop" in pipeline "logs/1" -> ` + - `connector "nop/conn1" (logs to logs)`, + `connector "nop/conn1" (logs to logs) -> ` + + `processor "nop" in pipeline "logs/2" -> ` + + `connector "nop/conn" (logs to logs)`, }, { name: "not_allowed_deep_cycle_profiles.yaml", @@ -2297,13 +2297,13 @@ func TestGraphBuildErrors(t *testing.T) { }, }, expected: `cycle detected: ` + + `connector "nop/forkagain" (traces to traces) -> ` + + `processor "nop" in pipeline "traces/copy2b" -> ` + `connector "nop/rawlog" (traces to logs) -> ` + `processor "nop" in pipeline "logs/raw" -> ` + `connector "nop/fork" (logs to traces) -> ` + `processor "nop" in pipeline "traces/copy2" -> ` + - `connector 
"nop/forkagain" (traces to traces) -> ` + - `processor "nop" in pipeline "traces/copy2b" -> ` + - `connector "nop/rawlog" (traces to logs)`, + `connector "nop/forkagain" (traces to traces)`, }, { name: "unknown_exporter_config", diff --git a/service/internal/graph/node.go b/service/internal/graph/node.go deleted file mode 100644 index 0e17bb74bf5..00000000000 --- a/service/internal/graph/node.go +++ /dev/null @@ -1,21 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package graph // import "go.opentelemetry.io/collector/service/internal/graph" - -import ( - "hash/fnv" - "strings" -) - -type nodeID int64 - -func (n nodeID) ID() int64 { - return int64(n) -} - -func newNodeID(parts ...string) nodeID { - h := fnv.New64a() - h.Write([]byte(strings.Join(parts, "|"))) - return nodeID(h.Sum64()) -} diff --git a/service/internal/graph/processor.go b/service/internal/graph/processor.go index a20e8e8dfd8..e2438d7f030 100644 --- a/service/internal/graph/processor.go +++ b/service/internal/graph/processor.go @@ -15,16 +15,15 @@ import ( "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/service/internal/builders" "go.opentelemetry.io/collector/service/internal/components" + "go.opentelemetry.io/collector/service/internal/graph/attribute" ) -const processorSeed = "processor" - var _ consumerNode = (*processorNode)(nil) // Every processor instance is unique to one pipeline. // Therefore, nodeID is derived from "pipeline ID" and "component ID". 
type processorNode struct { - nodeID + *attribute.Attributes componentID component.ID pipelineID pipeline.ID component.Component @@ -32,7 +31,7 @@ type processorNode struct { func newProcessorNode(pipelineID pipeline.ID, procID component.ID) *processorNode { return &processorNode{ - nodeID: newNodeID(processorSeed, pipelineID.String(), procID.String()), + Attributes: attribute.Processor(pipelineID, procID), componentID: procID, pipelineID: pipelineID, } diff --git a/service/internal/graph/reciever.go b/service/internal/graph/reciever.go index e462671bfc7..ce751b095d9 100644 --- a/service/internal/graph/reciever.go +++ b/service/internal/graph/reciever.go @@ -16,14 +16,13 @@ import ( "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/service/internal/builders" "go.opentelemetry.io/collector/service/internal/components" + "go.opentelemetry.io/collector/service/internal/graph/attribute" ) -const receiverSeed = "receiver" - // A receiver instance can be shared by multiple pipelines of the same type. // Therefore, nodeID is derived from "pipeline type" and "component ID". 
type receiverNode struct { - nodeID + *attribute.Attributes componentID component.ID pipelineType pipeline.Signal component.Component @@ -31,7 +30,7 @@ type receiverNode struct { func newReceiverNode(pipelineType pipeline.Signal, recvID component.ID) *receiverNode { return &receiverNode{ - nodeID: newNodeID(receiverSeed, pipelineType.String(), recvID.String()), + Attributes: attribute.Receiver(pipelineType, recvID), componentID: recvID, pipelineType: pipelineType, } diff --git a/service/internal/graph/util_test.go b/service/internal/graph/util_test.go index 43f1e64bb6c..bf62f3d5b94 100644 --- a/service/internal/graph/util_test.go +++ b/service/internal/graph/util_test.go @@ -6,6 +6,7 @@ package graph import ( "context" "errors" + "hash/fnv" "sync" "go.opentelemetry.io/collector/component" @@ -36,7 +37,9 @@ type testNode struct { // ID satisfies the graph.Node interface, allowing // testNode to be used in a simple.DirectedGraph func (n *testNode) ID() int64 { - return int64(newNodeID(n.id.String())) + h := fnv.New64a() + h.Write([]byte(n.id.String())) + return int64(h.Sum64()) } func (n *testNode) Start(ctx context.Context, _ component.Host) error {