Skip to content

Commit

Permalink
More cleanup.
Browse files Browse the repository at this point in the history
  • Loading branch information
tjko committed Oct 13, 2023
1 parent 5a05f18 commit 9d94777
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 43 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pylint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ jobs:
pip install pylint pyserial
- name: Analysing the code with pylint
run: |
pylint --exit-zero $(git ls-files '*.py')
pylint --fail-under 9.0 $(git ls-files '*.py')
100 changes: 58 additions & 42 deletions flash-rd.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,17 @@


class RidenFirmwareUpdater:
"""Riden Power Supply Firmware Updater"""
"""Riden RD60xx Power Supply Firmware Updater"""

supported_models = [60062, 60121, 60181, 60065, 60125]


def __init__(self, port, verbose=False):
self.port = port
self.verbose_mode = verbose

def read_reply(self, count):

def read_data(self, count):
"""Read block of data from device."""

if self.verbose_mode:
Expand All @@ -44,26 +46,32 @@ def read_reply(self, count):
print('Read: %d: %s' % (len(res), res))
return res

def write_string(self, string):

def write_data(self, string):
"""Write block of data to device."""

if self.verbose_mode:
print('Write: %d: %s' % (len(string), string))
res = self.port.write(string)
return res


def update_firmware(self, firmware):
"""Send firmware image to device."""
self.write_string(b'upfirm\r\n')
res = self.read_reply(6)

self.port.timeout = 5
self.write_data(b'upfirm\r\n')
res = self.read_data(6)
if res != b'upredy':
print('Failed to initiate flashing: %s' % res)
return 1

print('Updating firmware...', end='', flush=True)
pos = 0
while pos < len(firmware):
buf = firmware[pos:pos+64]
self.write_string(buf)
res = self.read_reply(2)
self.write_data(buf)
res = self.read_data(2)
if res != b'OK':
print('Flash failed: %s' % res)
return 2
Expand All @@ -72,46 +80,53 @@ def update_firmware(self, firmware):
print(res)
return 0


def bootloader_mode(self):
"""Set device into bootloader mode."""

# Check if unit is in bootloader.
print('Check if device is in bootloader mode...', end='', flush=True)
self.write_string(b'queryd\r\n')
res = self.read_reply(4)
self.port.timeout = 5
self.write_data(b'queryd\r\n')
res = self.read_data(4)
if res == b'boot':
print('Yes')
else:
print('No')

# Send modbus command (read registers 0-3)
self.write_string(b'\x01\x03\x00\x00\x00\x04\x44\x09')
res = self.read_reply(13)
if len(res) == 0:
print('No response from device.')
return 1
if len(res) != 13 or res[0] != 0x01 or res[1] != 0x03 or res[2] != 0x08:
print('Invalid response received: %s' % res)
return 2
return 0
print('No')

model = res[3] << 8 | res[4]
print('Found device (using Modbus): RD%d (%d) v%0.2f' %
(model / 10, model, res[10] / 100))
# Try Modbus if unit not in bootloader mode...

print('Rebooting to bootloader mode...')

# Send modbus command (write 0x1601 into register 0x100)
self.write_string(b'\x01\x06\x01\x00\x16\x01\x47\x96')
res = self.read_reply(1)
if res != b'\xfc':
print('Failed to reboot device.')
return 3
sleep(3)
# Send modbus command (read registers 0-3)
self.port.timeout = 5
self.write_data(b'\x01\x03\x00\x00\x00\x04\x44\x09')
res = self.read_data(13)
if len(res) == 0:
print('No response from device.')
return 1
if len(res) != 13 or res[0] != 0x01 or res[1] != 0x03 or res[2] != 0x08:
print('Invalid response received: %s' % res)
return 2
model = res[3] << 8 | res[4]
print('Found device (using Modbus): RD%d (%d) v%0.2f' %
(model / 10, model, res[10] / 100))

# Send modbus command (write 0x1601 into register 0x100) to reboot
print('Rebooting into bootloader mode...')
self.write_data(b'\x01\x06\x01\x00\x16\x01\x47\x96')
res = self.read_data(1)
if res != b'\xfc':
print('Failed to reboot device.')
return 3

# Wait for unit to reboot...
sleep(3)
return 0


def device_info(self):
"""Return device model information."""
self.write_string(b'getinf\r\n')
res = self.read_reply(13)

self.write_data(b'getinf\r\n')
res = self.read_data(13)
if len(res) == 0:
print('No response from bootloader')
return(-1, 0, 0)
Expand All @@ -123,6 +138,7 @@ def device_info(self):
fwver = res[11] / 100
return(model, fwver, snum)


def supported_model(self, model):
"""Check if device model is supported for firmare update."""
if model in self.supported_models:
Expand Down Expand Up @@ -152,7 +168,7 @@ def main():

print('Serial port: %s (%dbps)' % (args.port, args.speed))
try:
sport = serial.Serial(port=args.port, baudrate=args.speed, timeout=2)
port = serial.Serial(port=args.port, baudrate=args.speed, timeout=2)
except serial.SerialException as err:
sys.exit(err)

Expand All @@ -172,29 +188,29 @@ def main():

# Put device into bootloader mode, if it's not already in it...

updater = RidenFirmwareUpdater(sport, verbose=args.verbose)
res = updater.bootloader_mode()
psu = RidenFirmwareUpdater(port, verbose=args.verbose)
res = psu.bootloader_mode()
if res:
sys.exit('Failed to set device in bootloader mode.')


# Query device information from bootloader

model, fwver, snum = updater.device_info()
model, fwver, snum = psu.device_info()

if model >= 0:
print('Device information (from bootloader):')
print(' Model: RD%d (%d)' % (model / 10, model))
print(' Firmware: v%0.2f' % fwver)
print(' S/N: %08d' % snum)

if not updater.supported_model(model):
if not psu.supported_model(model):
sys.exit('Unsupported device: %d' % model)

# Update firmware

if len(firmware) > 0:
res = updater.update_firmware(firmware)
res = psu.update_firmware(firmware)
if res == 0:
print('Firmware update complete.')
else:
Expand Down

0 comments on commit 9d94777

Please sign in to comment.