利用blob检测算法识别交通杆,控制TB3机器人完成对交通杆的起停动作!
上一篇博文中《TB3_Autorace之路标检测》 订阅了原始图像信息,经过SIFT检测识别出道路交通标志,这里我们同样订阅树莓派摄像头的原始图像信息对交通杆进行识别,同时我们还订阅了交通杆的状态信息以及任务完成信息,实现杆落即停,杆起即过的功能。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 self.sub_image_type = "raw" self.pub_image_type = "raw" if self.sub_image_type == "compressed" : self.sub_image_original = rospy.Subscriber('/detect/image_input/compressed' , CompressedImage, self.cbGetImage, queue_size=1 ) elif self.sub_image_type == "raw" : self.sub_image_original = rospy.Subscriber('/detect/image_input' , Image, self.cbGetImage, queue_size=1 ) self.sub_level_crossing_order = rospy.Subscriber('/detect/level_crossing_order' , UInt8, self.cbLevelCrossingOrder, queue_size=1 ) self.sub_level_crossing_finished = rospy.Subscriber('/control/level_crossing_finished' , UInt8, self.cbLevelCrossingFinished, queue_size=1 )
发布话题 发布图像信息,交通杆的状态信息/detect/level_crossing_stamped
、任务开始信息/control/level_crossing_start
以及速度控制信息/control/max_vel
。
1 2 3 4 5 6 7 8 9 10 if self.pub_image_type == "compressed" : self.pub_image_level = rospy.Publisher('/detect/image_output/compressed' , CompressedImage, queue_size=1 ) elif self.pub_image_type == "raw" : self.pub_image_level = rosppy.Publisher('/detect/image_output' , Image, queue_size=1 ) self.pub_level_crossing_return = rospy.Publisher('/detect/level_crossing_stamped' , UInt8, queue_size=1 ) self.pub_parking_start = rospy.Publisher('/control/level_crossing_start' , UInt8, queue_size=1 ) self.pub_max_vel = rospy.Publisher('/control/max_vel' , Float64, queue_size=1 )
设定检测状态 这里我们利用python的枚举操作将路标检测封装为几个不同状态,包括识别标志、检测交通杆、识别交通杆、停车、通过等状态。根据不同的识别状态执行相应的控制命令。
1 2 3 4 5 6 self.StepOfLevelCrossing = Enum('StepOfLevelCrossing' , 'searching_stop_sign ' 'searching_level ' 'watching_level ' 'stop ' 'pass_level' )
距离计算 计算点到直线的距离,代码封装如下:
1 2 distance = abs(x0 * a + y0 * b + c) / math.sqrt(a * a + b * b) return distance
计算点到点的距离(欧式),代码封装如下:
1 2 distance = math.sqrt((x2 - x1) * (x2 - x1) + (y2 - y1) * (y2 - y1)) return distance
线性判断 利用blob算法检测出的红色块,得出色块的坐标对,根据三点呈一线的原理,计算直线方程并判断检测出的点是否呈线性,代码封装如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 def (point1, point2, point3) : threshold_linearity = 50 x1, y1 = point1 x2, y2 = point3 if x2 - x1 != 0 : a = (y2 - y1) / (x2 - x1) else : a = 1000 b = -1 c = y1 - a * x1 err = fnCalcDistanceDot2Line(a, b, c, point2[0 ], point2[1 ]) if err < threshold_linearity: return True else : return False
距离判断 利用blob算法检测出的三个红色块,得出三个色块的坐标对并计算相互之间的距离,判断距离是否相等,代码封装如下:
1 2 3 4 5 6 7 8 9 10 11 def (point1, point2, point3) : threshold_distance_equality = 3 distance1 = fnCalcDistanceDot2Dot(point1[0 ], point1[1 ], point2[0 ], point2[1 ]) distance2 = fnCalcDistanceDot2Dot(point2[0 ], point2[1 ], point3[0 ], point3[1 ]) std = np.std([distance1, distance2]) if std < threshold_distance_equality: return True else : return False
格式转换 设定合适的帧率,将订阅到的原始图像信息格式转换成OpenCV库能够处理的信息格式,代码封装如下:
1 2 3 4 5 6 7 8 9 10 11 12 def cbGetImage (self, image_msg) : if self.counter % 3 != 0 : self.counter += 1 return else : self.counter = 1 if self.sub_image_type == "compressed" : np_arr = np.fromstring(image_msg.data, np.uint8) self.cv_image = cv2.imdecode(np_arr, cv2.IMREAD_COLOR) else : self.cv_image = self.cvBridge.imgmsg_to_cv2(image_msg, "bgr8" )
注:根据计算机处理能力设定帧率,这里设置成10fps,不合适的帧率设置容易产生掉帧的现象,会影响最终的检测。
提取色块 利用掩膜操作,设定颜色阈值,将RGB图像转换成HSV格式图像,根据颜色阈值提取出交通杆上的红色色块,返回掩膜后的0-1图像。代码封装如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 def fnMaskRedOfLevel (self) : image = np.copy(self.cv_image) hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV) Hue_l = self.hue_red_l Hue_h = self.hue_red_h Saturation_l = self.saturation_red_l Saturation_h = self.saturation_red_h Lightness_l = self.lightness_red_l Lightness_h = self.lightness_red_h lower_red = np.array([Hue_l, Saturation_l, Lightness_l]) upper_red = np.array([Hue_h, Saturation_h, Lightness_h]) mask = cv2.inRange(hsv, lower_red, upper_red) if self.is_calibration_mode == True : if self.pub_image_type == "compressed" : self.pub_image_color_filtered.publish(self.cvBridge.cv2_to_compressed_imgmsg(mask, "jpg" )) elif self.pub_image_type == "raw" : self.pub_image_color_filtered.publish(self.cvBridge.cv2_to_imgmsg(mask, "mono8" )) mask = cv2.bitwise_not(mask) return mask
速度控制 根据订阅到的交通杆状态信息/detect/level_crossing_order
,实时发布目前的检测状态以及速度命令控制小车由减速到停车再到启动的全过程。代码封装如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 大专栏 TB3_Autorace之交通杆检测 e">39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 def cbLevelCrossingOrder (self, order) : pub_level_crossing_return = UInt8() if order.data == self.StepOfLevelCrossing.searching_stop_sign.value: rospy.loginfo("Now lane_following" ) pub_level_crossing_return.data = self.StepOfLevelCrossing.searching_stop_sign.value elif order.data == self.StepOfLevelCrossing.searching_level.value: rospy.loginfo("Now searching_level" ) msg_pub_max_vel = Float64() msg_pub_max_vel.data = 0.10 self.pub_max_vel.publish(msg_pub_max_vel) while True : is_level_detected, _, _ = self.fnFindLevel() if is_level_detected == True : break else : pass rospy.loginfo("SLOWDOWN!!" ) msg_pub_max_vel.data = 0.04 self.pub_max_vel.publish(msg_pub_max_vel) pub_level_crossing_return.data = self.StepOfLevelCrossing.searching_level.value elif order.data == self.StepOfLevelCrossing.watching_level.value: rospy.loginfo("Now watching_level" ) while True : _, is_level_close, _ = self.fnFindLevel() if is_level_close == True : break else : pass rospy.loginfo("STOP~~" ) msg_pub_max_vel = Float64() msg_pub_max_vel.data = 0.0 self.pub_max_vel.publish(msg_pub_max_vel) pub_level_crossing_return.data = self.StepOfLevelCrossing.watching_level.value elif order.data == self.StepOfLevelCrossing.stop.value: rospy.loginfo("Now stop" ) while True : _, _, is_level_opened = self.fnFindLevel() if is_level_opened == True : break else : pass rospy.loginfo("GO~~" ) msg_pub_max_vel = Float64() msg_pub_max_vel.data = 0.04 self.pub_max_vel.publish(msg_pub_max_vel) pub_level_crossing_return.data = self.StepOfLevelCrossing.stop.value elif order.data == self.StepOfLevelCrossing.pass_level.value: rospy.loginfo("Now pass_level" ) pub_level_crossing_return.data = self.StepOfLevelCrossing.pass_level.value self.pub_level_crossing_return.publish(pub_level_crossing_return)
交通杆检测 这里主要利用blob斑点检测算法,在上一篇文章中《TB3_Autorace之路标检测》 提到的通过设定几个检测指标对掩膜后的图像进行关键点检测,将识别出的色块用黄色圆标识。当检测到3个红色矩形框时,计算关键点的平均坐标以及关键点到平均点的距离列表,通过计算三个红色块之间的距离以及校验三个色块的线性情况判断是否检测到交通杆,将斑点用蓝色线连接标识,计算交通杆的斜率,根据斜率值的大小说明到拦路杆的距离,判断拦路杆的状态;通过计算self.stop_bar_count的值判断交通杆已经收起,小车可以通过,返回交通杆的三个状态(小车减速,停车,通过),代码封装如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 def fnFindRectOfLevel (self, mask) : is_level_detected = False is_level_close = False is_level_opened = False params = cv2.SimpleBlobDetector_Params() params.minThreshold = 0 params.maxThreshold = 255 params.filterByArea = True params.minArea = 200 params.maxArea = 3000 params.filterByCircularity = True params.minCircularity = 0.5 params.filterByConvexity = True params.minConvexity = 0.9 det = cv2.SimpleBlobDetector_create(params) keypts = det.detect(mask) frame = cv2.drawKeypoints(self.cv_image, keypts, np.array([]), (0 , 255 , 255 ), cv2.DRAW_MATCHES_FLAGS_DRAW_RICH_KEYPOINTS) mean_x = 0.0 mean_y = 0.0 if len(keypts) == 3 : for i in range(3 ): mean_x = mean_x + keypts[i].pt[0 ] / 3 mean_y = mean_y + keypts[i].pt[1 ] / 3 arr_distances = [0 ] * 3 for i in range(3 ): arr_distances[i] = fnCalcDistanceDot2Dot(mean_x, mean_y, keypts[i].pt[0 ], keypts[i].pt[1 ]) idx1, idx2, idx3 = fnArrangeIndexOfPoint(arr_distances) frame = cv2.line(frame, (int(keypts[idx1].pt[0 ]), int(keypts[idx1].pt[1 ])), (int(keypts[idx2].pt[0 ]), int(keypts[idx2].pt[1 ])), (255 , 0 , 0 ), 5 ) frame = cv2.line(frame, (int(mean_x), int(mean_y)), (int(mean_x), int(mean_y)), (255 , 255 , 0 ), 5 ) point1 = [int(keypts[idx1].pt[0 ]), int(keypts[idx1].pt[1 ] - 1 )] point2 = [int(keypts[idx3].pt[0 ]), int(keypts[idx3].pt[1 ] - 1 )] point3 = [int(keypts[idx2].pt[0 ]), int(keypts[idx2].pt[1 ] - 1 )] is_rects_linear = fnCheckLinearity(point1, point2, point3) is_rects_dist_equal = fnCheckDistanceIsEqual(point1, point2, point3) if is_rects_linear == True or is_rects_dist_equal == True : distance_bar2car = 100 / fnCalcDistanceDot2Dot(point1[0 ], point1[1 ], point2[0 ], point2[1 ]) self.stop_bar_count = 40 if distance_bar2car > 0.8 : is_level_detected = True self.stop_bar_state = 'slowdown' self.state = "detected" else : is_level_close = True self.stop_bar_state = 'stop' if self.stop_bar_count > 0 : self.stop_bar_count -= 1 if self.stop_bar_count == 0 : is_level_opened = True self.stop_bar_state = 'go' if self.pub_image_type == "compressed" : self.pub_image_level.publish(self.cvBridge.cv2_to_compressed_imgmsg(frame, "jpg" )) elif self.pub_image_type == "raw" : self.pub_image_level.publish(self.cvBridge.cv2_to_imgmsg(frame, "bgr8" )) return is_level_detected, is_level_close, is_level_opened def cbLevelCrossingFinished (self, level_crossing_finished_msg) : self.is_level_crossing_finished = True def main (self) : rospy.spin()
至此,已经完成对交通杆的检测工作,根据交通杆下放过程中实时计算的杆子斜率指标给小车发布速度控制命令,实现了减速,停车,通过全过程。
附 完整进程源代码:https://github.com/sun-coke/TB3_Autorace