Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
git-hyagi committed Apr 25, 2024
1 parent 70fb62f commit dd21369
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 34 deletions.
2 changes: 1 addition & 1 deletion pulp_container/app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ def init_labels(self):
if self.config_blob.data:
# if config-blob is stored in the database
blob_data = json.loads(self.config_blob.data)
config = blob_data.get("config",{})
config = blob_data.get("config", {})
if "Labels" in config.keys():
self.labels = config["Labels"] or {}
else:
Expand Down
10 changes: 5 additions & 5 deletions pulp_container/app/tasks/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
Manifest,
Tag,
)
from pulp_container.constants import MEDIA_TYPE
from pulp_container.constants import DB_BLOB_SIZE, MEDIA_TYPE
from pulpcore.plugin.models import Artifact, ContentArtifact, Content


Expand All @@ -35,10 +35,10 @@ def get_or_create_blob(layer_json, manifest, path):
except Blob.DoesNotExist:
layer_file_name = os.path.join(path, layer_json["digest"][7:])
blob = Blob(digest=layer_json["digest"])
if layer_json["mediaType"] in [MEDIA_TYPE.CONFIG_BLOB_OCI, MEDIA_TYPE.CONFIG_BLOB]:
with open(layer_file_name, "r") as content_file:
raw_data = content_file.read()
blob.data = raw_data.encode("utf-8")

if os.path.getsize(layer_file_name) < DB_BLOB_SIZE:
with open(layer_file_name, "rb") as content_file:
blob.data = content_file.read()
blob.save()
else:
layer_artifact = Artifact.init_and_validate(layer_file_name)
Expand Down
63 changes: 35 additions & 28 deletions pulp_container/app/tasks/sync_stages.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from pulpcore.plugin.stages import DeclarativeArtifact, DeclarativeContent, Stage, ContentSaver

from pulp_container.constants import (
DB_BLOB_SIZE,
MEDIA_TYPE,
SIGNATURE_API_EXTENSION_VERSION,
SIGNATURE_HEADER,
Expand Down Expand Up @@ -350,12 +351,12 @@ async def handle_blobs(self, manifest_dc, content_data):
for layer in content_data.get("layers") or content_data.get("fsLayers"):
if not self._include_layer(layer):
continue
blob_dc = self.create_blob(layer)
blob_dc = await self.create_blob(layer)
manifest_dc.extra_data["blob_dcs"].append(blob_dc)
await self.put(blob_dc)
config = content_data.get("config", None)
if config:
blob_dc = await self.create_config_blob(config)
blob_dc = await self.create_blob(config, False, False)
manifest_dc.extra_data["config_blob_dc"] = blob_dc
await self.put(blob_dc)

Expand Down Expand Up @@ -518,7 +519,7 @@ async def create_listed_manifest(self, manifest_data):
)
return {"manifest_dc": man_dc, "platform": platform, "content_data": content_data}

def create_blob(self, blob_data, deferred_download=True):
async def create_blob(self, blob_data, deferred_download=True, is_layer=True):
"""
Create blob.
Expand All @@ -529,38 +530,44 @@ def create_blob(self, blob_data, deferred_download=True):
"""
digest = blob_data.get("digest") or blob_data.get("blobSum")
blob_artifact = Artifact(sha256=digest[len("sha256:") :])
blob = Blob(digest=digest)
relative_url = "/v2/{name}/blobs/{digest}".format(
name=self.remote.namespaced_upstream_name, digest=digest
)
blob_url = urljoin(self.remote.url, relative_url)
da = DeclarativeArtifact(
artifact=blob_artifact,
url=blob_url,
relative_path=digest,
remote=self.remote,
deferred_download=deferred_download and self.deferred_download,
)
blob_dc = DeclarativeContent(content=blob, d_artifacts=[da])
blob_size, raw_data = await self._get_blob_size_data(blob_data, blob_url, is_layer)

if blob_size > DB_BLOB_SIZE:
blob_artifact = Artifact(sha256=digest[len("sha256:") :])
blob = Blob(digest=digest)
da = DeclarativeArtifact(
artifact=blob_artifact,
url=blob_url,
relative_path=digest,
remote=self.remote,
deferred_download=deferred_download and self.deferred_download,
)
blob_dc = DeclarativeContent(content=blob, d_artifacts=[da])
else:
# if this is a layer with size < DB_BLOB_SIZE, we will need to download it
if is_layer:
downloader = self.remote.get_downloader(url=blob_url)
response = await downloader.run(extra_data={"headers": V2_ACCEPT_HEADERS})
with open(response.path, "rb") as content_file:
raw_data = content_file.read()
artifactless_blob = Blob(digest=digest, data=raw_data)
blob_dc = DeclarativeContent(content=artifactless_blob)

return blob_dc

async def create_config_blob(self, blob_data):
digest = blob_data.get("digest")
relative_url = "/v2/{name}/blobs/{digest}".format(
name=self.remote.namespaced_upstream_name, digest=digest
)
blob_url = urljoin(self.remote.url, relative_url)

downloader = self.remote.get_downloader(url=blob_url)
response = await downloader.run(extra_data={"headers": V2_ACCEPT_HEADERS})
with open(response.path, "rb") as content_file:
raw_data = content_file.read()
config_blob = Blob(digest=digest, data=raw_data)
config_blob_dc = DeclarativeContent(content=config_blob)

return config_blob_dc
async def _get_blob_size_data(self, blob_data, blob_url, is_layer):
if is_layer:
return blob_data.get("size"), None
else:
downloader = self.remote.get_downloader(url=blob_url)
response = await downloader.run(extra_data={"headers": V2_ACCEPT_HEADERS})
with open(response.path, "rb") as content_file:
raw_data = content_file.read()
return len(raw_data), raw_data

async def create_signatures(self, man_dc, signature_source):
"""
Expand Down

0 comments on commit dd21369

Please sign in to comment.