Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
ian612 committed Nov 15, 2023
2 parents 80c1b7b + 038e2ca commit cd6a354
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 137 deletions.
34 changes: 13 additions & 21 deletions scripts/dead_reckoning.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,20 @@
import struct
import math
import time
import _thread
from datetime import datetime
import serial

# Wrapper functions
def transmit(data):
'''Selects whether to use serial or tcp for transmitting.'''
if SIMULATE:
transmit_tcp(data)
else:
transmit_serial(data)
time.sleep(TRANSMIT_PAUSE)

def receive():
'''Selects whether to use serial or tcp for receiving.'''
if SIMULATE:
return receive_tcp()
else:
Expand All @@ -52,13 +53,10 @@ def transmit_tcp(data):
s.send(data.encode('utf-8'))
except (ConnectionRefusedError, ConnectionResetError):
print('Tx Connection was refused or reset.')
_thread.interrupt_main()
except TimeoutError:
print('Tx socket timed out.')
_thread.interrupt_main()
except EOFError:
print('\nKeyboardInterrupt triggered. Closing...')
_thread.interrupt_main()

def receive_tcp():
'''Receive a reply over the TCP connection.'''
Expand All @@ -73,10 +71,8 @@ def receive_tcp():
return [[False], None]
except (ConnectionRefusedError, ConnectionResetError):
print('Rx connection was refused or reset.')
_thread.interrupt_main()
except TimeoutError:
print('Response not received from robot.')
_thread.interrupt_main()

# Serial communication functions
def transmit_serial(data):
Expand Down Expand Up @@ -138,42 +134,38 @@ def bytes_to_list(msg):
except serial.SerialException:
pass

# Received responses
responses = [False]
time_rx = 'Never'

# The sequence of commands to run
cmd_sequence = ['w0-36', 'r0-90', 'w0-36', 'r0-90', 'w0-12', 'r0--90', 'w0-24', 'r0--90', 'w0-6', 'r0-720']
CMD_SEQUENCE = ['w0-36', 'r0-90', 'w0-36', 'r0-90', 'w0-12', 'r0--90', 'w0-24', 'r0--90', 'w0-6', 'r0-720']

# Main loop
RUNNING = True
ct = 0
while RUNNING:

if ct < len(cmd_sequence):
# If the command sequence hasn't been completed yet
if ct < len(CMD_SEQUENCE):

# Check an ultrasonic sensor 'u0'
transmit('u0')
[responses, time_rx] = receive()
print(f"Ultrasonic 0 reading: {round(responses[0], 3)}")

# Check an ultrasonic sensor 'u1'
transmit('u1')
[responses, time_rx] = receive()
print(f"Ultrasonic 1 reading: {round(responses[0], 3)}")

# transmit('u2')
# [responses, time_rx] = receive()
# print(f"Ultrasonic 2 reading: {round(responses[0], 3)}")

# transmit('u3')
# [responses, time_rx] = receive()
# print(f"Ultrasonic 3 reading: {round(responses[0], 3)}")

transmit(cmd_sequence[ct])
# Send a drive command
transmit(CMD_SEQUENCE[ct])
[responses, time_rx] = receive()
print(f"Drive command response: {round(responses[0], 3)}")

# If we receive a drive response indicating the command was accepted,
# move to the next command in the sequence
if responses[0] == math.inf:
ct += 1

# If the command sequence is complete, finish the program
else:
RUNNING = False
print("Sequence complete!")
218 changes: 102 additions & 116 deletions scripts/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,109 +23,89 @@

import socket
import struct
from threading import Thread
import _thread
import time
from datetime import datetime
import pygame

def display():

### Receive Window Setup ###
pygame.init()
clock = pygame.time.Clock()

# define RGB colors
white = (255, 255, 255)
green = (0, 255, 0)
blue = (0, 0, 128)

# display size
X = 400
Y = 225

# create the display surface object
display_surface = pygame.display.set_mode((X, Y))

# set the pygame window name
pygame.display.set_caption('Received text')

# create a font object
font = pygame.font.Font('freesansbold.ttf', 16)

# Draw the text field
display_surface.fill(white)
pygame.display.update()

# main loop
while True:

responses_rnd = [f"{item:.{2}f}" for item in responses]

# create a text surface object
text0 = font.render(f"Last response received at time: {time_rx}", True, green, blue)
text1 = font.render(f"Last response was: {responses_rnd}", True, green, blue)

# create a rectangular object for the text surface object
textRect0 = text0.get_rect()
textRect1 = text1.get_rect()

