diff --git a/core/capabilities/launcher.go b/core/capabilities/launcher.go index 56144774baa..8deb42fdd68 100644 --- a/core/capabilities/launcher.go +++ b/core/capabilities/launcher.go @@ -396,9 +396,14 @@ func (w *launcher) exposeCapabilities(ctx context.Context, myPeerID p2ptypes.Pee switch capability.CapabilityType { case capabilities.CapabilityTypeTrigger: newTriggerPublisher := func(cap capabilities.BaseCapability, info capabilities.CapabilityInfo) (remotetypes.ReceiverService, error) { + triggerCapability, ok := (cap).(capabilities.TriggerCapability) + if !ok { + return nil, errors.New("capability does not implement TriggerCapability") + } + publisher := remote.NewTriggerPublisher( capabilityConfig.RemoteTriggerConfig, - cap.(capabilities.TriggerCapability), + triggerCapability, info, don.DON, idsToDONs, @@ -419,10 +424,15 @@ func (w *launcher) exposeCapabilities(ctx context.Context, myPeerID p2ptypes.Pee w.lggr.Warn("no remote client configured for capability type consensus, skipping configuration") case capabilities.CapabilityTypeTarget: newTargetServer := func(cap capabilities.BaseCapability, info capabilities.CapabilityInfo) (remotetypes.ReceiverService, error) { + targetCapability, ok := (cap).(capabilities.TargetCapability) + if !ok { + return nil, errors.New("capability does not implement TargetCapability") + } + return target.NewServer( capabilityConfig.RemoteTargetConfig, myPeerID, - cap.(capabilities.TargetCapability), + targetCapability, info, don.DON, idsToDONs, diff --git a/core/capabilities/launcher_test.go b/core/capabilities/launcher_test.go index 43f283fc532..3ebed639cb0 100644 --- a/core/capabilities/launcher_test.go +++ b/core/capabilities/launcher_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "go.uber.org/zap/zapcore" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/durationpb" @@ -30,6 +31,8 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/registrysyncer" ) +var _ capabilities.TriggerCapability = (*mockTrigger)(nil) + type mockTrigger struct { capabilities.CapabilityInfo } @@ -46,6 +49,8 @@ func newMockTrigger(info capabilities.CapabilityInfo) *mockTrigger { return &mockTrigger{CapabilityInfo: info} } +var _ capabilities.TargetCapability = (*mockCapability)(nil) + type mockCapability struct { capabilities.CapabilityInfo } @@ -71,133 +76,347 @@ func randomWord() [32]byte { return [32]byte(word) } -func TestLauncher_WiresUpExternalCapabilities(t *testing.T) { - ctx := tests.Context(t) - lggr := logger.TestLogger(t) - registry := NewRegistry(lggr) - dispatcher := remoteMocks.NewDispatcher(t) +func TestLauncher(t *testing.T) { + t.Run("OK-wires_up_external_capabilities", func(t *testing.T) { + ctx := tests.Context(t) + lggr := logger.TestLogger(t) + registry := NewRegistry(lggr) + dispatcher := remoteMocks.NewDispatcher(t) + + var pid ragetypes.PeerID + err := pid.UnmarshalText([]byte("12D3KooWBCF1XT5Wi8FzfgNCqRL76Swv8TRU3TiD4QiJm8NMNX7N")) + require.NoError(t, err) + peer := mocks.NewPeer(t) + peer.On("UpdateConnections", mock.Anything).Return(nil) + peer.On("ID").Return(pid) + wrapper := mocks.NewPeerWrapper(t) + wrapper.On("GetPeer").Return(peer) + + nodes := []ragetypes.PeerID{ + pid, + randomWord(), + randomWord(), + randomWord(), + } - var pid ragetypes.PeerID - err := pid.UnmarshalText([]byte("12D3KooWBCF1XT5Wi8FzfgNCqRL76Swv8TRU3TiD4QiJm8NMNX7N")) - require.NoError(t, err) - peer := mocks.NewPeer(t) - peer.On("UpdateConnections", mock.Anything).Return(nil) - peer.On("ID").Return(pid) - wrapper := mocks.NewPeerWrapper(t) - wrapper.On("GetPeer").Return(peer) + fullTriggerCapID := "streams-trigger@1.0.0" + mt := newMockTrigger(capabilities.MustNewCapabilityInfo( + fullTriggerCapID, + capabilities.CapabilityTypeTrigger, + "streams trigger", + )) + require.NoError(t, registry.Add(ctx, mt)) + + fullTargetID := "write-chain_evm_1@1.0.0" + mtarg := &mockCapability{ + CapabilityInfo: capabilities.MustNewCapabilityInfo( + fullTargetID, + capabilities.CapabilityTypeTarget, + "write chain", + ), + } + require.NoError(t, registry.Add(ctx, mtarg)) + + triggerCapID := randomWord() + targetCapID := randomWord() + // one capability from onchain registry is not set up locally + fullMissingTargetID := "super-duper-target@6.6.6" + missingTargetCapID := randomWord() + dID := uint32(1) + // The below state describes a Workflow DON (AcceptsWorkflows = true), + // which exposes the streams-trigger and write_chain capabilities. + // We expect a publisher to be wired up with this configuration, and + // no entries should be added to the registry. + state := ®istrysyncer.LocalRegistry{ + IDsToDONs: map[registrysyncer.DonID]registrysyncer.DON{ + registrysyncer.DonID(dID): { + DON: capabilities.DON{ + ID: dID, + ConfigVersion: uint32(0), + F: uint8(1), + IsPublic: true, + AcceptsWorkflows: true, + Members: nodes, + }, + CapabilityConfigurations: map[string]registrysyncer.CapabilityConfiguration{ + fullTriggerCapID: {}, + fullTargetID: {}, + fullMissingTargetID: {}, + }, + }, + }, + IDsToCapabilities: map[string]registrysyncer.Capability{ + fullTriggerCapID: { + ID: "streams-trigger@1.0.0", + CapabilityType: capabilities.CapabilityTypeTrigger, + }, + fullTargetID: { + ID: "write-chain_evm_1@1.0.0", + CapabilityType: capabilities.CapabilityTypeTarget, + }, + fullMissingTargetID: { + ID: fullMissingTargetID, + CapabilityType: capabilities.CapabilityTypeTarget, + }, + }, + IDsToNodes: map[p2ptypes.PeerID]kcr.INodeInfoProviderNodeInfo{ + nodes[0]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: nodes[0], + EncryptionPublicKey: randomWord(), + HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID, missingTargetCapID}, + }, + nodes[1]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: nodes[1], + EncryptionPublicKey: randomWord(), + HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID, missingTargetCapID}, + }, + nodes[2]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: nodes[2], + EncryptionPublicKey: randomWord(), + HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID, missingTargetCapID}, + }, + nodes[3]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: nodes[3], + EncryptionPublicKey: randomWord(), + HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID, missingTargetCapID}, + }, + }, + } - nodes := []ragetypes.PeerID{ - pid, - randomWord(), - randomWord(), - randomWord(), - } + launcher := NewLauncher( + lggr, + wrapper, + dispatcher, + registry, + ) - fullTriggerCapID := "streams-trigger@1.0.0" - mt := newMockTrigger(capabilities.MustNewCapabilityInfo( - fullTriggerCapID, - capabilities.CapabilityTypeTrigger, - "streams trigger", - )) - require.NoError(t, registry.Add(ctx, mt)) + dispatcher.On("SetReceiver", fullTriggerCapID, dID, mock.AnythingOfType("*remote.triggerPublisher")).Return(nil) + dispatcher.On("SetReceiver", fullTargetID, dID, mock.AnythingOfType("*target.server")).Return(nil) - fullTargetID := "write-chain_evm_1@1.0.0" - mtarg := &mockCapability{ - CapabilityInfo: capabilities.MustNewCapabilityInfo( - fullTargetID, - capabilities.CapabilityTypeTarget, - "write chain", - ), - } - require.NoError(t, registry.Add(ctx, mtarg)) + err = launcher.Launch(ctx, state) + require.NoError(t, err) + defer launcher.Close() + }) - triggerCapID := randomWord() - targetCapID := randomWord() - // one capability from onchain registry is not set up locally - fullMissingTargetID := "super-duper-target@6.6.6" - missingTargetCapID := randomWord() - dID := uint32(1) - // The below state describes a Workflow DON (AcceptsWorkflows = true), - // which exposes the streams-trigger and write_chain capabilities. - // We expect a publisher to be wired up with this configuration, and - // no entries should be added to the registry. - state := ®istrysyncer.LocalRegistry{ - IDsToDONs: map[registrysyncer.DonID]registrysyncer.DON{ - registrysyncer.DonID(dID): { - DON: capabilities.DON{ - ID: dID, - ConfigVersion: uint32(0), - F: uint8(1), - IsPublic: true, - AcceptsWorkflows: true, - Members: nodes, - }, - CapabilityConfigurations: map[string]registrysyncer.CapabilityConfiguration{ - fullTriggerCapID: {}, - fullTargetID: {}, - fullMissingTargetID: {}, + t.Run("NOK-invalid_trigger_capability", func(t *testing.T) { + ctx := tests.Context(t) + lggr, observedLogs := logger.TestLoggerObserved(t, zapcore.DebugLevel) + registry := NewRegistry(lggr) + dispatcher := remoteMocks.NewDispatcher(t) + + var pid ragetypes.PeerID + err := pid.UnmarshalText([]byte("12D3KooWBCF1XT5Wi8FzfgNCqRL76Swv8TRU3TiD4QiJm8NMNX7N")) + require.NoError(t, err) + peer := mocks.NewPeer(t) + peer.On("UpdateConnections", mock.Anything).Return(nil) + peer.On("ID").Return(pid) + wrapper := mocks.NewPeerWrapper(t) + wrapper.On("GetPeer").Return(peer) + + nodes := []ragetypes.PeerID{ + pid, + randomWord(), + randomWord(), + randomWord(), + } + + // We intentionally create a Trigger capability with a Target type + fullTriggerCapID := "streams-trigger@1.0.0" + mtarg := &mockCapability{ + CapabilityInfo: capabilities.MustNewCapabilityInfo( + fullTriggerCapID, + capabilities.CapabilityTypeTarget, + "wrong type capability", + ), + } + require.NoError(t, registry.Add(ctx, mtarg)) + + triggerCapID := randomWord() + + dID := uint32(1) + // The below state describes a Workflow DON (AcceptsWorkflows = true), + // which exposes the streams-trigger and write_chain capabilities. + // We expect a publisher to be wired up with this configuration, and + // no entries should be added to the registry. + state := ®istrysyncer.LocalRegistry{ + IDsToDONs: map[registrysyncer.DonID]registrysyncer.DON{ + registrysyncer.DonID(dID): { + DON: capabilities.DON{ + ID: dID, + ConfigVersion: uint32(0), + F: uint8(1), + IsPublic: true, + AcceptsWorkflows: true, + Members: nodes, + }, + CapabilityConfigurations: map[string]registrysyncer.CapabilityConfiguration{ + fullTriggerCapID: {}, + }, }, }, - }, - IDsToCapabilities: map[string]registrysyncer.Capability{ - fullTriggerCapID: { - ID: "streams-trigger@1.0.0", - CapabilityType: capabilities.CapabilityTypeTrigger, - }, - fullTargetID: { - ID: "write-chain_evm_1@1.0.0", - CapabilityType: capabilities.CapabilityTypeTarget, - }, - fullMissingTargetID: { - ID: fullMissingTargetID, - CapabilityType: capabilities.CapabilityTypeTarget, + IDsToCapabilities: map[string]registrysyncer.Capability{ + fullTriggerCapID: { + ID: "streams-trigger@1.0.0", + CapabilityType: capabilities.CapabilityTypeTrigger, + }, }, - }, - IDsToNodes: map[p2ptypes.PeerID]kcr.INodeInfoProviderNodeInfo{ - nodes[0]: { - NodeOperatorId: 1, - Signer: randomWord(), - P2pId: nodes[0], - EncryptionPublicKey: randomWord(), - HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID, missingTargetCapID}, + IDsToNodes: map[p2ptypes.PeerID]kcr.INodeInfoProviderNodeInfo{ + nodes[0]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: nodes[0], + EncryptionPublicKey: randomWord(), + HashedCapabilityIds: [][32]byte{triggerCapID}, + }, + nodes[1]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: nodes[1], + EncryptionPublicKey: randomWord(), + HashedCapabilityIds: [][32]byte{triggerCapID}, + }, + nodes[2]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: nodes[2], + EncryptionPublicKey: randomWord(), + HashedCapabilityIds: [][32]byte{triggerCapID}, + }, + nodes[3]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: nodes[3], + EncryptionPublicKey: randomWord(), + HashedCapabilityIds: [][32]byte{triggerCapID}, + }, }, - nodes[1]: { - NodeOperatorId: 1, - Signer: randomWord(), - P2pId: nodes[1], - EncryptionPublicKey: randomWord(), - HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID, missingTargetCapID}, + } + + launcher := NewLauncher( + lggr, + wrapper, + dispatcher, + registry, + ) + + err = launcher.Launch(ctx, state) + require.NoError(t, err) + + assert.Equal(t, 1, observedLogs.FilterMessage("failed to add server-side receiver for a trigger capability - it won't be exposed remotely").Len()) + defer launcher.Close() + }) + + t.Run("NOK-invalid_target_capability", func(t *testing.T) { + ctx := tests.Context(t) + lggr, observedLogs := logger.TestLoggerObserved(t, zapcore.DebugLevel) + registry := NewRegistry(lggr) + dispatcher := remoteMocks.NewDispatcher(t) + + var pid ragetypes.PeerID + err := pid.UnmarshalText([]byte("12D3KooWBCF1XT5Wi8FzfgNCqRL76Swv8TRU3TiD4QiJm8NMNX7N")) + require.NoError(t, err) + peer := mocks.NewPeer(t) + peer.On("UpdateConnections", mock.Anything).Return(nil) + peer.On("ID").Return(pid) + wrapper := mocks.NewPeerWrapper(t) + wrapper.On("GetPeer").Return(peer) + + nodes := []ragetypes.PeerID{ + pid, + randomWord(), + randomWord(), + randomWord(), + } + + fullTargetID := "write-chain_evm_1@1.0.0" + mt := newMockTrigger(capabilities.MustNewCapabilityInfo( + fullTargetID, + capabilities.CapabilityTypeTrigger, + "streams trigger", + )) + require.NoError(t, registry.Add(ctx, mt)) + + targetCapID := randomWord() + dID := uint32(1) + // The below state describes a Workflow DON (AcceptsWorkflows = true), + // which exposes the streams-trigger and write_chain capabilities. + // We expect a publisher to be wired up with this configuration, and + // no entries should be added to the registry. + state := ®istrysyncer.LocalRegistry{ + IDsToDONs: map[registrysyncer.DonID]registrysyncer.DON{ + registrysyncer.DonID(dID): { + DON: capabilities.DON{ + ID: dID, + ConfigVersion: uint32(0), + F: uint8(1), + IsPublic: true, + AcceptsWorkflows: true, + Members: nodes, + }, + CapabilityConfigurations: map[string]registrysyncer.CapabilityConfiguration{ + fullTargetID: {}, + }, + }, }, - nodes[2]: { - NodeOperatorId: 1, - Signer: randomWord(), - P2pId: nodes[2], - EncryptionPublicKey: randomWord(), - HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID, missingTargetCapID}, + IDsToCapabilities: map[string]registrysyncer.Capability{ + fullTargetID: { + ID: "write-chain_evm_1@1.0.0", + CapabilityType: capabilities.CapabilityTypeTarget, + }, }, - nodes[3]: { - NodeOperatorId: 1, - Signer: randomWord(), - P2pId: nodes[3], - EncryptionPublicKey: randomWord(), - HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID, missingTargetCapID}, + IDsToNodes: map[p2ptypes.PeerID]kcr.INodeInfoProviderNodeInfo{ + nodes[0]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: nodes[0], + EncryptionPublicKey: randomWord(), + HashedCapabilityIds: [][32]byte{targetCapID}, + }, + nodes[1]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: nodes[1], + EncryptionPublicKey: randomWord(), + HashedCapabilityIds: [][32]byte{targetCapID}, + }, + nodes[2]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: nodes[2], + EncryptionPublicKey: randomWord(), + HashedCapabilityIds: [][32]byte{targetCapID}, + }, + nodes[3]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: nodes[3], + EncryptionPublicKey: randomWord(), + HashedCapabilityIds: [][32]byte{targetCapID}, + }, }, - }, - } + } - launcher := NewLauncher( - lggr, - wrapper, - dispatcher, - registry, - ) + launcher := NewLauncher( + lggr, + wrapper, + dispatcher, + registry, + ) - dispatcher.On("SetReceiver", fullTriggerCapID, dID, mock.AnythingOfType("*remote.triggerPublisher")).Return(nil) - dispatcher.On("SetReceiver", fullTargetID, dID, mock.AnythingOfType("*target.server")).Return(nil) + err = launcher.Launch(ctx, state) + require.NoError(t, err) - err = launcher.Launch(ctx, state) - require.NoError(t, err) - defer launcher.Close() + assert.Equal(t, 1, observedLogs.FilterMessage("failed to add server-side receiver for a target capability - it won't be exposed remotely").Len()) + defer launcher.Close() + }) } func newTriggerEventMsg(t *testing.T,