diff --git a/yandex_fuse/ya_music_fs.py b/yandex_fuse/ya_music_fs.py index de79505..860a250 100644 --- a/yandex_fuse/ya_music_fs.py +++ b/yandex_fuse/ya_music_fs.py @@ -63,7 +63,7 @@ log = logging.getLogger(__name__) LIMIT_TASKS = 10 -LIMIT_ONYOURWAVE = 50 +LIMIT_ONYOURWAVE = 150 PLAYLIST_ID2NAME = {"likes": "Мне нравится", "user:onyourwave": "Моя волна"} @@ -530,48 +530,56 @@ def _create_track(self, track: SQLTrack, parent_inode: int) -> None: track.inode = inode cur.execute(*track.insert()) - @retry_request + def _symlink_track( + self, exist_track: SQLTrack, track: ExtendTrack, dir_inode: InodeT + ) -> None: + playlist_info = self._get_playlist_by_id(exist_track.playlist_id) + if playlist_info is None: + raise RuntimeError("Error get playlist info") + + target = str( + Path("..") + .joinpath(playlist_info.name) + .joinpath(exist_track.name.decode()) + ).encode() + + track_link = self._get_inode_by_name( + InodeT(dir_inode), track.save_name.encode() + ) + if track_link is not None: + return + + with self._db_cursor() as cur: + inode = self._create( + parent_inode=dir_inode, + name=track.save_name.encode(), + size=4096, + mode=(stat.S_IFLNK | 0o777), + target=target, + db_cursor=cur, + ) + log.debug( + "Symlink track %s -> %d / %s - %d", + track.track_id, + inode, + track.save_name, + dir_inode, + ) + async def _update_track( self, track: ExtendTrack, playlist_id: str, dir_inode: InodeT, + *, + uniq: bool = False, ) -> None: if ( exist_track := self._get_track_by_id(track.track_id) ) is not None and exist_track.playlist_id != playlist_id: - playlist_info = self._get_playlist_by_id(exist_track.playlist_id) - if playlist_info is None: - raise RuntimeError("Error get playlist info") - - target = str( - Path("..") - .joinpath(playlist_info.name) - .joinpath(exist_track.name.decode()) - ).encode() - - track_link = self._get_inode_by_name( - InodeT(dir_inode), track.save_name.encode() - ) - if track_link is not None: + if uniq: return - - with self._db_cursor() as cur: - inode = self._create( - parent_inode=dir_inode, - name=track.save_name.encode(), - size=4096, - mode=(stat.S_IFLNK | 0o777), - target=target, - db_cursor=cur, - ) - log.debug( - "Symlink track %s -> %d / %s - %d", - track.track_id, - inode, - track.save_name, - dir_inode, - ) + self._symlink_track(exist_track, track, dir_inode) return if ( @@ -583,6 +591,7 @@ async def _update_track( ) is None: log.warning("Track %s is not be downloaded!", track.save_name) return + async with self._client_session.request( "GET", direct_link, @@ -820,32 +829,35 @@ async def __update_station_tracks( raise RuntimeError("Playlist info is empty!") dir_inode = playlist_info.inode - loaded_tracks = self._get_tracks(playlist_id) - if len(loaded_tracks) > LIMIT_ONYOURWAVE: - log.debug("Music directory is full.") - return tasks = set() error_update = False - async for track in self._ya_player.next_tracks( - playlist_id, - count=LIMIT_ONYOURWAVE, - exclude_track_ids=set(loaded_tracks.keys()), + + while ( + len(loaded_tracks := self._get_tracks(playlist_id)) + < LIMIT_ONYOURWAVE ): - tasks.add( - create_task( - self._update_track(track, playlist_id, dir_inode), - name=f"create-track-{track.save_name}", - ), - ) - if len(tasks) > LIMIT_TASKS: + async for track in self._ya_player.next_tracks( + playlist_id, + count=LIMIT_ONYOURWAVE, + exclude_track_ids=set(loaded_tracks.keys()), + ): + tasks.add( + create_task( + self._update_track( + track, playlist_id, dir_inode, uniq=True + ), + name=f"create-track-{track.save_name}", + ), + ) + if len(tasks) > LIMIT_TASKS: + tasks, error = await self._background_tasks(tasks) + error_update = error or error_update + + while tasks: tasks, error = await self._background_tasks(tasks) error_update = error or error_update - while tasks: - tasks, error = await self._background_tasks(tasks) - error_update = error or error_update - if not error_update: _, batch_id = self._ya_player.get_last_station_info() self._update_plyalist(playlist_id, 0, batch_id)