Skip to content

Commit

Permalink
Fix pattern from env (#46)
Browse files Browse the repository at this point in the history
* Fix pattern from env

* Compatible with python 3.8
  • Loading branch information
Wh1isper authored Sep 9, 2023
1 parent f4da4fd commit 5dcfea0
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 18 deletions.
4 changes: 2 additions & 2 deletions duetector/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ def __init__(self, config: Optional[Union[Config, Dict[str, Any]]] = None, *args
config = config._config_dict

if self.config_scope:
for score in self.config_scope.split("."):
config = config.get(score.lower(), {})
for scope in self.config_scope.split("."):
config = config.get(scope.lower(), {})
c = copy.deepcopy(self.default_config)

def _recursive_update(c, config):
Expand Down
70 changes: 54 additions & 16 deletions duetector/filters/pattern.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import re
from typing import List, NamedTuple, Optional, Union
from ast import literal_eval
from typing import List, NamedTuple, Optional, Set, Union

from duetector.extension.filter import hookimpl
from duetector.filters import Filter
Expand All @@ -27,6 +28,17 @@ class PatternFilter(Filter):
Use ``(?!…)`` for include pattern:
- ``re_exclude_custom``: ``["(?!/proc/)"]`` will include ``/proc`` but exclude others.
Note:
- We use Python literals to parse the config, so you can pass a list through an environment variable:
- Recommended: ``{PREFIX...}RE_EXCLUDE_FNAME="['/proc*', '/sys*']"``.
- Remember to quote the value, otherwise it will be parsed as an expression, e.g. ``{PREFIX...}RE_EXCLUDE_FNAME=[/proc*]`` will cause a SyntaxError or ValueError
and will fall back to splitting by comma.
So either use a Python literal or a comma-separated string:
- Recommended: ``{PREFIX...}RE_EXCLUDE_FNAME="['/proc*', '/sys*']"``
- It's OK: ``{PREFIX...}RE_EXCLUDE_FNAME="/proc*, /sys*"``
- Wrong: ``{PREFIX...}RE_EXCLUDE_FNAME=[/proc*, /sys*]``, this will be converted to a list of ``"[/proc*"`` and ``"/sys*]"``.
"""

default_config = {
Expand Down Expand Up @@ -65,18 +77,55 @@ def enable_customize_exclude(self) -> bool:
"""
return bool(self.config.enable_customize_exclude)

def customize_exclude(self, data: NamedTuple) -> bool:
@staticmethod
def _wrap_exclude_list(value: Union[str, List[str]]) -> Set[str]:
    """
    Normalize an exclude config value into a set of stripped strings.

    A ``list`` is used as-is. A ``str`` (typically coming from an environment
    variable) is first parsed as a python literal; if that fails it is split
    by comma. Every resulting item is converted with ``str()`` and stripped.

    :param value: A list of items, or a string holding either a python literal
        (e.g. ``"['/proc*', '/sys*']"``) or comma separated items
        (e.g. ``"/proc*, /sys*"``).
    :return: Set of stripped string items.
    :raises TypeError: If ``value`` is neither a ``str`` nor a ``list``.
    """
    if isinstance(value, list):
        return {str(v).strip() for v in value}
    if not isinstance(value, str):
        raise TypeError(f"Type of {value} should be str or list, got {type(value)}")

    try:
        # Use ast.literal_eval to parse python literal
        parsed = literal_eval(value)
    except (SyntaxError, ValueError, TypeError):
        # If value is not a valid python literal, fallback to split by comma
        # e.g. "/proc/a*"
        # TypeError covers literals such as "{[1]}" (unhashable set member).
        parsed = value.split(",")

    if isinstance(parsed, str):
        # A quoted scalar such as "'abc'" parses to a str; iterating it would
        # yield single characters, so keep it as one item.
        return {parsed.strip()}

    try:
        return {str(v).strip() for v in parsed}
    except TypeError:
        # Non-iterable scalar (e.g. "123" parses to the int 123): keep it as a
        # single item. ``set(str(...))`` would split it into characters.
        return {str(parsed).strip()}

def is_exclude(self, data: NamedTuple, enable_customize_exclude=False) -> bool:
"""
Customize exclude function, return ``True`` to drop data, return ``False`` to keep data.
"""
for k in self.config._config_dict:
if not enable_customize_exclude and k not in self.default_config:
# If not enable_customize_exclude, only use default config
continue

if k.startswith("exclude_"):
field = k.replace("exclude_", "")
if getattr(data, field, None) in self.config._config_dict[k]:
value = getattr(data, field, None)
if value is None:
continue

if str(value).strip() in self._wrap_exclude_list(self.config._config_dict[k]):
return True
if k.startswith("re_exclude_"):
field = k.replace("re_exclude_", "")
if self.re_exclude(getattr(data, field, None), self.config._config_dict[k]):
value = getattr(data, field, None)
if value is None:
continue
if self.re_exclude(
str(value).strip(),
self._wrap_exclude_list(self.config._config_dict[k]),
):
return True
return False

Expand All @@ -103,19 +152,8 @@ def filter(self, data: NamedTuple) -> Optional[NamedTuple]:

if getattr(data, "pid", None) == os.getpid():
return
if self.re_exclude(getattr(data, "fname", None), self.config.re_exclude_fname):
return
if self.re_exclude(getattr(data, "comm", None), self.config.re_exclude_comm):
return

if getattr(data, "pid", None) in self.config.exclude_pid:
return
if getattr(data, "uid", None) in self.config.exclude_uid:
return
if getattr(data, "gid", None) in self.config.exclude_gid:
return

if self.enable_customize_exclude and self.customize_exclude(data):
if self.is_exclude(data, enable_customize_exclude=self.enable_customize_exclude):
return

return data
Expand Down
5 changes: 5 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
import os

os.environ["DUETECTOR_LOG_LEVEL"] = "DEBUG"


from pathlib import Path

import pytest
Expand Down
75 changes: 75 additions & 0 deletions tests/test_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import pytest

from duetector.config import ConfigLoader
from duetector.filters.pattern import PatternFilter
from duetector.managers import FilterManager

Expand Down Expand Up @@ -81,5 +82,79 @@ def test_filter(pattern_filter, data_args, passed):
assert pattern_filter(data) == None


@pytest.fixture
def config_loader(full_config_file):
    """Yield a ``ConfigLoader`` for ``full_config_file`` with ``load_env=True``."""
    loader = ConfigLoader(full_config_file, load_env=True)
    yield loader


@pytest.fixture
def env_config(config_loader: ConfigLoader, monkeypatch):
    """
    Yield the filter config dict loaded after setting two environment variables.

    The variables are set through ``monkeypatch`` using the loader's
    ``ENV_PREFIX`` and ``ENV_SEP``: a plain ``exclude_ecustom`` value and a
    ``re_exclude_egcustom`` value written as a python list literal.
    """
    prefix = config_loader.ENV_PREFIX
    sep = config_loader.ENV_SEP
    env_overrides = {
        f"{prefix}filter{sep}patternfilter{sep}exclude_ecustom": "ignore_ecustom",
        f"{prefix}filter{sep}patternfilter{sep}re_exclude_egcustom": '["ignore_ecustom*"]',
    }
    # Environment must be patched before the config is loaded.
    for env_name, env_value in env_overrides.items():
        monkeypatch.setenv(env_name, env_value)

    loaded = config_loader.load_config()
    yield FilterManager(loaded).config._config_dict


@pytest.fixture
def env_pattern_filter(env_config):
    """Yield a ``PatternFilter`` constructed from the ``env_config`` fixture."""
    pattern_filter = PatternFilter(env_config)
    yield pattern_filter


# Field values for the env tests: everything from ``passed`` plus the two
# fields targeted by the rules set in the ``env_config`` fixture. The first
# entry of ``env_params`` expects this record to be kept.
env_passed = {
**passed,
"ecustom": "passed",
"egcustom": "passed",
}
# ``(data_args, passed)`` pairs for ``test_filter_envs``; ``passed`` is False
# when the filter is expected to drop the record.
env_params = [
(
env_passed,
True,
),
(
{
**env_passed,
"ecustom": "ignore_ecustom", # Filtered
},
False,
),
(
{
**env_passed,
"egcustom": "ignore_ecustom123", # Filtered
},
False,
),
]

# Record type used by ``test_filter_envs``; it includes the ``ecustom`` and
# ``egcustom`` fields referenced by the env-configured exclude rules.
e_data_t = namedtuple(
"Tracking",
[
"pid",
"uid",
"gid",
"comm",
"fname",
"timestamp",
"custom",
"gcustom",
"ecustom",
"egcustom",
],
)


@pytest.mark.parametrize("data_args, passed", env_params)
def test_filter_envs(env_pattern_filter, data_args, passed):
    """
    Check the env-configured filter keeps or drops each parametrized record.

    When ``passed`` is true the filter must return the record unchanged,
    otherwise it must return ``None``.
    """
    data = e_data_t(**data_args)
    result = env_pattern_filter(data)
    if passed:
        assert result == data
    else:
        # Identity check: comparing to the None singleton with ``==`` is
        # discouraged by PEP 8.
        assert result is None


if __name__ == "__main__":
pytest.main(["-vv", "-s", __file__])

0 comments on commit 5dcfea0

Please sign in to comment.