From 4a01be16eebb08920cf840e5b5bc2f83a3e7eddb Mon Sep 17 00:00:00 2001 From: Benjamin Klum Date: Mon, 30 Oct 2023 23:58:25 +0100 Subject: [PATCH] Playtime: Correctly route request processing errors to app --- main/src/infrastructure/ui/app/app_library.rs | 143 +++++++++++++----- 1 file changed, 102 insertions(+), 41 deletions(-) diff --git a/main/src/infrastructure/ui/app/app_library.rs b/main/src/infrastructure/ui/app/app_library.rs index b620f733f..d5057e9ac 100644 --- a/main/src/infrastructure/ui/app/app_library.rs +++ b/main/src/infrastructure/ui/app/app_library.rs @@ -9,8 +9,9 @@ use playtime_clip_engine::base::Matrix; use playtime_clip_engine::proto; use playtime_clip_engine::proto::{ create_initial_matrix_updates, create_initial_slot_updates, create_initial_track_updates, - event_reply, query_result, reply, ClipEngineRequestHandler, CommandRequest, EventReply, - MatrixProvider, QueryReply, QueryResult, Reply, Request, + event_reply, query_result, reply, request, ClipEngineRequestHandler, CommandRequest, + EventReply, MatrixProvider, Notification, NotificationKind, QueryReply, QueryResult, Reply, + Request, }; use prost::Message; use reaper_low::raw::HWND; @@ -136,19 +137,52 @@ impl AppLibrary { /// Attention: This is *not* called from the main thread but from some special Flutter UI thread. #[no_mangle] extern "C" fn invoke_host(data: *const u8, length: i32) { + // Decode payload let bytes = unsafe { std::slice::from_raw_parts(data, length as usize) }; - let req = Request::decode(bytes).unwrap(); - let Some(req) = req.value else { + let request = Request::decode(bytes).unwrap(); + // Extract values + let Some(request_value) = request.value else { + tracing::error!(msg = "incoming app request didn't have value"); return; }; - // We need to execute commands on the main thread! - Global::task_support() - .do_in_main_thread_asap(|| { - if let Err(error) = process_request(req) { - tracing::error!(msg = "Error processing app request", %error); - } - }) - .unwrap(); + // Process request + if let Err(error) = process_request(request.matrix_id, request_value) { + tracing::error!(msg = "error in synchronous phase of request processing", %error); + } +} + +/// Processes the given request. +/// +/// Essentially this only extracts some values and then schedules the actual work on the main +/// thread. +/// +/// # Errors +/// +/// Returns an error if something in the synchronous part of the request processing went wrong. +fn process_request(matrix_id: String, request_value: request::Value) -> Result<()> { + use proto::request::Value; + match request_value { + // It's a command (fire-and-forget) + Value::CommandRequest(command_request) => { + let command_request_value = command_request + .value + .context("incoming app command request didn't have value")?; + process_command_request(matrix_id, command_request_value) + .context("processing command request")?; + Ok(()) + } + // It's a query (with async response) + Value::QueryRequest(query_request) => { + let query_request_value = query_request + .query + .context("incoming app query request didn't have query")? + .value + .context("incoming app query didn't have value")?; + process_query_request(matrix_id, query_request.id, query_request_value) + .context("processing query request")?; + Ok(()) + } + } } pub type AppHandle = NonNull; @@ -212,28 +246,39 @@ fn prepare_app_launch() { } } -fn process_request(req: proto::request::Value) -> Result<()> { - use proto::request::Value; - match req { - Value::CommandRequest(req) => process_command( - req.value - .ok_or(Status::invalid_argument("command value missing"))?, - ), - Value::QueryRequest(req) => process_query( - req.matrix_id, - req.id, - req.query.ok_or(Status::invalid_argument("query missing"))?, - ), - } +/// Executes the given command asynchronously (in the main thread). +/// +/// # Errors +/// +/// Returns an error if the main thread task queue is full. +fn process_command_request(matrix_id: String, value: proto::command_request::Value) -> Result<()> { + // We need to execute commands on the main thread! + Global::task_support().do_in_main_thread_asap(move || { + // Execute command + let result = process_command(value); + // Handle possible error + if let Err(status) = result { + // Log error + tracing::error!(msg = "error in asynchronous phase of command request processing", %status); + // Send it to the app as notification + let _ = send_to_app( + &matrix_id, + reply::Value::EventReply(EventReply { + value: Some(event_reply::Value::Notification(Notification { + kind: NotificationKind::NotificationTypeError.into(), + text: status.message().to_string(), + })), + }), + ); + } + }).map_err(|e| anyhow!(e))?; + Ok(()) } -fn process_query(matrix_id: String, id: u32, query: proto::Query) -> Result<()> { +fn process_query_request(matrix_id: String, id: u32, query: proto::query::Value) -> Result<()> { use proto::query::Value::*; let handler = ClipEngineRequestHandler::new(AppMatrixProvider); - match query - .value - .ok_or(Status::invalid_argument("query value missing"))? - { + match query { ProveAuthenticity(req) => { send_query_reply_to_app(matrix_id, id, async move { let value = handler.prove_authenticity(req).await?.into_inner(); @@ -256,7 +301,7 @@ fn process_query(matrix_id: String, id: u32, query: proto::Query) -> Result<()> Ok(()) } -fn process_command(req: proto::command_request::Value) -> Result<()> { +fn process_command(req: proto::command_request::Value) -> std::result::Result<(), Status> { // TODO-low This should be a more generic command handler in future (not just clip engine) let handler = ClipEngineRequestHandler::new(AppMatrixProvider); use proto::command_request::Value::*; @@ -266,20 +311,28 @@ fn process_command(req: proto::command_request::Value) -> Result<()> { // App instance is started. Put the app instance callback at the correct position. let ptr = req.app_callback_address as *const (); let app_callback: AppCallback = unsafe { std::mem::transmute(ptr) }; - find_app_panel(&req.matrix_id)?.notify_app_is_ready(app_callback); + find_app_panel(&req.matrix_id) + .map_err(to_status)? + .notify_app_is_ready(app_callback); } TriggerApp(req) => { - find_app_panel(&req.session_id)?.toggle_full_screen()?; + find_app_panel(&req.session_id) + .map_err(to_status)? + .toggle_full_screen() + .map_err(to_status)?; } // Event subscription commands GetOccasionalMatrixUpdates(req) => { - send_initial_events_to_app(&req.matrix_id, create_initial_matrix_updates)?; + send_initial_events_to_app(&req.matrix_id, create_initial_matrix_updates) + .map_err(to_status)?; } GetOccasionalTrackUpdates(req) => { - send_initial_events_to_app(&req.matrix_id, create_initial_track_updates)?; + send_initial_events_to_app(&req.matrix_id, create_initial_track_updates) + .map_err(to_status)?; } GetOccasionalSlotUpdates(req) => { - send_initial_events_to_app(&req.matrix_id, create_initial_slot_updates)?; + send_initial_events_to_app(&req.matrix_id, create_initial_slot_updates) + .map_err(to_status)?; } // Normal commands TriggerMatrix(req) => { @@ -388,16 +441,20 @@ fn send_initial_events_to_app>( fn send_query_reply_to_app( matrix_id: String, id: u32, - future: impl Future> + 'static, + future: impl Future> + Send + 'static, ) { - Global::future_support().spawn_in_main_thread_from_main_thread(async move { - let value = reply::Value::QueryReply(QueryReply { + Global::future_support().spawn_in_main_thread(async move { + let query_result_value = match future.await { + Ok(outcome) => outcome, + Err(error) => query_result::Value::Error(error.to_string()), + }; + let reply_value = reply::Value::QueryReply(QueryReply { id, result: Some(QueryResult { - value: Some(future.await?), + value: Some(query_result_value), }), }); - send_to_app(&matrix_id, value)?; + send_to_app(&matrix_id, reply_value)?; Ok(()) }); } @@ -417,3 +474,7 @@ fn find_app_panel(session_id: &str) -> Result> { .context("instance not found")? .app_panel() } + +fn to_status(err: anyhow::Error) -> Status { + Status::unknown(err.to_string()) +}