Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Register and Connect Util Functions #35

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
3 changes: 2 additions & 1 deletion citadel-internal-service-connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ tokio = { workspace = true, features = ["net", "rt", "macros"] }
tokio-util = { workspace = true, features = ["codec"] }
bincode2 = { workspace = true }
serde = { workspace = true }
futures = { workspace = true, features = ["alloc"] }
futures = { workspace = true, features = ["alloc"] }
uuid = { workspace = true, features = ["v4"]}
244 changes: 241 additions & 3 deletions citadel-internal-service-connector/src/util.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
use crate::codec::{CodecError, SerializingCodec};
use citadel_internal_service_types::{
InternalServicePayload, InternalServiceRequest, InternalServiceResponse,
ConnectMode, InternalServicePayload, InternalServiceRequest, InternalServiceResponse,
SecBuffer, SessionSecuritySettings, UdpMode,
};
use futures::stream::{SplitSink, SplitStream};
use futures::{Sink, Stream, StreamExt};
use futures::{Sink, SinkExt, Stream, StreamExt};
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::net::{TcpStream, ToSocketAddrs};
use tokio_util::codec::{Decoder, Framed, LengthDelimitedCodec};
use uuid::Uuid;

pub struct InternalServiceConnector {
pub sink: WrappedSink,
Expand All @@ -26,7 +30,11 @@ pub struct WrappedSink {
}

impl InternalServiceConnector {
pub async fn connect<T: ToSocketAddrs>(addr: T) -> Result<Self, Box<dyn std::error::Error>> {
/// Establishes a connection with the Internal Service running at the given address. Returns an
/// InternalServiceConnector that can be used to easily interface with the Internal Service.
pub async fn connect_to_service<T: ToSocketAddrs>(
addr: T,
) -> Result<Self, Box<dyn std::error::Error>> {
let conn = TcpStream::connect(addr).await?;
let (sink, mut stream) = wrap_tcp_conn(conn).split();
let greeter_packet = stream
Expand All @@ -48,6 +56,236 @@ impl InternalServiceConnector {
pub fn split(self) -> (WrappedSink, WrappedStream) {
(self.sink, self.stream)
}

/// Sends a request to register at server running at the given address. Returns a Result with
/// an InternalServiceResponse that specifies whether or not the request was successfully sent.
pub async fn register<T: Into<SocketAddr>, S: Into<String>, R: Into<SecBuffer>>(
&mut self,
server_address: T,
full_name: S,
username: S,
proposed_password: R,
session_security_settings: SessionSecuritySettings,
) -> Result<InternalServiceResponse, Box<dyn std::error::Error>> {
let outbound_request = InternalServiceRequest::Register {
request_id: Uuid::new_v4(),
server_addr: server_address.into(),
full_name: full_name.into(),
username: username.into(),
proposed_password: proposed_password.into(),
session_security_settings,
connect_after_register: false,
};
match self.sink.send(outbound_request).await {
Ok(_) => self
Tjemmmic marked this conversation as resolved.
Show resolved Hide resolved
.stream
.next()
.await
.ok_or(Err("Service Connector - Register Stream Failure")?),
Err(error_message) => Err(Box::new(error_message)),
}
}

/// Sends a request to register at server running at the given address. Uses the default values
/// except for proposed credentials and the target server's address. Returns a Result with an
/// InternalServiceResponse that specifies whether or not the request was successfully sent.
pub async fn register_with_defaults<
T: Into<SocketAddr>,
S: Into<String>,
R: Into<SecBuffer>,
>(
&mut self,
server_address: T,
full_name: S,
username: S,
proposed_password: R,
) -> Result<InternalServiceResponse, Box<dyn std::error::Error>> {
self.register(
server_address,
full_name,
username,
proposed_password,
Default::default(),
)
.await
}

/// Sends a request to register at server running at the given address. Sends a request to
/// connect immediately following a successful registration. Returns a Result with an
/// InternalServiceResponse that specifies whether or not the request was successfully sent.
pub async fn register_and_connect<T: Into<SocketAddr>, S: Into<String>, R: Into<SecBuffer>>(
&mut self,
server_address: T,
full_name: S,
username: S,
proposed_password: R,
session_security_settings: SessionSecuritySettings,
) -> Result<InternalServiceResponse, Box<dyn std::error::Error>> {
let outbound_request = InternalServiceRequest::Register {
request_id: Uuid::new_v4(),
server_addr: server_address.into(),
full_name: full_name.into(),
username: username.into(),
proposed_password: proposed_password.into(),
session_security_settings,
connect_after_register: true,
};
match self.sink.send(outbound_request).await {
Ok(_) => self.stream.next().await.ok_or(Err(
"Service Connector - Register and Connect Stream Failure",
)?),
Err(error_message) => Err(Box::new(error_message)),
}
}

/// Sends a request to connect to the current server with the given credentials. Returns a
/// Result with an InternalServiceResponse that specifies whether or not the request
/// was successfully sent.
pub async fn connect<S: Into<String>, R: Into<SecBuffer>>(
&mut self,
username: S,
password: R,
connect_mode: ConnectMode,
udp_mode: UdpMode,
keep_alive_timeout: Option<Duration>,
session_security_settings: SessionSecuritySettings,
) -> Result<InternalServiceResponse, Box<dyn std::error::Error>> {
let outbound_request = InternalServiceRequest::Connect {
request_id: Uuid::new_v4(),
username: username.into(),
password: password.into(),
connect_mode,
udp_mode,
keep_alive_timeout,
session_security_settings,
};
match self.sink.send(outbound_request).await {
Ok(_) => self
.stream
.next()
.await
.ok_or(Err("Service Connector - Connect Stream Failure")?),
Err(error_message) => Err(Box::new(error_message)),
}
}

/// Sends a request to connect to the current server with the given credentials. Uses default
/// values for all parameters other than credentials. Returns a Result with an
/// InternalServiceResponse that specifies whether or not the request was successfully sent.
pub async fn connect_with_defaults<S: Into<String>, R: Into<SecBuffer>>(
&mut self,
username: S,
password: R,
) -> Result<InternalServiceResponse, Box<dyn std::error::Error>> {
self.connect(
username,
password,
Default::default(),
Default::default(),
Default::default(),
Default::default(),
)
.await
}

/// Sends a request to register with peer with CID peer_cid. Returns a Result with an
/// InternalServiceResponse that specifies whether or not the request was successfully sent.
pub async fn peer_register<T: Into<u64>>(
&mut self,
cid: T,
peer_cid: T,
session_security_settings: SessionSecuritySettings,
) -> Result<InternalServiceResponse, Box<dyn std::error::Error>> {
let outbound_request = InternalServiceRequest::PeerRegister {
request_id: Uuid::new_v4(),
cid: cid.into(),
peer_cid: peer_cid.into(),
session_security_settings,
connect_after_register: false,
};
match self.sink.send(outbound_request).await {
Ok(_) => self
.stream
.next()
.await
.ok_or(Err("Service Connector - Peer Register Stream Failure")?),
Err(error_message) => Err(Box::new(error_message)),
}
}

/// Sends a request to register with peer with CID peer_cid. Uses the default values except for
/// proposed credentials. Returns a Result with an InternalServiceResponse that specifies
/// whether or not the request was successfully sent.
pub async fn peer_register_with_defaults<T: Into<u64>>(
&mut self,
cid: T,
peer_cid: T,
) -> Result<InternalServiceResponse, Box<dyn std::error::Error>> {
self.peer_register(cid, peer_cid, Default::default()).await
}

/// Sends a request to register with peer with CID peer_cid. Sends a request to
/// connect immediately following a successful registration. Returns a Result with an
/// InternalServiceResponse that specifies whether or not the request was successfully sent.
pub async fn peer_register_and_connect<T: Into<u64>>(
&mut self,
cid: T,
peer_cid: T,
session_security_settings: SessionSecuritySettings,
) -> Result<InternalServiceResponse, Box<dyn std::error::Error>> {
let outbound_request = InternalServiceRequest::PeerRegister {
request_id: Uuid::new_v4(),
cid: cid.into(),
peer_cid: peer_cid.into(),
session_security_settings,
connect_after_register: true,
};
match self.sink.send(outbound_request).await {
Ok(_) => self.stream.next().await.ok_or(Err(
"Service Connector - Peer Register and Connect Stream Failure",
)?),
Err(error_message) => Err(Box::new(error_message)),
}
}

/// Sends a request to connect to peer with CID peer_cid. Returns a
/// Result with an InternalServiceResponse that specifies whether or not the request
/// was successfully sent.
pub async fn peer_connect<T: Into<u64>>(
&mut self,
cid: T,
peer_cid: T,
udp_mode: UdpMode,
session_security_settings: SessionSecuritySettings,
) -> Result<InternalServiceResponse, Box<dyn std::error::Error>> {
let outbound_request = InternalServiceRequest::PeerConnect {
request_id: Uuid::new_v4(),
cid: cid.into(),
peer_cid: peer_cid.into(),
udp_mode,
session_security_settings,
};
match self.sink.send(outbound_request).await {
Ok(_) => self
.stream
.next()
.await
.ok_or(Err("Service Connector - Peer Connect Stream Failure")?),
Err(error_message) => Err(Box::new(error_message)),
}
}

/// Sends a request to connect to peer with CID peer_cid. Uses default values for connection
/// parameters. Returns a Result with an InternalServiceResponse that specifies whether or
/// not the request was successfully sent.
pub async fn peer_connect_with_defaults<T: Into<u64>>(
&mut self,
cid: T,
peer_cid: T,
) -> Result<InternalServiceResponse, Box<dyn std::error::Error>> {
self.peer_connect(cid, peer_cid, Default::default(), Default::default())
.await
}
}

impl Stream for WrappedStream {
Expand Down
2 changes: 1 addition & 1 deletion citadel-internal-service/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ pub async fn register_and_connect_to_server<
)> = Vec::new();

for item in services_to_create {
let (mut sink, mut stream) = InternalServiceConnector::connect(item.internal_service_addr)
let (mut sink, mut stream) = InternalServiceConnector::connect_to_service(item.internal_service_addr)
.await?
.split();

Expand Down
2 changes: 1 addition & 1 deletion citadel-internal-service/tests/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ mod tests {

// begin mocking the GUI/CLI access
let (mut sink, mut stream) =
InternalServiceConnector::connect(bind_address_internal_service)
InternalServiceConnector::connect_to_service(bind_address_internal_service)
.await?
.split();

Expand Down
Loading