
Commit

A bit more robust
akariv committed Nov 26, 2024
1 parent 5d48d85 commit f833eef
Showing 5 changed files with 29 additions and 196 deletions.
173 changes: 0 additions & 173 deletions odds/backend/scanner/website/__init__.py
@@ -1,173 +0,0 @@
import asyncio
import json
from pathlib import Path
import httpx
# import bleach
from urllib.parse import urljoin
import re
import nh3
import bs4

CACHE_DIR = Path('.caches/web-scraper')
CACHE_DIR.mkdir(parents=True, exist_ok=True)

WS = re.compile(r'\s+', re.UNICODE | re.MULTILINE)
ALLOWED_TAGS = {'a', 'abbr', 'acronym', 'b', 'blockquote', 'code',
'em', 'i', 'li', 'ol', 'strong', 'ul', 'table', 'tr', 'td', 'th', 'tbody', 'thead', 'title'}
CLEAN_TAGS = {'script', 'style', 'meta', 'iframe'}
class AllowedAttributes():

def __init__(self, url) -> None:
self.links = set()
self.url = url

def __call__(self, tag, name, value):
if tag in ('a', 'abbr', 'acronym'):
            if name == 'title':
                return value
            if tag == 'a' and name == 'href' and value:
link = urljoin(self.url, value)
self.links.add(link)
return link
return None

class Scraper:

headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:124.0) Gecko/20100101 Firefox/124.0'
}
WORKER_COUNT = 5

def __init__(self, base_urls) -> None:
self.base_urls = base_urls
self.q = asyncio.Queue()
self.out_q = asyncio.Queue()
self.outstanding = set()
self.all_urls = set()

def queue(self, url: str) -> None:
if url not in self.all_urls:
self.all_urls.add(url)
self.outstanding.add(url)
self.q.put_nowait(url)

def mark_done(self, url: str) -> None:
self.outstanding.remove(url)
if self.done():
for i in range(self.WORKER_COUNT):
self.q.put_nowait(None)
self.out_q.put_nowait(None)

    def done(self) -> bool:
return len(self.outstanding) == 0

async def scrape(self, url: str) -> list[str]:
links = None
content = None
key = url.replace('/', '_').replace(':', '_').replace('.', '_')
cache_file = CACHE_DIR / f'{key}.json'
if cache_file.exists():
with open(cache_file) as file:
data = json.load(file)
content = data.get('content')
content_type = data.get('content_type')
final_url = data.get('final_url')

if content is None:
async with httpx.AsyncClient(headers=self.headers, timeout=30) as client:
await asyncio.sleep(self.WORKER_COUNT / 4)
r = await client.get(url, follow_redirects=True)
r.raise_for_status()
# check content type to ensure it's html:
content_type = r.headers.get('content-type', '').lower()
if content_type.startswith('text/html'):
content = r.text
final_url = str(r.url)
with open(cache_file, 'w') as file:
json.dump({
'content': content,
'content_type': content_type,
'final_url': final_url
}, file)

if not content_type.startswith('text/html'):
links = []
if links is None:
if final_url != url:
if not any(final_url.startswith(base_url) for base_url in self.base_urls):
links = []
else:
links = [final_url]
# print(f'{url}: GOT STATUS', r.status_code)
# use bs4 to get canonical link:
if links is None:
soup = bs4.BeautifulSoup(content, 'html.parser')
canonical = soup.find('link', rel='canonical')
if canonical:
canonical = canonical.get('href')
if not url.startswith(canonical):
links = [canonical]
if links is None:
allowed_attributes = AllowedAttributes(final_url)
content = nh3.clean(
content,
tags=ALLOWED_TAGS,
clean_content_tags=CLEAN_TAGS,
attribute_filter=allowed_attributes,
link_rel='',
url_schemes={'http', 'https'},
)
content = WS.sub(' ', content)
processed = True
# print(f'{url}: CLEANED CONTENT', content)
# print(f'{url}: LINKS', allowed_attributes.links)
self.out_q.put_nowait(dict(
url=url,
content=content
))
links = allowed_attributes.links

_links = []
for link in links:
if any(link.startswith(base_url) for base_url in self.base_urls):
link = link.split('#')[0]
link = link.strip()
_links.append(link)
return _links

async def worker(self, i: int) -> None:
while not self.done():
url = await self.q.get()
if url is None:
break
# print(f'{url}: worker {i} starting (Q LEN {self.q.qsize()})')
try:
new_urls = await self.scrape(url)
except Exception as e:
print(f'{url} worker {i} error scraping: {e!r}')
new_urls = []
for new_url in new_urls:
self.queue(new_url)
self.mark_done(url)

async def __call__(self):
for base_url in self.base_urls:
self.queue(base_url)
async with asyncio.TaskGroup() as tg:
for i in range(self.WORKER_COUNT):
tg.create_task(self.worker(i))
while True:
item = await self.out_q.get()
if item is None:
break
yield item

async def main(bases):
scraper = Scraper(bases)
count = 0
async for item in scraper():
count += 1
        print(f'GOT ITEM {count}: {item["url"]}, {len(item["content"])} chars, #Q {scraper.q.qsize()}')

if __name__ == '__main__':
asyncio.run(main(['https://www.camden.gov.uk/', 'https://democracy.camden.gov.uk/']))
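
The module removed above (its logic now lives in website_scanner.py, diffed below) is built around a queue-plus-workers pattern with sentinel-based shutdown: once the set of outstanding URLs drains, one None per worker is pushed onto the queue so every consumer wakes up and exits. A simplified, self-contained sketch of that pattern, using illustrative names rather than the project's API, and pre-loading the sentinels instead of pushing them on completion as the real scraper does:

import asyncio

SENTINEL = None
WORKER_COUNT = 3

async def worker(name: str, q: asyncio.Queue, results: list) -> None:
    # Pull items until the sentinel arrives, then exit so the TaskGroup can finish.
    while True:
        item = await q.get()
        if item is SENTINEL:
            break
        results.append(f'{name} handled {item}')

async def main() -> None:
    q: asyncio.Queue = asyncio.Queue()
    results: list[str] = []
    for item in range(10):
        q.put_nowait(item)
    # One sentinel per worker guarantees every consumer stops.
    for _ in range(WORKER_COUNT):
        q.put_nowait(SENTINEL)
    async with asyncio.TaskGroup() as tg:  # requires Python 3.11+
        for i in range(WORKER_COUNT):
            tg.create_task(worker(f'worker-{i}', q, results))
    print(len(results), 'items processed')

if __name__ == '__main__':
    asyncio.run(main())
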
25 changes: 15 additions & 10 deletions odds/backend/scanner/website/website_scanner.py
@@ -42,6 +42,7 @@ class Scraper:
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36'
}
WORKER_COUNT = 5
PERIOD = 0.25
CACHE = CACHE_DIR / 'web-scraper'
WS = re.compile(r'\s+', re.UNICODE | re.MULTILINE)
ALLOWED_TAGS = {'a', 'abbr', 'acronym', 'b', 'blockquote', 'code',
@@ -57,13 +58,15 @@ def __init__(self, base_urls) -> None:
self.all_hashes = set()
self.CACHE.mkdir(parents=True, exist_ok=True)

def queue(self, url: str) -> None:
async def queue(self, url: str) -> None:
if url not in self.all_urls:
self.all_urls.add(url)
# print('OS +', url, len(self.outstanding))
self.outstanding.add(url)
self.q.put_nowait(url)

def mark_done(self, url: str) -> None:
# print(f'OS - ({len(self.outstanding)}): {url}')
self.outstanding.remove(url)
if self.done():
for i in range(self.WORKER_COUNT):
@@ -87,10 +90,11 @@ async def scrape(self, url: str) -> list[str]:
content = data.get('content')
content_type = data.get('content_type')
final_url = data.get('final_url')
print(f'GOT FROM CACHE: {url} -> {final_url}')

if content is None:
async with httpx.AsyncClient() as client:
await asyncio.sleep(self.WORKER_COUNT / 4)
await asyncio.sleep(self.PERIOD * self.WORKER_COUNT)
r = await client.get(url, follow_redirects=True, headers=self.headers, timeout=30)
r.raise_for_status()
# check content type to ensure it's html:
@@ -105,7 +109,7 @@ async def scrape(self, url: str) -> list[str]:
'final_url': final_url
}, file)

if not content_type.startswith('text/html'):
if not content or not content_type.startswith('text/html'):
links = []
if links is None:
if final_url != url:
@@ -115,11 +119,12 @@ async def scrape(self, url: str) -> list[str]:
links = [final_url]
# print(f'{url}: GOT STATUS', r.status_code)
# use bs4 to get canonical link:
content_hash = sha256(content.encode()).hexdigest()
if content_hash in self.all_hashes:
links = []
else:
self.all_hashes.add(content_hash)
if links is None:
content_hash = sha256(content.encode()).hexdigest()
if content_hash in self.all_hashes:
links = []
else:
self.all_hashes.add(content_hash)
if links is None:
soup = bs4.BeautifulSoup(content, 'html.parser')
canonical = soup.find('link', rel='canonical')
@@ -172,12 +177,12 @@ async def worker(self, i: int) -> None:
print(f'{url} worker {i} error scraping: {e!r}')
new_urls = []
for new_url in new_urls:
self.queue(new_url)
await self.queue(new_url)
self.mark_done(url)

async def __call__(self):
for base_url in self.base_urls:
self.queue(base_url)
await self.queue(base_url)
async with asyncio.TaskGroup() as tg:
for i in range(self.WORKER_COUNT):
tg.create_task(self.worker(i))
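
The changes to website_scanner.py above do three related things: the per-request sleep becomes PERIOD * WORKER_COUNT, which keeps the overall request rate near one fetch per PERIOD across all workers; the content-hash deduplication now runs only while links still need to be extracted; and an empty body short-circuits link extraction entirely (the new "not content or" guard). A minimal sketch of the throttle-plus-dedup idea, with illustrative names rather than the project's actual classes:

import asyncio
from hashlib import sha256

class ThrottledDeduper:
    """Illustrative only: skip empty bodies and pages already seen by content hash."""

    PERIOD = 0.25      # target seconds between requests, across all workers
    WORKER_COUNT = 5

    def __init__(self) -> None:
        self.seen_hashes: set[str] = set()

    async def throttle(self) -> None:
        # Each worker sleeps PERIOD * WORKER_COUNT, so N workers together
        # issue roughly one request per PERIOD.
        await asyncio.sleep(self.PERIOD * self.WORKER_COUNT)

    def is_new_content(self, content: str | None) -> bool:
        # Guard first: a missing or empty body should never be hashed or followed.
        if not content:
            return False
        digest = sha256(content.encode()).hexdigest()
        if digest in self.seen_hashes:
            return False
        self.seen_hashes.add(digest)
        return True
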
21 changes: 11 additions & 10 deletions odds/common/embedder/openai/openai_embedder.py
@@ -38,16 +38,17 @@ async def embed(self, text: str) -> Optional[Embedding]:
headers=headers,
timeout=60,
)
response.raise_for_status()
result = response.json()
if result['usage']:
self.cost.start_transaction()
self.cost.update_cost('embed', 'tokens', result['usage']['total_tokens'])
self.cost.end_transaction()
if result.get('data') and result['data'][0].get('object') == 'embedding' and result['data'][0]['embedding']:
vector: list[float] = result['data'][0]['embedding']
embedding: Embedding = np.array(vector, dtype=np.float32)
return embedding
if response:
response.raise_for_status()
result = response.json()
if result['usage']:
# self.cost.start_transaction()
self.cost.update_cost('embed', 'tokens', result['usage']['total_tokens'])
# self.cost.end_transaction()
if result.get('data') and result['data'][0].get('object') == 'embedding' and result['data'][0]['embedding']:
vector: list[float] = result['data'][0]['embedding']
embedding: Embedding = np.array(vector, dtype=np.float32)
return embedding
return None

def print_total_usage(self):
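
The embedder change above wraps the response handling in an "if response:" guard, so a request that ultimately returns nothing (for example, when an upstream retry helper gives up) yields None instead of raising on a missing response; it also disables the transaction bracketing around the cost update, and the Mistral runner diffed below makes the same adjustment. A hedged sketch of the guarded-parse idea, where post_with_retries is a hypothetical helper standing in for the project's retry wrapper:

from typing import Optional

import httpx
import numpy as np

async def post_with_retries(client: httpx.AsyncClient, url: str, payload: dict,
                            retries: int = 3) -> Optional[httpx.Response]:
    # Hypothetical helper: returns None if every attempt fails.
    for _ in range(retries):
        try:
            return await client.post(url, json=payload, timeout=60)
        except httpx.HTTPError:
            continue
    return None

async def embed_text(client: httpx.AsyncClient, url: str, payload: dict) -> Optional[np.ndarray]:
    response = await post_with_retries(client, url, payload)
    if response is None:
        # The retry helper gave up; report "no embedding" rather than crashing.
        return None
    response.raise_for_status()
    result = response.json()
    data = result.get('data') or []
    if data and data[0].get('object') == 'embedding' and data[0].get('embedding'):
        return np.array(data[0]['embedding'], dtype=np.float32)
    return None
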
4 changes: 2 additions & 2 deletions odds/common/llm/mistral/mistral_llm_runner.py
@@ -46,10 +46,10 @@ async def internal_fetch_data(self, request: dict, query: LLMQuery) -> Any:
if response is not None:
result = response.json()
if result['usage']:
self.cost_collector.start_transaction()
# self.cost_collector.start_transaction()
self.cost_collector.update_cost(query.model(), 'prompt', result['usage']['prompt_tokens'])
self.cost_collector.update_cost(query.model(), 'completion', result['usage']['completion_tokens'])
self.cost_collector.end_transaction()
# self.cost_collector.end_transaction()
if result.get('choices') and result['choices'][0].get('message') and result['choices'][0]['message'].get('content'):
content: str = result['choices'][0]['message']['content']
self.cache.set_cache(request, content)
2 changes: 1 addition & 1 deletion odds/common/retry.py
@@ -22,7 +22,7 @@ async def __call__(self, client, method, *args, **kwargs) -> Response:
return response
except Exception as e:
if response:
print('RETRYING', repr(e), args[0], response.status_code, response.text)
print('RETRYING', repr(e), args[0], response.status_code, response.text[:200])
else:
print('RETRYING', repr(e), args[0])
if i == self.retries - 1:
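
The retry.py tweak only trims the logged response body to its first 200 characters, which keeps a large HTML error page from flooding the console during retries. For context, a standalone sketch of the same idea (an illustrative retry loop, not the project's Retry class):

import httpx

async def get_with_retries(client: httpx.AsyncClient, url: str, retries: int = 3) -> httpx.Response:
    last_exc: Exception = RuntimeError('no attempts made')
    for attempt in range(retries):
        response = None
        try:
            response = await client.get(url, timeout=30)
            response.raise_for_status()
            return response
        except Exception as e:
            last_exc = e
            if response is not None:
                # Log only a short preview of the body.
                print('RETRYING', repr(e), url, response.status_code, response.text[:200])
            else:
                print('RETRYING', repr(e), url)
    raise last_exc
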
