Skip to content

Commit

Permalink
Merge pull request #210 from holochain/rust-client-main
Browse files Browse the repository at this point in the history
Rust client main
  • Loading branch information
ThetaSinner authored Jun 3, 2024
2 parents 5957a0a + bf4a3b5 commit 9457e50
Show file tree
Hide file tree
Showing 10 changed files with 480 additions and 138 deletions.
129 changes: 76 additions & 53 deletions Cargo.lock

Large diffs are not rendered by default.

17 changes: 17 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,15 +1,32 @@
[workspace]
members = [
"crates/trycp_api",
"crates/trycp_client",
"crates/trycp_server",
"ts/test/fixture/zomes/coordinator",
"ts/test/fixture/zomes/integrity",
]
resolver = "2"

[workspace.dependencies]
futures = "0.3"
hdi = "0.4.1-rc.0"
hdk = "0.3.1-rc.0"
nix = { version = "0.28", features = ["signal"] }
once_cell = "1.5.0"
parking_lot = "0.12"
reqwest = { version = "0.12", default-features = false }
rmp-serde = "=0.15.5"
serde = "1.0.181"
serde_bytes = "0.11"
serde_json = "1.0.117"
slab = "0.4"
snafu = "0.6"
structopt = "0.2"
tokio = "1.38"
tokio-tungstenite = "0.21"
trycp_api = { version = "0.16.0-dev.7", path = "crates/trycp_api" }
url = "2"

[profile.dev]
opt-level = "z"
11 changes: 11 additions & 0 deletions crates/trycp_api/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[package]
name = "trycp_api"
version = "0.16.0-dev.7"
description = "conductor provisioner API for tryorama"
license = "CAL-1.0"
edition = "2021"

[dependencies]
serde = { workspace = true, features = ["derive"] }
serde_bytes = { workspace = true }
serde_json = { workspace = true }
155 changes: 155 additions & 0 deletions crates/trycp_api/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
#![deny(missing_docs)]
//! Protocol for trycp_server websocket messages.
/// Requests must include a message id.
#[derive(serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct RequestWrapper {
/// The message id.
pub id: u64,

/// The request content.
pub request: Request,
}

/// Trycp server requests.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
#[serde(tag = "type")]
pub enum Request {
/// Given a DNA file, stores the DNA and returns the path at which it is stored.
SaveDna {
/// This is actually the dna filename.
id: String,

/// Content.
#[serde(with = "serde_bytes")]
content: Vec<u8>,
},

/// Given a DNA URL, ensures that the DNA is downloaded and returns the path at which it is stored.
DownloadDna {
/// Url.
url: String,
},

/// Set up a player.
ConfigurePlayer {
/// The player id.
id: String,

/// The Holochain configuration data that is not provided by trycp.
///
/// For example:
/// ```yaml
/// signing_service_uri: ~
/// encryption_service_uri: ~
/// decryption_service_uri: ~
/// dpki: ~
/// network: ~
/// ```
partial_config: String,
},

/// Start a conductor.
Startup {
/// The conductor id.
id: String,

/// The log level of the conductor.
log_level: Option<String>,
},

/// Shut down a conductor.
Shutdown {
/// The id of the conductor to shut down.
id: String,

/// The signal with which to shut down the conductor.
signal: Option<String>,
},

/// Shuts down all running conductors.
Reset,

/// Make an admin request.
CallAdminInterface {
/// The conductor id.
id: String,

/// The request.
#[serde(with = "serde_bytes")]
message: Vec<u8>,
},

/// Hook up an app interface.
ConnectAppInterface {
/// Token.
token: Vec<u8>,

/// Port.
port: u16,
},

/// Disconnect an app interface.
DisconnectAppInterface {
/// Port.
port: u16,
},

/// Make an ap request.
CallAppInterface {
/// Port.
port: u16,

/// The request.
#[serde(with = "serde_bytes")]
message: Vec<u8>,
},
}

/// Message response types.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
#[serde(untagged)]
pub enum MessageResponse {
/// Unit response.
Null,

/// Encoded response.
Bytes(Vec<u8>)
}

impl MessageResponse {
/// Convert into bytes.
pub fn into_bytes(self) -> Vec<u8> {
match self {
Self::Null => Vec::new(),
Self::Bytes(v) => v,
}
}
}

/// A Message from a trycp_server.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
#[serde(tag = "type")]
pub enum MessageToClient {
/// A signal emitted by a conductor.
Signal {
/// The app port from which this signal was emitted.
port: u16,

/// The content of the signal.
data: Vec<u8>,
},

/// A response to a trycp server request.
Response {
/// request message id.
id: u64,

/// message content.
response: std::result::Result<MessageResponse, String>,
},
}
14 changes: 14 additions & 0 deletions crates/trycp_client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[package]
name = "trycp_client"
version = "0.16.0-dev.7"
description = "Client for TryCP"
license = "CAL-1.0"
edition = "2021"

