From 8cea618a70be32ad346fb21f4f54491102a1eb12 Mon Sep 17 00:00:00 2001 From: Benjamin Klum Date: Thu, 16 Nov 2023 22:35:45 +0100 Subject: [PATCH] Make App and Server share the same one-threaded async runtime saves one thread --- main/src/infrastructure/plugin/app.rs | 22 ++++++++++--- main/src/infrastructure/server/mod.rs | 46 +++++++++++---------------- 2 files changed, 36 insertions(+), 32 deletions(-) diff --git a/main/src/infrastructure/plugin/app.rs b/main/src/infrastructure/plugin/app.rs index 463370d78..7cf9c3e28 100644 --- a/main/src/infrastructure/plugin/app.rs +++ b/main/src/infrastructure/plugin/app.rs @@ -37,6 +37,7 @@ use crate::infrastructure::plugin::debug_util::resolve_symbols_from_clipboard; use crate::infrastructure::plugin::tracing_util::TracingHook; use crate::infrastructure::server::services::RealearnServices; use crate::infrastructure::test::run_test; +use anyhow::bail; use base::metrics_util::MetricsHook; use helgoboss_allocator::{start_async_deallocation_thread, AsyncDeallocatorCommandReceiver}; use once_cell::sync::Lazy; @@ -450,7 +451,7 @@ impl App { if self.config.borrow().server_is_enabled() { self.server() .borrow_mut() - .start(self.create_services()) + .start(&async_runtime, self.create_services()) .unwrap_or_else(warn_about_failed_server_start); } let mut session = Reaper::get().medium_session(); @@ -744,11 +745,16 @@ impl App { where R: Send + 'static, { + self.with_async_runtime(|runtime| runtime.spawn(f)) + .expect("couldn't use runtime") + } + + fn with_async_runtime(&self, f: impl FnOnce(&Runtime) -> R) -> anyhow::Result { let state = self.state.borrow(); let AppState::Awake(state) = &*state else { - panic!("attempted to spawn future while ReaLearn in wrong state: {state:?}"); + bail!("attempt to access async runtime while ReaLearn in wrong state: {state:?}"); }; - state.async_runtime.spawn(f) + Ok(f(&state.async_runtime)) } // TODO-medium Return a reference to a SharedControllerManager! Clients might just want to turn @@ -804,9 +810,15 @@ impl App { } pub fn start_server_persistently(&self) -> Result<(), String> { - self.server.borrow_mut().start(self.create_services())?; + let start_result = self + .with_async_runtime(|runtime| { + self.server + .borrow_mut() + .start(runtime, self.create_services()) + }) + .map_err(|e| e.to_string())?; self.change_config(AppConfig::enable_server); - Ok(()) + start_result } pub fn stop_server_persistently(&self) { diff --git a/main/src/infrastructure/server/mod.rs b/main/src/infrastructure/server/mod.rs index 0cd0110c7..1181411f6 100644 --- a/main/src/infrastructure/server/mod.rs +++ b/main/src/infrastructure/server/mod.rs @@ -23,6 +23,7 @@ use crate::infrastructure::server::services::RealearnServices; use derivative::Derivative; use std::thread::JoinHandle; use std::time::Duration; +use tokio::runtime::Runtime; pub type SharedRealearnServer = Rc>; @@ -99,7 +100,7 @@ enum ServerState { struct ServerRuntimeData { clients: ServerClients, shutdown_sender: broadcast::Sender<()>, - server_thread_join_handle: JoinHandle<()>, + server_join_handle: tokio::task::JoinHandle<()>, } impl ServerState { @@ -135,7 +136,7 @@ impl RealearnServer { } /// Idempotent - pub fn start(&mut self, services: RealearnServices) -> Result<(), String> { + pub fn start(&mut self, runtime: &Runtime, services: RealearnServices) -> Result<(), String> { if self.state.is_starting_or_running() { return Ok(()); } @@ -150,30 +151,20 @@ impl RealearnServer { let key_and_cert = self.key_and_cert(); let (shutdown_sender, shutdown_receiver) = broadcast::channel(5); let metrics_reporter = self.metrics_reporter.clone(); - let server_thread_join_handle = std::thread::Builder::new() - .name("ReaLearn server".to_string()) - .spawn(move || { - let runtime = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - runtime.block_on(start_servers( - http_port, - https_port, - grpc_port, - clients_clone, - key_and_cert, - shutdown_receiver, - metrics_reporter, - services, - )); - runtime.shutdown_timeout(Duration::from_secs(1)); - }) - .map_err(|_| "couldn't start server thread".to_string())?; + let server_join_handle = runtime.spawn(start_servers( + http_port, + https_port, + grpc_port, + clients_clone, + key_and_cert, + shutdown_receiver, + metrics_reporter, + services, + )); let runtime_data = ServerRuntimeData { clients, shutdown_sender, - server_thread_join_handle, + server_join_handle, }; self.state = ServerState::Starting(runtime_data); self.notify_changed(); @@ -208,10 +199,11 @@ impl RealearnServer { ServerState::Stopped => return, }; let _ = runtime_data.shutdown_sender.send(()); - runtime_data - .server_thread_join_handle - .join() - .expect("couldn't wait for server thread to finish"); + // TODO-high-ms2 Maybe this is enough? No shutdown sender necessary? + // runtime_data + // .server_join_handle + // .join() + // .expect("couldn't wait for server thread to finish"); } fn notify_changed(&mut self) {