Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Embassy support #11

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 29 additions & 11 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,40 @@ categories = ["network-programming"]
keywords = ["coap", "server", "async", "IoT"]
edition = "2021"

[features]
default-features = ["tokio"]
tokio = ["std", "dep:tokio", "dep:tokio-stream", "dep:tokio-util"]
std = []
observable = []
embassy = ["dep:oneshot", "dep:embassy-util", "dep:embassy-executor", "dep:embassy-net", "dep:watch"]

[dependencies]
tokio = { version = "1.17.0", features = ["full"] }
tokio-stream = "0.1.8"
tokio-util = { version = "0.7.0", features = ["codec", "net"] }
bytes = "1.1.0"
futures = "0.3.21"
tokio = { version = "1.17.0", features = ["full"], optional = true }
tokio-stream = { version = "0.1.8", optional = true }
tokio-util = { version = "0.7.0", features = ["codec", "net"], optional = true }
thingbuf = { version = "0.1", default-features = false }
bytes = { version = "1.1.0", default-features = false }
futures = { version = "0.3.21", default-features = false }
async-trait = "0.1.53"
pin-project = "1.0.10"
sync_wrapper = "0.1.1"
dyn-clone = "1.0.5"
anyhow = "1.0.56"
thiserror = "1.0.30"
env_logger = "0.9.0"
log = "0.4.16"
coap-lite = "0.9.0"
rand = "0.8.5"
anyhow = { version = "1.0.56", default-features = false }
log = { version = "0.4.16", default-features = false }
coap-lite = { path = "../coap-lite", default-features = false }
rand = { version = "0.8", default-features = false }
hashbrown = "0.12"

embassy-executor = { path = "../embassy/embassy-executor", default-features = false, optional = true }
embassy-net = { path = "../embassy/embassy-net", default-features = false, optional = true }
oneshot = { version = "0.1", default-features = false, features = ["async"], optional = true }
watch = { path = "../watch", default-features = false, optional = true }

[target.'cfg(target_os = "none")'.dependencies]
embassy-util = { path = "../embassy/embassy-util", default-features = false, optional = true }

[target.'cfg(not(target_os = "none"))'.dependencies]
embassy-util = { path = "../embassy/embassy-util", features = ["std"], optional = true }

[dev-dependencies]
async-stream = "0.3.3"
Expand Down
17 changes: 11 additions & 6 deletions src/app/app_builder.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use std::fmt::Debug;
use std::hash::Hash;
use core::fmt::Debug;
use core::hash::Hash;

use alloc::vec::Vec;
use rand::Rng;

use crate::app::app_handler::AppHandler;
use crate::app::ResourceBuilder;
Expand Down Expand Up @@ -92,10 +95,12 @@ impl<Endpoint: Ord + Clone> AppBuilder<Endpoint> {
}
}

