From 74a2efd87cb54f15df904f3092d43a66888112d8 Mon Sep 17 00:00:00 2001 From: Nick Dowsett Date: Sun, 24 Nov 2024 20:58:38 +0800 Subject: [PATCH] Now compiles, downloads is bugged --- async-callback-manager/src/manager.rs | 83 +++++++---- async-callback-manager/src/task.rs | 45 ++++-- youtui/src/app.rs | 39 +++-- youtui/src/app/component/actionhandler.rs | 17 +++ youtui/src/app/ui.rs | 50 +++++-- youtui/src/app/ui/logger.rs | 16 +- youtui/src/app/ui/playlist.rs | 169 +++++++++++----------- youtui/src/core.rs | 1 - 8 files changed, 255 insertions(+), 165 deletions(-) diff --git a/async-callback-manager/src/manager.rs b/async-callback-manager/src/manager.rs index a6741b6..7e5ad3e 100644 --- a/async-callback-manager/src/manager.rs +++ b/async-callback-manager/src/manager.rs @@ -1,12 +1,12 @@ use crate::{ task::{ - AsyncTask, AsyncTaskKind, FutureTask, ResponseInformation, SpawnedTask, StreamTask, - TaskInformation, TaskList, TaskOutcome, TaskWaiter, + AsyncTask, AsyncTaskKind, FutureTask, SpawnedTask, StreamTask, TaskInformation, TaskList, + TaskOutcome, TaskWaiter, }, Constraint, DEFAULT_STREAM_CHANNEL_SIZE, }; use futures::{Stream, StreamExt}; -use std::{any::TypeId, future::Future}; +use std::{any::TypeId, future::Future, sync::Arc}; #[derive(Copy, Clone, Debug, Eq, PartialEq)] pub struct TaskId(pub(crate) usize); @@ -23,7 +23,6 @@ pub(crate) type DynStreamTask = Box DynMutationStream>; pub(crate) type DynTaskSpawnCallback = dyn Fn(TaskInformation); -pub(crate) type DynResponseReceivedCallback = dyn Fn(ResponseInformation); pub struct AsyncCallbackManager { next_task_id: usize, @@ -31,10 +30,17 @@ pub struct AsyncCallbackManager { // It could be possible to make these generic instead of dynamic, however this type would then // require 2 more type parameters. on_task_spawn: Box>, - on_response_received: Box, on_id_overflow: Box, } +/// Temporary struct to store task details before it is added to the task list. +pub(crate) struct TempSpawnedTask { + waiter: TaskWaiter, + type_id: TypeId, + type_name: &'static str, + type_debug: Arc, +} + impl Default for AsyncCallbackManager { fn default() -> Self { Self::new() @@ -48,7 +54,6 @@ impl AsyncCallbackManager { next_task_id: Default::default(), tasks_list: TaskList::new(), on_task_spawn: Box::new(|_| {}), - on_response_received: Box::new(|_| {}), on_id_overflow: Box::new(|| {}), } } @@ -63,15 +68,6 @@ impl AsyncCallbackManager { self.on_task_spawn = Box::new(cb); self } - // TODO: when is this called? - pub fn with_on_response_received_callback( - mut self, - cb: impl Fn(ResponseInformation) + 'static, - ) -> Self { - todo!(); - self.on_response_received = Box::new(cb); - self - } /// Await for the next response from one of the spawned tasks, or returns /// None if no tasks were in the list. pub async fn get_next_response(&mut self) -> Option> { @@ -88,20 +84,41 @@ impl AsyncCallbackManager { constraint, metadata, } = task; - let (waiter, type_id, type_name) = match task { + match task { AsyncTaskKind::Future(future_task) => { - self.spawn_future_task(backend, future_task, &constraint) + let outcome = self.spawn_future_task(backend, future_task, &constraint); + self.add_task_to_list(outcome, metadata, constraint); } AsyncTaskKind::Stream(stream_task) => { - self.spawn_stream_task(backend, stream_task, &constraint) + let outcome = self.spawn_stream_task(backend, stream_task, &constraint); + self.add_task_to_list(outcome, metadata, constraint); } // Don't call (self.on_task_spawn)() for NoOp. - AsyncTaskKind::NoOp => return, - }; + AsyncTaskKind::Multi(tasks) => { + for task in tasks { + self.spawn_task(backend, task) + } + } + AsyncTaskKind::NoOp => (), + } + } + fn add_task_to_list( + &mut self, + details: TempSpawnedTask, + metadata: Vec, + constraint: Option>, + ) { + let TempSpawnedTask { + waiter, + type_id, + type_name, + type_debug, + } = details; let sp = SpawnedTask { type_id, task_id: TaskId(self.next_task_id), type_name, + type_debug, receiver: waiter, metadata, }; @@ -123,7 +140,7 @@ impl AsyncCallbackManager { backend: &Bkend, future_task: FutureTask, constraint: &Option>, - ) -> (TaskWaiter, TypeId, &'static str) + ) -> TempSpawnedTask where Frntend: 'static, Bkend: 'static, @@ -132,23 +149,24 @@ impl AsyncCallbackManager { (self.on_task_spawn)(TaskInformation { type_id: future_task.type_id, type_name: future_task.type_name, - type_debug: future_task.type_debug, + type_debug: &future_task.type_debug, constraint, }); let future = (future_task.task)(backend); let handle = tokio::spawn(future); - ( - TaskWaiter::Future(handle), - future_task.type_id, - future_task.type_name, - ) + TempSpawnedTask { + waiter: TaskWaiter::Future(handle), + type_id: future_task.type_id, + type_name: future_task.type_name, + type_debug: Arc::new(future_task.type_debug), + } } fn spawn_stream_task( &self, backend: &Bkend, stream_task: StreamTask, constraint: &Option>, - ) -> (TaskWaiter, TypeId, &'static str) + ) -> TempSpawnedTask where Frntend: 'static, Bkend: 'static, @@ -163,7 +181,7 @@ impl AsyncCallbackManager { (self.on_task_spawn)(TaskInformation { type_id, type_name, - type_debug, + type_debug: &type_debug, constraint, }); let mut stream = task(backend); @@ -180,13 +198,14 @@ impl AsyncCallbackManager { } }) .abort_handle(); - ( - TaskWaiter::Stream { + TempSpawnedTask { + waiter: TaskWaiter::Stream { receiver: rx, abort_handle, }, type_id, type_name, - ) + type_debug: Arc::new(type_debug), + } } } diff --git a/async-callback-manager/src/task.rs b/async-callback-manager/src/task.rs index d886c69..635f675 100644 --- a/async-callback-manager/src/task.rs +++ b/async-callback-manager/src/task.rs @@ -6,6 +6,7 @@ use futures::{stream::FuturesUnordered, FutureExt, StreamExt}; use std::{ any::{type_name, TypeId}, fmt::Debug, + sync::Arc, }; use tokio::{ sync::mpsc, @@ -24,6 +25,7 @@ pub struct AsyncTask { pub(crate) enum AsyncTaskKind { Future(FutureTask), Stream(StreamTask), + Multi(Vec>), NoOp, } @@ -41,6 +43,20 @@ pub(crate) struct FutureTask { pub(crate) type_debug: String, } +impl FromIterator> + for AsyncTask +{ + fn from_iter>>(iter: T) -> Self { + let v = iter.into_iter().collect(); + // TODO: Better handle constraints / metadata. + AsyncTask { + task: AsyncTaskKind::Multi(v), + constraint: None, + metadata: vec![], + } + } +} + impl AsyncTask { pub fn new_no_op() -> AsyncTask { Self { @@ -286,6 +302,14 @@ impl AsyncTask { constraint, metadata, }, + AsyncTaskKind::Multi(v) => { + let mapped = v.into_iter().map(|task| task.map(f.clone())).collect(); + AsyncTask { + task: AsyncTaskKind::Multi(mapped), + constraint, + metadata, + } + } } } } @@ -297,26 +321,18 @@ pub(crate) struct TaskList { pub(crate) struct SpawnedTask { pub(crate) type_id: TypeId, pub(crate) type_name: &'static str, + pub(crate) type_debug: Arc, pub(crate) receiver: TaskWaiter, pub(crate) task_id: TaskId, pub(crate) metadata: Vec, } -// User visible struct for introspection. -#[derive(Debug, Clone)] -pub struct ResponseInformation { - pub type_id: TypeId, - pub type_name: &'static str, - pub task_id: TaskId, - pub task_is_now_finished: bool, -} - -// User visible struct for introspection. +/// User visible struct for introspection. #[derive(Debug, Clone)] pub struct TaskInformation<'a, Cstrnt> { pub type_id: TypeId, pub type_name: &'static str, - pub type_debug: String, + pub type_debug: &'a str, pub constraint: &'a Option>, } @@ -363,6 +379,7 @@ pub enum TaskOutcome { error: JoinError, type_id: TypeId, type_name: &'static str, + type_debug: Arc, task_id: TaskId, }, /// Mutation was received from a task. @@ -370,6 +387,7 @@ pub enum TaskOutcome { mutation: DynStateMutation, type_id: TypeId, type_name: &'static str, + type_debug: Arc, task_id: TaskId, }, } @@ -392,8 +410,9 @@ impl TaskList { TaskOutcome::MutationReceived { mutation, type_id: task.type_id, - type_name: task.type_name, + type_debug: task.type_debug.clone(), task_id: task.task_id, + type_name: task.type_name, }, ), Err(error) => ( @@ -401,6 +420,7 @@ impl TaskList { TaskOutcome::TaskPanicked { type_id: task.type_id, type_name: task.type_name, + type_debug: task.type_debug.clone(), task_id: task.task_id, error, }, @@ -417,6 +437,7 @@ impl TaskList { type_id: task.type_id, type_name: task.type_name, task_id: task.task_id, + type_debug: task.type_debug.clone(), }, ); } diff --git a/youtui/src/app.rs b/youtui/src/app.rs index 1804e1f..33f9181 100644 --- a/youtui/src/app.rs +++ b/youtui/src/app.rs @@ -14,7 +14,7 @@ use std::borrow::Cow; use std::{io, sync::Arc}; use structures::ListSong; use tokio::sync::mpsc; -use tracing::info; +use tracing::{error, info, warn}; use tracing_subscriber::prelude::*; use ui::WindowContext; use ui::YoutuiWindow; @@ -35,7 +35,6 @@ thread_local! { } const CALLBACK_CHANNEL_SIZE: usize = 64; -const ASYNC_CALLBACK_SENDER_CHANNEL_SIZE: usize = 64; const EVENT_CHANNEL_SIZE: usize = 256; const LOG_FILE_NAME: &str = "debug.log"; @@ -98,17 +97,18 @@ impl Youtui { task.type_debug, task.type_id, task.constraint ) }) - .with_on_response_received_callback(|response| { - info!( - "Received response to {:?}: type_id: {:?}, sender_id: {:?}, task_id: {:?}", - response.type_name, response.type_id, response.sender_id, response.task_id - ) - }); + .with_on_id_overflow_callback(|| warn!("Task IDs have overflowed. New tasks will temporarily not block or kill existing tasks")); let server = Arc::new(server::Server::new(api_key, po_token)); let backend = CrosstermBackend::new(stdout); let terminal = Terminal::new(backend)?; let event_handler = EventHandler::new(EVENT_CHANNEL_SIZE)?; - let window_state = YoutuiWindow::new(callback_tx, &config); + let (window_state, effect) = YoutuiWindow::new(callback_tx, &config); + // Even the creation of a YoutuiWindow causes an effect. We'll spawn it straight + // away. + task_manager.spawn_task( + &server, + effect.map(|this: &mut Self| &mut this.window_state), + ); Ok(Youtui { status: AppStatus::Running, event_handler, @@ -158,19 +158,26 @@ impl Youtui { } fn handle_effect(&mut self, effect: TaskOutcome) { match effect { - async_callback_manager::TaskOutcome::StreamClosed => (), + async_callback_manager::TaskOutcome::StreamClosed => { + info!("Received a stream closed message from task manager") + } async_callback_manager::TaskOutcome::TaskPanicked { - error, - type_id, - type_name, - task_id, - } => panic!("Panicked running task {type_name}"), + type_debug, error, .. + } => { + error!("Task {type_debug} panicked!"); + std::panic::resume_unwind(error.into_panic()) + } async_callback_manager::TaskOutcome::MutationReceived { mutation, type_id, - type_name, + type_debug, task_id, + .. } => { + info!( + "Received response to {:?}: type_id: {:?}, task_id: {:?}", + type_debug, type_id, task_id + ); let next_task = mutation(self); self.task_manager.spawn_task(&self.server, next_task); } diff --git a/youtui/src/app/component/actionhandler.rs b/youtui/src/app/component/actionhandler.rs index 616dcbd..fdc79a2 100644 --- a/youtui/src/app/component/actionhandler.rs +++ b/youtui/src/app/component/actionhandler.rs @@ -145,6 +145,23 @@ pub enum KeyHandleAction { Mode, NoMap, } +impl KeyHandleOutcome +where + Frntend: 'static, + Bkend: 'static, + Md: 'static, +{ + pub fn map( + self, + f: impl Fn(&mut NewFrntend) -> &mut Frntend + Send + Clone + 'static, + ) -> KeyHandleOutcome { + match self { + KeyHandleOutcome::Action(a) => KeyHandleOutcome::Action(a.map(f)), + KeyHandleOutcome::Mode => KeyHandleOutcome::Mode, + KeyHandleOutcome::NoMap => KeyHandleOutcome::NoMap, + } + } +} /// The action from handling a key event (no Action type required) pub enum KeyHandleOutcome { Action(AsyncTask), diff --git a/youtui/src/app/ui.rs b/youtui/src/app/ui.rs index d3f0beb..824cf04 100644 --- a/youtui/src/app/ui.rs +++ b/youtui/src/app/ui.rs @@ -312,17 +312,29 @@ impl TextHandler for YoutuiWindow { } fn handle_event_repr(&mut self, event: &Event) -> Option> { match self.context { - WindowContext::Browser => self.browser.handle_event_repr(event), - WindowContext::Playlist => self.playlist.handle_event_repr(event), - WindowContext::Logs => self.logger.handle_event_repr(event), + WindowContext::Browser => self + .browser + .handle_event_repr(event) + .map(|effect| effect.map(|this: &mut YoutuiWindow| &mut this.browser)), + WindowContext::Playlist => self + .playlist + .handle_event_repr(event) + .map(|effect| effect.map(|this: &mut YoutuiWindow| &mut this.playlist)), + WindowContext::Logs => self + .logger + .handle_event_repr(event) + .map(|effect| effect.map(|this: &mut YoutuiWindow| &mut this.logger)), } } } impl YoutuiWindow { - pub fn new(callback_tx: mpsc::Sender, config: &Config) -> YoutuiWindow { + pub fn new( + callback_tx: mpsc::Sender, + config: &Config, + ) -> (YoutuiWindow, ComponentEffect) { let (playlist, task) = Playlist::new(callback_tx.clone()); - YoutuiWindow { + let this = YoutuiWindow { context: WindowContext::Browser, prev_context: WindowContext::Browser, playlist, @@ -332,7 +344,8 @@ impl YoutuiWindow { key_stack: Vec::new(), help: Default::default(), callback_tx, - } + }; + (this, task.map(|this: &mut Self| &mut this.playlist)) } // Splitting out event types removes one layer of indentation. pub async fn handle_initial_event( @@ -416,7 +429,7 @@ impl YoutuiWindow { // dominant. TODO: Remove allocation match handle_key_stack(self.get_this_keybinds(), self.key_stack.clone()) { KeyHandleAction::Action(a) => { - let effect = a.apply(&mut self).await; + let effect = a.apply(self).await; self.key_stack.clear(); return effect; } @@ -433,19 +446,28 @@ impl YoutuiWindow { let subcomponents_outcome = match self.context { // TODO: Remove allocation WindowContext::Browser => { - handle_key_stack_and_action(&mut self.browser, self.key_stack.clone()).await + handle_key_stack_and_action(&mut self.browser, self.key_stack.clone()) + .await + .map(|this: &mut Self| &mut this.browser) } WindowContext::Playlist => { - handle_key_stack_and_action(&mut self.playlist, self.key_stack.clone()).await + handle_key_stack_and_action(&mut self.playlist, self.key_stack.clone()) + .await + .map(|this: &mut Self| &mut this.playlist) } WindowContext::Logs => { - handle_key_stack_and_action(&mut self.logger, self.key_stack.clone()).await + handle_key_stack_and_action(&mut self.logger, self.key_stack.clone()) + .await + .map(|this: &mut Self| &mut this.logger) } }; - if let KeyHandleAction::Mode = subcomponents_outcome { - return AsyncTask::new_no_op(); - } - self.key_stack.clear() + let effect = match subcomponents_outcome { + KeyHandleOutcome::Action(a) => a, + KeyHandleOutcome::Mode => return AsyncTask::new_no_op(), + KeyHandleOutcome::NoMap => AsyncTask::new_no_op(), + }; + self.key_stack.clear(); + effect } fn key_pending(&self) -> bool { !self.key_stack.is_empty() diff --git a/youtui/src/app/ui/logger.rs b/youtui/src/app/ui/logger.rs index 459cb90..d533c20 100644 --- a/youtui/src/app/ui/logger.rs +++ b/youtui/src/app/ui/logger.rs @@ -1,11 +1,12 @@ use crate::app::{ - component::actionhandler::{Action, Component, KeyRouter, TextHandler}, + component::actionhandler::{Action, Component, ComponentEffect, KeyRouter, TextHandler}, keycommand::KeyCommand, server::{ArcServer, TaskMetadata}, ui::AppCallback, view::Drawable, }; use crate::core::send_or_error; +use async_callback_manager::AsyncTask; use crossterm::event::KeyCode; use draw::draw_logger; use ratatui::{prelude::Rect, Frame}; @@ -55,10 +56,7 @@ impl Action for LoggerAction { LoggerAction::ExitPageMode => "Exit Page Mode".into(), } } - async fn apply( - self, - state: &mut Self::State, - ) -> crate::app::component::actionhandler::ComponentEffect + async fn apply(self, state: &mut Self::State) -> ComponentEffect where Self: Sized, { @@ -77,6 +75,7 @@ impl Action for LoggerAction { LoggerAction::ExitPageMode => state.handle_exit_page_mode(), LoggerAction::ViewBrowser => state.handle_view_browser().await, } + AsyncTask::new_no_op() } } pub struct Logger { @@ -116,8 +115,11 @@ impl TextHandler for Logger { fn clear_text(&mut self) -> bool { false } - fn handle_event_repr(&mut self, _event: &crossterm::event::Event) -> bool { - false + fn handle_event_repr( + &mut self, + _event: &crossterm::event::Event, + ) -> Option> { + None } } diff --git a/youtui/src/app/ui/playlist.rs b/youtui/src/app/ui/playlist.rs index 60b7b92..ed27f7d 100644 --- a/youtui/src/app/ui/playlist.rs +++ b/youtui/src/app/ui/playlist.rs @@ -219,9 +219,7 @@ impl SongListComponent for Playlist { // Primatives impl Playlist { /// When creating a Playlist, an effect is also created. - pub fn new( - ui_tx: mpsc::Sender, - ) -> (Self, AsyncTask) { + pub fn new(ui_tx: mpsc::Sender) -> (Self, ComponentEffect) { // Ensure volume is synced with player. let task = AsyncTask::new_future( // Since IncreaseVolume responds back with player volume after change, this is a @@ -246,7 +244,7 @@ impl Playlist { /// Add a task to: /// - Stop playback of the song 'song_id', if it is still playing. /// - If stop was succesful, update state. - pub fn stop_song_id(&self, song_id: ListSongID) -> AsyncTask { + pub fn stop_song_id(&self, song_id: ListSongID) -> ComponentEffect { AsyncTask::new_future( Stop(song_id), Self::handle_stopped, @@ -258,11 +256,11 @@ impl Playlist { /// Drop downloads no longer relevant for ID, download new /// relevant downloads, start playing song at ID, set PlayState. If the /// selected song is buffering, stop playback until it's complete. - pub fn play_song_id(&mut self, id: ListSongID) -> AsyncTask { + pub fn play_song_id(&mut self, id: ListSongID) -> ComponentEffect { // Drop previous songs self.drop_unscoped_from_id(id); // Queue next downloads - self.download_upcoming_from_id(id); + let mut effects = vec![self.download_upcoming_from_id(id)]; // Reset duration self.cur_played_dur = None; if let Some(song_index) = self.get_index_from_id(id) { @@ -278,37 +276,43 @@ impl Playlist { let constraint = Some(Constraint::new_block_matching_metadata( TaskMetadata::PlayingSong, )); - let handle_update = move |this: &mut Self, update| { - match update { - Ok(u) => this.handle_play_update(u), - Err(e) => { - error!("Error {e} received when trying to decode {:?}", id); - this.handle_set_to_error(id); - AsyncTask::new_no_op() - } - }; + let handle_update = move |this: &mut Self, update| match update { + Ok(u) => this.handle_play_update(u), + Err(e) => { + error!("Error {e} received when trying to decode {:?}", id); + this.handle_set_to_error(id); + AsyncTask::new_no_op() + } }; self.play_status = PlayState::Playing(id); self.queue_status = QueueState::NotQueued; - return AsyncTask::new_stream(task, handle_update, constraint); + effects.push(AsyncTask::new_stream_chained( + task, + handle_update, + constraint, + )); + return effects.into_iter().collect(); } else { // Stop current song, but only if next song is buffering. - if let Some(cur_id) = self.get_cur_playing_id() { - self.stop_song_id(cur_id); - } + let effect = self + .get_cur_playing_id() + .map(|cur_id| self.stop_song_id(cur_id)); self.play_status = PlayState::Buffering(id); self.queue_status = QueueState::NotQueued; + if let Some(effect) = effect { + effects.push(effect); + } } } - AsyncTask::new_no_op() + effects.into_iter().collect() } /// Drop downloads no longer relevant for ID, download new /// relevant downloads, start playing song at ID, set PlayState. - pub fn autoplay_song_id(&mut self, id: ListSongID) -> AsyncTask { + pub fn autoplay_song_id(&mut self, id: ListSongID) -> ComponentEffect { // Drop previous songs self.drop_unscoped_from_id(id); // Queue next downloads - self.download_upcoming_from_id(id); + let mut effects = vec![self.download_upcoming_from_id(id)]; // Reset duration self.cur_played_dur = None; if let Some(song_index) = self.get_index_from_id(id) { @@ -321,30 +325,32 @@ impl Playlist { // Result. let task = DecodeSong(pointer.clone()).map_stream(move |song| AutoplaySong { song, id }); - let handle_update = move |this: &mut Self, update| { - match update { - Ok(u) => this.handle_autoplay_update(u), - Err(e) => { - error!("Error {e} received when trying to decode {:?}", id); - this.handle_set_to_error(id); - AsyncTask::new_no_op() - } - }; + let handle_update = move |this: &mut Self, update| match update { + Ok(u) => this.handle_autoplay_update(u), + Err(e) => { + error!("Error {e} received when trying to decode {:?}", id); + this.handle_set_to_error(id); + AsyncTask::new_no_op() + } }; self.play_status = PlayState::Playing(id); self.queue_status = QueueState::NotQueued; - return AsyncTask::new_stream(task, handle_update, None); + effects.push(AsyncTask::new_stream_chained(task, handle_update, None)); + return effects.into_iter().collect(); } else { // Stop current song, but only if next song is buffering. - if let Some(cur_id) = self.get_cur_playing_id() { + let effect = self + .get_cur_playing_id() // TODO: Consider how race condition is supposed to be handled with this. - self.stop_song_id(cur_id); - } + .map(|cur_id| self.stop_song_id(cur_id)); self.play_status = PlayState::Buffering(id); self.queue_status = QueueState::NotQueued; + if let Some(effect) = effect { + effects.push(effect); + } } }; - AsyncTask::new_no_op() + effects.into_iter().collect() } /// Stop playing and clear playlist. pub fn reset(&mut self) -> ComponentEffect { @@ -397,19 +403,17 @@ impl Playlist { AsyncTask::new_no_op() } /// Play song at ID, if it was buffering. - pub fn handle_song_downloaded(&mut self, id: ListSongID) { + pub fn handle_song_downloaded(&mut self, id: ListSongID) -> ComponentEffect { if let PlayState::Buffering(target_id) = self.play_status { if target_id == id { info!("Playing"); - self.play_song_id(id); + return self.play_song_id(id); } } + AsyncTask::new_no_op() } /// Download song at ID, if it is still in the list. - pub fn download_song_if_exists( - &mut self, - id: ListSongID, - ) -> AsyncTask { + pub fn download_song_if_exists(&mut self, id: ListSongID) -> ComponentEffect { let Some(song_index) = self.get_index_from_id(id) else { return AsyncTask::new_no_op(); }; @@ -427,11 +431,11 @@ impl Playlist { }; song.download_status = DownloadStatus::Queued; // TODO: Consider how to handle race conditions. - AsyncTask::new_stream( + AsyncTask::new_stream_chained( DownloadSong(song.raw.video_id.clone(), id), |this: &mut Playlist, item| { let DownloadProgressUpdate { kind, id } = item; - this.handle_song_download_progress_update(kind, id); + this.handle_song_download_progress_update(kind, id) }, None, ) @@ -469,13 +473,11 @@ impl Playlist { .map(|i| i + 1) .and_then(|i| self.get_id_from_index(i)); match next_song_id { - Some(id) => { - return self.play_song_id(id); - } + Some(id) => self.play_song_id(id), None => { info!("No next song - finishing playback"); self.queue_status = QueueState::NotQueued; - return self.stop_song_id(*id); + self.stop_song_id(*id) } } } @@ -485,11 +487,12 @@ impl Playlist { /// stopped. This is triggered when a song has finished playing. The /// softer, Autoplay message, lets the Player use gapless playback if songs /// are queued correctly. - pub fn autoplay_next_or_stop(&mut self, prev_id: ListSongID) { + pub fn autoplay_next_or_stop(&mut self, prev_id: ListSongID) -> ComponentEffect { let cur = &self.play_status; match cur { PlayState::NotPlaying | PlayState::Stopped => { warn!("Asked to play next, but not currently playing"); + AsyncTask::new_no_op() } PlayState::Paused(id) | PlayState::Playing(id) @@ -497,32 +500,30 @@ impl Playlist { | PlayState::Error(id) => { // Guard against duplicate message received. if id > &prev_id { - return; + return AsyncTask::new_no_op(); } let next_song_id = self .get_index_from_id(*id) .map(|i| i + 1) .and_then(|i| self.get_id_from_index(i)); match next_song_id { - Some(id) => { - self.autoplay_song_id(id); - } + Some(id) => self.autoplay_song_id(id), None => { info!("No next song - resetting play status"); self.queue_status = QueueState::NotQueued; // As a neat hack I only need to ask the player to stop current ID - even if // it's playing the queued track, it doesn't know about it. - self.stop_song_id(*id); + self.stop_song_id(*id) } } } } } /// Download some upcoming songs, if they aren't already downloaded. - pub fn download_upcoming_from_id(&mut self, id: ListSongID) { + pub fn download_upcoming_from_id(&mut self, id: ListSongID) -> ComponentEffect { // Won't download if already downloaded. let Some(song_index) = self.get_index_from_id(id) else { - return; + return AsyncTask::new_no_op(); }; let mut song_ids_list = Vec::new(); song_ids_list.push(id); @@ -532,9 +533,12 @@ impl Playlist { song_ids_list.push(id); } } - for song_id in song_ids_list { - self.download_song_if_exists(song_id); - } + // TODO: Don't love the way metadata and constraints are handled with this task + // type that is collected, find a better way. + song_ids_list + .into_iter() + .map(|song_id| self.download_song_if_exists(song_id)) + .collect() } /// Drop strong reference from previous songs or songs above the buffer list /// size to drop them from memory. @@ -607,7 +611,7 @@ impl Playlist { &mut self, duration: Duration, direction: SeekDirection, - ) -> AsyncTask { + ) -> ComponentEffect { // Consider if we also want to update current duration. AsyncTask::new_future_chained( Seek { @@ -682,7 +686,7 @@ impl Playlist { } /// Handle global pause/play action. Toggle state (visual), toggle playback /// (server). - pub async fn pauseplay(&mut self) -> AsyncTask { + pub async fn pauseplay(&mut self) -> ComponentEffect { let id = match self.play_status { PlayState::Playing(id) => { self.play_status = PlayState::Paused(id); @@ -716,18 +720,18 @@ impl Playlist { &mut self, update: DownloadProgressUpdateType, id: ListSongID, - ) { + ) -> ComponentEffect { // Not valid if song doesn't exist or hasn't initiated download (i.e - task // cancelled). if let Some(song) = self.get_song_from_id(id) { match song.download_status { DownloadStatus::None | DownloadStatus::Downloaded(_) | DownloadStatus::Failed => { - return + return AsyncTask::new_no_op() } _ => (), } } else { - return; + return AsyncTask::new_no_op(); } tracing::info!("Task valid - updating song download status"); match update { @@ -741,7 +745,7 @@ impl Playlist { s.download_status = DownloadStatus::Downloaded(Arc::new(song_buf)); s.id }) { - self.handle_song_downloaded(new_id) + return self.handle_song_downloaded(new_id); }; } DownloadProgressUpdateType::Error => { @@ -760,6 +764,7 @@ impl Playlist { } } } + AsyncTask::new_no_op() } /// Handle volume message from server pub fn handle_volume_update(&mut self, response: Option) { @@ -773,7 +778,7 @@ impl Playlist { return self.handle_set_song_play_progress(duration, id) } PlayUpdate::Playing(duration, id) => self.handle_playing(duration, id), - PlayUpdate::DonePlaying(id) => self.handle_done_playing(id), + PlayUpdate::DonePlaying(id) => return self.handle_done_playing(id), // This is a player invariant. PlayUpdate::Error(e) => error!("{e}"), } @@ -788,7 +793,7 @@ impl Playlist { return self.handle_set_song_play_progress(duration, id) } QueueUpdate::Queued(duration, id) => self.handle_queued(duration, id), - QueueUpdate::DonePlaying(id) => self.handle_done_playing(id), + QueueUpdate::DonePlaying(id) => return self.handle_done_playing(id), QueueUpdate::Error(e) => error!("{e}"), } AsyncTask::new_no_op() @@ -802,7 +807,7 @@ impl Playlist { return self.handle_set_song_play_progress(duration, id) } AutoplayUpdate::Playing(duration, id) => self.handle_playing(duration, id), - AutoplayUpdate::DonePlaying(id) => self.handle_done_playing(id), + AutoplayUpdate::DonePlaying(id) => return self.handle_done_playing(id), AutoplayUpdate::AutoplayQueued(id) => self.handle_autoplay_queued(id), AutoplayUpdate::Error(e) => error!("{e}"), } @@ -813,7 +818,7 @@ impl Playlist { &mut self, d: Duration, id: ListSongID, - ) -> AsyncTask { + ) -> ComponentEffect { if !self.check_id_is_cur(id) { return AsyncTask::new_no_op(); } @@ -841,18 +846,16 @@ impl Playlist { let task = DecodeSong(song.clone()).map_stream(move |song| QueueSong { song, id }); info!("Queuing up song!"); - let handle_update = move |this: &mut Self, update| { - match update { - Ok(u) => this.handle_queue_update(u), - Err(e) => { - error!("Error {e} received when trying to decode {:?}", id); - this.handle_set_to_error(id); - AsyncTask::new_no_op() - } - }; + let handle_update = move |this: &mut Self, update| match update { + Ok(u) => this.handle_queue_update(u), + Err(e) => { + error!("Error {e} received when trying to decode {:?}", id); + this.handle_set_to_error(id); + AsyncTask::new_no_op() + } }; self.queue_status = QueueState::Queued(next_song.id); - return AsyncTask::new_stream(task, handle_update, None); + return AsyncTask::new_stream_chained(task, handle_update, None); } } } @@ -860,15 +863,15 @@ impl Playlist { AsyncTask::new_no_op() } /// Handle done playing message from server - pub fn handle_done_playing(&mut self, id: ListSongID) { + pub fn handle_done_playing(&mut self, id: ListSongID) -> ComponentEffect { if QueueState::Queued(id) == self.queue_status { self.queue_status = QueueState::NotQueued; - return; + return AsyncTask::new_no_op(); } if !self.check_id_is_cur(id) { - return; + return AsyncTask::new_no_op(); } - self.autoplay_next_or_stop(id); + self.autoplay_next_or_stop(id) } /// Handle queued message from server pub fn handle_queued(&mut self, duration: Option, id: ListSongID) { diff --git a/youtui/src/core.rs b/youtui/src/core.rs index 96b98e5..1b8a2e0 100644 --- a/youtui/src/core.rs +++ b/youtui/src/core.rs @@ -1,4 +1,3 @@ -use async_callback_manager::{BackendStreamingTask, BackendTask}; use std::borrow::Borrow; use tokio::sync::mpsc; use tracing::error;