Skip to content

Commit

Permalink
Now compiles, downloads is bugged
Browse files Browse the repository at this point in the history
  • Loading branch information
nick42d committed Nov 24, 2024
1 parent eee317f commit 74a2efd
Show file tree
Hide file tree
Showing 8 changed files with 255 additions and 165 deletions.
83 changes: 51 additions & 32 deletions async-callback-manager/src/manager.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use crate::{
task::{
AsyncTask, AsyncTaskKind, FutureTask, ResponseInformation, SpawnedTask, StreamTask,
TaskInformation, TaskList, TaskOutcome, TaskWaiter,
AsyncTask, AsyncTaskKind, FutureTask, SpawnedTask, StreamTask, TaskInformation, TaskList,
TaskOutcome, TaskWaiter,
},
Constraint, DEFAULT_STREAM_CHANNEL_SIZE,
};
use futures::{Stream, StreamExt};
use std::{any::TypeId, future::Future};
use std::{any::TypeId, future::Future, sync::Arc};

#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct TaskId(pub(crate) usize);
Expand All @@ -23,18 +23,24 @@ pub(crate) type DynStreamTask<Frntend, Bkend, Md> =
Box<dyn FnOnce(&Bkend) -> DynMutationStream<Frntend, Bkend, Md>>;

pub(crate) type DynTaskSpawnCallback<Cstrnt> = dyn Fn(TaskInformation<Cstrnt>);
pub(crate) type DynResponseReceivedCallback = dyn Fn(ResponseInformation);

pub struct AsyncCallbackManager<Frntend, Bkend, Md> {
next_task_id: usize,
tasks_list: TaskList<Frntend, Bkend, Md>,
// It could be possible to make these generic instead of dynamic, however this type would then
// require 2 more type parameters.
on_task_spawn: Box<DynTaskSpawnCallback<Md>>,
on_response_received: Box<DynResponseReceivedCallback>,
on_id_overflow: Box<dyn Fn()>,
}

/// Temporary struct to store task details before it is added to the task list.
pub(crate) struct TempSpawnedTask<Frntend, Bkend, Md> {
waiter: TaskWaiter<Frntend, Bkend, Md>,
type_id: TypeId,
type_name: &'static str,
type_debug: Arc<String>,
}

