Building an Intranet-Penetration Voice Monitoring System with Super Qwen Voice World
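I have recently been working on a smart-home project that needs to monitor sounds at home in real time: an elderly relative falling, a baby crying, or an intruder. The catch is that all of these devices sit on the home LAN, so how do I listen in while I am away?
Traditional approaches need either fiddly port forwarding or an expensive cloud service. I later found that Super Qwen Voice World plus a few open-source tools is enough to build a voice monitoring system that tunnels out of the home network at low cost. The tools themselves are free, though you still need a server with a public IP, and the speech recognition API is billed per call.
The core idea is simple: the intranet device captures audio and sends it for speech recognition, an intranet penetration tool exposes the audio stream to the public network, and a device outside can then listen to the home in real time. Below I walk through the setup step by step.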
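1. Environment Setup and Quick Deployment
1.1 System Requirements
First make sure your device meets the following requirements:
- Operating system: Ubuntu 20.04 or later (other Linux distributions also work, but commands may differ slightly)
- Python: 3.8 or later
- Memory: at least 2 GB free
- Storage: at least 5 GB free
- Network: a stable network connection on the intranet device
1.2 Installing Super Qwen Voice World
Super Qwen Voice World is a voice model from Alibaba Cloud's Tongyi Qianwen (Qwen) family that supports real-time speech recognition and synthesis. Start by installing the dependencies: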
# Create the project directory
mkdir voice-monitor && cd voice-monitor
# Create a virtual environment
python3 -m venv venv
source venv/bin/activate
# Install the required dependencies
pip install dashscope pyaudio websocket-client numpy
If installing pyaudio fails, install the PortAudio headers first and then retry:
# Ubuntu/Debian
sudo apt-get install portaudio19-dev python3-pyaudio
pip install pyaudio
# CentOS/RHEL
sudo yum install portaudio-devel
pip install pyaudio
# macOS
brew install portaudio
pip install pyaudio
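1.3 Getting an API Key
To use Super Qwen Voice World you need an Alibaba Cloud API key:
- Go to the Alibaba Cloud website and register an account
- Open the console and find "Model Services" or "Tongyi Qianwen"
- Create an API key and store it somewhere safe
Set the environment variable:
# Add your API key to the environment
export DASHSCOPE_API_KEY="your-api-key"
# To make it permanent, append it to ~/.bashrc
echo 'export DASHSCOPE_API_KEY="your-api-key"' >> ~/.bashrc
source ~/.bashrc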
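2. Setting Up the Intranet Penetration Tunnel
2.1 Why Intranet Penetration?
Intranet devices (a Raspberry Pi at home, say) usually have no public IP, so devices outside cannot reach them directly. Intranet penetration builds a "tunnel" that lets the public network reach services inside the LAN.
There are many intranet penetration tools available. I recommend frp because it is open source, stable, and simple to configure.
2.2 Installing and Configuring frp
First install the frp server on a machine with a public IP (a cloud server, for example):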
# Download frp
wget https://github.com/fatedier/frp/releases/download/v0.51.3/frp_0.51.3_linux_amd64.tar.gz
# Extract
tar -zxvf frp_0.51.3_linux_amd64.tar.gz
cd frp_0.51.3_linux_amd64
# Write the server configuration
cat > frps.ini << EOF
[common]
bind_port = 7000
token = your_secure_token_here
# Web dashboard (replace the sample password below with a strong one)
dashboard_port = 7500
dashboard_user = admin
dashboard_pwd = admin123
# Logging
log_file = ./frps.log
log_level = info
log_max_days = 3
EOF
# Start the server
./frps -c frps.ini
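Then install the frp client on your intranet device:
# Download frp again (pick the build that matches the device architecture)
wget https://github.com/fatedier/frp/releases/download/v0.51.3/frp_0.51.3_linux_arm64.tar.gz
tar -zxvf frp_0.51.3_linux_arm64.tar.gz
cd frp_0.51.3_linux_arm64
# Write the client configuration
cat > frpc.ini << EOF
[common]
server_addr = your-server-public-ip
server_port = 7000
token = your_secure_token_here
[voice-monitor]
type = tcp
local_ip = 127.0.0.1
local_port = 8000
remote_port = 8000
[voice-stream]
type = tcp
local_ip = 127.0.0.1
local_port = 9000
remote_port = 9000
EOF
# Start the client
./frpc -c frpc.ini
With this configuration, a device outside can reach whatever listens on local port 8000 (the web monitor from section 4.3) at server-IP:8000, and the voice stream at server-IP:9000. Ports 7000, 8000, and 9000 must also be open in the server's firewall or security group.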
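Before moving on, it can help to confirm from an outside machine that the tunnel ports accept connections. A minimal sketch, assuming the ports from the frp configuration above (7000, 8000, 9000); `port_is_open` and `check_tunnel` are illustrative helper names, not part of frp:

```python
import socket


def port_is_open(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def check_tunnel(host, ports=(7000, 8000, 9000)):
    """Map each port to whether it accepted a TCP connection."""
    return {port: port_is_open(host, port) for port in ports}
```

Calling `check_tunnel("your-server-ip")` returns a dict such as `{7000: True, ...}`. A successful connect only shows that something is listening on the server side; it does not prove that the local service behind the tunnel is running.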
3. Implementing the Core Voice Monitoring Features
3.1 Real-Time Audio Capture and Recognition
Now let's write a Python script that captures microphone audio in real time and sends it for recognition:
# voice_monitor.py
import os
import queue
import threading
import time
from datetime import datetime

import dashscope
import pyaudio


class VoiceMonitor:
    def __init__(self, api_key=None):
        # Set the API key
        if api_key:
            dashscope.api_key = api_key
        elif 'DASHSCOPE_API_KEY' in os.environ:
            dashscope.api_key = os.environ['DASHSCOPE_API_KEY']
        else:
            raise ValueError("Set the DASHSCOPE_API_KEY environment variable or pass api_key")
        # Audio parameters
        self.CHUNK = 1024  # frames per read
        self.FORMAT = pyaudio.paInt16  # 16-bit samples
        self.CHANNELS = 1  # mono
        self.RATE = 16000  # sample rate (Hz)
        self.buffer_duration = 2  # seconds of audio per recognition request
        # Initialize the audio device
        self.audio = pyaudio.PyAudio()
        # Queues for passing data between threads
        self.audio_queue = queue.Queue()
        self.text_queue = queue.Queue()
        # Control flags
        self.is_recording = False
        self.is_processing = False
        self._threads = []

    def start_monitoring(self):
        """Start monitoring."""
        print(f"[{datetime.now()}] Starting voice monitoring...")
        self.is_recording = True
        self.is_processing = True
        # Recording, processing, and display threads
        targets = (self._record_audio, self._process_audio, self._display_results)
        self._threads = [threading.Thread(target=t, daemon=True) for t in targets]
        for thread in self._threads:
            thread.start()
        return tuple(self._threads)

    def _record_audio(self):
        """Recording thread."""
        stream = self.audio.open(
            format=self.FORMAT,
            channels=self.CHANNELS,
            rate=self.RATE,
            input=True,
            frames_per_buffer=self.CHUNK
        )
        print(f"[{datetime.now()}] Recording device started")
        try:
            while self.is_recording:
                # Read audio data and queue it
                data = stream.read(self.CHUNK, exception_on_overflow=False)
                self.audio_queue.put(data)
                # Cap the queue size by dropping the oldest chunk
                if self.audio_queue.qsize() > 100:
                    try:
                        self.audio_queue.get_nowait()
                    except queue.Empty:
                        pass
        except Exception as e:
            print(f"[{datetime.now()}] Recording error: {e}")
        finally:
            stream.stop_stream()
            stream.close()

    def _process_audio(self):
        """Processing thread: buffer audio and send it for recognition."""
        audio_buffer = b""
        while self.is_processing:
            try:
                # Block briefly instead of spinning while the queue is empty
                chunk = self.audio_queue.get(timeout=0.1)
            except queue.Empty:
                continue
            audio_buffer += chunk
            # 16-bit mono PCM: 2 bytes per sample
            buffer_length = len(audio_buffer) / (self.RATE * 2)
            if buffer_length < self.buffer_duration:
                continue
            try:
                # NOTE: this call is kept as written in the original; check the
                # class name and parameters against the DashScope SDK docs.
                response = dashscope.audio.asr.Recognizer.call(
                    model='qwen3-asr-flash-realtime',
                    format='pcm',
                    sample_rate=self.RATE,
                    audio_data=audio_buffer
                )
                if response.status_code == 200:
                    text = response.output['text']
                    if text.strip():  # skip empty results
                        self.text_queue.put({
                            'timestamp': datetime.now().strftime('%H:%M:%S'),
                            'text': text,
                            'confidence': response.output.get('confidence', 0.8)
                        })
                        # Check for alert keywords
                        self._check_keywords(text)
            except Exception as e:
                print(f"[{datetime.now()}] Recognition error: {e}")
            # Clear the buffer
            audio_buffer = b""

    def _check_keywords(self, text):
        """Check for keywords and trigger an alert."""
        keywords = {
            'help': ['救命', '帮帮我', '救救我', '快来人'],
            'fall': ['摔倒', '跌倒', '摔倒了', '跌倒了'],
            'baby': ['宝宝哭', '婴儿哭', '孩子哭', '哇哇哭'],
            'intruder': ['谁在那', '有人吗', '谁进来了', '小偷']
        }
        text_lower = text.lower()
        for category, words in keywords.items():
            for word in words:
                if word in text_lower:
                    print(f"[{datetime.now()}] Detected {category} keyword: '{word}'")
                    self._send_alert(category, text)
                    break  # one alert per category

    def _send_alert(self, category, text):
        """Send an alert."""
        alert_message = f"[{datetime.now()}] Alert type: {category}\nRecognized text: {text}"
        print(f"\n{'='*50}")
        print("🚨 Abnormal situation detected!")
        print(alert_message)
        print(f"{'='*50}\n")
        # Add notification code here,
        # e.g. email, SMS, or WeChat

    def _display_results(self):
        """Print recognition results."""
        while self.is_processing:
            try:
                result = self.text_queue.get(timeout=1)
            except queue.Empty:
                continue
            print(f"[{result['timestamp']}] Recognized: {result['text']}")

    def stop(self):
        """Stop monitoring."""
        self.is_recording = False
        self.is_processing = False
        # Let the workers leave their loops before releasing PortAudio
        for thread in self._threads:
            thread.join(timeout=2)
        self.audio.terminate()
        print(f"[{datetime.now()}] Voice monitoring stopped")


# Example usage
if __name__ == "__main__":
    monitor = VoiceMonitor()
    try:
        threads = monitor.start_monitoring()
        print("Voice monitoring started, press Ctrl+C to stop...")
        # Keep the main thread alive
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("\nStopping monitoring...")
        monitor.stop()
        print("Monitoring stopped")
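The processing thread decides when to call the API from the byte length of its buffer. The arithmetic behind that, as a small sketch for raw 16-bit PCM (the function names are illustrative and not part of the script):

```python
def pcm_duration_seconds(num_bytes, rate=16000, channels=1, sample_width=2):
    """Duration of raw PCM audio: bytes / (rate * channels * bytes per sample)."""
    return num_bytes / (rate * channels * sample_width)


def chunks_needed(seconds, rate=16000, chunk_frames=1024):
    """Number of whole chunk reads needed to cover at least `seconds` of audio."""
    frames = int(seconds * rate)
    return -(-frames // chunk_frames)  # ceiling division
```

With the defaults, one 1024-frame chunk is 2048 bytes, or 0.064 s of audio, so the 2-second threshold is crossed on the 32nd chunk, at 2.048 s.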
3.2 Real-Time Voice Streaming
To hear the intranet audio from outside, we need a voice stream server:
# voice_stream_server.py
import json
import queue
import socket
import threading

import pyaudio


class VoiceStreamServer:
    def __init__(self, host='0.0.0.0', port=9000):
        self.host = host
        self.port = port
        self.clients = []
        self.clients_lock = threading.Lock()  # guards self.clients across threads
        self.audio_queue = queue.Queue()
        self.server_socket = None
        # Audio parameters
        self.CHUNK = 1024
        self.FORMAT = pyaudio.paInt16
        self.CHANNELS = 1
        self.RATE = 16000
        # Initialize audio
        self.audio = pyaudio.PyAudio()

    def start_server(self):
        """Start the voice stream server."""
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server_socket.bind((self.host, self.port))
        self.server_socket.listen(5)
        print(f"Voice stream server listening on {self.host}:{self.port}")
        # Audio capture and broadcast threads
        threading.Thread(target=self._capture_audio, daemon=True).start()
        threading.Thread(target=self._broadcast_audio, daemon=True).start()
        # Accept client connections, one handler thread per client
        while True:
            client_socket, client_address = self.server_socket.accept()
            print(f"New client connected: {client_address}")
            threading.Thread(
                target=self._handle_client,
                args=(client_socket, client_address),
                daemon=True
            ).start()

    def _capture_audio(self):
        """Capture audio from the microphone."""
        stream = self.audio.open(
            format=self.FORMAT,
            channels=self.CHANNELS,
            rate=self.RATE,
            input=True,
            frames_per_buffer=self.CHUNK
        )
        print("Capturing audio...")
        try:
            while True:
                data = stream.read(self.CHUNK, exception_on_overflow=False)
                self.audio_queue.put(data)
        except Exception as e:
            print(f"Audio capture error: {e}")
        finally:
            stream.stop_stream()
            stream.close()

    def _remove_client(self, client):
        """Drop a client from the list and close its socket."""
        with self.clients_lock:
            if client in self.clients:
                self.clients.remove(client)
        try:
            client.close()
        except OSError:
            pass

    def _broadcast_audio(self):
        """Broadcast audio to every registered client."""
        while True:
            try:
                audio_data = self.audio_queue.get(timeout=1)
            except queue.Empty:
                continue
            # Frame = 4-byte big-endian length + payload
            frame = len(audio_data).to_bytes(4, 'big') + audio_data
            with self.clients_lock:
                targets = list(self.clients)
            for client in targets:
                try:
                    client.sendall(frame)
                except OSError as e:
                    print(f"Dropping client: {e}")
                    self._remove_client(client)

    def _handle_client(self, client_socket, client_address):
        """Handle one client connection."""
        try:
            # Send the audio parameters to the client
            params = {
                'format': self.FORMAT,
                'channels': self.CHANNELS,
                'rate': self.RATE,
                'chunk': self.CHUNK
            }
            params_json = json.dumps(params).encode('utf-8')
            client_socket.sendall(len(params_json).to_bytes(4, 'big') + params_json)
            # Register for audio only after the parameters are sent, so the
            # parameter block is always the first thing the client reads
            with self.clients_lock:
                self.clients.append(client_socket)
            client_socket.settimeout(10)
            while True:
                try:
                    data = client_socket.recv(1)  # client heartbeat byte
                    if not data:
                        break
                except socket.timeout:
                    # No heartbeat for 10 s: treat the client as gone. Writing a
                    # stray byte here would corrupt the length-prefixed stream.
                    break
        except Exception as e:
            print(f"Error handling client {client_address}: {e}")
        finally:
            self._remove_client(client_socket)
            print(f"Client {client_address} disconnected")

    def stop(self):
        """Stop the server."""
        with self.clients_lock:
            targets = list(self.clients)
        for client in targets:
            self._remove_client(client)
        if self.server_socket:
            self.server_socket.close()
        self.audio.terminate()
        print("Voice stream server stopped")


if __name__ == "__main__":
    server = VoiceStreamServer()
    try:
        server.start_server()
    except KeyboardInterrupt:
        print("\nStopping server...")
        server.stop()
        print("Server stopped")
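The wire format used by the stream server is a 4-byte big-endian length followed by the payload. Because a TCP `recv` may return fewer bytes than requested, a reader has to loop until the whole frame has arrived. A self-contained sketch of both sides (the helper names are illustrative, not taken from the server script):

```python
import socket


def send_frame(sock, payload):
    """Send one frame: 4-byte big-endian length, then the payload."""
    sock.sendall(len(payload).to_bytes(4, 'big') + payload)


def recv_exact(sock, n):
    """Read exactly n bytes; return None if the peer closes first."""
    buf = bytearray()
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            return None
        buf.extend(chunk)
    return bytes(buf)


def recv_frame(sock):
    """Receive one frame written by send_frame, or None on EOF."""
    header = recv_exact(sock, 4)
    if header is None:
        return None
    return recv_exact(sock, int.from_bytes(header, 'big'))
```

Anything written to the socket outside this framing, a lone heartbeat byte for instance, shifts every later length header, which is why heartbeats and audio should not share one direction of the stream unless they are framed the same way.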
3.3 Receiving the Voice Stream on the Client
The device outside runs a client that receives and plays the voice stream:
# voice_stream_client.py
import json
import socket
import threading
import time

import pyaudio


class VoiceStreamClient:
    def __init__(self, server_host, server_port):
        self.server_host = server_host
        self.server_port = server_port
        self.audio_params = None
        self.is_connected = False

    def _recv_exact(self, n):
        """Read exactly n bytes, or return None if the connection closes first."""
        buf = b""
        while len(buf) < n:
            chunk = self.client_socket.recv(n - len(buf))
            if not chunk:
                return None
            buf += chunk
        return buf

    def connect(self):
        """Connect to the voice stream server."""
        try:
            # Open the socket connection
            self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.client_socket.connect((self.server_host, self.server_port))
            print(f"Connected to server {self.server_host}:{self.server_port}")
            # Receive the audio parameters (recv may return partial data)
            header = self._recv_exact(4)
            if header is None:
                raise ConnectionError("server closed the connection")
            params_data = self._recv_exact(int.from_bytes(header, 'big'))
            if params_data is None:
                raise ConnectionError("server closed the connection")
            self.audio_params = json.loads(params_data.decode('utf-8'))
            print(f"Audio parameters: {self.audio_params}")
            # Initialize audio playback
            self.audio = pyaudio.PyAudio()
            self.stream = self.audio.open(
                format=self.audio_params['format'],
                channels=self.audio_params['channels'],
                rate=self.audio_params['rate'],
                output=True,
                frames_per_buffer=self.audio_params['chunk']
            )
            self.is_connected = True
            # Receive and heartbeat threads
            threading.Thread(target=self._receive_audio, daemon=True).start()
            threading.Thread(target=self._send_heartbeat, daemon=True).start()
            return True
        except Exception as e:
            print(f"Connection failed: {e}")
            return False

    def _receive_audio(self):
        """Receive audio frames and play them."""
        while self.is_connected:
            try:
                # Frame = 4-byte big-endian length + payload
                header = self._recv_exact(4)
                if header is None:
                    print("Server closed the connection")
                    self.is_connected = False
                    break
                data_len = int.from_bytes(header, 'big')
                audio_data = self._recv_exact(data_len)
                if audio_data is None:
                    print("Connection closed mid-frame")
                    self.is_connected = False
                    break
                # Play the audio
                self.stream.write(audio_data)
            except ConnectionResetError:
                print("Connection reset")
                self.is_connected = False
                break
            except Exception as e:
                print(f"Error receiving audio: {e}")
                self.is_connected = False
                break

    def _send_heartbeat(self):
        """Send a heartbeat byte every 5 seconds."""
        while self.is_connected:
            try:
                time.sleep(5)
                self.client_socket.sendall(b'\x01')
            except OSError:
                self.is_connected = False
                break

    def disconnect(self):
        """Disconnect and release audio resources."""
        self.is_connected = False
        if hasattr(self, 'stream'):
            self.stream.stop_stream()
            self.stream.close()
        if hasattr(self, 'audio'):
            self.audio.terminate()
        if hasattr(self, 'client_socket'):
            self.client_socket.close()
        print("Disconnected")


if __name__ == "__main__":
    # Fill in your server address and port
    SERVER_HOST = "your-server-ip"  # the address exposed through frp
    SERVER_PORT = 9000
    client = VoiceStreamClient(SERVER_HOST, SERVER_PORT)
    if client.connect():
        print("Receiving voice stream... press Ctrl+C to stop")
        try:
            while client.is_connected:
                time.sleep(1)
        except KeyboardInterrupt:
            print("\nDisconnecting...")
        client.disconnect()
    else:
        print("Connection failed, check the server address and port")
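4. Full Deployment and Usage
4.1 Deploying on the Intranet Device
On the intranet device (a Raspberry Pi, for example), run the following services:
# 1. Start the frp client (opens the tunnel)
cd ~/frp_0.51.3_linux_arm64
./frpc -c frpc.ini &
# 2. Start the voice monitoring service
cd ~/voice-monitor
source venv/bin/activate
python voice_monitor.py &
# 3. Start the voice stream server
python voice_stream_server.py &
You can put these commands into a startup script:
#!/bin/bash
# start_monitor.sh
echo "Starting the tunnel..."
cd ~/frp_0.51.3_linux_arm64
nohup ./frpc -c frpc.ini > frpc.log 2>&1 &
echo "frp started"
sleep 2
echo "Starting voice monitoring..."
cd ~/voice-monitor
source venv/bin/activate
nohup python voice_monitor.py > monitor.log 2>&1 &
echo "Voice monitoring started"
sleep 2
echo "Starting the voice stream server..."
nohup python voice_stream_server.py > stream.log 2>&1 &
echo "Voice stream server started"
echo "All services started!"
echo "View logs:"
echo " frp log: tail -f ~/frp_0.51.3_linux_arm64/frpc.log"
echo " monitor log: tail -f ~/voice-monitor/monitor.log"
echo " stream server log: tail -f ~/voice-monitor/stream.log"
4.2 Using It from Outside
On the outside device (your laptop, for example):
# 1. Install the client dependency
pip install pyaudio
# 2. Run the voice stream client
python voice_stream_client.py
# 3. To view recognition results over HTTP, the web monitor from
#    section 4.3 must be running on the intranet device; then open
#    http://your-server-ip:8000/api/status in a browser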
4.3 Web Monitoring Page (Optional)
If you want to check the monitoring status in a browser, you can add a simple web interface:
# web_monitor.py
import random
import threading
import time
from datetime import datetime

from flask import Flask, render_template, jsonify

app = Flask(__name__)
# In-memory stand-ins for the recognition results
recognized_texts = []
alerts = []


@app.route('/')
def index():
    """Monitoring home page."""
    return render_template('index.html')


@app.route('/api/status')
def get_status():
    """Return the system status."""
    status = {
        'status': 'running',
        'uptime': '2h 15m',  # hard-coded placeholder
        'last_activity': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        'total_detections': len(recognized_texts),
        'active_alerts': len(alerts)
    }
    return jsonify(status)


@app.route('/api/texts')
def get_texts():
    """Return recognized text."""
    return jsonify({
        'texts': recognized_texts[-50:]  # most recent 50 entries
    })


@app.route('/api/alerts')
def get_alerts():
    """Return alerts."""
    return jsonify({
        'alerts': alerts[-20:]  # most recent 20 alerts
    })


def simulate_voice_recognition():
    """Simulate recognition (replace with the real recognizer in a real project)."""
    sample_texts = [
        "今天天气真好",
        "宝宝好像哭了",
        "门口有声音",
        "我需要帮助",
        "有人在家吗",
        "时间不早了",
        "该吃饭了"
    ]
    while True:
        time.sleep(random.randint(5, 15))
        text = random.choice(sample_texts)
        timestamp = datetime.now().strftime('%H:%M:%S')
        recognized_texts.append({
            'time': timestamp,
            'text': text,
            'confidence': round(random.uniform(0.7, 0.95), 2)
        })
        # Simulated alert on "crying", "help", or "noise"
        if "哭" in text or "帮助" in text or "声音" in text:
            alerts.append({
                'time': timestamp,
                'type': 'warning' if "声音" in text else 'danger',
                'message': text,
                'level': 'high' if "帮助" in text else 'medium'
            })


if __name__ == '__main__':
    # Start the simulation thread
    sim_thread = threading.Thread(target=simulate_voice_recognition, daemon=True)
    sim_thread.start()
    # Start the web server
    app.run(host='0.0.0.0', port=8000, debug=False)
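The status endpoint above returns a fixed uptime string. A small sketch of how a real value could be computed instead, assuming the start time is captured once when the module loads (`START_TIME`, `format_uptime`, and `current_uptime` are hypothetical names):

```python
import time

START_TIME = time.monotonic()  # captured once at import


def format_uptime(seconds):
    """Format a duration in seconds as e.g. '2h 15m'."""
    minutes = int(seconds) // 60
    hours, minutes = divmod(minutes, 60)
    return f"{hours}h {minutes}m"


def current_uptime():
    """Uptime of this process since START_TIME."""
    return format_uptime(time.monotonic() - START_TIME)
```

In `get_status`, the `'uptime'` field would then be `current_uptime()`. `time.monotonic()` is used rather than wall-clock time so that clock adjustments do not distort the result.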
The matching HTML template:
<!-- templates/index.html -->
<!DOCTYPE html>
<html>
<head>
<title>语音监控系统</title>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/css/bootstrap.min.css" rel="stylesheet">
<style>
.alert-box { border-left: 5px solid #dc3545; }
.warning-box { border-left: 5px solid #ffc107; }
.normal-box { border-left: 5px solid #28a745; }
.text-item { padding: 10px; border-bottom: 1px solid #eee; }
.text-item:hover { background-color: #f8f9fa; }
</style>
</head>
<body>
<div class="container mt-4">
<h1 class="mb-4">语音监控系统</h1>
<div class="row mb-4">
<div class="col-md-3">
<div class="card">
<div class="card-body">
<h5 class="card-title">系统状态</h5>
<p class="card-text" id="status">正在检测...</p>
</div>
</div>
</div>
<div class="col-md-3">
<div class="card">
<div class="card-body">
<h5 class="card-title">运行时间</h5>
<p class="card-text" id="uptime">--</p>
</div>
</div>
</div>
<div class="col-md-3">
<div class="card">
<div class="card-body">
<h5 class="card-title">识别总数</h5>
<p class="card-text" id="total">0</p>
</div>
</div>
</div>
<div class="col-md-3">
<div class="card">
<div class="card-body">
<h5 class="card-title">活跃告警</h5>
<p class="card-text" id="alerts">0</p>
</div>
</div>
</div>
</div>
<div class="row">
<div class="col-md-6">
<div class="card">
<div class="card-header">
<h5>实时识别文本</h5>
</div>
<div class="card-body" style="height: 400px; overflow-y: auto;" id="text-list">
<div class="text-center text-muted">
正在加载...
</div>
</div>
</div>
</div>
<div class="col-md-6">
<div class="card">
<div class="card-header">
<h5>告警信息</h5>
</div>
<div class="card-body" style="height: 400px; overflow-y: auto;" id="alert-list">
<div class="text-center text-muted">
暂无告警
</div>
</div>
</div>
</div>
</div>
</div>
<script>
// 更新系统状态
function updateStatus() {
fetch('/api/status')
.then(response => response.json())
.then(data => {
document.getElementById('status').innerHTML =
`<span class="badge bg-success">运行中</span>`;
document.getElementById('uptime').textContent = data.uptime;
document.getElementById('total').textContent = data.total_detections;
document.getElementById('alerts').innerHTML =
`<span class="badge ${data.active_alerts > 0 ? 'bg-danger' : 'bg-success'}">${data.active_alerts}</span>`;
});
}
// 更新识别文本
function updateTexts() {
fetch('/api/texts')
.then(response => response.json())
.then(data => {
const container = document.getElementById('text-list');
if (data.texts.length === 0) {
container.innerHTML = '<div class="text-center text-muted">暂无识别内容</div>';
return;
}
let html = '';
data.texts.forEach(item => {
const confidenceColor = item.confidence > 0.9 ? 'success' :
item.confidence > 0.7 ? 'warning' : 'danger';
html += `
<div class="text-item">
<div class="d-flex justify-content-between">
<small class="text-muted">${item.time}</small>
<span class="badge bg-${confidenceColor}">${item.confidence}</span>
</div>
<div>${item.text}</div>
</div>
`;
});
container.innerHTML = html;
});
}
// 更新告警信息
function updateAlerts() {
fetch('/api/alerts')
.then(response => response.json())
.then(data => {
const container = document.getElementById('alert-list');
if (data.alerts.length === 0) {
container.innerHTML = '<div class="text-center text-muted">暂无告警</div>';
return;
}
let html = '';
data.alerts.forEach(alert => {
const alertClass = alert.type === 'danger' ? 'alert-box' :
alert.type === 'warning' ? 'warning-box' : 'normal-box';
html += `
<div class="p-3 mb-2 ${alertClass}" style="background-color: #f8f9fa;">
<div class="d-flex justify-content-between">
<strong>${alert.type === 'danger' ? ' 紧急' : ' 警告'}</strong>
<small class="text-muted">${alert.time}</small>
</div>
<div class="mt-2">${alert.message}</div>
<small class="text-muted">级别: ${alert.level}</small>
</div>
`;
});
container.innerHTML = html;
});
}
// 初始加载
updateStatus();
updateTexts();
updateAlerts();
// 定时更新
setInterval(updateStatus, 5000);
setInterval(updateTexts, 3000);
setInterval(updateAlerts, 3000);
</script>
</body>
</html>
5. Practical Tips and Optimizations
5.1 Reducing False Alarms
Speech recognition sometimes gets things wrong. The following helps:
# Tighter keyword detection
def optimize_keyword_detection(text, confidence):
    """Return (matched, category); a keyword needs supporting context to match."""
    # Confidence filter: ignore results below 70%
    if confidence < 0.7:
        return False, None
    # Context analysis
    keywords = {
        'help': {
            'words': ['救命', '帮帮我', '救救我', '快来人'],
            'context': ['摔倒', '疼', '动不了', '医院'],  # related context words
            'min_length': 2  # minimum text length
        },
        'fall': {
            'words': ['摔倒', '跌倒', '摔倒了', '跌倒了'],
            'context': ['疼', '站不起来', '腰', '腿'],
            'min_length': 3
        }
    }
    text_lower = text.lower()
    for category, config in keywords.items():
        for word in config['words']:
            if word in text_lower:
                # Check the text length
                if len(text) < config['min_length']:
                    continue
                # Check the context; a bare keyword with no context word
                # does not trigger
                has_context = any(ctx in text_lower for ctx in config['context'])
                if has_context:
                    return True, category
    return False, None
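Another way to cut noise is to suppress repeat alerts of the same category for a short period, since one incident tends to produce the same keyword several times in a row. A minimal sketch; `AlertCooldown` and the 60-second default are assumptions, not part of the original scripts:

```python
import time


class AlertCooldown:
    """Allow at most one alert per category within `cooldown` seconds."""

    def __init__(self, cooldown=60.0, clock=time.monotonic):
        self.cooldown = cooldown
        self.clock = clock  # injectable so the logic can be tested without waiting
        self._last = {}

    def should_alert(self, category):
        """Return True and record the time if the category is not cooling down."""
        now = self.clock()
        last = self._last.get(category)
        if last is not None and now - last < self.cooldown:
            return False
        self._last[category] = now
        return True
```

A caller would wrap the alert step as `if cooldown.should_alert(category): ...`, so different categories are throttled independently.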
5.2 Saving API Calls
Speech recognition API calls cost money. You can skip silent audio to reduce them:
import time

import numpy as np

from voice_monitor import VoiceMonitor


class OptimizedVoiceMonitor(VoiceMonitor):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.silence_threshold = 500  # silence threshold (mean absolute amplitude)
        self.last_audio_time = time.time()
        self.is_speaking = False

    def _should_process(self, audio_data):
        """Decide whether this chunk of audio is worth processing."""
        # Convert to a numpy array of 16-bit samples
        audio_array = np.frombuffer(audio_data, dtype=np.int16)
        # Volume = mean absolute amplitude
        volume = np.abs(audio_array).mean()
        current_time = time.time()
        # Detect speech
        if volume > self.silence_threshold:
            self.is_speaking = True
            self.last_audio_time = current_time
            return True
        elif self.is_speaking and (current_time - self.last_audio_time < 2.0):
            # Keep processing for 2 seconds after speech ends (trailing audio)
            return True
        else:
            self.is_speaking = False
            return False
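The volume measure above is the mean absolute amplitude of the 16-bit samples. For intuition, here is the same quantity computed with only the standard library. This is a sketch that assumes native-endian 16-bit mono PCM, which matches `np.int16` on the same machine; the function names are illustrative:

```python
from array import array


def mean_abs_amplitude(pcm_bytes):
    """Mean absolute sample value of 16-bit PCM; 0.0 for empty input."""
    samples = array('h')  # signed 16-bit, native byte order
    usable = len(pcm_bytes) // 2 * 2  # ignore a trailing odd byte
    samples.frombytes(pcm_bytes[:usable])
    if not samples:
        return 0.0
    return sum(abs(s) for s in samples) / len(samples)


def is_speech(pcm_bytes, threshold=500):
    """True if the chunk is louder than the silence threshold."""
    return mean_abs_amplitude(pcm_bytes) > threshold
```

The threshold of 500 is the value used above; a suitable value depends on the microphone and room, so it is worth logging the measured amplitude for a while before settling on one.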
5.3 Adding Spoken Alerts
When something abnormal is detected, the alert can be spoken through speech synthesis:
def voice_alert(message):
    """Speak an alert through speech synthesis."""
    try:
        import base64

        import pyaudio
        from dashscope import MultiModalConversation

        response = MultiModalConversation.call(
            model='qwen3-tts-flash',
            text=f"注意:{message}",
            voice='Cherry',  # other voices are available
            language_type='Chinese'
        )
        if response.status_code == 200:
            # NOTE: reading base64 PCM from output.audio.data is kept from the
            # original; check the DashScope TTS docs for the response layout.
            audio_data = base64.b64decode(response.output.audio.data)
            # Play the alert audio (16-bit mono PCM at 24 kHz)
            p = pyaudio.PyAudio()
            stream = p.open(format=pyaudio.paInt16,
                            channels=1,
                            rate=24000,
                            output=True)
            try:
                stream.write(audio_data)
            finally:
                stream.stop_stream()
                stream.close()
                p.terminate()
    except Exception as e:
        print(f"Spoken alert failed: {e}")
5.4 Keeping Recording Backups
Recordings of important alerts can be saved:
import os
import wave
from datetime import datetime


class AudioRecorder:
    def __init__(self, save_dir='recordings'):
        self.save_dir = save_dir
        os.makedirs(save_dir, exist_ok=True)
        self.current_file = None
        self.is_recording = False

    def start_recording(self, prefix='alert'):
        """Start recording to a new timestamped WAV file."""
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        filename = f"{prefix}_{timestamp}.wav"
        filepath = os.path.join(self.save_dir, filename)
        self.current_file = wave.open(filepath, 'wb')
        self.current_file.setnchannels(1)
        self.current_file.setsampwidth(2)  # 16-bit
        self.current_file.setframerate(16000)
        self.is_recording = True
        return filepath

    def write_audio(self, audio_data):
        """Write audio data."""
        if self.is_recording and self.current_file:
            self.current_file.writeframes(audio_data)

    def stop_recording(self):
        """Stop recording."""
        if self.current_file:
            self.current_file.close()
            self.current_file = None
        self.is_recording = False
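An alert recording is more useful if it includes the seconds leading up to the alert. One way to get that is to keep a rolling buffer of recent chunks and flush it into the file when recording starts. A sketch, with `PreRollBuffer` as a hypothetical helper and the defaults taken from the audio parameters used throughout:

```python
from collections import deque


class PreRollBuffer:
    """Keep roughly the last `seconds` of audio chunks in memory."""

    def __init__(self, seconds=5, rate=16000, chunk_frames=1024):
        max_chunks = max(1, int(seconds * rate / chunk_frames))
        self._chunks = deque(maxlen=max_chunks)  # oldest chunks fall off the left

    def push(self, chunk):
        """Add the newest chunk, discarding the oldest when full."""
        self._chunks.append(chunk)

    def drain(self):
        """Return the buffered audio as bytes and clear the buffer."""
        data = b"".join(self._chunks)
        self._chunks.clear()
        return data
```

Usage would be to call `push` for every captured chunk and, right after `start_recording()`, call `recorder.write_audio(buffer.drain())` before writing live audio.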
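6. Troubleshooting
6.1 Audio Device Problems
If the audio device is not recognized:
# List audio devices
arecord -l
# Test the microphone
arecord --format=S16_LE --duration=5 --rate=16000 --file-type=raw test.raw
aplay --format=S16_LE --rate=16000 test.raw
# If the device is reported busy, another program may be using it
sudo fuser -v /dev/snd/* # show the processes holding the device
6.2 Network Connection Problems
When the tunnel fails to connect:
# Check frp connection state
netstat -tlnp | grep frp
# View the frp log
tail -f ~/frp_0.51.3_linux_arm64/frpc.log
# Test port connectivity
telnet your-server-ip 7000 # frp server port
telnet your-server-ip 8000 # monitoring service port
telnet your-server-ip 9000 # voice stream port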
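6.3 Memory and CPU Optimization
If the device is short on resources:
# Adjust audio parameters to reduce resource usage
class ResourceOptimizedMonitor(VoiceMonitor):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Lower the sample rate from 16000 to 8000
        # (check that the recognition model accepts 8 kHz input)
        self.RATE = 8000
        # Larger chunks mean fewer reads: 1024 -> 2048
        self.CHUNK = 2048
        # Shorter buffer: 2 seconds -> 1 second. This and RATE only take
        # effect if _process_audio reads self.buffer_duration and self.RATE
        # rather than fixed values.
        self.buffer_duration = 1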
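6.4 API Rate Limits
If you run into API rate limits:
# Add rate limiting
import time
from functools import wraps


def rate_limited(max_per_minute):
    """Rate-limiting decorator: space calls at least 60/max_per_minute seconds apart."""
    min_interval = 60.0 / max_per_minute
    last_time_called = [0.0]

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            elapsed = time.time() - last_time_called[0]
            left_to_wait = min_interval - elapsed
            if left_to_wait > 0:
                time.sleep(left_to_wait)
            last_time_called[0] = time.time()
            return func(*args, **kwargs)
        return wrapper
    return decorator


class RateLimitedMonitor(VoiceMonitor):
    # Assumes VoiceMonitor exposes the recognition request as a
    # call_asr_api(audio_data) method. The class in section 3.1 makes the
    # request inline in _process_audio, so factor it out into such a method first.
    @rate_limited(30)  # at most 30 calls per minute
    def call_asr_api(self, audio_data):
        """Rate-limited API call."""
        return super().call_asr_api(audio_data)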
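The snippet above covers rate limiting. For retries, a wrapper with exponential backoff might look like the sketch below. Which exceptions are worth retrying depends on the SDK, so catching `Exception` here is a placeholder assumption, and `retry` is an illustrative name:

```python
import time
from functools import wraps


def retry(max_attempts=3, base_delay=1.0, sleep=time.sleep):
    """Retry the wrapped call, doubling the delay after each failure."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts - 1:
                        raise  # out of attempts: surface the last error
                    sleep(base_delay * (2 ** attempt))
        return wrapper
    return decorator
```

With the defaults, a call that keeps failing is attempted three times, waiting 1 s and then 2 s between attempts, before the last exception propagates. The `sleep` parameter exists so the delays can be observed in tests without actually waiting.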
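7. Real-World Use Cases
7.1 Elderly Care at Home
I installed this system at my grandfather's home, mainly for:
- Fall detection: when keywords such as "摔倒" or "跌倒" are recognized, I get a WeChat notification right away
- Daily activity monitoring: sound gives a picture of his daily routine
- Emergency calls: an alarm is raised automatically when he says "救命" or "帮帮我"
7.2 Baby Monitoring
A friend uses it to watch over a baby:
- Cry detection: soothing music plays automatically when the baby cries
- Sleep monitoring: records the baby's sleep time and quality
- Anomaly reminders: a reminder to check in when there has been no sound for a long time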
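The "no sound for a long time" reminder can be sketched as a small watchdog that tracks when sound was last heard. `SilenceWatchdog` and the 30-minute default are assumptions for illustration, not part of the scripts above:

```python
import time


class SilenceWatchdog:
    """Report when no sound has been heard for longer than `max_silence` seconds."""

    def __init__(self, max_silence=1800.0, clock=time.monotonic):
        self.max_silence = max_silence
        self.clock = clock  # injectable so the logic can be tested without waiting
        self.last_sound = clock()

    def heard_sound(self):
        """Call whenever a chunk exceeds the silence threshold."""
        self.last_sound = self.clock()

    def is_overdue(self):
        """True once the silence has lasted longer than max_silence."""
        return self.clock() - self.last_sound > self.max_silence
```

The capture loop would call `heard_sound()` on every non-silent chunk, and a periodic check of `is_overdue()` would trigger the reminder.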
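7.3 Office Security
A small company uses it as a basic security system:
- Intrusion detection: an alert is sent when voices are detected outside working hours
- Glass-break detection: recognizes breaking glass from its sound signature (this needs a sound-event classifier on top of the keyword matching shown here)
- Fire warning: recognizes the sound of a smoke alarm (likewise a sound classification task, not speech recognition)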
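The "outside working hours" condition for intrusion alerts comes down to a time check. A sketch, assuming working hours of 09:00 to 18:00 on weekdays (both the hours and the function name `is_off_hours` are assumptions):

```python
from datetime import datetime, time as dtime


def is_off_hours(now, start=dtime(9, 0), end=dtime(18, 0)):
    """True outside [start, end) on weekdays, and all day on weekends."""
    if now.weekday() >= 5:  # 5 = Saturday, 6 = Sunday
        return True
    return not (start <= now.time() < end)
```

An alert path would then be gated with `if is_off_hours(datetime.now()): ...`, so that speech detected during the working day is ignored.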
8. Security Considerations
8.1 Data Security
Voice data is sensitive, so take care:
# Add data encryption
import queue

from cryptography.fernet import Fernet


class EncryptedVoiceMonitor(VoiceMonitor):
    def __init__(self, encryption_key=None):
        super().__init__()
        if encryption_key:
            self.cipher = Fernet(encryption_key)
        else:
            # Generate a random key; store it securely, since the
            # receiving side needs the same key to decrypt
            key = Fernet.generate_key()
            self.cipher = Fernet(key)
            print(f"Encryption key: {key.decode()}")

    def _process_audio(self):
        """Encrypt audio data as it is processed."""
        while self.is_processing:
            try:
                chunk = self.audio_queue.get(timeout=0.1)
            except queue.Empty:
                continue
            # Encrypt the audio data
            encrypted_chunk = self.cipher.encrypt(chunk)
            # Transmit the encrypted data
            # ... further processing
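8.2 Access Control
Restrict who can access the monitor:
# Add simple authentication
import hashlib


class AuthenticatedStreamServer(VoiceStreamServer):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.allowed_tokens = set()

    def add_client_token(self, token):
        """Register a client token (stored as its SHA-256 hex digest)."""
        self.allowed_tokens.add(hashlib.sha256(token.encode()).hexdigest())

    def _handle_client(self, client_socket, client_address):
        """Client handling with authentication."""
        try:
            # Receive the token hash (64 hex characters)
            token_hash = client_socket.recv(64).decode()
            if token_hash not in self.allowed_tokens:
                print(f"Client {client_address} failed authentication")
                client_socket.close()
                return
            # Authenticated: continue with normal handling. Audio must only
            # go to sockets that pass this check, so the base class has to add
            # the socket to self.clients inside _handle_client, not at accept time.
            super()._handle_client(client_socket, client_address)
        except Exception as e:
            print(f"Authentication error: {e}")
            client_socket.close()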
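Sending a fixed token hash means anyone who captures it on the wire can replay it. A challenge-response exchange avoids that: the server sends a random nonce and the client returns an HMAC of it under the shared secret. A sketch using only the standard library; the function names are illustrative, and this is an alternative to, not a description of, the class above:

```python
import hashlib
import hmac
import secrets


def make_challenge():
    """Server side: a fresh random nonce per connection."""
    return secrets.token_bytes(16)


def sign_challenge(secret, challenge):
    """Client side: HMAC-SHA256 of the nonce under the shared secret."""
    return hmac.new(secret, challenge, hashlib.sha256).digest()


def verify_response(secret, challenge, response):
    """Server side: constant-time comparison against the expected HMAC."""
    expected = sign_challenge(secret, challenge)
    return hmac.compare_digest(expected, response)
```

On connect, the server would send `make_challenge()`, read back exactly 32 bytes, and only proceed if `verify_response` returns True. This protects the secret but not the audio itself, so it complements rather than replaces encryption of the stream.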
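8.3 Privacy Protection
I suggest adding these privacy measures:
- Prefer local processing: do as much as possible on the device and send less data out
- Automatic deletion: delete old recordings on a schedule
- Sensitive-word filtering: filter out privacy-related content
- Notifications only: send alert notifications, not the raw audio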
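The automatic-deletion point can be sketched as a small cleanup function run from cron or a timer. It assumes recordings are `.wav` files in a single directory, as in section 5.4; the 7-day default and the function name are assumptions:

```python
import os
import time


def delete_old_recordings(directory, max_age_days=7, now=None):
    """Delete .wav files older than max_age_days; return the removed paths."""
    now = time.time() if now is None else now
    cutoff = now - max_age_days * 86400
    removed = []
    for name in os.listdir(directory):
        path = os.path.join(directory, name)
        if name.endswith('.wav') and os.path.isfile(path):
            # Age is judged by the file's last-modified time
            if os.path.getmtime(path) < cutoff:
                os.remove(path)
                removed.append(path)
    return removed
```

Calling `delete_old_recordings('recordings')` once a day keeps the directory bounded; files of other types are left alone.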
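9. Performance Optimization
9.1 Choosing Hardware
Pick hardware that fits the use case:
- Raspberry Pi 4B: good for home use, low power, enough performance
- Jetson Nano: a better fit if you also need video analysis
- An old phone: the cheapest option, with a built-in battery and network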
9.2 Software Optimization
# Use multiple processes for better performance
import multiprocessing as mp
import queue


class MultiProcessMonitor:
    def __init__(self):
        self.audio_queue = mp.Queue(maxsize=100)
        self.result_queue = mp.Queue()

    def audio_capture_process(self):
        """Dedicated audio capture process."""
        import pyaudio
        audio = pyaudio.PyAudio()
        stream = audio.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=16000,
            input=True,
            frames_per_buffer=1024
        )
        while True:
            data = stream.read(1024, exception_on_overflow=False)
            self.audio_queue.put(data)

    def asr_process(self):
        """Dedicated ASR processing process."""
        while True:
            try:
                audio_data = self.audio_queue.get(timeout=1)
            except queue.Empty:
                continue
            # Run speech recognition on audio_data
            # ...
            result = None  # placeholder: replace with the recognition result
            self.result_queue.put(result)

    def start(self):
        """Start the processes."""
        processes = []
        # Start the audio capture process
        p1 = mp.Process(target=self.audio_capture_process)
        p1.start()
        processes.append(p1)
        # Start several ASR processes
        for i in range(2):  # two worker processes
            p = mp.Process(target=self.asr_process)
            p.start()
            processes.append(p)
        return processes
9.3 Network Optimization
If the network is unstable:
# Add reconnection logic
import time

from voice_stream_client import VoiceStreamClient


class RobustVoiceStreamClient(VoiceStreamClient):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.max_retries = 3
        self.retry_delay = 5  # seconds

    def connect_with_retry(self):
        """Connect, retrying on failure."""
        for attempt in range(self.max_retries):
            try:
                print(f"Connection attempt {attempt + 1}/{self.max_retries}")
                if super().connect():
                    return True
            except Exception as e:
                print(f"Connection failed: {e}")
            if attempt < self.max_retries - 1:
                print(f"Retrying in {self.retry_delay} seconds...")
                time.sleep(self.retry_delay)
        print("All retries failed")
        return False
10. Extensions
10.1 Adding Video Monitoring
If you need video monitoring, you can combine this with OpenCV:
import time
from datetime import datetime

import cv2


class VideoMonitor:
    def __init__(self, camera_index=0):
        self.camera_index = camera_index
        self.is_monitoring = False

    def start_monitoring(self):
        """Start video monitoring."""
        self.is_monitoring = True
        cap = cv2.VideoCapture(self.camera_index)
        # Motion detection via background subtraction
        background_subtractor = cv2.createBackgroundSubtractorMOG2()
        try:
            while self.is_monitoring:
                ret, frame = cap.read()
                if not ret:
                    break
                fg_mask = background_subtractor.apply(frame)
                # Significant motion detected
                if cv2.countNonZero(fg_mask) > 1000:
                    print("Motion detected!")
                    # Save a snapshot or send an alert
                    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
                    cv2.imwrite(f'motion_{timestamp}.jpg', frame)
                # Throttle to reduce CPU usage (cv2.waitKey only delays
                # when a HighGUI window exists, so sleep instead)
                time.sleep(0.1)
        finally:
            cap.release()
10.2 Adding Sensor Integration
Combine with other sensors:
# Read sensors over GPIO (Raspberry Pi)
import time

try:
    import RPi.GPIO as GPIO

    class SensorMonitor:
        def __init__(self):
            GPIO.setmode(GPIO.BCM)
            # PIR motion sensor
            self.pir_pin = 17
            GPIO.setup(self.pir_pin, GPIO.IN)
            # Door contact sensor
            self.door_pin = 27
            GPIO.setup(self.door_pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)

        def monitor_sensors(self):
            """Poll the sensors."""
            while True:
                # Motion detected
                if GPIO.input(self.pir_pin):
                    print("Motion detected!")
                # Door state
                if not GPIO.input(self.door_pin):
                    print("Door opened!")
                time.sleep(0.1)
except ImportError:
    print("Not running on a Raspberry Pi, skipping sensor monitoring")
10.3 Adding Notifications
Several notification channels:
import smtplib
from datetime import datetime
from email.mime.text import MIMEText

import requests


class NotificationManager:
    def __init__(self):
        self.notification_methods = []

    def add_email_notification(self, smtp_server, port, username, password, to_emails):
        """Add an email notification channel."""
        self.notification_methods.append({
            'type': 'email',
            'smtp_server': smtp_server,
            'port': port,
            'username': username,
            'password': password,
            'to_emails': to_emails
        })

    def add_webhook_notification(self, webhook_url):
        """Add a webhook notification channel."""
        self.notification_methods.append({
            'type': 'webhook',
            'url': webhook_url
        })

    def send_notification(self, title, message, level='info'):
        """Send a notification through every configured channel."""
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        full_message = f"[{timestamp}] {title}\n{message}"
        for method in self.notification_methods:
            if method['type'] == 'email':
                self._send_email(method, title, full_message)
            elif method['type'] == 'webhook':
                self._send_webhook(method, title, message, level)

    def _send_email(self, config, subject, body):
        """Send an email over SMTP with STARTTLS."""
        try:
            msg = MIMEText(body, 'plain', 'utf-8')
            msg['Subject'] = f"Monitoring alert: {subject}"
            msg['From'] = config['username']
            msg['To'] = ', '.join(config['to_emails'])
            with smtplib.SMTP(config['smtp_server'], config['port']) as server:
                server.starttls()
                server.login(config['username'], config['password'])
                server.send_message(msg)
            print(f"Email notification sent: {subject}")
        except Exception as e:
            print(f"Failed to send email: {e}")

    def _send_webhook(self, config, title, message, level):
        """POST the alert to a webhook as JSON."""
        try:
            payload = {
                'title': title,
                'message': message,
                'level': level,
                'timestamp': datetime.now().isoformat()
            }
            response = requests.post(config['url'], json=payload, timeout=5)
            if response.status_code == 200:
                print(f"Webhook notification sent: {title}")
            else:
                print(f"Webhook failed: {response.status_code}")
        except Exception as e:
            print(f"Failed to send webhook: {e}")
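11. Summary
I have been running this Super Qwen Voice World intranet-penetration voice monitoring setup in a real project for a while now, and overall it works well. Its biggest advantage is low cost: it can be built almost entirely from hardware you already have, with no expensive specialist equipment.
Deployment was simpler than I expected. There are three main parts: the speech recognition service, the intranet penetration tunnel, and the voice stream transport. Each has ready-made open-source tools, and putting them together is not complicated.
In practice the recognition accuracy covers the basic needs, and keyword detection in particular is responsive. The tunnel does add some latency, but on a typical home network a delay of 1-2 seconds is acceptable for monitoring.
If you want to build something similar, I suggest starting with a simple version, getting the basics working, and then adding features as real needs come up. For example, implement basic speech recognition and alerting first, let it run stably for a while, and only then consider video monitoring or sensor integration.
Pay particular attention to security. Voice data is sensitive, so encryption and access control are a must. My current practice is to keep only the recordings around an alert and discard everyday audio, which meets the monitoring need while protecting privacy.
Finally, there is still plenty of room for improvement, such as trying a lighter model to reduce resource usage, or adding offline recognition to cope with an unstable network. If you have better ideas, I would be glad to compare notes.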