Skip to content

Commit

Permalink
provide workflow plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
seebi committed Dec 11, 2023
1 parent 5903d1a commit d7cd3c9
Show file tree
Hide file tree
Showing 9 changed files with 270 additions and 31 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ Write ad-hoc transformations and workflow tasks with Python.
Do not use in production!

```
eval $(cmemc -c seebi-testing.eccenca.dev config eval)
eval $(cmemc -c my-cmem config eval)
task clean check deploy
```

Expand Down
File renamed without changes
18 changes: 16 additions & 2 deletions cmem_plugin_python/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,25 @@

EXAMPLE_CODE = """result = str(inputs) """

documentation = """
This transform operator allows the execution of arbitrary Python source code inside of a
transformation 😈
The following variable is available in the scope of the code execution:
- `inputs` - a `Sequence` of `Sequence[str)`, which represents the data which is passed
to the operator in the transformation. The inner sequence is the list of values
from an input port while the outer sequence represents the list of ports.
In order to provide data for the next operator in the transformation, a `result`
variable of type `Sequence[str]` needs to be prepared.
"""


@Plugin(
label="Python Code",
plugin_id="cmem_plugin_python-transform",
documentation=documentation,
parameters=[
PluginParameter(name="source_code", label="Source Code", default_value=EXAMPLE_CODE),
],
Expand All @@ -27,9 +42,8 @@ def __init__(self, source_code: PythonCode):

def transform(self, inputs: Sequence[Sequence[str]]) -> Sequence[str]:
"""Transform a collection of values."""
# pylint: disable=exec-used
self.log.info("Start doing bad things with custom code.")
scope: dict[str, Any] = {"inputs": inputs}
exec(str(self.source_code), scope) # nosec # noqa: S102
exec(str(self.source_code), scope) # nosec # noqa: S102
result: Sequence[str] = scope["result"]
return result
207 changes: 181 additions & 26 deletions cmem_plugin_python/workflow.py
Original file line number Diff line number Diff line change
@@ -1,59 +1,214 @@
"""Python code workflow plugin module"""
from collections.abc import Sequence
from collections.abc import Iterator, Sequence
from types import SimpleNamespace
from typing import Any

from cmem_plugin_base.dataintegration.context import ExecutionContext
from cmem_plugin_base.dataintegration.description import Icon, Plugin, PluginParameter
from cmem_plugin_base.dataintegration.entity import Entities
from cmem_plugin_base.dataintegration.entity import Entities, Entity
from cmem_plugin_base.dataintegration.parameter.code import PythonCode
from cmem_plugin_base.dataintegration.plugins import WorkflowPlugin

docu_links = SimpleNamespace()
docu_links.entities = (
"https://documentation.eccenca.com/latest/develop/python-plugins/development/#entities"
)
docu_links.context = (
"https://documentation.eccenca.com/23.3/develop/python-plugins/development/#context-objects"
)

examples_init = SimpleNamespace()
examples_init.no_input_ports = """# no input ports (empty list)
# e.g. if you fetch data from the web
from cmem_plugin_base.dataintegration import ports
input_ports = ports.FixedNumberOfInputs([])"""
examples_init.single_input_flexible = """# a single port with flexible schema
# e.g. to process everything which comes in
from cmem_plugin_base.dataintegration import ports
input_ports = ports.FixedNumberOfInputs(
[ports.FlexibleSchemaPort()]
)"""
examples_init.single_input_fixed = """# a single port with a fixed schema
# e.g. to fetch data from a dataset
from cmem_plugin_base.dataintegration import ports, entity
my_schema = entity.EntitySchema(
type_uri="urn:x-example:output",
paths=[
entity.EntityPath("name"),
entity.EntityPath("description")
]
)
input_ports = ports.FixedNumberOfInputs(
[ports.FixedSchemaPort(schema=my_schema)]
)"""
examples_init.no_output = """# no output port
# e.g. if you post data to the web
output_port = None"""
examples_init.fixed_output = """# an output port with fixed schema
from cmem_plugin_base.dataintegration import ports, entity
my_schema = entity.EntitySchema(
type_uri="urn:x-example:output",
paths=[
entity.EntityPath("name"),
entity.EntityPath("description")
]
)
output_port = ports.FixedSchemaPort(schema=my_schema)"""

examples_execute = SimpleNamespace()
examples_execute.take_first = """# take the entities from the first input port
# and copy it to the output port
try:
result = inputs[0]
except IndexError:
raise ValueError("Please connect a task to the first input port.")
"""
examples_execute.randoms = """# create 1000 random strings and output them with a custom schema
from uuid import uuid4
from secrets import token_hex
from cmem_plugin_base.dataintegration import entity
my_schema = entity.EntitySchema(
type_uri="urn:x-example:random",
paths=[entity.EntityPath("random")]
)
entities = []
for _ in range(1000):
entity_uri = "urn:uuid:" + str(uuid4())
values = [[token_hex(10)]]
entities.append(
entity.Entity(uri=entity_uri, values=values)
)
result = entity.Entities(entities=entities, schema=my_schema)
"""

documentation = f"""
This workflow task allows the execution of arbitrary Python source code as a workflow task 😈
The "configuration" is split into two code fields: initialization and execution.
## Initialization
The initialization code is executed on task creation and update.
It is optional and can be used to configure input and output ports of the task as well as to
prepare data for the execution phase.
Note that the execution scope of this code is empty.
All used objects needs to be imported first.
### Specify input ports
In order to specify input ports, you have to define the variable `input_ports`.
Here are some valid examples.
If you do not specify any input ports, the default
behavior is a flexible number of flexible schema input ports.
```
{examples_init.no_input_ports}
```
```
{examples_init.single_input_flexible}
```
```
{examples_init.single_input_fixed}
```
### Specify the output port
In order to specify the output port, you have to define the variable `output_port`.
Here are some valid examples.
If you do not specify the output port, the default behavior is a flexible schema output port.
``` python
{examples_init.no_output}
```
``` python
{examples_init.fixed_output}
```
### Additional Data
In addition to input and output port specifications, you can provide additional data for
the task execution phase by manipulating the `data` dictionary.
```
data["my_schema"] = my_schema # in case you used a schema example above
data["output"] = ":-)"
```
## Execution
The execution code is interpreted in the context of an executed workflow.
The following variables are available in the scope of the code execution:
- `inputs` - a `Sequence` of `Entities`, which represents the data which will be passed to
the to task in the workflow. Have a look at [the entities documentation]({docu_links.entities})
for more information.
- `context` - an `ExecutionContext` object, which holds information about the system,
the user the current task and more. Have a look
[the context object documentation]({docu_links.context}) for more information.
- `data` - a `dict` of arbitrary data, which was optionally added by the initialization code.
In order to provide data for the next workflow task in the workflow, a `result`
variable of type `Entities` needs to be prepared.
Here are some valid examples:
``` python
{examples_execute.take_first}
```
``` python
{examples_execute.randoms}
```
"""


@Plugin(
label="Python Code",
icon=Icon(file_name="custom.svg", package=__package__),
icon=Icon(file_name="python_icon.svg", package=__package__),
plugin_id="cmem_plugin_python-workflow",
description="Run arbitrary Python code as a workflow task.",
documentation=documentation,
parameters=[
PluginParameter(
name="init_code",
label="Source Code (Initialization)",
label="Initialization",
default_value="",
description="Python source code for the initialization phase (see documentation).",
),
PluginParameter(
name="execute_code",
label="Source Code (Execution)",
label="Execution",
default_value="",
description="Python source code for the execution phase (see documentation).",
),
],
)
class PythonCodeWorkflowPlugin(WorkflowPlugin):
"""Python Code Workflow Plugin"""

example_init = """result = str(inputs) """
example_execute = """result = str(inputs) """
init_code: str
execute_code: str
data: dict

def __init__(self, init_code: PythonCode, execute_code: PythonCode):
self.init_code = str(init_code)
self.execute_code = str(execute_code)
scope: dict = {}
self.data = {}
scope: dict[str, Any] = {"data": self.data}
exec(str(self.init_code), scope) # nosec # noqa: S102
for _ in scope:
setattr(self, _, scope[_])
if "input_ports" in scope:
self.input_ports = scope["input_ports"]
if "output_port" in scope:
self.output_port = scope["output_port"]
self.data = scope["data"] if "data" in scope else {}

def execute(self, inputs: Sequence[Entities], context: ExecutionContext) -> Entities | None:
"""Start the plugin in workflow context."""
self.log.info("Start doing bad things with custom code.")
scope = {"inputs": inputs, "context": context}
scope: dict[str, Any] = {"inputs": inputs, "context": context, "data": self.data}
exec(str(self.execute_code), scope) # nosec # noqa: S102
try:
entities = scope["entities"]
except KeyError as error:
raise ValueError(
"Python execution code needs to prepare a variable 'entities'"
" of type 'Iterator[cmem_plugin_base.dataintegration.entity.Entity]'."
) from error
try:
schema = scope["schema"]
except KeyError as error:
raise ValueError(
"Python execution code needs to prepare a variable 'schema'"
" of type 'cmem_plugin_base.dataintegration.entity.EntitySchema'."
) from error
return Entities(entities=entities, schema=schema)
result: Iterator[Entity] | None = scope["result"] if "result" in scope else None
return result
16 changes: 15 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pytest-dotenv = "^0.5.2"
pytest-memray = { version = "^1.5.0", markers = "platform_system != 'Windows'" }
ruff = "^0.1.5"
safety = "^1.10.3"
types-requests = "^2.31.0.10"

[build-system]
requires = ["poetry-core>=1.0.0", "poetry-dynamic-versioning"]
Expand Down
Binary file not shown.
1 change: 0 additions & 1 deletion tests/test_transform.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
"""Plugin tests."""


from cmem_plugin_base.dataintegration.parameter.code import PythonCode

from cmem_plugin_python.transform import PythonCodeTransformPlugin
Expand Down
56 changes: 56 additions & 0 deletions tests/test_workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
"""Plugin tests."""
from typing import TYPE_CHECKING

import pytest
from cmem_plugin_base.dataintegration.parameter.code import PythonCode

from cmem_plugin_python.workflow import PythonCodeWorkflowPlugin, examples_execute, examples_init
from tests.utils import TestExecutionContext

if TYPE_CHECKING:
from cmem_plugin_base.dataintegration.entity import Entities


def test_workflow_execution() -> None:
"""Test with inputs"""
init_code = PythonCode(
"""# init code
pass
"""
)
execute_code = PythonCode(
"""# execute code
entities = None
schema = None
"""
)
plugin = PythonCodeWorkflowPlugin(init_code=init_code, execute_code=execute_code)
entities = plugin.execute(inputs=[], context=TestExecutionContext())
assert entities is None


def test_example_init_code() -> None:
"""Run initialization for all init examples"""
for init_code in vars(examples_init).values():
PythonCodeWorkflowPlugin(init_code=PythonCode(init_code), execute_code=PythonCode(""))


def test_example_execution() -> None:
"""Test execution of examples"""
# run 'randoms' first, then feed output to `take_first`
random_size = 1000
randoms = PythonCodeWorkflowPlugin(
init_code=PythonCode(""), execute_code=PythonCode(examples_execute.randoms)
)
randoms_result: Entities = randoms.execute(inputs=[], context=TestExecutionContext())
assert len(list(randoms_result.entities)) == random_size

take_first = PythonCodeWorkflowPlugin(
init_code=PythonCode(""), execute_code=PythonCode(examples_execute.take_first)
)
with pytest.raises(ValueError, match="Please connect a task to the first input port"):
take_first.execute(inputs=[], context=TestExecutionContext())
take_first_result: Entities = take_first.execute(
inputs=[randoms_result], context=TestExecutionContext()
)
assert len(list(take_first_result.entities)) == random_size

0 comments on commit d7cd3c9

Please sign in to comment.