From 4fb541143092395b58f85e62e23dc21eb0019945 Mon Sep 17 00:00:00 2001 From: Cosma George Date: Sat, 30 Sep 2023 16:44:30 +0300 Subject: [PATCH] Reunited writer and reader so run_terminal no longer destroys stream. --- src/interfaces/serial/virtual_terminal.rs | 69 +++++++++++++---------- 1 file changed, 39 insertions(+), 30 deletions(-) diff --git a/src/interfaces/serial/virtual_terminal.rs b/src/interfaces/serial/virtual_terminal.rs index 1b82198..eff9898 100644 --- a/src/interfaces/serial/virtual_terminal.rs +++ b/src/interfaces/serial/virtual_terminal.rs @@ -9,6 +9,7 @@ use console::Term; use futures::stream::StreamExt; use futures::SinkExt; use std::io::Write; +use std::sync::Arc; use std::{io, str}; use tokio_util::codec::{Decoder, Encoder}; @@ -17,9 +18,6 @@ struct TerminalCodec; #[async_trait] impl VirtualTerminal for SerialInterface { // Run the virtual terminal to interact with the tock console. - // Warning! - // Before returning, the connection is closed. You must re-open the connection for any - // further operations. async fn run_terminal(&mut self) -> Result<(), TockloaderError> { if self.stream.is_none() { // Note: I'm using panic here because "unreachable!" doesn't feel appropriate @@ -29,47 +27,58 @@ impl VirtualTerminal for SerialInterface { panic!("Stream is not initialized!") } - let (mut writer, mut reader) = TerminalCodec.framed(self.stream.take().unwrap()).split(); - - let read_handle: JoinHandle> = tokio::spawn(async move { - // Q: I don't get why the decoder returns Result, ...> but - // line_result is actually Result. - // A: Because the decoded uses Ok(None) as an indicator that it needs to wait for - // more bytes, where we will always have a result (even if it happens to be an - // empty string). - // TODO: What does it mean if .next() return None? - while let Some(line_result) = reader.next().await { - print!("{}", line_result?); + let (writer, reader) = TerminalCodec.framed(self.stream.take().unwrap()).split(); + + let reader_arc = Arc::new(tokio::sync::Mutex::new(reader)); + let read_handle: JoinHandle> = tokio::spawn({ + let reader_arc = Arc::clone(&reader_arc); + async move { + // Q: I don't get why the decoder returns Result, ...> but + // line_result is actually Result. + // A: Because the decoded uses Ok(None) as an indicator that it needs to wait for + // more bytes, where we will always have a result (even if it happens to be an + // empty string). + // TODO: What does it mean if .next() return None? + while let Some(line_result) = reader_arc.lock().await.next().await { + print!("{}", line_result?); + + // We need to flush the buffer because the "tock$" prompt does not have a newline. + io::stdout().flush().unwrap(); + } - // We need to flush the buffer because the "tock$" prompt does not have a newline. - io::stdout().flush().unwrap(); + Ok(()) } - - Ok(()) }); - let write_handle: JoinHandle> = tokio::spawn(async move { - loop { - if let Some(buffer) = get_key().await? { - writer.send(buffer).await? + let writer_arc = Arc::new(tokio::sync::Mutex::new(writer)); + let write_handle: JoinHandle> = tokio::spawn({ + let writer_arc = Arc::clone(&writer_arc); + async move { + loop { + if let Some(buffer) = get_key().await? { + writer_arc.lock().await.send(buffer).await? + } } } }); - // TODO: Make this work. - // Currently both reader and writer are moved into their async closures, thus - // we lose access to them here. We can't provide the closures a reference since - // that reference will then need to have a static lifetime. - // self.stream = Some(reader.reunite(writer).unwrap().into_inner()); - - tokio::select! { + let result = tokio::select! { join_result = read_handle => { join_result? } join_result = write_handle => { join_result? } - } + }; + + // Arc::into_innter will always return Mutex because the previous select statement + // will always make sure all the closures are either finished or cancelled and the arc is cloned nowhere else. + // We can move out of the mutex for similar reasons, no one else will need this reader/writer. + let writer = Arc::into_inner(writer_arc).unwrap().into_inner(); + let reader = Arc::into_inner(reader_arc).unwrap().into_inner(); + self.stream = Some(reader.reunite(writer).unwrap().into_inner()); + + return result; } }