项目需求
我们需要从网上下载小说,并通过 ROS 2 发布一个话题(Topic),每5秒发布一行小说内容。
项目分析
- 下载小说:使用 Python 的
requests
和BeautifulSoup
库从网站获取小说内容。 - 发布消息:使用 ROS 2 话题来传输小说文本。
- 间隔控制:通过 ROS 2 的
Timer
控制每5秒发布一行小说。
步骤一:创建工作空间
首先,创建一个 ROS 2 工作空间,并将相关功能包放入工作空间目录。使用以下命令构建工作空间:

运行后,文件夹应如下所示:

运行代码 colcon build 后会对工作空间中的包进行构建

构建完成后,确保 Package.xml
文件中添加了必要的依赖项,特别是需要导入的 String
消息类型。
打开Package.xml 中找到depend(依赖)部分

运行下列代码我们能在example_interface中找到String 这个用于展示字符串系统
步骤二:代码实现
1. 下载小说
通过 requests
获取网页内容,使用 BeautifulSoup
解析并提取纯文本
注:此部分Jackson根据实际电脑情况对代码进行了调整,未按照指导视频一步一步来:
在demo_python_topic下创建novel_pub_node.py
在topic_ws 下创建 download.py
这里的思路是先通过download生成正确格式的Novel.txt,再把novel_pub_node中的代码进行调整,如果能识别到存在的txt,则导入;如果识别不到再去网站下载。
局限性:
1) 这里如果直接在novel_pub_node.py中运行,导入的文件总是包含html格式,即无法生成仅有文字的文件,这是Jackson的局限性。
2) 应该是电脑硬件原因,下载1000多字的文本下载时间过长,因此这里只取了前200多字。
Download代码如下:
import requests
from bs4 import BeautifulSoup
# 目标 URL (这里可替换为实际的)
url = 'https://www.jjjxsw.com/read/47/50945/1.html'
# 发起 GET 请求下载网页
response = requests.get(url)
# 检查响应状态码,确保下载成功
if response.status_code == 200:
# 使用 BeautifulSoup 解析 HTML 内容
soup = BeautifulSoup(response.text, 'html.parser')
# 提取网页中的纯文本
text = soup.get_text(strip=True)
# 如果你只想获取前200个字符,可以限制文本长度
preview_text = text[:200]
# 输出前200个字符
print(f"前200个字符:\n{preview_text}")
# 保存到本地 txt 文件,(这里文件名novel.txt可替换)
with open('novel.txt', 'w', encoding='utf-8') as file:
file.write(preview_text) # 将提取的纯文本内容写入文件
print("前200个字符已保存为 'novel.txt'!")
else:
# 如果请求失败,输出错误状态码
print(f"下载失败,状态码: {response.status_code}")
Novel_pub_node代码如下:
import rclpy
from rclpy.node import Node
import requests
from example_interfaces.msg import String
from queue import Queue
import os
class NovelPubNode(Node):
def __init__(self, node_name):
# 初始化父类 Node
super().__init__(node_name)
# 创建队列,用来存放小说的每一行文本
self.novels_queue_ = Queue()
# 创建一个发布者,消息类型为 String,话题名称为 'novel',队列大小为 10
self.novel_publisher_ = self.create_publisher(String, 'novel', 10)
# 创建一个定时器,每 5 秒调用一次 `timer_callback` 方法
self.timer = self.create_timer(5, self.timer_callback)
def timer_callback(self):
# 检查队列中是否有数据
if self.novels_queue_.qsize() > 0:
# 从队列中获取一行文本
line = self.novels_queue_.get()
# 实例化一个 String 类型的消息
msg = String()
# 将获取的行文本赋值给消息
msg.data = line
# 发布该消息
self.novel_publisher_.publish(msg)
# 打印日志,记录发布了哪一行
self.get_logger().info(f'发布了一行小说: {msg}')
def download_novel(self, url):
# 检查文件是否已经存在
if os.path.exists('/home/jackson/chapt3/novel.txt'):
# 如果文件存在,则从本地文件中读取
with open('/home/jackson/chapt3/novel.txt', 'r', encoding='utf-8') as file:
text = file.read()
# 打印日志,显示文件内容的字数
self.get_logger().info(f'从文件读取完成,字数: {len(text.split())}')
else:
# 如果文件不存在,从网页下载小说内容
response = requests.get(url)
# 设置响应编码为 'utf-8'
response.encoding = 'utf-8'
# 获取网页的文本内容
text = response.text
# 计算下载的文本字数
word_count = len(text.split())
# 打印日志,显示下载完成的 URL 和字数
self.get_logger().info(f'下载完成: {url},{len(text)},字数: {word_count}')
# 将下载的文本保存到本地文件
with open('/home/jackson/chapt3/novel.txt', 'w', encoding='utf-8') as file:
file.write(text)
# 打印日志,表示文件已保存
self.get_logger().info(f'网页内容已保存')
# 将文本按行分割,并将每一行放入队列
for line in text.splitlines():
self.novels_queue_.put(line)
def main():
# 初始化 ROS 2 节点
rclpy.init()
# 创建一个 NovelPubNode 节点,节点名称为 'novel_pub'
node = NovelPubNode('novel_pub')
# 下载小说并将内容保存到文件中
node.download_novel('https://www.jjjxsw.com/read/47/50945/1.html')
# 启动节点,开始处理消息
rclpy.spin(node)
# 关闭节点
rclpy.shutdown()
这段代码易错点(Jackson 亲测15分钟出现的6个BUG)
1)

