Skip to content

Commit

Permalink
[Keystone] Launcher: don't fail if some capabilities are missing locally
Browse files Browse the repository at this point in the history
If the onchain registry has capabilities that are not enabled locally, we don't
want to fail everything; instead, we continue launching the other capabilities that are available.
  • Loading branch information
bolekk committed Nov 8, 2024
1 parent 1757514 commit 1fa5efe
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 27 deletions.
24 changes: 13 additions & 11 deletions core/capabilities/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,8 @@ func (w *launcher) Launch(ctx context.Context, state *registrysyncer.LocalRegist
}
}

// - remote capability DONs (with IsPublic = true) the current node is a part of.
// These need server-side shims.
// Capability DONs (with IsPublic = true) the current node is a part of.
// These need server-side shims to expose my own capabilities externally.
myCapabilityDONs := []registrysyncer.DON{}
remoteCapabilityDONs := []registrysyncer.DON{}
for _, d := range publicDONs {
Expand Down Expand Up @@ -223,11 +223,11 @@ func (w *launcher) Launch(ctx context.Context, state *registrysyncer.LocalRegist
}
}

// Finally, if I'm a capability DON, let's enable external access
// Finally, if I'm in a capability DON, let's enable external access
// to the capability.
if len(myCapabilityDONs) > 0 {
for _, mcd := range myCapabilityDONs {
err := w.exposeCapabilities(ctx, myID, mcd, state, remoteWorkflowDONs)
for _, myDON := range myCapabilityDONs {
err := w.exposeCapabilities(ctx, myID, myDON, state, remoteWorkflowDONs)
if err != nil {
return err
}
Expand Down Expand Up @@ -395,10 +395,10 @@ func (w *launcher) exposeCapabilities(ctx context.Context, myPeerID p2ptypes.Pee

switch capability.CapabilityType {
case capabilities.CapabilityTypeTrigger:
newTriggerPublisher := func(capability capabilities.BaseCapability, info capabilities.CapabilityInfo) (remotetypes.ReceiverService, error) {
newTriggerPublisher := func(cap capabilities.BaseCapability, info capabilities.CapabilityInfo) (remotetypes.ReceiverService, error) {
publisher := remote.NewTriggerPublisher(
capabilityConfig.RemoteTriggerConfig,
capability.(capabilities.TriggerCapability),
cap.(capabilities.TriggerCapability),
info,
don.DON,
idsToDONs,
Expand All @@ -410,18 +410,19 @@ func (w *launcher) exposeCapabilities(ctx context.Context, myPeerID p2ptypes.Pee

err := w.addReceiver(ctx, capability, don, newTriggerPublisher)
if err != nil {
return fmt.Errorf("failed to add server-side receiver: %w", err)
w.lggr.Errorw("failed to add server-side receiver for a trigger capability - it won't be exposed remotely", "id", cid, "error", err)
// continue attempting other capabilities
}
case capabilities.CapabilityTypeAction:
w.lggr.Warn("no remote client configured for capability type action, skipping configuration")
case capabilities.CapabilityTypeConsensus:
w.lggr.Warn("no remote client configured for capability type consensus, skipping configuration")
case capabilities.CapabilityTypeTarget:
newTargetServer := func(capability capabilities.BaseCapability, info capabilities.CapabilityInfo) (remotetypes.ReceiverService, error) {
newTargetServer := func(cap capabilities.BaseCapability, info capabilities.CapabilityInfo) (remotetypes.ReceiverService, error) {
return target.NewServer(
capabilityConfig.RemoteTargetConfig,
myPeerID,
capability.(capabilities.TargetCapability),
cap.(capabilities.TargetCapability),
info,
don.DON,
idsToDONs,
Expand All @@ -433,7 +434,8 @@ func (w *launcher) exposeCapabilities(ctx context.Context, myPeerID p2ptypes.Pee

err := w.addReceiver(ctx, capability, don, newTargetServer)
if err != nil {
return fmt.Errorf("failed to add server-side receiver: %w", err)
w.lggr.Errorw("failed to add server-side receiver for a target capability - it won't be exposed remotely", "id", cid, "error", err)
// continue attempting other capabilities
}
default:
w.lggr.Warnf("unknown capability type, skipping configuration: %+v", capability)
Expand Down
20 changes: 14 additions & 6 deletions core/capabilities/launcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ func TestLauncher_WiresUpExternalCapabilities(t *testing.T) {

triggerCapID := randomWord()
targetCapID := randomWord()
// one capability from onchain registry is not set up locally
fullMissingTargetID := "[email protected]"
missingTargetCapID := randomWord()
dID := uint32(1)
// The below state describes a Workflow DON (AcceptsWorkflows = true),
// which exposes the streams-trigger and write_chain capabilities.
Expand All @@ -130,8 +133,9 @@ func TestLauncher_WiresUpExternalCapabilities(t *testing.T) {
Members: nodes,
},
CapabilityConfigurations: map[string]registrysyncer.CapabilityConfiguration{
fullTriggerCapID: {},
fullTargetID: {},
fullTriggerCapID: {},
fullTargetID: {},
fullMissingTargetID: {},
},
},
},
Expand All @@ -144,35 +148,39 @@ func TestLauncher_WiresUpExternalCapabilities(t *testing.T) {
ID: "[email protected]",
CapabilityType: capabilities.CapabilityTypeTarget,
},
fullMissingTargetID: {
ID: fullMissingTargetID,
CapabilityType: capabilities.CapabilityTypeTarget,
},
},
IDsToNodes: map[p2ptypes.PeerID]kcr.INodeInfoProviderNodeInfo{
nodes[0]: {
NodeOperatorId: 1,
Signer: randomWord(),
P2pId: nodes[0],
EncryptionPublicKey: randomWord(),
HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID},
HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID, missingTargetCapID},
},
nodes[1]: {
NodeOperatorId: 1,
Signer: randomWord(),
P2pId: nodes[1],
EncryptionPublicKey: randomWord(),
HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID},
HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID, missingTargetCapID},
},
nodes[2]: {
NodeOperatorId: 1,
Signer: randomWord(),
P2pId: nodes[2],
EncryptionPublicKey: randomWord(),
HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID},
HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID, missingTargetCapID},
},
nodes[3]: {
NodeOperatorId: 1,
Signer: randomWord(),
P2pId: nodes[3],
EncryptionPublicKey: randomWord(),
HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID},
HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID, missingTargetCapID},
},
},
}
Expand Down
20 changes: 10 additions & 10 deletions core/services/registrysyncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (s *registrySyncer) updateStateLoop() {
}
}

func (s *registrySyncer) localRegistry(ctx context.Context) (*LocalRegistry, error) {
func (s *registrySyncer) importOnchainRegistry(ctx context.Context) (*LocalRegistry, error) {
caps := []kcr.CapabilitiesRegistryCapabilityInfo{}

err := s.reader.GetLatestValue(ctx, s.capabilitiesContract.ReadIdentifier("getCapabilities"), primitives.Unconfirmed, nil, &caps)
Expand Down Expand Up @@ -288,33 +288,33 @@ func (s *registrySyncer) Sync(ctx context.Context, isInitialSync bool) error {
s.reader = reader
}

var lr *LocalRegistry
var latestRegistry *LocalRegistry
var err error

if isInitialSync {
s.lggr.Debug("syncing with local registry")
lr, err = s.orm.LatestLocalRegistry(ctx)
latestRegistry, err = s.orm.LatestLocalRegistry(ctx)
if err != nil {
s.lggr.Warnw("failed to sync with local registry, using remote registry instead", "error", err)
} else {
lr.lggr = s.lggr
lr.getPeerID = s.getPeerID
latestRegistry.lggr = s.lggr
latestRegistry.getPeerID = s.getPeerID
}
}

if lr == nil {
if latestRegistry == nil {
s.lggr.Debug("syncing with remote registry")
localRegistry, err := s.localRegistry(ctx)
importedRegistry, err := s.importOnchainRegistry(ctx)
if err != nil {
return fmt.Errorf("failed to sync with remote registry: %w", err)
}
lr = localRegistry
latestRegistry = importedRegistry
// Attempt to send local registry to the update channel without blocking
// This is to prevent the tests from hanging if they are not calling `Start()` on the syncer
select {
case <-s.stopCh:
s.lggr.Debug("sync cancelled, stopping")
case s.updateChan <- lr:
case s.updateChan <- latestRegistry:
// Successfully sent state
s.lggr.Debug("remote registry update triggered successfully")
default:
Expand All @@ -324,7 +324,7 @@ func (s *registrySyncer) Sync(ctx context.Context, isInitialSync bool) error {
}

for _, h := range s.launchers {
lrCopy := deepCopyLocalRegistry(lr)
lrCopy := deepCopyLocalRegistry(latestRegistry)
if err := h.Launch(ctx, &lrCopy); err != nil {
s.lggr.Errorf("error calling launcher: %s", err)
s.metrics.incrementLauncherFailureCounter(ctx)
Expand Down

0 comments on commit 1fa5efe

Please sign in to comment.