Skip to content

Commit

Permalink
Add logging to quorum sends, refactor a bit (#1783)
Browse files Browse the repository at this point in the history
  • Loading branch information
sergekh2 authored Dec 12, 2024
1 parent a502377 commit b62e8ac
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 72 deletions.
12 changes: 5 additions & 7 deletions core/node/events/miniblock_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,9 +538,9 @@ func mbProduceCandiate_Save(
mbInfo *MiniblockInfo,
remoteNodes []common.Address,
) error {
qp := NewQuorumPool(len(remoteNodes))
qp := NewQuorumPool("method", "mbProduceCandiate_Save", "streamId", streamId, "miniblock", mbInfo.Ref)

qp.GoLocal(func() error {
qp.GoLocal(ctx, func(ctx context.Context) error {
miniblockBytes, err := mbInfo.ToBytes()
if err != nil {
return err
Expand All @@ -555,11 +555,9 @@ func mbProduceCandiate_Save(
)
})

for _, node := range remoteNodes {
qp.GoRemote(node, func(node common.Address) error {
return params.RemoteMiniblockProvider.SaveMbCandidate(ctx, node, streamId, mbInfo.Proto)
})
}
qp.GoRemotes(ctx, remoteNodes, func(ctx context.Context, node common.Address) error {
return params.RemoteMiniblockProvider.SaveMbCandidate(ctx, node, streamId, mbInfo.Proto)
})

return qp.Wait()
}
Expand Down
64 changes: 50 additions & 14 deletions core/node/events/quorum_pool.go
Original file line number Diff line number Diff line change
@@ -1,42 +1,78 @@
package events

import (
"context"
"errors"

"github.com/ethereum/go-ethereum/common"

"github.com/river-build/river/core/node/dlog"
)

// QuorumPool runs one optional local operation and a set of remote operations
// concurrently and collects their errors so that Wait can decide the outcome.
type QuorumPool struct {
// localErrChannel is created by GoLocal (buffer of 1) and receives the error
// returned by the local function; it stays nil if GoLocal is never called.
localErrChannel chan error
// remotes is the number of remote operations that have been started.
remotes int
// remoteErrChannel is created by GoRemotes and receives one result per
// remote operation.
remoteErrChannel chan error
// tags are extra key/value logging arguments supplied to NewQuorumPool; they
// are appended to every warning this pool logs.
tags []any
}

func NewQuorumPool(maxRemotes int) *QuorumPool {
var remoteErrChannel chan error
if maxRemotes > 0 {
remoteErrChannel = make(chan error, maxRemotes)
}
func NewQuorumPool(tags ...any) *QuorumPool {
return &QuorumPool{
remoteErrChannel: remoteErrChannel,
tags: tags,
}
}

func (q *QuorumPool) GoLocal(f func() error) {
func (q *QuorumPool) GoLocal(ctx context.Context, f func(ctx context.Context) error) {
q.localErrChannel = make(chan error, 1)
go func() {
err := f()
err := f(ctx)
q.localErrChannel <- err
if err != nil {
tags := []any{"error", err}
tags = append(tags, q.tags...)
dlog.FromCtx(ctx).Warn("QuorumPool: GoLocal: Error", tags...)
}
}()
}

func (q *QuorumPool) GoRemote(node common.Address, f func(node common.Address) error) {
q.remotes++
go func(node common.Address) {
err := f(node)
q.remoteErrChannel <- err
}(node)
// GoRemotes starts f concurrently for every address in nodes, one goroutine
// per node. It is a no-op when nodes is empty.
//
// Each call allocates a fresh result channel with capacity len(nodes) and adds
// len(nodes) to the count of started remotes.
// NOTE(review): because the channel is replaced while the counter accumulates,
// this looks intended to be called at most once per pool — confirm.
func (q *QuorumPool) GoRemotes(
	ctx context.Context,
	nodes []common.Address,
	f func(ctx context.Context, node common.Address) error,
) {
	count := len(nodes)
	if count == 0 {
		return
	}

	// Buffer one slot per node so a finished remote never blocks on send.
	q.remoteErrChannel = make(chan error, count)
	q.remotes += count

	for i := range nodes {
		go q.executeRemote(ctx, nodes[i], f)
	}
}

// executeRemote runs f for a single node, publishes its result on the remote
// result channel and then logs a warning for any unexpected failure.
func (q *QuorumPool) executeRemote(
	ctx context.Context,
	node common.Address,
	f func(ctx context.Context, node common.Address) error,
) {
	err := f(ctx, node)
	q.remoteErrChannel <- err

	// A context.Canceled error is expected and therefore not logged: Wait()
	// returns as soon as quorum is reached while some remotes may still be in
	// flight, and the caller of Wait eventually cancels the context.
	// On the receiving node, write operations should be detached from
	// cancelable contexts (grpc propagates cancellation from client to server):
	// once a local write has started it should run to completion rather than
	// be cancelled.
	if err == nil || errors.Is(err, context.Canceled) {
		return
	}

	logArgs := append([]any{"error", err, "node", node}, q.tags...)
	dlog.FromCtx(ctx).Warn("QuorumPool: GoRemotes: Error", logArgs...)
}

func (q *QuorumPool) Wait() error {
// TODO: FIX: REPLICATION: succeed if enough remotes succeed even if local fails.
// First wait for local if any.
if q.localErrChannel != nil {
if err := <-q.localErrChannel; err != nil {
Expand Down
51 changes: 23 additions & 28 deletions core/node/rpc/create_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,11 @@ func (s *Service) createReplicatedStream(

nodes := NewStreamNodesWithLock(nodesList, s.wallet.Address)
remotes, isLocal := nodes.GetRemotesAndIsLocal()
sender := NewQuorumPool(len(remotes))
sender := NewQuorumPool("method", "createReplicatedStream", "streamId", streamId)

var localSyncCookie *SyncCookie
if isLocal {
sender.GoLocal(func() error {
sender.GoLocal(ctx, func(ctx context.Context) error {
st, err := s.cache.GetStreamNoWait(ctx, streamId)
if err != nil {
return err
Expand All @@ -178,33 +178,28 @@ func (s *Service) createReplicatedStream(
var remoteSyncCookie *SyncCookie
var remoteSyncCookieOnce sync.Once
if len(remotes) > 0 {
for _, n := range remotes {
sender.GoRemote(
n,
func(node common.Address) error {
stub, err := s.nodeRegistry.GetNodeToNodeClientForAddress(node)
if err != nil {
return err
}
r, err := stub.AllocateStream(
ctx,
connect.NewRequest[AllocateStreamRequest](
&AllocateStreamRequest{
StreamId: streamId[:],
Miniblock: mb,
},
),
)
if err != nil {
return err
}
remoteSyncCookieOnce.Do(func() {
remoteSyncCookie = r.Msg.SyncCookie
})
return nil
},
sender.GoRemotes(ctx, remotes, func(ctx context.Context, node common.Address) error {
stub, err := s.nodeRegistry.GetNodeToNodeClientForAddress(node)
if err != nil {
return err
}
r, err := stub.AllocateStream(
ctx,
connect.NewRequest[AllocateStreamRequest](
&AllocateStreamRequest{
StreamId: streamId[:],
Miniblock: mb,
},
),
)
}
if err != nil {
return err
}
remoteSyncCookieOnce.Do(func() {
remoteSyncCookie = r.Msg.SyncCookie
})
return nil
})
}

err = sender.Wait()
Expand Down
40 changes: 17 additions & 23 deletions core/node/rpc/replicated_add.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,13 @@ var _ AddableStream = (*replicatedStream)(nil)

func (r *replicatedStream) AddEvent(ctx context.Context, event *ParsedEvent) error {
remotes, _ := r.nodes.GetRemotesAndIsLocal()
// TODO: remove
if len(remotes) == 0 {
return r.localStream.AddEvent(ctx, event)
}

sender := NewQuorumPool(len(remotes))
sender := NewQuorumPool("method", "replicatedStream.AddEvent", "streamId", r.streamId)

sender.GoLocal(func() error {
sender.GoLocal(ctx, func(ctx context.Context) error {
return r.localStream.AddEvent(ctx, event)
})

Expand All @@ -40,27 +39,22 @@ func (r *replicatedStream) AddEvent(ctx context.Context, event *ParsedEvent) err
if err != nil {
return err
}
for _, n := range remotes {
sender.GoRemote(
n,
func(node common.Address) error {
stub, err := r.service.nodeRegistry.GetNodeToNodeClientForAddress(node)
if err != nil {
return err
}
_, err = stub.NewEventReceived(
ctx,
connect.NewRequest[NewEventReceivedRequest](
&NewEventReceivedRequest{
StreamId: streamId[:],
Event: event.Envelope,
},
),
)
return err
},
sender.GoRemotes(ctx, remotes, func(ctx context.Context, node common.Address) error {
stub, err := r.service.nodeRegistry.GetNodeToNodeClientForAddress(node)
if err != nil {
return err
}
_, err = stub.NewEventReceived(
ctx,
connect.NewRequest[NewEventReceivedRequest](
&NewEventReceivedRequest{
StreamId: streamId[:],
Event: event.Envelope,
},
),
)
}
return err
})
}

return sender.Wait()
Expand Down

0 comments on commit b62e8ac

Please sign in to comment.