Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client-daemon communication via gRPC #62

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ jobs:
steps:
- uses: actions/checkout@v3
- uses: Swatinem/rust-cache@v1
- name: Install dependencies with apt
run: |
sudo apt update
sudo apt install -y protobuf-compiler libprotobuf-dev

- name: Build
run: |
cd ./daemon
Expand Down Expand Up @@ -59,6 +64,11 @@ jobs:
steps:
- uses: actions/checkout@v3
- uses: Swatinem/rust-cache@v1
- name: Install dependencies with apt
run: |
sudo apt update
sudo apt install -y protobuf-compiler libprotobuf-dev

- name: Build
run: |
cd ./client
Expand Down
9 changes: 8 additions & 1 deletion client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,17 @@ structopt = "0.3.26"
anyhow = "1"
rmp-serde = "1"
chrono = "0.4"
crossterm = "0.24"
crossterm = { version = "0.24", features = ["event-stream"] }
bunt = "0.2.6"
mio = "0.8.4"
dirs = "4.0.0"
tokio = { version = "1.20.1", features = ["rt-multi-thread", "macros"] }
tokio-stream = "0.1"
tonic = "0.8.2"
prost = "0.11"

[build-dependencies]
tonic-build = "0.8"

[dependencies.common]
path = "../common"
5 changes: 5 additions & 0 deletions client/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::compile_protos("../proto/client_daemon.proto")?;
println!("cargo:rerun-if-changed=migrations");
Ok(())
}
219 changes: 111 additions & 108 deletions client/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,28 @@
use anyhow::{bail, Context, Result};
use args::Opts;
use common::client_daemon::{ClientToDaemonMsg, DaemonToClientMsg};
use client_daemon::{
client_daemon_client::ClientDaemonClient,
event::Event::{ChatRequest, Message, MessageDelete},
SendMessageRequest,
};
use crossterm::event::EventStream;
use crossterm::{
cursor,
event::{self, Event, KeyCode, KeyEvent, KeyModifiers},
event::{Event, KeyCode, KeyEvent, KeyModifiers},
terminal::{self, ClearType},
tty::IsTty,
ExecutableCommand, QueueableCommand,
};
use mio::{unix::SourceFd, Events, Interest, Poll, Token};
use std::{
fs::File,
io::{self, Read, StdoutLock, Write},
io::{self, StdoutLock, Write},
ops::ControlFlow,
os::unix::prelude::AsRawFd,
time::Duration,
};
use tokio_stream::StreamExt;
use tonic::transport::Channel;

pub mod client_daemon {
tonic::include_proto!("clientdaemon");
}

mod args;

Expand All @@ -27,107 +34,119 @@ struct State<'a> {
/// the current text in the input field
input: String,

/// a handle to the `~/.rclc/dtocbuf` fifo in read-only mode
dtocbuf: File,
daemon: ClientDaemonClient<Channel>,

/// a handle to the `~/.rclc/ctodbuf` fifo in write-only mode
ctodbuf: File,
event_stream: EventStream,
}

