栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Python

Python-PyQt-ROS通信

Python 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Python-PyQt-ROS通信

Python-PyQt-ROS通信
  • 前言
  • 一、安装并引入库
  • 二、使用方法
    • 创建一个对象连接ROS
    • 运行
    • 检测连接状态
    • 断开连接
    • 订阅话题
    • 发布话题
    • 使用服务
    • 活动客户端
  • 三、简单的ROS通信建立
  • 总结

前言

建立与开源机器人系统ROS的通信,并基于QT创建应用程序,用于操作机器人。
由于ROS环境搭建过于繁琐,在这里仅仅是与ROS建立通信进行交互,因此,采用python的roslibpy库来进行通信连接。

Python ROS Bridge 库允许使用 Python 和 IronPython 与开源机器人中间件ROS进行交互。它使用 WebSockets 连接到 rosbridge 2.0并提供发布、订阅、服务调用、actionlib、TF 和其他基本 ROS 功能。
与rospy库不同,它不需要本地 ROS 环境,允许从 Linux 以外的平台使用。

roslibpy库详细内容见官网https://roslibpy.readthedocs.io/en/latest/
中文文档见https://roslibpy-docs-zh.readthedocs.io/zh/latest/
建议大家看官网,能够看到其具体实现。

一、安装并引入库

安装,命令行:

pip install roslibpy

引入库:

import roslibpy
二、使用方法 创建一个对象连接ROS
#连接ROS
self.ros = roslibpy.Ros(host=self.host, port=self.port)
运行
self.ros.run(timeout=2)
检测连接状态
if self.ros and self.ros.is_connected:
    self.ros.close()
断开连接
self.ros.close() #断开 websocket 连接. 在连接被关闭后,通过调用
self.ros.connect() #还可以再次连接。
self.ros.terminate() #终止主事件循环。如果连接还是打开着的,那么就会首先关闭它订阅一个消息
订阅话题
#订阅话题
self.get_data = roslibpy.Topic(self.ros, "/slamware_ros_sdk_server_node/odom", "nav_msgs/Odometry")
self.get_data.subscribe(callback=self.setRobotdata) #收到数据,执行回调

# 接收数据,并处理及执行其它动作
def setRobotdata(self, data):
#
    pass
发布话题
#发布话题
self.move_to = roslibpy.Topic(self.ros, "/slamware_ros_sdk_server_node/move_to",
                              "slamware_ros_sdk/MoveToRequest")
#发送数据
self.move_to.publish(roslibpy.Message(数据内容))
使用服务
#使用服务
self.get_last_map = roslibpy.Service(self.ros, "/slamware_ros_sdk_server_node/get_last_map",
                                      "slamware_ros_sdk/GetLastMap")
#发送请求并等待响应
result = self.get_last_map.call(roslibpy.ServiceRequest({请求内容}), timeout=10) 
#result为响应数据
活动客户端
#客户端使用ROS活动,
self.move_to_action = actionlib.ActionClient(self.ros, "/MoveActionServer", "slamware_ros_sdk/Move__Action")
goal = actionlib.Goal(self.move_to_action,roslibpy.Message(数据内容))
goal.send(result_callback=s_callback)  #发送成功,执行回调s_callback         
三、简单的ROS通信建立

部分代码:

import roslibpy, time
from enum import Enum
from PyQt5.QtCore import pyqtSignal, QThread
from roslibpy import actionlib
from roslibpy.actionlib import ActionClient

class RobotClient(QThread):
    move_to_action: ActionClient
    
    #创建信号
    map_ready = pyqtSignal(dict)
    robot_pos_ready = pyqtSignal(dict)
    scan_ready = pyqtSignal(dict)
    plan_path_ready = pyqtSignal(dict)
    robot_basic_state_ready = pyqtSignal(dict)
    virtual_walls_ready = pyqtSignal(dict)
    stoptask = pyqtSignal(bool)
    error_happened = pyqtSignal(str)

    connect_robot = pyqtSignal(bool)
    map_robot = pyqtSignal()
    map_loal = pyqtSignal()
    mgsdisplay = pyqtSignal()
    
    def __init__(self, parent=None):
        super(RobotClient, self).__init__(parent)
        self.host = None
        self.port = None
        self.ros = None

    def __del__(self):
        if self.ros and self.ros.is_connected:
            self.ros.close()
	# ==========================初始化ROS连接==========================
    def __initRos(self):
        try:
            #连接ROS
            self.ros = roslibpy.Ros(host=self.host, port=self.port)
            #订阅地图数据
            self.get_map = roslibpy.Topic(self.ros, "/slamware_ros_sdk_server_node/map", "nav_msgs/OccupancyGrid")
            self.get_map.subscribe(callback=self.setMap)
            #订阅位姿数据
            self.get_odom = roslibpy.Topic(self.ros, "/slamware_ros_sdk_server_node/odom", "nav_msgs/Odometry")
            self.get_odom.subscribe(callback=self.setRobotPos)
            #订阅激光数据
            self.get_scan = roslibpy.Topic(self.ros, "/slamware_ros_sdk_server_node/scan", "sensor_msgs/LaserScan")
            self.get_scan.subscribe(callback=self.setScan)
            #订阅路径数据
            self.get_plan_path = roslibpy.Topic(self.ros, "/slamware_ros_sdk_server_node/global_plan_path",
                                                "nav_msgs/Path")
          	#请求获取最新的地图
            self.get_last_map = roslibpy.Service(self.ros, "/slamware_ros_sdk_server_node/get_last_map",
                                                 "slamware_ros_sdk/GetLastMap")
         except Exception as e:
            print(e)
