-
Notifications
You must be signed in to change notification settings - Fork 0
/
taxii_to_crowdstrike.py
207 lines (172 loc) · 7.84 KB
/
taxii_to_crowdstrike.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
#!/usr/bin/env python3
# TAXII to CrowdStrike IOC Ingestion Script
# Script to poll a STIX/TAXII server for IOCs and ingest them into CrowdStrike Falcon.
# Developed by C.Brown ([email protected])
# This software is released under the MIT License.
# See the LICENSE file in the project root for the full license text.
# Last revised 11/10/2024
# version 2024.10.6
#-----------------------------------------------------------------------
# Version Date Notes:
# 2024.10.1 08.10.2024 Initial Public Release
# 2024.10.2 09.10.2024 Added error handling, logging, and retries for robustness
# 2024.10.3 09.10.2024 Handle pagination of large datasets
# 2024.10.4 09.10.2024 Added improved token handling, error checking, and page-by-page processing
# 2024.10.5 10.10.2024 Added configurable rate limiting
# 2024.10.6 11.10.2024 Added TAXII polling error handling
#-----------------------------------------------------------------------
import os
import requests
import logging
import time
from requests.auth import HTTPBasicAuth
from datetime import datetime, timedelta
# Configure logging: records at INFO level and above are written to LOG_FILE,
# each line prefixed with a timestamp.
# NOTE(review): the path is under /var/log, so the process presumably needs
# write permission there — confirm for the target deployment.
LOG_FILE = '/var/log/taxii_to_crowdstrike.log'
logging.basicConfig(filename=LOG_FILE, level=logging.INFO, format='%(asctime)s - %(message)s')
# Load API credentials and TAXII connection settings from environment variables.
# The string defaults below are placeholders that apply only when the
# corresponding variable is unset.
CLIENT_ID = os.getenv('CLIENT_ID', 'your_client_id')
CLIENT_SECRET = os.getenv('CLIENT_SECRET', 'your_client_secret')
TAXII_SERVER_URL = os.getenv('TAXII_SERVER_URL', 'https://taxii.server.url')
TAXII_USERNAME = os.getenv('TAXII_USERNAME', 'your_taxii_username')
TAXII_PASSWORD = os.getenv('TAXII_PASSWORD', 'your_taxii_password')
TAXII_COLLECTION = os.getenv('TAXII_COLLECTION', 'your_taxii_collection')
# Rate limiting delay (in seconds) configurable via environment variable, default is 2 seconds.
# The raw value goes through int(), so a non-integer RATE_LIMIT_DELAY raises
# ValueError as soon as this module is loaded.
RATE_LIMIT_DELAY = int(os.getenv('RATE_LIMIT_DELAY', 2))
# Upper bound on the number of attempts retry() makes for a single call.
MAX_RETRIES = 3
def log(message):
    """Record *message* at INFO level via logging, then echo it with print.

    Args:
        message: The text to record and print.
    """
    # Same order as before: the logging call first, then standard output.
    for emit in (logging.info, print):
        emit(message)
def retry(func, *args, **kwargs):
    """Call ``func(*args, **kwargs)``, retrying with exponential backoff.

    The call is attempted up to MAX_RETRIES times. Any exception raised by
    ``func`` counts as a failed attempt; the wait before the next attempt
    starts at 5 seconds and doubles each time. A return value of any kind
    (including None) counts as success and is returned immediately.

    Args:
        func: The callable to invoke.
        *args: Positional arguments forwarded to ``func``.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        Whatever ``func`` returns on the first attempt that does not raise,
        or None when every attempt raised.
    """
    delay = 5
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            return func(*args, **kwargs)
        except Exception as e:  # deliberate catch-all: this is the retry boundary
            if attempt == MAX_RETRIES:
                # No further attempt follows, so do not sleep or promise a retry.
                log(f"Command failed. Attempt {attempt}/{MAX_RETRIES}. No retries left. Error: {e}")
                break
            log(f"Command failed. Attempt {attempt}/{MAX_RETRIES}. Retrying in {delay} seconds... Error: {e}")
            time.sleep(delay)
            delay *= 2
    log("The command has failed after maximum retry attempts.")
    return None
def rate_limit():
    """Announce the pause, then sleep for RATE_LIMIT_DELAY seconds."""
    notice = f"Rate limiting: Waiting for {RATE_LIMIT_DELAY} seconds before the next request..."
    log(notice)
    time.sleep(RATE_LIMIT_DELAY)
