diff --git a/pyproject.toml b/pyproject.toml index fed528d4a..c862c2e3f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,3 +1,6 @@ [build-system] requires = ["setuptools"] build-backend = "setuptools.build_meta" + +[tool.pylint] +good-names="Manager0,Manager,ObjectManager,Report" diff --git a/src/stratis_cli/_actions/_dynamic.py b/src/stratis_cli/_actions/_dynamic.py new file mode 100644 index 000000000..9ab01e496 --- /dev/null +++ b/src/stratis_cli/_actions/_dynamic.py @@ -0,0 +1,152 @@ +# Copyright 2023 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Dynamic class generation +""" +# isort: STDLIB +import os +import xml.etree.ElementTree as ET # nosec B405 +from enum import Enum + +# isort: FIRSTPARTY +from dbus_python_client_gen import DPClientGenerationError, make_class + +from .._errors import StratisCliGenerationError +from ._constants import MANAGER_INTERFACE, REPORT_INTERFACE +from ._environment import get_timeout +from ._introspect import SPECS + +DBUS_TIMEOUT_SECONDS = 120 + +TIMEOUT = get_timeout( + os.environ.get("STRATIS_DBUS_TIMEOUT", DBUS_TIMEOUT_SECONDS * 1000) +) + +MANAGER_SPEC = """ + + + + + +""" + + +class Purpose(Enum): + """ + Purpose of class to be created. + """ + + INVOKE = 0 # invoke D-Bus methods + OBJECT = 1 # represent object in GetManagedObjects result + SEARCH = 2 # search for object in GEtManagedObjects result + + +_LOOKUP = { + "Manager": ( + Purpose.INVOKE, + lambda: ET.fromstring(SPECS[MANAGER_INTERFACE]), # nosec B314 + None, + ), + "Manager0": ( + Purpose.INVOKE, + lambda: ET.fromstring(MANAGER_SPEC), # nosec B314 + None, + ), + "ObjectManager": ( + Purpose.INVOKE, + lambda: ET.fromstring( + SPECS["org.freedesktop.DBus.ObjectManager"] + ), # nosec B314 + None, + ), + "Report": ( + Purpose.INVOKE, + lambda: ET.fromstring(SPECS[REPORT_INTERFACE]), # nosec B314 + None, + ), +} + + +def _add_abs_path_assertion(klass, method_name, key): + """ + Set method_name of method_klass to a new method which checks that the + device paths values at key are absolute paths. + + :param klass: the klass to which this metthod belongs + :param str method_name: the name of the method + :param str key: the key at which the paths can be found in the arguments + """ + method_class = getattr(klass, "Methods") + orig_method = getattr(method_class, method_name) + + def new_method(proxy, args): + """ + New path method + """ + rel_paths = [path for path in args[key] if not os.path.isabs(path)] + assert ( + rel_paths == [] + ), f"Precondition violated: paths {', '.join(rel_paths)} should be absolute" + return orig_method(proxy, args) + + setattr(method_class, method_name, new_method) + + +def make_dyn_class(name): + """ + Dynamically generate a class from introspection specification. + + :param str name: name of class to make + """ + (purpose, interface_func, klass) = _LOOKUP[name] + + if klass is not None: + return klass + + assert interface_func is not None + + if purpose is Purpose.INVOKE: # pragma: no cover + try: + klass = make_class( + name, + interface_func(), + TIMEOUT, + ) + + try: + if name == "Manager": + _add_abs_path_assertion(klass, "CreatePool", "devices") + if name == "Pool": # pragma: no cover + _add_abs_path_assertion(klass, "InitCache", "devices") + _add_abs_path_assertion(klass, "AddCacheDevs", "devices") + _add_abs_path_assertion(klass, "AddDataDevs", "devices") + except AttributeError as err: # pragma: no cover + # This can only happen if the expected method is missing from + # the XML spec or code generation has a bug, we will never + # test for these conditions. + raise StratisCliGenerationError( + "Malformed class definition; could not access a class or " + "method in the generated class definition" + ) from err + + except DPClientGenerationError as err: # pragma: no cover + raise StratisCliGenerationError( + f"Failed to generate class {name} needed for invoking " + "dbus-python methods" + ) from err + + # set the function to None since the class has been obtained + _LOOKUP[name] = (purpose, None, klass) + + return klass diff --git a/src/stratis_cli/_actions/_stratisd_version.py b/src/stratis_cli/_actions/_stratisd_version.py index e4de864c4..9409a9671 100644 --- a/src/stratis_cli/_actions/_stratisd_version.py +++ b/src/stratis_cli/_actions/_stratisd_version.py @@ -21,6 +21,7 @@ from .._errors import StratisCliStratisdVersionError from ._connection import get_object from ._constants import MAXIMUM_STRATISD_VERSION, MINIMUM_STRATISD_VERSION, TOP_OBJECT +from ._dynamic import make_dyn_class def check_stratisd_version(): @@ -30,8 +31,7 @@ def check_stratisd_version(): :raises StratisCliStratisdVersionError """ - # pylint: disable=import-outside-toplevel - from ._data import Manager0 + Manager0 = make_dyn_class("Manager0") version_spec = SpecifierSet(f">={MINIMUM_STRATISD_VERSION}") & SpecifierSet( f"<{MAXIMUM_STRATISD_VERSION}" diff --git a/src/stratis_cli/_actions/_top.py b/src/stratis_cli/_actions/_top.py index 7f2c79944..ca639b0de 100644 --- a/src/stratis_cli/_actions/_top.py +++ b/src/stratis_cli/_actions/_top.py @@ -33,6 +33,7 @@ from .._stratisd_constants import ReportKey, StratisdErrors from ._connection import get_object from ._constants import TOP_OBJECT +from ._dynamic import make_dyn_class from ._formatting import print_table @@ -44,8 +45,7 @@ def _fetch_keylist(proxy): :rtype: list of str :raises StratisCliEngineError: """ - # pylint: disable=import-outside-toplevel - from ._data import Manager + Manager = make_dyn_class("Manager") (keys, return_code, message) = Manager.Methods.ListKeys(proxy, {}) if return_code != StratisdErrors.OK: # pragma: no cover @@ -68,8 +68,7 @@ def _add_update_key(proxy, key_desc, capture_key, *, keyfile_path): """ assert capture_key == (keyfile_path is None) - # pylint: disable=import-outside-toplevel - from ._data import Manager + Manager = make_dyn_class("Manager") if capture_key: password = getpass(prompt="Enter key data followed by the return key: ") @@ -117,9 +116,8 @@ def get_report(namespace): :raises StratisCliEngineError: """ - # pylint: disable=import-outside-toplevel if namespace.report_name == ReportKey.MANAGED_OBJECTS.value: - from ._data import ObjectManager + ObjectManager = make_dyn_class("ObjectManager") json_report = ObjectManager.Methods.GetManagedObjects( get_object(TOP_OBJECT), {} @@ -127,14 +125,14 @@ def get_report(namespace): else: if namespace.report_name == ReportKey.ENGINE_STATE.value: - from ._data import Manager + Manager = make_dyn_class("Manager") (report, return_code, message) = Manager.Methods.EngineStateReport( get_object(TOP_OBJECT), {} ) else: - from ._data import Report + Report = make_dyn_class("Report") (report, return_code, message) = Report.Methods.GetReport( get_object(TOP_OBJECT), {"name": namespace.report_name} @@ -242,8 +240,7 @@ def unset_key(namespace): :raises StratisCliNoChangeError: :raises StratisCliIncoherenceError: """ - # pylint: disable=import-outside-toplevel - from ._data import Manager + Manager = make_dyn_class("Manager") proxy = get_object(TOP_OBJECT) diff --git a/tests/whitebox/integration/test_stratis.py b/tests/whitebox/integration/test_stratis.py index 0d128374d..1c5112f90 100644 --- a/tests/whitebox/integration/test_stratis.py +++ b/tests/whitebox/integration/test_stratis.py @@ -23,6 +23,7 @@ # isort: LOCAL from stratis_cli import StratisCliErrorCodes, run +from stratis_cli._actions import _dynamic from stratis_cli._errors import StratisCliStratisdVersionError from ._misc import RUNNER, TEST_RUNNER, RunTestCase, SimTestCase @@ -76,15 +77,14 @@ def test_outdated_stratisd_version(self): Verify that an outdated version of stratisd will produce a StratisCliStratisdVersionError. """ - # pylint: disable=import-outside-toplevel - # isort: LOCAL - from stratis_cli._actions import _data + _dynamic.make_dyn_class("Manager0") command_line = ["--propagate", "daemon", "version"] - # pylint: disable=protected-access with patch.object( - _data.Manager0.Properties.Version, + _dynamic._LOOKUP["Manager0"][ # pylint: disable=protected-access + 2 + ].Properties.Version, "Get", return_value="1.0.0", ): @@ -107,12 +107,14 @@ def test_catch_keyboard_exception(self): at the calling method generated by dbus-python-client-gen. """ - # pylint: disable=import-outside-toplevel - # isort: LOCAL - from stratis_cli._actions import _data + _dynamic.make_dyn_class("Manager0") with patch.object( - _data.Manager0.Properties.Version, "Get", side_effect=KeyboardInterrupt() + _dynamic._LOOKUP["Manager0"][ # pylint: disable=protected-access + 2 + ].Properties.Version, + "Get", + side_effect=KeyboardInterrupt(), ): with self.assertRaises(KeyboardInterrupt): run()(["daemon", "version"])