Skip to content

Commit

Permalink
Added tests and other cleanups resulting from merging PR #252.
Browse files Browse the repository at this point in the history
  • Loading branch information
jtesta committed Mar 18, 2024
1 parent 5bd925f commit 3c31934
Show file tree
Hide file tree
Showing 10 changed files with 302 additions and 46 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ For convenience, a web front-end on top of the command-line tool is available at
- The built-in man page (`-m`, `--manual`) is now available on Docker, PyPI, and Snap builds, in addition to the Windows build.
- Snap builds are now architecture-independent.
- Changed Docker base image from `python:3-slim` to `python:3-alpine`, resulting in a 59% reduction in image size; credit [Daniel Thamdrup](https://github.com/dallemon).
- Custom policies now support the `allow_algorithm_subset_and_reordering` directive to allow targets to pass with a subset and/or re-ordered list of host keys, kex, ciphers, and MACs. This allows for the creation of a baseline policy where targets can optionally implement stricter controls; partial credit [yannik1015](https://github.com/yannik1015).
- Added 1 new key exchange algorithm: `gss-nistp384-sha384-*`.

### v3.1.0 (2023-12-20)
Expand Down
6 changes: 6 additions & 0 deletions docker_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,12 @@ run_custom_policy_test "config2" "test13" "${PROGRAM_RETVAL_GOOD}"
# Failing test with DH modulus test.
run_custom_policy_test "config2" "test14" "${PROGRAM_RETVAL_FAILURE}"

# Passing test with algorithm subset matching.
run_custom_policy_test "config2" "test15" "${PROGRAM_RETVAL_GOOD}"

# Failing test with algorithm subset matching.
run_custom_policy_test "config2" "test16" "${PROGRAM_RETVAL_FAILURE}"

# Failing test for built-in OpenSSH 8.0p1 server policy (RSA host key size is 3072 instead of 4096).
run_builtin_policy_test "Hardened OpenSSH Server v8.0 (version 4)" "8.0p1" "test1" "-o HostKeyAlgorithms=rsa-sha2-512,rsa-sha2-256,ssh-ed25519 -o KexAlgorithms=curve25519-sha256,[email protected],diffie-hellman-group16-sha512,diffie-hellman-group18-sha512,diffie-hellman-group-exchange-sha256 -o [email protected],[email protected],[email protected],aes256-ctr,aes192-ctr,aes128-ctr -o [email protected],[email protected],[email protected]" "${PROGRAM_RETVAL_FAILURE}"

Expand Down
102 changes: 58 additions & 44 deletions src/ssh_audit/policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def __init__(self, policy_file: Optional[str] = None, policy_data: Optional[str]
self._dh_modulus_sizes: Optional[Dict[str, int]] = None
self._server_policy = True
self._allow_algorithm_subset_and_reordering = False
self._errors: List[Any] = []

self._name_and_version: str = ''

Expand Down Expand Up @@ -151,7 +152,7 @@ def __init__(self, policy_file: Optional[str] = None, policy_data: Optional[str]
elif key == 'host keys':
self._host_keys = algs
elif key == 'optional host keys':
self._optional_host_keys = algs
self._optional_host_keys = algs
elif key == 'key exchanges':
self._kex = algs
elif key == 'ciphers':
Expand Down Expand Up @@ -217,13 +218,12 @@ def __init__(self, policy_file: Optional[str] = None, policy_data: Optional[str]
self._name_and_version = "%s (version %s)" % (self._name, self._version)


@staticmethod
def _append_error(errors: List[Any], mismatched_field: str, expected_required: Optional[List[str]], expected_optional: Optional[List[str]], actual: List[str]) -> None:
def _append_error(self, mismatched_field: str, expected_required: Optional[List[str]], expected_optional: Optional[List[str]], actual: List[str]) -> None:
if expected_required is None:
expected_required = ['']
if expected_optional is None:
expected_optional = ['']
errors.append({'mismatched_field': mismatched_field, 'expected_required': expected_required, 'expected_optional': expected_optional, 'actual': actual})
self._errors.append({'mismatched_field': mismatched_field, 'expected_required': expected_required, 'expected_optional': expected_optional, 'actual': actual})


def _normalize_hostkey_sizes(self) -> None:
Expand Down Expand Up @@ -296,6 +296,9 @@ def create(source: Optional[str], banner: Optional['Banner'], kex: Optional['SSH
# The version of this policy (displayed in the output during scans). Not parsed, and may be any value, including strings.
version = 1
# When false, host keys, kex, ciphers, and MAC lists must match exactly. When true, the target host may support a subset of the specified algorithms and/or algorithms may appear in a different order; this is useful for specifying a baseline and allowing some hosts the option to implement stricter controls.
allow_algorithm_subset_and_reordering = false
# The banner that must match exactly. Commented out to ignore banners, since minor variability in the banner is sometimes normal.
# banner = "%s"
Expand Down Expand Up @@ -325,39 +328,41 @@ def evaluate(self, banner: Optional['Banner'], kex: Optional['SSH2_Kex']) -> Tup
'''Evaluates a server configuration against this policy. Returns a tuple of a boolean (True if server adheres to policy) and an array of strings that holds error messages.'''

ret = True
errors: List[Any] = []

banner_str = str(banner)
if (self._banner is not None) and (banner_str != self._banner):
ret = False
self._append_error(errors, 'Banner', [self._banner], None, [banner_str])
self._append_error('Banner', [self._banner], None, [banner_str])

# All subsequent tests require a valid kex, so end here if we don't have one.
if kex is None:
return ret, errors, self._get_error_str(errors, self._allow_algorithm_subset_and_reordering)
error_list, error_str = self._get_errors()
return ret, error_list, error_str

if (self._compressions is not None) and (kex.server.compression != self._compressions):
ret = False
self._append_error(errors, 'Compression', self._compressions, None, kex.server.compression)
self._append_error('Compression', self._compressions, None, kex.server.compression)

# If a list of optional host keys was given in the policy, remove any of its entries from the list retrieved from the server. This allows us to do an exact comparison with the expected list below.
pruned_host_keys = kex.key_algorithms
if self._optional_host_keys is not None:
pruned_host_keys = [x for x in kex.key_algorithms if x not in self._optional_host_keys]
# Checking Hostkeys

# Check host keys.
if self._host_keys is not None:
# If the policy allows subsets and re-ordered algorithms...
if self._allow_algorithm_subset_and_reordering:
for hostkey_t in kex.key_algorithms:
if hostkey_t not in self._host_keys:
ret = False
self._append_error(errors, 'Host keys', self._host_keys, None, kex.key_algorithms)
self._append_error('Host keys', self._host_keys, self._optional_host_keys, kex.key_algorithms)
break
# The policy requires exact matching of algorithms.
elif pruned_host_keys != self._host_keys:
ret = False
self._append_error(errors, 'Host keys', self._host_keys, None, kex.key_algorithms)
self._append_error('Host keys', self._host_keys, self._optional_host_keys, kex.key_algorithms)

# Checking Host Key Sizes
# Check host key sizes.
if self._hostkey_sizes is not None:
hostkey_types = list(self._hostkey_sizes.keys())
hostkey_types.sort() # Sorted to make testing output repeatable.
Expand All @@ -368,7 +373,7 @@ def evaluate(self, banner: Optional['Banner'], kex: Optional['SSH2_Kex']) -> Tup
actual_hostkey_size = server_host_keys[hostkey_type]['hostkey_size']
if actual_hostkey_size != expected_hostkey_size:
ret = False
self._append_error(errors, 'Host key (%s) sizes' % hostkey_type, [str(expected_hostkey_size)], None, [str(actual_hostkey_size)])
self._append_error('Host key (%s) sizes' % hostkey_type, [str(expected_hostkey_size)], None, [str(actual_hostkey_size)])

# If we have expected CA signatures set, check them against what the server returned.
if self._hostkey_sizes is not None and len(cast(str, self._hostkey_sizes[hostkey_type]['ca_key_type'])) > 0 and cast(int, self._hostkey_sizes[hostkey_type]['ca_key_size']) > 0:
Expand All @@ -380,47 +385,59 @@ def evaluate(self, banner: Optional['Banner'], kex: Optional['SSH2_Kex']) -> Tup
# Ensure that the CA signature type is what's expected (i.e.: the server doesn't have an RSA sig when we're expecting an ED25519 sig).
if actual_ca_key_type != expected_ca_key_type:
ret = False
self._append_error(errors, 'CA signature type', [expected_ca_key_type], None, [actual_ca_key_type])
self._append_error('CA signature type', [expected_ca_key_type], None, [actual_ca_key_type])
# Ensure that the actual and expected signature sizes match.
elif actual_ca_key_size != expected_ca_key_size:
ret = False
self._append_error(errors, 'CA signature size (%s)' % actual_ca_key_type, [str(expected_ca_key_size)], None, [str(actual_ca_key_size)])
# Checking KEX
self._append_error('CA signature size (%s)' % actual_ca_key_type, [str(expected_ca_key_size)], None, [str(actual_ca_key_size)])

# Check key exchanges.
if self._kex is not None:
# If the policy allows subsets and re-ordered algorithms...
if self._allow_algorithm_subset_and_reordering:
for kex_t in kex.kex_algorithms:
if kex_t not in self._kex:
ret = False
self._append_error(errors, 'Key exchanges', self._kex, None, kex.kex_algorithms)
self._append_error('Key exchanges', self._kex, None, kex.kex_algorithms)
break
elif kex.kex_algorithms != self._kex: # Requires perfect match
# If [email protected] is in the policy (i.e. the Terrapin vulnerability countermeasure), then it must appear in the server's list, regardless of the "allow_algorithm_subset_and_reordering" flag.
if ('[email protected]' in self._kex and '[email protected]' not in kex.kex_algorithms) or \
('[email protected]' in self._kex and '[email protected]' not in kex.kex_algorithms):
ret = False
self._append_error('Key exchanges', self._kex, None, kex.kex_algorithms)

# The policy requires exact matching of algorithms.
elif kex.kex_algorithms != self._kex:
ret = False
self._append_error(errors, 'Key exchanges', self._kex, None, kex.kex_algorithms)
self._append_error('Key exchanges', self._kex, None, kex.kex_algorithms)

# Checking Ciphers
if self._ciphers is not None:
# If the policy allows subsets and re-ordered algorithms...
if self._allow_algorithm_subset_and_reordering:
for cipher_t in kex.server.encryption:
if cipher_t not in self._ciphers:
ret = False
self._append_error(errors, 'Ciphers', self._ciphers, None, kex.server.encryption)
self._append_error('Ciphers', self._ciphers, None, kex.server.encryption)
break
elif kex.server.encryption != self._ciphers: # Requires perfect match
# The policy requires exact matching of algorithms.
elif kex.server.encryption != self._ciphers:
ret = False
self._append_error(errors, 'Ciphers', self._ciphers, None, kex.server.encryption)
self._append_error('Ciphers', self._ciphers, None, kex.server.encryption)

# Checking MACs
if self._macs is not None:
# If the policy allows subsets and re-ordered algorithms...
if self._allow_algorithm_subset_and_reordering:
for mac_t in kex.server.mac:
if mac_t not in self._macs:
ret = False
self._append_error(errors, 'MACs', self._macs, None, kex.server.mac)
self._append_error('MACs', self._macs, None, kex.server.mac)
break
elif kex.server.mac != self._macs: # Requires perfect match
# The policy requires exact matching of algorithms.
elif kex.server.mac != self._macs:
ret = False
self._append_error(errors, 'MACs', self._macs, None, kex.server.mac)
self._append_error('MACs', self._macs, None, kex.server.mac)

if self._dh_modulus_sizes is not None:
dh_modulus_types = list(self._dh_modulus_sizes.keys())
Expand All @@ -431,30 +448,27 @@ def evaluate(self, banner: Optional['Banner'], kex: Optional['SSH2_Kex']) -> Tup
actual_dh_modulus_size = kex.dh_modulus_sizes()[dh_modulus_type]
if expected_dh_modulus_size != actual_dh_modulus_size:
ret = False
self._append_error(errors, 'Group exchange (%s) modulus sizes' % dh_modulus_type, [str(expected_dh_modulus_size)], None, [str(actual_dh_modulus_size)])

return ret, errors, self._get_error_str(errors, self._allow_algorithm_subset_and_reordering)
self._append_error('Group exchange (%s) modulus sizes' % dh_modulus_type, [str(expected_dh_modulus_size)], None, [str(actual_dh_modulus_size)])

error_list, error_str = self._get_errors()
return ret, error_list, error_str

@staticmethod
def _get_error_str(errors: List[Any], allow_algorithm_subset_and_reordering: bool = False) -> str:
'''Transforms an error struct to a flat string of error messages.'''

if allow_algorithm_subset_and_reordering:
expected_str = 'allowed'
else:
expected_str = 'required'
def _get_errors(self) -> Tuple[List[Any], str]:
'''Returns the list of errors, along with the string representation of those errors.'''

subset_and_reordering_semicolon = "; subset and/or reordering allowed" if self._allow_algorithm_subset_and_reordering else "; exact match"
subset_and_reordering_parens = " (subset and/or reordering allowed)" if self._allow_algorithm_subset_and_reordering else ""
error_list = []
spacer = ''
for e in errors:
for e in self._errors:
e_str = " * %s did not match.\n" % e['mismatched_field']

if ('expected_optional' in e) and (e['expected_optional'] != ['']):
e_str += " - Expected (" + expected_str + "): %s\n - Expected (optional): %s\n" % (Policy._normalize_error_field(e['expected_required']), Policy._normalize_error_field(e['expected_optional']))
e_str += " - Expected (required%s): %s\n - Expected (optional): %s\n" % (subset_and_reordering_semicolon, Policy._normalize_error_field(e['expected_required']), Policy._normalize_error_field(e['expected_optional']))
spacer = ' '
else:
e_str += " - Expected (" + expected_str + "): %s\n" % Policy._normalize_error_field(e['expected_required'])
e_str += " - Expected%s: %s\n" % (subset_and_reordering_parens, Policy._normalize_error_field(e['expected_required']))
spacer = ' '
e_str += " - Actual:%s%s\n" % (spacer, Policy._normalize_error_field(e['actual']))
error_list.append(e_str)
Expand All @@ -465,7 +479,7 @@ def _get_error_str(errors: List[Any], allow_algorithm_subset_and_reordering: boo
if len(error_list) > 0:
error_str = "\n".join(error_list)

return error_str
return self._errors, error_str


def get_name_and_version(self) -> str:
Expand Down Expand Up @@ -581,4 +595,4 @@ def __str__(self) -> str:
if self._dh_modulus_sizes is not None:
dh_modulus_sizes_str = str(self._dh_modulus_sizes)

return "Name: %s\nVersion: %s\nBanner: %s\nCompressions: %s\nHost Keys: %s\nOptional Host Keys: %s\nKey Exchanges: %s\nCiphers: %s\nMACs: %s\nHost Key Sizes: %s\nDH Modulus Sizes: %s\nServer Policy: %r" % (name, version, banner, compressions_str, host_keys_str, optional_host_keys_str, kex_str, ciphers_str, macs_str, hostkey_sizes_str, dh_modulus_sizes_str, self._server_policy)
return "Name: %s\nVersion: %s\nAllow Algorithm Subset and/or Reordering: %r\nBanner: %s\nCompressions: %s\nHost Keys: %s\nOptional Host Keys: %s\nKey Exchanges: %s\nCiphers: %s\nMACs: %s\nHost Key Sizes: %s\nDH Modulus Sizes: %s\nServer Policy: %r" % (name, version, self._allow_algorithm_subset_and_reordering, banner, compressions_str, host_keys_str, optional_host_keys_str, kex_str, ciphers_str, macs_str, hostkey_sizes_str, dh_modulus_sizes_str, self._server_policy)
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"errors": [],
"host": "localhost",
"passed": true,
"policy": "Docker policy: test15 (version 1)"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Host: localhost:2222
Policy: Docker policy: test15 (version 1)
Result: ✔ Passed
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
{
"errors": [
{
"actual": [
"rsa-sha2-512",
"rsa-sha2-256",
"ssh-rsa",
"ecdsa-sha2-nistp256",
"ssh-ed25519"
],
"expected_optional": [
""
],
"expected_required": [
"rsa-sha2-512",
"extra_hostkey_alg"
],
"mismatched_field": "Host keys"
},
{
"actual": [
"curve25519-sha256",
"[email protected]",
"ecdh-sha2-nistp256",
"ecdh-sha2-nistp384",
"ecdh-sha2-nistp521",
"diffie-hellman-group-exchange-sha256",
"diffie-hellman-group16-sha512",
"diffie-hellman-group18-sha512",
"diffie-hellman-group14-sha256",
"diffie-hellman-group14-sha1"
],
"expected_optional": [
""
],
"expected_required": [
"curve25519-sha256",
"extra_kex_alg"
],
"mismatched_field": "Key exchanges"
},
{
"actual": [
"[email protected]",
"aes128-ctr",
"aes192-ctr",
"aes256-ctr",
"[email protected]",
"[email protected]"
],
"expected_optional": [
""
],
"expected_required": [
"[email protected]",
"extra_cipher_alg"
],
"mismatched_field": "Ciphers"
},
{
"actual": [
"[email protected]",
"[email protected]",
"[email protected]",
"[email protected]",
"[email protected]",
"[email protected]",
"[email protected]",
"hmac-sha2-256",
"hmac-sha2-512",
"hmac-sha1"
],
"expected_optional": [
""
],
"expected_required": [
"[email protected]",
"extra_mac_alg"
],
"mismatched_field": "MACs"
}
],
"host": "localhost",
"passed": false,
"policy": "Docker policy: test16 (version 1)"
}
Loading

0 comments on commit 3c31934

Please sign in to comment.