diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml
index 892f484..5ef836f 100644
--- a/.github/workflows/python-package.yml
+++ b/.github/workflows/python-package.yml
@@ -26,7 +26,7 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip
- python -m pip install flake8 nose coverage
+ python -m pip install flake8 nose coverage pycryptodome
if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
- name: Lint with flake8
run: |
diff --git a/enocean/protocol/EEP.xml b/enocean/protocol/EEP.xml
index 5b7916a..46a0428 100644
--- a/enocean/protocol/EEP.xml
+++ b/enocean/protocol/EEP.xml
@@ -1381,6 +1381,30 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -1473,4 +1497,78 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ 0
+ 65535
+
+
+ 0
+ 65535
+
+
+
+
+
+
+
+
+
+
+
diff --git a/enocean/protocol/constants.py b/enocean/protocol/constants.py
index 8dd3e74..5dae19c 100644
--- a/enocean/protocol/constants.py
+++ b/enocean/protocol/constants.py
@@ -59,6 +59,11 @@ class RORG(IntEnum):
SYS_EX = 0xC5
SEC = 0x30
SEC_ENCAPS = 0x31
+ SECD = 0x32
+ SEC_CDM = 0x33
+ SEC_MAN = 0x34
+ SEC_TI = 0x35
+ SIGNAL = 0xD0
UTE = 0xD4
diff --git a/enocean/protocol/secure.py b/enocean/protocol/secure.py
new file mode 100644
index 0000000..089170d
--- /dev/null
+++ b/enocean/protocol/secure.py
@@ -0,0 +1,310 @@
+# -*- encoding: utf-8 -*-
+from __future__ import print_function, unicode_literals, division, absolute_import
+import logging
+from enum import IntEnum
+import struct
+import time
+import pickle
+import os.path
+
+from enocean.protocol.packet import Packet
+from enocean.protocol.constants import PACKET, RORG
+from Crypto.Cipher import AES
+from Crypto.Hash import CMAC
+
+
+def split_bytes_be(num, byte_count):
+ '''Split number to byte_count big-endian bytes.'''
+ return struct.pack('>I', num)[-byte_count:]
+
+
+class MAC_ALGO(IntEnum):
+ NO_MAC = 0
+ MAC_24_AES128 = 1
+ MAC_32_AES128 = 2
+
+
+class SecureDevice(object):
+ '''
+ Secure device parameters.
+ '''
+ logger = logging.getLogger('enocean.protocol.secure')
+ sender = []
+ sender_int = 0
+ psk = 0
+ ptm = 0
+ rlc_algo = 0
+ rlc_tx = 0
+ mac_algo = 0
+ data_enc = 0
+ rlc = 0
+ key = ''
+
+ def __init__(self, packets=None):
+ if packets:
+ self.parse(packets)
+
+ def parse(self, packets):
+ '''
+ Parse Secure Teach-In telegrams to enroll the communication partner.
+ Requires two RORG.SEC_TI packets, in order.
+ '''
+ if len(packets) != 2:
+ raise Exception('Excatly two packets required to teach-in secure device')
+ if packets[0].rorg != RORG.SEC_TI or packets[1].rorg != RORG.SEC_TI:
+ raise Exception('Packets of RORG.SEC_TI required to teach-in secure device')
+ if packets[0].packet_type != packets[1].packet_type:
+ raise Exception('Packets of need to be the same type to teach-in secure device')
+ if packets[0].sender != packets[1].sender:
+ raise Exception('Packets of RORG.SEC_TI required to teach-in secure device')
+
+ packets[0].parse_eep(0x00, 0x00)
+ if packets[0].parsed['CNT']['raw_value'] != 2 or packets[0].parsed['IDX']['raw_value'] != 0:
+ raise Exception('Packets of need to be in-order to teach-in secure device')
+
+ packets[1].parse_eep(0x00, 0x00)
+ if packets[1].parsed['IDX']['raw_value'] != 1:
+ raise Exception('Packets of need to be in-order to teach-in secure device')
+
+ self.sender = packets[0].sender
+ self.sender_int = packets[0].sender_int
+ self.psk = packets[0].parsed['PSK']['raw_value']
+ self.ptm = packets[0].parsed['TYPE']['raw_value']
+ self.rlc_algo = packets[0].parsed['RLC_ALGO']['raw_value']
+ self.rlc_tx = packets[0].parsed['RLC_TX']['raw_value']
+ self.mac_algo = packets[0].parsed['MAC_ALGO']['raw_value']
+ self.data_enc = packets[0].parsed['DATA_ENC']['raw_value']
+ self.rlc = packets[0].parsed['RLC']['raw_value']
+
+ var_len0 = len(packets[0].data) - 5 # 5 bytes sender+status
+ key0 = packets[0].data[5:var_len0] # idx 0: key[0:5]
+
+ var_len1 = len(packets[1].data) - 5 # 5 bytes sender+status
+ key1 = packets[1].data[2:var_len1] # idx 1: key[6:16]
+
+ self.key = bytearray(key0 + key1)
+
+ @property
+ def rlc_len(self):
+ return [0, 0, 2, 2, 3, 3, 4, 4][self.rlc_algo]
+
+ @property
+ def explicit_rlc_len(self):
+ return [0, 0, 0, 0, 0, 3, 3, 4][self.rlc_algo]
+
+ @property
+ def mac_len(self):
+ return [0, 3, 4, 0][self.mac_algo]
+
+ def vaes_decrypt(self, data_enc, rlc):
+ VAES_INIT = bytearray([0x34, 0x10, 0xde, 0x8f, 0x1a, 0xba, 0x3e, 0xff,
+ 0x9f, 0x5a, 0x11, 0x71, 0x72, 0xea, 0xca, 0xbd])
+
+ rlc = split_bytes_be(rlc, self.rlc_len)
+ rlc += b'\x00' * (16 - len(rlc)) # pad to 16 bytes
+ vaes_init = bytes(a ^ b for (a, b) in zip(VAES_INIT, rlc))
+
+ cipher = AES.new(self.key, AES.MODE_ECB)
+ enc = cipher.encrypt(vaes_init)
+
+ data_dec = [a ^ b for (a, b) in zip(data_enc, enc)]
+ return data_dec
+
+ def verify_cmac(self, rorg, data, rlc, mac_tag=None):
+ return bytearray(mac_tag) == bytearray(self.gen_cmac(rorg, data, rlc))
+
+ def gen_cmac(self, rorg, data, rlc):
+ rlc = split_bytes_be(rlc, self.rlc_len)
+
+ msg = [rorg]
+ msg.extend(data)
+ msg.extend(rlc)
+
+ mac = CMAC.new(self.key, ciphermod=AES, mac_len=4)
+ mac.update(bytearray(msg))
+
+ return mac.digest()[:self.mac_len]
+
+ def decrypt_SEC(self, packet, rlc_window=100):
+ '''
+ Verify CMAC and decrypt RORG.SEC to RORG.SECD, packet unchanged otherwise.
+ Raises if packet is not RORG.SEC.
+ '''
+ if packet.rorg != RORG.SEC:
+ raise Exception("Packet is not RORG.SEC")
+
+ rlc = self.rlc
+
+ rorg = packet.rorg
+ data = packet.data[1:len(packet.data) - 5] # 1 byte rorg + 5 bytes sender+status
+
+ mac_len = self.mac_len
+ cmac = data[-mac_len:]
+ data = data[:-mac_len]
+
+ rlc_len = self.explicit_rlc_len
+ if rlc_len:
+ explicit_rlc = data[-rlc_len:]
+ data = data[:-rlc_len]
+
+ data_enc = data
+
+ while rlc_window >= 0:
+ # self.gen_cmac(rorg, data_enc, rlc)
+ if self.verify_cmac(rorg, data_enc, rlc, cmac):
+ self.rlc = rlc + 1
+ data_dec = self.vaes_decrypt(data_enc, rlc)
+ # transform RORG.SEC to RORG.SECD
+ packet.rorg = RORG.SECD
+ packet.data[0] = packet.rorg # 0x30 -> 0x32
+ packet.data[1:1 + len(data_enc)] = data_dec
+ # splice the explicit rlc and mac out
+ packet.data[1 + len(data_enc):1 + len(data_enc) + rlc_len + mac_len] = []
+ packet.status = 0
+ return True
+ rlc += 1
+ rlc_window -= 1
+
+ return False
+
+ def translate_application(self, packet):
+ '''
+ Transform RORG.SECD to matching RORG.VLD (D2-03-00) if application type is PTM.
+ Raises if packet is not RORG.SECD.
+ '''
+ if packet.rorg != RORG.SECD:
+ raise Exception("Packet is not RORG.SECD")
+ if self.ptm:
+ packet.rorg = RORG.VLD
+ packet.data[0] = packet.rorg
+ packet.data[1] &= 0xf
+ packet.parse_eep(0x03, 0x00)
+
+ def translate_profile(self, packet):
+ '''
+ Convert profile D2-03-00 (payload 4 bits) to RPS (F6-02-01).
+ Raises if packet is not RORG.VLD or not profile D2-03-00.
+ '''
+ if packet.rorg != RORG.VLD:
+ raise Exception("Packet is not RORG.VLD")
+ if packet.rorg_func != 0x03 or packet.rorg_type != 0x00:
+ raise Exception("Packet is not profile D2-03-00")
+
+ # conversion between the profiles D2-03-00 (payload 4 bits) and F6-02-01:
+ # D2-03-00 DATA to F6-02-01 DATA and F6-02-01 STATUS
+ map_D2_03_00_to_F6_02_01 = {
+ 0: (0, 0), # Reserved
+ 1: (0, 0), # Reserved
+ 2: (0, 0), # Reserved
+ 3: (0, 0), # Reserved
+ 4: (0, 0), # Reserved
+ 5: (0x17, 0x30), # Button A1 + B0 pressed,energy bow pressed
+ 6: (0x70, 0x20), # 3 or 4 buttons pressed,energy bow pressed
+ 7: (0x37, 0x30), # Button A0 + B0 pressed,energy bow pressed
+ 8: (0x10, 0x20), # No buttons pressed, energybow pressed
+ 9: (0x15, 0x30), # Button A1 + B1 pressed,energy bow pressed
+ 10: (0x35, 0x30), # Button A0 + B1 pressed,energy bow pressed
+ 11: (0x50, 0x30), # Button B1 pressed, energybow pressed
+ 12: (0x70, 0x30), # Button B0 pressed, energybow pressed
+ 13: (0x10, 0x30), # Button A1 pressed, energybow pressed
+ 14: (0x30, 0x30), # Button A0 pressed, energybow pressed
+ 15: (0, 0x20), # Energy bow released
+ }
+ (data, status) = map_D2_03_00_to_F6_02_01[packet.data[1]]
+ packet.rorg = RORG.RPS
+ packet.data[0] = packet.rorg
+ packet.data[1] = data
+ packet.status = status
+ packet.parse_eep(0x02, 0x01)
+
+
+class SecureStore(object):
+ '''
+ Secure paired devices store.
+ '''
+ logger = logging.getLogger('enocean.protocol.secure')
+ file = None
+ paired = {}
+ teachin_max_devices = 0
+ teachin_max_timeout = 0
+ teachin_min_dBm = 0
+ teachin_packets = []
+ teachin_packet_time = 0
+
+ def __init__(self, file=None):
+ self.file = file
+ self.load()
+
+ def load(self):
+ '''This should be overridden with a better suited implementation.'''
+ if self.file and os.path.exists(self.file):
+ self.logger.info('Reading secure store from file')
+ self.paired = pickle.load(open(self.file, 'rb'))
+
+ def save(self, changed_device=None):
+ '''This should be overridden with a better suited implementation.'''
+ if self.file:
+ self.logger.debug('Writing secure store to file')
+ pickle.dump(self.paired, open(self.file, 'wb'))
+
+ def add_device(self, secure_device):
+ self.paired[secure_device.sender_int] = secure_device
+ self.save(secure_device)
+
+ def remove_device(self, secure_device):
+ '''
+ Raises KeyError if not found.
+ '''
+ del self.paired[secure_device.sender_int]
+ self.save(secure_device)
+
+ def get_device(self, packet):
+ '''
+ Raises KeyError if not found.
+ '''
+ return self.paired[packet.sender_int]
+
+ def allow_teach_in(self, max_devices=1, max_time=300, min_dBm=-56):
+ '''
+ Allow secure teach-in.
+ Defaults are to allow 1 device within 300 seconds (5 minutes)
+ at least -56 dBm (about a meter distance).
+ '''
+ self.teachin_max_devices = max_devices
+ self.teachin_max_timeout = time.time() + max_time
+ self.teachin_min_dBm = min_dBm
+
+ def teach_in(self, packet):
+ '''
+ Process secure teach-in packets.
+ Call for each packet.
+ '''
+ if self.teachin_max_devices < 1:
+ self.logger.info('Max teach-in devices reached, ignoring secure teach-in')
+ elif self.teachin_max_timeout < time.time():
+ self.logger.info('Max timeout reached, ignoring secure teach-in')
+ elif self.teachin_min_dBm > packet.dBm:
+ self.logger.info('Min dBm not reached, ignoring secure teach-in')
+ elif len(self.teachin_packets) == 0 or self.teachin_packet_time + 2 < time.time():
+ self.logger.debug('First secure teach-in packet received')
+ self.teachin_packets = [packet]
+ self.teachin_packet_time = time.time()
+ elif len(self.teachin_packets) == 1:
+ self.logger.debug('Second secure teach-in packet received')
+ self.teachin_packets.append(packet)
+ dev = SecureDevice(self.teachin_packets)
+ self.teachin_packets = []
+ self.add_device(dev)
+ return dev
+
+ def decrypt(self, packet):
+ dev = self.get_device(packet)
+ if not dev:
+ raise Exception("Device not known")
+
+ if not dev.decrypt_SEC(packet):
+ raise Exception("Can't decrypt secure packet")
+
+ self.save(dev) # update RLC
+
+ return dev
diff --git a/enocean/protocol/tests/test_secure.py b/enocean/protocol/tests/test_secure.py
new file mode 100644
index 0000000..6abc387
--- /dev/null
+++ b/enocean/protocol/tests/test_secure.py
@@ -0,0 +1,105 @@
+# -*- encoding: utf-8 -*-
+from __future__ import print_function, unicode_literals, division, absolute_import
+
+# from enocean.consolelogger import init_logging
+from enocean.protocol.secure import SecureStore
+from enocean.protocol.packet import Packet
+from enocean.protocol.constants import PARSE_RESULT, PACKET, RORG
+from enocean.decorators import timing
+
+
+@timing(1000)
+def test_secure_teach_in():
+ ''' Tests Secure Teach-In, verify and decrypt '''
+ # Secure Teach-In telegrams (part 1)
+ msg1 = bytearray([
+ 0x55,
+ 0x00, 0x0F, 0x07, 0x01,
+ 0x2B,
+ 0x35, 0x24, 0x4b, 0xC0, 0xFF, 0x45, 0x6E, 0x4F, 0x63, 0x65, 0x01, 0x9E, 0xB6, 0x3B, 0x00,
+ 0x00, 0xff, 0xff, 0xff, 0xff, 0x37, 0x00,
+ 0x71
+ ])
+
+ # Secure Teach-In telegrams (part 2)
+ msg2 = bytearray([
+ 0x55,
+ 0x00, 0x12, 0x07, 0x01,
+ 0x18,
+ 0x35, 0x40, 0x61, 0x6E, 0x20, 0x47, 0x6D, 0x62, 0x48, 0x2E, 0x31, 0x33, 0x00, 0x01, 0x9E, 0xB6, 0x3B, 0x00,
+ 0x00, 0xff, 0xff, 0xff, 0xff, 0x37, 0x00,
+ 0xF0
+ ])
+
+ # Secure telegram (PTM215 A0 pressed)
+ msg3 = bytearray([
+ 0x55,
+ 0x00, 0x0A, 0x07, 0x01,
+ 0xEB,
+ 0x30, 0xD0, 0xB5, 0x18, 0xFB, 0x01, 0x9E, 0xB6, 0x3B, 0x00,
+ 0x00, 0xff, 0xff, 0xff, 0xff, 0x37, 0x00,
+ 0x4D
+ ])
+
+ # Secure telegram (PTM215 A0 released)
+ msg4 = bytearray([
+ 0x55,
+ 0x00, 0x0A, 0x07, 0x01,
+ 0xEB,
+ 0x30, 0x7B, 0xAE, 0xC8, 0x28, 0x01, 0x9E, 0xB6, 0x3B, 0x00,
+ 0x00, 0xff, 0xff, 0xff, 0xff, 0x37, 0x00,
+ 0x94
+ ])
+
+ secure_devices = SecureStore()
+ secure_devices.allow_teach_in(min_dBm=-80)
+
+ status, buf, packet = Packet.parse_msg(msg1)
+ assert status == PARSE_RESULT.OK
+ assert buf == []
+ assert packet.rorg == RORG.SEC_TI
+ assert secure_devices.teach_in(packet) == None
+
+ status, buf, packet = Packet.parse_msg(msg2)
+ assert status == PARSE_RESULT.OK
+ assert buf == []
+ assert packet.rorg == RORG.SEC_TI
+ dev = secure_devices.teach_in(packet)
+ assert dev.sender == [0x01, 0x9E, 0xB6, 0x3B]
+ assert dev.psk == 0
+ assert dev.ptm == 1
+ assert dev.rlc_algo == 2
+ assert dev.rlc_tx == 0
+ assert dev.mac_algo == 1
+ assert dev.data_enc == 3
+ assert dev.rlc == 0xC0FF
+ assert dev.key == bytearray([0x45, 0x6E, 0x4F, 0x63, 0x65, 0x61, 0x6E, 0x20,
+ 0x47, 0x6D, 0x62, 0x48, 0x2E, 0x31, 0x33, 0x00])
+
+ status, buf, packet = Packet.parse_msg(msg3)
+ assert status == PARSE_RESULT.OK
+ assert buf == []
+ assert packet.rorg == RORG.SEC
+ dev = secure_devices.decrypt(packet)
+ assert packet.rorg == RORG.SECD
+ # translate RORG.SECD of PTM215 to VLD (D2-03-00)
+ dev.translate_application(packet)
+ assert packet.rorg == RORG.VLD
+ assert packet.data[1] == 0x0E # AO pressed
+ # translate profile D2-03-00 (payload 4 bits) to RPS (F6-02-01)
+ dev.translate_profile(packet)
+ assert packet.rorg == RORG.RPS
+
+ status, buf, packet = Packet.parse_msg(msg4)
+ assert status == PARSE_RESULT.OK
+ assert buf == []
+ assert packet.rorg == RORG.SEC
+ dev = secure_devices.decrypt(packet)
+ assert packet.rorg == RORG.SECD
+ # translate RORG.SECD of PTM215 to VLD (D2-03-00)
+ dev.translate_application(packet)
+ assert packet.rorg == RORG.VLD
+ assert packet.data[1] == 0x0F # released
+ # translate profile D2-03-00 (payload 4 bits) to RPS (F6-02-01)
+ dev.translate_profile(packet)
+ assert packet.rorg == RORG.RPS
diff --git a/examples/secure.py b/examples/secure.py
new file mode 100755
index 0000000..20516b5
--- /dev/null
+++ b/examples/secure.py
@@ -0,0 +1,115 @@
+#!/usr/bin/env python
+# -*- encoding: utf-8 -*-
+
+from enocean.consolelogger import init_logging
+from enocean.protocol.secure import SecureStore
+from enocean.protocol.packet import Packet
+from enocean.protocol.constants import PARSE_RESULT, PACKET, RORG
+from enocean.protocol import crc8
+
+
+# Secure Teach-In telegrams (PTM215 part 1)
+msg1 = bytearray([
+ 0x55,
+ 0x00, 0x0F, 0x07, 0x01,
+ 0x2B,
+ 0x35, 0x24, 0x4b, 0xC0, 0xFF, 0x45, 0x6E, 0x4F, 0x63, 0x65, 0x01, 0x9E, 0xB6, 0x3B, 0x00,
+ 0x00, 0xff, 0xff, 0xff, 0xff, 0x37, 0x00,
+ 0x71
+])
+
+# Secure Teach-In telegrams (PTM215 part 2)
+msg2 = bytearray([
+ 0x55,
+ 0x00, 0x12, 0x07, 0x01,
+ 0x18,
+ 0x35, 0x40, 0x61, 0x6E, 0x20, 0x47, 0x6D, 0x62, 0x48, 0x2E, 0x31, 0x33, 0x00, 0x01, 0x9E, 0xB6, 0x3B, 0x00,
+ 0x00, 0xff, 0xff, 0xff, 0xff, 0x37, 0x00,
+ 0xF0
+])
+
+# Secure telegram (PTM215 A0 pressed)
+msg3 = bytearray([
+ 0x55,
+ 0x00, 0x0A, 0x07, 0x01,
+ 0xEB,
+ 0x30, 0xD0, 0xB5, 0x18, 0xFB, 0x01, 0x9E, 0xB6, 0x3B, 0x00,
+ 0x00, 0xff, 0xff, 0xff, 0xff, 0x37, 0x00,
+ 0x4D
+])
+
+# Secure telegram (PTM215 A0 released)
+msg4 = bytearray([
+ 0x55,
+ 0x00, 0x0A, 0x07, 0x01,
+ 0xEB,
+ 0x30, 0x7B, 0xAE, 0xC8, 0x28, 0x01, 0x9E, 0xB6, 0x3B, 0x00,
+ 0x00, 0xff, 0xff, 0xff, 0xff, 0x37, 0x00,
+ 0x94
+])
+
+init_logging()
+
+# setup the secure store for paired devices
+secure_devices = SecureStore()
+# add a file name for simple pickle store
+# or sub class and override load() and store() for a custom implementation
+# secure_devices = SecureStore('foo.pickle')
+
+# allow secure teach in for a maximum number of devices (max_devices=1),
+# a limited time (max_time=300 i.e. 5 minutes),
+# and a required minimum dBm (min_dBm=-56 i.e. about a meter distance).
+secure_devices.allow_teach_in()
+
+for msg in [msg1, msg2, msg3, msg4]:
+ status, buf, packet = Packet.parse_msg(msg)
+ assert status == PARSE_RESULT.OK
+ assert buf == []
+ print('INPUT', packet)
+
+ if packet.packet_type == PACKET.RADIO_ERP1:
+ if packet.rorg == RORG.SEC:
+ print('Secure telegram')
+ # lookup the paired device, verify and decrypt
+ dev = secure_devices.decrypt(packet)
+ # the packet is now decrypted to SECD
+ assert packet.rorg == RORG.SECD
+ print('Decrypted telegram:', packet)
+
+ # translate RORG.SECD of PTM215 to VLD (D2-03-00)
+ dev.translate_application(packet)
+ # the packet is now VLD (D2-03-00)
+ assert packet.rorg == RORG.VLD
+ print('Translated telegram:', packet)
+
+ # translate profile D2-03-00 (payload 4 bits) to RPS (F6-02-01)
+ dev.translate_profile(packet)
+ # the packet is now RPS (F6-02-01)
+ assert packet.rorg == RORG.RPS
+ print('Converted telegram:', packet)
+
+ elif packet.rorg == RORG.SEC_ENCAPS:
+ print('Secure telegram with RORG encapsulation')
+ print('RORG not handeld.')
+
+ elif packet.rorg == RORG.SECD:
+ print('Non-secure message type that results from the decryption of a secure message with R-ORG 0x30.')
+ print('RORG not handeld.')
+
+ elif packet.rorg == RORG.SEC_CDM:
+ print('Secure chained Messages')
+ print('RORG not handeld.')
+
+ elif packet.rorg == RORG.SEC_MAN:
+ print('Maintenance Security message')
+ print('RORG not handeld.')
+
+ elif packet.rorg == RORG.SEC_TI:
+ print('Secure Teach-In telegrams transmit private key and rolling to the communication partner')
+ dev = secure_devices.teach_in(packet)
+ # dev is None for the first teach-in packet
+ # dev is the paired SecureDevice for the second teach-in packet
+ print('Teach-in result:', dev)
+
+ else:
+ print('Packet type not handeld.')