From 5cc72ebda7a3de1aacb0803c349c59e24ed5ea1f Mon Sep 17 00:00:00 2001 From: Tjemmmic Date: Sun, 18 Feb 2024 20:15:14 -0600 Subject: [PATCH 01/11] Register and Connect Util Functions --- citadel-internal-service-connector/Cargo.toml | 3 +- .../src/util.rs | 244 +++++++++++++++++- 2 files changed, 243 insertions(+), 4 deletions(-) diff --git a/citadel-internal-service-connector/Cargo.toml b/citadel-internal-service-connector/Cargo.toml index f5365cd..4d3411b 100644 --- a/citadel-internal-service-connector/Cargo.toml +++ b/citadel-internal-service-connector/Cargo.toml @@ -11,4 +11,5 @@ tokio = { workspace = true, features = ["net", "rt", "macros"] } tokio-util = { workspace = true, features = ["codec"] } bincode2 = { workspace = true } serde = { workspace = true } -futures = { workspace = true, features = ["alloc"] } \ No newline at end of file +futures = { workspace = true, features = ["alloc"] } +uuid = { workspace = true, features = ["v4"]} \ No newline at end of file diff --git a/citadel-internal-service-connector/src/util.rs b/citadel-internal-service-connector/src/util.rs index 62f489a..63cdfa7 100644 --- a/citadel-internal-service-connector/src/util.rs +++ b/citadel-internal-service-connector/src/util.rs @@ -1,13 +1,17 @@ use crate::codec::{CodecError, SerializingCodec}; use citadel_internal_service_types::{ - InternalServicePayload, InternalServiceRequest, InternalServiceResponse, + ConnectMode, InternalServicePayload, InternalServiceRequest, InternalServiceResponse, + SecBuffer, SessionSecuritySettings, UdpMode, }; use futures::stream::{SplitSink, SplitStream}; -use futures::{Sink, Stream, StreamExt}; +use futures::{Sink, SinkExt, Stream, StreamExt}; +use std::net::SocketAddr; use std::pin::Pin; use std::task::{Context, Poll}; +use std::time::Duration; use tokio::net::{TcpStream, ToSocketAddrs}; use tokio_util::codec::{Decoder, Framed, LengthDelimitedCodec}; +use uuid::Uuid; pub struct InternalServiceConnector { pub sink: WrappedSink, @@ -26,7 +30,11 @@ pub struct WrappedSink { } impl InternalServiceConnector { - pub async fn connect(addr: T) -> Result> { + /// Establishes a connection with the Internal Service running at the given address. Returns an + /// InternalServiceConnector that can be used to easily interface with the Internal Service. + pub async fn connect_to_service( + addr: T, + ) -> Result> { let conn = TcpStream::connect(addr).await?; let (sink, mut stream) = wrap_tcp_conn(conn).split(); let greeter_packet = stream @@ -48,6 +56,236 @@ impl InternalServiceConnector { pub fn split(self) -> (WrappedSink, WrappedStream) { (self.sink, self.stream) } + + /// Sends a request to register at server running at the given address. Returns a Result with + /// an InternalServiceResponse that specifies whether or not the request was successfully sent. + pub async fn register, S: Into, R: Into>( + &mut self, + server_address: T, + full_name: S, + username: S, + proposed_password: R, + session_security_settings: SessionSecuritySettings, + ) -> Result> { + let outbound_request = InternalServiceRequest::Register { + request_id: Uuid::new_v4(), + server_addr: server_address.into(), + full_name: full_name.into(), + username: username.into(), + proposed_password: proposed_password.into(), + session_security_settings, + connect_after_register: false, + }; + match self.sink.send(outbound_request).await { + Ok(_) => self + .stream + .next() + .await + .ok_or(Err("Service Connector - Register Stream Failure")?), + Err(error_message) => Err(Box::new(error_message)), + } + } + + /// Sends a request to register at server running at the given address. Uses the default values + /// except for proposed credentials and the target server's address. Returns a Result with an + /// InternalServiceResponse that specifies whether or not the request was successfully sent. + pub async fn register_with_defaults< + T: Into, + S: Into, + R: Into, + >( + &mut self, + server_address: T, + full_name: S, + username: S, + proposed_password: R, + ) -> Result> { + self.register( + server_address, + full_name, + username, + proposed_password, + Default::default(), + ) + .await + } + + /// Sends a request to register at server running at the given address. Sends a request to + /// connect immediately following a successful registration. Returns a Result with an + /// InternalServiceResponse that specifies whether or not the request was successfully sent. + pub async fn register_and_connect, S: Into, R: Into>( + &mut self, + server_address: T, + full_name: S, + username: S, + proposed_password: R, + session_security_settings: SessionSecuritySettings, + ) -> Result> { + let outbound_request = InternalServiceRequest::Register { + request_id: Uuid::new_v4(), + server_addr: server_address.into(), + full_name: full_name.into(), + username: username.into(), + proposed_password: proposed_password.into(), + session_security_settings, + connect_after_register: true, + }; + match self.sink.send(outbound_request).await { + Ok(_) => self.stream.next().await.ok_or(Err( + "Service Connector - Register and Connect Stream Failure", + )?), + Err(error_message) => Err(Box::new(error_message)), + } + } + + /// Sends a request to connect to the current server with the given credentials. Returns a + /// Result with an InternalServiceResponse that specifies whether or not the request + /// was successfully sent. + pub async fn connect, R: Into>( + &mut self, + username: S, + password: R, + connect_mode: ConnectMode, + udp_mode: UdpMode, + keep_alive_timeout: Option, + session_security_settings: SessionSecuritySettings, + ) -> Result> { + let outbound_request = InternalServiceRequest::Connect { + request_id: Uuid::new_v4(), + username: username.into(), + password: password.into(), + connect_mode, + udp_mode, + keep_alive_timeout, + session_security_settings, + }; + match self.sink.send(outbound_request).await { + Ok(_) => self + .stream + .next() + .await + .ok_or(Err("Service Connector - Connect Stream Failure")?), + Err(error_message) => Err(Box::new(error_message)), + } + } + + /// Sends a request to connect to the current server with the given credentials. Uses default + /// values for all parameters other than credentials. Returns a Result with an + /// InternalServiceResponse that specifies whether or not the request was successfully sent. + pub async fn connect_with_defaults, R: Into>( + &mut self, + username: S, + password: R, + ) -> Result> { + self.connect( + username, + password, + Default::default(), + Default::default(), + Default::default(), + Default::default(), + ) + .await + } + + /// Sends a request to register with peer with CID peer_cid. Returns a Result with an + /// InternalServiceResponse that specifies whether or not the request was successfully sent. + pub async fn peer_register>( + &mut self, + cid: T, + peer_cid: T, + session_security_settings: SessionSecuritySettings, + ) -> Result> { + let outbound_request = InternalServiceRequest::PeerRegister { + request_id: Uuid::new_v4(), + cid: cid.into(), + peer_cid: peer_cid.into(), + session_security_settings, + connect_after_register: false, + }; + match self.sink.send(outbound_request).await { + Ok(_) => self + .stream + .next() + .await + .ok_or(Err("Service Connector - Peer Register Stream Failure")?), + Err(error_message) => Err(Box::new(error_message)), + } + } + + /// Sends a request to register with peer with CID peer_cid. Uses the default values except for + /// proposed credentials. Returns a Result with an InternalServiceResponse that specifies + /// whether or not the request was successfully sent. + pub async fn peer_register_with_defaults>( + &mut self, + cid: T, + peer_cid: T, + ) -> Result> { + self.peer_register(cid, peer_cid, Default::default()).await + } + + /// Sends a request to register with peer with CID peer_cid. Sends a request to + /// connect immediately following a successful registration. Returns a Result with an + /// InternalServiceResponse that specifies whether or not the request was successfully sent. + pub async fn peer_register_and_connect>( + &mut self, + cid: T, + peer_cid: T, + session_security_settings: SessionSecuritySettings, + ) -> Result> { + let outbound_request = InternalServiceRequest::PeerRegister { + request_id: Uuid::new_v4(), + cid: cid.into(), + peer_cid: peer_cid.into(), + session_security_settings, + connect_after_register: true, + }; + match self.sink.send(outbound_request).await { + Ok(_) => self.stream.next().await.ok_or(Err( + "Service Connector - Peer Register and Connect Stream Failure", + )?), + Err(error_message) => Err(Box::new(error_message)), + } + } + + /// Sends a request to connect to peer with CID peer_cid. Returns a + /// Result with an InternalServiceResponse that specifies whether or not the request + /// was successfully sent. + pub async fn peer_connect>( + &mut self, + cid: T, + peer_cid: T, + udp_mode: UdpMode, + session_security_settings: SessionSecuritySettings, + ) -> Result> { + let outbound_request = InternalServiceRequest::PeerConnect { + request_id: Uuid::new_v4(), + cid: cid.into(), + peer_cid: peer_cid.into(), + udp_mode, + session_security_settings, + }; + match self.sink.send(outbound_request).await { + Ok(_) => self + .stream + .next() + .await + .ok_or(Err("Service Connector - Peer Connect Stream Failure")?), + Err(error_message) => Err(Box::new(error_message)), + } + } + + /// Sends a request to connect to peer with CID peer_cid. Uses default values for connection + /// parameters. Returns a Result with an InternalServiceResponse that specifies whether or + /// not the request was successfully sent. + pub async fn peer_connect_with_defaults>( + &mut self, + cid: T, + peer_cid: T, + ) -> Result> { + self.peer_connect(cid, peer_cid, Default::default(), Default::default()) + .await + } } impl Stream for WrappedStream { From c1c70bd3b00548b3bc85cb16dc231e0ad446575a Mon Sep 17 00:00:00 2001 From: Tjemmmic Date: Sun, 18 Feb 2024 20:40:39 -0600 Subject: [PATCH 02/11] Cleanup and Clippy --- citadel-internal-service/tests/common/mod.rs | 2 +- citadel-internal-service/tests/service.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/citadel-internal-service/tests/common/mod.rs b/citadel-internal-service/tests/common/mod.rs index fbb8700..3151b36 100644 --- a/citadel-internal-service/tests/common/mod.rs +++ b/citadel-internal-service/tests/common/mod.rs @@ -87,7 +87,7 @@ pub async fn register_and_connect_to_server< )> = Vec::new(); for item in services_to_create { - let (mut sink, mut stream) = InternalServiceConnector::connect(item.internal_service_addr) + let (mut sink, mut stream) = InternalServiceConnector::connect_to_service(item.internal_service_addr) .await? .split(); diff --git a/citadel-internal-service/tests/service.rs b/citadel-internal-service/tests/service.rs index 1fe1f7b..ad85bf7 100644 --- a/citadel-internal-service/tests/service.rs +++ b/citadel-internal-service/tests/service.rs @@ -217,7 +217,7 @@ mod tests { // begin mocking the GUI/CLI access let (mut sink, mut stream) = - InternalServiceConnector::connect(bind_address_internal_service) + InternalServiceConnector::connect_to_service(bind_address_internal_service) .await? .split(); From dae119568ac50d2b45217c8ed367da72b1cb96ea Mon Sep 17 00:00:00 2001 From: Thomas Braun Date: Sun, 3 Mar 2024 14:41:06 -0500 Subject: [PATCH 03/11] refactor code --- citadel-internal-service-connector/Cargo.toml | 1 + .../src/codec.rs | 7 + .../src/connector.rs | 332 ++++++++++++++++++ citadel-internal-service-connector/src/lib.rs | 1 + .../src/util.rs | 279 +-------------- citadel-internal-service/tests/common/mod.rs | 10 +- citadel-internal-service/tests/service.rs | 2 +- 7 files changed, 353 insertions(+), 279 deletions(-) create mode 100644 citadel-internal-service-connector/src/connector.rs diff --git a/citadel-internal-service-connector/Cargo.toml b/citadel-internal-service-connector/Cargo.toml index 4d3411b..35c258d 100644 --- a/citadel-internal-service-connector/Cargo.toml +++ b/citadel-internal-service-connector/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" [dependencies] citadel-internal-service-types = { workspace = true } +citadel_logging = { workspace = true } tokio = { workspace = true, features = ["net", "rt", "macros"] } tokio-util = { workspace = true, features = ["codec"] } bincode2 = { workspace = true } diff --git a/citadel-internal-service-connector/src/codec.rs b/citadel-internal-service-connector/src/codec.rs index ad2086b..f727e02 100644 --- a/citadel-internal-service-connector/src/codec.rs +++ b/citadel-internal-service-connector/src/codec.rs @@ -1,3 +1,4 @@ +use crate::connector::ClientError; use serde::de::DeserializeOwned; use serde::Serialize; use std::error::Error; @@ -31,6 +32,12 @@ impl From for CodecError { } } +impl From for ClientError { + fn from(value: CodecError) -> Self { + ClientError::CodecError(value.reason) + } +} + impl Encoder for SerializingCodec { type Error = CodecError; diff --git a/citadel-internal-service-connector/src/connector.rs b/citadel-internal-service-connector/src/connector.rs new file mode 100644 index 0000000..d7f381f --- /dev/null +++ b/citadel-internal-service-connector/src/connector.rs @@ -0,0 +1,332 @@ +use crate::util; +use crate::util::{WrappedSink, WrappedStream}; +use citadel_internal_service_types::{ + ConnectMode, ConnectSuccess, InternalServicePayload, InternalServiceRequest, + InternalServiceResponse, PeerConnectSuccess, PeerRegisterSuccess, RegisterSuccess, SecBuffer, + SessionSecuritySettings, UdpMode, +}; +use futures::{SinkExt, StreamExt}; +use std::error::Error; +use std::fmt::{Display, Formatter}; +use std::net::SocketAddr; +use std::time::Duration; +use tokio::net::{TcpStream, ToSocketAddrs}; +use uuid::Uuid; + +pub struct InternalServiceConnector { + pub sink: WrappedSink, + pub stream: WrappedStream, +} + +#[derive(Debug, Clone)] +pub enum ClientError { + ConnectionToInternalServiceFailed(String), + InternalServiceDisconnected, + InternalServiceInvalidResponse(String), + CodecError(String), + SendError(String), +} + +impl Display for ClientError { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + std::fmt::Debug::fmt(self, f) + } +} + +impl Error for ClientError {} + +macro_rules! scan_for_response { + ($stream:expr, $required_packet:pat) => {{ + loop { + match $stream.next().await { + Some(response) => { + if response.is_error() { + return Err(ClientError::InternalServiceInvalidResponse(format!( + "{response:?}" + ))); + } + + if matches!(response, $required_packet) { + break response; + } + + citadel_logging::trace!("Service Connector - Unrelated response: {response:?}"); + } + None => return Err(ClientError::InternalServiceDisconnected)?, + } + } + }}; +} + +impl InternalServiceConnector { + /// Establishes a connection with the Internal Service running at the given address. Returns an + /// InternalServiceConnector that can be used to easily interface with the Internal Service. + pub async fn connect_to_service(addr: T) -> Result { + let conn = TcpStream::connect(addr) + .await + .map_err(|err| ClientError::ConnectionToInternalServiceFailed(err.to_string()))?; + let (sink, mut stream) = util::wrap_tcp_conn(conn).split(); + let greeter_packet = stream + .next() + .await + .ok_or(ClientError::InternalServiceDisconnected)??; + if matches!( + greeter_packet, + InternalServicePayload::Response(InternalServiceResponse::ServiceConnectionAccepted(_)) + ) { + let stream = WrappedStream { inner: stream }; + let sink = WrappedSink { inner: sink }; + Ok(Self { sink, stream }) + } else { + Err(ClientError::InternalServiceInvalidResponse(format!( + "{greeter_packet:?}" + )))? + } + } + + pub fn split(self) -> (WrappedSink, WrappedStream) { + (self.sink, self.stream) + } + + /// Sends a request to register at server running at the given address. Returns a Result with + /// an InternalServiceResponse that specifies whether or not the request was successfully sent. + pub async fn register, S: Into, R: Into>( + &mut self, + server_address: T, + full_name: S, + username: S, + proposed_password: R, + session_security_settings: SessionSecuritySettings, + ) -> Result { + let outbound_request = InternalServiceRequest::Register { + request_id: Uuid::new_v4(), + server_addr: server_address.into(), + full_name: full_name.into(), + username: username.into(), + proposed_password: proposed_password.into(), + session_security_settings, + connect_after_register: false, + }; + + self.send_raw_request(outbound_request).await?; + let InternalServiceResponse::RegisterSuccess(success) = + scan_for_response!(self.stream, InternalServiceResponse::RegisterSuccess(..)) + else { + panic!("Unreachable") + }; + Ok(success) + } + + /// Sends a request to register at server running at the given address. Uses the default values + /// except for proposed credentials and the target server's address. Returns a Result with an + /// InternalServiceResponse that specifies whether or not the request was successfully sent. + pub async fn register_with_defaults< + T: Into, + S: Into, + R: Into, + >( + &mut self, + server_address: T, + full_name: S, + username: S, + proposed_password: R, + ) -> Result { + self.register( + server_address, + full_name, + username, + proposed_password, + Default::default(), + ) + .await + } + + /// Sends a request to register at server running at the given address. Sends a request to + /// connect immediately following a successful registration. Returns a Result with an + /// InternalServiceResponse that specifies whether or not the request was successfully sent. + pub async fn register_and_connect, S: Into, R: Into>( + &mut self, + server_address: T, + full_name: S, + username: S, + proposed_password: R, + session_security_settings: SessionSecuritySettings, + ) -> Result { + let outbound_request = InternalServiceRequest::Register { + request_id: Uuid::new_v4(), + server_addr: server_address.into(), + full_name: full_name.into(), + username: username.into(), + proposed_password: proposed_password.into(), + session_security_settings, + connect_after_register: true, + }; + self.send_raw_request(outbound_request).await?; + let InternalServiceResponse::ConnectSuccess(success) = + scan_for_response!(self.stream, InternalServiceResponse::ConnectSuccess(..)) + else { + panic!("Unreachable") + }; + Ok(success) + } + + /// Sends a request to connect to the current server with the given credentials. Returns a + /// Result with an InternalServiceResponse that specifies whether or not the request + /// was successfully sent. + pub async fn connect, R: Into>( + &mut self, + username: S, + password: R, + connect_mode: ConnectMode, + udp_mode: UdpMode, + keep_alive_timeout: Option, + session_security_settings: SessionSecuritySettings, + ) -> Result { + let outbound_request = InternalServiceRequest::Connect { + request_id: Uuid::new_v4(), + username: username.into(), + password: password.into(), + connect_mode, + udp_mode, + keep_alive_timeout, + session_security_settings, + }; + self.send_raw_request(outbound_request).await?; + let InternalServiceResponse::ConnectSuccess(success) = + scan_for_response!(self.stream, InternalServiceResponse::ConnectSuccess(..)) + else { + panic!("Unreachable") + }; + Ok(success) + } + + /// Sends a request to connect to the current server with the given credentials. Uses default + /// values for all parameters other than credentials. Returns a Result with an + /// InternalServiceResponse that specifies whether or not the request was successfully sent. + pub async fn connect_with_defaults, R: Into>( + &mut self, + username: S, + password: R, + ) -> Result { + self.connect( + username, + password, + Default::default(), + Default::default(), + Default::default(), + Default::default(), + ) + .await + } + + /// Sends a request to register with peer with CID peer_cid. Returns a Result with an + /// InternalServiceResponse that specifies whether or not the request was successfully sent. + pub async fn peer_register>( + &mut self, + cid: T, + peer_cid: T, + session_security_settings: SessionSecuritySettings, + ) -> Result { + let outbound_request = InternalServiceRequest::PeerRegister { + request_id: Uuid::new_v4(), + cid: cid.into(), + peer_cid: peer_cid.into(), + session_security_settings, + connect_after_register: false, + }; + + self.send_raw_request(outbound_request).await?; + let InternalServiceResponse::PeerRegisterSuccess(success) = scan_for_response!( + self.stream, + InternalServiceResponse::PeerRegisterSuccess(..) + ) else { + panic!("Unreachable") + }; + Ok(success) + } + + /// Sends a request to register with peer with CID peer_cid. Uses the default values except for + /// proposed credentials. Returns a Result with an InternalServiceResponse that specifies + /// whether or not the request was successfully sent. + pub async fn peer_register_with_defaults>( + &mut self, + cid: T, + peer_cid: T, + ) -> Result { + self.peer_register(cid, peer_cid, Default::default()).await + } + + /// Sends a request to register with peer with CID peer_cid. Sends a request to + /// connect immediately following a successful registration. Returns a Result with an + /// InternalServiceResponse that specifies whether or not the request was successfully sent. + pub async fn peer_register_and_connect>( + &mut self, + cid: T, + peer_cid: T, + session_security_settings: SessionSecuritySettings, + ) -> Result { + let outbound_request = InternalServiceRequest::PeerRegister { + request_id: Uuid::new_v4(), + cid: cid.into(), + peer_cid: peer_cid.into(), + session_security_settings, + connect_after_register: true, + }; + + self.send_raw_request(outbound_request).await?; + let InternalServiceResponse::PeerConnectSuccess(success) = + scan_for_response!(self.stream, InternalServiceResponse::PeerConnectSuccess(..)) + else { + panic!("Unreachable") + }; + Ok(success) + } + + /// Sends a request to connect to peer with CID peer_cid. Returns a + /// Result with an InternalServiceResponse that specifies whether or not the request + /// was successfully sent. + pub async fn peer_connect>( + &mut self, + cid: T, + peer_cid: T, + udp_mode: UdpMode, + session_security_settings: SessionSecuritySettings, + ) -> Result { + let outbound_request = InternalServiceRequest::PeerConnect { + request_id: Uuid::new_v4(), + cid: cid.into(), + peer_cid: peer_cid.into(), + udp_mode, + session_security_settings, + }; + + self.send_raw_request(outbound_request).await?; + let InternalServiceResponse::PeerConnectSuccess(success) = + scan_for_response!(self.stream, InternalServiceResponse::PeerConnectSuccess(..)) + else { + panic!("Unreachable") + }; + Ok(success) + } + + /// Sends a request to connect to peer with CID peer_cid. Uses default values for connection + /// parameters. Returns a Result with an InternalServiceResponse that specifies whether or + /// not the request was successfully sent. + pub async fn peer_connect_with_defaults>( + &mut self, + cid: T, + peer_cid: T, + ) -> Result { + self.peer_connect(cid, peer_cid, Default::default(), Default::default()) + .await + } + + /// Sends a raw request to the internal service + pub async fn send_raw_request( + &mut self, + request: InternalServiceRequest, + ) -> Result<(), ClientError> { + self.sink.send(request).await?; + Ok(()) + } +} diff --git a/citadel-internal-service-connector/src/lib.rs b/citadel-internal-service-connector/src/lib.rs index 7b141df..a8b5513 100644 --- a/citadel-internal-service-connector/src/lib.rs +++ b/citadel-internal-service-connector/src/lib.rs @@ -1,2 +1,3 @@ pub mod codec; +pub mod connector; pub mod util; diff --git a/citadel-internal-service-connector/src/util.rs b/citadel-internal-service-connector/src/util.rs index 63cdfa7..24e5013 100644 --- a/citadel-internal-service-connector/src/util.rs +++ b/citadel-internal-service-connector/src/util.rs @@ -1,293 +1,25 @@ use crate::codec::{CodecError, SerializingCodec}; use citadel_internal_service_types::{ - ConnectMode, InternalServicePayload, InternalServiceRequest, InternalServiceResponse, - SecBuffer, SessionSecuritySettings, UdpMode, + InternalServicePayload, InternalServiceRequest, InternalServiceResponse, }; use futures::stream::{SplitSink, SplitStream}; -use futures::{Sink, SinkExt, Stream, StreamExt}; -use std::net::SocketAddr; +use futures::{Sink, Stream, StreamExt}; use std::pin::Pin; use std::task::{Context, Poll}; -use std::time::Duration; -use tokio::net::{TcpStream, ToSocketAddrs}; +use tokio::net::TcpStream; use tokio_util::codec::{Decoder, Framed, LengthDelimitedCodec}; -use uuid::Uuid; - -pub struct InternalServiceConnector { - pub sink: WrappedSink, - pub stream: WrappedStream, -} pub struct WrappedStream { - inner: SplitStream>>, + pub(crate) inner: SplitStream>>, } pub struct WrappedSink { - inner: SplitSink< + pub(crate) inner: SplitSink< Framed>, InternalServicePayload, >, } -impl InternalServiceConnector { - /// Establishes a connection with the Internal Service running at the given address. Returns an - /// InternalServiceConnector that can be used to easily interface with the Internal Service. - pub async fn connect_to_service( - addr: T, - ) -> Result> { - let conn = TcpStream::connect(addr).await?; - let (sink, mut stream) = wrap_tcp_conn(conn).split(); - let greeter_packet = stream - .next() - .await - .ok_or("Failed to receive greeting packet")??; - if matches!( - greeter_packet, - InternalServicePayload::Response(InternalServiceResponse::ServiceConnectionAccepted(_)) - ) { - let stream = WrappedStream { inner: stream }; - let sink = WrappedSink { inner: sink }; - Ok(Self { sink, stream }) - } else { - Err("Failed to receive greeting packet")? - } - } - - pub fn split(self) -> (WrappedSink, WrappedStream) { - (self.sink, self.stream) - } - - /// Sends a request to register at server running at the given address. Returns a Result with - /// an InternalServiceResponse that specifies whether or not the request was successfully sent. - pub async fn register, S: Into, R: Into>( - &mut self, - server_address: T, - full_name: S, - username: S, - proposed_password: R, - session_security_settings: SessionSecuritySettings, - ) -> Result> { - let outbound_request = InternalServiceRequest::Register { - request_id: Uuid::new_v4(), - server_addr: server_address.into(), - full_name: full_name.into(), - username: username.into(), - proposed_password: proposed_password.into(), - session_security_settings, - connect_after_register: false, - }; - match self.sink.send(outbound_request).await { - Ok(_) => self - .stream - .next() - .await - .ok_or(Err("Service Connector - Register Stream Failure")?), - Err(error_message) => Err(Box::new(error_message)), - } - } - - /// Sends a request to register at server running at the given address. Uses the default values - /// except for proposed credentials and the target server's address. Returns a Result with an - /// InternalServiceResponse that specifies whether or not the request was successfully sent. - pub async fn register_with_defaults< - T: Into, - S: Into, - R: Into, - >( - &mut self, - server_address: T, - full_name: S, - username: S, - proposed_password: R, - ) -> Result> { - self.register( - server_address, - full_name, - username, - proposed_password, - Default::default(), - ) - .await - } - - /// Sends a request to register at server running at the given address. Sends a request to - /// connect immediately following a successful registration. Returns a Result with an - /// InternalServiceResponse that specifies whether or not the request was successfully sent. - pub async fn register_and_connect, S: Into, R: Into>( - &mut self, - server_address: T, - full_name: S, - username: S, - proposed_password: R, - session_security_settings: SessionSecuritySettings, - ) -> Result> { - let outbound_request = InternalServiceRequest::Register { - request_id: Uuid::new_v4(), - server_addr: server_address.into(), - full_name: full_name.into(), - username: username.into(), - proposed_password: proposed_password.into(), - session_security_settings, - connect_after_register: true, - }; - match self.sink.send(outbound_request).await { - Ok(_) => self.stream.next().await.ok_or(Err( - "Service Connector - Register and Connect Stream Failure", - )?), - Err(error_message) => Err(Box::new(error_message)), - } - } - - /// Sends a request to connect to the current server with the given credentials. Returns a - /// Result with an InternalServiceResponse that specifies whether or not the request - /// was successfully sent. - pub async fn connect, R: Into>( - &mut self, - username: S, - password: R, - connect_mode: ConnectMode, - udp_mode: UdpMode, - keep_alive_timeout: Option, - session_security_settings: SessionSecuritySettings, - ) -> Result> { - let outbound_request = InternalServiceRequest::Connect { - request_id: Uuid::new_v4(), - username: username.into(), - password: password.into(), - connect_mode, - udp_mode, - keep_alive_timeout, - session_security_settings, - }; - match self.sink.send(outbound_request).await { - Ok(_) => self - .stream - .next() - .await - .ok_or(Err("Service Connector - Connect Stream Failure")?), - Err(error_message) => Err(Box::new(error_message)), - } - } - - /// Sends a request to connect to the current server with the given credentials. Uses default - /// values for all parameters other than credentials. Returns a Result with an - /// InternalServiceResponse that specifies whether or not the request was successfully sent. - pub async fn connect_with_defaults, R: Into>( - &mut self, - username: S, - password: R, - ) -> Result> { - self.connect( - username, - password, - Default::default(), - Default::default(), - Default::default(), - Default::default(), - ) - .await - } - - /// Sends a request to register with peer with CID peer_cid. Returns a Result with an - /// InternalServiceResponse that specifies whether or not the request was successfully sent. - pub async fn peer_register>( - &mut self, - cid: T, - peer_cid: T, - session_security_settings: SessionSecuritySettings, - ) -> Result> { - let outbound_request = InternalServiceRequest::PeerRegister { - request_id: Uuid::new_v4(), - cid: cid.into(), - peer_cid: peer_cid.into(), - session_security_settings, - connect_after_register: false, - }; - match self.sink.send(outbound_request).await { - Ok(_) => self - .stream - .next() - .await - .ok_or(Err("Service Connector - Peer Register Stream Failure")?), - Err(error_message) => Err(Box::new(error_message)), - } - } - - /// Sends a request to register with peer with CID peer_cid. Uses the default values except for - /// proposed credentials. Returns a Result with an InternalServiceResponse that specifies - /// whether or not the request was successfully sent. - pub async fn peer_register_with_defaults>( - &mut self, - cid: T, - peer_cid: T, - ) -> Result> { - self.peer_register(cid, peer_cid, Default::default()).await - } - - /// Sends a request to register with peer with CID peer_cid. Sends a request to - /// connect immediately following a successful registration. Returns a Result with an - /// InternalServiceResponse that specifies whether or not the request was successfully sent. - pub async fn peer_register_and_connect>( - &mut self, - cid: T, - peer_cid: T, - session_security_settings: SessionSecuritySettings, - ) -> Result> { - let outbound_request = InternalServiceRequest::PeerRegister { - request_id: Uuid::new_v4(), - cid: cid.into(), - peer_cid: peer_cid.into(), - session_security_settings, - connect_after_register: true, - }; - match self.sink.send(outbound_request).await { - Ok(_) => self.stream.next().await.ok_or(Err( - "Service Connector - Peer Register and Connect Stream Failure", - )?), - Err(error_message) => Err(Box::new(error_message)), - } - } - - /// Sends a request to connect to peer with CID peer_cid. Returns a - /// Result with an InternalServiceResponse that specifies whether or not the request - /// was successfully sent. - pub async fn peer_connect>( - &mut self, - cid: T, - peer_cid: T, - udp_mode: UdpMode, - session_security_settings: SessionSecuritySettings, - ) -> Result> { - let outbound_request = InternalServiceRequest::PeerConnect { - request_id: Uuid::new_v4(), - cid: cid.into(), - peer_cid: peer_cid.into(), - udp_mode, - session_security_settings, - }; - match self.sink.send(outbound_request).await { - Ok(_) => self - .stream - .next() - .await - .ok_or(Err("Service Connector - Peer Connect Stream Failure")?), - Err(error_message) => Err(Box::new(error_message)), - } - } - - /// Sends a request to connect to peer with CID peer_cid. Uses default values for connection - /// parameters. Returns a Result with an InternalServiceResponse that specifies whether or - /// not the request was successfully sent. - pub async fn peer_connect_with_defaults>( - &mut self, - cid: T, - peer_cid: T, - ) -> Result> { - self.peer_connect(cid, peer_cid, Default::default(), Default::default()) - .await - } -} - impl Stream for WrappedStream { type Item = InternalServiceResponse; @@ -295,7 +27,6 @@ impl Stream for WrappedStream { let item = futures::ready!(self.inner.poll_next_unpin(cx)); match item { Some(Ok(InternalServicePayload::Response(response))) => Poll::Ready(Some(response)), - _ => Poll::Ready(None), } } diff --git a/citadel-internal-service/tests/common/mod.rs b/citadel-internal-service/tests/common/mod.rs index 3151b36..5e7a253 100644 --- a/citadel-internal-service/tests/common/mod.rs +++ b/citadel-internal-service/tests/common/mod.rs @@ -1,6 +1,7 @@ #![allow(dead_code)] use citadel_internal_service::kernel::CitadelWorkspaceService; -use citadel_internal_service_connector::util::{InternalServiceConnector, WrappedSink}; +use citadel_internal_service_connector::connector::InternalServiceConnector; +use citadel_internal_service_connector::util::WrappedSink; use citadel_internal_service_types::{ InternalServiceRequest, InternalServiceResponse, PeerConnectSuccess, PeerRegisterSuccess, }; @@ -87,9 +88,10 @@ pub async fn register_and_connect_to_server< )> = Vec::new(); for item in services_to_create { - let (mut sink, mut stream) = InternalServiceConnector::connect_to_service(item.internal_service_addr) - .await? - .split(); + let (mut sink, mut stream) = + InternalServiceConnector::connect_to_service(item.internal_service_addr) + .await? + .split(); let username = item.username.into(); let full_name = item.full_name.into(); diff --git a/citadel-internal-service/tests/service.rs b/citadel-internal-service/tests/service.rs index ad85bf7..d72d3b6 100644 --- a/citadel-internal-service/tests/service.rs +++ b/citadel-internal-service/tests/service.rs @@ -8,7 +8,7 @@ mod tests { spawn_services, test_kv_for_service, InternalServicesFutures, RegisterAndConnectItems, }; use citadel_internal_service::kernel::CitadelWorkspaceService; - use citadel_internal_service_connector::util::InternalServiceConnector; + use citadel_internal_service_connector::connector::InternalServiceConnector; use citadel_internal_service_types::{ InternalServiceRequest, InternalServiceResponse, MessageNotification, MessageSendSuccess, PeerConnectNotification, PeerRegisterNotification, From d1d5d395d270221688e13cb3e0932920a4b1d14e Mon Sep 17 00:00:00 2001 From: Thomas Braun Date: Sun, 3 Mar 2024 15:48:47 -0500 Subject: [PATCH 04/11] Add RequestID macro --- .../src/connector.rs | 4 +- citadel-internal-service-macros/src/lib.rs | 48 ++++++++++++++++++- citadel-internal-service-types/src/lib.rs | 22 +++++++-- citadel-internal-service/src/kernel/mod.rs | 6 ++- .../src/kernel/request_handler.rs | 7 ++- .../tests/file_transfer.rs | 1 + 6 files changed, 80 insertions(+), 8 deletions(-) diff --git a/citadel-internal-service-connector/src/connector.rs b/citadel-internal-service-connector/src/connector.rs index d7f381f..10c1e1b 100644 --- a/citadel-internal-service-connector/src/connector.rs +++ b/citadel-internal-service-connector/src/connector.rs @@ -72,7 +72,9 @@ impl InternalServiceConnector { .ok_or(ClientError::InternalServiceDisconnected)??; if matches!( greeter_packet, - InternalServicePayload::Response(InternalServiceResponse::ServiceConnectionAccepted(_)) + InternalServicePayload::Response( + InternalServiceResponse::ServiceConnectionAccepted { .. } + ) ) { let stream = WrappedStream { inner: stream }; let sink = WrappedSink { inner: sink }; diff --git a/citadel-internal-service-macros/src/lib.rs b/citadel-internal-service-macros/src/lib.rs index 1bb4af3..ee228c3 100644 --- a/citadel-internal-service-macros/src/lib.rs +++ b/citadel-internal-service-macros/src/lib.rs @@ -31,7 +31,7 @@ fn generate_function(input: TokenStream, contains: &str, function_name: &str) -> // Generate match arms for each enum variant let match_arms = generate_match_arms(name, &data, contains); - // Generate the implementation of the `is_error` method + // Generate the implementation of the `$function_name` method let expanded = quote! { impl #name { pub fn #function_name(&self) -> bool { @@ -66,3 +66,49 @@ fn generate_match_arms( }) .collect() } + +#[proc_macro_derive(RequestId)] +pub fn request_ids(input: TokenStream) -> TokenStream { + // Parse the input tokens into a syntax tree + let input = parse_macro_input!(input as DeriveInput); + + // Extract the identifier and data from the input + let name = &input.ident; + let data = if let Data::Enum(data) = input.data { + data + } else { + // This macro only supports enums + panic!("RequestId can only be derived for enums"); + }; + + // Generate match arms for each enum variant + let match_arms = generate_match_arms_uuid(name, &data); + + // Generate the implementation of the `$function_name` method + let expanded = quote! { + impl #name { + pub fn request_id(&self) -> Option<&Uuid> { + match self { + #(#match_arms)* + } + } + } + }; + + // Convert into a TokenStream and return it + TokenStream::from(expanded) +} + +fn generate_match_arms_uuid(name: &Ident, data_enum: &DataEnum) -> Vec { + data_enum + .variants + .iter() + .map(|variant| { + let variant_ident = &variant.ident; + // Match against each variant, ignoring any inner data + quote! { + #name::#variant_ident(inner) => inner.request_id.as_ref(), + } + }) + .collect() +} diff --git a/citadel-internal-service-types/src/lib.rs b/citadel-internal-service-types/src/lib.rs index 29579f9..2d8a2df 100644 --- a/citadel-internal-service-types/src/lib.rs +++ b/citadel-internal-service-types/src/lib.rs @@ -1,5 +1,5 @@ use bytes::BytesMut; -use citadel_internal_service_macros::{IsError, IsNotification}; +use citadel_internal_service_macros::{IsError, IsNotification, RequestId}; pub use citadel_types::prelude::{ ConnectMode, MemberState, MessageGroupKey, ObjectTransferStatus, SecBuffer, SecurityLevel, SessionSecuritySettings, TransferType, UdpMode, UserIdentifier, VirtualObjectMetadata, @@ -35,7 +35,9 @@ pub struct RegisterFailure { } #[derive(Serialize, Deserialize, Debug, Clone)] -pub struct ServiceConnectionAccepted; +pub struct ServiceConnectionAccepted { + pub request_id: Option, +} #[derive(Serialize, Deserialize, Debug, Clone)] pub struct MessageSendSuccess { @@ -542,6 +544,7 @@ pub struct FileTransferRequestNotification { pub cid: u64, pub peer_cid: u64, pub metadata: VirtualObjectMetadata, + pub request_id: Option, } #[derive(Serialize, Deserialize, Debug, Clone)] @@ -559,9 +562,10 @@ pub struct FileTransferTickNotification { pub cid: u64, pub peer_cid: u64, pub status: ObjectTransferStatus, + pub request_id: Option, } -#[derive(Serialize, Deserialize, Debug, Clone, IsError, IsNotification)] +#[derive(Serialize, Deserialize, Debug, Clone, IsError, IsNotification, RequestId)] pub enum InternalServiceResponse { ConnectSuccess(ConnectSuccess), ConnectFailure(ConnectFailure), @@ -827,6 +831,7 @@ pub enum InternalServiceRequest { pub struct SessionInformation { pub cid: u64, pub peer_connections: HashMap, + pub request_id: Option, } #[derive(Serialize, Deserialize, Clone, Debug)] @@ -840,6 +845,7 @@ pub struct AccountInformation { pub username: String, pub full_name: String, pub peers: HashMap, + pub request_id: Option, } #[derive(Serialize, Deserialize, Clone, Debug)] @@ -889,4 +895,14 @@ mod tests { assert!(!success_response.is_notification()); assert!(notification_response.is_notification()); } + + #[test] + fn test_request_id_macro() { + let request_id = Uuid::new_v4(); + let response = InternalServiceResponse::ConnectSuccess(ConnectSuccess { + cid: 0, + request_id: Some(request_id), + }); + assert_eq!(response.request_id(), Some(&request_id)); + } } diff --git a/citadel-internal-service/src/kernel/mod.rs b/citadel-internal-service/src/kernel/mod.rs index 3660823..c417915 100644 --- a/citadel-internal-service/src/kernel/mod.rs +++ b/citadel-internal-service/src/kernel/mod.rs @@ -329,6 +329,7 @@ impl NetKernel for CitadelWorkspaceService { cid: implicated_cid, peer_cid, metadata, + request_id: None, }, ); send_response_to_tcp_client(&self.tcp_connection_map, response, uuid) @@ -565,7 +566,9 @@ fn handle_connection( let write_task = async move { let response = - InternalServiceResponse::ServiceConnectionAccepted(ServiceConnectionAccepted); + InternalServiceResponse::ServiceConnectionAccepted(ServiceConnectionAccepted { + request_id: None, + }); if let Err(err) = sink_send_payload(response, &mut sink).await { error!(target: "citadel", "Failed to send to client: {err:?}"); @@ -823,6 +826,7 @@ fn spawn_tick_updater( cid: implicated_cid, peer_cid, status: status_message, + request_id: None, }, ); match entry.send(message.clone()) { diff --git a/citadel-internal-service/src/kernel/request_handler.rs b/citadel-internal-service/src/kernel/request_handler.rs index 89f61bb..ca9aa88 100644 --- a/citadel-internal-service/src/kernel/request_handler.rs +++ b/citadel-internal-service/src/kernel/request_handler.rs @@ -106,6 +106,7 @@ pub async fn handle_request( accounts_ret: &mut HashMap, account: CNACMetadata, remote: &NodeRemote, + request_id: Uuid, ) { let username = account.username.clone(); let full_name = account.full_name.clone(); @@ -143,6 +144,7 @@ pub async fn handle_request( username, full_name, peers, + request_id: Some(request_id), }, ); } @@ -164,11 +166,11 @@ pub async fn handle_request( if let Some(cid) = cid { let account = filtered_accounts.into_iter().find(|r| r.cid == cid); if let Some(account) = account { - add_account_to_map(&mut accounts_ret, account, remote).await; + add_account_to_map(&mut accounts_ret, account, remote, request_id).await; } } else { for account in filtered_accounts { - add_account_to_map(&mut accounts_ret, account, remote).await; + add_account_to_map(&mut accounts_ret, account, remote, request_id).await; } } @@ -188,6 +190,7 @@ pub async fn handle_request( let mut session = SessionInformation { cid: *cid, peer_connections: HashMap::new(), + request_id: Some(request_id), }; for (peer_cid, conn) in connection.peers.iter() { session.peer_connections.insert( diff --git a/citadel-internal-service/tests/file_transfer.rs b/citadel-internal-service/tests/file_transfer.rs index aaf74b7..6d966a1 100644 --- a/citadel-internal-service/tests/file_transfer.rs +++ b/citadel-internal-service/tests/file_transfer.rs @@ -431,6 +431,7 @@ mod tests { cid: _, peer_cid: _, status, + request_id: None, }, ) => match status { ObjectTransferStatus::ReceptionBeginning(file_path, vfm) => { From 4d70ccb5caea6ea7195286bb9ae020f76915c053 Mon Sep 17 00:00:00 2001 From: Tjemmmic Date: Wed, 20 Mar 2024 00:15:36 -0500 Subject: [PATCH 05/11] Starting Connector Tests --- citadel-internal-service/tests/utilities.rs | 51 +++++++++++++++++++++ 1 file changed, 51 insertions(+) create mode 100644 citadel-internal-service/tests/utilities.rs diff --git a/citadel-internal-service/tests/utilities.rs b/citadel-internal-service/tests/utilities.rs new file mode 100644 index 0000000..3f3dea6 --- /dev/null +++ b/citadel-internal-service/tests/utilities.rs @@ -0,0 +1,51 @@ +mod common; + +#[cfg(test)] +mod tests { + use crate::common::server_info_skip_cert_verification; + use citadel_internal_service::kernel::CitadelWorkspaceService; + use citadel_internal_service_connector::connector::InternalServiceConnector; + use citadel_sdk::prelude::{BackendType, NodeBuilder, NodeType, SecBuffer}; + use std::error::Error; + use std::net::SocketAddr; + use std::str::FromStr; + + #[tokio::test] + async fn test_utilities_service_and_server() -> Result<(), Box> { + crate::common::setup_log(); + let _result = + connector_service_and_server(SocketAddr::from_str("127.0.0.1:23457")?).await?; + + Ok(()) + } + + async fn connector_service_and_server( + addr: SocketAddr, + ) -> Result> { + let (server, server_bind_address) = server_info_skip_cert_verification(); + tokio::task::spawn(server); + + let internal_service_kernel = CitadelWorkspaceService::new(addr.clone()); + let internal_service = NodeBuilder::default() + .with_node_type(NodeType::Peer) + .with_backend(BackendType::Filesystem("filesystem".into())) + .with_insecure_skip_cert_verification() + .build(internal_service_kernel) + .unwrap(); + tokio::task::spawn(internal_service); + + // Connect to Internal Service via TCP + let mut service_connector = InternalServiceConnector::connect_to_service(addr).await?; + + service_connector + .register_and_connect( + server_bind_address, + "Full Name", + "myusername", + SecBuffer::from("password"), + Default::default(), + ) + .await?; + Ok(service_connector) + } +} From 44924e45562f957fbafe73fdfaaef8163f2cf9e6 Mon Sep 17 00:00:00 2001 From: Tjemmmic Date: Thu, 21 Mar 2024 00:09:10 -0500 Subject: [PATCH 06/11] Tests for Register and Connect --- citadel-internal-service/tests/utilities.rs | 95 ++++++++++++++++++--- 1 file changed, 82 insertions(+), 13 deletions(-) diff --git a/citadel-internal-service/tests/utilities.rs b/citadel-internal-service/tests/utilities.rs index 3f3dea6..ad1df3e 100644 --- a/citadel-internal-service/tests/utilities.rs +++ b/citadel-internal-service/tests/utilities.rs @@ -13,36 +13,105 @@ mod tests { #[tokio::test] async fn test_utilities_service_and_server() -> Result<(), Box> { crate::common::setup_log(); - let _result = - connector_service_and_server(SocketAddr::from_str("127.0.0.1:23457")?).await?; - + let (server, server_bind_address) = server_info_skip_cert_verification(); + tokio::task::spawn(server); + let _result = connector_service_and_server( + server_bind_address, + SocketAddr::from_str("127.0.0.1:23457")?, + "my name", + "myusername", + "password", + ) + .await?; Ok(()) } - async fn connector_service_and_server( - addr: SocketAddr, - ) -> Result> { + #[tokio::test] + async fn test_utilities_register_and_connect_methods() -> Result<(), Box> { + // Setup Logging and Start Server + crate::common::setup_log(); let (server, server_bind_address) = server_info_skip_cert_verification(); tokio::task::spawn(server); - let internal_service_kernel = CitadelWorkspaceService::new(addr.clone()); + // Start Internal Service + let internal_service_kernel = + CitadelWorkspaceService::new(SocketAddr::from_str("127.0.0.1:23457")?); let internal_service = NodeBuilder::default() .with_node_type(NodeType::Peer) - .with_backend(BackendType::Filesystem("filesystem".into())) + .with_backend(BackendType::InMemory) .with_insecure_skip_cert_verification() .build(internal_service_kernel) .unwrap(); tokio::task::spawn(internal_service); + let (full_name, username, password) = + ("full name", "myusername", SecBuffer::from("password")); + // Connect to Internal Service via TCP - let mut service_connector = InternalServiceConnector::connect_to_service(addr).await?; + let mut service_connector = + InternalServiceConnector::connect_to_service(SocketAddr::from_str("127.0.0.1:23457")?) + .await?; + // Register to Server + service_connector + .register_with_defaults(server_bind_address, full_name, username, password.clone()) + .await?; + // Connect to Server + service_connector + .connect_with_defaults(username, password) + .await?; + Ok(()) + } + #[tokio::test] + async fn test_utilities_peer_register_and_connect() -> Result<(), Box> { + crate::common::setup_log(); + let (server, server_bind_address) = server_info_skip_cert_verification(); + tokio::task::spawn(server); + let _service_connector_0 = connector_service_and_server( + server_bind_address, + SocketAddr::from_str("127.0.0.1:23457")?, + "name 0", + "username0", + "password0", + ) + .await?; + let _service_connector_1 = connector_service_and_server( + server_bind_address, + SocketAddr::from_str("127.0.0.1:23458")?, + "name 1", + "username1", + "password1", + ) + .await?; + // TODO: Add Peer Register and Connect - Requires CID from Responses above + Ok(()) + } + + async fn connector_service_and_server, R: Into>( + server_addr: SocketAddr, + service_addr: SocketAddr, + full_name: S, + username: S, + password: R, + ) -> Result> { + let internal_service_kernel = CitadelWorkspaceService::new(service_addr); + let internal_service = NodeBuilder::default() + .with_node_type(NodeType::Peer) + .with_backend(BackendType::InMemory) + .with_insecure_skip_cert_verification() + .build(internal_service_kernel) + .unwrap(); + tokio::task::spawn(internal_service); + + // Connect to Internal Service via TCP + let mut service_connector = + InternalServiceConnector::connect_to_service(service_addr).await?; service_connector .register_and_connect( - server_bind_address, - "Full Name", - "myusername", - SecBuffer::from("password"), + server_addr, + full_name, + username, + password, Default::default(), ) .await?; From bcbc66f5a2726237e4f3b30fe1cd6d4714217f03 Mon Sep 17 00:00:00 2001 From: Tjemmmic Date: Tue, 9 Apr 2024 23:09:23 -0500 Subject: [PATCH 07/11] Connector methods, tests, and fixes --- citadel-internal-service-connector/Cargo.toml | 1 - .../src/codec.rs | 16 +- .../src/connector.rs | 375 ++---------------- .../src/io_interface/tcp.rs | 301 +++++++++++++- citadel-internal-service-connector/src/lib.rs | 3 - citadel-internal-service/src/kernel/ext.rs | 4 +- citadel-internal-service/src/kernel/mod.rs | 2 + .../requests/get_account_information.rs | 6 +- .../src/kernel/requests/get_sessions.rs | 1 + .../src/kernel/requests/peer/register.rs | 11 + .../responses/object_transfer_handle.rs | 1 + citadel-internal-service/tests/common/mod.rs | 8 +- citadel-internal-service/tests/utilities.rs | 50 ++- 13 files changed, 413 insertions(+), 366 deletions(-) diff --git a/citadel-internal-service-connector/Cargo.toml b/citadel-internal-service-connector/Cargo.toml index ad8c568..4d79de0 100644 --- a/citadel-internal-service-connector/Cargo.toml +++ b/citadel-internal-service-connector/Cargo.toml @@ -14,5 +14,4 @@ bincode2 = { workspace = true } serde = { workspace = true } futures = { workspace = true, features = ["alloc"] } uuid = { workspace = true, features = ["v4"]} -citadel_logging = { workspace = true } async-trait = "0.1.79" diff --git a/citadel-internal-service-connector/src/codec.rs b/citadel-internal-service-connector/src/codec.rs index f727e02..7c49344 100644 --- a/citadel-internal-service-connector/src/codec.rs +++ b/citadel-internal-service-connector/src/codec.rs @@ -32,14 +32,26 @@ impl From for CodecError { } } +impl From for std::io::Error { + fn from(value: CodecError) -> Self { + std::io::Error::new(std::io::ErrorKind::Other, value.reason) + } +} + impl From for ClientError { fn from(value: CodecError) -> Self { ClientError::CodecError(value.reason) } } +impl From for ClientError { + fn from(value: std::io::Error) -> Self { + ClientError::SendError(value.to_string()) + } +} + impl Encoder for SerializingCodec { - type Error = CodecError; + type Error = std::io::Error; fn encode(&mut self, item: T, dst: &mut BytesMut) -> Result<(), Self::Error> { let bytes = bincode2::serialize(&item) @@ -58,7 +70,7 @@ impl Encoder for SerializingCodec { impl Decoder for SerializingCodec { type Item = T; - type Error = CodecError; + type Error = std::io::Error; fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { let bytes = self.inner.decode(src).map_err(|rr| CodecError { diff --git a/citadel-internal-service-connector/src/connector.rs b/citadel-internal-service-connector/src/connector.rs index ecb8b1e..29dac3d 100644 --- a/citadel-internal-service-connector/src/connector.rs +++ b/citadel-internal-service-connector/src/connector.rs @@ -1,24 +1,16 @@ -use crate::util; -use crate::util::{WrappedSink, WrappedStream}; use crate::codec::SerializingCodec; use crate::io_interface::IOInterface; use citadel_internal_service_types::{ - ConnectMode, ConnectSuccess, InternalServicePayload, InternalServiceRequest, - InternalServiceResponse, PeerConnectSuccess, PeerRegisterSuccess, RegisterSuccess, SecBuffer, - SessionSecuritySettings, UdpMode, + InternalServicePayload, InternalServiceRequest, InternalServiceResponse, }; use futures::{Sink, Stream, StreamExt}; -use std::pin::Pin; -use std::task::{Context, Poll}; use std::error::Error; use std::fmt::{Display, Formatter}; -use std::net::SocketAddr; -use std::time::Duration; -use tokio::net::{TcpStream, ToSocketAddrs}; -use uuid::Uuid; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::net::TcpStream; use tokio_util::codec::{Decoder, Framed, LengthDelimitedCodec}; - pub struct InternalServiceConnector { pub sink: WrappedSink, pub stream: WrappedStream, @@ -49,6 +41,7 @@ impl Display for ClientError { impl Error for ClientError {} +#[macro_export] macro_rules! scan_for_response { ($stream:expr, $required_packet:pat) => {{ loop { @@ -73,340 +66,54 @@ macro_rules! scan_for_response { } impl Stream for WrappedStream { - pub struct WrappedStream { - pub(crate) inner: SplitStream>>, - } + type Item = InternalServiceResponse; - pub struct WrappedSink { - pub(crate) inner: SplitSink< - Framed>, - InternalServicePayload, - >, - } - - impl Stream for WrappedStream { - type Item = InternalServiceResponse; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let item = futures::ready!(self.inner.poll_next_unpin(cx)); + match item { + Some(Ok(InternalServicePayload::Response(response))) => Poll::Ready(Some(response)), - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let item = futures::ready!(self.inner.poll_next_unpin(cx)); - match item { - Some(Ok(InternalServicePayload::Response(response))) => Poll::Ready(Some(response)), - _ => Poll::Ready(None), - } + _ => Poll::Ready(None), } } +} - impl Sink for WrappedSink { - type Error = std::io::Error; - - fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.inner).poll_ready(cx) - } - - fn start_send( - mut self: Pin<&mut Self>, - item: InternalServiceRequest, - ) -> Result<(), Self::Error> { - Pin::new(&mut self.inner).start_send(InternalServicePayload::Request(item)) - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.inner).poll_flush(cx) - } - - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.inner).poll_close(cx) - } - } - - pub fn wrap_tcp_conn( - conn: TcpStream, - ) -> Framed> { - let length_delimited = LengthDelimitedCodec::builder() - .length_field_offset(0) // default value - .max_frame_length(1024 * 1024 * 64) // 64 MB - .length_field_type::() - .length_adjustment(0) - .new_codec(); - - let serializing_codec = SerializingCodec { - inner: length_delimited, - _pd: std::marker::PhantomData, - }; - serializing_codec.framed(conn) - } - -impl InternalServiceConnector { - /// Establishes a connection with the Internal Service running at the given address. Returns an - /// InternalServiceConnector that can be used to easily interface with the Internal Service. - pub async fn connect_to_service(addr: T) -> Result { - let conn = TcpStream::connect(addr) - .await - .map_err(|err| ClientError::ConnectionToInternalServiceFailed(err.to_string()))?; - let (sink, mut stream) = util::wrap_tcp_conn(conn).split(); - let greeter_packet = stream - .next() - .await - .ok_or(ClientError::InternalServiceDisconnected)??; - if matches!( - greeter_packet, - InternalServicePayload::Response( - InternalServiceResponse::ServiceConnectionAccepted { .. } - ) - ) { - let stream = WrappedStream { inner: stream }; - let sink = WrappedSink { inner: sink }; - Ok(Self { sink, stream }) - } else { - Err(ClientError::InternalServiceInvalidResponse(format!( - "{greeter_packet:?}" - )))? - } - } - - pub fn split(self) -> (WrappedSink, WrappedStream) { - (self.sink, self.stream) - } - - /// Sends a request to register at server running at the given address. Returns a Result with - /// an InternalServiceResponse that specifies whether or not the request was successfully sent. - pub async fn register, S: Into, R: Into>( - &mut self, - server_address: T, - full_name: S, - username: S, - proposed_password: R, - session_security_settings: SessionSecuritySettings, - ) -> Result { - let outbound_request = InternalServiceRequest::Register { - request_id: Uuid::new_v4(), - server_addr: server_address.into(), - full_name: full_name.into(), - username: username.into(), - proposed_password: proposed_password.into(), - session_security_settings, - connect_after_register: false, - }; - - self.send_raw_request(outbound_request).await?; - let InternalServiceResponse::RegisterSuccess(success) = - scan_for_response!(self.stream, InternalServiceResponse::RegisterSuccess(..)) - else { - panic!("Unreachable") - }; - Ok(success) - } - - /// Sends a request to register at server running at the given address. Uses the default values - /// except for proposed credentials and the target server's address. Returns a Result with an - /// InternalServiceResponse that specifies whether or not the request was successfully sent. - pub async fn register_with_defaults< - T: Into, - S: Into, - R: Into, - >( - &mut self, - server_address: T, - full_name: S, - username: S, - proposed_password: R, - ) -> Result { - self.register( - server_address, - full_name, - username, - proposed_password, - Default::default(), - ) - .await - } - - /// Sends a request to register at server running at the given address. Sends a request to - /// connect immediately following a successful registration. Returns a Result with an - /// InternalServiceResponse that specifies whether or not the request was successfully sent. - pub async fn register_and_connect, S: Into, R: Into>( - &mut self, - server_address: T, - full_name: S, - username: S, - proposed_password: R, - session_security_settings: SessionSecuritySettings, - ) -> Result { - let outbound_request = InternalServiceRequest::Register { - request_id: Uuid::new_v4(), - server_addr: server_address.into(), - full_name: full_name.into(), - username: username.into(), - proposed_password: proposed_password.into(), - session_security_settings, - connect_after_register: true, - }; - self.send_raw_request(outbound_request).await?; - let InternalServiceResponse::ConnectSuccess(success) = - scan_for_response!(self.stream, InternalServiceResponse::ConnectSuccess(..)) - else { - panic!("Unreachable") - }; - Ok(success) - } - - /// Sends a request to connect to the current server with the given credentials. Returns a - /// Result with an InternalServiceResponse that specifies whether or not the request - /// was successfully sent. - pub async fn connect, R: Into>( - &mut self, - username: S, - password: R, - connect_mode: ConnectMode, - udp_mode: UdpMode, - keep_alive_timeout: Option, - session_security_settings: SessionSecuritySettings, - ) -> Result { - let outbound_request = InternalServiceRequest::Connect { - request_id: Uuid::new_v4(), - username: username.into(), - password: password.into(), - connect_mode, - udp_mode, - keep_alive_timeout, - session_security_settings, - }; - self.send_raw_request(outbound_request).await?; - let InternalServiceResponse::ConnectSuccess(success) = - scan_for_response!(self.stream, InternalServiceResponse::ConnectSuccess(..)) - else { - panic!("Unreachable") - }; - Ok(success) - } +impl Sink for WrappedSink { + type Error = std::io::Error; - /// Sends a request to connect to the current server with the given credentials. Uses default - /// values for all parameters other than credentials. Returns a Result with an - /// InternalServiceResponse that specifies whether or not the request was successfully sent. - pub async fn connect_with_defaults, R: Into>( - &mut self, - username: S, - password: R, - ) -> Result { - self.connect( - username, - password, - Default::default(), - Default::default(), - Default::default(), - Default::default(), - ) - .await + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_ready(cx) } - /// Sends a request to register with peer with CID peer_cid. Returns a Result with an - /// InternalServiceResponse that specifies whether or not the request was successfully sent. - pub async fn peer_register>( - &mut self, - cid: T, - peer_cid: T, - session_security_settings: SessionSecuritySettings, - ) -> Result { - let outbound_request = InternalServiceRequest::PeerRegister { - request_id: Uuid::new_v4(), - cid: cid.into(), - peer_cid: peer_cid.into(), - session_security_settings, - connect_after_register: false, - }; - - self.send_raw_request(outbound_request).await?; - let InternalServiceResponse::PeerRegisterSuccess(success) = scan_for_response!( - self.stream, - InternalServiceResponse::PeerRegisterSuccess(..) - ) else { - panic!("Unreachable") - }; - Ok(success) + fn start_send( + mut self: Pin<&mut Self>, + item: InternalServiceRequest, + ) -> Result<(), Self::Error> { + Pin::new(&mut self.inner).start_send(InternalServicePayload::Request(item)) } - /// Sends a request to register with peer with CID peer_cid. Uses the default values except for - /// proposed credentials. Returns a Result with an InternalServiceResponse that specifies - /// whether or not the request was successfully sent. - pub async fn peer_register_with_defaults>( - &mut self, - cid: T, - peer_cid: T, - ) -> Result { - self.peer_register(cid, peer_cid, Default::default()).await + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_flush(cx) } - /// Sends a request to register with peer with CID peer_cid. Sends a request to - /// connect immediately following a successful registration. Returns a Result with an - /// InternalServiceResponse that specifies whether or not the request was successfully sent. - pub async fn peer_register_and_connect>( - &mut self, - cid: T, - peer_cid: T, - session_security_settings: SessionSecuritySettings, - ) -> Result { - let outbound_request = InternalServiceRequest::PeerRegister { - request_id: Uuid::new_v4(), - cid: cid.into(), - peer_cid: peer_cid.into(), - session_security_settings, - connect_after_register: true, - }; - - self.send_raw_request(outbound_request).await?; - let InternalServiceResponse::PeerConnectSuccess(success) = - scan_for_response!(self.stream, InternalServiceResponse::PeerConnectSuccess(..)) - else { - panic!("Unreachable") - }; - Ok(success) - } - - /// Sends a request to connect to peer with CID peer_cid. Returns a - /// Result with an InternalServiceResponse that specifies whether or not the request - /// was successfully sent. - pub async fn peer_connect>( - &mut self, - cid: T, - peer_cid: T, - udp_mode: UdpMode, - session_security_settings: SessionSecuritySettings, - ) -> Result { - let outbound_request = InternalServiceRequest::PeerConnect { - request_id: Uuid::new_v4(), - cid: cid.into(), - peer_cid: peer_cid.into(), - udp_mode, - session_security_settings, - }; - - self.send_raw_request(outbound_request).await?; - let InternalServiceResponse::PeerConnectSuccess(success) = - scan_for_response!(self.stream, InternalServiceResponse::PeerConnectSuccess(..)) - else { - panic!("Unreachable") - }; - Ok(success) - } - - /// Sends a request to connect to peer with CID peer_cid. Uses default values for connection - /// parameters. Returns a Result with an InternalServiceResponse that specifies whether or - /// not the request was successfully sent. - pub async fn peer_connect_with_defaults>( - &mut self, - cid: T, - peer_cid: T, - ) -> Result { - self.peer_connect(cid, peer_cid, Default::default(), Default::default()) - .await + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_close(cx) } +} - /// Sends a raw request to the internal service - pub async fn send_raw_request( - &mut self, - request: InternalServiceRequest, - ) -> Result<(), ClientError> { - self.sink.send(request).await?; - Ok(()) - } +pub fn wrap_tcp_conn( + conn: TcpStream, +) -> Framed> { + let length_delimited = LengthDelimitedCodec::builder() + .length_field_offset(0) // default value + .max_frame_length(1024 * 1024 * 64) // 64 MB + .length_field_type::() + .length_adjustment(0) + .new_codec(); + + let serializing_codec = SerializingCodec { + inner: length_delimited, + _pd: std::marker::PhantomData, + }; + serializing_codec.framed(conn) } diff --git a/citadel-internal-service-connector/src/io_interface/tcp.rs b/citadel-internal-service-connector/src/io_interface/tcp.rs index 5a4a58a..d1218d6 100644 --- a/citadel-internal-service-connector/src/io_interface/tcp.rs +++ b/citadel-internal-service-connector/src/io_interface/tcp.rs @@ -1,12 +1,24 @@ -use crate::codec::SerializingCodec; -use crate::connector::{wrap_tcp_conn, InternalServiceConnector, WrappedSink, WrappedStream}; -use crate::io_interface::IOInterface; +use crate::connector::*; //{wrap_tcp_conn, InternalServiceConnector, WrappedSink, WrappedStream, scan_for_response}; +use crate::scan_for_response; use async_trait::async_trait; -use citadel_internal_service_types::{InternalServicePayload, InternalServiceResponse}; +use citadel_internal_service_types::InternalServicePayload; use futures::stream::{SplitSink, SplitStream}; +use futures::SinkExt; +use tokio::net::TcpListener; + +use crate::codec::SerializingCodec; +use crate::io_interface::IOInterface; +use citadel_internal_service_types::{ + ConnectMode, ConnectSuccess, InternalServiceRequest, InternalServiceResponse, + PeerConnectSuccess, PeerRegisterSuccess, RegisterSuccess, SecBuffer, SessionSecuritySettings, + UdpMode, +}; use futures::StreamExt; -use tokio::net::{TcpListener, TcpStream, ToSocketAddrs}; +use std::net::SocketAddr; +use std::time::Duration; +use tokio::net::{TcpStream, ToSocketAddrs}; use tokio_util::codec::Framed; +use uuid::Uuid; pub struct TcpIOInterface { pub listener: TcpListener, @@ -20,25 +32,294 @@ impl TcpIOInterface { } impl InternalServiceConnector { - pub async fn connect(addr: T) -> Result> { - let conn = TcpStream::connect(addr).await?; + /// Establishes a connection with the Internal Service running at the given address. Returns an + /// InternalServiceConnector that can be used to easily interface with the Internal Service. + pub async fn connect_to_service(addr: S) -> Result { + let conn = TcpStream::connect(addr) + .await + .map_err(|err| ClientError::ConnectionToInternalServiceFailed(err.to_string()))?; let (sink, mut stream) = wrap_tcp_conn(conn).split(); let greeter_packet = stream .next() .await - .ok_or("Failed to receive greeting packet")??; + .ok_or(ClientError::InternalServiceDisconnected)??; if matches!( greeter_packet, - InternalServicePayload::Response(InternalServiceResponse::ServiceConnectionAccepted(_)) + InternalServicePayload::Response( + InternalServiceResponse::ServiceConnectionAccepted { .. } + ) ) { let stream = WrappedStream { inner: stream }; let sink = WrappedSink { inner: sink }; Ok(Self { sink, stream }) } else { - Err("Failed to receive greeting packet")? + Err(ClientError::InternalServiceInvalidResponse(format!( + "{greeter_packet:?}" + )))? } } + /// Sends a request to register at server running at the given address. Returns a Result with + /// an InternalServiceResponse that specifies whether or not the request was successfully sent. + pub async fn register, S: Into, R: Into>( + &mut self, + server_address: U, + full_name: S, + username: S, + proposed_password: R, + session_security_settings: SessionSecuritySettings, + ) -> Result { + let outbound_request = InternalServiceRequest::Register { + request_id: Uuid::new_v4(), + server_addr: server_address.into(), + full_name: full_name.into(), + username: username.into(), + proposed_password: proposed_password.into(), + session_security_settings, + connect_after_register: false, + }; + + self.send_raw_request(outbound_request).await?; + let InternalServiceResponse::RegisterSuccess(success) = + scan_for_response!(self.stream, InternalServiceResponse::RegisterSuccess(..)) + else { + panic!("Unreachable") + }; + Ok(success) + } + + /// Sends a request to register at server running at the given address. Uses the default values + /// except for proposed credentials and the target server's address. Returns a Result with an + /// InternalServiceResponse that specifies whether or not the request was successfully sent. + pub async fn register_with_defaults< + U: Into, + S: Into, + R: Into, + >( + &mut self, + server_address: U, + full_name: S, + username: S, + proposed_password: R, + ) -> Result { + self.register( + server_address, + full_name, + username, + proposed_password, + Default::default(), + ) + .await + } + + /// Sends a request to register at server running at the given address. Sends a request to + /// connect immediately following a successful registration. Returns a Result with an + /// InternalServiceResponse that specifies whether or not the request was successfully sent. + pub async fn register_and_connect, S: Into, R: Into>( + &mut self, + server_address: U, + full_name: S, + username: S, + proposed_password: R, + session_security_settings: SessionSecuritySettings, + ) -> Result { + let outbound_request = InternalServiceRequest::Register { + request_id: Uuid::new_v4(), + server_addr: server_address.into(), + full_name: full_name.into(), + username: username.into(), + proposed_password: proposed_password.into(), + session_security_settings, + connect_after_register: true, + }; + self.send_raw_request(outbound_request).await?; + let InternalServiceResponse::ConnectSuccess(success) = + scan_for_response!(self.stream, InternalServiceResponse::ConnectSuccess(..)) + else { + panic!("Unreachable") + }; + Ok(success) + } + + /// Sends a request to connect to the current server with the given credentials. Returns a + /// Result with an InternalServiceResponse that specifies whether or not the request + /// was successfully sent. + pub async fn connect, R: Into>( + &mut self, + username: S, + password: R, + connect_mode: ConnectMode, + udp_mode: UdpMode, + keep_alive_timeout: Option, + session_security_settings: SessionSecuritySettings, + ) -> Result { + let outbound_request = InternalServiceRequest::Connect { + request_id: Uuid::new_v4(), + username: username.into(), + password: password.into(), + connect_mode, + udp_mode, + keep_alive_timeout, + session_security_settings, + }; + self.send_raw_request(outbound_request).await?; + let InternalServiceResponse::ConnectSuccess(success) = + scan_for_response!(self.stream, InternalServiceResponse::ConnectSuccess(..)) + else { + panic!("Unreachable") + }; + Ok(success) + } + + /// Sends a request to connect to the current server with the given credentials. Uses default + /// values for all parameters other than credentials. Returns a Result with an + /// InternalServiceResponse that specifies whether or not the request was successfully sent. + pub async fn connect_with_defaults, R: Into>( + &mut self, + username: S, + password: R, + ) -> Result { + self.connect( + username, + password, + Default::default(), + Default::default(), + Default::default(), + Default::default(), + ) + .await + } + + /// Sends a request to register with peer with CID peer_cid. Returns a Result with an + /// InternalServiceResponse that specifies whether or not the request was successfully sent. + pub async fn peer_register>( + &mut self, + cid: S, + peer_cid: S, + session_security_settings: SessionSecuritySettings, + ) -> Result { + let outbound_request = InternalServiceRequest::PeerRegister { + request_id: Uuid::new_v4(), + cid: cid.into(), + peer_cid: peer_cid.into(), + session_security_settings, + connect_after_register: false, + }; + + self.send_raw_request(outbound_request).await?; + let InternalServiceResponse::PeerRegisterSuccess(success) = scan_for_response!( + self.stream, + InternalServiceResponse::PeerRegisterSuccess(..) + ) else { + panic!("Unreachable") + }; + Ok(success) + // Ok(PeerRegisterSuccess{cid: 0, peer_cid: 1, peer_username: "peer".to_string(), request_id: None}) + } + + /// Sends a request to register with peer with CID peer_cid. Uses the default values except for + /// proposed credentials. Returns a Result with an InternalServiceResponse that specifies + /// whether or not the request was successfully sent. + pub async fn peer_register_with_defaults>( + &mut self, + cid: S, + peer_cid: S, + ) -> Result { + self.peer_register(cid, peer_cid, Default::default()).await + } + + /// Sends a request to register with peer with CID peer_cid. Sends a request to + /// connect immediately following a successful registration. Returns a Result with an + /// InternalServiceResponse that specifies whether or not the request was successfully sent. + pub async fn peer_register_and_connect>( + &mut self, + cid: S, + peer_cid: S, + session_security_settings: SessionSecuritySettings, + ) -> Result { + let outbound_request = InternalServiceRequest::PeerRegister { + request_id: Uuid::new_v4(), + cid: cid.into(), + peer_cid: peer_cid.into(), + session_security_settings, + connect_after_register: true, + }; + + self.send_raw_request(outbound_request).await?; + let InternalServiceResponse::PeerConnectSuccess(success) = + scan_for_response!(self.stream, InternalServiceResponse::PeerConnectSuccess(..)) + else { + panic!("Unreachable") + }; + Ok(success) + // Ok(PeerConnectSuccess{cid: 0, peer_cid: 1, request_id: None}) + } + + /// Sends a request to register with peer with CID peer_cid. Sends a request to + /// connect immediately following a successful registration. Requests use the default + /// SessionSecuritySettings Value. Returns a Result with an InternalServiceResponse that + /// specifies whether or not the request was successfully sent. + pub async fn peer_register_and_connect_with_defaults>( + &mut self, + cid: S, + peer_cid: S, + ) -> Result { + self.peer_register_and_connect(cid, peer_cid, Default::default()) + .await + } + + /// Sends a request to connect to peer with CID peer_cid. Returns a + /// Result with an InternalServiceResponse that specifies whether or not the request + /// was successfully sent. + pub async fn peer_connect>( + &mut self, + cid: S, + peer_cid: S, + udp_mode: UdpMode, + session_security_settings: SessionSecuritySettings, + ) -> Result { + let outbound_request = InternalServiceRequest::PeerConnect { + request_id: Uuid::new_v4(), + cid: cid.into(), + peer_cid: peer_cid.into(), + udp_mode, + session_security_settings, + }; + + self.send_raw_request(outbound_request).await?; + let InternalServiceResponse::PeerConnectSuccess(success) = + scan_for_response!(self.stream, InternalServiceResponse::PeerConnectSuccess(..)) + else { + panic!("Unreachable") + }; + Ok(success) + // Ok(PeerConnectSuccess{cid: 0, peer_cid: 1, request_id: None}) + } + + /// Sends a request to connect to peer with CID peer_cid. Uses default values for connection + /// parameters. Returns a Result with an InternalServiceResponse that specifies whether or + /// not the request was successfully sent. + pub async fn peer_connect_with_defaults>( + &mut self, + cid: S, + peer_cid: S, + ) -> Result { + self.peer_connect(cid, peer_cid, Default::default(), Default::default()) + .await + } + + /// Sends a raw request to the internal service + pub async fn send_raw_request( + &mut self, + request: InternalServiceRequest, + ) -> Result<(), ClientError> { + self.sink + .inner + .send(InternalServicePayload::Request(request)) + .await?; + Ok(()) + } + pub fn split(self) -> (WrappedSink, WrappedStream) { (self.sink, self.stream) } diff --git a/citadel-internal-service-connector/src/lib.rs b/citadel-internal-service-connector/src/lib.rs index 04324ed..2b6597e 100644 --- a/citadel-internal-service-connector/src/lib.rs +++ b/citadel-internal-service-connector/src/lib.rs @@ -1,6 +1,3 @@ pub mod codec; pub mod connector; - pub mod io_interface; -pub mod connector; -pub mod util; diff --git a/citadel-internal-service/src/kernel/ext.rs b/citadel-internal-service/src/kernel/ext.rs index b079cef..ecb168d 100644 --- a/citadel-internal-service/src/kernel/ext.rs +++ b/citadel-internal-service/src/kernel/ext.rs @@ -27,7 +27,9 @@ pub trait IOInterfaceExt: IOInterface { tokio::task::spawn(async move { let write_task = async move { let response = - InternalServiceResponse::ServiceConnectionAccepted(ServiceConnectionAccepted); + InternalServiceResponse::ServiceConnectionAccepted(ServiceConnectionAccepted { + request_id: None, + }); if let Err(err) = sink_send_payload::(response, &mut sink).await { error!(target: "citadel", "Failed to send to client: {err:?}"); diff --git a/citadel-internal-service/src/kernel/mod.rs b/citadel-internal-service/src/kernel/mod.rs index b0c56a4..b172935 100644 --- a/citadel-internal-service/src/kernel/mod.rs +++ b/citadel-internal-service/src/kernel/mod.rs @@ -268,6 +268,7 @@ impl NetKernel for CitadelWorkspaceService { if let Some(HandledRequestResult { response, uuid }) = handle_request(&this, conn_id, command).await { + info!(target: "citadel", "Sending response to TCP client: {response:?}"); if let Err(err) = send_response_to_tcp_client(&this.tcp_connection_map, response, uuid) .await @@ -368,6 +369,7 @@ fn spawn_tick_updater( cid: implicated_cid, peer_cid, status: status_message, + request_id: None, }, ); match entry.send(message.clone()) { diff --git a/citadel-internal-service/src/kernel/requests/get_account_information.rs b/citadel-internal-service/src/kernel/requests/get_account_information.rs index cfa2385..8c8c56f 100644 --- a/citadel-internal-service/src/kernel/requests/get_account_information.rs +++ b/citadel-internal-service/src/kernel/requests/get_account_information.rs @@ -36,11 +36,11 @@ pub async fn handle( if let Some(cid) = cid { let account = filtered_accounts.into_iter().find(|r| r.cid == cid); if let Some(account) = account { - add_account_to_map(&mut accounts_ret, account, remote).await; + add_account_to_map(&mut accounts_ret, account, remote, request_id).await; } } else { for account in filtered_accounts { - add_account_to_map(&mut accounts_ret, account, remote).await; + add_account_to_map(&mut accounts_ret, account, remote, request_id).await; } } @@ -56,6 +56,7 @@ async fn add_account_to_map( accounts_ret: &mut HashMap, account: CNACMetadata, remote: &NodeRemote, + request_id: Uuid, ) { let username = account.username.clone(); let full_name = account.full_name.clone(); @@ -93,6 +94,7 @@ async fn add_account_to_map( username, full_name, peers, + request_id: Some(request_id), }, ); } diff --git a/citadel-internal-service/src/kernel/requests/get_sessions.rs b/citadel-internal-service/src/kernel/requests/get_sessions.rs index 875074f..270c963 100644 --- a/citadel-internal-service/src/kernel/requests/get_sessions.rs +++ b/citadel-internal-service/src/kernel/requests/get_sessions.rs @@ -25,6 +25,7 @@ pub async fn handle( let mut session = SessionInformation { cid: *cid, peer_connections: HashMap::new(), + request_id: Some(request_id), }; for (peer_cid, conn) in connection.peers.iter() { session.peer_connections.insert( diff --git a/citadel-internal-service/src/kernel/requests/peer/register.rs b/citadel-internal-service/src/kernel/requests/peer/register.rs index ab24c07..223f05c 100644 --- a/citadel-internal-service/src/kernel/requests/peer/register.rs +++ b/citadel-internal-service/src/kernel/requests/peer/register.rs @@ -35,6 +35,17 @@ pub async fn handle( let response = match client_to_server_remote.propose_target(cid, peer_cid).await { Ok(symmetric_identifier_handle_ref) => { + // let peer_command = NodeRequest::PeerCommand(PeerCommand { + // implicated_cid, + // command: PeerSignal::PostRegister { + // peer_conn_type: PeerConnectionType::LocalGroupPeer { implicated_cid: cid, peer_cid }, + // inviter_username: symmetric_identifier_handle_ref.remote().account_manager().get_username_by_cid(cid).await.unwrap().ok_or_else(|| NetworkError::msg("Unable to find username for local user")).unwrap(), + // invitee_username: symmetric_identifier_handle_ref.target_username().cloned(), + // ticket_opt: None, + // invitee_response: None, + // }, + // }); + // symmetric_identifier_handle_ref.send(peer_command).await?; match symmetric_identifier_handle_ref.register_to_peer().await { Ok(_peer_register_success) => { let account_manager = symmetric_identifier_handle_ref.account_manager(); diff --git a/citadel-internal-service/src/kernel/responses/object_transfer_handle.rs b/citadel-internal-service/src/kernel/responses/object_transfer_handle.rs index 90c5b20..4c94330 100644 --- a/citadel-internal-service/src/kernel/responses/object_transfer_handle.rs +++ b/citadel-internal-service/src/kernel/responses/object_transfer_handle.rs @@ -53,6 +53,7 @@ pub async fn handle( cid: implicated_cid, peer_cid, metadata, + request_id: None, }, ); diff --git a/citadel-internal-service/tests/common/mod.rs b/citadel-internal-service/tests/common/mod.rs index 5efd320..63b076b 100644 --- a/citadel-internal-service/tests/common/mod.rs +++ b/citadel-internal-service/tests/common/mod.rs @@ -89,9 +89,10 @@ pub async fn register_and_connect_to_server< )> = Vec::new(); for item in services_to_create { - let (mut sink, mut stream) = InternalServiceConnector::connect(item.internal_service_addr) - .await? - .split(); + let (mut sink, mut stream) = + InternalServiceConnector::connect_to_service(item.internal_service_addr) + .await? + .split(); let username = item.username.into(); let full_name = item.full_name.into(); @@ -573,6 +574,7 @@ pub async fn exhaust_stream_to_file_completion( cid: _, peer_cid: _, status, + request_id: _, }, ) => match status { ObjectTransferStatus::ReceptionBeginning(file_path, vfm) => { diff --git a/citadel-internal-service/tests/utilities.rs b/citadel-internal-service/tests/utilities.rs index ad1df3e..0ee4d45 100644 --- a/citadel-internal-service/tests/utilities.rs +++ b/citadel-internal-service/tests/utilities.rs @@ -4,7 +4,11 @@ mod common; mod tests { use crate::common::server_info_skip_cert_verification; use citadel_internal_service::kernel::CitadelWorkspaceService; - use citadel_internal_service_connector::connector::InternalServiceConnector; + use citadel_internal_service_connector::connector::{ClientError, InternalServiceConnector}; + use citadel_internal_service_connector::io_interface::tcp::TcpIOInterface; + use citadel_internal_service_connector::scan_for_response; + use citadel_internal_service_types::InternalServiceResponse::PeerConnectSuccess; + use citadel_internal_service_types::{ConnectSuccess, InternalServiceResponse}; use citadel_sdk::prelude::{BackendType, NodeBuilder, NodeType, SecBuffer}; use std::error::Error; use std::net::SocketAddr; @@ -35,7 +39,7 @@ mod tests { // Start Internal Service let internal_service_kernel = - CitadelWorkspaceService::new(SocketAddr::from_str("127.0.0.1:23457")?); + CitadelWorkspaceService::new_tcp(SocketAddr::from_str("127.0.0.1:23457")?).await?; let internal_service = NodeBuilder::default() .with_node_type(NodeType::Peer) .with_backend(BackendType::InMemory) @@ -67,7 +71,7 @@ mod tests { crate::common::setup_log(); let (server, server_bind_address) = server_info_skip_cert_verification(); tokio::task::spawn(server); - let _service_connector_0 = connector_service_and_server( + let (mut service_connector_0, cid_0) = connector_service_and_server( server_bind_address, SocketAddr::from_str("127.0.0.1:23457")?, "name 0", @@ -75,7 +79,7 @@ mod tests { "password0", ) .await?; - let _service_connector_1 = connector_service_and_server( + let (mut service_connector_1, cid_1) = connector_service_and_server( server_bind_address, SocketAddr::from_str("127.0.0.1:23458")?, "name 1", @@ -83,7 +87,30 @@ mod tests { "password1", ) .await?; - // TODO: Add Peer Register and Connect - Requires CID from Responses above + + let peer_0_register_and_connect = tokio::task::spawn(async move { + service_connector_0 + .peer_register_and_connect_with_defaults(cid_0, cid_1) + .await + }); + let peer_1_register_and_connect = tokio::task::spawn(async move { + service_connector_1 + .peer_register_and_connect_with_defaults(cid_1, cid_0) + .await + }); + let result = + futures::future::join_all([peer_0_register_and_connect, peer_1_register_and_connect]) + .await; + if result.iter().all(|i| i.is_ok()) { + citadel_logging::info!(target: "citadel", "Peers Registration and Connection to each other was successful"); + } else { + panic!("Peer Register and Connect Error") + } + // service_connector_0.peer_register_with_defaults(cid_0, cid_1).await?; + // service_connector_1.peer_register_with_defaults(cid_1, cid_0).await?; + // + // service_connector_0.peer_connect_with_defaults(cid_0, cid_1).await?; + // service_connector_1.peer_connect_with_defaults(cid_1, cid_0).await?; Ok(()) } @@ -93,8 +120,8 @@ mod tests { full_name: S, username: S, password: R, - ) -> Result> { - let internal_service_kernel = CitadelWorkspaceService::new(service_addr); + ) -> Result<(InternalServiceConnector, u64), Box> { + let internal_service_kernel = CitadelWorkspaceService::new_tcp(service_addr).await?; let internal_service = NodeBuilder::default() .with_node_type(NodeType::Peer) .with_backend(BackendType::InMemory) @@ -106,7 +133,7 @@ mod tests { // Connect to Internal Service via TCP let mut service_connector = InternalServiceConnector::connect_to_service(service_addr).await?; - service_connector + match service_connector .register_and_connect( server_addr, full_name, @@ -114,7 +141,10 @@ mod tests { password, Default::default(), ) - .await?; - Ok(service_connector) + .await + { + Ok(ConnectSuccess { cid, request_id: _ }) => Ok((service_connector, cid)), + Err(err) => Err(Box::from(err)), + } } } From 4363cee1dbce44e799631ce392434f4bf514e5d5 Mon Sep 17 00:00:00 2001 From: Tjemmmic Date: Tue, 9 Apr 2024 23:46:43 -0500 Subject: [PATCH 08/11] Message connector method and test --- .../src/io_interface/tcp.rs | 46 +++++++++++++++-- citadel-internal-service/tests/utilities.rs | 49 ++++++++++++++++--- 2 files changed, 84 insertions(+), 11 deletions(-) diff --git a/citadel-internal-service-connector/src/io_interface/tcp.rs b/citadel-internal-service-connector/src/io_interface/tcp.rs index d1218d6..d23e8f5 100644 --- a/citadel-internal-service-connector/src/io_interface/tcp.rs +++ b/citadel-internal-service-connector/src/io_interface/tcp.rs @@ -1,7 +1,7 @@ use crate::connector::*; //{wrap_tcp_conn, InternalServiceConnector, WrappedSink, WrappedStream, scan_for_response}; use crate::scan_for_response; use async_trait::async_trait; -use citadel_internal_service_types::InternalServicePayload; +use citadel_internal_service_types::{InternalServicePayload, MessageSendSuccess, SecurityLevel}; use futures::stream::{SplitSink, SplitStream}; use futures::SinkExt; use tokio::net::TcpListener; @@ -214,7 +214,6 @@ impl InternalServiceConnector { panic!("Unreachable") }; Ok(success) - // Ok(PeerRegisterSuccess{cid: 0, peer_cid: 1, peer_username: "peer".to_string(), request_id: None}) } /// Sends a request to register with peer with CID peer_cid. Uses the default values except for @@ -252,7 +251,6 @@ impl InternalServiceConnector { panic!("Unreachable") }; Ok(success) - // Ok(PeerConnectSuccess{cid: 0, peer_cid: 1, request_id: None}) } /// Sends a request to register with peer with CID peer_cid. Sends a request to @@ -293,7 +291,6 @@ impl InternalServiceConnector { panic!("Unreachable") }; Ok(success) - // Ok(PeerConnectSuccess{cid: 0, peer_cid: 1, request_id: None}) } /// Sends a request to connect to peer with CID peer_cid. Uses default values for connection @@ -308,6 +305,47 @@ impl InternalServiceConnector { .await } + /// Sends a message to given Peer or server if no peer CID was given. Returns a + /// Result with an InternalServiceResponse that specifies whether or not the request + /// was successfully sent. + pub async fn message>( + &mut self, + cid: S, + peer_cid: Option, + message: Vec, + security_level: SecurityLevel, + ) -> Result { + let peer_cid = peer_cid.map(|i| i.into()); + let outbound_request = InternalServiceRequest::Message { + request_id: Uuid::new_v4(), + message, + cid: cid.into(), + peer_cid, + security_level, + }; + + self.send_raw_request(outbound_request).await?; + let InternalServiceResponse::MessageSendSuccess(success) = + scan_for_response!(self.stream, InternalServiceResponse::MessageSendSuccess(..)) + else { + panic!("Unreachable") + }; + Ok(success) + } + + /// Sends a message to given Peer or server if no peer CID was given. Uses the default + /// security level. Returns a Result with an InternalServiceResponse that specifies whether + /// or not the request was successfully sent. + pub async fn message_with_defaults>( + &mut self, + cid: S, + peer_cid: Option, + message: Vec, + ) -> Result { + self.message(cid, peer_cid, message, Default::default()) + .await + } + /// Sends a raw request to the internal service pub async fn send_raw_request( &mut self, diff --git a/citadel-internal-service/tests/utilities.rs b/citadel-internal-service/tests/utilities.rs index 0ee4d45..5d8563a 100644 --- a/citadel-internal-service/tests/utilities.rs +++ b/citadel-internal-service/tests/utilities.rs @@ -7,9 +7,11 @@ mod tests { use citadel_internal_service_connector::connector::{ClientError, InternalServiceConnector}; use citadel_internal_service_connector::io_interface::tcp::TcpIOInterface; use citadel_internal_service_connector::scan_for_response; - use citadel_internal_service_types::InternalServiceResponse::PeerConnectSuccess; - use citadel_internal_service_types::{ConnectSuccess, InternalServiceResponse}; + use citadel_internal_service_types::{ + ConnectSuccess, InternalServiceResponse, MessageNotification, + }; use citadel_sdk::prelude::{BackendType, NodeBuilder, NodeType, SecBuffer}; + use futures::StreamExt; use std::error::Error; use std::net::SocketAddr; use std::str::FromStr; @@ -106,11 +108,44 @@ mod tests { } else { panic!("Peer Register and Connect Error") } - // service_connector_0.peer_register_with_defaults(cid_0, cid_1).await?; - // service_connector_1.peer_register_with_defaults(cid_1, cid_0).await?; - // - // service_connector_0.peer_connect_with_defaults(cid_0, cid_1).await?; - // service_connector_1.peer_connect_with_defaults(cid_1, cid_0).await?; + Ok(()) + } + + #[tokio::test] + async fn test_utilities_peer_message() -> Result<(), Box> { + crate::common::setup_log(); + let (server, server_bind_address) = server_info_skip_cert_verification(); + tokio::task::spawn(server); + let (mut service_connector_0, cid_0) = connector_service_and_server( + server_bind_address, + SocketAddr::from_str("127.0.0.1:23457")?, + "name 0", + "username0", + "password0", + ) + .await?; + let (mut service_connector_1, cid_1) = connector_service_and_server( + server_bind_address, + SocketAddr::from_str("127.0.0.1:23458")?, + "name 1", + "username1", + "password1", + ) + .await?; + + service_connector_0 + .message_with_defaults(cid_0, Some(cid_1), "Test Message".to_string().into_bytes()) + .await?; + let InternalServiceResponse::MessageNotification(MessageNotification { + message, + cid: _, + peer_cid: _, + request_id: _, + }) = scan_for_response!( + service_connector_1.stream, + InternalServiceResponse::MessageNotification(..) + ); + citadel_logging::info!(target: "citadel", "Peer 1 received message: {message:?}"); Ok(()) } From 904e736fa996aecbf8fe92b926c9fbcd523aaebb Mon Sep 17 00:00:00 2001 From: Tjemmmic Date: Sat, 13 Apr 2024 23:09:17 -0500 Subject: [PATCH 09/11] Message Test Fix --- citadel-internal-service/tests/utilities.rs | 98 +++++++++++++++------ 1 file changed, 69 insertions(+), 29 deletions(-) diff --git a/citadel-internal-service/tests/utilities.rs b/citadel-internal-service/tests/utilities.rs index 5d8563a..0de19da 100644 --- a/citadel-internal-service/tests/utilities.rs +++ b/citadel-internal-service/tests/utilities.rs @@ -12,18 +12,17 @@ mod tests { }; use citadel_sdk::prelude::{BackendType, NodeBuilder, NodeType, SecBuffer}; use futures::StreamExt; - use std::error::Error; use std::net::SocketAddr; use std::str::FromStr; #[tokio::test] - async fn test_utilities_service_and_server() -> Result<(), Box> { + async fn test_utilities_service_and_server() -> Result<(), ClientError> { crate::common::setup_log(); let (server, server_bind_address) = server_info_skip_cert_verification(); tokio::task::spawn(server); let _result = connector_service_and_server( server_bind_address, - SocketAddr::from_str("127.0.0.1:23457")?, + SocketAddr::from_str("127.0.0.1:23457").unwrap(), "my name", "myusername", "password", @@ -33,7 +32,7 @@ mod tests { } #[tokio::test] - async fn test_utilities_register_and_connect_methods() -> Result<(), Box> { + async fn test_utilities_register_and_connect_methods() -> Result<(), ClientError> { // Setup Logging and Start Server crate::common::setup_log(); let (server, server_bind_address) = server_info_skip_cert_verification(); @@ -41,7 +40,8 @@ mod tests { // Start Internal Service let internal_service_kernel = - CitadelWorkspaceService::new_tcp(SocketAddr::from_str("127.0.0.1:23457")?).await?; + CitadelWorkspaceService::new_tcp(SocketAddr::from_str("127.0.0.1:23457").unwrap()) + .await?; let internal_service = NodeBuilder::default() .with_node_type(NodeType::Peer) .with_backend(BackendType::InMemory) @@ -54,9 +54,10 @@ mod tests { ("full name", "myusername", SecBuffer::from("password")); // Connect to Internal Service via TCP - let mut service_connector = - InternalServiceConnector::connect_to_service(SocketAddr::from_str("127.0.0.1:23457")?) - .await?; + let mut service_connector = InternalServiceConnector::connect_to_service( + SocketAddr::from_str("127.0.0.1:23457").unwrap(), + ) + .await?; // Register to Server service_connector .register_with_defaults(server_bind_address, full_name, username, password.clone()) @@ -69,13 +70,13 @@ mod tests { } #[tokio::test] - async fn test_utilities_peer_register_and_connect() -> Result<(), Box> { + async fn test_utilities_peer_register_and_connect() -> Result<(), ClientError> { crate::common::setup_log(); let (server, server_bind_address) = server_info_skip_cert_verification(); tokio::task::spawn(server); let (mut service_connector_0, cid_0) = connector_service_and_server( server_bind_address, - SocketAddr::from_str("127.0.0.1:23457")?, + SocketAddr::from_str("127.0.0.1:23457").unwrap(), "name 0", "username0", "password0", @@ -83,7 +84,7 @@ mod tests { .await?; let (mut service_connector_1, cid_1) = connector_service_and_server( server_bind_address, - SocketAddr::from_str("127.0.0.1:23458")?, + SocketAddr::from_str("127.0.0.1:23458").unwrap(), "name 1", "username1", "password1", @@ -112,13 +113,13 @@ mod tests { } #[tokio::test] - async fn test_utilities_peer_message() -> Result<(), Box> { + async fn test_utilities_peer_message() -> Result<(), ClientError> { crate::common::setup_log(); let (server, server_bind_address) = server_info_skip_cert_verification(); tokio::task::spawn(server); let (mut service_connector_0, cid_0) = connector_service_and_server( server_bind_address, - SocketAddr::from_str("127.0.0.1:23457")?, + SocketAddr::from_str("127.0.0.1:23457").unwrap(), "name 0", "username0", "password0", @@ -126,26 +127,65 @@ mod tests { .await?; let (mut service_connector_1, cid_1) = connector_service_and_server( server_bind_address, - SocketAddr::from_str("127.0.0.1:23458")?, + SocketAddr::from_str("127.0.0.1:23458").unwrap(), "name 1", "username1", "password1", ) .await?; - service_connector_0 - .message_with_defaults(cid_0, Some(cid_1), "Test Message".to_string().into_bytes()) - .await?; - let InternalServiceResponse::MessageNotification(MessageNotification { - message, - cid: _, - peer_cid: _, - request_id: _, - }) = scan_for_response!( - service_connector_1.stream, - InternalServiceResponse::MessageNotification(..) - ); - citadel_logging::info!(target: "citadel", "Peer 1 received message: {message:?}"); + let peer_0_register_and_connect = tokio::task::spawn(async move { + service_connector_0 + .peer_register_and_connect_with_defaults(cid_0, cid_1) + .await?; + let result = service_connector_0 + .message_with_defaults(cid_0, Some(cid_1), "Test Message".to_string().into_bytes()) + .await; + let InternalServiceResponse::MessageNotification(MessageNotification { + message, + cid: _, + peer_cid: _, + request_id: _, + }) = scan_for_response!( + service_connector_0.stream, + InternalServiceResponse::MessageNotification(..) + ) + else { + panic!("Unreachable"); + }; + citadel_logging::info!(target: "citadel", "Peer 0 received message: {message:?}"); + result + }); + let peer_1_register_and_connect = tokio::task::spawn(async move { + service_connector_1 + .peer_register_and_connect_with_defaults(cid_1, cid_0) + .await?; + let result = service_connector_1 + .message_with_defaults(cid_1, Some(cid_0), "Test Message".to_string().into_bytes()) + .await; + let InternalServiceResponse::MessageNotification(MessageNotification { + message, + cid: _, + peer_cid: _, + request_id: _, + }) = scan_for_response!( + service_connector_1.stream, + InternalServiceResponse::MessageNotification(..) + ) + else { + panic!("Unreachable"); + }; + citadel_logging::info!(target: "citadel", "Peer 1 received message: {message:?}"); + result + }); + let result = + futures::future::join_all([peer_0_register_and_connect, peer_1_register_and_connect]) + .await; + if result.iter().all(|i| i.is_ok()) { + citadel_logging::info!(target: "citadel", "Peers Successfully Registered, Connected, and Sent Message"); + } else { + panic!("Peer Message Error") + } Ok(()) } @@ -155,7 +195,7 @@ mod tests { full_name: S, username: S, password: R, - ) -> Result<(InternalServiceConnector, u64), Box> { + ) -> Result<(InternalServiceConnector, u64), ClientError> { let internal_service_kernel = CitadelWorkspaceService::new_tcp(service_addr).await?; let internal_service = NodeBuilder::default() .with_node_type(NodeType::Peer) @@ -179,7 +219,7 @@ mod tests { .await { Ok(ConnectSuccess { cid, request_id: _ }) => Ok((service_connector, cid)), - Err(err) => Err(Box::from(err)), + Err(err) => Err(err), } } } From fc8ca406fc2130db2b51f8e5b4db4a4b33889af7 Mon Sep 17 00:00:00 2001 From: Tjemmmic Date: Sun, 14 Apr 2024 00:38:20 -0500 Subject: [PATCH 10/11] Disconnect Utilities and Fix --- .../src/io_interface/tcp.rs | 92 +++++++++++++------ citadel-internal-service-types/src/lib.rs | 7 ++ .../src/kernel/requests/disconnect.rs | 5 +- citadel-internal-service/tests/service.rs | 4 +- citadel-internal-service/tests/utilities.rs | 17 ++++ 5 files changed, 94 insertions(+), 31 deletions(-) diff --git a/citadel-internal-service-connector/src/io_interface/tcp.rs b/citadel-internal-service-connector/src/io_interface/tcp.rs index d23e8f5..25e8a83 100644 --- a/citadel-internal-service-connector/src/io_interface/tcp.rs +++ b/citadel-internal-service-connector/src/io_interface/tcp.rs @@ -1,7 +1,10 @@ use crate::connector::*; //{wrap_tcp_conn, InternalServiceConnector, WrappedSink, WrappedStream, scan_for_response}; use crate::scan_for_response; use async_trait::async_trait; -use citadel_internal_service_types::{InternalServicePayload, MessageSendSuccess, SecurityLevel}; +use citadel_internal_service_types::{ + DisconnectSuccess, InternalServicePayload, MessageSendSuccess, PeerDisconnectSuccess, + SecurityLevel, +}; use futures::stream::{SplitSink, SplitStream}; use futures::SinkExt; use tokio::net::TcpListener; @@ -60,7 +63,7 @@ impl InternalServiceConnector { } /// Sends a request to register at server running at the given address. Returns a Result with - /// an InternalServiceResponse that specifies whether or not the request was successfully sent. + /// a RegisterSuccess on success or ClientError on failure. pub async fn register, S: Into, R: Into>( &mut self, server_address: U, @@ -89,8 +92,8 @@ impl InternalServiceConnector { } /// Sends a request to register at server running at the given address. Uses the default values - /// except for proposed credentials and the target server's address. Returns a Result with an - /// InternalServiceResponse that specifies whether or not the request was successfully sent. + /// except for proposed credentials and the target server's address. Returns a Result with a + /// RegisterSuccess on success or ClientError on failure. pub async fn register_with_defaults< U: Into, S: Into, @@ -113,8 +116,8 @@ impl InternalServiceConnector { } /// Sends a request to register at server running at the given address. Sends a request to - /// connect immediately following a successful registration. Returns a Result with an - /// InternalServiceResponse that specifies whether or not the request was successfully sent. + /// connect immediately following a successful registration. Returns a Result with a + /// ConnectSuccess on success or ClientError on failure. pub async fn register_and_connect, S: Into, R: Into>( &mut self, server_address: U, @@ -142,8 +145,7 @@ impl InternalServiceConnector { } /// Sends a request to connect to the current server with the given credentials. Returns a - /// Result with an InternalServiceResponse that specifies whether or not the request - /// was successfully sent. + /// Result with a ConnectSuccess on success or ClientError on failure. pub async fn connect, R: Into>( &mut self, username: S, @@ -172,8 +174,8 @@ impl InternalServiceConnector { } /// Sends a request to connect to the current server with the given credentials. Uses default - /// values for all parameters other than credentials. Returns a Result with an - /// InternalServiceResponse that specifies whether or not the request was successfully sent. + /// values for all parameters other than credentials. Returns a Result with a ConnectSuccess + /// on success or ClientError on failure. pub async fn connect_with_defaults, R: Into>( &mut self, username: S, @@ -190,8 +192,8 @@ impl InternalServiceConnector { .await } - /// Sends a request to register with peer with CID peer_cid. Returns a Result with an - /// InternalServiceResponse that specifies whether or not the request was successfully sent. + /// Sends a request to register with peer with CID peer_cid. Returns a Result with a + /// PeerRegisterSuccess on success or ClientError on failure. pub async fn peer_register>( &mut self, cid: S, @@ -217,8 +219,8 @@ impl InternalServiceConnector { } /// Sends a request to register with peer with CID peer_cid. Uses the default values except for - /// proposed credentials. Returns a Result with an InternalServiceResponse that specifies - /// whether or not the request was successfully sent. + /// proposed credentials. Returns a Result with a PeerRegisterSuccess on success or + /// ClientError on failure. pub async fn peer_register_with_defaults>( &mut self, cid: S, @@ -228,8 +230,8 @@ impl InternalServiceConnector { } /// Sends a request to register with peer with CID peer_cid. Sends a request to - /// connect immediately following a successful registration. Returns a Result with an - /// InternalServiceResponse that specifies whether or not the request was successfully sent. + /// connect immediately following a successful registration. Returns a Result with a + /// PeerConnectSuccess on success or ClientError on failure. pub async fn peer_register_and_connect>( &mut self, cid: S, @@ -255,8 +257,8 @@ impl InternalServiceConnector { /// Sends a request to register with peer with CID peer_cid. Sends a request to /// connect immediately following a successful registration. Requests use the default - /// SessionSecuritySettings Value. Returns a Result with an InternalServiceResponse that - /// specifies whether or not the request was successfully sent. + /// SessionSecuritySettings Value. Returns a Result with a PeerConnectSuccess on + /// success or ClientError on failure. pub async fn peer_register_and_connect_with_defaults>( &mut self, cid: S, @@ -267,8 +269,7 @@ impl InternalServiceConnector { } /// Sends a request to connect to peer with CID peer_cid. Returns a - /// Result with an InternalServiceResponse that specifies whether or not the request - /// was successfully sent. + /// Result with a PeerConnectSuccess on success or ClientError on failure. pub async fn peer_connect>( &mut self, cid: S, @@ -294,8 +295,7 @@ impl InternalServiceConnector { } /// Sends a request to connect to peer with CID peer_cid. Uses default values for connection - /// parameters. Returns a Result with an InternalServiceResponse that specifies whether or - /// not the request was successfully sent. + /// parameters. Returns a Result with a PeerConnectSuccess on success or ClientError on failure. pub async fn peer_connect_with_defaults>( &mut self, cid: S, @@ -306,8 +306,7 @@ impl InternalServiceConnector { } /// Sends a message to given Peer or server if no peer CID was given. Returns a - /// Result with an InternalServiceResponse that specifies whether or not the request - /// was successfully sent. + /// Result with a MessageSendSuccess on success or ClientError on failure. pub async fn message>( &mut self, cid: S, @@ -334,8 +333,8 @@ impl InternalServiceConnector { } /// Sends a message to given Peer or server if no peer CID was given. Uses the default - /// security level. Returns a Result with an InternalServiceResponse that specifies whether - /// or not the request was successfully sent. + /// security level. Returns a Result with a MessageSendSuccess on success or ClientError + /// on failure. pub async fn message_with_defaults>( &mut self, cid: S, @@ -346,6 +345,47 @@ impl InternalServiceConnector { .await } + /// Disconnects from connected server. Returns a Result with a DisconnectSuccess on success + /// or ClientError on failure. + pub async fn disconnect>( + &mut self, + cid: S, + ) -> Result { + let outbound_request = InternalServiceRequest::Disconnect { + request_id: Uuid::new_v4(), + cid: cid.into(), + }; + self.send_raw_request(outbound_request).await?; + let InternalServiceResponse::DisconnectSuccess(success) = + scan_for_response!(self.stream, InternalServiceResponse::DisconnectSuccess(..)) + else { + panic!("Unreachable") + }; + Ok(success) + } + + /// Disconnects from the given peer. Returns a Result with a PeerDisconnectSuccess on + /// success or ClientError on failure. + pub async fn peer_disconnect>( + &mut self, + cid: S, + peer_cid: S, + ) -> Result { + let outbound_request = InternalServiceRequest::PeerDisconnect { + request_id: Uuid::new_v4(), + cid: cid.into(), + peer_cid: peer_cid.into(), + }; + self.send_raw_request(outbound_request).await?; + let InternalServiceResponse::PeerDisconnectSuccess(success) = scan_for_response!( + self.stream, + InternalServiceResponse::PeerDisconnectSuccess(..) + ) else { + panic!("Unreachable") + }; + Ok(success) + } + /// Sends a raw request to the internal service pub async fn send_raw_request( &mut self, diff --git a/citadel-internal-service-types/src/lib.rs b/citadel-internal-service-types/src/lib.rs index 02600d7..fcd75f1 100644 --- a/citadel-internal-service-types/src/lib.rs +++ b/citadel-internal-service-types/src/lib.rs @@ -70,6 +70,12 @@ pub struct DisconnectNotification { pub request_id: Option, } +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct DisconnectSuccess { + pub cid: u64, + pub request_id: Option, +} + #[derive(Serialize, Deserialize, Debug, Clone)] pub struct DisconnectFailure { pub cid: u64, @@ -579,6 +585,7 @@ pub enum InternalServiceResponse { MessageSendFailure(MessageSendFailure), MessageNotification(MessageNotification), DisconnectNotification(DisconnectNotification), + DisconnectSuccess(DisconnectSuccess), DisconnectFailure(DisconnectFailure), SendFileRequestSuccess(SendFileRequestSuccess), SendFileRequestFailure(SendFileRequestFailure), diff --git a/citadel-internal-service/src/kernel/requests/disconnect.rs b/citadel-internal-service/src/kernel/requests/disconnect.rs index 0d38082..d6583c2 100644 --- a/citadel-internal-service/src/kernel/requests/disconnect.rs +++ b/citadel-internal-service/src/kernel/requests/disconnect.rs @@ -2,7 +2,7 @@ use crate::kernel::requests::HandledRequestResult; use crate::kernel::CitadelWorkspaceService; use citadel_internal_service_connector::io_interface::IOInterface; use citadel_internal_service_types::{ - DisconnectFailure, DisconnectNotification, InternalServiceRequest, InternalServiceResponse, + DisconnectFailure, DisconnectSuccess, InternalServiceRequest, InternalServiceResponse, }; use citadel_logging::info; use citadel_sdk::prelude::{DisconnectFromHypernode, NodeRequest}; @@ -27,9 +27,8 @@ pub async fn handle( match remote.send(request).await { Ok(_res) => { let disconnect_success = - InternalServiceResponse::DisconnectNotification(DisconnectNotification { + InternalServiceResponse::DisconnectSuccess(DisconnectSuccess { cid, - peer_cid: None, request_id: Some(request_id), }); Some(HandledRequestResult { diff --git a/citadel-internal-service/tests/service.rs b/citadel-internal-service/tests/service.rs index a7987ef..31ad263 100644 --- a/citadel-internal-service/tests/service.rs +++ b/citadel-internal-service/tests/service.rs @@ -68,7 +68,7 @@ mod tests { assert!(matches!( disconnect_response, - InternalServiceResponse::DisconnectNotification { .. } + InternalServiceResponse::DisconnectSuccess { .. } )); Ok(()) @@ -169,7 +169,7 @@ mod tests { assert!(matches!( disconnect_response, - InternalServiceResponse::DisconnectNotification { .. } + InternalServiceResponse::DisconnectSuccess { .. } )); Ok(()) diff --git a/citadel-internal-service/tests/utilities.rs b/citadel-internal-service/tests/utilities.rs index 0de19da..b1486a1 100644 --- a/citadel-internal-service/tests/utilities.rs +++ b/citadel-internal-service/tests/utilities.rs @@ -69,6 +69,23 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_utilities_disconnect() -> Result<(), ClientError> { + crate::common::setup_log(); + let (server, server_bind_address) = server_info_skip_cert_verification(); + tokio::task::spawn(server); + let (mut service_connector_0, cid_0) = connector_service_and_server( + server_bind_address, + SocketAddr::from_str("127.0.0.1:23457").unwrap(), + "name 0", + "username0", + "password0", + ) + .await?; + service_connector_0.disconnect(cid_0).await?; + Ok(()) + } + #[tokio::test] async fn test_utilities_peer_register_and_connect() -> Result<(), ClientError> { crate::common::setup_log(); From 2f39d8c539451043cfa876c1755067b6a378adcf Mon Sep 17 00:00:00 2001 From: Tjemmmic Date: Sun, 14 Apr 2024 23:39:08 -0500 Subject: [PATCH 11/11] File Transfer Utilities and Additional Tests --- .../src/io_interface/tcp.rs | 255 +++++++++++++++++- citadel-internal-service/tests/utilities.rs | 184 ++++++++++++- 2 files changed, 422 insertions(+), 17 deletions(-) diff --git a/citadel-internal-service-connector/src/io_interface/tcp.rs b/citadel-internal-service-connector/src/io_interface/tcp.rs index 25e8a83..71f02a7 100644 --- a/citadel-internal-service-connector/src/io_interface/tcp.rs +++ b/citadel-internal-service-connector/src/io_interface/tcp.rs @@ -1,23 +1,22 @@ -use crate::connector::*; //{wrap_tcp_conn, InternalServiceConnector, WrappedSink, WrappedStream, scan_for_response}; +use crate::connector::*; use crate::scan_for_response; use async_trait::async_trait; -use citadel_internal_service_types::{ - DisconnectSuccess, InternalServicePayload, MessageSendSuccess, PeerDisconnectSuccess, - SecurityLevel, -}; +// use citadel_internal_service_types::{DisconnectSuccess, InternalServicePayload, MessageSendSuccess, PeerDisconnectSuccess, SecurityLevel, SendFileRequestSuccess, TransferType}; use futures::stream::{SplitSink, SplitStream}; use futures::SinkExt; use tokio::net::TcpListener; use crate::codec::SerializingCodec; use crate::io_interface::IOInterface; -use citadel_internal_service_types::{ - ConnectMode, ConnectSuccess, InternalServiceRequest, InternalServiceResponse, - PeerConnectSuccess, PeerRegisterSuccess, RegisterSuccess, SecBuffer, SessionSecuritySettings, - UdpMode, -}; +// use citadel_internal_service_types::{ +// ConnectMode, ConnectSuccess, InternalServiceRequest, InternalServiceResponse, +// PeerConnectSuccess, PeerRegisterSuccess, RegisterSuccess, SecBuffer, SessionSecuritySettings, +// UdpMode, +// }; +use citadel_internal_service_types::*; use futures::StreamExt; use std::net::SocketAddr; +use std::path::PathBuf; use std::time::Duration; use tokio::net::{TcpStream, ToSocketAddrs}; use tokio_util::codec::Framed; @@ -386,6 +385,242 @@ impl InternalServiceConnector { Ok(success) } + /// Requests to Send a file to the given peer or server if no peer CID was given. Allows for + /// the chunk size to be controlled. Returns a Result with a SendFileRequestSuccess + /// on success or Client Error on failure. + pub async fn file_send, R: Into>( + &mut self, + cid: S, + peer_cid: Option, + source: R, + chunk_size: Option, + ) -> Result { + let outbound_request = InternalServiceRequest::SendFile { + request_id: Uuid::new_v4(), + source: source.into(), + cid: cid.into(), + peer_cid: peer_cid.map(|i| i.into()), + chunk_size, + transfer_type: TransferType::FileTransfer, + }; + self.send_raw_request(outbound_request).await?; + let InternalServiceResponse::SendFileRequestSuccess(success) = scan_for_response!( + self.stream, + InternalServiceResponse::SendFileRequestSuccess(..) + ) else { + panic!("Unreachable") + }; + Ok(success) + } + + /// Requests to Send a file to the given peer or server if no peer CID was given. Uses the + /// default chunk size. Returns a Result with a SendFileRequestSuccess + /// on success or Client Error on failure. + pub async fn file_send_with_defaults, R: Into>( + &mut self, + cid: S, + peer_cid: Option, + source: R, + ) -> Result { + let outbound_request = InternalServiceRequest::SendFile { + request_id: Uuid::new_v4(), + source: source.into(), + cid: cid.into(), + peer_cid: peer_cid.map(|i| i.into()), + chunk_size: None, + transfer_type: TransferType::FileTransfer, + }; + self.send_raw_request(outbound_request).await?; + let InternalServiceResponse::SendFileRequestSuccess(success) = scan_for_response!( + self.stream, + InternalServiceResponse::SendFileRequestSuccess(..) + ) else { + panic!("Unreachable") + }; + Ok(success) + } + + /// Respond to a File Transfer Request by accepting or declining. A Download Location can be + /// given for the received file. Returns a Result with a SendFileRequestSuccess + /// on success or Client Error on failure. + pub async fn respond_file_transfer, R: Into>( + &mut self, + cid: S, + peer_cid: S, + object_id: u64, + accept: bool, + download_location: Option, + ) -> Result { + let outbound_request = InternalServiceRequest::RespondFileTransfer { + request_id: Uuid::new_v4(), + cid: cid.into(), + peer_cid: peer_cid.into(), + object_id, + accept, + download_location: download_location.map(|i| i.into()), + }; + self.send_raw_request(outbound_request).await?; + let InternalServiceResponse::FileTransferStatusNotification(success) = scan_for_response!( + self.stream, + InternalServiceResponse::FileTransferStatusNotification(..) + ) else { + panic!("Unreachable") + }; + Ok(success) + } + + /// Respond to a File Transfer Request by accepting or declining. Download Location is set to + /// Default. Returns a Result with a SendFileRequestSuccess on success or Client Error + /// on failure. + pub async fn respond_file_transfer_with_defaults>( + &mut self, + cid: S, + peer_cid: S, + object_id: u64, + accept: bool, + ) -> Result { + let outbound_request = InternalServiceRequest::RespondFileTransfer { + request_id: Uuid::new_v4(), + cid: cid.into(), + peer_cid: peer_cid.into(), + object_id, + accept, + download_location: None, + }; + self.send_raw_request(outbound_request).await?; + let InternalServiceResponse::FileTransferStatusNotification(success) = scan_for_response!( + self.stream, + InternalServiceResponse::FileTransferStatusNotification(..) + ) else { + panic!("Unreachable") + }; + Ok(success) + } + + /// Requests to Send a file for REVFS (Remote Encrypted Virtual File System) at the given peer + /// or server if no peer CID was given. Allows for the chunk size to be controlled. Returns a + /// Result with a SendFileRequestSuccess on success or Client Error on failure. + pub async fn revfs_push, R: Into>( + &mut self, + cid: S, + peer_cid: Option, + source: R, + chunk_size: Option, + virtual_path: R, + security_level: SecurityLevel, + ) -> Result { + let outbound_request = InternalServiceRequest::SendFile { + request_id: Uuid::new_v4(), + source: source.into(), + cid: cid.into(), + peer_cid: peer_cid.map(|i| i.into()), + chunk_size, + transfer_type: TransferType::RemoteEncryptedVirtualFilesystem { + virtual_path: virtual_path.into(), + security_level, + }, + }; + self.send_raw_request(outbound_request).await?; + let InternalServiceResponse::SendFileRequestSuccess(success) = scan_for_response!( + self.stream, + InternalServiceResponse::SendFileRequestSuccess(..) + ) else { + panic!("Unreachable") + }; + Ok(success) + } + + /// Requests to Send a file for REVFS (Remote Encrypted Virtual File System) at the given peer + /// or server if no peer CID was given. Allows for the chunk size to be controlled. Returns a + /// Result with a SendFileRequestSuccess on success or Client Error on failure. + pub async fn revfs_push_with_defaults, R: Into>( + &mut self, + cid: S, + peer_cid: Option, + source: R, + virtual_path: R, + ) -> Result { + let outbound_request = InternalServiceRequest::SendFile { + request_id: Uuid::new_v4(), + source: source.into(), + cid: cid.into(), + peer_cid: peer_cid.map(|i| i.into()), + chunk_size: None, + transfer_type: TransferType::RemoteEncryptedVirtualFilesystem { + virtual_path: virtual_path.into(), + security_level: Default::default(), + }, + }; + self.send_raw_request(outbound_request).await?; + let InternalServiceResponse::SendFileRequestSuccess(success) = scan_for_response!( + self.stream, + InternalServiceResponse::SendFileRequestSuccess(..) + ) else { + panic!("Unreachable") + }; + Ok(success) + } + + /// Downloads a file from REVFS (Remote Encrypted Virtual File System) at the given peer + /// or server if no peer CID was given. Uses a custom Security Level and can delete the file + /// upon pulling. Returns a Result with a SendFileRequestSuccess on success or Client Error + /// on failure. + pub async fn revfs_pull, R: Into>( + &mut self, + cid: S, + peer_cid: Option, + virtual_directory: R, + security_level: Option, + delete_on_pull: bool, + ) -> Result { + let outbound_request = InternalServiceRequest::DownloadFile { + virtual_directory: virtual_directory.into(), + security_level, + delete_on_pull, + cid: cid.into(), + peer_cid: peer_cid.map(|i| i.into()), + request_id: Uuid::new_v4(), + }; + self.send_raw_request(outbound_request).await?; + let InternalServiceResponse::SendFileRequestSuccess(success) = scan_for_response!( + self.stream, + InternalServiceResponse::SendFileRequestSuccess(..) + ) else { + panic!("Unreachable") + }; + Ok(success) + } + + /// Downloads a file from REVFS (Remote Encrypted Virtual File System) at the given peer + /// or server if no peer CID was given. Uses default Security Level and does not delete the + /// file on pull. Returns a Result with a SendFileRequestSuccess on success or Client Error + /// on failure. + pub async fn revfs_pull_with_defaults, R: Into>( + &mut self, + cid: S, + peer_cid: Option, + virtual_directory: R, + security_level: Option, + delete_on_pull: bool, + ) -> Result { + let outbound_request = InternalServiceRequest::DownloadFile { + virtual_directory: virtual_directory.into(), + security_level, + delete_on_pull, + cid: cid.into(), + peer_cid: peer_cid.map(|i| i.into()), + request_id: Uuid::new_v4(), + }; + self.send_raw_request(outbound_request).await?; + let InternalServiceResponse::SendFileRequestSuccess(success) = scan_for_response!( + self.stream, + InternalServiceResponse::SendFileRequestSuccess(..) + ) else { + panic!("Unreachable") + }; + Ok(success) + } + /// Sends a raw request to the internal service pub async fn send_raw_request( &mut self, diff --git a/citadel-internal-service/tests/utilities.rs b/citadel-internal-service/tests/utilities.rs index b1486a1..1f71023 100644 --- a/citadel-internal-service/tests/utilities.rs +++ b/citadel-internal-service/tests/utilities.rs @@ -8,11 +8,13 @@ mod tests { use citadel_internal_service_connector::io_interface::tcp::TcpIOInterface; use citadel_internal_service_connector::scan_for_response; use citadel_internal_service_types::{ - ConnectSuccess, InternalServiceResponse, MessageNotification, + ConnectSuccess, DisconnectNotification, FileTransferRequestNotification, + InternalServiceResponse, MessageNotification, }; use citadel_sdk::prelude::{BackendType, NodeBuilder, NodeType, SecBuffer}; use futures::StreamExt; use std::net::SocketAddr; + use std::path::PathBuf; use std::str::FromStr; #[tokio::test] @@ -129,6 +131,63 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_utilities_peer_disconnect() -> Result<(), ClientError> { + crate::common::setup_log(); + let (server, server_bind_address) = server_info_skip_cert_verification(); + tokio::task::spawn(server); + let (mut service_connector_0, cid_0) = connector_service_and_server( + server_bind_address, + SocketAddr::from_str("127.0.0.1:23457").unwrap(), + "name 0", + "username0", + "password0", + ) + .await?; + let (mut service_connector_1, cid_1) = connector_service_and_server( + server_bind_address, + SocketAddr::from_str("127.0.0.1:23458").unwrap(), + "name 1", + "username1", + "password1", + ) + .await?; + + let peer_0_task = tokio::task::spawn(async move { + service_connector_0 + .peer_register_and_connect_with_defaults(cid_0, cid_1) + .await?; + service_connector_0.peer_disconnect(cid_0, cid_1).await?; + citadel_logging::info!(target: "citadel", "Peer 0 Disconnected from Peer 1"); + Ok(()) + }); + let peer_1_task = tokio::task::spawn(async move { + service_connector_1 + .peer_register_and_connect_with_defaults(cid_1, cid_0) + .await?; + let InternalServiceResponse::DisconnectNotification(DisconnectNotification { + cid: _, + peer_cid: _, + request_id: _, + }) = scan_for_response!( + service_connector_1.stream, + InternalServiceResponse::DisconnectNotification(..) + ) + else { + panic!("Unreachable"); + }; + citadel_logging::info!(target: "citadel", "Peer 1 Received Disconnect Notification"); + Ok(()) + }); + let result = futures::future::join_all([peer_0_task, peer_1_task]).await; + if result.iter().all(|i| i.is_ok()) { + citadel_logging::info!(target: "citadel", "Peers Successfully Disconnected"); + } else { + panic!("Peer Disconnect Error") + } + Ok(()) + } + #[tokio::test] async fn test_utilities_peer_message() -> Result<(), ClientError> { crate::common::setup_log(); @@ -151,7 +210,7 @@ mod tests { ) .await?; - let peer_0_register_and_connect = tokio::task::spawn(async move { + let peer_0_task = tokio::task::spawn(async move { service_connector_0 .peer_register_and_connect_with_defaults(cid_0, cid_1) .await?; @@ -173,7 +232,7 @@ mod tests { citadel_logging::info!(target: "citadel", "Peer 0 received message: {message:?}"); result }); - let peer_1_register_and_connect = tokio::task::spawn(async move { + let peer_1_task = tokio::task::spawn(async move { service_connector_1 .peer_register_and_connect_with_defaults(cid_1, cid_0) .await?; @@ -195,9 +254,7 @@ mod tests { citadel_logging::info!(target: "citadel", "Peer 1 received message: {message:?}"); result }); - let result = - futures::future::join_all([peer_0_register_and_connect, peer_1_register_and_connect]) - .await; + let result = futures::future::join_all([peer_0_task, peer_1_task]).await; if result.iter().all(|i| i.is_ok()) { citadel_logging::info!(target: "citadel", "Peers Successfully Registered, Connected, and Sent Message"); } else { @@ -206,17 +263,130 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_utilities_peer_file_transfer() -> Result<(), ClientError> { + crate::common::setup_log(); + let (server, server_bind_address) = server_info_skip_cert_verification(); + tokio::task::spawn(server); + let (mut service_connector_0, cid_0) = connector_service_and_server_with_backend( + server_bind_address, + SocketAddr::from_str("127.0.0.1:23457").unwrap(), + "name 0", + "username0", + "password0", + BackendType::Filesystem("peer0".to_string()), + ) + .await?; + let (mut service_connector_1, cid_1) = connector_service_and_server_with_backend( + server_bind_address, + SocketAddr::from_str("127.0.0.1:23458").unwrap(), + "name 1", + "username1", + "password1", + BackendType::Filesystem("peer1".to_string()), + ) + .await?; + + let file_to_send = PathBuf::from("../resources/test.txt"); + let _virtual_path = PathBuf::from("/vfs/test.txt"); + let peer_0_task = tokio::task::spawn(async move { + service_connector_0 + .peer_register_and_connect_with_defaults(cid_0, cid_1) + .await?; + service_connector_0 + .file_send_with_defaults(cid_0, Some(cid_1), file_to_send) + .await?; + citadel_logging::info!(target: "citadel", "Peer 0 Successfully Sent File to Peer 1"); + Ok(()) + }); + let peer_1_task = tokio::task::spawn(async move { + service_connector_1 + .peer_register_and_connect_with_defaults(cid_1, cid_0) + .await?; + let InternalServiceResponse::FileTransferRequestNotification( + FileTransferRequestNotification { + cid: _, + peer_cid: _, + metadata, + request_id: _, + }, + ) = scan_for_response!( + service_connector_1.stream, + InternalServiceResponse::FileTransferRequestNotification(..) + ) + else { + panic!("Unreachable"); + }; + service_connector_1 + .respond_file_transfer_with_defaults(cid_1, cid_0, metadata.object_id, true) + .await?; + citadel_logging::info!(target: "citadel", "Peer 1 Received Disconnect Notification"); + Ok(()) + }); + let result = futures::future::join_all([peer_0_task, peer_1_task]).await; + if result.iter().all(|i| i.is_ok()) { + citadel_logging::info!(target: "citadel", "File Transfer Succeeded"); + } else { + panic!("File Transfer Error") + } + Ok(()) + } + async fn connector_service_and_server, R: Into>( server_addr: SocketAddr, service_addr: SocketAddr, full_name: S, username: S, password: R, + ) -> Result<(InternalServiceConnector, u64), ClientError> { + connector_service_and_server_with_backend( + server_addr, + service_addr, + full_name, + username, + password, + BackendType::InMemory, + ) + .await + // let internal_service_kernel = CitadelWorkspaceService::new_tcp(service_addr).await?; + // let internal_service = NodeBuilder::default() + // .with_node_type(NodeType::Peer) + // .with_backend(BackendType::InMemory) + // .with_insecure_skip_cert_verification() + // .build(internal_service_kernel) + // .unwrap(); + // tokio::task::spawn(internal_service); + // + // // Connect to Internal Service via TCP + // let mut service_connector = + // InternalServiceConnector::connect_to_service(service_addr).await?; + // match service_connector + // .register_and_connect( + // server_addr, + // full_name, + // username, + // password, + // Default::default(), + // ) + // .await + // { + // Ok(ConnectSuccess { cid, request_id: _ }) => Ok((service_connector, cid)), + // Err(err) => Err(err), + // } + } + + async fn connector_service_and_server_with_backend, R: Into>( + server_addr: SocketAddr, + service_addr: SocketAddr, + full_name: S, + username: S, + password: R, + backend: BackendType, ) -> Result<(InternalServiceConnector, u64), ClientError> { let internal_service_kernel = CitadelWorkspaceService::new_tcp(service_addr).await?; let internal_service = NodeBuilder::default() .with_node_type(NodeType::Peer) - .with_backend(BackendType::InMemory) + .with_backend(backend) .with_insecure_skip_cert_verification() .build(internal_service_kernel) .unwrap();