Skip to content

Commit

Permalink
Merge pull request #2 from CrowdStrike/rtr-filters
Browse files Browse the repository at this point in the history
RTR Filter and Code Coverage
  • Loading branch information
ChristopherHammond13 authored Aug 15, 2023
2 parents 7af48ee + 9699cc3 commit 954e427
Show file tree
Hide file tree
Showing 10 changed files with 313 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,7 @@ jobs:
cache: 'poetry'
- name: Install dependencies
run: poetry install
- name: Run pytest
run: poetry run pytest
- name: Run pytest via coverage
run: poetry run coverage run --source=caracara_filters -m pytest -s
- name: Get Coverage Report
run: poetry run coverage report
3 changes: 3 additions & 0 deletions caracara_filters/dialects/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,17 @@
'rebase_filters_on_default',
'DIALECTS',
'HOST_FILTERS',
'RTR_FILTERS',
]

from caracara_filters.dialects._base import default_filter
from caracara_filters.dialects._merge import rebase_filters_on_default
from caracara_filters.dialects._base import BASE_FILTERS
from caracara_filters.dialects.hosts import HOST_FILTERS
from caracara_filters.dialects.rtr import RTR_FILTERS

DIALECTS = {
"base": BASE_FILTERS,
"hosts": HOST_FILTERS,
"rtr": RTR_FILTERS,
}
66 changes: 66 additions & 0 deletions caracara_filters/dialects/rtr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
"""Caracara Filters: RTR Dialect.
This module contains filters that are specific to the RTR API.
"""
from functools import partial
from typing import Any, Dict

from caracara_filters.dialects._base import default_filter
from caracara_filters.dialects._base import rebase_filters_on_default
from caracara_filters.validators import options_validator


RTR_COMMANDS = [
"cat",
"cd",
"clear",
"cp",
"csrutil",
"cswindiag",
"encrypt",
"env",
"eventlog",
"filehash",
"get",
"getsid",
"history",
"ifconfig",
"ipconfig",
"kill",
"ls",
"map",
"memdump",
"mkdir",
"mount",
"mv",
"netstat",
"ps",
"put",
"put-and-run",
"reg",
"restart",
"rm",
"run",
"runscript",
"shutdown",
"tar",
"umount",
"unmap",
"update",
"users",
"xmemdump",
"zip",
]

rtr_base_command_filter = {
"fql": "base_command",
"validator": partial(options_validator, RTR_COMMANDS, case_sensitive=False),
"help": "Filter RTR audit logs by base command.",
}

RTR_FILTERS: Dict[str, Dict[str, Any]] = {
"basecommand": rtr_base_command_filter,
"command": rtr_base_command_filter,
}

rebase_filters_on_default(default_filter, RTR_FILTERS)
9 changes: 5 additions & 4 deletions caracara_filters/fql.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,24 +55,25 @@ def __init__(self, dialect: str = 'base'):
self.dialect: str = dialect
self.filters: Dict[str, FilterArgs] = {}

def add_filter(self, new_filter: FilterArgs):
def add_filter(self, new_filter: FilterArgs) -> str:
"""Add a new filter to the FQLGenerator object."""
filter_id = str(uuid4())
self.filters[filter_id] = new_filter
return filter_id

def remove_filter(self, filter_id: str):
"""Remove a filter from the current FQL Generator object by filter ID."""
if filter_id in self.filters:
del self.filters[filter_id]
else:
raise ValueError(f"The filter with ID {filter_id} does not exist in this object.")
raise KeyError(f"The filter with ID {filter_id} does not exist in this object.")

def create_new_filter(
self,
filter_name: str,
initial_value: Any,
initial_operator: Optional[str] = None,
) -> None:
) -> str:
"""Create a new FQL filter and store it, alongside its arguments, inside this object."""
# For compatability reasons, we must send all filter names to lower case.
filter_name = filter_name.lower()
Expand Down Expand Up @@ -135,7 +136,7 @@ def create_new_filter(
value=transformed_value,
operator=initial_operator
)
self.add_filter(filter_args)
return self.add_filter(filter_args)

