Smart_Talker/main.py

211 lines
7.5 KiB
Python
Raw Permalink Normal View History

2025-04-17 14:21:53 +08:00
import sys
import pvporcupine
import pyaudio
import struct
import os
import socket
import time
import threading
import configparser
# 获取基础目录
base_dir = os.path.dirname(os.path.abspath(__file__)) if getattr(sys, 'frozen', False) else os.path.dirname(os.path.abspath(sys.executable))
config_file_path = os.path.join(base_dir, 'config.ini')
# 读取配置文件
config = configparser.ConfigParser()
try:
config.read(config_file_path, encoding='utf-8')
HOST = config.get('Server', 'address')
PORT = config.getint('Server', 'port')
KEY = config.get('Server', 'key')
CPORT = config.getint('Server', 'client-port')
except (configparser.NoSectionError, configparser.NoOptionError) as e:
print(f"读取配置文件时出错: {e}")
sys.exit(1)
except FileNotFoundError:
print("配置文件未找到")
sys.exit(1)
def resource_path(relative_path):
"""获取资源文件的绝对路径"""
try:
base_path = sys._MEIPASS
except Exception:
base_path = os.path.abspath(".")
return os.path.join(base_path, relative_path)
class UDPClientTool:
def __init__(self, server_host=HOST, server_port=PORT, client_port=CPORT, wake_word_detector=None):
self.SERVER_HOST = server_host
self.SERVER_PORT = server_port
self.CLIENT_PORT = client_port
self.client_socket = None
self.wake_word_detector = wake_word_detector
self.heartbeat_interval = 30 # 心跳间隔(秒)
self.start_communication()
def connect(self):
try:
self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
if self.CLIENT_PORT != 0:
self.client_socket.bind(('0.0.0.0', self.CLIENT_PORT))
# 发送连接测试消息
test_message = "连接测试"
self.client_socket.sendto(test_message.encode('utf-8'), (self.SERVER_HOST, self.SERVER_PORT))
buffer_size = 4096
data, addr = self.client_socket.recvfrom(buffer_size)
decoded_data = data.decode('utf-8')
print(f"成功连接到 {self.SERVER_HOST}:{self.SERVER_PORT},收到测试响应: {decoded_data}")
return True
except Exception as e:
print(f"连接出错: {e}")
return False
def send_message(self, message):
if not self.client_socket:
while not self.connect():
print("连接失败3 秒后尝试重新连接...")
time.sleep(3)
try:
self.client_socket.sendto(message.encode('utf-8'), (self.SERVER_HOST, self.SERVER_PORT))
buffer_size = 4096
data, addr = self.client_socket.recvfrom(buffer_size)
decoded_data = data.decode('utf-8')
print(f"收到来自 {addr} 的响应: {decoded_data}")
# 处理特殊指令
if decoded_data == "打开助手":
self.toggle_microphone(True)
elif decoded_data == "关闭助手":
self.toggle_microphone(False)
return decoded_data
except Exception as e:
print(f"发送消息出错: {e}")
self.reconnect()
return None
def toggle_microphone(self, state):
if self.wake_word_detector:
if state:
self.wake_word_detector.open_PyAudio()
else:
self.wake_word_detector.close_PyAudio()
def start_communication(self):
while not self.connect():
print("连接失败3 秒后尝试重新连接...")
time.sleep(3)
# 启动接收消息线程
receive_thread = threading.Thread(target=self.receive_messages)
receive_thread.daemon = True
receive_thread.start()
# 启动心跳线程
heartbeat_thread = threading.Thread(target=self.send_heartbeat)
heartbeat_thread.daemon = True
heartbeat_thread.start()
def receive_messages(self):
buffer_size = 4096
print("开始接收服务器消息...")
try:
while True:
data, addr = self.client_socket.recvfrom(buffer_size)
decoded_data = data.decode('utf-8')
print(f"收到来自 {addr} 的消息: {decoded_data}")
# 处理特殊指令
if decoded_data in ["打开助手", "关闭助手"]:
self.toggle_microphone(decoded_data == "打开助手")
except KeyboardInterrupt:
print("手动停止接收消息。")
finally:
self.reconnect()
def send_heartbeat(self):
while True:
if self.client_socket:
try:
heartbeat_message = "heartbeat"
self.client_socket.sendto(heartbeat_message.encode('utf-8'), (self.SERVER_HOST, self.SERVER_PORT))
print(f"发送心跳消息: {heartbeat_message}")
except Exception as e:
print(f"发送心跳出错: {e}")
self.reconnect()
time.sleep(self.heartbeat_interval)
def reconnect(self):
print("尝试重新连接服务器...")
self.client_socket.close()
self.client_socket = None
while not self.connect():
print("重新连接失败3 秒后重试...")
time.sleep(3)
print("重新连接成功。")
class WakeWordDetector:
def __init__(self):
# 配置唤醒词相关参数
keyword_paths = [resource_path('hello.ppn')]
model_file = resource_path('porcupine_params_zh.pv')
wake_words = ["你好"]
sensitivities = [0.9]
access_key = KEY
# 创建 Porcupine 实例
self.porcupine = pvporcupine.create(
access_key=access_key,
keyword_paths=keyword_paths,
model_path=model_file,
keywords=wake_words,
sensitivities=sensitivities
)
# 创建 UDP 客户端工具实例
self.client_tool = UDPClientTool(wake_word_detector=self)
# 音频流参数
self.sample_rate = 16000
self.frame_length = 512
self.isOpen = False
def open_PyAudio(self):
self.p = pyaudio.PyAudio()
self.stream = self.p.open(format=pyaudio.paInt16,
channels=1,
rate=self.sample_rate,
input=True,
frames_per_buffer=self.frame_length)
self.isOpen = True
print("开启麦克风...")
def close_PyAudio(self):
if self.stream and self.stream.is_active():
self.stream.stop_stream()
self.stream.close()
self.p.terminate()
self.isOpen = False
print("关闭麦克风...")
def start_detection(self):
print("正在等待唤醒词...")
try:
while True:
if self.isOpen:
pcm = self.stream.read(self.frame_length)
pcm = struct.unpack_from("h" * self.frame_length, pcm)
result = self.porcupine.process(pcm)
if result >= 0:
print("检测到唤醒词!")
message = "打开助手"
self.client_tool.send_message(message)
except KeyboardInterrupt:
print("程序已停止。")
finally:
self.close_PyAudio()
self.porcupine.delete()
if __name__ == "__main__":
detector = WakeWordDetector()
detector.start_detection()