# set the center of the rectangular object
textRect0.center = (X // 2, Y // 2 + 15)
textRect1.center = (X // 2, Y // 2 - 15)

display_surface.fill(white)
display_surface.blit(text0, textRect0)
display_surface.blit(text1, textRect1)

for event in pygame.event.get():
if event.type == pygame.QUIT:
# deactivate the pygame library and quit the program
pygame.quit()
quit()

# update display
clock.tick(60)
pygame.display.flip()

def transmit():
ask = True
while True:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
try:
s.connect((HOST, PORT_TX))
if ask:
send_string = input('Type in a string to send: ')
s.send(send_string.encode('utf-8'))
except (ConnectionRefusedError, ConnectionResetError):
print('Tx Connection was refused or reset.')
_thread.interrupt_main()
except TimeoutError:
print('Tx socket timed out.')
_thread.interrupt_main()
except EOFError:
print('\nKeyboardInterrupt triggered. Closing...')
_thread.interrupt_main()
ask = False
import serial

# Wrapper functions
def transmit(data):
'''Selects whether to use serial or tcp for transmitting.'''
if SIMULATE:
transmit_tcp(data)
else:
transmit_serial(data)
time.sleep(TRANSMIT_PAUSE)

def receive():
global responses
global time_rx
while True:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s2:
try:
s2.connect((HOST, PORT_RX))
response_raw = s2.recv(1024)
if response_raw:
responses = bytes_to_list(response_raw)
time_rx = datetime.now().strftime("%H:%M:%S")
except (ConnectionRefusedError, ConnectionResetError):
print('Rx connection was refused or reset.')
_thread.interrupt_main()
except TimeoutError:
print('Response not received from robot.')
_thread.interrupt_main()
'''Selects whether to use serial or tcp for receiving.'''
if SIMULATE:
return receive_tcp()
else:
return receive_serial()

# TCP communication functions
def transmit_tcp(data):
'''Send a command over the TCP connection.'''
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
try:
s.connect((HOST, PORT_TX))
s.send(data.encode('utf-8'))
except (ConnectionRefusedError, ConnectionResetError):
print('Tx Connection was refused or reset.')
except TimeoutError:
print('Tx socket timed out.')
except EOFError:
print('\nKeyboardInterrupt triggered. Closing...')

def receive_tcp():
'''Receive a reply over the TCP connection.'''
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s2:
try:
s2.connect((HOST, PORT_RX))
response_raw = s2.recv(1024)
if response_raw:
# return the data received as well as the current time
return [bytes_to_list(response_raw), datetime.now().strftime("%H:%M:%S")]
else:
return [[False], None]
except (ConnectionRefusedError, ConnectionResetError):
print('Rx connection was refused or reset.')
except TimeoutError:
print('Response not received from robot.')

# Serial communication functions
def transmit_serial(data):
'''Transmit a command over a serial connection.'''
SER.write(data.encode('ascii'))

def receive_serial():
'''Receive a reply over a serial connection.'''
# If responses are ascii characters, use this
# response_raw = (SER.readline().strip().decode('ascii'),)

# If responses are a series of 4-byte floats, use this
available_bytes = SER.in_waiting
read_bytes = max(4, available_bytes - (available_bytes % 4))
if read_bytes >= 4:
response_raw = bytes_to_list(SER.read(read_bytes))

# If response received, return it
if response_raw[0]:
return [response_raw, datetime.now().strftime("%H:%M:%S")]
else:
return [[False], datetime.now().strftime("%H:%M:%S")]

def clear_serial(delay_time):
'''Wait some time (delay_time) and then clear the serial buffer.'''
time.sleep(delay_time)
SER.read(SER.in_waiting())

# Convert string of bytes to a list of values
def bytes_to_list(msg):
'''
Convert a sequence of single precision floats (Arduino/SimMerR float format)
to a list of numerical responses.
'''
num_responses = int(len(msg)/4)
if num_responses:
data = struct.unpack(f'{str(num_responses)}f', msg)
Expand All @@ -135,27 +115,33 @@ def bytes_to_list(msg):


# Set whether to use TCP (SimMeR) or serial (Arduino)
SIMULATE = True
SIMULATE = False

# Time to pause after transmitting (seconds)
# Pause time
TRANSMIT_PAUSE = 0.1

### Network Setup ###
HOST = '127.0.0.1' # The server's hostname or IP address
PORT_TX = 61200 # The port used by the *CLIENT* to receive
PORT_RX = 61201 # The port used by the *CLIENT* to send data

# Display text strings
responses = []
time_rx = 'Never'

# Create tx and rx threads
Thread(target = transmit, daemon = True).start()
Thread(target = receive, daemon = True).start()
HOST = '127.0.0.1' # The server's hostname or IP address
PORT_TX = 61200 # The port used by the *CLIENT* to receive
PORT_RX = 61201 # The port used by the *CLIENT* to send data

# Display the received text in a pygame window
### Serial Setup ###
BAUDRATE = 115200 # Baudrate in bps
PORT_SERIAL = 'COM4' # COM port identification
try:
display()
except KeyboardInterrupt:
pygame.quit()
quit()
SER = serial.Serial(PORT_SERIAL, BAUDRATE, timeout=0)
except serial.SerialException:
pass

# Source
SOURCE = 'serial device ' + PORT_SERIAL
if SIMULATE:
SOURCE = 'SimMeR'

# Main loop
RUNNING = True
while RUNNING:
cmd = input('Type in a string to send: ')
transmit(cmd)
[responses, time_rx] = receive()
print(f"At time '{time_rx}' received '{round(responses[0], 3)}' from {SOURCE}\n")

0 comments on commit cd6a354

Please sign in to comment.