Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
tdstein committed Feb 14, 2024
1 parent b5d8a18 commit 9deeb9d
Show file tree
Hide file tree
Showing 6 changed files with 323 additions and 292 deletions.
6 changes: 2 additions & 4 deletions src/posit/connect/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from .auth import Auth
from .config import Config
from .users import Users
from .users import LazyUsers, Users


@contextmanager
Expand All @@ -32,8 +32,6 @@ def create_client(


class Client:
users: Users

def __init__(
self,
api_key: Optional[str] = None,
Expand All @@ -56,7 +54,7 @@ def __init__(
session.hooks["response"].append(hooks.handle_errors)

# Initialize the Users instance.
self.users = Users(config=config, session=session)
self.users: Users = LazyUsers(config=config, session=session)
# Store the Session object.
self._session = session

Expand Down
81 changes: 81 additions & 0 deletions src/posit/connect/resources.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
from __future__ import annotations


from abc import ABC, abstractmethod
from typing import Generic, Iterator, Optional, TypeVar, List, TypedDict


class Resource(TypedDict):
pass


T = TypeVar("T", bound=Resource)


class Resources(ABC, Generic[T], Iterator[T]):
def __init__(self, data: List[T]) -> None:
super().__init__()
self.data = data

@abstractmethod
def find(self, *args, **kwargs) -> Resources[T]:
raise NotImplementedError()

@abstractmethod
def find_one(self, *args, **kwargs) -> Optional[T]:
raise NotImplementedError()

@abstractmethod
def get(self, id: str) -> T:
raise NotImplementedError()

def __iter__(self) -> Iterator[T]:
self.index = 0
return self

def __next__(self) -> T:
if self.index >= len(self.data):
raise StopIteration

v = self.data[self.index]
self.index += 1
return v

def to_pandas(self):
try:
from pandas import DataFrame

return DataFrame(self)
except ImportError:
return None


class LazyResources(Resources[T]):
def __init__(self, data: List[T] = []) -> None:
super().__init__(data)
self.data = data
self.exhausted = False
self.index = 0

@abstractmethod
def fetch(self, index) -> tuple[Optional[Iterator[T]], bool]:
raise NotImplementedError()

def __iter__(self) -> Iterator[T]:
self.index = 0
return self

def __next__(self) -> T:
if self.index >= len(self.data):
if self.exhausted:
raise StopIteration

results, self.exhausted = self.fetch(self.index)
if not results:
raise StopIteration

self.data += results

v = self.data[self.index]
self.index += 1
return v
44 changes: 44 additions & 0 deletions src/posit/connect/resources_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from typing import Iterator, List
from .resources import Resource, Resources, LazyResources


class FakeResource(Resource):
pass


class FakeResources(Resources[FakeResource]):
def __init__(self, data: List[FakeResource] = []) -> None:
super().__init__(data)

def __next__(self) -> FakeResource:
return super().__next__()

def find(self) -> Resources[FakeResource]:
return self

def find_one(self) -> FakeResource | None:
return None

def get(self, _: str) -> FakeResource:
return


class TestResources:
def test(self):
resources = FakeResources()
assert resources == resources.find()
assert resources.find_one() is None
assert resources.get(None) is None


class FakeLazyResources(FakeResources, LazyResources):
def fetch(self, index) -> tuple[Iterator | None, bool]:
return [FakeResource], len(self.data) > 0


class TestFakeLazyResources:
def test(self):
resources = FakeLazyResources()
assert resources == resources.find()
assert resources.find_one() is None
assert resources.get(None) is None
171 changes: 41 additions & 130 deletions src/posit/connect/users.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
from __future__ import annotations

import os

from datetime import datetime
from typing import Iterator, Callable

from requests import Session
from typing import Iterator, List, Optional, TypedDict

from .config import Config
from .endpoints import get_users
from .errors import ClientError
from .resources import LazyResources, Resource, Resources

_MAX_PAGE_SIZE = 500


class User(TypedDict, total=False):
class User(Resource):
guid: str
email: str
username: str
Expand All @@ -27,132 +25,45 @@ class User(TypedDict, total=False):
locked: bool


class Users(Iterator[User]):
def __init__(
self, config: Config, session: Session, *, users: Optional[List[User]] = None
):
self._config = config
self._session = session

self._cached_users: List[User] = users or []
self._exhausted: bool = users is not None
self._index: int = 0
self._page_number: int = 0

def __iter__(self) -> Iterator[User]:
"""
Initialize the iterator by resetting the index to the beginning of the cached user list.
class Users(Resources[User]):
def find(self, filter: Callable[[User], bool] = lambda _: True) -> Resources[User]:
return Users([user for user in self if filter(user)])

Returns:
Iterator: The initialized iterator object.
"""
# Reset the index to the beginning of the cached user list.
self._index = 0
# Return the iterator object.
return self
def find_one(self, filter: Callable[[User], bool] = lambda _: True) -> User | None:
return next(user for user in self if filter(user))

def __next__(self):
"""Retrieve the next user in the list. If necessary, fetch a new page of users beforehand.
def get(self, id: str) -> User:
return next(user for user in self if user["guid"] == id)

Raises:
StopIteration: If the end of the user list is reached.
StopIteration: If no users are returned for the current page.

Returns:
dict: Information about the next user.
"""
# Check if the current index is greater than or equal to the length of the cached user list.
if self._index >= len(self._cached_users):
# Check if the endpoint was exhausted on the previous iteration
if self._exhausted:
# Stop iteration if the index is not aligned with page boundaries.
raise StopIteration
# Fetch the current page of users.
results, exhausted = get_users(
self._config.endpoint, self._session, self._page_number
class LazyUsers(Users, LazyResources[User]):
def __init__(
self, config: Config, session: Session, *, page_size=_MAX_PAGE_SIZE
) -> None:
super().__init__()
self.config = config
self.session = session
self.page_size = page_size

def fetch(self, index) -> tuple[Iterator[User] | None, bool]:
if (index % _MAX_PAGE_SIZE) != 0:
raise ValueError(
f"index ({index}) must be a multiple of page size ({_MAX_PAGE_SIZE})"
)
# Mark if the endpoint is exhausted for the next iteration
self._exhausted = exhausted
# Increment the page counter for the next iteration.
self._page_number += 1
# Append the fetched users to the cached user list.
self._cached_users += [User(**result) for result in results]
# Check if the fetched results list is empty.
if not results:
# Stop iteration if no users are returned for the current page.
raise StopIteration
# Get the current user by index.
user = self._cached_users[self._index]
# Increment the index for the next iteration.
self._index += 1
# Return the current user.
return user

def find(self, params: User) -> Users:
"""
Finds users that match the provided filter conditions.
Args:
params (User): Filter conditions.
Returns:
Users: A list of users matching the filter conditions.
"""
found: List[User] = []
for user in self:
# Check if the items in params are subset of user's items.
if params.items() <= user.items():
# Append the user to the found list.
found.append(user)
return Users(self._config, self._session, users=found)

def find_one(self, params: User) -> Optional[User]:
"""
Finds one User matching the provided parameters.
Keyword Arguments:
params -- Dictionary of filter conditions (default: {}).
Returns:
A matching User if found, otherwise None.
Note:
This method first checks if 'guid' is present in params. If so, it attempts a direct lookup using self.get().
If an error with code '4' is encountered (indicating no matching user), it logs a warning and returns None.
If 'guid' is not provided, it performs a normal search using self.find() and return the first value found.
"""
# Check if 'guid' is provided in params
if "guid" in params:
try:
# Attempt direct lookup
self.get(params["guid"])
except ClientError as e:
# Check for error code '4' (no matching user)
if e.error_code == 4:
import logging

logging.warning(e)
# Return None if user not found
return None
raise e

# If 'guid' not provided perform a normal search
return next(iter(self.find(params)), None)

def get(self, guid: str) -> User:
"""Gets a user by guid.
Arguments:
guid -- the users guid.
Returns:
A :class:`User`.
"""
endpoint = os.path.join(self._config.endpoint, "v1/users", guid)
response = self._session.get(endpoint)
return User(**response.json())

def to_pandas_data_frame(self): # noqa
import pandas as pd

return pd.DataFrame((user for user in self))
# Construct the endpoint URL.
endpoint = os.path.join(self.config.endpoint, "v1/users")
# Define the page number using 1-based indexing.
page_number = int(index / self.page_size) + 1
# Define query parameters for pagination.
params = {"page_number": page_number, "page_size": self.page_size}
# Send a GET request to the endpoint with the specified parameters.
response = self.session.get(endpoint, params=params)
# Convert response to dict
json = dict(response.json())
# Parse the JSON response and extract the results.
results = json["results"]
# Mark exhausted if the result size is less than the maximum page size.
exhausted = len(results) < self.page_size
# Create User objects from the results and return them as a list.
users: Iteartor[User] = [User(result) for result in results]
return (users, exhausted)
Loading

0 comments on commit 9deeb9d

Please sign in to comment.