From 3af03128dc853c69361ffc5bd02e51db679dd0a8 Mon Sep 17 00:00:00 2001 From: Nick Dowsett Date: Wed, 13 Nov 2024 20:09:45 +0800 Subject: [PATCH] Add metadata to BackendTasks --- .../examples/ratatui_example.rs | 16 +++-- async-callback-manager/src/lib.rs | 12 ++++ async-callback-manager/src/manager.rs | 39 +++++++----- async-callback-manager/src/sender.rs | 33 +++++----- async-callback-manager/src/task.rs | 60 ++++++++++++------- .../tests/integration_tests.rs | 45 ++++++-------- async-rodio-sink/src/lib.rs | 2 +- youtui/src/app.rs | 28 ++++----- youtui/src/app/server.rs | 8 +-- youtui/src/app/server/downloader.rs | 4 +- youtui/src/app/ui.rs | 22 ++++--- youtui/src/app/ui/playlist.rs | 51 +++++++++------- 12 files changed, 184 insertions(+), 136 deletions(-) diff --git a/async-callback-manager/examples/ratatui_example.rs b/async-callback-manager/examples/ratatui_example.rs index 4cddf33..0b58964 100644 --- a/async-callback-manager/examples/ratatui_example.rs +++ b/async-callback-manager/examples/ratatui_example.rs @@ -29,7 +29,7 @@ impl Mode { } } } -impl From<&Mode> for Option { +impl From<&Mode> for Option> { fn from(value: &Mode) -> Self { match value { Mode::BlockPreviousTasks => { @@ -46,7 +46,7 @@ struct State { word: String, number: String, mode: Mode, - callback_handle: AsyncCallbackSender, + callback_handle: AsyncCallbackSender, } impl State { fn draw(&self, f: &mut Frame) { @@ -79,7 +79,6 @@ impl State { |state, word| state.word = word, (&self.mode).into(), ) - .await .unwrap() } async fn handle_start_counter(&mut self) { @@ -90,7 +89,6 @@ impl State { |state, num| state.number = num, (&self.mode).into(), ) - .await .unwrap() } } @@ -100,7 +98,7 @@ async fn main() { let mut terminal = ratatui::init(); let backend = reqwest::Client::new(); let mut events = EventStream::new().filter_map(event_to_action); - let mut manager = AsyncCallbackManager::new(50); + let mut manager = AsyncCallbackManager::new(); let mut state = State { word: String::new(), number: String::new(), @@ -127,6 +125,10 @@ async fn main() { struct GetWordRequest; impl BackendTask for GetWordRequest { + type ConstraintType = (); + fn metadata() -> Vec { + vec![] + } type Output = String; fn into_future( self, @@ -149,6 +151,10 @@ impl BackendTask for GetWordRequest { struct CounterStream; impl BackendStreamingTask for CounterStream { type Output = String; + type ConstraintType = (); + fn metadata() -> Vec { + vec![] + } fn into_stream( self, _: &T, diff --git a/async-callback-manager/src/lib.rs b/async-callback-manager/src/lib.rs index ce3e6b8..39debca 100644 --- a/async-callback-manager/src/lib.rs +++ b/async-callback-manager/src/lib.rs @@ -23,7 +23,13 @@ pub trait BkendMap { /// TypeId is used as part of the task management process. pub trait BackendTask: Send + Any { type Output: Send; + type ConstraintType: PartialEq; fn into_future(self, backend: &Bkend) -> impl Future + Send + 'static; + /// Metadata provides a way of grouping different tasks for use in + /// constraints, if you override the default implementation. + fn metadata() -> Vec { + vec![] + } } /// A task of kind T that can be run on a backend, returning a stream of outputs @@ -31,10 +37,16 @@ pub trait BackendTask: Send + Any { /// task management process. pub trait BackendStreamingTask: Send + Any { type Output: Send; + type ConstraintType: PartialEq; fn into_stream( self, backend: &Bkend, ) -> impl Stream + Send + Unpin + 'static; + /// Metadata provides a way of grouping different tasks for use in + /// constraints, if you override the default implementation. + fn metadata() -> Vec { + vec![] + } } struct KillHandle(Option>); diff --git a/async-callback-manager/src/manager.rs b/async-callback-manager/src/manager.rs index 2772e9f..b9c4651 100644 --- a/async-callback-manager/src/manager.rs +++ b/async-callback-manager/src/manager.rs @@ -2,24 +2,24 @@ use crate::{ task::{ResponseInformation, Task, TaskFromFrontend, TaskInformation, TaskList}, AsyncCallbackSender, }; -use tokio::sync::mpsc::{self, Receiver, Sender}; +use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; #[derive(Copy, Clone, Debug, Eq, PartialEq)] pub struct SenderId(usize); #[derive(Copy, Clone, Debug, Eq, PartialEq)] pub struct TaskId(usize); -type DynTaskReceivedCallback = dyn FnMut(TaskInformation); +type DynTaskReceivedCallback = dyn FnMut(TaskInformation); type DynResponseReceivedCallback = dyn FnMut(ResponseInformation); -pub struct AsyncCallbackManager { +pub struct AsyncCallbackManager { next_sender_id: usize, next_task_id: usize, - this_sender: Sender>, - this_receiver: Receiver>, - tasks_list: TaskList, + this_sender: UnboundedSender>, + this_receiver: UnboundedReceiver>, + tasks_list: TaskList, // TODO: Make generic instead of dynamic. - on_task_received: Box, + on_task_received: Box>, on_response_received: Box, } @@ -37,24 +37,30 @@ impl ManagedEventType { } } -impl AsyncCallbackManager { - /// Get a new AsyncCallbackManager. Channel size refers to number of - /// messages that can be buffered from senders. - pub fn new(channel_size: usize) -> Self { - let (tx, rx) = mpsc::channel(channel_size); +impl Default for AsyncCallbackManager { + fn default() -> Self { + Self::new() + } +} +impl AsyncCallbackManager { + /// Get a new AsyncCallbackManager. + // TODO: Consider if this should be bounded. Unbounded has been chose for now as + // it allows senders to send without blocking. + pub fn new() -> Self { + let (tx, rx) = mpsc::unbounded_channel(); AsyncCallbackManager { next_sender_id: 0, next_task_id: 0, this_receiver: rx, this_sender: tx, - tasks_list: Default::default(), + tasks_list: TaskList::new(), on_task_received: Box::new(|_| {}), on_response_received: Box::new(|_| {}), } } pub fn with_on_task_received_callback( mut self, - cb: impl FnMut(TaskInformation) + 'static, + cb: impl FnMut(TaskInformation) + 'static, ) -> Self { self.on_task_received = Box::new(cb); self @@ -72,7 +78,7 @@ impl AsyncCallbackManager { pub fn new_sender( &mut self, channel_size: usize, - ) -> AsyncCallbackSender { + ) -> AsyncCallbackSender { let (tx, rx) = mpsc::channel(channel_size); let task_function_sender = self.this_sender.clone(); let id = SenderId(self.next_sender_id); @@ -120,7 +126,7 @@ impl AsyncCallbackManager { pub async fn process_next_response(&mut self) -> Option { self.tasks_list.process_next_response().await } - fn spawn_task(&mut self, backend: &Bkend, task: TaskFromFrontend) { + fn spawn_task(&mut self, backend: &Bkend, task: TaskFromFrontend) { (self.on_task_received)(task.get_information()); if let Some(constraint) = task.constraint { self.tasks_list @@ -129,6 +135,7 @@ impl AsyncCallbackManager { self.tasks_list.push(Task::new( task.type_id, task.type_name, + task.metadata, task.receiver, task.sender_id, TaskId(self.next_task_id), diff --git a/async-callback-manager/src/sender.rs b/async-callback-manager/src/sender.rs index b26564b..c734a40 100644 --- a/async-callback-manager/src/sender.rs +++ b/async-callback-manager/src/sender.rs @@ -10,15 +10,15 @@ use std::{ future::Future, }; use tokio::sync::{ - mpsc::{self, Receiver, Sender}, + mpsc::{self, Receiver, Sender, UnboundedSender}, oneshot, }; -pub struct AsyncCallbackSender { +pub struct AsyncCallbackSender { pub(crate) id: SenderId, pub(crate) this_sender: Sender>, pub(crate) this_receiver: Receiver>, - pub(crate) runner_sender: Sender>, + pub(crate) runner_sender: UnboundedSender>, } /// A set of state mutations, that can be applied to a Frntend. @@ -40,7 +40,7 @@ impl StateMutationBundle { } } -impl AsyncCallbackSender { +impl AsyncCallbackSender { pub async fn get_next_mutations( &mut self, max_mutations: usize, @@ -51,15 +51,15 @@ impl AsyncCallbackSender { .await; StateMutationBundle { mutation_list } } - pub async fn add_stream_callback( + pub fn add_stream_callback( &self, request: R, // TODO: Relax Clone bounds if possible. handler: impl FnOnce(&mut Frntend, R::Output) + Send + Clone + 'static, - constraint: Option, + constraint: Option>, ) -> Result<()> where - R: BackendStreamingTask + 'static, + R: BackendStreamingTask + 'static, Bkend: Send + 'static, Frntend: 'static, { @@ -80,16 +80,16 @@ impl AsyncCallbackSender { .boxed(), ) as DynFallibleFuture }; - self.send_task::(func, rx, constraint, kill_tx).await + self.send_task::(func, R::metadata(), rx, constraint, kill_tx) } - pub async fn add_callback( + pub fn add_callback( &self, request: R, handler: impl FnOnce(&mut Frntend, R::Output) + Send + 'static, - constraint: Option, + constraint: Option>, ) -> Result<()> where - R: BackendTask + 'static, + R: BackendTask + 'static, Bkend: Send + 'static, Frntend: 'static, { @@ -109,27 +109,28 @@ impl AsyncCallbackSender { .boxed(), ) as DynFallibleFuture }; - self.send_task::(func, rx, constraint, kill_tx).await + self.send_task::(func, R::metadata(), rx, constraint, kill_tx) } - async fn send_task( + fn send_task( &self, func: impl FnOnce(&Bkend) -> DynFallibleFuture + 'static, + metadata: Vec, rx: impl Into, - constraint: Option, + constraint: Option>, kill_handle: KillHandle, ) -> Result<()> { self.runner_sender .send(TaskFromFrontend::new( TypeId::of::(), std::any::type_name::(), + metadata, func, rx, self.id, constraint, kill_handle, )) - .await - .map_err(|_| Error::ErrorSending) + .map_err(|_| Error::ReceiverDropped) } } diff --git a/async-callback-manager/src/task.rs b/async-callback-manager/src/task.rs index 8523f35..5bb10e0 100644 --- a/async-callback-manager/src/task.rs +++ b/async-callback-manager/src/task.rs @@ -3,9 +3,8 @@ use futures::{stream::FuturesUnordered, StreamExt}; use std::any::TypeId; use tokio::sync::{mpsc, oneshot}; -#[derive(Default)] -pub(crate) struct TaskList { - pub inner: Vec, +pub(crate) struct TaskList { + pub inner: Vec>, } // User visible struct for introspection. @@ -20,41 +19,44 @@ pub struct ResponseInformation { // User visible struct for introspection. #[derive(Debug, Clone)] -pub struct TaskInformation<'a> { +pub struct TaskInformation<'a, Cstrnt> { pub type_id: TypeId, pub type_name: &'static str, pub sender_id: SenderId, - pub constraint: &'a Option, + pub constraint: &'a Option>, } -pub(crate) struct TaskFromFrontend { +pub(crate) struct TaskFromFrontend { pub(crate) type_id: TypeId, pub(crate) type_name: &'static str, + pub(crate) metadata: Vec, pub(crate) task: DynBackendTask, pub(crate) receiver: TaskReceiver, pub(crate) sender_id: SenderId, - pub(crate) constraint: Option, + pub(crate) constraint: Option>, pub(crate) kill_handle: KillHandle, } -pub(crate) struct Task { +pub(crate) struct Task { pub(crate) type_id: TypeId, pub(crate) type_name: &'static str, pub(crate) receiver: TaskReceiver, pub(crate) sender_id: SenderId, pub(crate) task_id: TaskId, pub(crate) kill_handle: KillHandle, + pub(crate) metadata: Vec, } #[derive(Eq, PartialEq, Debug)] -pub struct Constraint { - pub(crate) constraint_type: ConstraitType, +pub struct Constraint { + pub(crate) constraint_type: ConstraitType, } #[derive(Eq, PartialEq, Debug)] -pub enum ConstraitType { +pub enum ConstraitType { BlockSameType, KillSameType, + BlockMatchingMetatdata(Cstrnt), } pub(crate) enum TaskReceiver { @@ -72,7 +74,10 @@ impl From> for TaskReceiver { } } -impl TaskList { +impl TaskList { + pub(crate) fn new() -> Self { + Self { inner: vec![] } + } /// Returns Some(ResponseInformation) if a task existed in the list, and it /// was processed. Returns None, if no tasks were in the list. pub(crate) async fn process_next_response(&mut self) -> Option { @@ -139,19 +144,25 @@ impl TaskList { } }) } - pub(crate) fn push(&mut self, task: Task) { + pub(crate) fn push(&mut self, task: Task) { self.inner.push(task) } + // TODO: Tests pub(crate) fn handle_constraint( &mut self, - constraint: Constraint, + constraint: Constraint, type_id: TypeId, sender_id: SenderId, ) { // Assuming here that kill implies block also. let task_doesnt_match_constraint = - |task: &Task| (task.type_id != type_id) || (task.sender_id != sender_id); + |task: &Task<_>| (task.type_id != type_id) || (task.sender_id != sender_id); + let task_doesnt_match_metadata = + |task: &Task<_>, constraint: &Cstrnt| !task.metadata.contains(constraint); match constraint.constraint_type { + ConstraitType::BlockMatchingMetatdata(metadata) => self + .inner + .retain(|task| task_doesnt_match_metadata(task, &metadata)), ConstraitType::BlockSameType => { self.inner.retain(task_doesnt_match_constraint); } @@ -166,19 +177,21 @@ impl TaskList { } } -impl TaskFromFrontend { +impl TaskFromFrontend { pub(crate) fn new( type_id: TypeId, type_name: &'static str, + metadata: Vec, task: impl FnOnce(&Bkend) -> DynFallibleFuture + 'static, receiver: impl Into, sender_id: SenderId, - constraint: Option, + constraint: Option>, kill_handle: KillHandle, ) -> Self { Self { type_id, type_name, + metadata, task: Box::new(task), receiver: receiver.into(), sender_id, @@ -186,7 +199,7 @@ impl TaskFromFrontend { kill_handle, } } - pub(crate) fn get_information(&self) -> TaskInformation<'_> { + pub(crate) fn get_information(&self) -> TaskInformation<'_, Cstrnt> { TaskInformation { type_id: self.type_id, type_name: self.type_name, @@ -196,10 +209,11 @@ impl TaskFromFrontend { } } -impl Task { +impl Task { pub(crate) fn new( type_id: TypeId, type_name: &'static str, + metadata: Vec, receiver: TaskReceiver, sender_id: SenderId, task_id: TaskId, @@ -212,11 +226,12 @@ impl Task { sender_id, kill_handle, task_id, + metadata, } } } -impl Constraint { +impl Constraint { pub fn new_block_same_type() -> Self { Self { constraint_type: ConstraitType::BlockSameType, @@ -227,4 +242,9 @@ impl Constraint { constraint_type: ConstraitType::KillSameType, } } + pub fn new_block_matching_metadata(metadata: Cstrnt) -> Self { + Self { + constraint_type: ConstraitType::BlockMatchingMetatdata(metadata), + } + } } diff --git a/async-callback-manager/tests/integration_tests.rs b/async-callback-manager/tests/integration_tests.rs index 4af13f0..060acca 100644 --- a/async-callback-manager/tests/integration_tests.rs +++ b/async-callback-manager/tests/integration_tests.rs @@ -20,6 +20,7 @@ struct MockMutatingBackend { impl BackendTask>> for DelayedBackendMutatingRequest { type Output = String; + type ConstraintType = (); fn into_future( self, backend: &Arc>, @@ -36,6 +37,7 @@ impl BackendTask>> for DelayedBackendMutatingRequ } impl BackendTask for TextTask { type Output = String; + type ConstraintType = (); // Manual async function required due to bounds. #[allow(clippy::manual_async_fn)] fn into_future(self, _: &T) -> impl Future + Send + 'static { @@ -44,6 +46,7 @@ impl BackendTask for TextTask { } impl BackendStreamingTask for StreamingCounterTask { type Output = usize; + type ConstraintType = (); fn into_stream( self, _: &T, @@ -55,6 +58,7 @@ impl BackendStreamingTask>> for DelayedBackendMutatingStreamingCounterTask { type Output = usize; + type ConstraintType = (); fn into_stream( self, backend: &Arc>, @@ -75,16 +79,19 @@ impl BackendStreamingTask>> } } -fn init() -> ( - AsyncCallbackManager, - AsyncCallbackSender, +fn init() -> ( + AsyncCallbackManager, + AsyncCallbackSender, ) { - let mut manager = async_callback_manager::AsyncCallbackManager::new(DEFAULT_CHANNEL_SIZE); + let mut manager = async_callback_manager::AsyncCallbackManager::new(); let sender = manager.new_sender(DEFAULT_CHANNEL_SIZE); (manager, sender) } -async fn drain_manager(mut manager: AsyncCallbackManager, _: Bkend) { +async fn drain_manager( + mut manager: AsyncCallbackManager, + _: Bkend, +) { loop { if manager.process_next_response().await.is_none() { return; @@ -102,7 +109,6 @@ async fn test_mutate_once() { |state, new| *state = new, None, ) - .await .unwrap(); manager.spawn_next_task(&()).await; drain_manager(manager, ()).await; @@ -123,7 +129,6 @@ async fn test_mutate_twice() { |state: &mut Vec<_>, new| state.push(new), None, ) - .await .unwrap(); manager.spawn_next_task(&()).await; state_receiver @@ -132,7 +137,6 @@ async fn test_mutate_twice() { |state, new| state.push(new), None, ) - .await .unwrap(); manager.spawn_next_task(&()).await; drain_manager(manager, ()).await; @@ -156,7 +160,6 @@ async fn test_mutate_stream() { |state: &mut Vec<_>, new| state.push(new), None, ) - .await .unwrap(); manager.spawn_next_task(&()).await; drain_manager(manager, ()).await; @@ -178,7 +181,6 @@ async fn test_mutate_stream_twice() { |state: &mut Vec<_>, new| state.push(new), None, ) - .await .unwrap(); manager.spawn_next_task(&backend).await; state_receiver @@ -187,7 +189,6 @@ async fn test_mutate_stream_twice() { |state: &mut Vec<_>, new| state.push(new), None, ) - .await .unwrap(); manager.spawn_next_task(&backend).await; drain_manager(manager, backend).await; @@ -206,14 +207,13 @@ async fn test_mutate_stream_twice() { async fn test_block_constraint() { let backend = Arc::new(Mutex::new(MockMutatingBackend::default())); let mut state = vec![]; - let (mut manager, mut state_receiver) = init::<_, Vec<_>>(); + let (mut manager, mut state_receiver) = init::<_, Vec<_>, _>(); state_receiver .add_callback( DelayedBackendMutatingRequest("This message should get blocked!".to_string()), |state, new| state.push(new), None, ) - .await .unwrap(); manager.spawn_next_task(&backend).await; state_receiver @@ -222,7 +222,6 @@ async fn test_block_constraint() { |state, new| state.push(new), Some(Constraint::new_block_same_type()), ) - .await .unwrap(); manager.spawn_next_task(&backend).await; drain_manager(manager, backend.clone()).await; @@ -239,14 +238,13 @@ async fn test_block_constraint() { async fn test_kill_constraint() { let mut state = vec![]; let backend = Arc::new(Mutex::new(MockMutatingBackend::default())); - let (mut manager, mut state_receiver) = init::<_, Vec<_>>(); + let (mut manager, mut state_receiver) = init::<_, Vec<_>, _>(); state_receiver .add_callback( DelayedBackendMutatingRequest("This message should get killed!".to_string()), |state, new| state.push(new), None, ) - .await .unwrap(); manager.spawn_next_task(&backend).await; state_receiver @@ -255,7 +253,6 @@ async fn test_kill_constraint() { |state, new| state.push(new), Some(Constraint::new_kill_same_type()), ) - .await .unwrap(); manager.spawn_next_task(&backend).await; drain_manager(manager, backend.clone()).await; @@ -272,14 +269,13 @@ async fn test_kill_constraint() { async fn test_block_constraint_stream() { let backend = Arc::new(Mutex::new(MockMutatingBackend::default())); let mut state = vec![]; - let (mut manager, mut state_receiver) = init::<_, Vec<_>>(); + let (mut manager, mut state_receiver) = init::<_, Vec<_>, _>(); state_receiver .add_stream_callback( DelayedBackendMutatingStreamingCounterTask(5), |state, new| state.push(new), None, ) - .await .unwrap(); manager.spawn_next_task(&backend).await; state_receiver @@ -288,7 +284,6 @@ async fn test_block_constraint_stream() { |state, new| state.push(new), Some(Constraint::new_block_same_type()), ) - .await .unwrap(); manager.spawn_next_task(&backend).await; drain_manager(manager, backend.clone()).await; @@ -305,14 +300,13 @@ async fn test_block_constraint_stream() { async fn test_kill_constraint_stream() { let backend = Arc::new(Mutex::new(MockMutatingBackend::default())); let mut state = vec![]; - let (mut manager, mut state_receiver) = init::<_, Vec<_>>(); + let (mut manager, mut state_receiver) = init::<_, Vec<_>, _>(); state_receiver .add_stream_callback( DelayedBackendMutatingStreamingCounterTask(5), |state, new| state.push(new), None, ) - .await .unwrap(); manager.spawn_next_task(&backend).await; state_receiver @@ -321,7 +315,6 @@ async fn test_kill_constraint_stream() { |state, new| state.push(new), Some(Constraint::new_kill_same_type()), ) - .await .unwrap(); manager.spawn_next_task(&backend).await; drain_manager(manager, backend.clone()).await; @@ -336,7 +329,7 @@ async fn test_kill_constraint_stream() { #[tokio::test] async fn test_task_received_callback() { - let (manager, state_receiver) = init::<(), ()>(); + let (manager, state_receiver) = init::<(), (), _>(); let task_received = Arc::new(std::sync::Mutex::new(false)); let task_received_clone = task_received.clone(); let mut manager = manager.with_on_task_received_callback(move |resp| { @@ -349,7 +342,6 @@ async fn test_task_received_callback() { |_, _| {}, None, ) - .await .unwrap(); manager.manage_next_event(&()).await.unwrap(); assert!(*task_received.lock().unwrap()); @@ -357,7 +349,7 @@ async fn test_task_received_callback() { #[tokio::test] async fn test_response_received_callback() { - let (manager, state_receiver) = init::<(), ()>(); + let (manager, state_receiver) = init::<(), (), _>(); let response_received = Arc::new(std::sync::Mutex::new(false)); let response_received_clone = response_received.clone(); let task_is_now_finished = Arc::new(std::sync::Mutex::new(false)); @@ -373,7 +365,6 @@ async fn test_response_received_callback() { |_, _| {}, None, ) - .await .unwrap(); manager.manage_next_event(&()).await.unwrap(); manager.manage_next_event(&()).await.unwrap(); diff --git a/async-rodio-sink/src/lib.rs b/async-rodio-sink/src/lib.rs index 488d4ee..47cf1b8 100644 --- a/async-rodio-sink/src/lib.rs +++ b/async-rodio-sink/src/lib.rs @@ -43,7 +43,7 @@ pub struct ProgressUpdate(Duration, I); // At this stage this difference between DonePlaying and Stopped is very thin. // DonePlaying means that the song has been dropped by the player, whereas // Stopped simply means that a Stop message to the player was succesful. -pub struct Stopped(I); +pub struct Stopped(pub I); pub enum PausePlayResponse { Paused(I), Resumed(I), diff --git a/youtui/src/app.rs b/youtui/src/app.rs index 2c999ce..df2820f 100644 --- a/youtui/src/app.rs +++ b/youtui/src/app.rs @@ -35,7 +35,6 @@ thread_local! { } const CALLBACK_CHANNEL_SIZE: usize = 64; -const ASYNC_CALLBACK_MANAGER_CHANNEL_SIZE: usize = 64; const ASYNC_CALLBACK_SENDER_CHANNEL_SIZE: usize = 64; const EVENT_CHANNEL_SIZE: usize = 256; const LOG_FILE_NAME: &str = "debug.log"; @@ -94,20 +93,19 @@ impl Youtui { })); // Setup components let (callback_tx, callback_rx) = mpsc::channel(CALLBACK_CHANNEL_SIZE); - let mut task_manager = - async_callback_manager::AsyncCallbackManager::new(ASYNC_CALLBACK_MANAGER_CHANNEL_SIZE) - .with_on_task_received_callback(|task| { - info!( - "Received task {:?}: - type_id: {:?}, sender_id: {:?}, constraint: {:?}", - task.type_name, task.type_id, task.sender_id, task.constraint - ) - }) - .with_on_response_received_callback(|response| { - info!( - "Received response to {:?}: - type_id: {:?}, sender_id: {:?}, task_id: {:?}", - response.type_name, response.type_id, response.sender_id, response.task_id - ) - }); + let mut task_manager = async_callback_manager::AsyncCallbackManager::new() + .with_on_task_received_callback(|task| { + info!( + "Received task {:?}: - type_id: {:?}, sender_id: {:?}, constraint: {:?}", + task.type_name, task.type_id, task.sender_id, task.constraint + ) + }) + .with_on_response_received_callback(|response| { + info!( + "Received response to {:?}: - type_id: {:?}, sender_id: {:?}, task_id: {:?}", + response.type_name, response.type_id, response.sender_id, response.task_id + ) + }); let server = Arc::new(server::Server::new(api_key, po_token)); let backend = CrosstermBackend::new(stdout); let terminal = Terminal::new(backend)?; diff --git a/youtui/src/app/server.rs b/youtui/src/app/server.rs index 3b8b0db..5f89243 100644 --- a/youtui/src/app/server.rs +++ b/youtui/src/app/server.rs @@ -93,11 +93,11 @@ pub struct DownloadSong(pub VideoID<'static>, pub ListSongID); // Volume will now be 10 - should be 15, should not allow caller to cause this. pub struct IncreaseVolume(pub i8); pub struct Seek { - duration: Duration, - direction: SeekDirection, + pub duration: Duration, + pub direction: SeekDirection, } -pub struct Stop(ListSongID); -pub struct PausePlay(ListSongID); +pub struct Stop(pub ListSongID); +pub struct PausePlay(pub ListSongID); // Play a song, starting from the start, regardless what's queued. pub struct PlaySong { song: Arc, diff --git a/youtui/src/app/server/downloader.rs b/youtui/src/app/server/downloader.rs index ebe063f..34a245b 100644 --- a/youtui/src/app/server/downloader.rs +++ b/youtui/src/app/server/downloader.rs @@ -19,8 +19,8 @@ mod utils; #[derive(Debug)] pub struct DownloadProgressUpdate { - kind: DownloadProgressUpdateType, - id: ListSongID, + pub kind: DownloadProgressUpdateType, + pub id: ListSongID, } #[derive(Debug)] diff --git a/youtui/src/app/ui.rs b/youtui/src/app/ui.rs index 986e01d..183f295 100644 --- a/youtui/src/app/ui.rs +++ b/youtui/src/app/ui.rs @@ -8,12 +8,14 @@ use super::component::actionhandler::{ use super::keycommand::{ CommandVisibility, DisplayableCommand, DisplayableMode, KeyCommand, Keymap, }; -use super::server::ArcServer; +use super::server::{ArcServer, IncreaseVolume}; use super::view::{DrawableMut, Scrollable}; -use super::AppCallback; use super::{server, structures::*}; +use super::{AppCallback, ASYNC_CALLBACK_SENDER_CHANNEL_SIZE}; use crate::app::server::downloader::DownloadProgressUpdateType; use crate::core::send_or_error; +use async_callback_manager::{AsyncCallbackSender, Constraint}; +use async_rodio_sink::VolumeUpdate; use crossterm::event::{Event, KeyCode, KeyEvent, KeyModifiers}; use draw::draw_app; use ratatui::widgets::{ListState, TableState}; @@ -69,6 +71,7 @@ pub struct YoutuiWindow { keybinds: Vec>, key_stack: Vec, help: HelpMenu, + async_tx: AsyncCallbackSender, } pub struct HelpMenu { @@ -304,7 +307,6 @@ impl YoutuiWindow { callback_tx: mpsc::Sender, callback_manager: &mut async_callback_manager::AsyncCallbackManager, ) -> YoutuiWindow { - // TODO: derive default YoutuiWindow { context: WindowContext::Browser, prev_context: WindowContext::Browser, @@ -315,6 +317,7 @@ impl YoutuiWindow { key_stack: Vec::new(), help: Default::default(), callback_tx, + async_tx: callback_manager.new_sender(ASYNC_CALLBACK_SENDER_CHANNEL_SIZE), } } pub async fn async_update(&mut self) { @@ -347,7 +350,11 @@ impl YoutuiWindow { pub async fn handle_increase_volume(&mut self, inc: i8) { // Visually update the state first for instant feedback. self.increase_volume(inc); - send_or_error(&self.callback_tx, AppCallback::IncreaseVolume(inc)).await; + self.async_tx.add_callback( + IncreaseVolume(inc), + Self::handle_volume_update, + Some(Constraint::new_block_same_type()), + ); } pub async fn handle_seek(&mut self, inc: i8) { self.playlist.handle_seek(inc).await @@ -370,14 +377,11 @@ impl YoutuiWindow { pub async fn handle_playing(&mut self, duration: Option, id: ListSongID) { self.playlist.handle_playing(duration, id) } - pub async fn handle_stopped(&mut self, id: ListSongID) { - self.playlist.handle_stopped(id) - } pub async fn handle_set_to_error(&mut self, id: ListSongID) { self.playlist.handle_set_to_error(id) } - pub fn handle_set_volume(&mut self, p: Percentage) { - self.playlist.handle_set_volume(p) + pub fn handle_volume_update(&mut self, update: Option) { + self.playlist.handle_volume_update(update) } pub async fn handle_set_song_play_progress(&mut self, d: Duration, id: ListSongID) { self.playlist.handle_set_song_play_progress(d, id).await; diff --git a/youtui/src/app/ui/playlist.rs b/youtui/src/app/ui/playlist.rs index d075dbd..a8d16ba 100644 --- a/youtui/src/app/ui/playlist.rs +++ b/youtui/src/app/ui/playlist.rs @@ -1,5 +1,5 @@ -use crate::app::server::downloader::DownloadProgressUpdateType; -use crate::app::server::{ArcServer, IncreaseVolume, Server}; +use crate::app::server::downloader::{DownloadProgressUpdate, DownloadProgressUpdateType}; +use crate::app::server::{ArcServer, DownloadSong, IncreaseVolume, Server, Stop}; use crate::app::structures::{Percentage, SongListComponent}; use crate::app::view::draw::draw_table; use crate::app::view::{BasicConstraint, DrawableMut, TableItem}; @@ -14,7 +14,7 @@ use crate::app::{ use crate::app::CALLBACK_CHANNEL_SIZE; use crate::{app::structures::DownloadStatus, core::send_or_error}; use async_callback_manager::{AsyncCallbackManager, AsyncCallbackSender, Constraint}; -use async_rodio_sink::VolumeUpdate; +use async_rodio_sink::{Stopped, VolumeUpdate}; use crossterm::event::KeyCode; use ratatui::widgets::TableState; use ratatui::{layout::Rect, Frame}; @@ -210,15 +210,13 @@ impl Playlist { ) -> Self { let async_tx = callback_manager.new_sender(CALLBACK_CHANNEL_SIZE); // Ensure volume is synced with player. - async_tx - .add_callback( - // Since IncreaseVolume responds back with player volume after change, this is a - // neat hack. - IncreaseVolume(0), - Self::handle_set_volume, - Some(Constraint::new_block_same_type()), - ) - .await; + async_tx.add_callback( + // Since IncreaseVolume responds back with player volume after change, this is a + // neat hack. + IncreaseVolume(0), + Self::handle_volume_update, + Some(Constraint::new_block_same_type()), + ); Playlist { ui_tx, volume: Percentage(50), @@ -282,7 +280,9 @@ impl Playlist { } else { // Stop current song, but only if next song is buffering. if let Some(cur_id) = self.get_cur_playing_id() { - send_or_error(&self.ui_tx, AppCallback::Stop(cur_id)).await; + // TODO: Consider how race condition is supposed to be handled with this. + self.async_tx + .add_callback(Stop(cur_id), Self::handle_stopped, None); } self.play_status = PlayState::Buffering(id); self.queue_status = QueueState::NotQueued; @@ -293,7 +293,9 @@ impl Playlist { pub async fn reset(&mut self) { // Stop playback, if playing. if let Some(cur_id) = self.get_cur_playing_id() { - send_or_error(&self.ui_tx, AppCallback::Stop(cur_id)).await; + // TODO: Consider how race condition is supposed to be handled with this. + self.async_tx + .add_callback(Stop(cur_id), Self::handle_stopped, None); } self.clear() // XXX: Also need to kill pending download tasks @@ -361,11 +363,16 @@ impl Playlist { | DownloadStatus::Queued => return, _ => (), }; - send_or_error( - &self.ui_tx, - AppCallback::DownloadSong(song.raw.video_id.clone(), id), - ) - .await; + // TODO: Consider how to handle race conditions. + // XXX: Need to add async callback handling mechanism! + self.async_tx.add_stream_callback( + DownloadSong(song.raw.video_id.clone(), id), + |this, item| { + let DownloadProgressUpdate { kind, id } = item; + this.handle_song_download_progress_update(kind, id); + }, + None, + ); song.download_status = DownloadStatus::Queued; } /// Update the volume in the UI for immediate visual feedback - response @@ -672,7 +679,7 @@ impl Playlist { } } /// Handle volume message from server - pub fn handle_set_volume(&mut self, response: Option) { + pub fn handle_volume_update(&mut self, response: Option) { if let Some(v) = response { self.volume = Percentage(v.0 .0) } @@ -774,7 +781,9 @@ impl Playlist { } } /// Handle stopped message from server - pub fn handle_stopped(&mut self, id: ListSongID) { + pub fn handle_stopped(&mut self, id: Option>) { + let Some(Stopped(id)) = id else { return }; + // TODO: Hoist info up. info!("Received message that playback {:?} has been stopped", id); if self.check_id_is_cur(id) { info!("Stopping {:?}", id);