Skip to content

Commit

Permalink
Upload visit_door_list_phhp
Browse files Browse the repository at this point in the history
  • Loading branch information
jinsooPark-17 authored Feb 7, 2024
1 parent ae037f3 commit a8b26d7
Showing 1 changed file with 214 additions and 0 deletions.
214 changes: 214 additions & 0 deletions bwi_tasks/scripts/visit_door_list_phhp
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
#!/usr/bin/env python
import math
import numpy as np
import random

import rospy
from actionlib import SimpleActionClient
from std_srvs.srv import Empty
from nav_msgs.srv import GetPlan, GetPlanRequest
from geometry_msgs.msg import PoseWithCovarianceStamped
from geometry_msgs.msg import Point32, PolygonStamped
from move_base_msgs.msg import MoveBaseGoal, MoveBaseAction

class Record:
def __init__(self):
import time
import rosbag
from sensor_msgs.msg import LaserScan
from geometry_msgs.msg import PoseWithCovarianceStamped

# create new rosbag
self.bag = rosbag.Bag("/root/projects/rosbag/{}.bag".format(time.strftime("%Y-%m-%d-%Hh%Mm")), 'w')

# define buffer
self.scan_filtered = LaserScan()
self.scan_hallucinate = LaserScan()

# define subscribers
self.sub_scan_filtered = rospy.Subscriber("/scan_filtered", LaserScan, self.scan_filtered_cb)
self.sub_scan_hallucinated = rospy.Subscriber("/scan_hallucinated", LaserScan, self.scan_hallucinate_cb)
self.sub_amcl = rospy.Subscriber("/amcl_pose", PoseWithCovarianceStamped, self.amcl_cb)
self.sub_comms = rospy.Subscriber("/comms/amcl_pose", PoseWithCovarianceStamped, self.comms_cb)

def scan_filtered_cb(self, msg):
self.scan_filtered = msg

def scan_hallucinate_cb(self, msg):
self.scan_hallucinate = msg

def amcl_cb(self, msg):
self.bag.write("amcl_pose", msg)
self.bag.write("scan_filtered", self.scan_filtered)
self.bag.write("scan_hallucinated", self.scan_hallucinate)

def comms_cb(self, msg):
self.bag.write("comms/amcl_pose", msg)

class BWIbot:
def __init__(self):
self.rec = Record()
# Define PHHP-related parameters
self.phhp = False
self.detection_range = 8.0
self.phhp_radius = 0.5122
self.phhp_dr = 0.5661
self.phhp_k_begin = 0.4842
self.phhp_k_end = 0.5001

self.opponent_location = np.zeros(2) # (x, y)

# Define move_base
self.move_base = SimpleActionClient("/move_base", MoveBaseAction)
connected = self.move_base.wait_for_server(timeout=rospy.Duration(60.0))
if not connected:
raise TimeoutError("MoveBase does not respond! Please retry after reboot segway base.")
self.goal = MoveBaseGoal()
self.goal.target_pose.header.frame_id = "level_mux_map"

# Define ROS services
self.make_plan_srv = rospy.ServiceProxy("/move_base/NavfnROS/make_plan", GetPlan)
self.clear_costmaps_srv = rospy.ServiceProxy("/move_base/clear_costmaps", Empty)
self.clear_hallucination_srv = rospy.ServiceProxy("/clear_virtual_circles", Empty)

# Define subscribers
self.__sub_opponent_amcl = rospy.Subscriber("/comms/amcl_pose", PoseWithCovarianceStamped, self.amcl_cb)

# Define publishers
self.__pub_hallucination = rospy.Publisher("/add_circles", PolygonStamped, queue_size=10)

def amcl_cb(self, amcl):
# subscribe to other robot's location
self.opponent_location[0] = amcl.pose.pose.position.x
self.opponent_location[1] = amcl.pose.pose.position.y

def clear_costmaps(self):
rospy.wait_for_service("/move_base/clear_costmaps")
try:
self.clear_costmaps_srv()
except rospy.ServiceException as e:
raise ConnectionError("move_base/clear_costmaps does not respond.")

def clear_hallucination(self):
rospy.wait_for_service("/clear_virtual_circles")
try:
self.clear_hallucination_srv()
except rospy.ServiceException as e:
raise ConnectionError("clear_hallucination does not respond.")

