-
Notifications
You must be signed in to change notification settings - Fork 0
/
program.py
executable file
·68 lines (55 loc) · 2 KB
/
program.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import asyncio
import subprocess
import requests
from time import sleep
from pyppeteer import launcher, launch, connect
async def close_chrome_instances():
try:
subprocess.run(['pkill', '-f', 'chromium-browser'], check=True)
except subprocess.CalledProcessError as e:
print(f"Error closing Chrome instances: {e}")
async def check_open_browser():
try:
response = requests.get('http://localhost:9222/json/version')
if response.status_code == 200:
ws_endpoint = response.json().get('webSocketDebuggerUrl')
return ws_endpoint
except requests.exceptions.RequestException:
return None
async def close_dialog(dialog):
await dialog.dismiss()
async def scraper():
ws_endpoint = await check_open_browser()
if ws_endpoint:
print("Closing existing browser...")
await close_chrome_instances()
#browser = await connect(browserWSEndpoint=ws_endpoint)
print("Launching new browser...")
launcherX = launcher.Launcher({
"headless": True,
"executablePath": '/usr/bin/chromium-browser',
"loop": asyncio.get_running_loop(),
"autoClose": False,
"args": ['--remote-debugging-port=9222', '--auto-accept-camera-and-microphone-capture'],
})
launcherX.port = '9222'
launcherX.url = f'http://127.0.0.1:{launcherX.port}'
browser = await launcherX.launch()
await asyncio.sleep(5)
pages = await browser.pages()
# Set content security policy to allow media access
# await pages[0].setExtraHTTPHeaders({
# 'Content-Security-Policy': "default-src 'self' 'unsafe-inline' blob: data:; script-src 'self' 'unsafe-inline' 'unsafe-eval' blob: data:;"
# })
print("Opening Page...")
try:
pages[0].on(
'dialog',
lambda dialog: asyncio.ensure_future(close_dialog(dialog))
)
await pages[0].goto('http://localhost:9000/', {'waitUntil' : 'domcontentloaded'})
while True: pass
except Exception as error:
print(error)
# await browser.close()
asyncio.run(scraper())