
Commit

Add generic storage
davidfarkas93 committed Jan 25, 2018
1 parent 270b8b1 · commit dca93f9
Showing 17 changed files with 812 additions and 504 deletions.
api/config.py (11 additions, 0 deletions)
@@ -7,6 +7,8 @@
 import datetime
 import elasticsearch

+from fs import open_fs
+
 from . import util
 from .dao.dbutil import try_replace_one

@@ -63,6 +65,7 @@
         'db_server_selection_timeout': '3000',
         'data_path': os.path.join(os.path.dirname(__file__), '../persistent/data'),
         'elasticsearch_host': 'localhost:9200',
+        'fs_url': 'osfs://' + os.path.join(os.path.dirname(__file__), '../persistent/data')
     },
 }

@@ -323,3 +326,11 @@ def mongo_pipeline(table, pipeline):

 def get_auth(auth_type):
     return get_config()['auth'][auth_type]
+
+
+# Storage configuration
+fs = open_fs(__config['persistent']['fs_url'])
+signed_url_available = ['GCSFS']
+
+legacy_fs = open_fs('osfs://' + __config['persistent']['data_path'])
+support_legacy_fs = True
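
The new `fs_url` setting is a PyFilesystem2 opener URL, so the backing store is chosen by configuration rather than by code. A minimal sketch of what this buys (the `gs://` URL and bucket name below are illustrative assumptions, not part of this commit):

    from fs import open_fs

    # Default backend: a plain directory on local disk (what this commit configures).
    storage = open_fs('osfs:///var/scitran/data')

    # Hypothetical alternatives -- any installed PyFilesystem2 opener works,
    # e.g. a Google Cloud Storage bucket (assumes the fs-gcsfs plugin):
    # storage = open_fs('gs://my-bucket')

    # Every backend exposes the same API, which is what keeps download.py generic:
    with storage.open('/example.txt', 'wb') as f:
        f.write(b'hello')
    print(storage.exists('/example.txt'))  # True

The `signed_url_available = ['GCSFS']` list presumably marks which backends can hand out signed download URLs instead of streaming bytes through the API, while `legacy_fs` keeps the old hash-addressed directory reachable during migration.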
api/download.py (23 additions, 26 deletions)
@@ -1,11 +1,13 @@
-import os
 import bson
 import pytz
+import os.path
 import tarfile
 import datetime
 import cStringIO

+import fs.path
+import fs.errors

 from .web import base
 from .web.request import AccessType
 from . import config, files, util, validators
@@ -31,7 +33,7 @@ def _filter_check(property_filter, property_values):

 class Download(base.RequestHandler):

-    def _append_targets(self, targets, cont_name, container, prefix, total_size, total_cnt, data_path, filters):
+    def _append_targets(self, targets, cont_name, container, prefix, total_size, total_cnt, filters):
         for f in container.get('files', []):
             if filters:
                 filtered = True
@@ -45,10 +47,9 @@ def _append_targets(self, targets, cont_name, container, prefix, total_size, tot
                        break
                if filtered:
                    continue
-            file_path = files.get_file_abs_path(f.get('_id', ''))
-            if not util.file_exists(file_path):
-                file_path = os.path.join(data_path, util.path_from_hash(f['hash']))
-            if util.file_exists(file_path): # silently skip missing files
+
+            file_path, _ = files.get_valid_file(f)
+            if file_path: # silently skip missing files
                if cont_name == 'analyses':
                    targets.append((file_path, prefix + '/' + ('input' if f.get('input') else 'output') + '/' + f['name'], cont_name, str(container.get('_id')), f['size']))
                else:
@@ -60,7 +61,6 @@ def _append_targets(self, targets, cont_name, container, prefix, total_size, tot
        return total_size, total_cnt

    def _bulk_preflight_archivestream(self, file_refs):
-        data_path = config.get_item('persistent', 'data_path')
        arc_prefix = self.get_param('prefix', 'scitran')
        file_cnt = 0
        total_size = 0
@@ -96,11 +96,8 @@ def _bulk_preflight_archivestream(self, file_refs):
                log.warn("Expected file {} on Container {} {} to exist but it is missing. File will be skipped in download.".format(filename, cont_name, cont_id))
                continue

-            file_id = file_obj.get('_id', '')
-            file_path = files.get_file_abs_path(file_id) if file_id else ''
-            if not file_id or not os.path.exists(file_path):
-                file_path = os.path.join(data_path, util.path_from_hash(file_obj['hash']))
-            if os.path.exists(file_path): # silently skip missing files
+            file_path, _ = files.get_valid_file(file_obj)
+            if file_path: # silently skip missing files
                targets.append((file_path, cont_name+'/'+cont_id+'/'+file_obj['name'], cont_name, cont_id, file_obj['size']))
                total_size += file_obj['size']
                file_cnt += 1
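
Both preflight paths now delegate the id-path vs. hash-path fallback to `files.get_valid_file`, which returns the stored path together with the filesystem it was found on, or a falsy path when the file is missing from every backend. `api/files.py` is not among the files shown on this page, so the following is only a sketch of the implied contract; `path_from_uuid` is an assumed helper name:

    from . import config, util

    def get_valid_file(file_info):
        """Sketch: resolve a file document to (path, filesystem).

        Tries the id-based layout on the configured storage first, then
        falls back to the legacy hash-based layout on local disk.
        Returns (None, None) when the file exists on neither backend.
        """
        file_id = file_info.get('_id', '')
        file_path = util.path_from_uuid(file_id) if file_id else None  # assumed helper
        if file_path and config.fs.isfile(file_path):
            return file_path, config.fs
        hash_path = util.path_from_hash(file_info['hash'])
        if config.support_legacy_fs and config.legacy_fs.isfile(hash_path):
            return hash_path, config.legacy_fs
        return None, None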
@@ -115,7 +112,6 @@ def _bulk_preflight_archivestream(self, file_refs):


    def _preflight_archivestream(self, req_spec, collection=None):
-        data_path = config.get_item('persistent', 'data_path')
        arc_prefix = self.get_param('prefix', 'scitran')
        file_cnt = 0
        total_size = 0
@@ -140,7 +136,7 @@ def _preflight_archivestream(self, req_spec, collection=None):
                    continue

                prefix = '/'.join([arc_prefix, project['group'], project['label']])
-                total_size, file_cnt = self._append_targets(targets, 'projects', project, prefix, total_size, file_cnt, data_path, req_spec.get('filters'))
+                total_size, file_cnt = self._append_targets(targets, 'projects', project, prefix, total_size, file_cnt, req_spec.get('filters'))

                sessions = config.db.sessions.find({'project': item_id, 'deleted': {'$exists': False}}, ['label', 'files', 'uid', 'timestamp', 'timezone', 'subject'])
                session_dict = {session['_id']: session for session in sessions}
@@ -161,19 +157,19 @@ def _preflight_archivestream(self, req_spec, collection=None):
                for code, subject in subject_dict.iteritems():
                    subject_prefix = self._path_from_container(prefix, subject, ids_of_paths, code)
                    subject_prefixes[code] = subject_prefix
-                    total_size, file_cnt = self._append_targets(targets, 'subjects', subject, subject_prefix, total_size, file_cnt, data_path, req_spec.get('filters'))
+                    total_size, file_cnt = self._append_targets(targets, 'subjects', subject, subject_prefix, total_size, file_cnt, req_spec.get('filters'))

                for session in session_dict.itervalues():
                    subject_code = session['subject'].get('code', 'unknown_subject')
                    subject = subject_dict[subject_code]
                    session_prefix = self._path_from_container(subject_prefixes[subject_code], session, ids_of_paths, session["_id"])
                    session_prefixes[session['_id']] = session_prefix
-                    total_size, file_cnt = self._append_targets(targets, 'sessions', session, session_prefix, total_size, file_cnt, data_path, req_spec.get('filters'))
+                    total_size, file_cnt = self._append_targets(targets, 'sessions', session, session_prefix, total_size, file_cnt, req_spec.get('filters'))

                for acq in acquisitions:
                    session = session_dict[acq['session']]
                    acq_prefix = self._path_from_container(session_prefixes[session['_id']], acq, ids_of_paths, acq['_id'])
-                    total_size, file_cnt = self._append_targets(targets, 'acquisitions', acq, acq_prefix, total_size, file_cnt, data_path, req_spec.get('filters'))
+                    total_size, file_cnt = self._append_targets(targets, 'acquisitions', acq, acq_prefix, total_size, file_cnt, req_spec.get('filters'))


            elif item['level'] == 'session':
@@ -188,7 +184,7 @@ def _preflight_archivestream(self, req_spec, collection=None):
                if not subject.get('code'):
                    subject['code'] = 'unknown_subject'
                prefix = self._path_from_container(self._path_from_container(project['group'] + '/' + project['label'], subject, ids_of_paths, subject["code"]), session, ids_of_paths, session['_id'])
-                total_size, file_cnt = self._append_targets(targets, 'sessions', session, prefix, total_size, file_cnt, data_path, req_spec.get('filters'))
+                total_size, file_cnt = self._append_targets(targets, 'sessions', session, prefix, total_size, file_cnt, req_spec.get('filters'))

                # If the param `collection` holding a collection id is not None, filter out acquisitions that are not in the collection
                a_query = {'session': item_id, 'deleted': {'$exists': False}}
@@ -198,7 +194,7 @@ def _preflight_archivestream(self, req_spec, collection=None):

                for acq in acquisitions:
                    acq_prefix = self._path_from_container(prefix, acq, ids_of_paths, acq['_id'])
-                    total_size, file_cnt = self._append_targets(targets, 'acquisitions', acq, acq_prefix, total_size, file_cnt, data_path, req_spec.get('filters'))
+                    total_size, file_cnt = self._append_targets(targets, 'acquisitions', acq, acq_prefix, total_size, file_cnt, req_spec.get('filters'))

            elif item['level'] == 'acquisition':
                acq = config.db.acquisitions.find_one(base_query, ['session', 'label', 'files', 'uid', 'timestamp', 'timezone'])
@@ -214,7 +210,7 @@ def _preflight_archivestream(self, req_spec, collection=None):

                project = config.db.projects.find_one({'_id': session['project']}, ['group', 'label'])
                prefix = self._path_from_container(self._path_from_container(self._path_from_container(project['group'] + '/' + project['label'], subject, ids_of_paths, subject['code']), session, ids_of_paths, session["_id"]), acq, ids_of_paths, acq['_id'])
-                total_size, file_cnt = self._append_targets(targets, 'acquisitions', acq, prefix, total_size, file_cnt, data_path, req_spec.get('filters'))
+                total_size, file_cnt = self._append_targets(targets, 'acquisitions', acq, prefix, total_size, file_cnt, req_spec.get('filters'))

            elif item['level'] == 'analysis':
                analysis = config.db.analyses.find_one(base_query, ['parent', 'label', 'files', 'uid', 'timestamp'])
@@ -224,7 +220,7 @@ def _preflight_archivestream(self, req_spec, collection=None):
                    continue
                prefix = self._path_from_container("", analysis, ids_of_paths, util.sanitize_string_to_filename(analysis['label']))
                filename = 'analysis_' + util.sanitize_string_to_filename(analysis['label']) + '.tar'
-                total_size, file_cnt = self._append_targets(targets, 'analyses', analysis, prefix, total_size, file_cnt, data_path, req_spec.get('filters'))
+                total_size, file_cnt = self._append_targets(targets, 'analyses', analysis, prefix, total_size, file_cnt, req_spec.get('filters'))

        if len(targets) > 0:
            if not filename:
@@ -282,8 +278,9 @@ def archivestream(self, ticket):
        stream = cStringIO.StringIO()
        with tarfile.open(mode='w|', fileobj=stream) as archive:
            for filepath, arcpath, cont_name, cont_id, _ in ticket['target']:
-                yield archive.gettarinfo(filepath, arcpath).tobuf()
-                with open(filepath, 'rb') as fd:
+                file_system = files.get_fs_by_file_path(filepath)
+                with file_system.open(filepath, 'rb') as fd:
+                    yield archive.gettarinfo(fileobj=fd, arcname=arcpath).tobuf()
                    chunk = ''
                    for chunk in iter(lambda: fd.read(CHUNKSIZE), ''): # pylint: disable=cell-var-from-loop
                        yield chunk
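
`files.get_fs_by_file_path` (also introduced by this commit, again in the unshown `api/files.py`) maps a stored path back to the filesystem that holds it, so the tar streamer no longer assumes files live on local disk. A plausible sketch, assuming it simply probes the configured backends in order:

    import fs.errors

    from . import config

    def get_fs_by_file_path(file_path):
        """Sketch: return the first configured filesystem containing file_path."""
        if config.fs.isfile(file_path):
            return config.fs
        if config.support_legacy_fs and config.legacy_fs.isfile(file_path):
            return config.legacy_fs
        raise fs.errors.ResourceNotFound(file_path)

Note that the tar header is now built from the open file object (`gettarinfo(fileobj=fd, arcname=arcpath)`) rather than from a filesystem path, since the path may not correspond to anything on the local disk.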
@@ -293,11 +290,11 @@ def archivestream(self, ticket):
        yield stream.getvalue() # get tar stream trailer
        stream.close()

-    def symlinkarchivestream(self, ticket, data_path):
+    def symlinkarchivestream(self, ticket):
        for filepath, arcpath, cont_name, cont_id, _ in ticket['target']:
            t = tarfile.TarInfo(name=arcpath)
            t.type = tarfile.SYMTYPE
-            t.linkname = os.path.relpath(filepath, data_path)
+            t.linkname = fs.path.relpath(filepath)
            yield t.tobuf()
            self.log_user_access(AccessType.download_file, cont_name=cont_name, cont_id=cont_id, filename=os.path.basename(arcpath), multifile=True, origin_override=ticket['origin']) # log download
        stream = cStringIO.StringIO()
@@ -316,7 +313,7 @@ def download(self):
            if ticket['ip'] != self.request.client_addr:
                self.abort(400, 'ticket not for this source IP')
            if self.get_param('symlinks'):
-                self.response.app_iter = self.symlinkarchivestream(ticket, config.get_item('persistent', 'data_path'))
+                self.response.app_iter = self.symlinkarchivestream(ticket)
            else:
                self.response.app_iter = self.archivestream(ticket)
            self.response.headers['Content-Type'] = 'application/octet-stream'
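
The symlink variant changes meaning slightly with the move to PyFilesystem2: `os.path.relpath(filepath, data_path)` computed a path relative to the local data directory, while `fs.path.relpath(filepath)` only strips the leading slash, because stored paths are now already internal to the configured filesystem. For illustration (the concrete layout below is assumed):

    import os.path
    import fs.path

    # Before: absolute on-disk path, made relative to data_path.
    print(os.path.relpath('/var/scitran/data/v1/ab/cd/abcd', '/var/scitran/data'))
    # -> v1/ab/cd/abcd

    # After: the path is already filesystem-internal; "relative" just
    # means dropping the leading slash.
    print(fs.path.relpath('/v1/ab/cd/abcd'))
    # -> v1/ab/cd/abcd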