diff --git a/citadel-internal-service-connector/Cargo.toml b/citadel-internal-service-connector/Cargo.toml index 5bd69d3..4d79de0 100644 --- a/citadel-internal-service-connector/Cargo.toml +++ b/citadel-internal-service-connector/Cargo.toml @@ -7,11 +7,11 @@ edition = "2021" [dependencies] citadel-internal-service-types = { workspace = true } +citadel_logging = { workspace = true } tokio = { workspace = true, features = ["net", "rt", "macros"] } tokio-util = { workspace = true, features = ["codec"] } bincode2 = { workspace = true } serde = { workspace = true } futures = { workspace = true, features = ["alloc"] } -uuid = { workspace = true } -citadel_logging = { workspace = true } -async-trait = "0.1.79" \ No newline at end of file +uuid = { workspace = true, features = ["v4"]} +async-trait = "0.1.79" diff --git a/citadel-internal-service-connector/src/codec.rs b/citadel-internal-service-connector/src/codec.rs index 5cea696..7c49344 100644 --- a/citadel-internal-service-connector/src/codec.rs +++ b/citadel-internal-service-connector/src/codec.rs @@ -1,3 +1,4 @@ +use crate::connector::ClientError; use serde::de::DeserializeOwned; use serde::Serialize; use std::error::Error; @@ -37,6 +38,18 @@ impl From for std::io::Error { } } +impl From for ClientError { + fn from(value: CodecError) -> Self { + ClientError::CodecError(value.reason) + } +} + +impl From for ClientError { + fn from(value: std::io::Error) -> Self { + ClientError::SendError(value.to_string()) + } +} + impl Encoder for SerializingCodec { type Error = std::io::Error; diff --git a/citadel-internal-service-connector/src/connector.rs b/citadel-internal-service-connector/src/connector.rs index 4d81e59..29dac3d 100644 --- a/citadel-internal-service-connector/src/connector.rs +++ b/citadel-internal-service-connector/src/connector.rs @@ -4,6 +4,8 @@ use citadel_internal_service_types::{ InternalServicePayload, InternalServiceRequest, InternalServiceResponse, }; use futures::{Sink, Stream, StreamExt}; +use std::error::Error; +use std::fmt::{Display, Formatter}; use std::pin::Pin; use std::task::{Context, Poll}; use tokio::net::TcpStream; @@ -22,6 +24,47 @@ pub struct WrappedSink { pub inner: T::Sink, } +#[derive(Debug, Clone)] +pub enum ClientError { + ConnectionToInternalServiceFailed(String), + InternalServiceDisconnected, + InternalServiceInvalidResponse(String), + CodecError(String), + SendError(String), +} + +impl Display for ClientError { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + std::fmt::Debug::fmt(self, f) + } +} + +impl Error for ClientError {} + +#[macro_export] +macro_rules! scan_for_response { + ($stream:expr, $required_packet:pat) => {{ + loop { + match $stream.next().await { + Some(response) => { + if response.is_error() { + return Err(ClientError::InternalServiceInvalidResponse(format!( + "{response:?}" + ))); + } + + if matches!(response, $required_packet) { + break response; + } + + citadel_logging::trace!("Service Connector - Unrelated response: {response:?}"); + } + None => return Err(ClientError::InternalServiceDisconnected)?, + } + } + }}; +} + impl Stream for WrappedStream { type Item = InternalServiceResponse; diff --git a/citadel-internal-service-connector/src/io_interface/tcp.rs b/citadel-internal-service-connector/src/io_interface/tcp.rs index 5a4a58a..71f02a7 100644 --- a/citadel-internal-service-connector/src/io_interface/tcp.rs +++ b/citadel-internal-service-connector/src/io_interface/tcp.rs @@ -1,12 +1,26 @@ -use crate::codec::SerializingCodec; -use crate::connector::{wrap_tcp_conn, InternalServiceConnector, WrappedSink, WrappedStream}; -use crate::io_interface::IOInterface; +use crate::connector::*; +use crate::scan_for_response; use async_trait::async_trait; -use citadel_internal_service_types::{InternalServicePayload, InternalServiceResponse}; +// use citadel_internal_service_types::{DisconnectSuccess, InternalServicePayload, MessageSendSuccess, PeerDisconnectSuccess, SecurityLevel, SendFileRequestSuccess, TransferType}; use futures::stream::{SplitSink, SplitStream}; +use futures::SinkExt; +use tokio::net::TcpListener; + +use crate::codec::SerializingCodec; +use crate::io_interface::IOInterface; +// use citadel_internal_service_types::{ +// ConnectMode, ConnectSuccess, InternalServiceRequest, InternalServiceResponse, +// PeerConnectSuccess, PeerRegisterSuccess, RegisterSuccess, SecBuffer, SessionSecuritySettings, +// UdpMode, +// }; +use citadel_internal_service_types::*; use futures::StreamExt; -use tokio::net::{TcpListener, TcpStream, ToSocketAddrs}; +use std::net::SocketAddr; +use std::path::PathBuf; +use std::time::Duration; +use tokio::net::{TcpStream, ToSocketAddrs}; use tokio_util::codec::Framed; +use uuid::Uuid; pub struct TcpIOInterface { pub listener: TcpListener, @@ -20,25 +34,605 @@ impl TcpIOInterface { } impl InternalServiceConnector { - pub async fn connect(addr: T) -> Result> { - let conn = TcpStream::connect(addr).await?; + /// Establishes a connection with the Internal Service running at the given address. Returns an + /// InternalServiceConnector that can be used to easily interface with the Internal Service. + pub async fn connect_to_service(addr: S) -> Result { + let conn = TcpStream::connect(addr) + .await + .map_err(|err| ClientError::ConnectionToInternalServiceFailed(err.to_string()))?; let (sink, mut stream) = wrap_tcp_conn(conn).split(); let greeter_packet = stream .next() .await - .ok_or("Failed to receive greeting packet")??; + .ok_or(ClientError::InternalServiceDisconnected)??; if matches!( greeter_packet, - InternalServicePayload::Response(InternalServiceResponse::ServiceConnectionAccepted(_)) + InternalServicePayload::Response( + InternalServiceResponse::ServiceConnectionAccepted { .. } + ) ) { let stream = WrappedStream { inner: stream }; let sink = WrappedSink { inner: sink }; Ok(Self { sink, stream }) } else { - Err("Failed to receive greeting packet")? + Err(ClientError::InternalServiceInvalidResponse(format!( + "{greeter_packet:?}" + )))? } } + /// Sends a request to register at server running at the given address. Returns a Result with + /// a RegisterSuccess on success or ClientError on failure. + pub async fn register, S: Into, R: Into>( + &mut self, + server_address: U, + full_name: S, + username: S, + proposed_password: R, + session_security_settings: SessionSecuritySettings, + ) -> Result { + let outbound_request = InternalServiceRequest::Register { + request_id: Uuid::new_v4(), + server_addr: server_address.into(), + full_name: full_name.into(), + username: username.into(), + proposed_password: proposed_password.into(), + session_security_settings, + connect_after_register: false, + }; + + self.send_raw_request(outbound_request).await?; + let InternalServiceResponse::RegisterSuccess(success) = + scan_for_response!(self.stream, InternalServiceResponse::RegisterSuccess(..)) + else { + panic!("Unreachable") + }; + Ok(success) + } + + /// Sends a request to register at server running at the given address. Uses the default values + /// except for proposed credentials and the target server's address. Returns a Result with a + /// RegisterSuccess on success or ClientError on failure. + pub async fn register_with_defaults< + U: Into, + S: Into, + R: Into, + >( + &mut self, + server_address: U, + full_name: S, + username: S, + proposed_password: R, + ) -> Result { + self.register( + server_address, + full_name, + username, + proposed_password, + Default::default(), + ) + .await + } + + /// Sends a request to register at server running at the given address. Sends a request to + /// connect immediately following a successful registration. Returns a Result with a + /// ConnectSuccess on success or ClientError on failure. + pub async fn register_and_connect, S: Into, R: Into>( + &mut self, + server_address: U, + full_name: S, + username: S, + proposed_password: R, + session_security_settings: SessionSecuritySettings, + ) -> Result { + let outbound_request = InternalServiceRequest::Register { + request_id: Uuid::new_v4(), + server_addr: server_address.into(), + full_name: full_name.into(), + username: username.into(), + proposed_password: proposed_password.into(), + session_security_settings, + connect_after_register: true, + }; + self.send_raw_request(outbound_request).await?; + let InternalServiceResponse::ConnectSuccess(success) = + scan_for_response!(self.stream, InternalServiceResponse::ConnectSuccess(..)) + else { + panic!("Unreachable") + }; + Ok(success) + } + + /// Sends a request to connect to the current server with the given credentials. Returns a + /// Result with a ConnectSuccess on success or ClientError on failure. + pub async fn connect, R: Into>( + &mut self, + username: S, + password: R, + connect_mode: ConnectMode, + udp_mode: UdpMode, + keep_alive_timeout: Option, + session_security_settings: SessionSecuritySettings, + ) -> Result { + let outbound_request = InternalServiceRequest::Connect { + request_id: Uuid::new_v4(), + username: username.into(), + password: password.into(), + connect_mode, + udp_mode, + keep_alive_timeout, + session_security_settings, + }; + self.send_raw_request(outbound_request).await?; + let InternalServiceResponse::ConnectSuccess(success) = + scan_for_response!(self.stream, InternalServiceResponse::ConnectSuccess(..)) + else { + panic!("Unreachable") + }; + Ok(success) + } + + /// Sends a request to connect to the current server with the given credentials. Uses default + /// values for all parameters other than credentials. Returns a Result with a ConnectSuccess + /// on success or ClientError on failure. + pub async fn connect_with_defaults, R: Into>( + &mut self, + username: S, + password: R, + ) -> Result { + self.connect( + username, + password, + Default::default(), + Default::default(), + Default::default(), + Default::default(), + ) + .await + } + + /// Sends a request to register with peer with CID peer_cid. Returns a Result with a + /// PeerRegisterSuccess on success or ClientError on failure. + pub async fn peer_register>( + &mut self, + cid: S, + peer_cid: S, + session_security_settings: SessionSecuritySettings, + ) -> Result { + let outbound_request = InternalServiceRequest::PeerRegister { + request_id: Uuid::new_v4(), + cid: cid.into(), + peer_cid: peer_cid.into(), + session_security_settings, + connect_after_register: false, + }; + + self.send_raw_request(outbound_request).await?; + let InternalServiceResponse::PeerRegisterSuccess(success) = scan_for_response!( + self.stream, + InternalServiceResponse::PeerRegisterSuccess(..) + ) else { + panic!("Unreachable") + }; + Ok(success) + } + + /// Sends a request to register with peer with CID peer_cid. Uses the default values except for + /// proposed credentials. Returns a Result with a PeerRegisterSuccess on success or + /// ClientError on failure. + pub async fn peer_register_with_defaults>( + &mut self, + cid: S, + peer_cid: S, + ) -> Result { + self.peer_register(cid, peer_cid, Default::default()).await + } + + /// Sends a request to register with peer with CID peer_cid. Sends a request to + /// connect immediately following a successful registration. Returns a Result with a + /// PeerConnectSuccess on success or ClientError on failure. + pub async fn peer_register_and_connect>( + &mut self, + cid: S, + peer_cid: S, + session_security_settings: SessionSecuritySettings, + ) -> Result { + let outbound_request = InternalServiceRequest::PeerRegister { + request_id: Uuid::new_v4(), + cid: cid.into(), + peer_cid: peer_cid.into(), + session_security_settings, + connect_after_register: true, + }; + + self.send_raw_request(outbound_request).await?; + let InternalServiceResponse::PeerConnectSuccess(success) = + scan_for_response!(self.stream, InternalServiceResponse::PeerConnectSuccess(..)) + else { + panic!("Unreachable") + }; + Ok(success) + } + + /// Sends a request to register with peer with CID peer_cid. Sends a request to + /// connect immediately following a successful registration. Requests use the default + /// SessionSecuritySettings Value. Returns a Result with a PeerConnectSuccess on + /// success or ClientError on failure. + pub async fn peer_register_and_connect_with_defaults>( + &mut self, + cid: S, + peer_cid: S, + ) -> Result { + self.peer_register_and_connect(cid, peer_cid, Default::default()) + .await + } + + /// Sends a request to connect to peer with CID peer_cid. Returns a + /// Result with a PeerConnectSuccess on success or ClientError on failure. + pub async fn peer_connect>( + &mut self, + cid: S, + peer_cid: S, + udp_mode: UdpMode, + session_security_settings: SessionSecuritySettings, + ) -> Result { + let outbound_request = InternalServiceRequest::PeerConnect { + request_id: Uuid::new_v4(), + cid: cid.into(), + peer_cid: peer_cid.into(), + udp_mode, + session_security_settings, + }; + + self.send_raw_request(outbound_request).await?; + let InternalServiceResponse::PeerConnectSuccess(success) = + scan_for_response!(self.stream, InternalServiceResponse::PeerConnectSuccess(..)) + else { + panic!("Unreachable") + }; + Ok(success) + } + + /// Sends a request to connect to peer with CID peer_cid. Uses default values for connection + /// parameters. Returns a Result with a PeerConnectSuccess on success or ClientError on failure. + pub async fn peer_connect_with_defaults>( + &mut self, + cid: S, + peer_cid: S, + ) -> Result { + self.peer_connect(cid, peer_cid, Default::default(), Default::default()) + .await + } + + /// Sends a message to given Peer or server if no peer CID was given. Returns a + /// Result with a MessageSendSuccess on success or ClientError on failure. + pub async fn message>( + &mut self, + cid: S, + peer_cid: Option, + message: Vec, + security_level: SecurityLevel, + ) -> Result { + let peer_cid = peer_cid.map(|i| i.into()); + let outbound_request = InternalServiceRequest::Message { + request_id: Uuid::new_v4(), + message, + cid: cid.into(), + peer_cid, + security_level, + }; + + self.send_raw_request(outbound_request).await?; + let InternalServiceResponse::MessageSendSuccess(success) = + scan_for_response!(self.stream, InternalServiceResponse::MessageSendSuccess(..)) + else { + panic!("Unreachable") + }; + Ok(success) + } + + /// Sends a message to given Peer or server if no peer CID was given. Uses the default + /// security level. Returns a Result with a MessageSendSuccess on success or ClientError + /// on failure. + pub async fn message_with_defaults>( + &mut self, + cid: S, + peer_cid: Option, + message: Vec, + ) -> Result { + self.message(cid, peer_cid, message, Default::default()) + .await + } + + /// Disconnects from connected server. Returns a Result with a DisconnectSuccess on success + /// or ClientError on failure. + pub async fn disconnect>( + &mut self, + cid: S, + ) -> Result { + let outbound_request = InternalServiceRequest::Disconnect { + request_id: Uuid::new_v4(), + cid: cid.into(), + }; + self.send_raw_request(outbound_request).await?; + let InternalServiceResponse::DisconnectSuccess(success) = + scan_for_response!(self.stream, InternalServiceResponse::DisconnectSuccess(..)) + else { + panic!("Unreachable") + }; + Ok(success) + } + + /// Disconnects from the given peer. Returns a Result with a PeerDisconnectSuccess on + /// success or ClientError on failure. + pub async fn peer_disconnect>( + &mut self, + cid: S, + peer_cid: S, + ) -> Result { + let outbound_request = InternalServiceRequest::PeerDisconnect { + request_id: Uuid::new_v4(), + cid: cid.into(), + peer_cid: peer_cid.into(), + }; + self.send_raw_request(outbound_request).await?; + let InternalServiceResponse::PeerDisconnectSuccess(success) = scan_for_response!( + self.stream, + InternalServiceResponse::PeerDisconnectSuccess(..) + ) else { + panic!("Unreachable") + }; + Ok(success) + } + + /// Requests to Send a file to the given peer or server if no peer CID was given. Allows for + /// the chunk size to be controlled. Returns a Result with a SendFileRequestSuccess + /// on success or Client Error on failure. + pub async fn file_send, R: Into>( + &mut self, + cid: S, + peer_cid: Option, + source: R, + chunk_size: Option, + ) -> Result { + let outbound_request = InternalServiceRequest::SendFile { + request_id: Uuid::new_v4(), + source: source.into(), + cid: cid.into(), + peer_cid: peer_cid.map(|i| i.into()), + chunk_size, + transfer_type: TransferType::FileTransfer, + }; + self.send_raw_request(outbound_request).await?; + let InternalServiceResponse::SendFileRequestSuccess(success) = scan_for_response!( + self.stream, + InternalServiceResponse::SendFileRequestSuccess(..) + ) else { + panic!("Unreachable") + }; + Ok(success) + } + + /// Requests to Send a file to the given peer or server if no peer CID was given. Uses the + /// default chunk size. Returns a Result with a SendFileRequestSuccess + /// on success or Client Error on failure. + pub async fn file_send_with_defaults, R: Into>( + &mut self, + cid: S, + peer_cid: Option, + source: R, + ) -> Result { + let outbound_request = InternalServiceRequest::SendFile { + request_id: Uuid::new_v4(), + source: source.into(), + cid: cid.into(), + peer_cid: peer_cid.map(|i| i.into()), + chunk_size: None, + transfer_type: TransferType::FileTransfer, + }; + self.send_raw_request(outbound_request).await?; + let InternalServiceResponse::SendFileRequestSuccess(success) = scan_for_response!( + self.stream, + InternalServiceResponse::SendFileRequestSuccess(..) + ) else { + panic!("Unreachable") + }; + Ok(success) + } + + /// Respond to a File Transfer Request by accepting or declining. A Download Location can be + /// given for the received file. Returns a Result with a SendFileRequestSuccess + /// on success or Client Error on failure. + pub async fn respond_file_transfer, R: Into>( + &mut self, + cid: S, + peer_cid: S, + object_id: u64, + accept: bool, + download_location: Option, + ) -> Result { + let outbound_request = InternalServiceRequest::RespondFileTransfer { + request_id: Uuid::new_v4(), + cid: cid.into(), + peer_cid: peer_cid.into(), + object_id, + accept, + download_location: download_location.map(|i| i.into()), + }; + self.send_raw_request(outbound_request).await?; + let InternalServiceResponse::FileTransferStatusNotification(success) = scan_for_response!( + self.stream, + InternalServiceResponse::FileTransferStatusNotification(..) + ) else { + panic!("Unreachable") + }; + Ok(success) + } + + /// Respond to a File Transfer Request by accepting or declining. Download Location is set to + /// Default. Returns a Result with a SendFileRequestSuccess on success or Client Error + /// on failure. + pub async fn respond_file_transfer_with_defaults>( + &mut self, + cid: S, + peer_cid: S, + object_id: u64, + accept: bool, + ) -> Result { + let outbound_request = InternalServiceRequest::RespondFileTransfer { + request_id: Uuid::new_v4(), + cid: cid.into(), + peer_cid: peer_cid.into(), + object_id, + accept, + download_location: None, + }; + self.send_raw_request(outbound_request).await?; + let InternalServiceResponse::FileTransferStatusNotification(success) = scan_for_response!( + self.stream, + InternalServiceResponse::FileTransferStatusNotification(..) + ) else { + panic!("Unreachable") + }; + Ok(success) + } + + /// Requests to Send a file for REVFS (Remote Encrypted Virtual File System) at the given peer + /// or server if no peer CID was given. Allows for the chunk size to be controlled. Returns a + /// Result with a SendFileRequestSuccess on success or Client Error on failure. + pub async fn revfs_push, R: Into>( + &mut self, + cid: S, + peer_cid: Option, + source: R, + chunk_size: Option, + virtual_path: R, + security_level: SecurityLevel, + ) -> Result { + let outbound_request = InternalServiceRequest::SendFile { + request_id: Uuid::new_v4(), + source: source.into(), + cid: cid.into(), + peer_cid: peer_cid.map(|i| i.into()), + chunk_size, + transfer_type: TransferType::RemoteEncryptedVirtualFilesystem { + virtual_path: virtual_path.into(), + security_level, + }, + }; + self.send_raw_request(outbound_request).await?; + let InternalServiceResponse::SendFileRequestSuccess(success) = scan_for_response!( + self.stream, + InternalServiceResponse::SendFileRequestSuccess(..) + ) else { + panic!("Unreachable") + }; + Ok(success) + } + + /// Requests to Send a file for REVFS (Remote Encrypted Virtual File System) at the given peer + /// or server if no peer CID was given. Allows for the chunk size to be controlled. Returns a + /// Result with a SendFileRequestSuccess on success or Client Error on failure. + pub async fn revfs_push_with_defaults, R: Into>( + &mut self, + cid: S, + peer_cid: Option, + source: R, + virtual_path: R, + ) -> Result { + let outbound_request = InternalServiceRequest::SendFile { + request_id: Uuid::new_v4(), + source: source.into(), + cid: cid.into(), + peer_cid: peer_cid.map(|i| i.into()), + chunk_size: None, + transfer_type: TransferType::RemoteEncryptedVirtualFilesystem { + virtual_path: virtual_path.into(), + security_level: Default::default(), + }, + }; + self.send_raw_request(outbound_request).await?; + let InternalServiceResponse::SendFileRequestSuccess(success) = scan_for_response!( + self.stream, + InternalServiceResponse::SendFileRequestSuccess(..) + ) else { + panic!("Unreachable") + }; + Ok(success) + } + + /// Downloads a file from REVFS (Remote Encrypted Virtual File System) at the given peer + /// or server if no peer CID was given. Uses a custom Security Level and can delete the file + /// upon pulling. Returns a Result with a SendFileRequestSuccess on success or Client Error + /// on failure. + pub async fn revfs_pull, R: Into>( + &mut self, + cid: S, + peer_cid: Option, + virtual_directory: R, + security_level: Option, + delete_on_pull: bool, + ) -> Result { + let outbound_request = InternalServiceRequest::DownloadFile { + virtual_directory: virtual_directory.into(), + security_level, + delete_on_pull, + cid: cid.into(), + peer_cid: peer_cid.map(|i| i.into()), + request_id: Uuid::new_v4(), + }; + self.send_raw_request(outbound_request).await?; + let InternalServiceResponse::SendFileRequestSuccess(success) = scan_for_response!( + self.stream, + InternalServiceResponse::SendFileRequestSuccess(..) + ) else { + panic!("Unreachable") + }; + Ok(success) + } + + /// Downloads a file from REVFS (Remote Encrypted Virtual File System) at the given peer + /// or server if no peer CID was given. Uses default Security Level and does not delete the + /// file on pull. Returns a Result with a SendFileRequestSuccess on success or Client Error + /// on failure. + pub async fn revfs_pull_with_defaults, R: Into>( + &mut self, + cid: S, + peer_cid: Option, + virtual_directory: R, + security_level: Option, + delete_on_pull: bool, + ) -> Result { + let outbound_request = InternalServiceRequest::DownloadFile { + virtual_directory: virtual_directory.into(), + security_level, + delete_on_pull, + cid: cid.into(), + peer_cid: peer_cid.map(|i| i.into()), + request_id: Uuid::new_v4(), + }; + self.send_raw_request(outbound_request).await?; + let InternalServiceResponse::SendFileRequestSuccess(success) = scan_for_response!( + self.stream, + InternalServiceResponse::SendFileRequestSuccess(..) + ) else { + panic!("Unreachable") + }; + Ok(success) + } + + /// Sends a raw request to the internal service + pub async fn send_raw_request( + &mut self, + request: InternalServiceRequest, + ) -> Result<(), ClientError> { + self.sink + .inner + .send(InternalServicePayload::Request(request)) + .await?; + Ok(()) + } + pub fn split(self) -> (WrappedSink, WrappedStream) { (self.sink, self.stream) } diff --git a/citadel-internal-service-connector/src/lib.rs b/citadel-internal-service-connector/src/lib.rs index 807ab56..2b6597e 100644 --- a/citadel-internal-service-connector/src/lib.rs +++ b/citadel-internal-service-connector/src/lib.rs @@ -1,4 +1,3 @@ pub mod codec; pub mod connector; - pub mod io_interface; diff --git a/citadel-internal-service-macros/src/lib.rs b/citadel-internal-service-macros/src/lib.rs index 1bb4af3..ee228c3 100644 --- a/citadel-internal-service-macros/src/lib.rs +++ b/citadel-internal-service-macros/src/lib.rs @@ -31,7 +31,7 @@ fn generate_function(input: TokenStream, contains: &str, function_name: &str) -> // Generate match arms for each enum variant let match_arms = generate_match_arms(name, &data, contains); - // Generate the implementation of the `is_error` method + // Generate the implementation of the `$function_name` method let expanded = quote! { impl #name { pub fn #function_name(&self) -> bool { @@ -66,3 +66,49 @@ fn generate_match_arms( }) .collect() } + +#[proc_macro_derive(RequestId)] +pub fn request_ids(input: TokenStream) -> TokenStream { + // Parse the input tokens into a syntax tree + let input = parse_macro_input!(input as DeriveInput); + + // Extract the identifier and data from the input + let name = &input.ident; + let data = if let Data::Enum(data) = input.data { + data + } else { + // This macro only supports enums + panic!("RequestId can only be derived for enums"); + }; + + // Generate match arms for each enum variant + let match_arms = generate_match_arms_uuid(name, &data); + + // Generate the implementation of the `$function_name` method + let expanded = quote! { + impl #name { + pub fn request_id(&self) -> Option<&Uuid> { + match self { + #(#match_arms)* + } + } + } + }; + + // Convert into a TokenStream and return it + TokenStream::from(expanded) +} + +fn generate_match_arms_uuid(name: &Ident, data_enum: &DataEnum) -> Vec { + data_enum + .variants + .iter() + .map(|variant| { + let variant_ident = &variant.ident; + // Match against each variant, ignoring any inner data + quote! { + #name::#variant_ident(inner) => inner.request_id.as_ref(), + } + }) + .collect() +} diff --git a/citadel-internal-service-types/src/lib.rs b/citadel-internal-service-types/src/lib.rs index 7dfdcbf..fcd75f1 100644 --- a/citadel-internal-service-types/src/lib.rs +++ b/citadel-internal-service-types/src/lib.rs @@ -1,5 +1,5 @@ use bytes::BytesMut; -use citadel_internal_service_macros::{IsError, IsNotification}; +use citadel_internal_service_macros::{IsError, IsNotification, RequestId}; pub use citadel_types::prelude::{ ConnectMode, MemberState, MessageGroupKey, ObjectTransferStatus, SecBuffer, SecurityLevel, SessionSecuritySettings, TransferType, UdpMode, UserIdentifier, VirtualObjectMetadata, @@ -37,7 +37,9 @@ pub struct RegisterFailure { } #[derive(Serialize, Deserialize, Debug, Clone)] -pub struct ServiceConnectionAccepted; +pub struct ServiceConnectionAccepted { + pub request_id: Option, +} #[derive(Serialize, Deserialize, Debug, Clone)] pub struct MessageSendSuccess { @@ -68,6 +70,12 @@ pub struct DisconnectNotification { pub request_id: Option, } +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct DisconnectSuccess { + pub cid: u64, + pub request_id: Option, +} + #[derive(Serialize, Deserialize, Debug, Clone)] pub struct DisconnectFailure { pub cid: u64, @@ -545,6 +553,7 @@ pub struct FileTransferRequestNotification { pub cid: u64, pub peer_cid: u64, pub metadata: VirtualObjectMetadata, + pub request_id: Option, } #[derive(Serialize, Deserialize, Debug, Clone)] @@ -562,9 +571,10 @@ pub struct FileTransferTickNotification { pub cid: u64, pub peer_cid: Option, pub status: ObjectTransferStatus, + pub request_id: Option, } -#[derive(Serialize, Deserialize, Debug, Clone, IsError, IsNotification)] +#[derive(Serialize, Deserialize, Debug, Clone, IsError, IsNotification, RequestId)] pub enum InternalServiceResponse { ConnectSuccess(ConnectSuccess), ConnectFailure(ConnectFailure), @@ -575,6 +585,7 @@ pub enum InternalServiceResponse { MessageSendFailure(MessageSendFailure), MessageNotification(MessageNotification), DisconnectNotification(DisconnectNotification), + DisconnectSuccess(DisconnectSuccess), DisconnectFailure(DisconnectFailure), SendFileRequestSuccess(SendFileRequestSuccess), SendFileRequestFailure(SendFileRequestFailure), @@ -830,6 +841,7 @@ pub enum InternalServiceRequest { pub struct SessionInformation { pub cid: u64, pub peer_connections: HashMap, + pub request_id: Option, } #[derive(Serialize, Deserialize, Clone, Debug)] @@ -843,6 +855,7 @@ pub struct AccountInformation { pub username: String, pub full_name: String, pub peers: HashMap, + pub request_id: Option, } #[derive(Serialize, Deserialize, Clone, Debug)] @@ -892,4 +905,14 @@ mod tests { assert!(!success_response.is_notification()); assert!(notification_response.is_notification()); } + + #[test] + fn test_request_id_macro() { + let request_id = Uuid::new_v4(); + let response = InternalServiceResponse::ConnectSuccess(ConnectSuccess { + cid: 0, + request_id: Some(request_id), + }); + assert_eq!(response.request_id(), Some(&request_id)); + } } diff --git a/citadel-internal-service/src/kernel/ext.rs b/citadel-internal-service/src/kernel/ext.rs index b079cef..ecb168d 100644 --- a/citadel-internal-service/src/kernel/ext.rs +++ b/citadel-internal-service/src/kernel/ext.rs @@ -27,7 +27,9 @@ pub trait IOInterfaceExt: IOInterface { tokio::task::spawn(async move { let write_task = async move { let response = - InternalServiceResponse::ServiceConnectionAccepted(ServiceConnectionAccepted); + InternalServiceResponse::ServiceConnectionAccepted(ServiceConnectionAccepted { + request_id: None, + }); if let Err(err) = sink_send_payload::(response, &mut sink).await { error!(target: "citadel", "Failed to send to client: {err:?}"); diff --git a/citadel-internal-service/src/kernel/mod.rs b/citadel-internal-service/src/kernel/mod.rs index b0c56a4..b172935 100644 --- a/citadel-internal-service/src/kernel/mod.rs +++ b/citadel-internal-service/src/kernel/mod.rs @@ -268,6 +268,7 @@ impl NetKernel for CitadelWorkspaceService { if let Some(HandledRequestResult { response, uuid }) = handle_request(&this, conn_id, command).await { + info!(target: "citadel", "Sending response to TCP client: {response:?}"); if let Err(err) = send_response_to_tcp_client(&this.tcp_connection_map, response, uuid) .await @@ -368,6 +369,7 @@ fn spawn_tick_updater( cid: implicated_cid, peer_cid, status: status_message, + request_id: None, }, ); match entry.send(message.clone()) { diff --git a/citadel-internal-service/src/kernel/requests/disconnect.rs b/citadel-internal-service/src/kernel/requests/disconnect.rs index 0d38082..d6583c2 100644 --- a/citadel-internal-service/src/kernel/requests/disconnect.rs +++ b/citadel-internal-service/src/kernel/requests/disconnect.rs @@ -2,7 +2,7 @@ use crate::kernel::requests::HandledRequestResult; use crate::kernel::CitadelWorkspaceService; use citadel_internal_service_connector::io_interface::IOInterface; use citadel_internal_service_types::{ - DisconnectFailure, DisconnectNotification, InternalServiceRequest, InternalServiceResponse, + DisconnectFailure, DisconnectSuccess, InternalServiceRequest, InternalServiceResponse, }; use citadel_logging::info; use citadel_sdk::prelude::{DisconnectFromHypernode, NodeRequest}; @@ -27,9 +27,8 @@ pub async fn handle( match remote.send(request).await { Ok(_res) => { let disconnect_success = - InternalServiceResponse::DisconnectNotification(DisconnectNotification { + InternalServiceResponse::DisconnectSuccess(DisconnectSuccess { cid, - peer_cid: None, request_id: Some(request_id), }); Some(HandledRequestResult { diff --git a/citadel-internal-service/src/kernel/requests/get_account_information.rs b/citadel-internal-service/src/kernel/requests/get_account_information.rs index cfa2385..8c8c56f 100644 --- a/citadel-internal-service/src/kernel/requests/get_account_information.rs +++ b/citadel-internal-service/src/kernel/requests/get_account_information.rs @@ -36,11 +36,11 @@ pub async fn handle( if let Some(cid) = cid { let account = filtered_accounts.into_iter().find(|r| r.cid == cid); if let Some(account) = account { - add_account_to_map(&mut accounts_ret, account, remote).await; + add_account_to_map(&mut accounts_ret, account, remote, request_id).await; } } else { for account in filtered_accounts { - add_account_to_map(&mut accounts_ret, account, remote).await; + add_account_to_map(&mut accounts_ret, account, remote, request_id).await; } } @@ -56,6 +56,7 @@ async fn add_account_to_map( accounts_ret: &mut HashMap, account: CNACMetadata, remote: &NodeRemote, + request_id: Uuid, ) { let username = account.username.clone(); let full_name = account.full_name.clone(); @@ -93,6 +94,7 @@ async fn add_account_to_map( username, full_name, peers, + request_id: Some(request_id), }, ); } diff --git a/citadel-internal-service/src/kernel/requests/get_sessions.rs b/citadel-internal-service/src/kernel/requests/get_sessions.rs index 875074f..270c963 100644 --- a/citadel-internal-service/src/kernel/requests/get_sessions.rs +++ b/citadel-internal-service/src/kernel/requests/get_sessions.rs @@ -25,6 +25,7 @@ pub async fn handle( let mut session = SessionInformation { cid: *cid, peer_connections: HashMap::new(), + request_id: Some(request_id), }; for (peer_cid, conn) in connection.peers.iter() { session.peer_connections.insert( diff --git a/citadel-internal-service/src/kernel/requests/peer/register.rs b/citadel-internal-service/src/kernel/requests/peer/register.rs index ab24c07..223f05c 100644 --- a/citadel-internal-service/src/kernel/requests/peer/register.rs +++ b/citadel-internal-service/src/kernel/requests/peer/register.rs @@ -35,6 +35,17 @@ pub async fn handle( let response = match client_to_server_remote.propose_target(cid, peer_cid).await { Ok(symmetric_identifier_handle_ref) => { + // let peer_command = NodeRequest::PeerCommand(PeerCommand { + // implicated_cid, + // command: PeerSignal::PostRegister { + // peer_conn_type: PeerConnectionType::LocalGroupPeer { implicated_cid: cid, peer_cid }, + // inviter_username: symmetric_identifier_handle_ref.remote().account_manager().get_username_by_cid(cid).await.unwrap().ok_or_else(|| NetworkError::msg("Unable to find username for local user")).unwrap(), + // invitee_username: symmetric_identifier_handle_ref.target_username().cloned(), + // ticket_opt: None, + // invitee_response: None, + // }, + // }); + // symmetric_identifier_handle_ref.send(peer_command).await?; match symmetric_identifier_handle_ref.register_to_peer().await { Ok(_peer_register_success) => { let account_manager = symmetric_identifier_handle_ref.account_manager(); diff --git a/citadel-internal-service/src/kernel/responses/object_transfer_handle.rs b/citadel-internal-service/src/kernel/responses/object_transfer_handle.rs index 90c5b20..4c94330 100644 --- a/citadel-internal-service/src/kernel/responses/object_transfer_handle.rs +++ b/citadel-internal-service/src/kernel/responses/object_transfer_handle.rs @@ -53,6 +53,7 @@ pub async fn handle( cid: implicated_cid, peer_cid, metadata, + request_id: None, }, ); diff --git a/citadel-internal-service/tests/common/mod.rs b/citadel-internal-service/tests/common/mod.rs index 5efd320..63b076b 100644 --- a/citadel-internal-service/tests/common/mod.rs +++ b/citadel-internal-service/tests/common/mod.rs @@ -89,9 +89,10 @@ pub async fn register_and_connect_to_server< )> = Vec::new(); for item in services_to_create { - let (mut sink, mut stream) = InternalServiceConnector::connect(item.internal_service_addr) - .await? - .split(); + let (mut sink, mut stream) = + InternalServiceConnector::connect_to_service(item.internal_service_addr) + .await? + .split(); let username = item.username.into(); let full_name = item.full_name.into(); @@ -573,6 +574,7 @@ pub async fn exhaust_stream_to_file_completion( cid: _, peer_cid: _, status, + request_id: _, }, ) => match status { ObjectTransferStatus::ReceptionBeginning(file_path, vfm) => { diff --git a/citadel-internal-service/tests/service.rs b/citadel-internal-service/tests/service.rs index ec82efc..31ad263 100644 --- a/citadel-internal-service/tests/service.rs +++ b/citadel-internal-service/tests/service.rs @@ -68,7 +68,7 @@ mod tests { assert!(matches!( disconnect_response, - InternalServiceResponse::DisconnectNotification { .. } + InternalServiceResponse::DisconnectSuccess { .. } )); Ok(()) @@ -169,7 +169,7 @@ mod tests { assert!(matches!( disconnect_response, - InternalServiceResponse::DisconnectNotification { .. } + InternalServiceResponse::DisconnectSuccess { .. } )); Ok(()) @@ -219,7 +219,7 @@ mod tests { // begin mocking the GUI/CLI access let (mut sink, mut stream) = - InternalServiceConnector::connect(bind_address_internal_service) + InternalServiceConnector::connect_to_service(bind_address_internal_service) .await? .split(); diff --git a/citadel-internal-service/tests/utilities.rs b/citadel-internal-service/tests/utilities.rs new file mode 100644 index 0000000..1f71023 --- /dev/null +++ b/citadel-internal-service/tests/utilities.rs @@ -0,0 +1,412 @@ +mod common; + +#[cfg(test)] +mod tests { + use crate::common::server_info_skip_cert_verification; + use citadel_internal_service::kernel::CitadelWorkspaceService; + use citadel_internal_service_connector::connector::{ClientError, InternalServiceConnector}; + use citadel_internal_service_connector::io_interface::tcp::TcpIOInterface; + use citadel_internal_service_connector::scan_for_response; + use citadel_internal_service_types::{ + ConnectSuccess, DisconnectNotification, FileTransferRequestNotification, + InternalServiceResponse, MessageNotification, + }; + use citadel_sdk::prelude::{BackendType, NodeBuilder, NodeType, SecBuffer}; + use futures::StreamExt; + use std::net::SocketAddr; + use std::path::PathBuf; + use std::str::FromStr; + + #[tokio::test] + async fn test_utilities_service_and_server() -> Result<(), ClientError> { + crate::common::setup_log(); + let (server, server_bind_address) = server_info_skip_cert_verification(); + tokio::task::spawn(server); + let _result = connector_service_and_server( + server_bind_address, + SocketAddr::from_str("127.0.0.1:23457").unwrap(), + "my name", + "myusername", + "password", + ) + .await?; + Ok(()) + } + + #[tokio::test] + async fn test_utilities_register_and_connect_methods() -> Result<(), ClientError> { + // Setup Logging and Start Server + crate::common::setup_log(); + let (server, server_bind_address) = server_info_skip_cert_verification(); + tokio::task::spawn(server); + + // Start Internal Service + let internal_service_kernel = + CitadelWorkspaceService::new_tcp(SocketAddr::from_str("127.0.0.1:23457").unwrap()) + .await?; + let internal_service = NodeBuilder::default() + .with_node_type(NodeType::Peer) + .with_backend(BackendType::InMemory) + .with_insecure_skip_cert_verification() + .build(internal_service_kernel) + .unwrap(); + tokio::task::spawn(internal_service); + + let (full_name, username, password) = + ("full name", "myusername", SecBuffer::from("password")); + + // Connect to Internal Service via TCP + let mut service_connector = InternalServiceConnector::connect_to_service( + SocketAddr::from_str("127.0.0.1:23457").unwrap(), + ) + .await?; + // Register to Server + service_connector + .register_with_defaults(server_bind_address, full_name, username, password.clone()) + .await?; + // Connect to Server + service_connector + .connect_with_defaults(username, password) + .await?; + Ok(()) + } + + #[tokio::test] + async fn test_utilities_disconnect() -> Result<(), ClientError> { + crate::common::setup_log(); + let (server, server_bind_address) = server_info_skip_cert_verification(); + tokio::task::spawn(server); + let (mut service_connector_0, cid_0) = connector_service_and_server( + server_bind_address, + SocketAddr::from_str("127.0.0.1:23457").unwrap(), + "name 0", + "username0", + "password0", + ) + .await?; + service_connector_0.disconnect(cid_0).await?; + Ok(()) + } + + #[tokio::test] + async fn test_utilities_peer_register_and_connect() -> Result<(), ClientError> { + crate::common::setup_log(); + let (server, server_bind_address) = server_info_skip_cert_verification(); + tokio::task::spawn(server); + let (mut service_connector_0, cid_0) = connector_service_and_server( + server_bind_address, + SocketAddr::from_str("127.0.0.1:23457").unwrap(), + "name 0", + "username0", + "password0", + ) + .await?; + let (mut service_connector_1, cid_1) = connector_service_and_server( + server_bind_address, + SocketAddr::from_str("127.0.0.1:23458").unwrap(), + "name 1", + "username1", + "password1", + ) + .await?; + + let peer_0_register_and_connect = tokio::task::spawn(async move { + service_connector_0 + .peer_register_and_connect_with_defaults(cid_0, cid_1) + .await + }); + let peer_1_register_and_connect = tokio::task::spawn(async move { + service_connector_1 + .peer_register_and_connect_with_defaults(cid_1, cid_0) + .await + }); + let result = + futures::future::join_all([peer_0_register_and_connect, peer_1_register_and_connect]) + .await; + if result.iter().all(|i| i.is_ok()) { + citadel_logging::info!(target: "citadel", "Peers Registration and Connection to each other was successful"); + } else { + panic!("Peer Register and Connect Error") + } + Ok(()) + } + + #[tokio::test] + async fn test_utilities_peer_disconnect() -> Result<(), ClientError> { + crate::common::setup_log(); + let (server, server_bind_address) = server_info_skip_cert_verification(); + tokio::task::spawn(server); + let (mut service_connector_0, cid_0) = connector_service_and_server( + server_bind_address, + SocketAddr::from_str("127.0.0.1:23457").unwrap(), + "name 0", + "username0", + "password0", + ) + .await?; + let (mut service_connector_1, cid_1) = connector_service_and_server( + server_bind_address, + SocketAddr::from_str("127.0.0.1:23458").unwrap(), + "name 1", + "username1", + "password1", + ) + .await?; + + let peer_0_task = tokio::task::spawn(async move { + service_connector_0 + .peer_register_and_connect_with_defaults(cid_0, cid_1) + .await?; + service_connector_0.peer_disconnect(cid_0, cid_1).await?; + citadel_logging::info!(target: "citadel", "Peer 0 Disconnected from Peer 1"); + Ok(()) + }); + let peer_1_task = tokio::task::spawn(async move { + service_connector_1 + .peer_register_and_connect_with_defaults(cid_1, cid_0) + .await?; + let InternalServiceResponse::DisconnectNotification(DisconnectNotification { + cid: _, + peer_cid: _, + request_id: _, + }) = scan_for_response!( + service_connector_1.stream, + InternalServiceResponse::DisconnectNotification(..) + ) + else { + panic!("Unreachable"); + }; + citadel_logging::info!(target: "citadel", "Peer 1 Received Disconnect Notification"); + Ok(()) + }); + let result = futures::future::join_all([peer_0_task, peer_1_task]).await; + if result.iter().all(|i| i.is_ok()) { + citadel_logging::info!(target: "citadel", "Peers Successfully Disconnected"); + } else { + panic!("Peer Disconnect Error") + } + Ok(()) + } + + #[tokio::test] + async fn test_utilities_peer_message() -> Result<(), ClientError> { + crate::common::setup_log(); + let (server, server_bind_address) = server_info_skip_cert_verification(); + tokio::task::spawn(server); + let (mut service_connector_0, cid_0) = connector_service_and_server( + server_bind_address, + SocketAddr::from_str("127.0.0.1:23457").unwrap(), + "name 0", + "username0", + "password0", + ) + .await?; + let (mut service_connector_1, cid_1) = connector_service_and_server( + server_bind_address, + SocketAddr::from_str("127.0.0.1:23458").unwrap(), + "name 1", + "username1", + "password1", + ) + .await?; + + let peer_0_task = tokio::task::spawn(async move { + service_connector_0 + .peer_register_and_connect_with_defaults(cid_0, cid_1) + .await?; + let result = service_connector_0 + .message_with_defaults(cid_0, Some(cid_1), "Test Message".to_string().into_bytes()) + .await; + let InternalServiceResponse::MessageNotification(MessageNotification { + message, + cid: _, + peer_cid: _, + request_id: _, + }) = scan_for_response!( + service_connector_0.stream, + InternalServiceResponse::MessageNotification(..) + ) + else { + panic!("Unreachable"); + }; + citadel_logging::info!(target: "citadel", "Peer 0 received message: {message:?}"); + result + }); + let peer_1_task = tokio::task::spawn(async move { + service_connector_1 + .peer_register_and_connect_with_defaults(cid_1, cid_0) + .await?; + let result = service_connector_1 + .message_with_defaults(cid_1, Some(cid_0), "Test Message".to_string().into_bytes()) + .await; + let InternalServiceResponse::MessageNotification(MessageNotification { + message, + cid: _, + peer_cid: _, + request_id: _, + }) = scan_for_response!( + service_connector_1.stream, + InternalServiceResponse::MessageNotification(..) + ) + else { + panic!("Unreachable"); + }; + citadel_logging::info!(target: "citadel", "Peer 1 received message: {message:?}"); + result + }); + let result = futures::future::join_all([peer_0_task, peer_1_task]).await; + if result.iter().all(|i| i.is_ok()) { + citadel_logging::info!(target: "citadel", "Peers Successfully Registered, Connected, and Sent Message"); + } else { + panic!("Peer Message Error") + } + Ok(()) + } + + #[tokio::test] + async fn test_utilities_peer_file_transfer() -> Result<(), ClientError> { + crate::common::setup_log(); + let (server, server_bind_address) = server_info_skip_cert_verification(); + tokio::task::spawn(server); + let (mut service_connector_0, cid_0) = connector_service_and_server_with_backend( + server_bind_address, + SocketAddr::from_str("127.0.0.1:23457").unwrap(), + "name 0", + "username0", + "password0", + BackendType::Filesystem("peer0".to_string()), + ) + .await?; + let (mut service_connector_1, cid_1) = connector_service_and_server_with_backend( + server_bind_address, + SocketAddr::from_str("127.0.0.1:23458").unwrap(), + "name 1", + "username1", + "password1", + BackendType::Filesystem("peer1".to_string()), + ) + .await?; + + let file_to_send = PathBuf::from("../resources/test.txt"); + let _virtual_path = PathBuf::from("/vfs/test.txt"); + let peer_0_task = tokio::task::spawn(async move { + service_connector_0 + .peer_register_and_connect_with_defaults(cid_0, cid_1) + .await?; + service_connector_0 + .file_send_with_defaults(cid_0, Some(cid_1), file_to_send) + .await?; + citadel_logging::info!(target: "citadel", "Peer 0 Successfully Sent File to Peer 1"); + Ok(()) + }); + let peer_1_task = tokio::task::spawn(async move { + service_connector_1 + .peer_register_and_connect_with_defaults(cid_1, cid_0) + .await?; + let InternalServiceResponse::FileTransferRequestNotification( + FileTransferRequestNotification { + cid: _, + peer_cid: _, + metadata, + request_id: _, + }, + ) = scan_for_response!( + service_connector_1.stream, + InternalServiceResponse::FileTransferRequestNotification(..) + ) + else { + panic!("Unreachable"); + }; + service_connector_1 + .respond_file_transfer_with_defaults(cid_1, cid_0, metadata.object_id, true) + .await?; + citadel_logging::info!(target: "citadel", "Peer 1 Received Disconnect Notification"); + Ok(()) + }); + let result = futures::future::join_all([peer_0_task, peer_1_task]).await; + if result.iter().all(|i| i.is_ok()) { + citadel_logging::info!(target: "citadel", "File Transfer Succeeded"); + } else { + panic!("File Transfer Error") + } + Ok(()) + } + + async fn connector_service_and_server, R: Into>( + server_addr: SocketAddr, + service_addr: SocketAddr, + full_name: S, + username: S, + password: R, + ) -> Result<(InternalServiceConnector, u64), ClientError> { + connector_service_and_server_with_backend( + server_addr, + service_addr, + full_name, + username, + password, + BackendType::InMemory, + ) + .await + // let internal_service_kernel = CitadelWorkspaceService::new_tcp(service_addr).await?; + // let internal_service = NodeBuilder::default() + // .with_node_type(NodeType::Peer) + // .with_backend(BackendType::InMemory) + // .with_insecure_skip_cert_verification() + // .build(internal_service_kernel) + // .unwrap(); + // tokio::task::spawn(internal_service); + // + // // Connect to Internal Service via TCP + // let mut service_connector = + // InternalServiceConnector::connect_to_service(service_addr).await?; + // match service_connector + // .register_and_connect( + // server_addr, + // full_name, + // username, + // password, + // Default::default(), + // ) + // .await + // { + // Ok(ConnectSuccess { cid, request_id: _ }) => Ok((service_connector, cid)), + // Err(err) => Err(err), + // } + } + + async fn connector_service_and_server_with_backend, R: Into>( + server_addr: SocketAddr, + service_addr: SocketAddr, + full_name: S, + username: S, + password: R, + backend: BackendType, + ) -> Result<(InternalServiceConnector, u64), ClientError> { + let internal_service_kernel = CitadelWorkspaceService::new_tcp(service_addr).await?; + let internal_service = NodeBuilder::default() + .with_node_type(NodeType::Peer) + .with_backend(backend) + .with_insecure_skip_cert_verification() + .build(internal_service_kernel) + .unwrap(); + tokio::task::spawn(internal_service); + + // Connect to Internal Service via TCP + let mut service_connector = + InternalServiceConnector::connect_to_service(service_addr).await?; + match service_connector + .register_and_connect( + server_addr, + full_name, + username, + password, + Default::default(), + ) + .await + { + Ok(ConnectSuccess { cid, request_id: _ }) => Ok((service_connector, cid)), + Err(err) => Err(err), + } + } +}