impl<Frntend, Bkend, Md: PartialEq> Default for AsyncCallbackManager<Frntend, Bkend, Md> {
fn default() -> Self {
Self::new()
Expand All @@ -48,7 +54,6 @@ impl<Frntend, Bkend, Md: PartialEq> AsyncCallbackManager<Frntend, Bkend, Md> {
next_task_id: Default::default(),
tasks_list: TaskList::new(),
on_task_spawn: Box::new(|_| {}),
on_response_received: Box::new(|_| {}),
on_id_overflow: Box::new(|| {}),
}
}
Expand All @@ -63,15 +68,6 @@ impl<Frntend, Bkend, Md: PartialEq> AsyncCallbackManager<Frntend, Bkend, Md> {
self.on_task_spawn = Box::new(cb);
self
}
// TODO: when is this called?
pub fn with_on_response_received_callback(
mut self,
cb: impl Fn(ResponseInformation) + 'static,
) -> Self {
todo!();
self.on_response_received = Box::new(cb);
self
}
/// Await for the next response from one of the spawned tasks, or returns
/// None if no tasks were in the list.
pub async fn get_next_response(&mut self) -> Option<TaskOutcome<Frntend, Bkend, Md>> {
Expand All @@ -88,20 +84,41 @@ impl<Frntend, Bkend, Md: PartialEq> AsyncCallbackManager<Frntend, Bkend, Md> {
constraint,
metadata,
} = task;
let (waiter, type_id, type_name) = match task {
match task {
AsyncTaskKind::Future(future_task) => {
self.spawn_future_task(backend, future_task, &constraint)
let outcome = self.spawn_future_task(backend, future_task, &constraint);
self.add_task_to_list(outcome, metadata, constraint);
}
AsyncTaskKind::Stream(stream_task) => {
self.spawn_stream_task(backend, stream_task, &constraint)
let outcome = self.spawn_stream_task(backend, stream_task, &constraint);
self.add_task_to_list(outcome, metadata, constraint);
}
// Don't call (self.on_task_spawn)() for NoOp.
AsyncTaskKind::NoOp => return,
};
AsyncTaskKind::Multi(tasks) => {
for task in tasks {
self.spawn_task(backend, task)
}
}
AsyncTaskKind::NoOp => (),
}
}
fn add_task_to_list(
&mut self,
details: TempSpawnedTask<Frntend, Bkend, Md>,
metadata: Vec<Md>,
constraint: Option<Constraint<Md>>,
) {
let TempSpawnedTask {
waiter,
type_id,
type_name,
type_debug,
} = details;
let sp = SpawnedTask {
type_id,
task_id: TaskId(self.next_task_id),
type_name,
type_debug,
receiver: waiter,
metadata,
};
Expand All @@ -123,7 +140,7 @@ impl<Frntend, Bkend, Md: PartialEq> AsyncCallbackManager<Frntend, Bkend, Md> {
backend: &Bkend,
future_task: FutureTask<Frntend, Bkend, Md>,
constraint: &Option<Constraint<Md>>,
) -> (TaskWaiter<Frntend, Bkend, Md>, TypeId, &'static str)
) -> TempSpawnedTask<Frntend, Bkend, Md>
where
Frntend: 'static,
Bkend: 'static,
Expand All @@ -132,23 +149,24 @@ impl<Frntend, Bkend, Md: PartialEq> AsyncCallbackManager<Frntend, Bkend, Md> {
(self.on_task_spawn)(TaskInformation {
type_id: future_task.type_id,
type_name: future_task.type_name,
type_debug: future_task.type_debug,
type_debug: &future_task.type_debug,
constraint,
});
let future = (future_task.task)(backend);
let handle = tokio::spawn(future);
(
TaskWaiter::Future(handle),
future_task.type_id,
future_task.type_name,
)
TempSpawnedTask {
waiter: TaskWaiter::Future(handle),
type_id: future_task.type_id,
type_name: future_task.type_name,
type_debug: Arc::new(future_task.type_debug),
}
}
fn spawn_stream_task(
&self,
backend: &Bkend,
stream_task: StreamTask<Frntend, Bkend, Md>,
constraint: &Option<Constraint<Md>>,
) -> (TaskWaiter<Frntend, Bkend, Md>, TypeId, &'static str)
) -> TempSpawnedTask<Frntend, Bkend, Md>
where
Frntend: 'static,
Bkend: 'static,
Expand All @@ -163,7 +181,7 @@ impl<Frntend, Bkend, Md: PartialEq> AsyncCallbackManager<Frntend, Bkend, Md> {
(self.on_task_spawn)(TaskInformation {
type_id,
type_name,
type_debug,
type_debug: &type_debug,
constraint,
});
let mut stream = task(backend);
Expand All @@ -180,13 +198,14 @@ impl<Frntend, Bkend, Md: PartialEq> AsyncCallbackManager<Frntend, Bkend, Md> {
}
})
.abort_handle();
(
TaskWaiter::Stream {
TempSpawnedTask {
waiter: TaskWaiter::Stream {
receiver: rx,
abort_handle,
},
type_id,
type_name,
)
type_debug: Arc::new(type_debug),
}
}
}
45 changes: 33 additions & 12 deletions async-callback-manager/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
use std::{
any::{type_name, TypeId},
fmt::Debug,
sync::Arc,
};
use tokio::{
sync::mpsc,
Expand All @@ -24,6 +25,7 @@ pub struct AsyncTask<Frntend, Bkend, Md> {
pub(crate) enum AsyncTaskKind<Frntend, Bkend, Md> {
Future(FutureTask<Frntend, Bkend, Md>),
Stream(StreamTask<Frntend, Bkend, Md>),
Multi(Vec<AsyncTask<Frntend, Bkend, Md>>),
NoOp,
}

Expand All @@ -41,6 +43,20 @@ pub(crate) struct FutureTask<Frntend, Bkend, Md> {
pub(crate) type_debug: String,
}

impl<Frntend, Bkend, Md> FromIterator<AsyncTask<Frntend, Bkend, Md>>
for AsyncTask<Frntend, Bkend, Md>
{
fn from_iter<T: IntoIterator<Item = AsyncTask<Frntend, Bkend, Md>>>(iter: T) -> Self {
let v = iter.into_iter().collect();
// TODO: Better handle constraints / metadata.
AsyncTask {
task: AsyncTaskKind::Multi(v),
constraint: None,
metadata: vec![],
}
}
}

impl<Frntend, Bkend, Md> AsyncTask<Frntend, Bkend, Md> {
pub fn new_no_op() -> AsyncTask<Frntend, Bkend, Md> {
Self {
Expand Down Expand Up @@ -286,6 +302,14 @@ impl<Frntend, Bkend, Md> AsyncTask<Frntend, Bkend, Md> {
constraint,
metadata,
},
AsyncTaskKind::Multi(v) => {
let mapped = v.into_iter().map(|task| task.map(f.clone())).collect();
AsyncTask {
task: AsyncTaskKind::Multi(mapped),
constraint,
metadata,
}
}
}
}
}
Expand All @@ -297,26 +321,18 @@ pub(crate) struct TaskList<Bkend, Frntend, Md> {
pub(crate) struct SpawnedTask<Frntend, Bkend, Md> {
pub(crate) type_id: TypeId,
pub(crate) type_name: &'static str,
pub(crate) type_debug: Arc<String>,
pub(crate) receiver: TaskWaiter<Frntend, Bkend, Md>,
pub(crate) task_id: TaskId,
pub(crate) metadata: Vec<Md>,
}

// User visible struct for introspection.
#[derive(Debug, Clone)]
pub struct ResponseInformation {
pub type_id: TypeId,
pub type_name: &'static str,
pub task_id: TaskId,
pub task_is_now_finished: bool,
}

// User visible struct for introspection.
/// User visible struct for introspection.
#[derive(Debug, Clone)]
pub struct TaskInformation<'a, Cstrnt> {
pub type_id: TypeId,
pub type_name: &'static str,
pub type_debug: String,
pub type_debug: &'a str,
pub constraint: &'a Option<Constraint<Cstrnt>>,
}

Expand Down Expand Up @@ -363,13 +379,15 @@ pub enum TaskOutcome<Frntend, Bkend, Md> {
error: JoinError,
type_id: TypeId,
type_name: &'static str,
type_debug: Arc<String>,
task_id: TaskId,
},
/// Mutation was received from a task.
MutationReceived {
mutation: DynStateMutation<Frntend, Bkend, Md>,
type_id: TypeId,
type_name: &'static str,
type_debug: Arc<String>,
task_id: TaskId,
},
}
Expand All @@ -392,15 +410,17 @@ impl<Bkend, Frntend, Md: PartialEq> TaskList<Frntend, Bkend, Md> {
TaskOutcome::MutationReceived {
mutation,
type_id: task.type_id,
type_name: task.type_name,
type_debug: task.type_debug.clone(),
task_id: task.task_id,
type_name: task.type_name,
},
),
Err(error) => (
Some(idx),
TaskOutcome::TaskPanicked {
type_id: task.type_id,
type_name: task.type_name,
type_debug: task.type_debug.clone(),
task_id: task.task_id,
error,
},
Expand All @@ -417,6 +437,7 @@ impl<Bkend, Frntend, Md: PartialEq> TaskList<Frntend, Bkend, Md> {
type_id: task.type_id,
type_name: task.type_name,
task_id: task.task_id,
type_debug: task.type_debug.clone(),
},
);
}
Expand Down
39 changes: 23 additions & 16 deletions youtui/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::borrow::Cow;
use std::{io, sync::Arc};
use structures::ListSong;
use tokio::sync::mpsc;
use tracing::info;
use tracing::{error, info, warn};
use tracing_subscriber::prelude::*;
use ui::WindowContext;
use ui::YoutuiWindow;
Expand All @@ -35,7 +35,6 @@ thread_local! {
}

