
Commit

Implement TemporaryFile download, can actually download with ~700mbit now
Emily3403 committed Nov 7, 2024
1 parent 5cd3ae4 commit 64275df
Showing 20 changed files with 494 additions and 172 deletions.
8 changes: 2 additions & 6 deletions .gitignore
@@ -134,10 +134,6 @@ massif.out.*
# Pyre type checker
.pyre/





# Custom stuff
data
LegalConflict.txt
/src/isisdl/resources/state.db
/src/isisdl/resources/new_state.db
9 changes: 6 additions & 3 deletions setup.cfg
@@ -20,10 +20,11 @@ packages =

python_requires = >=3.10
install_requires =
    cryptography~=41.0.3
    cryptography~=42.0.8
    requests~=2.31.0
    aiohttp~=3.8.5
    aiohttp~=3.9.5
    aiodns~=3.0.0
    aiofiles~=24.1.0
    pyyaml~=6.0.1
    packaging~=23.1
    colorama~=0.4.6
@@ -54,6 +55,7 @@ testing =
    tox~=4.8.0
    types-requests~=2.31.0.2
    types-pyyaml~=6.0.12.11
    types-aiofiles~=24.1.0
    twine~=4.0.2
    build~=0.10.0
    radon~=6.0.1
@@ -62,4 +64,5 @@ testing =
isisdl = resources/**

[flake8]
max-line-length = 500
max-line-length = 500
ignore = E251, E202
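
(Note: ~= is PEP 440's compatible-release operator; aiohttp~=3.9.5, for example, accepts any aiohttp 3.9.x at or above 3.9.5 but not 3.10.)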
6 changes: 2 additions & 4 deletions src/isisdl/__main__.py
@@ -34,15 +34,13 @@ def print_version() -> None:


async def _new_main() -> None:
    await asyncio.sleep(6000000)
    with DatabaseSessionMaker() as db:
        config = read_config(db)
        user = read_user(db)
        if user is None:
            return

        session = await authenticate_new_session(user, config)

        if session is None:
            return

@@ -52,9 +50,9 @@ async def _new_main() -> None:

        urls = await gather_media_urls(db, session, courses, config)
        if not urls:
            return None
            return

        downloaded_content = await download_media_urls(db, urls)
        downloaded_content = await download_media_urls(db, session, urls, config)
        # After downloading everything, run the hardlink resolution, this time based on checksums.

        _ = downloaded_content
149 changes: 142 additions & 7 deletions src/isisdl/api/crud.py
@@ -1,26 +1,32 @@
from __future__ import annotations

import asyncio
import re
from asyncio import Event
from base64 import standard_b64decode
from collections import defaultdict
from datetime import datetime
from html import unescape
from itertools import chain
from typing import Any, Literal, cast, DefaultDict
from typing import Any, Literal, cast, DefaultDict, Iterable

from aiohttp import ClientSession as InternetSession
import aiofiles
import aiofiles.os
from aiohttp import ClientSession as InternetSession, TCPConnector, ClientConnectorSSLError, ClientConnectorCertificateError, ClientSSLError, ClientConnectorError
from sqlalchemy import select
from sqlalchemy.orm import Session as DatabaseSession

from isisdl.api.models import AuthenticatedSession, Course, MediaURL, MediaType, NormalizedDocument
from isisdl.api.models import AuthenticatedSession, Course, MediaURL, MediaType, NormalizedDocument, TempFile, BadURL, MediaContainer, Error
from isisdl.api.rate_limiter import RateLimiter, ThrottleType
from isisdl.backend.models import User, Config
from isisdl.db_conf import add_or_update_objects_to_database
from isisdl.settings import url_finder, isis_ignore, extern_ignore, regex_is_isis_document, regex_is_isis_video
from isisdl.utils import datetime_fromtimestamp_with_None, flat_map
from isisdl.db_conf import add_or_update_objects_to_database, add_object_to_database
from isisdl.settings import url_finder, isis_ignore, extern_ignore, regex_is_isis_document, regex_is_isis_video, connection_pool_limit, download_chunk_size, DEBUG_ASSERTS, logger
from isisdl.utils import datetime_fromtimestamp_with_None, flat_map, get_download_url_from_url
from isisdl.version import __version__


async def authenticate_new_session(user: User, config: Config) -> AuthenticatedSession | None:
    session = InternetSession(headers={"User-Agent": f"isisdl (Python aiohttp) version {__version__}"})
    session = InternetSession(headers={"User-Agent": f"isisdl (Python aiohttp) version {__version__}"}, connector=TCPConnector(limit=connection_pool_limit))

    # First step of authenticating
    await session.get("https://isis.tu-berlin.de/auth/shibboleth/index.php")
@@ -82,6 +88,23 @@ def read_courses(db: DatabaseSession) -> list[Course]:
    return list(db.execute(select(Course)).scalars().all())


def sort_courses(courses: Iterable[Course]) -> list[Course]:
    """
    Sort courses based on time_of_last_access if it is not None,
    otherwise, sort based on time_of_last_modification if it is not None,
    otherwise, sort based on time_of_start if it is not None,
    otherwise, sort based on time_of_end if it is not None.
    """
    earliest = datetime.fromtimestamp(0)

    return sorted(courses, key=lambda course: (
        course.time_of_last_access or earliest,
        course.time_of_last_modification or earliest,
        course.time_of_start or earliest,
        course.time_of_end or earliest
    ), reverse=True)
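
A quick sketch of how this fallback key behaves, using hypothetical stub objects (the key only reads the four timestamp fields, so duck-typed stand-ins are enough):

from dataclasses import dataclass
from datetime import datetime


@dataclass
class StubCourse:
    time_of_last_access: datetime | None = None
    time_of_last_modification: datetime | None = None
    time_of_start: datetime | None = None
    time_of_end: datetime | None = None


a = StubCourse(time_of_last_access=datetime(2024, 11, 1))
b = StubCourse(time_of_last_modification=datetime(2024, 11, 5))

# Key tuples are compared left to right, so a non-None time_of_last_access
# outranks a course that only has a (later) time_of_last_modification.
# reverse=True puts the most recently touched course first.
assert sort_courses([b, a]) == [a, b]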


def parse_courses_from_API(db: DatabaseSession, courses: list[dict[str, Any]], config: Config) -> list[Course] | None:
    existing_courses = {it.id: it for it in read_courses(db)}

@@ -258,3 +281,115 @@ def parse_videos_from_API(db: DatabaseSession, videos: list[dict[str, Any]], con
            )
        )
    )


# --- Bad URLs ---

def read_bad_urls(db: DatabaseSession) -> dict[tuple[str, int], BadURL]:
    return {(it.url, it.course_id): it for it in db.execute(select(BadURL)).scalars().all()}


def filter_bad_urls(db: DatabaseSession, urls: list[MediaURL]) -> list[MediaURL]:
    bad_urls = read_bad_urls(db)

    good_urls = []
    for url in urls:
        bad_url = bad_urls.get((url.url, url.course_id))
        if bad_url is None or bad_url.should_retry():
            good_urls.append(url)

    return good_urls


def create_bad_url(db: DatabaseSession, url: MediaURL) -> BadURL | None:
    # TODO
    pass


# --- Temp Files ---

def read_temp_files(db: DatabaseSession) -> dict[int, dict[str, MediaURL]]:
    final: DefaultDict[int, dict[str, MediaURL]] = defaultdict(dict)
    for it in db.execute(select(MediaURL)).scalars().all():
        final[it.course_id][it.url] = it

    return dict(final)


async def download_media_url_to_temp_file(db: DatabaseSession, session: AuthenticatedSession, rate_limiter: RateLimiter, url: MediaURL, course: Course, stop: Event, priority: int, config: Config, extra_args: dict[str, Any] | None = None) -> TempFile | BadURL | None:
    if stop.is_set():
        return None

    try:
        temp_file = await create_temp_file(db, session, url, course)
        if temp_file is None:
            return None  # TODO

        path = temp_file.path(config)
        extra_args = extra_args or {}

        await aiofiles.os.makedirs(path.parent, exist_ok=True)
        await rate_limiter.register_url(course, temp_file.throttle_type)

        print(f"Writing {path}")

        async with aiofiles.open(path, "wb") as f, session.get(temp_file.download_url, **(extra_args | {"params": {"token": session.api_token}})) as response:
            if isinstance(response, Error) or response.status != 200:
                return create_bad_url(db, url)

            chunked_response = response.content.iter_chunked(download_chunk_size)

            while True:
                if stop.is_set():
                    return None

                token = await rate_limiter.get(temp_file.throttle_type)
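                # A None token means the limiter has no byte budget for us right
                # now; ask again (assumption: RateLimiter.get waits internally,
                # so this loop does not spin hot).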
                if token is None:
                    continue

                try:
                    chunk = await anext(chunked_response)
                except StopAsyncIteration:
                    break
                except TimeoutError:
                    await asyncio.sleep(0.5)
                    continue

                if DEBUG_ASSERTS:
                    assert len(chunk) <= token.num_bytes

                await f.write(chunk)
                rate_limiter.return_token(token)

        rate_limiter.complete_url(course, temp_file.throttle_type)

        print(f"Finished! {path}")
        return temp_file

    except (ClientSSLError, ClientConnectorSSLError, ClientConnectorError, ClientConnectorCertificateError) as ex:
        logger.error(f"SSL Error downloading {url.url}: {type(ex)} {ex}")

        # TODO: Won't this crash eventually?
        # return await download_temp_file(db, session, rate_limiter, url, course, stop, priority, config, {})

    except Exception as ex:
        logger.error(f"Error downloading {url.url}: {type(ex)} {ex}")
        raise

    # TODO: What does this mean?
    return None
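
For orientation, a toy stand-in consistent with the RateLimiter calls used above (register_url / get / return_token / complete_url). This is only a sketch of the token-bucket idea under assumed semantics; the real class lives in isisdl/api/rate_limiter.py and its bookkeeping differs:

import asyncio
import time
from dataclasses import dataclass


@dataclass
class ToyToken:
    num_bytes: int


class ToyRateLimiter:
    """Hands out byte budgets in fixed-size tokens, refilling once per second."""

    def __init__(self, bytes_per_second: int, chunk_size: int = 64 * 1024) -> None:
        self.bytes_per_second = bytes_per_second
        self.chunk_size = chunk_size
        self.budget = bytes_per_second
        self.last_refill = time.monotonic()

    async def register_url(self, course: object, throttle_type: object) -> None:
        pass  # the real limiter tracks per-course / per-type queues here

    async def get(self, throttle_type: object) -> ToyToken | None:
        if time.monotonic() - self.last_refill >= 1.0:
            self.budget, self.last_refill = self.bytes_per_second, time.monotonic()
        if self.budget < self.chunk_size:
            await asyncio.sleep(0.05)  # back off until the next refill
            return None
        self.budget -= self.chunk_size
        return ToyToken(num_bytes=self.chunk_size)

    def return_token(self, token: ToyToken) -> None:
        pass  # a real limiter could credit back unused bytes here

    def complete_url(self, course: object, throttle_type: object) -> None:
        pass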


async def create_temp_file(db: DatabaseSession, session: AuthenticatedSession, url: MediaURL, course: Course) -> TempFile | None:
    return add_object_to_database(db, TempFile(
        course=course,
        url=url.url,
        download_url=await get_download_url_from_url(db, session, url) or url.url,
        throttle_type=ThrottleType.from_media_type(url.media_type),
    ))


# --- MediaContainers ---

def create_media_containers_from_temp_files(db: DatabaseSession, temp_files: list[TempFile]) -> list[MediaContainer] | None:
    return None
62 changes: 54 additions & 8 deletions src/isisdl/api/download.py
@@ -1,31 +1,41 @@
from __future__ import annotations

import asyncio
from asyncio import Event
from collections import defaultdict

from sqlalchemy.orm import Session as DatabaseSession

from isisdl.api.crud import sort_courses, download_media_url_to_temp_file, filter_bad_urls, create_media_containers_from_temp_files
from isisdl.api.endpoints import DocumentListAPI, VideoListAPI
from isisdl.api.models import MediaContainer, MediaURL, AuthenticatedSession, Course

__all__ = ["download_media_urls"]

from isisdl.api.models import MediaContainer, MediaURL, AuthenticatedSession, Course, TempFile
from isisdl.api.rate_limiter import RateLimiter
from isisdl.backend.models import Config


async def gather_media_urls(db: DatabaseSession, session: AuthenticatedSession, courses: list[Course], config: Config) -> list[MediaURL]:
    urls = []
    for response in asyncio.as_completed([
        VideoListAPI.get(db, session, courses, config),
        DocumentListAPI.get(db, session, courses),
        VideoListAPI.get(db, session, courses, config)
    ]):
        urls.extend(await response)

    return urls


async def download_media_urls(db: DatabaseSession, urls: list[MediaURL]) -> list[MediaContainer]:
async def download_media_urls(db: DatabaseSession, session: AuthenticatedSession, urls: list[MediaURL], config: Config) -> list[MediaContainer] | None:
"""
- Figure out which containers need downloading
This is the main function that downloads the files from the web. It does so by following these steps:
1. Filter out bad urls and already downloaded urls.
2. For each course, download the documents as temporary files and save them.
- Paths are derived as a hash from the URL
- Courses are sorted based on time of access or modification
- Videos are not downloaded as there is no possibility of collision due to the sha256 hash in the url
3. Conflict resolution, based on file hashes, is done.
- Resolve all conflicts in file paths
- Develop an algorithm to deterministically sort files based on the optional attributes they have
- Filter same download url
@@ -34,7 +44,43 @@ async def download_media_urls(db: DatabaseSession, urls: list[MediaURL]) -> list
    # To Integrate somewhere
    - Modify download_url based on urls, following Google Drive etc.
    - From ISIS: mod/resource and mod/url need following
    - How to handle bad links?
    """

    # TODO: Filter out already downloaded URLs
    urls_to_download = filter_bad_urls(db, urls)

    urls_per_course = defaultdict(list)
    courses = set()
    for url in urls_to_download:
        courses.add(url.course)
        urls_per_course[url.course_id].append(url)

    stop = Event()  # TODO: Migrate this to somewhere where @onkill can use it
    rate_limiter = RateLimiter.from_bandwidth(250)  # TODO: Make this configurable

    temp_files = []
    download = [download_temporary_files(db, session, rate_limiter, urls_per_course[course.id], course, stop, priority=i, config=config) for i, course in enumerate(sort_courses(courses))]
    for response in asyncio.as_completed(download):
        temp_files.extend(await response)

    # TODO: Measure if it would be worth moving this into the previous loop. A lot of CPU time is free before and now being used.
    return create_media_containers_from_temp_files(db, temp_files)
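
The docstring above notes that paths are derived as a hash of the URL. A minimal sketch of one way to do that, with a hypothetical helper and base directory; the actual derivation lives in TempFile.path() and may differ:

from hashlib import sha256
from pathlib import Path


def temp_file_path(base_dir: Path, url: str) -> Path:
    # sha256 of the URL gives a stable, collision-resistant name, so the same
    # URL always maps to the same on-disk temp file.
    return base_dir / ".temp" / sha256(url.encode()).hexdigest()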


async def download_temporary_files(db: DatabaseSession, session: AuthenticatedSession, rate_limiter: RateLimiter, urls: list[MediaURL], course: Course, stop: Event, priority: int, config: Config) -> list[TempFile]:
"""
This function downloads the media urls (belonging to a course) as temporary files.
Assumption is that urls are filtered by course.
"""

return []
await rate_limiter.register_course(course)
temp_files = []

for response in asyncio.as_completed([download_media_url_to_temp_file(db, session, rate_limiter, url, course, stop, priority, config) for url in urls]):
temp_files.append(await response)

rate_limiter.complete_course(course)
return [file for file in temp_files if isinstance(file, TempFile)]
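
Both download functions above rely on the same asyncio.as_completed pattern: results are consumed in completion order rather than submission order, so one slow course (or url) never blocks the others. A self-contained illustration:

import asyncio


async def work(i: int) -> int:
    await asyncio.sleep(0.1 * (3 - i))  # higher i finishes sooner
    return i


async def main() -> None:
    results = []
    for fut in asyncio.as_completed([work(0), work(1), work(2)]):
        results.append(await fut)
    print(results)  # [2, 1, 0]: completion order, not submission order

asyncio.run(main())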