From f4b1fd29e7307cb92849792cb24a3e4582bd8421 Mon Sep 17 00:00:00 2001 From: Caleb Date: Sat, 14 Dec 2024 10:01:41 -0500 Subject: [PATCH 1/4] move FileApplication to own module --- src/middlewared/middlewared/apps/__init__.py | 1 + src/middlewared/middlewared/apps/file_app.py | 189 +++++++++++++++++++ src/middlewared/middlewared/main.py | 184 +----------------- 3 files changed, 193 insertions(+), 181 deletions(-) create mode 100644 src/middlewared/middlewared/apps/__init__.py create mode 100644 src/middlewared/middlewared/apps/file_app.py diff --git a/src/middlewared/middlewared/apps/__init__.py b/src/middlewared/middlewared/apps/__init__.py new file mode 100644 index 000000000000..f4cd6f8a7ed1 --- /dev/null +++ b/src/middlewared/middlewared/apps/__init__.py @@ -0,0 +1 @@ +from .file_app import FileApplication diff --git a/src/middlewared/middlewared/apps/file_app.py b/src/middlewared/middlewared/apps/file_app.py new file mode 100644 index 000000000000..46ff49f016f0 --- /dev/null +++ b/src/middlewared/middlewared/apps/file_app.py @@ -0,0 +1,189 @@ +from asyncio import run_coroutine_threadsafe +from json import dumps, loads +from urllib.parse import parse_qs + +from aiohttp import web + +from middlewared.pipe import Pipes +from middlewared.restful import parse_credentials, authenticate, create_application, copy_multipart_to_pipe +from middlewared.service_exception import CallError + +__all__ = ('FileApplication',) + + +class FileApplication: + + def __init__(self, middleware, loop): + self.middleware = middleware + self.loop = loop + self.jobs = {} + + def register_job(self, job_id, buffered): + self.jobs[job_id] = self.middleware.loop.call_later( + 3600 if buffered else 60, # FIXME: Allow the job to run for infinite time + give 300 seconds to begin + # download instead of waiting 3600 seconds for the whole operation + lambda: self.middleware.create_task(self._cleanup_job(job_id)), + ) + + async def _cleanup_cancel(self, job_id): + job_cleanup = self.jobs.pop(job_id, None) + if job_cleanup: + job_cleanup.cancel() + + async def _cleanup_job(self, job_id): + if job_id not in self.jobs: + return + self.jobs[job_id].cancel() + del self.jobs[job_id] + + job = self.middleware.jobs[job_id] + await job.pipes.close() + + async def download(self, request): + path = request.path.split('/') + if not request.path[-1].isdigit(): + resp = web.Response() + resp.set_status(404) + return resp + + job_id = int(path[-1]) + + qs = parse_qs(request.query_string) + denied = False + filename = None + if 'auth_token' not in qs: + denied = True + else: + auth_token = qs.get('auth_token')[0] + token = await self.middleware.call('auth.get_token', auth_token) + if not token: + denied = True + else: + if token['attributes'].get('job') != job_id: + denied = True + else: + filename = token['attributes'].get('filename') + if denied: + resp = web.Response() + resp.set_status(401) + return resp + + job = self.middleware.jobs.get(job_id) + if not job: + resp = web.Response() + resp.set_status(404) + return resp + + if job_id not in self.jobs: + resp = web.Response() + resp.set_status(410) + return resp + + resp = web.StreamResponse(status=200, reason='OK', headers={ + 'Content-Type': 'application/octet-stream', + 'Content-Disposition': f'attachment; filename="{filename}"', + 'Transfer-Encoding': 'chunked', + }) + await resp.prepare(request) + + def do_copy(): + while True: + read = job.pipes.output.r.read(1048576) + if read == b'': + break + run_coroutine_threadsafe(resp.write(read), loop=self.loop).result() + + try: + await self._cleanup_cancel(job_id) + await self.middleware.run_in_thread(do_copy) + finally: + await job.pipes.close() + + await resp.drain() + return resp + + async def upload(self, request): + reader = await request.multipart() + + part = await reader.next() + if not part: + resp = web.Response(status=405, body='No part found on payload') + resp.set_status(405) + return resp + + if part.name != 'data': + resp = web.Response(status=405, body='"data" part must be the first on payload') + resp.set_status(405) + return resp + + try: + data = loads(await part.read()) + except Exception as e: + return web.Response(status=400, body=str(e)) + + if 'method' not in data: + return web.Response(status=422) + + try: + credentials = parse_credentials(request) + if credentials is None: + raise web.HTTPUnauthorized() + except web.HTTPException as e: + return web.Response(status=e.status_code, body=e.text) + app = await create_application(request) + try: + authenticated_credentials = await authenticate(self.middleware, request, credentials, 'CALL', + data['method']) + if authenticated_credentials is None: + raise web.HTTPUnauthorized() + except web.HTTPException as e: + credentials['credentials_data'].pop('password', None) + await self.middleware.log_audit_message(app, 'AUTHENTICATION', { + 'credentials': credentials, + 'error': e.text, + }, False) + return web.Response(status=e.status_code, body=e.text) + app = await create_application(request, authenticated_credentials) + credentials['credentials_data'].pop('password', None) + await self.middleware.log_audit_message(app, 'AUTHENTICATION', { + 'credentials': credentials, + 'error': None, + }, True) + + filepart = await reader.next() + + if not filepart or filepart.name != 'file': + resp = web.Response(status=405, body='"file" not found as second part on payload') + resp.set_status(405) + return resp + + try: + serviceobj, methodobj = self.middleware.get_method(data['method']) + if authenticated_credentials.authorize('CALL', data['method']): + job = await self.middleware.call_with_audit(data['method'], serviceobj, methodobj, + data.get('params') or [], app, + pipes=Pipes(input_=self.middleware.pipe())) + else: + await self.middleware.log_audit_message_for_method(data['method'], methodobj, data.get('params') or [], + app, True, False, False) + raise web.HTTPForbidden() + await self.middleware.run_in_thread(copy_multipart_to_pipe, self.loop, filepart, job.pipes.input) + except CallError as e: + if e.errno == CallError.ENOMETHOD: + status_code = 422 + else: + status_code = 412 + return web.Response(status=status_code, body=str(e)) + except web.HTTPException as e: + return web.Response(status=e.status_code, body=e.text) + except Exception as e: + return web.Response(status=500, body=str(e)) + + resp = web.Response( + status=200, + headers={ + 'Content-Type': 'application/json', + }, + body=dumps({'job_id': job.id}).encode(), + ) + return resp diff --git a/src/middlewared/middlewared/main.py b/src/middlewared/middlewared/main.py index 30881d7f9a2e..0c370bfdd824 100644 --- a/src/middlewared/middlewared/main.py +++ b/src/middlewared/middlewared/main.py @@ -8,11 +8,12 @@ from .api.base.server.ws_handler.base import BaseWebSocketHandler from .api.base.server.ws_handler.rpc import RpcWebSocketApp, RpcWebSocketAppEvent from .api.base.server.ws_handler.rpc import RpcWebSocketHandler +from .apps import FileApplication from .common.event_source.manager import EventSourceManager from .event import Events from .job import Job, JobsQueue, State -from .pipe import Pipes, Pipe -from .restful import parse_credentials, authenticate, create_application, copy_multipart_to_pipe, RESTfulAPI +from .pipe import Pipe +from .restful import RESTfulAPI from .role import ROLES, RoleManager from .schema import Error as SchemaError, OROperator from .webshellapp.webshell import ShellApplication @@ -71,7 +72,6 @@ import traceback import types import typing -import urllib.parse import uuid import tracemalloc @@ -365,184 +365,6 @@ def __setstate__(self, newstate): pass -class FileApplication(object): - - def __init__(self, middleware, loop): - self.middleware = middleware - self.loop = loop - self.jobs = {} - - def register_job(self, job_id, buffered): - self.jobs[job_id] = self.middleware.loop.call_later( - 3600 if buffered else 60, # FIXME: Allow the job to run for infinite time + give 300 seconds to begin - # download instead of waiting 3600 seconds for the whole operation - lambda: self.middleware.create_task(self._cleanup_job(job_id)), - ) - - async def _cleanup_cancel(self, job_id): - job_cleanup = self.jobs.pop(job_id, None) - if job_cleanup: - job_cleanup.cancel() - - async def _cleanup_job(self, job_id): - if job_id not in self.jobs: - return - self.jobs[job_id].cancel() - del self.jobs[job_id] - - job = self.middleware.jobs[job_id] - await job.pipes.close() - - async def download(self, request): - path = request.path.split('/') - if not request.path[-1].isdigit(): - resp = web.Response() - resp.set_status(404) - return resp - - job_id = int(path[-1]) - - qs = urllib.parse.parse_qs(request.query_string) - denied = False - filename = None - if 'auth_token' not in qs: - denied = True - else: - auth_token = qs.get('auth_token')[0] - token = await self.middleware.call('auth.get_token', auth_token) - if not token: - denied = True - else: - if token['attributes'].get('job') != job_id: - denied = True - else: - filename = token['attributes'].get('filename') - if denied: - resp = web.Response() - resp.set_status(401) - return resp - - job = self.middleware.jobs.get(job_id) - if not job: - resp = web.Response() - resp.set_status(404) - return resp - - if job_id not in self.jobs: - resp = web.Response() - resp.set_status(410) - return resp - - resp = web.StreamResponse(status=200, reason='OK', headers={ - 'Content-Type': 'application/octet-stream', - 'Content-Disposition': f'attachment; filename="{filename}"', - 'Transfer-Encoding': 'chunked', - }) - await resp.prepare(request) - - def do_copy(): - while True: - read = job.pipes.output.r.read(1048576) - if read == b'': - break - asyncio.run_coroutine_threadsafe(resp.write(read), loop=self.loop).result() - - try: - await self._cleanup_cancel(job_id) - await self.middleware.run_in_thread(do_copy) - finally: - await job.pipes.close() - - await resp.drain() - return resp - - async def upload(self, request): - reader = await request.multipart() - - part = await reader.next() - if not part: - resp = web.Response(status=405, body='No part found on payload') - resp.set_status(405) - return resp - - if part.name != 'data': - resp = web.Response(status=405, body='"data" part must be the first on payload') - resp.set_status(405) - return resp - - try: - data = json.loads(await part.read()) - except Exception as e: - return web.Response(status=400, body=str(e)) - - if 'method' not in data: - return web.Response(status=422) - - try: - credentials = parse_credentials(request) - if credentials is None: - raise web.HTTPUnauthorized() - except web.HTTPException as e: - return web.Response(status=e.status_code, body=e.text) - app = await create_application(request) - try: - authenticated_credentials = await authenticate(self.middleware, request, credentials, 'CALL', - data['method']) - if authenticated_credentials is None: - raise web.HTTPUnauthorized() - except web.HTTPException as e: - credentials['credentials_data'].pop('password', None) - await self.middleware.log_audit_message(app, 'AUTHENTICATION', { - 'credentials': credentials, - 'error': e.text, - }, False) - return web.Response(status=e.status_code, body=e.text) - app = await create_application(request, authenticated_credentials) - credentials['credentials_data'].pop('password', None) - await self.middleware.log_audit_message(app, 'AUTHENTICATION', { - 'credentials': credentials, - 'error': None, - }, True) - - filepart = await reader.next() - - if not filepart or filepart.name != 'file': - resp = web.Response(status=405, body='"file" not found as second part on payload') - resp.set_status(405) - return resp - - try: - serviceobj, methodobj = self.middleware.get_method(data['method']) - if authenticated_credentials.authorize('CALL', data['method']): - job = await self.middleware.call_with_audit(data['method'], serviceobj, methodobj, - data.get('params') or [], app, - pipes=Pipes(input_=self.middleware.pipe())) - else: - await self.middleware.log_audit_message_for_method(data['method'], methodobj, data.get('params') or [], - app, True, False, False) - raise web.HTTPForbidden() - await self.middleware.run_in_thread(copy_multipart_to_pipe, self.loop, filepart, job.pipes.input) - except CallError as e: - if e.errno == CallError.ENOMETHOD: - status_code = 422 - else: - status_code = 412 - return web.Response(status=status_code, body=str(e)) - except web.HTTPException as e: - return web.Response(status=e.status_code, body=e.text) - except Exception as e: - return web.Response(status=500, body=str(e)) - - resp = web.Response( - status=200, - headers={ - 'Content-Type': 'application/json', - }, - body=json.dumps({'job_id': job.id}).encode(), - ) - return resp - - class PreparedCall(typing.NamedTuple): args: list[typing.Any] | None = None executor: typing.Any | None = None From 0a13088a7d1a316d243a5d9609695543f79249f1 Mon Sep 17 00:00:00 2001 From: Caleb Date: Sat, 14 Dec 2024 10:03:27 -0500 Subject: [PATCH 2/4] format the file --- src/middlewared/middlewared/apps/file_app.py | 129 ++++++++++++------- 1 file changed, 84 insertions(+), 45 deletions(-) diff --git a/src/middlewared/middlewared/apps/file_app.py b/src/middlewared/middlewared/apps/file_app.py index 46ff49f016f0..a67d210dc45f 100644 --- a/src/middlewared/middlewared/apps/file_app.py +++ b/src/middlewared/middlewared/apps/file_app.py @@ -5,14 +5,18 @@ from aiohttp import web from middlewared.pipe import Pipes -from middlewared.restful import parse_credentials, authenticate, create_application, copy_multipart_to_pipe +from middlewared.restful import ( + parse_credentials, + authenticate, + create_application, + copy_multipart_to_pipe, +) from middlewared.service_exception import CallError -__all__ = ('FileApplication',) +__all__ = ("FileApplication",) class FileApplication: - def __init__(self, middleware, loop): self.middleware = middleware self.loop = loop @@ -20,8 +24,10 @@ def __init__(self, middleware, loop): def register_job(self, job_id, buffered): self.jobs[job_id] = self.middleware.loop.call_later( - 3600 if buffered else 60, # FIXME: Allow the job to run for infinite time + give 300 seconds to begin - # download instead of waiting 3600 seconds for the whole operation + 3600 + if buffered + else 60, # FIXME: Allow the job to run for infinite time + give 300 seconds to begin + # download instead of waiting 3600 seconds for the whole operation lambda: self.middleware.create_task(self._cleanup_job(job_id)), ) @@ -40,7 +46,7 @@ async def _cleanup_job(self, job_id): await job.pipes.close() async def download(self, request): - path = request.path.split('/') + path = request.path.split("/") if not request.path[-1].isdigit(): resp = web.Response() resp.set_status(404) @@ -51,18 +57,18 @@ async def download(self, request): qs = parse_qs(request.query_string) denied = False filename = None - if 'auth_token' not in qs: + if "auth_token" not in qs: denied = True else: - auth_token = qs.get('auth_token')[0] - token = await self.middleware.call('auth.get_token', auth_token) + auth_token = qs.get("auth_token")[0] + token = await self.middleware.call("auth.get_token", auth_token) if not token: denied = True else: - if token['attributes'].get('job') != job_id: + if token["attributes"].get("job") != job_id: denied = True else: - filename = token['attributes'].get('filename') + filename = token["attributes"].get("filename") if denied: resp = web.Response() resp.set_status(401) @@ -79,17 +85,21 @@ async def download(self, request): resp.set_status(410) return resp - resp = web.StreamResponse(status=200, reason='OK', headers={ - 'Content-Type': 'application/octet-stream', - 'Content-Disposition': f'attachment; filename="{filename}"', - 'Transfer-Encoding': 'chunked', - }) + resp = web.StreamResponse( + status=200, + reason="OK", + headers={ + "Content-Type": "application/octet-stream", + "Content-Disposition": f'attachment; filename="{filename}"', + "Transfer-Encoding": "chunked", + }, + ) await resp.prepare(request) def do_copy(): while True: read = job.pipes.output.r.read(1048576) - if read == b'': + if read == b"": break run_coroutine_threadsafe(resp.write(read), loop=self.loop).result() @@ -107,12 +117,14 @@ async def upload(self, request): part = await reader.next() if not part: - resp = web.Response(status=405, body='No part found on payload') + resp = web.Response(status=405, body="No part found on payload") resp.set_status(405) return resp - if part.name != 'data': - resp = web.Response(status=405, body='"data" part must be the first on payload') + if part.name != "data": + resp = web.Response( + status=405, body='"data" part must be the first on payload' + ) resp.set_status(405) return resp @@ -121,7 +133,7 @@ async def upload(self, request): except Exception as e: return web.Response(status=400, body=str(e)) - if 'method' not in data: + if "method" not in data: return web.Response(status=422) try: @@ -132,42 +144,69 @@ async def upload(self, request): return web.Response(status=e.status_code, body=e.text) app = await create_application(request) try: - authenticated_credentials = await authenticate(self.middleware, request, credentials, 'CALL', - data['method']) + authenticated_credentials = await authenticate( + self.middleware, request, credentials, "CALL", data["method"] + ) if authenticated_credentials is None: raise web.HTTPUnauthorized() except web.HTTPException as e: - credentials['credentials_data'].pop('password', None) - await self.middleware.log_audit_message(app, 'AUTHENTICATION', { - 'credentials': credentials, - 'error': e.text, - }, False) + credentials["credentials_data"].pop("password", None) + await self.middleware.log_audit_message( + app, + "AUTHENTICATION", + { + "credentials": credentials, + "error": e.text, + }, + False, + ) return web.Response(status=e.status_code, body=e.text) app = await create_application(request, authenticated_credentials) - credentials['credentials_data'].pop('password', None) - await self.middleware.log_audit_message(app, 'AUTHENTICATION', { - 'credentials': credentials, - 'error': None, - }, True) + credentials["credentials_data"].pop("password", None) + await self.middleware.log_audit_message( + app, + "AUTHENTICATION", + { + "credentials": credentials, + "error": None, + }, + True, + ) filepart = await reader.next() - if not filepart or filepart.name != 'file': - resp = web.Response(status=405, body='"file" not found as second part on payload') + if not filepart or filepart.name != "file": + resp = web.Response( + status=405, body='"file" not found as second part on payload' + ) resp.set_status(405) return resp try: - serviceobj, methodobj = self.middleware.get_method(data['method']) - if authenticated_credentials.authorize('CALL', data['method']): - job = await self.middleware.call_with_audit(data['method'], serviceobj, methodobj, - data.get('params') or [], app, - pipes=Pipes(input_=self.middleware.pipe())) + serviceobj, methodobj = self.middleware.get_method(data["method"]) + if authenticated_credentials.authorize("CALL", data["method"]): + job = await self.middleware.call_with_audit( + data["method"], + serviceobj, + methodobj, + data.get("params") or [], + app, + pipes=Pipes(input_=self.middleware.pipe()), + ) else: - await self.middleware.log_audit_message_for_method(data['method'], methodobj, data.get('params') or [], - app, True, False, False) + await self.middleware.log_audit_message_for_method( + data["method"], + methodobj, + data.get("params") or [], + app, + True, + False, + False, + ) raise web.HTTPForbidden() - await self.middleware.run_in_thread(copy_multipart_to_pipe, self.loop, filepart, job.pipes.input) + await self.middleware.run_in_thread( + copy_multipart_to_pipe, self.loop, filepart, job.pipes.input + ) except CallError as e: if e.errno == CallError.ENOMETHOD: status_code = 422 @@ -182,8 +221,8 @@ async def upload(self, request): resp = web.Response( status=200, headers={ - 'Content-Type': 'application/json', + "Content-Type": "application/json", }, - body=dumps({'job_id': job.id}).encode(), + body=dumps({"job_id": job.id}).encode(), ) return resp From 35a70bd9158c6706a6b02e10023ef2e2d864b8cb Mon Sep 17 00:00:00 2001 From: Caleb Date: Sat, 14 Dec 2024 10:06:08 -0500 Subject: [PATCH 3/4] move ShellApplication to apps dir --- src/middlewared/middlewared/apps/__init__.py | 3 ++- .../{webshellapp/webshell.py => apps/webshell_app.py} | 0 src/middlewared/middlewared/main.py | 3 +-- src/middlewared/middlewared/webshellapp/__init__.py | 0 4 files changed, 3 insertions(+), 3 deletions(-) rename src/middlewared/middlewared/{webshellapp/webshell.py => apps/webshell_app.py} (100%) delete mode 100644 src/middlewared/middlewared/webshellapp/__init__.py diff --git a/src/middlewared/middlewared/apps/__init__.py b/src/middlewared/middlewared/apps/__init__.py index f4cd6f8a7ed1..503f15db87e9 100644 --- a/src/middlewared/middlewared/apps/__init__.py +++ b/src/middlewared/middlewared/apps/__init__.py @@ -1 +1,2 @@ -from .file_app import FileApplication +from .file_app import FileApplication # noqa +from .webshell_app import ShellApplication # noqa diff --git a/src/middlewared/middlewared/webshellapp/webshell.py b/src/middlewared/middlewared/apps/webshell_app.py similarity index 100% rename from src/middlewared/middlewared/webshellapp/webshell.py rename to src/middlewared/middlewared/apps/webshell_app.py diff --git a/src/middlewared/middlewared/main.py b/src/middlewared/middlewared/main.py index 0c370bfdd824..bece0924bffc 100644 --- a/src/middlewared/middlewared/main.py +++ b/src/middlewared/middlewared/main.py @@ -8,7 +8,7 @@ from .api.base.server.ws_handler.base import BaseWebSocketHandler from .api.base.server.ws_handler.rpc import RpcWebSocketApp, RpcWebSocketAppEvent from .api.base.server.ws_handler.rpc import RpcWebSocketHandler -from .apps import FileApplication +from .apps import FileApplication, ShellApplication from .common.event_source.manager import EventSourceManager from .event import Events from .job import Job, JobsQueue, State @@ -16,7 +16,6 @@ from .restful import RESTfulAPI from .role import ROLES, RoleManager from .schema import Error as SchemaError, OROperator -from .webshellapp.webshell import ShellApplication import middlewared.service from .service_exception import ( adapt_exception, CallError, CallException, ErrnoMixin, ValidationError, ValidationErrors, diff --git a/src/middlewared/middlewared/webshellapp/__init__.py b/src/middlewared/middlewared/webshellapp/__init__.py deleted file mode 100644 index e69de29bb2d1..000000000000 From 8fc40ce91b1a5cbf0d3de460ed63b7e3702ea51a Mon Sep 17 00:00:00 2001 From: Caleb Date: Sat, 14 Dec 2024 10:07:55 -0500 Subject: [PATCH 4/4] fix comment formatting --- src/middlewared/middlewared/apps/file_app.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/middlewared/middlewared/apps/file_app.py b/src/middlewared/middlewared/apps/file_app.py index a67d210dc45f..43200ef27215 100644 --- a/src/middlewared/middlewared/apps/file_app.py +++ b/src/middlewared/middlewared/apps/file_app.py @@ -23,11 +23,10 @@ def __init__(self, middleware, loop): self.jobs = {} def register_job(self, job_id, buffered): + # FIXME: Allow the job to run for infinite time + give 300 seconds to begin + # download instead of waiting 3600 seconds for the whole operation self.jobs[job_id] = self.middleware.loop.call_later( - 3600 - if buffered - else 60, # FIXME: Allow the job to run for infinite time + give 300 seconds to begin - # download instead of waiting 3600 seconds for the whole operation + 3600 if buffered else 60, lambda: self.middleware.create_task(self._cleanup_job(job_id)), )