Skip to content

Commit

Permalink
Pulled in Targets Analytics changes from PR #16 (#17)
Browse files Browse the repository at this point in the history
* Added analytics features
Created get_connections, get_submissions_summary & get_submissions under targets plugin

---------

Co-authored-by: Keanu <[email protected]>
  • Loading branch information
bamhm182 and RedByte1337 authored Feb 8, 2023
1 parent f660f6f commit 97e4202
Show file tree
Hide file tree
Showing 4 changed files with 308 additions and 4 deletions.
10 changes: 6 additions & 4 deletions checks.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ flake8 src test live-tests
diff_arrays() {
local -n _one=$1
local -n _two=$2
echo "Alphabetical Order Current Order"
echo "---------------------------------------------------------------"
for ((i=0; i<${#_one[@]}; i++)); do
if [[ "${_one[$i]}" != "${_two[$i]}" ]]; then
echo -e "${_two[$i]}\t${_one[$i]}"
fi
_two[$i]="${_two[$i]} "
#echo -e "${t:0:50}${_one[$i]}"
echo -e "${_two[$i]:0:50}${_one[$i]}"
done
}

Expand Down Expand Up @@ -41,7 +43,7 @@ for test in ./test/test_*.py; do
readarray -t a_defs < <(printf '%s\n' "${defs[@]}" | sort)
# Check Alphabetical
if [[ "${defs[@]}" != "${a_defs[@]}" ]]; then
echo ${test} is not in alphabetical order
echo -e "${test} is not in alphabetical order"
diff_arrays defs a_defs
fi
done
Expand Down
70 changes: 70 additions & 0 deletions docs/src/usage/plugins/targets.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,21 @@
>> {"slug": "ulmpupflgm", "codename": "GOOFYGOPHER", "status": "Connected"}
>> ```
## targets.get_connections(target, **kwargs)
> Get the connection details of a target
>
> | Argments | Type | Description
> | --- | --- | ---
> | `target` | db.models.Target | A single Target returned from the database
> | `kwargs` | kwargs | Information used to look up a Target in the database (ex: `codename`, `slug`, etc.)
>
>> Examples
>> ```python3
>> >>> h.targets.get_connections(codename='BLINKYBABOON')
>> {"lifetime_connections":200,"current_connections":0}
>> ```
## targets.get_credentials(**kwargs)
> Pulls back the credentials for a Target
Expand Down Expand Up @@ -242,6 +257,60 @@
>> 'owners': [{'owner_uid': '97g8ehri', 'owner_type_id': 1, 'codename': 'slappyfrog'}, ...]
>> }, ...]
## targets.get_submissions(target, status="accepted", **kwargs)
> Get the details of previously submitted vulnerabilities from the analytics of a target
>
> | Argments | Type | Description
> | --- | --- | ---
> | `target` | db.models.Target | A single Target returned from the database
> | `status` | str | Query either `accepted`, `rejected` or `in_queue` vulnerabilities
> | `kwargs` | kwargs | Information used to look up a Target in the database (ex: `codename`, `slug`, etc.)
>
>> Examples
>> ```python3
>> >>> h.targets.get_submissions(codename='BLINKYBABOON')
>> [
>> {
>> "categories": ["Authorization/Permissions","SSRF"],
>> "exploitable_locations":[
>> {"type":"url","value":"https://example.com/index.html","created_at":1625646235,"status":"fixed"},
>> ...
>> ]
>> }, ...
>> ]
>> >>>
>> >>> h.targets.get_submissions(status="in_queue", codename='BLINKYBABOON')
>> [
>> {
>> "categories": ["Authorization/Permissions","SSRF"],
>> "exploitable_locations":[
>> {"type":"url","value":"https://example.com/login.html","created_at":1625646235,"status":"pending"},
>> ...
>> ]
>> }, ...
>> ]
>> ```
## targets.get_submissions_summary(target, hours_ago=None, **kwargs)
> Get a summary of the submission analytics of a target
>
> | Argments | Type | Description
> | --- | --- | ---
> | `target` | db.models.Target | A single Target returned from the database
> | `hours_ago` | int | The amount of hours since the current time to query the analytics for. (ex: `hours_ago=48` will query how many submissions were made in the last `48` hours. Defaults to lifetime when not set.)
> | `kwargs` | kwargs | Information used to look up a Target in the database (ex: `codename`, `slug`, etc.)
>
>> Examples
>> ```python3
>> >>> h.targets.get_submissions_summary(codename='BLINKYBABOON')
>> 35
>> >>>
>> >>> h.targets.get_submissions_summary(hours_ago=48, codename='BLINKYBABOON')
>> 5
>> ```
## targets.get_unregistered()
> Gets a list of unregistered Targets from the Synack API.
Expand Down Expand Up @@ -269,6 +338,7 @@
> Connect to a specified target
>
> | Argments | Type | Description
> | --- | --- | ---
> | `target` | db.models.Target | A single Target returned from the database
> | `kwargs` | kwargs | Information used to look up a Target in the database (ex: `codename`, `slug`, etc.)
>
Expand Down
42 changes: 42 additions & 0 deletions src/synack/plugins/targets.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,18 @@ def get_connected(self):
}
return ret

def get_connections(self, target=None, **kwargs):
"""Get the connection details of a target."""
if target is None:
if len(kwargs) == 0:
kwargs = {'codename': self.get_connected().get('codename')}
target = self.db.find_targets(**kwargs)
if target:
target = target[0]
res = self.api.request('GET', "listing_analytics/connections", query={"listing_id": target.slug})
if res.status_code == 200:
return res.json()["value"]

def get_credentials(self, **kwargs):
"""Get Credentials for a target"""
target = self.db.find_targets(**kwargs)[0]
Expand Down Expand Up @@ -231,6 +243,36 @@ def get_scope_web(self, target=None, add_to_db=False, **kwargs):
self.scratchspace.set_burp_file(self.build_scope_web_burp(scope), target=target)
return scope

def get_submissions(self, target=None, status="accepted", **kwargs):
"""Get the details of previously submitted vulnerabilities from the analytics of a target."""
if status not in ["accepted", "rejected", "in_queue"]:
return []
if target is None:
if len(kwargs) == 0:
kwargs = {'codename': self.get_connected().get('codename')}
target = self.db.find_targets(**kwargs)
if target:
target = target[0]
query = {"listing_id": target.slug, "status": status}
res = self.api.request('GET', "listing_analytics/categories", query=query)
if res.status_code == 200:
return res.json()["value"]

def get_submissions_summary(self, target=None, hours_ago=None, **kwargs):
"""Get a summary of the submission analytics of a target."""
if target is None:
if len(kwargs) == 0:
kwargs = {'codename': self.get_connected().get('codename')}
target = self.db.find_targets(**kwargs)
if target:
target = target[0]
query = {"listing_id": target.slug}
if hours_ago:
query["period"] = f"{hours_ago}h"
res = self.api.request('GET', "listing_analytics/submissions", query=query)
if res.status_code == 200:
return res.json()["value"]

def get_unregistered(self):
"""Get slugs of all unregistered targets"""
return self.get_query(status='unregistered')
Expand Down
190 changes: 190 additions & 0 deletions test/test_targets.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,53 @@ def test_get_connected_disconnected(self):
}
self.assertEqual(out, self.targets.get_connected())

def test_get_connections(self):
"""Should return a summary of the lifetime and current connections given a slug"""
connections = {
"lifetime_connections": 200,
"current_connections": 5
}
return_data = {
"listing_id": "u2ire",
"type": "connections",
"value": {
"lifetime_connections": 200,
"current_connections": 5
}
}
self.targets.db.find_targets = MagicMock()
self.targets.db.find_targets.return_value = [Target(slug='u2ire')]
self.targets.api.request.return_value.status_code = 200
self.targets.api.request.return_value.json.return_value = return_data
self.assertEquals(self.targets.get_connections(slug='u2ire'), connections)
self.targets.api.request.assert_called_with('GET', 'listing_analytics/connections',
query={"listing_id": "u2ire"})

def test_get_connections_no_args(self):
"""Should return a summary of the lifetime and current connections if no args provided"""
connections = {
"lifetime_connections": 200,
"current_connections": 5
}
return_data = {
"listing_id": "u2ire",
"type": "connections",
"value": {
"lifetime_connections": 200,
"current_connections": 5
}
}
self.targets.db.find_targets = MagicMock()
self.targets.get_connected = MagicMock()
self.targets.get_connected.return_value = {'codename': 'TIREDTIGER', 'slug': 'u2ire'}
self.targets.db.find_targets.return_value = [Target(slug='u2ire')]
self.targets.api.request.return_value.status_code = 200
self.targets.api.request.return_value.json.return_value = return_data
self.assertEquals(self.targets.get_connections(), connections)
self.targets.get_connected.assert_called_with()
self.targets.api.request.assert_called_with('GET', 'listing_analytics/connections',
query={"listing_id": "u2ire"})

def test_get_credentials(self):
"""Should get credentials for a given target"""
target = Target(organization="qwewqe", slug="asdasd")
Expand Down Expand Up @@ -525,6 +572,149 @@ def test_get_scope_web_add_to_db(self):
self.targets.api.request.return_value.json.assert_called()
self.targets.db.add_urls.assert_called_with(self.targets.build_scope_web_db.return_value)

def test_get_submissions(self):
"""Should return the accepted vulnerabilities for a target given a slug"""
return_data = {
"listing_id": "u2ire",
"type": "categories",
"value": [{
"categories": ["Authorization/Permissions", "Access/Privacy Control Violation"],
"exploitable_locations": [{
"type": "url",
"value": "https://example.com/index.html",
"created_at": 1625643431,
"status": "fixed"
}
]
}]
}
self.targets.db.find_targets = MagicMock()
self.targets.db.find_targets.return_value = [Target(slug='u2ire')]
self.targets.api.request.return_value.status_code = 200
self.targets.api.request.return_value.json.return_value = return_data
self.assertEquals(self.targets.get_submissions(slug='u2ire'), return_data["value"])
self.targets.api.request.assert_called_with('GET', 'listing_analytics/categories',
query={"listing_id": "u2ire", "status": "accepted"})

def test_get_submissions_invalid_status(self):
"""Should return an empty dictionary if status is invalid"""
return_data = {
"listing_id": "u2ire",
"type": "categories",
"value": [{
"categories": ["Authorization/Permissions", "Access/Privacy Control Violation"],
"exploitable_locations": [{
"type": "url",
"value": "https://example.com/index.html",
"created_at": 1625643431,
"status": "fixed"
}
]
}]
}
self.targets.db.find_targets = MagicMock()
self.targets.db.find_targets.return_value = [Target(slug='u2ire')]
self.targets.api.request.return_value.status_code = 200
self.targets.api.request.return_value.json.return_value = return_data
self.assertEquals(self.targets.get_submissions(slug='u2ire', status="bad_status"), [])

def test_get_submissions_no_slug(self):
"""Should return info on currently connected target if slug not provided"""
return_data = {
"listing_id": "u2ire",
"type": "categories",
"value": [{
"categories": ["Authorization/Permissions", "Access/Privacy Control Violation"],
"exploitable_locations": [{
"type": "url",
"value": "https://example.com/index.html",
"created_at": 1625643431,
"status": "fixed"
}
]
}]
}
self.targets.db.find_targets = MagicMock()
self.targets.db.find_targets.return_value = [Target(slug='u2ire')]
self.targets.api.request.return_value.status_code = 200
self.targets.api.request.return_value.json.return_value = return_data
self.targets.get_connected = MagicMock()
self.targets.get_connected.return_value = {"slug": "u2ire"}
self.assertEquals(self.targets.get_submissions(), return_data["value"])
self.targets.api.request.assert_called_with('GET', 'listing_analytics/categories',
query={"listing_id": "u2ire", "status": "accepted"})

def test_get_submissions_rejected(self):
"""Should return the accepted vulnerabilities for a target given a slug"""
return_data = {
"listing_id": "u2ire",
"type": "categories",
"value": [{
"categories": ["Authorization/Permissions", "Access/Privacy Control Violation"],
"exploitable_locations": [{
"type": "url",
"value": "https://example.com/index.html",
"created_at": 1625643431,
"status": "pending"
}
]
}]
}
self.targets.db.find_targets = MagicMock()
self.targets.db.find_targets.return_value = [Target(slug='u2ire')]
self.targets.api.request.return_value.status_code = 200
self.targets.api.request.return_value.json.return_value = return_data
self.assertEquals(self.targets.get_submissions(status="rejected", slug='u2ire'), return_data["value"])
self.targets.api.request.assert_called_with('GET', 'listing_analytics/categories',
query={"listing_id": "u2ire", "status": "rejected"})

def test_get_submissions_summary(self):
"""Should return the amount of lifetime submissions given a slug"""
return_data = {
"listing_id": "u2ire",
"type": "submissions",
"value": 35
}
self.targets.db.find_targets = MagicMock()
self.targets.db.find_targets.return_value = [Target(slug='u2ire')]
self.targets.api.request.return_value.status_code = 200
self.targets.api.request.return_value.json.return_value = return_data
self.assertEquals(self.targets.get_submissions_summary(slug='u2ire'), 35)
self.targets.api.request.assert_called_with('GET', 'listing_analytics/submissions',
query={"listing_id": "u2ire"})

def test_get_submissions_summary_hours(self):
"""Should return the amount of submissions in the last x hours given a slug"""
return_data = {
"listing_id": "u2ire",
"type": "submissions",
"value": 5
}
self.targets.db.find_targets = MagicMock()
self.targets.db.find_targets.return_value = [Target(slug='u2ire')]
self.targets.api.request.return_value.status_code = 200
self.targets.api.request.return_value.json.return_value = return_data
self.assertEquals(self.targets.get_submissions_summary(hours_ago=48, slug='u2ire'), 5)
self.targets.api.request.assert_called_with('GET', 'listing_analytics/submissions',
query={"listing_id": "u2ire", "period": "48h"})

def test_get_submissions_summary_no_slug(self):
"""Should return the amount of lifetime submissions for current connected when no slug"""
return_data = {
"listing_id": "u2ire",
"type": "submissions",
"value": 35
}
self.targets.db.find_targets = MagicMock()
self.targets.get_connected = MagicMock()
self.targets.get_connected.return_value = {'slug': 'u2ire'}
self.targets.db.find_targets.return_value = [Target(slug='u2ire')]
self.targets.api.request.return_value.status_code = 200
self.targets.api.request.return_value.json.return_value = return_data
self.assertEquals(self.targets.get_submissions_summary(), 35)
self.targets.api.request.assert_called_with('GET', 'listing_analytics/submissions',
query={"listing_id": "u2ire"})

def test_get_unregistered(self):
"""Should query for unregistered targets"""
results = [
Expand Down

0 comments on commit 97e4202

Please sign in to comment.