Skip to content

Commit

Permalink
Playtime: Correctly route request processing errors to app
Browse files Browse the repository at this point in the history
  • Loading branch information
helgoboss committed Oct 30, 2023
1 parent 5769318 commit 4a01be1
Showing 1 changed file with 102 additions and 41 deletions.
143 changes: 102 additions & 41 deletions main/src/infrastructure/ui/app/app_library.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ use playtime_clip_engine::base::Matrix;
use playtime_clip_engine::proto;
use playtime_clip_engine::proto::{
create_initial_matrix_updates, create_initial_slot_updates, create_initial_track_updates,
event_reply, query_result, reply, ClipEngineRequestHandler, CommandRequest, EventReply,
MatrixProvider, QueryReply, QueryResult, Reply, Request,
event_reply, query_result, reply, request, ClipEngineRequestHandler, CommandRequest,
EventReply, MatrixProvider, Notification, NotificationKind, QueryReply, QueryResult, Reply,
Request,
};
use prost::Message;
use reaper_low::raw::HWND;
Expand Down Expand Up @@ -136,19 +137,52 @@ impl AppLibrary {
/// Attention: This is *not* called from the main thread but from some special Flutter UI thread.
#[no_mangle]
extern "C" fn invoke_host(data: *const u8, length: i32) {
// Decode payload
let bytes = unsafe { std::slice::from_raw_parts(data, length as usize) };
let req = Request::decode(bytes).unwrap();
let Some(req) = req.value else {
let request = Request::decode(bytes).unwrap();
// Extract values
let Some(request_value) = request.value else {
tracing::error!(msg = "incoming app request didn't have value");
return;
};
// We need to execute commands on the main thread!
Global::task_support()
.do_in_main_thread_asap(|| {
if let Err(error) = process_request(req) {
tracing::error!(msg = "Error processing app request", %error);
}
})
.unwrap();
// Process request
if let Err(error) = process_request(request.matrix_id, request_value) {
tracing::error!(msg = "error in synchronous phase of request processing", %error);
}
}

/// Processes the given request.
///
/// Essentially this only extracts some values and then schedules the actual work on the main
/// thread.
///
/// # Errors
///
/// Returns an error if something in the synchronous part of the request processing went wrong.
fn process_request(matrix_id: String, request_value: request::Value) -> Result<()> {
use proto::request::Value;
match request_value {
// It's a command (fire-and-forget)
Value::CommandRequest(command_request) => {
let command_request_value = command_request
.value
.context("incoming app command request didn't have value")?;
process_command_request(matrix_id, command_request_value)
.context("processing command request")?;
Ok(())
}
// It's a query (with async response)
Value::QueryRequest(query_request) => {
let query_request_value = query_request
.query
.context("incoming app query request didn't have query")?
.value
.context("incoming app query didn't have value")?;
process_query_request(matrix_id, query_request.id, query_request_value)
.context("processing query request")?;
Ok(())
}
}
}

pub type AppHandle = NonNull<c_void>;
Expand Down Expand Up @@ -212,28 +246,39 @@ fn prepare_app_launch() {
}
}

fn process_request(req: proto::request::Value) -> Result<()> {
use proto::request::Value;
match req {
Value::CommandRequest(req) => process_command(
req.value
.ok_or(Status::invalid_argument("command value missing"))?,
),
Value::QueryRequest(req) => process_query(
req.matrix_id,
req.id,
req.query.ok_or(Status::invalid_argument("query missing"))?,
),
}
/// Executes the given command asynchronously (in the main thread).
///
/// # Errors
///
/// Returns an error if the main thread task queue is full.
fn process_command_request(matrix_id: String, value: proto::command_request::Value) -> Result<()> {
// We need to execute commands on the main thread!
Global::task_support().do_in_main_thread_asap(move || {
// Execute command
let result = process_command(value);
// Handle possible error
if let Err(status) = result {
// Log error
tracing::error!(msg = "error in asynchronous phase of command request processing", %status);
// Send it to the app as notification
let _ = send_to_app(
&matrix_id,
reply::Value::EventReply(EventReply {
value: Some(event_reply::Value::Notification(Notification {
kind: NotificationKind::NotificationTypeError.into(),
text: status.message().to_string(),
})),
}),
);
}
}).map_err(|e| anyhow!(e))?;
Ok(())
}

fn process_query(matrix_id: String, id: u32, query: proto::Query) -> Result<()> {
fn process_query_request(matrix_id: String, id: u32, query: proto::query::Value) -> Result<()> {
use proto::query::Value::*;
let handler = ClipEngineRequestHandler::new(AppMatrixProvider);
match query
.value
.ok_or(Status::invalid_argument("query value missing"))?
{
match query {
ProveAuthenticity(req) => {
send_query_reply_to_app(matrix_id, id, async move {
let value = handler.prove_authenticity(req).await?.into_inner();
Expand All @@ -256,7 +301,7 @@ fn process_query(matrix_id: String, id: u32, query: proto::Query) -> Result<()>
Ok(())
}

fn process_command(req: proto::command_request::Value) -> Result<()> {
fn process_command(req: proto::command_request::Value) -> std::result::Result<(), Status> {
// TODO-low This should be a more generic command handler in future (not just clip engine)
let handler = ClipEngineRequestHandler::new(AppMatrixProvider);
use proto::command_request::Value::*;
Expand All @@ -266,20 +311,28 @@ fn process_command(req: proto::command_request::Value) -> Result<()> {
// App instance is started. Put the app instance callback at the correct position.
let ptr = req.app_callback_address as *const ();
let app_callback: AppCallback = unsafe { std::mem::transmute(ptr) };
find_app_panel(&req.matrix_id)?.notify_app_is_ready(app_callback);
find_app_panel(&req.matrix_id)
.map_err(to_status)?
.notify_app_is_ready(app_callback);
}
TriggerApp(req) => {
find_app_panel(&req.session_id)?.toggle_full_screen()?;
find_app_panel(&req.session_id)
.map_err(to_status)?
.toggle_full_screen()
.map_err(to_status)?;
}
// Event subscription commands
GetOccasionalMatrixUpdates(req) => {
send_initial_events_to_app(&req.matrix_id, create_initial_matrix_updates)?;
send_initial_events_to_app(&req.matrix_id, create_initial_matrix_updates)
.map_err(to_status)?;
}
GetOccasionalTrackUpdates(req) => {
send_initial_events_to_app(&req.matrix_id, create_initial_track_updates)?;
send_initial_events_to_app(&req.matrix_id, create_initial_track_updates)
.map_err(to_status)?;
}
GetOccasionalSlotUpdates(req) => {
send_initial_events_to_app(&req.matrix_id, create_initial_slot_updates)?;
send_initial_events_to_app(&req.matrix_id, create_initial_slot_updates)
.map_err(to_status)?;
}
// Normal commands
TriggerMatrix(req) => {
Expand Down Expand Up @@ -388,16 +441,20 @@ fn send_initial_events_to_app<T: Into<event_reply::Value>>(
fn send_query_reply_to_app(
matrix_id: String,
id: u32,
future: impl Future<Output = Result<query_result::Value, Status>> + 'static,
future: impl Future<Output = Result<query_result::Value, Status>> + Send + 'static,
) {
Global::future_support().spawn_in_main_thread_from_main_thread(async move {
let value = reply::Value::QueryReply(QueryReply {
Global::future_support().spawn_in_main_thread(async move {
let query_result_value = match future.await {
Ok(outcome) => outcome,
Err(error) => query_result::Value::Error(error.to_string()),
};
let reply_value = reply::Value::QueryReply(QueryReply {
id,
result: Some(QueryResult {
value: Some(future.await?),
value: Some(query_result_value),
}),
});
send_to_app(&matrix_id, value)?;
send_to_app(&matrix_id, reply_value)?;
Ok(())
});
}
Expand All @@ -417,3 +474,7 @@ fn find_app_panel(session_id: &str) -> Result<SharedView<AppPanel>> {
.context("instance not found")?
.app_panel()
}

fn to_status(err: anyhow::Error) -> Status {
Status::unknown(err.to_string())
}

0 comments on commit 4a01be1

Please sign in to comment.