impl<Endpoint: Debug + Clone + Ord + Eq + Hash + Send + 'static>
IntoHandler<AppHandler<Endpoint>, Endpoint> for AppBuilder<Endpoint>
impl<
Endpoint: Debug + Clone + Ord + Eq + Hash + Send + 'static,
R: Rng + Send + Sync + Clone + 'static,
> IntoHandler<AppHandler<Endpoint, R>, Endpoint, R> for AppBuilder<Endpoint>
{
fn into_handler(self, mtu: Option<u32>) -> AppHandler<Endpoint> {
AppHandler::from_builder(self, mtu)
fn into_handler(self, mtu: Option<u32>, rng: R) -> AppHandler<Endpoint, R> {
AppHandler::from_builder(self, mtu, rng)
}
}
124 changes: 89 additions & 35 deletions src/app/app_handler.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
use std::collections::HashMap;
use std::fmt::Debug;
use std::hash::Hash;
use std::pin::Pin;
use std::sync::Arc;
use alloc::boxed::Box;
use alloc::fmt::Debug;
use alloc::sync::Arc;
use alloc::vec::Vec;
use core::pin::Pin;
use core::{hash::Hash, task::Poll};
use hashbrown::HashMap;
use pin_project::pin_project;
use rand::Rng;

use coap_lite::{BlockHandler, CoapRequest, MessageClass, MessageType, Packet};
use futures::Stream;
#[cfg(feature = "embassy")]
use embassy_util::{blocking_mutex::raw::CriticalSectionRawMutex, mutex::Mutex};
use futures::{Future, Stream};
use log::{debug, warn};
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::Mutex;
use tokio_stream::wrappers::UnboundedReceiverStream;
#[cfg(feature = "tokio")]
use tokio::sync::{mpsc::UnboundedSender, Mutex};

use crate::app::app_builder::AppBuilder;
use crate::app::block_handler_util::new_block_handler;
Expand All @@ -29,55 +34,95 @@ pub(crate) const DEFAULT_BLOCK_TRANSFER: bool = true;
/// Main PacketHandler for an application suite of handlers. Efficiency and concurrency are
/// the primary goals of this implementation, but with the need to balance developer friendliness
/// of the main API.
pub struct AppHandler<Endpoint: Debug + Clone + Ord + Eq + Hash> {
pub struct AppHandler<Endpoint: Debug + Clone + Ord + Eq + Hash, R: Clone> {
#[cfg(feature = "tokio")]
retransmission_manager: Arc<Mutex<RetransmissionManager<Endpoint>>>,
#[cfg(feature = "embassy")]
retransmission_manager: Arc<Mutex<CriticalSectionRawMutex, RetransmissionManager<Endpoint>>>,

/// Special internal [`coap_lite::BlockHandler`] that we use only for formatting errors
/// that might be larger than MTU.
#[cfg(feature = "tokio")]
error_block_handler: Arc<Mutex<BlockHandler<Endpoint>>>,
#[cfg(feature = "embassy")]
error_block_handler: Arc<Mutex<CriticalSectionRawMutex, BlockHandler<Endpoint>>>,

/// Full set of handlers registered for this app, grouped by path but searchable using inexact
/// matching. See [`PathMatcher`] for more.
handlers_by_path: Arc<PathMatcher<ResourceHandler<Endpoint>>>,
rng: R,
}

impl<Endpoint: Debug + Clone + Ord + Eq + Hash> Clone for AppHandler<Endpoint> {
impl<Endpoint: Debug + Clone + Ord + Eq + Hash, R: Clone> Clone for AppHandler<Endpoint, R> {
fn clone(&self) -> Self {
Self {
retransmission_manager: self.retransmission_manager.clone(),
error_block_handler: self.error_block_handler.clone(),
handlers_by_path: self.handlers_by_path.clone(),
rng: self.rng.clone(),
}
}
}

impl<Endpoint: Debug + Clone + Ord + Eq + Hash + Send + 'static> PacketHandler<Endpoint>
for AppHandler<Endpoint>
#[pin_project]
struct PacketStream<F: Send> {
#[pin]
fut: F,
response: Vec<Packet>,
fut_complete: bool,
}

impl<F: Future<Output = Vec<Packet>> + Send> Stream for PacketStream<F> {
type Item = Packet;

fn poll_next(
self: Pin<&mut Self>,
cx: &mut core::task::Context<'_>,
) -> core::task::Poll<Option<Self::Item>> {
let this = self.project();

if !*this.fut_complete {
match this.fut.poll(cx) {
Poll::Ready(response) => {
*this.response = response;
*this.fut_complete = true;
// FIXME: should use some more efficient container
this.response.reverse();
}
Poll::Pending => {
return Poll::Pending;
}
}
}

Poll::Ready(this.response.pop())
}
}

impl<Endpoint: Debug + Clone + Ord + Eq + Hash + Send + 'static, R: Rng + Send + Sync + Clone>
PacketHandler<Endpoint> for AppHandler<Endpoint, R>
{
fn handle<'a>(
&'a self,
packet: Packet,
peer: Endpoint,
) -> Pin<Box<dyn Stream<Item = Packet> + Send + 'a>> {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();

// TODO: This spawn is technically unnecessary as we could implement a Stream ourselves
// similar to how async-stream crate does it, but the boiler plate doesn't really seem
// worth it for now.
tokio::spawn({
let cloned_self = self.clone();
async move {
cloned_self.handle_packet(tx, packet, peer).await;
}
});
Box::pin(UnboundedReceiverStream::new(rx))
let handler = self.handle_packet(packet, peer);
Box::pin(PacketStream {
fut: handler,
response: vec![],
fut_complete: false,
})
}
}

impl<Endpoint: Debug + Clone + Ord + Eq + Hash + Send + 'static> AppHandler<Endpoint> {
pub fn from_builder(builder: AppBuilder<Endpoint>, mtu: Option<u32>) -> Self {
impl<Endpoint: Debug + Clone + Ord + Eq + Hash + Send + 'static, R: Rng + Clone>
AppHandler<Endpoint, R>
{
pub fn from_builder(builder: AppBuilder<Endpoint>, mtu: Option<u32>, mut rng: R) -> Self {
let retransmission_manager = Arc::new(Mutex::new(RetransmissionManager::new(
TransmissionParameters::default(),
&mut rng,
)));

let build_params = BuildParameters {
Expand Down Expand Up @@ -110,16 +155,20 @@ impl<Endpoint: Debug + Clone + Ord + Eq + Hash + Send + 'static> AppHandler<Endp
retransmission_manager,
error_block_handler,
handlers_by_path,
rng,
}
}

async fn handle_packet(&self, tx: UnboundedSender<Packet>, packet: Packet, peer: Endpoint) {
async fn handle_packet(&self, packet: Packet, peer: Endpoint) -> Vec<Packet> {
match packet.header.code {
MessageClass::Request(_) => {
self.handle_get(tx, packet, peer).await;
let mut packets = vec![];
self.handle_get(&mut packets, packet, peer).await;
packets
}
MessageClass::Response(_) => {
warn!("Spurious response message from {peer:?}, ignoring...");
vec![]
}
MessageClass::Empty => {
match packet.header.get_type() {
Expand All @@ -133,41 +182,45 @@ impl<Endpoint: Debug + Clone + Ord + Eq + Hash + Send + 'static> AppHandler<Endp
"Got {t:?} from {peer:?} for unrecognized message ID {message_id}"
);
}

vec![]
}
MessageType::Confirmable => {
// A common way in CoAP to trigger a cheap "ping" to make sure
// the server is alive.
tx.send(new_pong_message(&packet)).unwrap();
vec![new_pong_message(&packet)]
}
MessageType::NonConfirmable => {
debug!("Ignoring Non-Confirmable Empty message from {peer:?}");
vec![]
}
}
}
code => {
warn!("Unhandled message code {code} from {peer:?}, ignoring...");
vec![]
}
}
}

async fn handle_get(&self, tx: UnboundedSender<Packet>, packet: Packet, peer: Endpoint) {
async fn handle_get(&self, out: &mut Vec<Packet>, packet: Packet, peer: Endpoint) {
let mut request = CoapRequest::from_packet(packet, peer);
if let Err(e) = self.try_handle_get(&tx, &mut request).await {
if let Err(e) = self.try_handle_get(out, &mut request).await {
if request.apply_from_error(e.into_handling_error()) {
// If the error happens to need block2 handling, let's do that here...
let _ = self
.error_block_handler
.lock()
.await
.intercept_response(&mut request);
tx.send(request.response.unwrap().message).unwrap();
out.push(request.response.unwrap().message);
}
}
}

async fn try_handle_get(
&self,
tx: &UnboundedSender<Packet>,
out: &mut Vec<Packet>,
request: &mut CoapRequest<Endpoint>,
) -> Result<(), CoapError> {
let paths = request.get_path_as_vec().map_err(CoapError::bad_request)?;
Expand All @@ -193,7 +246,8 @@ impl<Endpoint: Debug + Clone + Ord + Eq + Hash + Send + 'static> AppHandler<Endp
unmatched_path: Vec::from(&paths[matched_index..]),
};

value.handle(tx, wrapped_request).await
let mut rng = self.rng.clone();
value.handle(out, wrapped_request, &mut rng).await
}
None => Err(CoapError::not_found()),
}
Expand Down
4 changes: 3 additions & 1 deletion src/app/coap_utils.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::collections::HashMap;
use hashbrown::HashMap;

use alloc::string::{String, ToString};
use alloc::vec::Vec;
use coap_lite::error::IncompatibleOptionValueFormat;
use coap_lite::option_value::OptionValueType;
use coap_lite::{CoapOption, CoapRequest, MessageClass, MessageType, Packet};
Expand Down
11 changes: 7 additions & 4 deletions src/app/core_handler.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::collections::HashMap;
use std::fmt::Debug;
use std::hash::Hash;
use std::sync::Arc;
use alloc::boxed::Box;
use alloc::string::String;
use alloc::sync::Arc;
use alloc::vec::Vec;
use core::fmt::Debug;
use core::hash::Hash;
use hashbrown::HashMap;

use async_trait::async_trait;
use coap_lite::{ContentFormat, ResponseType};
Expand Down
7 changes: 5 additions & 2 deletions src/app/core_link.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use crate::app::resource_builder::DiscoverableResource;
use alloc::boxed::Box;
use alloc::string::{String, ToString};
use alloc::vec::Vec;
use coap_lite::link_format::{LinkAttributeWrite, LinkFormatWrite};
use coap_lite::ContentFormat;
use core::fmt::{Debug, Error, Write};
use dyn_clone::DynClone;
use std::collections::HashMap;
use std::fmt::{Debug, Error, Write};
use hashbrown::HashMap;

#[derive(Default, Debug)]
pub struct CoreLink {
Expand Down
8 changes: 5 additions & 3 deletions src/app/error.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::fmt::Debug;
use std::{error, fmt};
use core::fmt;
use core::fmt::Debug;

use alloc::string::{String, ToString};
use coap_lite::error::HandlingError;
use coap_lite::ResponseType;

Expand Down Expand Up @@ -51,7 +52,8 @@ impl fmt::Display for CoapError {
}
}

impl error::Error for CoapError {}
#[cfg(feature = "std")]
impl std::error::Error for CoapError {}

impl From<HandlingError> for CoapError {
fn from(src: HandlingError) -> Self {
Expand Down
10 changes: 8 additions & 2 deletions src/app/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
pub use app_builder::AppBuilder;
use core::fmt::Debug;
use core::hash::Hash;
pub use error::CoapError;
#[cfg(feature = "observable")]
pub use observable_resource::ObservableResource;
#[cfg(feature = "observable")]
pub use observers::Observers;
#[cfg(feature = "observable")]
pub use observers::ObserversHolder;
pub use request::Request;
pub use resource_builder::ResourceBuilder;
pub use response::Response;
use std::fmt::Debug;
use std::hash::Hash;

pub mod app_builder;
pub mod app_handler;
Expand All @@ -16,8 +19,11 @@ mod coap_utils;
mod core_handler;
mod core_link;
pub mod error;
#[cfg(feature = "observable")]
pub mod observable_resource;
#[cfg(feature = "observable")]
mod observe_handler;
#[cfg(feature = "observable")]
mod observers;
mod path_matcher;
pub mod request;
Expand Down
Loading