Skip to content

Commit

Permalink
fix(iast): fix propagation error in modulo operator (#11573)
Browse files Browse the repository at this point in the history
## Checklist
- [x] PR author has checked that all the criteria below are met
- The PR description includes an overview of the change
- The PR description articulates the motivation for the change
- The change includes tests OR the PR description describes a testing
strategy
- The PR description notes risks associated with the change, if any
- Newly-added code is easy to change
- The change follows the [library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
- The change includes or references documentation updates if necessary
- Backport labels are set (if
[applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))

## Reviewer Checklist
- [x] Reviewer has checked that all the criteria below are met 
- Title is accurate
- All changes are related to the pull request's stated goal
- Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes
- Testing strategy adequately addresses listed risks
- Newly-added code is easy to change
- Release note makes sense to a user of the library
- If necessary, author has acknowledged and discussed the performance
implications of this PR as reported in the benchmarks PR comment
- Backport labels are set in a manner that is consistent with the
[release branch maintenance
policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)
  • Loading branch information
avara1986 authored Nov 29, 2024
1 parent b6ff124 commit 501f5a0
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 36 deletions.
15 changes: 15 additions & 0 deletions benchmarks/bm/iast_fixtures/str_methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -875,6 +875,21 @@ def do_args_kwargs_4(format_string, *args_safe, **kwargs_safe) -> Text:
return format_string.format("1", "2", test_kwarg=3, *args_safe, **kwargs_safe)


def psycopg_queries_dump_bytes(args: tuple) -> bytes:
template = b'INSERT INTO "show_client" ("username") VALUES (%s) RETURNING "show_client"."id"'
return template % args


def psycopg_queries_dump_bytes_with_keys(args: dict) -> bytes:
template = b'INSERT INTO "show_client" ("username") VALUES (%(name)s) RETURNING "show_client"."id"'
return template % args


def psycopg_queries_dump_bytearray(args: tuple) -> bytes:
template = b'INSERT INTO "show_client" ("username") VALUES (%s) RETURNING "show_client"."id"'
return template % args


def do_format_key_error(param1: str) -> Text:
return "Test {param1}, {param2}".format(param1=param1) # noqa:F524

Expand Down
44 changes: 8 additions & 36 deletions ddtrace/appsec/_iast/_taint_tracking/Aspects/AspectModulo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,45 +20,17 @@ do_modulo(PyObject* text, PyObject* insert_tuple_or_obj)

if (PyUnicode_Check(text)) {
result = PyUnicode_Format(text, insert_tuple);
} else if (PyBytes_Check(text)) {
if (PyObject* text_unicode = PyUnicode_FromEncodedObject(text, "utf-8", "strict"); text_unicode != nullptr) {
result = PyUnicode_Format(text_unicode, insert_tuple);
Py_DECREF(text_unicode);

if (result != nullptr) {
PyObject* encoded_result = PyUnicode_AsEncodedString(result, "utf-8", "strict");
Py_DECREF(result);
result = encoded_result;
}
}
} else if (PyByteArray_Check(text)) {
if (PyObject* text_bytes = PyBytes_FromStringAndSize(PyByteArray_AsString(text), PyByteArray_Size(text));
text_bytes != nullptr) {
PyObject* text_unicode = PyUnicode_FromEncodedObject(text_bytes, "utf-8", "strict");
Py_DECREF(text_bytes);
if (text_unicode != nullptr) {
result = PyUnicode_Format(text_unicode, insert_tuple);
Py_DECREF(text_unicode);

if (result != nullptr) {
PyObject* encoded_result = PyUnicode_AsEncodedString(result, "utf-8", "strict");
Py_DECREF(result);
result = encoded_result;
}
}
}

if (result != nullptr) {
PyObject* result_bytearray = PyByteArray_FromObject(result);
Py_DECREF(result);
result = result_bytearray;
}
} else if (PyBytes_Check(text) or PyByteArray_Check(text)) {
auto method_name = PyUnicode_FromString("__mod__");
result = PyObject_CallMethodObjArgs(text, method_name, insert_tuple, nullptr);
Py_DECREF(method_name);
} else {
Py_DECREF(insert_tuple);
return nullptr;
}

Py_DECREF(insert_tuple);
if (has_pyerr()) {
Py_XDECREF(result);
return nullptr;
}
return result;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
fixes:
- |
Code Security: This fix resolves an issue where the modulo (%) operator would not be replaced correctly for bytes
and bytesarray if IAST is enabled.
103 changes: 103 additions & 0 deletions tests/appsec/iast/aspects/test_modulo_aspect_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,15 @@
from typing import List # noqa:F401
from typing import Text # noqa:F401

from hypothesis import given
from hypothesis.strategies import text
import pytest

from ddtrace.appsec._iast._taint_tracking import OriginType
from ddtrace.appsec._iast._taint_tracking import as_formatted_evidence
from ddtrace.appsec._iast._taint_tracking import get_ranges
from ddtrace.appsec._iast._taint_tracking import get_tainted_ranges
from ddtrace.appsec._iast._taint_tracking import taint_pyobject
from tests.appsec.iast.aspects.aspect_utils import BaseReplacement
from tests.appsec.iast.aspects.conftest import _iast_patched_module

Expand Down Expand Up @@ -175,3 +180,101 @@ def test_modulo_when_parameter_value_already_present_in_template_then_range_is_c
expected_result="aaaaaaaaaaaa",
escaped_expected_result="aaaaaaa:+-<input1>a<input1>-+:aaaa",
)


@pytest.mark.parametrize("is_tainted", [True, False])
@given(text())
def test_psycopg_queries_dump_bytes(is_tainted, string_data):
string_data_to_bytes = string_data.encode("utf-8")
bytes_to_test_orig = b"'%s'" % (string_data.encode("utf-8"))
if is_tainted:
bytes_to_test = taint_pyobject(
pyobject=bytes_to_test_orig,
source_name="string_data_to_bytes",
source_value=bytes_to_test_orig,
source_origin=OriginType.PARAMETER,
)
else:
bytes_to_test = bytes_to_test_orig

result = mod.psycopg_queries_dump_bytes((bytes_to_test,))
assert (
result
== b'INSERT INTO "show_client" ("username") VALUES (\'%s\') RETURNING "show_client"."id"' % string_data_to_bytes
)

if is_tainted and string_data_to_bytes:
ranges = get_tainted_ranges(result)
assert len(ranges) == 1
assert ranges[0].start == 47
assert ranges[0].length == len(bytes_to_test_orig)
assert result[ranges[0].start : (ranges[0].start + ranges[0].length)] == bytes_to_test_orig

result = mod.psycopg_queries_dump_bytes_with_keys({b"name": bytes_to_test})
assert (
result
== b'INSERT INTO "show_client" ("username") VALUES (\'%s\') RETURNING "show_client"."id"' % string_data_to_bytes
)

with pytest.raises(TypeError):
mod.psycopg_queries_dump_bytes(
(
bytes_to_test,
bytes_to_test,
)
)

with pytest.raises(TypeError):
_ = b'INSERT INTO "show_client" ("username") VALUES (%s) RETURNING "show_client"."id"' % ((1,))

with pytest.raises(TypeError):
mod.psycopg_queries_dump_bytes((1,))

with pytest.raises(KeyError):
_ = b'INSERT INTO "show_client" ("username") VALUES (%(name)s) RETURNING "show_client"."id"' % {
"name": bytes_to_test
}

with pytest.raises(KeyError):
mod.psycopg_queries_dump_bytes_with_keys({"name": bytes_to_test})


@pytest.mark.parametrize("is_tainted", [True, False])
@given(text())
def test_psycopg_queries_dump_bytearray(is_tainted, string_data):
string_data_to_bytesarray = bytearray(string_data.encode("utf-8"))
bytesarray_to_test_orig = bytearray(b"'%s'" % (string_data.encode("utf-8")))
if is_tainted:
bytesarray_to_test = taint_pyobject(
pyobject=bytesarray_to_test_orig,
source_name="string_data_to_bytes",
source_value=bytesarray_to_test_orig,
source_origin=OriginType.PARAMETER,
)
else:
bytesarray_to_test = bytesarray_to_test_orig

result = mod.psycopg_queries_dump_bytearray((bytesarray_to_test,))
assert (
result
== b'INSERT INTO "show_client" ("username") VALUES (\'%s\') RETURNING "show_client"."id"'
% string_data_to_bytesarray
)

if is_tainted and string_data_to_bytesarray:
ranges = get_tainted_ranges(result)
assert len(ranges) == 1
assert ranges[0].start == 47
assert ranges[0].length == len(bytesarray_to_test_orig)
assert result[ranges[0].start : (ranges[0].start + ranges[0].length)] == bytesarray_to_test_orig

with pytest.raises(TypeError):
mod.psycopg_queries_dump_bytearray(
(
bytesarray_to_test,
bytesarray_to_test,
)
)

with pytest.raises(TypeError):
mod.psycopg_queries_dump_bytearray((1,))

0 comments on commit 501f5a0

Please sign in to comment.