From c1c83df7fbe318d1fb9541bb2ca99a2835a32017 Mon Sep 17 00:00:00 2001 From: Lubos Mjachky Date: Tue, 16 Jan 2024 23:13:56 +0100 Subject: [PATCH] Add the last call changes [noissue] --- docs/workflows/host.rst | 7 ++- pulp_container/app/registry.py | 48 ++++++++++------- pulp_container/app/registry_api.py | 82 ++++++++++++++++++++---------- 3 files changed, 90 insertions(+), 47 deletions(-) diff --git a/docs/workflows/host.rst b/docs/workflows/host.rst index 185cb4014..395c1002d 100644 --- a/docs/workflows/host.rst +++ b/docs/workflows/host.rst @@ -122,6 +122,10 @@ Docker Output:: Pull-Through Caching -------------------- +.. warning:: + This feature is provided as a tech preview and could change in backwards incompatible + ways in the future. + The Pull-Through Caching feature offers an alternative way to host content by leveraging a **remote registry** as the source of truth. This eliminates the need for in-advance repository synchronization because Pulp acts as a **caching proxy** and stores images, after they have been @@ -154,4 +158,5 @@ ensures a more reliable container deployment system in production environments. a "debian" repository with the "10" tag is created. Subsequent pulls such as "debian:11" result in a new repository version that incorporates both tags while removing the previous version. Repositories and their content remain manageable through standard Pulp API endpoints. - With that, no content can be pushed to these repositories. + With that, no content can be pushed to these repositories. Furthermore, these repositories are + public by default. diff --git a/pulp_container/app/registry.py b/pulp_container/app/registry.py index 0e229a939..c93723ec2 100644 --- a/pulp_container/app/registry.py +++ b/pulp_container/app/registry.py @@ -123,12 +123,12 @@ async def get_tag(self, request): if not repository_version: raise PathNotResolved(tag_name) + distribution = await distribution.acast() try: tag = await Tag.objects.select_related("tagged_manifest").aget( pk__in=await sync_to_async(repository_version.get_content)(), name=tag_name ) except ObjectDoesNotExist: - distribution = await distribution.acast() if distribution.remote_id and distribution.pull_through_distribution_id: pull_downloader = await PullThroughDownloader.create( distribution, repository_version, path, tag_name @@ -144,22 +144,25 @@ async def get_tag(self, request): return web.Response(text=raw_manifest, headers=headers) else: raise PathNotResolved(tag_name) - else: - if distribution.remote_id and distribution.pull_through_distribution_id: - # check if the content was updated on the remote and stream it back - remote = await distribution.remote.acast() - relative_url = "/v2/{name}/manifests/{tag}".format( - name=remote.namespaced_upstream_name, tag=tag_name - ) - tag_url = urljoin(remote.url, relative_url) - downloader = remote.get_downloader(url=tag_url) - try: - response = await downloader.run( - extra_data={"headers": V2_ACCEPT_HEADERS, "http_method": "head"} - ) - except ClientResponseError: - raise PathNotResolved(path) + # check if the content is pulled via the pull-through caching distribution; + # if yes, update the respective manifest from the remote when its digest changed + if distribution.remote_id and distribution.pull_through_distribution_id: + remote = await distribution.remote.acast() + relative_url = "/v2/{name}/manifests/{tag}".format( + name=remote.namespaced_upstream_name, tag=tag_name + ) + tag_url = urljoin(remote.url, relative_url) + downloader = remote.get_downloader(url=tag_url) + try: + response = await downloader.run( + extra_data={"headers": V2_ACCEPT_HEADERS, "http_method": "head"} + ) + except ClientResponseError: + # the manifest is not available on the remote anymore + # but the old one is still stored in the database + pass + else: digest = response.headers.get("docker-content-digest") if tag.tagged_manifest.digest != digest: pull_downloader = await PullThroughDownloader.create( @@ -313,6 +316,8 @@ async def get_by_digest(self, request): pull_downloader = await PullThroughDownloader.create( distribution, repository_version, path, digest ) + + # "/pulp/container/{path:.+}/{content:(blobs|manifests)}/sha256:{digest:.+}" content_type = request.match_info["content"] if content_type == "manifests": raw_manifest, digest, media_type = await pull_downloader.download_manifest() @@ -322,13 +327,15 @@ async def get_by_digest(self, request): "Docker-Distribution-API-Version": "registry/2.0", } return web.Response(text=raw_manifest, headers=headers) - else: + elif content_type == "blobs": # there might be a case where the client has all the manifest data in place # and tries to download only missing blobs; because of that, only the reference - # to a remote blob is returned + # to a remote blob is returned (i.e., RemoteArtifact) blob = await pull_downloader.init_remote_blob() ca = await blob.contentartifact_set.afirst() return await self._stream_content_artifact(request, web.StreamResponse(), ca) + else: + raise RuntimeError("Only blobs or manifests are supported by the parser.") else: raise PathNotResolved(path) else: @@ -419,6 +426,7 @@ async def run_manifest_downloader(self): # it is necessary to pass this information back to the client raise HTTPTooManyRequests() else: + # TODO: do not mask out relevant errors, like HTTP 502 raise PathNotResolved(self.path) return response @@ -446,7 +454,9 @@ async def init_pending_content(self, digest, manifest_data, media_type, artifact manifest = Manifest( digest=digest, - schema_version=2, + schema_version=2 + if manifest_data["mediaType"] in (MEDIA_TYPE.MANIFEST_V2, MEDIA_TYPE.MANIFEST_OCI) + else 1, media_type=media_type, config_blob=config_blob, ) diff --git a/pulp_container/app/registry_api.py b/pulp_container/app/registry_api.py index 2cd9c2087..e971b1ac8 100644 --- a/pulp_container/app/registry_api.py +++ b/pulp_container/app/registry_api.py @@ -295,44 +295,46 @@ def get_drv_pull(self, path): return distribution, distribution.repository, repository_version def get_pull_through_drv(self, path): - root_cache_distribution = ( + pull_through_cache_distribution = ( models.ContainerPullThroughDistribution.objects.annotate(path=Value(path)) .filter(path__startswith=F("base_path")) .order_by("-base_path") .first() ) - if not root_cache_distribution: + if not pull_through_cache_distribution: raise RepositoryNotFound(name=path) try: with transaction.atomic(): - cache_repository, _ = models.ContainerRepository.objects.get_or_create( + repository, _ = models.ContainerRepository.objects.get_or_create( name=path, retain_repo_versions=1 ) - remote_data = _get_pull_through_remote_data(root_cache_distribution) - upstream_name = path.split(root_cache_distribution.base_path, maxsplit=1)[1] - cache_remote, _ = models.ContainerRemote.objects.get_or_create( + remote_data = _get_pull_through_remote_data(pull_through_cache_distribution) + upstream_name = path.split(pull_through_cache_distribution.base_path, maxsplit=1)[1] + remote, _ = models.ContainerRemote.objects.get_or_create( name=path, upstream_name=upstream_name.strip("/"), - url=root_cache_distribution.remote.url, + url=pull_through_cache_distribution.remote.url, **remote_data, ) - cache_distribution, _ = models.ContainerDistribution.objects.get_or_create( + # TODO: Propagate the user's permissions and private flag from the pull-through + # distribution to this distribution + distribution, _ = models.ContainerDistribution.objects.get_or_create( name=path, base_path=path, - remote=cache_remote, - repository=cache_repository, + remote=remote, + repository=repository, ) except IntegrityError: # some entities needed to be created, but their keys already exist in the database # (e.g., a repository with the same name as the constructed path) raise RepositoryNotFound(name=path) else: - root_cache_distribution.distributions.add(cache_distribution) + pull_through_cache_distribution.distributions.add(distribution) - return cache_distribution, cache_repository, cache_repository.latest_version() + return distribution, repository, repository.latest_version() def get_dr_push(self, request, path, create=False): """ @@ -1014,11 +1016,10 @@ def handle_safe_method(self, request, path, pk): distribution = distribution.cast() if distribution.remote and distribution.pull_through_distribution_id: remote = distribution.remote.cast() - repository = distribution.repository.cast() # issue a head request first to ensure that the content exists on the remote # source; we want to prevent immediate "not found" error responses from # content-app: 302 (api-app) -> 404 (content-app) - manifest = self.fetch_manifest(remote, repository_version, repository, pk) + manifest = self.fetch_manifest(remote, pk) if manifest is None: return redirects.redirect_to_content_app("manifests", pk) @@ -1029,6 +1030,20 @@ def handle_safe_method(self, request, path, pk): tag = models.Tag.objects.get(name=tag.name, tagged_manifest=manifest) tag.touch() + add_content_units = self.get_content_units_to_add(manifest, tag) + + dispatch( + add_and_remove, + exclusive_resources=[repository], + kwargs={ + "repository_pk": str(repository.pk), + "add_content_units": add_content_units, + "remove_content_units": [], + }, + immediate=True, + deferred=True, + ) + return redirects.redirect_to_content_app("manifests", tag.name) else: raise ManifestNotFound(reference=pk) @@ -1050,14 +1065,36 @@ def handle_safe_method(self, request, path, pk): distribution = distribution.cast() if distribution.remote and distribution.pull_through_distribution_id: remote = distribution.remote.cast() - self.fetch_manifest(remote, repository_version, repository, pk) + self.fetch_manifest(remote, pk) return redirects.redirect_to_content_app("manifests", pk) else: raise ManifestNotFound(reference=pk) return redirects.issue_manifest_redirect(manifest) - def fetch_manifest(self, remote, repository, repository_version, pk): + def get_content_units_to_add(self, manifest, tag): + add_content_units = [str(tag.pk), str(manifest.pk)] + if manifest.media_type in ( + models.MEDIA_TYPE.MANIFEST_LIST, + models.MEDIA_TYPE.INDEX_OCI, + ): + for listed_manifest in manifest.listed_manifests: + add_content_units.append(str(listed_manifest.pk)) + add_content_units.append(str(listed_manifest.config_blob_id)) + add_content_units.extend( + listed_manifest.blobs.values_list("pk", flat=True) + ) + elif manifest.media_type in ( + models.MEDIA_TYPE.MANIFEST_V2, + models.MEDIA_TYPE.MANIFEST_OCI, + ): + add_content_units.append(str(manifest.config_blob_id)) + add_content_units.extend(manifest.blobs.values_list("pk", flat=True)) + else: + add_content_units.extend(manifest.blobs.values_list("pk", flat=True)) + return add_content_units + + def fetch_manifest(self, remote, pk): relative_url = "/v2/{name}/manifests/{pk}".format( name=remote.namespaced_upstream_name, pk=pk ) @@ -1068,21 +1105,12 @@ def fetch_manifest(self, remote, repository, repository_version, pk): extra_data={"headers": V2_ACCEPT_HEADERS, "http_method": "head"} ) except ClientResponseError as response_error: - try: - return models.Manifest.objects.get(digest=pk, pk__in=repository_version.content) - except models.Manifest.DoesNotExist: - try: - manifest = repository.pending_manifests.get(digest=pk) - manifest.touch() - return manifest - except models.Manifest.DoesNotExist: - pass - if response_error.status == 429: # the client could request the manifest outside the docker hub pull limit; # it is necessary to pass this information back to the client raise Throttled() else: + # TODO: do not mask out relevant errors, like HTTP 502 raise ManifestNotFound(reference=pk) else: digest = response.headers.get("docker-content-digest") @@ -1325,7 +1353,7 @@ def head(self, request, path, pk=None): def get(self, request, path, pk): """Return a signature identified by its sha256 checksum.""" - _, repository, repository_version = self.get_drv_pull(path) + _, _, repository_version = self.get_drv_pull(path) try: manifest = models.Manifest.objects.get(digest=pk, pk__in=repository_version.content)