Skip to content

Commit

Permalink
Cleanup code to "pass" Pylint.
Browse files Browse the repository at this point in the history
  • Loading branch information
tjko committed Oct 13, 2023
1 parent d2a623d commit 4553506
Showing 1 changed file with 169 additions and 125 deletions.
294 changes: 169 additions & 125 deletions flash-rd.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,146 +20,190 @@

import sys
import argparse
import serial
from time import sleep
import serial

supported_models = [60062, 60121, 60181, 60065, 60125]
verbose_mode = 0


def read_reply(serial, count):
if verbose_mode:
print('Waiting data...')
r = serial.read(count)
if verbose_mode:
print('Read: %d: %s' % (len(r), r))
return r


def write_string(serial, s):
if verbose_mode:
print('Write: %d: %s' % (len(s), s))
r = serial.write(s)
return r


def update_firmware(serial, firmware):
write_string(serial, b'upfirm\r\n')
r = read_reply(serial, 6)
if r != b'upredy':
print('Failed to initiate flashing: %s' % r)
return 1
print('Updating firmware...', end='', flush=True)
pos = 0
while pos < len(firmware):
buf = firmware[pos:pos+64]
write_string(serial, buf)
r = read_reply(serial, 2)
if r != b'OK':
print('Flash failed: %s' % r)
return 2
print('.', end='', flush=True)
pos += 64
print(r)
return 0


####################################################################################
class RidenFirmwareUpdater:
"""Riden Power Supply Firmware Updater"""

supported_models = [60062, 60121, 60181, 60065, 60125]

def __init__(self, port, verbose=False):
self.port = port
self.verbose_mode = verbose

def read_reply(self, count):
"""Read block of data from device."""

if self.verbose_mode:
print('Waiting data...')
res = self.port.read(count)
if self.verbose_mode:
print('Read: %d: %s' % (len(res), res))
return res


def write_string(self, string):
"""Write block of data to device."""
if self.verbose_mode:
print('Write: %d: %s' % (len(string), string))
res = self.port.write(string)
return res


def update_firmware(self, firmware):
"""Send firmware image to device."""
self.write_string(b'upfirm\r\n')
res = self.read_reply(6)
if res != b'upredy':
print('Failed to initiate flashing: %s' % res)
return 1
print('Updating firmware...', end='', flush=True)
pos = 0
while pos < len(firmware):
buf = firmware[pos:pos+64]
self.write_string(buf)
res = self.read_reply(2)
if res != b'OK':
print('Flash failed: %s' % res)
return 2
print('.', end='', flush=True)
pos += 64
print(res)
return 0

def bootloader_mode(self):
"""Set device into bootloader mode."""
print('Check if device is in bootloader mode...', end='', flush=True)
self.write_string(b'queryd\r\n')
res = self.read_reply(4)
self.port.timeout = 5
if res == b'boot':
print('Yes')
else:
print('No')

# Send modbus command (read registers 0-3)
self.write_string(b'\x01\x03\x00\x00\x00\x04\x44\x09')
res = self.read_reply(13)
if len(res) == 0:
print('No response from device.')
return 1
if len(res) != 13 or res[0] != 0x01 or res[1] != 0x03 or res[2] != 0x08:
print('Invalid response received: %s' % res)
return 2

model = (res[3] << 8 | res[4])
print('Found device (using Modbus): RD%d (%d) v%0.2f' %
(model / 10, model, res[10] / 100))

print('Rebooting to bootloader mode...')

# Send modbus command (write 0x1601 into register 0x100)
self.write_string(b'\x01\x06\x01\x00\x16\x01\x47\x96')
res = self.read_reply(1)
if res != b'\xfc':
print('Failed to reboot device.')
return 3
sleep(3)
return 0

def device_info(self):
"""Return device model information."""
self.write_string(b'getinf\r\n')
res = self.read_reply(13)
if len(res) == 0:
print('No response from bootloader')
return(-1, 0, 0)
if len(res) != 13 or res[0:3] != b'inf':
print('Invalid response from bootloader: %s' % res)
return(-2, 0, 0)
snum = (res[6] << 24 | res[5] << 16 | res[4] << 8 | res[3])
model = (res[8] << 8 | res[7])
fwver = res[11] / 100
return(model, fwver, snum)

def supported_model(self, model):
"""Check if device model is supported for firmare update."""
if model in self.supported_models:
return True
return False

# Parse command-line arguments

parser = argparse.ArgumentParser(description='Riden RD60xx Firmware Flash Tool')
parser.add_argument('port', help='Serial port')
parser.add_argument('firmware', nargs='?',
help='Firmware file. If not specified, only reboot to bootloader mode and print version of Riden '
'device')
parser.add_argument('--speed', type=int, default=115200, help='Serial port speed')
args = parser.parse_args()

