Skip to content

Commit

Permalink
Merge pull request #110 from OWASP/dev
Browse files Browse the repository at this point in the history
Dev RELEASE: v0.18.0
  • Loading branch information
dmdhrumilmistry authored May 15, 2024
2 parents ead2035 + 335e327 commit 131cc5f
Show file tree
Hide file tree
Showing 14 changed files with 635 additions and 568 deletions.
2 changes: 1 addition & 1 deletion src/offat/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from .parsers.openapi import OpenAPIv3Parser
from .parsers.swagger import SwaggerParser
from .config_data_handler import validate_config_file_data
from .tester.tester_utils import generate_and_run_tests
from .tester.handler import generate_and_run_tests
from .parsers import create_parser
from .utils import get_package_version, headers_list_to_dict, read_yaml

Expand Down
8 changes: 4 additions & 4 deletions src/offat/api/jobs.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from sys import exc_info
from offat.api.models import CreateScanModel
from offat.tester.tester_utils import generate_and_run_tests
from offat.tester.handler import generate_and_run_tests
from offat.parsers import create_parser
from offat.logger import logger

Expand All @@ -18,6 +18,6 @@ def scan_api(body_data: CreateScanModel):
)
return results
except Exception as e:
logger.error("Error occurred while creating a job: %s", repr(e))
logger.debug("Debug Data:", exc_info=exc_info())
return [{"error": str(e)}]
logger.error('Error occurred while creating a job: %s', repr(e))
logger.debug('Debug Data:', exc_info=exc_info())
return [{'error': str(e)}]
34 changes: 17 additions & 17 deletions src/offat/report/summary.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,26 @@ def get_counts(results: list[dict], filter_errors: bool = False) -> dict[str, in
dict: name (str) as key and its associated count (int)
"""
if filter_errors:
results = list(filter(lambda result: result.get("error", False), results))
results = list(filter(lambda result: result.get('error', False), results))

error_count = 0
data_leak_count = 0
failed_count = 0
success_count = 0
immune_count = 0
vulnerable_count = 0
for result in results:
error_count += 1 if result.get("error", False) else 0
data_leak_count += 1 if result.get("data_leak", False) else 0
error_count += 1 if result.get('error', False) else 0
data_leak_count += 1 if result.get('data_leak', False) else 0

if result.get("result"):
success_count += 1
if result.get('vulnerable'):
vulnerable_count += 1
else:
failed_count += 1
immune_count += 1

count_dict = {
"errors": error_count,
"data_leaks": data_leak_count,
"failed": failed_count,
"success": success_count,
'errors': error_count,
'data_leaks': data_leak_count,
'immune': immune_count,
'vulnerable': vulnerable_count,
}

return count_dict
Expand All @@ -50,7 +50,7 @@ def get_counts(results: list[dict], filter_errors: bool = False) -> dict[str, in
def generate_count_summary(
results: list[dict],
filter_errors: bool = False,
output_format: str = "table",
output_format: str = 'table',
table_title: str | None = None,
) -> Table | str:
"""
Expand All @@ -70,8 +70,8 @@ def generate_count_summary(
results=results, filter_errors=filter_errors
)
match output_format:
case "markdown":
output = ""
case 'markdown':
output = ''
if table_title:
output += f"**{table_title}**\n"

Expand All @@ -80,8 +80,8 @@ def generate_count_summary(

case _: # table format
output = Table(
Column(header="⚔️", overflow="fold", justify="center"),
Column(header="Endpoints Count", overflow="fold"),
Column(header='⚔️', overflow='fold', justify='center'),
Column(header='Endpoints Count', overflow='fold'),
title=table_title,
)

Expand Down
46 changes: 25 additions & 21 deletions src/offat/report/templates/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,33 @@ def _sanitize_results(
):
if filter_passed_results:
results = list(
filter(lambda x: not x.get('result') or x.get('data_leak'), results)
filter(
lambda result: result.get('vulnerable') or result.get('data_leak'),
results,
)
)

keys_to_remove = [
'url',
'test_name',
'response_filter',
'body_params',
'request_headers',
'redirection',
'query_params',
'path_params',
'curl_command',
'response_match_regex',
'regex_match_result',
'success_codes',
]

# remove keys based on conditions or update their values
for result in results:
if result['result']:
result['result'] = '[bold green]Passed \u2713[/bold green]'
if result['vulnerable']:
result['vulnerable'] = '[bold red]True \u00d7[/bold red]'
else:
result['result'] = '[bold red]Failed \u00d7[/bold red]'
result['vulnerable'] = '[bold green]False \u2713[/bold green]'

if not is_leaking_data:
del result['response_headers']
Expand All @@ -66,15 +84,6 @@ def _sanitize_results(
result['status_code'] = result.get('response_status_code')
del result['response_status_code']

if result.get('success_codes'):
del result['success_codes']

if result.get('regex_match_result'):
del result['regex_match_result']

if result.get('response_match_regex'):
del result['response_match_regex']

if result.get('security') or result.get('security') == []:
del result['security']

Expand All @@ -86,13 +95,8 @@ def _sanitize_results(
if not isinstance(result.get('malicious_payload'), str):
del result['malicious_payload']

del result['url']
del result['test_name']
del result['response_filter']
del result['body_params']
del result['request_headers']
del result['redirection']
del result['query_params']
del result['path_params']
for key in keys_to_remove:
if key in result:
del result[key]

return results
3 changes: 2 additions & 1 deletion src/offat/tester/fuzzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def fuzz_type_value(param_type: str, param_name: str):
def fill_params(params: list[dict], is_v3: bool) -> list[dict]:
"""fills params for OAS/swagger specs"""
schema_params = []
for index in range(len(params)):
for index, _ in enumerate(params):
param_type = (
params[index].get("schema", {}).get("type")
if is_v3
Expand All @@ -125,6 +125,7 @@ def fill_params(params: list[dict], is_v3: bool) -> list[dict]:
"name": param_name,
"required": param_is_required,
"value": param_value,
"type": param_type
}
]

Expand Down
87 changes: 45 additions & 42 deletions src/offat/tester/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,9 @@ def check_unsupported_http_methods(
'malicious_payload': [],
'args': args,
'kwargs': kwargs,
'result_details': {
True: "Endpoint doesn't perform any HTTP verb which is not documented",
False: 'Endpoint performs HTTP verb which is not documented',
'vuln_details': {
True: 'Endpoint performs HTTP verb which is not documented',
False: "Endpoint doesn't perform any HTTP verb which is not documented",
},
'body_params': body_params,
'query_params': query_params,
Expand Down Expand Up @@ -297,9 +297,9 @@ def sqli_fuzz_params_test(

request_obj['malicious_payload'] = sqli_payload

request_obj['result_details'] = {
True: 'Parameters are not vulnerable to SQLi Payload', # passed
False: 'One or more parameter is vulnerable to SQL Injection Attack', # failed
request_obj['vuln_details'] = {
True: 'One or more parameter is vulnerable to SQL Injection Attack',
False: 'Parameters are not vulnerable to SQLi Payload',
}
request_obj['success_codes'] = success_codes
request_obj[
Expand Down Expand Up @@ -404,9 +404,9 @@ def sqli_in_uri_path_fuzz_test(
'malicious_payload': sqli_payload,
'args': args,
'kwargs': kwargs,
'result_details': {
True: 'Endpoint is not vulnerable to SQLi', # passed
False: 'Endpoint might be vulnerable to SQli', # failed
'vuln_details': {
True: 'Endpoint might be vulnerable to SQli',
False: 'Endpoint is not vulnerable to SQLi',
},
'success_codes': success_codes,
'response_filter': PostTestFiltersEnum.STATUS_CODE_FILTER.name,
Expand Down Expand Up @@ -498,9 +498,9 @@ def bola_fuzz_path_test(
'malicious_payload': path_params,
'args': args,
'kwargs': kwargs,
'result_details': {
True: 'Endpoint is not vulnerable to BOLA', # passed
False: 'Endpoint might be vulnerable to BOLA', # failed
'vuln_details': {
True: 'Endpoint might be vulnerable to BOLA',
False: 'Endpoint is not vulnerable to BOLA',
},
'success_codes': success_codes,
'response_filter': PostTestFiltersEnum.STATUS_CODE_FILTER.name,
Expand Down Expand Up @@ -594,9 +594,9 @@ def bola_fuzz_trailing_slash_path_test(
'malicious_payload': malicious_payload,
'args': args,
'kwargs': kwargs,
'result_details': {
True: 'Endpoint might not vulnerable to BOLA', # passed
False: 'Endpoint might be vulnerable to BOLA', # failed
'vuln_details': {
True: 'Endpoint might be vulnerable to BOLA',
False: 'Endpoint might not vulnerable to BOLA',
},
'success_codes': success_codes,
'response_filter': PostTestFiltersEnum.STATUS_CODE_FILTER.name,
Expand Down Expand Up @@ -680,6 +680,9 @@ def bopla_fuzz_test(
filter(lambda x: x.get('in') == 'path', request_params)
)

if len(request_body_params) == 0 and len(request_query_params) == 0:
continue

# handle path params from path_params
# and replace path params by value in
# endpoint path
Expand Down Expand Up @@ -718,9 +721,9 @@ def bopla_fuzz_test(
'malicious_payload': response_body_params,
'args': args,
'kwargs': kwargs,
'result_details': {
True: 'Endpoint might not vulnerable to BOPLA', # passed
False: 'Endpoint might be vulnerable to BOPLA', # failed
'vuln_details': {
True: 'Endpoint might be vulnerable to BOPLA',
False: 'Endpoint might not vulnerable to BOPLA',
},
'success_codes': success_codes,
'response_filter': PostTestFiltersEnum.STATUS_CODE_FILTER.name,
Expand Down Expand Up @@ -774,7 +777,7 @@ def __generate_injection_fuzz_params_test(
self,
openapi_parser: SwaggerParser | OpenAPIv3Parser,
test_name: str,
result_details: dict,
vuln_details: dict,
payloads_data: list[dict],
*args,
**kwargs,
Expand Down Expand Up @@ -804,19 +807,19 @@ def __generate_injection_fuzz_params_test(
for payload_dict in payloads_data:
for request_obj in fuzzed_request_list:
payload = payload_dict['request_payload']

# handle body request params
body_request_params = request_obj.get('body_params', [])
query_request_params = request_obj.get('query_params', [])
# endpoint can be fuzzed if it has query/body params
if len(body_request_params) == 0 and len(query_request_params) == 0:
continue

# handle body and query request params
malicious_body_request_params = self.__inject_payload_in_params(
body_request_params, payload
)

# handle query request params
query_request_params = request_obj.get('query_params', [])
malicious_query_request_params = self.__inject_payload_in_params(
query_request_params, payload
)

request_obj['test_name'] = test_name

request_obj['body_params'] = malicious_body_request_params
Expand All @@ -826,7 +829,7 @@ def __generate_injection_fuzz_params_test(

request_obj['malicious_payload'] = payload

request_obj['result_details'] = result_details
request_obj['vuln_details'] = vuln_details
request_obj[
'response_filter'
] = PostTestFiltersEnum.BODY_REGEX_FILTER.name
Expand Down Expand Up @@ -865,15 +868,15 @@ def os_command_injection_fuzz_params_test(
{'request_payload': 'ls -la', 'response_match_regex': r'total\s\d+'},
]

result_details = {
True: 'Parameters are not vulnerable to OS Command Injection', # passed
False: 'One or more parameter is vulnerable to OS Command Injection Attack', # failed
vuln_details = {
True: 'One or more parameter is vulnerable to OS Command Injection Attack',
False: 'Parameters are not vulnerable to OS Command Injection',
}

return self.__generate_injection_fuzz_params_test(
openapi_parser=openapi_parser,
test_name=test_name,
result_details=result_details,
vuln_details=vuln_details,
payloads_data=payloads_data,
)

Expand Down Expand Up @@ -912,15 +915,15 @@ def xss_html_injection_fuzz_params_test(
},
]

result_details = {
True: 'Parameters are not vulnerable to XSS/HTML Injection Attack', # passed
False: 'One or more parameter is vulnerable to XSS/HTML Injection Attack', # failed
vuln_details = {
False: 'Parameters are not vulnerable to XSS/HTML Injection Attack',
True: 'One or more parameter is vulnerable to XSS/HTML Injection Attack',
}

return self.__generate_injection_fuzz_params_test(
openapi_parser=openapi_parser,
test_name=test_name,
result_details=result_details,
vuln_details=vuln_details,
payloads_data=payloads_data,
)

Expand Down Expand Up @@ -968,15 +971,15 @@ def ssti_fuzz_params_test(self, openapi_parser: SwaggerParser | OpenAPIv3Parser)
{'request_payload': r'*{7*7}', 'response_match_regex': r'49'},
]

result_details = {
True: 'Parameters are not vulnerable to SSTI Attack', # passed
False: 'One or more parameter is vulnerable to SSTI Attack', # failed
vuln_details = {
True: 'One or more parameter is vulnerable to SSTI Attack',
False: 'Parameters are not vulnerable to SSTI Attack',
}

return self.__generate_injection_fuzz_params_test(
openapi_parser=openapi_parser,
test_name=test_name,
result_details=result_details,
vuln_details=vuln_details,
payloads_data=payloads_data,
)

Expand All @@ -993,7 +996,7 @@ def missing_auth_fuzz_test(
openapi_parser (OpenAPIParser): An instance of the OpenAPIParser class
containing the parsed OpenAPI specification.
success_codes (list[int], optional): A list of HTTP success codes to consider
as successful BOLA responses. Defaults to [200, 201, 301].
as test failed responses. Defaults to [200, 201, 301].
*args: Variable-length positional arguments.
**kwargs: Arbitrary keyword arguments.
Expand Down Expand Up @@ -1069,9 +1072,9 @@ def missing_auth_fuzz_test(
'malicious_payload': 'Security Payload Missing',
'args': args,
'kwargs': kwargs,
'result_details': {
True: 'Endpoint implements security authentication as defined', # passed
False: 'Endpoint fails to implement security authentication as defined', # failed
'vuln_details': {
True: 'Endpoint fails to implement security authentication as defined',
False: 'Endpoint implements security authentication as defined',
},
'success_codes': success_codes,
'response_filter': PostTestFiltersEnum.STATUS_CODE_FILTER.name,
Expand Down
Loading

0 comments on commit 131cc5f

Please sign in to comment.