def send_goal_and_wait(self, x, y, yaw):
self.goal.target_pose.header.stamp = rospy.Time.now()
self.goal.target_pose.pose.position.x = x
self.goal.target_pose.pose.position.y = y
self.goal.target_pose.pose.orientation.z = math.sin(yaw/2.)
self.goal.target_pose.pose.orientation.w = math.cos(yaw/2.)

self.phhp = False
start_time = rospy.Time.now()
self.move_base.send_goal(
goal = self.goal,
feedback_cb = self.feedback_cb,
)
self.move_base.wait_for_result()
ttd = (rospy.Time.now() - start_time).to_sec()
self.clear_hallucination()
self.clear_costmaps()

return ttd

def feedback_cb(self, feedback):
curr_pose = feedback.base_position.pose

if self.phhp is False:
plan = self.get_plan_to_goal(curr_pose)

try:
# check if plan overlap with other robot
idx = np.argwhere( np.linalg.norm(plan-self.opponent_location, axis=1) < 0.5 )[0][0]
dist = np.linalg.norm(plan[1:idx+1] - plan[0:idx], axis=1).sum()
if self.phhp is False and dist < self.detection_range:
self.generate_vo( # Right lane following baseline
center=plan[:idx+1], # plan[:idx+1]
radius=self.phhp_radius, # 1.0
dr=self.phhp_dr-self.phhp_radius, # 0.075
k_begin=self.phhp_k_begin, # 0.2
k_end=self.phhp_k_end # 0.8
)
except IndexError as e:
# No conflict!
pass

def get_plan_to_goal(self, curr_pose):
req = GetPlanRequest()
req.start.header.frame_id = "level_mux_map"
req.goal.header.frame_id = "level_mux_map"
req.start.pose = curr_pose
req.goal.pose = self.goal.target_pose.pose

rospy.wait_for_service("/move_base/NavfnROS/make_plan")
try:
plan_msg = self.make_plan_srv( req )
plan = np.array(
[[p.pose.position.x, p.pose.position.y] for p in plan_msg.plan.poses]
)
return plan
except rospy.ServiceException as e:
print("Make plan service call failed.")

def generate_vo(self, center, radius, dr, k_begin, k_end):
dist = np.cumsum(np.linalg.norm( center[1:] - center[:-1], axis=1))
d_begin = dist[-1] * k_begin
d_end = dist[-1] * k_end
idx_begin, idx_end = np.searchsorted(dist, [d_begin, d_end])

# calculate center of virtual circles
dx, dy = (center[2:] - center[:-2]).T
theta = np.arctan2(dy, dx) + np.pi/2.0
idx = slice(idx_begin, idx_end, 4)
center = center[idx]
theta = theta[idx]
vos = center + (radius + dr) * np.array([np.cos(theta), np.sin(theta)]).T

msg = PolygonStamped()
msg.header.stamp = rospy.Time.now() + rospy.Duration(9999.9)
msg.polygon.points = [Point32(x, y, radius) for (x, y) in vos]
self.__pub_hallucination.publish(msg)
self.phhp = True

# Later, changed with LSTM?
# Use corridor-side door location (?00)
DOOR_LIST = dict(
d2_124a=[-38.75, -8.35, -1.5385494443596446],
d2_124b=[-44.75, -8.55, -1.5707963267948966],
d2_202x=[-6.35, -10.45, 1.6020361602251632],
d2_202y=[-5.15, -0.35, -3.104572537715862],
d2_202z=[-7.70, 26.35, 3.141592653589793],
d2_304a=[-62.9, -8.40, -0.07130746478528853],
d2_318 =[-53.75, 25.65, -3.0676536159318575],
d2_324 =[-53.80, 13.95, -3.0750244898139685],
d2_326 =[-53.90, 3.60, -3.079173843593835]
)

if __name__ == '__main__':
rospy.init_node("visit_door_list_ahg_multiple")

robot = BWIbot()

try:
# Move to random door
prev_door = None
while not rospy.is_shutdown():
next_door = random.choice( DOOR_LIST.keys() )
if prev_door == next_door:
# robot must move to somewhere
continue

prev_door = next_door
x, y, yaw = DOOR_LIST[ next_door ]
rospy.loginfo("Heading to {}".format(next_door))

robot.send_goal_and_wait(x, y, yaw)
rospy.sleep(5.0)
finally:
# wrapup
robot.rec.bag.close()

0 comments on commit a8b26d7

Please sign in to comment.