Skip to content

Commit

Permalink
Refactored target queries
Browse files Browse the repository at this point in the history
  • Loading branch information
bamhm182 committed Nov 13, 2022
1 parent b8211a4 commit 4337fb7
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 99 deletions.
18 changes: 16 additions & 2 deletions docs/src/usage/plugins/targets.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
>> 'DAPPERDINGO'
>> ```
## targets.build_scope_host_db(slug, scope)
> Prints a list of IPs ready to ingest into the Database
Expand Down Expand Up @@ -147,6 +146,21 @@
>> [{"credentials": [{...},...],...}]
>> ```
## targets.get_query(status='registered', query_changes={})
> Pulls back a list of targets matching the specified query
>
> | Arguments | Type | Description
> | --- | --- | ---
> | `status` | string | The type of targets to pull back. (Ex: `registered`, `unregistered`, `upcoming`, `all`)
> | `query_changes` | dict() | Changes to make to the standard query. (Ex: `{"sorting['field']": "dateUploaded"}`
>
>> Examples
>> ```python3
>> >>> h.targets.get_query(status='unregistered')
>> [{"codename": "SLEEPYSLUG", ...}, ...]
>> ```
## targets.get_registered_summary()
> The Registered Summary is a short list of information about every target you have registered.
Expand Down Expand Up @@ -230,7 +244,7 @@
>> Examples
>> ```python3
>> >>> h.targets.get_upcoming()
>> [{'codename': 'SLEEPYSLUG', 'slug': '1o2h8o', 'category_name': 'Web Application', 'organization_name': 'SLEEPY Orgnization', 'upcoming_start_date': 1668430800}, ...]
>> [{'codename': 'SLEEPYSLUG', 'upcoming_start_date': 1668430800, ...}, ...]
>> ```
## targets.set_connected(target, **kwargs)
Expand Down
60 changes: 25 additions & 35 deletions src/synack/plugins/targets.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,26 @@ def get_credentials(self, **kwargs):
if res.status_code == 200:
return res.json()

def get_query(self, status='registered', query_changes={}):
"""Get information about targets returned from a query"""
if not self.db.categories:
self.get_assessments()
categories = []
for category in self.db.categories:
if category.passed_practical and category.passed_written:
categories.append(category.id)
query = {
'filter[primary]': status,
'filter[secondary]': 'all',
'filter[industry]': 'all',
'filter[category][]': categories
}
query.update(query_changes)
res = self.api.request('GET', 'targets', query=query)
if res.status_code == 200:
self.db.add_targets(res.json(), is_registered=True)
return res.json()

def get_registered_summary(self):
"""Get information on your registered targets"""
res = self.api.request('GET', 'targets/registered_summary')
Expand Down Expand Up @@ -199,45 +219,15 @@ def get_scope_web(self, target=None, **kwargs):

def get_unregistered(self):
"""Get slugs of all unregistered targets"""
if not self.db.categories:
self.get_assessments()
categories = []
for c in self.db.categories:
if c.passed_practical and c.passed_practical:
categories.append(c.id)
query = {
'filter[primary]': 'unregistered',
'filter[secondary]': 'all',
'filter[industry]': 'all',
'filter[category][]': categories
}
res = self.api.request('GET', 'targets', query=query)
ret = []
if res.status_code == 200:
self.db.add_targets(res.json(), is_registered=True)
for t in res.json():
ret.append({'codename': t['codename'], 'slug': t['slug']})
return ret
return self.get_query(status='unregistered')

def get_upcoming(self):
"""Get slugs and upcoming start dates of all upcoming targets"""
query = {
'filter[primary]': 'upcoming',
'filter[secondary]': 'all',
'filter[industry]': 'all',
'sorting[field]': 'upcomingStartDate',
'sorting[direction]': 'asc'
query_changes = {
'sorting[field]': 'upcomingStartDate',
'sorting[direction]': 'asc'
}
res = self.api.request('GET', 'targets', query=query)
ret = []
if res.status_code == 200:
for t in res.json():
ret.append({'codename': t['codename'],
'slug': t['slug'],
'category_name': t['category']['name'],
'organization_name': t['organization']['name'],
'upcoming_start_date': t['upcoming_start_date']})
return ret
return self.get_query(status='upcoming', query_changes=query_changes)

def set_connected(self, target=None, **kwargs):
"""Connect to a target"""
Expand Down
123 changes: 61 additions & 62 deletions test/test_targets.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,51 @@ def test_get_credentials(self):
self.targets.get_credentials(codename='SLEEPYSLUG'))
self.targets.api.request.assert_called_with('POST', url)

def test_get_query(self):
"""Should get a list of targets"""
self.targets.db.categories = [
Category(id=1, passed_practical=True, passed_written=True),
Category(id=2, passed_practical=True, passed_written=True),
Category(id=3, passed_practical=False, passed_written=False),
]
query = {
'filter[primary]': 'unregistered',
'filter[secondary]': 'all',
'filter[industry]': 'all',
'filter[category][]': [1, 2]
}
self.targets.api.request.return_value.status_code = 200
results = [
{
"codename": "SLEEPYSLUG",
"slug": "1o2h8o"
}
]
self.targets.api.request.return_value.json.return_value = results
self.assertEqual(results, self.targets.get_unregistered())
self.targets.api.request.assert_called_with("GET",
"targets",
query=query)

def test_get_query_assessments_empty(self):
"""Should get a list of unregistered targets"""
self.targets.get_assessments = MagicMock()
self.targets.db.categories = []
query = {
'filter[primary]': 'unregistered',
'filter[secondary]': 'all',
'filter[industry]': 'all',
'filter[category][]': []
}
self.targets.api.request.return_value.status_code = 200
results = []
self.targets.api.request.return_value.json.return_value = results
self.assertEqual(results, self.targets.get_unregistered())
self.targets.get_assessments.assert_called_with()
self.targets.api.request.assert_called_with("GET",
"targets",
query=query)

def test_get_registered_summary(self):
"""Should make a request to get basic info about registered targets"""
t1 = {
Expand Down Expand Up @@ -364,74 +409,28 @@ def test_get_scope_web(self):
self.targets.api.request.return_value.json.assert_called()

def test_get_unregistered(self):
"""Should get a list unregistered targets"""
self.targets.db.categories = [
Category(id=1, passed_practical=True, passed_written=True),
Category(id=2, passed_practical=True, passed_written=True),
Category(id=3, passed_practical=False, passed_written=False),
]
query = {
'filter[primary]': 'unregistered',
'filter[secondary]': 'all',
'filter[industry]': 'all',
'filter[category][]': [1, 2]
}
self.targets.api.request.return_value.status_code = 200
unreg = [
{
"codename": "SLEEPYSLUG",
"slug": "1o2h8o"
}
"""Should query for unregistered targets"""
results = [
{'codename': 'SLEEPYSLUG', 'slug': '1283hi'}
]
self.targets.api.request.return_value.json.return_value = unreg
self.assertEqual(unreg, self.targets.get_unregistered())
self.targets.api.request.assert_called_with("GET",
"targets",
query=query)
self.targets.get_query = MagicMock()
self.targets.get_query.return_value = results
self.assertEquals(results, self.targets.get_unregistered())
self.targets.get_query.assert_called_with(status='unregistered')

def test_get_upcoming(self):
"""Should get a list upcoming targets"""
query = {
'filter[primary]': 'upcoming',
'filter[secondary]': 'all',
'filter[industry]': 'all',
"""Should query for upcoming targets"""
results = [
{'codename': 'SLEEPYSLUG', 'slug': '1283hi'}
]
query_changes = {
'sorting[field]': 'upcomingStartDate',
'sorting[direction]': 'asc'
}
self.targets.api.request.return_value.status_code = 200
upcoming = [
{
"codename": "SLEEPYSLUG",
"slug": "1o2h8o",
'category_name': 'Web Application',
'organization_name': 'SLEEPY Orgnization',
'upcoming_start_date': 1668430800
}
]
self.targets.api.request.return_value.json.return_value = upcoming
self.assertEqual(upcoming, self.targets.get_upcoming())
self.targets.api.request.assert_called_with("GET",
"targets",
query=query)

def test_get_unregistered_assessments_empty(self):
"""Should get a list of unregistered targets"""
self.targets.get_assessments = MagicMock()
self.targets.db.categories = []
query = {
'filter[primary]': 'unregistered',
'filter[secondary]': 'all',
'filter[industry]': 'all',
'filter[category][]': []
}
self.targets.api.request.return_value.status_code = 200
unreg = []
self.targets.api.request.return_value.json.return_value = unreg
self.assertEqual(unreg, self.targets.get_unregistered())
self.targets.get_assessments.assert_called_with()
self.targets.api.request.assert_called_with("GET",
"targets",
query=query)
self.targets.get_query = MagicMock()
self.targets.get_query.return_value = results
self.assertEquals(results, self.targets.get_upcoming())
self.targets.get_query.assert_called_with(status='upcoming', query_changes=query_changes)

def test_set_connected(self):
"""Should connect to a given target provided kwargs"""
Expand Down

0 comments on commit 4337fb7

Please sign in to comment.