#被其他类引用
    def isConnect(self):
        if self.ros and self.ros.is_connected:
            return True
        else:
            return False  
            
   def setRos(self, host, port):
    try:
        if self.host != host or self.port != port:
            # 初始/切换
            if self.ros:
                # 切换的时候,结束上一次的连接,这里使用该库的on_ready方法的具体实现的部分代码
                def _wrapper_callback(proto):
                    proto.send_close()
                    return proto
                self.ros.factory.on_readyon_(_wrapper_callback)
            self.host = host
            self.port = port
            self.__initRos()
            return True
        else:
            if not self.ros.is_connected:
                # 重连
                return True
            else:
                return False
    except Exception as e:
        print(e)
        return False 
   # ==========================开启ROS,被其他类引用==========================
    def start_run(self):
        try:
            self.ros.run(timeout=2)
            #客户端使用ROS活动,
            self.move_to_action = actionlib.ActionClient(self.ros, "/MoveActionServer", "slamware_ros_sdk/Move__Action")
            return True
        except Exception as e:
            print(e)
            return False 
   
   #=========Robot线程=======================================================
    def run(self):
        # --------------------------点击了连接机器人--------------------------
        # while True:
        if self.parent().connectrobot:
            zRobot = self.parent().connectrobot
            if self.setRos(host=zRobot["Host"], port=zRobot["Port"]):
                self.succend = self.start_run()
                self.parent().connectrobot = None
                self.connect_robot.emit(self.succend)
                   
   # 位姿数据
    def setRobotPos(self, msg):
        # 位姿数据
        self.msg = msg
        self.robot_pos_ready.emit(msg)
        # 激光数据
    def setScan(self, msg):
        # 激光数据
        self.scan_ready.emit(msg)
    # 路径数据
    def setPlanPath(self, msg):
        # 路径数据
         self.plan_path_ready.emit(msg)
	# 获取最新的地图
    def doGetLastMap(self):
        # 获取最新的地图
        try:
            result = self.get_last_map.call(roslibpy.ServiceRequest({}), timeout=10)
            return result
        except Exception as e:
            print(e)
            return False
  	#ROS活动
	def doMoveToAction(self, point, s_callback, e_callback):
	#ROS活动
	 try:
	     goal = actionlib.Goal(self.move_to_action,
	                           roslibpy.Message(
	                               {"locations": [{"x": point.PositionX, "y": point.PositionY, "z": point.PositionZ}],
	                                "options": {"opt_flags": {"flags": 0x00000080},
	                                            "speed_ratio": {"is_valid": False, "value": 0}},
	                                "yaw": 0}))
	     goal.send(result_callback=s_callback)
	 except Exception as e:
	     print(e)                                       

roslibpy库on_ready的具体实现:

def on_ready(self, callback, run_in_thread=True):
        """Add a callback to be executed when the connection is established.

        If a connection to ROS is already available, the callback is executed immediately.

        Args:
            callback: Callable function to be invoked when ROS connection is ready.
            run_in_thread (:obj:`bool`): True to run the callback in a separate thread, False otherwise.
        """
        def _wrapper_callback(proto):
            if run_in_thread:
                self.factory.manager.call_in_thread(callback)
            else:
                callback()

            return proto

        self.factory.on_ready(_wrapper_callback)
总结

上面是我运用roslibpy库,建立的ROS通信。
如有错误希望请大家指导,谢谢点赞!
希望和大家一起学习,交流!

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/503626.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号