diff --git a/src/safeds_runner/server/pipeline_manager.py b/src/safeds_runner/server/pipeline_manager.py index 7db0d84..11743b5 100644 --- a/src/safeds_runner/server/pipeline_manager.py +++ b/src/safeds_runner/server/pipeline_manager.py @@ -35,12 +35,14 @@ class PipelineManager: """ def __init__(self) -> None: - """Create a new PipelineManager object, which needs to be started by calling startup().""" + """Create a new PipelineManager object, which is lazily started, when needed.""" self._placeholder_map: dict = {} self._websocket_target: simple_websocket.Server | None = None @cached_property def _multiprocessing_manager(self) -> SyncManager: + if multiprocessing.get_start_method() != "spawn": + multiprocessing.set_start_method("spawn", force=True) return multiprocessing.Manager() @cached_property @@ -65,6 +67,7 @@ def _startup(self) -> None: This method should not be called during the bootstrap phase of the python interpreter, as it leads to a crash. """ + _mq = self._messages_queue # Initialize it here before starting a thread to avoid potential race condition if not self._messages_queue_thread.is_alive(): self._messages_queue_thread.start() diff --git a/tests/safeds_runner/server/test_runner_main.py b/tests/safeds_runner/server/test_runner_main.py index 0e69e22..d95252a 100644 --- a/tests/safeds_runner/server/test_runner_main.py +++ b/tests/safeds_runner/server/test_runner_main.py @@ -7,6 +7,7 @@ def test_should_runner_start_successfully() -> None: + subprocess._USE_VFORK = False # Do not fork the subprocess as it is unsafe to do process = subprocess.Popen(["poetry", "run", "safe-ds-runner", "start"], cwd=_project_root, stderr=subprocess.PIPE) while process.poll() is None: process_line = str(typing.cast(IO[bytes], process.stderr).readline(), "utf-8").strip() diff --git a/tests/safeds_runner/server/test_websocket_mock.py b/tests/safeds_runner/server/test_websocket_mock.py index f428ed1..5796c7f 100644 --- a/tests/safeds_runner/server/test_websocket_mock.py +++ b/tests/safeds_runner/server/test_websocket_mock.py @@ -23,11 +23,11 @@ def __init__(self, messages: list[str]): self.received: list[str] = [] self.close_reason: int | None = None self.close_message: str | None = None - self.condition_variable = threading.Condition() + self.condition_variable = threading.Condition(lock=threading.Lock()) def send(self, msg: str) -> None: - self.received.append(msg) with self.condition_variable: + self.received.append(msg) self.condition_variable.notify_all() def receive(self) -> str | None: @@ -44,7 +44,11 @@ def wait_for_messages(self, wait_for_messages: int = 1) -> None: with self.condition_variable: if len(self.received) >= wait_for_messages: return - self.condition_variable.wait() + self.condition_variable.wait(1.0) # this should not be needed, but it seems the process can get stuck + + def get_next_received_message(self) -> str: + with self.condition_variable: + return self.received.pop(0) @pytest.mark.parametrize( @@ -208,7 +212,7 @@ def test_should_execute_pipeline_return_exception( mock_connection = MockWebsocketConnection(messages) ws_main(mock_connection, app_pipeline_manager) mock_connection.wait_for_messages(1) - exception_message = Message.from_dict(json.loads(mock_connection.received.pop(0))) + exception_message = Message.from_dict(json.loads(mock_connection.get_next_received_message())) assert exception_message.type == expected_response_runtime_error.type assert exception_message.id == expected_response_runtime_error.id @@ -293,7 +297,7 @@ def test_should_execute_pipeline_return_valid_placeholder( # And compare with expected responses while len(expected_responses) > 0: mock_connection.wait_for_messages(1) - next_message = Message.from_dict(json.loads(mock_connection.received.pop(0))) + next_message = Message.from_dict(json.loads(mock_connection.get_next_received_message())) assert next_message == expected_responses.pop(0) @@ -360,5 +364,5 @@ def test_should_successfully_execute_simple_flow(messages: list[str], expected_r mock_connection = MockWebsocketConnection(messages) ws_main(mock_connection, app_pipeline_manager) mock_connection.wait_for_messages(1) - query_result_invalid = Message.from_dict(json.loads(mock_connection.received.pop(0))) + query_result_invalid = Message.from_dict(json.loads(mock_connection.get_next_received_message())) assert query_result_invalid == expected_response