Skip to content

Commit

Permalink
Reunited writer and reader so run_terminal no longer destroys stream.
Browse files Browse the repository at this point in the history
  • Loading branch information
george-cosma committed Sep 30, 2023
1 parent ac33e1a commit 4fb5411
Showing 1 changed file with 39 additions and 30 deletions.
69 changes: 39 additions & 30 deletions src/interfaces/serial/virtual_terminal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use console::Term;
use futures::stream::StreamExt;
use futures::SinkExt;
use std::io::Write;
use std::sync::Arc;
use std::{io, str};
use tokio_util::codec::{Decoder, Encoder};

Expand All @@ -17,9 +18,6 @@ struct TerminalCodec;
#[async_trait]
impl VirtualTerminal for SerialInterface {
// Run the virtual terminal to interact with the tock console.
// Warning!
// Before returning, the connection is closed. You must re-open the connection for any
// further operations.
async fn run_terminal(&mut self) -> Result<(), TockloaderError> {
if self.stream.is_none() {
// Note: I'm using panic here because "unreachable!" doesn't feel appropriate
Expand All @@ -29,47 +27,58 @@ impl VirtualTerminal for SerialInterface {
panic!("Stream is not initialized!")
}

let (mut writer, mut reader) = TerminalCodec.framed(self.stream.take().unwrap()).split();

let read_handle: JoinHandle<Result<(), TockloaderError>> = tokio::spawn(async move {
// Q: I don't get why the decoder returns Result<Option<String>, ...> but
// line_result is actually Result<String, ...>.
// A: Because the decoded uses Ok(None) as an indicator that it needs to wait for
// more bytes, where we will always have a result (even if it happens to be an
// empty string).
// TODO: What does it mean if .next() return None?
while let Some(line_result) = reader.next().await {
print!("{}", line_result?);
let (writer, reader) = TerminalCodec.framed(self.stream.take().unwrap()).split();

let reader_arc = Arc::new(tokio::sync::Mutex::new(reader));
let read_handle: JoinHandle<Result<(), TockloaderError>> = tokio::spawn({
let reader_arc = Arc::clone(&reader_arc);
async move {
// Q: I don't get why the decoder returns Result<Option<String>, ...> but
// line_result is actually Result<String, ...>.
// A: Because the decoded uses Ok(None) as an indicator that it needs to wait for
// more bytes, where we will always have a result (even if it happens to be an
// empty string).
// TODO: What does it mean if .next() return None?
while let Some(line_result) = reader_arc.lock().await.next().await {
print!("{}", line_result?);

// We need to flush the buffer because the "tock$" prompt does not have a newline.
io::stdout().flush().unwrap();
}

// We need to flush the buffer because the "tock$" prompt does not have a newline.
io::stdout().flush().unwrap();
Ok(())
}

Ok(())
});

let write_handle: JoinHandle<Result<(), TockloaderError>> = tokio::spawn(async move {
loop {
if let Some(buffer) = get_key().await? {
writer.send(buffer).await?
let writer_arc = Arc::new(tokio::sync::Mutex::new(writer));
let write_handle: JoinHandle<Result<(), TockloaderError>> = tokio::spawn({
let writer_arc = Arc::clone(&writer_arc);
async move {
loop {
if let Some(buffer) = get_key().await? {
writer_arc.lock().await.send(buffer).await?
}
}
}
});

// TODO: Make this work.
// Currently both reader and writer are moved into their async closures, thus
// we lose access to them here. We can't provide the closures a reference since
// that reference will then need to have a static lifetime.
// self.stream = Some(reader.reunite(writer).unwrap().into_inner());

tokio::select! {
let result = tokio::select! {
join_result = read_handle => {
join_result?
}
join_result = write_handle => {
join_result?
}
}
};

// Arc::into_innter will always return Mutex<SplitSink/SplitStream> because the previous select statement
// will always make sure all the closures are either finished or cancelled and the arc is cloned nowhere else.
// We can move out of the mutex for similar reasons, no one else will need this reader/writer.
let writer = Arc::into_inner(writer_arc).unwrap().into_inner();
let reader = Arc::into_inner(reader_arc).unwrap().into_inner();
self.stream = Some(reader.reunite(writer).unwrap().into_inner());

return result;
}
}

Expand Down

0 comments on commit 4fb5411

Please sign in to comment.