Add ability to save diagnostics data to artifacts
closes #5422
lubosmj committed Jul 22, 2024
1 parent fd6c397 commit 719e1e0
Showing 8 changed files with 147 additions and 36 deletions.
2 changes: 2 additions & 0 deletions CHANGES/5422.feature
@@ -0,0 +1,2 @@
Added the ability to save a task's diagnostics data as artifacts. These artifacts are available at
the task's detail endpoint. To download the data, issue GET requests to `${TASK_HREF}profile_data/`.
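
For illustration, a minimal client sketch of downloading the diagnostics of a finished task; the host, credentials, and task href below are placeholders, not values taken from this change:

import requests

PULP_BASE = "https://pulp.example.com"  # placeholder host
TASK_HREF = "/pulp/api/v3/tasks/0190c0f2-placeholder/"  # placeholder task href

# The profile_data/ endpoint returns a JSON object mapping profile names
# (e.g. "memory_profile", "pyinstrument_data") to pre-signed download URLs.
response = requests.get(f"{PULP_BASE}{TASK_HREF}profile_data/", auth=("admin", "password"))
response.raise_for_status()
for name, url in response.json().items():
    with open(name, "wb") as f:
        f.write(requests.get(url).content)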
35 changes: 35 additions & 0 deletions pulpcore/app/migrations/0121_add_profile_data_table.py
@@ -0,0 +1,35 @@
# Generated by Django 4.2.13 on 2024-07-22 12:05

from django.db import migrations, models
import django.db.models.deletion
import django_lifecycle.mixins
import pulpcore.app.models.base


class Migration(migrations.Migration):

dependencies = [
('core', '0120_get_url_removal'),
]

operations = [
migrations.CreateModel(
name='ProfileData',
fields=[
('pulp_id', models.UUIDField(default=pulpcore.app.models.base.pulp_uuid, editable=False, primary_key=True, serialize=False)),
('pulp_created', models.DateTimeField(auto_now_add=True)),
('pulp_last_updated', models.DateTimeField(auto_now=True, null=True)),
('name', models.TextField()),
('artifact', models.ForeignKey(null=True, on_delete=django.db.models.deletion.CASCADE, to='core.artifact')),
],
options={
'unique_together': {('artifact', 'name')},
},
bases=(django_lifecycle.mixins.LifecycleModelMixin, models.Model),
),
migrations.AddField(
model_name='task',
name='profile_data',
field=models.ManyToManyField(to='core.profiledata'),
),
]
1 change: 1 addition & 0 deletions pulpcore/app/models/__init__.py
@@ -73,6 +73,7 @@

from .task import (
CreatedResource,
ProfileData,
Task,
TaskGroup,
TaskSchedule,
13 changes: 13 additions & 0 deletions pulpcore/app/models/task.py
@@ -50,6 +50,17 @@ def _uuid_to_advisory_lock(value):
return ((value >> 64) ^ value) & 0x7FFFFFFFFFFFFFFF


class ProfileData(BaseModel):
"""
    A model encapsulating a task's profiling data stored as an Artifact.
    """

artifact = models.ForeignKey("Artifact", on_delete=models.CASCADE, null=True)
name = models.TextField()

class Meta:
unique_together = ("artifact", "name")


class Task(BaseModel, AutoAddObjPermsMixin):
"""
Represents a task
@@ -100,6 +111,8 @@ class Task(BaseModel, AutoAddObjPermsMixin):
pulp_domain = models.ForeignKey("Domain", default=get_domain_pk, on_delete=models.CASCADE)
versions = HStoreField(default=dict)

profile_data = models.ManyToManyField(ProfileData)

def __str__(self):
return "Task: {name} [{state}]".format(name=self.name, state=self.state)

17 changes: 15 additions & 2 deletions pulpcore/app/viewsets/task.py
@@ -30,7 +30,7 @@
WorkerSerializer,
)
from pulpcore.app.tasks import purge
from pulpcore.app.util import get_domain
from pulpcore.app.util import get_domain, get_artifact_url
from pulpcore.app.viewsets import NamedModelViewSet, RolesMixin
from pulpcore.app.viewsets.base import DATETIME_FILTER_OPTIONS, NAME_FILTER_OPTIONS
from pulpcore.app.viewsets.custom_filters import (
@@ -87,7 +87,7 @@ class TaskViewSet(
"statements": [
{"action": ["list"], "principal": "authenticated", "effect": "allow"},
{
"action": ["retrieve", "my_permissions"],
"action": ["retrieve", "profile_data", "my_permissions"],
"principal": "authenticated",
"effect": "allow",
"condition": "has_model_or_domain_or_obj_perms:core.view_task",
@@ -242,6 +242,19 @@ def purge(self, request):
)
return OperationPostponedResponse(task, request)

@action(detail=True)
def profile_data(self, request, pk):
"""
Return pre-signed URLs used for downloading raw profile data.
"""
task = self.get_object()
data = {}

for profile_data in task.profile_data.select_related("artifact").all():
data[profile_data.name] = get_artifact_url(profile_data.artifact)

return Response(data)


class TaskGroupViewSet(NamedModelViewSet, mixins.RetrieveModelMixin, mixins.ListModelMixin):
queryset = TaskGroup.objects.all()
3 changes: 0 additions & 3 deletions pulpcore/constants.py
@@ -1,10 +1,7 @@
from gettext import gettext as _
from pathlib import Path
from types import SimpleNamespace


VAR_TMP_PULP = Path("/var/tmp/pulp")

# Special purpose advisory locks for use with the two number variant.
# The group will be 0.
# The numbers are randomly chosen.
105 changes: 77 additions & 28 deletions pulpcore/tasking/_util.py
@@ -7,23 +7,26 @@
import sys
import threading
import time
import tempfile
from gettext import gettext as _

from contextlib import suppress

from django.conf import settings
from django.db import connection, transaction
from django.db import connection, transaction, IntegrityError
from django.db.models import Q
from django.utils import timezone
from django_guid import set_guid
from django_guid.utils import generate_guid
from pulpcore.app.models import Artifact, Content, Task, TaskSchedule
from pulpcore.app.models import Artifact, Content, Task, TaskSchedule, ProfileData
from pulpcore.app.role_util import get_users_with_perms
from pulpcore.app.util import (
set_current_user,
set_domain,
configure_analytics,
configure_cleanup,
)
from pulpcore.constants import TASK_FINAL_STATES, TASK_STATES, VAR_TMP_PULP
from pulpcore.constants import TASK_FINAL_STATES, TASK_STATES
from pulpcore.tasking.tasks import dispatch, execute_task

_logger = logging.getLogger(__name__)
@@ -58,13 +61,11 @@ def delete_incomplete_resources(task):
_logger.error(_("Delete created resource, failed: {}").format(str(error)))


def write_memory_usage(path):
_logger.info("Writing task memory data to {}".format(path))

def write_memory_usage(stop_event, path):
with open(path, "w") as file:
file.write("# Seconds\tMemory in MB\n")
seconds = 0
while True:
while not stop_event.is_set():
current_mb_in_use = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024
file.write(f"{seconds}\t{current_mb_in_use:.2f}\n")
file.flush()
@@ -92,16 +93,6 @@ def perform_task(task_pk, task_working_dir_rel_path):
signal.signal(signal.SIGTERM, child_signal_handler)
signal.signal(signal.SIGHUP, child_signal_handler)
signal.signal(signal.SIGUSR1, child_signal_handler)
if settings.TASK_DIAGNOSTICS:
diagnostics_dir = VAR_TMP_PULP / str(task_pk)
diagnostics_dir.mkdir(parents=True, exist_ok=True)
mem_diagnostics_path = diagnostics_dir / "memory.datum"
# It would be better to have this recording happen in the parent process instead of here
# https://github.com/pulp/pulpcore/issues/2337
mem_diagnostics_thread = threading.Thread(
target=write_memory_usage, args=(mem_diagnostics_path,), daemon=True
)
mem_diagnostics_thread.start()
# All processes need to create their own postgres connection
connection.connection = None
task = Task.objects.select_related("pulp_domain").get(pk=task_pk)
@@ -114,21 +105,79 @@ def perform_task(task_pk, task_working_dir_rel_path):
set_domain(task.pulp_domain)
os.chdir(task_working_dir_rel_path)

# set up profiling
if settings.TASK_DIAGNOSTICS and importlib.util.find_spec("pyinstrument") is not None:
from pyinstrument import Profiler

with Profiler() as profiler:
execute_task(task)

profile_file = diagnostics_dir / "pyinstrument.html"
_logger.info("Writing task profile data to {}".format(profile_file))
with open(profile_file, "w+") as f:
f.write(profiler.output_html())
if settings.TASK_DIAGNOSTICS:
_execute_task_and_profile(task)
else:
execute_task(task)


def _execute_task_and_profile(task):
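    # Compose the diagnostics: the pyinstrument profiler wraps execute_task, and the
    # memory sampler wraps both, so each recording covers the entire task run.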
with tempfile.TemporaryDirectory(dir=settings.WORKING_DIRECTORY) as temp_dir:
pyinstrument_func = _pyinstrument_diagnostic_decorator(temp_dir, execute_task)
memory_func = _memory_diagnostic_decorator(temp_dir, pyinstrument_func)

memory_func(task)


def _memory_diagnostic_decorator(temp_dir, func):
def __memory_diagnostic_decorator(task):
mem_diagnostics_file_path = os.path.join(temp_dir, "memory.datum")
# It would be better to have this recording happen in the parent process instead of here
# https://github.com/pulp/pulpcore/issues/2337
stop_event = threading.Event()
mem_diagnostics_thread = threading.Thread(
target=write_memory_usage, args=(stop_event, mem_diagnostics_file_path), daemon=True
)
mem_diagnostics_thread.start()

func(task)

stop_event.set()
artifact = Artifact.init_and_validate(mem_diagnostics_file_path)
with suppress(IntegrityError):
artifact.save()

profile_data = ProfileData(artifact=artifact, name="memory_profile")
with suppress(IntegrityError):
profile_data.save()

task.profile_data.add(profile_data)

_logger.info("Created memory diagnostic data.")

return __memory_diagnostic_decorator


def _pyinstrument_diagnostic_decorator(temp_dir, func):
def __pyinstrument_diagnostic_decorator(task):
if importlib.util.find_spec("pyinstrument") is not None:
from pyinstrument import Profiler

with Profiler() as profiler:
func(task)

profile_file_path = os.path.join(temp_dir, "pyinstrument.html")
with open(profile_file_path, "w+") as f:
f.write(profiler.output_html())
f.flush()

artifact = Artifact.init_and_validate(str(profile_file_path))
with suppress(IntegrityError):
artifact.save()

profile_data = ProfileData(artifact=artifact, name="pyinstrument_data")
with suppress(IntegrityError):
profile_data.save()

task.profile_data.add(profile_data)

_logger.info("Created pyinstrument profile data.")
else:
func(task)

return __pyinstrument_diagnostic_decorator


def dispatch_scheduled_tasks():
# Warning, dispatch_scheduled_tasks is not race condition free!
now = timezone.now()
7 changes: 4 additions & 3 deletions staging_docs/admin/reference/settings.md
@@ -389,10 +389,11 @@ If set to 0, automatic cleanup is disabled, which is the default.

### TASK_DIAGNOSTICS

If `True`, each task will record various diagnostics (listed below) to files in the dir
`/var/tmp/pulp/<task_UUID>/`. This is `False` by default.
The default setting is `False`. When set to `True`, each task records various diagnostics (listed below)
and stores them as separate artifacts. To download the data, issue GET requests to `${TASK_HREF}profile_data/`.

> - memory - the task's max resident set size in MB.
> - memory.datum - the task's max resident set size in MB
> - pyinstrument.html - the output of the pyinstrument profiler, if installed


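A minimal sketch of enabling this in a deployment's settings file; the path below assumes a typical installation and is not prescribed by this change:

# /etc/pulp/settings.py (illustrative location)
TASK_DIAGNOSTICS = True

Note that the pyinstrument profile is only recorded when the pyinstrument package is importable in the worker environment; the memory profile is recorded regardless.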
