Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow better debugging through useful error messages from verification steps. #28

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 57 additions & 20 deletions cert_verifier/checks.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
import hashlib
import json
import logging
from datetime import datetime
from threading import Lock

import bitcoin
import hashlib
import pytz
from bitcoin.signmessage import BitcoinMessage, VerifyMessage
from cert_schema import BlockcertValidationError
from cert_schema import normalize_jsonld
from cert_core import BlockcertVersion, Chain
from cert_core import chain_to_bitcoin_network
from cert_core.cert_model.model import SignatureType
from chainpoint.chainpoint import Chainpoint

from cert_schema import BlockcertValidationError
from cert_schema import normalize_jsonld
from cert_verifier import StepStatus
from cert_verifier.errors import InvalidCertificateError
from chainpoint.chainpoint import Chainpoint

lock = Lock()

Expand Down Expand Up @@ -96,7 +95,12 @@ def __init__(self, content_to_verify, transaction_info):
def do_execute(self):
blockchain_hash = self.transaction_info.op_return
local_hash = hashlib.sha256(self.content_to_verify).hexdigest()
return hashes_match(blockchain_hash, local_hash)
match = hashes_match(blockchain_hash, local_hash)
if not match:
logging.error(
f"BinaryFileIntegrityChecker failed - Blockchain hash: '{blockchain_hash}'' | Local hash: '{local_hash}'"
)
return match


class NormalizedJsonLdIntegrityChecker(VerificationCheck):
Expand All @@ -107,10 +111,13 @@ def __init__(self, content_to_verify, expected_hash, detect_unmapped_fields=Fals

def do_execute(self):
try:
normalized = normalize_jsonld(self.content_to_verify,
detect_unmapped_fields=self.detect_unmapped_fields)
local_hash = hash_normalized(normalized)
normalized_f = normalize_jsonld(self.content_to_verify, detect_unmapped_fields=False)
local_hash = hash_normalized(normalized_f)
cert_hashes_match = hashes_match(local_hash, self.expected_hash)
if not cert_hashes_match:
logging.error(
f"NormalizedJsonLdIntegrityChecker failed - Expected hash: '{self.expected_hash}'' | Local hash: '{local_hash}'"
)
return cert_hashes_match
except BlockcertValidationError:
logging.error('Certificate has been modified', exc_info=True)
Expand All @@ -123,8 +130,10 @@ def __init__(self, expected_merkle_root, actual_merkle_root):
self.actual_merkle_root = actual_merkle_root

def do_execute(self):
merkle_root_matches = hashes_match(self.expected_merkle_root,
self.actual_merkle_root)
merkle_root_matches = hashes_match(self.expected_merkle_root, self.actual_merkle_root)
if not merkle_root_matches:
logging.error(f"MerkleRootIntegrityChecker failed - Actual: '{self.actual_merkle_root}' | "
f"Expected: '{self.expected_merkle_root}'")
return merkle_root_matches


Expand All @@ -136,7 +145,10 @@ def do_execute(self):
cp = Chainpoint()
# overwrite with Chainpoint type before passing to validator
self.merkle_proof['type'] = 'ChainpointSHA256v2'
valid_receipt = cp.valid_receipt(json.dumps(self.merkle_proof))
dumped_proof = json.dumps(self.merkle_proof)
valid_receipt = cp.valid_receipt(dumped_proof)
if not valid_receipt:
logging.error(f"ReceiptIntegrityChecker failed - Dumped proof: '{dumped_proof}'")
return valid_receipt


Expand All @@ -156,7 +168,7 @@ def __init__(self, values_to_check, revoked_values):
def do_execute(self):
revoked = any(k in self.revoked_values for k in self.values_to_check)
if revoked:
logging.error('This certificate has been revoked by the issuer')
logging.error('RevocationChecker failed - This certificate has been revoked by the issuer')
return not revoked


Expand All @@ -169,7 +181,10 @@ def do_execute(self):
return True
# compare to current time. If expires_date is timezone naive, we assume UTC
now_tz = pytz.UTC.localize(datetime.utcnow())
return now_tz < self.expires
current = now_tz < self.expires
if not current:
logging.error(f"ExpiredChecker failed - Now: '{now_tz}' | Expires: '{self.expires}'")
return current


class EmbeddedSignatureChecker(VerificationCheck):
Expand All @@ -181,7 +196,14 @@ def __init__(self, signing_key, content_to_verify, signature_value, chain=Chain.

def do_execute(self):

if self.signing_key is None or self.content_to_verify is None or self.signature_value is None:
if self.signing_key is None:
logging.error('EmbeddedSignatureChecker failed - signing key is none')
return False
if self.content_to_verify is None:
logging.error('EmbeddedSignatureChecker failed - content to verify is none')
return False
if self.signature_value is None:
logging.error('EmbeddedSignatureChecker failed - signature value is none')
return False
message = BitcoinMessage(self.content_to_verify)
try:
Expand All @@ -205,17 +227,32 @@ def __init__(self, transaction_signing_key, transaction_signing_date, issuer_key
self.issuer_key_map = issuer_key_map

def do_execute(self):
if self.transaction_signing_key in self.issuer_key_map:
key = self.issuer_key_map[self.transaction_signing_key]
signing_key = self.transaction_signing_key.casefold()
issuer_keys = {k.casefold(): v for k, v in self.issuer_key_map.items()}
if signing_key in issuer_keys:
key = issuer_keys[signing_key]
res = True
if key.created:
res &= self.transaction_signing_date >= key.created
created_ok = self.transaction_signing_date >= key.created
if not created_ok:
logging.error('AuthenticityChecker failed - transaction_signing_date >= key.created.')
res &= created_ok
if key.revoked:
res &= self.transaction_signing_date <= key.revoked
revoked_ok = self.transaction_signing_date <= key.revoked
if not revoked_ok:
logging.error('AuthenticityChecker failed - transaction_signing_date >= key.revoked.')
res &= revoked_ok
if key.expires:
res &= self.transaction_signing_date <= key.expires
expired_ok = self.transaction_signing_date <= key.expires
if not expired_ok:
logging.error('AuthenticityChecker failed - transaction_signing_date <= key.expires')
res &= expired_ok
return res
else:
logging.error(
f'Transaction signing key "{signing_key}" can\'t be found among the keys in the '
f'issuer profile: "{issuer_keys}".'
)
return False


Expand Down