Skip to content

Commit

Permalink
Add metadata to BackendTasks
Browse files Browse the repository at this point in the history
  • Loading branch information
nick42d committed Nov 13, 2024
1 parent fc10f27 commit 3af0312
Show file tree
Hide file tree
Showing 12 changed files with 184 additions and 136 deletions.
16 changes: 11 additions & 5 deletions async-callback-manager/examples/ratatui_example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl Mode {
}
}
}
impl From<&Mode> for Option<async_callback_manager::Constraint> {
impl<T> From<&Mode> for Option<async_callback_manager::Constraint<T>> {
fn from(value: &Mode) -> Self {
match value {
Mode::BlockPreviousTasks => {
Expand All @@ -46,7 +46,7 @@ struct State {
word: String,
number: String,
mode: Mode,
callback_handle: AsyncCallbackSender<reqwest::Client, Self>,
callback_handle: AsyncCallbackSender<reqwest::Client, Self, ()>,
}
impl State {
fn draw(&self, f: &mut Frame) {
Expand Down Expand Up @@ -79,7 +79,6 @@ impl State {
|state, word| state.word = word,
(&self.mode).into(),
)
.await
.unwrap()
}
async fn handle_start_counter(&mut self) {
Expand All @@ -90,7 +89,6 @@ impl State {
|state, num| state.number = num,
(&self.mode).into(),
)
.await
.unwrap()
}
}
Expand All @@ -100,7 +98,7 @@ async fn main() {
let mut terminal = ratatui::init();
let backend = reqwest::Client::new();
let mut events = EventStream::new().filter_map(event_to_action);
let mut manager = AsyncCallbackManager::new(50);
let mut manager = AsyncCallbackManager::new();
let mut state = State {
word: String::new(),
number: String::new(),
Expand All @@ -127,6 +125,10 @@ async fn main() {

struct GetWordRequest;
impl BackendTask<reqwest::Client> for GetWordRequest {
type ConstraintType = ();
fn metadata() -> Vec<Self::ConstraintType> {
vec![]
}
type Output = String;
fn into_future(
self,
Expand All @@ -149,6 +151,10 @@ impl BackendTask<reqwest::Client> for GetWordRequest {
struct CounterStream;
impl<T> BackendStreamingTask<T> for CounterStream {
type Output = String;
type ConstraintType = ();
fn metadata() -> Vec<Self::ConstraintType> {
vec![]
}
fn into_stream(
self,
_: &T,
Expand Down
12 changes: 12 additions & 0 deletions async-callback-manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,30 @@ pub trait BkendMap<Bkend> {
/// TypeId is used as part of the task management process.
pub trait BackendTask<Bkend>: Send + Any {
type Output: Send;
type ConstraintType: PartialEq;
fn into_future(self, backend: &Bkend) -> impl Future<Output = Self::Output> + Send + 'static;
/// Metadata provides a way of grouping different tasks for use in
/// constraints, if you override the default implementation.
fn metadata() -> Vec<Self::ConstraintType> {
vec![]
}
}

/// A task of kind T that can be run on a backend, returning a stream of outputs
/// Output. The type must implement Any, as the TypeId is used as part of the
/// task management process.
pub trait BackendStreamingTask<Bkend>: Send + Any {
type Output: Send;
type ConstraintType: PartialEq;
fn into_stream(
self,
backend: &Bkend,
) -> impl Stream<Item = Self::Output> + Send + Unpin + 'static;
/// Metadata provides a way of grouping different tasks for use in
/// constraints, if you override the default implementation.
fn metadata() -> Vec<Self::ConstraintType> {
vec![]
}
}

struct KillHandle(Option<oneshot::Sender<()>>);
Expand Down
39 changes: 23 additions & 16 deletions async-callback-manager/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,24 @@ use crate::{
task::{ResponseInformation, Task, TaskFromFrontend, TaskInformation, TaskList},
AsyncCallbackSender,
};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};

#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct SenderId(usize);
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct TaskId(usize);

type DynTaskReceivedCallback = dyn FnMut(TaskInformation);
type DynTaskReceivedCallback<Cstrnt> = dyn FnMut(TaskInformation<Cstrnt>);
type DynResponseReceivedCallback = dyn FnMut(ResponseInformation);

pub struct AsyncCallbackManager<Bkend> {
pub struct AsyncCallbackManager<Bkend, Cstrnt> {
next_sender_id: usize,
next_task_id: usize,
this_sender: Sender<TaskFromFrontend<Bkend>>,
this_receiver: Receiver<TaskFromFrontend<Bkend>>,
tasks_list: TaskList,
this_sender: UnboundedSender<TaskFromFrontend<Bkend, Cstrnt>>,
this_receiver: UnboundedReceiver<TaskFromFrontend<Bkend, Cstrnt>>,
tasks_list: TaskList<Cstrnt>,
// TODO: Make generic instead of dynamic.
on_task_received: Box<DynTaskReceivedCallback>,
on_task_received: Box<DynTaskReceivedCallback<Cstrnt>>,
on_response_received: Box<DynResponseReceivedCallback>,
}

Expand All @@ -37,24 +37,30 @@ impl ManagedEventType {
}
}

impl<Bkend> AsyncCallbackManager<Bkend> {
/// Get a new AsyncCallbackManager. Channel size refers to number of
/// messages that can be buffered from senders.
pub fn new(channel_size: usize) -> Self {
let (tx, rx) = mpsc::channel(channel_size);
impl<Bkend, Cstrnt: PartialEq> Default for AsyncCallbackManager<Bkend, Cstrnt> {
fn default() -> Self {
Self::new()
}
}
impl<Bkend, Cstrnt: PartialEq> AsyncCallbackManager<Bkend, Cstrnt> {
/// Get a new AsyncCallbackManager.
// TODO: Consider if this should be bounded. Unbounded has been chose for now as
// it allows senders to send without blocking.
pub fn new() -> Self {
let (tx, rx) = mpsc::unbounded_channel();
AsyncCallbackManager {
next_sender_id: 0,
next_task_id: 0,
this_receiver: rx,
this_sender: tx,
tasks_list: Default::default(),
tasks_list: TaskList::new(),
on_task_received: Box::new(|_| {}),
on_response_received: Box::new(|_| {}),
}
}
pub fn with_on_task_received_callback(
mut self,
cb: impl FnMut(TaskInformation) + 'static,
cb: impl FnMut(TaskInformation<Cstrnt>) + 'static,
) -> Self {
self.on_task_received = Box::new(cb);
self
Expand All @@ -72,7 +78,7 @@ impl<Bkend> AsyncCallbackManager<Bkend> {
pub fn new_sender<Frntend>(
&mut self,
channel_size: usize,
) -> AsyncCallbackSender<Bkend, Frntend> {
) -> AsyncCallbackSender<Bkend, Frntend, Cstrnt> {
let (tx, rx) = mpsc::channel(channel_size);
let task_function_sender = self.this_sender.clone();
let id = SenderId(self.next_sender_id);
Expand Down Expand Up @@ -120,7 +126,7 @@ impl<Bkend> AsyncCallbackManager<Bkend> {
pub async fn process_next_response(&mut self) -> Option<ResponseInformation> {
self.tasks_list.process_next_response().await
}
fn spawn_task(&mut self, backend: &Bkend, task: TaskFromFrontend<Bkend>) {
fn spawn_task(&mut self, backend: &Bkend, task: TaskFromFrontend<Bkend, Cstrnt>) {
(self.on_task_received)(task.get_information());
if let Some(constraint) = task.constraint {
self.tasks_list
Expand All @@ -129,6 +135,7 @@ impl<Bkend> AsyncCallbackManager<Bkend> {
self.tasks_list.push(Task::new(
task.type_id,
task.type_name,
task.metadata,
task.receiver,
task.sender_id,
TaskId(self.next_task_id),
Expand Down
33 changes: 17 additions & 16 deletions async-callback-manager/src/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ use std::{
future::Future,
};
use tokio::sync::{
mpsc::{self, Receiver, Sender},
mpsc::{self, Receiver, Sender, UnboundedSender},
oneshot,
};

pub struct AsyncCallbackSender<Bkend, Frntend> {
pub struct AsyncCallbackSender<Bkend, Frntend, Cstrnt> {
pub(crate) id: SenderId,
pub(crate) this_sender: Sender<DynCallbackFn<Frntend>>,
pub(crate) this_receiver: Receiver<DynCallbackFn<Frntend>>,
pub(crate) runner_sender: Sender<TaskFromFrontend<Bkend>>,
pub(crate) runner_sender: UnboundedSender<TaskFromFrontend<Bkend, Cstrnt>>,
}

/// A set of state mutations, that can be applied to a Frntend.
Expand All @@ -40,7 +40,7 @@ impl<Frntend> StateMutationBundle<Frntend> {
}
}

impl<Bkend, Frntend> AsyncCallbackSender<Bkend, Frntend> {
impl<Bkend, Frntend, Cstrnt> AsyncCallbackSender<Bkend, Frntend, Cstrnt> {
pub async fn get_next_mutations(
&mut self,
max_mutations: usize,
Expand All @@ -51,15 +51,15 @@ impl<Bkend, Frntend> AsyncCallbackSender<Bkend, Frntend> {
.await;
StateMutationBundle { mutation_list }
}
pub async fn add_stream_callback<R>(
pub fn add_stream_callback<R>(
&self,
request: R,
// TODO: Relax Clone bounds if possible.
handler: impl FnOnce(&mut Frntend, R::Output) + Send + Clone + 'static,
constraint: Option<Constraint>,
constraint: Option<Constraint<Cstrnt>>,
) -> Result<()>
where
R: BackendStreamingTask<Bkend> + 'static,
R: BackendStreamingTask<Bkend, ConstraintType = Cstrnt> + 'static,
Bkend: Send + 'static,
Frntend: 'static,
{
Expand All @@ -80,16 +80,16 @@ impl<Bkend, Frntend> AsyncCallbackSender<Bkend, Frntend> {
.boxed(),
) as DynFallibleFuture
};
self.send_task::<R>(func, rx, constraint, kill_tx).await
self.send_task::<R>(func, R::metadata(), rx, constraint, kill_tx)
}
pub async fn add_callback<R>(
pub fn add_callback<R>(
&self,
request: R,
handler: impl FnOnce(&mut Frntend, R::Output) + Send + 'static,
constraint: Option<Constraint>,
constraint: Option<Constraint<Cstrnt>>,
) -> Result<()>
where
R: BackendTask<Bkend> + 'static,
R: BackendTask<Bkend, ConstraintType = Cstrnt> + 'static,
Bkend: Send + 'static,
Frntend: 'static,
{
Expand All @@ -109,27 +109,28 @@ impl<Bkend, Frntend> AsyncCallbackSender<Bkend, Frntend> {
.boxed(),
) as DynFallibleFuture
};
self.send_task::<R>(func, rx, constraint, kill_tx).await
self.send_task::<R>(func, R::metadata(), rx, constraint, kill_tx)
}
async fn send_task<R: Any + 'static>(
fn send_task<R: Any + 'static>(
&self,
func: impl FnOnce(&Bkend) -> DynFallibleFuture + 'static,
metadata: Vec<Cstrnt>,
rx: impl Into<TaskReceiver>,
constraint: Option<Constraint>,
constraint: Option<Constraint<Cstrnt>>,
kill_handle: KillHandle,
) -> Result<()> {
self.runner_sender
.send(TaskFromFrontend::new(
TypeId::of::<R>(),
std::any::type_name::<R>(),
metadata,
func,
rx,
self.id,
constraint,
kill_handle,
))
.await
.map_err(|_| Error::ErrorSending)
.map_err(|_| Error::ReceiverDropped)
}
}

Expand Down
Loading

0 comments on commit 3af0312

Please sign in to comment.