Skip to content

Commit

Permalink
New model H3640
Browse files Browse the repository at this point in the history
  • Loading branch information
juacas committed Jun 18, 2024
2 parents b8f5f9c + 28d8521 commit e01b7a5
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 73 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
![Validate with hassfest](https://github.com/juacas/zte_tracker/workflows/Validate%20with%20hassfest/badge.svg?branch=master)

![GitHub contributors](https://img.shields.io/github/contributors/juacas/zte_tracker)
![Maintenance](https://img.shields.io/maintenance/yes/2023)
![Maintenance](https://img.shields.io/maintenance/yes/2025)
![GitHub commit activity](https://img.shields.io/github/commit-activity/y/juacas/zte_tracker)
![GitHub commits since tagged version](https://img.shields.io/github/commits-since/juacas/zte_tracker/v1.0.0)
![GitHub last commit](https://img.shields.io/github/last-commit/juacas/zte_tracker)
Expand All @@ -32,11 +32,11 @@ Component to integrate some ZTE routers as a device trackers in home assistant.
| ZTE H288A | H288A |
| ZTE H388X | H388X |
| ZTE H3600P | H3600P |
| ZTE H6645P V2 | H6645P |
| ZTE H3640 V10 | H3640 |
| ZTE H6645P V2 | H6645P |


This integration could work with more routers. Try one of the above and see if it work with yours.
This integration could work with more routers. Try one of the above and see if it works with yours.

## Installation

Expand Down
4 changes: 2 additions & 2 deletions custom_components/zte_tracker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

_LOGGER = logging.getLogger(__name__)

def setup(hass, config):
def setup(hass: HomeAssistant, config: ConfigEntry):
"""Set up is called when Home Assistant is loading our component."""
plattform_conf = config.get(DOMAIN)
_LOGGER.debug("Client initialized for ZTE {0} @{1}".format(plattform_conf[CONF_MODEL]
Expand Down Expand Up @@ -65,7 +65,7 @@ def handle_pause(call):

# Load platforms
for platform in PLATFORMS:
hass.async_create_task(
hass.create_task(
discovery.async_load_platform(
hass, platform, DOMAIN, plattform_conf, config
)
Expand Down
2 changes: 1 addition & 1 deletion custom_components/zte_tracker/device_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ def _get_data(self)->list:
self.scanning = False
return []
elif not self.router_client.login():
self.statusmsg = self.router_client.statusmsg
self.scanning = False
_LOGGER.warning("Login failed: {0}@{1}".format(self.router_client.username, self.router_client.host))
self.router_client.logout()
Expand All @@ -120,6 +119,7 @@ def _get_data(self)->list:
finally:
self.router_client.logout()

self.statusmsg = self.router_client.statusmsg
self.scanning = True

# Create a list of Device tuples.
Expand Down
183 changes: 116 additions & 67 deletions custom_components/zte_tracker/zteclient/zte_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,44 +9,45 @@

_LOGGER = logging.getLogger(__name__)
_MODELS = {
'F6640': {
'wlan_script': 'wlan_client_stat_lua.lua',
'wlan_id_element': 'OBJ_WLAN_AD_ID',
'lan_script': 'accessdev_landevs_lua.lua',
'lan_id_element': 'OBJ_ACCESSDEV_ID',
"F6640": {
"wlan_script": "wlan_client_stat_lua.lua",
"wlan_id_element": "OBJ_WLAN_AD_ID",
"lan_script": "accessdev_landevs_lua.lua",
"lan_id_element": "OBJ_ACCESSDEV_ID",
},
"H288A": {
"wlan_script": "accessdev_ssiddev_lua.lua",
"wlan_id_element": "OBJ_ACCESSDEV_ID",
"lan_script": "accessdev_landevs_lua.lua",
"lan_id_element": "OBJ_ACCESSDEV_ID",
},
'H288A': {
'wlan_script': 'accessdev_ssiddev_lua.lua',
'wlan_id_element': 'OBJ_ACCESSDEV_ID',
'lan_script': 'accessdev_landevs_lua.lua',
'lan_id_element': 'OBJ_ACCESSDEV_ID'}
}
# Synonym H169A is like H288A
_MODELS['H169A'] = _MODELS['H288A']
_MODELS["H169A"] = _MODELS["H288A"]
# Synonym H388X is like H288A
_MODELS['H388X'] = _MODELS['H288A']
_MODELS["H388X"] = _MODELS["H288A"]
# Synonym H2640 is like H388X
_MODELS['H2640'] = _MODELS['H288A']
_MODELS["H2640"] = _MODELS["H288A"]
# Synonym F6645P is like F6640
_MODELS['F6645P'] = _MODELS['F6640']
_MODELS["F6645P"] = _MODELS["F6640"]
# Synonym H3600P is like H288A
_MODELS['H3600P'] = _MODELS['H288A']
_MODELS["H3600P"] = _MODELS["H288A"]
# Synonym H6645P is like H288A
_MODELS['H6645P'] = _MODELS['H288A']
_MODELS["H6645P"] = _MODELS["H288A"]
# Synonym H3640 is like H288A
_MODELS['H3640'] = _MODELS['H288A']
_MODELS["H3640"] = _MODELS["H288A"]
class zteClient:
def __init__(self, host, username, password,model):
def __init__(self, host, username, password, model):
"""Initialize the client."""
self.statusmsg = None
self.host = host
self.username = username
self.password = password
self.session = None
self.login_data = None
self.status = 'on'
self.status = "on"
self.device_info = None
self.guid = int(time.time()*1000)
self.guid = int(time.time() * 1000)
self.model = model
self.paths = _MODELS[model]

Expand All @@ -63,7 +64,7 @@ def reboot(self) -> bool:
try:
raise Exception("Not implemented")
except Exception as e:
_LOGGER.error('Failed to reboot: {0}'.format(e))
_LOGGER.error("Failed to reboot: {0}".format(e))
return False
finally:
self.logout()
Expand All @@ -76,57 +77,82 @@ def login(self) -> bool:
"""
try:
self.session = Session()
self.session.headers.update({'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.132 Safari/537.36'})
self.session.headers.update({'DNT': '1'})
self.session.cookies.set('_TESTCOOKIESUPPORT', '1')
self.session.headers.update(
{
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.132 Safari/537.36"
}
)
self.session.headers.update({"DNT": "1"})
self.session.cookies.set("_TESTCOOKIESUPPORT", "1")

# Step1: Get session token.
session_token = self.get_session_token()

# Step2: query for login token.
r = self.session.get('http://{0}/?_type=loginData&_tag=login_token&_={1}'.format(self.host, self.get_guid() ), verify=False)
r = self.session.get(
"http://{0}/?_type=loginData&_tag=login_token&_={1}".format(
self.host, self.get_guid()
),
verify=False,
)
self.log_request(r)
# parse XML response.
xml_response = ET.fromstring(r.content)
assert xml_response.tag == 'ajax_response_xml_root', 'Unexpected response ' + xml_response.text
assert xml_response.tag == "ajax_response_xml_root", (
"Unexpected response " + xml_response.text
)
login_token = xml_response.text
assert login_token, 'Empty login_token'
assert login_token, "Empty login_token"

# Step3: Login entry
pass_hash = self.password + login_token
password_param = hashlib.sha256(pass_hash.encode()).hexdigest()

r = self.session.post("http://{0}/?_type=loginData&_tag=login_entry".format(self.host),verify=False,
data= { "action": "login", "Password": password_param, "Username": self.username, "_sessionTOKEN": session_token })
r = self.session.post(
"http://{0}/?_type=loginData&_tag=login_entry".format(self.host),
verify=False,
data={
"action": "login",
"Password": password_param,
"Username": self.username,
"_sessionTOKEN": session_token,
},
)
self.log_request(r)
self.login_data = r.json()
# if login need refresh make a new request.
if self.login_data['login_need_refresh'] == 1:
if self.login_data["login_need_refresh"] == 1:
_LOGGER.debug("REFRESH")
# r = self.session.get('http://{0}/'.format(self.host), verify=False)
# self.log_request(r)
self.statusmsg = None

return True
except Exception as e:
self.statusmsg = 'Failed login: {0}'.format(e)
self.statusmsg = "Failed login: {0}".format(e)
_LOGGER.error(self.statusmsg)
self.login_data = None
self.session.close()
return False

def get_guid(self):
guid = self.guid
self.guid +=1
self.guid += 1
return guid

def get_session_token(self):
r = self.session.get('http://{0}/?_type=loginData&_tag=login_entry'.format(self.host),verify=False)
r = self.session.get(
"http://{0}/?_type=loginData&_tag=login_entry".format(self.host),
verify=False,
timeout=1, # timeout 1 second
)
self.log_request(r)
self.status = 'on'
self.status = "on"
device_info = r.json()
assert device_info['lockingTime'] == 0 and device_info['sess_token'], 'Empty sess_token. Device locked?'
session_token = device_info['sess_token']
assert (
device_info["lockingTime"] == 0 and device_info["sess_token"]
), "Empty sess_token. Device locked?"
session_token = device_info["sess_token"]
return session_token

## LOGOUT ##
Expand All @@ -135,14 +161,17 @@ def logout(self):
if self.login_data is None:
return False

r = self.session.post('http://{0}?_type=loginData&_tag=logout_entry'.format(self.host),
data={'IF_LogOff':'1'}, verify=False)
r = self.session.post(
"http://{0}?_type=loginData&_tag=logout_entry".format(self.host),
data={"IF_LogOff": "1"},
verify=False,
)
self.log_request(r)

assert r.ok, r
_LOGGER.debug("Logged out")
except Exception as e:
_LOGGER.error('Failed to logout: {0}'.format(e))
_LOGGER.error("Failed to logout: {0}".format(e))
finally:
self.session.close()
self.login_data = None
Expand All @@ -163,35 +192,48 @@ def get_lan_devices(self):
"""
# GET DEVICES RESPONSE from http://10.0.0.1/?_type=menuData&_tag=accessdev_homepage_lua.lua&InstNum=5&_=1663922344910
try:
r= self.session.get('http://{0}/?_type=menuView&_tag=localNetStatus&_={1}'.format(self.host, self.get_guid()),verify=False)
lan_request = 'http://{0}/?_type=menuData&_tag={1}&_{2}'.format(self.host, self.paths['lan_script'], self.get_guid());
r= self.session.get(lan_request, verify=False)
r = self.session.get(
"http://{0}/?_type=menuView&_tag=localNetStatus&_={1}".format(
self.host, self.get_guid()
),
verify=False,
)
lan_request = "http://{0}/?_type=menuData&_tag={1}&_{2}".format(
self.host, self.paths["lan_script"], self.get_guid()
)
r = self.session.get(lan_request, verify=False)
self.log_request(r)
devices = self.parse_devices(r.text, self.paths['lan_id_element'], 'LAN')
self.statusmsg = 'OK'
devices = self.parse_devices(r.text, self.paths["lan_id_element"], "LAN")
self.statusmsg = "OK"
return devices
except Exception as e:
self.statusmsg = 'Failed to get LAN devices: {0}'.format(e)
self.statusmsg = "Failed to get LAN devices: {0}".format(e)
_LOGGER.error(self.statusmsg)
return []


def get_wifi_devices(self):
"""
Get the list of devices connected to the wifi
:return: list of devices
"""
# GET DEVICES RESPONSE
try:
r= self.session.get('http://{0}/?_type=menuView&_tag=localNetStatus&_={1}'.format(self.host, self.get_guid()),verify=False)
wlan_request = 'http://{0}/?_type=menuData&_tag={1}&_={2}'.format(self.host, self.paths['wlan_script'],self.get_guid())
r= self.session.get(wlan_request,verify=False)
r = self.session.get(
"http://{0}/?_type=menuView&_tag=localNetStatus&_={1}".format(
self.host, self.get_guid()
),
verify=False,
)
wlan_request = "http://{0}/?_type=menuData&_tag={1}&_={2}".format(
self.host, self.paths["wlan_script"], self.get_guid()
)
r = self.session.get(wlan_request, verify=False)
self.log_request(r)
devices = self.parse_devices(r.text, self.paths['wlan_id_element'], 'WLAN')
devices = self.parse_devices(r.text, self.paths["wlan_id_element"], "WLAN")

self.statusmsg = 'OK'
self.statusmsg = "OK"
except Exception as e:
self.statusmsg = 'Failed to get Devices: {0} rdev {2}'.format(e, r.content)
self.statusmsg = "Failed to get Devices: {0} rdev {2}".format(e, r.content)
_LOGGER.error(self.statusmsg)
return []

Expand All @@ -203,23 +245,30 @@ def log_request(self, r):
_LOGGER.debug(r.text[0:200])

# Parse xml response to get devices
def parse_devices(self, xml_response, node_name='OBJ_WLAN_AD_ID', network_type='WLAN'):
def parse_devices(
self, xml_response, node_name="OBJ_WLAN_AD_ID", network_type="WLAN"
):
"""Parse the xml response and return a list of devices."""
devices = []
xml = ET.fromstring(xml_response)
assert xml.tag == 'ajax_response_xml_root', 'Unexpected response ' + xml_response

for device in xml.findall(f'{node_name}/Instance'):
device_info = {'Active': True, "IconType": None, "NetworkType": network_type }
for i in range(0, int(len(device)/2)):
paramname = device[i*2].text
paramvalue = device[i*2+1].text
if paramname == 'MACAddress':
device_info['MACAddress'] = paramvalue
elif paramname == 'IPAddress':
device_info['IPAddress'] = paramvalue
elif paramname == 'HostName':
device_info['HostName'] = paramvalue
assert xml.tag == "ajax_response_xml_root", (
"Unexpected response " + xml_response
)

for device in xml.findall(f"{node_name}/Instance"):
device_info = {
"Active": True,
"IconType": None,
"NetworkType": network_type,
}
for i in range(0, int(len(device) / 2)):
paramname = device[i * 2].text
paramvalue = device[i * 2 + 1].text
if paramname == "MACAddress":
device_info["MACAddress"] = paramvalue
elif paramname == "IPAddress":
device_info["IPAddress"] = paramvalue
elif paramname == "HostName":
device_info["HostName"] = paramvalue
devices.append(device_info)
return devices

0 comments on commit e01b7a5

Please sign in to comment.