-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
node: Make stream sync more resilient #408
Conversation
671a903
to
a590464
Compare
f8ccbdb
to
35cd87f
Compare
@@ -694,6 +694,7 @@ message SyncStreamsResponse { | |||
SyncOp sync_op = 2; | |||
StreamAndCookie stream = 3; | |||
string pong_nonce = 4; | |||
bytes stream_id = 5; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Feels like it's a good idea to add some comments to this struct.
core/node/rpc/sync/util.go
Outdated
GetStreamId() string | ||
} | ||
|
||
func ctxAndLogForRequest[T any](ctx context.Context, req *connect.Request[T]) (context.Context, *slog.Logger) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's suboptimal to have a copy in rpc and here. Lets make it public (needs a new package?)
nodeRegistry: nodeRegistry, | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To highlight interface comformance:
var _ Handler = (*handlerImpl)(nil) | |
var _ DebugHandler = (*handlerImpl)(nil) |
// StreamCookieSetGroupedByNodeAddress is a mapping from a node address to a SyncCookieSet | ||
StreamCookieSetGroupedByNodeAddress map[common.Address]SyncCookieSet | ||
) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please add var _
to indicate which structs should implement which interfaces
} | ||
|
||
// use the syncID as used between client and subscription node | ||
msg.SyncId = syncOp.SyncID |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But is there really a need to set it if it's not a SYNC_NEW op? (Or rather should it be empty for all other ops?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It must be set for SYNC_NEW
and SYNC_CLOSE
. It is debatable if it needs to be set for other updates. Currently it is set and several unittests test (go + sdk) do check for it.
delete(ss.syncers, syncerAddr) | ||
ss.muSyncers.Unlock() | ||
ss.syncerTasks.Done() | ||
}() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does this capture work correctly? should syncerAddr, syncer
be args to the goroutine instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From go 1.22 this isn't an issue anymore.
ss.streamID2Syncer[streamID] = syncer | ||
|
||
ss.syncerTasks.Add(1) | ||
go func() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks like this can be a functions on ss, then here and above in can be go-called instead of same code.
core/node/rpc/sync/client/remote.go
Outdated
type remoteSyncer struct { | ||
syncStreamCtx context.Context | ||
syncStreamCancel context.CancelFunc | ||
syncID atomic.Value |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it seems it doesn't have to be atomic: it is written once during object creation?
core/node/rpc/sync/client/remote.go
Outdated
} | ||
case <-s.syncStreamCtx.Done(): | ||
return | ||
case <-s.syncStreamCtx.Done(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same case twice
core/node/rpc/sync/client/remote.go
Outdated
|
||
latestMsgReceived.Store(time.Now()) | ||
|
||
go func() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please put this func in the method and go-call it here: it's relatively long and makes Run() less readable as such
f5a2588
to
d7c8309
Compare
d7c8309
to
53f0374
Compare
Currently is one or more streams from a client sync are managed by a remote node that goes down/or is unreachable the entire stream sync subscription is cancelled. This is undesirable.
With this change a new sync op message type
SyncOp_SYNC_DOWN
is introduced. When the node that managed the subscription for the client detects a remote node is down it will sendSyncOp_SYNC_DOWN
message to the client to report that it will not get updates for this stream and drops all streams managed by the remote node from the existing subscription. It is up to the client after that to add these streams again.Architecture
Introduces the
rpc/sync
package. This defines a handler interface that the service uses to forward incoming client requests. The handlerImpl implements this interface and keeps a mapping of activeStreamSyncOperation
. It will either create a subscription (SyncStreams) or find and forwards the request to the sync operation. The sync operation manages a set of syncers. A syncer is either local (streamCache) or remote (grpc api). Syncers are defined in therpc/sync/client
package. Syncers write messages to an internal channel from which the sync operation sends them to the client.