Skip to content

Commit

Permalink
Fixing type hints and linting
Browse files Browse the repository at this point in the history
  • Loading branch information
ktbyers committed Oct 7, 2024
1 parent 2f0642e commit fd7a636
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 24 deletions.
2 changes: 2 additions & 0 deletions netmiko/cli_tools/fix.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
management api http-commands
no protocol https ssl profile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def main():
parser = argparse.ArgumentParser(
description="Encrypt data using Netmiko's encryption."
)
parser.add_argument("data", help="The data to encrypt", nargs='?')
parser.add_argument("data", help="The data to encrypt", nargs="?")
parser.add_argument(
"--key",
help="The encryption key (if not provided, will use NETMIKO_TOOLS_KEY env variable)",
Expand Down
1 change: 1 addition & 0 deletions netmiko/cli_tools/netmiko_show.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

# FIX: --list-devices currently fails due to missing 'device/group'


def main_ep():
sys.exit(main(sys.argv[1:]))

Expand Down
54 changes: 31 additions & 23 deletions netmiko/encryption_handling.py
Original file line number Diff line number Diff line change
@@ -1,87 +1,95 @@
import os
import base64
from typing import Dict, Any, Union

from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes


ENCRYPTION_PREFIX = "__encrypt__"
ENCRYPTION_PREFIX: str = "__encrypt__"


def get_encryption_key():
key = os.environ.get("NETMIKO_TOOLS_KEY")
def get_encryption_key() -> bytes:
key: Union[str, None] = os.environ.get("NETMIKO_TOOLS_KEY")
if not key:
raise ValueError(
"Encryption key not found. Set the 'NETMIKO_TOOLS_KEY' environment variable."
)
return key.encode()


def decrypt_value(encrypted_value, key, encryption_type):
def decrypt_value(encrypted_value: str, key: bytes, encryption_type: str) -> str:
# Remove the encryption prefix
encrypted_value = encrypted_value.replace(ENCRYPTION_PREFIX, "", 1)

# Extract salt and ciphertext
salt, ciphertext = encrypted_value.split(":", 1)
salt = base64.b64decode(salt)
salt_str, ciphertext_str = encrypted_value.split(":", 1)
salt = base64.b64decode(salt_str)
ciphertext = base64.b64decode(ciphertext_str)

kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=100000,
)
derived_key = kdf.derive(key)
derived_key: bytes = kdf.derive(key)

if encryption_type == "fernet":
f = Fernet(base64.urlsafe_b64encode(derived_key))
return f.decrypt(base64.b64decode(ciphertext)).decode()
return f.decrypt(ciphertext).decode()
elif encryption_type == "aes128":
iv = base64.b64decode(ciphertext[:24])
ciphertext = base64.b64decode(ciphertext[24:])
iv = ciphertext[:16]
ciphertext = ciphertext[16:]
cipher = Cipher(algorithms.AES(derived_key[:16]), modes.CBC(iv))
decryptor = cipher.decryptor()
padded = decryptor.update(ciphertext) + decryptor.finalize()
unpadded = padded[: -padded[-1]]
padded: bytes = decryptor.update(ciphertext) + decryptor.finalize()
unpadded: bytes = padded[: -padded[-1]]
return unpadded.decode()
else:
raise ValueError(f"Unsupported encryption type: {encryption_type}")

def decrypt_config(config, key, encryption_type):

def decrypt_config(
config: Dict[str, Any], key: bytes, encryption_type: str
) -> Dict[str, Any]:
for device, params in config.items():
if isinstance(params, dict):
for field, value in params.items():
if isinstance(value, str) and value.startswith(ENCRYPTION_PREFIX):
len_prefix = len(ENCRYPTION_PREFIX)
data = value[len_prefix:]
data: str = value[len_prefix:]
params[field] = decrypt_value(data, key, encryption_type)
return config


def encrypt_value(value, key, encryption_type):
salt = os.urandom(16)
def encrypt_value(value: str, key: bytes, encryption_type: str) -> str:
salt: bytes = os.urandom(16)
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=100000,
)
derived_key = kdf.derive(key)
derived_key: bytes = kdf.derive(key)

if encryption_type == "fernet":
f = Fernet(base64.urlsafe_b64encode(derived_key))
encrypted = f.encrypt(value.encode())
fernet_encrypted: bytes = f.encrypt(value.encode())
encrypted_data = fernet_encrypted
elif encryption_type == "aes128":
iv = os.urandom(16)
iv: bytes = os.urandom(16)
cipher = Cipher(algorithms.AES(derived_key[:16]), modes.CBC(iv))
encryptor = cipher.encryptor()
padded = value.encode() + b"\0" * (16 - len(value) % 16)
encrypted = iv + encryptor.update(padded) + encryptor.finalize()
padded: bytes = value.encode() + b"\0" * (16 - len(value) % 16)
aes_encrypted: bytes = iv + encryptor.update(padded) + encryptor.finalize()
encrypted_data = aes_encrypted
else:
raise ValueError(f"Unsupported encryption type: {encryption_type}")

# Combine salt and encrypted data, and add prefix
b64_salt = base64.b64encode(salt).decode()
return f"{ENCRYPTION_PREFIX}{b64_salt}:{base64.b64encode(encrypted).decode()}"
b64_salt: str = base64.b64encode(salt).decode()
b64_encrypted: str = base64.b64encode(encrypted_data).decode()
return f"{ENCRYPTION_PREFIX}{b64_salt}:{b64_encrypted}"
3 changes: 3 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,6 @@ ignore_errors = True

[mypy-netmiko.cli_tools.argument_handling]
ignore_errors = True

[mypy-netmiko.cli_tools.netmiko_encrypt]
ignore_errors = True

0 comments on commit fd7a636

Please sign in to comment.