Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Datalake Files endpoints #3

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 141 additions & 2 deletions missioncontrol_client/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,38 @@
import argparse
import json
import datetime
import os
import socket
from urllib.parse import urljoin
from uuid import uuid4

import pytz
import requests
from datalake import GzippingFile


class UTC(object):
    """Hold a point in time as a timezone-aware UTC datetime.

    ``d`` may be the string ``"now"`` (the default) or a
    ``datetime.datetime``.  Naive datetimes are assumed to already be in
    UTC; aware datetimes are converted to UTC.  Any other input raises
    ``NotImplementedError``.
    """
    # TODO factor this out into a proper library

    def __init__(self, d="now"):
        utc = datetime.timezone.utc
        if d == "now":
            # Build an *aware* "now".  A naive utcnow() value would make
            # ``iso`` omit the trailing "Z" that every other input gets.
            self._d = datetime.datetime.now(utc)
        elif isinstance(d, datetime.datetime):
            if d.tzinfo is None:
                # naive, assume UTC
                d = d.replace(tzinfo=utc)
            else:
                # Aware (UTC or otherwise): normalise onto UTC.
                d = d.astimezone(utc)
            self._d = d
        else:
            # TODO convert strings, etc
            raise NotImplementedError()

    @property
    def iso(self):
        """Return the ISO-8601 timestamp with microseconds and a ``Z`` suffix."""
        stamp = self._d.isoformat('T', 'microseconds')
        return stamp.replace("+00:00", "Z")


class MCAPI(object):
Expand All @@ -13,6 +42,17 @@ def __init__(self, mc_base, jwt=None):
self.s = requests.session()
self.jwt = None

@classmethod
def from_environ(cls, ignore_ssl=False):
    """Build a client from environment variables and log it in.

    Reads ``MC_BASE`` for the base URL.  If ``MC_JWT`` is set to a
    non-empty value it is passed to ``login`` as ``jwt``; otherwise
    ``MC_USERNAME`` and ``MC_PASSWORD`` are used.  When ``ignore_ssl``
    is true, ``verify`` is set to ``False`` on the client's session.

    Raises ``KeyError`` when a required variable is missing.
    """
    env = os.environ
    client = cls(env['MC_BASE'])
    if ignore_ssl:
        client.s.verify = False

    token = env.get('MC_JWT')
    if token:
        client.login(jwt=token)
    else:
        client.login(username=env['MC_USERNAME'], password=env['MC_PASSWORD'])
    return client

def get(self, path, *args, **kwargs):
r = self.s.get(urljoin(self.mc_base, path), *args, **kwargs)
r.raise_for_status()
Expand All @@ -22,6 +62,16 @@ def getj(self, path, *args, **kwargs):
r = self.get(path, *args, **kwargs)
return r.json()

def post(self, path, *args, **kwargs):
    """POST to ``path`` (joined onto ``self.mc_base``) and return the response.

    Extra positional and keyword arguments are forwarded to the
    session's ``post``.  ``raise_for_status()`` is called on the
    response before it is returned.
    """
    url = urljoin(self.mc_base, path)
    response = self.s.post(url, *args, **kwargs)
    response.raise_for_status()
    return response

def postj(self, path, *args, **kwargs):
    """POST to ``path`` and return the decoded JSON body.

    Delegates to ``post`` (which joins the URL and calls
    ``raise_for_status()``) rather than repeating that logic, in the
    same way ``getj`` builds on ``get``.
    """
    return self.post(path, *args, **kwargs).json()

def put(self, path, *args, **kwargs):
r = self.s.put(urljoin(self.mc_base, path), *args, **kwargs)
r.raise_for_status()
Expand Down Expand Up @@ -145,6 +195,94 @@ def get_pass_task_stack(self, uuid, **kwargs):
json=kwargs
)

def get_latest_file(self, what, where, **kwargs):
    """GET ``/api/v0/files/latest/{what}/{where}/`` and return its JSON.

    Any extra keyword arguments are sent as query parameters.
    """
    endpoint = f'/api/v0/files/latest/{what}/{where}/'
    return self.getj(endpoint, params=kwargs)

def get_files(self, what, **kwargs):
    """GET ``/api/v0/files/search/`` and return its JSON.

    ``what`` and any extra keyword arguments are sent together as query
    parameters.
    """
    # Build a fresh mapping instead of mutating kwargs; "what" stays last,
    # so the query-parameter order is unchanged.
    query = dict(kwargs, what=what)
    # Plain string: the path has no placeholders, so no f-prefix is needed.
    return self.getj('/api/v0/files/search/', params=query)

