diff --git a/examples/httpgateway.py b/examples/httpgateway.py index c2cb3bdf..24ed6ba1 100644 --- a/examples/httpgateway.py +++ b/examples/httpgateway.py @@ -7,7 +7,7 @@ from aleph.sdk.chains.common import get_fallback_private_key from aleph.sdk.chains.ethereum import ETHAccount -from aleph.sdk.client import AuthenticatedAlephClient +from aleph.sdk.client import AuthenticatedAlephHttpClient app = web.Application() routes = web.RouteTableDef() @@ -32,7 +32,7 @@ async def source_post(request): return web.json_response( {"status": "error", "message": "unauthorized secret"} ) - async with AuthenticatedAlephClient( + async with AuthenticatedAlephHttpClient( account=app["account"], api_server="https://api2.aleph.im" ) as session: message, _status = await session.create_post( diff --git a/examples/metrics.py b/examples/metrics.py index 381db6be..c7beb5d2 100644 --- a/examples/metrics.py +++ b/examples/metrics.py @@ -12,7 +12,7 @@ from aleph_message.status import MessageStatus from aleph.sdk.chains.ethereum import get_fallback_account -from aleph.sdk.client import AuthenticatedAlephClient, AuthenticatedUserSessionSync +from aleph.sdk.client import AuthenticatedAlephClientSync, AuthenticatedAlephHttpClient from aleph.sdk.conf import settings @@ -54,7 +54,7 @@ def get_cpu_cores(): def send_metrics( - session: AuthenticatedUserSessionSync, metrics + session: AuthenticatedAlephClientSync, metrics ) -> Tuple[AlephMessage, MessageStatus]: return session.create_aggregate(key="metrics", content=metrics, channel="SYSINFO") @@ -70,7 +70,7 @@ def collect_metrics(): def main(): account = get_fallback_account() - with AuthenticatedAlephClient( + with AuthenticatedAlephHttpClient( account=account, api_server=settings.API_HOST ) as session: while True: diff --git a/examples/mqtt.py b/examples/mqtt.py index eff32121..e09b2c6f 100644 --- a/examples/mqtt.py +++ b/examples/mqtt.py @@ -10,7 +10,7 @@ from aleph.sdk.chains.common import get_fallback_private_key from aleph.sdk.chains.ethereum import ETHAccount -from aleph.sdk.client import AuthenticatedAlephClient +from aleph.sdk.client import AuthenticatedAlephHttpClient from aleph.sdk.conf import settings @@ -27,7 +27,7 @@ def get_input_data(value): def send_metrics(account, metrics): - with AuthenticatedAlephClient( + with AuthenticatedAlephHttpClient( account=account, api_server=settings.API_HOST ) as session: return session.create_aggregate( @@ -100,7 +100,7 @@ async def gateway( if not userdata["received"]: await client.reconnect() - async with AuthenticatedAlephClient( + async with AuthenticatedAlephHttpClient( account=account, api_server=settings.API_HOST ) as session: for key, value in state.items(): diff --git a/examples/store.py b/examples/store.py index 6ce5662c..b6c7a862 100644 --- a/examples/store.py +++ b/examples/store.py @@ -6,7 +6,7 @@ from aleph.sdk.chains.common import get_fallback_private_key from aleph.sdk.chains.ethereum import ETHAccount -from aleph.sdk.client import AuthenticatedAlephClient +from aleph.sdk.client import AuthenticatedAlephHttpClient from aleph.sdk.conf import settings DEFAULT_SERVER = "https://api2.aleph.im" @@ -23,7 +23,7 @@ async def print_output_hash(message: StoreMessage, status: MessageStatus): async def do_upload(account, engine, channel, filename=None, file_hash=None): - async with AuthenticatedAlephClient( + async with AuthenticatedAlephHttpClient( account=account, api_server=settings.API_HOST ) as session: print(filename, account.get_address()) diff --git a/src/aleph/sdk/__init__.py b/src/aleph/sdk/__init__.py index c66fe9d6..c14b64f6 100644 --- a/src/aleph/sdk/__init__.py +++ b/src/aleph/sdk/__init__.py @@ -1,6 +1,6 @@ from pkg_resources import DistributionNotFound, get_distribution -from aleph.sdk.client import AlephClient, AuthenticatedAlephClient +from aleph.sdk.client import AlephHttpClient, AuthenticatedAlephHttpClient try: # Change here if project is renamed and does not equal the package name @@ -11,4 +11,4 @@ finally: del get_distribution, DistributionNotFound -__all__ = ["AlephClient", "AuthenticatedAlephClient"] +__all__ = ["AlephHttpClient", "AuthenticatedAlephHttpClient"] diff --git a/src/aleph/sdk/client/__init__.py b/src/aleph/sdk/client/__init__.py index 8b0db873..9ee25dd9 100644 --- a/src/aleph/sdk/client/__init__.py +++ b/src/aleph/sdk/client/__init__.py @@ -1,12 +1,10 @@ -from .authenticated import AuthenticatedAlephClient, AuthenticatedUserSessionSync -from .base import BaseAlephClient, BaseAuthenticatedAlephClient -from .client import AlephClient, UserSessionSync +from .abstract import AlephClient, AuthenticatedAlephClient +from .authenticated_http import AuthenticatedAlephHttpClient +from .http import AlephHttpClient __all__ = [ - "BaseAlephClient", - "BaseAuthenticatedAlephClient", "AlephClient", "AuthenticatedAlephClient", - "UserSessionSync", - "AuthenticatedUserSessionSync", + "AlephHttpClient", + "AuthenticatedAlephHttpClient", ] diff --git a/src/aleph/sdk/client/base.py b/src/aleph/sdk/client/abstract.py similarity index 87% rename from src/aleph/sdk/client/base.py rename to src/aleph/sdk/client/abstract.py index e5cb9c0a..26a51221 100644 --- a/src/aleph/sdk/client/base.py +++ b/src/aleph/sdk/client/abstract.py @@ -25,14 +25,15 @@ from aleph_message.models.execution.program import Encoding from aleph_message.status import MessageStatus -from ..models.message import MessageFilter -from ..models.post import PostFilter, PostsResponse +from ..query.filters import MessageFilter, PostFilter +from ..query.responses import PostsResponse from ..types import GenericMessage, StorageEnum +from ..utils import Writable DEFAULT_PAGE_SIZE = 200 -class BaseAlephClient(ABC): +class AlephClient(ABC): @abstractmethod async def fetch_aggregate(self, address: str, key: str) -> Dict[str, Dict]: """ @@ -110,6 +111,44 @@ async def download_file( """ pass + async def download_file_ipfs( + self, + file_hash: str, + ) -> bytes: + """ + Get a file from the ipfs storage engine as raw bytes. + + Warning: Downloading large files can be slow. + + :param file_hash: The hash of the file to retrieve. + """ + raise NotImplementedError() + + async def download_file_ipfs_to_buffer( + self, + file_hash: str, + output_buffer: Writable[bytes], + ) -> None: + """ + Download a file from the storage engine and write it to the specified output buffer. + + :param file_hash: The hash of the file to retrieve. + :param output_buffer: The binary output buffer to write the file data to. + """ + raise NotImplementedError() + + async def download_file_to_buffer( + self, + file_hash: str, + output_buffer: Writable[bytes], + ) -> None: + """ + Download a file from the storage engine and write it to the specified output buffer. + :param file_hash: The hash of the file to retrieve. + :param output_buffer: Writable binary buffer. The file will be written to this buffer. + """ + raise NotImplementedError() + @abstractmethod async def get_messages( self, @@ -180,7 +219,7 @@ def watch_messages( pass -class BaseAuthenticatedAlephClient(BaseAlephClient): +class AuthenticatedAlephClient(AlephClient): @abstractmethod async def create_post( self, @@ -350,3 +389,19 @@ async def submit( :param sync: If true, waits for the message to be processed by the API server (Default: False) """ pass + + async def ipfs_push(self, content: Mapping) -> str: + """ + Push a file to IPFS. + + :param content: Content of the file to push + """ + raise NotImplementedError() + + async def storage_push(self, content: Mapping) -> str: + """ + Push arbitrary content as JSON to the storage service. + + :param content: The dict-like content to upload + """ + raise NotImplementedError() diff --git a/src/aleph/sdk/client/authenticated.py b/src/aleph/sdk/client/authenticated_http.py similarity index 71% rename from src/aleph/sdk/client/authenticated.py rename to src/aleph/sdk/client/authenticated_http.py index 093dbe76..6291467a 100644 --- a/src/aleph/sdk/client/authenticated.py +++ b/src/aleph/sdk/client/authenticated_http.py @@ -23,14 +23,20 @@ StoreMessage, ) from aleph_message.models.execution.base import Encoding +from aleph_message.models.execution.environment import ( + FunctionEnvironment, + MachineResources, +) +from aleph_message.models.execution.program import CodeContent, FunctionRuntime +from aleph_message.models.execution.volume import MachineVolume from aleph_message.status import MessageStatus from pydantic.json import pydantic_encoder from ..conf import settings from ..exceptions import BroadcastError, InvalidMessageError from ..types import Account, StorageEnum -from .base import BaseAuthenticatedAlephClient -from .client import AlephClient, UserSessionSync +from .abstract import AuthenticatedAlephClient +from .http import AlephHttpClient logger = logging.getLogger(__name__) @@ -41,174 +47,7 @@ magic = None # type:ignore -class AuthenticatedUserSessionSync(UserSessionSync): - async_session: "AuthenticatedAlephClient" - - def __init__(self, async_session: "AuthenticatedAlephClient"): - super().__init__(async_session=async_session) - - def ipfs_push(self, content: Mapping) -> str: - return self._wrap(self.async_session.ipfs_push, content=content) - - def storage_push(self, content: Mapping) -> str: - return self._wrap(self.async_session.storage_push, content=content) - - def ipfs_push_file(self, file_content: Union[str, bytes]) -> str: - return self._wrap(self.async_session.ipfs_push_file, file_content=file_content) - - def storage_push_file(self, file_content: Union[str, bytes]) -> str: - return self._wrap( - self.async_session.storage_push_file, file_content=file_content - ) - - def create_post( - self, - post_content, - post_type: str, - ref: Optional[str] = None, - address: Optional[str] = None, - channel: Optional[str] = None, - inline: bool = True, - storage_engine: StorageEnum = StorageEnum.storage, - sync: bool = False, - ) -> Tuple[PostMessage, MessageStatus]: - return self._wrap( - self.async_session.create_post, - post_content=post_content, - post_type=post_type, - ref=ref, - address=address, - channel=channel, - inline=inline, - storage_engine=storage_engine, - sync=sync, - ) - - def create_aggregate( - self, - key: str, - content: Mapping[str, Any], - address: Optional[str] = None, - channel: Optional[str] = None, - inline: bool = True, - sync: bool = False, - ) -> Tuple[AggregateMessage, MessageStatus]: - return self._wrap( - self.async_session.create_aggregate, - key=key, - content=content, - address=address, - channel=channel, - inline=inline, - sync=sync, - ) - - def create_store( - self, - address: Optional[str] = None, - file_content: Optional[bytes] = None, - file_path: Optional[Union[str, Path]] = None, - file_hash: Optional[str] = None, - guess_mime_type: bool = False, - ref: Optional[str] = None, - storage_engine: StorageEnum = StorageEnum.storage, - extra_fields: Optional[dict] = None, - channel: Optional[str] = None, - sync: bool = False, - ) -> Tuple[StoreMessage, MessageStatus]: - return self._wrap( - self.async_session.create_store, - address=address, - file_content=file_content, - file_path=file_path, - file_hash=file_hash, - guess_mime_type=guess_mime_type, - ref=ref, - storage_engine=storage_engine, - extra_fields=extra_fields, - channel=channel, - sync=sync, - ) - - def create_program( - self, - program_ref: str, - entrypoint: str, - runtime: str, - environment_variables: Optional[Mapping[str, str]] = None, - storage_engine: StorageEnum = StorageEnum.storage, - channel: Optional[str] = None, - address: Optional[str] = None, - sync: bool = False, - memory: Optional[int] = None, - vcpus: Optional[int] = None, - timeout_seconds: Optional[float] = None, - persistent: bool = False, - encoding: Encoding = Encoding.zip, - volumes: Optional[List[Mapping]] = None, - subscriptions: Optional[List[Mapping]] = None, - metadata: Optional[Mapping[str, Any]] = None, - ) -> Tuple[ProgramMessage, MessageStatus]: - return self._wrap( - self.async_session.create_program, - program_ref=program_ref, - entrypoint=entrypoint, - runtime=runtime, - environment_variables=environment_variables, - storage_engine=storage_engine, - channel=channel, - address=address, - sync=sync, - memory=memory, - vcpus=vcpus, - timeout_seconds=timeout_seconds, - persistent=persistent, - encoding=encoding, - volumes=volumes, - subscriptions=subscriptions, - metadata=metadata, - ) - - def forget( - self, - hashes: List[str], - reason: Optional[str], - storage_engine: StorageEnum = StorageEnum.storage, - channel: Optional[str] = None, - address: Optional[str] = None, - sync: bool = False, - ) -> Tuple[ForgetMessage, MessageStatus]: - return self._wrap( - self.async_session.forget, - hashes=hashes, - reason=reason, - storage_engine=storage_engine, - channel=channel, - address=address, - sync=sync, - ) - - def submit( - self, - content: Dict[str, Any], - message_type: MessageType, - channel: Optional[str] = None, - storage_engine: StorageEnum = StorageEnum.storage, - allow_inlining: bool = True, - sync: bool = False, - ) -> Tuple[AlephMessage, MessageStatus]: - return self._wrap( - self.async_session.submit, - content=content, - message_type=message_type, - channel=channel, - storage_engine=storage_engine, - allow_inlining=allow_inlining, - sync=sync, - ) - - -class AuthenticatedAlephClient(AlephClient, BaseAuthenticatedAlephClient): +class AuthenticatedAlephHttpClient(AlephHttpClient, AuthenticatedAlephClient): account: Account BROADCAST_MESSAGE_FIELDS = { @@ -239,10 +78,7 @@ def __init__( ) self.account = account - def __enter__(self) -> "AuthenticatedUserSessionSync": - return AuthenticatedUserSessionSync(async_session=self) - - async def __aenter__(self) -> "AuthenticatedAlephClient": + async def __aenter__(self) -> "AuthenticatedAlephHttpClient": return self async def ipfs_push(self, content: Mapping) -> str: @@ -580,40 +416,42 @@ async def create_program( # Trigger on HTTP calls. triggers = {"http": True, "persistent": persistent} + volumes: List[MachineVolume] = [ + MachineVolume.parse_obj(volume) for volume in volumes + ] + content = ProgramContent( - **{ - "type": "vm-function", - "address": address, - "allow_amend": False, - "code": { - "encoding": encoding, - "entrypoint": entrypoint, - "ref": program_ref, - "use_latest": True, - }, - "on": triggers, - "environment": { - "reproducible": False, - "internet": True, - "aleph_api": True, - }, - "variables": environment_variables, - "resources": { - "vcpus": vcpus, - "memory": memory, - "seconds": timeout_seconds, - }, - "runtime": { - "ref": runtime, - "use_latest": True, - "comment": "Official aleph.im runtime" - if runtime == settings.DEFAULT_RUNTIME_ID - else "", - }, - "volumes": volumes, - "time": time.time(), - "metadata": metadata, - } + type="vm-function", + address=address, + allow_amend=False, + code=CodeContent( + encoding=encoding, + entrypoint=entrypoint, + ref=program_ref, + use_latest=True, + ), + on=triggers, + environment=FunctionEnvironment( + reproducible=False, + internet=True, + aleph_api=True, + ), + variables=environment_variables, + resources=MachineResources( + vcpus=vcpus, + memory=memory, + seconds=timeout_seconds, + ), + runtime=FunctionRuntime( + ref=runtime, + use_latest=True, + comment="Official aleph.im runtime" + if runtime == settings.DEFAULT_RUNTIME_ID + else "", + ), + volumes=volumes, + time=time.time(), + metadata=metadata, ) # Ensure that the version of aleph-message used supports the field. diff --git a/src/aleph/sdk/client/client.py b/src/aleph/sdk/client/http.py similarity index 69% rename from src/aleph/sdk/client/client.py rename to src/aleph/sdk/client/http.py index ce5c43db..93cbe837 100644 --- a/src/aleph/sdk/client/client.py +++ b/src/aleph/sdk/client/http.py @@ -1,20 +1,7 @@ -import asyncio import json import logging -import queue -import threading from io import BytesIO -from typing import ( - Any, - AsyncIterable, - Awaitable, - Callable, - Dict, - Iterable, - List, - Optional, - Type, -) +from typing import Any, AsyncIterable, Dict, Iterable, List, Optional, Type import aiohttp from aleph_message import parse_message @@ -23,8 +10,8 @@ from ..conf import settings from ..exceptions import FileTooLarge, MessageNotFoundError, MultipleMessagesError -from ..models.message import MessageFilter, MessagesResponse -from ..models.post import Post, PostFilter, PostsResponse +from ..query.filters import MessageFilter, PostFilter +from ..query.responses import MessagesResponse, Post, PostsResponse from ..types import GenericMessage from ..utils import ( Writable, @@ -32,158 +19,12 @@ copy_async_readable_to_buffer, get_message_type_value, ) -from .base import BaseAlephClient -from .utils import T, wrap_async +from .abstract import AlephClient logger = logging.getLogger(__name__) -async def run_async_watcher( - *args, output_queue: queue.Queue, api_server: Optional[str], **kwargs -): - async with AlephClient(api_server=api_server) as session: - async for message in session.watch_messages(*args, **kwargs): - output_queue.put(message) - - -def watcher_thread(output_queue: queue.Queue, api_server: Optional[str], args, kwargs): - asyncio.run( - run_async_watcher( - output_queue=output_queue, api_server=api_server, *args, **kwargs - ) - ) - - -class UserSessionSync: - """ - A sync version of `UserSession`, used in sync code. - - This class is returned by the context manager of `UserSession` and is - intended as a wrapper around the methods of `UserSession` and not as a public class. - The methods are fully typed to enable static type checking, but most (all) methods - should look like this (using args and kwargs for brevity, but the functions should - be fully typed): - - >>> def func(self, *args, **kwargs): - >>> return self._wrap(self.async_session.func)(*args, **kwargs) - """ - - def __init__(self, async_session: "AlephClient"): - self.async_session = async_session - - def _wrap(self, method: Callable[..., Awaitable[T]], *args, **kwargs): - return wrap_async(method)(*args, **kwargs) - - def get_messages( - self, - page_size: int = 200, - page: int = 1, - message_filter: Optional[MessageFilter] = None, - ignore_invalid_messages: bool = True, - invalid_messages_log_level: int = logging.NOTSET, - ) -> MessagesResponse: - return self._wrap( - self.async_session.get_messages, - page_size=page_size, - page=page, - message_filter=message_filter, - ignore_invalid_messages=ignore_invalid_messages, - invalid_messages_log_level=invalid_messages_log_level, - ) - - # @async_wrapper - def get_message( - self, - item_hash: str, - message_type: Optional[Type[GenericMessage]] = None, - channel: Optional[str] = None, - ) -> GenericMessage: - return self._wrap( - self.async_session.get_message, - item_hash=item_hash, - message_type=message_type, - channel=channel, - ) - - def fetch_aggregate( - self, - address: str, - key: str, - ) -> Dict[str, Dict]: - return self._wrap(self.async_session.fetch_aggregate, address, key) - - def fetch_aggregates( - self, - address: str, - keys: Optional[Iterable[str]] = None, - ) -> Dict[str, Dict]: - return self._wrap(self.async_session.fetch_aggregates, address, keys) - - def get_posts( - self, - page_size: int = 200, - page: int = 1, - post_filter: Optional[PostFilter] = None, - ) -> PostsResponse: - return self._wrap( - self.async_session.get_posts, - page_size=page_size, - page=page, - post_filter=post_filter, - ) - - def download_file(self, file_hash: str) -> bytes: - return self._wrap(self.async_session.download_file, file_hash=file_hash) - - def download_file_ipfs(self, file_hash: str) -> bytes: - return self._wrap( - self.async_session.download_file_ipfs, - file_hash=file_hash, - ) - - def download_file_to_buffer( - self, file_hash: str, output_buffer: Writable[bytes] - ) -> None: - return self._wrap( - self.async_session.download_file_to_buffer, - file_hash=file_hash, - output_buffer=output_buffer, - ) - - def download_file_ipfs_to_buffer( - self, file_hash: str, output_buffer: Writable[bytes] - ) -> None: - return self._wrap( - self.async_session.download_file_ipfs_to_buffer, - file_hash=file_hash, - output_buffer=output_buffer, - ) - - def watch_messages( - self, - message_filter: Optional[MessageFilter] = None, - ) -> Iterable[AlephMessage]: - """ - Iterate over current and future matching messages synchronously. - - Runs the `watch_messages` asynchronous generator in a thread. - """ - output_queue: queue.Queue[AlephMessage] = queue.Queue() - thread = threading.Thread( - target=watcher_thread, - args=( - output_queue, - self.async_session.api_server, - message_filter, - {}, - ), - ) - thread.start() - while True: - yield output_queue.get() - - -class AlephClient(BaseAlephClient): +class AlephHttpClient(AlephClient): api_server: str http_session: aiohttp.ClientSession @@ -221,18 +62,7 @@ def __init__( ) ) - def __enter__(self) -> UserSessionSync: - return UserSessionSync(async_session=self) - - def __exit__(self, exc_type, exc_val, exc_tb): - close_fut = self.http_session.close() - try: - loop = asyncio.get_running_loop() - loop.run_until_complete(close_fut) - except RuntimeError: - asyncio.run(close_fut) - - async def __aenter__(self) -> "AlephClient": + async def __aenter__(self) -> "AlephHttpClient": return self async def __aexit__(self, exc_type, exc_val, exc_tb): diff --git a/src/aleph/sdk/client/utils.py b/src/aleph/sdk/client/utils.py deleted file mode 100644 index 8af8cec3..00000000 --- a/src/aleph/sdk/client/utils.py +++ /dev/null @@ -1,21 +0,0 @@ -import asyncio -from typing import Awaitable, Callable, TypeVar - -T = TypeVar("T") - - -def wrap_async(func: Callable[..., Awaitable[T]]) -> Callable[..., T]: - """Wrap an asynchronous function into a synchronous one, - for easy use in synchronous code. - """ - - def func_caller(*args, **kwargs): - loop = asyncio.get_event_loop() - return loop.run_until_complete(func(*args, **kwargs)) - - # Copy wrapped function interface: - func_caller.__doc__ = func.__doc__ - func_caller.__annotations__ = func.__annotations__ - func_caller.__defaults__ = func.__defaults__ - func_caller.__kwdefaults__ = func.__kwdefaults__ - return func_caller diff --git a/src/aleph/sdk/models/__init__.py b/src/aleph/sdk/models/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/src/aleph/sdk/models/common.py b/src/aleph/sdk/models/common.py deleted file mode 100644 index 6892d881..00000000 --- a/src/aleph/sdk/models/common.py +++ /dev/null @@ -1,8 +0,0 @@ -from pydantic import BaseModel - - -class PaginationResponse(BaseModel): - pagination_page: int - pagination_total: int - pagination_per_page: int - pagination_item: str diff --git a/src/aleph/sdk/models/post.py b/src/aleph/sdk/models/post.py deleted file mode 100644 index 16669faf..00000000 --- a/src/aleph/sdk/models/post.py +++ /dev/null @@ -1,123 +0,0 @@ -from datetime import datetime -from typing import Any, Dict, Iterable, List, Optional, Union - -from aleph_message.models import Chain, ItemHash, ItemType, MessageConfirmation -from pydantic import BaseModel, Field - -from .common import PaginationResponse -from .utils import _date_field_to_float, serialize_list - - -class Post(BaseModel): - """ - A post is a type of message that can be updated. Over the get_posts API - we get the latest version of a post. - """ - - chain: Chain = Field(description="Blockchain this post is associated with") - item_hash: ItemHash = Field(description="Unique hash for this post") - sender: str = Field(description="Address of the sender") - type: str = Field(description="Type of the POST message") - channel: Optional[str] = Field(description="Channel this post is associated with") - confirmed: bool = Field(description="Whether the post is confirmed or not") - content: Dict[str, Any] = Field(description="The content of the POST message") - item_content: Optional[str] = Field( - description="The POSTs content field as serialized JSON, if of type inline" - ) - item_type: ItemType = Field( - description="Type of the item content, usually 'inline' or 'storage' for POSTs" - ) - signature: Optional[str] = Field( - description="Cryptographic signature of the message by the sender" - ) - size: int = Field(description="Size of the post") - time: float = Field(description="Timestamp of the post") - confirmations: List[MessageConfirmation] = Field( - description="Number of confirmations" - ) - original_item_hash: ItemHash = Field(description="Hash of the original content") - original_signature: Optional[str] = Field( - description="Cryptographic signature of the original message" - ) - original_type: str = Field(description="The original type of the message") - hash: ItemHash = Field(description="Hash of the original item") - ref: Optional[Union[str, Any]] = Field( - description="Other message referenced by this one" - ) - - class Config: - allow_extra = False - - -class PostsResponse(PaginationResponse): - """Response from an aleph.im node API on the path /api/v0/posts.json""" - - posts: List[Post] - pagination_item = "posts" - - -class PostFilter: - """ - A collection of filters that can be applied on post queries. - - """ - - types: Optional[Iterable[str]] - refs: Optional[Iterable[str]] - addresses: Optional[Iterable[str]] - tags: Optional[Iterable[str]] - hashes: Optional[Iterable[str]] - channels: Optional[Iterable[str]] - chains: Optional[Iterable[str]] - start_date: Optional[Union[datetime, float]] - end_date: Optional[Union[datetime, float]] - - def __init__( - self, - types: Optional[Iterable[str]] = None, - refs: Optional[Iterable[str]] = None, - addresses: Optional[Iterable[str]] = None, - tags: Optional[Iterable[str]] = None, - hashes: Optional[Iterable[str]] = None, - channels: Optional[Iterable[str]] = None, - chains: Optional[Iterable[str]] = None, - start_date: Optional[Union[datetime, float]] = None, - end_date: Optional[Union[datetime, float]] = None, - ): - self.types = types - self.refs = refs - self.addresses = addresses - self.tags = tags - self.hashes = hashes - self.channels = channels - self.chains = chains - self.start_date = start_date - self.end_date = end_date - - def as_http_params(self) -> Dict[str, str]: - """Convert the filters into a dict that can be used by an `aiohttp` client - as `params` to build the HTTP query string. - """ - - partial_result = { - "types": serialize_list(self.types), - "refs": serialize_list(self.refs), - "addresses": serialize_list(self.addresses), - "tags": serialize_list(self.tags), - "hashes": serialize_list(self.hashes), - "channels": serialize_list(self.channels), - "chains": serialize_list(self.chains), - "startDate": _date_field_to_float(self.start_date), - "endDate": _date_field_to_float(self.end_date), - } - - # Ensure all values are strings. - result: Dict[str, str] = {} - - # Drop empty values - for key, value in partial_result.items(): - if value: - assert isinstance(value, str), f"Value must be a string: `{value}`" - result[key] = value - - return result diff --git a/src/aleph/sdk/models/utils.py b/src/aleph/sdk/models/utils.py deleted file mode 100644 index 818e4c70..00000000 --- a/src/aleph/sdk/models/utils.py +++ /dev/null @@ -1,20 +0,0 @@ -from datetime import datetime -from typing import Iterable, Optional, Union - - -def serialize_list(values: Optional[Iterable[str]]) -> Optional[str]: - if values: - return ",".join(values) - else: - return None - - -def _date_field_to_float(date: Optional[Union[datetime, float]]) -> Optional[float]: - if date is None: - return None - elif isinstance(date, float): - return date - elif hasattr(date, "timestamp"): - return date.timestamp() - else: - raise TypeError(f"Invalid type: `{type(date)}`") diff --git a/src/aleph/sdk/models/message.py b/src/aleph/sdk/query/filters.py similarity index 59% rename from src/aleph/sdk/models/message.py rename to src/aleph/sdk/query/filters.py index 4fdb295d..346f3a24 100644 --- a/src/aleph/sdk/models/message.py +++ b/src/aleph/sdk/query/filters.py @@ -1,17 +1,9 @@ from datetime import datetime -from typing import Dict, Iterable, List, Optional, Union +from typing import Dict, Iterable, Optional, Union -from aleph_message.models import AlephMessage, MessageType +from aleph_message.models import MessageType -from .common import PaginationResponse -from .utils import _date_field_to_float, serialize_list - - -class MessagesResponse(PaginationResponse): - """Response from an aleph.im node API on the path /api/v0/messages.json""" - - messages: List[AlephMessage] - pagination_item = "messages" +from ..utils import _date_field_to_float, serialize_list class MessageFilter: @@ -101,3 +93,70 @@ def as_http_params(self) -> Dict[str, str]: result[key] = value return result + + +class PostFilter: + """ + A collection of filters that can be applied on post queries. + + """ + + types: Optional[Iterable[str]] + refs: Optional[Iterable[str]] + addresses: Optional[Iterable[str]] + tags: Optional[Iterable[str]] + hashes: Optional[Iterable[str]] + channels: Optional[Iterable[str]] + chains: Optional[Iterable[str]] + start_date: Optional[Union[datetime, float]] + end_date: Optional[Union[datetime, float]] + + def __init__( + self, + types: Optional[Iterable[str]] = None, + refs: Optional[Iterable[str]] = None, + addresses: Optional[Iterable[str]] = None, + tags: Optional[Iterable[str]] = None, + hashes: Optional[Iterable[str]] = None, + channels: Optional[Iterable[str]] = None, + chains: Optional[Iterable[str]] = None, + start_date: Optional[Union[datetime, float]] = None, + end_date: Optional[Union[datetime, float]] = None, + ): + self.types = types + self.refs = refs + self.addresses = addresses + self.tags = tags + self.hashes = hashes + self.channels = channels + self.chains = chains + self.start_date = start_date + self.end_date = end_date + + def as_http_params(self) -> Dict[str, str]: + """Convert the filters into a dict that can be used by an `aiohttp` client + as `params` to build the HTTP query string. + """ + + partial_result = { + "types": serialize_list(self.types), + "refs": serialize_list(self.refs), + "addresses": serialize_list(self.addresses), + "tags": serialize_list(self.tags), + "hashes": serialize_list(self.hashes), + "channels": serialize_list(self.channels), + "chains": serialize_list(self.chains), + "startDate": _date_field_to_float(self.start_date), + "endDate": _date_field_to_float(self.end_date), + } + + # Ensure all values are strings. + result: Dict[str, str] = {} + + # Drop empty values + for key, value in partial_result.items(): + if value: + assert isinstance(value, str), f"Value must be a string: `{value}`" + result[key] = value + + return result diff --git a/src/aleph/sdk/query/responses.py b/src/aleph/sdk/query/responses.py new file mode 100644 index 00000000..5fb91804 --- /dev/null +++ b/src/aleph/sdk/query/responses.py @@ -0,0 +1,74 @@ +from __future__ import annotations + +from typing import Any, Dict, List, Optional, Union + +from aleph_message.models import ( + AlephMessage, + Chain, + ItemHash, + ItemType, + MessageConfirmation, +) +from pydantic import BaseModel, Field + + +class Post(BaseModel): + """ + A post is a type of message that can be updated. Over the get_posts API + we get the latest version of a post. + """ + + chain: Chain = Field(description="Blockchain this post is associated with") + item_hash: ItemHash = Field(description="Unique hash for this post") + sender: str = Field(description="Address of the sender") + type: str = Field(description="Type of the POST message") + channel: Optional[str] = Field(description="Channel this post is associated with") + confirmed: bool = Field(description="Whether the post is confirmed or not") + content: Dict[str, Any] = Field(description="The content of the POST message") + item_content: Optional[str] = Field( + description="The POSTs content field as serialized JSON, if of type inline" + ) + item_type: ItemType = Field( + description="Type of the item content, usually 'inline' or 'storage' for POSTs" + ) + signature: Optional[str] = Field( + description="Cryptographic signature of the message by the sender" + ) + size: int = Field(description="Size of the post") + time: float = Field(description="Timestamp of the post") + confirmations: List[MessageConfirmation] = Field( + description="Number of confirmations" + ) + original_item_hash: ItemHash = Field(description="Hash of the original content") + original_signature: Optional[str] = Field( + description="Cryptographic signature of the original message" + ) + original_type: str = Field(description="The original type of the message") + hash: ItemHash = Field(description="Hash of the original item") + ref: Optional[Union[str, Any]] = Field( + description="Other message referenced by this one" + ) + + class Config: + allow_extra = False + + +class PaginationResponse(BaseModel): + pagination_page: int + pagination_total: int + pagination_per_page: int + pagination_item: str + + +class PostsResponse(PaginationResponse): + """Response from an aleph.im node API on the path /api/v0/posts.json""" + + posts: List[Post] + pagination_item = "posts" + + +class MessagesResponse(PaginationResponse): + """Response from an aleph.im node API on the path /api/v0/messages.json""" + + messages: List[AlephMessage] + pagination_item = "messages" diff --git a/src/aleph/sdk/utils.py b/src/aleph/sdk/utils.py index be56cc2c..810d7326 100644 --- a/src/aleph/sdk/utils.py +++ b/src/aleph/sdk/utils.py @@ -1,10 +1,11 @@ import errno import logging import os +from datetime import datetime from enum import Enum from pathlib import Path from shutil import make_archive -from typing import Protocol, Tuple, Type, TypeVar, Union +from typing import Iterable, Optional, Protocol, Tuple, Type, TypeVar, Union from zipfile import BadZipFile, ZipFile from aleph_message.models import MessageType @@ -116,3 +117,21 @@ def enum_as_str(obj: Union[str, Enum]) -> str: return obj.value return obj + + +def serialize_list(values: Optional[Iterable[str]]) -> Optional[str]: + if values: + return ",".join(values) + else: + return None + + +def _date_field_to_float(date: Optional[Union[datetime, float]]) -> Optional[float]: + if date is None: + return None + elif isinstance(date, float): + return date + elif hasattr(date, "timestamp"): + return date.timestamp() + else: + raise TypeError(f"Invalid type: `{type(date)}`") diff --git a/tests/integration/itest_aggregates.py b/tests/integration/itest_aggregates.py index 5c5d4648..31f5c6cc 100644 --- a/tests/integration/itest_aggregates.py +++ b/tests/integration/itest_aggregates.py @@ -3,7 +3,7 @@ import pytest -from aleph.sdk.client import AuthenticatedAlephClient +from aleph.sdk.client import AuthenticatedAlephHttpClient from aleph.sdk.types import Account from tests.integration.toolkit import try_until @@ -18,7 +18,7 @@ async def create_aggregate_on_target( receiver_node: str, channel="INTEGRATION_TESTS", ): - async with AuthenticatedAlephClient( + async with AuthenticatedAlephHttpClient( account=account, api_server=emitter_node ) as tx_session: aggregate_message, message_status = await tx_session.create_aggregate( @@ -38,7 +38,7 @@ async def create_aggregate_on_target( assert aggregate_message.content.address == account.get_address() assert aggregate_message.content.content == content - async with AuthenticatedAlephClient( + async with AuthenticatedAlephHttpClient( account=account, api_server=receiver_node ) as rx_session: aggregate_from_receiver = await try_until( diff --git a/tests/integration/itest_forget.py b/tests/integration/itest_forget.py index a457bdda..a6cc141c 100644 --- a/tests/integration/itest_forget.py +++ b/tests/integration/itest_forget.py @@ -4,8 +4,8 @@ import pytest from aleph_message.models import ItemHash -from aleph.sdk.client import AuthenticatedAlephClient -from aleph.sdk.models.message import MessageFilter +from aleph.sdk.client import AuthenticatedAlephHttpClient +from aleph.sdk.query.filters import MessageFilter from aleph.sdk.types import Account from .config import REFERENCE_NODE, TARGET_NODE, TEST_CHANNEL @@ -15,7 +15,7 @@ async def create_and_forget_post( account: Account, emitter_node: str, receiver_node: str, channel=TEST_CHANNEL ) -> Tuple[ItemHash, ItemHash]: - async with AuthenticatedAlephClient( + async with AuthenticatedAlephHttpClient( account=account, api_server=emitter_node ) as tx_session: post_message, message_status = await tx_session.create_post( @@ -24,7 +24,7 @@ async def create_and_forget_post( channel="INTEGRATION_TESTS", ) - async with AuthenticatedAlephClient( + async with AuthenticatedAlephHttpClient( account=account, api_server=receiver_node ) as rx_session: await try_until( @@ -38,7 +38,7 @@ async def create_and_forget_post( post_hash = post_message.item_hash reason = "This well thought-out content offends me!" - async with AuthenticatedAlephClient( + async with AuthenticatedAlephHttpClient( account=account, api_server=emitter_node ) as tx_session: forget_message, forget_status = await tx_session.forget( @@ -52,7 +52,7 @@ async def create_and_forget_post( forget_hash = forget_message.item_hash # Wait until the message is forgotten - async with AuthenticatedAlephClient( + async with AuthenticatedAlephHttpClient( account=account, api_server=receiver_node ) as rx_session: await try_until( @@ -104,7 +104,7 @@ async def test_forget_a_forget_message(fixture_account): post_hash, forget_hash = await create_and_forget_post( fixture_account, TARGET_NODE, REFERENCE_NODE ) - async with AuthenticatedAlephClient( + async with AuthenticatedAlephHttpClient( account=fixture_account, api_server=TARGET_NODE ) as tx_session: forget_message, forget_status = await tx_session.forget( @@ -118,7 +118,7 @@ async def test_forget_a_forget_message(fixture_account): # wait 5 seconds await asyncio.sleep(5) - async with AuthenticatedAlephClient( + async with AuthenticatedAlephHttpClient( account=fixture_account, api_server=REFERENCE_NODE ) as rx_session: get_forget_message_response = await try_until( diff --git a/tests/integration/itest_posts.py b/tests/integration/itest_posts.py index 59b96b1b..77b87b7f 100644 --- a/tests/integration/itest_posts.py +++ b/tests/integration/itest_posts.py @@ -1,7 +1,7 @@ import pytest -from aleph.sdk.client import AuthenticatedAlephClient -from aleph.sdk.models.message import MessageFilter +from aleph.sdk.client import AuthenticatedAlephHttpClient +from aleph.sdk.query.filters import MessageFilter from tests.integration.toolkit import has_messages, try_until from .config import REFERENCE_NODE, TARGET_NODE @@ -11,7 +11,7 @@ async def create_message_on_target(account, emitter_node: str, receiver_node: st """ Create a POST message on the target node, then fetch it from the reference node. """ - async with AuthenticatedAlephClient( + async with AuthenticatedAlephHttpClient( account=account, api_server=emitter_node ) as tx_session: post_message, message_status = await tx_session.create_post( @@ -20,7 +20,7 @@ async def create_message_on_target(account, emitter_node: str, receiver_node: st channel="INTEGRATION_TESTS", ) - async with AuthenticatedAlephClient( + async with AuthenticatedAlephHttpClient( account=account, api_server=receiver_node ) as rx_session: responses = await try_until( diff --git a/tests/integration/toolkit.py b/tests/integration/toolkit.py index 62a5f841..a72f9d6f 100644 --- a/tests/integration/toolkit.py +++ b/tests/integration/toolkit.py @@ -2,7 +2,7 @@ import time from typing import Awaitable, Callable, TypeVar -from aleph.sdk.models.message import MessagesResponse +from aleph.sdk.query.responses import MessagesResponse T = TypeVar("T") diff --git a/tests/unit/test_asynchronous.py b/tests/unit/test_asynchronous.py index 8973263b..dbccbaa6 100644 --- a/tests/unit/test_asynchronous.py +++ b/tests/unit/test_asynchronous.py @@ -11,14 +11,14 @@ ) from aleph_message.status import MessageStatus -from aleph.sdk.client import AuthenticatedAlephClient +from aleph.sdk.client import AuthenticatedAlephHttpClient from aleph.sdk.types import Account, StorageEnum @pytest.fixture def mock_session_with_post_success( ethereum_account: Account, -) -> AuthenticatedAlephClient: +) -> AuthenticatedAlephHttpClient: class MockResponse: def __init__(self, sync: bool): self.sync = sync @@ -49,7 +49,7 @@ async def text(self): sync=kwargs.get("sync", False) ) - client = AuthenticatedAlephClient( + client = AuthenticatedAlephHttpClient( account=ethereum_account, api_server="http://localhost" ) client.http_session = http_session diff --git a/tests/unit/test_asynchronous_get.py b/tests/unit/test_asynchronous_get.py index 7773f9b2..f5e0c800 100644 --- a/tests/unit/test_asynchronous_get.py +++ b/tests/unit/test_asynchronous_get.py @@ -5,13 +5,13 @@ import pytest from aleph_message.models import MessagesResponse, MessageType -from aleph.sdk import AlephClient +from aleph.sdk import AlephHttpClient from aleph.sdk.conf import settings -from aleph.sdk.models.message import MessageFilter -from aleph.sdk.models.post import PostFilter, PostsResponse +from aleph.sdk.query.filters import MessageFilter, PostFilter +from aleph.sdk.query.responses import PostsResponse -def make_mock_session(get_return_value: Dict[str, Any]) -> AlephClient: +def make_mock_session(get_return_value: Dict[str, Any]) -> AlephHttpClient: class MockResponse: async def __aenter__(self): return self @@ -35,7 +35,7 @@ def get(self, *_args, **_kwargs): http_session = MockHttpSession() - client = AlephClient(api_server="http://localhost") + client = AlephHttpClient(api_server="http://localhost") client.http_session = http_session return client @@ -70,7 +70,7 @@ async def test_fetch_aggregates(): @pytest.mark.asyncio async def test_get_posts(): - async with AlephClient(api_server=settings.API_HOST) as session: + async with AlephHttpClient(api_server=settings.API_HOST) as session: response: PostsResponse = await session.get_posts( page_size=2, post_filter=PostFilter( @@ -84,7 +84,7 @@ async def test_get_posts(): @pytest.mark.asyncio async def test_get_messages(): - async with AlephClient(api_server=settings.API_HOST) as session: + async with AlephHttpClient(api_server=settings.API_HOST) as session: response: MessagesResponse = await session.get_messages( page_size=2, message_filter=MessageFilter( diff --git a/tests/unit/test_download.py b/tests/unit/test_download.py index b16e0d75..377e6d41 100644 --- a/tests/unit/test_download.py +++ b/tests/unit/test_download.py @@ -1,6 +1,6 @@ import pytest -from aleph.sdk import AlephClient +from aleph.sdk import AlephHttpClient from aleph.sdk.conf import settings as sdk_settings @@ -13,7 +13,7 @@ ) @pytest.mark.asyncio async def test_download(file_hash: str, expected_size: int): - async with AlephClient(api_server=sdk_settings.API_HOST) as client: + async with AlephHttpClient(api_server=sdk_settings.API_HOST) as client: file_content = await client.download_file(file_hash) # File is 5B file_size = len(file_content) assert file_size == expected_size @@ -28,7 +28,7 @@ async def test_download(file_hash: str, expected_size: int): ) @pytest.mark.asyncio async def test_download_ipfs(file_hash: str, expected_size: int): - async with AlephClient(api_server=sdk_settings.API_HOST) as client: + async with AlephHttpClient(api_server=sdk_settings.API_HOST) as client: file_content = await client.download_file_ipfs(file_hash) # 5817703 B FILE file_size = len(file_content) assert file_size == expected_size diff --git a/tests/unit/test_synchronous_get.py b/tests/unit/test_synchronous_get.py deleted file mode 100644 index e3b8c0ed..00000000 --- a/tests/unit/test_synchronous_get.py +++ /dev/null @@ -1,20 +0,0 @@ -from aleph_message.models import MessagesResponse, MessageType - -from aleph.sdk import AlephClient -from aleph.sdk.conf import settings -from aleph.sdk.models.message import MessageFilter - - -def test_get_post_messages(): - with AlephClient(api_server=settings.API_HOST) as session: - response: MessagesResponse = session.get_messages( - page_size=2, - message_filter=MessageFilter( - message_types=[MessageType.post], - ), - ) - - messages = response.messages - assert len(messages) > 1 - for message in messages: - assert message.type == MessageType.post