Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(quality): add quality tools #316

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,5 @@ cscope.out
.tox
.coverage
*.egg-info
.venv
.mypy_cache/
31 changes: 31 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# See https://pre-commit.com for more information
# See https://pre-commit.com/hooks.html for more hooks
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.3.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-yaml
- id: check-added-large-files
- repo: https://github.com/asottile/pyupgrade
rev: v3.3.1
hooks:
- id: pyupgrade
- repo: https://github.com/psf/black
rev: 22.12.0
hooks:
- id: black
# It is recommended to specify the latest version of Python
# supported by your project here, or alternatively use
# pre-commit's default_language_version, see
# https://pre-commit.com/#top_level-default_language_version
language_version: python3.10
- repo: https://github.com/pycqa/flake8
rev: '6.0.0'
hooks:
- id: flake8
- repo: https://github.com/pre-commit/mirrors-mypy
rev: 'v0.991'
hooks:
- id: mypy
81 changes: 42 additions & 39 deletions jwcrypto/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,33 +12,34 @@

def base64url_encode(payload):
if not isinstance(payload, bytes):
payload = payload.encode('utf-8')
payload = payload.encode("utf-8")
encode = urlsafe_b64encode(payload)
return encode.decode('utf-8').rstrip('=')
return encode.decode("utf-8").rstrip("=")


def base64url_decode(payload):
size = len(payload) % 4
if size == 2:
payload += '=='
payload += "=="
elif size == 3:
payload += '='
payload += "="
elif size != 0:
raise ValueError('Invalid base64 string')
return urlsafe_b64decode(payload.encode('utf-8'))
raise ValueError("Invalid base64 string")
return urlsafe_b64decode(payload.encode("utf-8"))


# JSON encoding/decoding helpers with good defaults


def json_encode(string):
if isinstance(string, bytes):
string = string.decode('utf-8')
return json.dumps(string, separators=(',', ':'), sort_keys=True)
string = string.decode("utf-8")
return json.dumps(string, separators=(",", ":"), sort_keys=True)


def json_decode(string):
if isinstance(string, bytes):
string = string.decode('utf-8')
string = string.decode("utf-8")
return json.loads(string)


Expand All @@ -48,10 +49,10 @@ class JWException(Exception):

class InvalidJWAAlgorithm(JWException):
def __init__(self, message=None):
msg = 'Invalid JWA Algorithm name'
msg = "Invalid JWA Algorithm name"
if message:
msg += ' (%s)' % message
super(InvalidJWAAlgorithm, self).__init__(msg)
msg += " (%s)" % message
super().__init__(msg)


class InvalidCEKeyLength(JWException):
Expand All @@ -62,8 +63,8 @@ class InvalidCEKeyLength(JWException):
"""

def __init__(self, expected, obtained):
msg = 'Expected key of length %d bits, got %d' % (expected, obtained)
super(InvalidCEKeyLength, self).__init__(msg)
msg = "Expected key of length %d bits, got %d" % (expected, obtained)
super().__init__(msg)


class InvalidJWEOperation(JWException):
Expand All @@ -78,10 +79,10 @@ def __init__(self, message=None, exception=None):
if message:
msg = message
else:
msg = 'Unknown Operation Failure'
msg = "Unknown Operation Failure"
if exception:
msg += ' {%s}' % repr(exception)
super(InvalidJWEOperation, self).__init__(msg)
msg += " {%s}" % repr(exception)
super().__init__(msg)


class InvalidJWEKeyType(JWException):
Expand All @@ -92,8 +93,8 @@ class InvalidJWEKeyType(JWException):
"""

def __init__(self, expected, obtained):
msg = 'Expected key type %s, got %s' % (expected, obtained)
super(InvalidJWEKeyType, self).__init__(msg)
msg = "Expected key type {}, got {}".format(expected, obtained)
super().__init__(msg)


class InvalidJWEKeyLength(JWException):
Expand All @@ -104,8 +105,8 @@ class InvalidJWEKeyLength(JWException):
"""

def __init__(self, expected, obtained):
msg = 'Expected key of length %d, got %d' % (expected, obtained)
super(InvalidJWEKeyLength, self).__init__(msg)
msg = "Expected key of length %d, got %d" % (expected, obtained)
super().__init__(msg)


class InvalidJWSERegOperation(JWException):
Expand All @@ -120,10 +121,10 @@ def __init__(self, message=None, exception=None):
if message:
msg = message
else:
msg = 'Unknown Operation Failure'
msg = "Unknown Operation Failure"
if exception:
msg += ' {%s}' % repr(exception)
super(InvalidJWSERegOperation, self).__init__(msg)
msg += " {%s}" % repr(exception)
super().__init__(msg)


class JWKeyNotFound(JWException):
Expand All @@ -138,16 +139,17 @@ def __init__(self, message=None):
if message:
msg = message
else:
msg = 'Key Not Found'
super(JWKeyNotFound, self).__init__(msg)
msg = "Key Not Found"
super().__init__(msg)


# JWSE Header Registry definitions

# RFC 7515 - 9.1: JSON Web Signature and Encryption Header Parameters Registry
# HeaderParameters are for both JWS and JWE
JWSEHeaderParameter = namedtuple('Parameter',
'description mustprotect supported check_fn')
JWSEHeaderParameter = namedtuple(
"JWSEHeaderParameter", "description mustprotect supported check_fn"
)


class JWSEHeaderRegistry(MutableMapping):
Expand All @@ -156,16 +158,15 @@ def __init__(self, init_registry=None):
if isinstance(init_registry, dict):
self._registry = copy.deepcopy(init_registry)
else:
raise InvalidJWSERegOperation('Unknown input type')
raise InvalidJWSERegOperation("Unknown input type")
else:
self._registry = {}

MutableMapping.__init__(self)

def check_header(self, h, value):
if h not in self._registry:
raise InvalidJWSERegOperation('No header "%s" found in registry'
% h)
raise InvalidJWSERegOperation('No header "%s" found in registry' % h)

param = self._registry[h]
if param.check_fn is None:
Expand All @@ -180,10 +181,10 @@ def __iter__(self):
return self._registry.__iter__()

def __delitem__(self, key):
if self._registry[key].mustprotect or \
self._registry[key].supported:
raise InvalidJWSERegOperation('Unable to delete protected or '
'supported field')
if self._registry[key].mustprotect or self._registry[key].supported:
raise InvalidJWSERegOperation(
"Unable to delete protected or " "supported field"
)
else:
self._registry.__delitem__(key)

Expand All @@ -192,11 +193,13 @@ def __setitem__(self, h, jwse_header_param):
if h in self._registry:
p = self._registry[h]
if p.supported:
raise InvalidJWSERegOperation('Supported header already exists'
' in registry')
raise InvalidJWSERegOperation(
"Supported header already exists" " in registry"
)
elif p.mustprotect and not jwse_header_param.mustprotect:
raise InvalidJWSERegOperation('Header specified should be'
'a protected header')
raise InvalidJWSERegOperation(
"Header specified should be" "a protected header"
)
else:
del self._registry[h]

Expand Down
Loading