Skip to content

Commit

Permalink
Disable secondary forwarding on AddEvent (other handlers TBI) (#1725)
Browse files Browse the repository at this point in the history
  • Loading branch information
sergekh2 authored Dec 5, 2024
1 parent f33ee1e commit 7194c1f
Showing 1 changed file with 23 additions and 2 deletions.
25 changes: 23 additions & 2 deletions core/node/rpc/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ import (
"github.com/river-build/river/core/node/shared"
)

const (
RiverNoForwardHeader = "X-River-No-Forward"
RiverNoForwardValue = "true"
RiverFromNodeHeader = "X-River-From-Node"
RiverToNodeHeader = "X-River-To-Node"
)

// peerNodeRequestWithRetries makes a request to as many as each of the remote nodes, returning the first response
// that is not a network unavailability error.
func peerNodeRequestWithRetries[T any](
Expand Down Expand Up @@ -184,7 +191,7 @@ func executeConnectHandler[Req, Res any](
if e != nil {
err := AsRiverError(e).
Tags(
"nodeAddress", service.wallet.Address,
"nodeAddress", service.wallet.Address.Hex(),
"nodeUrl", service.config.Address,
"elapsed", elapsed,
).
Expand Down Expand Up @@ -456,15 +463,29 @@ func (s *Service) addEventImpl(
return s.localAddEvent(ctx, req, stream, view)
}

if req.Header().Get(RiverNoForwardHeader) == RiverNoForwardValue {
return nil, RiverError(Err_UNAVAILABLE, "Forwarding disabled by request header").
Func("service.addEventImpl").
Tags("streamId", req.Msg.StreamId,
RiverFromNodeHeader, req.Header().Get(RiverFromNodeHeader),
RiverToNodeHeader, req.Header().Get(RiverToNodeHeader),
)
}

// TODO: smarter remote select? random?
// TODO: retry?
firstRemote := stream.GetStickyPeer()
dlog.FromCtx(ctx).Debug("Forwarding request", "nodeAddress", firstRemote)
stub, err := s.nodeRegistry.GetStreamServiceClientForAddress(firstRemote)
if err != nil {
return nil, err
}

ret, err := stub.AddEvent(ctx, req)
newReq := connect.NewRequest(req.Msg)
newReq.Header().Set(RiverNoForwardHeader, RiverNoForwardValue)
newReq.Header().Set(RiverFromNodeHeader, s.wallet.Address.Hex())
newReq.Header().Set(RiverToNodeHeader, firstRemote.Hex())
ret, err := stub.AddEvent(ctx, newReq)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 7194c1f

Please sign in to comment.