Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pragya/person detection #1

Open
wants to merge 8 commits into
base: ros_humble_main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions .vscode/c_cpp_properties.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{
"configurations": [
{
"browse": {
"databaseFilename": "${default}",
"limitSymbolsToIncludedHeaders": false
},
"includePath": [
"/opt/ros/humble/include/**",
"/usr/include/**"
],
"name": "ROS",
"intelliSenseMode": "gcc-x64",
"compilerPath": "/usr/bin/gcc",
"cStandard": "gnu11",
"cppStandard": "c++14"
}
],
"version": 4
}
15 changes: 15 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"python.autoComplete.extraPaths": [
"/home/pragya/ros2_ws/build/selection_task",
"/home/pragya/ros2_ws/install/selection_task/lib/python3.10/site-packages",
"/opt/ros/humble/lib/python3.10/site-packages",
"/opt/ros/humble/local/lib/python3.10/dist-packages"
],
"python.analysis.extraPaths": [
"/home/pragya/ros2_ws/build/selection_task",
"/home/pragya/ros2_ws/install/selection_task/lib/python3.10/site-packages",
"/opt/ros/humble/lib/python3.10/site-packages",
"/opt/ros/humble/local/lib/python3.10/dist-packages"
],
"ros.distro": "humble"
}
Binary file added deeplabv3.tflite
Binary file not shown.
21 changes: 21 additions & 0 deletions person_follower/package.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>person_follower</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="[email protected]">pragya</maintainer>
<license>TODO: License declaration</license>

<depend>rclpy</depend>
<depend>sensor_msgs</depend>

<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>

<export>
<build_type>ament_python</build_type>
</export>
</package>
Empty file.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
29 changes: 29 additions & 0 deletions person_follower/person_follower/camera_subscriber.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
import cv2
from cv_bridge import CvBridge

class CameraSubscriber(Node):
    """Subscribe to the Kinect image topic and display each frame with OpenCV."""

    def __init__(self):
        super().__init__('camera_subscriber_node')
        # QoS depth 10: buffer up to 10 unprocessed frames.
        self.subscription = self.create_subscription(
            Image, '/kinect_camera/image_raw', self.image_callback, 10)
        self.bridge = CvBridge()
        # Replaced a leftover debug print ("holaaaaaaaa") with the node logger.
        self.get_logger().info('camera_subscriber_node started')

    def image_callback(self, msg):
        """Convert an incoming ROS Image to a BGR frame and show it.

        Conversion errors (e.g. unexpected encodings) are logged instead of
        crashing the executor.
        """
        try:
            cv_image = self.bridge.imgmsg_to_cv2(msg, encoding='bgr8')
        except Exception as e:  # cv_bridge raises CvBridgeError on bad input
            self.get_logger().error(f'cv_bridge conversion failed: {e}')
            return
        cv2.imshow('Kinect Camera Feed', cv_image)
        cv2.waitKey(1)  # required for the HighGUI window to repaint

def main(args=None):
    """Entry point: spin the subscriber until interrupted, then clean up."""
    rclpy.init(args=args)
    camera_subscriber = CameraSubscriber()
    try:
        rclpy.spin(camera_subscriber)
    except KeyboardInterrupt:
        pass
    finally:
        # Previously this cleanup never ran: Ctrl+C raises KeyboardInterrupt
        # out of spin(), skipping destroy_node()/shutdown().
        camera_subscriber.destroy_node()
        cv2.destroyAllWindows()
        rclpy.shutdown()

if __name__ == '__main__':
    main()
73 changes: 73 additions & 0 deletions person_follower/person_follower/final_code.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
#! /usr/bin/env python3

import cv2
import mediapipe as mp
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
import numpy as np


