Skip to content

Commit

Permalink
Experiment: run Send from other thread (#663)
Browse files Browse the repository at this point in the history
Experiment to fix #640
  • Loading branch information
sergekh2 authored Aug 7, 2024
1 parent 4d66418 commit e345bb6
Showing 1 changed file with 16 additions and 4 deletions.
20 changes: 16 additions & 4 deletions core/node/rpc/sync/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@ package sync

import (
"context"
"github.com/river-build/river/core/node/utils"
"sync"

"connectrpc.com/connect"
"github.com/ethereum/go-ethereum/common"

. "github.com/river-build/river/core/node/base"
"github.com/river-build/river/core/node/events"
"github.com/river-build/river/core/node/nodes"
. "github.com/river-build/river/core/node/protocol"
"github.com/river-build/river/core/node/shared"
"github.com/river-build/river/core/node/utils"
)

type (
Expand Down Expand Up @@ -102,17 +103,28 @@ func (h *handlerImpl) SyncStreams(
h.activeSyncOperations.Store(op.SyncID, op)
defer h.activeSyncOperations.Delete(op.SyncID)

doneChan := make(chan error, 1)
go h.runSyncStreams(req, res, op, doneChan)
return <-doneChan
}

func (h *handlerImpl) runSyncStreams(
req *connect.Request[SyncStreamsRequest],
res *connect.ServerStream[SyncStreamsResponse],
op *StreamSyncOperation,
doneChan chan error,
) {
// send SyncID to client
if err := res.Send(&SyncStreamsResponse{
SyncId: op.SyncID,
SyncOp: SyncOp_SYNC_NEW,
}); err != nil {
err := AsRiverError(err).Func("SyncStreams")
return err
doneChan <- AsRiverError(err).Func("SyncStreams")
return
}

// run until sub.ctx expires or until the client calls CancelSync
return op.Run(req, res)
doneChan <- op.Run(req, res)
}

func (h *handlerImpl) AddStreamToSync(
Expand Down

0 comments on commit e345bb6

Please sign in to comment.