Skip to content

Commit

Permalink
feat(call-to-dispatching): Implementing support for APDU for device c…
Browse files Browse the repository at this point in the history
…alling server. (#19)

* (feat) Added support for Call APDUs

When device does call-to-dispatching it sends a request with the Call (GPRS Registration) APDU to the main system.
Added parsing for Request and Call APDU.

* license(busl): Updated the BUSL grant

Updated the BUSL grant to be in line with other packages.

---------

Co-authored-by: Henrik Wahlgren <[email protected]>
  • Loading branch information
Krolken and Henrik Wahlgren authored Feb 25, 2024
1 parent f1ad6b4 commit c7524fe
Show file tree
Hide file tree
Showing 6 changed files with 346 additions and 13 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ dist/
downloads/
eggs/
.eggs/
.idea/
lib/
lib64/
parts/
Expand Down
28 changes: 23 additions & 5 deletions LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,33 @@ Parameters
Licensor: Palmlund Wahlgren Innovative Technology AB

Licensed Work: elgas - the python library
The Licensed Work is (c) 2022 Palmlund Wahlgren Innovative Technology AB.
The Licensed Work is (c) 2024 Palmlund Wahlgren Innovative Technology AB.

Additional Use Grant: You may make use of the Licensed Work, provided that you do
not use the Licensed Work in a commercial product or a
commercial service.
Additional Use Grant: You may make use of the Licensed Work for any Permitted Purpose other than a Competing Use.
A Competing Use means use of the Licensed Work in or for a commercial product or service that
competes with the Licensed Work or any other product or service we offer using the Licensed Work
as of the date we make the Software available.

Competing Uses specifically include using the Licensed Work:

1. as a substitute for any of our products or services;

2. in a way that exposes the APIs of the Licensed Work; and

3. in a product or service that offers the same or substantially similar
functionality to the Licensed Work.

Permitted Purposes specifically include using the Software:

1. for your internal use and access;

2. for non-commercial education; and

3. for non-commercial research.

Change Date: Change date is four years from release date.

Change License: MIT
Change License: Apache License, Version 2.0

For information about alternative licensing arrangements please contact [email protected].

Expand Down
129 changes: 121 additions & 8 deletions elgas/application.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import datetime
from datetime import datetime, timedelta
from enum import IntEnum
from typing import *

Expand Down Expand Up @@ -160,13 +160,12 @@ class WriteTimeResponse:

@classmethod
def from_bytes(cls, in_bytes: bytes):

return cls()


@attr.s(auto_attribs=True)
class ReadDeviceParametersRequest:
""""""
""" """

service: ClassVar[
constants.ServiceNumber
Expand All @@ -187,7 +186,7 @@ def to_bytes(self) -> bytes:

@attr.s(auto_attribs=True)
class ReadDeviceParametersResponse:
""""""
""" """

service: ClassVar[
constants.ServiceNumber
Expand All @@ -214,7 +213,7 @@ def from_bytes(cls, in_bytes: bytes):

@attr.s(auto_attribs=True)
class ReadArchiveByTimeRequest:
""""""
""" """

service: ClassVar[
constants.ServiceNumber
Expand All @@ -237,7 +236,7 @@ def to_bytes(self) -> bytes:

@attr.s(auto_attribs=True)
class ReadArchiveByTimeResponse:
""""""
""" """

service: ClassVar[
constants.ServiceNumber
Expand All @@ -261,7 +260,7 @@ def from_bytes(cls, in_bytes: bytes):

@attr.s(auto_attribs=True)
class ReadArchiveRequest:
""""""
""" """

service: ClassVar[constants.ServiceNumber] = constants.ServiceNumber.READ_ARCHIVES

Expand All @@ -282,7 +281,7 @@ def to_bytes(self) -> bytes:

@attr.s(auto_attribs=True)
class ReadArchiveResponse:
""""""
""" """

service: ClassVar[constants.ServiceNumber] = constants.ServiceNumber.READ_ARCHIVES

Expand All @@ -300,3 +299,117 @@ def from_bytes(cls, in_bytes: bytes):
oldest_record_id=oldest_record_id,
data=data,
)


@attr.s(auto_attribs=True)
class CallRequest:
"""
Used in call to dispatching. Device will send the Call message to tell the server it is awake.
It is also named GprsRegistration.
"""

service: ClassVar[constants.ServiceNumber] = constants.ServiceNumber.CALL

guid: bytes
station_id: str
sim_card_id: str
modem_id: str
address_1: int
address_2: int
signal_strength: int
connections: int
last_connection_time: datetime
connection_errors: int
last_connection_error_time: datetime
resets: int
last_reset_time: datetime
tcp_data: int
all_data: int
serial_number: str
ip_address: str
last_modem_error: int
last_modem_error_time: datetime
modem_battery_capacity: int
modem_battery_voltage: int
firmware_version: str

version: int = attr.ib(default=2)
protocol: constants.Protocol = attr.ib(default=constants.Protocol.ELGAS2)

@classmethod
def from_bytes(cls, in_bytes: bytes):
"""
Datetimes are number of seconds since "2000-01-01T00:00:00"
"""
base_date = datetime.fromisoformat("2000-01-01T00:00:00")
data = bytearray(in_bytes)
length = int.from_bytes(utils.pop_many(data, 2), "little")
version = data.pop(0)
guid = bytes(utils.pop_many(data, 16))
station_id = utils.pretty_text(utils.pop_many(data, 17))
sim_id = str(utils.from_bcd(utils.pop_many(data, 10)))
modem_id = str(utils.from_bcd(utils.pop_many(data, 8)))
protocol = constants.Protocol(data.pop(0))
address_1 = int.from_bytes(utils.pop_many(data, 2), "little")
address_2 = data.pop(0)
signal_strength = data.pop(0)
connections = int.from_bytes(utils.pop_many(data, 4), "little")
last_connection = base_date + timedelta(
seconds=int.from_bytes(utils.pop_many(data, 4), "little")
)
connection_errors = int.from_bytes(utils.pop_many(data, 4), "little")
last_connection_error_time = base_date + timedelta(
seconds=int.from_bytes(utils.pop_many(data, 4), "little")
)
resets = int.from_bytes(utils.pop_many(data, 4), "little")
last_reset_time = base_date + timedelta(
seconds=int.from_bytes(utils.pop_many(data, 4), "little")
)
tcp_data = int.from_bytes(utils.pop_many(data, 4), "little")
all_data = int.from_bytes(utils.pop_many(data, 4), "little")
serial_number = str(int.from_bytes(utils.pop_many(data, 4), "little"))
ip_address = utils.parse_ip_address(utils.pop_many(data, 4))
last_modem_error_time = base_date + timedelta(
seconds=int.from_bytes(utils.pop_many(data, 4), "little")
)
last_modem_error = data.pop(0)
modem_battery_capacity = int.from_bytes(utils.pop_many(data, 2), "little")
modem_battery_voltage = int.from_bytes(utils.pop_many(data, 2), "little")
firmware_version = utils.pretty_text(utils.pop_many(data, 33))

return cls(
station_id=station_id,
sim_card_id=sim_id,
modem_id=modem_id,
address_1=address_1,
address_2=address_2,
signal_strength=signal_strength,
connections=connections,
last_connection_time=last_connection,
connection_errors=connection_errors,
last_connection_error_time=last_connection_error_time,
resets=resets,
last_reset_time=last_reset_time,
tcp_data=tcp_data,
all_data=all_data,
serial_number=serial_number,
ip_address=ip_address,
last_modem_error=last_modem_error,
last_modem_error_time=last_modem_error_time,
modem_battery_capacity=modem_battery_capacity,
modem_battery_voltage=modem_battery_voltage,
firmware_version=firmware_version,
version=version,
guid=guid,
protocol=protocol,
)


@attr.s(auto_attribs=True)
class CallResponse:
"""
It is just an empty response
"""

def to_bytes(self) -> bytes:
return b""
7 changes: 7 additions & 0 deletions elgas/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,10 @@ class Archive(IntEnum):
STATUS = 8
BILLING = 9
GAS_COMPOSITION = 10


class Protocol(IntEnum):
ELGAS2 = 0
QMD = 1
RDS = 2
MODBUS = 3
44 changes: 44 additions & 0 deletions elgas/frames.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,50 @@ def to_bytes(self) -> bytes:

return bytes(out)

@classmethod
def from_bytes(cls, in_bytes: bytes):
if not in_bytes.startswith(b"\x02\xfe" + cls.TYPE.to_bytes(1, "little")):
raise ValueError("Not a Request frame")
if not in_bytes.endswith(b"\x0d"):
raise ValueError("Data does not end with end char")
service = constants.ServiceNumber(int.from_bytes(in_bytes[3:4], "little"))
length = int.from_bytes(in_bytes[4:6], "little")
if length != len(in_bytes) - 1: # STX is not counted
raise ValueError(
f"Length field does not correspond to length of data. Got {len(in_bytes)-1}, should be {length} "
)
destination_address_1 = int.from_bytes(in_bytes[6:8], "little")
destination_address_2 = int.from_bytes(in_bytes[8:9], "little")
source_address_1 = int.from_bytes(in_bytes[9:11], "little")
source_address_2 = int.from_bytes(in_bytes[11:12], "little")
data = in_bytes[12:-4]
redundancy_source = bytearray(in_bytes[1:-4])
lrc = in_bytes[-4]
calculated_lrc = utils.calculate_lrc(redundancy_source)
if lrc != calculated_lrc:
raise ValueError(
f"Incorrect LRC. Got {lrc!r}, should be {calculated_lrc!r}"
)
checksum = in_bytes[-3]
calculated_checksum = utils.calculate_checksum(redundancy_source)
if checksum != calculated_checksum:
raise ValueError(
f"Incorrect CHECKSUM. Got {checksum!r}, should be {calculated_checksum!r}"
)
drc = in_bytes[-2]
calculated_drc = utils.calculate_drc(redundancy_source)
if drc != calculated_drc:
raise ValueError(f"Incorrect DRC. Got {drc!r}, should be {drc!r}")

return cls(
service=service,
destination_address_1=destination_address_1,
destination_address_2=destination_address_2,
source_address_1=source_address_1,
source_address_2=source_address_2,
data=data,
)


@attr.s(auto_attribs=True)
class Response:
Expand Down
Loading

0 comments on commit c7524fe

Please sign in to comment.