Skip to content

Commit

Permalink
Complete implementation of requests
Browse files Browse the repository at this point in the history
  • Loading branch information
nick42d committed Nov 13, 2024
1 parent 293739d commit fc10f27
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 71 deletions.
8 changes: 5 additions & 3 deletions async-rodio-sink/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use tracing::info;
use tracing::warn;

#[derive(Debug)]
struct Percentage(pub u8);
pub struct Percentage(pub u8);

#[derive(Debug)]
pub enum SeekDirection {
Expand All @@ -38,7 +38,7 @@ enum AsyncRodioRequest<S, I> {
Seek(Duration, SeekDirection, RodioOneshot<(Duration, I)>),
}

pub struct VolumeUpdate(Percentage);
pub struct VolumeUpdate(pub Percentage);
pub struct ProgressUpdate<I>(Duration, I);
// At this stage this difference between DonePlaying and Stopped is very thin.
// DonePlaying means that the song has been dropped by the player, whereas
Expand Down Expand Up @@ -398,7 +398,7 @@ where
pub fn new(channel_size: usize) -> Self {
let (tx, mut rx) =
tokio::sync::mpsc::channel::<AsyncRodioRequest<Decoder<Cursor<S>>, I>>(channel_size);
let handle = tokio::task::spawn_blocking(move || {
let handle = tokio::task::spawn(async move {
// Rodio can produce output to stderr when we don't want it to, so we use Gag to
// suppress stdout/stderr. The downside is that even though this runs in
// a seperate thread all stderr for the whole app may be gagged.
Expand Down Expand Up @@ -479,6 +479,7 @@ where
}
next_song_duration = song.total_duration();
&tx.0.send(AsyncRodioResponse::Queued(next_song_duration));
let txs = tx.0.clone();
let song = add_periodic_access(song, PROGRESS_UPDATE_DELAY, move |s| {
txs.blocking_send(AsyncRodioResponse::ProgressUpdate(s.get_pos()));
});
Expand All @@ -492,6 +493,7 @@ where
if !sink.empty() {
sink.stop()
}
let txs = tx.0.clone();
let song = add_periodic_access(song, PROGRESS_UPDATE_DELAY, move |s| {
txs.blocking_send(AsyncRodioResponse::ProgressUpdate(s.get_pos()));
});
Expand Down
10 changes: 5 additions & 5 deletions youtui/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ pub struct Youtui {
status: AppStatus,
event_handler: EventHandler,
window_state: YoutuiWindow,
task_manager: AsyncCallbackManager<Server>,
server: Server,
task_manager: AsyncCallbackManager<Arc<Server>>,
server: Arc<Server>,
callback_rx: mpsc::Receiver<AppCallback>,
terminal: Terminal<CrosstermBackend<io::Stdout>>,
/// If Youtui will redraw on the next rendering loop.
Expand All @@ -69,7 +69,7 @@ pub enum AppCallback {
}

impl Youtui {
pub fn new(rt: RuntimeInfo) -> Result<Youtui> {
pub async fn new(rt: RuntimeInfo) -> Result<Youtui> {
let RuntimeInfo {
api_key,
debug,
Expand Down Expand Up @@ -108,11 +108,11 @@ impl Youtui {
response.type_name, response.type_id, response.sender_id, response.task_id
)
});
let server = server::Server::new(api_key, po_token);
let server = Arc::new(server::Server::new(api_key, po_token));
let backend = CrosstermBackend::new(stdout);
let terminal = Terminal::new(backend)?;
let event_handler = EventHandler::new(EVENT_CHANNEL_SIZE)?;
let window_state = YoutuiWindow::new(callback_tx, &mut task_manager);
let window_state = YoutuiWindow::new(callback_tx, &mut task_manager).await;
Ok(Youtui {
status: AppStatus::Running,
event_handler,
Expand Down
82 changes: 48 additions & 34 deletions youtui/src/app/server.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,23 @@
#![warn(clippy::unwrap_used)]
use super::structures::ListSongID;
use crate::{config::ApiKey, Result};
use api::GetArtistSongsProgressUpdate;
use async_callback_manager::{BackendStreamingTask, BackendTask};
use async_rodio_sink::AutoplayUpdate;
use async_rodio_sink::PausePlayResponse;
use async_rodio_sink::PlayUpdate;
use async_rodio_sink::ProgressUpdate;
use async_rodio_sink::QueueUpdate;
use async_rodio_sink::SeekDirection;
use async_rodio_sink::Stopped;
use async_rodio_sink::VolumeUpdate;
use downloader::DownloadProgressUpdate;
use downloader::Downloader;
use downloader::InMemSong;
use futures::Future;
use futures::Stream;
use std::sync::Arc;
use std::time::Duration;
use ytmapi_rs::common::VideoID;
use ytmapi_rs::common::{ArtistChannelID, SearchSuggestion};
use ytmapi_rs::parse::SearchResultArtist;
Expand All @@ -25,9 +35,9 @@ pub type ArcServer = Arc<Server>;
/// Application backend that is capable of spawning concurrent tasks in response
/// to requests. Tasks each receive a handle to respond back to the caller.
pub struct Server {
api: api::Api,
player: player::Player,
downloader: downloader::Downloader,
pub api: api::Api,
pub player: player::Player,
pub downloader: downloader::Downloader,
}

impl Server {
Expand Down Expand Up @@ -81,8 +91,11 @@ pub struct DownloadSong(pub VideoID<'static>, pub ListSongID);
// Send IncreaseVolume(5)
// Send IncreaseVolume(5), killing previous task
// Volume will now be 10 - should be 15, should not allow caller to cause this.
pub struct IncreaseVolume(i8);
pub struct Seek(i8);
pub struct IncreaseVolume(pub i8);
pub struct Seek {
duration: Duration,
direction: SeekDirection,
}
pub struct Stop(ListSongID);
pub struct PausePlay(ListSongID);
// Play a song, starting from the start, regardless what's queued.
Expand Down Expand Up @@ -139,77 +152,78 @@ impl BackendStreamingTask<ArcServer> for DownloadSong {
self,
backend: &ArcServer,
) -> impl futures::Stream<Item = Self::Output> + Send + Unpin + 'static {
let backend = backend.clone();
backend.download_song(self.0, self.1)
}
}
impl BackendTask<Downloader> for IncreaseVolume {
type Output = ();
impl BackendTask<ArcServer> for Seek {
type Output = Option<ProgressUpdate<ListSongID>>;
fn into_future(
self,
backend: &Downloader,
backend: &ArcServer,
) -> impl Future<Output = Self::Output> + Send + 'static {
todo!();
async {}
let backend = backend.clone();
async move { backend.player.seek(self.duration, self.direction).await }
}
}
impl BackendTask<Downloader> for Seek {
type Output = ();
impl BackendTask<ArcServer> for IncreaseVolume {
type Output = Option<VolumeUpdate>;
fn into_future(
self,
backend: &Downloader,
backend: &ArcServer,
) -> impl Future<Output = Self::Output> + Send + 'static {
todo!();
async {}
let backend = backend.clone();
async move { backend.player.increase_volume(self.0).await }
}
}
impl BackendTask<Downloader> for Stop {
type Output = ();
impl BackendTask<ArcServer> for Stop {
type Output = Option<Stopped<ListSongID>>;
fn into_future(
self,
backend: &Downloader,
backend: &ArcServer,
) -> impl Future<Output = Self::Output> + Send + 'static {
todo!();
async {}
let backend = backend.clone();
async move { backend.player.stop(self.0).await }
}
}
impl BackendTask<Downloader> for PausePlay {
type Output = ();
impl BackendTask<ArcServer> for PausePlay {
type Output = Option<PausePlayResponse<ListSongID>>;
fn into_future(
self,
backend: &Downloader,
backend: &ArcServer,
) -> impl Future<Output = Self::Output> + Send + 'static {
todo!();
async {}
let backend = backend.clone();
async move { backend.player.pause_play(self.0).await }
}
}

impl BackendStreamingTask<ArcServer> for PlaySong {
type Output = ();
type Output = PlayUpdate<ListSongID>;
fn into_stream(
self,
backend: &ArcServer,
) -> impl Stream<Item = Self::Output> + Send + Unpin + 'static {
todo!();
futures::stream::empty()
let backend = backend.clone();
backend.player.play_song(self.song, self.id).unwrap()
}
}
impl BackendStreamingTask<ArcServer> for AutoplaySong {
type Output = ();
type Output = AutoplayUpdate<ListSongID>;
fn into_stream(
self,
backend: &ArcServer,
) -> impl Stream<Item = Self::Output> + Send + Unpin + 'static {
todo!();
futures::stream::empty()
let backend = backend.clone();
backend.player.autoplay_song(self.song, self.id).unwrap()
}
}
impl BackendStreamingTask<ArcServer> for QueueSong {
type Output = ();
type Output = QueueUpdate<ListSongID>;
fn into_stream(
self,
backend: &ArcServer,
) -> impl Stream<Item = Self::Output> + Send + Unpin + 'static {
todo!();
futures::stream::empty()
let backend = backend.clone();
backend.player.queue_song(self.song, self.id).unwrap()
}
}
15 changes: 9 additions & 6 deletions youtui/src/app/server/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl Api {
pub fn get_artist_songs(
&self,
browse_id: ArtistChannelID<'static>,
) -> impl Stream<Item = GetArtistSongsProgressUpdate> {
) -> impl Stream<Item = GetArtistSongsProgressUpdate> + 'static {
let api = async { self.get_api().await.map_err(Error::new_api_error_string) };
get_artist_songs(api, browse_id)
}
Expand Down Expand Up @@ -181,7 +181,7 @@ pub enum GetArtistSongsProgressUpdate {
fn get_artist_songs(
api: impl Future<Output = Result<ConcurrentApi>> + Send + 'static,
browse_id: ArtistChannelID<'static>,
) -> impl Stream<Item = GetArtistSongsProgressUpdate> {
) -> impl Stream<Item = GetArtistSongsProgressUpdate> + 'static {
/// Bailout function that will log an error and send NoSongsFound if we get
/// an unrecoverable error.
async fn bailout(e: impl std::fmt::Display, tx: Sender<GetArtistSongsProgressUpdate>) {
Expand All @@ -198,7 +198,7 @@ fn get_artist_songs(
Err(e) => return bailout(e, tx).await,
Ok(api) => api,
};
let query = ytmapi_rs::query::GetArtistQuery::new(browse_id);
let query = ytmapi_rs::query::GetArtistQuery::new(&browse_id);
let artist = query_api_with_retry(&api, query).await;
let artist = match artist {
Ok(a) => a,
Expand Down Expand Up @@ -277,9 +277,12 @@ fn get_artist_songs(
.inspect(|a_id| {
tracing::info!("Spawning request for caller tracks for album ID {:?}", a_id,)
})
.map(|a_id| async move {
let query = GetAlbumQuery::new(&a_id);
query_api_with_retry(&api, query).await
.map(|a_id| {
let api = api.clone();
async move {
let query = GetAlbumQuery::new(&a_id);
query_api_with_retry(&api, query).await
}
})
.collect::<FuturesOrdered<_>>();
while let Some(maybe_album) = stream.next().await {
Expand Down
63 changes: 59 additions & 4 deletions youtui/src/app/server/player.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ use crate::app::structures::ListSongID;
use crate::app::structures::Percentage;
use crate::core::send_or_error;
use crate::Result;
use async_rodio_sink::AsyncRodio;
use futures::Stream;
use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
Expand All @@ -21,15 +24,67 @@ pub enum Response {
Error(ListSongID),
}

struct ArcInMemSong(Arc<InMemSong>);

impl AsRef<[u8]> for ArcInMemSong {
fn as_ref(&self) -> &[u8] {
self.0.as_ref().0.as_ref()
}
}

pub struct Player {
rodio_tx: mpsc::Sender<RodioMessage>,
rodio_handle: AsyncRodio<ArcInMemSong, ListSongID>,
}

// Consider if this can be managed by Server.
impl Player {
pub fn new() -> Self {
let (msg_tx, msg_rx) = mpsc::channel(PLAYER_MSG_QUEUE_SIZE);
spawn_rodio_thread(msg_rx);
Self { rodio_tx: msg_tx }
let rodio_handle = AsyncRodio::new(PLAYER_MSG_QUEUE_SIZE);
Self { rodio_handle }
}
pub fn autoplay_song(
&self,
song: Arc<InMemSong>,
song_id: ListSongID,
) -> std::result::Result<impl Stream<Item = async_rodio_sink::AutoplayUpdate<ListSongID>>, ()>
{
let song = ArcInMemSong(song);
self.rodio_handle.autoplay_song(song, song_id)
}
pub fn play_song(
&self,
song: Arc<InMemSong>,
song_id: ListSongID,
) -> std::result::Result<impl Stream<Item = async_rodio_sink::PlayUpdate<ListSongID>>, ()> {
let song = ArcInMemSong(song);
self.rodio_handle.play_song(song, song_id)
}
pub fn queue_song(
&self,
song: Arc<InMemSong>,
song_id: ListSongID,
) -> std::result::Result<impl Stream<Item = async_rodio_sink::QueueUpdate<ListSongID>>, ()>
{
let song = ArcInMemSong(song);
self.rodio_handle.queue_song(song, song_id)
}
pub async fn seek(
&self,
duration: Duration,
direction: async_rodio_sink::SeekDirection,
) -> Option<async_rodio_sink::ProgressUpdate<ListSongID>> {
self.rodio_handle.seek(duration, direction).await
}
pub async fn stop(&self, song_id: ListSongID) -> Option<async_rodio_sink::Stopped<ListSongID>> {
self.rodio_handle.stop(song_id).await
}
pub async fn pause_play(
&self,
song_id: ListSongID,
) -> Option<async_rodio_sink::PausePlayResponse<ListSongID>> {
self.rodio_handle.pause_play(song_id).await
}
pub async fn increase_volume(&self, vol_inc: i8) -> Option<async_rodio_sink::VolumeUpdate> {
self.rodio_handle.increase_volume(vol_inc).await
}
}
7 changes: 4 additions & 3 deletions youtui/src/app/ui.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use super::component::actionhandler::{
use super::keycommand::{
CommandVisibility, DisplayableCommand, DisplayableMode, KeyCommand, Keymap,
};
use super::server::ArcServer;
use super::view::{DrawableMut, Scrollable};
use super::AppCallback;
use super::{server, structures::*};
Expand Down Expand Up @@ -299,15 +300,15 @@ impl TextHandler for YoutuiWindow {
}

impl YoutuiWindow {
pub fn new(
pub async fn new(
callback_tx: mpsc::Sender<AppCallback>,
callback_manager: &mut async_callback_manager::AsyncCallbackManager<server::Server>,
callback_manager: &mut async_callback_manager::AsyncCallbackManager<ArcServer>,
) -> YoutuiWindow {
// TODO: derive default
YoutuiWindow {
context: WindowContext::Browser,
prev_context: WindowContext::Browser,
playlist: Playlist::new(callback_manager, callback_tx.clone()),
playlist: Playlist::new(callback_manager, callback_tx.clone()).await,
browser: Browser::new(callback_manager, callback_tx.clone()),
logger: Logger::new(callback_tx.clone()),
keybinds: global_keybinds(),
Expand Down
Loading

0 comments on commit fc10f27

Please sign in to comment.