特别注意:set up中间没有空格
2)

特别注意:这种错误是rclpy无法解析导入,可以先试试重启虚拟机,如果不行就点击下面链接进去看看https://fishros.org.cn/forum/topic/2693/rclpy%E6%9C%AA%E5%AF%BC%E5%85%A5

3)

特别注意要重新执行3件套 colcon build
4)
ModuleNotFoundError: No module named ‘rcply’ [ros2run]: Process exited with failure 1
特别注意是rclpy不是rcply
5)
AttributeError: module ‘demo_python_topic.novel_pub_node’ has no attribute ‘main’ [ros2run]: Process exited with failure 1
特别注意def main的缩进问题
6)
注意:setup中要调整
entry_points={
‘console_scripts’: [
‘novel_pub_node = demo_python_topic.novel_pub_node:main’,
],
在 ROS 2 的 setup.py 配置中,entry_points 主要用于注册 ROS 2 节点,使其能够被 ros2 run 命令执行。
你应该在 以下情况下 调整 entry_points:
- 添加新节点(新增 .py 文件)
- 重命名或移动现有节点文件
- 修改 Python 脚本的 main() 入口函数
至此我们已经能基本实现小说下载器的发布与订阅
步骤三:怎么发出声音
核心:espeakng 三段式下载安装语音合成引擎
Sudo apt install python3-pip -y
Sudo apt install espeak-ng -y
Pip3 install espeakng
同样创建一个novel_sub_node.py 并在setup中添加配置(entry_points)
import rclpy
from rclpy.node import Node
from example_interfaces.msg import String
from queue import Queue
import threading
import time
import espeakng
class NovelSubNode(Node):
def __init__(self, node_name):
super().__init__(node_name)
self.get_logger().info(f'{node_name}, 启动!')
self.novels_queue_ = Queue() # 创建队列,存放小说
self.novel_subscriber_ = self.create_subscription(
String, 'novel', self.novel_callback, 10)
self.speech_thread_ = threading.Thread(target=self.speak_thread)
self.speech_thread_.start()
def novel_callback(self, msg):
self.novels_queue_.put(msg.data)
def speak_thread(self):
speaker = espeakng.Speaker()
speaker.voice = 'zh' # 设置语音为中文
while rclpy.ok():
if self.novels_queue_.qsize() > 0:
text = self.novels_queue_.get()
self.get_logger().info(f'正在朗读: {text}')
speaker.say(text)
speaker.wait()
else:
time.sleep(1)
def main(args=None):
rclpy.init(args=args)
node = NovelSubNode("novel_read")
rclpy.spin(node)
rclpy.shutdown()
这里涉及多线程知识,详情见参考视频和资料
随后直接testing 开两个终端分别运行3段式和run后即可实现功能
参考资料