Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

simplity network requests #31

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 42 additions & 59 deletions pyairctrl/airctrl.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env python3
from Cryptodome.Cipher import AES
from Cryptodome.Util.Padding import pad, unpad
import urllib.request
import requests
import base64
import binascii
import argparse
Expand Down Expand Up @@ -67,8 +67,9 @@ def ssdp(timeout=1, repeats=3, debug=False):
if len(urls): break
resp = []
for ip in urls.keys():
with urllib.request.urlopen(urls[ip]) as response:
xml = ET.fromstring(response.read())
response = requests.get(urls[ip])
if response.status_code == requests.codes.ok:
xml = ET.fromstring(response.text)
resp.append({'ip': ip})
ns = {'urn': 'urn:schemas-upnp-org:device-1-0'}
for d in xml.findall('urn:device', ns):
Expand All @@ -84,15 +85,11 @@ def __init__(self, host):

def _get_key(self):
print('Exchanging secret key with the device ...')
url = 'http://{}/di/v1/products/0/security'.format(self._host)
a = random.getrandbits(256)
A = pow(G, a, P)
data = json.dumps({'diffie': format(A, 'x')})
data_enc = data.encode('ascii')
req = urllib.request.Request(url=url, data=data_enc, method='PUT')
with urllib.request.urlopen(req) as response:
resp = response.read().decode('ascii')
dh = json.loads(resp)
dh = self._put('0/security', data_enc)
key = dh['key']
B = int(dh['hellman'], 16)
s = pow(B, a, P)
Expand Down Expand Up @@ -128,22 +125,11 @@ def load_key(self):
self._get_key()

def _check_key(self):
url = 'http://{}/di/v1/products/1/air'.format(self._host)
self._get(url)
self._get('1/air')

def set_values(self, values, debug=False):
body = encrypt(values, self._session_key)
url = 'http://{}/di/v1/products/1/air'.format(self._host)
req = urllib.request.Request(url=url, data=body, method='PUT')
try:
with urllib.request.urlopen(req) as response:
resp = response.read()
resp = decrypt(resp.decode('ascii'), self._session_key)
status = json.loads(resp)
self._dump_status(status, debug=debug)
except urllib.error.HTTPError as e:
print("Error setting values (response code: {})".format(e.code))

status = self._put('1/air', values, encrypted=True)
self._dump_status(status, debug=debug)

def set_wifi(self, ssid, pwd):
values = {}
Expand All @@ -152,29 +138,37 @@ def set_wifi(self, ssid, pwd):
if pwd:
values['password'] = pwd
pprint.pprint(values)
body = encrypt(values, self._session_key)
url = 'http://{}/di/v1/products/0/wifi'.format(self._host)
req = urllib.request.Request(url=url, data=body, method='PUT')
with urllib.request.urlopen(req) as response:
resp = response.read()
resp = decrypt(resp.decode('ascii'), self._session_key)
wifi = json.loads(resp)
pprint.pprint(wifi)
wifi = self._put('0/wifi', values, encrypted=True)
pprint.pprint(wifi)

def _get_once(self, url):
with urllib.request.urlopen(url) as response:
resp = response.read()
resp = decrypt(resp.decode('ascii'), self._session_key)
return json.loads(resp)
def _get_once(self, endpoint):
url = 'http://{}/di/v1/products/{}'.format(self._host, endpoint)
response = requests.get(url)
response.raise_for_status()
resp = decrypt(response.text, self._session_key)
return json.loads(resp)

def _get(self, url):
def _get(self, endpoint):
try:
return self._get_once(url)
return self._get_once(endpoint)
except Exception as e:
print("GET error: {}".format(str(e)))
print("Will retry after getting a new key ...")
self._get_key()
return self._get_once(url)
print('Cannot read from device: {}: {}'.format(type(e).__name__, e))
print('Will retry after getting a new key ...')
self._get_key()
return self._get_once(endpoint)

def _put(self, endpoint, body, encrypted=False):
if encrypted:
body = encrypt(body, self._session_key)
url = 'http://{}/di/v1/products/{}'.format(self._host, endpoint)
response = requests.put(url, body)
response.raise_for_status()
if response.status_code == 255 and len(response.text) == 1:
raise ValueError('An option not supported by device')
resp = response.text
if encrypted:
resp = decrypt(resp, self._session_key)
return json.loads(resp)

def _dump_status(self, status, debug=False):
if debug:
Expand Down Expand Up @@ -251,40 +245,29 @@ def _dump_status(self, status, debug=False):
print('Error: {}'.format(err))

def get_status(self, debug=False):
url = 'http://{}/di/v1/products/1/air'.format(self._host)
status = self._get(url)
status = self._get('1/air')
self._dump_status(status, debug=debug)

def get_wifi(self):
url = 'http://{}/di/v1/products/0/wifi'.format(self._host)
wifi = self._get(url)
wifi = self._get('0/wifi')
pprint.pprint(wifi)

def get_firmware(self):
url = 'http://{}/di/v1/products/0/firmware'.format(self._host)
firmware = self._get(url)
firmware = self._get('0/firmware')
pprint.pprint(firmware)

def get_filters(self):
url = 'http://{}/di/v1/products/1/fltsts'.format(self._host)
filters = self._get(url)
filters = self._get('1/fltsts')
print('Pre-filter and Wick: clean in {} hours'.format(filters['fltsts0']))
if 'wicksts' in filters:
print('Wick filter: replace in {} hours'.format(filters['wicksts']))
print('Active carbon filter: replace in {} hours'.format(filters['fltsts2']))
print('HEPA filter: replace in {} hours'.format(filters['fltsts1']))

def pair(self, client_id, client_secret):
values = {}
values['Pair'] = ['FI-AIR-AND', client_id, client_secret]
body = encrypt(values, self._session_key)
url = 'http://{}/di/v1/products/0/pairing'.format(self._host)
req = urllib.request.Request(url=url, data=body, method='PUT')
with urllib.request.urlopen(req) as response:
resp = response.read()
resp = decrypt(resp.decode('ascii'), self._session_key)
resp = json.loads(resp)
pprint.pprint(resp)
values = {'Pair': ['FI-AIR-AND', client_id, client_secret] }
resp = self._put('0/pairing', values, encrypted=True)
pprint.pprint(resp)


def main():
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pycryptodomex>=3.4.7
requests
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
long_description_content_type="text/markdown",
url="https://github.com/rgerganov/py-air-control",
packages=['pyairctrl'],
install_requires=['pycryptodomex>=3.4.7'],
install_requires=['pycryptodomex>=3.4.7', 'requests'],
entry_points={
'console_scripts': [
'airctrl=pyairctrl.airctrl:main',
Expand Down