Skip to content

Commit

Permalink
[chore] Add Beholder custom event for Workflows and Capabilities (#854)
Browse files Browse the repository at this point in the history
* [chore] Add Beholder custom event for Workflows and Capabilities

* [chore] Add Beholder custom event for Workflows and Capabilities
  • Loading branch information
cedric-cordenier authored Oct 16, 2024
1 parent 935b2ee commit 87939ad
Show file tree
Hide file tree
Showing 5 changed files with 534 additions and 0 deletions.
199 changes: 199 additions & 0 deletions pkg/capabilities/events/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
package events

import (
"context"
"errors"
"fmt"

"google.golang.org/protobuf/proto"

"github.com/smartcontractkit/chainlink-common/pkg/beholder"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/events/pb"
"github.com/smartcontractkit/chainlink-common/pkg/values"
)

const (
// Duplicates the attributes in beholder/message.go::Metadata
labelWorkflowOwner = "workflow_owner_address"
labelWorkflowID = "workflow_id"
labelWorkflowExecutionID = "workflow_execution_id"
labelWorkflowName = "workflow_name"
labelCapabilityContractAddress = "capability_contract_address"
labelCapabilityID = "capability_id"
labelCapabilityVersion = "capability_version"
labelCapabilityName = "capability_name"
)

type EmitMetadata struct {
WorkflowOwner string // required
WorkflowID string // required
WorkflowName string // required

WorkflowExecutionID string // optional
CapabilityContractAddress string // optional
CapabilityID string // optional
CapabilityVersion string // optional
CapabilityName string // optional
}

func (e EmitMetadata) merge(otherE EmitMetadata) EmitMetadata {
owner := e.WorkflowOwner
if otherE.WorkflowOwner != "" {
owner = otherE.WorkflowOwner
}

wid := e.WorkflowID
if otherE.WorkflowID != "" {
wid = otherE.WorkflowID
}

eid := e.WorkflowExecutionID
if otherE.WorkflowExecutionID != "" {
eid = otherE.WorkflowExecutionID
}

name := e.WorkflowName
if otherE.WorkflowName != "" {
name = otherE.WorkflowName
}

addr := e.CapabilityContractAddress
if otherE.CapabilityContractAddress != "" {
addr = otherE.CapabilityContractAddress
}

capID := e.CapabilityID
if otherE.CapabilityID != "" {
capID = otherE.CapabilityID
}

capVersion := e.CapabilityVersion
if otherE.CapabilityVersion != "" {
capVersion = otherE.CapabilityVersion
}

capName := e.CapabilityName
if otherE.CapabilityName != "" {
capName = otherE.CapabilityName
}

return EmitMetadata{
WorkflowOwner: owner,
WorkflowID: wid,
WorkflowExecutionID: eid,
WorkflowName: name,
CapabilityContractAddress: addr,
CapabilityID: capID,
CapabilityVersion: capVersion,
CapabilityName: capName,
}
}

func (e EmitMetadata) attrs() []any {
a := []any{}

if e.WorkflowOwner != "" {
a = append(a, labelWorkflowOwner, e.WorkflowOwner)
}

if e.WorkflowID != "" {
a = append(a, labelWorkflowID, e.WorkflowID)
}

if e.WorkflowExecutionID != "" {
a = append(a, labelWorkflowExecutionID, e.WorkflowExecutionID)
}

if e.WorkflowName != "" {
a = append(a, labelWorkflowName, e.WorkflowName)
}

if e.CapabilityContractAddress != "" {
a = append(a, labelCapabilityContractAddress, e.CapabilityContractAddress)
}

if e.CapabilityID != "" {
a = append(a, labelCapabilityID, e.CapabilityID)
}

if e.CapabilityVersion != "" {
a = append(a, labelCapabilityVersion, e.CapabilityVersion)
}

if e.CapabilityName != "" {
a = append(a, labelCapabilityName, e.CapabilityName)
}

return a
}

type Emitter struct {
client beholder.Emitter
md EmitMetadata
}

func (e *Emitter) With(md EmitMetadata) *Emitter {
nmd := e.md.merge(md)
return &Emitter{
client: e.client,
md: nmd,
}
}

func NewEmitter() *Emitter {
return &Emitter{
client: beholder.GetClient().Emitter,
}
}

type Message struct {
Msg string
Labels map[string]any
Metadata EmitMetadata
}

func (e *Emitter) Emit(ctx context.Context, msg Message) error {
nmd := e.md.merge(msg.Metadata)

if nmd.WorkflowOwner == "" {
return errors.New("must provide workflow owner to emit event")
}

if nmd.WorkflowID == "" {
return errors.New("must provide workflow id to emit event")
}

if nmd.WorkflowName == "" {
return errors.New("must provide workflow name to emit event")
}

wm, err := values.WrapMap(msg.Labels)
if err != nil {
return fmt.Errorf("could not wrap map: %w", err)
}

pm := values.ProtoMap(wm)

bytes, err := proto.Marshal(&pb.OperationalEvent{
Labels: pm,
Message: msg.Msg,
})
if err != nil {
return fmt.Errorf("could not marshal operational event: %w", err)
}

attrs := []any{
"beholder_data_schema",
"/capabilities-operational-event/versions/1",
"beholder_data_type",
"custom_message",
}

attrs = append(attrs, nmd.attrs()...)

return e.client.Emit(
ctx,
bytes,
attrs...,
)
}
88 changes: 88 additions & 0 deletions pkg/capabilities/events/events_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package events

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities/events/pb"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
)

type testEmitter struct {
payload []byte
attrs []any
}

func (t *testEmitter) Emit(ctx context.Context, payload []byte, attrKVs ...any) error {
t.payload = payload
t.attrs = attrKVs
return nil
}

func TestEmitter(t *testing.T) {
client := &testEmitter{}
emitter := &Emitter{client: client}
msg := "a message"

message := Message{
Msg: msg,
}
err := emitter.Emit(tests.Context(t), message)
assert.ErrorContains(t, err, "must provide workflow owner")

message.Metadata.WorkflowOwner = "owner"
err = emitter.Emit(tests.Context(t), message)
assert.ErrorContains(t, err, "must provide workflow id")

message.Metadata.WorkflowID = "id"
err = emitter.Emit(tests.Context(t), message)
assert.ErrorContains(t, err, "must provide workflow name")

message.Metadata.WorkflowName = "name"
err = emitter.Emit(tests.Context(t), message)
require.NoError(t, err)

event := &pb.OperationalEvent{}
err = proto.Unmarshal(client.payload, event)
require.NoError(t, err)

assert.Equal(t, event.Message, msg)
}

func assertHasKey(t *testing.T, attrs []any, keyName, keyValue string) {
for i, a := range attrs {
if a.(string) == keyName {
assert.Equal(t, attrs[i+1].(string), keyValue)
return
}
}

assert.FailNow(t, fmt.Sprintf("could not find keyName %s in attrs", keyName))
}

func TestEmitter_WithMetadata(t *testing.T) {
client := &testEmitter{}
emitter := &Emitter{client: client}
emitter = emitter.With(EmitMetadata{
WorkflowOwner: "owner",
WorkflowID: "id",
WorkflowName: "name",
})
msg := "a message"

message := Message{
Msg: msg,
}
err := emitter.Emit(tests.Context(t), message)
require.NoError(t, err)

fmt.Printf("%+v", client.attrs)
assertHasKey(t, client.attrs, "workflow_owner_address", "owner")
assertHasKey(t, client.attrs, "workflow_id", "id")
assertHasKey(t, client.attrs, "workflow_name", "name")
}
Loading

0 comments on commit 87939ad

Please sign in to comment.