Skip to content

Commit

Permalink
added execution log/output view, fixed stopping of running jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
ansibleguy committed Jan 28, 2024
1 parent 48c507b commit 275bf27
Show file tree
Hide file tree
Showing 21 changed files with 392 additions and 154 deletions.
4 changes: 3 additions & 1 deletion src/ansible-webui/aw/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
from drf_spectacular.views import SpectacularAPIView, SpectacularSwaggerView

from aw.api_endpoints.key import APIKey, APIKeyItem
from aw.api_endpoints.job import APIJob, APIJobItem
from aw.api_endpoints.job import APIJob, APIJobItem, APIJobExecutionItem, APIJobExecutionLogs

urlpatterns_api = [
path('api/key/<str:token>', APIKeyItem.as_view()),
path('api/key', APIKey.as_view()),
path('api/job/<int:job_id>/<int:exec_id>/<int:line_start>', APIJobExecutionLogs.as_view()),
path('api/job/<int:job_id>/<int:exec_id>', APIJobExecutionItem.as_view()),
path('api/job/<int:job_id>', APIJobItem.as_view()),
path('api/job', APIJob.as_view()),
path('api/_schema/', SpectacularAPIView.as_view(), name='_schema'),
Expand Down
107 changes: 81 additions & 26 deletions src/ansible-webui/aw/api_endpoints/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,17 @@
from rest_framework.response import Response
from drf_spectacular.utils import extend_schema, OpenApiResponse

from aw.models import Job
from aw.model.job import Job, JobExecution, BaseJobCredentials, \
CHOICE_JOB_PERMISSION_READ, CHOICE_JOB_PERMISSION_WRITE, CHOICE_JOB_PERMISSION_EXECUTE
from aw.api_endpoints.base import API_PERMISSION, get_api_user, BaseResponse
from aw.api_endpoints.job_util import get_viewable_jobs_serialized, job_action_allowed, \
JobReadResponse
from aw.model.job import CHOICE_JOB_PERMISSION_READ, CHOICE_JOB_PERMISSION_WRITE, CHOICE_JOB_PERMISSION_EXECUTE, \
BaseJobCredentials
from aw.execute.queue import queue_add
from aw.execute.util import update_execution_status, is_execution_status
from aw.utils.util import is_null

LIMIT_JOB_RESULTS = 10


class JobWriteRequest(serializers.ModelSerializer):
class Meta:
Expand All @@ -29,6 +31,17 @@ class JobWriteResponse(BaseResponse):
msg = serializers.CharField()


def _find_job(job_id: int) -> (Job, None):
# pylint: disable=E1101
return Job.objects.get(id=job_id)


def _find_job_and_execution(job_id: int, exec_id: int) -> tuple[Job, JobExecution]:
# pylint: disable=E1101
job = _find_job(job_id)
return job, JobExecution.objects.get(id=exec_id, job=job)


class APIJob(APIView):
http_method_names = ['post', 'get']
serializer_class = JobReadResponse
Expand Down Expand Up @@ -83,15 +96,10 @@ def post(self, request):


class APIJobItem(APIView):
http_method_names = ['get', 'delete', 'put', 'post', 'patch']
http_method_names = ['get', 'delete', 'put', 'post']
serializer_class = JobWriteResponse
permission_classes = API_PERMISSION

@staticmethod
def _find_job(job_id: int) -> (Job, None):
# pylint: disable=E1101
return Job.objects.get(id=job_id)

@extend_schema(
request=None,
responses={
Expand All @@ -104,7 +112,7 @@ def _find_job(job_id: int) -> (Job, None):
def get(self, request, job_id: int):
self.serializer_class = JobReadResponse
user = get_api_user(request)
job = self._find_job(job_id)
job = _find_job(job_id)
if job is None:
return Response(data={'msg': f"Job with ID {job_id} does not exist"}, status=404)

Expand All @@ -126,7 +134,7 @@ def get(self, request, job_id: int):
def delete(self, request, job_id: int):
user = get_api_user(request)
try:
job = self._find_job(job_id)
job = _find_job(job_id)

if job is not None:
if not job_action_allowed(user=user, job=job, permission_needed=CHOICE_JOB_PERMISSION_WRITE):
Expand Down Expand Up @@ -154,7 +162,7 @@ def delete(self, request, job_id: int):
def put(self, request, job_id: int):
user = get_api_user(request)
try:
job = self._find_job(job_id)
job = _find_job(job_id)

if job is not None:
if not job_action_allowed(user=user, job=job, permission_needed=CHOICE_JOB_PERMISSION_WRITE):
Expand Down Expand Up @@ -205,7 +213,7 @@ def put(self, request, job_id: int):
def post(self, request, job_id: int):
user = get_api_user(request)
try:
job = self._find_job(job_id)
job = _find_job(job_id)

if job is not None:
if not job_action_allowed(user=user, job=job, permission_needed=CHOICE_JOB_PERMISSION_EXECUTE):
Expand All @@ -219,33 +227,80 @@ def post(self, request, job_id: int):

return Response(data={'msg': f"Job with ID '{job_id}' does not exist"}, status=404)


class APIJobExecutionItem(APIView):
http_method_names = ['delete']
serializer_class = JobWriteResponse
permission_classes = API_PERMISSION

@extend_schema(
request=None,
responses={
200: OpenApiResponse(JobReadResponse, description='Job execution stopped'),
400: OpenApiResponse(JobReadResponse, description='Job is not running'),
200: OpenApiResponse(JobReadResponse, description='Job execution stopping'),
400: OpenApiResponse(JobReadResponse, description='Job execution is not running'),
403: OpenApiResponse(JobReadResponse, description='Not privileged to stop the job'),
404: OpenApiResponse(JobReadResponse, description='Job does not exist'),
404: OpenApiResponse(JobReadResponse, description='Job or execution does not exist'),
},
summary='Stop a running job.',
operation_id='job_stop'
summary='Stop a running job execution.',
operation_id='job_exec_stop'
)
def patch(self, request, job_id: int):
def delete(self, request, job_id: int, exec_id: int):
user = get_api_user(request)
try:
job = self._find_job(job_id)
job, execution = _find_job_and_execution(job_id, exec_id)

if job is not None:
if job is not None and execution is not None:
if not job_action_allowed(user=user, job=job, permission_needed=CHOICE_JOB_PERMISSION_EXECUTE):
return Response(data={'msg': f"Not privileged to stop the job '{job.name}'"}, status=403)

if not job.state_running:
return Response(data={'msg': f"Job '{job.name}' is not running"}, status=400)
if not is_execution_status(execution, 'Running'):
return Response(data={'msg': f"Job execution '{job.name}' is not running"}, status=400)

job.state_stop = True
return Response(data={'msg': f"Job '{job.name}' execution stopped"}, status=200)
update_execution_status(execution, 'Stopping')
return Response(data={'msg': f"Job execution '{job.name}' stopping"}, status=200)

except ObjectDoesNotExist:
pass

return Response(data={'msg': f"Job with ID '{job_id}' does not exist"}, status=404)
return Response(data={'msg': f"Job with ID '{job_id}' or execution does not exist"}, status=404)


class JobExecutionLogReadResponse(BaseResponse):
lines = serializers.ListSerializer(child=serializers.CharField())


class APIJobExecutionLogs(APIView):
http_method_names = ['get']
serializer_class = JobExecutionLogReadResponse
permission_classes = API_PERMISSION

@extend_schema(
request=None,
responses={
200: OpenApiResponse(JobReadResponse, description='Return job logs'),
403: OpenApiResponse(JobReadResponse, description='Not privileged to view the job logs'),
404: OpenApiResponse(JobReadResponse, description='Job, execution or logs do not exist'),
},
summary='Get logs of a job execution.',
operation_id='job_exec_logs'
)
def get(self, request, job_id: int, exec_id: int, line_start: int = 0):
user = get_api_user(request)
try:
job, execution = _find_job_and_execution(job_id, exec_id)

if job is not None and execution is not None:
if not job_action_allowed(user=user, job=job, permission_needed=CHOICE_JOB_PERMISSION_READ):
return Response(data={'msg': f"Not privileged to view logs of the job '{job.name}'"}, status=403)

if execution.log_stdout is None:
return Response(data={'msg': f"No logs found for job '{job.name}'"}, status=404)

with open(execution.log_stdout, 'r', encoding='utf-8') as logfile:
lines = logfile.readlines()
return Response(data={'lines': lines[line_start:]}, status=200)

except ObjectDoesNotExist:
pass

return Response(data={'msg': f"Job with ID '{job_id}' or execution does not exist"}, status=404)
2 changes: 1 addition & 1 deletion src/ansible-webui/aw/config/navigation.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
# 'Dashboard': '/ui/',
'Jobs': {
'Manage': '/ui/jobs/manage',
'Logs': '/ui/jobs/log',
# 'Queue': '/ui/jobs/queue',
# 'Logs': '/ui/jobs/log',
},
'Settings': {
'API Keys': '/ui/settings/api_keys',
Expand Down
27 changes: 14 additions & 13 deletions src/ansible-webui/aw/execute/play.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
from ansible_runner import RunnerConfig, Runner

from aw.model.job import Job, JobExecution
from aw.execute.play_util import runner_cleanup, runner_prep, parse_run_result, failure
from aw.execute.util import get_path_run, write_stdout_stderr
from aw.utils.util import datetime_w_tz, is_null # get_ansible_versions
from aw.execute.play_util import runner_cleanup, runner_prep, parse_run_result, failure, runner_logs, job_logs
from aw.execute.util import get_path_run, is_execution_status
from aw.utils.util import datetime_w_tz, is_null, timed_lru_cache # get_ansible_versions
from aw.utils.handlers import AnsibleConfigError
from aw.utils.debug import log

Expand All @@ -22,38 +22,39 @@ def ansible_playbook(job: Job, execution: (JobExecution, None)):
if is_null(execution):
execution = JobExecution(user=None, job=job, comment='Scheduled')

log_files = job_logs(job=job, execution=execution)
execution.log_stdout = log_files['stdout']
execution.log_stderr = log_files['stderr']

@timed_lru_cache(seconds=1) # check actual status every N seconds; lower DB queries
def _cancel_job() -> bool:
return job.state_stop
return is_execution_status(execution, 'Stopping')

runner_cfg = None
try:
opts = runner_prep(job=job, execution=execution, path_run=path_run)
execution.save()

# todo: fix runner overwriting pre-run logs
runner_cfg = AwRunnerConfig(**opts)
# write_stdout_stderr(config=config, msg=f"VERSIONS: '{get_ansible_versions()}'")
runner_logs(cfg=runner_cfg, log_files=log_files)
runner_cfg.prepare()
log(msg=f"Running job '{job.name}': '{' '.join(runner_cfg.command)}'", level=5)
# write_stdout_stderr(config=config, msg=f"RUNNING COMMAND: '{' '.join(config.command)}'")

runner = Runner(config=runner_cfg, cancel_callback=_cancel_job)
runner.run()

write_stdout_stderr(cfg=runner_cfg, msg='PROCESSING RESULT')
parse_run_result(
time_start=time_start,
execution=execution,
runner=runner,
)
del runner

write_stdout_stderr(cfg=runner_cfg, msg='CLEANUP')
runner_cleanup(job=job, execution=execution, path_run=path_run, cfg=runner_cfg)
runner_cleanup(path_run)

except (OSError, AnsibleConfigError) as err:
tb = traceback.format_exc(limit=1024)
failure(
job=job, execution=execution, path_run=path_run, time_start=time_start,
error_s=str(err), error_m=tb, cfg=runner_cfg,
execution=execution, path_run=path_run, time_start=time_start,
error_s=str(err), error_m=tb,
)
raise
Loading

0 comments on commit 275bf27

Please sign in to comment.