ROS2 项目一:小说阅读播放器

项目需求

我们需要从网上下载小说,并通过 ROS 2 发布一个话题(Topic),每5秒发布一行小说内容。

项目分析

  1. 下载小说:使用 Python 的 requests BeautifulSoup 库从网站获取小说内容。
  2. 发布消息:使用 ROS 2 话题来传输小说文本。
  3. 间隔控制:通过 ROS 2 的 Timer 控制每5秒发布一行小说。

步骤一:创建工作空间

首先,创建一个 ROS 2 工作空间,并将相关功能包放入工作空间目录。使用以下命令构建工作空间:

运行后,文件夹应如下所示:

运行代码 colcon build 后会对工作空间中的包进行构建

构建完成后,确保 Package.xml 文件中添加了必要的依赖项,特别是需要导入的 String 消息类型。

打开Package.xml 中找到depend(依赖)部分

运行下列代码我们能在example_interface中找到String 这个用于展示字符串系统

步骤二:代码实现

1. 下载小说

通过 requests 获取网页内容,使用 BeautifulSoup 解析并提取纯文本

注:此部分Jackson根据实际电脑情况对代码进行了调整,未按照指导视频一步一步来:

在demo_python_topic下创建novel_pub_node.py

在topic_ws 下创建 download.py

这里的思路是先通过download生成正确格式的Novel.txt,再把novel_pub_node中的代码进行调整,如果能识别到存在的txt,则导入;如果识别不到再去网站下载。

局限性:

1) 这里如果直接在novel_pub_node.py中运行,导入的文件总是包含html格式,即无法生成仅有文字的文件,这是Jackson的局限性。

2) 应该是电脑硬件原因,下载1000多字的文本下载时间过长,因此这里只取了前200多字。

Download代码如下:

import requests
from bs4 import BeautifulSoup

# 目标 URL (这里可替换为实际的)
url = 'https://www.jjjxsw.com/read/47/50945/1.html'

# 发起 GET 请求下载网页
response = requests.get(url)

# 检查响应状态码,确保下载成功
if response.status_code == 200:
    # 使用 BeautifulSoup 解析 HTML 内容
    soup = BeautifulSoup(response.text, 'html.parser')

    # 提取网页中的纯文本
    text = soup.get_text(strip=True)

    # 如果你只想获取前200个字符,可以限制文本长度
    preview_text = text[:200]

    # 输出前200个字符
    print(f"前200个字符:\n{preview_text}")

    # 保存到本地 txt 文件,(这里文件名novel.txt可替换)
    with open('novel.txt', 'w', encoding='utf-8') as file:
        file.write(preview_text)  # 将提取的纯文本内容写入文件

    print("前200个字符已保存为 'novel.txt'!")
else:
    # 如果请求失败,输出错误状态码
    print(f"下载失败,状态码: {response.status_code}")

Novel_pub_node代码如下:

import rclpy 
from rclpy.node import Node
import requests
from example_interfaces.msg import String
from queue import Queue
import os

class NovelPubNode(Node):
    def __init__(self, node_name):
        # 初始化父类 Node
        super().__init__(node_name)
        # 创建队列,用来存放小说的每一行文本
        self.novels_queue_ = Queue()  
        # 创建一个发布者,消息类型为 String,话题名称为 'novel',队列大小为 10
        self.novel_publisher_ = self.create_publisher(String, 'novel', 10)  
        # 创建一个定时器,每 5 秒调用一次 `timer_callback` 方法
        self.timer = self.create_timer(5, self.timer_callback)  
    
    def timer_callback(self):
        # 检查队列中是否有数据
        if self.novels_queue_.qsize() > 0:  
            # 从队列中获取一行文本
            line = self.novels_queue_.get()
            # 实例化一个 String 类型的消息
            msg = String()
            # 将获取的行文本赋值给消息
            msg.data = line
            # 发布该消息
            self.novel_publisher_.publish(msg)
            # 打印日志,记录发布了哪一行
            self.get_logger().info(f'发布了一行小说: {msg}')
    
    def download_novel(self, url):
        # 检查文件是否已经存在
        if os.path.exists('/home/jackson/chapt3/novel.txt'):
            # 如果文件存在,则从本地文件中读取
            with open('/home/jackson/chapt3/novel.txt', 'r', encoding='utf-8') as file:
                text = file.read()
            # 打印日志,显示文件内容的字数
            self.get_logger().info(f'从文件读取完成,字数: {len(text.split())}')
        else:
            # 如果文件不存在,从网页下载小说内容
            response = requests.get(url)
            # 设置响应编码为 'utf-8'
            response.encoding = 'utf-8'
            # 获取网页的文本内容
            text = response.text
            # 计算下载的文本字数
            word_count = len(text.split())
            # 打印日志,显示下载完成的 URL 和字数
            self.get_logger().info(f'下载完成: {url},{len(text)},字数: {word_count}')
        
            # 将下载的文本保存到本地文件
            with open('/home/jackson/chapt3/novel.txt', 'w', encoding='utf-8') as file:
                file.write(text)
            # 打印日志,表示文件已保存
            self.get_logger().info(f'网页内容已保存')

        # 将文本按行分割,并将每一行放入队列
        for line in text.splitlines():
            self.novels_queue_.put(line)

    
