diff --git a/Cargo.lock b/Cargo.lock index 9bb7e186c91c..ed9acf5f9094 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -98,6 +98,15 @@ dependencies = [ "version_check", ] +[[package]] +name = "aho-corasick" +version = "0.7.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc936419f96fa211c1b9166887b38e5e40b19958e5b895be7c1f93adec7071ac" +dependencies = [ + "memchr", +] + [[package]] name = "aho-corasick" version = "1.0.2" @@ -652,6 +661,32 @@ dependencies = [ "log", ] +[[package]] +name = "browser-webrtc-example" +version = "0.1.0" +dependencies = [ + "anyhow", + "axum", + "env_logger 0.10.0", + "futures", + "js-sys", + "libp2p", + "libp2p-webrtc", + "libp2p-webrtc-websys", + "log", + "mime_guess", + "rand 0.8.5", + "rust-embed 6.8.1", + "tokio", + "tokio-util", + "tower", + "tower-http", + "wasm-bindgen", + "wasm-bindgen-futures", + "wasm-logger", + "web-sys", +] + [[package]] name = "bs58" version = "0.5.0" @@ -661,6 +696,16 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "bstr" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6798148dccfbff0fae41c7574d2fa8f1ef3492fba0face179de5d8d447d67b05" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "bumpalo" version = "3.13.0" @@ -1239,6 +1284,26 @@ dependencies = [ "subtle", ] +[[package]] +name = "dirs" +version = "4.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca3aa72a6f96ea37bbc5aa912f6788242832f75369bdfdadcb0e38423f100059" +dependencies = [ + "dirs-sys", +] + +[[package]] +name = "dirs-sys" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b1d1d91c932ef41c0f2663aa8b0ca0342d444d842c06914aa0a7e352d0bada6" +dependencies = [ + "libc", + "redox_users", + "winapi", +] + [[package]] name = "displaydoc" version = "0.2.4" @@ -1731,6 +1796,19 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" +[[package]] +name = "globset" +version = "0.4.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "029d74589adefde59de1a0c4f4732695c32805624aec7b68d91503d4dba79afc" +dependencies = [ + "aho-corasick 0.7.20", + "bstr", + "fnv", + "log", + "regex", +] + [[package]] name = "gloo-timers" version = "0.2.6" @@ -2119,12 +2197,13 @@ dependencies = [ "libp2p", "libp2p-mplex", "libp2p-webrtc", + "libp2p-webrtc-websys", "log", "mime_guess", "rand 0.8.5", "redis", "reqwest", - "rust-embed", + "rust-embed 8.0.0", "serde", "serde_json", "thirtyfour", @@ -3089,11 +3168,10 @@ dependencies = [ [[package]] name = "libp2p-webrtc" -version = "0.6.0-alpha" +version = "0.6.1-alpha" dependencies = [ "anyhow", "async-trait", - "asynchronous-codec", "bytes", "env_logger 0.10.0", "futures", @@ -3106,15 +3184,13 @@ dependencies = [ "libp2p-noise", "libp2p-ping", "libp2p-swarm", + "libp2p-webrtc-utils", "log", "multihash", - "quick-protobuf", - "quick-protobuf-codec", "quickcheck", "rand 0.8.5", "rcgen 0.11.1", "serde", - "sha2 0.10.7", "stun", "thiserror", "tinytemplate", @@ -3125,6 +3201,55 @@ dependencies = [ "webrtc", ] +[[package]] +name = "libp2p-webrtc-utils" +version = "0.1.0" +dependencies = [ + "asynchronous-codec", + "bytes", + "futures", + "hex", + "hex-literal", + "libp2p-core", + "libp2p-identity", + "libp2p-noise", + "log", + "quick-protobuf", + "quick-protobuf-codec", + "rand 0.8.5", + "serde", + "sha2 0.10.7", + "thiserror", + "tinytemplate", + "unsigned-varint", +] + +[[package]] +name = "libp2p-webrtc-websys" +version = "0.1.0-alpha" +dependencies = [ + "bytes", + "futures", + "futures-timer", + "getrandom 0.2.10", + "hex", + "hex-literal", + "js-sys", + "libp2p-core", + "libp2p-identity", + "libp2p-noise", + "libp2p-ping", + "libp2p-swarm", + "libp2p-webrtc-utils", + "log", + "send_wrapper 0.6.0", + "serde", + "thiserror", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "libp2p-websocket" version = "0.42.1" @@ -3782,7 +3907,7 @@ checksum = "93f00c865fe7cabf650081affecd3871070f26767e7b2070a3ffae14c654b447" dependencies = [ "cfg-if", "libc", - "redox_syscall", + "redox_syscall 0.3.5", "smallvec", "windows-targets", ] @@ -4321,6 +4446,15 @@ dependencies = [ "url", ] +[[package]] +name = "redox_syscall" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "redox_syscall" version = "0.3.5" @@ -4330,13 +4464,24 @@ dependencies = [ "bitflags 1.3.2", ] +[[package]] +name = "redox_users" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b033d837a7cf162d7993aded9304e30a83213c648b6e389db233191f891e5c2b" +dependencies = [ + "getrandom 0.2.10", + "redox_syscall 0.2.16", + "thiserror", +] + [[package]] name = "regex" version = "1.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "697061221ea1b4a94a624f67d0ae2bfe4e22b8a17b6a192afb11046542cc8c47" dependencies = [ - "aho-corasick", + "aho-corasick 1.0.2", "memchr", "regex-automata 0.3.8", "regex-syntax 0.7.5", @@ -4357,7 +4502,7 @@ version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2f401f4955220693b56f8ec66ee9c78abffd8d1c4f23dc41a23839eb88f0795" dependencies = [ - "aho-corasick", + "aho-corasick 1.0.2", "memchr", "regex-syntax 0.7.5", ] @@ -4542,14 +4687,39 @@ dependencies = [ "webrtc-util", ] +[[package]] +name = "rust-embed" +version = "6.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a36224c3276f8c4ebc8c20f158eca7ca4359c8db89991c4925132aaaf6702661" +dependencies = [ + "rust-embed-impl 6.8.1", + "rust-embed-utils 7.8.1", + "walkdir", +] + [[package]] name = "rust-embed" version = "8.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b1e7d90385b59f0a6bf3d3b757f3ca4ece2048265d70db20a2016043d4509a40" dependencies = [ - "rust-embed-impl", - "rust-embed-utils", + "rust-embed-impl 8.0.0", + "rust-embed-utils 8.0.0", + "walkdir", +] + +[[package]] +name = "rust-embed-impl" +version = "6.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49b94b81e5b2c284684141a2fb9e2a31be90638caf040bf9afbc5a0416afe1ac" +dependencies = [ + "proc-macro2", + "quote", + "rust-embed-utils 7.8.1", + "shellexpand", + "syn 2.0.32", "walkdir", ] @@ -4561,11 +4731,22 @@ checksum = "3c3d8c6fd84090ae348e63a84336b112b5c3918b3bf0493a581f7bd8ee623c29" dependencies = [ "proc-macro2", "quote", - "rust-embed-utils", + "rust-embed-utils 8.0.0", "syn 2.0.32", "walkdir", ] +[[package]] +name = "rust-embed-utils" +version = "7.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d38ff6bf570dc3bb7100fce9f7b60c33fa71d80e88da3f2580df4ff2bdded74" +dependencies = [ + "globset", + "sha2 0.10.7", + "walkdir", +] + [[package]] name = "rust-embed-utils" version = "8.0.0" @@ -4961,6 +5142,15 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "shellexpand" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ccc8076840c4da029af4f87e4e8daeb0fca6b87bbb02e10cb60b791450e11e4" +dependencies = [ + "dirs", +] + [[package]] name = "signal-hook" version = "0.3.17" @@ -5238,7 +5428,7 @@ checksum = "cb94d2f3cc536af71caac6b6fcebf65860b347e7ce0cc9ebe8f70d3e521054ef" dependencies = [ "cfg-if", "fastrand 2.0.0", - "redox_syscall", + "redox_syscall 0.3.5", "rustix 0.38.4", "windows-sys", ] diff --git a/Cargo.toml b/Cargo.toml index e7b567636ed5..15848016e002 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,6 +2,7 @@ members = [ "core", "examples/autonat", + "examples/browser-webrtc", "examples/chat-example", "examples/dcutr", "examples/distributed-key-value-store", @@ -26,6 +27,7 @@ members = [ "misc/quickcheck-ext", "misc/rw-stream-sink", "misc/server", + "misc/webrtc-utils", "muxers/mplex", "muxers/test-harness", "muxers/yamux", @@ -56,6 +58,7 @@ members = [ "transports/uds", "transports/wasm-ext", "transports/webrtc", + "transports/webrtc-websys", "transports/websocket", "transports/webtransport-websys", "wasm-tests/webtransport-tests", @@ -102,7 +105,9 @@ libp2p-tcp = { version = "0.40.0", path = "transports/tcp" } libp2p-tls = { version = "0.2.1", path = "transports/tls" } libp2p-uds = { version = "0.39.0", path = "transports/uds" } libp2p-wasm-ext = { version = "0.40.0", path = "transports/wasm-ext" } -libp2p-webrtc = { version = "0.6.0-alpha", path = "transports/webrtc" } +libp2p-webrtc = { version = "0.6.1-alpha", path = "transports/webrtc" } +libp2p-webrtc-utils = { version = "0.1.0", path = "misc/webrtc-utils" } +libp2p-webrtc-websys = { version = "0.1.0-alpha", path = "transports/webrtc-websys" } libp2p-websocket = { version = "0.42.1", path = "transports/websocket" } libp2p-webtransport-websys = { version = "0.1.0", path = "transports/webtransport-websys" } libp2p-yamux = { version = "0.44.1", path = "muxers/yamux" } @@ -113,7 +118,6 @@ rw-stream-sink = { version = "0.4.0", path = "misc/rw-stream-sink" } multiaddr = "0.18.0" multihash = "0.19.1" - [patch.crates-io] # Patch away `libp2p-idnentity` in our dependency tree with the workspace version. diff --git a/examples/browser-webrtc/Cargo.toml b/examples/browser-webrtc/Cargo.toml new file mode 100644 index 000000000000..fb43413bc91e --- /dev/null +++ b/examples/browser-webrtc/Cargo.toml @@ -0,0 +1,40 @@ +[package] +authors = ["Doug Anderson "] +description = "Example use of the WebRTC transport in a browser wasm environment" +edition = "2021" +license = "MIT" +name = "browser-webrtc-example" +publish = false +repository = "https://github.com/libp2p/rust-libp2p" +rust-version = { workspace = true } +version = "0.1.0" + +[lib] +crate-type = ["cdylib"] + +[dependencies] +anyhow = "1.0.72" +env_logger = "0.10" +futures = "0.3.28" +log = "0.4" +rand = "0.8" + +[target.'cfg(not(target_arch = "wasm32"))'.dependencies] +axum = "0.6.19" +libp2p = { path = "../../libp2p", features = ["ed25519", "macros", "ping", "wasm-bindgen", "tokio"] } +libp2p-webrtc = { workspace = true, features = ["tokio"] } +rust-embed = { version = "6.8.1", features = ["include-exclude", "interpolate-folder-path"] } +tokio = { version = "1.29", features = ["macros", "net", "rt", "signal"] } +tokio-util = { version = "0.7", features = ["compat"] } +tower = "0.4" +tower-http = { version = "0.4.0", features = ["cors"] } +mime_guess = "2.0.4" + +[target.'cfg(target_arch = "wasm32")'.dependencies] +js-sys = "0.3.64" +libp2p = { path = "../../libp2p", features = ["ed25519", "macros", "ping", "wasm-bindgen"] } +libp2p-webrtc-websys = { workspace = true } +wasm-bindgen = "0.2.84" +wasm-bindgen-futures = "0.4.37" +wasm-logger = { version = "0.2.0" } +web-sys = { version = "0.3", features = ['Document', 'Element', 'HtmlElement', 'Node', 'Response', 'Window'] } diff --git a/examples/browser-webrtc/README.md b/examples/browser-webrtc/README.md new file mode 100644 index 000000000000..d44cf8799054 --- /dev/null +++ b/examples/browser-webrtc/README.md @@ -0,0 +1,18 @@ +# Rust-libp2p Browser-Server WebRTC Example + +This example demonstrates how to use the `libp2p-webrtc-websys` transport library in a browser to ping the WebRTC Server. +It uses [wasm-pack](https://rustwasm.github.io/docs/wasm-pack/) to build the project for use in the browser. + +## Running the example + +1. Build the client library: +```shell +wasm-pack build --target web --out-dir static +``` + +2. Start the server: +```shell +cargo run +``` + +3. Open the URL printed in the terminal diff --git a/examples/browser-webrtc/src/lib.rs b/examples/browser-webrtc/src/lib.rs new file mode 100644 index 000000000000..1a9856dadcc5 --- /dev/null +++ b/examples/browser-webrtc/src/lib.rs @@ -0,0 +1,104 @@ +#![cfg(target_arch = "wasm32")] + +use futures::StreamExt; +use js_sys::Date; +use libp2p::core::Multiaddr; +use libp2p::identity::{Keypair, PeerId}; +use libp2p::ping; +use libp2p::swarm::{keep_alive, NetworkBehaviour, SwarmBuilder, SwarmEvent}; +use std::convert::From; +use std::io; +use wasm_bindgen::prelude::*; +use web_sys::{Document, HtmlElement}; + +#[wasm_bindgen] +pub async fn run(libp2p_endpoint: String) -> Result<(), JsError> { + wasm_logger::init(wasm_logger::Config::default()); + + let body = Body::from_current_window()?; + body.append_p("Let's ping the WebRTC Server!")?; + + let local_key = Keypair::generate_ed25519(); + let local_peer_id = PeerId::from(local_key.public()); + let mut swarm = SwarmBuilder::with_wasm_executor( + libp2p_webrtc_websys::Transport::new(libp2p_webrtc_websys::Config::new(&local_key)).boxed(), + Behaviour { + ping: ping::Behaviour::new(ping::Config::new()), + keep_alive: keep_alive::Behaviour, + }, + local_peer_id, + ) + .build(); + + log::info!("Initialize swarm with identity: {local_peer_id}"); + + let addr = libp2p_endpoint.parse::()?; + log::info!("Dialing {addr}"); + swarm.dial(addr)?; + + loop { + match swarm.next().await.unwrap() { + SwarmEvent::Behaviour(BehaviourEvent::Ping(ping::Event { result: Err(e), .. })) => { + log::error!("Ping failed: {:?}", e); + + break; + } + SwarmEvent::Behaviour(BehaviourEvent::Ping(ping::Event { + peer, + result: Ok(rtt), + .. + })) => { + log::info!("Ping successful: RTT: {rtt:?}, from {peer}"); + body.append_p(&format!("RTT: {rtt:?} at {}", Date::new_0().to_string()))?; + } + evt => log::info!("Swarm event: {:?}", evt), + } + } + + Ok(()) +} + +#[derive(NetworkBehaviour)] +struct Behaviour { + ping: ping::Behaviour, + keep_alive: keep_alive::Behaviour, +} + +/// Convenience wrapper around the current document body +struct Body { + body: HtmlElement, + document: Document, +} + +impl Body { + fn from_current_window() -> Result { + // Use `web_sys`'s global `window` function to get a handle on the global + // window object. + let document = web_sys::window() + .ok_or(js_error("no global `window` exists"))? + .document() + .ok_or(js_error("should have a document on window"))?; + let body = document + .body() + .ok_or(js_error("document should have a body"))?; + + Ok(Self { body, document }) + } + + fn append_p(&self, msg: &str) -> Result<(), JsError> { + let val = self + .document + .create_element("p") + .map_err(|_| js_error("failed to create

"))?; + val.set_text_content(Some(msg)); + self.body + .append_child(&val) + .map_err(|_| js_error("failed to append

"))?; + + Ok(()) + } +} + +fn js_error(msg: &str) -> JsError { + io::Error::new(io::ErrorKind::Other, msg).into() +} diff --git a/examples/browser-webrtc/src/main.rs b/examples/browser-webrtc/src/main.rs new file mode 100644 index 000000000000..f919f047af55 --- /dev/null +++ b/examples/browser-webrtc/src/main.rs @@ -0,0 +1,157 @@ +#![allow(non_upper_case_globals)] + +use anyhow::Result; +use axum::extract::{Path, State}; +use axum::http::header::CONTENT_TYPE; +use axum::http::StatusCode; +use axum::response::{Html, IntoResponse}; +use axum::{http::Method, routing::get, Router}; +use futures::StreamExt; +use libp2p::{ + core::muxing::StreamMuxerBox, + core::Transport, + identity, + multiaddr::{Multiaddr, Protocol}, + ping, + swarm::{keep_alive, NetworkBehaviour, SwarmBuilder, SwarmEvent}, +}; +use libp2p_webrtc as webrtc; +use rand::thread_rng; +use std::net::{Ipv4Addr, SocketAddr}; +use tower_http::cors::{Any, CorsLayer}; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + env_logger::builder() + .parse_filters("browser_webrtc_example=debug,libp2p_webrtc=info,libp2p_ping=debug") + .parse_default_env() + .init(); + + let id_keys = identity::Keypair::generate_ed25519(); + let local_peer_id = id_keys.public().to_peer_id(); + let transport = webrtc::tokio::Transport::new( + id_keys, + webrtc::tokio::Certificate::generate(&mut thread_rng())?, + ) + .map(|(peer_id, conn), _| (peer_id, StreamMuxerBox::new(conn))) + .boxed(); + + let behaviour = Behaviour { + ping: ping::Behaviour::new(ping::Config::new()), + keep_alive: keep_alive::Behaviour, + }; + + let mut swarm = SwarmBuilder::with_tokio_executor(transport, behaviour, local_peer_id).build(); + + let address_webrtc = Multiaddr::from(Ipv4Addr::UNSPECIFIED) + .with(Protocol::Udp(0)) + .with(Protocol::WebRTCDirect); + + swarm.listen_on(address_webrtc.clone())?; + + let address = loop { + if let SwarmEvent::NewListenAddr { address, .. } = swarm.select_next_some().await { + if address + .iter() + .any(|e| e == Protocol::Ip4(Ipv4Addr::LOCALHOST)) + { + log::debug!("Ignoring localhost address to make sure the example works in Firefox"); + continue; + } + + log::info!("Listening on: {address}"); + + break address; + } + }; + + let addr = address.with(Protocol::P2p(*swarm.local_peer_id())); + + // Serve .wasm, .js and server multiaddress over HTTP on this address. + tokio::spawn(serve(addr)); + + loop { + tokio::select! { + swarm_event = swarm.next() => { + log::trace!("Swarm Event: {:?}", swarm_event) + }, + _ = tokio::signal::ctrl_c() => { + break; + } + } + } + + Ok(()) +} + +#[derive(NetworkBehaviour)] +struct Behaviour { + ping: ping::Behaviour, + keep_alive: keep_alive::Behaviour, +} + +#[derive(rust_embed::RustEmbed)] +#[folder = "$CARGO_MANIFEST_DIR/static"] +struct StaticFiles; + +/// Serve the Multiaddr we are listening on and the host files. +pub(crate) async fn serve(libp2p_transport: Multiaddr) { + let listen_addr = match libp2p_transport.iter().next() { + Some(Protocol::Ip4(addr)) => addr, + _ => panic!("Expected 1st protocol to be IP4"), + }; + + let server = Router::new() + .route("/", get(get_index)) + .route("/index.html", get(get_index)) + .route("/:path", get(get_static_file)) + .with_state(Libp2pEndpoint(libp2p_transport)) + .layer( + // allow cors + CorsLayer::new() + .allow_origin(Any) + .allow_methods([Method::GET]), + ); + + let addr = SocketAddr::new(listen_addr.into(), 8080); + + log::info!("Serving client files at http://{addr}"); + + axum::Server::bind(&addr) + .serve(server.into_make_service()) + .await + .unwrap(); +} + +#[derive(Clone)] +struct Libp2pEndpoint(Multiaddr); + +/// Serves the index.html file for our client. +/// +/// Our server listens on a random UDP port for the WebRTC transport. +/// To allow the client to connect, we replace the `__LIBP2P_ENDPOINT__` placeholder with the actual address. +async fn get_index( + State(Libp2pEndpoint(libp2p_endpoint)): State, +) -> Result, StatusCode> { + let content = StaticFiles::get("index.html") + .ok_or(StatusCode::NOT_FOUND)? + .data; + + let html = std::str::from_utf8(&content) + .expect("index.html to be valid utf8") + .replace("__LIBP2P_ENDPOINT__", &libp2p_endpoint.to_string()); + + Ok(Html(html)) +} + +/// Serves the static files generated by `wasm-pack`. +async fn get_static_file(Path(path): Path) -> Result { + log::debug!("Serving static file: {path}"); + + let content = StaticFiles::get(&path).ok_or(StatusCode::NOT_FOUND)?.data; + let content_type = mime_guess::from_path(path) + .first_or_octet_stream() + .to_string(); + + Ok(([(CONTENT_TYPE, content_type)], content)) +} diff --git a/examples/browser-webrtc/static/index.html b/examples/browser-webrtc/static/index.html new file mode 100644 index 000000000000..a5a26310e3f4 --- /dev/null +++ b/examples/browser-webrtc/static/index.html @@ -0,0 +1,23 @@ + + + + + + + +

+

Rust Libp2p Demo!

+
+ + + + diff --git a/interop-tests/Cargo.toml b/interop-tests/Cargo.toml index d3a2e00647f7..1966aee414df 100644 --- a/interop-tests/Cargo.toml +++ b/interop-tests/Cargo.toml @@ -34,6 +34,7 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] } [target.'cfg(target_arch = "wasm32")'.dependencies] libp2p = { path = "../libp2p", features = ["ping", "macros", "webtransport-websys", "wasm-bindgen", "identify"] } +libp2p-webrtc-websys = { workspace = true } wasm-bindgen = { version = "0.2" } wasm-bindgen-futures = { version = "0.4" } wasm-logger = { version = "0.2.0" } diff --git a/interop-tests/README.md b/interop-tests/README.md index 88cd75188332..bab98df7987c 100644 --- a/interop-tests/README.md +++ b/interop-tests/README.md @@ -8,13 +8,9 @@ You can run this test locally by having a local Redis instance and by having another peer that this test can dial or listen for. For example to test that we can dial/listen for ourselves we can do the following: -1. Start redis (needed by the tests): `docker run --rm -it -p 6379:6379 - redis/redis-stack`. -2. In one terminal run the dialer: `redis_addr=localhost:6379 ip="0.0.0.0" - transport=quic-v1 security=quic muxer=quic is_dialer="true" cargo run --bin ping` -3. In another terminal, run the listener: `redis_addr=localhost:6379 - ip="0.0.0.0" transport=quic-v1 security=quic muxer=quic is_dialer="false" cargo run --bin native_ping` - +1. Start redis (needed by the tests): `docker run --rm -p 6379:6379 redis:7-alpine`. +2. In one terminal run the dialer: `redis_addr=localhost:6379 ip="0.0.0.0" transport=quic-v1 security=quic muxer=quic is_dialer="true" cargo run --bin ping` +3. In another terminal, run the listener: `redis_addr=localhost:6379 ip="0.0.0.0" transport=quic-v1 security=quic muxer=quic is_dialer="false" cargo run --bin native_ping` To test the interop with other versions do something similar, except replace one of these nodes with the other version's interop test. @@ -29,6 +25,15 @@ Firefox is not yet supported as it doesn't support all required features yet 1. Build the wasm package: `wasm-pack build --target web` 2. Run the dialer: `redis_addr=127.0.0.1:6379 ip=0.0.0.0 transport=webtransport is_dialer=true cargo run --bin wasm_ping` +# Running this test with webrtc-direct + +To run the webrtc-direct test, you'll need the `chromedriver` in your `$PATH`, compatible with your Chrome browser. + +1. Start redis: `docker run --rm -p 6379:6379 redis:7-alpine`. +1. Build the wasm package: `wasm-pack build --target web` +1. With the webrtc-direct listener `RUST_LOG=debug,webrtc=off,webrtc_sctp=off redis_addr="127.0.0.1:6379" ip="0.0.0.0" transport=webrtc-direct is_dialer="false" cargo run --bin native_ping` +1. Run the webrtc-direct dialer: `RUST_LOG=debug,hyper=off redis_addr="127.0.0.1:6379" ip="0.0.0.0" transport=webrtc-direct is_dialer=true cargo run --bin wasm_ping` + # Running all interop tests locally with Compose To run this test against all released libp2p versions you'll need to have the diff --git a/interop-tests/chromium-ping-version.json b/interop-tests/chromium-ping-version.json index 9fb2cd2252c7..ae5c6e10eddd 100644 --- a/interop-tests/chromium-ping-version.json +++ b/interop-tests/chromium-ping-version.json @@ -1,7 +1,10 @@ { "id": "chromium-rust-libp2p-head", "containerImageID": "chromium-rust-libp2p-head", - "transports": [{ "name": "webtransport", "onlyDial": true }], + "transports": [ + { "name": "webtransport", "onlyDial": true }, + { "name": "webrtc-direct", "onlyDial": true } + ], "secureChannels": [], "muxers": [] } diff --git a/interop-tests/src/arch.rs b/interop-tests/src/arch.rs index 2b2181f2459a..b30c3ad1e31f 100644 --- a/interop-tests/src/arch.rs +++ b/interop-tests/src/arch.rs @@ -159,6 +159,7 @@ pub(crate) mod wasm { use libp2p::identity::Keypair; use libp2p::swarm::{NetworkBehaviour, SwarmBuilder}; use libp2p::PeerId; + use libp2p_webrtc_websys as webrtc; use std::time::Duration; use crate::{BlpopRequest, Transport}; @@ -181,16 +182,19 @@ pub(crate) mod wasm { ip: &str, transport: Transport, ) -> Result<(BoxedTransport, String)> { - if let Transport::Webtransport = transport { - Ok(( + match transport { + Transport::Webtransport => Ok(( libp2p::webtransport_websys::Transport::new( libp2p::webtransport_websys::Config::new(&local_key), ) .boxed(), format!("/ip4/{ip}/udp/0/quic/webtransport"), - )) - } else { - bail!("Only webtransport supported with wasm") + )), + Transport::WebRtcDirect => Ok(( + webrtc::Transport::new(webrtc::Config::new(&local_key)).boxed(), + format!("/ip4/{ip}/udp/0/webrtc-direct"), + )), + _ => bail!("Only webtransport and webrtc-direct are supported with wasm"), } } diff --git a/interop-tests/src/bin/wasm_ping.rs b/interop-tests/src/bin/wasm_ping.rs index 83ed3c374656..b3a918192261 100644 --- a/interop-tests/src/bin/wasm_ping.rs +++ b/interop-tests/src/bin/wasm_ping.rs @@ -1,3 +1,4 @@ +#![allow(non_upper_case_globals)] use std::process::Stdio; use std::time::Duration; diff --git a/interop-tests/src/lib.rs b/interop-tests/src/lib.rs index 57ce636367b6..54d94430c8e9 100644 --- a/interop-tests/src/lib.rs +++ b/interop-tests/src/lib.rs @@ -50,6 +50,7 @@ pub async fn run_test( let mut maybe_id = None; // See https://github.com/libp2p/rust-libp2p/issues/4071. + #[cfg(not(target_arch = "wasm32"))] if transport == Transport::WebRtcDirect { maybe_id = Some(swarm.listen_on(local_addr.parse()?)?); } diff --git a/misc/webrtc-utils/CHANGELOG.md b/misc/webrtc-utils/CHANGELOG.md new file mode 100644 index 000000000000..d575ea879f8d --- /dev/null +++ b/misc/webrtc-utils/CHANGELOG.md @@ -0,0 +1,6 @@ +## 0.1.0 - unreleased + +- Initial release. + See [PR 4248]. + +[PR 4248]: https://github.com/libp2p/rust-libp2p/pull/4248 diff --git a/misc/webrtc-utils/Cargo.toml b/misc/webrtc-utils/Cargo.toml new file mode 100644 index 000000000000..9fa13284a5c0 --- /dev/null +++ b/misc/webrtc-utils/Cargo.toml @@ -0,0 +1,32 @@ +[package] +authors = ["Doug Anderson "] +categories = ["network-programming"] +description = "Utilities for WebRTC in libp2p" +edition = "2021" +license = "MIT" +name = "libp2p-webrtc-utils" +repository = "https://github.com/libp2p/rust-libp2p" +rust-version = { workspace = true } +version = "0.1.0" +publish = false # TEMP fix for https://github.com/obi1kenobi/cargo-semver-checks-action/issues/53. + +[dependencies] +bytes = "1" +futures = "0.3" +hex = "0.4" +libp2p-core = { workspace = true } +libp2p-identity = { workspace = true } +libp2p-noise = { workspace = true } +log = "0.4.19" +quick-protobuf = "0.8" +quick-protobuf-codec = { workspace = true } +rand = "0.8" +serde = { version = "1.0", features = ["derive"] } +sha2 = "0.10.7" +thiserror = "1" +tinytemplate = "1.2" +asynchronous-codec = "0.6" + +[dev-dependencies] +hex-literal = "0.4" +unsigned-varint = { version = "0.7", features = ["asynchronous_codec"] } diff --git a/misc/webrtc-utils/src/fingerprint.rs b/misc/webrtc-utils/src/fingerprint.rs new file mode 100644 index 000000000000..a02c4d1116dc --- /dev/null +++ b/misc/webrtc-utils/src/fingerprint.rs @@ -0,0 +1,109 @@ +// Copyright 2023 Doug Anderson. +// Copyright 2022 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use libp2p_core::multihash; +use sha2::Digest as _; +use std::fmt; + +pub const SHA256: &str = "sha-256"; +const MULTIHASH_SHA256_CODE: u64 = 0x12; + +type Multihash = multihash::Multihash<64>; + +/// A certificate fingerprint that is assumed to be created using the SHA256 hash algorithm. +#[derive(Eq, PartialEq, Copy, Clone)] +pub struct Fingerprint([u8; 32]); + +impl Fingerprint { + pub const FF: Fingerprint = Fingerprint([0xFF; 32]); + + pub const fn raw(digest: [u8; 32]) -> Self { + Fingerprint(digest) + } + + /// Creates a new [Fingerprint] from a raw certificate by hashing the given bytes with SHA256. + pub fn from_certificate(bytes: &[u8]) -> Self { + Fingerprint(sha2::Sha256::digest(bytes).into()) + } + + /// Converts [`Multihash`](multihash::Multihash) to [`Fingerprint`]. + pub fn try_from_multihash(hash: Multihash) -> Option { + if hash.code() != MULTIHASH_SHA256_CODE { + // Only support SHA256 for now. + return None; + } + + let bytes = hash.digest().try_into().ok()?; + + Some(Self(bytes)) + } + + /// Converts this fingerprint to [`Multihash`](multihash::Multihash). + pub fn to_multihash(self) -> Multihash { + Multihash::wrap(MULTIHASH_SHA256_CODE, &self.0).expect("fingerprint's len to be 32 bytes") + } + + /// Formats this fingerprint as uppercase hex, separated by colons (`:`). + /// + /// This is the format described in . + pub fn to_sdp_format(self) -> String { + self.0.map(|byte| format!("{byte:02X}")).join(":") + } + + /// Returns the algorithm used (e.g. "sha-256"). + /// See + pub fn algorithm(&self) -> String { + SHA256.to_owned() + } +} + +impl fmt::Debug for Fingerprint { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(&hex::encode(self.0)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + const SDP_FORMAT: &str = "7D:E3:D8:3F:81:A6:80:59:2A:47:1E:6B:6A:BB:07:47:AB:D3:53:85:A8:09:3F:DF:E1:12:C1:EE:BB:6C:C6:AC"; + const REGULAR_FORMAT: [u8; 32] = + hex_literal::hex!("7DE3D83F81A680592A471E6B6ABB0747ABD35385A8093FDFE112C1EEBB6CC6AC"); + + #[test] + fn sdp_format() { + let fp = Fingerprint::raw(REGULAR_FORMAT); + + let formatted = fp.to_sdp_format(); + + assert_eq!(formatted, SDP_FORMAT) + } + + #[test] + fn from_sdp() { + let mut bytes = [0; 32]; + bytes.copy_from_slice(&hex::decode(SDP_FORMAT.replace(':', "")).unwrap()); + + let fp = Fingerprint::raw(bytes); + assert_eq!(fp, Fingerprint::raw(REGULAR_FORMAT)); + } +} diff --git a/transports/webrtc/src/generated/message.proto b/misc/webrtc-utils/src/generated/message.proto similarity index 100% rename from transports/webrtc/src/generated/message.proto rename to misc/webrtc-utils/src/generated/message.proto diff --git a/transports/webrtc/src/generated/mod.rs b/misc/webrtc-utils/src/generated/mod.rs similarity index 100% rename from transports/webrtc/src/generated/mod.rs rename to misc/webrtc-utils/src/generated/mod.rs diff --git a/transports/webrtc/src/generated/webrtc/mod.rs b/misc/webrtc-utils/src/generated/webrtc/mod.rs similarity index 100% rename from transports/webrtc/src/generated/webrtc/mod.rs rename to misc/webrtc-utils/src/generated/webrtc/mod.rs diff --git a/transports/webrtc/src/generated/webrtc/pb.rs b/misc/webrtc-utils/src/generated/webrtc/pb.rs similarity index 100% rename from transports/webrtc/src/generated/webrtc/pb.rs rename to misc/webrtc-utils/src/generated/webrtc/pb.rs diff --git a/misc/webrtc-utils/src/lib.rs b/misc/webrtc-utils/src/lib.rs new file mode 100644 index 000000000000..c744634de30e --- /dev/null +++ b/misc/webrtc-utils/src/lib.rs @@ -0,0 +1,15 @@ +mod proto { + #![allow(unreachable_pub)] + include!("generated/mod.rs"); + pub use self::webrtc::pb::{mod_Message::Flag, Message}; +} + +mod fingerprint; +pub mod noise; +pub mod sdp; +mod stream; +mod transport; + +pub use fingerprint::{Fingerprint, SHA256}; +pub use stream::{DropListener, Stream, MAX_MSG_LEN}; +pub use transport::parse_webrtc_dial_addr; diff --git a/transports/webrtc/src/tokio/upgrade/noise.rs b/misc/webrtc-utils/src/noise.rs similarity index 95% rename from transports/webrtc/src/tokio/upgrade/noise.rs rename to misc/webrtc-utils/src/noise.rs index 34e3526a2fe9..023766bc1df9 100644 --- a/transports/webrtc/src/tokio/upgrade/noise.rs +++ b/misc/webrtc-utils/src/noise.rs @@ -24,15 +24,14 @@ use libp2p_identity as identity; use libp2p_identity::PeerId; use libp2p_noise as noise; -use crate::tokio::fingerprint::Fingerprint; -use crate::tokio::Error; +use crate::fingerprint::Fingerprint; -pub(crate) async fn inbound( +pub async fn inbound( id_keys: identity::Keypair, stream: T, client_fingerprint: Fingerprint, server_fingerprint: Fingerprint, -) -> Result +) -> Result where T: AsyncRead + AsyncWrite + Unpin + Send + 'static, { @@ -49,12 +48,12 @@ where Ok(peer_id) } -pub(crate) async fn outbound( +pub async fn outbound( id_keys: identity::Keypair, stream: T, server_fingerprint: Fingerprint, client_fingerprint: Fingerprint, -) -> Result +) -> Result where T: AsyncRead + AsyncWrite + Unpin + Send + 'static, { diff --git a/misc/webrtc-utils/src/sdp.rs b/misc/webrtc-utils/src/sdp.rs new file mode 100644 index 000000000000..7c4facaf27e3 --- /dev/null +++ b/misc/webrtc-utils/src/sdp.rs @@ -0,0 +1,157 @@ +// Copyright 2023 Doug Anderson +// Copyright 2022 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. +use crate::fingerprint::Fingerprint; +use serde::Serialize; +use std::net::{IpAddr, SocketAddr}; +use tinytemplate::TinyTemplate; + +use rand::distributions::Alphanumeric; +use rand::{thread_rng, Rng}; + +pub fn answer(addr: SocketAddr, server_fingerprint: Fingerprint, client_ufrag: &str) -> String { + let answer = render_description( + SERVER_SESSION_DESCRIPTION, + addr, + server_fingerprint, + client_ufrag, + ); + + log::trace!("Created SDP answer: {answer}"); + + answer +} + +// See [`CLIENT_SESSION_DESCRIPTION`]. +// +// a=ice-lite +// +// A lite implementation is only appropriate for devices that will *always* be connected to +// the public Internet and have a public IP address at which it can receive packets from any +// correspondent. ICE will not function when a lite implementation is placed behind a NAT +// (RFC8445). +// +// a=tls-id: +// +// "TLS ID" uniquely identifies a TLS association. +// The ICE protocol uses a "TLS ID" system to indicate whether a fresh DTLS connection +// must be reopened in case of ICE renegotiation. Considering that ICE renegotiations +// never happen in our use case, we can simply put a random value and not care about +// it. Note however that the TLS ID in the answer must be present if and only if the +// offer contains one. (RFC8842) +// TODO: is it true that renegotiations never happen? what about a connection closing? +// "tls-id" attribute MUST be present in the initial offer and respective answer (RFC8839). +// XXX: but right now browsers don't send it. +// +// a=setup:passive +// +// "passive" indicates that the remote DTLS server will only listen for incoming +// connections. (RFC5763) +// The answerer (server) MUST not be located behind a NAT (RFC6135). +// +// The answerer MUST use either a setup attribute value of setup:active or setup:passive. +// Note that if the answerer uses setup:passive, then the DTLS handshake will not begin until +// the answerer is received, which adds additional latency. setup:active allows the answer and +// the DTLS handshake to occur in parallel. Thus, setup:active is RECOMMENDED. +// +// a=candidate: +// +// A transport address for a candidate that can be used for connectivity checks (RFC8839). +// +// a=end-of-candidates +const SERVER_SESSION_DESCRIPTION: &str = "v=0 +o=- 0 0 IN {ip_version} {target_ip} +s=- +t=0 0 +a=ice-lite +m=application {target_port} UDP/DTLS/SCTP webrtc-datachannel +c=IN {ip_version} {target_ip} +a=mid:0 +a=ice-options:ice2 +a=ice-ufrag:{ufrag} +a=ice-pwd:{pwd} +a=fingerprint:{fingerprint_algorithm} {fingerprint_value} +a=setup:passive +a=sctp-port:5000 +a=max-message-size:16384 +a=candidate:1467250027 1 UDP 1467250027 {target_ip} {target_port} typ host +a=end-of-candidates +"; + +/// Indicates the IP version used in WebRTC: `IP4` or `IP6`. +#[derive(Serialize)] +enum IpVersion { + IP4, + IP6, +} + +/// Context passed to the templating engine, which replaces the above placeholders (e.g. +/// `{IP_VERSION}`) with real values. +#[derive(Serialize)] +struct DescriptionContext { + pub(crate) ip_version: IpVersion, + pub(crate) target_ip: IpAddr, + pub(crate) target_port: u16, + pub(crate) fingerprint_algorithm: String, + pub(crate) fingerprint_value: String, + pub(crate) ufrag: String, + pub(crate) pwd: String, +} + +/// Renders a [`TinyTemplate`] description using the provided arguments. +pub fn render_description( + description: &str, + addr: SocketAddr, + fingerprint: Fingerprint, + ufrag: &str, +) -> String { + let mut tt = TinyTemplate::new(); + tt.add_template("description", description).unwrap(); + + let context = DescriptionContext { + ip_version: { + if addr.is_ipv4() { + IpVersion::IP4 + } else { + IpVersion::IP6 + } + }, + target_ip: addr.ip(), + target_port: addr.port(), + fingerprint_algorithm: fingerprint.algorithm(), + fingerprint_value: fingerprint.to_sdp_format(), + // NOTE: ufrag is equal to pwd. + ufrag: ufrag.to_owned(), + pwd: ufrag.to_owned(), + }; + tt.render("description", &context).unwrap() +} + +/// Generates a random ufrag and adds a prefix according to the spec. +pub fn random_ufrag() -> String { + format!( + "libp2p+webrtc+v1/{}", + thread_rng() + .sample_iter(&Alphanumeric) + .take(64) + .map(char::from) + .collect::() + ) +} diff --git a/transports/webrtc/src/tokio/substream.rs b/misc/webrtc-utils/src/stream.rs similarity index 87% rename from transports/webrtc/src/tokio/substream.rs rename to misc/webrtc-utils/src/stream.rs index 89e52376a482..a6de759a4125 100644 --- a/transports/webrtc/src/tokio/substream.rs +++ b/misc/webrtc-utils/src/stream.rs @@ -1,4 +1,5 @@ // Copyright 2022 Parity Technologies (UK) Ltd. +// Copyright 2023 Protocol Labs. // // Permission is hereby granted, free of charge, to any person obtaining a // copy of this software and associated documentation files (the "Software"), @@ -18,24 +19,20 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use asynchronous_codec::Framed; use bytes::Bytes; use futures::{channel::oneshot, prelude::*, ready}; -use tokio_util::compat::Compat; -use webrtc::data::data_channel::{DataChannel, PollDataChannel}; use std::{ io, pin::Pin, - sync::Arc, task::{Context, Poll}, }; use crate::proto::{Flag, Message}; -use crate::tokio::{ - substream::drop_listener::GracefullyClosed, - substream::framed_dc::FramedDc, - substream::state::{Closing, State}, +use crate::{ + stream::drop_listener::GracefullyClosed, + stream::framed_dc::FramedDc, + stream::state::{Closing, State}, }; mod drop_listener; @@ -47,7 +44,7 @@ mod state; /// "As long as message interleaving is not supported, the sender SHOULD limit the maximum message /// size to 16 KB to avoid monopolization." /// Source: -const MAX_MSG_LEN: usize = 16384; // 16kiB +pub const MAX_MSG_LEN: usize = 16 * 1024; /// Length of varint, in bytes. const VARINT_LEN: usize = 2; /// Overhead of the protobuf encoding, in bytes. @@ -55,26 +52,28 @@ const PROTO_OVERHEAD: usize = 5; /// Maximum length of data, in bytes. const MAX_DATA_LEN: usize = MAX_MSG_LEN - VARINT_LEN - PROTO_OVERHEAD; -pub(crate) use drop_listener::DropListener; -/// A substream on top of a WebRTC data channel. +pub use drop_listener::DropListener; +/// A stream backed by a WebRTC data channel. /// -/// To be a proper libp2p substream, we need to implement [`AsyncRead`] and [`AsyncWrite`] as well +/// To be a proper libp2p stream, we need to implement [`AsyncRead`] and [`AsyncWrite`] as well /// as support a half-closed state which we do by framing messages in a protobuf envelope. -pub struct Substream { - io: FramedDc, +pub struct Stream { + io: FramedDc, state: State, read_buffer: Bytes, /// Dropping this will close the oneshot and notify the receiver by emitting `Canceled`. drop_notifier: Option>, } -impl Substream { - /// Returns a new `Substream` and a listener, which will notify the receiver when/if the substream - /// is dropped. - pub(crate) fn new(data_channel: Arc) -> (Self, DropListener) { +impl Stream +where + T: AsyncRead + AsyncWrite + Unpin + Clone, +{ + /// Returns a new [`Stream`] and a [`DropListener`], which will notify the receiver when/if the stream is dropped. + pub fn new(data_channel: T) -> (Self, DropListener) { let (sender, receiver) = oneshot::channel(); - let substream = Self { + let stream = Self { io: framed_dc::new(data_channel.clone()), state: State::Open, read_buffer: Bytes::default(), @@ -82,10 +81,10 @@ impl Substream { }; let listener = DropListener::new(framed_dc::new(data_channel), receiver); - (substream, listener) + (stream, listener) } - /// Gracefully closes the "read-half" of the substream. + /// Gracefully closes the "read-half" of the stream. pub fn poll_close_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { match self.state.close_read_barrier()? { @@ -113,7 +112,10 @@ impl Substream { } } -impl AsyncRead for Substream { +impl AsyncRead for Stream +where + T: AsyncRead + AsyncWrite + Unpin, +{ fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -157,7 +159,10 @@ impl AsyncRead for Substream { } } -impl AsyncWrite for Substream { +impl AsyncWrite for Stream +where + T: AsyncRead + AsyncWrite + Unpin, +{ fn poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -236,10 +241,13 @@ impl AsyncWrite for Substream { } } -fn io_poll_next( - io: &mut Framed, quick_protobuf_codec::Codec>, +fn io_poll_next( + io: &mut FramedDc, cx: &mut Context<'_>, -) -> Poll, Option>)>>> { +) -> Poll, Option>)>>> +where + T: AsyncRead + AsyncWrite + Unpin, +{ match ready!(io.poll_next_unpin(cx)) .transpose() .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))? @@ -262,8 +270,8 @@ mod tests { // Largest possible message. let message = [0; MAX_DATA_LEN]; - let protobuf = crate::proto::Message { - flag: Some(crate::proto::Flag::FIN), + let protobuf = Message { + flag: Some(Flag::FIN), message: Some(message.to_vec()), }; diff --git a/transports/webrtc/src/tokio/substream/drop_listener.rs b/misc/webrtc-utils/src/stream/drop_listener.rs similarity index 77% rename from transports/webrtc/src/tokio/substream/drop_listener.rs rename to misc/webrtc-utils/src/stream/drop_listener.rs index 735240456fe3..b638ea84b09f 100644 --- a/transports/webrtc/src/tokio/substream/drop_listener.rs +++ b/misc/webrtc-utils/src/stream/drop_listener.rs @@ -20,7 +20,7 @@ use futures::channel::oneshot; use futures::channel::oneshot::Canceled; -use futures::{FutureExt, SinkExt}; +use futures::{AsyncRead, AsyncWrite, FutureExt, SinkExt}; use std::future::Future; use std::io; @@ -28,46 +28,42 @@ use std::pin::Pin; use std::task::{Context, Poll}; use crate::proto::{Flag, Message}; -use crate::tokio::substream::framed_dc::FramedDc; +use crate::stream::framed_dc::FramedDc; #[must_use] -pub(crate) struct DropListener { - state: State, +pub struct DropListener { + state: State, } -impl DropListener { - pub(crate) fn new(stream: FramedDc, receiver: oneshot::Receiver) -> Self { - let substream_id = stream.get_ref().stream_identifier(); - +impl DropListener { + pub fn new(stream: FramedDc, receiver: oneshot::Receiver) -> Self { Self { - state: State::Idle { - stream, - receiver, - substream_id, - }, + state: State::Idle { stream, receiver }, } } } -enum State { +enum State { /// The [`DropListener`] is idle and waiting to be activated. Idle { - stream: FramedDc, + stream: FramedDc, receiver: oneshot::Receiver, - substream_id: u16, }, /// The stream got dropped and we are sending a reset flag. SendingReset { - stream: FramedDc, + stream: FramedDc, }, Flushing { - stream: FramedDc, + stream: FramedDc, }, /// Bad state transition. Poisoned, } -impl Future for DropListener { +impl Future for DropListener +where + T: AsyncRead + AsyncWrite + Unpin, +{ type Output = io::Result<()>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -77,23 +73,18 @@ impl Future for DropListener { match std::mem::replace(state, State::Poisoned) { State::Idle { stream, - substream_id, mut receiver, } => match receiver.poll_unpin(cx) { Poll::Ready(Ok(GracefullyClosed {})) => { return Poll::Ready(Ok(())); } Poll::Ready(Err(Canceled)) => { - log::info!("Substream {substream_id} dropped without graceful close, sending Reset"); + log::info!("Stream dropped without graceful close, sending Reset"); *state = State::SendingReset { stream }; continue; } Poll::Pending => { - *state = State::Idle { - stream, - substream_id, - receiver, - }; + *state = State::Idle { stream, receiver }; return Poll::Pending; } }, @@ -126,5 +117,5 @@ impl Future for DropListener { } } -/// Indicates that our substream got gracefully closed. -pub(crate) struct GracefullyClosed {} +/// Indicates that our stream got gracefully closed. +pub struct GracefullyClosed {} diff --git a/transports/webrtc/src/tokio/substream/framed_dc.rs b/misc/webrtc-utils/src/stream/framed_dc.rs similarity index 75% rename from transports/webrtc/src/tokio/substream/framed_dc.rs rename to misc/webrtc-utils/src/stream/framed_dc.rs index 1b3860b662b3..4409b79a0ed0 100644 --- a/transports/webrtc/src/tokio/substream/framed_dc.rs +++ b/misc/webrtc-utils/src/stream/framed_dc.rs @@ -19,22 +19,18 @@ // DEALINGS IN THE SOFTWARE. use asynchronous_codec::Framed; -use tokio_util::compat::Compat; -use tokio_util::compat::TokioAsyncReadCompatExt; -use webrtc::data::data_channel::{DataChannel, PollDataChannel}; +use futures::{AsyncRead, AsyncWrite}; -use std::sync::Arc; - -use super::{MAX_DATA_LEN, MAX_MSG_LEN, VARINT_LEN}; use crate::proto::Message; +use crate::stream::{MAX_DATA_LEN, MAX_MSG_LEN, VARINT_LEN}; -pub(crate) type FramedDc = Framed, quick_protobuf_codec::Codec>; -pub(crate) fn new(data_channel: Arc) -> FramedDc { - let mut inner = PollDataChannel::new(data_channel); - inner.set_read_buf_capacity(MAX_MSG_LEN); - +pub(crate) type FramedDc = Framed>; +pub(crate) fn new(inner: T) -> FramedDc +where + T: AsyncRead + AsyncWrite, +{ let mut framed = Framed::new( - inner.compat(), + inner, quick_protobuf_codec::Codec::new(MAX_MSG_LEN - VARINT_LEN), ); // If not set, `Framed` buffers up to 131kB of data before sending, which leads to "outbound diff --git a/transports/webrtc/src/tokio/substream/state.rs b/misc/webrtc-utils/src/stream/state.rs similarity index 99% rename from transports/webrtc/src/tokio/substream/state.rs rename to misc/webrtc-utils/src/stream/state.rs index b1768aa21652..082325e4d47a 100644 --- a/transports/webrtc/src/tokio/substream/state.rs +++ b/misc/webrtc-utils/src/stream/state.rs @@ -277,7 +277,7 @@ impl State { } } - /// Acts as a "barrier" for [`Substream::poll_close_read`](super::Substream::poll_close_read). + /// Acts as a "barrier" for [`Stream::poll_close_read`](super::Stream::poll_close_read). pub(crate) fn close_read_barrier(&mut self) -> io::Result> { loop { match self { diff --git a/misc/webrtc-utils/src/transport.rs b/misc/webrtc-utils/src/transport.rs new file mode 100644 index 000000000000..440ad73ed022 --- /dev/null +++ b/misc/webrtc-utils/src/transport.rs @@ -0,0 +1,101 @@ +use crate::fingerprint::Fingerprint; +use libp2p_core::{multiaddr::Protocol, Multiaddr}; +use std::net::{IpAddr, SocketAddr}; + +/// Parse the given [`Multiaddr`] into a [`SocketAddr`] and a [`Fingerprint`] for dialing. +pub fn parse_webrtc_dial_addr(addr: &Multiaddr) -> Option<(SocketAddr, Fingerprint)> { + let mut iter = addr.iter(); + + let ip = match iter.next()? { + Protocol::Ip4(ip) => IpAddr::from(ip), + Protocol::Ip6(ip) => IpAddr::from(ip), + _ => return None, + }; + + let port = iter.next()?; + let webrtc = iter.next()?; + let certhash = iter.next()?; + + let (port, fingerprint) = match (port, webrtc, certhash) { + (Protocol::Udp(port), Protocol::WebRTCDirect, Protocol::Certhash(cert_hash)) => { + let fingerprint = Fingerprint::try_from_multihash(cert_hash)?; + + (port, fingerprint) + } + _ => return None, + }; + + match iter.next() { + Some(Protocol::P2p(_)) => {} + // peer ID is optional + None => {} + // unexpected protocol + Some(_) => return None, + } + + Some((SocketAddr::new(ip, port), fingerprint)) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::net::{Ipv4Addr, Ipv6Addr}; + + #[test] + fn parse_valid_address_with_certhash_and_p2p() { + let addr = "/ip4/127.0.0.1/udp/39901/webrtc-direct/certhash/uEiDikp5KVUgkLta1EjUN-IKbHk-dUBg8VzKgf5nXxLK46w/p2p/12D3KooWNpDk9w6WrEEcdsEH1y47W71S36yFjw4sd3j7omzgCSMS" + .parse() + .unwrap(); + + let maybe_parsed = parse_webrtc_dial_addr(&addr); + + assert_eq!( + maybe_parsed, + Some(( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 39901), + Fingerprint::raw(hex_literal::hex!( + "e2929e4a5548242ed6b512350df8829b1e4f9d50183c5732a07f99d7c4b2b8eb" + )) + )) + ); + } + + #[test] + fn peer_id_is_not_required() { + let addr = "/ip4/127.0.0.1/udp/39901/webrtc-direct/certhash/uEiDikp5KVUgkLta1EjUN-IKbHk-dUBg8VzKgf5nXxLK46w" + .parse() + .unwrap(); + + let maybe_parsed = parse_webrtc_dial_addr(&addr); + + assert_eq!( + maybe_parsed, + Some(( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 39901), + Fingerprint::raw(hex_literal::hex!( + "e2929e4a5548242ed6b512350df8829b1e4f9d50183c5732a07f99d7c4b2b8eb" + )) + )) + ); + } + + #[test] + fn parse_ipv6() { + let addr = + "/ip6/::1/udp/12345/webrtc-direct/certhash/uEiDikp5KVUgkLta1EjUN-IKbHk-dUBg8VzKgf5nXxLK46w/p2p/12D3KooWNpDk9w6WrEEcdsEH1y47W71S36yFjw4sd3j7omzgCSMS" + .parse() + .unwrap(); + + let maybe_parsed = parse_webrtc_dial_addr(&addr); + + assert_eq!( + maybe_parsed, + Some(( + SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 12345), + Fingerprint::raw(hex_literal::hex!( + "e2929e4a5548242ed6b512350df8829b1e4f9d50183c5732a07f99d7c4b2b8eb" + )) + )) + ); + } +} diff --git a/transports/webrtc-websys/CHANGELOG.md b/transports/webrtc-websys/CHANGELOG.md new file mode 100644 index 000000000000..f144aaf35cd2 --- /dev/null +++ b/transports/webrtc-websys/CHANGELOG.md @@ -0,0 +1,6 @@ +## 0.1.0-alpha - unreleased + +- Initial alpha release. + See [PR 4248]. + +[PR 4248]: https://github.com/libp2p/rust-libp2p/pull/4248 diff --git a/transports/webrtc-websys/Cargo.toml b/transports/webrtc-websys/Cargo.toml new file mode 100644 index 000000000000..9c5950718158 --- /dev/null +++ b/transports/webrtc-websys/Cargo.toml @@ -0,0 +1,36 @@ +[package] +authors = ["Doug Anderson "] +categories = ["asynchronous", "network-programming", "wasm", "web-programming"] +description = "WebRTC for libp2p under WASM environment" +edition = "2021" +keywords = ["libp2p", "networking", "peer-to-peer"] +license = "MIT" +name = "libp2p-webrtc-websys" +repository = "https://github.com/libp2p/rust-libp2p" +rust-version = { workspace = true } +version = "0.1.0-alpha" +publish = false # TEMP fix for https://github.com/obi1kenobi/cargo-semver-checks-action/issues/53. + +[dependencies] +bytes = "1" +futures = "0.3" +futures-timer = "3" +getrandom = { version = "0.2.9", features = ["js"] } +hex = "0.4.3" +js-sys = { version = "0.3" } +libp2p-core = { workspace = true } +libp2p-identity = { workspace = true } +libp2p-noise = { workspace = true } +libp2p-webrtc-utils = { workspace = true } +log = "0.4.19" +send_wrapper = { version = "0.6.0", features = ["futures"] } +serde = { version = "1.0", features = ["derive"] } +thiserror = "1" +wasm-bindgen = { version = "0.2.87" } +wasm-bindgen-futures = { version = "0.4.37" } +web-sys = { version = "0.3.64", features = ["Document", "Location", "MessageEvent", "Navigator", "RtcCertificate", "RtcConfiguration", "RtcDataChannel", "RtcDataChannelEvent", "RtcDataChannelInit", "RtcDataChannelState", "RtcDataChannelType", "RtcPeerConnection", "RtcSdpType", "RtcSessionDescription", "RtcSessionDescriptionInit", "Window"] } + +[dev-dependencies] +hex-literal = "0.4" +libp2p-ping = { workspace = true } +libp2p-swarm = { workspace = true, features = ["wasm-bindgen"] } diff --git a/transports/webrtc-websys/README.md b/transports/webrtc-websys/README.md new file mode 100644 index 000000000000..b522f31ba65d --- /dev/null +++ b/transports/webrtc-websys/README.md @@ -0,0 +1,9 @@ +# Rust `libp2p-webrtc-websys` + +Browser Transport made available through `web-sys` bindings. + +## Usage + +Use with `Swarm::with_wasm_executor` to enable the `wasm-bindgen` executor for the `Swarm`. + +See the [browser-webrtc](../../examples/browser-webrtc) example for a full example. diff --git a/transports/webrtc-websys/src/connection.rs b/transports/webrtc-websys/src/connection.rs new file mode 100644 index 000000000000..dfdebbc98c04 --- /dev/null +++ b/transports/webrtc-websys/src/connection.rs @@ -0,0 +1,308 @@ +//! A libp2p connection backed by an [RtcPeerConnection](https://developer.mozilla.org/en-US/docs/Web/API/RTCPeerConnection). + +use super::{Error, Stream}; +use crate::stream::DropListener; +use futures::channel::mpsc; +use futures::stream::FuturesUnordered; +use futures::StreamExt; +use js_sys::{Object, Reflect}; +use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent}; +use libp2p_webrtc_utils::Fingerprint; +use send_wrapper::SendWrapper; +use std::pin::Pin; +use std::task::Waker; +use std::task::{ready, Context, Poll}; +use wasm_bindgen::prelude::*; +use wasm_bindgen_futures::JsFuture; +use web_sys::{ + RtcConfiguration, RtcDataChannel, RtcDataChannelEvent, RtcDataChannelInit, RtcDataChannelType, + RtcSessionDescriptionInit, +}; + +/// A WebRTC Connection. +/// +/// All connections need to be [`Send`] which is why some fields are wrapped in [`SendWrapper`]. +/// This is safe because WASM is single-threaded. +pub struct Connection { + /// The [RtcPeerConnection] that is used for the WebRTC Connection + inner: SendWrapper, + + /// Whether the connection is closed + closed: bool, + /// An [`mpsc::channel`] for all inbound data channels. + /// + /// Because the browser's WebRTC API is event-based, we need to use a channel to obtain all inbound data channels. + inbound_data_channels: SendWrapper>, + /// A list of futures, which, once completed, signal that a [`Stream`] has been dropped. + drop_listeners: FuturesUnordered, + no_drop_listeners_waker: Option, + + _ondatachannel_closure: SendWrapper>, +} + +impl Connection { + /// Create a new inner WebRTC Connection + pub(crate) fn new(peer_connection: RtcPeerConnection) -> Self { + // An ondatachannel Future enables us to poll for incoming data channel events in poll_incoming + let (mut tx_ondatachannel, rx_ondatachannel) = mpsc::channel(4); // we may get more than one data channel opened on a single peer connection + + let ondatachannel_closure = Closure::new(move |ev: RtcDataChannelEvent| { + log::trace!("New data channel"); + + if let Err(e) = tx_ondatachannel.try_send(ev.channel()) { + if e.is_full() { + log::warn!("Remote is opening too many data channels, we can't keep up!"); + return; + } + + if e.is_disconnected() { + log::warn!("Receiver is gone, are we shutting down?"); + } + } + }); + peer_connection + .inner + .set_ondatachannel(Some(ondatachannel_closure.as_ref().unchecked_ref())); + + Self { + inner: SendWrapper::new(peer_connection), + closed: false, + drop_listeners: FuturesUnordered::default(), + no_drop_listeners_waker: None, + inbound_data_channels: SendWrapper::new(rx_ondatachannel), + _ondatachannel_closure: SendWrapper::new(ondatachannel_closure), + } + } + + fn new_stream_from_data_channel(&mut self, data_channel: RtcDataChannel) -> Stream { + let (stream, drop_listener) = Stream::new(data_channel); + + self.drop_listeners.push(drop_listener); + if let Some(waker) = self.no_drop_listeners_waker.take() { + waker.wake() + } + stream + } + + /// Closes the Peer Connection. + /// + /// This closes the data channels also and they will return an error + /// if they are used. + fn close_connection(&mut self) { + if !self.closed { + log::trace!("connection::close_connection"); + self.inner.inner.close(); + self.closed = true; + } + } +} + +impl Drop for Connection { + fn drop(&mut self) { + self.close_connection(); + } +} + +/// WebRTC native multiplexing +/// Allows users to open substreams +impl StreamMuxer for Connection { + type Substream = Stream; + type Error = Error; + + fn poll_inbound( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + match ready!(self.inbound_data_channels.poll_next_unpin(cx)) { + Some(data_channel) => { + let stream = self.new_stream_from_data_channel(data_channel); + + Poll::Ready(Ok(stream)) + } + None => { + // This only happens if the [`RtcPeerConnection::ondatachannel`] closure gets freed which means we are most likely shutting down the connection. + log::debug!("`Sender` for inbound data channels has been dropped"); + Poll::Ready(Err(Error::Connection("connection closed".to_owned()))) + } + } + } + + fn poll_outbound( + mut self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll> { + log::trace!("Creating outbound data channel"); + + let data_channel = self.inner.new_regular_data_channel(); + let stream = self.new_stream_from_data_channel(data_channel); + + Poll::Ready(Ok(stream)) + } + + /// Closes the Peer Connection. + fn poll_close( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + log::trace!("connection::poll_close"); + + self.close_connection(); + Poll::Ready(Ok(())) + } + + fn poll( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + loop { + match ready!(self.drop_listeners.poll_next_unpin(cx)) { + Some(Ok(())) => {} + Some(Err(e)) => { + log::debug!("a DropListener failed: {e}") + } + None => { + self.no_drop_listeners_waker = Some(cx.waker().clone()); + return Poll::Pending; + } + } + } + } +} + +pub(crate) struct RtcPeerConnection { + inner: web_sys::RtcPeerConnection, +} + +impl RtcPeerConnection { + pub(crate) async fn new(algorithm: String) -> Result { + let algo: Object = Object::new(); + Reflect::set(&algo, &"name".into(), &"ECDSA".into()).unwrap(); + Reflect::set(&algo, &"namedCurve".into(), &"P-256".into()).unwrap(); + Reflect::set(&algo, &"hash".into(), &algorithm.into()).unwrap(); + + let certificate_promise = + web_sys::RtcPeerConnection::generate_certificate_with_object(&algo) + .expect("certificate to be valid"); + + let certificate = JsFuture::from(certificate_promise).await?; + + let mut config = RtcConfiguration::default(); + // wrap certificate in a js Array first before adding it to the config object + let certificate_arr = js_sys::Array::new(); + certificate_arr.push(&certificate); + config.certificates(&certificate_arr); + + let inner = web_sys::RtcPeerConnection::new_with_configuration(&config)?; + + Ok(Self { inner }) + } + + /// Creates the stream for the initial noise handshake. + /// + /// The underlying data channel MUST have `negotiated` set to `true` and carry the ID 0. + pub(crate) fn new_handshake_stream(&self) -> (Stream, DropListener) { + Stream::new(self.new_data_channel(true)) + } + + /// Creates a regular data channel for when the connection is already established. + pub(crate) fn new_regular_data_channel(&self) -> RtcDataChannel { + self.new_data_channel(false) + } + + fn new_data_channel(&self, negotiated: bool) -> RtcDataChannel { + const LABEL: &str = ""; + + let dc = match negotiated { + true => { + let mut options = RtcDataChannelInit::new(); + options.negotiated(true).id(0); // id is only ever set to zero when negotiated is true + + self.inner + .create_data_channel_with_data_channel_dict(LABEL, &options) + } + false => self.inner.create_data_channel(LABEL), + }; + dc.set_binary_type(RtcDataChannelType::Arraybuffer); // Hardcoded here, it's the only type we use + + dc + } + + pub(crate) async fn create_offer(&self) -> Result { + let offer = JsFuture::from(self.inner.create_offer()).await?; + + let offer = Reflect::get(&offer, &JsValue::from_str("sdp")) + .expect("sdp should be valid") + .as_string() + .expect("sdp string should be valid string"); + + Ok(offer) + } + + pub(crate) async fn set_local_description( + &self, + sdp: RtcSessionDescriptionInit, + ) -> Result<(), Error> { + let promise = self.inner.set_local_description(&sdp); + JsFuture::from(promise).await?; + + Ok(()) + } + + pub(crate) fn local_fingerprint(&self) -> Result { + let sdp = &self + .inner + .local_description() + .ok_or_else(|| Error::JsError("No local description".to_string()))? + .sdp(); + + let fingerprint = parse_fingerprint(sdp) + .ok_or_else(|| Error::JsError("No fingerprint in SDP".to_string()))?; + + Ok(fingerprint) + } + + pub(crate) async fn set_remote_description( + &self, + sdp: RtcSessionDescriptionInit, + ) -> Result<(), Error> { + let promise = self.inner.set_remote_description(&sdp); + JsFuture::from(promise).await?; + + Ok(()) + } +} + +/// Parse Fingerprint from a SDP. +fn parse_fingerprint(sdp: &str) -> Option { + // split the sdp by new lines / carriage returns + let lines = sdp.split("\r\n"); + + // iterate through the lines to find the one starting with a=fingerprint: + // get the value after the first space + // return the value as a Fingerprint + for line in lines { + if line.starts_with("a=fingerprint:") { + let fingerprint = line.split(' ').nth(1).unwrap(); + let bytes = hex::decode(fingerprint.replace(':', "")).unwrap(); + let arr: [u8; 32] = bytes.as_slice().try_into().unwrap(); + return Some(Fingerprint::raw(arr)); + } + } + None +} + +#[cfg(test)] +mod sdp_tests { + use super::*; + + #[test] + fn test_fingerprint() { + let sdp: &str = "v=0\r\no=- 0 0 IN IP6 ::1\r\ns=-\r\nc=IN IP6 ::1\r\nt=0 0\r\na=ice-lite\r\nm=application 61885 UDP/DTLS/SCTP webrtc-datachannel\r\na=mid:0\r\na=setup:passive\r\na=ice-ufrag:libp2p+webrtc+v1/YwapWySn6fE6L9i47PhlB6X4gzNXcgFs\r\na=ice-pwd:libp2p+webrtc+v1/YwapWySn6fE6L9i47PhlB6X4gzNXcgFs\r\na=fingerprint:sha-256 A8:17:77:1E:02:7E:D1:2B:53:92:70:A6:8E:F9:02:CC:21:72:3A:92:5D:F4:97:5F:27:C4:5E:75:D4:F4:31:89\r\na=sctp-port:5000\r\na=max-message-size:16384\r\na=candidate:1467250027 1 UDP 1467250027 ::1 61885 typ host\r\n"; + let fingerprint = match parse_fingerprint(sdp) { + Some(fingerprint) => fingerprint, + None => panic!("No fingerprint found"), + }; + assert_eq!(fingerprint.algorithm(), "sha-256"); + assert_eq!(fingerprint.to_sdp_format(), "A8:17:77:1E:02:7E:D1:2B:53:92:70:A6:8E:F9:02:CC:21:72:3A:92:5D:F4:97:5F:27:C4:5E:75:D4:F4:31:89"); + } +} diff --git a/transports/webrtc-websys/src/error.rs b/transports/webrtc-websys/src/error.rs new file mode 100644 index 000000000000..e226dea80690 --- /dev/null +++ b/transports/webrtc-websys/src/error.rs @@ -0,0 +1,57 @@ +use wasm_bindgen::{JsCast, JsValue}; + +/// Errors that may happen on the [`Transport`](crate::Transport) or the +/// [`Connection`](crate::Connection). +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("Invalid multiaddr: {0}")] + InvalidMultiaddr(&'static str), + + #[error("JavaScript error: {0}")] + JsError(String), + + #[error("JavaScript typecasting failed")] + JsCastFailed, + + #[error("Unknown remote peer ID")] + UnknownRemotePeerId, + + #[error("Connection error: {0}")] + Connection(String), + + #[error("Authentication error")] + Authentication(#[from] libp2p_noise::Error), +} + +impl Error { + pub(crate) fn from_js_value(value: JsValue) -> Self { + let s = if value.is_instance_of::() { + js_sys::Error::from(value) + .to_string() + .as_string() + .unwrap_or_else(|| "Unknown error".to_string()) + } else { + "Unknown error".to_string() + }; + + Error::JsError(s) + } +} + +impl std::convert::From for Error { + fn from(value: JsValue) -> Self { + Error::from_js_value(value) + } +} + +impl From for Error { + fn from(value: String) -> Self { + Error::JsError(value) + } +} + +impl From for Error { + fn from(value: std::io::Error) -> Self { + Error::JsError(value.to_string()) + } +} diff --git a/transports/webrtc-websys/src/lib.rs b/transports/webrtc-websys/src/lib.rs new file mode 100644 index 000000000000..04fced4111bb --- /dev/null +++ b/transports/webrtc-websys/src/lib.rs @@ -0,0 +1,13 @@ +#![doc = include_str!("../README.md")] + +mod connection; +mod error; +mod sdp; +mod stream; +mod transport; +mod upgrade; + +pub use self::connection::Connection; +pub use self::error::Error; +pub use self::stream::Stream; +pub use self::transport::{Config, Transport}; diff --git a/transports/webrtc-websys/src/sdp.rs b/transports/webrtc-websys/src/sdp.rs new file mode 100644 index 000000000000..6f50262b988a --- /dev/null +++ b/transports/webrtc-websys/src/sdp.rs @@ -0,0 +1,55 @@ +use libp2p_webrtc_utils::Fingerprint; +use std::net::SocketAddr; +use web_sys::{RtcSdpType, RtcSessionDescriptionInit}; + +/// Creates the SDP answer used by the client. +pub(crate) fn answer( + addr: SocketAddr, + server_fingerprint: Fingerprint, + client_ufrag: &str, +) -> RtcSessionDescriptionInit { + let mut answer_obj = RtcSessionDescriptionInit::new(RtcSdpType::Answer); + answer_obj.sdp(&libp2p_webrtc_utils::sdp::answer( + addr, + server_fingerprint, + client_ufrag, + )); + answer_obj +} + +/// Creates the munged SDP offer from the Browser's given SDP offer +/// +/// Certificate verification is disabled which is why we hardcode a dummy fingerprint here. +pub(crate) fn offer(offer: String, client_ufrag: &str) -> RtcSessionDescriptionInit { + // find line and replace a=ice-ufrag: with "\r\na=ice-ufrag:{client_ufrag}\r\n" + // find line and replace a=ice-pwd: with "\r\na=ice-ufrag:{client_ufrag}\r\n" + + let mut munged_sdp_offer = String::new(); + + for line in offer.split("\r\n") { + if line.starts_with("a=ice-ufrag:") { + munged_sdp_offer.push_str(&format!("a=ice-ufrag:{client_ufrag}\r\n")); + continue; + } + + if line.starts_with("a=ice-pwd:") { + munged_sdp_offer.push_str(&format!("a=ice-pwd:{client_ufrag}\r\n")); + continue; + } + + if !line.is_empty() { + munged_sdp_offer.push_str(&format!("{}\r\n", line)); + continue; + } + } + + // remove any double \r\n + let munged_sdp_offer = munged_sdp_offer.replace("\r\n\r\n", "\r\n"); + + log::trace!("Created SDP offer: {munged_sdp_offer}"); + + let mut offer_obj = RtcSessionDescriptionInit::new(RtcSdpType::Offer); + offer_obj.sdp(&munged_sdp_offer); + + offer_obj +} diff --git a/transports/webrtc-websys/src/stream.rs b/transports/webrtc-websys/src/stream.rs new file mode 100644 index 000000000000..812aa5afbbff --- /dev/null +++ b/transports/webrtc-websys/src/stream.rs @@ -0,0 +1,61 @@ +//! The WebRTC [Stream] over the Connection +use self::poll_data_channel::PollDataChannel; +use futures::{AsyncRead, AsyncWrite}; +use send_wrapper::SendWrapper; +use std::pin::Pin; +use std::task::{Context, Poll}; +use web_sys::RtcDataChannel; + +mod poll_data_channel; + +/// A stream over a WebRTC connection. +/// +/// Backed by a WebRTC data channel. +pub struct Stream { + /// Wrapper for the inner stream to make it Send + inner: SendWrapper>, +} + +pub(crate) type DropListener = SendWrapper>; + +impl Stream { + pub(crate) fn new(data_channel: RtcDataChannel) -> (Self, DropListener) { + let (inner, drop_listener) = + libp2p_webrtc_utils::Stream::new(PollDataChannel::new(data_channel)); + + ( + Self { + inner: SendWrapper::new(inner), + }, + SendWrapper::new(drop_listener), + ) + } +} + +impl AsyncRead for Stream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + Pin::new(&mut *self.get_mut().inner).poll_read(cx, buf) + } +} + +impl AsyncWrite for Stream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut *self.get_mut().inner).poll_write(cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut *self.get_mut().inner).poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut *self.get_mut().inner).poll_close(cx) + } +} diff --git a/transports/webrtc-websys/src/stream/poll_data_channel.rs b/transports/webrtc-websys/src/stream/poll_data_channel.rs new file mode 100644 index 000000000000..9c9b19cdb324 --- /dev/null +++ b/transports/webrtc-websys/src/stream/poll_data_channel.rs @@ -0,0 +1,242 @@ +use std::cmp::min; +use std::io; +use std::pin::Pin; +use std::rc::Rc; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Mutex; +use std::task::{Context, Poll}; + +use bytes::BytesMut; +use futures::task::AtomicWaker; +use futures::{AsyncRead, AsyncWrite}; +use libp2p_webrtc_utils::MAX_MSG_LEN; +use wasm_bindgen::{prelude::*, JsCast}; +use web_sys::{Event, MessageEvent, RtcDataChannel, RtcDataChannelEvent, RtcDataChannelState}; + +/// [`PollDataChannel`] is a wrapper around around [`RtcDataChannel`] which implements [`AsyncRead`] and [`AsyncWrite`]. +#[derive(Debug, Clone)] +pub(crate) struct PollDataChannel { + /// The [`RtcDataChannel`] being wrapped. + inner: RtcDataChannel, + + new_data_waker: Rc, + read_buffer: Rc>, + + /// Waker for when we are waiting for the DC to be opened. + open_waker: Rc, + + /// Waker for when we are waiting to write (again) to the DC because we previously exceeded the [`MAX_MSG_LEN`] threshold. + write_waker: Rc, + + /// Waker for when we are waiting for the DC to be closed. + close_waker: Rc, + + /// Whether we've been overloaded with data by the remote. + /// + /// This is set to `true` in case `read_buffer` overflows, i.e. the remote is sending us messages faster than we can read them. + /// In that case, we return an [`std::io::Error`] from [`AsyncRead`] or [`AsyncWrite`], depending which one gets called earlier. + /// Failing these will (very likely), cause the application developer to drop the stream which resets it. + overloaded: Rc, + + // Store the closures for proper garbage collection. + // These are wrapped in an [`Rc`] so we can implement [`Clone`]. + _on_open_closure: Rc>, + _on_write_closure: Rc>, + _on_close_closure: Rc>, + _on_message_closure: Rc>, +} + +impl PollDataChannel { + pub(crate) fn new(inner: RtcDataChannel) -> Self { + let open_waker = Rc::new(AtomicWaker::new()); + let on_open_closure = Closure::new({ + let open_waker = open_waker.clone(); + + move |_: RtcDataChannelEvent| { + log::trace!("DataChannel opened"); + open_waker.wake(); + } + }); + inner.set_onopen(Some(on_open_closure.as_ref().unchecked_ref())); + + let write_waker = Rc::new(AtomicWaker::new()); + inner.set_buffered_amount_low_threshold(0); + let on_write_closure = Closure::new({ + let write_waker = write_waker.clone(); + + move |_: Event| { + log::trace!("DataChannel available for writing (again)"); + write_waker.wake(); + } + }); + inner.set_onbufferedamountlow(Some(on_write_closure.as_ref().unchecked_ref())); + + let close_waker = Rc::new(AtomicWaker::new()); + let on_close_closure = Closure::new({ + let close_waker = close_waker.clone(); + + move |_: Event| { + log::trace!("DataChannel closed"); + close_waker.wake(); + } + }); + inner.set_onclose(Some(on_close_closure.as_ref().unchecked_ref())); + + let new_data_waker = Rc::new(AtomicWaker::new()); + let read_buffer = Rc::new(Mutex::new(BytesMut::new())); // We purposely don't use `with_capacity` so we don't eagerly allocate `MAX_READ_BUFFER` per stream. + let overloaded = Rc::new(AtomicBool::new(false)); + + let on_message_closure = Closure::::new({ + let new_data_waker = new_data_waker.clone(); + let read_buffer = read_buffer.clone(); + let overloaded = overloaded.clone(); + + move |ev: MessageEvent| { + let data = js_sys::Uint8Array::new(&ev.data()); + + let mut read_buffer = read_buffer.lock().unwrap(); + + if read_buffer.len() + data.length() as usize > MAX_MSG_LEN { + overloaded.store(true, Ordering::SeqCst); + log::warn!("Remote is overloading us with messages, resetting stream",); + return; + } + + read_buffer.extend_from_slice(&data.to_vec()); + new_data_waker.wake(); + } + }); + inner.set_onmessage(Some(on_message_closure.as_ref().unchecked_ref())); + + Self { + inner, + new_data_waker, + read_buffer, + open_waker, + write_waker, + close_waker, + overloaded, + _on_open_closure: Rc::new(on_open_closure), + _on_write_closure: Rc::new(on_write_closure), + _on_close_closure: Rc::new(on_close_closure), + _on_message_closure: Rc::new(on_message_closure), + } + } + + /// Returns the [RtcDataChannelState] of the [RtcDataChannel] + fn ready_state(&self) -> RtcDataChannelState { + self.inner.ready_state() + } + + /// Returns the current [RtcDataChannel] BufferedAmount + fn buffered_amount(&self) -> usize { + self.inner.buffered_amount() as usize + } + + /// Whether the data channel is ready for reading or writing. + fn poll_ready(&mut self, cx: &mut Context) -> Poll> { + match self.ready_state() { + RtcDataChannelState::Connecting => { + self.open_waker.register(cx.waker()); + return Poll::Pending; + } + RtcDataChannelState::Closing | RtcDataChannelState::Closed => { + return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into())) + } + RtcDataChannelState::Open | RtcDataChannelState::__Nonexhaustive => {} + } + + if self.overloaded.load(Ordering::SeqCst) { + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::BrokenPipe, + "remote overloaded us with messages", + ))); + } + + Poll::Ready(Ok(())) + } +} + +impl AsyncRead for PollDataChannel { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + let this = self.get_mut(); + + futures::ready!(this.poll_ready(cx))?; + + let mut read_buffer = this.read_buffer.lock().unwrap(); + + if read_buffer.is_empty() { + this.new_data_waker.register(cx.waker()); + return Poll::Pending; + } + + // Ensure that we: + // - at most return what the caller can read (`buf.len()`) + // - at most what we have (`read_buffer.len()`) + let split_index = min(buf.len(), read_buffer.len()); + + let bytes_to_return = read_buffer.split_to(split_index); + let len = bytes_to_return.len(); + buf[..len].copy_from_slice(&bytes_to_return); + + Poll::Ready(Ok(len)) + } +} + +impl AsyncWrite for PollDataChannel { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let this = self.get_mut(); + + futures::ready!(this.poll_ready(cx))?; + + debug_assert!(this.buffered_amount() <= MAX_MSG_LEN); + let remaining_space = MAX_MSG_LEN - this.buffered_amount(); + + if remaining_space == 0 { + this.write_waker.register(cx.waker()); + return Poll::Pending; + } + + let bytes_to_send = min(buf.len(), remaining_space); + + if this + .inner + .send_with_u8_array(&buf[..bytes_to_send]) + .is_err() + { + return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into())); + } + + Poll::Ready(Ok(bytes_to_send)) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.buffered_amount() == 0 { + return Poll::Ready(Ok(())); + } + + self.write_waker.register(cx.waker()); + Poll::Pending + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.ready_state() == RtcDataChannelState::Closed { + return Poll::Ready(Ok(())); + } + + if self.ready_state() != RtcDataChannelState::Closing { + self.inner.close(); + } + + self.close_waker.register(cx.waker()); + Poll::Pending + } +} diff --git a/transports/webrtc-websys/src/transport.rs b/transports/webrtc-websys/src/transport.rs new file mode 100644 index 000000000000..ecf137eab8a1 --- /dev/null +++ b/transports/webrtc-websys/src/transport.rs @@ -0,0 +1,140 @@ +use super::upgrade; +use super::Connection; +use super::Error; +use futures::future::FutureExt; +use libp2p_core::multiaddr::Multiaddr; +use libp2p_core::muxing::StreamMuxerBox; +use libp2p_core::transport::{Boxed, ListenerId, Transport as _, TransportError, TransportEvent}; +use libp2p_identity::{Keypair, PeerId}; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// Config for the [`Transport`]. +#[derive(Clone)] +pub struct Config { + keypair: Keypair, +} + +/// A WebTransport [`Transport`](libp2p_core::Transport) that works with `web-sys`. +pub struct Transport { + config: Config, +} + +impl Config { + /// Constructs a new configuration for the [`Transport`]. + pub fn new(keypair: &Keypair) -> Self { + Config { + keypair: keypair.to_owned(), + } + } +} + +impl Transport { + /// Constructs a new `Transport` with the given [`Config`]. + pub fn new(config: Config) -> Transport { + Transport { config } + } + + /// Wraps `Transport` in [`Boxed`] and makes it ready to be consumed by + /// SwarmBuilder. + pub fn boxed(self) -> Boxed<(PeerId, StreamMuxerBox)> { + self.map(|(peer_id, muxer), _| (peer_id, StreamMuxerBox::new(muxer))) + .boxed() + } +} + +impl libp2p_core::Transport for Transport { + type Output = (PeerId, Connection); + type Error = Error; + type ListenerUpgrade = Pin> + Send>>; + type Dial = Pin> + Send>>; + + fn listen_on( + &mut self, + _id: ListenerId, + addr: Multiaddr, + ) -> Result<(), TransportError> { + Err(TransportError::MultiaddrNotSupported(addr)) + } + + fn remove_listener(&mut self, _id: ListenerId) -> bool { + false + } + + fn dial(&mut self, addr: Multiaddr) -> Result> { + if maybe_local_firefox() { + return Err(TransportError::Other( + "Firefox does not support WebRTC over localhost or 127.0.0.1" + .to_string() + .into(), + )); + } + + let (sock_addr, server_fingerprint) = libp2p_webrtc_utils::parse_webrtc_dial_addr(&addr) + .ok_or_else(|| TransportError::MultiaddrNotSupported(addr.clone()))?; + + if sock_addr.port() == 0 || sock_addr.ip().is_unspecified() { + return Err(TransportError::MultiaddrNotSupported(addr)); + } + + let config = self.config.clone(); + + Ok(async move { + let (peer_id, connection) = + upgrade::outbound(sock_addr, server_fingerprint, config.keypair.clone()).await?; + + Ok((peer_id, connection)) + } + .boxed()) + } + + fn dial_as_listener( + &mut self, + addr: Multiaddr, + ) -> Result> { + Err(TransportError::MultiaddrNotSupported(addr)) + } + + fn poll( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Pending + } + + fn address_translation(&self, _listen: &Multiaddr, _observed: &Multiaddr) -> Option { + None + } +} + +/// Checks if local Firefox. +/// +/// See: `` for more details +fn maybe_local_firefox() -> bool { + let window = &web_sys::window().expect("window should be available"); + let ua = match window.navigator().user_agent() { + Ok(agent) => agent.to_lowercase(), + Err(_) => return false, + }; + + let hostname = match window + .document() + .expect("should be valid document") + .location() + { + Some(location) => match location.hostname() { + Ok(hostname) => hostname, + Err(_) => return false, + }, + None => return false, + }; + + // check if web_sys::Navigator::user_agent() matches any of the following: + // - firefox + // - seamonkey + // - iceape + // AND hostname is either localhost or "127.0.0.1" + (ua.contains("firefox") || ua.contains("seamonkey") || ua.contains("iceape")) + && (hostname == "localhost" || hostname == "127.0.0.1" || hostname == "[::1]") +} diff --git a/transports/webrtc-websys/src/upgrade.rs b/transports/webrtc-websys/src/upgrade.rs new file mode 100644 index 000000000000..092baed50c43 --- /dev/null +++ b/transports/webrtc-websys/src/upgrade.rs @@ -0,0 +1,56 @@ +use super::Error; +use crate::connection::RtcPeerConnection; +use crate::sdp; +use crate::Connection; +use libp2p_identity::{Keypair, PeerId}; +use libp2p_webrtc_utils::noise; +use libp2p_webrtc_utils::Fingerprint; +use send_wrapper::SendWrapper; +use std::net::SocketAddr; + +/// Upgrades an outbound WebRTC connection by creating the data channel +/// and conducting a Noise handshake +pub(crate) async fn outbound( + sock_addr: SocketAddr, + remote_fingerprint: Fingerprint, + id_keys: Keypair, +) -> Result<(PeerId, Connection), Error> { + let fut = SendWrapper::new(outbound_inner(sock_addr, remote_fingerprint, id_keys)); + fut.await +} + +/// Inner outbound function that is wrapped in [SendWrapper] +async fn outbound_inner( + sock_addr: SocketAddr, + remote_fingerprint: Fingerprint, + id_keys: Keypair, +) -> Result<(PeerId, Connection), Error> { + let rtc_peer_connection = RtcPeerConnection::new(remote_fingerprint.algorithm()).await?; + + // Create stream for Noise handshake + // Must create data channel before Offer is created for it to be included in the SDP + let (channel, listener) = rtc_peer_connection.new_handshake_stream(); + drop(listener); + + let ufrag = libp2p_webrtc_utils::sdp::random_ufrag(); + + let offer = rtc_peer_connection.create_offer().await?; + let munged_offer = sdp::offer(offer, &ufrag); + rtc_peer_connection + .set_local_description(munged_offer) + .await?; + + let answer = sdp::answer(sock_addr, remote_fingerprint, &ufrag); + rtc_peer_connection.set_remote_description(answer).await?; + + let local_fingerprint = rtc_peer_connection.local_fingerprint()?; + + log::trace!("local_fingerprint: {:?}", local_fingerprint); + log::trace!("remote_fingerprint: {:?}", remote_fingerprint); + + let peer_id = noise::outbound(id_keys, channel, remote_fingerprint, local_fingerprint).await?; + + log::debug!("Remote peer identified as {peer_id}"); + + Ok((peer_id, Connection::new(rtc_peer_connection))) +} diff --git a/transports/webrtc/CHANGELOG.md b/transports/webrtc/CHANGELOG.md index 0eb395574386..78d32f526172 100644 --- a/transports/webrtc/CHANGELOG.md +++ b/transports/webrtc/CHANGELOG.md @@ -1,3 +1,10 @@ +## 0.6.1-alpha - unreleased + +- Move common dependencies to `libp2p-webrtc-utils` crate. + See [PR 4248]. + +[PR 4248]: https://github.com/libp2p/rust-libp2p/pull/4248 + ## 0.6.0-alpha - Update `webrtc` dependency to `v0.8.0`. diff --git a/transports/webrtc/Cargo.toml b/transports/webrtc/Cargo.toml index f69cb12e4802..0ea589cfbaed 100644 --- a/transports/webrtc/Cargo.toml +++ b/transports/webrtc/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "libp2p-webrtc" -version = "0.6.0-alpha" +version = "0.6.1-alpha" authors = ["Parity Technologies "] description = "WebRTC transport for libp2p" repository = "https://github.com/libp2p/rust-libp2p" @@ -12,7 +12,6 @@ categories = ["network-programming", "asynchronous"] [dependencies] async-trait = "0.1" -asynchronous-codec = "0.6.2" bytes = "1" futures = "0.3" futures-timer = "3" @@ -21,11 +20,9 @@ if-watch = "3.0" libp2p-core = { workspace = true } libp2p-noise = { workspace = true } libp2p-identity = { workspace = true } +libp2p-webrtc-utils = { workspace = true } log = "0.4" -sha2 = "0.10.7" -multihash = { workspace = true } -quick-protobuf = "0.8" -quick-protobuf-codec = { workspace = true } +multihash = { workspace = true } rand = "0.8" rcgen = "0.11.1" serde = { version = "1.0", features = ["derive"] } diff --git a/transports/webrtc/src/lib.rs b/transports/webrtc/src/lib.rs index f89d946228c3..f71203cc2f7e 100644 --- a/transports/webrtc/src/lib.rs +++ b/transports/webrtc/src/lib.rs @@ -79,13 +79,5 @@ //! hand-crate the SDP answer generated by the remote, this is problematic. A way to solve this //! is to make the hash a part of the remote's multiaddr. On the server side, we turn //! certificate verification off. - -mod proto { - #![allow(unreachable_pub)] - include!("generated/mod.rs"); - #[cfg(feature = "tokio")] - pub(crate) use self::webrtc::pb::{mod_Message::Flag, Message}; -} - #[cfg(feature = "tokio")] pub mod tokio; diff --git a/transports/webrtc/src/tokio/connection.rs b/transports/webrtc/src/tokio/connection.rs index 72e39ce525f9..29983d720b5c 100644 --- a/transports/webrtc/src/tokio/connection.rs +++ b/transports/webrtc/src/tokio/connection.rs @@ -40,7 +40,7 @@ use std::{ task::{Context, Poll}, }; -use crate::tokio::{error::Error, substream, substream::Substream}; +use crate::tokio::{error::Error, stream, stream::Stream}; /// Maximum number of unprocessed data channels. /// See [`Connection::poll_inbound`]. @@ -56,14 +56,14 @@ pub struct Connection { /// Channel onto which incoming data channels are put. incoming_data_channels_rx: mpsc::Receiver>, - /// Future, which, once polled, will result in an outbound substream. + /// Future, which, once polled, will result in an outbound stream. outbound_fut: Option, Error>>>, /// Future, which, once polled, will result in closing the entire connection. close_fut: Option>>, - /// A list of futures, which, once completed, signal that a [`Substream`] has been dropped. - drop_listeners: FuturesUnordered, + /// A list of futures, which, once completed, signal that a [`Stream`] has been dropped. + drop_listeners: FuturesUnordered, no_drop_listeners_waker: Option, } @@ -147,7 +147,7 @@ impl Connection { } impl StreamMuxer for Connection { - type Substream = Substream; + type Substream = Stream; type Error = Error; fn poll_inbound( @@ -156,15 +156,15 @@ impl StreamMuxer for Connection { ) -> Poll> { match ready!(self.incoming_data_channels_rx.poll_next_unpin(cx)) { Some(detached) => { - log::trace!("Incoming substream {}", detached.stream_identifier()); + log::trace!("Incoming stream {}", detached.stream_identifier()); - let (substream, drop_listener) = Substream::new(detached); + let (stream, drop_listener) = Stream::new(detached); self.drop_listeners.push(drop_listener); if let Some(waker) = self.no_drop_listeners_waker.take() { waker.wake() } - Poll::Ready(Ok(substream)) + Poll::Ready(Ok(stream)) } None => { debug_assert!( @@ -226,15 +226,15 @@ impl StreamMuxer for Connection { Ok(detached) => { self.outbound_fut = None; - log::trace!("Outbound substream {}", detached.stream_identifier()); + log::trace!("Outbound stream {}", detached.stream_identifier()); - let (substream, drop_listener) = Substream::new(detached); + let (stream, drop_listener) = Stream::new(detached); self.drop_listeners.push(drop_listener); if let Some(waker) = self.no_drop_listeners_waker.take() { waker.wake() } - Poll::Ready(Ok(substream)) + Poll::Ready(Ok(stream)) } Err(e) => { self.outbound_fut = None; diff --git a/transports/webrtc/src/tokio/fingerprint.rs b/transports/webrtc/src/tokio/fingerprint.rs index c3d58d64e720..c075e4862320 100644 --- a/transports/webrtc/src/tokio/fingerprint.rs +++ b/transports/webrtc/src/tokio/fingerprint.rs @@ -18,30 +18,25 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use sha2::Digest as _; -use std::fmt; use webrtc::dtls_transport::dtls_fingerprint::RTCDtlsFingerprint; const SHA256: &str = "sha-256"; -const MULTIHASH_SHA256_CODE: u64 = 0x12; type Multihash = multihash::Multihash<64>; /// A certificate fingerprint that is assumed to be created using the SHA256 hash algorithm. -#[derive(Eq, PartialEq, Copy, Clone)] -pub struct Fingerprint([u8; 32]); +#[derive(Debug, Eq, PartialEq, Copy, Clone)] +pub struct Fingerprint(libp2p_webrtc_utils::Fingerprint); impl Fingerprint { - pub(crate) const FF: Fingerprint = Fingerprint([0xFF; 32]); - #[cfg(test)] pub fn raw(bytes: [u8; 32]) -> Self { - Self(bytes) + Self(libp2p_webrtc_utils::Fingerprint::raw(bytes)) } /// Creates a fingerprint from a raw certificate. pub fn from_certificate(bytes: &[u8]) -> Self { - Fingerprint(sha2::Sha256::digest(bytes).into()) + Fingerprint(libp2p_webrtc_utils::Fingerprint::from_certificate(bytes)) } /// Converts [`RTCDtlsFingerprint`] to [`Fingerprint`]. @@ -53,58 +48,35 @@ impl Fingerprint { let mut buf = [0; 32]; hex::decode_to_slice(fp.value.replace(':', ""), &mut buf).ok()?; - Some(Self(buf)) + Some(Self(libp2p_webrtc_utils::Fingerprint::raw(buf))) } /// Converts [`Multihash`](multihash::Multihash) to [`Fingerprint`]. pub fn try_from_multihash(hash: Multihash) -> Option { - if hash.code() != MULTIHASH_SHA256_CODE { - // Only support SHA256 for now. - return None; - } - - let bytes = hash.digest().try_into().ok()?; - - Some(Self(bytes)) + Some(Self(libp2p_webrtc_utils::Fingerprint::try_from_multihash( + hash, + )?)) } /// Converts this fingerprint to [`Multihash`](multihash::Multihash). pub fn to_multihash(self) -> Multihash { - Multihash::wrap(MULTIHASH_SHA256_CODE, &self.0).expect("fingerprint's len to be 32 bytes") + self.0.to_multihash() } /// Formats this fingerprint as uppercase hex, separated by colons (`:`). /// /// This is the format described in . pub fn to_sdp_format(self) -> String { - self.0.map(|byte| format!("{byte:02X}")).join(":") + self.0.to_sdp_format() } /// Returns the algorithm used (e.g. "sha-256"). /// See pub fn algorithm(&self) -> String { - SHA256.to_owned() - } -} - -impl fmt::Debug for Fingerprint { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str(&hex::encode(self.0)) + self.0.algorithm() } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn sdp_format() { - let fp = Fingerprint::raw(hex_literal::hex!( - "7DE3D83F81A680592A471E6B6ABB0747ABD35385A8093FDFE112C1EEBB6CC6AC" - )); - - let sdp_format = fp.to_sdp_format(); - assert_eq!(sdp_format, "7D:E3:D8:3F:81:A6:80:59:2A:47:1E:6B:6A:BB:07:47:AB:D3:53:85:A8:09:3F:DF:E1:12:C1:EE:BB:6C:C6:AC") + pub(crate) fn into_inner(self) -> libp2p_webrtc_utils::Fingerprint { + self.0 } } diff --git a/transports/webrtc/src/tokio/mod.rs b/transports/webrtc/src/tokio/mod.rs index 85e041bf98f1..4f2c0dd91169 100644 --- a/transports/webrtc/src/tokio/mod.rs +++ b/transports/webrtc/src/tokio/mod.rs @@ -24,7 +24,7 @@ mod error; mod fingerprint; mod req_res_chan; mod sdp; -mod substream; +mod stream; mod transport; mod udp_mux; mod upgrade; diff --git a/transports/webrtc/src/tokio/sdp.rs b/transports/webrtc/src/tokio/sdp.rs index d2f424e5d4ed..e49345a01b2e 100644 --- a/transports/webrtc/src/tokio/sdp.rs +++ b/transports/webrtc/src/tokio/sdp.rs @@ -18,22 +18,19 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use serde::Serialize; -use tinytemplate::TinyTemplate; +pub(crate) use libp2p_webrtc_utils::sdp::random_ufrag; +use libp2p_webrtc_utils::sdp::render_description; +use libp2p_webrtc_utils::Fingerprint; +use std::net::SocketAddr; use webrtc::peer_connection::sdp::session_description::RTCSessionDescription; -use std::net::{IpAddr, SocketAddr}; - -use crate::tokio::fingerprint::Fingerprint; - /// Creates the SDP answer used by the client. pub(crate) fn answer( addr: SocketAddr, - server_fingerprint: &Fingerprint, + server_fingerprint: Fingerprint, client_ufrag: &str, ) -> RTCSessionDescription { - RTCSessionDescription::answer(render_description( - SERVER_SESSION_DESCRIPTION, + RTCSessionDescription::answer(libp2p_webrtc_utils::sdp::answer( addr, server_fingerprint, client_ufrag, @@ -45,13 +42,16 @@ pub(crate) fn answer( /// /// Certificate verification is disabled which is why we hardcode a dummy fingerprint here. pub(crate) fn offer(addr: SocketAddr, client_ufrag: &str) -> RTCSessionDescription { - RTCSessionDescription::offer(render_description( + let offer = render_description( CLIENT_SESSION_DESCRIPTION, addr, - &Fingerprint::FF, + Fingerprint::FF, client_ufrag, - )) - .unwrap() + ); + + log::trace!("Created SDP offer: {offer}"); + + RTCSessionDescription::offer(offer).unwrap() } // An SDP message that constitutes the offer. @@ -142,111 +142,3 @@ a=setup:actpass a=sctp-port:5000 a=max-message-size:16384 "; - -// See [`CLIENT_SESSION_DESCRIPTION`]. -// -// a=ice-lite -// -// A lite implementation is only appropriate for devices that will *always* be connected to -// the public Internet and have a public IP address at which it can receive packets from any -// correspondent. ICE will not function when a lite implementation is placed behind a NAT -// (RFC8445). -// -// a=tls-id: -// -// "TLS ID" uniquely identifies a TLS association. -// The ICE protocol uses a "TLS ID" system to indicate whether a fresh DTLS connection -// must be reopened in case of ICE renegotiation. Considering that ICE renegotiations -// never happen in our use case, we can simply put a random value and not care about -// it. Note however that the TLS ID in the answer must be present if and only if the -// offer contains one. (RFC8842) -// TODO: is it true that renegotiations never happen? what about a connection closing? -// "tls-id" attribute MUST be present in the initial offer and respective answer (RFC8839). -// XXX: but right now browsers don't send it. -// -// a=setup:passive -// -// "passive" indicates that the remote DTLS server will only listen for incoming -// connections. (RFC5763) -// The answerer (server) MUST not be located behind a NAT (RFC6135). -// -// The answerer MUST use either a setup attribute value of setup:active or setup:passive. -// Note that if the answerer uses setup:passive, then the DTLS handshake will not begin until -// the answerer is received, which adds additional latency. setup:active allows the answer and -// the DTLS handshake to occur in parallel. Thus, setup:active is RECOMMENDED. -// -// a=candidate: -// -// A transport address for a candidate that can be used for connectivity checks (RFC8839). -// -// a=end-of-candidates -// -// Indicate that no more candidates will ever be sent (RFC8838). -const SERVER_SESSION_DESCRIPTION: &str = "v=0 -o=- 0 0 IN {ip_version} {target_ip} -s=- -t=0 0 -a=ice-lite -m=application {target_port} UDP/DTLS/SCTP webrtc-datachannel -c=IN {ip_version} {target_ip} -a=mid:0 -a=ice-options:ice2 -a=ice-ufrag:{ufrag} -a=ice-pwd:{pwd} -a=fingerprint:{fingerprint_algorithm} {fingerprint_value} - -a=setup:passive -a=sctp-port:5000 -a=max-message-size:16384 -a=candidate:1 1 UDP 1 {target_ip} {target_port} typ host -a=end-of-candidates -"; - -/// Indicates the IP version used in WebRTC: `IP4` or `IP6`. -#[derive(Serialize)] -enum IpVersion { - IP4, - IP6, -} - -/// Context passed to the templating engine, which replaces the above placeholders (e.g. -/// `{IP_VERSION}`) with real values. -#[derive(Serialize)] -struct DescriptionContext { - pub(crate) ip_version: IpVersion, - pub(crate) target_ip: IpAddr, - pub(crate) target_port: u16, - pub(crate) fingerprint_algorithm: String, - pub(crate) fingerprint_value: String, - pub(crate) ufrag: String, - pub(crate) pwd: String, -} - -/// Renders a [`TinyTemplate`] description using the provided arguments. -fn render_description( - description: &str, - addr: SocketAddr, - fingerprint: &Fingerprint, - ufrag: &str, -) -> String { - let mut tt = TinyTemplate::new(); - tt.add_template("description", description).unwrap(); - - let context = DescriptionContext { - ip_version: { - if addr.is_ipv4() { - IpVersion::IP4 - } else { - IpVersion::IP6 - } - }, - target_ip: addr.ip(), - target_port: addr.port(), - fingerprint_algorithm: fingerprint.algorithm(), - fingerprint_value: fingerprint.to_sdp_format(), - // NOTE: ufrag is equal to pwd. - ufrag: ufrag.to_owned(), - pwd: ufrag.to_owned(), - }; - tt.render("description", &context).unwrap() -} diff --git a/transports/webrtc/src/tokio/stream.rs b/transports/webrtc/src/tokio/stream.rs new file mode 100644 index 000000000000..4278a751e27b --- /dev/null +++ b/transports/webrtc/src/tokio/stream.rs @@ -0,0 +1,80 @@ +// Copyright 2023 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use std::{ + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; + +use futures::prelude::*; +use libp2p_webrtc_utils::MAX_MSG_LEN; +use tokio_util::compat::{Compat, TokioAsyncReadCompatExt}; +use webrtc::data::data_channel::{DataChannel, PollDataChannel}; + +/// A substream on top of a WebRTC data channel. +/// +/// To be a proper libp2p substream, we need to implement [`AsyncRead`] and [`AsyncWrite`] as well +/// as support a half-closed state which we do by framing messages in a protobuf envelope. +pub struct Stream { + inner: libp2p_webrtc_utils::Stream>, +} + +pub(crate) type DropListener = libp2p_webrtc_utils::DropListener>; + +impl Stream { + /// Returns a new `Substream` and a listener, which will notify the receiver when/if the substream + /// is dropped. + pub(crate) fn new(data_channel: Arc) -> (Self, DropListener) { + let mut data_channel = PollDataChannel::new(data_channel).compat(); + data_channel.get_mut().set_read_buf_capacity(MAX_MSG_LEN); + + let (inner, drop_listener) = libp2p_webrtc_utils::Stream::new(data_channel); + + (Self { inner }, drop_listener) + } +} +impl AsyncRead for Stream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + Pin::new(&mut self.get_mut().inner).poll_read(cx, buf) + } +} + +impl AsyncWrite for Stream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut self.get_mut().inner).poll_write(cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.get_mut().inner).poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.get_mut().inner).poll_close(cx) + } +} diff --git a/transports/webrtc/src/tokio/transport.rs b/transports/webrtc/src/tokio/transport.rs index faac75b24d58..4b3f15d5978c 100644 --- a/transports/webrtc/src/tokio/transport.rs +++ b/transports/webrtc/src/tokio/transport.rs @@ -119,7 +119,7 @@ impl libp2p_core::Transport for Transport { } fn dial(&mut self, addr: Multiaddr) -> Result> { - let (sock_addr, server_fingerprint) = parse_webrtc_dial_addr(&addr) + let (sock_addr, server_fingerprint) = libp2p_webrtc_utils::parse_webrtc_dial_addr(&addr) .ok_or_else(|| TransportError::MultiaddrNotSupported(addr.clone()))?; if sock_addr.port() == 0 || sock_addr.ip().is_unspecified() { return Err(TransportError::MultiaddrNotSupported(addr)); @@ -140,7 +140,7 @@ impl libp2p_core::Transport for Transport { sock_addr, config.inner, udp_mux, - client_fingerprint, + client_fingerprint.into_inner(), server_fingerprint, config.id_keys, ) @@ -337,7 +337,7 @@ impl Stream for ListenStream { new_addr.addr, self.config.inner.clone(), self.udp_mux.udp_mux_handle(), - self.config.fingerprint, + self.config.fingerprint.into_inner(), new_addr.ufrag, self.config.id_keys.clone(), ) @@ -427,40 +427,6 @@ fn parse_webrtc_listen_addr(addr: &Multiaddr) -> Option { Some(SocketAddr::new(ip, port)) } -/// Parse the given [`Multiaddr`] into a [`SocketAddr`] and a [`Fingerprint`] for dialing. -fn parse_webrtc_dial_addr(addr: &Multiaddr) -> Option<(SocketAddr, Fingerprint)> { - let mut iter = addr.iter(); - - let ip = match iter.next()? { - Protocol::Ip4(ip) => IpAddr::from(ip), - Protocol::Ip6(ip) => IpAddr::from(ip), - _ => return None, - }; - - let port = iter.next()?; - let webrtc = iter.next()?; - let certhash = iter.next()?; - - let (port, fingerprint) = match (port, webrtc, certhash) { - (Protocol::Udp(port), Protocol::WebRTCDirect, Protocol::Certhash(cert_hash)) => { - let fingerprint = Fingerprint::try_from_multihash(cert_hash)?; - - (port, fingerprint) - } - _ => return None, - }; - - match iter.next() { - Some(Protocol::P2p(_)) => {} - // peer ID is optional - None => {} - // unexpected protocol - Some(_) => return None, - } - - Some((SocketAddr::new(ip, port), fingerprint)) -} - // Tests ////////////////////////////////////////////////////////////////////////////////////////// #[cfg(test)] @@ -469,7 +435,7 @@ mod tests { use futures::future::poll_fn; use libp2p_core::{multiaddr::Protocol, Transport as _}; use rand::thread_rng; - use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; + use std::net::{IpAddr, Ipv6Addr}; #[test] fn missing_webrtc_protocol() { @@ -480,44 +446,6 @@ mod tests { assert!(maybe_parsed.is_none()); } - #[test] - fn parse_valid_address_with_certhash_and_p2p() { - let addr = "/ip4/127.0.0.1/udp/39901/webrtc-direct/certhash/uEiDikp5KVUgkLta1EjUN-IKbHk-dUBg8VzKgf5nXxLK46w/p2p/12D3KooWNpDk9w6WrEEcdsEH1y47W71S36yFjw4sd3j7omzgCSMS" - .parse() - .unwrap(); - - let maybe_parsed = parse_webrtc_dial_addr(&addr); - - assert_eq!( - maybe_parsed, - Some(( - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 39901), - Fingerprint::raw(hex_literal::hex!( - "e2929e4a5548242ed6b512350df8829b1e4f9d50183c5732a07f99d7c4b2b8eb" - )) - )) - ); - } - - #[test] - fn peer_id_is_not_required() { - let addr = "/ip4/127.0.0.1/udp/39901/webrtc-direct/certhash/uEiDikp5KVUgkLta1EjUN-IKbHk-dUBg8VzKgf5nXxLK46w" - .parse() - .unwrap(); - - let maybe_parsed = parse_webrtc_dial_addr(&addr); - - assert_eq!( - maybe_parsed, - Some(( - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 39901), - Fingerprint::raw(hex_literal::hex!( - "e2929e4a5548242ed6b512350df8829b1e4f9d50183c5732a07f99d7c4b2b8eb" - )) - )) - ); - } - #[test] fn tcp_is_invalid_protocol() { let addr = "/ip4/127.0.0.1/tcp/12345/webrtc-direct/certhash/uEiDikp5KVUgkLta1EjUN-IKbHk-dUBg8VzKgf5nXxLK46w" @@ -540,26 +468,6 @@ mod tests { assert!(maybe_parsed.is_none()); } - #[test] - fn parse_ipv6() { - let addr = - "/ip6/::1/udp/12345/webrtc-direct/certhash/uEiDikp5KVUgkLta1EjUN-IKbHk-dUBg8VzKgf5nXxLK46w/p2p/12D3KooWNpDk9w6WrEEcdsEH1y47W71S36yFjw4sd3j7omzgCSMS" - .parse() - .unwrap(); - - let maybe_parsed = parse_webrtc_dial_addr(&addr); - - assert_eq!( - maybe_parsed, - Some(( - SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 12345), - Fingerprint::raw(hex_literal::hex!( - "e2929e4a5548242ed6b512350df8829b1e4f9d50183c5732a07f99d7c4b2b8eb" - )) - )) - ); - } - #[test] fn can_parse_valid_addr_without_certhash() { let addr = "/ip6/::1/udp/12345/webrtc-direct".parse().unwrap(); diff --git a/transports/webrtc/src/tokio/upgrade.rs b/transports/webrtc/src/tokio/upgrade.rs index 2d5e3fe2c10e..414fc2721d05 100644 --- a/transports/webrtc/src/tokio/upgrade.rs +++ b/transports/webrtc/src/tokio/upgrade.rs @@ -18,15 +18,14 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -mod noise; +use libp2p_webrtc_utils::{noise, Fingerprint}; use futures::channel::oneshot; use futures::future::Either; use futures_timer::Delay; use libp2p_identity as identity; use libp2p_identity::PeerId; -use rand::distributions::Alphanumeric; -use rand::{thread_rng, Rng}; +use std::{net::SocketAddr, sync::Arc, time::Duration}; use webrtc::api::setting_engine::SettingEngine; use webrtc::api::APIBuilder; use webrtc::data::data_channel::DataChannel; @@ -38,9 +37,8 @@ use webrtc::ice::udp_network::UDPNetwork; use webrtc::peer_connection::configuration::RTCConfiguration; use webrtc::peer_connection::RTCPeerConnection; -use std::{net::SocketAddr, sync::Arc, time::Duration}; - -use crate::tokio::{error::Error, fingerprint::Fingerprint, sdp, substream::Substream, Connection}; +use crate::tokio::sdp::random_ufrag; +use crate::tokio::{error::Error, sdp, stream::Stream, Connection}; /// Creates a new outbound WebRTC connection. pub(crate) async fn outbound( @@ -59,7 +57,7 @@ pub(crate) async fn outbound( log::debug!("created SDP offer for outbound connection: {:?}", offer.sdp); peer_connection.set_local_description(offer).await?; - let answer = sdp::answer(addr, &server_fingerprint, &ufrag); + let answer = sdp::answer(addr, server_fingerprint, &ufrag); log::debug!( "calculated SDP answer for outbound connection: {:?}", answer @@ -155,18 +153,6 @@ async fn new_inbound_connection( Ok(connection) } -/// Generates a random ufrag and adds a prefix according to the spec. -fn random_ufrag() -> String { - format!( - "libp2p+webrtc+v1/{}", - thread_rng() - .sample_iter(&Alphanumeric) - .take(64) - .map(char::from) - .collect::() - ) -} - fn setting_engine( udp_mux: Arc, ufrag: &str, @@ -203,9 +189,7 @@ async fn get_remote_fingerprint(conn: &RTCPeerConnection) -> Fingerprint { Fingerprint::from_certificate(&cert_bytes) } -async fn create_substream_for_noise_handshake( - conn: &RTCPeerConnection, -) -> Result { +async fn create_substream_for_noise_handshake(conn: &RTCPeerConnection) -> Result { // NOTE: the data channel w/ `negotiated` flag set to `true` MUST be created on both ends. let data_channel = conn .create_data_channel( @@ -234,7 +218,7 @@ async fn create_substream_for_noise_handshake( } }; - let (substream, drop_listener) = Substream::new(channel); + let (substream, drop_listener) = Stream::new(channel); drop(drop_listener); // Don't care about cancelled substreams during initial handshake. Ok(substream)