impl<'a> State<'a> {
fn new() -> Result<Self> {
async fn new() -> Result<State<'a>> {
let mut stdout = io::stdout().lock();
if !stdout.is_tty() {
bail!("stdout is not a tty");
}

terminal::enable_raw_mode().context("couldn't enable raw mode")?;

let home_dir = dirs::home_dir().context("couldn't get home directory")?;

print!("waiting for daemon connection...");
// FIXME: wait for address to actually become available before connecting
let daemon = ClientDaemonClient::connect("http://0.0.0.0:5768").await?;
stdout.flush()?;
// aquire a write handle on the ctod fifo
// this blocks the client until the daemon opens the file to read
let ctodbuf = File::options()
.write(true)
.open(home_dir.join(".rclc/ctodbuf"))
.context("couldn't open client->daemon fifo")?;

// clear waiting message
stdout
.queue(terminal::Clear(ClearType::CurrentLine))?
.execute(cursor::MoveToColumn(0))?;

print!("waiting for daemon response...");
stdout.flush()?;
// aquire a read handle on the dtoc file
// this blocks until the daemon opens the file to write
let dtocbuf = File::open(home_dir.join(".rclc/dtocbuf"))
.context("couldn't open daemon->client fifo")?;

// clear waiting message
stdout
.queue(terminal::Clear(ClearType::CurrentLine))?
.execute(cursor::MoveToColumn(0))?;

Ok(Self {
stdout,
input: String::new(),
dtocbuf,
ctodbuf,
daemon,
event_stream: EventStream::new(),
})
}

/// start the event loop
fn start(&mut self) -> Result<()> {
// a mio poll lets you monitor for readiness events from multiple sources.
let mut poll = Poll::new().context("failed to start mio poll")?;
let mut events = Events::with_capacity(1024);

// register the dtoc fifo to notify the poll whenever it is readable (whenever a new
// message from the daemon is available to read).
poll.registry()
.register(
&mut SourceFd(&self.dtocbuf.as_raw_fd()),
Token(0),
Interest::READABLE,
)
.context("could not register daemon->client fifo with mio poll")?;

// register stdin to notify the poll whenever it is readalbe (whenever the user presses
// a key).
poll.registry()
.register(
&mut SourceFd(&io::stdin().as_raw_fd()),
Token(1),
Interest::READABLE,
)
.context("could not register stdin with mio poll")?;
async fn start(&mut self) -> Result<()> {
let mut stream = self.daemon.subscribe_to_events(()).await?.into_inner();
let (dtx, mut drx) = tokio::sync::mpsc::channel(100);
tokio::spawn(async move {
while let Some(maybe_event) = stream.next().await {
match maybe_event {
Ok(event) => dtx.send(event).await.unwrap(),
Err(e) => {
eprintln!("error: {}", e);
break;
}
}
}
});

print!("{}", PROMPT);
self.stdout.flush()?;

let ui_state = self.daemon.get_ui_state(()).await?.into_inner();
self.print_upwards(|| {
bunt::println!(
"{$green}[+] OK{/$} {$yellow}Got initial UI state from daemon:{/$} {:?}",
ui_state
);
Ok(())
})?;

'evt_loop: loop {
// block until the next poll event happens
poll.poll(&mut events, None)
.context("failed to poll mio poll")?;

for event in &events {
match event.token() {
Token(0) => self.handle_fifo_msg()?,
Token(1) => match self.handle_term_evt()? {
ControlFlow::Continue(()) => (),
ControlFlow::Break(()) => break 'evt_loop,
},
_ => unreachable!(),
tokio::select! {
event = self.event_stream.next() => {
match event {
Some(rst) => {
match rst {
Ok(evt) => match self.handle_term_evt(evt).await? {
ControlFlow::Continue(()) => (),
ControlFlow::Break(()) => break 'evt_loop,
},
Err(term_e) => panic!("terminal error: {}", term_e),
}
}
None => eprintln!("Blank terminal event")
}
}
opt = drx.recv() => match opt {
Some(msg) => match msg.event {
Some(evt) => match evt {
Message(mevt) => {
self.print_upwards(|| {
bunt::print!("{$green}[+] OK {/$}{$yellow}Message event:{/$} {:?}\n\r", mevt);
Ok(())
})?;
},
MessageDelete(mdevt) => {
self.print_upwards(|| {
bunt::print!("{$green}[+] OK {/$}{$yellow}Message delete event:{/$} {:?}\n\r", mdevt);
Ok(())
})?;
},
ChatRequest(crevt) => {
self.print_upwards(|| {
bunt::print!("{$green}[+] OK {/$}{$yellow}Chat request event:{/$} {:?}\n\r", crevt);
Ok(())
})?;
}
}
None => self.print_upwards(|| {
bunt::eprintln!("{$red}[-] XX Blank message from daemon{/$}");
Ok(())
})?
}
None => self.print_upwards(|| {
bunt::eprintln!("{$red}[-] XX Connection to daemon closed{/$}");
Ok(())
})?
}
}

// temporary fix for initial buffering problem
// XXX: hopefully this code change fixes this issue anyways
/*// temporary fix for initial buffering problem
while event::poll(Duration::ZERO)? {
self.handle_term_evt()?;
}
}*/

self.stdout.flush()?;
}
Expand All @@ -149,36 +168,14 @@ impl<'a> State<'a> {
Ok(())
}

fn send_fifo_msg(&mut self, msg: &ClientToDaemonMsg) -> Result<()> {
rmp_serde::encode::write(&mut self.ctodbuf, &msg)
.with_context(|| format!("couldn't write message {msg:?} to fifo"))
}

fn handle_fifo_msg(&mut self) -> Result<()> {
let mut data = Vec::new();
self.dtocbuf
.read_to_end(&mut data)
.context("couldn't read from fifo")?;
match rmp_serde::from_slice::<DaemonToClientMsg>(&data) {
Ok(msg) => print!("got fifo msg: {msg:?}\n\r"),
Err(e) => self.print_upwards(|| {
bunt::print!("{$red}invalid message from daemon: {}{/$}\n\r", e);
Ok(())
})?,
}

Ok(())
}

fn handle_term_evt(&mut self) -> Result<ControlFlow<()>> {
let event = event::read().context("couldn't read next terminal event")?;
async fn handle_term_evt(&mut self, event: Event) -> Result<ControlFlow<()>> {
match event {
Event::Key(kev) => self.handle_kev(kev),
Event::Key(kev) => self.handle_kev(kev).await,
_ => Ok(ControlFlow::Continue(())),
}
}

fn handle_kev(&mut self, kev: KeyEvent) -> Result<ControlFlow<()>> {
async fn handle_kev(&mut self, kev: KeyEvent) -> Result<ControlFlow<()>> {
match kev.code {
KeyCode::Char('c') if kev.modifiers == KeyModifiers::CONTROL => {
// notify the event loop to break
Expand All @@ -195,7 +192,12 @@ impl<'a> State<'a> {
return Ok(ControlFlow::Continue(()));
}

self.send_fifo_msg(&ClientToDaemonMsg::Send(self.input.clone()))?;
self.daemon
.send_message(SendMessageRequest {
recipient: 0,
content: self.input.clone(),
})
.await?;
self.input.clear();
self.stdout
.queue(terminal::Clear(ClearType::CurrentLine))?
Expand All @@ -213,9 +215,9 @@ impl<'a> State<'a> {
}
}

fn chat() -> Result<()> {
let mut state = State::new()?;
state.start()?;
async fn chat() -> Result<()> {
let mut state = State::new().await?;
state.start().await?;

Ok(())
}
Expand All @@ -235,15 +237,16 @@ fn cleanup() {
}
}

fn go() -> Result<()> {
async fn go() -> Result<()> {
match Opts::get() {
Opts::Chat => chat(),
Opts::Chat => chat().await,
}
}

fn main() {
if let Err(e) = go() {
#[tokio::main]
async fn main() {
if let Err(e) = go().await {
cleanup();
println!("rclc client error: {e:?}");
}
};
}
8 changes: 7 additions & 1 deletion daemon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,13 @@ notify-rust = "4"
reqwest = { version = "0.11.11", features = ["json"] }
common = { path = "../common", features = ["rocket"] }
anyhow = "1.0.61"
tokio = "1.20.1"
tokio = { version = "1.20.1", features = ["rt-multi-thread", "macros"] }
tokio-stream = { version = "0.1", features = ["sync"] }
log = "0.4.14"
env_logger = "0.9.0"
serde_json = "1.0"
tonic = "0.8.2"
prost = "0.11"

[build-dependencies]
tonic-build = "0.8"
5 changes: 5 additions & 0 deletions daemon/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::compile_protos("../proto/client_daemon.proto")?;
println!("cargo:rerun-if-changed=migrations");
Ok(())
}
Loading