#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
@author: YangyangLi
@contact: [email protected]
@version: 0.0.1
@license: MIT Licence
@file: utils.py
@time: 2022-12-15
"""
import asyncio
import logging
import re
from pathlib import Path
from typing import List

import aiofiles
import aiohttp
import yaml
from lxml import etree

LOGGER = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)


def format_title(title: str) -> str:
    """Strip the trailing parenthetical (e.g. the edition) from a title so it
    can be used as a search query.

    Example:
        >>> format_title('The Art of Computer Programming, Volume 1: Fundamental Algorithms (3rd Edition)')
        'The Art of Computer Programming, Volume 1: Fundamental Algorithms'
    """
    # Split from the right with maxsplit=1 so only the last "(...)" is dropped.
    title = title.rsplit("(", 1)[0].strip()
    return title


def get_cover_images(items) -> None:
    """Synchronous entry point: download a cover image for each item."""
    asyncio.run(_get_cover_images(items))
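
# Usage sketch (hypothetical item; only the "name" key is read anywhere in
# this module):
#
#     get_cover_images([{"name": "Fundamental Algorithms (3rd Edition)"}])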


async def download(url, name, headers, session: aiohttp.ClientSession) -> None:
    """Download the file at `url` and save it under source/_static/covers/,
    using `name` (spaces replaced by underscores) as the file name."""
    try:
        async with session.get(url, headers=headers) as resp:
            if resp.status == 200:
                async with aiofiles.open(
                    f"source/_static/covers/{name.replace(' ', '_')}.jpg", "wb"
                ) as f:
                    async for chunk in resp.content.iter_chunked(1024 * 1024):
                        # Yield to the event loop briefly between chunks.
                        await asyncio.sleep(0.001)
                        await f.write(chunk)
            else:
                raise RuntimeError(
                    f"Cannot download {url} with status code {resp.status}"
                )
    except Exception as e:
        LOGGER.error(f"Cannot download {url}: {e}")


async def _get_cover_images(items):
    # Two-minute overall timeout per request.
    timeout = aiohttp.ClientTimeout(total=2 * 60)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        tasks = []
        for item in items:
            image_path = Path(
                f"source/_static/covers/{item['name'].replace(' ', '_')}.jpg"
            )
            if not image_path.exists():
                tasks.append(_get_cover_image_worker(item, session))
            else:
                LOGGER.info(f"Cover image for {item['name']} already exists")
        await asyncio.gather(*tasks)


async def _get_cover_image_worker(item, session):
    base_domain = "https://www.goodreads.com"
    search_domain = base_domain + "/search/"
    headers = {
        "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.163 "
        "Safari/537.36",
        "Connection": "keep-alive",
    }
    title = item["name"]
    # Keep only search-result rows with position() < 2, i.e. the first hit.
    first_num = 2
    async with session.get(
        search_domain, params={"q": title}, headers=headers
    ) as resp:
        try:
            resp.raise_for_status()
        except Exception as e:
            LOGGER.info(f"Failed to fetch {title} cover; using default\n{e.args}")
        else:
            tree = etree.HTML(await resp.text())
            cover_urls: List[str] = tree.xpath(
                f"//table[@class='tableList']//tr[position()<{first_num}]//a[@class='bookTitle']/@href"
            )
            names: List[str] = tree.xpath(
                f"//table[@class='tableList']//tr[position()<{first_num}]//td/a/@title"
            )
            assert len(cover_urls) == len(names)
            if not cover_urls:
                LOGGER.info(f"Failed to fetch {title} cover; using default")
            else:
                # Follow the first search hit and scrape its cover image URL.
                cover = await _fetch_image(
                    session, base_domain + cover_urls[0], headers
                )
                if not cover:
                    LOGGER.info(f"No cover image found on the page for {title}")
                    return
                await download(cover[0], title, headers, session)
                LOGGER.info(f"Successfully fetched {title} cover {cover[0]}")


async def _fetch_image(session, url, header):
    cover = []
    async with session.get(url, headers=header) as resp:
        try:
            resp.raise_for_status()
        except Exception as e:
            LOGGER.info(f"Failed to fetch image cover from {url}\n{e.args}")
            return cover
        else:
            t = await resp.text()
            tree = etree.HTML(t)
            # Old Goodreads layout: the cover image carries id="coverImage".
            cover.extend(tree.xpath("//img[@id='coverImage']/@src"))
            if not cover:
                # Newer layout: fall back to Beautiful Soup. Imported lazily
                # so bs4 is only required when the XPath finds nothing.
                from bs4 import BeautifulSoup

                soup = BeautifulSoup(t, "html.parser")
                tag = soup.find("img", {"class": "ResponsiveImage"})
                if tag is not None and tag.get("src"):
                    cover.append(tag["src"])
            if not cover:
                # Last resort: pull the src attribute out with a regex.
                pat = re.compile(r"<img\s+id=\"coverImage\".+?src=\"(.+?)\"", re.S)
                cover.extend(re.findall(pat, t))
            return cover


def main():
    books = yaml.safe_load((Path("./source") / "library.yml").read_text())
    get_cover_images(books)


if __name__ == "__main__":
    main()
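
# A minimal sketch of the layout main() assumes for source/library.yml: a YAML
# list of mappings, each with at least a "name" key (the only key this module
# reads). The entry below is illustrative, not copied from the real file:
#
#     - name: "Fundamental Algorithms (3rd Edition)"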