Skip to content

Commit

Permalink
Add the last call changes
Browse files Browse the repository at this point in the history
[noissue]
  • Loading branch information
lubosmj committed Jan 16, 2024
1 parent 981e412 commit c1c83df
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 47 deletions.
7 changes: 6 additions & 1 deletion docs/workflows/host.rst
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ Docker Output::
Pull-Through Caching
--------------------

.. warning::
This feature is provided as a tech preview and could change in backwards incompatible
ways in the future.

The Pull-Through Caching feature offers an alternative way to host content by leveraging a **remote
registry** as the source of truth. This eliminates the need for in-advance repository
synchronization because Pulp acts as a **caching proxy** and stores images, after they have been
Expand Down Expand Up @@ -154,4 +158,5 @@ ensures a more reliable container deployment system in production environments.
a "debian" repository with the "10" tag is created. Subsequent pulls such as "debian:11"
result in a new repository version that incorporates both tags while removing the previous
version. Repositories and their content remain manageable through standard Pulp API endpoints.
With that, no content can be pushed to these repositories.
With that, no content can be pushed to these repositories. Furthermore, these repositories are
public by default.
48 changes: 29 additions & 19 deletions pulp_container/app/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,12 @@ async def get_tag(self, request):
if not repository_version:
raise PathNotResolved(tag_name)