def get_files_by_cid(self, cid, **kwargs):
    """GET ``/api/v0/files/cid/{cid}/`` and return its JSON.

    Any extra keyword arguments are sent as query parameters.
    """
    endpoint = f'/api/v0/files/cid/{cid}/'
    return self.getj(endpoint, params=kwargs)

def get_files_by_work_id(self, work_id, **kwargs):
    """GET ``/api/v0/files/work-id/{work_id}/`` and return its JSON.

    Any extra keyword arguments are sent as query parameters.
    """
    endpoint = f'/api/v0/files/work-id/{work_id}/'
    return self.getj(endpoint, params=kwargs)

def get_file(self, uuid):
    """GET ``/api/v0/files/{uuid}/`` and return its JSON."""
    endpoint = f'/api/v0/files/{uuid}/'
    return self.getj(endpoint)

def download_file(self, uuid):
    """GET ``/api/v0/files/{uuid}/data/`` and return the result of ``self.get``."""
    endpoint = f'/api/v0/files/{uuid}/data/'
    return self.get(endpoint)

def download_cid(self, cid):
    """GET ``/api/v0/raw-file/{cid}/data/`` and return the result of ``self.get``."""
    endpoint = f'/api/v0/raw-file/{cid}/data/'
    return self.get(endpoint)

def upload_file(self, path, what, uuid=None, where=None, start=None,
                end=None, work_id=None, content_type=None):
    """Gzip a local file, upload it via a presigned POST, then PUT its metadata.

    Args:
        path: Filename handed to ``GzippingFile.from_filename``.
        what: Value for the ``what`` metadata field.
        uuid: Identifier used in the final ``/api/v0/files/{uuid}/`` PUT.
            A random ``uuid4`` string is generated when omitted.
        where: Value for the ``where`` metadata field; defaults to
            ``socket.getfqdn()``.
        start: ``datetime`` for the ``start`` metadata field; defaults to
            the current time.  Converted with ``UTC(...).iso``.
        end: Optional ``datetime`` for the ``end`` metadata field.  It is
            converted the same way as ``start`` and only sent when given.
        work_id: Value for the ``work_id`` metadata field.
        content_type: Optional MIME type attached to the uploaded file part.

    Returns:
        The result of ``self.putj`` for the metadata PUT.

    Raises:
        NotImplementedError: if ``start`` or ``end`` is of a type ``UTC``
            does not accept.
        requests.HTTPError: from ``raise_for_status()`` on the upload POST.
    """
    if uuid is None:
        uuid = str(uuid4())

    start = UTC("now" if start is None else start).iso

    if where is None:
        where = socket.getfqdn()

    metadata_fields = {
        "what": what,
        "where": where,
        "start": start,
        "work_id": work_id,
    }
    # ``end`` used to be accepted but silently dropped; forward it when set.
    if end is not None:
        metadata_fields["end"] = UTC(end).iso

    # NOTE(review): ``f`` is never explicitly closed here -- confirm whether
    # GzippingFile needs an explicit close()/context manager.
    f = GzippingFile.from_filename(path, **metadata_fields)

    # get signed upload
    signed = self.postj('/api/v0/files/presign/', json=f.metadata)

    # requests expects each ``files`` entry as (field, value), where value is
    # either the file object or a (filename, fileobj, content_type) tuple.
    # A flat 3-tuple ("file", f, content_type) cannot be unpacked as a pair.
    if content_type is None:
        file_part = ("file", f)
    else:
        file_part = ("file", (os.path.basename(path), f, content_type))

    # upload file
    if "url" in signed:
        signed["fields"]["Content-Encoding"] = "gzip"
        # NOTE(review): uses module-level requests rather than self.s --
        # presumably so session settings are not applied to the presigned
        # URL; confirm this is intentional.
        resp = requests.post(
            signed["url"],
            data=signed["fields"],
            files=[file_part],
        )
        resp.raise_for_status()

    # upload metadata
    return self.putj(f'/api/v0/files/{uuid}/', json=f.metadata)

def login(self, username=None, password=None, jwt=None):
if username is not None and jwt is not None:
raise ValueError("Can't give both a username and a jwt")
Expand Down Expand Up @@ -213,6 +351,7 @@ def handle_default_args(args):
args.mc_api.login(username=args.username, password=args.password)

def from_environ(ignore_ssl=False):
# deprecated, use MCAPI.from_environ()
mc_api = MCAPI(os.environ['MC_BASE'])
if ignore_ssl:
mc_api.s.verify = False
Expand All @@ -221,4 +360,4 @@ def from_environ(ignore_ssl=False):
else:
mc_api.login(username=os.environ['MC_USERNAME'], password=os.environ['MC_PASSWORD'])

return mc_api
return mc_api