diff --git a/README.md b/README.md index eae6535..b40e8d7 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ Write ad-hoc transformations and workflow tasks with Python. Do not use in production! ``` -eval $(cmemc -c seebi-testing.eccenca.dev config eval) +eval $(cmemc -c my-cmem config eval) task clean check deploy ``` diff --git a/cmem_plugin_python/custom.svg b/cmem_plugin_python/python_icon.svg similarity index 100% rename from cmem_plugin_python/custom.svg rename to cmem_plugin_python/python_icon.svg diff --git a/cmem_plugin_python/transform.py b/cmem_plugin_python/transform.py index ce14942..41e7a3d 100644 --- a/cmem_plugin_python/transform.py +++ b/cmem_plugin_python/transform.py @@ -11,10 +11,25 @@ EXAMPLE_CODE = """result = str(inputs) """ +documentation = """ +This transform operator allows the execution of arbitrary Python source code inside of a +transformation 😈 + +The following variable is available in the scope of the code execution: + +- `inputs` - a `Sequence` of `Sequence[str)`, which represents the data which is passed + to the operator in the transformation. The inner sequence is the list of values + from an input port while the outer sequence represents the list of ports. + +In order to provide data for the next operator in the transformation, a `result` +variable of type `Sequence[str]` needs to be prepared. +""" + @Plugin( label="Python Code", plugin_id="cmem_plugin_python-transform", + documentation=documentation, parameters=[ PluginParameter(name="source_code", label="Source Code", default_value=EXAMPLE_CODE), ], @@ -27,9 +42,8 @@ def __init__(self, source_code: PythonCode): def transform(self, inputs: Sequence[Sequence[str]]) -> Sequence[str]: """Transform a collection of values.""" - # pylint: disable=exec-used self.log.info("Start doing bad things with custom code.") scope: dict[str, Any] = {"inputs": inputs} - exec(str(self.source_code), scope) # nosec # noqa: S102 + exec(str(self.source_code), scope) # nosec # noqa: S102 result: Sequence[str] = scope["result"] return result diff --git a/cmem_plugin_python/workflow.py b/cmem_plugin_python/workflow.py index ca53d3f..aeabb2c 100644 --- a/cmem_plugin_python/workflow.py +++ b/cmem_plugin_python/workflow.py @@ -1,59 +1,214 @@ """Python code workflow plugin module""" -from collections.abc import Sequence +from collections.abc import Iterator, Sequence +from types import SimpleNamespace +from typing import Any from cmem_plugin_base.dataintegration.context import ExecutionContext from cmem_plugin_base.dataintegration.description import Icon, Plugin, PluginParameter -from cmem_plugin_base.dataintegration.entity import Entities +from cmem_plugin_base.dataintegration.entity import Entities, Entity from cmem_plugin_base.dataintegration.parameter.code import PythonCode from cmem_plugin_base.dataintegration.plugins import WorkflowPlugin +docu_links = SimpleNamespace() +docu_links.entities = ( + "https://documentation.eccenca.com/latest/develop/python-plugins/development/#entities" +) +docu_links.context = ( + "https://documentation.eccenca.com/23.3/develop/python-plugins/development/#context-objects" +) + +examples_init = SimpleNamespace() +examples_init.no_input_ports = """# no input ports (empty list) +# e.g. if you fetch data from the web +from cmem_plugin_base.dataintegration import ports +input_ports = ports.FixedNumberOfInputs([])""" +examples_init.single_input_flexible = """# a single port with flexible schema +# e.g. to process everything which comes in +from cmem_plugin_base.dataintegration import ports +input_ports = ports.FixedNumberOfInputs( + [ports.FlexibleSchemaPort()] +)""" +examples_init.single_input_fixed = """# a single port with a fixed schema +# e.g. to fetch data from a dataset +from cmem_plugin_base.dataintegration import ports, entity +my_schema = entity.EntitySchema( + type_uri="urn:x-example:output", + paths=[ + entity.EntityPath("name"), + entity.EntityPath("description") + ] +) +input_ports = ports.FixedNumberOfInputs( + [ports.FixedSchemaPort(schema=my_schema)] +)""" +examples_init.no_output = """# no output port +# e.g. if you post data to the web +output_port = None""" +examples_init.fixed_output = """# an output port with fixed schema +from cmem_plugin_base.dataintegration import ports, entity +my_schema = entity.EntitySchema( + type_uri="urn:x-example:output", + paths=[ + entity.EntityPath("name"), + entity.EntityPath("description") + ] +) +output_port = ports.FixedSchemaPort(schema=my_schema)""" + +examples_execute = SimpleNamespace() +examples_execute.take_first = """# take the entities from the first input port +# and copy it to the output port +try: + result = inputs[0] +except IndexError: + raise ValueError("Please connect a task to the first input port.") +""" +examples_execute.randoms = """# create 1000 random strings and output them with a custom schema +from uuid import uuid4 +from secrets import token_hex +from cmem_plugin_base.dataintegration import entity +my_schema = entity.EntitySchema( + type_uri="urn:x-example:random", + paths=[entity.EntityPath("random")] +) +entities = [] +for _ in range(1000): + entity_uri = "urn:uuid:" + str(uuid4()) + values = [[token_hex(10)]] + entities.append( + entity.Entity(uri=entity_uri, values=values) + ) +result = entity.Entities(entities=entities, schema=my_schema) +""" + +documentation = f""" +This workflow task allows the execution of arbitrary Python source code as a workflow task 😈 + +The "configuration" is split into two code fields: initialization and execution. + +## Initialization + +The initialization code is executed on task creation and update. +It is optional and can be used to configure input and output ports of the task as well as to +prepare data for the execution phase. +Note that the execution scope of this code is empty. +All used objects needs to be imported first. + +### Specify input ports + +In order to specify input ports, you have to define the variable `input_ports`. +Here are some valid examples. +If you do not specify any input ports, the default +behavior is a flexible number of flexible schema input ports. + +``` +{examples_init.no_input_ports} +``` + +``` +{examples_init.single_input_flexible} +``` + +``` +{examples_init.single_input_fixed} +``` + +### Specify the output port + +In order to specify the output port, you have to define the variable `output_port`. +Here are some valid examples. +If you do not specify the output port, the default behavior is a flexible schema output port. + +``` python +{examples_init.no_output} +``` + +``` python +{examples_init.fixed_output} +``` + +### Additional Data + +In addition to input and output port specifications, you can provide additional data for +the task execution phase by manipulating the `data` dictionary. + +``` +data["my_schema"] = my_schema # in case you used a schema example above +data["output"] = ":-)" +``` + +## Execution + +The execution code is interpreted in the context of an executed workflow. +The following variables are available in the scope of the code execution: + +- `inputs` - a `Sequence` of `Entities`, which represents the data which will be passed to + the to task in the workflow. Have a look at [the entities documentation]({docu_links.entities}) + for more information. +- `context` - an `ExecutionContext` object, which holds information about the system, + the user the current task and more. Have a look + [the context object documentation]({docu_links.context}) for more information. +- `data` - a `dict` of arbitrary data, which was optionally added by the initialization code. + +In order to provide data for the next workflow task in the workflow, a `result` +variable of type `Entities` needs to be prepared. + +Here are some valid examples: + +``` python +{examples_execute.take_first} +``` + +``` python +{examples_execute.randoms} +``` +""" + @Plugin( label="Python Code", - icon=Icon(file_name="custom.svg", package=__package__), + icon=Icon(file_name="python_icon.svg", package=__package__), plugin_id="cmem_plugin_python-workflow", + description="Run arbitrary Python code as a workflow task.", + documentation=documentation, parameters=[ PluginParameter( name="init_code", - label="Source Code (Initialization)", + label="Initialization", + default_value="", + description="Python source code for the initialization phase (see documentation).", ), PluginParameter( name="execute_code", - label="Source Code (Execution)", + label="Execution", + default_value="", + description="Python source code for the execution phase (see documentation).", ), ], ) class PythonCodeWorkflowPlugin(WorkflowPlugin): """Python Code Workflow Plugin""" - example_init = """result = str(inputs) """ - example_execute = """result = str(inputs) """ + init_code: str + execute_code: str + data: dict def __init__(self, init_code: PythonCode, execute_code: PythonCode): self.init_code = str(init_code) self.execute_code = str(execute_code) - scope: dict = {} + self.data = {} + scope: dict[str, Any] = {"data": self.data} exec(str(self.init_code), scope) # nosec # noqa: S102 - for _ in scope: - setattr(self, _, scope[_]) + if "input_ports" in scope: + self.input_ports = scope["input_ports"] + if "output_port" in scope: + self.output_port = scope["output_port"] + self.data = scope["data"] if "data" in scope else {} def execute(self, inputs: Sequence[Entities], context: ExecutionContext) -> Entities | None: """Start the plugin in workflow context.""" self.log.info("Start doing bad things with custom code.") - scope = {"inputs": inputs, "context": context} + scope: dict[str, Any] = {"inputs": inputs, "context": context, "data": self.data} exec(str(self.execute_code), scope) # nosec # noqa: S102 - try: - entities = scope["entities"] - except KeyError as error: - raise ValueError( - "Python execution code needs to prepare a variable 'entities'" - " of type 'Iterator[cmem_plugin_base.dataintegration.entity.Entity]'." - ) from error - try: - schema = scope["schema"] - except KeyError as error: - raise ValueError( - "Python execution code needs to prepare a variable 'schema'" - " of type 'cmem_plugin_base.dataintegration.entity.EntitySchema'." - ) from error - return Entities(entities=entities, schema=schema) + result: Iterator[Entity] | None = scope["result"] if "result" in scope else None + return result diff --git a/poetry.lock b/poetry.lock index 476447f..e87e575 100644 --- a/poetry.lock +++ b/poetry.lock @@ -999,6 +999,20 @@ typing-extensions = ">=4.4.0,<5.0.0" [package.extras] syntax = ["tree-sitter (>=0.20.1,<0.21.0)", "tree_sitter_languages (>=1.7.0)"] +[[package]] +name = "types-requests" +version = "2.31.0.10" +description = "Typing stubs for requests" +optional = false +python-versions = ">=3.7" +files = [ + {file = "types-requests-2.31.0.10.tar.gz", hash = "sha256:dc5852a76f1eaf60eafa81a2e50aefa3d1f015c34cf0cba130930866b1b22a92"}, + {file = "types_requests-2.31.0.10-py3-none-any.whl", hash = "sha256:b32b9a86beffa876c0c3ac99a4cd3b8b51e973fb8e3bd4e0a6bb32c7efad80fc"}, +] + +[package.dependencies] +urllib3 = ">=2" + [[package]] name = "typing-extensions" version = "4.9.0" @@ -1058,4 +1072,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p [metadata] lock-version = "2.0" python-versions = "^3.11" -content-hash = "d154f7d9ea09bd33a4d438626289797dcf6fc528471996cb46932d4c8b94686b" +content-hash = "7e779c56e8e80cd00d58446657134fb116aa993b35a9d11c9afb79b2b5cc324a" diff --git a/pyproject.toml b/pyproject.toml index e77a9ab..08e32d7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,6 +33,7 @@ pytest-dotenv = "^0.5.2" pytest-memray = { version = "^1.5.0", markers = "platform_system != 'Windows'" } ruff = "^0.1.5" safety = "^1.10.3" +types-requests = "^2.31.0.10" [build-system] requires = ["poetry-core>=1.0.0", "poetry-dynamic-versioning"] diff --git a/tests/fixtures/cmem_plugin_python_testing.project.zip b/tests/fixtures/cmem_plugin_python_testing.project.zip new file mode 100644 index 0000000..b924d0c Binary files /dev/null and b/tests/fixtures/cmem_plugin_python_testing.project.zip differ diff --git a/tests/test_transform.py b/tests/test_transform.py index 91cad85..6cb2af4 100644 --- a/tests/test_transform.py +++ b/tests/test_transform.py @@ -1,6 +1,5 @@ """Plugin tests.""" - from cmem_plugin_base.dataintegration.parameter.code import PythonCode from cmem_plugin_python.transform import PythonCodeTransformPlugin diff --git a/tests/test_workflow.py b/tests/test_workflow.py new file mode 100644 index 0000000..42f0f26 --- /dev/null +++ b/tests/test_workflow.py @@ -0,0 +1,56 @@ +"""Plugin tests.""" +from typing import TYPE_CHECKING + +import pytest +from cmem_plugin_base.dataintegration.parameter.code import PythonCode + +from cmem_plugin_python.workflow import PythonCodeWorkflowPlugin, examples_execute, examples_init +from tests.utils import TestExecutionContext + +if TYPE_CHECKING: + from cmem_plugin_base.dataintegration.entity import Entities + + +def test_workflow_execution() -> None: + """Test with inputs""" + init_code = PythonCode( + """# init code +pass + """ + ) + execute_code = PythonCode( + """# execute code +entities = None +schema = None + """ + ) + plugin = PythonCodeWorkflowPlugin(init_code=init_code, execute_code=execute_code) + entities = plugin.execute(inputs=[], context=TestExecutionContext()) + assert entities is None + + +def test_example_init_code() -> None: + """Run initialization for all init examples""" + for init_code in vars(examples_init).values(): + PythonCodeWorkflowPlugin(init_code=PythonCode(init_code), execute_code=PythonCode("")) + + +def test_example_execution() -> None: + """Test execution of examples""" + # run 'randoms' first, then feed output to `take_first` + random_size = 1000 + randoms = PythonCodeWorkflowPlugin( + init_code=PythonCode(""), execute_code=PythonCode(examples_execute.randoms) + ) + randoms_result: Entities = randoms.execute(inputs=[], context=TestExecutionContext()) + assert len(list(randoms_result.entities)) == random_size + + take_first = PythonCodeWorkflowPlugin( + init_code=PythonCode(""), execute_code=PythonCode(examples_execute.take_first) + ) + with pytest.raises(ValueError, match="Please connect a task to the first input port"): + take_first.execute(inputs=[], context=TestExecutionContext()) + take_first_result: Entities = take_first.execute( + inputs=[randoms_result], context=TestExecutionContext() + ) + assert len(list(take_first_result.entities)) == random_size