distribution = await distribution.acast()
try:
tag = await Tag.objects.select_related("tagged_manifest").aget(
pk__in=await sync_to_async(repository_version.get_content)(), name=tag_name
)
except ObjectDoesNotExist:
distribution = await distribution.acast()
if distribution.remote_id and distribution.pull_through_distribution_id:
pull_downloader = await PullThroughDownloader.create(
distribution, repository_version, path, tag_name
Expand All @@ -144,22 +144,25 @@ async def get_tag(self, request):
return web.Response(text=raw_manifest, headers=headers)
else:
raise PathNotResolved(tag_name)
else:
if distribution.remote_id and distribution.pull_through_distribution_id:
# check if the content was updated on the remote and stream it back
remote = await distribution.remote.acast()
relative_url = "/v2/{name}/manifests/{tag}".format(
name=remote.namespaced_upstream_name, tag=tag_name
)
tag_url = urljoin(remote.url, relative_url)
downloader = remote.get_downloader(url=tag_url)
try:
response = await downloader.run(
extra_data={"headers": V2_ACCEPT_HEADERS, "http_method": "head"}
)
except ClientResponseError:
raise PathNotResolved(path)

# check if the content is pulled via the pull-through caching distribution;
# if yes, update the respective manifest from the remote when its digest changed
if distribution.remote_id and distribution.pull_through_distribution_id:
remote = await distribution.remote.acast()
relative_url = "/v2/{name}/manifests/{tag}".format(
name=remote.namespaced_upstream_name, tag=tag_name
)
tag_url = urljoin(remote.url, relative_url)
downloader = remote.get_downloader(url=tag_url)
try:
response = await downloader.run(
extra_data={"headers": V2_ACCEPT_HEADERS, "http_method": "head"}
)
except ClientResponseError:
# the manifest is not available on the remote anymore
# but the old one is still stored in the database
pass
else:
digest = response.headers.get("docker-content-digest")
if tag.tagged_manifest.digest != digest:
pull_downloader = await PullThroughDownloader.create(
Expand Down Expand Up @@ -313,6 +316,8 @@ async def get_by_digest(self, request):
pull_downloader = await PullThroughDownloader.create(
distribution, repository_version, path, digest
)

# "/pulp/container/{path:.+}/{content:(blobs|manifests)}/sha256:{digest:.+}"
content_type = request.match_info["content"]
if content_type == "manifests":
raw_manifest, digest, media_type = await pull_downloader.download_manifest()
Expand All @@ -322,13 +327,15 @@ async def get_by_digest(self, request):
"Docker-Distribution-API-Version": "registry/2.0",
}
return web.Response(text=raw_manifest, headers=headers)
else:
elif content_type == "blobs":
# there might be a case where the client has all the manifest data in place
# and tries to download only missing blobs; because of that, only the reference
# to a remote blob is returned
# to a remote blob is returned (i.e., RemoteArtifact)
blob = await pull_downloader.init_remote_blob()
ca = await blob.contentartifact_set.afirst()
return await self._stream_content_artifact(request, web.StreamResponse(), ca)
else:
raise RuntimeError("Only blobs or manifests are supported by the parser.")
else:
raise PathNotResolved(path)
else:
Expand Down Expand Up @@ -419,6 +426,7 @@ async def run_manifest_downloader(self):
# it is necessary to pass this information back to the client
raise HTTPTooManyRequests()
else:
# TODO: do not mask out relevant errors, like HTTP 502
raise PathNotResolved(self.path)

return response
Expand Down Expand Up @@ -446,7 +454,9 @@ async def init_pending_content(self, digest, manifest_data, media_type, artifact

manifest = Manifest(
digest=digest,
schema_version=2,
schema_version=2
if manifest_data["mediaType"] in (MEDIA_TYPE.MANIFEST_V2, MEDIA_TYPE.MANIFEST_OCI)
else 1,
media_type=media_type,
config_blob=config_blob,
)
Expand Down
82 changes: 55 additions & 27 deletions pulp_container/app/registry_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,44 +295,46 @@ def get_drv_pull(self, path):
return distribution, distribution.repository, repository_version

def get_pull_through_drv(self, path):
root_cache_distribution = (
pull_through_cache_distribution = (
models.ContainerPullThroughDistribution.objects.annotate(path=Value(path))
.filter(path__startswith=F("base_path"))
.order_by("-base_path")
.first()
)
if not root_cache_distribution:
if not pull_through_cache_distribution:
raise RepositoryNotFound(name=path)

try:
with transaction.atomic():
cache_repository, _ = models.ContainerRepository.objects.get_or_create(
repository, _ = models.ContainerRepository.objects.get_or_create(
name=path, retain_repo_versions=1
)

remote_data = _get_pull_through_remote_data(root_cache_distribution)
upstream_name = path.split(root_cache_distribution.base_path, maxsplit=1)[1]
cache_remote, _ = models.ContainerRemote.objects.get_or_create(
remote_data = _get_pull_through_remote_data(pull_through_cache_distribution)
upstream_name = path.split(pull_through_cache_distribution.base_path, maxsplit=1)[1]
remote, _ = models.ContainerRemote.objects.get_or_create(
name=path,
upstream_name=upstream_name.strip("/"),
url=root_cache_distribution.remote.url,
url=pull_through_cache_distribution.remote.url,
**remote_data,
)

cache_distribution, _ = models.ContainerDistribution.objects.get_or_create(
# TODO: Propagate the user's permissions and private flag from the pull-through
# distribution to this distribution
distribution, _ = models.ContainerDistribution.objects.get_or_create(
name=path,
base_path=path,
remote=cache_remote,
repository=cache_repository,
remote=remote,
repository=repository,
)
except IntegrityError:
# some entities needed to be created, but their keys already exist in the database
# (e.g., a repository with the same name as the constructed path)
raise RepositoryNotFound(name=path)
else:
root_cache_distribution.distributions.add(cache_distribution)
pull_through_cache_distribution.distributions.add(distribution)

return cache_distribution, cache_repository, cache_repository.latest_version()
return distribution, repository, repository.latest_version()

def get_dr_push(self, request, path, create=False):
"""
Expand Down Expand Up @@ -1014,11 +1016,10 @@ def handle_safe_method(self, request, path, pk):
distribution = distribution.cast()
if distribution.remote and distribution.pull_through_distribution_id:
remote = distribution.remote.cast()
repository = distribution.repository.cast()
# issue a head request first to ensure that the content exists on the remote
# source; we want to prevent immediate "not found" error responses from
# content-app: 302 (api-app) -> 404 (content-app)
manifest = self.fetch_manifest(remote, repository_version, repository, pk)
manifest = self.fetch_manifest(remote, pk)
if manifest is None:
return redirects.redirect_to_content_app("manifests", pk)

Expand All @@ -1029,6 +1030,20 @@ def handle_safe_method(self, request, path, pk):
tag = models.Tag.objects.get(name=tag.name, tagged_manifest=manifest)
tag.touch()

add_content_units = self.get_content_units_to_add(manifest, tag)

dispatch(
add_and_remove,
exclusive_resources=[repository],
kwargs={
"repository_pk": str(repository.pk),
"add_content_units": add_content_units,
"remove_content_units": [],
},
immediate=True,
deferred=True,
)

return redirects.redirect_to_content_app("manifests", tag.name)
else:
raise ManifestNotFound(reference=pk)
Expand All @@ -1050,14 +1065,36 @@ def handle_safe_method(self, request, path, pk):
distribution = distribution.cast()
if distribution.remote and distribution.pull_through_distribution_id:
remote = distribution.remote.cast()
self.fetch_manifest(remote, repository_version, repository, pk)
self.fetch_manifest(remote, pk)
return redirects.redirect_to_content_app("manifests", pk)
else:
raise ManifestNotFound(reference=pk)

return redirects.issue_manifest_redirect(manifest)

def fetch_manifest(self, remote, repository, repository_version, pk):
def get_content_units_to_add(self, manifest, tag):
add_content_units = [str(tag.pk), str(manifest.pk)]
if manifest.media_type in (
models.MEDIA_TYPE.MANIFEST_LIST,
models.MEDIA_TYPE.INDEX_OCI,
):
for listed_manifest in manifest.listed_manifests:
add_content_units.append(str(listed_manifest.pk))
add_content_units.append(str(listed_manifest.config_blob_id))
add_content_units.extend(
listed_manifest.blobs.values_list("pk", flat=True)
)
elif manifest.media_type in (
models.MEDIA_TYPE.MANIFEST_V2,
models.MEDIA_TYPE.MANIFEST_OCI,
):
add_content_units.append(str(manifest.config_blob_id))
add_content_units.extend(manifest.blobs.values_list("pk", flat=True))
else:
add_content_units.extend(manifest.blobs.values_list("pk", flat=True))
return add_content_units

def fetch_manifest(self, remote, pk):
relative_url = "/v2/{name}/manifests/{pk}".format(
name=remote.namespaced_upstream_name, pk=pk
)
Expand All @@ -1068,21 +1105,12 @@ def fetch_manifest(self, remote, repository, repository_version, pk):
extra_data={"headers": V2_ACCEPT_HEADERS, "http_method": "head"}
)
except ClientResponseError as response_error:
try:
return models.Manifest.objects.get(digest=pk, pk__in=repository_version.content)
except models.Manifest.DoesNotExist:
try:
manifest = repository.pending_manifests.get(digest=pk)
manifest.touch()
return manifest
except models.Manifest.DoesNotExist:
pass

if response_error.status == 429:
# the client could request the manifest outside the docker hub pull limit;
# it is necessary to pass this information back to the client
raise Throttled()
else:
# TODO: do not mask out relevant errors, like HTTP 502
raise ManifestNotFound(reference=pk)
else:
digest = response.headers.get("docker-content-digest")
Expand Down Expand Up @@ -1325,7 +1353,7 @@ def head(self, request, path, pk=None):

def get(self, request, path, pk):
"""Return a signature identified by its sha256 checksum."""
_, repository, repository_version = self.get_drv_pull(path)
_, _, repository_version = self.get_drv_pull(path)

try:
manifest = models.Manifest.objects.get(digest=pk, pk__in=repository_version.content)
Expand Down

0 comments on commit c1c83df

Please sign in to comment.