From dca93f9c392a89776ca5e1988695937e879c173f Mon Sep 17 00:00:00 2001 From: David Farkas Date: Wed, 17 Jan 2018 14:01:27 +0100 Subject: [PATCH] Add generic storage --- api/config.py | 11 + api/download.py | 49 ++-- api/files.py | 220 ++++++++------ api/handlers/listhandler.py | 65 +++-- api/handlers/refererhandler.py | 38 +-- api/jobs/handlers.py | 8 +- api/placer.py | 88 +++--- api/upload.py | 41 ++- api/util.py | 21 +- bin/oneoffs/migrate_storage.py | 216 ++++++++++++++ bin/oneoffs/remove_cas.py | 238 --------------- requirements.txt | 1 + tests/bin/run-tests-ubuntu.sh | 2 + tests/integration_tests/python/conftest.py | 33 ++- .../integration_tests/python/test_download.py | 9 +- tests/integration_tests/python/test_jobs.py | 2 + .../python/test_migrate_storage.py | 274 ++++++++++++++++++ 17 files changed, 812 insertions(+), 504 deletions(-) create mode 100644 bin/oneoffs/migrate_storage.py delete mode 100755 bin/oneoffs/remove_cas.py create mode 100644 tests/integration_tests/python/test_migrate_storage.py diff --git a/api/config.py b/api/config.py index 74657503d..11d35c585 100644 --- a/api/config.py +++ b/api/config.py @@ -7,6 +7,8 @@ import datetime import elasticsearch +from fs import open_fs + from . import util from .dao.dbutil import try_replace_one @@ -63,6 +65,7 @@ 'db_server_selection_timeout': '3000', 'data_path': os.path.join(os.path.dirname(__file__), '../persistent/data'), 'elasticsearch_host': 'localhost:9200', + 'fs_url': 'osfs://' + os.path.join(os.path.dirname(__file__), '../persistent/data') }, } @@ -323,3 +326,11 @@ def mongo_pipeline(table, pipeline): def get_auth(auth_type): return get_config()['auth'][auth_type] + + +# Storage configuration +fs = open_fs(__config['persistent']['fs_url']) +signed_url_available = ['GCSFS'] + +legacy_fs = open_fs('osfs://' + __config['persistent']['data_path']) +support_legacy_fs = True diff --git a/api/download.py b/api/download.py index 31a723a26..366103180 100644 --- a/api/download.py +++ b/api/download.py @@ -1,11 +1,13 @@ import os import bson import pytz -import os.path import tarfile import datetime import cStringIO +import fs.path +import fs.errors + from .web import base from .web.request import AccessType from . 
import config, files, util, validators @@ -31,7 +33,7 @@ def _filter_check(property_filter, property_values): class Download(base.RequestHandler): - def _append_targets(self, targets, cont_name, container, prefix, total_size, total_cnt, data_path, filters): + def _append_targets(self, targets, cont_name, container, prefix, total_size, total_cnt, filters): for f in container.get('files', []): if filters: filtered = True @@ -45,10 +47,9 @@ def _append_targets(self, targets, cont_name, container, prefix, total_size, tot break if filtered: continue - file_path = files.get_file_abs_path(f.get('_id', '')) - if not util.file_exists(file_path): - file_path = os.path.join(data_path, util.path_from_hash(f['hash'])) - if util.file_exists(file_path): # silently skip missing files + + file_path, _ = files.get_valid_file(f) + if file_path: # silently skip missing files if cont_name == 'analyses': targets.append((file_path, prefix + '/' + ('input' if f.get('input') else 'output') + '/' + f['name'], cont_name, str(container.get('_id')), f['size'])) else: @@ -60,7 +61,6 @@ def _append_targets(self, targets, cont_name, container, prefix, total_size, tot return total_size, total_cnt def _bulk_preflight_archivestream(self, file_refs): - data_path = config.get_item('persistent', 'data_path') arc_prefix = self.get_param('prefix', 'scitran') file_cnt = 0 total_size = 0 @@ -96,11 +96,8 @@ def _bulk_preflight_archivestream(self, file_refs): log.warn("Expected file {} on Container {} {} to exist but it is missing. File will be skipped in download.".format(filename, cont_name, cont_id)) continue - file_id = file_obj.get('_id', '') - file_path = files.get_file_abs_path(file_id) if file_id else '' - if not file_id or not os.path.exists(file_path): - file_path = os.path.join(data_path, util.path_from_hash(file_obj['hash'])) - if os.path.exists(file_path): # silently skip missing files + file_path, _ = files.get_valid_file(file_obj) + if file_path: # silently skip missing files targets.append((file_path, cont_name+'/'+cont_id+'/'+file_obj['name'], cont_name, cont_id, file_obj['size'])) total_size += file_obj['size'] file_cnt += 1 @@ -115,7 +112,6 @@ def _bulk_preflight_archivestream(self, file_refs): def _preflight_archivestream(self, req_spec, collection=None): - data_path = config.get_item('persistent', 'data_path') arc_prefix = self.get_param('prefix', 'scitran') file_cnt = 0 total_size = 0 @@ -140,7 +136,7 @@ def _preflight_archivestream(self, req_spec, collection=None): continue prefix = '/'.join([arc_prefix, project['group'], project['label']]) - total_size, file_cnt = self._append_targets(targets, 'projects', project, prefix, total_size, file_cnt, data_path, req_spec.get('filters')) + total_size, file_cnt = self._append_targets(targets, 'projects', project, prefix, total_size, file_cnt, req_spec.get('filters')) sessions = config.db.sessions.find({'project': item_id, 'deleted': {'$exists': False}}, ['label', 'files', 'uid', 'timestamp', 'timezone', 'subject']) session_dict = {session['_id']: session for session in sessions} @@ -161,19 +157,19 @@ def _preflight_archivestream(self, req_spec, collection=None): for code, subject in subject_dict.iteritems(): subject_prefix = self._path_from_container(prefix, subject, ids_of_paths, code) subject_prefixes[code] = subject_prefix - total_size, file_cnt = self._append_targets(targets, 'subjects', subject, subject_prefix, total_size, file_cnt, data_path, req_spec.get('filters')) + total_size, file_cnt = self._append_targets(targets, 'subjects', subject, subject_prefix, 
total_size, file_cnt, req_spec.get('filters')) for session in session_dict.itervalues(): subject_code = session['subject'].get('code', 'unknown_subject') subject = subject_dict[subject_code] session_prefix = self._path_from_container(subject_prefixes[subject_code], session, ids_of_paths, session["_id"]) session_prefixes[session['_id']] = session_prefix - total_size, file_cnt = self._append_targets(targets, 'sessions', session, session_prefix, total_size, file_cnt, data_path, req_spec.get('filters')) + total_size, file_cnt = self._append_targets(targets, 'sessions', session, session_prefix, total_size, file_cnt, req_spec.get('filters')) for acq in acquisitions: session = session_dict[acq['session']] acq_prefix = self._path_from_container(session_prefixes[session['_id']], acq, ids_of_paths, acq['_id']) - total_size, file_cnt = self._append_targets(targets, 'acquisitions', acq, acq_prefix, total_size, file_cnt, data_path, req_spec.get('filters')) + total_size, file_cnt = self._append_targets(targets, 'acquisitions', acq, acq_prefix, total_size, file_cnt, req_spec.get('filters')) elif item['level'] == 'session': @@ -188,7 +184,7 @@ def _preflight_archivestream(self, req_spec, collection=None): if not subject.get('code'): subject['code'] = 'unknown_subject' prefix = self._path_from_container(self._path_from_container(project['group'] + '/' + project['label'], subject, ids_of_paths, subject["code"]), session, ids_of_paths, session['_id']) - total_size, file_cnt = self._append_targets(targets, 'sessions', session, prefix, total_size, file_cnt, data_path, req_spec.get('filters')) + total_size, file_cnt = self._append_targets(targets, 'sessions', session, prefix, total_size, file_cnt, req_spec.get('filters')) # If the param `collection` holding a collection id is not None, filter out acquisitions that are not in the collection a_query = {'session': item_id, 'deleted': {'$exists': False}} @@ -198,7 +194,7 @@ def _preflight_archivestream(self, req_spec, collection=None): for acq in acquisitions: acq_prefix = self._path_from_container(prefix, acq, ids_of_paths, acq['_id']) - total_size, file_cnt = self._append_targets(targets, 'acquisitions', acq, acq_prefix, total_size, file_cnt, data_path, req_spec.get('filters')) + total_size, file_cnt = self._append_targets(targets, 'acquisitions', acq, acq_prefix, total_size, file_cnt, req_spec.get('filters')) elif item['level'] == 'acquisition': acq = config.db.acquisitions.find_one(base_query, ['session', 'label', 'files', 'uid', 'timestamp', 'timezone']) @@ -214,7 +210,7 @@ def _preflight_archivestream(self, req_spec, collection=None): project = config.db.projects.find_one({'_id': session['project']}, ['group', 'label']) prefix = self._path_from_container(self._path_from_container(self._path_from_container(project['group'] + '/' + project['label'], subject, ids_of_paths, subject['code']), session, ids_of_paths, session["_id"]), acq, ids_of_paths, acq['_id']) - total_size, file_cnt = self._append_targets(targets, 'acquisitions', acq, prefix, total_size, file_cnt, data_path, req_spec.get('filters')) + total_size, file_cnt = self._append_targets(targets, 'acquisitions', acq, prefix, total_size, file_cnt, req_spec.get('filters')) elif item['level'] == 'analysis': analysis = config.db.analyses.find_one(base_query, ['parent', 'label', 'files', 'uid', 'timestamp']) @@ -224,7 +220,7 @@ def _preflight_archivestream(self, req_spec, collection=None): continue prefix = self._path_from_container("", analysis, ids_of_paths, 
util.sanitize_string_to_filename(analysis['label'])) filename = 'analysis_' + util.sanitize_string_to_filename(analysis['label']) + '.tar' - total_size, file_cnt = self._append_targets(targets, 'analyses', analysis, prefix, total_size, file_cnt, data_path, req_spec.get('filters')) + total_size, file_cnt = self._append_targets(targets, 'analyses', analysis, prefix, total_size, file_cnt, req_spec.get('filters')) if len(targets) > 0: if not filename: @@ -282,8 +278,9 @@ def archivestream(self, ticket): stream = cStringIO.StringIO() with tarfile.open(mode='w|', fileobj=stream) as archive: for filepath, arcpath, cont_name, cont_id, _ in ticket['target']: - yield archive.gettarinfo(filepath, arcpath).tobuf() - with open(filepath, 'rb') as fd: + file_system = files.get_fs_by_file_path(filepath) + with file_system.open(filepath, 'rb') as fd: + yield archive.gettarinfo(fileobj=fd, arcname=arcpath).tobuf() chunk = '' for chunk in iter(lambda: fd.read(CHUNKSIZE), ''): # pylint: disable=cell-var-from-loop yield chunk @@ -293,11 +290,11 @@ def archivestream(self, ticket): yield stream.getvalue() # get tar stream trailer stream.close() - def symlinkarchivestream(self, ticket, data_path): + def symlinkarchivestream(self, ticket): for filepath, arcpath, cont_name, cont_id, _ in ticket['target']: t = tarfile.TarInfo(name=arcpath) t.type = tarfile.SYMTYPE - t.linkname = os.path.relpath(filepath, data_path) + t.linkname = fs.path.relpath(filepath) yield t.tobuf() self.log_user_access(AccessType.download_file, cont_name=cont_name, cont_id=cont_id, filename=os.path.basename(arcpath), multifile=True, origin_override=ticket['origin']) # log download stream = cStringIO.StringIO() @@ -316,7 +313,7 @@ def download(self): if ticket['ip'] != self.request.client_addr: self.abort(400, 'ticket not for this source IP') if self.get_param('symlinks'): - self.response.app_iter = self.symlinkarchivestream(ticket, config.get_item('persistent', 'data_path')) + self.response.app_iter = self.symlinkarchivestream(ticket) else: self.response.app_iter = self.archivestream(ticket) self.response.headers['Content-Type'] = 'application/octet-stream' diff --git a/api/files.py b/api/files.py index e51ff3e37..963be4950 100644 --- a/api/files.py +++ b/api/files.py @@ -1,98 +1,106 @@ import os import cgi import json -import shutil +import six import hashlib -from . import config -from . import tempdir as tempfile -from . import util - -DEFAULT_HASH_ALG='sha384' - -def move_file(path, target_path): - target_dir = os.path.dirname(target_path) - if not os.path.exists(target_dir): - os.makedirs(target_dir) - shutil.move(path, target_path) - - -def move_form_file_field_into_storage(file_field): - """ - Given a file form field, move the (downloaded, tempdir-stored) file into the final storage. - - Requires an augmented file field; see upload.process_upload() for details. - """ - - if not file_field.uuid or not file_field.path: - raise Exception("Field is not a file field with uuid and path") - - move_file(file_field.path, get_file_abs_path(file_field.uuid)) +import fs.move +import fs.tempfs +import fs.path +import fs.errors + +from . 
import config, util + +DEFAULT_HASH_ALG = 'sha384' + +class FileProcessor(object): + def __init__(self, base, presistent_fs): + self.base = base + self._temp_fs = fs.tempfs.TempFS(identifier='.temp', temp_dir=self.base) + self._presistent_fs = presistent_fs + + def store_temp_file(self, src_path, dest_path): + if not isinstance(src_path, unicode): + src_path = six.u(src_path) + if not isinstance(dest_path, unicode): + dest_path = six.u(dest_path) + dst_dir = fs.path.dirname(dest_path) + self._presistent_fs.makedirs(dst_dir, recreate=True) + fs.move.move_file(src_fs=self.temp_fs, src_path=src_path, dst_fs=self._presistent_fs, dst_path=dest_path) + + def process_form(self, request): + """ + Some workarounds to make webapp2 process forms in an intelligent way. + Normally webapp2/WebOb Reqest.POST would copy the entire request stream + into a single file on disk. + https://github.com/Pylons/webob/blob/cb9c0b4f51542a7d0ed5cc5bf0a73f528afbe03e/webob/request.py#L787 + https://github.com/moraes/webapp-improved/pull/12 + We pass request.body_file (wrapped wsgi input stream) + to our custom subclass of cgi.FieldStorage to write each upload file + to a separate file on disk, as it comes in off the network stream from the client. + Then we can rename these files to their final destination, + without copying the data gain. + + Returns (tuple): + form: SingleFileFieldStorage instance + tempdir: tempdir the file was stored in. + + Keep tempdir in scope until you don't need it anymore; it will be deleted on GC. + """ + + # Copied from WebOb source: + # https://github.com/Pylons/webob/blob/cb9c0b4f51542a7d0ed5cc5bf0a73f528afbe03e/webob/request.py#L790 + env = request.environ.copy() + env.setdefault('CONTENT_LENGTH', '0') + env['QUERY_STRING'] = '' + + field_storage_class = get_single_file_field_storage(self._temp_fs) + + form = field_storage_class( + fp=request.body_file, environ=env, keep_blank_values=True + ) + return form -def hash_file_formatted(path, hash_alg=None, buffer_size=65536): - """ - Return the scitran-formatted hash of a file, specified by path. - """ + def hash_file_formatted(self, filepath, f_system, hash_alg=None, buffer_size=65536): + """ + Return the scitran-formatted hash of a file, specified by path. + """ - hash_alg = hash_alg or DEFAULT_HASH_ALG - hasher = hashlib.new(hash_alg) + if not isinstance(filepath, unicode): + filepath = six.u(filepath) - with open(path, 'rb') as f: - while True: - data = f.read(buffer_size) - if not data: - break - hasher.update(data) + hash_alg = hash_alg or DEFAULT_HASH_ALG + hasher = hashlib.new(hash_alg) - return util.format_hash(hash_alg, hasher.hexdigest()) + with f_system.open(filepath, 'rb') as f: + while True: + data = f.read(buffer_size) + if not data: + break + hasher.update(data) + return util.format_hash(hash_alg, hasher.hexdigest()) -class FileStoreException(Exception): - pass + @property + def temp_fs(self): + return self._temp_fs + @property + def persistent_fs(self): + return self._presistent_fs -def process_form(request): - """ - Some workarounds to make webapp2 process forms in an intelligent way. - Normally webapp2/WebOb Reqest.POST would copy the entire request stream - into a single file on disk. 
- https://github.com/Pylons/webob/blob/cb9c0b4f51542a7d0ed5cc5bf0a73f528afbe03e/webob/request.py#L787 - https://github.com/moraes/webapp-improved/pull/12 - We pass request.body_file (wrapped wsgi input stream) - to our custom subclass of cgi.FieldStorage to write each upload file - to a separate file on disk, as it comes in off the network stream from the client. - Then we can rename these files to their final destination, - without copying the data gain. - - Returns (tuple): - form: SingleFileFieldStorage instance - tempdir: tempdir the file was stored in. - - Keep tempdir in scope until you don't need it anymore; it will be deleted on GC. - """ - - # Store form file fields in a tempdir - tempdir = tempfile.TemporaryDirectory(prefix='.tmp', dir=config.get_item('persistent', 'data_path')) - - # Copied from WebOb source: - # https://github.com/Pylons/webob/blob/cb9c0b4f51542a7d0ed5cc5bf0a73f528afbe03e/webob/request.py#L790 - env = request.environ.copy() - env.setdefault('CONTENT_LENGTH', '0') - env['QUERY_STRING'] = '' - - field_storage_class = get_single_file_field_storage( - tempdir.name - ) + def __exit__(self, exc, value, tb): + self.close() - form = field_storage_class( - fp=request.body_file, environ=env, keep_blank_values=True - ) + def __del__(self): + self.close() - return form, tempdir + def close(self): + self.temp_fs.close() -def get_single_file_field_storage(upload_dir): +def get_single_file_field_storage(file_system): # pylint: disable=attribute-defined-outside-init # We dynamically create this class because we @@ -104,12 +112,15 @@ def get_single_file_field_storage(upload_dir): # https://github.com/python/cpython/blob/1e3e162ff5c0cc656559c43914439ab3e5734f00/Lib/cgi.py#L728 class SingleFileFieldStorage(cgi.FieldStorage): - bufsize = 2**20 + bufsize = 2 ** 20 def make_file(self, binary=None): # Sanitize form's filename (read: prevent malicious escapes, bad characters, etc) - self.filename = os.path.basename(self.filename) - self.open_file = open(os.path.join(upload_dir, self.filename), 'wb') + + self.filename = fs.path.basename(self.filename) + if not isinstance(self.filename, unicode): + self.filename = six.u(self.filename) + self.open_file = file_system.open(self.filename, 'wb') return self.open_file # override private method __write of superclass FieldStorage @@ -127,6 +138,9 @@ def _FieldStorage__write(self, line): return SingleFileFieldStorage +class FileStoreException(Exception): + pass + # File extension --> scitran file type detection hueristics. # Listed in precendence order. 
with open(os.path.join(os.path.dirname(__file__), 'filetypes.json')) as fd: @@ -146,6 +160,46 @@ def guess_type_from_filename(filename): return filetype -def get_file_abs_path(file_id): - version = 'v1' - return os.path.join(config.get_item('persistent', 'data_path'), version, util.path_from_uuid(file_id)) +def get_valid_file(file_info): + file_id = file_info.get('_id', '') + file_hash = file_info.get('hash', '') + file_uuid_path = None + file_hash_path = None + + if file_hash: + file_hash_path = util.path_from_hash(file_hash) + + if file_id: + file_uuid_path = util.path_from_uuid(file_id) + + if config.support_legacy_fs: + if file_hash_path and config.legacy_fs.isfile(file_hash_path): + return file_hash_path, config.legacy_fs + elif file_uuid_path and config.legacy_fs.isfile(file_uuid_path): + return file_uuid_path, config.legacy_fs + + if file_uuid_path and config.fs.isfile(file_uuid_path): + return file_uuid_path, config.fs + else: + raise fs.errors.ResourceNotFound('File not found: %s', file_info['name']) + + +def get_signed_url(file_path, file_system, filename=None): + try: + if type(file_system).__name__ in config.signed_url_available: + return file_system.geturl(file_path, filename=filename) + except fs.errors.NoURL: + return None + + +def get_fs_by_file_path(file_path): + if config.support_legacy_fs: + if config.legacy_fs.isfile(file_path): + return config.legacy_fs + elif config.legacy_fs.isfile(file_path): + return config.legacy_fs + + if config.fs.isfile(file_path): + return config.fs + else: + raise fs.errors.ResourceNotFound('File not found: %s', file_path) diff --git a/api/handlers/listhandler.py b/api/handlers/listhandler.py index 8cad26c69..8f601e532 100644 --- a/api/handlers/listhandler.py +++ b/api/handlers/listhandler.py @@ -1,4 +1,3 @@ -import os import bson import copy import datetime @@ -357,24 +356,25 @@ def _check_ticket(self, ticket_id, _id, filename): return ticket @staticmethod - def build_zip_info(filepath): + def build_zip_info(file_path, file_system): """ Builds a json response containing member and comment info for a zipfile """ - with zipfile.ZipFile(filepath) as zf: - info = {} - info['comment'] = zf.comment - info['members'] = [] - for zi in zf.infolist(): - m = {} - m['path'] = zi.filename - m['size'] = zi.file_size - m['timestamp'] = datetime.datetime(*zi.date_time) - m['comment'] = zi.comment - - info['members'].append(m) - - return info + with file_system.open(file_path, 'rb') as f: + with zipfile.ZipFile(f) as zf: + info = {} + info['comment'] = zf.comment + info['members'] = [] + for zi in zf.infolist(): + m = {} + m['path'] = zi.filename + m['size'] = zi.file_size + m['timestamp'] = datetime.datetime(*zi.date_time) + m['comment'] = zi.comment + + info['members'].append(m) + + return info def get(self, cont_name, list_name, **kwargs): """ @@ -441,10 +441,8 @@ def get(self, cont_name, list_name, **kwargs): hash_ = self.get_param('hash') if hash_ and hash_ != fileinfo['hash']: self.abort(409, 'file exists, hash mismatch') - data_path = config.get_item('persistent', 'data_path') - file_path = files.get_file_abs_path(fileinfo.get('_id', '')) - if not util.file_exists(file_path): - file_path = os.path.join(data_path, util.path_from_hash(fileinfo['hash'])) + + file_path, file_system = files.get_valid_file(fileinfo) # Request for download ticket if self.get_param('ticket') == '': @@ -454,18 +452,19 @@ def get(self, cont_name, list_name, **kwargs): # Request for info about zipfile elif self.is_true('info'): try: - info = self.build_zip_info(file_path) + info 
= self.build_zip_info(file_path, file_system) + return info except zipfile.BadZipfile: self.abort(400, 'not a zip file') - return info # Request to download zipfile member elif self.get_param('member') is not None: zip_member = self.get_param('member') try: - with zipfile.ZipFile(file_path) as zf: - self.response.headers['Content-Type'] = util.guess_mimetype(zip_member) - self.response.write(zf.open(zip_member).read()) + with file_system.open(file_path, 'rb') as f: + with zipfile.ZipFile(f) as zf: + self.response.headers['Content-Type'] = util.guess_mimetype(zip_member) + self.response.write(zf.open(zip_member).read()) except zipfile.BadZipfile: self.abort(400, 'not a zip file') except KeyError: @@ -480,13 +479,17 @@ def get(self, cont_name, list_name, **kwargs): # Authenticated or ticketed download request else: - self.response.app_iter = open(file_path, 'rb') - self.response.headers['Content-Length'] = str(fileinfo['size']) # must be set after setting app_iter - if self.is_true('view'): - self.response.headers['Content-Type'] = str(fileinfo.get('mimetype', 'application/octet-stream')) + signed_url = files.get_signed_url(file_path, file_system, filename=filename) + if signed_url: + self.redirect(signed_url) else: - self.response.headers['Content-Type'] = 'application/octet-stream' - self.response.headers['Content-Disposition'] = 'attachment; filename="' + filename + '"' + self.response.app_iter = file_system.open(file_path, 'rb') + self.response.headers['Content-Length'] = str(fileinfo['size']) # must be set after setting app_iter + if self.is_true('view'): + self.response.headers['Content-Type'] = str(fileinfo.get('mimetype', 'application/octet-stream')) + else: + self.response.headers['Content-Type'] = 'application/octet-stream' + self.response.headers['Content-Disposition'] = 'attachment; filename="' + filename + '"' # log download if we haven't already for this ticket if ticket: diff --git a/api/handlers/refererhandler.py b/api/handlers/refererhandler.py index 441eb9daa..2ca729d4c 100644 --- a/api/handlers/refererhandler.py +++ b/api/handlers/refererhandler.py @@ -7,7 +7,6 @@ """ import bson -import os import zipfile import datetime from abc import ABCMeta, abstractproperty @@ -359,30 +358,25 @@ def download(self, **kwargs): self.abort(404, "{} doesn't exist".format(filename)) else: fileinfo = fileinfo[0] - data_path = config.get_item('persistent', 'data_path') - file_path = files.get_file_abs_path(fileinfo.get('_id', '')) - if not util.file_exists(file_path): - file_path = os.path.join( - data_path, - util.path_from_hash(fileinfo['hash']) - ) + file_path, file_system = files.get_valid_file(fileinfo) filename = fileinfo['name'] # Request for info about zipfile if self.is_true('info'): try: - info = FileListHandler.build_zip_info(file_path) + info = FileListHandler.build_zip_info(file_path, file_system) + return info except zipfile.BadZipfile: self.abort(400, 'not a zip file') - return info # Request to download zipfile member elif self.get_param('member') is not None: zip_member = self.get_param('member') try: - with zipfile.ZipFile(file_path) as zf: - self.response.headers['Content-Type'] = util.guess_mimetype(zip_member) - self.response.write(zf.open(zip_member).read()) + with file_system.open(file_path, 'rb') as f: + with zipfile.ZipFile(f) as zf: + self.response.headers['Content-Type'] = util.guess_mimetype(zip_member) + self.response.write(zf.open(zip_member).read()) except zipfile.BadZipfile: self.abort(400, 'not a zip file') except KeyError: @@ -397,7 +391,7 @@ def 
download(self, **kwargs): # Request to download the file itself else: - self.response.app_iter = open(file_path, 'rb') + self.response.app_iter = file_system.open(file_path, 'rb') self.response.headers['Content-Length'] = str(fileinfo['size']) # must be set after setting app_iter if self.is_true('view'): self.response.headers['Content-Type'] = str(fileinfo.get('mimetype', 'application/octet-stream')) @@ -439,17 +433,13 @@ def _prepare_batch(self, fileinfo, analysis): ## we need a way to avoid this targets = [] total_size = total_cnt = 0 - data_path = config.get_item('persistent', 'data_path') for f in fileinfo: - file_path = files.get_file_abs_path(f.get('_id', '')) - if not util.file_exists(file_path): - file_path = os.path.join(data_path, util.path_from_hash(f['hash'])) - if util.file_exists(file_path): # silently skip missing files - targets.append((file_path, - util.sanitize_string_to_filename(analysis['label']) + '/' + ('input' if f.get('input') else 'output') + '/'+ f['name'], - 'analyses', analysis['_id'], f['size'])) - total_size += f['size'] - total_cnt += 1 + file_path, _ = files.get_valid_file(f) + targets.append((file_path, + util.sanitize_string_to_filename(analysis['label']) + '/' + ('input' if f.get('input') else 'output') + '/'+ f['name'], + 'analyses', analysis['_id'], f['size'])) + total_size += f['size'] + total_cnt += 1 return targets, total_size, total_cnt diff --git a/api/jobs/handlers.py b/api/jobs/handlers.py index 48c761afa..484b89fd3 100644 --- a/api/jobs/handlers.py +++ b/api/jobs/handlers.py @@ -107,9 +107,11 @@ def download(self, **kwargs): # pragma: no cover """Download gear tarball file""" dl_id = kwargs.pop('cid') gear = get_gear(dl_id) - file_id = gear['exchange']['rootfs-id'] - file_path = files.get_file_abs_path(file_id) - self.response.app_iter = open(file_path, 'rb') + file_path, file_system = files.get_valid_file({ + '_id': gear['exchange'].get('rootfs-id', ''), + 'hash': 'v0-' + gear['exchange']['rootfs-hash'].replace(':', '-') + }) + self.response.app_iter = file_system.open(file_path, 'rb') # self.response.headers['Content-Length'] = str(gear['size']) # must be set after setting app_iter self.response.headers['Content-Type'] = 'application/octet-stream' self.response.headers['Content-Disposition'] = 'attachment; filename="gear.tar"' diff --git a/api/placer.py b/api/placer.py index b2d5c1586..06026dda7 100644 --- a/api/placer.py +++ b/api/placer.py @@ -2,14 +2,14 @@ import copy import datetime import dateutil -import os import pymongo -import shutil import uuid import zipfile -from . import config, files, util, validators -from . import tempdir as tempfile +import fs.path +import fs.errors + +from . import config, util, validators from .dao.containerstorage import SessionStorage, AcquisitionStorage from .dao import containerutil, hierarchy from .jobs import rules @@ -23,7 +23,7 @@ class Placer(object): Interface for a placer, which knows how to process files and place them where they belong - on disk and database. 
""" - def __init__(self, container_type, container, id_, metadata, timestamp, origin, context): + def __init__(self, container_type, container, id_, metadata, timestamp, origin, context, file_processor): self.container_type = container_type self.container = container self.id_ = id_ @@ -42,6 +42,8 @@ def __init__(self, container_type, container, id_, metadata, timestamp, origin, # A list of files that have been saved via save_file() usually returned by finalize() self.saved = [] + self.file_processor = file_processor + def check(self): """ @@ -82,10 +84,12 @@ def save_file(self, field=None, file_attrs=None): Requires an augmented file field; see process_upload() for details. """ - # Save file - if field is not None: - files.move_form_file_field_into_storage(field) + if field is not None and self.file_processor is not None: + self.file_processor.store_temp_file(field.path, util.path_from_uuid(field.uuid)) + + # if field is not None: + # files.move_form_file_field_into_storage(field) # Update the DB if file_attrs is not None: @@ -135,8 +139,8 @@ class UIDPlacer(Placer): create_hierarchy = staticmethod(hierarchy.upsert_top_down_hierarchy) match_type = 'uid' - def __init__(self, container_type, container, id_, metadata, timestamp, origin, context): - super(UIDPlacer, self).__init__(container_type, container, id_, metadata, timestamp, origin, context) + def __init__(self, container_type, container, id_, metadata, timestamp, origin, context, file_processor): + super(UIDPlacer, self).__init__(container_type, container, id_, metadata, timestamp, origin, context, file_processor) self.metadata_for_file = {} self.session_id = None self.count = 0 @@ -186,7 +190,7 @@ def process_file_field(self, field, file_attrs): self.save_file(field, file_attrs) else: if field is not None: - files.move_form_file_field_into_storage(field) + self.file_processor.store_temp_file(field.path, util.path_from_uuid(field.uuid)) if file_attrs is not None: container.upsert_file(file_attrs) @@ -332,8 +336,8 @@ class TokenPlacer(Placer): Intended for use with a token that tracks where the files will be stored. """ - def __init__(self, container_type, container, id_, metadata, timestamp, origin, context): - super(TokenPlacer, self).__init__(container_type, container, id_, metadata, timestamp, origin, context) + def __init__(self, container_type, container, id_, metadata, timestamp, origin, context, file_processor): + super(TokenPlacer, self).__init__(container_type, container, id_, metadata, timestamp, origin, context, file_processor) self.paths = [] self.folder = None @@ -350,10 +354,9 @@ def check(self): # upload.clean_packfile_tokens # # It must be kept in sync between each instance. - base_path = config.get_item('persistent', 'data_path') - self.folder = os.path.join(base_path, 'tokens', 'packfile', token) + self.folder = fs.path.join('tokens', 'packfile', token) - util.mkdir_p(self.folder) + util.mkdir_p(self.folder, config.fs) def process_file_field(self, field, file_attrs): self.saved.append(file_attrs) @@ -361,8 +364,8 @@ def process_file_field(self, field, file_attrs): def finalize(self): for path in self.paths: - dest = os.path.join(self.folder, os.path.basename(path)) - shutil.move(path, dest) + dest = fs.path.join(self.folder, path) + self.file_processor.store_temp_file(path, dest) self.recalc_session_compliance() return self.saved @@ -371,8 +374,8 @@ class PackfilePlacer(Placer): A placer that can accept N files, save them into a zip archive, and place the result on an acquisition. 
""" - def __init__(self, container_type, container, id_, metadata, timestamp, origin, context): - super(PackfilePlacer, self).__init__(container_type, container, id_, metadata, timestamp, origin, context) + def __init__(self, container_type, container, id_, metadata, timestamp, origin, context, file_processor): + super(PackfilePlacer, self).__init__(container_type, container, id_, metadata, timestamp, origin, context, file_processor) # This endpoint is an SSE endpoint self.sse = True @@ -406,10 +409,11 @@ def check(self): # upload.clean_packfile_tokens # # It must be kept in sync between each instance. - base_path = config.get_item('persistent', 'data_path') - self.folder = os.path.join(base_path, 'tokens', 'packfile', token) + self.folder = fs.path.join('tokens', 'packfile', token) - if not os.path.isdir(self.folder): + try: + config.fs.isdir(self.folder) + except fs.errors.ResourceNotFound: raise Exception('Packfile directory does not exist or has been deleted') self.requireMetadata() @@ -441,27 +445,20 @@ def check(self): stamp = minimum # Remember the timestamp integer for later use with os.utime. - self.ziptime = int(dateutil.parser.parse(stamp).strftime('%s')) + self.ziptime = dateutil.parser.parse(stamp) # The zipfile is a santizied acquisition label self.dir_ = util.sanitize_string_to_filename(self.a_label) self.name = self.dir_ + '.zip' - # Make a tempdir to store zip until moved - # OPPORTUNITY: this is also called in files.py. Could be a util func. - self.tempdir = tempfile.TemporaryDirectory(prefix='.tmp', dir=config.get_item('persistent', 'data_path')) - # Create a zip in the tempdir that later gets moved into the CAS. - self.path = os.path.join(self.tempdir.name, 'temp.zip') - self.zip_ = zipfile.ZipFile(self.path, 'w', zipfile.ZIP_DEFLATED, allowZip64=True) + self.path = u'temp.zip' + self.zip_ = zipfile.ZipFile(self.file_processor.temp_fs.open(self.path, 'wb'), + 'w', zipfile.ZIP_DEFLATED, allowZip64=True) # OPPORTUNITY: add zip comment # self.zip.comment = json.dumps(metadata, default=metadata_encoder) - # Bit of a silly hack: write our tempdir directory into the zip (not including its contents). - # Creates an empty directory entry in the zip which will hold all the files inside. - # This way, when you expand a zip, you'll get folder/things instead of a thousand dicoms splattered everywhere. - self.zip_.write(self.tempdir.name, self.dir_) def process_file_field(self, field, file_attrs): # Should not be called with any files @@ -469,19 +466,20 @@ def process_file_field(self, field, file_attrs): def finalize(self): - paths = os.listdir(self.folder) + paths = self.file_processor.persistent_fs.listdir(self.folder) total = len(paths) # Write all files to zip complete = 0 for path in paths: - p = os.path.join(self.folder, path) + p = fs.path.join(self.folder, path) # Set the file's mtime & atime. 
- os.utime(p, (self.ziptime, self.ziptime)) + self.file_processor.persistent_fs.settimes(p, self.ziptime, self.ziptime) # Place file into the zip folder we created before - self.zip_.write(p, os.path.join(self.dir_, os.path.basename(path))) + with self.file_processor.persistent_fs.open(p, 'rb') as f: + self.zip_.writestr(fs.path.join(self.dir_, path), f.read()) # Report progress complete += 1 @@ -493,7 +491,7 @@ def finalize(self): self.zip_.close() # Remove the folder created by TokenPlacer - shutil.rmtree(self.folder) + self.file_processor.persistent_fs.removetree(self.folder) # Lookup uid on token token = self.context['token'] @@ -508,10 +506,10 @@ def finalize(self): # Not a great practice. See process_upload() for details. cgi_field = util.obj_from_map({ 'filename': self.name, - 'path': self.path, - 'size': os.path.getsize(self.path), - 'hash': files.hash_file_formatted(self.path), - 'uuid': str(uuid.uuid4()), + 'path': self.path, + 'size': self.file_processor.temp_fs.getsize(self.path), + 'hash': self.file_processor.hash_file_formatted(self.path, self.file_processor.temp_fs), + 'uuid': str(uuid.uuid4()), 'mimetype': util.guess_mimetype('lol.zip'), 'modified': self.timestamp }) @@ -521,9 +519,9 @@ def finalize(self): # Used in the API return. cgi_attrs = { '_id': cgi_field.uuid, - 'name': cgi_field.filename, + 'name': cgi_field.filename, 'modified': cgi_field.modified, - 'size': cgi_field.size, + 'size': cgi_field.size, 'hash': cgi_field.hash, 'mimetype': cgi_field.mimetype, diff --git a/api/upload.py b/api/upload.py index 96f406bf5..86f358469 100644 --- a/api/upload.py +++ b/api/upload.py @@ -1,10 +1,10 @@ import bson import datetime import json -import os.path -import shutil import uuid +import fs.path + from .web import base from .web.errors import FileStoreException, FileFormException from . import config @@ -75,7 +75,8 @@ def process_upload(request, strategy, container_type=None, id_=None, origin=None # The vast majority of this function's wall-clock time is spent here. # Tempdir is deleted off disk once out of scope, so let's hold onto this reference. - form, tempdir = files.process_form(request) + file_processor = files.FileProcessor(config.get_item('persistent', 'data_path'), config.fs) + form = file_processor.process_form(request) if 'metadata' in form: try: @@ -84,7 +85,7 @@ def process_upload(request, strategy, container_type=None, id_=None, origin=None raise FileStoreException('wrong format for field "metadata"') placer_class = strategy.value - placer = placer_class(container_type, container, id_, metadata, timestamp, origin, context) + placer = placer_class(container_type, container, id_, metadata, timestamp, origin, context, file_processor) placer.check() # Browsers, when sending a multipart upload, will send files with field name "file" (if sinuglar) @@ -99,25 +100,22 @@ def process_upload(request, strategy, container_type=None, id_=None, origin=None raise FileFormException("Targeted uploads can only send one file") - for field in file_fields: field = form[field] # Augment the cgi.FieldStorage with a variety of custom fields. # Not the best practice. Open to improvements. # These are presumbed to be required by every function later called with field as a parameter. 
- field.path = os.path.join(tempdir.name, field.filename) - if not os.path.exists(field.path): - tempdir_exists = os.path.exists(tempdir.name) - raise Exception("file {} does not exist, tmpdir {} exists: {}, files in tmpdir: {}".format( + field.path = field.filename + if not file_processor.temp_fs.exists(field.path): + #tempdir_exists = os.path.exists(tempdir.name) + raise Exception("file {} does not exist, files in tmpdir: {}".format( field.path, - tempdir.name, - tempdir_exists, - tempdir_exists and os.listdir(tempdir.name), + file_processor.temp_fs.listdir('/'), )) - field.size = os.path.getsize(field.path) - field.hash = files.hash_file_formatted(field.path) + field.size = file_processor.temp_fs.getsize(field.path) + field.hash = file_processor.hash_file_formatted(field.path, file_processor.temp_fs) field.uuid = str(uuid.uuid4()) - field.mimetype = util.guess_mimetype(field.filename) # TODO: does not honor metadata's mime type if any + field.mimetype = util.guess_mimetype(field.filename) # TODO: does not honor metadata's mime type if any field.modified = timestamp # create a file-attribute map commonly used elsewhere in the codebase. @@ -125,7 +123,7 @@ def process_upload(request, strategy, container_type=None, id_=None, origin=None file_attrs = { '_id': field.uuid, 'name': field.filename, - 'modified': field.modified, # + 'modified': field.modified, 'size': field.size, 'mimetype': field.mimetype, 'hash': field.hash, @@ -243,15 +241,14 @@ def clean_packfile_tokens(self): # upload.clean_packfile_tokens # # It must be kept in sync between each instance. - basepath = config.get_item('persistent', 'data_path') - folder = os.path.join(basepath, 'tokens', 'packfile') + folder = fs.path.join('tokens', 'packfile') - util.mkdir_p(folder) - paths = os.listdir(folder) + util.mkdir_p(folder, config.fs) + paths = config.fs.listdir(folder) cleaned = 0 for token in paths: - path = os.path.join(folder, token) + path = fs.path.join(folder, token) result = None try: @@ -264,7 +261,7 @@ def clean_packfile_tokens(self): if result is None: log.info('Cleaning expired token directory ' + token) - shutil.rmtree(path) + config.fs.removetree(path) cleaned += 1 return { diff --git a/api/util.py b/api/util.py index 2520ebb54..a0239e821 100644 --- a/api/util.py +++ b/api/util.py @@ -1,6 +1,5 @@ import datetime import enum as baseEnum -import errno import hashlib import json import mimetypes @@ -11,6 +10,9 @@ import string import uuid +import fs.path +import fs.errors + import django from django.conf import settings from django.template import Template, Context @@ -226,14 +228,11 @@ def __eq__(self, other): else: return super.__eq__(other) -def mkdir_p(path): +def mkdir_p(path, file_system): try: - os.makedirs(path) - except OSError as exc: # Python >2.5 - if exc.errno == errno.EEXIST and os.path.isdir(path): - pass - else: - raise + file_system.makedirs(path) + except fs.errors.DirectoryExists: + pass NONCE_CHARS = string.ascii_letters + string.digits NONCE_LENGTH = 18 @@ -260,7 +259,7 @@ def path_from_uuid(uuid_): first_stanza = uuid_1[0:2] second_stanza = uuid_1[2:4] path = (first_stanza, second_stanza, uuid_) - return os.path.join(*path) + return fs.path.join(*path) def path_from_hash(hash_): @@ -276,7 +275,3 @@ def path_from_hash(hash_): second_stanza = actual_hash[2:4] path = (hash_version, hash_alg, first_stanza, second_stanza, hash_) return os.path.join(*path) - - -def file_exists(path): - return os.path.exists(path) and os.path.isfile(path) diff --git a/bin/oneoffs/migrate_storage.py 
b/bin/oneoffs/migrate_storage.py new file mode 100644 index 000000000..f10cf6b92 --- /dev/null +++ b/bin/oneoffs/migrate_storage.py @@ -0,0 +1,216 @@ +#!/usr/bin/env python +import argparse +import datetime +import logging +import os +import uuid + +import fs.path +import fs.move + +from api import config, util + +log = logging.getLogger('migrate_storage') +log.setLevel(logging.INFO) + +COLLECTIONS_PREFIXES = [('projects', 'files'), + ('acquisitions', 'files'), + ('analyses', 'files'), + ('sessions', 'files'), + ('sessions', 'subject.files'), + ('collections', 'files')] + + +class MigrationError(Exception): + pass + + +def get_files_by_prefix(document, prefix): + for key in prefix.split('.'): + document = document.get(key, {}) + return document + + +def show_progress(current_index, total_files): + if current_index % (total_files / 10 + 1) == 0: + log.info('Processed %s files of total %s files ...' % (current_index, total_files)) + + +def cleanup_empty_folders(): + log.info('Cleanup empty folders') + + for _dirpath, _, _ in os.walk(config.get_item('persistent', 'data_path'), topdown=False): + if not (os.listdir(_dirpath) or config.get_item('persistent', 'data_path') == _dirpath): + os.rmdir(_dirpath) + + +def get_collections_files(): + _files = [] + + for collection, prefix in COLLECTIONS_PREFIXES: + cursor = config.db.get_collection(collection).find({}) + for document in cursor: + for f in get_files_by_prefix(document, prefix): + f_dict = { + 'collection_id': document.get('_id'), + 'collection': collection, + 'fileinfo': f, + 'prefix': prefix + } + _files.append(f_dict) + + return _files + + +def get_gears_files(): + cursor = config.db.get_collection('gears').find({}) + _files = [] + + for document in cursor: + if document.get('exchange', {}).get('git-commit', '') == 'local': + f_dict = { + 'gear_id': document['_id'], + 'gear_name': document['gear']['name'], + 'exchange': document['exchange'] + } + _files.append(f_dict) + + return _files + + +def migrate_collections(): + log.info('Migrate collection files...') + + _files = get_collections_files() + + for i, f in enumerate(_files): + try: + file_id = f['fileinfo'].get('_id', '') + if file_id: + file_path = util.path_from_uuid(file_id) + if not config.fs.isfile(file_path): + """Copy file from legacy to new storage""" + + log.debug('copy file between the legacy and new storage: %s' % file_path) + + dst_dir = fs.path.dirname(file_path) + config.fs.makedirs(dst_dir, recreate=True) + fs.move.copy_file(src_fs=config.legacy_fs, src_path=file_path, dst_fs=config.fs, dst_path=file_path) + else: + """generate uuuid, set the id field and copy the file""" + file_id = str(uuid.uuid4()) + f_old_path = util.path_from_hash(f['fileinfo']['hash']) + f_new_path = util.path_from_uuid(file_id) + + log.debug('copy file %s to %s' % (f_old_path, f_new_path)) + + dst_dir = fs.path.dirname(f_new_path) + config.fs.makedirs(dst_dir, recreate=True) + fs.move.copy_file(src_fs=config.legacy_fs, src_path=f_old_path, dst_fs=config.fs, dst_path=f_new_path) + + update_set = { + f['prefix'] + '.$.modified': datetime.datetime.utcnow(), + f['prefix'] + '.$._id': file_id + } + log.debug('update file in mongo: %s' % update_set) + # Update the file with the newly generated UUID + config.db[f['collection']].find_one_and_update( + {'_id': f['collection_id'], f['prefix'] + '.name': f['fileinfo']['name']}, + {'$set': update_set} + ) + + show_progress(i + 1, len(_files)) + + except Exception as e: + log.exception(e) + raise MigrationError('Wasn\'t able to migrate the \'%s\' ' + 'file 
in the \'%s\' collection (collection id: %s)' % + (f['fileinfo']['name'], f['collection'], str(f['collection_id'])), e) + + +def migrate_gears(): + log.info('Migrate gears...') + + _files = get_gears_files() + + for i, f in enumerate(_files): + try: + file_id = f['exchange'].get('rootfs-id', '') + if file_id: + file_path = util.path_from_uuid(file_id) + if not config.fs.isfile(file_path): + """Copy file from legacy to new storage""" + + log.debug('copy file between the legacy and new storage: %s' % file_path) + + dst_dir = fs.path.dirname(file_path) + config.fs.makedirs(dst_dir, recreate=True) + fs.move.copy_file(src_fs=config.legacy_fs, src_path=file_path, dst_fs=config.fs, dst_path=file_path) + else: + file_id = str(uuid.uuid4()) + file_hash = 'v0-' + f['exchange']['rootfs-hash'].replace(':', '-') + f_old_path = util.path_from_hash(file_hash) + f_new_path = util.path_from_uuid(file_id) + + log.debug('copy file %s to %s' % (f_old_path, f_new_path)) + + dst_dir = fs.path.dirname(f_new_path) + config.fs.makedirs(dst_dir, recreate=True) + fs.move.copy_file(src_fs=config.legacy_fs, src_path=f_old_path, dst_fs=config.fs, dst_path=f_new_path) + + update_set = { + 'modified': datetime.datetime.utcnow(), + 'exchange.rootfs-id': file_id + } + + log.debug('update file in mongo: %s' % update_set) + # Update the gear with the newly generated UUID + config.db['gears'].find_one_and_update( + {'_id': f['gear_id']}, + {'$set': update_set} + ) + + show_progress(i + 1, len(_files)) + except Exception as e: + log.exception(e) + raise MigrationError('Wasn\'t able to migrate the \'%s\' gear (gear id: %s)' % + (f['gear_name'], f['gear_id']), e) + + +def migrate_storage(): + """ + Remove CAS logic, generate UUID for the files and move the files from the lagacy storage to the new one. + """ + + parser = argparse.ArgumentParser(prog='Migrate storage') + parser.add_argument('--collections', action='store_true', help='Migrate collections') + parser.add_argument('--gears', action='store_true', help='Migrate gears') + parser.add_argument('--delete-files', action='store_true', help='Delete files from legacy storage') + + + args = parser.parse_args() + + try: + if not (args.collections or args.gears): + migrate_collections() + migrate_gears() + + if args.collections: + migrate_collections() + + if args.gears: + migrate_gears() + + if args.delete_files: + config.legacy_fs.removetree('/') + + except MigrationError as e: + log.exception(e) + exit(1) + finally: + cleanup_empty_folders() + pass + + +if __name__ == '__main__': + migrate_storage() diff --git a/bin/oneoffs/remove_cas.py b/bin/oneoffs/remove_cas.py deleted file mode 100755 index 2cbb852d4..000000000 --- a/bin/oneoffs/remove_cas.py +++ /dev/null @@ -1,238 +0,0 @@ -#!/usr/bin/env python -import datetime -import logging -import os -import shutil -import uuid - -from collections import Counter - -from api import config, files, util - -log = logging.getLogger('remove_cas') -log.setLevel(logging.INFO) - - -class MigrationError(Exception): - pass - - -def get_files_by_prefix(document, prefix): - for key in prefix.split('.'): - document = document.get(key, {}) - return document - - -def copy_file(path, target_path): - target_dir = os.path.dirname(target_path) - - if not os.path.exists(target_dir): - os.makedirs(target_dir) - shutil.copy(path, target_path) - - -def show_progress(current_index, total_files): - if current_index % (total_files / 10 + 1) == 0: - log.info('Processed %s files of total %s files ...' 
% (current_index, total_files)) - - -def cleanup_empty_folders(): - log.info('Cleanup empty folders') - - for _dirpath, _, _ in os.walk(config.get_item('persistent', 'data_path'), topdown=False): - if not (os.listdir(_dirpath) or config.get_item('persistent', 'data_path') == _dirpath): - os.rmdir(_dirpath) - - -def get_collections_files_hashes(): - COLLECTIONS_PREFIXES = [('projects', 'files'), - ('acquisitions', 'files'), - ('analyses', 'files'), - ('sessions', 'files'), - ('sessions', 'subject.files'), - ('collections', 'files')] - _hashes = [] - _files = [] - - for collection, prefix in COLLECTIONS_PREFIXES: - cursor = config.db.get_collection(collection).find({}) - for document in cursor: - for f in get_files_by_prefix(document, prefix): - _hashes.append(f.get('hash', '')) - f_dict = { - 'collection_id': document.get('_id'), - 'collection': collection, - 'fileinfo': f, - 'prefix': prefix - } - _files.append(f_dict) - - return _files, _hashes - - -def get_gears_files(): - cursor = config.db.get_collection('gears').find({}) - _files = [] - - for document in cursor: - if document['exchange']['git-commit'] == 'local': - f_dict = { - 'gear_id': document['_id'], - 'gear_name': document['gear']['name'], - 'exchange': document['exchange'] - } - _files.append(f_dict) - - return _files - - -def migrate_collections(): - log.info('Migrate collections...') - - _files, _hashes = get_collections_files_hashes() - counter = Counter(_hashes) - base = config.get_item('persistent', 'data_path') - - for i, f in enumerate(_files): - if f['fileinfo'].get('_id', ''): - counter[f['fileinfo']['hash']] -= 1 - continue - try: - f_uuid = str(uuid.uuid4()) - f_path = os.path.join(base, util.path_from_hash(f['fileinfo']['hash'])) - log.debug('copy file %s to %s' % (f_path, util.path_from_uuid(f_uuid))) - copy_file(f_path, files.get_file_abs_path(f_uuid)) - - update_set = { - f['prefix'] + '.$.modified': datetime.datetime.utcnow(), - f['prefix'] + '.$._id': f_uuid - } - log.debug('update file in mongo: %s' % update_set) - # Update the file with the newly generated UUID - config.db[f['collection']].find_one_and_update( - {'_id': f['collection_id'], f['prefix'] + '.name': f['fileinfo']['name']}, - {'$set': update_set} - ) - - # Decrease the count of the current hash, so we will know when we can remove the original file - counter[f['fileinfo']['hash']] -= 1 - - if counter[f['fileinfo']['hash']] == 0: - log.debug('remove old file: %s' % f_path) - os.remove(f_path) - - show_progress(i+1, len(_files)) - except Exception as e: - log.exception(e) - raise MigrationError('Wasn\'t able to migrate the \'%s\' ' - 'file in the \'%s\' collection (collection id: %s)' % - (f['fileinfo']['name'], f['collection'], str(f['collection_id'])), e) - - -def rollback_collections(): - log.info('Rollback collections...') - - _files, _ = get_collections_files_hashes() - base = config.get_item('persistent', 'data_path') - - for i, f in enumerate(_files): - if f['fileinfo'].get('_id', ''): - hash_path = os.path.join(base, util.path_from_hash(f['fileinfo']['hash'])) - uuid_path = files.get_file_abs_path(f['fileinfo']['_id']) - if os.path.exists(hash_path) and os.path.exists(uuid_path): - os.remove(uuid_path) - elif os.path.exists(uuid_path): - copy_file(uuid_path, hash_path) - os.remove(uuid_path) - - config.db[f['collection']].find_one_and_update( - {'_id': f['collection_id'], f['prefix'] + '.name': f['fileinfo']['name']}, - {'$unset': {f['prefix'] + '.$._id': ''}} - ) - - show_progress(i + 1, len(_files)) - - -def migrate_gears(): - 
log.info('Migrate gears...') - - _files = get_gears_files() - base = config.get_item('persistent', 'data_path') - - for i, f in enumerate(_files): - if f['exchange'].get('rootfs-id', ''): - continue - try: - f_uuid = str(uuid.uuid4()) - f_hash = 'v0-' + f['exchange']['rootfs-hash'].replace(':', '-') - f_path = os.path.join(base, util.path_from_hash(f_hash)) - log.debug('copy file %s to %s' % (f_path, util.path_from_uuid(f_uuid))) - copy_file(f_path, files.get_file_abs_path(f_uuid)) - - update_set = { - 'modified': datetime.datetime.utcnow(), - 'exchange.rootfs-id': f_uuid - } - - log.debug('update file in mongo: %s' % update_set) - # Update the gear with the newly generated UUID - config.db['gears'].find_one_and_update( - {'_id': f['gear_id']}, - {'$set': update_set} - ) - - log.debug('remove old file: %s' % f_path) - os.remove(f_path) - - show_progress(i + 1, len(_files)) - except Exception as e: - log.exception(e) - raise MigrationError('Wasn\'t able to migrate the \'%s\' gear (gear id: %s)' % - (f['gear_name'], f['gear_id']), e) - - -def rollback_gears(): - log.info('Rollback gears...') - - _files = get_gears_files() - base = config.get_item('persistent', 'data_path') - - for i, f in enumerate(_files): - if f['exchange'].get('rootfs-id', ''): - f_hash = 'v0-' + f['exchange']['rootfs-hash'].replace(':', '-') - hash_path = os.path.join(base, util.path_from_hash(f_hash)) - uuid_path = files.get_file_abs_path(f['exchange']['rootfs-id']) - if os.path.exists(hash_path) and os.path.exists(uuid_path): - os.remove(uuid_path) - elif os.path.exists(uuid_path): - copy_file(uuid_path, hash_path) - os.remove(uuid_path) - - config.db['gears'].find_one_and_update( - {'_id': f['gear_id']}, - {'$unset': {'exchange.rootfs-id': ''}} - ) - - show_progress(i + 1, len(_files)) - - -def remove_cas(): - """ - Remove CAS logic, generate UUID for the files and rename them on the filesystem, make a copy of the file if more - than one container using the same hash. 
- """ - - try: - migrate_collections() - migrate_gears() - except MigrationError as e: - log.exception(e) - rollback_collections() - rollback_gears() - exit(1) - finally: - cleanup_empty_folders() - - -if __name__ == '__main__': - remove_cas() diff --git a/requirements.txt b/requirements.txt index cdb601fee..996485403 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,7 @@ django>=1.11.0,<1.12.0 elasticsearch==5.3.0 enum==0.4.6 +fs==2.0.16 jsonschema==2.6.0 Markdown==2.6.5 pymongo==3.2 diff --git a/tests/bin/run-tests-ubuntu.sh b/tests/bin/run-tests-ubuntu.sh index 8c7d94fa5..422ae34fd 100755 --- a/tests/bin/run-tests-ubuntu.sh +++ b/tests/bin/run-tests-ubuntu.sh @@ -82,6 +82,8 @@ function main() { export SCITRAN_PERSISTENT_DB_LOG_URI=${SCITRAN_PERSISTENT_DB_LOG_URI:-"mongodb://localhost:$SCITRAN_PERSISTENT_DB_PORT/logs"} export SCITRAN_PERSISTENT_PATH=`mktemp -d` export SCITRAN_PERSISTENT_DATA_PATH="$SCITRAN_PERSISTENT_PATH/data" + mkdir -p "$SCITRAN_PERSISTENT_DATA_PATH/v1" + export SCITRAN_PERSISTENT_FS_URL="osfs://$SCITRAN_PERSISTENT_PATH/data/v1" export SCITRAN_CORE_DRONE_SECRET=${SCITRAN_CORE_DRONE_SECRET:-$( openssl rand -base64 32 )} if ${RUN_LINT}; then diff --git a/tests/integration_tests/python/conftest.py b/tests/integration_tests/python/conftest.py index 80fb294fe..e98ec608d 100644 --- a/tests/integration_tests/python/conftest.py +++ b/tests/integration_tests/python/conftest.py @@ -4,15 +4,16 @@ import json import logging import os -import shutil import attrdict import bson +import fs.move +import fs.path import pymongo import pytest import requests -from api import files, util +from api import config, files, util # load required envvars w/ the same name SCITRAN_CORE_DRONE_SECRET = os.environ['SCITRAN_CORE_DRONE_SECRET'] @@ -214,29 +215,31 @@ def legacy_cas_file(as_admin, api_db, data_builder, randstr, file_form): """Yield legacy CAS file""" project = data_builder.create_project() file_name = '%s.csv' % randstr() - as_admin.post('/projects/' + project + '/files', files=file_form(file_name)) + file_content = randstr() + as_admin.post('/projects/' + project + '/files', files=file_form((file_name, file_content))) - file_id = api_db['projects'].find_one( + file_info = api_db['projects'].find_one( {'files.name': file_name} - )['files'][0]['_id'] + )['files'][0] + file_id = file_info['_id'] + file_hash = file_info['hash'] # verify cas backward compatibility api_db['projects'].find_one_and_update( {'files.name': file_name}, {'$unset': {'files.$._id': ''}} ) - data_path = os.getenv('SCITRAN_PERSISTENT_DATA_PATH') - file_hash = 'v0-sha384-03a9df0a5e6e21f5d25aacbce76d8a5d9f8de14f6654c31ab2daed961cfbfb236b1708063350856f752a5a094fb64321' - file_path = os.path.join(data_path, util.path_from_hash(file_hash)) - target_dir = os.path.dirname(file_path) - if not os.path.exists(target_dir): - os.makedirs(target_dir) - shutil.move(files.get_file_abs_path(file_id), file_path) - yield (project, file_name) + file_path = unicode(util.path_from_hash(file_hash)) + target_dir = fs.path.dirname(file_path) + if not config.legacy_fs.exists(target_dir): + config.legacy_fs.makedirs(target_dir) + fs.move.move_file(src_fs=config.fs, src_path=util.path_from_uuid(file_id), dst_fs=config.legacy_fs, dst_path=file_path) + + yield (project, file_name, file_content) # clean up - os.remove(file_path) - os.removedirs(target_dir) + config.legacy_fs.remove(file_path) + config.legacy_fs.removetree(target_dir) api_db['projects'].delete_one({'_id': project}) class BaseUrlSession(requests.Session): diff --git 
index 951d4febf..39053e067 100644
--- a/tests/integration_tests/python/test_download.py
+++ b/tests/integration_tests/python/test_download.py
@@ -4,7 +4,7 @@
 import zipfile
 
 
-def test_download(data_builder, file_form, as_admin, api_db, legacy_cas_file):
+def test_download_k(data_builder, file_form, as_admin, api_db, legacy_cas_file):
     project = data_builder.create_project(label='project1')
     session = data_builder.create_session(label='session1', project=project)
     session2 = data_builder.create_session(label='session1', project=project)
@@ -152,7 +152,7 @@ def test_download(data_builder, file_form, as_admin, api_db, legacy_cas_file):
     assert r.ok
 
     # test legacy cas file handling
-    (project_legacy, file_name_legacy) = legacy_cas_file
+    (project_legacy, file_name_legacy, file_content) = legacy_cas_file
     r = as_admin.post('/download', json={
         'optional': False,
         'nodes': [
@@ -233,14 +233,15 @@ def test_filelist_download(data_builder, file_form, as_admin, legacy_cas_file):
     assert r.ok
 
     # test legacy cas file handling
-    (project, file_name) = legacy_cas_file
+    (project, file_name, file_content) = legacy_cas_file
 
     r = as_admin.get('/projects/' + project + '/files/' + file_name, params={'ticket': ''})
     assert r.ok
     ticket = r.json()['ticket']
 
     r = as_admin.get('/projects/' + project + '/files/' + file_name, params={'ticket': ticket})
-    assert r.content == 'test\ndata\n'
+    assert r.ok
+    assert r.content == file_content
 
 
 def test_analysis_download(data_builder, file_form, as_admin, default_payload):
diff --git a/tests/integration_tests/python/test_jobs.py b/tests/integration_tests/python/test_jobs.py
index 57e09e919..7faa854fc 100644
--- a/tests/integration_tests/python/test_jobs.py
+++ b/tests/integration_tests/python/test_jobs.py
@@ -284,6 +284,8 @@ def test_jobs(data_builder, default_payload, as_public, as_user, as_admin, as_ro
     r = as_admin.post('/jobs/add', json=job6)
     assert r.status_code == 500
 
+    assert as_root.delete('/gears/' + gear3).ok
+
     # Attempt to set a malformed file reference as input
     job7 = copy.deepcopy(job_data)
     job7['inputs'] = {
diff --git a/tests/integration_tests/python/test_migrate_storage.py b/tests/integration_tests/python/test_migrate_storage.py
new file mode 100644
index 000000000..79900d7e3
--- /dev/null
+++ b/tests/integration_tests/python/test_migrate_storage.py
@@ -0,0 +1,274 @@
+import os
+import sys
+
+import fs.move
+import fs.path
+import pytest
+
+from api import config, util
+from bson.objectid import ObjectId
+
+
+def move_file_to_legacy(src, dst):
+    target_dir = fs.path.dirname(dst)
+    if not config.legacy_fs.exists(target_dir):
+        config.legacy_fs.makedirs(target_dir)
+    fs.move.move_file(src_fs=config.fs, src_path=src,
+                      dst_fs=config.legacy_fs, dst_path=dst)
+
+@pytest.fixture(scope='function')
+def migrate_storage(mocker):
+    """Enable importing from `bin/oneoffs` and return the `migrate_storage` module."""
+    bin_path = os.path.join(os.getcwd(), 'bin', 'oneoffs')
+    mocker.patch('sys.path', [bin_path] + sys.path)
+    import migrate_storage
+    return migrate_storage
+
+
+@pytest.yield_fixture(scope='function')
+def gears_to_migrate(api_db, as_admin, randstr, file_form):
+    def gen_gear_meta(gear_name):
+        return {'gear': {
+            "version": '0.0.1',
+            "config": {},
+            "name": gear_name,
+            "inputs": {
+                "file": {
+                    "base": "file",
+                    "description": "Any image."
+                }
+            },
+            "maintainer": "Test",
+            "description": "Test",
+            "license": "Other",
+            "author": "Test",
+            "url": "http://example.example",
+            "label": "Test Gear",
+            "flywheel": "0",
+            "source": "http://example.example"
+        }}
+
+    gears = []
+
+    gear_name_1 = randstr()
+
+    file_name = '%s.tar.gz' % randstr()
+    file_content = randstr()
+    r = as_admin.post('/gears/temp', files=file_form((file_name, file_content), meta=gen_gear_meta(gear_name_1)))
+    gear_id_1 = r.json()['_id']
+
+    r = as_admin.get('/gears/' + gear_id_1)
+    gear_json_1 = r.json()
+
+    file_hash_1 = 'v0-' + gear_json_1['exchange']['rootfs-hash'].replace(':', '-')
+    file_id_1 = gear_json_1['exchange']['rootfs-id']
+
+    file_path = unicode(util.path_from_hash(file_hash_1))
+    target_dir = fs.path.dirname(file_path)
+    if not config.legacy_fs.exists(target_dir):
+        config.legacy_fs.makedirs(target_dir)
+    fs.move.move_file(src_fs=config.fs, src_path=util.path_from_uuid(file_id_1),
+                      dst_fs=config.legacy_fs, dst_path=file_path)
+
+    api_db['gears'].find_one_and_update(
+        {'_id': ObjectId(gear_id_1)},
+        {'$unset': {'exchange.rootfs-id': ''}})
+
+    gears.append((gear_id_1, file_path))
+
+    gear_name_2 = randstr()
+    file_name = '%s.tar.gz' % randstr()
+    file_content = randstr()
+    r = as_admin.post('/gears/temp', files=file_form((file_name, file_content), meta=gen_gear_meta(gear_name_2)))
+    gear_id_2 = r.json()['_id']
+
+
+    r = as_admin.get('/gears/' + gear_id_2)
+    gear_json_2 = r.json()
+
+    file_id_2 = gear_json_2['exchange']['rootfs-id']
+
+    file_path = unicode(util.path_from_uuid(file_id_2))
+    target_dir = fs.path.dirname(file_path)
+    if not config.legacy_fs.exists(target_dir):
+        config.legacy_fs.makedirs(target_dir)
+    fs.move.move_file(src_fs=config.fs, src_path=file_path,
+                      dst_fs=config.legacy_fs, dst_path=file_path)
+
+    gears.append((gear_id_2, file_path))
+
+    yield gears
+
+    # clean up
+    gear_json_1 = api_db['gears'].find_one({'_id': ObjectId(gear_id_1)})
+    gear_json_2 = api_db['gears'].find_one({'_id': ObjectId(gear_id_2)})
+    files_to_delete = []
+    files_to_delete.append(util.path_from_uuid(gear_json_1['exchange'].get('rootfs-id', '')))
+    files_to_delete.append(util.path_from_uuid(gear_json_1['exchange'].get('rootfs-hash', '')))
+    files_to_delete.append(util.path_from_uuid(gear_json_2['exchange'].get('rootfs-id', '')))
+
+    for f_path in files_to_delete:
+        try:
+            config.fs.remove(f_path)
+        except:
+            pass
+
+    api_db['gears'].delete_one({'_id': ObjectId(gear_id_1)})
+    api_db['gears'].delete_one({'_id': ObjectId(gear_id_2)})
+
+@pytest.yield_fixture(scope='function')
+def files_to_migrate(data_builder, api_db, as_admin, randstr, file_form):
+    # Create a project
+    project_id = data_builder.create_project()
+
+    files = []
+
+    # Create a CAS file
+    file_name_1 = '%s.csv' % randstr()
+    file_content_1 = randstr()
+    as_admin.post('/projects/' + project_id + '/files', files=file_form((file_name_1, file_content_1)))
+
+    file_info = api_db['projects'].find_one(
+        {'files.name': file_name_1}
+    )['files'][0]
+    file_id_1 = file_info['_id']
+    file_hash_1 = file_info['hash']
+    url_1 = '/projects/' + project_id + '/files/' + file_name_1
+
+    api_db['projects'].find_one_and_update(
+        {'files.name': file_name_1},
+        {'$unset': {'files.$._id': ''}}
+    )
+
+    move_file_to_legacy(util.path_from_uuid(file_id_1), util.path_from_hash(file_hash_1))
+    files.append((url_1, util.path_from_hash(file_hash_1)))
+    # Create a UUID file
+    file_name_2 = '%s.csv' % randstr()
+    file_content_2 = randstr()
+    as_admin.post('/projects/' + project_id + '/files', files=file_form((file_name_2, file_content_2)))
+
+    file_info = api_db['projects'].find_one(
+        {'files.name': file_name_2}
+    )['files'][1]
+    file_id_2 = file_info['_id']
+    url_2 = '/projects/' + project_id + '/files/' + file_name_2
+
+    move_file_to_legacy(util.path_from_uuid(file_id_2), util.path_from_uuid(file_id_2))
+    files.append((url_2, util.path_from_uuid(file_id_2)))
+
+    yield files
+
+    # Clean up, get the files
+    files = api_db['projects'].find_one(
+        {'_id': ObjectId(project_id)}
+    )['files']
+    # Delete the files
+    for f in files:
+        try:
+            config.fs.remove(util.path_from_uuid(f['_id']))
+        except:
+            pass
+
+def test_migrate_collections(files_to_migrate, as_admin, migrate_storage):
+    """Testing collection migration"""
+
+    # get file stored by hash in legacy storage
+    (url_1, file_path_1) = files_to_migrate[0]
+    # get file stored by uuid in legacy storage
+    (url_2, file_path_2) = files_to_migrate[1]
+
+    # get the ticket
+    r = as_admin.get(url_1, params={'ticket': ''})
+    assert r.ok
+    ticket = r.json()['ticket']
+
+    # download the file
+    assert as_admin.get(url_1, params={'ticket': ticket}).ok
+
+    # get the ticket
+    r = as_admin.get(url_2, params={'ticket': ''})
+    assert r.ok
+    ticket = r.json()['ticket']
+
+    # download the file
+    assert as_admin.get(url_2, params={'ticket': ticket}).ok
+    # run the migration
+    migrate_storage.migrate_collections()
+
+    # delete files from the legacy storage
+    config.legacy_fs.remove(file_path_1)
+    config.legacy_fs.remove(file_path_2)
+
+    # get the files from the new filesystem
+    # get the ticket
+    r = as_admin.get(url_1, params={'ticket': ''})
+    assert r.ok
+    ticket = r.json()['ticket']
+
+    # download the file
+    assert as_admin.get(url_1, params={'ticket': ticket}).ok
+
+    # get the ticket
+    r = as_admin.get(url_2, params={'ticket': ''})
+    assert r.ok
+    ticket = r.json()['ticket']
+
+    # download the file
+    assert as_admin.get(url_2, params={'ticket': ticket}).ok
+
+def test_migrate_collections_error(files_to_migrate, migrate_storage):
+    """Testing that the migration script throws an exception if it couldn't migrate a file"""
+
+    # get file stored by hash in legacy storage
+    (url, file_path_1) = files_to_migrate[0]
+    # get the other file, so we can clean up
+    (_, file_path_2) = files_to_migrate[1]
+
+    # delete the file
+    config.legacy_fs.remove(file_path_1)
+
+    with pytest.raises(migrate_storage.MigrationError):
+        migrate_storage.migrate_collections()
+
+    # clean up
+    config.legacy_fs.remove(file_path_2)
+
+
+def test_migrate_gears(gears_to_migrate, as_admin, migrate_storage):
+    """Testing gear migration"""
+
+    (gear_id_1, gear_file_path_1) = gears_to_migrate[0]
+    (gear_id_2, gear_file_path_2) = gears_to_migrate[1]
+
+    # get gears before migration
+    assert as_admin.get('/gears/temp/' + gear_id_1).ok
+    assert as_admin.get('/gears/temp/' + gear_id_2).ok
+
+    # run migration
+    migrate_storage.migrate_gears()
+
+    # delete files from the legacy storage
+    config.legacy_fs.remove(gear_file_path_1)
+    config.legacy_fs.remove(gear_file_path_2)
+
+    # get the files from the new filesystem
+    assert as_admin.get('/gears/temp/' + gear_id_1).ok
+    assert as_admin.get('/gears/temp/' + gear_id_2).ok
+
+
+def test_migrate_gears_error(gears_to_migrate, migrate_storage):
+    """Testing that the migration script throws an exception if it couldn't migrate a file"""
+
+    # get file stored by hash in legacy storage
+    (gear_id, gear_file_path_1) = gears_to_migrate[0]
+    # get the other file, so we can clean up
+    (_, gear_file_path_2) = gears_to_migrate[1]
+
+    # delete the file
+    config.legacy_fs.remove(gear_file_path_1)
+
+    with pytest.raises(migrate_storage.MigrationError):
+        migrate_storage.migrate_gears()
+
+    # clean up
+    config.legacy_fs.remove(gear_file_path_2)
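
Note on the migration script itself: the tests above assume that bin/oneoffs/migrate_storage.py (added by this patch, but not reproduced in this excerpt) exposes migrate_collections(), migrate_gears() and a MigrationError exception, and that migrating a file leaves the legacy copy in place, since the tests delete it from config.legacy_fs afterwards and still expect reads from config.fs to succeed. The snippet below is only a minimal sketch of the per-file copy step those tests imply, assuming the helpers that appear elsewhere in the diff (config.fs, config.legacy_fs, util.path_from_uuid, pyfilesystem2's fs.copy and fs.path); the function name copy_uuid_file_to_new_storage and this local MigrationError definition are illustrative, not taken from the script.

import fs.copy
import fs.path

from api import config, util


class MigrationError(Exception):
    """Raised when a file cannot be found on either storage backend (illustrative)."""


def copy_uuid_file_to_new_storage(file_id):
    """Copy one UUID-addressed file from the legacy OSFS into config.fs.

    The legacy copy is left untouched, matching what the tests above expect.
    Legacy CAS files would additionally need a freshly generated UUID and a
    mongo update, which is out of scope for this sketch.
    """
    path = unicode(util.path_from_uuid(file_id))
    if config.fs.exists(path):
        return  # already migrated
    if not config.legacy_fs.exists(path):
        raise MigrationError('file %s is missing from both storage backends' % path)

    # make sure the destination directory hierarchy exists on the new backend
    target_dir = fs.path.dirname(path)
    if not config.fs.exists(target_dir):
        config.fs.makedirs(target_dir)

    # copy (not move) so the legacy tree stays intact until it is cleaned up separately
    fs.copy.copy_file(src_fs=config.legacy_fs, src_path=path,
                      dst_fs=config.fs, dst_path=path)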