forked from Yelp/detect-secrets
-
Notifications
You must be signed in to change notification settings - Fork 7
/
baseline.py
147 lines (116 loc) · 4.69 KB
/
baseline.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import json
import time
from typing import Any
from typing import Callable
from typing import cast
from typing import Dict
from typing import List
from typing import Optional
from typing import Union
from . import upgrades
from ..__version__ import VERSION
from ..exceptions import UnableToReadBaselineError
from ..settings import configure_settings_from_baseline
from ..settings import get_settings
from ..util.importlib import import_modules_from_package
from ..util.semver import Version
from .scan import get_files_to_scan
from .secrets_collection import SecretsCollection
def create(
    *paths: str,
    should_scan_all_files: bool = False,
    root: str = '',
    num_processors: Optional[int] = None,
) -> SecretsCollection:
    """
    Initializes a baseline by scanning the files found under the given paths.

    :param paths: locations handed to `get_files_to_scan` to discover files.
    :param should_scan_all_files: forwarded as-is to `get_files_to_scan`.
    :param root: forwarded to both `get_files_to_scan` and the `SecretsCollection`.
    :param num_processors: only forwarded to `scan_files` when truthy; otherwise
        `scan_files` is left to use its own default.
    :returns: the collection populated by the scan.
    """
    collection = SecretsCollection(root=root)

    # Only pass the option through when the caller actually provided a value, so that
    # `scan_files` keeps control over its own default.
    scan_options = {'num_processors': num_processors} if num_processors else {}

    files = get_files_to_scan(
        *paths,
        should_scan_all_files=should_scan_all_files,
        root=root,
    )
    collection.scan_files(*files, **scan_options)

    return collection
def load(baseline: Dict[str, Any], filename: str = '') -> SecretsCollection:
    """
    Loads the settings and the discovered secrets out of an already-parsed baseline.

    :param baseline: parsed baseline contents.
    :param filename: forwarded to `configure_settings_from_baseline`.
    :returns: the collection built from the (upgraded) baseline.
    :raises: KeyError
    """
    # Older baselines are first brought up to the latest format: this is required for
    # backwards compatibility, and for supporting upgrades from older versions.
    upgraded = upgrade(baseline)

    configure_settings_from_baseline(upgraded, filename=filename)

    return SecretsCollection.load_from_baseline(upgraded)
def load_from_file(filename: str) -> Dict[str, Any]:
    """
    Reads a baseline file from disk and parses it as JSON.

    :param filename: path of the baseline file to read.
    :returns: the parsed JSON content.
    :raises: UnableToReadBaselineError if the file cannot be opened or read, or if its
        content is not valid JSON. The original error is chained as the cause.
    """
    try:
        with open(filename) as baseline_file:
            content = json.load(baseline_file)
    except (OSError, json.JSONDecodeError) as e:
        # OSError also covers FileNotFoundError, which is one of its subclasses.
        raise UnableToReadBaselineError from e

    return cast(Dict[str, Any], content)
def format_for_output(secrets: SecretsCollection, is_slim_mode: bool = False) -> Dict[str, Any]:
    """
    Builds the dictionary representation of a baseline, ready to be serialized.

    :param secrets: collection whose `json()` output becomes the `results` entry.
    :param is_slim_mode: when False, a `generated_at` UTC timestamp is added. When True,
        the timestamp is left out and `line_number` is removed from every result entry.
    :returns: a dictionary holding `version`, the entries of `get_settings().json()`
        and `results` (plus `generated_at` outside of slim mode).
    """
    output = {
        'version': VERSION,

        # This will populate settings of filters and plugins,
        **get_settings().json(),

        'results': secrets.json(),
    }

    if not is_slim_mode:
        output['generated_at'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
        return output

    # NOTE: This has a nice little side effect of keeping it ordered by line number,
    # even though we don't output it.
    results = cast(Dict[str, List[Dict[str, Any]]], output['results'])
    for secret_list in results.values():
        for secret_dict in secret_list:
            # Popping with a default keeps this from raising a KeyError on an entry that
            # has no `line_number` key.
            # NOTE(review): `secrets.json()` is not visible from here, so whether such
            # entries can occur should be confirmed against SecretsCollection.
            secret_dict.pop('line_number', None)

    return output
def save_to_file(
    secrets: Union[SecretsCollection, Dict[str, Any]],
    filename: str,
) -> None:  # pragma: no cover
    """
    Writes a baseline to `filename` as JSON indented by two spaces, followed by a newline.

    :param secrets: if this is a SecretsCollection, the baseline is written in its latest
        format (via `format_for_output`). Otherwise, pass in a dictionary to manually
        specify the baseline format to save as.

        If you're trying to decide between the two, ask yourself whether there are any
        changes that do not directly impact the results of the scan.
    :param filename: path of the file to write; it is opened in `'w'` mode, so any
        existing content is replaced.
    """
    # TODO: I wonder whether this should add the `detect_secrets.filters.common.is_baseline_file`
    # filter, since we know the filename already. However, one could argue that it would cause
    # this function to "do more than one thing".
    output = format_for_output(secrets) if isinstance(secrets, SecretsCollection) else secrets

    serialized = json.dumps(output, indent=2)
    with open(filename, 'w') as baseline_file:
        baseline_file.write(serialized + '\n')
def upgrade(baseline: Dict[str, Any]) -> Dict[str, Any]:
    """
    Baselines will eventually require format changes. This function is responsible for
    bringing an older baseline up to the latest version.

    :param baseline: parsed baseline; it must contain a `version` key.
    :returns: `baseline` itself when its version is already at least `VERSION`; otherwise
        a new dictionary that went through the applicable upgrade modules and whose
        `version` is set to `VERSION`.
    :raises: KeyError if `baseline` has no `version` key.
    """
    baseline_version = Version(baseline['version'])
    if baseline_version >= Version(VERSION):
        return baseline

    # Build the predicate once, rather than on every invocation of the filter callback.
    is_relevant = _is_relevant_upgrade_module(baseline_version)

    # NOTE(review): the callback returns True for modules that are *not* relevant, so
    # `filter` presumably flags the modules to skip -- confirm against
    # `import_modules_from_package`.
    modules = import_modules_from_package(
        upgrades,
        filter=lambda module_path: not is_relevant(module_path),
    )

    # Shallow copy: top-level keys are independent from the input, but nested values
    # are still shared with it.
    upgraded = dict(baseline)
    for module in modules:
        module.upgrade(upgraded)

    upgraded['version'] = VERSION
    return upgraded
def _is_relevant_upgrade_module(current_version: Version) -> Callable:
    """
    Builds a predicate telling whether an upgrade module targets a version that is newer
    than `current_version`.

    :param current_version: version to compare each upgrade module against.
    :returns: a callable that takes a dotted module path whose last component looks like
        `v1_0`, and returns True when `current_version` is lower than that version.
    """
    def is_relevant(module_path: str) -> bool:
        # Only the last dotted component carries the version (e.g. `v1_0`).
        module_name = module_path.rsplit('.', 1)[-1]

        # This converts `v1_0` to `1.0`
        major_minor = module_name.lstrip('v').replace('_', '.')

        # Patch version doesn't matter, because patches should not require baseline bumps.
        affected_version = Version(f'{major_minor}.0')

        return current_version < affected_version

    return is_relevant