def get_crowdstrike_token(timeout=30):
    """Retrieve a CrowdStrike OAuth2 access token.

    Posts CLIENT_ID and CLIENT_SECRET as form data to the CrowdStrike token
    endpoint.

    Args:
        timeout: Seconds to wait for the HTTP request before requests raises
            a timeout exception. Defaults to 30.

    Returns:
        The access token string, or None when the server answers with a
        non-success status or the response carries no ``access_token``.
    """
    log("Fetching CrowdStrike OAuth2 token...")
    url = 'https://api.crowdstrike.com/oauth2/token'
    data = {
        'client_id': CLIENT_ID,
        'client_secret': CLIENT_SECRET
    }
    # Without a timeout a stalled connection would block the script forever.
    response = requests.post(url, data=data, timeout=timeout)
    # The token endpoint reports success as 201 Created; 200 is still accepted
    # so that the previous behaviour keeps working.
    if response.status_code not in (200, 201):
        log(f"Failed to get OAuth2 token: {response.status_code} {response.text}")
        return None
    access_token = response.json().get('access_token')
    if not access_token:
        # Do not report success when the body lacks the token field.
        log("Failed to get OAuth2 token: response did not contain an access_token.")
        return None
    log("CrowdStrike OAuth2 token acquired.")
    return access_token
def _extract_ioc_values(objects):
    """Return the first single-quoted literal from each indicator's pattern."""
    values = []
    for obj in objects:
        if obj.get('type') != 'indicator':
            continue
        # A pattern such as "[domain-name:value = 'evil.example']" splits into
        # at least three pieces on the quote character; fewer pieces means
        # there is no quoted value to take, so the object is skipped.
        pieces = str(obj.get('pattern', '')).split("'")
        if len(pieces) >= 3 and pieces[1]:
            values.append(pieces[1])
    return values


def poll_taxii_server(timeout=30):
    """Poll the TAXII server page by page and collect indicator values.

    Each request is posted to ``<TAXII_SERVER_URL>/collections/<TAXII_COLLECTION>/poll``
    with HTTP basic auth. The response body is parsed as JSON; indicator
    objects are read from its ``objects`` list and the continuation token
    from ``next_token``.

    Status handling:
        * 200 - the page is processed and polling continues while a
          ``next_token`` is present.
        * 401 / 404 - polling stops and whatever was collected is returned.
        * anything else - the same page is requested again, up to
          MAX_RETRIES consecutive failures, after which polling stops.

    Args:
        timeout: Seconds to wait for each HTTP request. Defaults to 30.

    Returns:
        A list of IOC value strings (possibly empty).
    """
    log(f"Polling TAXII server at {TAXII_SERVER_URL} for collection {TAXII_COLLECTION}...")
    headers = {'Content-Type': 'application/xml'}
    auth = HTTPBasicAuth(TAXII_USERNAME, TAXII_PASSWORD)
    poll_url = f"{TAXII_SERVER_URL}/collections/{TAXII_COLLECTION}/poll"
    next_token = None
    all_iocs = []
    consecutive_failures = 0
    while True:
        # NOTE(review): the request body is kept exactly as before (including
        # the unquoted next= attribute) — confirm the format the server expects.
        next_attr = f" next={next_token}" if next_token else ""
        data = f"<taxii_poll_request_xml{next_attr}/>"
        response = requests.post(poll_url, headers=headers, auth=auth, data=data, timeout=timeout)
        status = response.status_code
        if status == 404:
            log("Error: Collection not found at TAXII server. URL may be incorrect or collection may not exist. Exiting...")
            break
        if status == 401:
            log("Error: Unauthorized access to TAXII server. Check your credentials. Exiting...")
            break
        if status != 200:
            consecutive_failures += 1
            if status == 500:
                log("Error: Internal server error at TAXII server. Retrying...")
            else:
                log(f"Error: Received unexpected HTTP status code {status} from TAXII server. Retrying...")
            # Bound the retries: previously this path either crashed on an
            # unbound taxii_data (first page) or re-requested the same page
            # forever using the stale token from the last good response.
            if consecutive_failures >= MAX_RETRIES:
                log(f"Error: TAXII polling abandoned after {consecutive_failures} consecutive failed requests.")
                break
            rate_limit()
            continue
        consecutive_failures = 0
        taxii_data = response.json()
        iocs = _extract_ioc_values(taxii_data.get('objects', []))
        all_iocs.extend(iocs)
        log(f"Retrieved {len(iocs)} IOCs from TAXII.")
        next_token = taxii_data.get('next_token')
        if not next_token:
            break
        # Rate limit only when another request actually follows.
        rate_limit()
    return all_iocs
