diff --git a/pkg/capabilities/events/events.go b/pkg/capabilities/events/events.go new file mode 100644 index 000000000..81503b42b --- /dev/null +++ b/pkg/capabilities/events/events.go @@ -0,0 +1,199 @@ +package events + +import ( + "context" + "errors" + "fmt" + + "google.golang.org/protobuf/proto" + + "github.com/smartcontractkit/chainlink-common/pkg/beholder" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/events/pb" + "github.com/smartcontractkit/chainlink-common/pkg/values" +) + +const ( + // Duplicates the attributes in beholder/message.go::Metadata + labelWorkflowOwner = "workflow_owner_address" + labelWorkflowID = "workflow_id" + labelWorkflowExecutionID = "workflow_execution_id" + labelWorkflowName = "workflow_name" + labelCapabilityContractAddress = "capability_contract_address" + labelCapabilityID = "capability_id" + labelCapabilityVersion = "capability_version" + labelCapabilityName = "capability_name" +) + +type EmitMetadata struct { + WorkflowOwner string // required + WorkflowID string // required + WorkflowName string // required + + WorkflowExecutionID string // optional + CapabilityContractAddress string // optional + CapabilityID string // optional + CapabilityVersion string // optional + CapabilityName string // optional +} + +func (e EmitMetadata) merge(otherE EmitMetadata) EmitMetadata { + owner := e.WorkflowOwner + if otherE.WorkflowOwner != "" { + owner = otherE.WorkflowOwner + } + + wid := e.WorkflowID + if otherE.WorkflowID != "" { + wid = otherE.WorkflowID + } + + eid := e.WorkflowExecutionID + if otherE.WorkflowExecutionID != "" { + eid = otherE.WorkflowExecutionID + } + + name := e.WorkflowName + if otherE.WorkflowName != "" { + name = otherE.WorkflowName + } + + addr := e.CapabilityContractAddress + if otherE.CapabilityContractAddress != "" { + addr = otherE.CapabilityContractAddress + } + + capID := e.CapabilityID + if otherE.CapabilityID != "" { + capID = otherE.CapabilityID + } + + capVersion := e.CapabilityVersion + if otherE.CapabilityVersion != "" { + capVersion = otherE.CapabilityVersion + } + + capName := e.CapabilityName + if otherE.CapabilityName != "" { + capName = otherE.CapabilityName + } + + return EmitMetadata{ + WorkflowOwner: owner, + WorkflowID: wid, + WorkflowExecutionID: eid, + WorkflowName: name, + CapabilityContractAddress: addr, + CapabilityID: capID, + CapabilityVersion: capVersion, + CapabilityName: capName, + } +} + +func (e EmitMetadata) attrs() []any { + a := []any{} + + if e.WorkflowOwner != "" { + a = append(a, labelWorkflowOwner, e.WorkflowOwner) + } + + if e.WorkflowID != "" { + a = append(a, labelWorkflowID, e.WorkflowID) + } + + if e.WorkflowExecutionID != "" { + a = append(a, labelWorkflowExecutionID, e.WorkflowExecutionID) + } + + if e.WorkflowName != "" { + a = append(a, labelWorkflowName, e.WorkflowName) + } + + if e.CapabilityContractAddress != "" { + a = append(a, labelCapabilityContractAddress, e.CapabilityContractAddress) + } + + if e.CapabilityID != "" { + a = append(a, labelCapabilityID, e.CapabilityID) + } + + if e.CapabilityVersion != "" { + a = append(a, labelCapabilityVersion, e.CapabilityVersion) + } + + if e.CapabilityName != "" { + a = append(a, labelCapabilityName, e.CapabilityName) + } + + return a +} + +type Emitter struct { + client beholder.Emitter + md EmitMetadata +} + +func (e *Emitter) With(md EmitMetadata) *Emitter { + nmd := e.md.merge(md) + return &Emitter{ + client: e.client, + md: nmd, + } +} + +func NewEmitter() *Emitter { + return &Emitter{ + client: beholder.GetClient().Emitter, + } +} + +type Message struct { + Msg string + Labels map[string]any + Metadata EmitMetadata +} + +func (e *Emitter) Emit(ctx context.Context, msg Message) error { + nmd := e.md.merge(msg.Metadata) + + if nmd.WorkflowOwner == "" { + return errors.New("must provide workflow owner to emit event") + } + + if nmd.WorkflowID == "" { + return errors.New("must provide workflow id to emit event") + } + + if nmd.WorkflowName == "" { + return errors.New("must provide workflow name to emit event") + } + + wm, err := values.WrapMap(msg.Labels) + if err != nil { + return fmt.Errorf("could not wrap map: %w", err) + } + + pm := values.ProtoMap(wm) + + bytes, err := proto.Marshal(&pb.OperationalEvent{ + Labels: pm, + Message: msg.Msg, + }) + if err != nil { + return fmt.Errorf("could not marshal operational event: %w", err) + } + + attrs := []any{ + "beholder_data_schema", + "/capabilities-operational-event/versions/1", + "beholder_data_type", + "custom_message", + } + + attrs = append(attrs, nmd.attrs()...) + + return e.client.Emit( + ctx, + bytes, + attrs..., + ) +} diff --git a/pkg/capabilities/events/events_test.go b/pkg/capabilities/events/events_test.go new file mode 100644 index 000000000..c08975b6e --- /dev/null +++ b/pkg/capabilities/events/events_test.go @@ -0,0 +1,88 @@ +package events + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/events/pb" + "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" +) + +type testEmitter struct { + payload []byte + attrs []any +} + +func (t *testEmitter) Emit(ctx context.Context, payload []byte, attrKVs ...any) error { + t.payload = payload + t.attrs = attrKVs + return nil +} + +func TestEmitter(t *testing.T) { + client := &testEmitter{} + emitter := &Emitter{client: client} + msg := "a message" + + message := Message{ + Msg: msg, + } + err := emitter.Emit(tests.Context(t), message) + assert.ErrorContains(t, err, "must provide workflow owner") + + message.Metadata.WorkflowOwner = "owner" + err = emitter.Emit(tests.Context(t), message) + assert.ErrorContains(t, err, "must provide workflow id") + + message.Metadata.WorkflowID = "id" + err = emitter.Emit(tests.Context(t), message) + assert.ErrorContains(t, err, "must provide workflow name") + + message.Metadata.WorkflowName = "name" + err = emitter.Emit(tests.Context(t), message) + require.NoError(t, err) + + event := &pb.OperationalEvent{} + err = proto.Unmarshal(client.payload, event) + require.NoError(t, err) + + assert.Equal(t, event.Message, msg) +} + +func assertHasKey(t *testing.T, attrs []any, keyName, keyValue string) { + for i, a := range attrs { + if a.(string) == keyName { + assert.Equal(t, attrs[i+1].(string), keyValue) + return + } + } + + assert.FailNow(t, fmt.Sprintf("could not find keyName %s in attrs", keyName)) +} + +func TestEmitter_WithMetadata(t *testing.T) { + client := &testEmitter{} + emitter := &Emitter{client: client} + emitter = emitter.With(EmitMetadata{ + WorkflowOwner: "owner", + WorkflowID: "id", + WorkflowName: "name", + }) + msg := "a message" + + message := Message{ + Msg: msg, + } + err := emitter.Emit(tests.Context(t), message) + require.NoError(t, err) + + fmt.Printf("%+v", client.attrs) + assertHasKey(t, client.attrs, "workflow_owner_address", "owner") + assertHasKey(t, client.attrs, "workflow_id", "id") + assertHasKey(t, client.attrs, "workflow_name", "name") +} diff --git a/pkg/capabilities/events/pb/events.pb.go b/pkg/capabilities/events/pb/events.pb.go new file mode 100644 index 000000000..411b79b1b --- /dev/null +++ b/pkg/capabilities/events/pb/events.pb.go @@ -0,0 +1,227 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.31.0 +// protoc v4.25.1 +// source: capabilities/events/pb/events.proto + +package pb + +import ( + pb "github.com/smartcontractkit/chainlink-common/pkg/values/pb" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type OperationalEvent struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Message string `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"` + Labels *pb.Map `protobuf:"bytes,2,opt,name=labels,proto3" json:"labels,omitempty"` +} + +func (x *OperationalEvent) Reset() { + *x = OperationalEvent{} + if protoimpl.UnsafeEnabled { + mi := &file_capabilities_events_pb_events_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *OperationalEvent) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*OperationalEvent) ProtoMessage() {} + +func (x *OperationalEvent) ProtoReflect() protoreflect.Message { + mi := &file_capabilities_events_pb_events_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use OperationalEvent.ProtoReflect.Descriptor instead. +func (*OperationalEvent) Descriptor() ([]byte, []int) { + return file_capabilities_events_pb_events_proto_rawDescGZIP(), []int{0} +} + +func (x *OperationalEvent) GetMessage() string { + if x != nil { + return x.Message + } + return "" +} + +func (x *OperationalEvent) GetLabels() *pb.Map { + if x != nil { + return x.Labels + } + return nil +} + +// Used by custom compute to send any beholder errors +// back. +type OperationalEventResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + ErrMsg string `protobuf:"bytes,1,opt,name=errMsg,proto3" json:"errMsg,omitempty"` +} + +func (x *OperationalEventResponse) Reset() { + *x = OperationalEventResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_capabilities_events_pb_events_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *OperationalEventResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*OperationalEventResponse) ProtoMessage() {} + +func (x *OperationalEventResponse) ProtoReflect() protoreflect.Message { + mi := &file_capabilities_events_pb_events_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use OperationalEventResponse.ProtoReflect.Descriptor instead. +func (*OperationalEventResponse) Descriptor() ([]byte, []int) { + return file_capabilities_events_pb_events_proto_rawDescGZIP(), []int{1} +} + +func (x *OperationalEventResponse) GetErrMsg() string { + if x != nil { + return x.ErrMsg + } + return "" +} + +var File_capabilities_events_pb_events_proto protoreflect.FileDescriptor + +var file_capabilities_events_pb_events_proto_rawDesc = []byte{ + 0x0a, 0x23, 0x63, 0x61, 0x70, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x69, 0x65, 0x73, 0x2f, 0x65, + 0x76, 0x65, 0x6e, 0x74, 0x73, 0x2f, 0x70, 0x62, 0x2f, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x1a, 0x16, 0x76, + 0x61, 0x6c, 0x75, 0x65, 0x73, 0x2f, 0x70, 0x62, 0x2f, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x51, 0x0a, 0x10, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, + 0x6f, 0x6e, 0x61, 0x6c, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x12, 0x23, 0x0a, 0x06, 0x6c, 0x61, 0x62, 0x65, 0x6c, 0x73, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x0b, 0x2e, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x2e, 0x4d, 0x61, 0x70, + 0x52, 0x06, 0x6c, 0x61, 0x62, 0x65, 0x6c, 0x73, 0x22, 0x32, 0x0a, 0x18, 0x4f, 0x70, 0x65, 0x72, + 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x61, 0x6c, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x65, 0x72, 0x72, 0x4d, 0x73, 0x67, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x65, 0x72, 0x72, 0x4d, 0x73, 0x67, 0x42, 0x49, 0x5a, 0x47, + 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x6d, 0x61, 0x72, 0x74, + 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x61, 0x63, 0x74, 0x6b, 0x69, 0x74, 0x2f, 0x63, 0x68, 0x61, 0x69, + 0x6e, 0x6c, 0x69, 0x6e, 0x6b, 0x2d, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2f, 0x70, 0x6b, 0x67, + 0x2f, 0x63, 0x61, 0x70, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x69, 0x65, 0x73, 0x2f, 0x65, 0x76, + 0x65, 0x6e, 0x74, 0x73, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_capabilities_events_pb_events_proto_rawDescOnce sync.Once + file_capabilities_events_pb_events_proto_rawDescData = file_capabilities_events_pb_events_proto_rawDesc +) + +func file_capabilities_events_pb_events_proto_rawDescGZIP() []byte { + file_capabilities_events_pb_events_proto_rawDescOnce.Do(func() { + file_capabilities_events_pb_events_proto_rawDescData = protoimpl.X.CompressGZIP(file_capabilities_events_pb_events_proto_rawDescData) + }) + return file_capabilities_events_pb_events_proto_rawDescData +} + +var file_capabilities_events_pb_events_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_capabilities_events_pb_events_proto_goTypes = []interface{}{ + (*OperationalEvent)(nil), // 0: events.OperationalEvent + (*OperationalEventResponse)(nil), // 1: events.OperationalEventResponse + (*pb.Map)(nil), // 2: values.Map +} +var file_capabilities_events_pb_events_proto_depIdxs = []int32{ + 2, // 0: events.OperationalEvent.labels:type_name -> values.Map + 1, // [1:1] is the sub-list for method output_type + 1, // [1:1] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_capabilities_events_pb_events_proto_init() } +func file_capabilities_events_pb_events_proto_init() { + if File_capabilities_events_pb_events_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_capabilities_events_pb_events_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*OperationalEvent); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_capabilities_events_pb_events_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*OperationalEventResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_capabilities_events_pb_events_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_capabilities_events_pb_events_proto_goTypes, + DependencyIndexes: file_capabilities_events_pb_events_proto_depIdxs, + MessageInfos: file_capabilities_events_pb_events_proto_msgTypes, + }.Build() + File_capabilities_events_pb_events_proto = out.File + file_capabilities_events_pb_events_proto_rawDesc = nil + file_capabilities_events_pb_events_proto_goTypes = nil + file_capabilities_events_pb_events_proto_depIdxs = nil +} diff --git a/pkg/capabilities/events/pb/events.proto b/pkg/capabilities/events/pb/events.proto new file mode 100644 index 000000000..7d19ca585 --- /dev/null +++ b/pkg/capabilities/events/pb/events.proto @@ -0,0 +1,18 @@ +syntax = "proto3"; + +option go_package = "github.com/smartcontractkit/chainlink-common/pkg/capabilities/events/pb"; + +package events; + +import "values/pb/values.proto"; + +message OperationalEvent { + string message = 1; + values.Map labels = 2; +} + +// Used by custom compute to send any beholder errors +// back. +message OperationalEventResponse { + string errMsg = 1; +} diff --git a/pkg/capabilities/events/pb/generate.go b/pkg/capabilities/events/pb/generate.go new file mode 100644 index 000000000..8d1c8ee25 --- /dev/null +++ b/pkg/capabilities/events/pb/generate.go @@ -0,0 +1,2 @@ +//go:generate protoc --go_out=../../../ --go_opt=paths=source_relative --go-grpc_out=../../../ --go-grpc_opt=paths=source_relative --proto_path=../../../ capabilities/events/pb/events.proto values/pb/values.proto +package pb