def main():
    # 初始化 ROS 2 节点
    rclpy.init()
    # 创建一个 NovelPubNode 节点,节点名称为 'novel_pub'
    node = NovelPubNode('novel_pub')
    # 下载小说并将内容保存到文件中
    node.download_novel('https://www.jjjxsw.com/read/47/50945/1.html')
    # 启动节点,开始处理消息
    rclpy.spin(node)
    # 关闭节点
    rclpy.shutdown()

这段代码易错点(Jackson 亲测15分钟出现的6个BUG)

1)

特别注意:set up中间没有空格

2)

特别注意:这种错误是rclpy无法解析导入,可以先试试重启虚拟机,如果不行就点击下面链接进去看看https://fishros.org.cn/forum/topic/2693/rclpy%E6%9C%AA%E5%AF%BC%E5%85%A5

3)

特别注意要重新执行3件套 colcon build

4)

ModuleNotFoundError: No module named ‘rcply’ [ros2run]: Process exited with failure 1

特别注意是rclpy不是rcply

5)

AttributeError: module ‘demo_python_topic.novel_pub_node’ has no attribute ‘main’ [ros2run]: Process exited with failure 1

特别注意def main的缩进问题

6)

注意:setup中要调整

entry_points={

        ‘console_scripts’: [

            ‘novel_pub_node = demo_python_topic.novel_pub_node:main’,

        ],

在 ROS 2 的 setup.py 配置中,entry_points 主要用于注册 ROS 2 节点,使其能够被 ros2 run 命令执行。

你应该在 以下情况下 调整 entry_points

  1. 添加新节点(新增 .py 文件)
  2. 重命名或移动现有节点文件
  3. 修改 Python 脚本的 main() 入口函数

至此我们已经能基本实现小说下载器的发布与订阅

步骤三:怎么发出声音

核心:espeakng 三段式下载安装语音合成引擎

Sudo apt install python3-pip -y
Sudo apt install espeak-ng -y
Pip3 install espeakng

同样创建一个novel_sub_node.py 并在setup中添加配置(entry_points)

import rclpy
from rclpy.node import Node
from example_interfaces.msg import String
from queue import Queue
import threading
import time
import espeakng

class NovelSubNode(Node):
    def __init__(self, node_name):
        super().__init__(node_name)
        self.get_logger().info(f'{node_name}, 启动!')
        self.novels_queue_ = Queue()  # 创建队列,存放小说
        self.novel_subscriber_ = self.create_subscription(
            String, 'novel', self.novel_callback, 10)
        self.speech_thread_ = threading.Thread(target=self.speak_thread)
        self.speech_thread_.start()

    def novel_callback(self, msg):
        self.novels_queue_.put(msg.data)

    def speak_thread(self):
        speaker = espeakng.Speaker()
        speaker.voice = 'zh'  # 设置语音为中文
        while rclpy.ok():
            if self.novels_queue_.qsize() > 0:
                text = self.novels_queue_.get()
                self.get_logger().info(f'正在朗读: {text}')
                speaker.say(text)
                speaker.wait()
            else:
                time.sleep(1)

def main(args=None):
    rclpy.init(args=args)
    node = NovelSubNode("novel_read")
    rclpy.spin(node)
    rclpy.shutdown()

这里涉及多线程知识,详情见参考视频和资料

随后直接testing 开两个终端分别运行3段式和run后即可实现功能

参考资料

《ROS 2机器人开发从入门到实践》3.2.1通过话题发布小说_哔哩哔哩_bilibili

多线程(看这一篇就够了,超详细,满满的干货)_csdn线程-CSDN博客

发表评论