Skip to content

Commit

Permalink
change download method, add lossless quality
Browse files Browse the repository at this point in the history
  • Loading branch information
vm86 committed Nov 16, 2024
1 parent e2e31f7 commit f0a2aa5
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 141 deletions.
2 changes: 1 addition & 1 deletion yandex_fuse/virt_fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def insert(self) -> tuple[str, dict[str, Any]]:
INSERT INTO {self.__tablename__}
({columns})
VALUES
'({placeholders})'
({placeholders})
"""
return query, data

Expand Down
76 changes: 48 additions & 28 deletions yandex_fuse/ya_music_fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,11 @@ async def read_from(self, offset: int, size: int) -> bytes:
self.__download_task.result()

self.__ready_read.clear()
with suppress(AsyncTimeoutError):
try:
await wait_for(self.__ready_read.wait(), timeout=5)
except AsyncTimeoutError:
log.warning("Slow downloading %s", self.__track.name)

self.__total_read += size
return bytes(self.__bytes.getbuffer()[offset : offset + size])

Expand Down Expand Up @@ -190,7 +193,7 @@ async def download(self) -> None:
continue

buffer.write(chunk)
new_buffer = self.__tag.to_bytes(buffer)
new_buffer = self.__tag.to_bytes(buffer, self.__track.codec)
if new_buffer is None:
continue
self._write_tag(
Expand All @@ -214,6 +217,7 @@ async def download(self) -> None:
raise
else:
log.debug("Track %s downloaded", self.__track.name)
self.__ready_read.set()
if self.__download_task is None:
return
download_task = self.__download_task
Expand All @@ -230,6 +234,7 @@ class SQLTrack(SQLRow):
playlist_id: str
codec: str
bitrate: int
quality: str
size: int
artist: str
title: str
Expand Down Expand Up @@ -291,6 +296,7 @@ class YaMusicFS(VirtFS):
REFERENCES playlists(playlist_id) ON DELETE RESTRICT,
codec BLOB(8) NOT NULL,
bitrate INT NOT NULL,
quality TEXT(20) NOT NULL,
size INT NOT NULL,
artist TEXT(255),
title TEXT(255),
Expand Down Expand Up @@ -391,27 +397,44 @@ async def get_or_update_direct_link(
except ClientError as err:
log.error("Fail get direct link: %r", err) # noqa: TRY400

new_direct_link = await self._ya_player.get_download_link(
new_direct_links = await self._ya_player.get_download_links(
track_id,
codec,
bitrate_in_kbps,
)
if new_direct_link is None:

if new_direct_links is None:
return None

expired = int((time.time() + 8600) * 1e9)
cursor.execute(
"""
INSERT INTO direct_link
(track_id, link, expired)
VALUES(?, ?, ?)
ON CONFLICT(track_id)
DO UPDATE SET link=excluded.link,expired=excluded.expired
""",
(track_id, new_direct_link, expired),
)
log.debug("Direct link: %s, track: %s", new_direct_link, track_id)
return new_direct_link
for new_direct_link in new_direct_links:
log.debug("Check direct link %s", new_direct_link)
try:
async with self._client_session.request(
"HEAD",
new_direct_link,
) as resp:
if not resp.ok:
continue
except ClientError as err:
log.error("Fail get direct link: %r", err) # noqa: TRY400
continue

expired = int((time.time() + 8600) * 1e9)
cursor.execute(
"""
INSERT INTO direct_link
(track_id, link, expired)
VALUES(?, ?, ?)
ON CONFLICT(track_id)
DO UPDATE SET link=excluded.link,expired=excluded.expired
""",
(track_id, new_direct_link, expired),
)
log.debug(
"Direct link: %s, track: %s", new_direct_link, track_id
)
return new_direct_link
return None

def _get_playlist_by_id(self, playlist_id: str) -> SQLPlaylist | None:
return SQLPlaylist.from_row(
Expand Down Expand Up @@ -569,7 +592,7 @@ async def _update_track(
byte = BytesIO()
async for chunk in resp.content.iter_chunked(1024):
byte.write(chunk)
new_buffer = track.tag.to_bytes(byte)
new_buffer = track.tag.to_bytes(byte, track.codec)
if new_buffer is None:
continue
track.size += track.tag.size
Expand Down Expand Up @@ -628,6 +651,7 @@ async def _update_track(
playlist_id=playlist_id,
codec=track.codec,
bitrate=track.bitrate_in_kbps,
quality=track.quality,
size=track.size,
artist=tag["artist"],
title=tag["title"],
Expand Down Expand Up @@ -974,7 +998,7 @@ async def open(

buffer = await self._get_buffer(track)
if buffer is None:
raise FUSEError(errno.EPERM)
raise FUSEError(errno.EPIPE)

file_info = await super().open(inode, flags, ctx)
if flags & os.O_RDWR or flags & os.O_WRONLY:
Expand Down Expand Up @@ -1015,7 +1039,7 @@ async def read(self, fd: FileHandleT, offset: int, size: int) -> bytes:
stream_reader.track.playlist_id
)

if playlist is not None and playlist.batch_id is not None:
if playlist is not None and playlist.batch_id:
await self._ya_player.feedback_track(
stream_reader.track.track_id,
"trackStarted",
Expand Down Expand Up @@ -1050,7 +1074,7 @@ async def release(self, fd: FileHandleT) -> None:
stream_reader.track.playlist_id
)

if playlist is not None and playlist.batch_id is not None:
if playlist is not None and playlist.batch_id:
await self._ya_player.feedback_track(
stream_reader.track.track_id,
"trackFinished",
Expand Down Expand Up @@ -1098,18 +1122,14 @@ async def setxattr(
def xattrs(self, inode: InodeT) -> dict[str, Any]:
return {
"inode": inode,
"nlookup": len(self._nlookup),
"inode_map_fd": self._inode_map_fd,
"queue_invalidate_inode": self.queue_later_invalidate_inode,
"inode_map_fd": self._inode_map_fd.get(inode),
"stream": {
fd: {
"name": stream.track.name,
"name": stream.track.name.decode(),
"size": stream.track.size,
"codec": stream.track.codec,
"bitrate": stream.track.bitrate,
"play_second": stream.buffer.total_second()
if stream.buffer is not None
else 0,
"play_second": stream.buffer.total_second(),
}
for fd, stream in self._fd_map_stream.items()
},
Expand Down
Loading

0 comments on commit f0a2aa5

Please sign in to comment.