####################################################################################

def main():
"""Main program when invoked as a script."""

# Open serial connection
# Parse command-line arguments

print('Serial port: %s (%dbps)' % (args.port, args.speed))
try:
serial = serial.Serial(port=args.port, baudrate=args.speed, timeout=2)
except serial.SerialException as err:
sys.exit(err)
parser = argparse.ArgumentParser(description='Riden RD60xx Firmware Flash Tool')
parser.add_argument('port', help='Serial port')
parser.add_argument('firmware', nargs='?',
help='Firmware file. If not specified, only reboot to bootloader'
'mode and print version of the device.')
parser.add_argument('--speed', type=int, default=115200, help='Serial port speed')
parser.add_argument('--verbose', '-v', action='store_true', help='Enable verbose mode')
args = parser.parse_args()


# Read firmware file into memory
# Open serial connection

if args.firmware:
print('Serial port: %s (%dbps)' % (args.port, args.speed))
try:
with open(args.firmware, 'rb') as file:
firmware = file.read()
except OSError as err:
sport = serial.Serial(port=args.port, baudrate=args.speed, timeout=2)
except serial.SerialException as err:
sys.exit(err)
print('Firmware size: %d bytes' % len(firmware))
else:
firmware = ''


# Put device into bootloader mode, if it's not already in it...

print('Check if device is in bootloader mode...', end='', flush=True)
write_string(serial, b'queryd\r\n')
res = read_reply(serial, 4)
serial.timeout = 5
if res == b'boot':
print('Yes')
else:
print('No')

# Send modbus command (read registers 0-3)
write_string(serial, b'\x01\x03\x00\x00\x00\x04\x44\x09')
res = read_reply(serial, 13)
if len(res) == 0:
sys.exit('No response from device.')
if len(res) != 13 or res[0] != 0x01 or res[1] != 0x03 or res[2] != 0x08:
sys.exit('Invalid response received: %s' % res)

model = (res[3] << 8 | res[4])
print('Found device (using Modbus): RD%d (%d) v%0.2f' % (model / 10, model, res[10] / 100))

print('Rebooting to bootloader mode...')

# Send modbus command (write 0x1601 into register 0x100)
write_string(serial, b'\x01\x06\x01\x00\x16\x01\x47\x96')
res = read_reply(serial, 1)
if res != b'\xfc':
sys.exit('Failed to reboot device.')
sleep(3)


# Query device information from bootloader

write_string(serial, b'getinf\r\n')
res = read_reply(serial, 13)
if len(res) == 0:
sys.exit('No response from bootloader')
if len(res) != 13 or res[0:3] != b'inf':
sys.exit('Invalid response from bootloader: %s' % res)

sn = (res[6] << 24 | res[5] << 16 | res[4] << 8 | res[3])
model = (res[8] << 8 | res[7])
fwver = res[11] / 100

print('Device information (from bootloader):')
print(' Model: RD%d (%d)' % (model / 10, model))
print(' Firmware: v%0.2f' % fwver)
print(' S/N: %08d' % sn)

if model not in supported_models:
sys.exit('Unsupported device: %d' % model)

# Update firmware

if len(firmware) > 0:
res = update_firmware(serial, firmware)
if res == 0:
print('Firmware update complete.')


# Read firmware file into memory

if args.firmware:
try:
with open(args.firmware, 'rb') as file:
firmware = file.read()
except OSError as err:
sys.exit(err)
print('Firmware size: %d bytes' % len(firmware))
else:
sys.exit('Firmware update FAILED!')
firmware = ''


# Put device into bootloader mode, if it's not already in it...

updater = RidenFirmwareUpdater(sport, verbose=args.verbose)
res = updater.bootloader_mode()
if res:
sys.exit('Failed to set device in bootloader mode.')


# Query device information from bootloader

model, fwver, snum = updater.device_info()

if model >= 0:
print('Device information (from bootloader):')
print(' Model: RD%d (%d)' % (model / 10, model))
print(' Firmware: v%0.2f' % fwver)
print(' S/N: %08d' % snum)

if not updater.supported_model(model):
sys.exit('Unsupported device: %d' % model)

# Update firmware

if len(firmware) > 0:
res = updater.update_firmware(firmware)
if res == 0:
print('Firmware update complete.')
else:
sys.exit('Firmware update FAILED!')


if __name__ == '__main__':
main()

# eof :-)

0 comments on commit 4553506

Please sign in to comment.