Add a new data field to Blob model
This is an alternative solution for issue pulp#1537.
Instead of creating a new model, this commit adds a new field to
the existing Blob model. To avoid the complications of identifying
the file type (ASCII for container configs, binary for layers), it
keeps blobs (configs and layers) in the database when they are
smaller than 22 kB.
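As a rough illustration of the idea (not the plugin's actual code, which is shown in the diff below), the decision comes down to a size check against a ~22 kB threshold:

# Illustrative sketch only; the real field and constant are added in the diff below
# (Blob.data, ARTIFACTLESS_BLOB_SIZE). The names here mirror them for readability.
ARTIFACTLESS_BLOB_SIZE = 22_000  # bytes, roughly the average container config size


def keep_in_database(raw_data: bytes) -> bool:
    """Small blobs (typically configs) are stored inline; larger ones stay artifacts."""
    return len(raw_data) < ARTIFACTLESS_BLOB_SIZE


assert keep_in_database(b'{"config": {"Labels": {}}}')  # a small JSON config
assert not keep_in_database(b"\x00" * 1_000_000)        # a 1 MB layer-like blob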
git-hyagi committed Apr 24, 2024
1 parent 39252e4 commit 8868dfe
Showing 11 changed files with 213 additions and 29 deletions.
3 changes: 3 additions & 0 deletions CHANGES/1296.misc
@@ -0,0 +1,3 @@
Added a new `data` field to the Blob model to store blobs smaller than 22 kB (the average size
of a container config). Blobs stored in this field are no longer kept as artifacts, which
reduces read requests to the storage backend.
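The read path this entry refers to can be sketched as follows (the helper is illustrative, not part of the plugin API; the equivalent logic appears in the registry and cache changes below). Blobs whose bytes live in the database skip the storage-backend round trip:

# Illustrative helper: return a blob's raw bytes, preferring the new inline
# `data` column and falling back to the artifact file otherwise.
def blob_bytes(blob, storage):
    if blob.data:                      # small blob: served straight from the DB row
        return bytes(blob.data)
    artifact = blob._artifacts.get()   # large blob: still artifact-backed
    with storage.open(artifact.file.name) as f:
        return f.read()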
66 changes: 66 additions & 0 deletions pulp_container/app/cache.py
@@ -1,6 +1,11 @@
import json
import time

from django.core.exceptions import ObjectDoesNotExist
from django.db.models import F, Value
from django.http import HttpResponseRedirect, HttpResponse, FileResponse as ApiFileResponse

from pulpcore.app.settings import settings
from pulpcore.plugin.cache import CacheKeys, AsyncContentCache, SyncContentCache

from pulp_container.app.models import ContainerDistribution, ContainerPullThroughDistribution
@@ -9,6 +14,8 @@
ACCEPT_HEADER_KEY = "accept_header"
QUERY_KEY = "query"

DEFAULT_EXPIRES_TTL = settings.CACHE_SETTINGS["EXPIRES_TTL"]


class RegistryCache:
"""A class that overrides the default key specs."""
@@ -51,6 +58,65 @@ def make_key(self, request):
key = ":".join(all_keys[k] for k in self.keys)
return key

# overriding to serve layer blobs (bytes)
def make_response(self, key, base_key):
"""Tries to find the cached entry and turn it into a proper response"""
entry = self.get(key, base_key)
if not entry:
return None
entry = json.loads(entry)
response_type = entry.pop("type", None)
if binary := entry.pop("data", None):
# raw binary data was translated to its hexadecimal representation and saved in
# the cache as a regular string; now it is necessary to translate the data back
# to its original representation, which will be returned in the HTTP response body:
# https://docs.aiohttp.org/en/stable/web_reference.html#response
entry["data"] = bytes.fromhex(binary)
response_type = "Response"
# None means "doesn't expire", unset means "already expired".
expires = entry.pop("expires", -1)
if (not response_type or response_type not in self.RESPONSE_TYPES) or (
expires and expires < time.time()
):
# Bad entry, delete from cache
self.delete(key, base_key)
return None

response = self.RESPONSE_TYPES[response_type](**entry)
response.headers["X-PULP-CACHE"] = "HIT"
return response

# overriding to handle layer blobs (bytes) without raising an exception when trying to decode them
def make_entry(self, key, base_key, handler, args, kwargs, expires=DEFAULT_EXPIRES_TTL):
"""Gets the response for the request and try to turn it into a cacheable entry"""
response = handler(*args, **kwargs)
entry = {"headers": dict(response.headers), "status": response.status_code}
if expires is not None:
# Redis TTL is not sufficient: https://github.com/pulp/pulpcore/issues/4845
entry["expires"] = expires + time.time()
else:
# Settings allow you to set None to mean "does not expire". Persist.
entry["expires"] = None
response.headers["X-PULP-CACHE"] = "MISS"
if isinstance(response, HttpResponseRedirect):
entry["redirect_to"] = str(response.headers["Location"])
entry["type"] = "Redirect"
elif isinstance(response, ApiFileResponse):
entry["path"] = str(response.filename)
entry["type"] = "FileResponse"
elif isinstance(response, HttpResponse):
entry["type"] = "Response"
if isinstance(response.data, bytes):
entry["data"] = response.data.hex()
else:
entry["content"] = response.content.decode("utf-8")
else:
# We don't cache StreamResponses or errors
return response

self.set(key, json.dumps(entry), expires, base_key=base_key)
return response


def find_base_path_cached(request, cached):
"""
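The cache entry above is serialized with json.dumps(), which cannot hold raw bytes, so the response body is stored as a hex string and decoded again on a cache hit. A self-contained round trip of that encoding (values are illustrative):

import json

body = b"\x1f\x8b\x08\x00example-layer-bytes"  # illustrative blob content
entry = json.dumps({"type": "Response", "data": body.hex(), "status": 200})

cached = json.loads(entry)
restored = bytes.fromhex(cached.pop("data"))

assert restored == body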
@@ -6,10 +6,11 @@

from django.core.exceptions import ObjectDoesNotExist
from django.core.management import BaseCommand
from django.core.files.storage import default_storage as storage

from pulp_container.app.models import Manifest

from pulp_container.constants import MEDIA_TYPE
from pulp_container.constants import ARTIFACTLESS_BLOB_SIZE, MEDIA_TYPE


class Command(BaseCommand):
@@ -41,11 +42,30 @@ def handle(self, *args, **options):
media_type__in=[MEDIA_TYPE.MANIFEST_LIST, MEDIA_TYPE.INDEX_OCI], annotations={}
)
manifests_updated_count += self.update_manifests(manifest_lists)
manifests_updated_count += self.update_config_blobs()

self.stdout.write(
self.style.SUCCESS("Successfully updated %d manifests." % manifests_updated_count)
)

# TODO: check whether it is worth finding a better way to do this, because here we iterate
# over all of the available manifests again (we also iterate over a list of manifests in
# the update_manifests method)
def update_config_blobs(self):
manifests_updated_count = 0
manifests = Manifest.objects.all()
for manifest in manifests.iterator():
artifact = manifest.config_blob._artifacts.get().file
blob_size = artifact.size
if blob_size < ARTIFACTLESS_BLOB_SIZE:
with storage.open(artifact.name) as artifact_file:
raw_data = artifact_file.read()
blob = manifest.config_blob
blob.data = raw_data
blob.save()
manifests_updated_count += 1
return manifests_updated_count

def update_manifests(self, manifests_qs):
manifests_updated_count = 0
manifests_to_update = []
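One way to narrow the loop the TODO above mentions would be to rely on the Artifact.size column instead of opening every file just to read its size. This is only a sketch; it assumes pulpcore's Artifact model exposes a `size` field and that an empty `data` field marks blobs that still need migrating:

from pulp_container.app.models import Manifest
from pulp_container.constants import ARTIFACTLESS_BLOB_SIZE


def config_blobs_to_migrate():
    """Yield config blobs small enough to be moved into the new data field."""
    for manifest in Manifest.objects.select_related("config_blob").iterator():
        blob = manifest.config_blob
        if blob is None or blob.data:  # no config (e.g. manifest lists) or already migrated
            continue
        artifact = blob._artifacts.get()
        if artifact.size and artifact.size < ARTIFACTLESS_BLOB_SIZE:
            yield blob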
18 changes: 18 additions & 0 deletions pulp_container/app/migrations/0039_artifactless_config_blob.py
@@ -0,0 +1,18 @@
# Generated by Django 4.2.10 on 2024-04-24 15:52

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('container', '0038_add_manifest_metadata_fields'),
]

operations = [
migrations.AddField(
model_name='blob',
name='data',
field=models.BinaryField(blank=True),
),
]
10 changes: 10 additions & 0 deletions pulp_container/app/modelresource.py
@@ -13,6 +13,14 @@
)


class BinaryWidget(widgets.Widget):
def clean(self, value, row=None, **kwargs):
return bytes.fromhex(value)

def render(self, value, obj=None):
return value.hex()


class ContainerRepositoryResource(RepositoryResource):
"""
A resource for importing/exporting repositories of the sync type
@@ -65,6 +73,8 @@ class BlobResource(BaseContentResource):
Resource for import/export of blob entities
"""

data = fields.Field(column_name="data", attribute="data", widget=BinaryWidget())

def set_up_queryset(self):
"""
:return: Blobs specific to a specified repo-version.
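A quick sanity check of the BinaryWidget round trip as used during export and import (the widget is the one defined above; the values are illustrative):

widget = BinaryWidget()

exported = widget.render(b"\x7fELF\x02\x01")  # -> "7f454c460201", safe to write to CSV/JSON
restored = widget.clean(exported)             # -> b"\x7fELF\x02\x01"

assert restored == b"\x7fELF\x02\x01"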
17 changes: 13 additions & 4 deletions pulp_container/app/models.py
@@ -46,6 +46,7 @@ class Blob(Content):
Fields:
digest (models.TextField): The blob digest.
data (models.BinaryField): The raw blob content (stored inline for small blobs such as container configs).
Relations:
manifest (models.ForeignKey): Many-to-one relationship with Manifest.
@@ -56,6 +57,7 @@
TYPE = "blob"

digest = models.TextField(db_index=True)
data = models.BinaryField(blank=True)

class Meta:
default_related_name = "%(app_label)s_%(model_name)s"
@@ -138,10 +140,17 @@ def init_annotations(self, manifest_data=None):

def init_labels(self):
if self.config_blob:
config_artifact = self.config_blob._artifacts.get()

config_data, _ = get_content_data(config_artifact)
self.labels = config_data.get("config", {}).get("Labels") or {}
try:
# if config-blob is stored in the database
config = json.loads(self.config_blob.config_blob.instance.data)["config"]
if "Labels" in config.keys():
self.labels = config["Labels"] or {}
except json.decoder.JSONDecodeError:
# if not, we'll retrieve the config from file
config_artifact = self.config_blob._artifacts.get()

config_data, _ = get_content_data(config_artifact)
self.labels = config_data.get("config", {}).get("Labels") or {}

return bool(self.labels)

54 changes: 41 additions & 13 deletions pulp_container/app/registry_api.py
@@ -82,7 +82,9 @@
validate_manifest,
)
from pulp_container.constants import (
ARTIFACTLESS_BLOB_SIZE,
EMPTY_BLOB,
MEDIA_TYPE,
SIGNATURE_API_EXTENSION_VERSION,
SIGNATURE_HEADER,
SIGNATURE_PAYLOAD_MAX_SIZE,
@@ -608,8 +610,11 @@ def get(self, request):
tag.name, tag.tagged_manifest, req_oss, req_architectures, manifests
)
for manifest, tagged in manifests.items():
with storage.open(manifest.config_blob._artifacts.get().file.name) as file:
raw_data = file.read()
if manifest.config_blob.data:
raw_data = manifest.config_blob.data
else:
with storage.open(manifest.config_blob._artifacts.get().file.name) as file:
raw_data = file.read()
config_data = json.loads(raw_data)
labels = config_data.get("config", {}).get("Labels")
if not labels:
@@ -785,19 +790,33 @@ def create_blob(self, artifact, digest):
except IntegrityError:
blob = models.Blob.objects.get(digest=digest)
blob.touch()
try:
blob_artifact = ContentArtifact(
artifact=artifact, content=blob, relative_path=digest
)
blob_artifact.save()
except IntegrityError:
# re-upload artifact in case it was previously removed.
ca = ContentArtifact.objects.get(content=blob, relative_path=digest)
if not ca.artifact:
ca.artifact = artifact
ca.save(update_fields=["artifact"])

blob_size = artifact.file.size
if blob_size > ARTIFACTLESS_BLOB_SIZE:
self._create_blob_content_artifact(artifact, blob, digest)
else:
with storage.open(artifact.file.name) as artifact_file:
raw_data = artifact_file.read()
blob.data = raw_data
blob.save()

return blob

def _create_blob_content_artifact(self, artifact, blob, digest):
"""
Blobs with size > ARTIFACTLESS_BLOB_SIZE are often layer blobs and
will be stored as artifacts.
"""
try:
blob_artifact = ContentArtifact(artifact=artifact, content=blob, relative_path=digest)
blob_artifact.save()
except IntegrityError:
# re-upload artifact in case it was previously removed.
ca = ContentArtifact.objects.get(content=blob, relative_path=digest)
if not ca.artifact:
ca.artifact = artifact
ca.save(update_fields=["artifact"])

def single_request_upload(self, request, path, repository, digest):
"""Monolithic upload."""
chunk = request.META["wsgi.input"]
@@ -987,6 +1006,15 @@ def handle_safe_method(self, request, path, pk):
except models.Blob.DoesNotExist:
raise BlobNotFound(digest=pk)

if blob.data:
media_type = MEDIA_TYPE.CONFIG_BLOB_OCI
headers = {
"Content-Type": media_type,
"Docker-Content-Digest": pk,
"Docker-Distribution-API-Version": "registry/2.0",
}
return Response(data=blob.data, headers=headers)

return redirects.issue_blob_redirect(blob)


4 changes: 4 additions & 0 deletions pulp_container/app/serializers.py
@@ -5,6 +5,7 @@
from django.core.validators import URLValidator
from rest_framework import serializers

from pulpcore.app.serializers import fields
from pulpcore.plugin.models import (
Artifact,
ContentRedirectContentGuard,
@@ -126,6 +127,9 @@ class BlobSerializer(SingleArtifactContentSerializer):
"""

digest = serializers.CharField(help_text="sha256 of the Blob file")
artifact = fields.SingleContentArtifactField(
help_text=_("Artifact file representing the physical content"), required=False
)

class Meta:
fields = SingleArtifactContentSerializer.Meta.fields + ("digest",)
22 changes: 14 additions & 8 deletions pulp_container/app/tasks/builder.py
@@ -34,15 +34,21 @@ def get_or_create_blob(layer_json, manifest, path):
blob.touch()
except Blob.DoesNotExist:
layer_file_name = os.path.join(path, layer_json["digest"][7:])
layer_artifact = Artifact.init_and_validate(layer_file_name)
layer_artifact.save()
blob = Blob(digest=layer_json["digest"])
blob.save()
ContentArtifact(
artifact=layer_artifact, content=blob, relative_path=layer_json["digest"]
).save()
if layer_json["mediaType"] != MEDIA_TYPE.CONFIG_BLOB_OCI:
BlobManifest(manifest=manifest, manifest_blob=blob).save()
if layer_json["mediaType"] in [MEDIA_TYPE.CONFIG_BLOB_OCI, MEDIA_TYPE.CONFIG_BLOB]:
with open(layer_file_name, "r") as content_file:
raw_data = content_file.read()
blob.data = raw_data.encode("utf-8")
blob.save()
else:
layer_artifact = Artifact.init_and_validate(layer_file_name)
layer_artifact.save()
blob.save()
ContentArtifact(
artifact=layer_artifact, content=blob, relative_path=layer_json["digest"]
).save()
BlobManifest(manifest=manifest, manifest_blob=blob).save()

return blob


22 changes: 19 additions & 3 deletions pulp_container/app/tasks/sync_stages.py
@@ -353,9 +353,9 @@ async def handle_blobs(self, manifest_dc, content_data):
blob_dc = self.create_blob(layer)
manifest_dc.extra_data["blob_dcs"].append(blob_dc)
await self.put(blob_dc)
layer = content_data.get("config", None)
if layer:
blob_dc = self.create_blob(layer, deferred_download=False)
config = content_data.get("config", None)
if config:
blob_dc = await self.create_config_blob(config)
manifest_dc.extra_data["config_blob_dc"] = blob_dc
await self.put(blob_dc)

@@ -546,6 +546,22 @@ def create_blob(self, blob_data, deferred_download=True):

return blob_dc

async def create_config_blob(self, blob_data):
digest = blob_data.get("digest")
relative_url = "/v2/{name}/blobs/{digest}".format(
name=self.remote.namespaced_upstream_name, digest=digest
)
blob_url = urljoin(self.remote.url, relative_url)

downloader = self.remote.get_downloader(url=blob_url)
response = await downloader.run(extra_data={"headers": V2_ACCEPT_HEADERS})
with open(response.path, "rb") as content_file:
raw_data = content_file.read()
config_blob = Blob(digest=digest, data=raw_data)
config_blob_dc = DeclarativeContent(content=config_blob)

return config_blob_dc

async def create_signatures(self, man_dc, signature_source):
"""
Create signature declarative contents.
4 changes: 4 additions & 0 deletions pulp_container/constants.py
@@ -70,6 +70,10 @@
MEGABYTE = 1_000_000
SIGNATURE_PAYLOAD_MAX_SIZE = 4 * MEGABYTE

# we defined the following size based on the average container config size of some
# UBI and bootable images
ARTIFACTLESS_BLOB_SIZE = 22000

SIGNATURE_API_EXTENSION_VERSION = 2


