From 08ca9321deb14ee24565c5e76d8d6efb7c94b82b Mon Sep 17 00:00:00 2001 From: ehendrix23 Date: Fri, 19 Jun 2020 13:15:09 -0600 Subject: [PATCH] Release 0.2.5 (#18) * Fix connection for HUB version 4.15.250 Fix for HUB version 4.15.250; aioharmony will not work with lower versions. * Use XMPP instead of web sockets Use XMPP instead of web sockets * Workaround for xmpp issue in Home Assistant * Removed wait, does not work * Add identifier to logger entries in responsehandler Added identifier for which HUB any log messages are produced from responsehandler, making it easier to determine the HUB it is for. * Add slixmpp in setup.py required list Added slixmpp in setup.py for required package. * Update version as beta right now * Added release notes Added release notes * Potential fix for Host unreachable issues * Update __version__.py * Update to wait times on reconnect * Merge for web socket reconnect fix Merge for potential web socket reconnect fix * Flake8 * Beta version update * Further websocket fixes * Changed timeout Changed timeout from 30 seconds to 5 seconds Sleep for 1 second before trying reconnects. * Fixed debug logging issue on reconnects * Update to beta version 8 after merge * Fixes after merge * Enable cleanup after closing websocket * Set version to 0.2.0 * Add instructions for enabling XMPP * Fix for sending command over XMPP Sending a command to a device over XMPP was not working anymore. Fixed. * Merge fix from 0.1.13 Fix for sockets not being closed on some OS's. * Fix listen parameter Fix using --listen parameter * Add closing code to debug for web socket * Update slixmpp to 1.5.2 * Addd handlers for starting, stopping, and in progress Add handlers for starting, stopping, and in progress of activity switches * Improvement to cancel tasks * Protocol as choice Protocol can now be provided for connecting to the HUB. If not provided default behavior occurs. * Check XMPP is available Will check if XMPP is available even when provided as protocol. If it is not then default back to WEBSOCKETS. * logmodules and stop handler fix Added option logmodules to aioharmony. Fix upon disconnect timeout so stop handlers is still called. * Add some timestamps and small fix Add timestamps for printing activities and responses. Fix for when not providing parameter logmodules. * Fixes for XMPP reconnection Some fixes to reset super XMPP class upon reconnect. * Added 2 debug lines Co-authored-by: Erik Hendrix --- README.rst | 16 ++ aioharmony/__main__.py | 289 ++++++++++++++++++++++---------- aioharmony/__version__.py | 2 +- aioharmony/const.py | 13 ++ aioharmony/handler.py | 36 ++++ aioharmony/harmonyapi.py | 9 +- aioharmony/harmonyclient.py | 199 ++++++++++++++++------ aioharmony/hubconnector_xmpp.py | 90 +++++----- 8 files changed, 473 insertions(+), 181 deletions(-) diff --git a/README.rst b/README.rst index f8cd735..cc13402 100755 --- a/README.rst +++ b/README.rst @@ -142,6 +142,22 @@ Release Notes - Remote ID was not retrieved anymore, this is now available again - If HUB disconnects when retrieving information then retrieval will be retried. - Executing aioharmony with option show_detailed_config will now show all config retrieved from HUB +0.2.5 + - Fixed: When using XMPP protocol the switching of an activity was not always discovered. + - Fixed: Call to stop handlers will now be called when timeout occurs on disconnect + - Changed: ClientCallbackType is now in aioharmony.const instead of aioharmony.harmonyclient. + - Changed: default log level for aioharmony main is now ERROR + - New: callback option new_activity_starting to allow a callback when a new activity is being started (new_activity is called when switching activity is completed) + - New: 3 new HANDLER types have been added: + - HANDLER_START_ACTIVITY_NOTIFY_STARTED: activity is being started + - HANDLER_STOP_ACTIVITY_NOTIFY_STARTED: power off is started + - HANDLER_START_ACTIVITY_NOTIRY_INPROGRESS: activity switch is in progress + - New: Protocol to use can be specified (WEBSOCKETS or XMPP) to force specific protocol to be used. If not provided XMPP will be used unless not available then WEBSOCKETS will be used. + - New: protocol used to connect can now be retrieved. It will return WEBSOCKETS when connected over web sockets or XMPP. + - New: One can now supply multiple IP addresses for Harmony HUBs when using aioharmony main. + - New: option activity_monitor for aioharmony main to allow just monitoring of activity changes + - New: option logmodules for aioharmony main to specify the modules to put logging information for + TODO diff --git a/aioharmony/__main__.py b/aioharmony/__main__.py index 58a8562..bf926a9 100755 --- a/aioharmony/__main__.py +++ b/aioharmony/__main__.py @@ -7,52 +7,112 @@ import asyncio import json import logging +import re import sys +from datetime import datetime from typing import Optional import aioharmony.exceptions from aioharmony.harmonyapi import HarmonyAPI, SendCommandDevice from aioharmony.responsehandler import Handler +from aioharmony.const import ClientCallbackType, WEBSOCKETS, XMPP # TODO: Add docstyle comments # TODO: Clean up code styling hub_client = None +_ROOTLOGGER = logging.getLogger() +_LOGGER = logging.getLogger(__name__) -async def get_client(ip_address, show_responses) -> Optional[HarmonyAPI]: - client = HarmonyAPI(ip_address) +class LoggingFilter(logging.Filter): + def __init__(self, modules): + self._modules = modules + + def filter(self, record): + for module in self._modules: + if record.name == module or re.search(module, record.name) is not None: + return True + return False + +async def get_client(ip_address, protocol, show_responses) -> Optional[HarmonyAPI]: + client = HarmonyAPI(ip_address=ip_address, protocol=protocol) def output_response(message): - print(message) + print(f"{client.name}: {message}") - listen_callback = Handler(handler_obj=output_response, - handler_name='output_response', - once=False - ) if show_responses: + listen_callback = Handler(handler_obj=output_response, + handler_name='output_response', + once=False + ) client.register_handler(handler=listen_callback) - if await client.connect(): - print("Connected to HUB {} with firmware version {} and ID {}".format( - client.name, - client.fw_version, - client.hub_id)) - return client - - print("An issue occured trying to connect") + print(f"Trying to connect to HUB with IP {ip_address}.") + try: + if await client.connect(): + print("Connected to HUB {} ({}) with firmware version {} and HUB ID {} using protocol {}".format( + client.name, + ip_address, + client.fw_version, + client.hub_id, + client.protocol)) + return client + except ConnectionRefusedError: + print(f"Failed to connect to HUB {ip_address}.") + + print("An issue occurred trying to connect") return None -async def just_listen(client, _): +async def just_listen(client, args): # Create handler to output everything. + def output_response(message): + print(f"{datetime.today().strftime('%Y-%m-%d %H:%M:%S')} {client.name}: {message}") + print("Starting to listen on HUB {} with firmware version {}".format( client.name, client.fw_version)) - while True: - await asyncio.sleep(60) + # Register callback to show messages if not already done. + if not args.show_responses: + listen_callback = Handler(handler_obj=output_response, + handler_name='output_response', + once=False + ) + client.register_handler(handler=listen_callback) + + return + +async def listen_for_new_activities(client, _): + + def new_activity_starting(activity_info: tuple): + activity_id, activity_name = activity_info + if activity_id == -1: + print(f"{datetime.today().strftime('%Y-%m-%d %H:%M:%S')} {client.name}: Powering off is starting.") + else: + print(f"{datetime.today().strftime('%Y-%m-%d %H:%M:%S')} {client.name}: New activity ID {activity_id} with name {activity_name} is starting.") + + def new_activity_started(activity_info: tuple): + activity_id, activity_name = activity_info + if activity_id == -1: + print(f"{datetime.today().strftime('%Y-%m-%d %H:%M:%S')} {client.name}: Powering off completed.") + else: + print(f"{datetime.today().strftime('%Y-%m-%d %H:%M:%S')} {client.name}: New activity ID {activity_id} with name {activity_name} has started.") + + activity_id, activity_name = client.current_activity + print(f"{datetime.today().strftime('%Y-%m-%d %H:%M:%S')} {client.name}: Current activity ID {activity_id} with name {activity_name}") + + callbacks = { + "config_updated": client.callbacks.config_updated, + "connect": client.callbacks.connect, + "disconnect": client.callbacks.disconnect, + "new_activity_starting": new_activity_starting, + "new_activity": new_activity_started, + } + client.callbacks = ClientCallbackType(**callbacks) + # Functions for use on command line async def show_config(client, _): @@ -61,9 +121,10 @@ async def show_config(client, _): config = client.config if config: - print(json.dumps(client.json_config, sort_keys=True, indent=4)) + print(f"HUB: {client.name}") + print(f"\t {json.dumps(client.json_config, sort_keys=True, indent=4)}") else: - print("There was a problem retrieving the configuration") + print(f"HUB: {client.name} There was a problem retrieving the configuration") async def show_detailed_config(client, _): @@ -72,10 +133,10 @@ async def show_detailed_config(client, _): config = client.hub_config if config: - print(json.dumps(client.hub_config, sort_keys=True, indent=4, - separators=(',', ': '))) + print(f"HUB: {client.name}") + print(f"\t {json.dumps(client.hub_config, sort_keys=True, indent=4,separators=(',', ': '))}") else: - print("There was a problem retrieving the configuration") + print(f"HUB: {client.name} There was a problem retrieving the configuration") async def show_current_activity(client, _): @@ -84,11 +145,11 @@ async def show_current_activity(client, _): activity_id, activity_name = client.current_activity if activity_name: - print("{} ({})".format(activity_name, activity_id)) + print(f"HUB: {client.name} {activity_name} ({activity_id})") elif activity_id: - print(activity_id) + print(f"HUB: {client.name} activity_id") else: - print('Unable to retrieve current activity') + print(f"HUB: {client.name} Unable to retrieve current activity") async def start_activity(client, args): @@ -100,7 +161,7 @@ async def start_activity(client, args): """ if args.activity is None: - print("No activity provided to start") + print(f"HUB: {client.name} No activity provided to start") return if (args.activity.isdigit()) or (args.activity == '-1'): @@ -108,17 +169,16 @@ async def start_activity(client, args): else: activity_id = client.get_activity_id(args.activity) if activity_id: - print('Found activity named %s (id %s)' % (args.activity, - activity_id)) + print(f"HUB: {client.name} Found activity named {args.activity} ({activity_id})") if activity_id: status = await client.start_activity(activity_id) if status[0]: - print('Started Activity, message: ', status[1]) + print(f"HUB: {client.name} Started Activity {args.activity}") else: - print('Activity start failed: ', status[1]) + print(f"HUB: {client.name} Activity start failed: {status[1]}") else: - print('Invalid activity: ', args.activity) + print(f"HUB: {client.name} Invalid activity: {args.activity}") async def power_off(client, _): """Power off Harmony Hub. @@ -126,9 +186,9 @@ async def power_off(client, _): status = await client.power_off() if status: - print('Powered Off') + print(f"HUB: {client.name} Powered Off") else: - print('Power off failed') + print(f"HUB: {client.name} Power off failed") async def send_command(client, args): @@ -148,7 +208,7 @@ async def send_command(client, args): device_id = client.get_device_id(str(args.device_id).strip()) if device_id is None: - print("Device {} is invalid.".format(args.device_id)) + print(f"HUB: {client.name} Device {args.device_id} is invalid.") return snd_cmmnd = SendCommandDevice( @@ -166,14 +226,15 @@ async def send_command(client, args): if result_list: for result in result_list: - print("Sending of command {} to device {} failed with code {}: " + print("HUB: {} Sending of command {} to device {} failed with code {}: " "{}".format( + client.name, result.command.command, result.command.device, result.code, result.msg)) else: - print('Command Sent') + print(f"HUB: {client.name} Command Sent") async def change_channel(client, args): @@ -187,9 +248,9 @@ async def change_channel(client, args): status = await client.change_channel(args.channel) if status: - print('Changed to channel {}'.format(args.channel)) + print(f"HUB: {client.name} Changed to channel {args.channel}") else: - print('Change to channel {} failed'.format(args.channel)) + print(f"HUB: {client.name} Change to channel {args.channel} failed") # def discover(args): @@ -209,11 +270,53 @@ async def sync(client, _): status = await client.sync() if status: - print('Sync complete') + print(f"HUB: {client.name} Sync complete") else: - print("Sync failed") + print(f"HUB: {client.name} Sync failed") +async def execute_per_hub(hub, args): + # Connect to the HUB + try: + _LOGGER.debug("%s: Connecting to HUB", hub) + hub_client = await get_client(hub, + args.protocol, + args.show_responses) + if hub_client is None: + return + except aioharmony.exceptions.TimeOut: + print("Action did not complete within a reasonable time.") + return + + coroutine = None + if hasattr(args, 'func'): + coroutine = args.func(hub_client, args) + + # Execute provided request. + if coroutine is not None: + _LOGGER.debug("%s: Executing function.", hub) + try: + await coroutine + except aioharmony.exceptions.TimeOut: + print("Action did not complete within a reasonable time.") + + # Now sleep for provided time. + if args.wait >= 0: + _LOGGER.debug("%s: Waiting for %s seconds.", hub, args.wait) + await asyncio.sleep(args.wait) + else: + _LOGGER.debug("%s: Waiting till cancelled", hub) + while True: + await asyncio.sleep(60) + + if hub_client: + _LOGGER.debug("%s: Closing connection to HUB.", hub) + try: + await asyncio.wait_for(hub_client.close(), timeout=60) + except aioharmony.exceptions.TimeOut: + _LOGGER.debug("%s: Timeout trying to close connection to HUB.", hub) + + _LOGGER.debug("%s: All done with HUB.", hub) async def run(): """Main method for the script.""" global hub_client @@ -226,25 +329,30 @@ async def run(): # Required flags go here. required_flags.add_argument('--harmony_ip', - help='IP Address of the Harmony device.') + help='IP Address of the Harmony device, multiple IPs can be specified as a comma separated' + ' list without spaces.') required_flags.add_argument('--discover', - action='store_true', - help='Scan for Harmony devices.') + action='store_true', + help='Scan for Harmony devices.') # Flags with default values go here. loglevels = dict((logging.getLevelName(level), level) for level in [10, 20, 30, 40, 50]) - parser.add_argument('--harmony_port', + parser.add_argument('--protocol', required=False, - default=5222, - type=int, - help=('Network port that the Harmony is listening' - ' on.')) + choices=[WEBSOCKETS,XMPP], + help=('Protocol to use to connect to HUB. Note for XMPP one has to ensure that XMPP is enabled' + 'on the hub.')) parser.add_argument('--loglevel', - default='INFO', + default='ERROR', choices=list(loglevels.keys()), help='Logging level for all components to ' 'print to the console.') + parser.add_argument('--logmodules', + required=False, + type=str, + help='Restrict logging to modules specified. Multiple can be provided as a ' + 'comma separated list without any spaces. Use * to include any further submodules.') show_responses_parser = parser.add_mutually_exclusive_group( required=False) @@ -298,9 +406,15 @@ async def run(): sync_parser.set_defaults(func=sync) listen_parser = subparsers.add_parser('listen', help='Output everything ' - 'HUB sends out') + 'HUB sends out. Use in combination with --wait.') listen_parser.set_defaults(func=just_listen) + new_activity_parser = subparsers.add_parser( + 'activity_monitor', + help='Monitor and show when an activity is changing. Use in combination with --wait to keep monitoring for' + 'activities otherwise only current activity will be shown.') + new_activity_parser.set_defaults(func=listen_for_new_activities) + command_parser = subparsers.add_parser( 'send_command', help='Send a simple command.') command_parser.add_argument( @@ -329,9 +443,16 @@ async def run(): args = parser.parse_args() - logging.basicConfig( - level=loglevels[args.loglevel], - format='%(asctime)s:%(levelname)s:\t%(name)s\t%(message)s') + log_formatter = logging.Formatter('%(asctime)s:%(levelname)s:\t%(name)s\t%(message)s') + log_stream = logging.StreamHandler() + log_stream.setFormatter(log_formatter) + _ROOTLOGGER.setLevel(loglevels[args.loglevel]) + _ROOTLOGGER.addHandler(log_stream) + + if args.logmodules is not None: + log_modules = args.logmodules.split(",") + log_filter = LoggingFilter(log_modules) + log_stream.addFilter(log_filter) if args.wait < 0 and args.wait != -1: print("Invalid value provided for --wait.") @@ -342,59 +463,49 @@ async def run(): # discover(args) pass else: - coroutine = None if not hasattr(args, 'func') and not args.show_responses: parser.print_help() return - # Connect to the HUB - try: - hub_client = await get_client(args.harmony_ip, - args.show_responses) - if hub_client is None: - return - except aioharmony.exceptions.TimeOut: - print("Action did not complete within a reasonable time.") - raise + hub_tasks = [] + hub_ips = args.harmony_ip.split(",") + for hub in hub_ips: + # Connect to the HUB + hub_tasks.append(asyncio.ensure_future(execute_per_hub(hub, args))) - if hasattr(args, 'func'): - coroutine = args.func(hub_client, args) + results = await asyncio.gather(*hub_tasks, return_exceptions=True) + for idx, result in enumerate(results): + if isinstance( + result, + Exception): + raise result - # Execute provided request. - try: - if coroutine is not None: - await coroutine - except aioharmony.exceptions.TimeOut: - print("Action did not complete within a reasonable time.") - raise - # Now sleep for provided time. - if args.wait >= 0: - await asyncio.sleep(args.wait) - else: - while True: - await asyncio.sleep(60) +def cancel_tasks(loop): - if hub_client: - await asyncio.wait_for(hub_client.close(), timeout=10) - hub_client = None + _LOGGER.debug("Cancelling any tasks still running.") + loop.run_until_complete(asyncio.sleep(1)) + for task in asyncio.all_tasks(loop): + task.cancel() + # Allow cancellations to be processed + for x in range(10): + loop.run_until_complete(asyncio.sleep(1)) + if len(asyncio.all_tasks(loop)) == 0: + break def main() -> None: loop = asyncio.new_event_loop() try: loop.run_until_complete(run()) - while asyncio.all_tasks(loop): - loop.run_until_complete(asyncio.gather(*asyncio.all_tasks(loop))) + cancel_tasks(loop) loop.close() + except KeyboardInterrupt: print("Exit requested.") - if hub_client is not None: - loop.run_until_complete( - asyncio.wait_for(hub_client.close(), timeout=10) - ) + cancel_tasks(loop) + loop.close() print("Closed.") - if __name__ == '__main__': sys.exit(main()) diff --git a/aioharmony/__version__.py b/aioharmony/__version__.py index a6587ae..d0ba488 100644 --- a/aioharmony/__version__.py +++ b/aioharmony/__version__.py @@ -1 +1 @@ -__version__ = '0.2.4' +__version__ = '0.2.5' \ No newline at end of file diff --git a/aioharmony/const.py b/aioharmony/const.py index c582b09..d932d0e 100644 --- a/aioharmony/const.py +++ b/aioharmony/const.py @@ -16,6 +16,11 @@ DEFAULT_WS_HUB_PORT = '8088' DEFAULT_HARMONY_MIME = 'vnd.logitech.harmony/vnd.logitech.harmony.engine' +WEBSOCKETS = 'WEBSOCKETS' +XMPP = 'XMPP' + +PROTOCOL = Union[WEBSOCKETS, XMPP] + # # The HUB commands that can be send # @@ -69,6 +74,14 @@ Callable[[object, Optional[Any]], Any] ] +ClientCallbackType = NamedTuple('ClientCallbackType', + [('connect', Optional[CallbackType]), + ('disconnect', Optional[CallbackType]), + ('new_activity_starting', Optional[CallbackType]), + ('new_activity', Optional[CallbackType]), + ('config_updated', Optional[CallbackType]) + ]) + ConnectorCallbackType = NamedTuple('ConnectorCallbackType', [('connect', Optional[CallbackType]), ('disconnect', Optional[CallbackType]) diff --git a/aioharmony/handler.py b/aioharmony/handler.py index 5e75ccc..96b5637 100644 --- a/aioharmony/handler.py +++ b/aioharmony/handler.py @@ -182,6 +182,42 @@ def dummy_callback(message): once=False ) +HANDLER_START_ACTIVITY_NOTIFY_STARTED = Handler( + handler_obj=dummy_callback, + handler_name='Activity_Starting', + resp_json={ + 'type': re.compile(r'^connect\.stateDigest\?notify$'), + 'data': { + 'activityStatus': 1, + }, + }, + once=False +) + +HANDLER_STOP_ACTIVITY_NOTIFY_STARTED = Handler( + handler_obj=dummy_callback, + handler_name='Activity_Stopping', + resp_json={ + 'type': re.compile(r'^connect\.stateDigest\?notify$'), + 'data': { + 'activityStatus': 0, + }, + }, + once=False +) + +HANDLER_START_ACTIVITY_NOTIFY_INPROGRESS = Handler( + handler_obj=dummy_callback, + handler_name='Activity_Starting_Inprogress', + resp_json={ + 'type': re.compile(r'^connect\.stateDigest\?notify$'), + 'data': { + 'activityStatus': 2, + }, + }, + once=False +) + HANDLER_START_ACTIVITY_FINISHED = Handler( handler_obj=dummy_callback, handler_name='Activity_Changed', diff --git a/aioharmony/harmonyapi.py b/aioharmony/harmonyapi.py index 6277c54..36e6295 100644 --- a/aioharmony/harmonyapi.py +++ b/aioharmony/harmonyapi.py @@ -16,7 +16,7 @@ from datetime import datetime, timedelta from typing import List, Optional, Union from aioharmony.const import ( - ClientConfigType, SendCommandArg, SendCommandDevice, SendCommandResponse + ClientConfigType, PROTOCOL, SendCommandArg, SendCommandDevice, SendCommandResponse ) from aioharmony.harmonyclient import ClientCallbackType, HarmonyClient from aioharmony.handler import Handler @@ -37,13 +37,14 @@ class HarmonyAPI: # pylint: disable=too-many-arguments def __init__(self, ip_address: str, + protocol: PROTOCOL = None, callbacks: ClientCallbackType = None, loop: asyncio.AbstractEventLoop = None) -> None: _LOGGER.debug("%s: Initialize", ip_address) loop = loop if loop else asyncio.get_event_loop() - self._harmony_client = HarmonyClient( ip_address=ip_address, + protocol=protocol, callbacks=callbacks, loop=loop ) @@ -52,6 +53,10 @@ def __init__(self, def ip_address(self) -> str: return self._harmony_client.ip_address + @property + def protocol(self) -> str: + return self._harmony_client.protocol + @property def hub_config(self) -> ClientConfigType: return self._harmony_client.hub_config diff --git a/aioharmony/harmonyclient.py b/aioharmony/harmonyclient.py index 62458db..83e6011 100755 --- a/aioharmony/harmonyclient.py +++ b/aioharmony/harmonyclient.py @@ -20,9 +20,9 @@ import aioharmony.exceptions as aioexc import aioharmony.handler as handlers from aioharmony.const import ( - CallbackType, ClientConfigType, ConnectorCallbackType, - DEFAULT_XMPP_HUB_PORT, HUB_COMMANDS, SendCommand, SendCommandDevice, - SendCommandResponse + ClientCallbackType, ClientConfigType, ConnectorCallbackType, + DEFAULT_XMPP_HUB_PORT, HUB_COMMANDS, PROTOCOL, SendCommand, SendCommandDevice, + SendCommandResponse, WEBSOCKETS, XMPP ) from aioharmony.helpers import call_callback, search_dict from aioharmony.responsehandler import Handler, ResponseHandler @@ -31,13 +31,6 @@ DEFAULT_TIMEOUT = 60 -ClientCallbackType = NamedTuple('ClientCallbackType', - [('connect', Optional[CallbackType]), - ('disconnect', Optional[CallbackType]), - ('new_activity', Optional[CallbackType]), - ('config_updated', Optional[CallbackType]) - ]) - # TODO: Add docstyle comments # TODO: Clean up code styling @@ -48,12 +41,14 @@ class HarmonyClient: # pylint: disable=too-many-arguments def __init__(self, ip_address: str, + protocol: PROTOCOL = None, callbacks: ClientCallbackType = None, loop: asyncio.AbstractEventLoop = None) -> None: _LOGGER.debug("%s: Initialize HUB", ip_address) self._ip_address = ip_address + self._protocol = protocol self._callbacks = callbacks if callbacks is not None else \ - ClientCallbackType(None, None, None, None) + ClientCallbackType(None, None, None, None, None) self._loop = loop if loop else asyncio.get_event_loop() self._hub_config = ClientConfigType({}, {}, {}, {}, None, [], []) @@ -75,12 +70,24 @@ def __init__(self, # Create the lock for getting HUB information. self._sync_lck = asyncio.Lock() - # Create the activity start handler object + # Create the activity start handler object when start activity is finished handler = copy.copy(handlers.HANDLER_START_ACTIVITY_FINISHED) handler.handler_obj = self._update_activity_callback self._callback_handler.register_handler( handler=handler) + # Create the activity start handler object when start activity is finished + handler = copy.copy(handlers.HANDLER_START_ACTIVITY_NOTIFY_STARTED) + handler.handler_obj = self._update_start_activity_callback + self._callback_handler.register_handler( + handler=handler) + + # Create the activity start handler object when start activity is finished + handler = copy.copy(handlers.HANDLER_STOP_ACTIVITY_NOTIFY_STARTED) + handler.handler_obj = self._update_start_activity_callback + self._callback_handler.register_handler( + handler=handler) + # Create the notification handler object handler = copy.copy(handlers.HANDLER_NOTIFY) handler.handler_obj = self._notification_callback @@ -91,6 +98,10 @@ def __init__(self, def ip_address(self) -> str: return self._ip_address + @property + def protocol(self) -> str: + return self._protocol + @property def name(self) -> Optional[str]: name = self._hub_config.discover_info.get('friendlyName') @@ -119,25 +130,37 @@ def current_activity_id(self): async def _websocket_or_xmpp(self) -> bool: """ Determine if XMPP is enabled, if not fall-back to web sockets. """ + if not self._protocol == WEBSOCKETS : + try: + _, _ = await asyncio.open_connection(host=self._ip_address, + port=DEFAULT_XMPP_HUB_PORT, + loop=self._loop + ) + except ConnectionRefusedError: + if self._protocol == XMPP: + _LOGGER.warning("%s: XMPP is not enabled on this HUB, will be defaulting back to WEBSOCKETS.", + self.name) + else: + _LOGGER.warning("%s: XMPP is not enabled, using web sockets " + "however this might not work with future Harmony " + "firmware updates, please enable XMPP", + self.name) + self._protocol = WEBSOCKETS + except OSError as exc: + _LOGGER.error("%s: Unable to determine if XMPP is enabled: %s", + self.name, + exc) + if self._protocol is None: + return False + else: + _LOGGER.debug("%s: XMPP is enabled", self.name) + self._protocol = XMPP - try: - _, _ = await asyncio.open_connection(host=self._ip_address, - port=DEFAULT_XMPP_HUB_PORT, - loop=self._loop - ) - except ConnectionRefusedError: - _LOGGER.warning("%s: XMPP is not enabled, using web sockets " - "however this might not work with future Harmony " - "firmware updates, please enable XMPP", - self.name) + if self._protocol == WEBSOCKETS: + _LOGGER.debug("%s: Using WEBSOCKETS", self.name) from aioharmony.hubconnector_websocket import HubConnector - except OSError as exc: - _LOGGER.error("%s: Unable to determine if XMPP is enabled: %s", - self.name, - exc) - return False else: - _LOGGER.debug("%s: XMPP is enabled", self.name) + _LOGGER.debug("%s: Using XMPP", self.name) from aioharmony.hubconnector_xmpp import HubConnector self._hub_connection = HubConnector( @@ -233,12 +256,17 @@ async def close(self) -> None: This should be called to ensure everything is stopped and cancelled out. """ + raise_exception = None if self._hub_connection: try: with timeout(DEFAULT_TIMEOUT): await self._hub_connection.close() - except asyncio.TimeoutError: - raise aioexc.TimeOut + except Exception as e: + _LOGGER.debug("%s: Exception occurred during disconnection.", + self.name) + raise_exception = e + if isinstance(raise_exception, asyncio.TimeoutError): + raise_exception = aioexc.TimeOut if self._callback_handler: try: @@ -247,6 +275,10 @@ async def close(self) -> None: except asyncio.TimeoutError: raise aioexc.TimeOut + if raise_exception is not None: + raise raise_exception + + async def disconnect(self) -> None: """Disconnect from Hub""" _LOGGER.debug("%s: Disconnecting from %s", @@ -305,6 +337,7 @@ async def refresh_info_from_hub(self) -> None: # If we were provided a callback handler then call it now. if self._callbacks.config_updated: + _LOGGER.debug("%s: Calling callback handler for config_updated", self.name) call_callback( callback_handler=self._callbacks.config_updated, result=self._hub_config.config, @@ -322,12 +355,12 @@ async def _get_config(self) -> Optional[dict]: self.name) # Send the command to the HUB try: - with timeout(DEFAULT_TIMEOUT): - response = await self.send_to_hub(command='get_config', send_timeout=DEFAULT_TIMEOUT/2) + with timeout(DEFAULT_TIMEOUT/2): + response = await self.send_to_hub(command='get_config', send_timeout=DEFAULT_TIMEOUT/4) except (asyncio.TimeoutError, aioexc.TimeOut): try: - with timeout(DEFAULT_TIMEOUT): - response = await self.send_to_hub(command='get_config', send_timeout=DEFAULT_TIMEOUT/2) + with timeout(DEFAULT_TIMEOUT/2): + response = await self.send_to_hub(command='get_config', send_timeout=DEFAULT_TIMEOUT/4) except (asyncio.TimeoutError, aioexc.TimeOut): raise aioexc.TimeOut @@ -364,20 +397,18 @@ async def _get_config(self) -> Optional[dict]: return self._hub_config.config - async def _retrieve_hub_info(self) -> Optional[dict]: - """Retrieve some information from the Hub.""" - # Send the command to the HUB + async def _retrieve_provision_info(self) -> Optional[dict]: response = None result = None try: - with timeout(DEFAULT_TIMEOUT): - result = await self.send_to_hub(command='provision_info', post=True, send_timeout=DEFAULT_TIMEOUT/2) + with timeout(DEFAULT_TIMEOUT/2): + result = await self.send_to_hub(command='provision_info', post=True, send_timeout=DEFAULT_TIMEOUT/4) except (asyncio.TimeoutError, aioexc.TimeOut): try: _LOGGER.debug("%s: Timeout trying to retrieve provisioning info, retrying.", self.name) - with timeout(DEFAULT_TIMEOUT): - result = await self.send_to_hub(command='provision_info', post=True, send_timeout=DEFAULT_TIMEOUT/2) + with timeout(DEFAULT_TIMEOUT/2): + result = await self.send_to_hub(command='provision_info', post=True, send_timeout=DEFAULT_TIMEOUT/4) except (asyncio.TimeoutError, aioexc.TimeOut): _LOGGER.error("%s: Timeout trying to retrieve provisioning info.", self.name) @@ -393,14 +424,19 @@ async def _retrieve_hub_info(self) -> Optional[dict]: info=result.get('data')) response = self._hub_config.info + return response + + async def _retrieve_discovery_info(self) -> None: + + result = None try: - with timeout(DEFAULT_TIMEOUT): - result = await self.send_to_hub(command='discovery', post=False, send_timeout=DEFAULT_TIMEOUT/2) + with timeout(DEFAULT_TIMEOUT/2): + result = await self.send_to_hub(command='discovery', post=False, send_timeout=DEFAULT_TIMEOUT/4) except (asyncio.TimeoutError, aioexc.TimeOut): try: _LOGGER.debug("%s: Timeout trying to retrieve discovery info, retrying", self.name) - with timeout(DEFAULT_TIMEOUT): - result = await self.send_to_hub(command='discovery', post=False, send_timeout=DEFAULT_TIMEOUT/2) + with timeout(DEFAULT_TIMEOUT/2): + result = await self.send_to_hub(command='discovery', post=False, send_timeout=DEFAULT_TIMEOUT/4) except (asyncio.TimeoutError, aioexc.TimeOut): _LOGGER.error("%s: Timeout trying to retrieve discovery info.", self.name) @@ -415,6 +451,24 @@ async def _retrieve_hub_info(self) -> Optional[dict]: self._hub_config = self._hub_config._replace( discover_info=result.get('data')) + async def _retrieve_hub_info(self) -> Optional[dict]: + """Retrieve some information from the Hub.""" + # Send the command to the HUB + + response = None + + results = await asyncio.gather( + self._retrieve_provision_info(), + self._retrieve_discovery_info(), + return_exceptions=True + ) + for idx, result in enumerate(results): + if isinstance(result, Exception): + raise result + + if idx == 0: + response = result + return response async def send_to_hub(self, @@ -484,17 +538,17 @@ async def _get_current_activity(self) -> bool: # Send the command to the HUB try: - with timeout(DEFAULT_TIMEOUT): - response = await self.send_to_hub(command='get_current_activity', send_timeout=DEFAULT_TIMEOUT/2) + with timeout(DEFAULT_TIMEOUT/2): + response = await self.send_to_hub(command='get_current_activity', send_timeout=DEFAULT_TIMEOUT/4) except (asyncio.TimeoutError, aioexc.TimeOut): _LOGGER.debug("%s: Timeout trying to retrieve current activity, retrying.", self.name) try: - with timeout(DEFAULT_TIMEOUT): - response = await self.send_to_hub(command='get_current_activity', send_timeout=DEFAULT_TIMEOUT/2) + with timeout(DEFAULT_TIMEOUT/2): + response = await self.send_to_hub(command='get_current_activity', send_timeout=DEFAULT_TIMEOUT/4) except (asyncio.TimeoutError, aioexc.TimeOut): _LOGGER.error("%s: Timeout trying to retrieve current activity.", - self.name) + self.name) response = None if not response: @@ -517,6 +571,7 @@ async def _get_current_activity(self) -> bool: # If we were provided a callback handler then call it now. if self._callbacks.new_activity: + _LOGGER.debug("%s: Calling callback handler for new_activity", self.name) call_callback( callback_handler=self._callbacks.new_activity, result=(self._current_activity_id, @@ -580,6 +635,7 @@ async def _update_activity_callback(self, # If we were provided a callback handler then call it now. if self._callbacks.new_activity: + _LOGGER.debug("%s: Calling callback handler for new_activity", self.name) call_callback( callback_handler=self._callbacks.new_activity, result=(self._current_activity_id, @@ -590,6 +646,49 @@ async def _update_activity_callback(self, callback_name='new_activity_callback' ) + # pylint: disable=broad-except + async def _update_start_activity_callback(self, + message: dict = None) -> None: + """Update current activity when changed.""" + _LOGGER.debug("%s: New activity starting notification", self.name) + + message_data = message.get('data') + if message_data is not None and message_data.get('activityStatus') == 0: + # The HUB sends a power off notification again that it is starting when it is done + # thus intercepting this so we do not redo the callback. + if int(message_data.get('activityId')) == -1 and self._current_activity_id == -1: + return + + self._current_activity_id = -1 + _LOGGER.debug("%s: Powering off from activity: %s(%s)", + self.name, + self.get_activity_name(self._current_activity_id), + self._current_activity_id) + self._current_activity_id = -1 + else: + if message_data is not None: + self._current_activity_id = int(message_data.get('activityId')) + else: + self._current_activity_id = None + + _LOGGER.debug("%s: New activity starting: %s(%s)", + self.name, + self.get_activity_name(self._current_activity_id), + self._current_activity_id) + + # If we were provided a callback handler then call it now. + if self._callbacks.new_activity_starting: + _LOGGER.debug("%s: Calling callback handler for new_activity_starting", self.name) + call_callback( + callback_handler=self._callbacks.new_activity_starting, + result=(self._current_activity_id, + self.get_activity_name( + self._current_activity_id) + ), + callback_uuid=self._ip_address, + callback_name='new_activity_starting_callback' + ) + # pylint: disable=too-many-statements # pylint: disable=too-many-locals async def start_activity(self, activity_id) -> tuple: diff --git a/aioharmony/hubconnector_xmpp.py b/aioharmony/hubconnector_xmpp.py index 0ad4d22..00206dc 100644 --- a/aioharmony/hubconnector_xmpp.py +++ b/aioharmony/hubconnector_xmpp.py @@ -61,12 +61,15 @@ def __init__(self, self._connected = False - plugin_config = { + self._plugin_config = { # Enables PLAIN authentication which is off by default. 'feature_mechanisms': {'unencrypted_plain': True}, } + self._init_super() + + def _init_super(self): super(HubConnector, self).__init__( - DEFAULT_USER, DEFAULT_PASSWORD, plugin_config=plugin_config) + DEFAULT_USER, DEFAULT_PASSWORD, plugin_config=self._plugin_config) # Set keep-alive to 30 seconds. self.whitespace_keepalive_interval = 30 @@ -88,7 +91,7 @@ def _register_handlers(self): """Register all the different handlers within XMPP based on messages being received and events.""" - + _LOGGER.debug("%s: Registering internal handlers.", self._ip_address) # Register the callback for messages being received self._listener() @@ -104,6 +107,13 @@ def _register_handlers(self): disposable=False, ) + def _deregister_handlers(self): + # Remove handlers. + _LOGGER.debug("%s: Removing internal handlers.", self._ip_address) + self.del_event_handler('connected', self._connected_handler) + self.del_event_handler('disconnected', self._disconnected_handler) + self.remove_handler('listener') + async def close(self): """Close all connections and tasks @@ -186,6 +196,7 @@ def remove_handlers(): return False # Remove the handlers. + self._connected = True remove_handlers() _LOGGER.debug("%s: Connected to hub", self._ip_address) return True @@ -208,13 +219,7 @@ async def hub_disconnect(self) -> None: def disconnect_result(_): disconnected.set_result(True) - self.del_event_handler('connected', - self._connected_handler) - self.del_event_handler('disconnected', - self._disconnected_handler) - - self.remove_handler('listener') - + self._deregister_handlers() self.add_event_handler('disconnected', disconnect_result, disposable=True, @@ -226,6 +231,7 @@ def disconnect_result(_): with timeout(DEFAULT_TIMEOUT): await disconnected except asyncio.TimeoutError: + _LOGGER.debug("%s: Timeout trying to disconnect.", self._ip_address) self.del_event_handler('disconnected', disconnect_result) raise aioexc.TimeOut @@ -260,6 +266,10 @@ async def _disconnected_handler(self, _) -> None: self._ip_address) self._connected = False is_reconnect = False + + self._deregister_handlers() + self._init_super() + sleep_time = 1 await asyncio.sleep(sleep_time) while True: @@ -342,39 +352,41 @@ def _listener(self) -> None: """Enable callback""" def message_received(event): payload = event.get_payload() - if len(payload) != 1: - _LOGGER.error("%s: Invalid payload length of %s received", + if len(payload) == 0: + _LOGGER.error("%s: Invalid payload length of 0 received.", self._ip_address, len(payload)) return - message = payload[0] - data = {} - # Try to convert JSON object if JSON object was received - if message.text is not None and message.text != '': - try: - data = json.loads(message.text) - except json.JSONDecodeError: - # Should mean only a single value was received. - for item in message.text.split(':'): - item_split = item.split('=') - if len(item_split) == 2: - data.update({item_split[0]: item_split[1]}) - - # Create response dictionary - response = { - 'id': event.get('id'), - 'xmlns': message.attrib.get('xmlns'), - 'cmd': message.attrib.get('mime'), - 'type': message.attrib.get('type'), - 'code': int(message.attrib.get('errorcode', '0')), - 'codestring': message.attrib.get('errorstring'), - 'data': data, - } - _LOGGER.debug("%s: Response payload: %s", self._ip_address, - response) - # Put response on queue. - self._response_queue.put_nowait(response) + for message in payload: + data = {} + # Try to convert JSON object if JSON object was received + if message.text is not None and message.text != '': + try: + data = json.loads(message.text) + except json.JSONDecodeError: + # Should mean only a single value was received. + for item in message.text.split(':'): + item_split = item.split('=') + if len(item_split) == 2: + data.update({item_split[0]: item_split[1]}) + + # Create response dictionary + response = { + 'id': event.get('id'), + 'xmlns': message.attrib.get('xmlns'), + 'cmd': message.attrib.get('mime'), + 'type': message.attrib.get('type'), + 'code': int(message.attrib.get('errorcode', '0')), + 'codestring': message.attrib.get('errorstring'), + 'data': data, + } + _LOGGER.debug("%s: Response payload: %s", self._ip_address, + response) + # Put response on queue. + self._response_queue.put_nowait(response) + + self._listener_message_received = message_received self._listener_message_received = message_received