diff --git a/core/capabilities/launcher.go b/core/capabilities/launcher.go index b464a154772..56144774baa 100644 --- a/core/capabilities/launcher.go +++ b/core/capabilities/launcher.go @@ -191,8 +191,8 @@ func (w *launcher) Launch(ctx context.Context, state *registrysyncer.LocalRegist } } - // - remote capability DONs (with IsPublic = true) the current node is a part of. - // These need server-side shims. + // Capability DONs (with IsPublic = true) the current node is a part of. + // These need server-side shims to expose my own capabilities externally. myCapabilityDONs := []registrysyncer.DON{} remoteCapabilityDONs := []registrysyncer.DON{} for _, d := range publicDONs { @@ -223,11 +223,11 @@ func (w *launcher) Launch(ctx context.Context, state *registrysyncer.LocalRegist } } - // Finally, if I'm a capability DON, let's enable external access + // Finally, if I'm in a capability DON, let's enable external access // to the capability. if len(myCapabilityDONs) > 0 { - for _, mcd := range myCapabilityDONs { - err := w.exposeCapabilities(ctx, myID, mcd, state, remoteWorkflowDONs) + for _, myDON := range myCapabilityDONs { + err := w.exposeCapabilities(ctx, myID, myDON, state, remoteWorkflowDONs) if err != nil { return err } @@ -395,10 +395,10 @@ func (w *launcher) exposeCapabilities(ctx context.Context, myPeerID p2ptypes.Pee switch capability.CapabilityType { case capabilities.CapabilityTypeTrigger: - newTriggerPublisher := func(capability capabilities.BaseCapability, info capabilities.CapabilityInfo) (remotetypes.ReceiverService, error) { + newTriggerPublisher := func(cap capabilities.BaseCapability, info capabilities.CapabilityInfo) (remotetypes.ReceiverService, error) { publisher := remote.NewTriggerPublisher( capabilityConfig.RemoteTriggerConfig, - capability.(capabilities.TriggerCapability), + cap.(capabilities.TriggerCapability), info, don.DON, idsToDONs, @@ -410,18 +410,19 @@ func (w *launcher) exposeCapabilities(ctx context.Context, myPeerID p2ptypes.Pee err := w.addReceiver(ctx, capability, don, newTriggerPublisher) if err != nil { - return fmt.Errorf("failed to add server-side receiver: %w", err) + w.lggr.Errorw("failed to add server-side receiver for a trigger capability - it won't be exposed remotely", "id", cid, "error", err) + // continue attempting other capabilities } case capabilities.CapabilityTypeAction: w.lggr.Warn("no remote client configured for capability type action, skipping configuration") case capabilities.CapabilityTypeConsensus: w.lggr.Warn("no remote client configured for capability type consensus, skipping configuration") case capabilities.CapabilityTypeTarget: - newTargetServer := func(capability capabilities.BaseCapability, info capabilities.CapabilityInfo) (remotetypes.ReceiverService, error) { + newTargetServer := func(cap capabilities.BaseCapability, info capabilities.CapabilityInfo) (remotetypes.ReceiverService, error) { return target.NewServer( capabilityConfig.RemoteTargetConfig, myPeerID, - capability.(capabilities.TargetCapability), + cap.(capabilities.TargetCapability), info, don.DON, idsToDONs, @@ -433,7 +434,8 @@ func (w *launcher) exposeCapabilities(ctx context.Context, myPeerID p2ptypes.Pee err := w.addReceiver(ctx, capability, don, newTargetServer) if err != nil { - return fmt.Errorf("failed to add server-side receiver: %w", err) + w.lggr.Errorw("failed to add server-side receiver for a target capability - it won't be exposed remotely", "id", cid, "error", err) + // continue attempting other capabilities } default: w.lggr.Warnf("unknown capability type, skipping configuration: %+v", capability) diff --git a/core/capabilities/launcher_test.go b/core/capabilities/launcher_test.go index 11482b0dd30..43f283fc532 100644 --- a/core/capabilities/launcher_test.go +++ b/core/capabilities/launcher_test.go @@ -113,6 +113,9 @@ func TestLauncher_WiresUpExternalCapabilities(t *testing.T) { triggerCapID := randomWord() targetCapID := randomWord() + // one capability from onchain registry is not set up locally + fullMissingTargetID := "super-duper-target@6.6.6" + missingTargetCapID := randomWord() dID := uint32(1) // The below state describes a Workflow DON (AcceptsWorkflows = true), // which exposes the streams-trigger and write_chain capabilities. @@ -130,8 +133,9 @@ func TestLauncher_WiresUpExternalCapabilities(t *testing.T) { Members: nodes, }, CapabilityConfigurations: map[string]registrysyncer.CapabilityConfiguration{ - fullTriggerCapID: {}, - fullTargetID: {}, + fullTriggerCapID: {}, + fullTargetID: {}, + fullMissingTargetID: {}, }, }, }, @@ -144,6 +148,10 @@ func TestLauncher_WiresUpExternalCapabilities(t *testing.T) { ID: "write-chain_evm_1@1.0.0", CapabilityType: capabilities.CapabilityTypeTarget, }, + fullMissingTargetID: { + ID: fullMissingTargetID, + CapabilityType: capabilities.CapabilityTypeTarget, + }, }, IDsToNodes: map[p2ptypes.PeerID]kcr.INodeInfoProviderNodeInfo{ nodes[0]: { @@ -151,28 +159,28 @@ func TestLauncher_WiresUpExternalCapabilities(t *testing.T) { Signer: randomWord(), P2pId: nodes[0], EncryptionPublicKey: randomWord(), - HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID}, + HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID, missingTargetCapID}, }, nodes[1]: { NodeOperatorId: 1, Signer: randomWord(), P2pId: nodes[1], EncryptionPublicKey: randomWord(), - HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID}, + HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID, missingTargetCapID}, }, nodes[2]: { NodeOperatorId: 1, Signer: randomWord(), P2pId: nodes[2], EncryptionPublicKey: randomWord(), - HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID}, + HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID, missingTargetCapID}, }, nodes[3]: { NodeOperatorId: 1, Signer: randomWord(), P2pId: nodes[3], EncryptionPublicKey: randomWord(), - HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID}, + HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID, missingTargetCapID}, }, }, } diff --git a/core/services/registrysyncer/syncer.go b/core/services/registrysyncer/syncer.go index c0d2a60b47e..5fc241ad249 100644 --- a/core/services/registrysyncer/syncer.go +++ b/core/services/registrysyncer/syncer.go @@ -202,7 +202,7 @@ func (s *registrySyncer) updateStateLoop() { } } -func (s *registrySyncer) localRegistry(ctx context.Context) (*LocalRegistry, error) { +func (s *registrySyncer) importOnchainRegistry(ctx context.Context) (*LocalRegistry, error) { caps := []kcr.CapabilitiesRegistryCapabilityInfo{} err := s.reader.GetLatestValue(ctx, s.capabilitiesContract.ReadIdentifier("getCapabilities"), primitives.Unconfirmed, nil, &caps) @@ -288,33 +288,33 @@ func (s *registrySyncer) Sync(ctx context.Context, isInitialSync bool) error { s.reader = reader } - var lr *LocalRegistry + var latestRegistry *LocalRegistry var err error if isInitialSync { s.lggr.Debug("syncing with local registry") - lr, err = s.orm.LatestLocalRegistry(ctx) + latestRegistry, err = s.orm.LatestLocalRegistry(ctx) if err != nil { s.lggr.Warnw("failed to sync with local registry, using remote registry instead", "error", err) } else { - lr.lggr = s.lggr - lr.getPeerID = s.getPeerID + latestRegistry.lggr = s.lggr + latestRegistry.getPeerID = s.getPeerID } } - if lr == nil { + if latestRegistry == nil { s.lggr.Debug("syncing with remote registry") - localRegistry, err := s.localRegistry(ctx) + importedRegistry, err := s.importOnchainRegistry(ctx) if err != nil { return fmt.Errorf("failed to sync with remote registry: %w", err) } - lr = localRegistry + latestRegistry = importedRegistry // Attempt to send local registry to the update channel without blocking // This is to prevent the tests from hanging if they are not calling `Start()` on the syncer select { case <-s.stopCh: s.lggr.Debug("sync cancelled, stopping") - case s.updateChan <- lr: + case s.updateChan <- latestRegistry: // Successfully sent state s.lggr.Debug("remote registry update triggered successfully") default: @@ -324,7 +324,7 @@ func (s *registrySyncer) Sync(ctx context.Context, isInitialSync bool) error { } for _, h := range s.launchers { - lrCopy := deepCopyLocalRegistry(lr) + lrCopy := deepCopyLocalRegistry(latestRegistry) if err := h.Launch(ctx, &lrCopy); err != nil { s.lggr.Errorf("error calling launcher: %s", err) s.metrics.incrementLauncherFailureCounter(ctx)