class CameraSubscriber(Node):
    """Run MediaPipe Holistic pose detection on a Kinect camera stream.

    Draws the detected pose landmarks and their centroid on the live feed and
    shows a simple grayscale-threshold segmentation mask alongside it.
    """

    def __init__(self):
        super().__init__('camera_subscriber_node')
        self.bridge = CvBridge()
        self.subscription = self.create_subscription(Image, '/kinect_camera/image_raw', self.image_callback, 10)
        self.cv_image = None  # latest BGR frame; None until the first message
        self.mp_holistic = mp.solutions.holistic.Holistic()
        self.mp_drawing = mp.solutions.drawing_utils

    def calculate_centroid(self, keypoints):
        """Return the integer [x, y] centroid of a list of (x, y) pixel points."""
        if not keypoints:  # guard: avoid ZeroDivisionError on an empty list
            return [0, 0]
        num_points = len(keypoints)
        centroid_x = sum(p[0] for p in keypoints) / num_points
        centroid_y = sum(p[1] for p in keypoints) / num_points
        return [int(centroid_x), int(centroid_y)]

    def perform_segmentation(self, image):
        """Return a binary mask from a fixed grayscale threshold (128) of *image*."""
        gray_image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        _, segmented_mask = cv2.threshold(gray_image, 128, 255, cv2.THRESH_BINARY)
        return segmented_mask

    def webcam_centroid(self):
        """Detect the person's pose on the latest frame and draw the results."""
        try:
            # Fix: MediaPipe models expect RGB input, but the stored frame is
            # BGR (cv_bridge 'bgr8'); feeding BGR degrades detection quality.
            rgb_image = cv2.cvtColor(self.cv_image, cv2.COLOR_BGR2RGB)
            results = self.mp_holistic.process(rgb_image)
            if results.pose_landmarks:
                h, w, _ = self.cv_image.shape  # hoisted: constant per frame
                keypoints = [(int(lm.x * w), int(lm.y * h))
                             for lm in results.pose_landmarks.landmark]
                centroid = self.calculate_centroid(keypoints)
                cv2.circle(self.cv_image, centroid, 5, (0, 255, 0), -1)
                self.mp_drawing.draw_landmarks(self.cv_image, results.pose_landmarks, mp.solutions.holistic.POSE_CONNECTIONS)

            # Image segmentation (visualization only).
            segmented_mask = self.perform_segmentation(self.cv_image)
            cv2.imshow('Segmentation Mask', segmented_mask)
            cv2.imshow('Kinect Camera Feed', self.cv_image)

        except Exception as e:
            # Log through the node logger instead of a bare print(e).
            self.get_logger().error(f'pose processing failed: {e}')

    def image_callback(self, msg):
        """Store the incoming frame as BGR and run the detection pipeline."""
        self.cv_image = self.bridge.imgmsg_to_cv2(msg, 'bgr8')
        self.webcam_centroid()
        cv2.waitKey(3)  # required for the HighGUI windows to repaint

def main(args=None):
    """Entry point: spin the node until interrupted, then release resources."""
    rclpy.init(args=args)
    camera_subscriber = CameraSubscriber()
    try:
        rclpy.spin(camera_subscriber)
    except KeyboardInterrupt:
        pass
    finally:
        # Previously this cleanup never ran: Ctrl+C raises KeyboardInterrupt
        # out of spin(), skipping the close()/destroy/shutdown calls below.
        camera_subscriber.mp_holistic.close()
        cv2.destroyAllWindows()
        camera_subscriber.destroy_node()
        rclpy.shutdown()

if __name__ == "__main__":
    main()

180 changes: 180 additions & 0 deletions person_follower/person_follower/image_segmentation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
#! /usr/bin/env python3

import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
from cv_bridge import CvBridge, CvBridgeError
import cv2
from sensor_msgs.msg import Image
import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
import numpy as np

class PersonFollower(Node):
def __init__(self):
super().__init__("person_follower")
self.bridge = CvBridge()
self.image_sub = self.create_subscription(Image, "/kinect_camera/image_raw",self.callback, 10)
self.depth_sub=self.create_subscription(Image,"/kinect_camera/depth/image_raw",self.depth_callback,10)
self.pub = self.create_publisher(Twist, '/cmd_vel', 10)
self.velocity_msg = Twist()
self.depth_image = None
self.velocity_msg.linear.y = 0.0
self.velocity_msg.linear.z = 0.0
self.velocity_msg.angular.x = 0.0
self.velocity_msg.angular.y = 0.0

self.mp_drawing = mp.solutions.drawing_utils
self.mp_holistic = mp.solutions.holistic
self.mp_pose = mp.solutions.pose.Pose(
min_detection_confidence=0.5,
min_tracking_confidence=0.5
)
self.kp_angular=0.005
self.kp_linear=0.7
self.x_center=None
self.image_center=None
self.buffer=10

# Create the options that will be used for ImageSegmenter
base_options = python.BaseOptions(model_asset_path='/home/pragya/personfollower_ws/src/Person-Follower/person_follower/person_follower/deeplabv3.tflite')
options = vision.ImageSegmenterOptions(base_options=base_options,output_category_mask=True)
self.segmenter = vision.ImageSegmenter.create_from_options(options)

self.BG_COLOR = (192, 192, 192) # gray
self.MASK_COLOR = (0, 255, 0) # white

def depth_callback(self,data):
try:
self.depth_image = self.bridge.imgmsg_to_cv2(data, "passthrough")
except Exception as e:
self.get_logger().error(f"Error converting depth image: {e}")


def callback(self,data):
self.cv_image = self.bridge.imgmsg_to_cv2(data, "bgr8")
# self.cv_image= cv2.resize(self.cv_image,(720,600))
rgb_cv_image = cv2.cvtColor(self.cv_image, cv2.COLOR_BGR2RGB)
self.segmentation_frame=self.cv_image
self.results = self.mp_pose.process(rgb_cv_image)

if self.results.pose_landmarks is not None:
landmarks = self.results.pose_landmarks.landmark
# Get the average of all landmark x and y coordinates to find the centroid
x_centroid = sum([landmark.x for landmark in landmarks]) / len(landmarks)
y_centroid = sum([landmark.y for landmark in landmarks]) / len(landmarks)
centroid = (x_centroid, y_centroid)
self.x_center=x_centroid * self.cv_image.shape[1]
self.y_center=y_centroid * self.cv_image.shape[0]
x_length=self.cv_image.shape[1]
self.image_center=x_length/2

