Skip to content

Commit

Permalink
(feat) Added support for Call APDUs
Browse files Browse the repository at this point in the history
When device does call-to-dispatching it sends a request with the Call (GPRS Registration) APDU to the main system.
Added parsing for Request and Call APDU.
  • Loading branch information
Henrik Wahlgren committed Feb 25, 2024
1 parent f1ad6b4 commit 1ceeaef
Show file tree
Hide file tree
Showing 5 changed files with 323 additions and 8 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ dist/
downloads/
eggs/
.eggs/
.idea/
lib/
lib64/
parts/
Expand Down
129 changes: 121 additions & 8 deletions elgas/application.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import datetime
from datetime import datetime, timedelta
from enum import IntEnum
from typing import *

Expand Down Expand Up @@ -160,13 +160,12 @@ class WriteTimeResponse:

@classmethod
def from_bytes(cls, in_bytes: bytes):

return cls()


@attr.s(auto_attribs=True)
class ReadDeviceParametersRequest:
""""""
""" """

service: ClassVar[
constants.ServiceNumber
Expand All @@ -187,7 +186,7 @@ def to_bytes(self) -> bytes:

@attr.s(auto_attribs=True)
class ReadDeviceParametersResponse:
""""""
""" """

service: ClassVar[
constants.ServiceNumber
Expand All @@ -214,7 +213,7 @@ def from_bytes(cls, in_bytes: bytes):

@attr.s(auto_attribs=True)
class ReadArchiveByTimeRequest:
""""""
""" """

service: ClassVar[
constants.ServiceNumber
Expand All @@ -237,7 +236,7 @@ def to_bytes(self) -> bytes:

@attr.s(auto_attribs=True)
class ReadArchiveByTimeResponse:
""""""
""" """

service: ClassVar[
constants.ServiceNumber
Expand All @@ -261,7 +260,7 @@ def from_bytes(cls, in_bytes: bytes):

@attr.s(auto_attribs=True)
class ReadArchiveRequest:
""""""
""" """

service: ClassVar[constants.ServiceNumber] = constants.ServiceNumber.READ_ARCHIVES

Expand All @@ -282,7 +281,7 @@ def to_bytes(self) -> bytes:

@attr.s(auto_attribs=True)
class ReadArchiveResponse:
""""""
""" """

service: ClassVar[constants.ServiceNumber] = constants.ServiceNumber.READ_ARCHIVES

Expand All @@ -300,3 +299,117 @@ def from_bytes(cls, in_bytes: bytes):
oldest_record_id=oldest_record_id,
data=data,
)


@attr.s(auto_attribs=True)
class CallRequest:
"""
Used in call to dispatching. Device will send the Call message to tell the server it is awake.
It is also named GprsRegistration.
"""

service: ClassVar[constants.ServiceNumber] = constants.ServiceNumber.CALL

guid: bytes
station_id: str
sim_card_id: str
modem_id: str
address_1: int
address_2: int
signal_strength: int
connections: int
last_connection_time: datetime
connection_errors: int
last_connection_error_time: datetime
resets: int
last_reset_time: datetime
tcp_data: int
all_data: int
serial_number: str
ip_address: str
last_modem_error: int
last_modem_error_time: datetime
modem_battery_capacity: int
modem_battery_voltage: int
firmware_version: str

version: int = attr.ib(default=2)
protocol: constants.Protocol = attr.ib(default=constants.Protocol.ELGAS2)

@classmethod
def from_bytes(cls, in_bytes: bytes):
"""
Datetimes are number of seconds since "2000-01-01T00:00:00"
"""
base_date = datetime.fromisoformat("2000-01-01T00:00:00")
data = bytearray(in_bytes)
length = int.from_bytes(utils.pop_many(data, 2), "little")
version = data.pop(0)
guid = bytes(utils.pop_many(data, 16))
station_id = utils.pretty_text(utils.pop_many(data, 17))
sim_id = str(utils.from_bcd(utils.pop_many(data, 10)))
modem_id = str(utils.from_bcd(utils.pop_many(data, 8)))
protocol = constants.Protocol(data.pop(0))
address_1 = int.from_bytes(utils.pop_many(data, 2), "little")
address_2 = data.pop(0)
signal_strength = data.pop(0)
connections = int.from_bytes(utils.pop_many(data, 4), "little")
last_connection = base_date + timedelta(
seconds=int.from_bytes(utils.pop_many(data, 4), "little")
)
connection_errors = int.from_bytes(utils.pop_many(data, 4), "little")
last_connection_error_time = base_date + timedelta(
seconds=int.from_bytes(utils.pop_many(data, 4), "little")
)
resets = int.from_bytes(utils.pop_many(data, 4), "little")
last_reset_time = base_date + timedelta(
seconds=int.from_bytes(utils.pop_many(data, 4), "little")
)
tcp_data = int.from_bytes(utils.pop_many(data, 4), "little")
all_data = int.from_bytes(utils.pop_many(data, 4), "little")
serial_number = str(int.from_bytes(utils.pop_many(data, 4), "little"))
ip_address = utils.parse_ip_address(utils.pop_many(data, 4))
last_modem_error_time = base_date + timedelta(
seconds=int.from_bytes(utils.pop_many(data, 4), "little")
)
last_modem_error = data.pop(0)
modem_battery_capacity = int.from_bytes(utils.pop_many(data, 2), "little")
modem_battery_voltage = int.from_bytes(utils.pop_many(data, 2), "little")
firmware_version = utils.pretty_text(utils.pop_many(data, 33))

return cls(
station_id=station_id,
sim_card_id=sim_id,
modem_id=modem_id,
address_1=address_1,
address_2=address_2,
signal_strength=signal_strength,
connections=connections,
last_connection_time=last_connection,
connection_errors=connection_errors,
last_connection_error_time=last_connection_error_time,
resets=resets,
last_reset_time=last_reset_time,
tcp_data=tcp_data,
all_data=all_data,
serial_number=serial_number,
ip_address=ip_address,
last_modem_error=last_modem_error,
last_modem_error_time=last_modem_error_time,
modem_battery_capacity=modem_battery_capacity,
modem_battery_voltage=modem_battery_voltage,
firmware_version=firmware_version,
version=version,
guid=guid,
protocol=protocol,
)


@attr.s(auto_attribs=True)
class CallResponse:
"""
It is just an empty response
"""

def to_bytes(self) -> bytes:
return b""
7 changes: 7 additions & 0 deletions elgas/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,10 @@ class Archive(IntEnum):
STATUS = 8
BILLING = 9
GAS_COMPOSITION = 10


class Protocol(IntEnum):
ELGAS2 = 0
QMD = 1
RDS = 2
MODBUS = 3
44 changes: 44 additions & 0 deletions elgas/frames.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,50 @@ def to_bytes(self) -> bytes:

return bytes(out)

@classmethod
def from_bytes(cls, in_bytes: bytes):
if not in_bytes.startswith(b"\x02\xfe" + cls.TYPE.to_bytes(1, "little")):
raise ValueError("Not a Request frame")
if not in_bytes.endswith(b"\x0d"):
raise ValueError("Data does not end with end char")
service = constants.ServiceNumber(int.from_bytes(in_bytes[3:4], "little"))
length = int.from_bytes(in_bytes[4:6], "little")
if length != len(in_bytes) - 1: # STX is not counted
raise ValueError(
f"Length field does not correspond to length of data. Got {len(in_bytes)-1}, should be {length} "
)
destination_address_1 = int.from_bytes(in_bytes[6:8], "little")
destination_address_2 = int.from_bytes(in_bytes[8:9], "little")
source_address_1 = int.from_bytes(in_bytes[9:11], "little")
source_address_2 = int.from_bytes(in_bytes[11:12], "little")
data = in_bytes[12:-4]
redundancy_source = bytearray(in_bytes[1:-4])
lrc = in_bytes[-4]
calculated_lrc = utils.calculate_lrc(redundancy_source)
if lrc != calculated_lrc:
raise ValueError(
f"Incorrect LRC. Got {lrc!r}, should be {calculated_lrc!r}"
)
checksum = in_bytes[-3]
calculated_checksum = utils.calculate_checksum(redundancy_source)
if checksum != calculated_checksum:
raise ValueError(
f"Incorrect CHECKSUM. Got {checksum!r}, should be {calculated_checksum!r}"
)
drc = in_bytes[-2]
calculated_drc = utils.calculate_drc(redundancy_source)
if drc != calculated_drc:
raise ValueError(f"Incorrect DRC. Got {drc!r}, should be {drc!r}")

return cls(
service=service,
destination_address_1=destination_address_1,
destination_address_2=destination_address_2,
source_address_1=source_address_1,
source_address_2=source_address_2,
data=data,
)


@attr.s(auto_attribs=True)
class Response:
Expand Down
Loading

0 comments on commit 1ceeaef

Please sign in to comment.