gRPC server enhancements (kaspanet#296)
* Identify connections to gRPC server with Uuids

* Add concurrent request and subscription handlers to the connection to the gRPC server

* On shutdown clear services from core and async runtime

* Use tonic v0.10.x

* Make `Notifier` sync channel available to integration tests

* Fix tokio test runtime in daemon integration tests

* Remove links from Connection & ConnectionHandler to Manager

* Stop gRPC server forcefully on shutdown timeout

* Add a gRPC client-server sanity check test

* Lint

* Reduce sleep time

* Refactor gRPC client-server test

* Add a gRPC client-server notifications test

* Allocate only one thread to tokio test runtime of all daemon integration tests

* Handle legacy virtual hash in `EstimateNetworkHashesPerSecond` RPC method

* Make `subscribe` fn resistant to a theoretical race-condition

* Ignore duplicated subscriptions

* Add notify requests to Scope conversions

* Unify subscription handler logic (one exception)

* Fix meta

* Fix typos

* Add a gRPC payload ops enum

* Replace `RpcApiOps` with `KaspadPayloadOps` in gRPC client

* Replace `RpcApiOps` with `KaspadPayloadOps` in gRPC server

* Add a macro building a gRPC server interface

* Give request handlers payload op specific methods

* Fix doc

* Working GetMetrics method in gRPC client & server

* Remove obsolete `RpcApiOps::Notification`

* Remove `KaspadPayloadOps::is_subscription()`

* Change GetMetrics server time type to u64

* Strictly correlate interface method set to KaspadPayloadOps variant set

* Set gRPC server shutdown timeout to 1 second

* Update tonic, prost & h2 to the latest versions

* Move the connection routing logic inside the request receive loop, saving a lock

* Make gRPC logs more consistent

* Disconnect the client if the outgoing route gets full

* Ignore oneshot send potential error

* Polish logs

* Refactor "not implemented" error building
Authored by tiram88 on Oct 30, 2023 (1 parent ae7dae3, commit acadd87)
Showing 51 changed files with 1,695 additions and 856 deletions.
63 changes: 28 additions & 35 deletions Cargo.lock (generated file, diff not rendered by default)

5 changes: 5 additions & 0 deletions Cargo.toml
@@ -183,6 +183,10 @@ dashmap = "5.4.0"
separator = "0.4.1"
serde_bytes = "0.11.11"
xxhash-rust = { version = "0.8.6", features = ["xxh3"] }
+h2 = "0.3.21"
+prost = { version = "0.12.1" }
+tonic = { version = "0.10.2", features = ["tls", "gzip"] }
+tonic-build = { version = "0.10.2", features = ["prost"] }

# bip32 dependencies
rand_core = { version = "0.6", features = ["std"] }
@@ -201,6 +205,7 @@ once_cell = { version = "1" }
pbkdf2 = { version = "0.12.1" }
# pbkdf2 = { version = "0.11", default-features = false}

+
# workflow dependencies
workflow-d3 = { version = "0.7.0" }
workflow-nw = { version = "0.7.0" }
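The new tonic, prost, and h2 workspace dependencies are normally paired with a `build.rs` that generates the gRPC service code from the protobuf definitions. A minimal sketch with tonic-build 0.10 follows; the proto file and include paths are hypothetical placeholders, not the repository's actual layout.

```rust
// build.rs (sketch)
fn main() -> Result<(), Box<dyn std::error::Error>> {
    tonic_build::configure()
        .build_server(true)
        .build_client(true)
        // Placeholder paths: substitute the real .proto files and include directories.
        .compile(&["proto/messages.proto"], &["proto"])?;
    Ok(())
}
```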
4 changes: 3 additions & 1 deletion components/consensusmanager/src/lib.rs
@@ -114,6 +114,8 @@ pub struct ConsensusManager {
}

impl ConsensusManager {
+    pub const IDENT: &'static str = "consensus manager";
+
    pub fn new(factory: Arc<dyn ConsensusFactory>) -> Self {
        let (consensus, ctl) = factory.new_active_consensus();
        Self { factory, inner: RwLock::new(ManagerInner::new(consensus, ctl)) }
@@ -170,7 +172,7 @@ impl ConsensusManager {

impl Service for ConsensusManager {
    fn ident(self: Arc<Self>) -> &'static str {
-        "consensus manager"
+        Self::IDENT
    }

    fn start(self: Arc<Self>, _core: Arc<Core>) -> Vec<JoinHandle<()>> {
4 changes: 4 additions & 0 deletions core/src/core.rs
@@ -27,6 +27,10 @@ impl Core {
        self.services.lock().unwrap().push(service);
    }

+    pub fn find(&self, ident: &'static str) -> Option<Arc<dyn Service>> {
+        self.services.lock().unwrap().iter().find(|s| (*s).clone().ident() == ident).cloned()
+    }
+
    /// Starts all services and blocks waiting to join them. For performing other operations in between
    /// use start and join explicitly
    pub fn run(self: &Arc<Core>) {
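Combined with the `IDENT` constants added to `ConsensusManager` and `AsyncRuntime`, `Core::find` lets callers look a registered service up by its identifier, which supports the shutdown cleanup mentioned in the commit message. A hedged usage fragment follows; it assumes the relevant `kaspa_core` imports are in scope and that `core` is an `Arc<Core>` with services already registered.

```rust
// Fragment: imports omitted, `core` is an Arc<Core>.
if let Some(service) = core.find(AsyncRuntime::IDENT) {
    // `find` returns an Arc<dyn Service>; `ident()` takes the Arc by value,
    // so clone the handle first if it is still needed afterwards.
    assert_eq!(service.clone().ident(), AsyncRuntime::IDENT);
}
```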
8 changes: 4 additions & 4 deletions core/src/task/runtime.rs
@@ -10,8 +10,6 @@ use std::{
};
use tokio::task::JoinHandle as TaskJoinHandle;

-const ASYNC_RUNTIME: &str = "async-runtime";
-
/// AsyncRuntime registers async services and provides
/// a tokio Runtime to run them.
pub struct AsyncRuntime {
@@ -27,6 +25,8 @@ impl Default for AsyncRuntime {
}

impl AsyncRuntime {
+    pub const IDENT: &'static str = "async-runtime";
+
    pub fn new(threads: usize) -> Self {
        trace!("Creating the async-runtime service");
        Self { threads, services: Mutex::new(Vec::new()) }
@@ -42,7 +42,7 @@

    pub fn init(self: Arc<AsyncRuntime>, core: Arc<Core>) -> Vec<ThreadJoinHandle<()>> {
        trace!("initializing async-runtime service");
-        vec![thread::Builder::new().name(ASYNC_RUNTIME.to_string()).spawn(move || self.worker(core)).unwrap()]
+        vec![thread::Builder::new().name(Self::IDENT.to_string()).spawn(move || self.worker(core)).unwrap()]
    }

    /// Launch a tokio Runtime and run the top-level async objects
@@ -122,7 +122,7 @@

impl Service for AsyncRuntime {
    fn ident(self: Arc<AsyncRuntime>) -> &'static str {
-        ASYNC_RUNTIME
+        Self::IDENT
    }

    fn start(self: Arc<AsyncRuntime>, core: Arc<Core>) -> Vec<ThreadJoinHandle<()>> {
17 changes: 2 additions & 15 deletions notify/src/broadcaster.rs
@@ -104,19 +104,7 @@ where
    N: Notification,
    C: Connection<Notification = N>,
{
-    pub fn new(name: &'static str, incoming: Receiver<N>) -> Self {
-        Self {
-            name,
-            started: Arc::new(AtomicBool::default()),
-            ctl: Channel::unbounded(),
-            incoming,
-            _sync: None,
-            shutdown: Channel::oneshot(),
-        }
-    }
-
-    #[cfg(test)]
-    pub fn with_sync(name: &'static str, incoming: Receiver<N>, _sync: Option<Sender<()>>) -> Self {
+    pub fn new(name: &'static str, incoming: Receiver<N>, _sync: Option<Sender<()>>) -> Self {
        Self {
            name,
            started: Arc::new(AtomicBool::default()),
@@ -198,7 +186,6 @@

            // In case we have a sync channel, report that the command was processed.
            // This is for test only.
-            #[cfg(test)]
            if let Some(ref sync) = self._sync {
                let _ = sync.try_send(());
            }
@@ -258,7 +245,7 @@ mod tests {
        fn new(name: &'static str, listener_count: usize, steps: Vec<Step>) -> Self {
            let (sync_sender, sync_receiver) = unbounded();
            let (notification_sender, notification_receiver) = unbounded();
-            let broadcaster = Arc::new(TestBroadcaster::with_sync("test", notification_receiver, Some(sync_sender)));
+            let broadcaster = Arc::new(TestBroadcaster::new("test", notification_receiver, Some(sync_sender)));
            let mut listeners = Vec::with_capacity(listener_count);
            let mut notification_receivers = Vec::with_capacity(listener_count);
            for _ in 0..listener_count {
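With the test-only `with_sync` constructor folded into `new`, every caller goes through a single entry point: production code passes `None` for the sync sender, while tests pass `Some(sender)` so they can wait until a control command has been processed. A hedged fragment showing both call sites; variable names and the surrounding setup are illustrative.

```rust
// Fragment: assumes an async_channel `unbounded` and a Broadcaster type in scope.
// Production-style construction: no sync channel.
let broadcaster = Broadcaster::new("grpc", incoming_receiver, None);

// Test-style construction: the sync channel reports each processed command
// (see the `try_send` call in the diff above).
let (sync_sender, sync_receiver) = unbounded();
let test_broadcaster = Broadcaster::new("test", notification_receiver, Some(sync_sender));
```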