x =int(x_length/2)
# if self.depth_image is not None:
# depth_mm = self.depth_image[int(self.y_center),int(self.x_center)]
# print("depth is :",depth_mm)

cv2.circle(self.cv_image, (int(x_centroid * self.cv_image.shape[1]), int(y_centroid * self.cv_image.shape[0])), 5, (0, 0, 255), -1)

x_min = min([landmark.x for landmark in landmarks])
x_max = max([landmark.x for landmark in landmarks])
y_min = min([landmark.y for landmark in landmarks])
y_max = max([landmark.y for landmark in landmarks])

cv2.rectangle(self.cv_image, (int(x_min * self.cv_image.shape[1]), int(y_min * self.cv_image.shape[0])),
(int(x_max * self.cv_image.shape[1]), int(y_max * self.cv_image.shape[0])), (0, 255, 0), 2)
self.segmentation_frame = mp.Image(image_format=mp.ImageFormat.SRGB, data=self.segmentation_frame)
# mask for the segmented image
segmentation_result = self.segmenter.segment(self.segmentation_frame)
category_mask = segmentation_result.category_mask

image_data = self.segmentation_frame.numpy_view()
fg_image = np.zeros(image_data.shape, dtype=np.uint8)
fg_image[:] = self.MASK_COLOR
bg_image = np.zeros(image_data.shape, dtype=np.uint8)
bg_image[:] = self.BG_COLOR

condition = np.stack((category_mask.numpy_view(),) * 3, axis=-1) > 0.2

self.segmentation_frame = np.where(condition, fg_image, bg_image)
self.mp_drawing.draw_landmarks(self.segmentation_frame, self.results.pose_landmarks,self.mp_holistic.POSE_CONNECTIONS)

if self.results.pose_landmarks is not None:
cv2.line(self.cv_image, (int(self.x_center-15), int(self.y_center)), (int(self.x_center+15), int(self.y_center)), (255, 0, 0), 3)
cv2.line(self.cv_image, (int(self.x_center), int(self.y_center-15)), (int(self.x_center), int(self.y_center+15)), (255, 0, 0), 3)
cv2.line(self.segmentation_frame, (int(self.x_center-15), int(self.y_center)), (int(self.x_center+15), int(self.y_center)), (255, 0, 0), 3)
cv2.line(self.segmentation_frame, (int(self.x_center), int(self.y_center-15)), (int(self.x_center), int(self.y_center+15)), (255, 0, 0), 3)
cv2.line(self.segmentation_frame, (int(350), int(0)), (int(350), int(500)), (0, 0, 255), 2)
cv2.line(self.segmentation_frame, (int(0), int(self.y_center)), (int(700), int(self.y_center)), (0, 0, 255), 2)


self.control_loop()

def control_loop(self):
if self.results.pose_landmarks is None:
self.velocity_control(0.0,-0.5)
self.top="Searching for Person"
self.bottom="<=Rotating=>"

else:
if self.x_center > 700:
self.x_center=699.0
if self.y_center> 500:
self.y_center=499.0
depth_mm = self.depth_image[int(self.y_center),int(self.x_center)]

if (3.0 + self.x_center) < self.image_center : #person is left means positive ang vel
self.top="<==Left"
self.bottom="Going Forward"

self.error_linear= depth_mm - 3.0

self.error= (self.image_center - self.x_center)
self.angular_vel= self.kp_angular * self.error
if self.error_linear < 0.3:
self.error_linear=0.0
self.linear_vel= self.kp_linear * self.error_linear
self.velocity_control(self.linear_vel,self.angular_vel)

elif self.x_center > self.image_center + 3.0 : #person is right means negative ang vel
self.top="Right==>"
self.bottom="Going Right"

self.error_linear= depth_mm - 3.0
self.error= (self.image_center - self.x_center)
self.angular_vel= self.kp_angular * self.error

if self.error_linear < 0.3:
self.error_linear=0.0
self.linear_vel= self.kp_linear * self.error_linear
self.velocity_control(self.linear_vel,self.angular_vel)

elif abs(self.x_center - self.image_center)<= 3.0 :
self.top="Stop"
self.bottom="Reached Goal position"
self.velocity_control(0.0,0.0)


cv2.putText(self.cv_image,self.top,(200,50),cv2.FONT_HERSHEY_DUPLEX,0.8,(0, 0,255),2)
cv2.putText(self.cv_image,self.bottom,(200,450),cv2.FONT_HERSHEY_DUPLEX,0.8,(0, 0, 255),2)
cv2.imshow('Person Detection', self.cv_image)
cv2.imshow('Person Segmentation', self.segmentation_frame)
cv2.waitKey(3)
print("No person detected in the image")

def velocity_control(self,linear,angular):
# linear = min(linear,2)
self.velocity_msg.linear.x = float(linear)
self.velocity_msg.angular.z = angular
self.pub.publish(self.velocity_msg)

def main():
    """Entry point: run the follower node until interrupted, then clean up."""
    rclpy.init()
    node = PersonFollower()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()  # fix: the node was previously never destroyed
        cv2.destroyAllWindows()
        rclpy.shutdown()

if __name__ == "__main__":
    main()
Loading