Add a filesystem abstraction to buildcache pruner #829

Draft: wants to merge 12 commits into base: main
129 changes: 5 additions & 124 deletions images/ci-prune-buildcache/buildcache.py
@@ -1,28 +1,15 @@
import boto3
from botocore.config import Config
from urllib.parse import urlparse
from datetime import datetime
import copy
import helper
from io import StringIO
import math
import multiprocessing.pool as pool
import time


def s3_resource():
config = Config(
retries={
"mode": "adaptive",
"max_attempts": 10,
}
)
return boto3.resource("s3", config=config)


class Object:
def __init__(self, bucket_name: str, key: str, last_modified):
def __init__(self, bucket_name: str, key: str, last_modified, size = 0):
self.bucket_name = bucket_name
self.key = key
self.size = size
if isinstance(last_modified, datetime):
self.last_modified = last_modified
else:
@@ -38,23 +25,6 @@ def endswith(self, exts):
return self.key.endswith(exts)


class S3Object(Object):
def delete(self):
print(f"Deleting s3://{self.bucket_name}/{self.key}")
# s3 = s3_resource()
# obj = s3.Object(self.bucket_name, self.key)
# response = obj.delete()
# return response["DeleteMarker"]
return False

def get(self):
s3 = s3_resource()
bucket = s3.Bucket(self.bucket_name)
s3obj = bucket.Object(self.key)
response = s3obj.get()
return response["Body"]


class BuildCache:
def __init__(self, url: str):
self.url = urlparse(url)
@@ -63,11 +33,7 @@ def __init__(self, url: str):
def snapshot(self):
self._listed = []
for obj in self._list():
self._listed.append(self.object_type()(
obj.bucket_name,
obj.key,
obj.last_modified,
))
self._listed.append(copy.deepcopy(obj))

return self._listed

@@ -106,89 +72,4 @@ def object_type(self):
return Object

def get_index(self):
key = f"{self.url.path}index.json".lstrip("/")
obj = next(self.list(key=key))
print("Fetching: ", key, obj)
try:
response = obj.get()
index = helper.load_json(response)
except Exception as e:
print("Could not fetch index: ", key)
raise e

return index, obj


class S3BuildCache(BuildCache):
def object_type(self):
return S3Object

def delete(self, keys: list = [], processes: int = 1, per_page: int = 1000):
"""Delete the listed keys from the buildcache, by default this will
delete all of the keys that exist in the buildcache.

Arguments:
keys (list(str), optional): list of keys to delete (default: all keys)
processes (int, optional): number of processes to use when calling delete
(default: 1, max: <system dependent>)
per_page (int, optional): The max number of items to delete at a time (default: 1000, max: 1000)
"""

if not keys:
keys = [obj.key for obj in self.list()]

# Get the keys to delete that exist in this buildcache
prefix = self.url.path.lstrip("/")
delete_keys = [{"Key": k} for k in keys if prefix in k]

# Nothing to delete
if not delete_keys:
return [], []

max_del = 1000
per_page = min(max_del, per_page)
nkeys = len(delete_keys)
stride = math.ceil(nkeys / per_page)

# Auto-detect the number of processes: one per page of deletions
if processes < 1:
processes = stride

# Only spawn as many processes as needed
processes = min(stride, processes)

s3 = s3_resource()
bucket = s3.Bucket(self.url.netloc)

def delete_keys_f(i: int):
# time.sleep(1)
return bucket.delete_objects(Delete={
"Objects": delete_keys[i:nkeys:stride],
"Quiet": True,
}
)

failures = []
errors = []
if processes > 1:
with pool.ThreadPool(processes) as tp:
for response in tp.imap_unordered(helper.star(delete_keys_f), [(i,) for i in range(stride)]):
failures.extend([obj for obj in response.get("Deleted", []) if not obj["DeleteMarker"]])
errors.extend(response.get("Errors", []))
else:
for i in range(stride):
response = delete_keys_f(i)
failures.extend([obj for obj in response.get("Deleted", []) if not obj["DeleteMarker"]])
errors.extend(response.get("Errors", []))

return errors, failures

def _list(self):
s3 = s3_resource()
bucket = s3.Bucket(self.url.netloc)
for obj in bucket.objects.filter(Prefix=self.url.path.lstrip("/")):
yield S3Object(
obj.bucket_name,
obj.key,
obj.last_modified,
)
raise Exception("Must implement per class")
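
The refactor above strips the S3-specific code out of buildcache.py, leaving BuildCache as a generic base class whose _list() each backend must implement. The FileSystemBuildCache that cache-prune.py imports from fs_buildcache is not included in this excerpt; the following is a minimal sketch of what such a backend could look like, assuming it simply walks the cache directory and wraps each file in an Object. Everything beyond the interface visible in this diff is an assumption, and a real backend would also need a delete() counterpart to S3BuildCache.delete().

import os
from datetime import datetime, timezone

from buildcache import BuildCache, Object


class FileSystemBuildCache(BuildCache):
    def object_type(self):
        return Object

    def _list(self):
        # For a plain filesystem path, urlparse() leaves everything in .path
        root = self.url.path or self.url.netloc
        for dirpath, _dirnames, filenames in os.walk(root):
            for name in filenames:
                full = os.path.join(dirpath, name)
                stat = os.stat(full)
                yield Object(
                    root,                         # reuse the cache root as the "bucket"
                    os.path.relpath(full, root),  # key relative to the cache root
                    datetime.fromtimestamp(stat.st_mtime, timezone.utc),
                    size=stat.st_size,
                )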
35 changes: 35 additions & 0 deletions images/ci-prune-buildcache/buildcache_query.py
@@ -0,0 +1,35 @@
#!/usr/bin/env spack-python

# Copy of https://github.com/sandialabs/spack-manager/blob/main/manager/manager_cmds/cache_query.py
# as a standalone script.
# Query the buildcache like `spack find`.

import argparse

import spack.binary_distribution as bindist
import spack.cmd as cmd
import spack.cmd.find


parser = argparse.ArgumentParser()
spack.cmd.find.setup_parser(parser)

def cache_search(self, **kwargs):
qspecs = spack.cmd.parse_specs(self.values)
search_engine = bindist.BinaryCacheQuery(True)
results = {}
for q in qspecs:
hits = search_engine(str(q), **kwargs)
for hit in hits:
results[hit.dag_hash()] = hit
return sorted(results.values())

spack.cmd.common.arguments.ConstraintAction._specs = cache_search

def find(parser, args):
spack.cmd.find.find(parser, args)

if __name__ == "__main__":
args = parser.parse_args()
find(parser, args)
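
For reference, cache-prune.py below resolves --keep-specs by shelling out to this script with spack find options. An invocation of that shape (the spec name is illustrative) would be:

    spack-python buildcache_query.py --format "{hash}" --deps gcc@12.3.0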

165 changes: 165 additions & 0 deletions images/ci-prune-buildcache/cache-prune.py
@@ -0,0 +1,165 @@
#!/usr/bin/env python3

import argparse
import helper
import math
import os
import subprocess

from datetime import datetime, timedelta, timezone
from fs_buildcache import FileSystemBuildCache
from pruner import pruner_factory, PRUNER_TYPES

def convert_size(size_bytes):
if size_bytes == 0:
return "0B"
size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
i = int(math.floor(math.log(size_bytes, 1024)))
p = math.pow(1024, i)
size = round(size_bytes / p, 2)
return f"{size} {size_name[i]}"

def configure_parser():
parser = argparse.ArgumentParser()
parser.add_argument(
"path",
help="location of the buildcache",
)
parser.add_argument(
"--start-date",
help="Starting date for pruning window",
default=datetime.now(timezone.utc).isoformat(),
)
parser.add_argument(
"--since-days",
help="Ending date for pruning window",
type=int,
default=0
)
parser.add_argument(
"-j", "--nprocs",
help="Numer of process to use",
type=int,
metavar="N",
default=1
)
parser.add_argument(
"--prune-hashes",
help="json file with hash list to prune",
type=argparse.FileType("r"),
metavar="prune.json",
)
parser.add_argument(
"--keep-hashes",
help="json file with hash list to keep",
type=argparse.FileType("r"),
metavar="keep.json",
)
parser.add_argument(
"--keep-specs",
help="specs to preserve in the cache (includes dependencies)",
nargs="+",
)
parser.add_argument(
"-o", "--output-dir",
default=os.getcwd(),
help="output directory",
)
parser.add_argument(
"-S", "--suffix",
help="logging file suffix",
)
parser.add_argument(
"-D", "--delete",
help="attempt to delete the files",
action="store_true",
)
parser.add_argument(
"-m", "--method",
help="pruning method to use on the cache",
choices=list(PRUNER_TYPES.keys()),
default="direct",
)

return parser


def get_cache_hashes_from_specs(*args, **kwargs):
command = ['spack-python', 'buildcache_query.py', '--format', '{hash}']
command.extend([*args])
result = subprocess.check_output(command, universal_newlines=True).strip().split()
return result

def get_keep_hashes(args: argparse.Namespace):
keep_hashes=[]
if args.keep_hashes:
keep_hashes.extend(helper.load_json(args.keep_hashes))
if args.keep_specs:
keep_hashes.extend(get_cache_hashes_from_specs("--deps", *args.keep_specs))
return keep_hashes

if __name__=="__main__":
args = configure_parser().parse_args()

os.makedirs(args.output_dir, exist_ok=True)


if not args.suffix:
log_suffix = "_" + args.method
else:
log_suffix = args.suffix

keep_hashes=get_keep_hashes(args)

cache = FileSystemBuildCache(args.path)

now = datetime.fromisoformat(args.start_date)
time_window = now - timedelta(days=args.since_days)

# combine start date and delta for passing to pruners
args.start_date = time_window

pruner = pruner_factory(cache, args.method, args, keep_hashes, since=time_window)

print("-- Computing prunable hashes")
prunable_hashes = []
if args.prune_hashes:
prunable_hashes.extend( helper.load_json(args.prune_hashes))
else:
prunable_hashes.extend(pruner.determine_prunable_hashes())

prune_hash_file = f"{args.output_dir}/prunable-hashes-{log_suffix}.txt"
with open(f"{prune_hash_file}", "w") as fd:
fd.writelines("\n".join(prunable_hashes))

if prunable_hashes:
print("-- Finding prunable files")

pruned = pruner.prune(prunable_hashes)

pruned_keys = [ obj.key for obj in pruned ]

print(f"-- Found prunable {len(pruned)} files in buildcache")
total_size_human = convert_size(sum(obj.size for obj in pruned))
print(f"-- Total Size of prunable files is {total_size_human}")

prune_list_file = f"{args.output_dir}/prunable-files-{log_suffix}.txt"
with open(f"{prune_list_file}", "w") as fd:
fd.writelines("\n".join(pruned_keys))
else:
print("-- Nothing to prune")

if args.delete:
print("-- Pruning build cache")
err, fail = cache.delete(pruned_keys, processes=args.nprocs)
fname_template = f"{args.output_dir}/delete-{{0}}-{log_suffix}.json"
if err:
print("errors found")
with open(fname_template.format("errors"), "w") as fd:
helper.write_json(fd, err)

if fail:
print("failures found")
with open(fname_template.format("failures"), "w") as fd:
helper.write_json(fd, fail)
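
Putting the pieces together, a hypothetical end-to-end run of the new filesystem pruner could look like the following; the path and spec are illustrative, and "direct" is the parser's default --method. The run writes the prunable hash and file lists into the output directory and only deletes when --delete is passed:

    python3 cache-prune.py /path/to/mirror/build_cache \
        --method direct \
        --since-days 30 \
        --keep-specs "gcc@12.3.0" \
        --output-dir ./prune-logs \
        --nprocs 4 \
        --delete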

4 changes: 2 additions & 2 deletions images/ci-prune-buildcache/ci_buildcache_prune.py
@@ -10,7 +10,7 @@
from io import StringIO
from urllib.parse import urlparse

import buildcache
from s3_buildcache import S3BuildCache
import helper
from pruner import DirectPruner, IndexPruner, OrphanPruner

@@ -371,7 +371,7 @@ def configure_parser():
else:
url = f"{BUILDCACHE_URL}/{stack}/build_cache/"

bc = buildcache.S3BuildCache(url)
bc = S3BuildCache(url)

snapshot_key = f"s3-snapshot-{stack}"
if cache.exists(snapshot_key):
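
ci_buildcache_prune.py now imports S3BuildCache from s3_buildcache, a module that is not part of this excerpt; presumably it carries over the S3 code removed from buildcache.py above. A minimal sketch of its expected surface, inferred entirely from the removed code (delete() omitted for brevity):

import boto3
from botocore.config import Config

from buildcache import BuildCache, Object


def s3_resource():
    # Same adaptive-retry configuration that was removed from buildcache.py
    config = Config(retries={"mode": "adaptive", "max_attempts": 10})
    return boto3.resource("s3", config=config)


class S3Object(Object):
    def get(self):
        # Return the streaming body for this key, as the removed S3Object.get() did
        return s3_resource().Bucket(self.bucket_name).Object(self.key).get()["Body"]


class S3BuildCache(BuildCache):
    def object_type(self):
        return S3Object

    def _list(self):
        bucket = s3_resource().Bucket(self.url.netloc)
        for obj in bucket.objects.filter(Prefix=self.url.path.lstrip("/")):
            yield S3Object(obj.bucket_name, obj.key, obj.last_modified, size=obj.size)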