-
Notifications
You must be signed in to change notification settings - Fork 5
/
logs.py
56 lines (47 loc) · 1.49 KB
/
logs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
from itertools import repeat
import texts
from core import get_client
from spinner import *
def run_logs(credentials):
    """Print the VPC flow-log report for the given credentials.

    Prints a section header built from ``texts.LOGS``, runs
    ``vpcFlowLogEnabled`` while a ``Spinner`` is active, then prints one
    line per VPC id: ids from the first returned list are tagged with
    ``texts.LOGGING`` and ids from the second with ``texts.NOT_LOGGING``.

    Any ``Exception`` raised along the way is printed and the process is
    terminated through ``exit()``.
    """
    print("\n===== {} =====".format(texts.LOGS))
    try:
        with Spinner(""):
            enabled, disabled = vpcFlowLogEnabled(credentials)
        # `or ()` keeps the original "skip when falsy" guard semantics.
        for vpc_id in enabled or ():
            print("{}: {}".format(vpc_id, texts.LOGGING))
        for vpc_id in disabled or ():
            print("{}: {}".format(vpc_id, texts.NOT_LOGGING))
    except Exception as err:
        print(err)
        exit()
def getFlowLogs(vpcId, credentials):
    """Look up the flow logs attached to a single VPC.

    Calls EC2 ``describe_flow_logs`` with a ``resource-id`` filter equal
    to ``vpcId`` and reads the ``"FlowLogs"`` entry of the response.

    Returns:
        * When at least one flow log is found: a ``list`` of dicts, one
          per flow log, each holding ``"FlowLogStatus"`` and ``"VpcId"``
          (the latter copied from the entry's ``"ResourceId"``).
        * When none is found: a single ``dict``
          ``{"FlowLogStatus": None, "VpcId": vpcId}``.

    NOTE: the two cases return different container types (list vs dict);
    callers have to cope with both.
    """
    ec2 = get_client("ec2", credentials)
    response = ec2.describe_flow_logs(
        Filters=[{"Name": "resource-id", "Values": [vpcId]}]
    )
    found = response["FlowLogs"]

    if not found:
        # Placeholder entry marking the VPC as having no flow log at all.
        return {"FlowLogStatus": None, "VpcId": vpcId}

    return [
        {
            "FlowLogStatus": entry["FlowLogStatus"],
            "VpcId": entry["ResourceId"],
        }
        for entry in found
    ]
def vpcFlowLogEnabled(credentials):
    """Split the account's VPCs into those with and without flow logs.

    Lists every VPC returned by EC2 ``describe_vpcs`` and queries
    ``getFlowLogs`` for each one.

    Args:
        credentials: passed through unchanged to ``get_client`` and
            ``getFlowLogs``.

    Returns:
        A ``(logging, not_logging)`` tuple of two lists of VPC ids. A VPC
        is in the first list when ``getFlowLogs`` reports at least one
        entry whose ``"FlowLogStatus"`` is not ``None``; otherwise it is
        in the second list. Each VPC id appears exactly once.
    """
    cli = get_client("ec2", credentials)
    vpc_ids = [vpc["VpcId"] for vpc in cli.describe_vpcs()["Vpcs"]]

    enabled, disabled = [], []
    for vpc_id in vpc_ids:
        result = getFlowLogs(vpc_id, credentials)
        # getFlowLogs yields a list of entries when the VPC has flow logs
        # and a bare placeholder dict when it has none. Indexing the list
        # with a string key raised TypeError, so normalise to a list first.
        entries = result if isinstance(result, list) else [result]

        # One verdict per VPC, even if several flow logs are attached.
        if any(entry["FlowLogStatus"] is not None for entry in entries):
            enabled.append(vpc_id)
        else:
            disabled.append(vpc_id)

    return enabled, disabled