Skip to content

Commit

Permalink
feat: improve position, add virtual can support by @AlexMatter1512 (#9)
Browse files Browse the repository at this point in the history
* vcan not working at the moment

* added docstrings to robot.py

* added stuff for #3, to be checked in lab before merging with main

---------

Co-authored-by: Alessio Matarazzo <[email protected]>
  • Loading branch information
Helias and AlexMatter1512 authored Feb 4, 2024
1 parent 4a3c2a3 commit 1090a7c
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 12 deletions.
18 changes: 15 additions & 3 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import can

from models.can_packet import CAN_position
from robot.constants import CAN_IDS, CHANNEL, DEBUG_CAN, DEBUG_VIRTUAL
from robot.constants import CAN_IDS, CHANNEL, VCHANNEL, DEBUG_CAN, DEBUG_VIRTUAL, DEBUG_VCAN
from robot.robot import robot
from robot.virtual import get_v_bus, get_v_message
from webserver.webserver import start_rest_api, start_socket
Expand All @@ -12,8 +12,13 @@
threading.Thread(target=start_socket).start()
threading.Thread(target=start_rest_api).start()

_bustype='socketcan' if not DEBUG_VIRTUAL else 'virtual'
bus = can.interface.Bus(channel=CHANNEL, bustype=_bustype)
_bustype = "virtual" if DEBUG_VIRTUAL or DEBUG_VCAN else "socketcan"
_channel = VCHANNEL if DEBUG_VCAN else CHANNEL
bus = can.interface.Bus(channel=_channel, bustype=_bustype)

if DEBUG_CAN:
print(f"Bus type: {_bustype}")
print(f"Channel: {_channel}")

if DEBUG_VIRTUAL:
v_bus = get_v_bus(CHANNEL)
Expand All @@ -24,6 +29,13 @@
v_bus.send(v_message)
v_bus.send(v_messageUnk)

if DEBUG_VCAN:
v_message = get_v_message(CAN_IDS["ROBOT_POSITION"], "<hhh", CAN_position)
v_messageUnk = get_v_message(0x333, "<hhh", CAN_position)

bus.send(v_message)
bus.send(v_messageUnk)

robot.init_lidar()

try:
Expand Down
12 changes: 12 additions & 0 deletions models/can_packet.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import TypedDict, Union
import struct


class Position(TypedDict):
Expand All @@ -11,3 +12,14 @@ class Position(TypedDict):
"Y": 20,
"angle": 300,
}

class MotionCommand:
def __init__(self, CMD: int, PARAM_1: int, PARAM_2: int, PARAM_3: int, flags: int):
self.CMD = CMD
self.PARAM_1 = PARAM_1
self.PARAM_2 = PARAM_2
self.PARAM_3 = PARAM_3
self.flags = flags

def get_struct(self) -> bytes:
return struct.pack("<BhhhB", self.CMD, self.PARAM_1, self.PARAM_2, self.PARAM_3, self.flags)
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ flask
python-can
git+https://github.com/simondlevy/BreezyLidar.git#egg=BreezyLidar&subdirectory=python
websockets
BreezyLidar
11 changes: 11 additions & 0 deletions robot/constants.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
DEBUG_CAN = True
DEBUG_LIDAR = False
DEBUG_VIRTUAL = True
DEBUG_VCAN = False

CHANNEL = 'can0'
VCHANNEL = 'vcan0'
LIDAR_DEVICE = '/dev/ttyACM0'

CAN_IDS = {
Expand All @@ -15,3 +17,12 @@
'ST_CMD': 0x710,
'OBST_MAP': 0x70F
}

MOTION_CMDS = {
'STOP': 0x82,
'BRAKE': 0x83,
'SET_POSITION': 0x84,
'FW_TO_DISTANCE': 0x85,
'ROTATE_RELATIVE': 0x88,
'SET_SPEED': 0x8C,
}
5 changes: 0 additions & 5 deletions robot/model.py

This file was deleted.

87 changes: 83 additions & 4 deletions robot/robot.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,29 @@
import can
from breezylidar import URG04LX

