Skip to content

Commit

Permalink
Remove Directory validator from new api schema.
Browse files Browse the repository at this point in the history
Add optional 'must_be_dir` bool to check_path_resides_within_volume[_sync].
Add directory validation for `anonpath` to FTP update method.
  • Loading branch information
mgrimesix committed Dec 12, 2024
1 parent bca13ac commit 6b31795
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 18 deletions.
13 changes: 1 addition & 12 deletions src/middlewared/middlewared/api/base/types/filesystem.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import os
from pydantic.functional_validators import AfterValidator
from typing_extensions import Annotated

__all__ = ["UnixPerm", "Directory"]
__all__ = ["UnixPerm"]


def validate_unix_perm(value: str) -> str:
Expand All @@ -18,13 +17,3 @@ def validate_unix_perm(value: str) -> str:


UnixPerm = Annotated[str, AfterValidator(validate_unix_perm)]


def validate_dir_path(value: str) -> str:
if value and not os.path.isdir(value):
raise ValueError('This path is not a directory.')

return value


Directory = Annotated[str, AfterValidator(validate_dir_path)]
3 changes: 1 addition & 2 deletions src/middlewared/middlewared/api/v25_04_0/ftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

from middlewared.api.base import (
BaseModel,
Directory,
Excluded,
excluded_field,
ForUpdateMetaclass,
Expand All @@ -30,7 +29,7 @@ class FtpEntry(BaseModel):
timeout: Annotated[int, Field(ge=0, le=10000)]
timeout_notransfer: Annotated[int, Field(ge=0, le=10000)]
onlyanonymous: bool
anonpath: Directory | None
anonpath: str | None
onlylocal: bool
banner: str
filemask: UnixPerm
Expand Down
4 changes: 2 additions & 2 deletions src/middlewared/middlewared/async_validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
from middlewared.validators import IpAddress, check_path_resides_within_volume_sync


async def check_path_resides_within_volume(verrors, middleware, schema_name, path):
async def check_path_resides_within_volume(verrors, middleware, schema_name, path, must_be_dir=False):
"""
async wrapper around synchronous general-purpose path validation function
"""
vol_names = [vol["vol_name"] for vol in await middleware.call("datastore.query", "storage.volume")]
return await middleware.run_in_thread(
check_path_resides_within_volume_sync,
verrors, schema_name, path, vol_names
verrors, schema_name, path, vol_names, must_be_dir
)


Expand Down
4 changes: 3 additions & 1 deletion src/middlewared/middlewared/plugins/ftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,9 @@ async def do_update(self, data):
new["anonpath"] = None

if new["anonpath"] is not None:
await check_path_resides_within_volume(verrors, self.middleware, "ftp_update.anonpath", new["anonpath"])
await check_path_resides_within_volume(
verrors, self.middleware, "ftp_update.anonpath", new["anonpath"], must_be_dir=True
)

if new["tls"]:
if not new["ssltls_certificate"]:
Expand Down
7 changes: 6 additions & 1 deletion src/middlewared/middlewared/validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ def __call__(self, value):
raise ValueError('Invalid URL: no netloc specified')


def check_path_resides_within_volume_sync(verrors, schema_name, path, vol_names):
def check_path_resides_within_volume_sync(verrors, schema_name, path, vol_names, must_be_dir=False):
"""
This provides basic validation of whether a given `path` is allowed to
be exposed to end-users.
Expand All @@ -405,6 +405,8 @@ def check_path_resides_within_volume_sync(verrors, schema_name, path, vol_names)
`vol_names` - list of expected pool names
`must_be_dir` - optional check for directory
It checks the following:
* path is within /mnt
* path is located within one of the specified `vol_names`
Expand All @@ -422,6 +424,9 @@ def check_path_resides_within_volume_sync(verrors, schema_name, path, vol_names)

rp = Path(os.path.realpath(path))

if must_be_dir and not rp.is_dir():
verrors.add(schema_name, "The path must be a directory")

vol_paths = [os.path.join("/mnt", vol_name) for vol_name in vol_names]
if not path.startswith("/mnt/") or not any(
os.path.commonpath([parent]) == os.path.commonpath([parent, rp]) for parent in vol_paths
Expand Down

0 comments on commit 6b31795

Please sign in to comment.