Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds dedicated API methods for fetching records #244

Closed
wants to merge 3 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 182 additions & 37 deletions apps/utils/keystone.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,31 @@
"""Utility functions used across various wrappers for interacting with keystone"""

from datetime import date
from typing import Any, Dict, Literal, Optional, Union
from typing import Any, Dict, List, Literal, Optional, Union

import requests
from requests import HTTPError

# Custom types
ResponseContentType = Literal['json', 'text', 'content']
ParsedResponseContent = Union[Dict[str, Any], str, bytes]
RecordQueryResult = Union[None, dict, List[dict]]

# Default API configuratipn
# CRC specific API settings
KEYSTONE_URL = "https://keystone.crc.pitt.edu"
KEYSTONE_AUTH_ENDPOINT = 'authentication/new/'
RAWUSAGE_RESET_DATE = date.fromisoformat('2024-05-07')


class KeystoneApi:
"""API client for submitting requests to the Keystone API"""

timeout = 10
authentication_endpoint = 'authentication/new/'
allocations_endpoint = 'allocations/allocations/'
requests_endpoint = 'allocations/requests/'
research_group_endpoint = 'users/researchgroups/'
users_endpoint = 'users/users/'

def __init__(self, base_url: str = KEYSTONE_URL) -> None:
"""Initializes the KeystoneApi class with the base URL of the API.

Expand All @@ -27,25 +35,30 @@ def __init__(self, base_url: str = KEYSTONE_URL) -> None:

self.base_url = base_url
self._token: Optional[str] = None
self._timeout: int = 10

def login(self, username: str, password: str, endpoint: str = KEYSTONE_AUTH_ENDPOINT) -> None:
def login(
self,
username: str,
password: str,
timeout: int = timeout
) -> None:
"""Logs in to the Keystone API and caches the JWT token.

Args:
username: The username for authentication
password: The password for authentication
endpoint: The API endpoint to send the authentication request to
timeout: Number of seconds before he requests times out

Raises:
requests.HTTPError: If the login request fails
"""

response = requests.post(
f"{self.base_url}/{endpoint}",
f"{self.base_url}/{self.authentication_endpoint}",
json={"username": username, "password": password},
timeout=self._timeout
timeout=timeout
)

response.raise_for_status()
self._token = response.json().get("access")

Expand Down Expand Up @@ -95,14 +108,19 @@ def _process_response(response: requests.Response, response_type: ResponseConten
raise ValueError(f"Invalid response type: {response_type}")

def get(
self, endpoint: str, params: Optional[Dict[str, Any]] = None, response_type: ResponseContentType = 'json'
self,
endpoint: str,
params: Optional[Dict[str, Any]] = None,
response_type: ResponseContentType = 'json',
timeout: int = timeout
) -> ParsedResponseContent:
"""Makes a GET request to the specified endpoint.

Args:
endpoint: The API endpoint to send the GET request to
params: The query parameters to include in the request
response_type: The expected response type ('json', 'text', 'content')
timeout: Number of seconds before he requests times out