from models.can_packet import Position
from models.can_packet import MotionCommand, Position
from models.lidar_mock import SCANDATA_MOCKS
from robot.constants import CAN_IDS, DEBUG_CAN, DEBUG_LIDAR, DEBUG_VIRTUAL, LIDAR_DEVICE
from robot.constants import (
CAN_IDS,
CHANNEL,
DEBUG_CAN,
DEBUG_LIDAR,
DEBUG_VCAN,
DEBUG_VIRTUAL,
LIDAR_DEVICE,
MOTION_CMDS,
VCHANNEL,
)


class Robot:
def __init__(self):
# Initialize the CAN bus
_channel = VCHANNEL if DEBUG_VCAN else CHANNEL
_bustype = "virtual" if DEBUG_VIRTUAL or DEBUG_VCAN else "socketcan"
self.bus = can.interface.Bus(channel=_channel, bustype=_bustype)

# Initialize the lidar
self.laser_data = []
self.laser = None

Expand All @@ -20,6 +36,13 @@ def __init__(self):
self.Position: Position = {"X": 0, "Y": 0, "Angle": 0}

def on_data_received(self, frm: can.Message):
"""
Handle the data received from the CAN bus depending on the CAN ID.
this has to be called in a loop at the moment, but it should be changed to a callback somehow
:param frm: The CAN message received.
:type frm: can.Message
:return: None
"""
# Extract data from the CAN message
data = frm.data

Expand Down Expand Up @@ -48,13 +71,69 @@ def on_data_received(self, frm: can.Message):

def get_position(self) -> Position:
return self.Position

def set_position(self, position: Position) -> None:
"""
Set the position of the robot using a Position object.
:param position: A Position object containing X, Y, and Angle values.
:type position: dict
:key position["X"]: X-coordinate of the robot.
:key position["Y"]: Y-coordinate of the robot.
:key position["Angle"]: Angle of the robot.
:return: None
"""
mc = MotionCommand(MOTION_CMDS['SET_POSITION'], position["X"], position["Y"], position["Angle"], 0)
data = mc.get_struct()
msg = can.Message(arbitration_id=CAN_IDS["ROBOT_POSITION"], data=data, extended_id=False)
self.bus.send(msg)
def set_position(self, x: int, y: int, angle: int) -> None:
"""
Set the position of the robot using individual X, Y, and Angle values.
:param x: X-coordinate of the robot.
:type x: int
:param y: Y-coordinate of the robot.
:type y: int
:param angle: Angle of the robot.
:type angle: int
:return: None
"""
mc = MotionCommand(MOTION_CMDS['SET_POSITION'], x, y, angle, 0)
data = mc.get_struct()
msg = can.Message(arbitration_id=CAN_IDS["ROBOT_POSITION"], data=data, extended_id=False)
self.bus.send(msg)

def set_speed(self, speed: int) -> None:
"""
Set the speed of the robot.
:param speed: Speed of the robot.
:type speed: int
:return: None
"""
mc = MotionCommand(MOTION_CMDS['SET_SPEED'], speed, 0, 0, 0)
data = mc.get_struct()
msg = can.Message(arbitration_id=CAN_IDS["ROBOT_MOTION_COMMAND"], data=data, extended_id=False)
self.bus.send(msg)

def init_lidar(self) -> None:
if not DEBUG_VIRTUAL:
"""
Initialize the lidar object of the robot using the LIDAR_DEVICE constant.
:return: None
"""
if not DEBUG_VIRTUAL and not DEBUG_VCAN:
self.laser = URG04LX(LIDAR_DEVICE)

def get_lidar_data(self) -> List[int]:
if DEBUG_VIRTUAL:
"""
Get the data from the lidar. If DEBUG_VIRTUAL or DEBUG_VCAN are True, the data is mocked.
:return: List[int]
"""
if DEBUG_VIRTUAL or DEBUG_VCAN:
index = randrange(0, len(SCANDATA_MOCKS))
laser_data = SCANDATA_MOCKS[index]
else:
Expand Down

0 comments on commit 1090a7c

Please sign in to comment.