From 852f7c6bbd8bbc24dd7dc44a3ec73b5106364f82 Mon Sep 17 00:00:00 2001 From: Doug Ransom Date: Wed, 21 Feb 2024 09:40:38 -0800 Subject: [PATCH] natlinkconfig programs now use logging (#65) * natlinkstatus.py: entered getUnimacroGrammarsDirectory * multiple changes in natlinkstatus.py and natlinkconfigfunctions.py, letting eg unimacro setup be better * add 2 tests execscriptest.py is ok, buttonclicktest.py fails. The cause is playEvents. (Dragon 16) * the buttonclicktest.py shows the bug when natlink.playEvents is called. * also enter dragonfly (dragonfly2) in the config procedure, including testing for upgrades.. * add a few changes to nsformat.py (a\\determiner and I\\pronoun, a word with \\ (properties) can also have length 2 when splitted into wordList. Logging: * use pythong logging module to report output of pip etc. * command line uses logging. * logging to cli. Loader.py: * adapt load_or_reload_module function with force_load, so it works from Unimacro (_control.py grammar). --------- Co-authored-by: Quintijn Hoogenboom --- pyproject.toml | 1 + src/natlinkcore/configure/foo.txt | 5 + src/natlinkcore/configure/loggert.py | 14 ++ .../configure/natlinkconfig_cli.py | 21 ++- .../configure/natlinkconfig_gui.py | 65 +++++++- .../configure/natlinkconfigfunctions.py | 148 ++++++++++++------ src/natlinkcore/loader.py | 40 ++--- src/natlinkcore/natlinkstatus.py | 37 ++++- src/natlinkcore/natlinkutils.py | 5 +- src/natlinkcore/nsformat.py | 7 +- tests/test_natlinktimer.py | 58 +++---- tests/test_nsformat.py | 31 ++++ 12 files changed, 324 insertions(+), 108 deletions(-) create mode 100644 src/natlinkcore/configure/foo.txt create mode 100644 src/natlinkcore/configure/loggert.py diff --git a/pyproject.toml b/pyproject.toml index f79257f..0a7c749 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,6 +13,7 @@ dependencies= [ "pysimplegui>=4.60.3", "pydebugstring", "dtactions>=1.5.7", + "platformdirs >= 4.2.0" ] classifiers=[ "Development Status :: 4 - Beta", diff --git a/src/natlinkcore/configure/foo.txt b/src/natlinkcore/configure/foo.txt new file mode 100644 index 0000000..7c027fe --- /dev/null +++ b/src/natlinkcore/configure/foo.txt @@ -0,0 +1,5 @@ +ERROR: Could not find a version that satisfies the requirement foobar32 (from versions: none) +ERROR: No matching distribution found for foobar32 + +[notice] A new release of pip available: 22.3.1 -> 24.0 +[notice] To update, run: python.exe -m pip install --upgrade pip diff --git a/src/natlinkcore/configure/loggert.py b/src/natlinkcore/configure/loggert.py new file mode 100644 index 0000000..9580857 --- /dev/null +++ b/src/natlinkcore/configure/loggert.py @@ -0,0 +1,14 @@ +import logging +import sys +handler = logging.StreamHandler(sys.stdout) +handler.setLevel(logging.DEBUG) +#formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') +#handler.setFormatter(formatter) + +root = logging.getLogger() +root.addHandler(handler) +root.setLevel(logging.DEBUG) +logging.debug("A") +logging.info("B") +logging.log(logging.DEBUG,"C") +print("\nD E") diff --git a/src/natlinkcore/configure/natlinkconfig_cli.py b/src/natlinkcore/configure/natlinkconfig_cli.py index 7bb074e..9d3a44b 100644 --- a/src/natlinkcore/configure/natlinkconfig_cli.py +++ b/src/natlinkcore/configure/natlinkconfig_cli.py @@ -5,8 +5,25 @@ import os import os.path from natlinkcore.configure import extensions_and_folders - +from platformdirs import user_log_dir +from pathlib import Path from natlinkcore.configure import natlinkconfigfunctions +import logging +appname="natlink" +logdir = Path(user_log_dir(appname=appname,ensure_exists=True)) +logfilename=logdir/f"cli_log.txt" +file_handler = logging.FileHandler(logfilename) +formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') +file_handler.setFormatter(formatter) +logfile_logger = logging.getLogger() + +handler = logging.StreamHandler(sys.stdout) +handler.setLevel(logging.DEBUG) +logfile_logger.addHandler(handler) + +file_handler.setLevel(logging.DEBUG) +logfile_logger.addHandler(file_handler) +logfile_logger.setLevel(logging.DEBUG) def _main(Options=None): """Catch the options and perform the resulting command line functions @@ -17,6 +34,8 @@ def _main(Options=None): etc., usage above... """ + + cli = CLI() cli.Config = natlinkconfigfunctions.NatlinkConfig() shortOptions = "DVNOHKaAiIxXbBuqe" diff --git a/src/natlinkcore/configure/natlinkconfig_gui.py b/src/natlinkcore/configure/natlinkconfig_gui.py index bddde8a..e85d0d4 100644 --- a/src/natlinkcore/configure/natlinkconfig_gui.py +++ b/src/natlinkcore/configure/natlinkconfig_gui.py @@ -1,8 +1,26 @@ -#pylint:disable=W0621, W0703 +#pylint:disable=W0621, W0703, W0603 import sys import platform import PySimpleGUI as sg +import logging +from platformdirs import user_log_dir +from pathlib import Path +appname="natlink" +logdir = Path(user_log_dir(appname=appname,ensure_exists=True)) +logfilename=logdir/"config_gui_log.txt" +file_handler = logging.FileHandler(logfilename) +formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') +file_handler.setFormatter(formatter) +logfile_logger = logging.getLogger() + +#always leave at debug. So we have this if there is ever a problem. +file_handler.setLevel(logging.DEBUG) +logfile_logger.addHandler(file_handler) +logfile_logger.setLevel(logging.DEBUG) + + + # https://www.pysimplegui.org/en/latest/ from natlinkcore.configure.natlinkconfigfunctions import NatlinkConfig from natlinkcore import natlinkstatus @@ -14,6 +32,8 @@ Config = NatlinkConfig() Status = natlinkstatus.NatlinkStatus() + + SYMBOL_UP = '▲' SYMBOL_DOWN = '▼' @@ -47,6 +67,8 @@ def collapse(layout, key, visible): [sg.T('Natlink GUI Output')], [sg.Output(size=(40,10), echo_stdout_stderr=True, expand_x=True, key='-OUTPUT-')]] + + #### Main UI Layout #### layout = [[sg.T('Environment:', font='bold'), sg.T(f'Windows OS: {osVersion.major}, Build: {osVersion.build}'), sg.T(f'Python: {pyVersion}'), sg.T(f'Dragon Version: {Status.getDNSVersion()}')], #### Projects Checkbox #### @@ -61,6 +83,23 @@ def collapse(layout, key, visible): window = sg.Window('Natlink configuration GUI', layout, enable_close_attempted_event=True) + #this is for the GUI logging. +#set the level on this corresponding to the natlink levels later. +#This must be created after the window is created, or nothing gets logged. + +#and we don't add the handler until just before the window is read the first time +#because a write during this period will cause a problem. + +handler = logging.StreamHandler(sys.stdout) +handler.setLevel(logging.DEBUG) +try: + ll=Config.config_get("settings","log_level") + if ll != "Debug": + handler.setLevel(logging.INFO) +except Exception as ee: + logging.debug(f"{__file__} failed to get log_level {ee} ") + + def ThreadIsRunning(): global Thread_Running Thread_Running = not Thread_Running @@ -69,6 +108,13 @@ def ThreadIsRunning(): # Natlink def SetNatlinkLoggingOutput(values, event): Config.setLogging(values['Set_Logging_Natlink']) + #also if natlink is not debug level, use info for logging in this application also. + try: + ll=Config.config_get("settings","log_level") + if ll != "Debug": + handler.setLevel(logging.INFO) + except Exception as ee: + logging.debug(f"{__file__} failed to get log_level {ee} ") # Dragonfly2 def Dragonfly2UserDir(values, event): @@ -112,11 +158,24 @@ def OpenNatlinkConfig(values, event): unimacro_dispatch = {'Set_UserDir_Unimacro': UnimacroUserDir, 'Clear_UserDir_Unimacro': UnimacroUserDir} autohotkey_dispatch = {'Set_Exe_Ahk': AhkExeDir, 'Clear_Exe_Ahk': AhkExeDir, 'Set_ScriptsDir_Ahk': AhkUserDir,'Clear_ScriptsDir_Ahk': AhkUserDir} + #### Event Loop #### try: + #we want to set the logger up just before we call window.read() + #because if something is logged before the first window.read() there will be a failure. + + + + #this would be a good spot to change the logging level, based on the natlink logging level + #note the natlink logging level doesn't correspond one to one with the logging module levels. + logfile_logger.addHandler(handler) + + handler.setLevel(logging.DEBUG) + #do not change the logger level from debug + #change it in the handler. Because a file handler logs everything. while True: event, values = window.read() - if (event == '-WINDOW CLOSE ATTEMPTED-' or event == 'Exit') and not Thread_Running: + if (event in ['-WINDOW CLOSE ATTEMPTED-', 'Exit']) and not Thread_Running: break # Hidden Columns logic # TODO: if project is enabled, update the project state to enabled. @@ -140,7 +199,7 @@ def OpenNatlinkConfig(values, event): Thread_Running = not Thread_Running elif Thread_Running: - choice = sg.popup(f'Please Wait: Pip install is in progress', keep_on_top=True, custom_text=('Wait','Force Close')) + choice = sg.popup('Please Wait: Pip install is in progress', keep_on_top=True, custom_text=('Wait','Force Close')) if choice == 'Force Close': break diff --git a/src/natlinkcore/configure/natlinkconfigfunctions.py b/src/natlinkcore/configure/natlinkconfigfunctions.py index aa9c336..e1b2e81 100644 --- a/src/natlinkcore/configure/natlinkconfigfunctions.py +++ b/src/natlinkcore/configure/natlinkconfigfunctions.py @@ -8,7 +8,7 @@ # Quintijn Hoogenboom, January 2008 (...), August 2022 # -#pylint:disable=C0302, W0702, R0904, C0116, W0613, R0914, R0912 +#pylint:disable=C0302, W0702, R0904, C0116, W0613, R0914, R0912, R1732, W1514, W0107 """With the functions in this module Natlink can be configured. These functions are called in different ways: @@ -20,7 +20,7 @@ import shutil import sys import subprocess -from pprint import pprint +from pprint import pformat from pathlib import Path import configparser @@ -30,8 +30,24 @@ from natlinkcore import readwritefile from natlinkcore import tkinter_dialogs +import logging + isfile, isdir, join = os.path.isfile, os.path.isdir, os.path.join + +def do_pip(*args): + """ + Run a pip command with args. + Diagnostic logging.3 + """ + + + command = [sys.executable,"-m", "pip"] + list(args) + logging.info(f"command: {command} ") + completed_process=subprocess.run(command,capture_output=True) + logging.debug(f"completed_process: {completed_process}") + completed_process.check_returncode() + class NatlinkConfig: """performs the configuration tasks of Natlink @@ -148,7 +164,7 @@ def setDirectory(self, option, dir_path, section=None): """ section = section or 'directories' if not dir_path: - print('==== Please specify the wanted directory in Dialog window ====\n') + logging.info('==== Please specify the wanted directory in Dialog window ====\n') prev_path = self.config_get('previous settings', option) or self.documents_path dir_path = tkinter_dialogs.GetDirFromDialog(title=f'Please choose a "{option}"', initialdir=prev_path) if not dir_path: @@ -161,9 +177,9 @@ def setDirectory(self, option, dir_path, section=None): if directory is False: directory = config.expand_path(dir_path) if dir_path == directory: - print(f'Cannot set "{option}", the given path is invalid: "{directory}"') + logging.info(f'Cannot set "{option}", the given path is invalid: "{directory}"') else: - print(f'Cannot set "{option}", the given path is invalid: "{directory}" ("{dir_path}")') + logging.info(f'Cannot set "{option}", the given path is invalid: "{directory}" ("{dir_path}")') return nice_dir_path = self.prefix_home(dir_path) @@ -171,9 +187,9 @@ def setDirectory(self, option, dir_path, section=None): self.config_set(section, option, nice_dir_path) self.config_remove('previous settings', option) if section == 'directories': - print(f'Set option "{option}" to "{dir_path}"') + logging.info(f'Set option "{option}" to "{dir_path}"') else: - print(f'Set in section "{section}", option "{option}" to "{dir_path}"') + logging.info(f'Set in section "{section}", option "{option}" to "{dir_path}"') return def clearDirectory(self, option, section=None): @@ -182,7 +198,7 @@ def clearDirectory(self, option, section=None): section = section or 'directories' old_value = self.config_get(section, option) if not old_value: - print(f'The "{option}" was not set, nothing changed...') + logging.info(f'The "{option}" was not set, nothing changed...') return if isValidDir(old_value): self.config_set('previous settings', option, old_value) @@ -190,7 +206,7 @@ def clearDirectory(self, option, section=None): self.config_remove('previous settings', option) self.config_remove(section, option) - print(f'cleared "{option}"') + logging.info(f'cleared "{option}"') def prefix_home(self, dir_path): @@ -210,15 +226,15 @@ def setFile(self, option, file_path, section): prev_path = self.config_get('previous settings', option) or "" file_path = tkinter_dialogs.GetFileFromDialog(title=f'Please choose a "{option}"', initialdir=prev_path) if not file_path: - print('No valid file specified') + logging.info('No valid file specified') return file_path = file_path.strip() if not Path(file_path).is_file(): - print(f'No valid file specified ("{file_path}")') + logging.info(f'No valid file specified ("{file_path}")') self.config_set(section, option, file_path) self.config_remove('previous settings', option) - print(f'Set in section "{section}", option "{option}" to "{file_path}"') + logging.info(f'Set in section "{section}", option "{option}" to "{file_path}"') return def clearFile(self, option, section): @@ -226,7 +242,7 @@ def clearFile(self, option, section): """ old_value = self.config_get(section, option) if not old_value: - print(f'The "{option}" was not set, nothing changed...') + logging.info(f'The "{option}" was not set, nothing changed...') return if isValidFile(old_value): self.config_set('previous settings', option, old_value) @@ -234,7 +250,7 @@ def clearFile(self, option, section): self.config_remove('previous settings', option) self.config_remove(section, option) - print(f'cleared "{option}"') + logging.info(f'cleared "{option}"') def setLogging(self, logginglevel): @@ -245,10 +261,10 @@ def setLogging(self, logginglevel): value = logginglevel.title() old_value = self.config_get('settings', "log_level") if old_value == value: - print(f'setLogging, setting is already "{old_value}"') + logging.info(f'setLogging, setting is already "{old_value}"') return True if value in ["Critical", "Fatal", "Error", "Warning", "Info", "Debug"]: - print(f'setLogging, setting logging to: "{value}"') + logging.info(f'setLogging, setting logging to: "{value}"') self.config_set('settings', "log_level", value) if old_value is not None: self.config_set('previous settings', "log_level", old_value) @@ -268,23 +284,23 @@ def disableDebugOutput(self): def enable_unimacro(self, arg): unimacro_user_dir = self.status.getUnimacroUserDirectory() if unimacro_user_dir and isdir(unimacro_user_dir): - print(f'UnimacroUserDirectory is already defined: "{unimacro_user_dir}"\n\tto change, first clear (option "O") and then set again') - print('\nWhen you want to upgrade Unimacro, also first clear ("O"), then choose this option ("o") again.\n') + logging.info(f'UnimacroUserDirectory is already defined: "{unimacro_user_dir}"\n\tto change, first clear (option "O") and then set again') + logging.info('\nWhen you want to upgrade Unimacro, also first clear ("O"), then choose this option ("o") again.\n') return uni_dir = self.status.getUnimacroDirectory() if uni_dir: - print('==== instal and/or update unimacro====\n') + logging.info('==== instal and/or update unimacro====\n') try: - subprocess.check_call([sys.executable, "-m", "pip", "install", "--upgrade", "unimacro"]) + do_pip("install", "--upgrade", "unimacro") except subprocess.CalledProcessError: - print('====\ncould not pip install --upgrade unimacro\n====\n') + logging.info('====\ncould not pip install --upgrade unimacro\n====\n') return else: try: - subprocess.check_call([sys.executable, "-m", "pip", "install", "unimacro"]) + do_pip("install", "unimacro") except subprocess.CalledProcessError: - print('====\ncould not pip install unimacro\n====\n') + logging.info('====\ncould not pip install unimacro\n====\n') return self.status.refresh() # refresh status uni_dir = self.status.getUnimacroDirectory() @@ -308,29 +324,67 @@ def disable_unimacro(self, arg=None): self.config_remove('directories', 'unimacrodirectory') # could still be there... self.status.refresh() + ### Dragonfly: + + def enable_dragonfly(self, arg): + dragonfly_user_dir = self.status.getDragonflyUserDirectory() + if dragonfly_user_dir and isdir(dragonfly_user_dir): + logging.info(f'DragonflyUserDirectory is already defined: "{dragonfly_user_dir}"\n\tto change, first clear (option "D") and then set again') + logging.info('\nWhen you want to upgrade Dragonfly, also first clear ("D"), then choose this option ("d") again.\n') + return + + df_dir = self.status.getDragonflyDirectory() + if df_dir: + logging.info('==== instal and/or update dragonfly2====\n') + try: + do_pip( "install", "--upgrade", "dragonfly2") + except subprocess.CalledProcessError: + logging.info('====\ncould not pip install --upgrade dragonfly2\n====\n') + return + else: + try: + do_pip( "install", "dragonfly2") + except subprocess.CalledProcessError: + logging.info('====\ncould not pip install dragonfly2\n====\n') + return + self.status.refresh() # refresh status + df_dir = self.status.getDragonflyDirectory() + + self.setDirectory('DragonflyUserDirectory', arg) + dragonfly_user_dir = self.config_get('dragonfly', 'dragonflyuserdirectory') + if not dragonfly_user_dir: + return + + + def disable_dragonfly(self, arg=None): + """disable dragonfly, do not expect arg + """ + self.config_remove('directories', 'dragonflyuserdirectory') # could still be there... + self.status.refresh() + def enable_vocola(self, arg): """enable vocola, by setting arg (prompting if False), and other settings """ vocola_user_dir = self.status.getVocolaUserDirectory() if vocola_user_dir and isdir(vocola_user_dir): - print(f'VocolaUserDirectory is already defined: "{vocola_user_dir}"\n\tto change, first clear (option "V") and then set again') - print('\nWhen you want to upgrade Vocola (vocola2), also first clear ("V"), then choose this option ("v") again.\n') + logging.info(f'VocolaUserDirectory is already defined: "{vocola_user_dir}"\n\tto change, first clear (option "V") and then set again') + logging.info('\nWhen you want to upgrade Vocola (vocola2), also first clear ("V"), then choose this option ("v") again.\n') return voc_dir = self.status.getVocolaDirectory() if voc_dir: - print('==== instal and/or update vocola2====\n') + logging.info('==== instal and/or update vocola2====\n') try: - subprocess.check_call([sys.executable, "-m", "pip", "install", "--upgrade", "vocola2"]) + do_pip("install", "--upgrade", "vocola2") except subprocess.CalledProcessError: - print('====\ncould not pip install --upgrade vocola2\n====\n') + logging.info('====\ncould not pip install --upgrade vocola2\n====\n') return else: try: - subprocess.check_call([sys.executable, "-m", "pip", "install", "vocola2"]) + do_pip("install", "vocola2") except subprocess.CalledProcessError: - print('====\ncould not pip install vocola2\n====\n') + logging.info('====\ncould not pip install vocola2\n====\n') return self.status.refresh() # refresh status voc_dir = self.status.getVocolaDirectory() @@ -366,32 +420,32 @@ def copyUnimacroIncludeFile(self): toFolder = Path(self.status.getVocolaUserDirectory()) if not unimacroDir.is_dir(): mess = f'copyUnimacroIncludeFile: unimacroDir "{str(unimacroDir)}" is not a directory' - print(mess) + logging.warn(mess) return fromFile = fromFolder/uscFile if not fromFile.is_file(): mess = f'copyUnimacroIncludeFile: file "{str(fromFile)}" does not exist (is not a valid file)' - print(mess) + logging.warn(mess) return if not toFolder.is_dir(): mess = f'copyUnimacroIncludeFile: vocolaUserDirectory does not exist "{str(toFolder)}" (is not a directory)' - print(mess) + logging.warn(mess) return toFile = toFolder/uscFile if toFolder.is_file(): - print(f'remove previous "{str(toFile)}"') + logging.info(f'remove previous "{str(toFile)}"') try: os.remove(toFile) except: mess = f'copyUnimacroIncludeFile: Could not remove previous version of "{str(toFile)}"' - print(mess) + logging.info(mess) try: shutil.copyfile(fromFile, toFile) - print(f'copied "{uscFile}" from "{str(fromFolder)}" to "{str(toFolder)}"') + logging.info(f'copied "{uscFile}" from "{str(fromFolder)}" to "{str(toFolder)}"') except: mess = f'Could not copy new version of "{uscFile}", from "{str(fromFolder)}" to "{str(toFolder)}"' - print(mess) + logging.warn(mess) return return @@ -404,17 +458,17 @@ def removeUnimacroIncludeFile(self): toFolder = Path(self.status.getVocolaUserDirectory()) if not toFolder.is_dir(): mess = f'removeUnimacroIncludeFile: vocolaUserDirectory does not exist "{str(toFolder)}" (is not a directory)' - print(mess) + logging.warn(mess) return toFile = toFolder/uscFile if toFolder.is_file(): - print(f'remove Unimacro include file "{str(toFile)}"') + logging.info(f'remove Unimacro include file "{str(toFile)}"') try: os.remove(toFile) except: mess = f'copyUnimacroIncludeFile: Could not remove previous version of "{str(toFile)}"' - print(mess) + logging.warning(mess) def includeUnimacroVchLineInVocolaFiles(self, subDirectory=None): """include the Unimacro wrapper support line into all Vocola command files @@ -442,7 +496,7 @@ def includeUnimacroVchLineInVocolaFiles(self, subDirectory=None): if not os.path.isdir(toFolder): mess = f'cannot find Vocola command files directory, not a valid path: {toFolder}' - print(mess) + logging.warning(mess) return mess nFiles = 0 for f in os.listdir(toFolder): @@ -470,7 +524,7 @@ def includeUnimacroVchLineInVocolaFiles(self, subDirectory=None): # subdirectory, recursive self.includeUnimacroVchLineInVocolaFiles(F) mess = f'changed {nFiles} files in {toFolder}' - print(mess) + logging.warning(mess) return True def removeUnimacroVchLineInVocolaFiles(self, subDirectory=None): @@ -502,7 +556,7 @@ def removeUnimacroVchLineInVocolaFiles(self, subDirectory=None): if not os.path.isdir(toFolder): mess = f'cannot find Vocola command files directory, not a valid path: {toFolder}' - print(mess) + logging.warning(mess) return mess nFiles = 0 for f in os.listdir(toFolder): @@ -525,7 +579,7 @@ def removeUnimacroVchLineInVocolaFiles(self, subDirectory=None): self.removeUnimacroVchLineInVocolaFiles(F) self.disableVocolaTakesUnimacroActions() mess = f'removed include lines from {nFiles} files in {toFolder}' - print(mess) + logging.warning(mess) return True @@ -567,7 +621,7 @@ def openConfigFile(self): """open the natlink.ini config file """ os.startfile(self.config_path) - print(f'opened "{self.config_path}" in a separate window') + logging.info(f'opened "{self.config_path}" in a separate window') return True def setAhkExeDir(self, arg): @@ -595,8 +649,8 @@ def clearAhkUserDir(self, arg=None): self.clearDirectory(key, section='autohotkey') def printPythonPath(self): - print('the python path:') - pprint(sys.path) + logging.info('the python path:') + logging.info(pformat(sys.path)) def isValidDir(path): diff --git a/src/natlinkcore/loader.py b/src/natlinkcore/loader.py index bf8422d..e562314 100644 --- a/src/natlinkcore/loader.py +++ b/src/natlinkcore/loader.py @@ -325,31 +325,31 @@ def load_or_reload_module(self, mod_path: Path, force_load: bool = False) -> Non return else: maybe_module = self.loaded_modules.get(mod_path) - if force_load or maybe_module is None: + # remove force_load here, in favor of below: + if maybe_module is None: self.logger.info(f'loading module: {mod_name}') module = self._import_module_from_path(mod_path) self.loaded_modules[mod_path] = module return - else: - module = maybe_module - last_modified_time = mod_path.stat().st_mtime - diff = last_modified_time - last_attempt_time # check for -0.1 instead of 0, a ??? - # _pre_load_callback may need this.. - if force_load or diff > 0: - if force_load: - self.logger.info(f'reloading module: {mod_name}, force_load: {force_load}') - else: - self.logger.info(f'reloading module: {mod_name}') - - self.unload_module(module) - del module - module = self._import_module_from_path(mod_path) - self.loaded_modules[mod_path] = module - self.logger.debug(f'loaded module: {module.__name__}') - return + + module = maybe_module + last_modified_time = mod_path.stat().st_mtime + diff = last_modified_time - last_attempt_time # check for -0.1 instead of 0, a ??? + # _pre_load_callback may need this.. + if force_load or diff > 0: + if force_load: + self.logger.info(f'reloading module: {mod_name}, force_load: {force_load}') else: - # self.logger.debug(f'skipping unchanged loaded module: {mod_name}') - return + self.logger.info(f'reloading module: {mod_name}') + + self.unload_module(module) + del module + module = self._import_module_from_path(mod_path) + self.loaded_modules[mod_path] = module + self.logger.debug(f'loaded module: {module.__name__}') + return + # self.logger.debug(f'skipping unchanged loaded module: {mod_name}') + return except Exception: self.logger.exception(traceback.format_exc()) self.logger.debug(f'load_or_reload_module, exception, add to self.bad_modules {mod_path}') diff --git a/src/natlinkcore/natlinkstatus.py b/src/natlinkcore/natlinkstatus.py index 260243f..5f5745d 100644 --- a/src/natlinkcore/natlinkstatus.py +++ b/src/natlinkcore/natlinkstatus.py @@ -5,9 +5,16 @@ # (C) Copyright Quintijn Hoogenboom, February 2008/January 2018/extended for python3, Natlink5.0.1 Febr 2022 # #pylint:disable=C0302, C0116, R0902, R0904, R0912, W0107, E1101, C0415 -"""The following functions are provided in this module: +"""Normal use: -The functions below are put into the class NatlinkStatus. +``` +from natlinkcore import natlinkstatus +... +status = natlinkstatus.NatlinkStatus() +``` + + +Then the following functions (methods) can be called: The functions below should not change anything in settings, only get information. @@ -138,7 +145,7 @@ class NatlinkStatus(metaclass=singleton.Singleton): """ known_directory_options = ['userdirectory', 'dragonflyuserdirectory', - 'unimacrodirectory', + 'unimacrodirectory', 'unimacrogrammarsdirectory', 'vocoladirectory', 'vocolagrammarsdirectory'] def __init__(self): @@ -165,6 +172,9 @@ def __init__(self): ## Dragonfly self.DragonflyDirectory = None self.DragonflyUserDirectory = None + ## dtactions + self.DtactionsDirectory = None + ## AutoHotkey: self.AhkUserDir = None self.AhkExeDir = None @@ -500,12 +510,12 @@ def getDragonflyDirectory(self): if self.DragonflyDirectory is not None: return self.DragonflyDirectory try: - import dragonfly + import dragonfly2 except ImportError: self.DragonflyDirectory = "" return "" - self.DragonflyDirectory = str(Path(dragonfly.__file__).parent) + self.DragonflyDirectory = str(Path(dragonfly2.__file__).parent) return self.DragonflyDirectory @@ -597,6 +607,20 @@ def getVocolaGrammarsDirectory(self): self.VocolaGrammarsDirectory = voc_grammars_dir return voc_grammars_dir + def getDtactionsDirectory(self): + """dtactions directory should be found with an import (like getUnimacroDirectory) + """ + + if self.DtactionsDirectory is not None: + return self.DtactionsDirectory + try: + import dtactions + except ImportError: + self.DtactionsDirectory = "" + return "" + self.DtactionsDirectory = dtactions.__path__[-1] + return self.DtactionsDirectory + def getAhkUserDir(self): return self.getAhkUserDirFromIni() @@ -733,6 +757,7 @@ def getNatlinkStatusDict(self): 'UserDirectory', 'DragonflyDirectory', 'DragonflyUserDirectory', 'ExtraGrammarDirectories', + 'DtactionsDirectory', 'InstallVersion', # 'IncludeUnimacroInPythonPath', 'AhkExeDir', 'AhkUserDir']: @@ -803,7 +828,7 @@ def getNatlinkStatusString(self): self.appendAndRemove(L, D, key) else: self.appendAndRemove(L, D, 'unimacroIsEnabled', "---Unimacro is disabled") - for key in ('UnimacroUserDirectory', 'UnimacroDirectory'): + for key in ('UnimacroUserDirectory', 'UnimacroGrammarsDirectory', 'UnimacroDirectory'): del D[key] ## UserDirectory: if D['userIsEnabled']: diff --git a/src/natlinkcore/natlinkutils.py b/src/natlinkcore/natlinkutils.py index e85b2e8..1dcc1a7 100644 --- a/src/natlinkcore/natlinkutils.py +++ b/src/natlinkcore/natlinkutils.py @@ -312,7 +312,10 @@ def __init__(self): self.grammarName = '' def __del__(self): - self.gramObj.unload() + try: + self.gramObj.unload() + except AttributeError: + pass def load(self, gramSpec, allResults=0, hypothesis=0, grammarName=None): self.grammarName = grammarName or self.grammarName diff --git a/src/natlinkcore/nsformat.py b/src/natlinkcore/nsformat.py index f7bbab2..8eea88c 100644 --- a/src/natlinkcore/nsformat.py +++ b/src/natlinkcore/nsformat.py @@ -104,6 +104,7 @@ propDict['letter'] = (flag_no_space_next,) # lowercase is hardcoded in below. propDict['spelling-letter'] = (flag_no_space_next,) # lowercase is hardcoded in below. propDict['uppercase-letter'] = (flag_no_space_next,) +propDict['determiner'] = tuple() # nothing special #--------------------------------------------------------------------------- # This is the main formatting entry point. It takes the old format state and @@ -222,6 +223,7 @@ def formatPassword(wordList): nextRepeat = countDict[w] outList.append(str(nextRepeat)) else: + w = w.split('\\')[0] outList.append(w.capitalize()) return ''.join(outList) @@ -419,7 +421,7 @@ def getWordInfo(word): if word.find('\\') == -1: return set() # no flags wList = word.split('\\') - if len(wList) == 3: + if len(wList) in (2,3): prop = wList[1] if not prop: return set() @@ -429,6 +431,9 @@ def getWordInfo(word): return set(propDict['left-double-quote']) if prop.startswith('right-'): return set(propDict['right-double-quote']) + if prop in ['determiner', 'pronoun']: + return set() + print(f'getWordInfo, unknown word property: {prop} {"word"}') return set() # empty tuple # should not come here diff --git a/tests/test_natlinktimer.py b/tests/test_natlinktimer.py index 57f4307..f5ea785 100644 --- a/tests/test_natlinktimer.py +++ b/tests/test_natlinktimer.py @@ -144,36 +144,36 @@ def toggleMicrophone(self): def testStopAtMicOff(): - try: - natlink.natConnect() - testGram = TestGrammar(name="stop_at_mic_off") - testGram.resetExperiment() - testGram.interval = 100 # all milliseconds - testGram.sleepTime = 20 - testGram.MaxHit = 6 - testGram.toggleMicAt = 3 - assert natlinktimer.getNatlinktimerStatus() in (0, None) - natlinktimer.setTimerCallback(testGram.doTimerClassic, interval=testGram.interval, callAtMicOff=testGram.cancelMode, debug=debug) - ## 1 timer active: - for _ in range(5): - if testGram.toggleMicAt and testGram.Hit >= testGram.toggleMicAt: - break - if testGram.Hit >= testGram.MaxHit: - break - wait(500) # 0.5 second - if debug: - print(f'waited 0.1 second for timer to finish testGram, Hit: {testGram.Hit} ({testGram.MaxHit})') - else: - raise TestError(f'not enough time to finish the testing procedure (came to {testGram.Hit} of {testGram.MaxHit})') - print(f'testGram.results: {testGram.results}') - assert len(testGram.results) == testGram.toggleMicAt - assert natlinktimer.getNatlinktimerStatus() == 0 ## natlinktimer is NOT destroyed after last timer is gone. - assert testGram.startCancelMode is True - assert testGram.endCancelMode is True + + with natlink.natConnect(): + try: + testGram = TestGrammar(name="stop_at_mic_off") + testGram.resetExperiment() + testGram.interval = 100 # all milliseconds + testGram.sleepTime = 20 + testGram.MaxHit = 6 + testGram.toggleMicAt = 3 + assert natlinktimer.getNatlinktimerStatus() in (0, None) + natlinktimer.setTimerCallback(testGram.doTimerClassic, interval=testGram.interval, callAtMicOff=testGram.cancelMode, debug=debug) + ## 1 timer active: + for _ in range(5): + if testGram.toggleMicAt and testGram.Hit >= testGram.toggleMicAt: + break + if testGram.Hit >= testGram.MaxHit: + break + wait(500) # 0.5 second + if debug: + print(f'waited 0.1 second for timer to finish testGram, Hit: {testGram.Hit} ({testGram.MaxHit})') + else: + raise TestError(f'not enough time to finish the testing procedure (came to {testGram.Hit} of {testGram.MaxHit})') + print(f'testGram.results: {testGram.results}') + assert len(testGram.results) == testGram.toggleMicAt + assert natlinktimer.getNatlinktimerStatus() == 0 ## natlinktimer is NOT destroyed after last timer is gone. + assert testGram.startCancelMode is True + assert testGram.endCancelMode is True - finally: - natlinktimer.stopTimerCallback() - natlink.natDisconnect() + finally: + natlinktimer.stopTimerCallback() # def testStopAtMicOff(): diff --git a/tests/test_nsformat.py b/tests/test_nsformat.py index 95fc14d..0ec53f7 100644 --- a/tests/test_nsformat.py +++ b/tests/test_nsformat.py @@ -11,6 +11,33 @@ def test_formatWords(): + + ## a\\determiner (Dragon 16??) (added propDict[determiner] = tuple(), and likewise for I\\pronoun) + + Input = ["First", ".\\period\\full stop", "a", "test"] + result = nsformat.formatWords(Input) + print(f'result nsformat: "{result}"') + assert(nsformat.formatWords(Input)) == ("First. A test", set()) + + Input = ["Second", ".\\dot\\dot stop", "a", "test"] + result = nsformat.formatWords(Input) + print(f'result nsformat: "{result}"') + assert(nsformat.formatWords(Input)) == ("Second.a test", set()) + + + + Input = "Third a\\determiner test".split() + result = nsformat.formatWords(Input) + print(f'result nsformat: "{result}"') + assert(nsformat.formatWords(Input)) == ("Third a test", set()) + + Input = "Fourth I\\pronoun test".split() + result = nsformat.formatWords(Input) + print(f'result nsformat: "{result}"') + assert(nsformat.formatWords(Input)) == ("Fourth I test", set()) + + + Input = "hello there".split() assert(nsformat.formatWords(Input)) == ("Hello there", set()) @@ -24,6 +51,10 @@ def test_formatWords(): assert result_text == " Continue" assert new_state == set() + + + + sentence = "this is wrong." with pytest.raises(AssertionError): nsformat.formatWords(sentence)