[ignore] just for testing: Pitch/connect renkulab and openbis datasets #549

Draft · wants to merge 12 commits into base branch `main`
6 changes: 4 additions & 2 deletions DEVELOPING.md
@@ -115,8 +115,10 @@ function if you prefer to keep your favorite shell.
## Running Tests

You can run style checks using `make style_checks`.
To run the test test suite, use `make tests` (you likely need to run in the devcontainer for this to work, as it needs
some surrounding services to run).
To run the test suite, use `make tests` (you likely need to run in the devcontainer for this to work, as it needs some
surrounding services to run).
* Run a specific test e.g.: `poetry run pytest test/bases/renku_data_services/data_api/test_data_connectors.py::test_create_openbis_data_connector`
* Also run tests marked with `@pytest.mark.myskip`: `PYTEST_FORCE_RUN_MYSKIPS=1 make tests`

## Migrations

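The `@pytest.mark.myskip` marker mentioned in the testing notes above is repository-specific. A minimal sketch of how such a marker is commonly wired up in a `conftest.py`, assuming `PYTEST_FORCE_RUN_MYSKIPS` is the only switch (the hook and environment handling here are illustrative, not the repository's actual configuration):

```python
# conftest.py (illustrative sketch, not the repository's actual configuration)
import os

import pytest


def pytest_collection_modifyitems(config, items):
    # Skip tests marked with @pytest.mark.myskip unless explicitly forced.
    if os.environ.get("PYTEST_FORCE_RUN_MYSKIPS") == "1":
        return
    skip_marker = pytest.mark.skip(reason="skipped unless PYTEST_FORCE_RUN_MYSKIPS=1")
    for item in items:
        if "myskip" in item.keywords:
            item.add_marker(skip_marker)
```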
51 changes: 49 additions & 2 deletions components/renku_data_services/data_connectors/blueprints.py
@@ -1,14 +1,15 @@
"""Data connectors blueprint."""

from dataclasses import dataclass
from datetime import datetime
from typing import Any

from sanic import Request
from sanic.response import HTTPResponse, JSONResponse
from sanic_ext import validate
from ulid import ULID

from renku_data_services import base_models
from renku_data_services import base_models, errors
from renku_data_services.base_api.auth import (
authenticate,
only_authenticated,
@@ -31,6 +32,7 @@
DataConnectorSecretRepository,
)
from renku_data_services.storage.rclone import RCloneValidator
from renku_data_services.utils.core import get_openbis_pat


@dataclass(kw_only=True)
@@ -310,10 +312,55 @@ async def _patch_secrets(
user: base_models.APIUser,
data_connector_id: ULID,
body: apispec.DataConnectorSecretPatchList,
validator: RCloneValidator,
) -> JSONResponse:
unsaved_secrets = validate_data_connector_secrets_patch(put=body)
data_connector = await self.data_connector_repo.get_data_connector(
user=user, data_connector_id=data_connector_id
)
storage = data_connector.storage
provider = validator.providers[storage.storage_type]
sensitive_lookup = [o.name for o in provider.options if o.sensitive]
for secret in unsaved_secrets:
if secret.name in sensitive_lookup:
continue
raise errors.ValidationError(
message=f"The '{secret.name}' property is not marked sensitive and can not be saved in the secret "
f"storage."
)
expiration_timestamp = None

if storage.storage_type == "openbis":

async def openbis_transform_session_token_to_pat() -> (
tuple[list[models.DataConnectorSecretUpdate], datetime]
):
if len(unsaved_secrets) == 1 and unsaved_secrets[0].name == "session_token":
if unsaved_secrets[0].value is not None:
try:
openbis_pat = await get_openbis_pat(
storage.configuration["host"], unsaved_secrets[0].value
)
return (
[models.DataConnectorSecretUpdate(name="session_token", value=openbis_pat[0])],
openbis_pat[1],
)
except Exception as e:
raise errors.ProgrammingError(message=str(e))
raise errors.ValidationError(message="The openBIS session token must be a string value.")

raise errors.ValidationError(message="The openBIS storage has only one secret: session_token")

(
unsaved_secrets,
expiration_timestamp,
) = await openbis_transform_session_token_to_pat()

secrets = await self.data_connector_secret_repo.patch_data_connector_secrets(
user=user, data_connector_id=data_connector_id, secrets=unsaved_secrets
user=user,
data_connector_id=data_connector_id,
secrets=unsaved_secrets,
expiration_timestamp=expiration_timestamp,
)
return validated_json(
apispec.DataConnectorSecretsList, [self._dump_data_connector_secret(secret) for secret in secrets]
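For reference, a minimal sketch of the contract the blueprint change above assumes for `get_openbis_pat` (the real helper is imported from `renku_data_services.utils.core` and is not part of this diff): it is awaited with the storage host and the openBIS session token and returns a `(personal_access_token, expiration)` tuple, which the handler splits into the stored secret value and the `expiration_timestamp`. Names and values below are purely illustrative.

```python
# Stand-in for renku_data_services.utils.core.get_openbis_pat; the real function
# talks to the openBIS server at `host`. Only the return shape matters here.
import asyncio
from datetime import UTC, datetime, timedelta


async def fake_get_openbis_pat(host: str, session_token: str) -> tuple[str, datetime]:
    return f"pat-for-{session_token}", datetime.now(UTC) + timedelta(days=2)


async def main() -> None:
    pat, expires_at = await fake_get_openbis_pat("openbis.example.org", "session-token-123")
    print(pat, expires_at.isoformat())


asyncio.run(main())
```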
12 changes: 10 additions & 2 deletions components/renku_data_services/data_connectors/db.py
@@ -1,6 +1,7 @@
"""Adapters for data connectors database classes."""

from collections.abc import AsyncIterator, Callable
from datetime import datetime
from typing import TypeVar

from cryptography.hazmat.primitives.asymmetric import rsa
@@ -554,7 +555,11 @@ async def get_data_connector_secrets(
return [secret.dump() for secret in secrets]

async def patch_data_connector_secrets(
self, user: base_models.APIUser, data_connector_id: ULID, secrets: list[models.DataConnectorSecretUpdate]
self,
user: base_models.APIUser,
data_connector_id: ULID,
secrets: list[models.DataConnectorSecretUpdate],
expiration_timestamp: datetime | None,
) -> list[models.DataConnectorSecret]:
"""Create, update or remove data connector secrets."""
if user.id is None:
@@ -598,7 +603,9 @@ async def patch_data_connector_secrets(

if data_connector_secret_orm := existing_secrets_as_dict.get(name):
data_connector_secret_orm.secret.update(
encrypted_value=encrypted_value, encrypted_key=encrypted_key
encrypted_value=encrypted_value,
encrypted_key=encrypted_key,
expiration_timestamp=expiration_timestamp,
)
else:
secret_orm = secrets_schemas.SecretORM(
@@ -607,6 +614,7 @@
encrypted_value=encrypted_value,
encrypted_key=encrypted_key,
kind=SecretKind.storage,
expiration_timestamp=expiration_timestamp,
)
data_connector_secret_orm = schemas.DataConnectorSecretORM(
name=name,
@@ -0,0 +1,30 @@
"""add secret expiration timestamp

Revision ID: 57facc53ae84
Revises: 08ac2714e8e2
Create Date: 2024-11-28 10:31:05.683682

"""

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "57facc53ae84"
down_revision = "08ac2714e8e2"
branch_labels = None
depends_on = None


def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.add_column(
"secrets", sa.Column("expiration_timestamp", sa.DateTime(timezone=True), nullable=True), schema="secrets"
)
# ### end Alembic commands ###


def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column("secrets", "expiration_timestamp", schema="secrets")
# ### end Alembic commands ###
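For reference, on PostgreSQL the upgrade above is roughly equivalent to `ALTER TABLE secrets.secrets ADD COLUMN expiration_timestamp TIMESTAMPTZ NULL`, and the downgrade drops the column again; existing secrets keep a `NULL` expiration and therefore never expire.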
@@ -210,20 +210,29 @@ def get_manifest_patch(
return patches

def config_string(self, name: str) -> str:
"""Convert configuration oblect to string representation.
"""Convert configuration object to string representation.

Needed to create RClone compatible INI files.
"""
if not self.configuration:
raise ValidationError("Missing configuration for cloud storage")

# Transform configuration for polybox or switchDrive
# TODO Use RCloneValidator.get_real_configuration(...) instead.
# Transform configuration for polybox, switchDrive or openBIS
storage_type = self.configuration.get("type", "")
access = self.configuration.get("provider", "")

if storage_type == "polybox" or storage_type == "switchDrive":
self.configuration["type"] = "webdav"
self.configuration["provider"] = ""
elif storage_type == "s3" and access == "Switch":
# Switch is a fake provider we add for users, we need to replace it since rclone itself
# doesn't know it
self.configuration["provider"] = "Other"
elif storage_type == "openbis":
self.configuration["type"] = "sftp"
self.configuration["port"] = "2222"
self.configuration["user"] = "?"
self.configuration["pass"] = self.configuration.pop("session_token", self.configuration["pass"])

if access == "shared" and storage_type == "polybox":
self.configuration["url"] = "https://polybox.ethz.ch/public.php/webdav/"
@@ -240,10 +249,6 @@ def config_string(self, name: str) -> str:
user_identifier = public_link.split("/")[-1]
self.configuration["user"] = user_identifier

if self.configuration["type"] == "s3" and self.configuration.get("provider", None) == "Switch":
# Switch is a fake provider we add for users, we need to replace it since rclone itself
# doesn't know it
self.configuration["provider"] = "Other"
parser = ConfigParser()
parser.add_section(name)

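Based on the transformation above, here is a sketch of the rclone configuration an openBIS connector ends up with, assuming the stored options are just `host` and `session_token`; the real option set may be larger, and the rest of the method (collapsed in this diff) presumably writes the remaining keys into the INI section:

```python
# Illustrative reproduction of the openBIS branch of config_string above; the
# host value and connector name are made up, and only the keys shown in the
# diff (type, port, user, pass/session_token, host) are assumed to exist.
from configparser import ConfigParser
from io import StringIO

configuration = {"type": "openbis", "host": "openbis.example.org", "session_token": "abc123"}

configuration["type"] = "sftp"
configuration["port"] = "2222"
configuration["user"] = "?"
configuration["pass"] = configuration.pop("session_token")

parser = ConfigParser()
parser.add_section("my-openbis-connector")
for key, value in configuration.items():
    parser.set("my-openbis-connector", key, value)

buffer = StringIO()
parser.write(buffer)
print(buffer.getvalue())
# [my-openbis-connector]
# type = sftp
# host = openbis.example.org
# port = 2222
# user = ?
# pass = abc123
```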
42 changes: 30 additions & 12 deletions components/renku_data_services/secrets/db.py
@@ -1,10 +1,10 @@
"""Database repo for secrets."""

from collections.abc import AsyncGenerator, Callable, Sequence
from datetime import UTC, datetime
from datetime import UTC, datetime, timedelta
from typing import cast

from sqlalchemy import delete, select
from sqlalchemy import Select, delete, or_, select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from ulid import ULID
@@ -25,11 +25,23 @@ def __init__(
) -> None:
self.session_maker = session_maker

def _get_stmt(self, requested_by: APIUser) -> Select[tuple[SecretORM]]:
return (
select(SecretORM)
.where(SecretORM.user_id == requested_by.id)
.where(
or_(
SecretORM.expiration_timestamp.is_(None),
SecretORM.expiration_timestamp > datetime.now(UTC) + timedelta(seconds=120),
)
)
)

@only_authenticated
async def get_user_secrets(self, requested_by: APIUser, kind: SecretKind) -> list[Secret]:
"""Get all user's secrets from the database."""
async with self.session_maker() as session:
stmt = select(SecretORM).where(SecretORM.user_id == requested_by.id).where(SecretORM.kind == kind)
stmt = self._get_stmt(requested_by).where(SecretORM.kind == kind)
res = await session.execute(stmt)
orm = res.scalars().all()
return [o.dump() for o in orm]
@@ -38,7 +50,7 @@ async def get_user_secrets(self, requested_by: APIUser, kind: SecretKind) -> list[Secret]:
async def get_secret_by_id(self, requested_by: APIUser, secret_id: ULID) -> Secret | None:
"""Get a specific user secret from the database."""
async with self.session_maker() as session:
stmt = select(SecretORM).where(SecretORM.user_id == requested_by.id).where(SecretORM.id == secret_id)
stmt = self._get_stmt(requested_by).where(SecretORM.id == secret_id)
res = await session.execute(stmt)
orm = res.scalar_one_or_none()
if orm is None:
@@ -66,6 +78,7 @@ async def insert_secret(self, requested_by: APIUser, secret: UnsavedSecret) -> S
encrypted_value=secret.encrypted_value,
encrypted_key=secret.encrypted_key,
kind=secret.kind,
expiration_timestamp=secret.expiration_timestamp,
)
session.add(orm)

@@ -83,29 +96,34 @@ async def insert_secret(self, requested_by: APIUser, secret: UnsavedSecret) -> S

@only_authenticated
async def update_secret(
self, requested_by: APIUser, secret_id: ULID, encrypted_value: bytes, encrypted_key: bytes
self,
requested_by: APIUser,
secret_id: ULID,
encrypted_value: bytes,
encrypted_key: bytes,
expiration_timestamp: datetime | None,
) -> Secret:
"""Update a secret."""

async with self.session_maker() as session, session.begin():
result = await session.execute(
select(SecretORM).where(SecretORM.id == secret_id).where(SecretORM.user_id == requested_by.id)
)
result = await session.execute(self._get_stmt(requested_by).where(SecretORM.id == secret_id))
secret = result.scalar_one_or_none()
if secret is None:
raise errors.MissingResourceError(message=f"The secret with id '{secret_id}' cannot be found")

secret.update(encrypted_value=encrypted_value, encrypted_key=encrypted_key)
secret.update(
encrypted_value=encrypted_value,
encrypted_key=encrypted_key,
expiration_timestamp=expiration_timestamp,
)
return secret.dump()

@only_authenticated
async def delete_secret(self, requested_by: APIUser, secret_id: ULID) -> None:
"""Delete a secret."""

async with self.session_maker() as session, session.begin():
result = await session.execute(
select(SecretORM).where(SecretORM.id == secret_id).where(SecretORM.user_id == requested_by.id)
)
result = await session.execute(self._get_stmt(requested_by).where(SecretORM.id == secret_id))
secret = result.scalar_one_or_none()
if secret is None:
return None
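The new `_get_stmt` helper above filters out secrets whose `expiration_timestamp` lies less than two minutes in the future, so every read, update, and delete path that reuses the statement treats a nearly expired openBIS PAT as already gone. A minimal sketch of the same predicate in plain Python, with the 120-second buffer mirroring the query:

```python
from datetime import UTC, datetime, timedelta


def is_usable(expiration_timestamp: datetime | None, buffer_seconds: int = 120) -> bool:
    # Mirrors the SQL predicate in _get_stmt: no expiration set, or an
    # expiration more than `buffer_seconds` in the future.
    if expiration_timestamp is None:
        return True
    return expiration_timestamp > datetime.now(UTC) + timedelta(seconds=buffer_seconds)


print(is_usable(datetime.now(UTC) + timedelta(seconds=60)))  # False: expires too soon
print(is_usable(datetime.now(UTC) + timedelta(hours=12)))    # True
print(is_usable(None))                                        # True: never expires
```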
1 change: 1 addition & 0 deletions components/renku_data_services/secrets/models.py
@@ -24,6 +24,7 @@ class UnsavedSecret(BaseModel):
encrypted_key: bytes = Field(repr=False)
modification_date: datetime = Field(default_factory=lambda: datetime.now(UTC).replace(microsecond=0), init=False)
kind: SecretKind
expiration_timestamp: datetime | None = Field(default=None)


class Secret(UnsavedSecret):
10 changes: 8 additions & 2 deletions components/renku_data_services/secrets/orm.py
@@ -35,6 +35,9 @@ class SecretORM(BaseORM):
encrypted_value: Mapped[bytes] = mapped_column(LargeBinary())
encrypted_key: Mapped[bytes] = mapped_column(LargeBinary())
kind: Mapped[models.SecretKind]
expiration_timestamp: Mapped[Optional[datetime]] = mapped_column(
"expiration_timestamp", DateTime(timezone=True), default=None, nullable=True
)
modification_date: Mapped[datetime] = mapped_column(
"modification_date", DateTime(timezone=True), default_factory=lambda: datetime.now(UTC).replace(microsecond=0)
)
@@ -51,6 +54,7 @@ def dump(self) -> models.Secret:
encrypted_value=self.encrypted_value,
encrypted_key=self.encrypted_key,
kind=self.kind,
expiration_timestamp=self.expiration_timestamp,
)
secret.modification_date = self.modification_date
return secret
@@ -62,12 +66,14 @@ def load(cls, secret: models.UnsavedSecret) -> "SecretORM":
name=secret.name,
encrypted_value=secret.encrypted_value,
encrypted_key=secret.encrypted_key,
modification_date=secret.modification_date,
kind=secret.kind,
expiration_timestamp=secret.expiration_timestamp,
modification_date=secret.modification_date,
)

def update(self, encrypted_value: bytes, encrypted_key: bytes) -> None:
def update(self, encrypted_value: bytes, encrypted_key: bytes, expiration_timestamp: datetime | None) -> None:
"""Update an existing secret."""
self.encrypted_value = encrypted_value
self.encrypted_key = encrypted_key
self.expiration_timestamp = expiration_timestamp
self.modification_date = datetime.now(UTC).replace(microsecond=0)
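Note that `SecretORM.update` now overwrites `expiration_timestamp` unconditionally, so callers that pass `None` (every non-openBIS storage in the blueprint above) clear any previously stored expiration when they rotate a secret.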