def check_ioc_exists_paginated(ioc_value, access_token, timeout=30):
    """Check whether an IOC value is already present in CrowdStrike Falcon.

    Queries the IOC search endpoint for ``ioc_value`` and follows the
    ``meta.pagination.next_token`` continuation until a page with a non-empty
    ``resources`` list is found or no further page is offered.

    Args:
        ioc_value: The indicator value to look up.
        access_token: OAuth2 bearer token for the CrowdStrike API.
        timeout: Seconds to wait for each HTTP request. Defaults to 30.

    Returns:
        True if any page lists resources for the value. False if none does,
        or if a request returns a non-200 status (the failure is logged).
    """
    url = "https://api.crowdstrike.com/indicators/queries/iocs/v1"
    headers = {'Authorization': f'Bearer {access_token}'}
    # Pass the value through params so requests URL-encodes it; IOC values come
    # from an external feed and may contain '&', '?', '#' or spaces that would
    # otherwise corrupt the query string.
    params = {'value': ioc_value}
    while True:
        response = requests.get(url, headers=headers, params=params, timeout=timeout)
        if response.status_code != 200:
            log(f"Failed to check IOC: {response.status_code} {response.text}")
            return False
        body = response.json()  # parse once, reuse for both lookups
        if body.get('resources'):
            return True
        # 'or {}' also covers keys that are present but null.
        pagination = (body.get('meta') or {}).get('pagination') or {}
        next_token = pagination.get('next_token')
        if not next_token:
            return False
        params['next_token'] = next_token
def calculate_expiration_date(days=90):
    """Return an IOC expiration timestamp ``days`` days from now, in UTC.

    Args:
        days: Number of days from the current time until expiry. Defaults
            to 90 (roughly three months).

    Returns:
        The expiry time formatted as ``YYYY-MM-DDTHH:MM:SSZ``.
    """
    # Local import: the file's top-level import list only brings in
    # datetime and timedelta from this module.
    from datetime import timezone

    # datetime.utcnow() is deprecated; an aware "now" in UTC yields the same
    # formatted string.
    expiration_date = datetime.now(timezone.utc) + timedelta(days=days)
    return expiration_date.strftime('%Y-%m-%dT%H:%M:%SZ')
def push_iocs_to_crowdstrike(iocs, access_token, timeout=30):
    """Create or update each IOC in CrowdStrike Falcon.

    For every value, the existing-IOC check decides between a PATCH (already
    present) and a POST (new) to the IOC entities endpoint. Responses other
    than 200/201 are logged and processing continues with the next value.
    A rate-limit pause follows every IOC.

    Args:
        iocs: Iterable of IOC value strings.
        access_token: OAuth2 bearer token for the CrowdStrike API.
        timeout: Seconds to wait for each PATCH/POST request. Defaults to 30.
    """
    log("Pushing or updating IOCs in CrowdStrike Falcon...")
    url = "https://api.crowdstrike.com/indicators/entities/iocs/v1"
    headers = {'Authorization': f'Bearer {access_token}', 'Content-Type': 'application/json'}
    for ioc in iocs:
        log(f"Processing IOC: {ioc}")
        # The same payload serves both the update and the create request.
        # NOTE(review): every value is submitted with type "domain" — confirm
        # the feed only carries domain indicators.
        payload = {
            "type": "domain",
            "value": ioc,
            "action": "detect",
            "valid_until": calculate_expiration_date(),
            "source": "TAXII Import",
        }
        if check_ioc_exists_paginated(ioc, access_token):
            log(f"IOC {ioc} already exists. Updating...")
            send = requests.patch
        else:
            log(f"IOC {ioc} is new. Adding...")
            send = requests.post
        # A timeout keeps one stalled connection from hanging the whole run.
        response = send(url, headers=headers, json=payload, timeout=timeout)
        if response.status_code not in (200, 201):
            log(f"Failed to push/update IOC {ioc}: {response.status_code} {response.text}")
        # Add rate limiting between each IOC push
        rate_limit()
def main():
    """Run one ingestion pass: fetch a token, poll TAXII, push the IOCs.

    Each step goes through retry(). The pass stops early when no access
    token is obtained; the push step is skipped when polling yields nothing.
    """
    log("TAXII to CrowdStrike IOC ingestion started.")
    token = retry(get_crowdstrike_token)
    if token:
        indicators = retry(poll_taxii_server)
        if indicators:
            retry(push_iocs_to_crowdstrike, indicators, token)
        log("IOC ingestion and management process complete.")
    else:
        log("Failed to obtain access token. Exiting.")


if __name__ == '__main__':
    main()