diff --git a/benchmarks/bm/iast_fixtures/str_methods.py b/benchmarks/bm/iast_fixtures/str_methods.py index f1960f0ae77..4a7c63f6fda 100644 --- a/benchmarks/bm/iast_fixtures/str_methods.py +++ b/benchmarks/bm/iast_fixtures/str_methods.py @@ -875,6 +875,21 @@ def do_args_kwargs_4(format_string, *args_safe, **kwargs_safe) -> Text: return format_string.format("1", "2", test_kwarg=3, *args_safe, **kwargs_safe) +def psycopg_queries_dump_bytes(args: tuple) -> bytes: + template = b'INSERT INTO "show_client" ("username") VALUES (%s) RETURNING "show_client"."id"' + return template % args + + +def psycopg_queries_dump_bytes_with_keys(args: dict) -> bytes: + template = b'INSERT INTO "show_client" ("username") VALUES (%(name)s) RETURNING "show_client"."id"' + return template % args + + +def psycopg_queries_dump_bytearray(args: tuple) -> bytes: + template = b'INSERT INTO "show_client" ("username") VALUES (%s) RETURNING "show_client"."id"' + return template % args + + def do_format_key_error(param1: str) -> Text: return "Test {param1}, {param2}".format(param1=param1) # noqa:F524 diff --git a/ddtrace/appsec/_iast/_taint_tracking/Aspects/AspectModulo.cpp b/ddtrace/appsec/_iast/_taint_tracking/Aspects/AspectModulo.cpp index a8efdd1b0fb..a08f76d9f3d 100644 --- a/ddtrace/appsec/_iast/_taint_tracking/Aspects/AspectModulo.cpp +++ b/ddtrace/appsec/_iast/_taint_tracking/Aspects/AspectModulo.cpp @@ -20,45 +20,17 @@ do_modulo(PyObject* text, PyObject* insert_tuple_or_obj) if (PyUnicode_Check(text)) { result = PyUnicode_Format(text, insert_tuple); - } else if (PyBytes_Check(text)) { - if (PyObject* text_unicode = PyUnicode_FromEncodedObject(text, "utf-8", "strict"); text_unicode != nullptr) { - result = PyUnicode_Format(text_unicode, insert_tuple); - Py_DECREF(text_unicode); - - if (result != nullptr) { - PyObject* encoded_result = PyUnicode_AsEncodedString(result, "utf-8", "strict"); - Py_DECREF(result); - result = encoded_result; - } - } - } else if (PyByteArray_Check(text)) { - if (PyObject* text_bytes = PyBytes_FromStringAndSize(PyByteArray_AsString(text), PyByteArray_Size(text)); - text_bytes != nullptr) { - PyObject* text_unicode = PyUnicode_FromEncodedObject(text_bytes, "utf-8", "strict"); - Py_DECREF(text_bytes); - if (text_unicode != nullptr) { - result = PyUnicode_Format(text_unicode, insert_tuple); - Py_DECREF(text_unicode); - - if (result != nullptr) { - PyObject* encoded_result = PyUnicode_AsEncodedString(result, "utf-8", "strict"); - Py_DECREF(result); - result = encoded_result; - } - } - } - - if (result != nullptr) { - PyObject* result_bytearray = PyByteArray_FromObject(result); - Py_DECREF(result); - result = result_bytearray; - } + } else if (PyBytes_Check(text) or PyByteArray_Check(text)) { + auto method_name = PyUnicode_FromString("__mod__"); + result = PyObject_CallMethodObjArgs(text, method_name, insert_tuple, nullptr); + Py_DECREF(method_name); } else { - Py_DECREF(insert_tuple); - return nullptr; } - Py_DECREF(insert_tuple); + if (has_pyerr()) { + Py_XDECREF(result); + return nullptr; + } return result; } diff --git a/releasenotes/notes/iast-fix-iast-propagation-error-6a36612724b8c84c.yaml b/releasenotes/notes/iast-fix-iast-propagation-error-6a36612724b8c84c.yaml new file mode 100644 index 00000000000..9f650cc6914 --- /dev/null +++ b/releasenotes/notes/iast-fix-iast-propagation-error-6a36612724b8c84c.yaml @@ -0,0 +1,5 @@ +--- +fixes: + - | + Code Security: This fix resolves an issue where the modulo (%) operator would not be replaced correctly for bytes + and bytesarray if IAST is enabled. diff --git a/tests/appsec/iast/aspects/test_modulo_aspect_fixtures.py b/tests/appsec/iast/aspects/test_modulo_aspect_fixtures.py index 64eb7b6c1b6..80ca12a2db8 100644 --- a/tests/appsec/iast/aspects/test_modulo_aspect_fixtures.py +++ b/tests/appsec/iast/aspects/test_modulo_aspect_fixtures.py @@ -4,10 +4,15 @@ from typing import List # noqa:F401 from typing import Text # noqa:F401 +from hypothesis import given +from hypothesis.strategies import text import pytest +from ddtrace.appsec._iast._taint_tracking import OriginType from ddtrace.appsec._iast._taint_tracking import as_formatted_evidence from ddtrace.appsec._iast._taint_tracking import get_ranges +from ddtrace.appsec._iast._taint_tracking import get_tainted_ranges +from ddtrace.appsec._iast._taint_tracking import taint_pyobject from tests.appsec.iast.aspects.aspect_utils import BaseReplacement from tests.appsec.iast.aspects.conftest import _iast_patched_module @@ -175,3 +180,101 @@ def test_modulo_when_parameter_value_already_present_in_template_then_range_is_c expected_result="aaaaaaaaaaaa", escaped_expected_result="aaaaaaa:+-a-+:aaaa", ) + + +@pytest.mark.parametrize("is_tainted", [True, False]) +@given(text()) +def test_psycopg_queries_dump_bytes(is_tainted, string_data): + string_data_to_bytes = string_data.encode("utf-8") + bytes_to_test_orig = b"'%s'" % (string_data.encode("utf-8")) + if is_tainted: + bytes_to_test = taint_pyobject( + pyobject=bytes_to_test_orig, + source_name="string_data_to_bytes", + source_value=bytes_to_test_orig, + source_origin=OriginType.PARAMETER, + ) + else: + bytes_to_test = bytes_to_test_orig + + result = mod.psycopg_queries_dump_bytes((bytes_to_test,)) + assert ( + result + == b'INSERT INTO "show_client" ("username") VALUES (\'%s\') RETURNING "show_client"."id"' % string_data_to_bytes + ) + + if is_tainted and string_data_to_bytes: + ranges = get_tainted_ranges(result) + assert len(ranges) == 1 + assert ranges[0].start == 47 + assert ranges[0].length == len(bytes_to_test_orig) + assert result[ranges[0].start : (ranges[0].start + ranges[0].length)] == bytes_to_test_orig + + result = mod.psycopg_queries_dump_bytes_with_keys({b"name": bytes_to_test}) + assert ( + result + == b'INSERT INTO "show_client" ("username") VALUES (\'%s\') RETURNING "show_client"."id"' % string_data_to_bytes + ) + + with pytest.raises(TypeError): + mod.psycopg_queries_dump_bytes( + ( + bytes_to_test, + bytes_to_test, + ) + ) + + with pytest.raises(TypeError): + _ = b'INSERT INTO "show_client" ("username") VALUES (%s) RETURNING "show_client"."id"' % ((1,)) + + with pytest.raises(TypeError): + mod.psycopg_queries_dump_bytes((1,)) + + with pytest.raises(KeyError): + _ = b'INSERT INTO "show_client" ("username") VALUES (%(name)s) RETURNING "show_client"."id"' % { + "name": bytes_to_test + } + + with pytest.raises(KeyError): + mod.psycopg_queries_dump_bytes_with_keys({"name": bytes_to_test}) + + +@pytest.mark.parametrize("is_tainted", [True, False]) +@given(text()) +def test_psycopg_queries_dump_bytearray(is_tainted, string_data): + string_data_to_bytesarray = bytearray(string_data.encode("utf-8")) + bytesarray_to_test_orig = bytearray(b"'%s'" % (string_data.encode("utf-8"))) + if is_tainted: + bytesarray_to_test = taint_pyobject( + pyobject=bytesarray_to_test_orig, + source_name="string_data_to_bytes", + source_value=bytesarray_to_test_orig, + source_origin=OriginType.PARAMETER, + ) + else: + bytesarray_to_test = bytesarray_to_test_orig + + result = mod.psycopg_queries_dump_bytearray((bytesarray_to_test,)) + assert ( + result + == b'INSERT INTO "show_client" ("username") VALUES (\'%s\') RETURNING "show_client"."id"' + % string_data_to_bytesarray + ) + + if is_tainted and string_data_to_bytesarray: + ranges = get_tainted_ranges(result) + assert len(ranges) == 1 + assert ranges[0].start == 47 + assert ranges[0].length == len(bytesarray_to_test_orig) + assert result[ranges[0].start : (ranges[0].start + ranges[0].length)] == bytesarray_to_test_orig + + with pytest.raises(TypeError): + mod.psycopg_queries_dump_bytearray( + ( + bytesarray_to_test, + bytesarray_to_test, + ) + ) + + with pytest.raises(TypeError): + mod.psycopg_queries_dump_bytearray((1,))