const CALLBACK_CHANNEL_SIZE: usize = 64;
const ASYNC_CALLBACK_SENDER_CHANNEL_SIZE: usize = 64;
const EVENT_CHANNEL_SIZE: usize = 256;
const LOG_FILE_NAME: &str = "debug.log";

Expand Down Expand Up @@ -98,17 +97,18 @@ impl Youtui {
task.type_debug, task.type_id, task.constraint
)
})
.with_on_response_received_callback(|response| {
info!(
"Received response to {:?}: type_id: {:?}, sender_id: {:?}, task_id: {:?}",
response.type_name, response.type_id, response.sender_id, response.task_id
)
});
.with_on_id_overflow_callback(|| warn!("Task IDs have overflowed. New tasks will temporarily not block or kill existing tasks"));
let server = Arc::new(server::Server::new(api_key, po_token));
let backend = CrosstermBackend::new(stdout);
let terminal = Terminal::new(backend)?;
let event_handler = EventHandler::new(EVENT_CHANNEL_SIZE)?;
let window_state = YoutuiWindow::new(callback_tx, &config);
let (window_state, effect) = YoutuiWindow::new(callback_tx, &config);
// Even the creation of a YoutuiWindow causes an effect. We'll spawn it straight
// away.
task_manager.spawn_task(
&server,
effect.map(|this: &mut Self| &mut this.window_state),
);
Ok(Youtui {
status: AppStatus::Running,
event_handler,
Expand Down Expand Up @@ -158,19 +158,26 @@ impl Youtui {
}
fn handle_effect(&mut self, effect: TaskOutcome<Self, ArcServer, TaskMetadata>) {
match effect {
async_callback_manager::TaskOutcome::StreamClosed => (),
async_callback_manager::TaskOutcome::StreamClosed => {
info!("Received a stream closed message from task manager")
}
async_callback_manager::TaskOutcome::TaskPanicked {
error,
type_id,
type_name,
task_id,
} => panic!("Panicked running task {type_name}"),
type_debug, error, ..
} => {
error!("Task {type_debug} panicked!");
std::panic::resume_unwind(error.into_panic())
}
async_callback_manager::TaskOutcome::MutationReceived {
mutation,
type_id,
type_name,
type_debug,
task_id,
..
} => {
info!(
"Received response to {:?}: type_id: {:?}, task_id: {:?}",
type_debug, type_id, task_id
);
let next_task = mutation(self);
self.task_manager.spawn_task(&self.server, next_task);
}
Expand Down
Loading

0 comments on commit 74a2efd

Please sign in to comment.