From fc10f27aed8bbc938589754c9b272b553195ba46 Mon Sep 17 00:00:00 2001 From: Nick Dowsett Date: Wed, 13 Nov 2024 08:31:18 +0800 Subject: [PATCH] Complete implementation of requests --- async-rodio-sink/src/lib.rs | 8 ++-- youtui/src/app.rs | 10 ++-- youtui/src/app/server.rs | 82 +++++++++++++++++++-------------- youtui/src/app/server/api.rs | 15 +++--- youtui/src/app/server/player.rs | 63 +++++++++++++++++++++++-- youtui/src/app/ui.rs | 7 +-- youtui/src/app/ui/playlist.rs | 37 +++++++++------ youtui/src/main.rs | 2 +- 8 files changed, 153 insertions(+), 71 deletions(-) diff --git a/async-rodio-sink/src/lib.rs b/async-rodio-sink/src/lib.rs index fb26cf9..488d4ee 100644 --- a/async-rodio-sink/src/lib.rs +++ b/async-rodio-sink/src/lib.rs @@ -19,7 +19,7 @@ use tracing::info; use tracing::warn; #[derive(Debug)] -struct Percentage(pub u8); +pub struct Percentage(pub u8); #[derive(Debug)] pub enum SeekDirection { @@ -38,7 +38,7 @@ enum AsyncRodioRequest { Seek(Duration, SeekDirection, RodioOneshot<(Duration, I)>), } -pub struct VolumeUpdate(Percentage); +pub struct VolumeUpdate(pub Percentage); pub struct ProgressUpdate(Duration, I); // At this stage this difference between DonePlaying and Stopped is very thin. // DonePlaying means that the song has been dropped by the player, whereas @@ -398,7 +398,7 @@ where pub fn new(channel_size: usize) -> Self { let (tx, mut rx) = tokio::sync::mpsc::channel::>, I>>(channel_size); - let handle = tokio::task::spawn_blocking(move || { + let handle = tokio::task::spawn(async move { // Rodio can produce output to stderr when we don't want it to, so we use Gag to // suppress stdout/stderr. The downside is that even though this runs in // a seperate thread all stderr for the whole app may be gagged. @@ -479,6 +479,7 @@ where } next_song_duration = song.total_duration(); &tx.0.send(AsyncRodioResponse::Queued(next_song_duration)); + let txs = tx.0.clone(); let song = add_periodic_access(song, PROGRESS_UPDATE_DELAY, move |s| { txs.blocking_send(AsyncRodioResponse::ProgressUpdate(s.get_pos())); }); @@ -492,6 +493,7 @@ where if !sink.empty() { sink.stop() } + let txs = tx.0.clone(); let song = add_periodic_access(song, PROGRESS_UPDATE_DELAY, move |s| { txs.blocking_send(AsyncRodioResponse::ProgressUpdate(s.get_pos())); }); diff --git a/youtui/src/app.rs b/youtui/src/app.rs index b71d466..2c999ce 100644 --- a/youtui/src/app.rs +++ b/youtui/src/app.rs @@ -44,8 +44,8 @@ pub struct Youtui { status: AppStatus, event_handler: EventHandler, window_state: YoutuiWindow, - task_manager: AsyncCallbackManager, - server: Server, + task_manager: AsyncCallbackManager>, + server: Arc, callback_rx: mpsc::Receiver, terminal: Terminal>, /// If Youtui will redraw on the next rendering loop. @@ -69,7 +69,7 @@ pub enum AppCallback { } impl Youtui { - pub fn new(rt: RuntimeInfo) -> Result { + pub async fn new(rt: RuntimeInfo) -> Result { let RuntimeInfo { api_key, debug, @@ -108,11 +108,11 @@ impl Youtui { response.type_name, response.type_id, response.sender_id, response.task_id ) }); - let server = server::Server::new(api_key, po_token); + let server = Arc::new(server::Server::new(api_key, po_token)); let backend = CrosstermBackend::new(stdout); let terminal = Terminal::new(backend)?; let event_handler = EventHandler::new(EVENT_CHANNEL_SIZE)?; - let window_state = YoutuiWindow::new(callback_tx, &mut task_manager); + let window_state = YoutuiWindow::new(callback_tx, &mut task_manager).await; Ok(Youtui { status: AppStatus::Running, event_handler, diff --git a/youtui/src/app/server.rs b/youtui/src/app/server.rs index c7ed48d..3b8b0db 100644 --- a/youtui/src/app/server.rs +++ b/youtui/src/app/server.rs @@ -1,13 +1,23 @@ +#![warn(clippy::unwrap_used)] use super::structures::ListSongID; use crate::{config::ApiKey, Result}; use api::GetArtistSongsProgressUpdate; use async_callback_manager::{BackendStreamingTask, BackendTask}; +use async_rodio_sink::AutoplayUpdate; +use async_rodio_sink::PausePlayResponse; +use async_rodio_sink::PlayUpdate; +use async_rodio_sink::ProgressUpdate; +use async_rodio_sink::QueueUpdate; +use async_rodio_sink::SeekDirection; +use async_rodio_sink::Stopped; +use async_rodio_sink::VolumeUpdate; use downloader::DownloadProgressUpdate; use downloader::Downloader; use downloader::InMemSong; use futures::Future; use futures::Stream; use std::sync::Arc; +use std::time::Duration; use ytmapi_rs::common::VideoID; use ytmapi_rs::common::{ArtistChannelID, SearchSuggestion}; use ytmapi_rs::parse::SearchResultArtist; @@ -25,9 +35,9 @@ pub type ArcServer = Arc; /// Application backend that is capable of spawning concurrent tasks in response /// to requests. Tasks each receive a handle to respond back to the caller. pub struct Server { - api: api::Api, - player: player::Player, - downloader: downloader::Downloader, + pub api: api::Api, + pub player: player::Player, + pub downloader: downloader::Downloader, } impl Server { @@ -81,8 +91,11 @@ pub struct DownloadSong(pub VideoID<'static>, pub ListSongID); // Send IncreaseVolume(5) // Send IncreaseVolume(5), killing previous task // Volume will now be 10 - should be 15, should not allow caller to cause this. -pub struct IncreaseVolume(i8); -pub struct Seek(i8); +pub struct IncreaseVolume(pub i8); +pub struct Seek { + duration: Duration, + direction: SeekDirection, +} pub struct Stop(ListSongID); pub struct PausePlay(ListSongID); // Play a song, starting from the start, regardless what's queued. @@ -139,77 +152,78 @@ impl BackendStreamingTask for DownloadSong { self, backend: &ArcServer, ) -> impl futures::Stream + Send + Unpin + 'static { + let backend = backend.clone(); backend.download_song(self.0, self.1) } } -impl BackendTask for IncreaseVolume { - type Output = (); +impl BackendTask for Seek { + type Output = Option>; fn into_future( self, - backend: &Downloader, + backend: &ArcServer, ) -> impl Future + Send + 'static { - todo!(); - async {} + let backend = backend.clone(); + async move { backend.player.seek(self.duration, self.direction).await } } } -impl BackendTask for Seek { - type Output = (); +impl BackendTask for IncreaseVolume { + type Output = Option; fn into_future( self, - backend: &Downloader, + backend: &ArcServer, ) -> impl Future + Send + 'static { - todo!(); - async {} + let backend = backend.clone(); + async move { backend.player.increase_volume(self.0).await } } } -impl BackendTask for Stop { - type Output = (); +impl BackendTask for Stop { + type Output = Option>; fn into_future( self, - backend: &Downloader, + backend: &ArcServer, ) -> impl Future + Send + 'static { - todo!(); - async {} + let backend = backend.clone(); + async move { backend.player.stop(self.0).await } } } -impl BackendTask for PausePlay { - type Output = (); +impl BackendTask for PausePlay { + type Output = Option>; fn into_future( self, - backend: &Downloader, + backend: &ArcServer, ) -> impl Future + Send + 'static { - todo!(); - async {} + let backend = backend.clone(); + async move { backend.player.pause_play(self.0).await } } } impl BackendStreamingTask for PlaySong { - type Output = (); + type Output = PlayUpdate; fn into_stream( self, backend: &ArcServer, ) -> impl Stream + Send + Unpin + 'static { - todo!(); - futures::stream::empty() + let backend = backend.clone(); + backend.player.play_song(self.song, self.id).unwrap() } } impl BackendStreamingTask for AutoplaySong { - type Output = (); + type Output = AutoplayUpdate; fn into_stream( self, backend: &ArcServer, ) -> impl Stream + Send + Unpin + 'static { - todo!(); - futures::stream::empty() + let backend = backend.clone(); + backend.player.autoplay_song(self.song, self.id).unwrap() } } impl BackendStreamingTask for QueueSong { - type Output = (); + type Output = QueueUpdate; fn into_stream( self, backend: &ArcServer, ) -> impl Stream + Send + Unpin + 'static { - todo!(); - futures::stream::empty() + let backend = backend.clone(); + backend.player.queue_song(self.song, self.id).unwrap() } } diff --git a/youtui/src/app/server/api.rs b/youtui/src/app/server/api.rs index a2e7625..b871429 100644 --- a/youtui/src/app/server/api.rs +++ b/youtui/src/app/server/api.rs @@ -68,7 +68,7 @@ impl Api { pub fn get_artist_songs( &self, browse_id: ArtistChannelID<'static>, - ) -> impl Stream { + ) -> impl Stream + 'static { let api = async { self.get_api().await.map_err(Error::new_api_error_string) }; get_artist_songs(api, browse_id) } @@ -181,7 +181,7 @@ pub enum GetArtistSongsProgressUpdate { fn get_artist_songs( api: impl Future> + Send + 'static, browse_id: ArtistChannelID<'static>, -) -> impl Stream { +) -> impl Stream + 'static { /// Bailout function that will log an error and send NoSongsFound if we get /// an unrecoverable error. async fn bailout(e: impl std::fmt::Display, tx: Sender) { @@ -198,7 +198,7 @@ fn get_artist_songs( Err(e) => return bailout(e, tx).await, Ok(api) => api, }; - let query = ytmapi_rs::query::GetArtistQuery::new(browse_id); + let query = ytmapi_rs::query::GetArtistQuery::new(&browse_id); let artist = query_api_with_retry(&api, query).await; let artist = match artist { Ok(a) => a, @@ -277,9 +277,12 @@ fn get_artist_songs( .inspect(|a_id| { tracing::info!("Spawning request for caller tracks for album ID {:?}", a_id,) }) - .map(|a_id| async move { - let query = GetAlbumQuery::new(&a_id); - query_api_with_retry(&api, query).await + .map(|a_id| { + let api = api.clone(); + async move { + let query = GetAlbumQuery::new(&a_id); + query_api_with_retry(&api, query).await + } }) .collect::>(); while let Some(maybe_album) = stream.next().await { diff --git a/youtui/src/app/server/player.rs b/youtui/src/app/server/player.rs index 432f5bd..cbc95dc 100644 --- a/youtui/src/app/server/player.rs +++ b/youtui/src/app/server/player.rs @@ -3,6 +3,9 @@ use crate::app::structures::ListSongID; use crate::app::structures::Percentage; use crate::core::send_or_error; use crate::Result; +use async_rodio_sink::AsyncRodio; +use futures::Stream; +use std::ops::Deref; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; @@ -21,15 +24,67 @@ pub enum Response { Error(ListSongID), } +struct ArcInMemSong(Arc); + +impl AsRef<[u8]> for ArcInMemSong { + fn as_ref(&self) -> &[u8] { + self.0.as_ref().0.as_ref() + } +} + pub struct Player { - rodio_tx: mpsc::Sender, + rodio_handle: AsyncRodio, } // Consider if this can be managed by Server. impl Player { pub fn new() -> Self { - let (msg_tx, msg_rx) = mpsc::channel(PLAYER_MSG_QUEUE_SIZE); - spawn_rodio_thread(msg_rx); - Self { rodio_tx: msg_tx } + let rodio_handle = AsyncRodio::new(PLAYER_MSG_QUEUE_SIZE); + Self { rodio_handle } + } + pub fn autoplay_song( + &self, + song: Arc, + song_id: ListSongID, + ) -> std::result::Result>, ()> + { + let song = ArcInMemSong(song); + self.rodio_handle.autoplay_song(song, song_id) + } + pub fn play_song( + &self, + song: Arc, + song_id: ListSongID, + ) -> std::result::Result>, ()> { + let song = ArcInMemSong(song); + self.rodio_handle.play_song(song, song_id) + } + pub fn queue_song( + &self, + song: Arc, + song_id: ListSongID, + ) -> std::result::Result>, ()> + { + let song = ArcInMemSong(song); + self.rodio_handle.queue_song(song, song_id) + } + pub async fn seek( + &self, + duration: Duration, + direction: async_rodio_sink::SeekDirection, + ) -> Option> { + self.rodio_handle.seek(duration, direction).await + } + pub async fn stop(&self, song_id: ListSongID) -> Option> { + self.rodio_handle.stop(song_id).await + } + pub async fn pause_play( + &self, + song_id: ListSongID, + ) -> Option> { + self.rodio_handle.pause_play(song_id).await + } + pub async fn increase_volume(&self, vol_inc: i8) -> Option { + self.rodio_handle.increase_volume(vol_inc).await } } diff --git a/youtui/src/app/ui.rs b/youtui/src/app/ui.rs index a46ed64..986e01d 100644 --- a/youtui/src/app/ui.rs +++ b/youtui/src/app/ui.rs @@ -8,6 +8,7 @@ use super::component::actionhandler::{ use super::keycommand::{ CommandVisibility, DisplayableCommand, DisplayableMode, KeyCommand, Keymap, }; +use super::server::ArcServer; use super::view::{DrawableMut, Scrollable}; use super::AppCallback; use super::{server, structures::*}; @@ -299,15 +300,15 @@ impl TextHandler for YoutuiWindow { } impl YoutuiWindow { - pub fn new( + pub async fn new( callback_tx: mpsc::Sender, - callback_manager: &mut async_callback_manager::AsyncCallbackManager, + callback_manager: &mut async_callback_manager::AsyncCallbackManager, ) -> YoutuiWindow { // TODO: derive default YoutuiWindow { context: WindowContext::Browser, prev_context: WindowContext::Browser, - playlist: Playlist::new(callback_manager, callback_tx.clone()), + playlist: Playlist::new(callback_manager, callback_tx.clone()).await, browser: Browser::new(callback_manager, callback_tx.clone()), logger: Logger::new(callback_tx.clone()), keybinds: global_keybinds(), diff --git a/youtui/src/app/ui/playlist.rs b/youtui/src/app/ui/playlist.rs index 043a650..d075dbd 100644 --- a/youtui/src/app/ui/playlist.rs +++ b/youtui/src/app/ui/playlist.rs @@ -1,5 +1,5 @@ use crate::app::server::downloader::DownloadProgressUpdateType; -use crate::app::server::Server; +use crate::app::server::{ArcServer, IncreaseVolume, Server}; use crate::app::structures::{Percentage, SongListComponent}; use crate::app::view::draw::draw_table; use crate::app::view::{BasicConstraint, DrawableMut, TableItem}; @@ -13,7 +13,8 @@ use crate::app::{ use crate::app::CALLBACK_CHANNEL_SIZE; use crate::{app::structures::DownloadStatus, core::send_or_error}; -use async_callback_manager::{AsyncCallbackManager, AsyncCallbackSender}; +use async_callback_manager::{AsyncCallbackManager, AsyncCallbackSender, Constraint}; +use async_rodio_sink::VolumeUpdate; use crossterm::event::KeyCode; use ratatui::widgets::TableState; use ratatui::{layout::Rect, Frame}; @@ -36,7 +37,7 @@ pub struct Playlist { pub queue_status: QueueState, pub volume: Percentage, ui_tx: mpsc::Sender, - async_tx: AsyncCallbackSender, + async_tx: AsyncCallbackSender, keybinds: Vec>, cur_selected: usize, pub widget_state: TableState, @@ -203,17 +204,21 @@ impl SongListComponent for Playlist { // Primatives impl Playlist { - pub fn new( - callback_manager: &mut AsyncCallbackManager, + pub async fn new( + callback_manager: &mut AsyncCallbackManager, ui_tx: mpsc::Sender, ) -> Self { - // This could fail, made to try send to avoid needing to change function - // signature to asynchronous. Should change. - ui_tx - // Since IncreaseVolume responds back with player volume after change, this is a neat - // hack. - .try_send(AppCallback::IncreaseVolume(0)) - .unwrap_or_else(|e| error!("Error <{e}> received getting initial player volume.")); + let async_tx = callback_manager.new_sender(CALLBACK_CHANNEL_SIZE); + // Ensure volume is synced with player. + async_tx + .add_callback( + // Since IncreaseVolume responds back with player volume after change, this is a + // neat hack. + IncreaseVolume(0), + Self::handle_set_volume, + Some(Constraint::new_block_same_type()), + ) + .await; Playlist { ui_tx, volume: Percentage(50), @@ -223,7 +228,7 @@ impl Playlist { keybinds: playlist_keybinds(), cur_selected: 0, queue_status: QueueState::NotQueued, - async_tx: callback_manager.new_sender(CALLBACK_CHANNEL_SIZE), + async_tx, widget_state: Default::default(), } } @@ -667,8 +672,10 @@ impl Playlist { } } /// Handle volume message from server - pub fn handle_set_volume(&mut self, p: Percentage) { - self.volume = p; + pub fn handle_set_volume(&mut self, response: Option) { + if let Some(v) = response { + self.volume = Percentage(v.0 .0) + } } /// Handle song progress message from server pub async fn handle_set_song_play_progress(&mut self, d: Duration, id: ListSongID) { diff --git a/youtui/src/main.rs b/youtui/src/main.rs index ef51e94..cd103f2 100644 --- a/youtui/src/main.rs +++ b/youtui/src/main.rs @@ -325,7 +325,7 @@ async fn get_api(config: &Config) -> Result { } pub async fn run_app(rt: RuntimeInfo) -> Result<()> { - let mut app = app::Youtui::new(rt)?; + let mut app = app::Youtui::new(rt).await?; app.run().await?; Ok(()) }