From 2927a28b74340d40d228cff34372d0cb0e83eb9f Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Tue, 6 Aug 2024 16:15:27 +0200 Subject: [PATCH 01/33] add integration test for client --- tests/network/test_integration_6_client.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 tests/network/test_integration_6_client.py diff --git a/tests/network/test_integration_6_client.py b/tests/network/test_integration_6_client.py new file mode 100644 index 0000000000..e69de29bb2 From 8e7cd3e4a1150fa560e38e3e8d390689adbbf978 Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Tue, 6 Aug 2024 16:15:38 +0200 Subject: [PATCH 02/33] fix the test dir path in docker --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 0608b0b738..4997066d1b 100644 --- a/Makefile +++ b/Makefile @@ -286,7 +286,7 @@ network-module-test: assets INTEGRATION_TEST_IN_DOCKER = docker exec core_test network-integration-test: $(DOCKER_COMPOSE) --file tests/network/docker-compose.yml up -d - -$(INTEGRATION_TEST_IN_DOCKER) pytest -k 'test_integration_' -v --ignore-glob="$(TESTDIR)/network/*ocrd_all*.py" + -$(INTEGRATION_TEST_IN_DOCKER) pytest -k 'test_integration_' -v --ignore-glob="tests/network/*ocrd_all*.py" $(DOCKER_COMPOSE) --file tests/network/docker-compose.yml down --remove-orphans network-integration-test-cicd: From bd16dd78d92d45e1003d18e8fb95f529017ae065 Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Tue, 6 Aug 2024 16:16:52 +0200 Subject: [PATCH 03/33] update network client --- src/ocrd_network/cli/client.py | 69 ++++++++++++++++++++++++---------- src/ocrd_network/client.py | 13 +------ 2 files changed, 52 insertions(+), 30 deletions(-) diff --git a/src/ocrd_network/cli/client.py b/src/ocrd_network/cli/client.py index 8086658e04..38f017d1c4 100644 --- a/src/ocrd_network/cli/client.py +++ b/src/ocrd_network/cli/client.py @@ -1,4 +1,5 @@ import click +from http.server import BaseHTTPRequestHandler, HTTPServer from typing import Optional from ocrd.decorators import parameter_option @@ -6,6 +7,37 @@ from ocrd_utils import DEFAULT_METS_BASENAME +STOP_WAITING_CALLBACK = False + + +class ClientCallbackHandler(BaseHTTPRequestHandler): + """ + A simple callback handler for the network client to be invoked when the Processing Worker + sends requests to the `callback_url` set in the processing request submitted to the Processing Server. + """ + + def do_POST(self): + self.send_response(200) + self.send_header("Content-Type", "text/plain") + self.end_headers() + self.wfile.write("finished".encode("utf-8")) + len = int(self.headers.get("Content-Length", 0)) + data = self.rfile.read(len).decode("utf-8") + # TODO: how should the callback-content be handled/printed + print(f"Processor finished: {data}") + global STOP_WAITING_CALLBACK + STOP_WAITING_CALLBACK = True + + +class ClientCallbackServer(HTTPServer): + """ + A simple http-server that listens for callbacks from the Processing Server/Worker. + """ + def __init__(self): + super().__init__(server_address=("0.0.0.0", 0), RequestHandlerClass=ClientCallbackHandler) + self.callback_url = f"http://{self.server_address[0]}:{self.server_address[1]}" + + @click.group('client') def client_cli(): """ @@ -43,25 +75,27 @@ def processing_cli(): @click.option('--callback-url') @click.option('--agent-type', default='worker') def send_processing_request( - address: Optional[str], - processor_name: str, - mets: str, - input_file_grp: str, - output_file_grp: Optional[str], - page_id: Optional[str], - parameter: Optional[dict], - result_queue_name: Optional[str], - callback_url: Optional[str], - # TODO: This is temporally available to toggle - # between the ProcessingWorker/ProcessorServer - agent_type: Optional[str] + address: Optional[str], + processor_name: str, + mets: str, + input_file_grp: str, + output_file_grp: Optional[str], + page_id: Optional[str], + parameter: Optional[dict], + result_queue_name: Optional[str], + callback_url: Optional[str], + # TODO: This is temporally available to toggle + # between the ProcessingWorker/ProcessorServer + agent_type: Optional[str] ): + callback_server = ClientCallbackServer() req_params = { "path_to_mets": mets, "description": "OCR-D Network client request", "input_file_grps": input_file_grp.split(','), "parameters": parameter if parameter else {}, "agent_type": agent_type, + "callback_url": callback_server.callback_url } if output_file_grp: req_params["output_file_grps"] = output_file_grp.split(',') @@ -72,15 +106,12 @@ def send_processing_request( if callback_url: req_params["callback_url"] = callback_url - client = Client( - server_addr_processing=address - ) - response = client.send_processing_request( - processor_name=processor_name, - req_params=req_params - ) + client = Client(server_addr_processing=address) + response = client.send_processing_request(processor_name=processor_name, req_params=req_params) processing_job_id = response.get('job_id', None) print(f"Processing job id: {processing_job_id}") + while not STOP_WAITING_CALLBACK: + callback_server.handle_request() @client_cli.group('workflow') diff --git a/src/ocrd_network/client.py b/src/ocrd_network/client.py index 9fa0b3994a..688651f7de 100644 --- a/src/ocrd_network/client.py +++ b/src/ocrd_network/client.py @@ -5,22 +5,13 @@ from .constants import NETWORK_PROTOCOLS -# TODO: This is just a conceptual implementation and first try to -# trigger further discussions on how this should look like. class Client: - def __init__( - self, - server_addr_processing: str = config.OCRD_NETWORK_SERVER_ADDR_PROCESSING, - server_addr_workflow: str = config.OCRD_NETWORK_SERVER_ADDR_WORKFLOW, - server_addr_workspace: str = config.OCRD_NETWORK_SERVER_ADDR_WORKSPACE - ): + def __init__(self, server_addr_processing: str = config.OCRD_NETWORK_SERVER_ADDR_PROCESSING): self.log = getLogger(f"ocrd_network.client") self.server_addr_processing = server_addr_processing - self.server_addr_workflow = server_addr_workflow - self.server_addr_workspace = server_addr_workspace + verify_server_protocol(self.server_addr_processing) def send_processing_request(self, processor_name: str, req_params: dict): - verify_server_protocol(self.server_addr_processing) req_url = f"{self.server_addr_processing}/processor/{processor_name}" req_headers = {"Content-Type": "application/json; charset=utf-8"} req_json = loads(dumps(req_params)) From b2c0675016616147c90045cfcfb3aee8a401096f Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Tue, 6 Aug 2024 16:17:39 +0200 Subject: [PATCH 04/33] integration test for client --- tests/network/test_integration_6_client.py | 32 ++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/tests/network/test_integration_6_client.py b/tests/network/test_integration_6_client.py index e69de29bb2..f9dc322fbb 100644 --- a/tests/network/test_integration_6_client.py +++ b/tests/network/test_integration_6_client.py @@ -0,0 +1,32 @@ +from click.testing import CliRunner + +from src.ocrd_network.constants import AgentType, JobState +from tests.base import assets +from tests.network.config import test_config +from ocrd_network.cli.client import client_cli + +PROCESSING_SERVER_URL = test_config.PROCESSING_SERVER_URL + + +def test_client_processing_processor(): + workspace_root = "kant_aufklaerung_1784/data" + path_to_mets = assets.path_to(f"{workspace_root}/mets.xml") + runner = CliRunner() + result = runner.invoke( + client_cli, + args=[ + "processing", "processor", "ocrd-dummy", + "--address", PROCESSING_SERVER_URL, + "--mets", path_to_mets, + "--input-file-grp", "OCR-D-IMG", + "--output-file-grp", "OCR-D-DUMMY-TEST-CLIENT", + "--agent_type", AgentType.PROCESSING_WORKER + ] + ) + # TODO: Do a better result check + assert result.output.count("finished") == 1 + + +def test_client_processing_workflow(): + pass + From db6e566b1cc64701cea4aa60e00fb9c5ce63b1bf Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Tue, 6 Aug 2024 16:36:40 +0200 Subject: [PATCH 05/33] Fix flag typo --- tests/network/test_integration_6_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/network/test_integration_6_client.py b/tests/network/test_integration_6_client.py index f9dc322fbb..e95e3ea2a6 100644 --- a/tests/network/test_integration_6_client.py +++ b/tests/network/test_integration_6_client.py @@ -20,7 +20,7 @@ def test_client_processing_processor(): "--mets", path_to_mets, "--input-file-grp", "OCR-D-IMG", "--output-file-grp", "OCR-D-DUMMY-TEST-CLIENT", - "--agent_type", AgentType.PROCESSING_WORKER + "--agent-type", AgentType.PROCESSING_WORKER ] ) # TODO: Do a better result check From bec81ba92e48cdb0f6289807c9617b0d5ec936d3 Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Tue, 6 Aug 2024 20:41:18 +0200 Subject: [PATCH 06/33] try docker host ip --- src/ocrd_network/cli/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ocrd_network/cli/client.py b/src/ocrd_network/cli/client.py index 38f017d1c4..404600a02d 100644 --- a/src/ocrd_network/cli/client.py +++ b/src/ocrd_network/cli/client.py @@ -35,7 +35,7 @@ class ClientCallbackServer(HTTPServer): """ def __init__(self): super().__init__(server_address=("0.0.0.0", 0), RequestHandlerClass=ClientCallbackHandler) - self.callback_url = f"http://{self.server_address[0]}:{self.server_address[1]}" + self.callback_url = f"http://172.17.0.1:{self.server_address[1]}" @click.group('client') From 481589641a0427bbea77601205c27b47d93df9ca Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Fri, 9 Aug 2024 12:29:46 +0200 Subject: [PATCH 07/33] remove the client server --- src/ocrd_network/cli/client.py | 38 +--------------------- tests/network/test_integration_6_client.py | 3 +- 2 files changed, 2 insertions(+), 39 deletions(-) diff --git a/src/ocrd_network/cli/client.py b/src/ocrd_network/cli/client.py index 404600a02d..2e52a0533a 100644 --- a/src/ocrd_network/cli/client.py +++ b/src/ocrd_network/cli/client.py @@ -1,5 +1,4 @@ import click -from http.server import BaseHTTPRequestHandler, HTTPServer from typing import Optional from ocrd.decorators import parameter_option @@ -7,37 +6,6 @@ from ocrd_utils import DEFAULT_METS_BASENAME -STOP_WAITING_CALLBACK = False - - -class ClientCallbackHandler(BaseHTTPRequestHandler): - """ - A simple callback handler for the network client to be invoked when the Processing Worker - sends requests to the `callback_url` set in the processing request submitted to the Processing Server. - """ - - def do_POST(self): - self.send_response(200) - self.send_header("Content-Type", "text/plain") - self.end_headers() - self.wfile.write("finished".encode("utf-8")) - len = int(self.headers.get("Content-Length", 0)) - data = self.rfile.read(len).decode("utf-8") - # TODO: how should the callback-content be handled/printed - print(f"Processor finished: {data}") - global STOP_WAITING_CALLBACK - STOP_WAITING_CALLBACK = True - - -class ClientCallbackServer(HTTPServer): - """ - A simple http-server that listens for callbacks from the Processing Server/Worker. - """ - def __init__(self): - super().__init__(server_address=("0.0.0.0", 0), RequestHandlerClass=ClientCallbackHandler) - self.callback_url = f"http://172.17.0.1:{self.server_address[1]}" - - @click.group('client') def client_cli(): """ @@ -88,14 +56,12 @@ def send_processing_request( # between the ProcessingWorker/ProcessorServer agent_type: Optional[str] ): - callback_server = ClientCallbackServer() req_params = { "path_to_mets": mets, "description": "OCR-D Network client request", "input_file_grps": input_file_grp.split(','), "parameters": parameter if parameter else {}, - "agent_type": agent_type, - "callback_url": callback_server.callback_url + "agent_type": agent_type } if output_file_grp: req_params["output_file_grps"] = output_file_grp.split(',') @@ -110,8 +76,6 @@ def send_processing_request( response = client.send_processing_request(processor_name=processor_name, req_params=req_params) processing_job_id = response.get('job_id', None) print(f"Processing job id: {processing_job_id}") - while not STOP_WAITING_CALLBACK: - callback_server.handle_request() @client_cli.group('workflow') diff --git a/tests/network/test_integration_6_client.py b/tests/network/test_integration_6_client.py index e95e3ea2a6..bac9e1bdf8 100644 --- a/tests/network/test_integration_6_client.py +++ b/tests/network/test_integration_6_client.py @@ -24,9 +24,8 @@ def test_client_processing_processor(): ] ) # TODO: Do a better result check - assert result.output.count("finished") == 1 + assert result.output.count("Processing job id:") == 1 def test_client_processing_workflow(): pass - From cb3460f7993689bb6b5d937643b87426ce6dce61 Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Fri, 9 Aug 2024 13:16:52 +0200 Subject: [PATCH 08/33] refactor status checks --- src/ocrd_network/cli/client.py | 6 +- src/ocrd_network/client_utils.py | 67 +++++++++++++++++++ .../test_integration_5_processing_server.py | 15 ++--- tests/network/test_integration_ocrd_all.py | 9 +-- tests/network/utils.py | 47 ------------- 5 files changed, 82 insertions(+), 62 deletions(-) create mode 100644 src/ocrd_network/client_utils.py delete mode 100644 tests/network/utils.py diff --git a/src/ocrd_network/cli/client.py b/src/ocrd_network/cli/client.py index 2e52a0533a..fe87d01fbf 100644 --- a/src/ocrd_network/cli/client.py +++ b/src/ocrd_network/cli/client.py @@ -42,6 +42,7 @@ def processing_cli(): @click.option('--result-queue-name') @click.option('--callback-url') @click.option('--agent-type', default='worker') +@click.option('-b', '--block-till-job-end', default=False) def send_processing_request( address: Optional[str], processor_name: str, @@ -54,7 +55,8 @@ def send_processing_request( callback_url: Optional[str], # TODO: This is temporally available to toggle # between the ProcessingWorker/ProcessorServer - agent_type: Optional[str] + agent_type: Optional[str], + block_till_job_end: Optional[bool] ): req_params = { "path_to_mets": mets, @@ -76,6 +78,8 @@ def send_processing_request( response = client.send_processing_request(processor_name=processor_name, req_params=req_params) processing_job_id = response.get('job_id', None) print(f"Processing job id: {processing_job_id}") + if block_till_job_end: + pass @client_cli.group('workflow') diff --git a/src/ocrd_network/client_utils.py b/src/ocrd_network/client_utils.py new file mode 100644 index 0000000000..96fab03372 --- /dev/null +++ b/src/ocrd_network/client_utils.py @@ -0,0 +1,67 @@ +from requests import get as request_get, post as request_post +from time import sleep +from .constants import JobState + + +def _poll_endpoint_status(ps_server_host: str, job_id: str, job_type: str, tries: int, wait: int): + if job_type not in ["workflow", "processor"]: + raise ValueError("Unknown job type, expected 'workflow' or 'processor'") + job_state = JobState.unset + while tries > 0: + sleep(wait) + if job_type == "processor": + job_state = get_ps_processing_job_status(ps_server_host, job_id) + if job_type == "workflow": + job_state = get_ps_workflow_job_status(ps_server_host, job_id) + if job_state == JobState.success or job_state == JobState.failed: + break + tries -= 1 + return job_state + + +def poll_job_status_till_timeout_fail_or_success(ps_server_host: str, job_id: str, tries: int, wait: int) -> JobState: + return _poll_endpoint_status(ps_server_host, job_id, "processor", tries, wait) + + +def poll_wf_status_till_timeout_fail_or_success(ps_server_host: str, job_id: str, tries: int, wait: int) -> JobState: + return _poll_endpoint_status(ps_server_host, job_id, "workflow", tries, wait) + + +def get_ps_processing_job_status(ps_server_host: str, processing_job_id: str) -> str: + request_url = f"{ps_server_host}/processor/job/{processing_job_id}" + response = request_get(url=request_url, headers={"accept": "application/json"}) + assert response.status_code == 200, f"Processing server: {request_url}, {response.status_code}" + job_state = response.json()["state"] + assert job_state + return job_state + + +def get_ps_workflow_job_status(ps_server_host: str, workflow_job_id: str) -> str: + request_url = f"{ps_server_host}/workflow/job-simple/{workflow_job_id}" + response = request_get(url=request_url, headers={"accept": "application/json"}) + assert response.status_code == 200, f"Processing server: {request_url}, {response.status_code}" + job_state = response.json()["state"] + assert job_state + return job_state + + +def post_ps_processing_request(ps_server_host: str, processor: str, job_input: dict) -> str: + request_url = f"{ps_server_host}/processor/run/{processor}" + response = request_post(url=request_url, headers={"accept": "application/json"}, json=job_input) + assert response.status_code == 200, f"Processing server: {request_url}, {response.status_code}" + processing_job_id = response.json()["job_id"] + assert processing_job_id + return processing_job_id + + +# TODO: Can be extended to include other parameters such as page_wise +def post_ps_workflow_request(ps_server_host: str, path_to_wf: str, path_to_mets: str) -> str: + request_url = f"{ps_server_host}/workflow/run?mets_path={path_to_mets}&page_wise=True" + response = request_post( + url=request_url, headers={"accept": "application/json"}, files={"workflow": open(path_to_wf, "rb")}) + # print(response.json()) + # print(response.__dict__) + assert response.status_code == 200, f"Processing server: {request_url}, {response.status_code}" + wf_job_id = response.json()["job_id"] + assert wf_job_id + return wf_job_id diff --git a/tests/network/test_integration_5_processing_server.py b/tests/network/test_integration_5_processing_server.py index bce67bbe69..bf5fadee3c 100644 --- a/tests/network/test_integration_5_processing_server.py +++ b/tests/network/test_integration_5_processing_server.py @@ -1,10 +1,13 @@ from pathlib import Path from requests import get as request_get +from src.ocrd_network.client_utils import ( + poll_job_status_till_timeout_fail_or_success, poll_wf_status_till_timeout_fail_or_success, + post_ps_processing_request, post_ps_workflow_request) from src.ocrd_network.constants import AgentType, JobState from src.ocrd_network.logging_utils import get_processing_job_logging_file_path from tests.base import assets from tests.network.config import test_config -from tests.network.utils import poll_till_timeout_fail_or_success, post_ps_processing_request, post_ps_workflow_request + PROCESSING_SERVER_URL = test_config.PROCESSING_SERVER_URL @@ -40,10 +43,8 @@ def test_processing_server_processing_request(): "parameters": {} } test_processor = "ocrd-dummy" - processing_job_id = post_ps_processing_request(PROCESSING_SERVER_URL, test_processor, test_processing_job_input) - job_state = poll_till_timeout_fail_or_success( - test_url=f"{PROCESSING_SERVER_URL}/processor/job/{processing_job_id}", tries=10, wait=10 - ) + process_job_id = post_ps_processing_request(PROCESSING_SERVER_URL, test_processor, test_processing_job_input) + job_state = poll_job_status_till_timeout_fail_or_success(PROCESSING_SERVER_URL, process_job_id, tries=10, wait=10) assert job_state == JobState.success # Check the existence of the results locally @@ -58,9 +59,7 @@ def test_processing_server_workflow_request(): workspace_root = "kant_aufklaerung_1784/data" path_to_mets = assets.path_to(f"{workspace_root}/mets.xml") wf_job_id = post_ps_workflow_request(PROCESSING_SERVER_URL, path_to_dummy_wf, path_to_mets) - job_state = poll_till_timeout_fail_or_success( - test_url=f"{PROCESSING_SERVER_URL}/workflow/job-simple/{wf_job_id}", tries=30, wait=10 - ) + job_state = poll_wf_status_till_timeout_fail_or_success(PROCESSING_SERVER_URL, wf_job_id, tries=10, wait=10) assert job_state == JobState.success # Check the existence of the results locally diff --git a/tests/network/test_integration_ocrd_all.py b/tests/network/test_integration_ocrd_all.py index d54d9f2fd5..bebfcf7623 100644 --- a/tests/network/test_integration_ocrd_all.py +++ b/tests/network/test_integration_ocrd_all.py @@ -1,6 +1,7 @@ +from src.ocrd_network.client_utils import poll_wf_status_till_timeout_fail_or_success, post_ps_workflow_request from src.ocrd_network.constants import JobState from tests.network.config import test_config -from tests.network.utils import poll_till_timeout_fail_or_success, post_ps_workflow_request + PROCESSING_SERVER_URL = test_config.PROCESSING_SERVER_URL @@ -11,9 +12,5 @@ def test_ocrd_all_workflow(): path_to_wf = "/ocrd-data/assets/ocrd_all-test-workflow.txt" path_to_mets = "/data/mets.xml" wf_job_id = post_ps_workflow_request(PROCESSING_SERVER_URL, path_to_wf, path_to_mets) - job_state = poll_till_timeout_fail_or_success( - test_url=f"{PROCESSING_SERVER_URL}/workflow/job-simple/{wf_job_id}", - tries=30, - wait=10 - ) + job_state = poll_wf_status_till_timeout_fail_or_success(PROCESSING_SERVER_URL, wf_job_id, tries=30, wait=10) assert job_state == JobState.success diff --git a/tests/network/utils.py b/tests/network/utils.py deleted file mode 100644 index dbf594a894..0000000000 --- a/tests/network/utils.py +++ /dev/null @@ -1,47 +0,0 @@ -from requests import get as request_get, post as request_post -from time import sleep -from src.ocrd_network.constants import JobState - - -def poll_till_timeout_fail_or_success(test_url: str, tries: int, wait: int) -> JobState: - job_state = JobState.unset - while tries > 0: - sleep(wait) - response = request_get(url=test_url) - assert response.status_code == 200, f"Processing server: {test_url}, {response.status_code}" - job_state = response.json()["state"] - if job_state == JobState.success or job_state == JobState.failed: - break - tries -= 1 - return job_state - - -def post_ps_processing_request(ps_server_host: str, test_processor: str, test_job_input: dict) -> str: - test_url = f"{ps_server_host}/processor/run/{test_processor}" - response = request_post( - url=test_url, - headers={"accept": "application/json"}, - json=test_job_input - ) - # print(response.json()) - # print(response.__dict__) - assert response.status_code == 200, f"Processing server: {test_url}, {response.status_code}" - processing_job_id = response.json()["job_id"] - assert processing_job_id - return processing_job_id - - -# TODO: Can be extended to include other parameters such as page_wise -def post_ps_workflow_request(ps_server_host: str, path_to_test_wf: str, path_to_test_mets: str) -> str: - test_url = f"{ps_server_host}/workflow/run?mets_path={path_to_test_mets}&page_wise=True" - response = request_post( - url=test_url, - headers={"accept": "application/json"}, - files={"workflow": open(path_to_test_wf, "rb")} - ) - # print(response.json()) - # print(response.__dict__) - assert response.status_code == 200, f"Processing server: {test_url}, {response.status_code}" - wf_job_id = response.json()["job_id"] - assert wf_job_id - return wf_job_id From 920c1a9737dcf6f2e58ca3a5e24019ef3c392ac7 Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Fri, 9 Aug 2024 13:23:25 +0200 Subject: [PATCH 09/33] fix test --- tests/network/test_integration_6_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/network/test_integration_6_client.py b/tests/network/test_integration_6_client.py index bac9e1bdf8..55168d8322 100644 --- a/tests/network/test_integration_6_client.py +++ b/tests/network/test_integration_6_client.py @@ -24,7 +24,7 @@ def test_client_processing_processor(): ] ) # TODO: Do a better result check - assert result.output.count("Processing job id:") == 1 + assert result.output.count(f"{JobState.success}") == 1 def test_client_processing_workflow(): From 2a843a82f348e3677a3835a84938c216901e81e8 Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Fri, 9 Aug 2024 14:47:50 +0200 Subject: [PATCH 10/33] fix: client processing request --- src/ocrd_network/cli/client.py | 14 +++++----- src/ocrd_network/client.py | 28 ++++++++----------- src/ocrd_network/client_utils.py | 24 +++++++++++++---- tests/network/test_integration_6_client.py | 31 ++++++++++------------ 4 files changed, 51 insertions(+), 46 deletions(-) diff --git a/src/ocrd_network/cli/client.py b/src/ocrd_network/cli/client.py index fe87d01fbf..c2b6c500d1 100644 --- a/src/ocrd_network/cli/client.py +++ b/src/ocrd_network/cli/client.py @@ -1,9 +1,9 @@ import click +from json import dumps, loads from typing import Optional - from ocrd.decorators import parameter_option -from ocrd_network import Client from ocrd_utils import DEFAULT_METS_BASENAME +from ..client import Client @click.group('client') @@ -43,7 +43,7 @@ def processing_cli(): @click.option('--callback-url') @click.option('--agent-type', default='worker') @click.option('-b', '--block-till-job-end', default=False) -def send_processing_request( +def send_processing_job_request( address: Optional[str], processor_name: str, mets: str, @@ -73,13 +73,13 @@ def send_processing_request( req_params["result_queue_name"] = result_queue_name if callback_url: req_params["callback_url"] = callback_url - client = Client(server_addr_processing=address) - response = client.send_processing_request(processor_name=processor_name, req_params=req_params) - processing_job_id = response.get('job_id', None) + processing_job_id = client.send_processing_job_request( + processor_name=processor_name, req_params=loads(dumps(req_params))) + assert processing_job_id print(f"Processing job id: {processing_job_id}") if block_till_job_end: - pass + client.poll_job_status_till_timeout_fail_or_success(processing_job_id) @client_cli.group('workflow') diff --git a/src/ocrd_network/client.py b/src/ocrd_network/client.py index 688651f7de..333ec7fa44 100644 --- a/src/ocrd_network/client.py +++ b/src/ocrd_network/client.py @@ -1,8 +1,8 @@ -from json import dumps, loads -from requests import post as requests_post + from ocrd_utils import config, getLogger, LOG_FORMAT -from .constants import NETWORK_PROTOCOLS +from .client_utils import ( + poll_job_status_till_timeout_fail_or_success, post_ps_processing_request, verify_server_protocol) class Client: @@ -10,19 +10,13 @@ def __init__(self, server_addr_processing: str = config.OCRD_NETWORK_SERVER_ADDR self.log = getLogger(f"ocrd_network.client") self.server_addr_processing = server_addr_processing verify_server_protocol(self.server_addr_processing) + self.polling_tries = 900 + self.polling_wait = 30 - def send_processing_request(self, processor_name: str, req_params: dict): - req_url = f"{self.server_addr_processing}/processor/{processor_name}" - req_headers = {"Content-Type": "application/json; charset=utf-8"} - req_json = loads(dumps(req_params)) - self.log.info(f"Sending processing request to: {req_url}") - self.log.debug(req_json) - response = requests_post(url=req_url, headers=req_headers, json=req_json) - return response.json() - + def poll_job_status_till_timeout_fail_or_success(self, job_id: str) -> str: + return poll_job_status_till_timeout_fail_or_success( + ps_server_host=self.server_addr_processing, job_id=job_id, tries=self.polling_tries, wait=self.polling_wait) -def verify_server_protocol(address: str): - for protocol in NETWORK_PROTOCOLS: - if address.startswith(protocol): - return - raise ValueError(f"Wrong/Missing protocol in the server address: {address}, must be one of: {NETWORK_PROTOCOLS}") + def send_processing_job_request(self, processor_name: str, req_params: dict) -> str: + return post_ps_processing_request( + ps_server_host=self.server_addr_processing, processor=processor_name, job_input=req_params) diff --git a/src/ocrd_network/client_utils.py b/src/ocrd_network/client_utils.py index 96fab03372..651dc5cf6b 100644 --- a/src/ocrd_network/client_utils.py +++ b/src/ocrd_network/client_utils.py @@ -1,6 +1,6 @@ from requests import get as request_get, post as request_post from time import sleep -from .constants import JobState +from .constants import JobState, NETWORK_PROTOCOLS def _poll_endpoint_status(ps_server_host: str, job_id: str, job_type: str, tries: int, wait: int): @@ -29,7 +29,7 @@ def poll_wf_status_till_timeout_fail_or_success(ps_server_host: str, job_id: str def get_ps_processing_job_status(ps_server_host: str, processing_job_id: str) -> str: request_url = f"{ps_server_host}/processor/job/{processing_job_id}" - response = request_get(url=request_url, headers={"accept": "application/json"}) + response = request_get(url=request_url, headers={"accept": "application/json; charset=utf-8"}) assert response.status_code == 200, f"Processing server: {request_url}, {response.status_code}" job_state = response.json()["state"] assert job_state @@ -38,7 +38,7 @@ def get_ps_processing_job_status(ps_server_host: str, processing_job_id: str) -> def get_ps_workflow_job_status(ps_server_host: str, workflow_job_id: str) -> str: request_url = f"{ps_server_host}/workflow/job-simple/{workflow_job_id}" - response = request_get(url=request_url, headers={"accept": "application/json"}) + response = request_get(url=request_url, headers={"accept": "application/json; charset=utf-8"}) assert response.status_code == 200, f"Processing server: {request_url}, {response.status_code}" job_state = response.json()["state"] assert job_state @@ -47,7 +47,11 @@ def get_ps_workflow_job_status(ps_server_host: str, workflow_job_id: str) -> str def post_ps_processing_request(ps_server_host: str, processor: str, job_input: dict) -> str: request_url = f"{ps_server_host}/processor/run/{processor}" - response = request_post(url=request_url, headers={"accept": "application/json"}, json=job_input) + response = request_post( + url=request_url, + headers={"accept": "application/json; charset=utf-8"}, + json=job_input + ) assert response.status_code == 200, f"Processing server: {request_url}, {response.status_code}" processing_job_id = response.json()["job_id"] assert processing_job_id @@ -58,10 +62,20 @@ def post_ps_processing_request(ps_server_host: str, processor: str, job_input: d def post_ps_workflow_request(ps_server_host: str, path_to_wf: str, path_to_mets: str) -> str: request_url = f"{ps_server_host}/workflow/run?mets_path={path_to_mets}&page_wise=True" response = request_post( - url=request_url, headers={"accept": "application/json"}, files={"workflow": open(path_to_wf, "rb")}) + url=request_url, + headers={"accept": "application/json; charset=utf-8"}, + files={"workflow": open(path_to_wf, "rb")} + ) # print(response.json()) # print(response.__dict__) assert response.status_code == 200, f"Processing server: {request_url}, {response.status_code}" wf_job_id = response.json()["job_id"] assert wf_job_id return wf_job_id + + +def verify_server_protocol(address: str): + for protocol in NETWORK_PROTOCOLS: + if address.startswith(protocol): + return + raise ValueError(f"Wrong/Missing protocol in the server address: {address}, must be one of: {NETWORK_PROTOCOLS}") diff --git a/tests/network/test_integration_6_client.py b/tests/network/test_integration_6_client.py index 55168d8322..c1fe5ab260 100644 --- a/tests/network/test_integration_6_client.py +++ b/tests/network/test_integration_6_client.py @@ -1,9 +1,7 @@ -from click.testing import CliRunner - from src.ocrd_network.constants import AgentType, JobState from tests.base import assets from tests.network.config import test_config -from ocrd_network.cli.client import client_cli +from ocrd_network.client import Client PROCESSING_SERVER_URL = test_config.PROCESSING_SERVER_URL @@ -11,20 +9,19 @@ def test_client_processing_processor(): workspace_root = "kant_aufklaerung_1784/data" path_to_mets = assets.path_to(f"{workspace_root}/mets.xml") - runner = CliRunner() - result = runner.invoke( - client_cli, - args=[ - "processing", "processor", "ocrd-dummy", - "--address", PROCESSING_SERVER_URL, - "--mets", path_to_mets, - "--input-file-grp", "OCR-D-IMG", - "--output-file-grp", "OCR-D-DUMMY-TEST-CLIENT", - "--agent-type", AgentType.PROCESSING_WORKER - ] - ) - # TODO: Do a better result check - assert result.output.count(f"{JobState.success}") == 1 + client = Client(server_addr_processing=PROCESSING_SERVER_URL) + req_params = { + "path_to_mets": path_to_mets, + "description": "OCR-D Network client request", + "input_file_grps": ["OCR-D-IMG"], + "output_file_grps": ["OCR-D-DUMMY-TEST-CLIENT"], + "parameters": {}, + "agent_type": AgentType.PROCESSING_WORKER + } + processing_job_id = client.send_processing_job_request(processor_name="ocrd-dummy", req_params=req_params) + assert processing_job_id + print(f"Processing job id: {processing_job_id}") + assert JobState.success == client.poll_job_status_till_timeout_fail_or_success(processing_job_id) def test_client_processing_workflow(): From 3a238a7717db9ae41282ca169f0d6cf1a95ec94f Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Fri, 9 Aug 2024 15:06:01 +0200 Subject: [PATCH 11/33] add: client workflow run --- src/ocrd_network/cli/client.py | 21 ++++++++++++++++++++- src/ocrd_network/client.py | 18 +++++++++++++++--- tests/network/test_integration_6_client.py | 10 +++++++++- 3 files changed, 44 insertions(+), 5 deletions(-) diff --git a/src/ocrd_network/cli/client.py b/src/ocrd_network/cli/client.py index c2b6c500d1..a5f8b0e085 100644 --- a/src/ocrd_network/cli/client.py +++ b/src/ocrd_network/cli/client.py @@ -79,7 +79,7 @@ def send_processing_job_request( assert processing_job_id print(f"Processing job id: {processing_job_id}") if block_till_job_end: - client.poll_job_status_till_timeout_fail_or_success(processing_job_id) + client.poll_job_status_till_timeout_fail_or_success(job_id=processing_job_id) @client_cli.group('workflow') @@ -90,6 +90,25 @@ def workflow_cli(): pass +@processing_cli.command('run') +@click.option('--address') +@click.option('-m', '--path-to-mets', required=True) +@click.option('-w', '--path-to-workflow', required=True) +@click.option('-b', '--block-till-job-end', default=False) +def send_workflow_job_request( + address: Optional[str], + path_to_mets: str, + path_to_workflow: str, + block_till_job_end: Optional[bool] +): + client = Client(server_addr_processing=address) + workflow_job_id = client.send_workflow_job_request(path_to_wf=path_to_workflow, path_to_mets=path_to_mets) + assert workflow_job_id + print(f"Workflow job id: {workflow_job_id}") + if block_till_job_end: + client.poll_wf_status_till_timeout_fail_or_success(job_id=workflow_job_id) + + @client_cli.group('workspace') def workspace_cli(): """ diff --git a/src/ocrd_network/client.py b/src/ocrd_network/client.py index 333ec7fa44..012d2ea2e5 100644 --- a/src/ocrd_network/client.py +++ b/src/ocrd_network/client.py @@ -1,8 +1,11 @@ - from ocrd_utils import config, getLogger, LOG_FORMAT - from .client_utils import ( - poll_job_status_till_timeout_fail_or_success, post_ps_processing_request, verify_server_protocol) + poll_job_status_till_timeout_fail_or_success, + poll_wf_status_till_timeout_fail_or_success, + post_ps_processing_request, + post_ps_workflow_request, + verify_server_protocol +) class Client: @@ -10,6 +13,7 @@ def __init__(self, server_addr_processing: str = config.OCRD_NETWORK_SERVER_ADDR self.log = getLogger(f"ocrd_network.client") self.server_addr_processing = server_addr_processing verify_server_protocol(self.server_addr_processing) + # TODO: Read these values from the environment config. self.polling_tries = 900 self.polling_wait = 30 @@ -17,6 +21,14 @@ def poll_job_status_till_timeout_fail_or_success(self, job_id: str) -> str: return poll_job_status_till_timeout_fail_or_success( ps_server_host=self.server_addr_processing, job_id=job_id, tries=self.polling_tries, wait=self.polling_wait) + def poll_wf_status_till_timeout_fail_or_success(self, job_id: str) -> str: + return poll_wf_status_till_timeout_fail_or_success( + ps_server_host=self.server_addr_processing, job_id=job_id, tries=self.polling_tries, wait=self.polling_wait) + def send_processing_job_request(self, processor_name: str, req_params: dict) -> str: return post_ps_processing_request( ps_server_host=self.server_addr_processing, processor=processor_name, job_input=req_params) + + def send_workflow_job_request(self, path_to_wf: str, path_to_mets: str): + return post_ps_workflow_request( + ps_server_host=self.server_addr_processing, path_to_wf=path_to_wf, path_to_mets=path_to_mets) diff --git a/tests/network/test_integration_6_client.py b/tests/network/test_integration_6_client.py index c1fe5ab260..fc44705b40 100644 --- a/tests/network/test_integration_6_client.py +++ b/tests/network/test_integration_6_client.py @@ -1,3 +1,4 @@ +from pathlib import Path from src.ocrd_network.constants import AgentType, JobState from tests.base import assets from tests.network.config import test_config @@ -25,4 +26,11 @@ def test_client_processing_processor(): def test_client_processing_workflow(): - pass + workspace_root = "kant_aufklaerung_1784/data" + path_to_mets = assets.path_to(f"{workspace_root}/mets.xml") + # TODO: Improve the path resolution + path_to_dummy_wf = f"{Path(__file__).parent.resolve()}/dummy-workflow.txt" + client = Client(server_addr_processing=PROCESSING_SERVER_URL) + wf_job_id = client.send_workflow_job_request(path_to_dummy_wf, path_to_mets) + print(f"Workflow job id: {wf_job_id}") + assert JobState.success == client.poll_wf_status_till_timeout_fail_or_success(wf_job_id) From 50794f98a9811f4f3bef19818c3394d789e193f8 Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Fri, 9 Aug 2024 15:19:21 +0200 Subject: [PATCH 12/33] add timeout and wait to configs --- src/ocrd_network/client.py | 13 +++++++++---- src/ocrd_utils/config.py | 10 ++++++++++ tests/network/config.py | 14 ++++++++++++++ tests/network/test_integration_6_client.py | 6 ++++-- 4 files changed, 37 insertions(+), 6 deletions(-) diff --git a/src/ocrd_network/client.py b/src/ocrd_network/client.py index 012d2ea2e5..1dc92ed991 100644 --- a/src/ocrd_network/client.py +++ b/src/ocrd_network/client.py @@ -9,13 +9,18 @@ class Client: - def __init__(self, server_addr_processing: str = config.OCRD_NETWORK_SERVER_ADDR_PROCESSING): + def __init__( + self, + server_addr_processing: str = config.OCRD_NETWORK_SERVER_ADDR_PROCESSING, + timeout: int = config.OCRD_NETWORK_CLIENT_POLLING_TIMEOUT, + wait: int = config.OCRD_NETWORK_CLIENT_POLLING_SLEEP + ): self.log = getLogger(f"ocrd_network.client") self.server_addr_processing = server_addr_processing verify_server_protocol(self.server_addr_processing) - # TODO: Read these values from the environment config. - self.polling_tries = 900 - self.polling_wait = 30 + self.polling_timeout = timeout + self.polling_wait = wait + self.polling_tries = int(timeout/wait) def poll_job_status_till_timeout_fail_or_success(self, job_id: str) -> str: return poll_job_status_till_timeout_fail_or_success( diff --git a/src/ocrd_utils/config.py b/src/ocrd_utils/config.py index b3a3e9537d..dc7bee7a33 100644 --- a/src/ocrd_utils/config.py +++ b/src/ocrd_utils/config.py @@ -145,6 +145,16 @@ def _ocrd_download_timeout_parser(val): description="Default address of Processing Server to connect to (for `ocrd network client processing`).", default=(True, '')) +config.add("OCRD_NETWORK_CLIENT_POLLING_TIMEOUT", + description="Timeout for a blocking ocrd network client (seconds)", + parser=int, + default=(True, 3600)) + +config.add("OCRD_NETWORK_CLIENT_POLLING_SLEEP", + description="How many seconds to sleep before trying again (seconds)", + parser=int, + default=(True, 30)) + config.add("OCRD_NETWORK_SERVER_ADDR_WORKFLOW", description="Default address of Workflow Server to connect to (for `ocrd network client workflow`).", default=(True, '')) diff --git a/tests/network/config.py b/tests/network/config.py index 67c4ff24b7..9dc38bc0db 100644 --- a/tests/network/config.py +++ b/tests/network/config.py @@ -55,6 +55,20 @@ parser=_ocrd_download_timeout_parser ) +test_config.add( + "OCRD_NETWORK_CLIENT_POLLING_TIMEOUT", + description="Timeout for a blocking ocrd network client", + parser=int, + default=(True, 3600) +) + +test_config.add( + "OCRD_NETWORK_CLIENT_POLLING_SLEEP", + description="How many seconds to sleep before trying again (seconds)", + parser=int, + default=(True, 30) +) + test_config.add( name="OCRD_NETWORK_SERVER_ADDR_PROCESSING", description="Default address of Processing Server to connect to (for `ocrd network client processing`).", diff --git a/tests/network/test_integration_6_client.py b/tests/network/test_integration_6_client.py index fc44705b40..b0ed2bbc3a 100644 --- a/tests/network/test_integration_6_client.py +++ b/tests/network/test_integration_6_client.py @@ -5,12 +5,14 @@ from ocrd_network.client import Client PROCESSING_SERVER_URL = test_config.PROCESSING_SERVER_URL +timeout = test_config.OCRD_NETWORK_CLIENT_POLLING_TIMEOUT +wait = test_config.OCRD_NETWORK_CLIENT_POLLING_SLEEP def test_client_processing_processor(): workspace_root = "kant_aufklaerung_1784/data" path_to_mets = assets.path_to(f"{workspace_root}/mets.xml") - client = Client(server_addr_processing=PROCESSING_SERVER_URL) + client = Client(PROCESSING_SERVER_URL, timeout, wait) req_params = { "path_to_mets": path_to_mets, "description": "OCR-D Network client request", @@ -30,7 +32,7 @@ def test_client_processing_workflow(): path_to_mets = assets.path_to(f"{workspace_root}/mets.xml") # TODO: Improve the path resolution path_to_dummy_wf = f"{Path(__file__).parent.resolve()}/dummy-workflow.txt" - client = Client(server_addr_processing=PROCESSING_SERVER_URL) + client = Client(PROCESSING_SERVER_URL, timeout, wait) wf_job_id = client.send_workflow_job_request(path_to_dummy_wf, path_to_mets) print(f"Workflow job id: {wf_job_id}") assert JobState.success == client.poll_wf_status_till_timeout_fail_or_success(wf_job_id) From cc06fc37a18b871ee007fabe325a4f1136e93d40 Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Mon, 12 Aug 2024 13:01:51 +0200 Subject: [PATCH 13/33] Update src/ocrd_network/client_utils.py Co-authored-by: Konstantin Baierer --- src/ocrd_network/client_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ocrd_network/client_utils.py b/src/ocrd_network/client_utils.py index 651dc5cf6b..30b4a2bd96 100644 --- a/src/ocrd_network/client_utils.py +++ b/src/ocrd_network/client_utils.py @@ -5,7 +5,7 @@ def _poll_endpoint_status(ps_server_host: str, job_id: str, job_type: str, tries: int, wait: int): if job_type not in ["workflow", "processor"]: - raise ValueError("Unknown job type, expected 'workflow' or 'processor'") + raise ValueError(f"Unknown job type '{job_type}', expected 'workflow' or 'processor'") job_state = JobState.unset while tries > 0: sleep(wait) From 41159371337542761a3fe9212d77224cec665df4 Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Mon, 12 Aug 2024 13:05:26 +0200 Subject: [PATCH 14/33] refine status check methods --- src/ocrd_network/cli/client.py | 4 ++-- src/ocrd_network/client.py | 4 ++-- tests/network/test_integration_6_client.py | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/ocrd_network/cli/client.py b/src/ocrd_network/cli/client.py index a5f8b0e085..d44b26ff34 100644 --- a/src/ocrd_network/cli/client.py +++ b/src/ocrd_network/cli/client.py @@ -79,7 +79,7 @@ def send_processing_job_request( assert processing_job_id print(f"Processing job id: {processing_job_id}") if block_till_job_end: - client.poll_job_status_till_timeout_fail_or_success(job_id=processing_job_id) + client.poll_job_status(job_id=processing_job_id) @client_cli.group('workflow') @@ -106,7 +106,7 @@ def send_workflow_job_request( assert workflow_job_id print(f"Workflow job id: {workflow_job_id}") if block_till_job_end: - client.poll_wf_status_till_timeout_fail_or_success(job_id=workflow_job_id) + client.poll_workflow_status(job_id=workflow_job_id) @client_cli.group('workspace') diff --git a/src/ocrd_network/client.py b/src/ocrd_network/client.py index 1dc92ed991..6f93ab7e37 100644 --- a/src/ocrd_network/client.py +++ b/src/ocrd_network/client.py @@ -22,11 +22,11 @@ def __init__( self.polling_wait = wait self.polling_tries = int(timeout/wait) - def poll_job_status_till_timeout_fail_or_success(self, job_id: str) -> str: + def poll_job_status(self, job_id: str) -> str: return poll_job_status_till_timeout_fail_or_success( ps_server_host=self.server_addr_processing, job_id=job_id, tries=self.polling_tries, wait=self.polling_wait) - def poll_wf_status_till_timeout_fail_or_success(self, job_id: str) -> str: + def poll_workflow_status(self, job_id: str) -> str: return poll_wf_status_till_timeout_fail_or_success( ps_server_host=self.server_addr_processing, job_id=job_id, tries=self.polling_tries, wait=self.polling_wait) diff --git a/tests/network/test_integration_6_client.py b/tests/network/test_integration_6_client.py index b0ed2bbc3a..1a693ed0b1 100644 --- a/tests/network/test_integration_6_client.py +++ b/tests/network/test_integration_6_client.py @@ -24,7 +24,7 @@ def test_client_processing_processor(): processing_job_id = client.send_processing_job_request(processor_name="ocrd-dummy", req_params=req_params) assert processing_job_id print(f"Processing job id: {processing_job_id}") - assert JobState.success == client.poll_job_status_till_timeout_fail_or_success(processing_job_id) + assert JobState.success == client.poll_job_status(processing_job_id) def test_client_processing_workflow(): @@ -35,4 +35,4 @@ def test_client_processing_workflow(): client = Client(PROCESSING_SERVER_URL, timeout, wait) wf_job_id = client.send_workflow_job_request(path_to_dummy_wf, path_to_mets) print(f"Workflow job id: {wf_job_id}") - assert JobState.success == client.poll_wf_status_till_timeout_fail_or_success(wf_job_id) + assert JobState.success == client.poll_workflow_status(wf_job_id) From 0136db04e39269f77dfab3b71e46e43fa189e839 Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Mon, 12 Aug 2024 13:09:30 +0200 Subject: [PATCH 15/33] add help for new env --- src/ocrd/cli/__init__.py | 4 ++++ src/ocrd_utils/config.py | 12 ++++++------ tests/network/config.py | 12 ++++++------ 3 files changed, 16 insertions(+), 12 deletions(-) diff --git a/src/ocrd/cli/__init__.py b/src/ocrd/cli/__init__.py index 9b80abeb4d..70d738f083 100644 --- a/src/ocrd/cli/__init__.py +++ b/src/ocrd/cli/__init__.py @@ -35,6 +35,10 @@ \b {config.describe('OCRD_MAX_PROCESSOR_CACHE')} \b +{config.describe('OCRD_NETWORK_CLIENT_POLLING_SLEEP')} +\b +{config.describe('OCRD_NETWORK_CLIENT_POLLING_TIMEOUT')} +\b {config.describe('OCRD_NETWORK_SERVER_ADDR_PROCESSING')} \b {config.describe('OCRD_NETWORK_SERVER_ADDR_WORKFLOW')} diff --git a/src/ocrd_utils/config.py b/src/ocrd_utils/config.py index dc7bee7a33..063af930c8 100644 --- a/src/ocrd_utils/config.py +++ b/src/ocrd_utils/config.py @@ -145,16 +145,16 @@ def _ocrd_download_timeout_parser(val): description="Default address of Processing Server to connect to (for `ocrd network client processing`).", default=(True, '')) -config.add("OCRD_NETWORK_CLIENT_POLLING_TIMEOUT", - description="Timeout for a blocking ocrd network client (seconds)", - parser=int, - default=(True, 3600)) - config.add("OCRD_NETWORK_CLIENT_POLLING_SLEEP", - description="How many seconds to sleep before trying again (seconds)", + description="How many seconds to sleep before trying again.", parser=int, default=(True, 30)) +config.add("OCRD_NETWORK_CLIENT_POLLING_TIMEOUT", + description="Timeout for a blocking ocrd network client (in seconds).", + parser=int, + default=(True, 3600)) + config.add("OCRD_NETWORK_SERVER_ADDR_WORKFLOW", description="Default address of Workflow Server to connect to (for `ocrd network client workflow`).", default=(True, '')) diff --git a/tests/network/config.py b/tests/network/config.py index 9dc38bc0db..e22cc6ce9d 100644 --- a/tests/network/config.py +++ b/tests/network/config.py @@ -56,17 +56,17 @@ ) test_config.add( - "OCRD_NETWORK_CLIENT_POLLING_TIMEOUT", - description="Timeout for a blocking ocrd network client", + "OCRD_NETWORK_CLIENT_POLLING_SLEEP", + description="How many seconds to sleep before trying again.", parser=int, - default=(True, 3600) + default=(True, 30) ) test_config.add( - "OCRD_NETWORK_CLIENT_POLLING_SLEEP", - description="How many seconds to sleep before trying again (seconds)", + "OCRD_NETWORK_CLIENT_POLLING_TIMEOUT", + description="Timeout for a blocking ocrd network client (in seconds).", parser=int, - default=(True, 30) + default=(True, 3600) ) test_config.add( From 734bbf09e8aaa0d8aab637c65c73f05f422fbc71 Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Tue, 13 Aug 2024 15:15:10 +0200 Subject: [PATCH 16/33] add cli job status check --- src/ocrd_network/cli/client.py | 28 +++++++++++++++++++++++++++- src/ocrd_network/client.py | 8 ++++++++ 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/src/ocrd_network/cli/client.py b/src/ocrd_network/cli/client.py index d44b26ff34..3c6829c727 100644 --- a/src/ocrd_network/cli/client.py +++ b/src/ocrd_network/cli/client.py @@ -31,6 +31,19 @@ def processing_cli(): pass +@processing_cli.command('check-status') +@click.option('--address') +@click.option('-j', '--processing-job-id', required=True) +def check_processing_job_status( + address: Optional[str], + processing_job_id: str +): + client = Client(server_addr_processing=address) + job_status = client.check_job_status(processing_job_id) + assert job_status + print(f"Processing job status: {job_status}") + + @processing_cli.command('processor') @click.argument('processor_name', required=True, type=click.STRING) @click.option('--address') @@ -90,7 +103,20 @@ def workflow_cli(): pass -@processing_cli.command('run') +@workflow_cli.command('check-status') +@click.option('--address') +@click.option('-j', '--workflow-job-id', required=True) +def check_workflow_job_status( + address: Optional[str], + workflow_job_id: str +): + client = Client(server_addr_processing=address) + job_status = client.check_workflow_status(workflow_job_id) + assert job_status + print(f"Workflow job status: {job_status}") + + +@workflow_cli.command('run') @click.option('--address') @click.option('-m', '--path-to-mets', required=True) @click.option('-w', '--path-to-workflow', required=True) diff --git a/src/ocrd_network/client.py b/src/ocrd_network/client.py index 6f93ab7e37..279a5f7d65 100644 --- a/src/ocrd_network/client.py +++ b/src/ocrd_network/client.py @@ -1,5 +1,7 @@ from ocrd_utils import config, getLogger, LOG_FORMAT from .client_utils import ( + get_ps_processing_job_status, + get_ps_workflow_job_status, poll_job_status_till_timeout_fail_or_success, poll_wf_status_till_timeout_fail_or_success, post_ps_processing_request, @@ -22,6 +24,12 @@ def __init__( self.polling_wait = wait self.polling_tries = int(timeout/wait) + def check_job_status(self, job_id: str): + return get_ps_processing_job_status(self.server_addr_processing, processing_job_id=job_id) + + def check_workflow_status(self, workflow_job_id: str): + return get_ps_workflow_job_status(self.server_addr_processing, workflow_job_id=workflow_job_id) + def poll_job_status(self, job_id: str) -> str: return poll_job_status_till_timeout_fail_or_success( ps_server_host=self.server_addr_processing, job_id=job_id, tries=self.polling_tries, wait=self.polling_wait) From f86bc23be28ae63563349897253451fd95b104be Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Tue, 13 Aug 2024 15:21:11 +0200 Subject: [PATCH 17/33] add: help section to the cli --- src/ocrd_network/cli/client.py | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/src/ocrd_network/cli/client.py b/src/ocrd_network/cli/client.py index 3c6829c727..25b13bcfad 100644 --- a/src/ocrd_network/cli/client.py +++ b/src/ocrd_network/cli/client.py @@ -32,8 +32,9 @@ def processing_cli(): @processing_cli.command('check-status') -@click.option('--address') -@click.option('-j', '--processing-job-id', required=True) +@click.option('--address', help='The address of the Processing Server. If not provided, ' + 'the "OCRD_NETWORK_SERVER_ADDR_PROCESSING" env variable is used by default') +@click.option('-j', '--processing-job-id',) def check_processing_job_status( address: Optional[str], processing_job_id: str @@ -46,7 +47,8 @@ def check_processing_job_status( @processing_cli.command('processor') @click.argument('processor_name', required=True, type=click.STRING) -@click.option('--address') +@click.option('--address', help='The address of the Processing Server. If not provided, ' + 'the "OCRD_NETWORK_SERVER_ADDR_PROCESSING" env variable is used by default') @click.option('-m', '--mets', required=True, default=DEFAULT_METS_BASENAME) @click.option('-I', '--input-file-grp', default='OCR-D-INPUT') @click.option('-O', '--output-file-grp', default='OCR-D-OUTPUT') @@ -55,7 +57,8 @@ def check_processing_job_status( @click.option('--result-queue-name') @click.option('--callback-url') @click.option('--agent-type', default='worker') -@click.option('-b', '--block-till-job-end', default=False) +@click.option('-b', '--block', default=False, + help='If set, the client will block till job timeout, fail or success.') def send_processing_job_request( address: Optional[str], processor_name: str, @@ -104,7 +107,8 @@ def workflow_cli(): @workflow_cli.command('check-status') -@click.option('--address') +@click.option('--address', help='The address of the Processing Server. If not provided, ' + 'the "OCRD_NETWORK_SERVER_ADDR_PROCESSING" env variable is used by default') @click.option('-j', '--workflow-job-id', required=True) def check_workflow_job_status( address: Optional[str], @@ -117,21 +121,23 @@ def check_workflow_job_status( @workflow_cli.command('run') -@click.option('--address') +@click.option('--address', help='The address of the Processing Server. If not provided, ' + 'the "OCRD_NETWORK_SERVER_ADDR_PROCESSING" env variable is used by default') @click.option('-m', '--path-to-mets', required=True) @click.option('-w', '--path-to-workflow', required=True) -@click.option('-b', '--block-till-job-end', default=False) +@click.option('-b', '--block', default=False, + help='If set, the client will block till job timeout, fail or success.') def send_workflow_job_request( address: Optional[str], path_to_mets: str, path_to_workflow: str, - block_till_job_end: Optional[bool] + block: Optional[bool] ): client = Client(server_addr_processing=address) workflow_job_id = client.send_workflow_job_request(path_to_wf=path_to_workflow, path_to_mets=path_to_mets) assert workflow_job_id print(f"Workflow job id: {workflow_job_id}") - if block_till_job_end: + if block: client.poll_workflow_status(job_id=workflow_job_id) From 4194f9f6f9da2e2b47e1ae88e42ade0eb1ee5eb9 Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Tue, 13 Aug 2024 15:28:51 +0200 Subject: [PATCH 18/33] fix: required job id --- src/ocrd_network/cli/client.py | 20 +++++++------------- 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/src/ocrd_network/cli/client.py b/src/ocrd_network/cli/client.py index 25b13bcfad..d5f138493e 100644 --- a/src/ocrd_network/cli/client.py +++ b/src/ocrd_network/cli/client.py @@ -31,21 +31,18 @@ def processing_cli(): pass -@processing_cli.command('check-status') +@processing_cli.command('check-status', help='Check the status of a previously submitted processing job.') @click.option('--address', help='The address of the Processing Server. If not provided, ' 'the "OCRD_NETWORK_SERVER_ADDR_PROCESSING" env variable is used by default') -@click.option('-j', '--processing-job-id',) -def check_processing_job_status( - address: Optional[str], - processing_job_id: str -): +@click.option('-j', '--processing-job-id', required=True) +def check_processing_job_status(address: Optional[str], processing_job_id: str): client = Client(server_addr_processing=address) job_status = client.check_job_status(processing_job_id) assert job_status print(f"Processing job status: {job_status}") -@processing_cli.command('processor') +@processing_cli.command('processor', help='Submit a processing job to the processing server.') @click.argument('processor_name', required=True, type=click.STRING) @click.option('--address', help='The address of the Processing Server. If not provided, ' 'the "OCRD_NETWORK_SERVER_ADDR_PROCESSING" env variable is used by default') @@ -106,21 +103,18 @@ def workflow_cli(): pass -@workflow_cli.command('check-status') +@workflow_cli.command('check-status', help='Check the status of a previously submitted workflow job.') @click.option('--address', help='The address of the Processing Server. If not provided, ' 'the "OCRD_NETWORK_SERVER_ADDR_PROCESSING" env variable is used by default') @click.option('-j', '--workflow-job-id', required=True) -def check_workflow_job_status( - address: Optional[str], - workflow_job_id: str -): +def check_workflow_job_status(address: Optional[str], workflow_job_id: str): client = Client(server_addr_processing=address) job_status = client.check_workflow_status(workflow_job_id) assert job_status print(f"Workflow job status: {job_status}") -@workflow_cli.command('run') +@workflow_cli.command('run', help='Submit a workflow job to the processing server.') @click.option('--address', help='The address of the Processing Server. If not provided, ' 'the "OCRD_NETWORK_SERVER_ADDR_PROCESSING" env variable is used by default') @click.option('-m', '--path-to-mets', required=True) From 97b3eea415685f9b5f4ce451a6326ae906738630 Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Tue, 13 Aug 2024 15:52:29 +0200 Subject: [PATCH 19/33] add docstring to cli commands --- src/ocrd_network/cli/client.py | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/src/ocrd_network/cli/client.py b/src/ocrd_network/cli/client.py index d5f138493e..d763f48185 100644 --- a/src/ocrd_network/cli/client.py +++ b/src/ocrd_network/cli/client.py @@ -31,18 +31,21 @@ def processing_cli(): pass -@processing_cli.command('check-status', help='Check the status of a previously submitted processing job.') +@processing_cli.command('check-status') @click.option('--address', help='The address of the Processing Server. If not provided, ' 'the "OCRD_NETWORK_SERVER_ADDR_PROCESSING" env variable is used by default') @click.option('-j', '--processing-job-id', required=True) def check_processing_job_status(address: Optional[str], processing_job_id: str): + """ + Check the status of a previously submitted processing job. + """ client = Client(server_addr_processing=address) job_status = client.check_job_status(processing_job_id) assert job_status print(f"Processing job status: {job_status}") -@processing_cli.command('processor', help='Submit a processing job to the processing server.') +@processing_cli.command('processor') @click.argument('processor_name', required=True, type=click.STRING) @click.option('--address', help='The address of the Processing Server. If not provided, ' 'the "OCRD_NETWORK_SERVER_ADDR_PROCESSING" env variable is used by default') @@ -71,6 +74,9 @@ def send_processing_job_request( agent_type: Optional[str], block_till_job_end: Optional[bool] ): + """ + Submit a processing job to the processing server. + """ req_params = { "path_to_mets": mets, "description": "OCR-D Network client request", @@ -103,18 +109,21 @@ def workflow_cli(): pass -@workflow_cli.command('check-status', help='Check the status of a previously submitted workflow job.') +@workflow_cli.command('check-status') @click.option('--address', help='The address of the Processing Server. If not provided, ' 'the "OCRD_NETWORK_SERVER_ADDR_PROCESSING" env variable is used by default') @click.option('-j', '--workflow-job-id', required=True) def check_workflow_job_status(address: Optional[str], workflow_job_id: str): + """ + Check the status of a previously submitted workflow job. + """ client = Client(server_addr_processing=address) job_status = client.check_workflow_status(workflow_job_id) assert job_status print(f"Workflow job status: {job_status}") -@workflow_cli.command('run', help='Submit a workflow job to the processing server.') +@workflow_cli.command('run') @click.option('--address', help='The address of the Processing Server. If not provided, ' 'the "OCRD_NETWORK_SERVER_ADDR_PROCESSING" env variable is used by default') @click.option('-m', '--path-to-mets', required=True) @@ -127,6 +136,9 @@ def send_workflow_job_request( path_to_workflow: str, block: Optional[bool] ): + """ + Submit a workflow job to the processing server. + """ client = Client(server_addr_processing=address) workflow_job_id = client.send_workflow_job_request(path_to_wf=path_to_workflow, path_to_mets=path_to_mets) assert workflow_job_id From 8e7ba26ec414106e0715f185e2508d2feb2604d5 Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Tue, 13 Aug 2024 17:57:24 +0200 Subject: [PATCH 20/33] Fix: rename to block When refactoring to a shorter flag I obviously missed that there are two places to adapt. --- src/ocrd_network/cli/client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ocrd_network/cli/client.py b/src/ocrd_network/cli/client.py index d763f48185..ab74024f06 100644 --- a/src/ocrd_network/cli/client.py +++ b/src/ocrd_network/cli/client.py @@ -72,7 +72,7 @@ def send_processing_job_request( # TODO: This is temporally available to toggle # between the ProcessingWorker/ProcessorServer agent_type: Optional[str], - block_till_job_end: Optional[bool] + block: Optional[bool] ): """ Submit a processing job to the processing server. @@ -97,7 +97,7 @@ def send_processing_job_request( processor_name=processor_name, req_params=loads(dumps(req_params))) assert processing_job_id print(f"Processing job id: {processing_job_id}") - if block_till_job_end: + if block: client.poll_job_status(job_id=processing_job_id) From 69808b6b0e368199fb6bab14c1ba6979d8cd33b4 Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Tue, 13 Aug 2024 18:02:31 +0200 Subject: [PATCH 21/33] Fix: server_utils.py > 404 to 400 --- src/ocrd_network/server_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ocrd_network/server_utils.py b/src/ocrd_network/server_utils.py index cc0c59ec67..376afc9b60 100644 --- a/src/ocrd_network/server_utils.py +++ b/src/ocrd_network/server_utils.py @@ -212,10 +212,10 @@ def validate_job_input(logger: Logger, processor_name: str, ocrd_tool: dict, job report = ParameterValidator(ocrd_tool).validate(dict(job_input.parameters)) if not report.is_valid: message = f"Failed to validate processing job input against the tool json of processor: {processor_name}\n" - raise_http_exception(logger, status.HTTP_404_BAD_REQUEST, message + report.errors) + raise_http_exception(logger, status.HTTP_400_BAD_REQUEST, message + report.errors) except Exception as error: message = f"Failed to validate processing job input against the ocrd tool json of processor: {processor_name}" - raise_http_exception(logger, status.HTTP_404_BAD_REQUEST, message, error) + raise_http_exception(logger, status.HTTP_400_BAD_REQUEST, message, error) def validate_workflow(logger: Logger, workflow: str) -> None: From 4de1e83776bd280599ad7d8e5a508c3adcf90927 Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Tue, 13 Aug 2024 20:00:52 +0200 Subject: [PATCH 22/33] fix: set ps address if None in constructor --- src/ocrd_network/client.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/ocrd_network/client.py b/src/ocrd_network/client.py index 279a5f7d65..2744cd5f0f 100644 --- a/src/ocrd_network/client.py +++ b/src/ocrd_network/client.py @@ -13,11 +13,13 @@ class Client: def __init__( self, - server_addr_processing: str = config.OCRD_NETWORK_SERVER_ADDR_PROCESSING, + server_addr_processing: str, timeout: int = config.OCRD_NETWORK_CLIENT_POLLING_TIMEOUT, wait: int = config.OCRD_NETWORK_CLIENT_POLLING_SLEEP ): self.log = getLogger(f"ocrd_network.client") + if not server_addr_processing: + server_addr_processing = config.OCRD_NETWORK_SERVER_ADDR_PROCESSING self.server_addr_processing = server_addr_processing verify_server_protocol(self.server_addr_processing) self.polling_timeout = timeout From d1af85b931496be879d26ea6a8309f6cb8561236 Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Tue, 13 Aug 2024 21:24:27 +0200 Subject: [PATCH 23/33] fix: check report validation outside try block --- src/ocrd_network/server_utils.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/ocrd_network/server_utils.py b/src/ocrd_network/server_utils.py index 376afc9b60..ed1519033a 100644 --- a/src/ocrd_network/server_utils.py +++ b/src/ocrd_network/server_utils.py @@ -193,7 +193,9 @@ def parse_workflow_tasks(logger: Logger, workflow_content: str) -> List[Processo def raise_http_exception(logger: Logger, status_code: int, message: str, error: Exception = None) -> None: - logger.exception(f"{message} {error}") + if error: + message = f"{message} {error}" + logger.exception(f"{message}") raise HTTPException(status_code=status_code, detail=message) @@ -210,12 +212,12 @@ def validate_job_input(logger: Logger, processor_name: str, ocrd_tool: dict, job raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message) try: report = ParameterValidator(ocrd_tool).validate(dict(job_input.parameters)) - if not report.is_valid: - message = f"Failed to validate processing job input against the tool json of processor: {processor_name}\n" - raise_http_exception(logger, status.HTTP_400_BAD_REQUEST, message + report.errors) except Exception as error: message = f"Failed to validate processing job input against the ocrd tool json of processor: {processor_name}" raise_http_exception(logger, status.HTTP_400_BAD_REQUEST, message, error) + if report and not report.is_valid: + message = f"Failed to validate processing job input against the tool json of processor: {processor_name}\n" + raise_http_exception(logger, status.HTTP_400_BAD_REQUEST, f"{message}{report.errors}") def validate_workflow(logger: Logger, workflow: str) -> None: From 50f73c58064bf9bed5479c370f356cefeda92210 Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Tue, 13 Aug 2024 22:01:34 +0200 Subject: [PATCH 24/33] fix: the annoying string dict --- src/ocrd_network/cli/client.py | 8 ++++++-- src/ocrd_network/server_utils.py | 1 + 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/ocrd_network/cli/client.py b/src/ocrd_network/cli/client.py index ab74024f06..ca74a034a7 100644 --- a/src/ocrd_network/cli/client.py +++ b/src/ocrd_network/cli/client.py @@ -81,20 +81,24 @@ def send_processing_job_request( "path_to_mets": mets, "description": "OCR-D Network client request", "input_file_grps": input_file_grp.split(','), - "parameters": parameter if parameter else {}, "agent_type": agent_type } if output_file_grp: req_params["output_file_grps"] = output_file_grp.split(',') if page_id: req_params["page_id"] = page_id + if parameter: + if parameter == ['{}']: + req_params["parameters"] = {} + else: + req_params["parameters"] = parameter if result_queue_name: req_params["result_queue_name"] = result_queue_name if callback_url: req_params["callback_url"] = callback_url client = Client(server_addr_processing=address) processing_job_id = client.send_processing_job_request( - processor_name=processor_name, req_params=loads(dumps(req_params))) + processor_name=processor_name, req_params=req_params) assert processing_job_id print(f"Processing job id: {processing_job_id}") if block: diff --git a/src/ocrd_network/server_utils.py b/src/ocrd_network/server_utils.py index ed1519033a..8fa13fd714 100644 --- a/src/ocrd_network/server_utils.py +++ b/src/ocrd_network/server_utils.py @@ -200,6 +200,7 @@ def raise_http_exception(logger: Logger, status_code: int, message: str, error: def validate_job_input(logger: Logger, processor_name: str, ocrd_tool: dict, job_input: PYJobInput) -> None: + # logger.warning(f"Job input: {job_input}") if bool(job_input.path_to_mets) == bool(job_input.workspace_id): message = ( "Wrong processing job input format. " From 8f2861c4a8083db9a1f820333be784e9244e20ba Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Tue, 13 Aug 2024 22:28:22 +0200 Subject: [PATCH 25/33] add: parameter_override --- src/ocrd_network/cli/client.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/ocrd_network/cli/client.py b/src/ocrd_network/cli/client.py index ca74a034a7..00a185ee1a 100644 --- a/src/ocrd_network/cli/client.py +++ b/src/ocrd_network/cli/client.py @@ -1,7 +1,7 @@ import click from json import dumps, loads from typing import Optional -from ocrd.decorators import parameter_option +from ocrd.decorators.parameter_option import parameter_option, parameter_override_option from ocrd_utils import DEFAULT_METS_BASENAME from ..client import Client @@ -54,6 +54,7 @@ def check_processing_job_status(address: Optional[str], processing_job_id: str): @click.option('-O', '--output-file-grp', default='OCR-D-OUTPUT') @click.option('-g', '--page-id') @parameter_option +@parameter_override_option @click.option('--result-queue-name') @click.option('--callback-url') @click.option('--agent-type', default='worker') @@ -67,6 +68,7 @@ def send_processing_job_request( output_file_grp: Optional[str], page_id: Optional[str], parameter: Optional[dict], + parameter_override: Optional[dict], result_queue_name: Optional[str], callback_url: Optional[str], # TODO: This is temporally available to toggle @@ -92,6 +94,8 @@ def send_processing_job_request( req_params["parameters"] = {} else: req_params["parameters"] = parameter + if parameter_override: + req_params["parameters"] = parameter_override if result_queue_name: req_params["result_queue_name"] = result_queue_name if callback_url: From 06a371c0a8a9de2e951de1b109d37c08f45b6b6a Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Tue, 13 Aug 2024 23:23:03 +0200 Subject: [PATCH 26/33] add sort to network agents --- src/ocrd_network/runtime_data/deployer.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/src/ocrd_network/runtime_data/deployer.py b/src/ocrd_network/runtime_data/deployer.py index 124c9fbbe2..b956904d07 100644 --- a/src/ocrd_network/runtime_data/deployer.py +++ b/src/ocrd_network/runtime_data/deployer.py @@ -35,7 +35,7 @@ def __init__(self, config_path: str) -> None: # TODO: Reconsider this. def find_matching_network_agents( self, worker_only: bool = False, server_only: bool = False, docker_only: bool = False, - native_only: bool = False, str_names_only: bool = False, unique_only: bool = False + native_only: bool = False, str_names_only: bool = False, unique_only: bool = False, sort: bool = False ) -> Union[List[str], List[object]]: """Finds and returns a list of matching data objects of type: `DataProcessingWorker` and `DataProcessorServer`. @@ -46,6 +46,7 @@ def find_matching_network_agents( :py:attr:`native_only` match only native network agents (DataProcessingWorker and DataProcessorServer) :py:attr:`str_names_only` returns the processor_name filed instead of the Data* object :py:attr:`unique_only` remove duplicate names from the matches + :py:attr:`sort` sort the result `worker_only` and `server_only` are mutually exclusive to each other `docker_only` and `native_only` are mutually exclusive to each other @@ -64,6 +65,10 @@ def find_matching_network_agents( msg = f"Value 'unique_only' is allowed only together with 'str_names_only'" self.log.exception(msg) raise ValueError(msg) + if sort and not str_names_only: + msg = f"Value 'sort' is allowed only together with 'str_names_only'" + self.log.exception(msg) + raise ValueError(msg) # Find all matching objects of type DataProcessingWorker or DataProcessorServer matched_objects = [] @@ -88,8 +93,12 @@ def find_matching_network_agents( matched_names = [match.processor_name for match in matched_objects] if not unique_only: return matched_names - # Removes any duplicate entries from matched names - return list(dict.fromkeys(matched_names)) + list_matched = list(dict.fromkeys(matched_names)) + if not sort: + # Removes any duplicate entries from matched names + return list_matched + list_matched.sort() + return list_matched def resolve_processor_server_url(self, processor_name) -> str: processor_server_url = '' From 4d85970656b26da357812900f79c92edeaf615ee Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Tue, 13 Aug 2024 23:23:27 +0200 Subject: [PATCH 27/33] add: discovery cli, processors and processor --- src/ocrd_network/cli/client.py | 40 +++++++++++++++++++++++---- src/ocrd_network/client.py | 9 ++++++ src/ocrd_network/client_utils.py | 14 ++++++++++ src/ocrd_network/processing_server.py | 2 +- 4 files changed, 59 insertions(+), 6 deletions(-) diff --git a/src/ocrd_network/cli/client.py b/src/ocrd_network/cli/client.py index 00a185ee1a..ee6c29fc3b 100644 --- a/src/ocrd_network/cli/client.py +++ b/src/ocrd_network/cli/client.py @@ -1,5 +1,5 @@ import click -from json import dumps, loads +from json import dumps from typing import Optional from ocrd.decorators.parameter_option import parameter_option, parameter_override_option from ocrd_utils import DEFAULT_METS_BASENAME @@ -23,6 +23,34 @@ def discovery_cli(): pass +@discovery_cli.command('processors') +@click.option('--address', + help='The address of the Processing Server. If not provided, ' + 'the "OCRD_NETWORK_SERVER_ADDR_PROCESSING" env variable is used by default') +def check_deployed_processors(address: Optional[str]): + """ + Get a list of deployed processing workers/processor servers. + Each processor is shown only once regardless of the amount of deployed instances. + """ + client = Client(server_addr_processing=address) + processors_list = client.check_deployed_processors() + print(dumps(processors_list, indent=4)) + + +@discovery_cli.command('processor') +@click.option('--address', + help='The address of the Processing Server. If not provided, ' + 'the "OCRD_NETWORK_SERVER_ADDR_PROCESSING" env variable is used by default') +@click.argument('processor_name', required=True, type=click.STRING) +def check_processor_ocrd_tool(address: Optional[str], processor_name: str): + """ + Get the json tool of a deployed processor specified with `processor_name` + """ + client = Client(server_addr_processing=address) + ocrd_tool = client.check_deployed_processor_ocrd_tool(processor_name=processor_name) + print(dumps(ocrd_tool, indent=4)) + + @client_cli.group('processing') def processing_cli(): """ @@ -32,8 +60,9 @@ def processing_cli(): @processing_cli.command('check-status') -@click.option('--address', help='The address of the Processing Server. If not provided, ' - 'the "OCRD_NETWORK_SERVER_ADDR_PROCESSING" env variable is used by default') +@click.option('--address', + help='The address of the Processing Server. If not provided, ' + 'the "OCRD_NETWORK_SERVER_ADDR_PROCESSING" env variable is used by default') @click.option('-j', '--processing-job-id', required=True) def check_processing_job_status(address: Optional[str], processing_job_id: str): """ @@ -47,8 +76,9 @@ def check_processing_job_status(address: Optional[str], processing_job_id: str): @processing_cli.command('processor') @click.argument('processor_name', required=True, type=click.STRING) -@click.option('--address', help='The address of the Processing Server. If not provided, ' - 'the "OCRD_NETWORK_SERVER_ADDR_PROCESSING" env variable is used by default') +@click.option('--address', + help='The address of the Processing Server. If not provided, ' + 'the "OCRD_NETWORK_SERVER_ADDR_PROCESSING" env variable is used by default') @click.option('-m', '--mets', required=True, default=DEFAULT_METS_BASENAME) @click.option('-I', '--input-file-grp', default='OCR-D-INPUT') @click.option('-O', '--output-file-grp', default='OCR-D-OUTPUT') diff --git a/src/ocrd_network/client.py b/src/ocrd_network/client.py index 2744cd5f0f..b3872f1aaf 100644 --- a/src/ocrd_network/client.py +++ b/src/ocrd_network/client.py @@ -1,5 +1,7 @@ from ocrd_utils import config, getLogger, LOG_FORMAT from .client_utils import ( + get_ps_deployed_processors, + get_ps_deployed_processor_ocrd_tool, get_ps_processing_job_status, get_ps_workflow_job_status, poll_job_status_till_timeout_fail_or_success, @@ -26,6 +28,13 @@ def __init__( self.polling_wait = wait self.polling_tries = int(timeout/wait) + def check_deployed_processors(self): + return get_ps_deployed_processors(ps_server_host=self.server_addr_processing) + + def check_deployed_processor_ocrd_tool(self, processor_name: str): + return get_ps_deployed_processor_ocrd_tool( + ps_server_host=self.server_addr_processing, processor_name=processor_name) + def check_job_status(self, job_id: str): return get_ps_processing_job_status(self.server_addr_processing, processing_job_id=job_id) diff --git a/src/ocrd_network/client_utils.py b/src/ocrd_network/client_utils.py index 30b4a2bd96..2dc2805aa0 100644 --- a/src/ocrd_network/client_utils.py +++ b/src/ocrd_network/client_utils.py @@ -27,6 +27,20 @@ def poll_wf_status_till_timeout_fail_or_success(ps_server_host: str, job_id: str return _poll_endpoint_status(ps_server_host, job_id, "workflow", tries, wait) +def get_ps_deployed_processors(ps_server_host: str): + request_url = f"{ps_server_host}/processor" + response = request_get(url=request_url, headers={"accept": "application/json; charset=utf-8"}) + assert response.status_code == 200, f"Processing server: {request_url}, {response.status_code}" + return response.json() + + +def get_ps_deployed_processor_ocrd_tool(ps_server_host: str, processor_name: str): + request_url = f"{ps_server_host}/processor/info/{processor_name}" + response = request_get(url=request_url, headers={"accept": "application/json; charset=utf-8"}) + assert response.status_code == 200, f"Processing server: {request_url}, {response.status_code}" + return response.json() + + def get_ps_processing_job_status(ps_server_host: str, processing_job_id: str) -> str: request_url = f"{ps_server_host}/processor/job/{processing_job_id}" response = request_get(url=request_url, headers={"accept": "application/json; charset=utf-8"}) diff --git a/src/ocrd_network/processing_server.py b/src/ocrd_network/processing_server.py index e142802268..34c22e5cf6 100644 --- a/src/ocrd_network/processing_server.py +++ b/src/ocrd_network/processing_server.py @@ -651,7 +651,7 @@ async def list_processors(self) -> List[str]: # There is no caching on the Processing Server side processor_names_list = self.deployer.find_matching_network_agents( docker_only=False, native_only=False, worker_only=False, server_only=False, - str_names_only=True, unique_only=True + str_names_only=True, unique_only=True, sort=True ) return processor_names_list From bb3007da3473fe6d3c624b9e028b3ee8e599af45 Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Tue, 13 Aug 2024 23:33:39 +0200 Subject: [PATCH 28/33] add: check processing job log file --- src/ocrd_network/cli/client.py | 14 ++++++++++++++ src/ocrd_network/client.py | 4 ++++ src/ocrd_network/client_utils.py | 6 ++++++ 3 files changed, 24 insertions(+) diff --git a/src/ocrd_network/cli/client.py b/src/ocrd_network/cli/client.py index ee6c29fc3b..38eb959a07 100644 --- a/src/ocrd_network/cli/client.py +++ b/src/ocrd_network/cli/client.py @@ -59,6 +59,20 @@ def processing_cli(): pass +@processing_cli.command('check-log') +@click.option('--address', + help='The address of the Processing Server. If not provided, ' + 'the "OCRD_NETWORK_SERVER_ADDR_PROCESSING" env variable is used by default') +@click.option('-j', '--processing-job-id', required=True) +def check_processing_job_status(address: Optional[str], processing_job_id: str): + """ + Check the log of a previously submitted processing job. + """ + client = Client(server_addr_processing=address) + response = client.check_job_log(job_id=processing_job_id) + print(response._content.decode(encoding='utf-8')) + + @processing_cli.command('check-status') @click.option('--address', help='The address of the Processing Server. If not provided, ' diff --git a/src/ocrd_network/client.py b/src/ocrd_network/client.py index b3872f1aaf..793d6ead89 100644 --- a/src/ocrd_network/client.py +++ b/src/ocrd_network/client.py @@ -2,6 +2,7 @@ from .client_utils import ( get_ps_deployed_processors, get_ps_deployed_processor_ocrd_tool, + get_ps_processing_job_log, get_ps_processing_job_status, get_ps_workflow_job_status, poll_job_status_till_timeout_fail_or_success, @@ -35,6 +36,9 @@ def check_deployed_processor_ocrd_tool(self, processor_name: str): return get_ps_deployed_processor_ocrd_tool( ps_server_host=self.server_addr_processing, processor_name=processor_name) + def check_job_log(self, job_id: str): + return get_ps_processing_job_log(self.server_addr_processing, processing_job_id=job_id) + def check_job_status(self, job_id: str): return get_ps_processing_job_status(self.server_addr_processing, processing_job_id=job_id) diff --git a/src/ocrd_network/client_utils.py b/src/ocrd_network/client_utils.py index 2dc2805aa0..9b924c16a4 100644 --- a/src/ocrd_network/client_utils.py +++ b/src/ocrd_network/client_utils.py @@ -41,6 +41,12 @@ def get_ps_deployed_processor_ocrd_tool(ps_server_host: str, processor_name: str return response.json() +def get_ps_processing_job_log(ps_server_host: str, processing_job_id: str): + request_url = f"{ps_server_host}/processor/log/{processing_job_id}" + response = request_get(url=request_url, headers={"accept": "application/json; charset=utf-8"}) + return response + + def get_ps_processing_job_status(ps_server_host: str, processing_job_id: str) -> str: request_url = f"{ps_server_host}/processor/job/{processing_job_id}" response = request_get(url=request_url, headers={"accept": "application/json; charset=utf-8"}) From ff4243ff9797e671847b62fee3c559a3877f7a57 Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Wed, 14 Aug 2024 13:04:40 +0200 Subject: [PATCH 29/33] fix: exception handling --- src/ocrd_network/client.py | 3 ++- src/ocrd_network/server_utils.py | 9 ++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/ocrd_network/client.py b/src/ocrd_network/client.py index 793d6ead89..ff9ad870e3 100644 --- a/src/ocrd_network/client.py +++ b/src/ocrd_network/client.py @@ -1,3 +1,4 @@ +from typing import Optional from ocrd_utils import config, getLogger, LOG_FORMAT from .client_utils import ( get_ps_deployed_processors, @@ -16,7 +17,7 @@ class Client: def __init__( self, - server_addr_processing: str, + server_addr_processing: Optional[str], timeout: int = config.OCRD_NETWORK_CLIENT_POLLING_TIMEOUT, wait: int = config.OCRD_NETWORK_CLIENT_POLLING_SLEEP ): diff --git a/src/ocrd_network/server_utils.py b/src/ocrd_network/server_utils.py index 8fa13fd714..9d8628170c 100644 --- a/src/ocrd_network/server_utils.py +++ b/src/ocrd_network/server_utils.py @@ -125,14 +125,13 @@ def request_processor_server_tool_json(logger: Logger, processor_server_base_url urljoin(base=processor_server_base_url, url="info"), headers={"Content-Type": "application/json"} ) - if response.status_code != 200: - message = f"Failed to retrieve tool json from: {processor_server_base_url}, code: {response.status_code}" - raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message) - return response.json() except Exception as error: message = f"Failed to retrieve ocrd tool json from: {processor_server_base_url}" raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error) - + if response.status_code != 200: + message = f"Failed to retrieve tool json from: {processor_server_base_url}, code: {response.status_code}" + raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message) + return response.json() async def forward_job_to_processor_server( logger: Logger, job_input: PYJobInput, processor_server_base_url: str From 5f746c1ee61b2a70f4c8439b71a739e5e781cc3b Mon Sep 17 00:00:00 2001 From: kba Date: Wed, 14 Aug 2024 13:09:20 +0200 Subject: [PATCH 30/33] ocrd network client: parse parameters and overrides --- src/ocrd_network/cli/client.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/src/ocrd_network/cli/client.py b/src/ocrd_network/cli/client.py index 38eb959a07..f4bf216992 100644 --- a/src/ocrd_network/cli/client.py +++ b/src/ocrd_network/cli/client.py @@ -1,8 +1,10 @@ import click from json import dumps -from typing import Optional +from typing import List, Optional, Tuple from ocrd.decorators.parameter_option import parameter_option, parameter_override_option from ocrd_utils import DEFAULT_METS_BASENAME +from ocrd_utils.introspect import set_json_key_value_overrides +from ocrd_utils.str import parse_json_string_or_file from ..client import Client @@ -111,8 +113,8 @@ def send_processing_job_request( input_file_grp: str, output_file_grp: Optional[str], page_id: Optional[str], - parameter: Optional[dict], - parameter_override: Optional[dict], + parameter: List[str], + parameter_override: List[Tuple[str, str]], result_queue_name: Optional[str], callback_url: Optional[str], # TODO: This is temporally available to toggle @@ -133,11 +135,7 @@ def send_processing_job_request( req_params["output_file_grps"] = output_file_grp.split(',') if page_id: req_params["page_id"] = page_id - if parameter: - if parameter == ['{}']: - req_params["parameters"] = {} - else: - req_params["parameters"] = parameter + req_params["parameters"] = set_json_key_value_overrides(parse_json_string_or_file(*parameter), *parameter_override) if parameter_override: req_params["parameters"] = parameter_override if result_queue_name: From 8fc8bff7b15f7550a0d744b3d7ca619dae96136f Mon Sep 17 00:00:00 2001 From: kba Date: Wed, 14 Aug 2024 16:00:08 +0200 Subject: [PATCH 31/33] fix parameter parsing again --- src/ocrd_network/cli/client.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/ocrd_network/cli/client.py b/src/ocrd_network/cli/client.py index f4bf216992..f6593fb147 100644 --- a/src/ocrd_network/cli/client.py +++ b/src/ocrd_network/cli/client.py @@ -136,8 +136,6 @@ def send_processing_job_request( if page_id: req_params["page_id"] = page_id req_params["parameters"] = set_json_key_value_overrides(parse_json_string_or_file(*parameter), *parameter_override) - if parameter_override: - req_params["parameters"] = parameter_override if result_queue_name: req_params["result_queue_name"] = result_queue_name if callback_url: From 15cea57c9c88b2b5585092547dd4d7dba8e170fb Mon Sep 17 00:00:00 2001 From: kba Date: Thu, 22 Aug 2024 16:53:20 +0200 Subject: [PATCH 32/33] :memo: changelog --- CHANGELOG.md | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 14783f8376..a9bc4d67b7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,17 @@ Versioned according to [Semantic Versioning](http://semver.org/). Changed: * ocrd_network: Use `ocrd-all-tool.json` bundled by core instead of download from website, #1257, #1260 + * `ocrd network client processing processor` supports blocking behavior with `--block` by polling job status, #1265, #1269 + +Added: + + * `ocrd network client workflow check-status` to get the status of a workflow job, #1269 + * `ocrd network client processing check-status` to get the status of a processing (processor) job, #1269 + * `ocrd network client workflow run` Run, optionally blocking, a workflow on the processing server, #1265, #1269 + * Environment variables `OCRD_NETWORK_CLIENT_POLLING_SLEEP` and `OCRD_NETWORK_CLIENT_POLLING_TIMEOUT` to control polling interval and timeout for `ocrd network client {processing processor,workflow run`, #1269 + * `ocrd network client discovery processors` to list the processors deployed in the processing server, #1269 + * `ocrd network client discovery processor` to get the `ocrd-tool.json` of a deployed processor, #1269 + * `ocrd network client processing check-log` to retrieve the log data for a processing job, #1269 ## [2.67.2] - 2024-07-19 From 66085392bc35334530b520b859f8760dc3518b80 Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Fri, 23 Aug 2024 08:43:19 +0200 Subject: [PATCH 33/33] refactor client cli: process -> run --- src/ocrd_network/cli/client.py | 2 +- src/ocrd_network/client.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ocrd_network/cli/client.py b/src/ocrd_network/cli/client.py index f6593fb147..9c7f15c88f 100644 --- a/src/ocrd_network/cli/client.py +++ b/src/ocrd_network/cli/client.py @@ -90,7 +90,7 @@ def check_processing_job_status(address: Optional[str], processing_job_id: str): print(f"Processing job status: {job_status}") -@processing_cli.command('processor') +@processing_cli.command('run') @click.argument('processor_name', required=True, type=click.STRING) @click.option('--address', help='The address of the Processing Server. If not provided, ' diff --git a/src/ocrd_network/client.py b/src/ocrd_network/client.py index ff9ad870e3..8ec8e541ea 100644 --- a/src/ocrd_network/client.py +++ b/src/ocrd_network/client.py @@ -28,7 +28,7 @@ def __init__( verify_server_protocol(self.server_addr_processing) self.polling_timeout = timeout self.polling_wait = wait - self.polling_tries = int(timeout/wait) + self.polling_tries = int(timeout / wait) def check_deployed_processors(self): return get_ps_deployed_processors(ps_server_host=self.server_addr_processing)