From 98637989fe24a18469358e9a0b68569b5c9dcc75 Mon Sep 17 00:00:00 2001 From: Sophon96 <71684640+Sophon96@users.noreply.github.com> Date: Fri, 11 Nov 2022 20:37:11 +0100 Subject: [PATCH 1/8] feat(grpc): stub out basic client-to-daemon server Signed-off-by: Sophon96 <71684640+Sophon96@users.noreply.github.com> --- daemon/Cargo.toml | 8 ++- daemon/build.rs | 5 ++ daemon/src/client_server.rs | 72 +++++++++++++++++++++++++ daemon/src/main.rs | 14 ++++- proto/client_daemon.proto | 104 ++++++++++++++++++++++++++++++++++++ 5 files changed, 201 insertions(+), 2 deletions(-) create mode 100644 daemon/build.rs create mode 100644 daemon/src/client_server.rs create mode 100644 proto/client_daemon.proto diff --git a/daemon/Cargo.toml b/daemon/Cargo.toml index 85a2b54..c5c62d4 100644 --- a/daemon/Cargo.toml +++ b/daemon/Cargo.toml @@ -10,7 +10,13 @@ notify-rust = "4" reqwest = { version = "0.11.11", features = ["json"] } common = { path = "../common", features = ["rocket"] } anyhow = "1.0.61" -tokio = "1.20.1" +tokio = { version = "1.20.1", features = ["rt-multi-thread", "macros"] } +tokio-stream = "0.1" log = "0.4.14" env_logger = "0.9.0" serde_json = "1.0" +tonic = "0.8.2" +prost = "0.11" + +[build-dependencies] +tonic-build = "0.8" diff --git a/daemon/build.rs b/daemon/build.rs new file mode 100644 index 0000000..fb91c2d --- /dev/null +++ b/daemon/build.rs @@ -0,0 +1,5 @@ +fn main() -> Result<(), Box> { + tonic_build::compile_protos("../proto/client_daemon.proto")?; + println!("cargo:rerun-if-changed=migrations"); + Ok(()) +} diff --git a/daemon/src/client_server.rs b/daemon/src/client_server.rs new file mode 100644 index 0000000..ede1ee0 --- /dev/null +++ b/daemon/src/client_server.rs @@ -0,0 +1,72 @@ +use client_daemon::client_daemon_server::ClientDaemon; +use client_daemon::{ + ChatHistoryRequest, ChatHistoryResponse, DeleteMessageRequest, DeleteMessageResponse, + EditMessageRequest, EditMessageResponse, Event, SendMessageRequest, SendMessageResponse, +}; +use tokio_stream::wrappers::ReceiverStream; +use tonic::{Request, Response, Status}; + +pub mod client_daemon { + tonic::include_proto!("clientdaemon"); +} + +#[derive(Debug)] +// FIXME: name this something better +pub struct ClientDaemonService {} + +#[tonic::async_trait] +impl ClientDaemon for ClientDaemonService { + async fn get_chat_history( + &self, + request: Request, + ) -> Result, Status> { + println!("Got chat history request: {:?}", request); + + // TODO: Implement returning chat history with database, etc. + return Ok(Response::new(ChatHistoryResponse::default())); + } + + type SubscribeToEventsStream = ReceiverStream>; + + async fn subscribe_to_events( + &self, + request: Request<()>, + ) -> Result, Status> { + println!("Received subscribe to events request: {:?}", request); + // FIXME: Implement this + let (_tx, rx) = tokio::sync::mpsc::channel(4); + + // TODO: send events as they come + return Ok(Response::new(ReceiverStream::new(rx))); + } + + async fn send_message( + &self, + request: Request, + ) -> Result, Status> { + println!("Received send message request: {:?}", request); + + // TODO: implement sending messages + return Ok(Response::new(SendMessageResponse::default())); + } + + async fn edit_message( + &self, + request: Request, + ) -> Result, Status> { + println!("Received edit message request: {:?}", request); + + // TODO: implement editing messages + return Ok(Response::new(EditMessageResponse::default())); + } + + async fn delete_message( + &self, + request: Request, + ) -> Result, Status> { + println!("Received delete message request: {:?}", request); + + // TODO: implement deleting messages + return Ok(Response::new(DeleteMessageResponse::default())); + } +} diff --git a/daemon/src/main.rs b/daemon/src/main.rs index c351e7f..dc25fc3 100644 --- a/daemon/src/main.rs +++ b/daemon/src/main.rs @@ -1,15 +1,18 @@ use reqwest; +use client_server::client_daemon::client_daemon_server::ClientDaemonServer; use common::structures::{DiscoveryRequest, DiscoveryResponse, InfoResponse}; use discovery::{discover, discover_info, discover_root, DiscoveryServerConfig}; use std::net::{IpAddr, Ipv4Addr}; +use tonic::transport::Server; +pub mod client_server; pub mod contact; pub mod discovery; pub mod notif; #[tokio::main] -async fn main() { +async fn main() -> Result<(), Box> { env_logger::init(); println!("Hello, world!"); @@ -68,4 +71,13 @@ async fn main() { } Err(_) => eprintln!("Could not connect to server. Possible it does not exist yet."), } + + // tonic gRPC server for the communication with the client + let addr = "0.0.0.0:5768".parse()?; + let cd_svc = client_server::ClientDaemonService {}; + let cd_srv = ClientDaemonServer::new(cd_svc); + println!("Client gRPC server on {}", addr); + Server::builder().add_service(cd_srv).serve(addr).await?; + + Ok(()) } diff --git a/proto/client_daemon.proto b/proto/client_daemon.proto new file mode 100644 index 0000000..4350551 --- /dev/null +++ b/proto/client_daemon.proto @@ -0,0 +1,104 @@ +syntax = "proto3"; +package clientdaemon; + +import "google/protobuf/empty.proto"; + +// Service for talking between the client and the daemon +// TODO: Come up with a better name +service ClientDaemon { + // Get the chat history with some user + rpc GetChatHistory(ChatHistoryRequest) returns (ChatHistoryResponse); + + // Subscribe to events emitted by the daemon + rpc SubscribeToEvents(google.protobuf.Empty) returns (stream Event); + + // Send a message + rpc SendMessage(SendMessageRequest) returns (SendMessageResponse); + // Edit a message + rpc EditMessage(EditMessageRequest) returns (EditMessageResponse); + // Delete a message + rpc DeleteMessage(DeleteMessageRequest) returns (DeleteMessageResponse); +} + +// request to get chat history +message ChatHistoryRequest { + // user to get chat history with + string username = 1; + // id of last message received + string last_id = 2; +} + +// chat history with a user +message ChatHistoryResponse { + // all of the messages + repeated Message messages = 1; +} + + +// An event emitted by the daemon +message Event { + // New or edited message + message MessageEvent { + // edited message + bool edit = 1; + // who sent it + string sender = 2; + // the message + Message message = 3; + } + + // Deleted message + message MessageDeleteEvent { + // who + string sender = 1; + // message id + string id = 2; + } + + // Other user request to talk + message ChatRequestEvent { + // user name of the user + string username = 1; + } + + oneof event { + MessageEvent message = 1; + MessageDeleteEvent message_delete = 2; + ChatRequestEvent chat_request = 3; + } +} + + +message SendMessageRequest { + string recipient = 1; + Message message = 2; +} + +// Currently blank, just here for the future +message SendMessageResponse {} + + +message EditMessageRequest { + string recipient = 1; + Message message = 2; +} + +// Currently blank, just here for the future +message EditMessageResponse {} + +message DeleteMessageRequest { + string recipient = 1; + Message message = 2; +} + +// Currently blank, just here for the future +message DeleteMessageResponse {} + +message Message { + string id = 1; + uint64 timestamp = 2; + string user = 3; + string content = 4; + // TODO: figure out how to do attachments + // repeated bytes attachments = 5; +} From 1a7d2000c6dbaca9eb210062eadaa1036b95a07b Mon Sep 17 00:00:00 2001 From: Brandon Qi <71684640+Sophon96@users.noreply.github.com> Date: Fri, 11 Nov 2022 12:34:22 -0800 Subject: [PATCH 2/8] fix(grpc): install protoc before building daemon --- .github/workflows/rust.yml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 21eb85b..17c5c57 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -16,6 +16,11 @@ jobs: steps: - uses: actions/checkout@v3 - uses: Swatinem/rust-cache@v1 + - name: Install dependencies with apt + run: | + sudo apt update + sudo apt install -y protobuf-compiler libprotobuf-dev + - name: Build run: | cd ./daemon From af1edae6d408c99342be90748b1b20a6ddb9c973 Mon Sep 17 00:00:00 2001 From: Brandon Qi <71684640+Sophon96@users.noreply.github.com> Date: Fri, 11 Nov 2022 13:00:49 -0800 Subject: [PATCH 3/8] fix(actions): try to fix rust caching --- .github/workflows/rust.yml | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 17c5c57..a74adf5 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -15,7 +15,14 @@ jobs: steps: - uses: actions/checkout@v3 - - uses: Swatinem/rust-cache@v1 + + - name: Install rust + run: | + rustup toolchain install stable --profile minimal + rustup component add rustfmt + + - uses: Swatinem/rust-cache@v2 + - name: Install dependencies with apt run: | sudo apt update @@ -63,7 +70,14 @@ jobs: steps: - uses: actions/checkout@v3 - - uses: Swatinem/rust-cache@v1 + + - name: Install rust + run: | + rustup toolchain install stable --profile minimal + rustup component add rustfmt + + - uses: Swatinem/rust-cache@v2 + - name: Build run: | cd ./client @@ -84,7 +98,7 @@ jobs: steps: - uses: actions/checkout@v3 - - uses: Swatinem/rust-cache@v1 + - uses: Swatinem/rust-cache@v2 - name: Build run: | cd ./common From d21d9a6e9cd4e1761d4d4936b5fcae858c08ca55 Mon Sep 17 00:00:00 2001 From: Sophon96 <71684640+Sophon96@users.noreply.github.com> Date: Fri, 11 Nov 2022 22:34:10 +0100 Subject: [PATCH 4/8] Revert "fix(actions): try to fix rust caching" This reverts commit af1edae6d408c99342be90748b1b20a6ddb9c973. --- .github/workflows/rust.yml | 20 +++----------------- 1 file changed, 3 insertions(+), 17 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index a74adf5..17c5c57 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -15,14 +15,7 @@ jobs: steps: - uses: actions/checkout@v3 - - - name: Install rust - run: | - rustup toolchain install stable --profile minimal - rustup component add rustfmt - - - uses: Swatinem/rust-cache@v2 - + - uses: Swatinem/rust-cache@v1 - name: Install dependencies with apt run: | sudo apt update @@ -70,14 +63,7 @@ jobs: steps: - uses: actions/checkout@v3 - - - name: Install rust - run: | - rustup toolchain install stable --profile minimal - rustup component add rustfmt - - - uses: Swatinem/rust-cache@v2 - + - uses: Swatinem/rust-cache@v1 - name: Build run: | cd ./client @@ -98,7 +84,7 @@ jobs: steps: - uses: actions/checkout@v3 - - uses: Swatinem/rust-cache@v2 + - uses: Swatinem/rust-cache@v1 - name: Build run: | cd ./common From bbe5ef74bcfa82b5b8b36ba1d6c414245be7aeb9 Mon Sep 17 00:00:00 2001 From: Sophon96 <71684640+Sophon96@users.noreply.github.com> Date: Sat, 19 Nov 2022 06:16:25 +0100 Subject: [PATCH 5/8] feat(grpc): implement basic grpc client in client and test event stream in daemon Signed-off-by: Sophon96 <71684640+Sophon96@users.noreply.github.com> --- client/Cargo.toml | 9 +- client/build.rs | 5 ++ client/src/main.rs | 163 +++++++++++++++++++++++++++--------- daemon/Cargo.toml | 3 +- daemon/src/client_server.rs | 29 +++++-- daemon/src/main.rs | 30 ++++++- proto/client_daemon.proto | 42 +++++++--- 7 files changed, 218 insertions(+), 63 deletions(-) create mode 100644 client/build.rs diff --git a/client/Cargo.toml b/client/Cargo.toml index 30ed826..2d5bf18 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -10,10 +10,17 @@ structopt = "0.3.26" anyhow = "1" rmp-serde = "1" chrono = "0.4" -crossterm = "0.24" +crossterm = { version = "0.24", features = ["event-stream"] } bunt = "0.2.6" mio = "0.8.4" dirs = "4.0.0" +tokio = { version = "1.20.1", features = ["rt-multi-thread", "macros"] } +tokio-stream = "0.1" +tonic = "0.8.2" +prost = "0.11" + +[build-dependencies] +tonic-build = "0.8" [dependencies.common] path = "../common" diff --git a/client/build.rs b/client/build.rs new file mode 100644 index 0000000..fb91c2d --- /dev/null +++ b/client/build.rs @@ -0,0 +1,5 @@ +fn main() -> Result<(), Box> { + tonic_build::compile_protos("../proto/client_daemon.proto")?; + println!("cargo:rerun-if-changed=migrations"); + Ok(()) +} diff --git a/client/src/main.rs b/client/src/main.rs index ca23c30..34b2370 100644 --- a/client/src/main.rs +++ b/client/src/main.rs @@ -1,21 +1,29 @@ use anyhow::{bail, Context, Result}; use args::Opts; -use common::client_daemon::{ClientToDaemonMsg, DaemonToClientMsg}; +use client_daemon::{ + client_daemon_client::ClientDaemonClient, + event::Event::{ChatRequest, Message, MessageDelete}, + SendMessageRequest, +}; +//use common::client_daemon::{ClientToDaemonMsg, DaemonToClientMsg}; +use crossterm::event::EventStream; use crossterm::{ cursor, - event::{self, Event, KeyCode, KeyEvent, KeyModifiers}, + event::{Event, KeyCode, KeyEvent, KeyModifiers}, terminal::{self, ClearType}, tty::IsTty, ExecutableCommand, QueueableCommand, }; -use mio::{unix::SourceFd, Events, Interest, Poll, Token}; use std::{ - fs::File, - io::{self, Read, StdoutLock, Write}, + io::{self, StdoutLock, Write}, ops::ControlFlow, - os::unix::prelude::AsRawFd, - time::Duration, }; +use tokio_stream::StreamExt; +use tonic::transport::Channel; + +pub mod client_daemon { + tonic::include_proto!("clientdaemon"); +} mod args; @@ -27,15 +35,18 @@ struct State<'a> { /// the current text in the input field input: String, - /// a handle to the `~/.rclc/dtocbuf` fifo in read-only mode + daemon: ClientDaemonClient, + + event_stream: EventStream, + /*/// a handle to the `~/.rclc/dtocbuf` fifo in read-only mode dtocbuf: File, /// a handle to the `~/.rclc/ctodbuf` fifo in write-only mode - ctodbuf: File, + ctodbuf: File,*/ } impl<'a> State<'a> { - fn new() -> Result { + async fn new() -> Result> { let mut stdout = io::stdout().lock(); if !stdout.is_tty() { bail!("stdout is not a tty"); @@ -43,11 +54,11 @@ impl<'a> State<'a> { terminal::enable_raw_mode().context("couldn't enable raw mode")?; - let home_dir = dirs::home_dir().context("couldn't get home directory")?; + //let home_dir = dirs::home_dir().context("couldn't get home directory")?; print!("waiting for daemon connection..."); stdout.flush()?; - // aquire a write handle on the ctod fifo + /*// aquire a write handle on the ctod fifo // this blocks the client until the daemon opens the file to read let ctodbuf = File::options() .write(true) @@ -64,28 +75,30 @@ impl<'a> State<'a> { // aquire a read handle on the dtoc file // this blocks until the daemon opens the file to write let dtocbuf = File::open(home_dir.join(".rclc/dtocbuf")) - .context("couldn't open daemon->client fifo")?; + .context("couldn't open daemon->client fifo")?;*/ // clear waiting message stdout .queue(terminal::Clear(ClearType::CurrentLine))? .execute(cursor::MoveToColumn(0))?; + let client = ClientDaemonClient::connect("http://0.0.0.0:5768").await?; + Ok(Self { stdout, input: String::new(), - dtocbuf, - ctodbuf, + daemon: client, + event_stream: EventStream::new(), }) } /// start the event loop - fn start(&mut self) -> Result<()> { + async fn start(&mut self) -> Result<()> { // a mio poll lets you monitor for readiness events from multiple sources. - let mut poll = Poll::new().context("failed to start mio poll")?; - let mut events = Events::with_capacity(1024); + /*let poll = Poll::new().context("failed to start mio poll")?; + let events = Events::with_capacity(1024);*/ - // register the dtoc fifo to notify the poll whenever it is readable (whenever a new + /*// register the dtoc fifo to notify the poll whenever it is readable (whenever a new // message from the daemon is available to read). poll.registry() .register( @@ -93,23 +106,85 @@ impl<'a> State<'a> { Token(0), Interest::READABLE, ) - .context("could not register daemon->client fifo with mio poll")?; + .context("could not register daemon->client fifo with mio poll")?;*/ // register stdin to notify the poll whenever it is readalbe (whenever the user presses // a key). - poll.registry() + /*poll.registry() .register( &mut SourceFd(&io::stdin().as_raw_fd()), Token(1), Interest::READABLE, ) - .context("could not register stdin with mio poll")?; + .context("could not register stdin with mio poll")?;*/ + + let mut stream = self.daemon.subscribe_to_events(()).await?.into_inner(); + let (dtx, mut drx) = tokio::sync::mpsc::channel(100); + tokio::spawn(async move { + while let Some(maybe_event) = stream.next().await { + match maybe_event { + Ok(event) => dtx.send(event).await.unwrap(), + Err(e) => { + eprintln!("error: {}", e); + break; + } + } + } + }); print!("{}", PROMPT); self.stdout.flush()?; + let ui_state = self.daemon.get_ui_state(()).await?.into_inner(); + self.print_upwards(|| { + bunt::println!("{$green}[+] OK{/$} {$yellow}Got initial UI state from daemon:{/$} {:?}", ui_state); + Ok(()) + })?; + 'evt_loop: loop { - // block until the next poll event happens + tokio::select! { + event = self.event_stream.next() => { + match event { + Some(rst) => { + match rst { + Ok(evt) => match self.handle_term_evt(evt).await? { + ControlFlow::Continue(()) => (), + ControlFlow::Break(()) => break 'evt_loop, + }, + Err(poo) => panic!("poo occurred: {poo}"), + } + } + None => panic!("I can't program") + } + } + opt = drx.recv() => match opt { + Some(msg) => match msg.event { + Some(evt) => match evt { + Message(mevt) => { + bunt::print!("{$green}[+] OK {/$}{$yellow}Message event:{/$} {:?}\n\raw", mevt); + //print!("[+ok] MESSAGE event: {:?}", mevt); + }, + MessageDelete(mdevt) => { + bunt::print!("{$green}[+] OK {/$}{$yellow}Message delete event:{/$} {:?}\n\r", mdevt); + //println!("DELETED event: {:?}", mdevt); + }, + ChatRequest(crevt) => { + bunt::print!("{$green}[+] OK {/$}{$yellow}Scammer Likely:{/$} {:?}\n\r", crevt); + //println!("Scammer Likely: {:?}", crevt); + } + } + None => self.print_upwards(|| { + bunt::eprintln!("{$red}[-] XX Blank message from daemon{/$}"); + Ok(()) + })? + } + None => self.print_upwards(|| { + bunt::eprintln!("{$red}[-] XX Connection to daemon closed{/$}"); + Ok(()) + })? + } + } + /*// block until the next poll event happens poll.poll(&mut events, None) .context("failed to poll mio poll")?; @@ -122,12 +197,13 @@ impl<'a> State<'a> { }, _ => unreachable!(), } - } + }*/ - // temporary fix for initial buffering problem + // XXX: hopefully this code change fixes this issue anyways + /*// temporary fix for initial buffering problem while event::poll(Duration::ZERO)? { self.handle_term_evt()?; - } + }*/ self.stdout.flush()?; } @@ -149,7 +225,7 @@ impl<'a> State<'a> { Ok(()) } - fn send_fifo_msg(&mut self, msg: &ClientToDaemonMsg) -> Result<()> { + /*fn send_fifo_msg(&mut self, msg: &ClientToDaemonMsg) -> Result<()> { rmp_serde::encode::write(&mut self.ctodbuf, &msg) .with_context(|| format!("couldn't write message {msg:?} to fifo")) } @@ -168,17 +244,17 @@ impl<'a> State<'a> { } Ok(()) - } + }*/ - fn handle_term_evt(&mut self) -> Result> { - let event = event::read().context("couldn't read next terminal event")?; + async fn handle_term_evt(&mut self, event: Event) -> Result> { + //let event = event::read().context("couldn't read next terminal event")?; match event { - Event::Key(kev) => self.handle_kev(kev), + Event::Key(kev) => self.handle_kev(kev).await, _ => Ok(ControlFlow::Continue(())), } } - fn handle_kev(&mut self, kev: KeyEvent) -> Result> { + async fn handle_kev(&mut self, kev: KeyEvent) -> Result> { match kev.code { KeyCode::Char('c') if kev.modifiers == KeyModifiers::CONTROL => { // notify the event loop to break @@ -195,7 +271,11 @@ impl<'a> State<'a> { return Ok(ControlFlow::Continue(())); } - self.send_fifo_msg(&ClientToDaemonMsg::Send(self.input.clone()))?; + self.daemon.send_message(SendMessageRequest { + recipient: 0, + content: self.input.clone(), + }).await?; + //self.send_fifo_msg(&ClientToDaemonMsg::Send(self.input.clone()))?; self.input.clear(); self.stdout .queue(terminal::Clear(ClearType::CurrentLine))? @@ -213,9 +293,9 @@ impl<'a> State<'a> { } } -fn chat() -> Result<()> { - let mut state = State::new()?; - state.start()?; +async fn chat() -> Result<()> { + let mut state = State::new().await?; + state.start().await?; Ok(()) } @@ -235,15 +315,16 @@ fn cleanup() { } } -fn go() -> Result<()> { +async fn go() -> Result<()> { match Opts::get() { - Opts::Chat => chat(), + Opts::Chat => chat().await, } } -fn main() { - if let Err(e) = go() { +#[tokio::main] +async fn main() { + if let Err(e) = go().await { cleanup(); println!("rclc client error: {e:?}"); - } + }; } diff --git a/daemon/Cargo.toml b/daemon/Cargo.toml index c5c62d4..943e9eb 100644 --- a/daemon/Cargo.toml +++ b/daemon/Cargo.toml @@ -11,12 +11,13 @@ reqwest = { version = "0.11.11", features = ["json"] } common = { path = "../common", features = ["rocket"] } anyhow = "1.0.61" tokio = { version = "1.20.1", features = ["rt-multi-thread", "macros"] } -tokio-stream = "0.1" +tokio-stream = { version = "0.1", features = ["sync"] } log = "0.4.14" env_logger = "0.9.0" serde_json = "1.0" tonic = "0.8.2" prost = "0.11" +futures = "0.3.25" [build-dependencies] tonic-build = "0.8" diff --git a/daemon/src/client_server.rs b/daemon/src/client_server.rs index ede1ee0..9eff648 100644 --- a/daemon/src/client_server.rs +++ b/daemon/src/client_server.rs @@ -2,8 +2,11 @@ use client_daemon::client_daemon_server::ClientDaemon; use client_daemon::{ ChatHistoryRequest, ChatHistoryResponse, DeleteMessageRequest, DeleteMessageResponse, EditMessageRequest, EditMessageResponse, Event, SendMessageRequest, SendMessageResponse, + UiStateResponse, }; -use tokio_stream::wrappers::ReceiverStream; +use std::pin::Pin; +use tokio_stream::wrappers::BroadcastStream; +use tokio_stream::{Stream, StreamExt}; use tonic::{Request, Response, Status}; pub mod client_daemon { @@ -12,7 +15,9 @@ pub mod client_daemon { #[derive(Debug)] // FIXME: name this something better -pub struct ClientDaemonService {} +pub struct ClientDaemonService { + pub ev_stream: tokio::sync::broadcast::Sender>, +} #[tonic::async_trait] impl ClientDaemon for ClientDaemonService { @@ -26,7 +31,18 @@ impl ClientDaemon for ClientDaemonService { return Ok(Response::new(ChatHistoryResponse::default())); } - type SubscribeToEventsStream = ReceiverStream>; + async fn get_ui_state( + &self, + request: Request<()>, + ) -> Result, Status> { + println!("Got UI state request: {:?}", request); + + // TODO: implement + Ok(Response::new(UiStateResponse::default())) + } + + type SubscribeToEventsStream = + Pin> + Send + Sync + 'static>>; async fn subscribe_to_events( &self, @@ -34,10 +50,13 @@ impl ClientDaemon for ClientDaemonService { ) -> Result, Status> { println!("Received subscribe to events request: {:?}", request); // FIXME: Implement this - let (_tx, rx) = tokio::sync::mpsc::channel(4); + //let (_tx, rx) = tokio::sync::mpsc::channel(4); // TODO: send events as they come - return Ok(Response::new(ReceiverStream::new(rx))); + // FIXME: Someone needs to review this code, I just turned on Copilot and let it write this, including this comment + return Ok(Response::new(Box::pin( + BroadcastStream::new(self.ev_stream.subscribe()).filter_map(|x| x.ok()), + ))); } async fn send_message( diff --git a/daemon/src/main.rs b/daemon/src/main.rs index dc25fc3..6234d42 100644 --- a/daemon/src/main.rs +++ b/daemon/src/main.rs @@ -1,6 +1,8 @@ use reqwest; -use client_server::client_daemon::client_daemon_server::ClientDaemonServer; +use client_server::client_daemon::{ + client_daemon_server::ClientDaemonServer, event, Event, Message, +}; use common::structures::{DiscoveryRequest, DiscoveryResponse, InfoResponse}; use discovery::{discover, discover_info, discover_root, DiscoveryServerConfig}; use std::net::{IpAddr, Ipv4Addr}; @@ -16,7 +18,7 @@ async fn main() -> Result<(), Box> { env_logger::init(); println!("Hello, world!"); - notif::notif("RCLC", "The RCLC daemon has been launched!"); + //notif::notif("RCLC", "The RCLC daemon has been launched!"); let disc_conf = DiscoveryServerConfig { url: "http://127.0.0.1:8000".to_string(), @@ -74,7 +76,29 @@ async fn main() -> Result<(), Box> { // tonic gRPC server for the communication with the client let addr = "0.0.0.0:5768".parse()?; - let cd_svc = client_server::ClientDaemonService {}; + let (ctx, _crx) = tokio::sync::broadcast::channel(4); + let cd_svc = client_server::ClientDaemonService { + ev_stream: ctx.clone(), + }; + // FIXME: Remove this when event stream is implemented, this is only for testing + tokio::spawn(async move { + let mut interval = tokio::time::interval(std::time::Duration::from_secs(5)); + loop { + interval.tick().await; + ctx.send(Ok(Event { + event: Some(event::Event::Message(event::MessageEvent { + edit: false, + sender: 1, + message: Some(Message { + id: 1, + content: "Hello".to_string(), + user: 1, + timestamp: 1, + }), + })), + })).unwrap(); + } + }); let cd_srv = ClientDaemonServer::new(cd_svc); println!("Client gRPC server on {}", addr); Server::builder().add_service(cd_srv).serve(addr).await?; diff --git a/proto/client_daemon.proto b/proto/client_daemon.proto index 4350551..6b888b8 100644 --- a/proto/client_daemon.proto +++ b/proto/client_daemon.proto @@ -9,6 +9,9 @@ service ClientDaemon { // Get the chat history with some user rpc GetChatHistory(ChatHistoryRequest) returns (ChatHistoryResponse); + // Get the UI state on startup + rpc GetUIState(google.protobuf.Empty) returns (UIStateResponse); + // Subscribe to events emitted by the daemon rpc SubscribeToEvents(google.protobuf.Empty) returns (stream Event); @@ -23,9 +26,9 @@ service ClientDaemon { // request to get chat history message ChatHistoryRequest { // user to get chat history with - string username = 1; + int64 user_id = 1; // id of last message received - string last_id = 2; + int64 last_id = 2; } // chat history with a user @@ -35,6 +38,11 @@ message ChatHistoryResponse { } +message UIStateResponse { + repeated int64 conversations = 1; +} + + // An event emitted by the daemon message Event { // New or edited message @@ -42,7 +50,7 @@ message Event { // edited message bool edit = 1; // who sent it - string sender = 2; + int64 sender = 2; // the message Message message = 3; } @@ -50,7 +58,7 @@ message Event { // Deleted message message MessageDeleteEvent { // who - string sender = 1; + int64 sender = 1; // message id string id = 2; } @@ -59,6 +67,7 @@ message Event { message ChatRequestEvent { // user name of the user string username = 1; + string addr = 2; } oneof event { @@ -69,17 +78,25 @@ message Event { } +// Request to send a message message SendMessageRequest { - string recipient = 1; - Message message = 2; + // user id of recipient + int64 recipient = 1; + + // content of the message + string content = 2; } -// Currently blank, just here for the future -message SendMessageResponse {} +// Response to client after sending message +message SendMessageResponse { + // id assigned to the message + int64 id = 1; +} message EditMessageRequest { - string recipient = 1; + // user id of recipient + int64 recipient = 1; Message message = 2; } @@ -87,7 +104,8 @@ message EditMessageRequest { message EditMessageResponse {} message DeleteMessageRequest { - string recipient = 1; + // user id of recipient + int64 recipient = 1; Message message = 2; } @@ -95,9 +113,9 @@ message DeleteMessageRequest { message DeleteMessageResponse {} message Message { - string id = 1; + int64 id = 1; uint64 timestamp = 2; - string user = 3; + int64 user = 3; string content = 4; // TODO: figure out how to do attachments // repeated bytes attachments = 5; From 38b39b9b6d73eb74aca8a78c12a11ca96855fdb1 Mon Sep 17 00:00:00 2001 From: Sophon96 <71684640+Sophon96@users.noreply.github.com> Date: Sat, 19 Nov 2022 06:54:28 +0100 Subject: [PATCH 6/8] refactor: cleanup code Signed-off-by: Sophon96 <71684640+Sophon96@users.noreply.github.com> --- client/src/main.rs | 132 ++++++++---------------------------- daemon/Cargo.toml | 1 - daemon/src/client_server.rs | 2 - 3 files changed, 27 insertions(+), 108 deletions(-) diff --git a/client/src/main.rs b/client/src/main.rs index 34b2370..3c3d503 100644 --- a/client/src/main.rs +++ b/client/src/main.rs @@ -5,7 +5,6 @@ use client_daemon::{ event::Event::{ChatRequest, Message, MessageDelete}, SendMessageRequest, }; -//use common::client_daemon::{ClientToDaemonMsg, DaemonToClientMsg}; use crossterm::event::EventStream; use crossterm::{ cursor, @@ -38,11 +37,6 @@ struct State<'a> { daemon: ClientDaemonClient, event_stream: EventStream, - /*/// a handle to the `~/.rclc/dtocbuf` fifo in read-only mode - dtocbuf: File, - - /// a handle to the `~/.rclc/ctodbuf` fifo in write-only mode - ctodbuf: File,*/ } impl<'a> State<'a> { @@ -54,70 +48,24 @@ impl<'a> State<'a> { terminal::enable_raw_mode().context("couldn't enable raw mode")?; - //let home_dir = dirs::home_dir().context("couldn't get home directory")?; - print!("waiting for daemon connection..."); + // FIXME: wait for address to actually become available before connecting + let daemon = ClientDaemonClient::connect("http://0.0.0.0:5768").await?; stdout.flush()?; - /*// aquire a write handle on the ctod fifo - // this blocks the client until the daemon opens the file to read - let ctodbuf = File::options() - .write(true) - .open(home_dir.join(".rclc/ctodbuf")) - .context("couldn't open client->daemon fifo")?; - - // clear waiting message stdout .queue(terminal::Clear(ClearType::CurrentLine))? .execute(cursor::MoveToColumn(0))?; - print!("waiting for daemon response..."); - stdout.flush()?; - // aquire a read handle on the dtoc file - // this blocks until the daemon opens the file to write - let dtocbuf = File::open(home_dir.join(".rclc/dtocbuf")) - .context("couldn't open daemon->client fifo")?;*/ - - // clear waiting message - stdout - .queue(terminal::Clear(ClearType::CurrentLine))? - .execute(cursor::MoveToColumn(0))?; - - let client = ClientDaemonClient::connect("http://0.0.0.0:5768").await?; - Ok(Self { stdout, input: String::new(), - daemon: client, + daemon, event_stream: EventStream::new(), }) } /// start the event loop async fn start(&mut self) -> Result<()> { - // a mio poll lets you monitor for readiness events from multiple sources. - /*let poll = Poll::new().context("failed to start mio poll")?; - let events = Events::with_capacity(1024);*/ - - /*// register the dtoc fifo to notify the poll whenever it is readable (whenever a new - // message from the daemon is available to read). - poll.registry() - .register( - &mut SourceFd(&self.dtocbuf.as_raw_fd()), - Token(0), - Interest::READABLE, - ) - .context("could not register daemon->client fifo with mio poll")?;*/ - - // register stdin to notify the poll whenever it is readalbe (whenever the user presses - // a key). - /*poll.registry() - .register( - &mut SourceFd(&io::stdin().as_raw_fd()), - Token(1), - Interest::READABLE, - ) - .context("could not register stdin with mio poll")?;*/ - let mut stream = self.daemon.subscribe_to_events(()).await?.into_inner(); let (dtx, mut drx) = tokio::sync::mpsc::channel(100); tokio::spawn(async move { @@ -137,7 +85,10 @@ impl<'a> State<'a> { let ui_state = self.daemon.get_ui_state(()).await?.into_inner(); self.print_upwards(|| { - bunt::println!("{$green}[+] OK{/$} {$yellow}Got initial UI state from daemon:{/$} {:?}", ui_state); + bunt::println!( + "{$green}[+] OK{/$} {$yellow}Got initial UI state from daemon:{/$} {:?}", + ui_state + ); Ok(()) })?; @@ -151,26 +102,32 @@ impl<'a> State<'a> { ControlFlow::Continue(()) => (), ControlFlow::Break(()) => break 'evt_loop, }, - Err(poo) => panic!("poo occurred: {poo}"), + Err(term_e) => panic!("terminal error: {}", term_e), } } - None => panic!("I can't program") + None => eprintln!("Blank terminal event") } } opt = drx.recv() => match opt { Some(msg) => match msg.event { Some(evt) => match evt { Message(mevt) => { - bunt::print!("{$green}[+] OK {/$}{$yellow}Message event:{/$} {:?}\n\raw", mevt); - //print!("[+ok] MESSAGE event: {:?}", mevt); + self.print_upwards(|| { + bunt::print!("{$green}[+] OK {/$}{$yellow}Message event:{/$} {:?}\n\r", mevt); + Ok(()) + })?; }, MessageDelete(mdevt) => { - bunt::print!("{$green}[+] OK {/$}{$yellow}Message delete event:{/$} {:?}\n\r", mdevt); - //println!("DELETED event: {:?}", mdevt); + self.print_upwards(|| { + bunt::print!("{$green}[+] OK {/$}{$yellow}Message delete event:{/$} {:?}\n\r", mdevt); + Ok(()) + })?; }, ChatRequest(crevt) => { - bunt::print!("{$green}[+] OK {/$}{$yellow}Scammer Likely:{/$} {:?}\n\r", crevt); - //println!("Scammer Likely: {:?}", crevt); + self.print_upwards(|| { + bunt::print!("{$green}[+] OK {/$}{$yellow}Chat request event:{/$} {:?}\n\r", crevt); + Ok(()) + })?; } } None => self.print_upwards(|| { @@ -184,20 +141,6 @@ impl<'a> State<'a> { })? } } - /*// block until the next poll event happens - poll.poll(&mut events, None) - .context("failed to poll mio poll")?; - - for event in &events { - match event.token() { - Token(0) => self.handle_fifo_msg()?, - Token(1) => match self.handle_term_evt()? { - ControlFlow::Continue(()) => (), - ControlFlow::Break(()) => break 'evt_loop, - }, - _ => unreachable!(), - } - }*/ // XXX: hopefully this code change fixes this issue anyways /*// temporary fix for initial buffering problem @@ -225,29 +168,7 @@ impl<'a> State<'a> { Ok(()) } - /*fn send_fifo_msg(&mut self, msg: &ClientToDaemonMsg) -> Result<()> { - rmp_serde::encode::write(&mut self.ctodbuf, &msg) - .with_context(|| format!("couldn't write message {msg:?} to fifo")) - } - - fn handle_fifo_msg(&mut self) -> Result<()> { - let mut data = Vec::new(); - self.dtocbuf - .read_to_end(&mut data) - .context("couldn't read from fifo")?; - match rmp_serde::from_slice::(&data) { - Ok(msg) => print!("got fifo msg: {msg:?}\n\r"), - Err(e) => self.print_upwards(|| { - bunt::print!("{$red}invalid message from daemon: {}{/$}\n\r", e); - Ok(()) - })?, - } - - Ok(()) - }*/ - async fn handle_term_evt(&mut self, event: Event) -> Result> { - //let event = event::read().context("couldn't read next terminal event")?; match event { Event::Key(kev) => self.handle_kev(kev).await, _ => Ok(ControlFlow::Continue(())), @@ -271,11 +192,12 @@ impl<'a> State<'a> { return Ok(ControlFlow::Continue(())); } - self.daemon.send_message(SendMessageRequest { - recipient: 0, - content: self.input.clone(), - }).await?; - //self.send_fifo_msg(&ClientToDaemonMsg::Send(self.input.clone()))?; + self.daemon + .send_message(SendMessageRequest { + recipient: 0, + content: self.input.clone(), + }) + .await?; self.input.clear(); self.stdout .queue(terminal::Clear(ClearType::CurrentLine))? diff --git a/daemon/Cargo.toml b/daemon/Cargo.toml index 943e9eb..7cbba2d 100644 --- a/daemon/Cargo.toml +++ b/daemon/Cargo.toml @@ -17,7 +17,6 @@ env_logger = "0.9.0" serde_json = "1.0" tonic = "0.8.2" prost = "0.11" -futures = "0.3.25" [build-dependencies] tonic-build = "0.8" diff --git a/daemon/src/client_server.rs b/daemon/src/client_server.rs index 9eff648..214a61c 100644 --- a/daemon/src/client_server.rs +++ b/daemon/src/client_server.rs @@ -49,8 +49,6 @@ impl ClientDaemon for ClientDaemonService { request: Request<()>, ) -> Result, Status> { println!("Received subscribe to events request: {:?}", request); - // FIXME: Implement this - //let (_tx, rx) = tokio::sync::mpsc::channel(4); // TODO: send events as they come // FIXME: Someone needs to review this code, I just turned on Copilot and let it write this, including this comment From 790cf8348efb35bdca1fdd08b5db4cc6bd68f2c9 Mon Sep 17 00:00:00 2001 From: Sophon96 <71684640+Sophon96@users.noreply.github.com> Date: Sat, 19 Nov 2022 06:56:39 +0100 Subject: [PATCH 7/8] fix(grpc): install protoc before building client Signed-off-by: Sophon96 <71684640+Sophon96@users.noreply.github.com> --- .github/workflows/rust.yml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 17c5c57..466c232 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -64,6 +64,11 @@ jobs: steps: - uses: actions/checkout@v3 - uses: Swatinem/rust-cache@v1 + - name: Install dependencies with apt + run: | + sudo apt update + sudo apt install -y protobuf-compiler libprotobuf-dev + - name: Build run: | cd ./client From 055a6a20f78b1a7a94b589c8e28fbecb72858f00 Mon Sep 17 00:00:00 2001 From: Sophon96 <71684640+Sophon96@users.noreply.github.com> Date: Sat, 19 Nov 2022 07:09:33 +0100 Subject: [PATCH 8/8] fix: format Signed-off-by: Sophon96 <71684640+Sophon96@users.noreply.github.com> --- daemon/src/main.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/daemon/src/main.rs b/daemon/src/main.rs index 6234d42..d18f638 100644 --- a/daemon/src/main.rs +++ b/daemon/src/main.rs @@ -96,7 +96,8 @@ async fn main() -> Result<(), Box> { timestamp: 1, }), })), - })).unwrap(); + })) + .unwrap(); } }); let cd_srv = ClientDaemonServer::new(cd_svc);