Skip to content

Commit

Permalink
Add some idempotency and robustness to the node registration
Browse files Browse the repository at this point in the history
  • Loading branch information
archseer committed Nov 14, 2024
1 parent cdbd5e4 commit 2bca70c
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 7 deletions.
33 changes: 27 additions & 6 deletions deployment/environment/devenv/don.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,8 +335,30 @@ func (n *Node) RegisterNodeToJobDistributor(ctx context.Context, jd JobDistribut
Labels: n.labels,
Name: n.Name,
})

if err != nil {
// node already registered, fetch it's id
// TODO: check for rpc code = "AlreadyExists" instead
if err != nil && strings.Contains(err.Error(), "AlreadyExists") {
nodesResponse, err := jd.ListNodes(ctx, &nodev1.ListNodesRequest{
Filter: &nodev1.ListNodesRequest_Filter{
Selectors: []*ptypes.Selector{
{
Key: "p2p_id",
Op: ptypes.SelectorOp_EQ,
Value: peerID,
},
},
},
})
if err != nil {
return err
}
nodes := nodesResponse.GetNodes()
if len(nodes) == 0 {
return fmt.Errorf("failed to find node: %v", n.Name)
}
n.NodeId = nodes[0].Id
return nil
} else if err != nil {
return fmt.Errorf("failed to register node %s: %w", n.Name, err)
}
if registerResponse.GetNode().GetId() == "" {
Expand Down Expand Up @@ -367,13 +389,12 @@ func (n *Node) CreateJobDistributor(ctx context.Context, jd JobDistributor) (str
func (n *Node) SetUpAndLinkJobDistributor(ctx context.Context, jd JobDistributor) error {
// register the node in the job distributor
err := n.RegisterNodeToJobDistributor(ctx, jd)
// TODO: check for rpc code = "AlreadyExists" instead
if err != nil && !strings.Contains(err.Error(), "AlreadyExists") {
if err != nil {
return err
}
// now create the job distributor in the node
id, err := n.CreateJobDistributor(ctx, jd)
if err != nil {
if err != nil && !strings.Contains(err.Error(), "DuplicateFeedsManagerError") {
return err
}
// wait for the node to connect to the job distributor
Expand All @@ -382,7 +403,7 @@ func (n *Node) SetUpAndLinkJobDistributor(ctx context.Context, jd JobDistributor
Id: n.NodeId,
})
if err != nil {
return fmt.Errorf("failed to get node %s: %w", n.Name, err)
return retry.RetryableError(fmt.Errorf("failed to get node %s: %w", n.Name, err))
}
if getRes.GetNode() == nil {
return fmt.Errorf("no node found for node id %s", n.NodeId)
Expand Down
2 changes: 1 addition & 1 deletion deployment/environment/web/sdk/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (c *client) CreateJobDistributor(ctx context.Context, in JobDistributorInpu
msg := err.GetMessage()
return "", fmt.Errorf("failed to create feeds manager: %v", msg)
}
return "", fmt.Errorf("failed to create feeds manager")
return "", fmt.Errorf("failed to create feeds manager: %v", response.GetCreateFeedsManager().GetTypename())
}

func (c *client) UpdateJobDistributor(ctx context.Context, id string, in JobDistributorInput) error {
Expand Down

0 comments on commit 2bca70c

Please sign in to comment.