-
Notifications
You must be signed in to change notification settings - Fork 0
/
malvada.py
142 lines (120 loc) · 5.4 KB
/
malvada.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import argparse
import glob
import os
import shutil
import malvada_workflow
from rich.progress import (
BarColumn,
Progress,
TextColumn,
TimeRemainingColumn,
SpinnerColumn,
TaskProgressColumn,
TimeElapsedColumn,
)
def parse_arguments(argv=None) -> argparse.Namespace:
    """
    Arguments parsing.

    Args:
        argv: Optional list of argument strings to parse instead of the
            process command line. When ``None`` (the default), argparse
            falls back to ``sys.argv[1:]``, which is the original behaviour.

    Returns:
        The parsed namespace with the attributes ``json_dir``, ``workers``,
        ``silent``, ``vt_positives_threshold`` and ``anonimize_terms``.

    Raises:
        SystemExit: Raised by argparse on invalid arguments or ``-h``.
    """
    parser = argparse.ArgumentParser(
        description='Generates a dataset from CAPE reports. '
                    'WARNING: This script will modify the reports in '
                    'the directory provided.')
    parser.add_argument(
        'json_dir', help='The directory containing one or more json reports.')
    parser.add_argument('-w', '--workers', type=int, default=10,
                        help='Number of workers to use (default: 10).')
    parser.add_argument('-s', '--silent', action='store_true',
                        help='Silent mode (default: False).')
    parser.add_argument('-vt', '--vt-positives-threshold', type=int,
                        default=10, help='Threshold for VirusTotal positives (default: 10).')
    # NOTE: the option is spelled "anonimize" (sic); the spelling is kept
    # because it defines both the CLI flag and the attribute name
    # ``anonimize_terms`` that callers read.
    parser.add_argument('-a', '--anonimize-terms',
                        help='Replace the terms in the file provided with [REDACTED], one by line '
                             '(default: \'terms_to_anonymize.txt\').',
                        default='terms_to_anonymize.txt')
    return parser.parse_args(argv)
def main() -> None:
    """
    Main script for the generation of the dataset.

    Collects the ``*.json`` files in the directory given on the command line
    and passes them through the five ``malvada_workflow`` phases in order:

    1. ``get_incorrect_cape_reports`` - its two results are dropped from the
       working list (reported as "with errors" / "with VirusTotal errors").
    2. ``get_duplicate_reports`` - its result is dropped from the working list.
    3. ``sanitize_and_anonymize_reports``
    4. ``get_avclass_labels``
    5. ``get_cape_reports_stats`` - its three results are only counted in the
       final summary.

    Unless ``--silent`` is given, a progress display and a final summary are
    written to the console.

    Raises:
        SystemExit: With status 1 when the directory is invalid, the
            interpreter is not found on PATH, or no reports are found.
    """
    # Local import keeps this block self-contained; ``sys.exit`` replaces the
    # interactive-only ``exit()`` helper injected by the ``site`` module.
    import sys

    # Parse arguments
    args = parse_arguments()
    duplicates_strat = 'biggest'  # Default strategy for duplicates
    interpret = 'python3'

    progress = Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TaskProgressColumn(),
        "•",
        TimeElapsedColumn(),
        "•",
        TimeRemainingColumn(),
    )

    # Check that json_dir is an existing directory (isdir implies exists)
    if not os.path.isdir(args.json_dir):
        progress.console.log(
            f"Error: {args.json_dir} does not exist or is not a directory.")
        sys.exit(1)

    # Check if the interpret is valid
    if not shutil.which(interpret):
        progress.console.log(f"Error: {interpret} is not a valid interpreter.")
        sys.exit(1)

    # Set up the reports for the pipeline. The directory part is escaped so
    # that glob metacharacters in its name ('[', '*', '?') match literally.
    reports = glob.glob(os.path.join(glob.escape(args.json_dir), "*.json"))
    total_reports = len(reports)

    if not args.silent:
        progress.console.rule(
            "[bold green]MALVADA", style="green")

    # Check if there are no reports within the directory
    if not total_reports:
        progress.console.log("Error: No reports found.")
        sys.exit(1)

    if not args.silent:
        progress.console.log(f"[+] Total workers: [green]{args.workers}")
        progress.console.log(f"[+] Total reports: [green]{total_reports}")
        progress.console.log(
            f"[+] File with terms to anonymize: [green]{args.anonimize_terms}")
        progress.console.log(
            f"[+] VirusTotal positives threshold: [green]{args.vt_positives_threshold}")
        progress.console.rule(
            "[bold green]Starting pipeline[/bold green]", style="green")
        progress.start()

    try:
        # Phase 1
        reports_with_errors, reports_with_vt_errors = malvada_workflow.get_incorrect_cape_reports(
            reports, args.silent, progress)

        # set for faster lookup
        reports_to_remove = set(reports_with_errors + reports_with_vt_errors)

        # Remove reports with errors
        reports = [report for report in reports if report not in reports_to_remove]

        # Phase 2
        dup_reports = malvada_workflow.get_duplicate_reports(
            reports, duplicates_strat, args.silent, progress)

        # Remove duplicate reports (set for faster lookup, as in phase 1;
        # assumes the returned entries are hashable report paths)
        dup_lookup = set(dup_reports)
        reports = [report for report in reports if report not in dup_lookup]

        # Phase 3
        malvada_workflow.sanitize_and_anonymize_reports(
            reports, args.silent, args.anonimize_terms, progress, args.workers)

        # Phase 4
        malvada_workflow.get_avclass_labels(reports, args.silent, progress, args.workers)

        # Phase 5
        undetected_reports, no_consensus_reports, no_vt_consensus_reports = malvada_workflow.get_cape_reports_stats(
            reports, args.silent, args.vt_positives_threshold, progress)

        # Final info
        if not args.silent:
            # A task's ``elapsed`` is None until it has been started; count
            # such tasks as 0 seconds instead of failing the summary.
            total_elapsed = sum(task.elapsed or 0 for task in progress.tasks)
            progress.console.rule("[bold green]Pipeline finished[/bold green]", style="green")
            progress.console.log("[+] Execution time: [green]"
                                 f"{round(total_elapsed, 2)}[/green] seconds.")
            progress.console.log(f"[+] Reports passing all phases: [green]{len(reports)}[/green]")
            progress.console.log(f"[+] Reports with errors: [green]{len(reports_with_errors)}[/green]")
            progress.console.log(
                f"[+] Reports with VirusTotal errors: [green]{len(reports_with_vt_errors)}[/green]")
            progress.console.log(f"[+] Duplicate reports: [green]{len(dup_reports)}[/green]")
            progress.console.log(f"[+] Benign or undetected reports: [green]{len(undetected_reports)}[/green]")
            progress.console.log(
                f"[+] Reports with no CAPE consensus: [green]{len(no_consensus_reports)}[/green]")
            progress.console.log(
                f"[+] Reports with no AVClass consensus: [green]{len(no_vt_consensus_reports)}[/green]")
            progress.console.rule("[bold green]MALVADA", style="green")
    finally:
        # Always stop the live display, even if a phase raised, so the
        # terminal is not left with a running progress renderer.
        progress.stop()
# Run the pipeline only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()