Returns:
The response from the API in the specified format
Expand All @@ -111,23 +129,30 @@ def get(
requests.HTTPError: If the GET request fails
"""

response = requests.get(f"{self.base_url}/{endpoint}",
headers=self._get_headers(),
params=params,
timeout=self._timeout
)
response = requests.get(
f"{self.base_url}/{endpoint}",
headers=self._get_headers(),
params=params,
timeout=timeout
)

response.raise_for_status()
return self._process_response(response, response_type)

def post(
self, endpoint: str, data: Optional[Dict[str, Any]] = None, response_type: ResponseContentType = 'json'
self,
endpoint: str,
data: Optional[Dict[str, Any]] = None,
response_type: ResponseContentType = 'json',
timeout: int = timeout
) -> ParsedResponseContent:
"""Makes a POST request to the specified endpoint.

Args:
endpoint: The API endpoint to send the POST request to
data: The JSON data to include in the POST request
response_type: The expected response type ('json', 'text', 'content')
timeout: Number of seconds before he requests times out

Returns:
The response from the API in the specified format
Expand All @@ -136,23 +161,30 @@ def post(
requests.HTTPError: If the POST request fails
"""

response = requests.post(f"{self.base_url}/{endpoint}",
headers=self._get_headers(),
json=data,
timeout=self._timeout
)
response = requests.post(
f"{self.base_url}/{endpoint}",
headers=self._get_headers(),
json=data,
timeout=timeout
)

response.raise_for_status()
return self._process_response(response, response_type)

def patch(
self, endpoint: str, data: Optional[Dict[str, Any]] = None, response_type: ResponseContentType = 'json'
self,
endpoint: str,
data: Optional[Dict[str, Any]] = None,
response_type: ResponseContentType = 'json',
timeout: int = timeout
) -> ParsedResponseContent:
"""Makes a PATCH request to the specified endpoint.

Args:
endpoint: The API endpoint to send the PATCH request to
data: The JSON data to include in the PATCH request
response_type: The expected response type ('json', 'text', 'content')
timeout: Number of seconds before he requests times out

Returns:
The response from the API in the specified format
Expand All @@ -161,23 +193,30 @@ def patch(
requests.HTTPError: If the PATCH request fails
"""

response = requests.patch(f"{self.base_url}/{endpoint}",
headers=self._get_headers(),
json=data,
timeout=self._timeout
)
response = requests.patch(
f"{self.base_url}/{endpoint}",
headers=self._get_headers(),
json=data,
timeout=timeout
)

response.raise_for_status()
return self._process_response(response, response_type)

def put(
self, endpoint: str, data: Optional[Dict[str, Any]] = None, response_type: ResponseContentType = 'json'
self,
endpoint: str,
data: Optional[Dict[str, Any]] = None,
response_type: ResponseContentType = 'json',
timeout: int = timeout
) -> ParsedResponseContent:
"""Makes a PUT request to the specified endpoint.

Args:
endpoint: The API endpoint to send the PUT request to
data: The JSON data to include in the PUT request
response_type: The expected response type ('json', 'text', 'content')
timeout: Number of seconds before he requests times out

Returns:
The response from the API in the specified format
Expand All @@ -186,20 +225,28 @@ def put(
requests.HTTPError: If the PUT request fails
"""

response = requests.put(f"{self.base_url}/{endpoint}",
headers=self._get_headers(),
json=data,
timeout=self._timeout
)
response = requests.put(
f"{self.base_url}/{endpoint}",
headers=self._get_headers(),
json=data,
timeout=timeout
)

response.raise_for_status()
return self._process_response(response, response_type)

def delete(self, endpoint: str, response_type: ResponseContentType = 'json') -> ParsedResponseContent:
def delete(
self,
endpoint: str,
response_type: ResponseContentType = 'json',
timeout: int = timeout
) -> ParsedResponseContent:
"""Makes a DELETE request to the specified endpoint.

Args:
endpoint: The API endpoint to send the DELETE request to
response_type: The expected response type ('json', 'text', 'content')
timeout: Number of seconds before he requests times out

Returns:
The response from the API in the specified format
Expand All @@ -208,13 +255,111 @@ def delete(self, endpoint: str, response_type: ResponseContentType = 'json') ->
requests.HTTPError: If the DELETE request fails
"""

response = requests.delete(f"{self.base_url}/{endpoint}",
headers=self._get_headers(),
timeout=self._timeout
)
response = requests.delete(f"{self.base_url}/{endpoint}", headers=self._get_headers(), timeout=timeout)
response.raise_for_status()
return self._process_response(response, response_type)

def _get_records(
self,
endpoint: str,
pk: Optional[int] = None,
filters: Optional[dict] = None,
timeout=timeout
) -> RecordQueryResult:
"""Fetch data from the specified endpoint with optional primary key and filters

Args:
endpoint: The API endpoint to send the GET request to
pk: Optional primary key to fetch a specific record
filters: Optional query parameters to include in the request

Returns:
The response from the API in JSON format
"""

if pk is not None:
endpoint = f'{endpoint}{pk}/'

try:
return self.get(endpoint, params=filters, timeout=timeout)

except HTTPError as excep:
if excep.response.status_code == 404:
return None

raise

def get_allocation(
self,
pk: Optional[int] = None,
timeout: int = timeout,
**filters
) -> RecordQueryResult:
"""Return allocation data from the API

Args:
pk: Optional primary key to fetch a specific allocation
timeout: Number of seconds before he requests times out
**filters: Additional filters to apply to the request

Returns:
A list of allocation records
"""

return self._get_records(self.allocations_endpoint, pk, filters, timeout)

def get_allocation_request(
self,
pk: Optional[int] = None,
timeout: int = timeout,
**filters
) -> RecordQueryResult:
"""Return allocation request data from the API

Args:
pk: Optional primary key to fetch a specific allocation request
timeout: Number of seconds before he requests times out
**filters: Additional filters to apply to the request

Returns:
A list of allocation request records
"""

return self._get_records(self.requests_endpoint, pk, filters, timeout)

def get_research_group(
self,
pk: Optional[int] = None,
timeout: int = timeout,
**filters
) -> RecordQueryResult:
"""Return research group data from the API

Args:
pk: Optional primary key to fetch a specific research group
timeout: Number of seconds before he requests times out
**filters: Additional filters to apply to the request

Returns:
A list of research group records
"""

return self._get_records(self.research_group_endpoint, pk, filters, timeout)

def get_user(self, pk: Optional[int] = None, timeout: int = timeout, **filters) -> RecordQueryResult:
"""Return user data from the API

Args:
pk: Optional primary key to fetch a specific user
timeout: Number of seconds before he requests times out
**filters: Additional filters to apply to the request

Returns:
A list of user records
"""

return self._get_records(self.users_endpoint, pk, filters, timeout)


def get_request_allocations(keystone_client: KeystoneApi, request_pk: int) -> dict:
"""Get All Allocation information from keystone for a given request"""
Expand Down