[dependencies]
futures = { workspace = true }
rmp-serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true, features = [ "full" ] }
tokio-tungstenite = { workspace = true }
trycp_api = { workspace = true }
42 changes: 42 additions & 0 deletions crates/trycp_client/examples/start_stop.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use trycp_client::*;

const ONE_MIN: std::time::Duration = std::time::Duration::from_secs(60);

#[tokio::main(flavor = "multi_thread")]
async fn main() {
let (c, _r) = TrycpClient::connect("ws://127.0.0.1:9000").await.unwrap();

c.request(Request::Reset, ONE_MIN).await.unwrap();

c.request(
Request::ConfigurePlayer {
id: "alice".to_string(),
partial_config: "".to_string(),
},
ONE_MIN,
)
.await
.unwrap();

c.request(
Request::Startup {
id: "alice".to_string(),
log_level: None,
},
ONE_MIN,
)
.await
.unwrap();

c.request(
Request::Shutdown {
id: "alice".to_string(),
signal: None,
},
ONE_MIN,
)
.await
.unwrap();

c.request(Request::Reset, ONE_MIN).await.unwrap();
}
138 changes: 138 additions & 0 deletions crates/trycp_client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
#![deny(missing_docs)]
//! Trycp client.
use futures::{sink::SinkExt, stream::StreamExt};
use std::collections::HashMap;
use std::io::Result;
use std::sync::Arc;
use tokio_tungstenite::{
tungstenite::{client::IntoClientRequest, Message},
*,
};
pub use trycp_api::Request;
use trycp_api::*;

type WsCore = WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>;
type WsSink = futures::stream::SplitSink<WsCore, Message>;
type Ws = Arc<tokio::sync::Mutex<WsSink>>;

/// Signal emitted from a conductor.
pub struct Signal {
/// The app port from which this signal was emitted.
pub port: u16,

/// The content of the signal.
pub data: Vec<u8>,
}

/// Trycp client recv.
pub struct SignalRecv(tokio::sync::mpsc::Receiver<Signal>);

impl SignalRecv {
/// Receive.
pub async fn recv(&mut self) -> Option<Signal> {
self.0.recv().await
}
}

/// Trycp client.
pub struct TrycpClient {
ws: Ws,
pend: Arc<
std::sync::Mutex<HashMap<u64, tokio::sync::oneshot::Sender<Result<MessageResponse>>>>,
>,
recv_task: tokio::task::JoinHandle<()>,
}

impl Drop for TrycpClient {
fn drop(&mut self) {
let ws = self.ws.clone();
tokio::task::spawn(async move {
let _ = ws.lock().await.close().await;
});
self.recv_task.abort();
}
}

impl TrycpClient {
/// Connect to a remote trycp server.
pub async fn connect<R>(request: R) -> Result<(Self, SignalRecv)>
where
R: IntoClientRequest + Unpin,
{
let (w, _) = tokio_tungstenite::connect_async(request)
.await
.map_err(std::io::Error::other)?;

let (sink, mut stream) = w.split();

let map: HashMap<u64, tokio::sync::oneshot::Sender<Result<MessageResponse>>> =
HashMap::new();
let pend = Arc::new(std::sync::Mutex::new(map));

let (recv_send, recv_recv) = tokio::sync::mpsc::channel(32);

let pend2 = pend.clone();
let recv_task = tokio::task::spawn(async move {
while let Some(Ok(msg)) = stream.next().await {
let msg = msg.into_data();
let msg: MessageToClient = rmp_serde::from_slice(&msg).unwrap();

match msg {
MessageToClient::Signal { port, data } => {
recv_send.send(Signal { port, data }).await.unwrap();
}
MessageToClient::Response { id, response } => {
if let Some(resp) = pend2.lock().unwrap().remove(&id) {
let _ = resp.send(response.map_err(std::io::Error::other));
}
}
}
}
});

let ws = Arc::new(tokio::sync::Mutex::new(sink));

Ok((
Self {
ws,
pend,
recv_task,
},
SignalRecv(recv_recv),
))
}

/// Make a request of the trycp server.
pub async fn request(
&self,
request: Request,
timeout: std::time::Duration,
) -> Result<MessageResponse> {
static RID: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
let mid = RID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

let (s, r) = tokio::sync::oneshot::channel();

self.pend.lock().unwrap().insert(mid, s);

let pend = self.pend.clone();
tokio::task::spawn(async move {
tokio::time::sleep(timeout).await;
pend.lock().unwrap().remove(&mid);
});

let request = RequestWrapper { id: mid, request };

let request = rmp_serde::to_vec_named(&request).map_err(std::io::Error::other)?;

self.ws
.lock()
.await
.send(Message::Binary(request))
.await
.map_err(std::io::Error::other)?;

r.await.map_err(|_| std::io::Error::other("Closed"))?
}
}
Loading

0 comments on commit 9457e50

Please sign in to comment.