Smart_Talker/main.py
2025-04-17 14:21:53 +08:00

211 lines
7.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sys
import pvporcupine
import pyaudio
import struct
import os
import socket
import time
import threading
import configparser
# 获取基础目录
base_dir = os.path.dirname(os.path.abspath(__file__)) if getattr(sys, 'frozen', False) else os.path.dirname(os.path.abspath(sys.executable))
config_file_path = os.path.join(base_dir, 'config.ini')
# 读取配置文件
config = configparser.ConfigParser()
try:
config.read(config_file_path, encoding='utf-8')
HOST = config.get('Server', 'address')
PORT = config.getint('Server', 'port')
KEY = config.get('Server', 'key')
CPORT = config.getint('Server', 'client-port')
except (configparser.NoSectionError, configparser.NoOptionError) as e:
print(f"读取配置文件时出错: {e}")
sys.exit(1)
except FileNotFoundError:
print("配置文件未找到")
sys.exit(1)
def resource_path(relative_path):
"""获取资源文件的绝对路径"""
try:
base_path = sys._MEIPASS
except Exception:
base_path = os.path.abspath(".")
return os.path.join(base_path, relative_path)
class UDPClientTool:
def __init__(self, server_host=HOST, server_port=PORT, client_port=CPORT, wake_word_detector=None):
self.SERVER_HOST = server_host
self.SERVER_PORT = server_port
self.CLIENT_PORT = client_port
self.client_socket = None
self.wake_word_detector = wake_word_detector
self.heartbeat_interval = 30 # 心跳间隔(秒)
self.start_communication()
def connect(self):
try:
self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
if self.CLIENT_PORT != 0:
self.client_socket.bind(('0.0.0.0', self.CLIENT_PORT))
# 发送连接测试消息
test_message = "连接测试"
self.client_socket.sendto(test_message.encode('utf-8'), (self.SERVER_HOST, self.SERVER_PORT))
buffer_size = 4096
data, addr = self.client_socket.recvfrom(buffer_size)
decoded_data = data.decode('utf-8')
print(f"成功连接到 {self.SERVER_HOST}:{self.SERVER_PORT},收到测试响应: {decoded_data}")
return True
except Exception as e:
print(f"连接出错: {e}")
return False
def send_message(self, message):
if not self.client_socket:
while not self.connect():
print("连接失败3 秒后尝试重新连接...")
time.sleep(3)
try:
self.client_socket.sendto(message.encode('utf-8'), (self.SERVER_HOST, self.SERVER_PORT))
buffer_size = 4096
data, addr = self.client_socket.recvfrom(buffer_size)
decoded_data = data.decode('utf-8')
print(f"收到来自 {addr} 的响应: {decoded_data}")
# 处理特殊指令
if decoded_data == "打开助手":
self.toggle_microphone(True)
elif decoded_data == "关闭助手":
self.toggle_microphone(False)
return decoded_data
except Exception as e:
print(f"发送消息出错: {e}")
self.reconnect()
return None
def toggle_microphone(self, state):
if self.wake_word_detector:
if state:
self.wake_word_detector.open_PyAudio()
else:
self.wake_word_detector.close_PyAudio()
def start_communication(self):
while not self.connect():
print("连接失败3 秒后尝试重新连接...")
time.sleep(3)
# 启动接收消息线程
receive_thread = threading.Thread(target=self.receive_messages)
receive_thread.daemon = True
receive_thread.start()
# 启动心跳线程
heartbeat_thread = threading.Thread(target=self.send_heartbeat)
heartbeat_thread.daemon = True
heartbeat_thread.start()
def receive_messages(self):
buffer_size = 4096
print("开始接收服务器消息...")
try:
while True:
data, addr = self.client_socket.recvfrom(buffer_size)
decoded_data = data.decode('utf-8')
print(f"收到来自 {addr} 的消息: {decoded_data}")
# 处理特殊指令
if decoded_data in ["打开助手", "关闭助手"]:
self.toggle_microphone(decoded_data == "打开助手")
except KeyboardInterrupt:
print("手动停止接收消息。")
finally:
self.reconnect()
def send_heartbeat(self):
while True:
if self.client_socket:
try:
heartbeat_message = "heartbeat"
self.client_socket.sendto(heartbeat_message.encode('utf-8'), (self.SERVER_HOST, self.SERVER_PORT))
print(f"发送心跳消息: {heartbeat_message}")
except Exception as e:
print(f"发送心跳出错: {e}")
self.reconnect()
time.sleep(self.heartbeat_interval)
def reconnect(self):
print("尝试重新连接服务器...")
self.client_socket.close()
self.client_socket = None
while not self.connect():
print("重新连接失败3 秒后重试...")
time.sleep(3)
print("重新连接成功。")
class WakeWordDetector:
def __init__(self):
# 配置唤醒词相关参数
keyword_paths = [resource_path('hello.ppn')]
model_file = resource_path('porcupine_params_zh.pv')
wake_words = ["你好"]
sensitivities = [0.9]
access_key = KEY
# 创建 Porcupine 实例
self.porcupine = pvporcupine.create(
access_key=access_key,
keyword_paths=keyword_paths,
model_path=model_file,
keywords=wake_words,
sensitivities=sensitivities
)
# 创建 UDP 客户端工具实例
self.client_tool = UDPClientTool(wake_word_detector=self)
# 音频流参数
self.sample_rate = 16000
self.frame_length = 512
self.isOpen = False
def open_PyAudio(self):
self.p = pyaudio.PyAudio()
self.stream = self.p.open(format=pyaudio.paInt16,
channels=1,
rate=self.sample_rate,
input=True,
frames_per_buffer=self.frame_length)
self.isOpen = True
print("开启麦克风...")
def close_PyAudio(self):
if self.stream and self.stream.is_active():
self.stream.stop_stream()
self.stream.close()
self.p.terminate()
self.isOpen = False
print("关闭麦克风...")
def start_detection(self):
print("正在等待唤醒词...")
try:
while True:
if self.isOpen:
pcm = self.stream.read(self.frame_length)
pcm = struct.unpack_from("h" * self.frame_length, pcm)
result = self.porcupine.process(pcm)
if result >= 0:
print("检测到唤醒词!")
message = "打开助手"
self.client_tool.send_message(message)
except KeyboardInterrupt:
print("程序已停止。")
finally:
self.close_PyAudio()
self.porcupine.delete()
if __name__ == "__main__":
detector = WakeWordDetector()
detector.start_detection()