def create_new_filter_from_kv_string(self, key_string: str, value) -> str:
"""
Expand Down
34 changes: 32 additions & 2 deletions caracara_filters/validators/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,49 @@
from typing import Any, List


def options_validator(options: List[Any], chosen_option: Any) -> bool:
def options_validator(options: List[Any], chosen_option: Any, case_sensitive: bool = True) -> bool:
"""Check if an option passed to the filter is within a pre-set list of options.
If a list of choices is passed in, we bail out if any of those items are not in the list.
If all items fail to result in a bail out, we return True.
Technically, we should probably test whether this item is multivariate here. However, we
perform that check in fql.py within the FQLGenerator function.
Scenarios to cover here:
- Case sensitive and a list of strings
- Case sensitive and a list of non-strings
- Case sensitive and a string
- Non-case sensitive and a list of strings
- Non-case sensitive and a list of non-strings
- Non-case sensitive and a string
"""
if isinstance(chosen_option, str):
return chosen_option in options if case_sensitive \
else chosen_option.lower() in options

if isinstance(chosen_option, list):
for chosen_option_item in chosen_option:
if chosen_option_item not in options:
if (
isinstance(chosen_option_item, str) and
case_sensitive and
chosen_option_item not in options
):
return False

if (
isinstance(chosen_option_item, str) and
not case_sensitive and
chosen_option_item.lower() not in options
):
return False

if (
not isinstance(chosen_option_item, str) and
chosen_option_item not in options
):
return False

return True

return chosen_option in options
74 changes: 73 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "caracara-filters"
version = "0.1.0"
version = "0.1.1"
description = "FQL generation engine for Caracara"
authors = ["Chris Hammond <[email protected]>"]
license = "MIT"
Expand Down Expand Up @@ -32,9 +32,10 @@ classifiers = [
python = "^3.7.2"

[tool.poetry.group.dev.dependencies]
coverage = "^7.0"
flake8 = "^5.0"
pydocstyle = "^6.3.0"
freezegun = "^1.2.2"
pydocstyle = "^6.3.0"
pylint = "^2.17.5"
pytest = "^7.4.0"

Expand Down
55 changes: 55 additions & 0 deletions tests/test_host_filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,58 @@ def test_multi_kv_string():
fql_generator.create_new_filter_from_kv_string("LastSeen__GTE", "-29m")
fql = fql_generator.get_fql()
assert fql == "platform_name: ['Windows','Linux']+last_seen: >='2019-12-31T23:31:13Z'"


def test_non_multivariate_list_exception():
fql_generator = FQLGenerator(dialect='hosts')
with pytest.raises(TypeError):
fql_generator.create_new_filter("LastSeen", ['-30m', '-60h'])


def test_incorrect_list_type_exception():
fql_generator = FQLGenerator(dialect='hosts')
with pytest.raises(TypeError):
fql_generator.create_new_filter("hostname", [0, 'some host'])


def test_validation_failure():
fql_generator = FQLGenerator(dialect='hosts')
with pytest.raises(ValueError):
fql_generator.create_new_filter("LastSeen", '^123')


def test_incorrect_containment_option():
fql_generator = FQLGenerator(dialect='hosts')
with pytest.raises(ValueError):
fql_generator.create_new_filter("contained", "some containment option")


def test_incorrect_relative_timestamp_format():
fql_generator = FQLGenerator(dialect='hosts')
with pytest.raises(ValueError):
fql_generator.create_new_filter("firstseen", "-80x")


@freeze_time("2023-08-15 01:02:03")
def test_last_seen_day():
fql_generator = FQLGenerator(dialect='hosts')
fql_generator.create_new_filter("firstseen", "-2d")
fql = fql_generator.get_fql()
assert fql == "first_seen: >='2023-08-13T01:02:03Z'"


@freeze_time("2023-08-15 01:02:03")
def test_first_seen_add_time():
# This is a ridiculous scenario, but we support it anyway.
fql_generator = FQLGenerator(dialect='hosts')
fql_generator.create_new_filter("firstseen", "+2d")
fql = fql_generator.get_fql()
assert fql == "first_seen: >='2023-08-17T01:02:03Z'"


@freeze_time("2023-08-15 01:02:03")
def test_last_seen_relative_seconds():
fql_generator = FQLGenerator(dialect='hosts')
fql_generator.create_new_filter("lastseen", "-63s")
fql = fql_generator.get_fql()
assert fql == "last_seen: >='2023-08-15T01:01:00Z'"
49 changes: 49 additions & 0 deletions tests/test_meta.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import pytest

from caracara_filters import FQLGenerator


def test_non_existent_dialect():
with pytest.raises(ValueError):
FQLGenerator(dialect='not a module')


def test_filter_delete_real_id():
fql_generator = FQLGenerator(dialect='base')
filter_id = fql_generator.create_new_filter("name", "testtest")
fql = fql_generator.get_fql()
assert fql == "name: 'testtest'"
assert filter_id is not None
fql_generator.remove_filter(filter_id)
assert fql_generator.filters == {}


def test_filter_delete_bad_id():
fql_generator = FQLGenerator()
with pytest.raises(KeyError):
fql_generator.remove_filter("non-existent-filter-id")


def test_bad_data_type():
fql_generator = FQLGenerator(dialect='base')
with pytest.raises(TypeError):
fql_generator.create_new_filter("name", 123)


def test_nullable_filter():
# TODO: write a test here once we have a nullable filter defined
# (and validation logic to handle this)
pass


def test_bool_filter():
# TODO: write a test once we have a boolean filter defined
pass


def test_str_dunder():
fql_generator = FQLGenerator(dialect='base')
fql_generator.create_new_filter("name", "testname")
fql = fql_generator.get_fql()
assert fql == str(fql_generator)
assert fql == "name: 'testname'"
Loading

0 comments on commit 954e427

Please sign in to comment.