"""Wake-word detector that talks to a UDP server.

A Porcupine instance listens on the microphone for the wake word and, when it
is detected, notifies the server over UDP.  The server can in turn switch the
microphone on or off by sending the commands "打开助手" / "关闭助手".

Connection settings are read from ``config.ini`` (section ``[Server]``, options
``address``, ``port``, ``key`` and ``client-port``) at import time; the process
exits with status 1 when the file is missing or incomplete.
"""

import configparser
import os
import socket
import struct
import sys
import threading
import time

import pvporcupine
import pyaudio

# Maximum size of a single UDP datagram read from the server.
BUFFER_SIZE = 4096

# Base directory: a frozen build (sys.frozen set by the bundler) keeps its
# config next to the executable, a plain script run keeps it next to this file.
if getattr(sys, 'frozen', False):
    base_dir = os.path.dirname(os.path.abspath(sys.executable))
else:
    base_dir = os.path.dirname(os.path.abspath(__file__))
config_file_path = os.path.join(base_dir, 'config.ini')

# Read the configuration file.
config = configparser.ConfigParser()
try:
    # ConfigParser.read() silently skips files it cannot open and returns the
    # list of files it did parse, so an empty result means "not found".
    if not config.read(config_file_path, encoding='utf-8'):
        print("配置文件未找到")
        sys.exit(1)
    HOST = config.get('Server', 'address')
    PORT = config.getint('Server', 'port')
    KEY = config.get('Server', 'key')
    CPORT = config.getint('Server', 'client-port')
except (configparser.Error, ValueError) as e:
    # configparser.Error covers missing sections/options and parse errors;
    # ValueError covers a non-integer port and undecodable file content.
    print(f"读取配置文件时出错: {e}")
    sys.exit(1)


def resource_path(relative_path):
    """Return the absolute path of a bundled resource file.

    Uses ``sys._MEIPASS`` as the base directory when that attribute exists,
    otherwise the current working directory.
    """
    base_path = getattr(sys, '_MEIPASS', os.path.abspath("."))
    return os.path.join(base_path, relative_path)


class UDPClientTool:
    """UDP client that exchanges text messages with the server.

    Creating an instance blocks until the connection test succeeds and then
    starts two daemon threads: one receiving server messages and one sending
    a periodic heartbeat.
    """

    def __init__(self, server_host=HOST, server_port=PORT, client_port=CPORT, wake_word_detector=None):
        """Store the endpoints and immediately start communicating.

        Args:
            server_host: Server address.
            server_port: Server UDP port.
            client_port: Local port to bind to; 0 lets the OS choose.
            wake_word_detector: Optional object providing ``open_PyAudio()``
                and ``close_PyAudio()``; driven by server commands.
        """
        self.SERVER_HOST = server_host
        self.SERVER_PORT = server_port
        self.CLIENT_PORT = client_port
        self.client_socket = None
        self.wake_word_detector = wake_word_detector
        self.heartbeat_interval = 30  # heartbeat interval (seconds)
        self.connect_timeout = 5  # max wait for the connection-test reply (seconds)
        # Serialises (re)connection attempts made by the main, receive and
        # heartbeat threads so only one of them rebuilds the socket.
        self._reconnect_lock = threading.Lock()
        self.start_communication()

    def connect(self):
        """Open a socket and perform the connection test.

        ``self.client_socket`` is replaced only when the test succeeds; on
        failure the new socket is closed so a fixed client port can be bound
        again on the next attempt.

        Returns:
            True if the server answered the test message, False otherwise.
        """
        sock = None
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            if self.CLIENT_PORT != 0:
                sock.bind(('0.0.0.0', self.CLIENT_PORT))
            # Bound the wait for the reply: without a timeout a silent server
            # blocks here forever and the callers' retry loops never run.
            sock.settimeout(self.connect_timeout)
            # Send the connection-test message.
            test_message = "连接测试"
            sock.sendto(test_message.encode('utf-8'), (self.SERVER_HOST, self.SERVER_PORT))
            data, _ = sock.recvfrom(BUFFER_SIZE)
            decoded_data = data.decode('utf-8')
            # Back to blocking mode for normal operation.
            sock.settimeout(None)
        except (OSError, ValueError, OverflowError) as e:
            # OSError: socket errors and timeouts; ValueError: undecodable
            # reply; OverflowError: port outside 0-65535.
            print(f"连接出错: {e}")
            if sock is not None:
                sock.close()
            return False

        previous, self.client_socket = self.client_socket, sock
        if previous is not None:
            previous.close()
        print(f"成功连接到 {self.SERVER_HOST}:{self.SERVER_PORT},收到测试响应: {decoded_data}")
        return True

    def _ensure_connected(self):
        """Block until a connected socket exists and return it."""
        with self._reconnect_lock:
            while self.client_socket is None and not self.connect():
                print("连接失败,3 秒后尝试重新连接...")
                time.sleep(3)
            return self.client_socket

    def send_message(self, message):
        """Send ``message`` to the server and wait for one reply.

        The replies "打开助手" / "关闭助手" additionally switch the microphone
        on / off.

        Returns:
            The decoded reply, or None if anything failed (in which case a
            reconnect has been performed).
        """
        sock = self._ensure_connected()
        try:
            sock.sendto(message.encode('utf-8'), (self.SERVER_HOST, self.SERVER_PORT))
            # NOTE(review): the receive thread reads from the same socket, so
            # the reply may be consumed there instead of here - confirm the
            # intended protocol.
            data, addr = sock.recvfrom(BUFFER_SIZE)
            decoded_data = data.decode('utf-8')
            print(f"收到来自 {addr} 的响应: {decoded_data}")

            # Handle special commands.
            if decoded_data == "打开助手":
                self.toggle_microphone(True)
            elif decoded_data == "关闭助手":
                self.toggle_microphone(False)
            return decoded_data
        except Exception as e:
            # Deliberately broad: callers rely on "None on any failure".
            print(f"发送消息出错: {e}")
            self.reconnect(sock)
            return None

    def toggle_microphone(self, state):
        """Open (``state`` truthy) or close the detector's microphone, if a detector is attached."""
        if self.wake_word_detector:
            if state:
                self.wake_word_detector.open_PyAudio()
            else:
                self.wake_word_detector.close_PyAudio()

    def start_communication(self):
        """Connect (retrying every 3 seconds) and start the worker threads."""
        self._ensure_connected()

        # Start the message-receiving thread.
        receive_thread = threading.Thread(target=self.receive_messages, daemon=True)
        receive_thread.start()

        # Start the heartbeat thread.
        heartbeat_thread = threading.Thread(target=self.send_heartbeat, daemon=True)
        heartbeat_thread.start()

    def receive_messages(self):
        """Receive server messages forever and act on microphone commands.

        A socket error triggers a reconnect and the loop then continues on
        the new socket, so one failure does not end message reception.
        """
        print("开始接收服务器消息...")
        try:
            while True:
                sock = self.client_socket
                if sock is None:
                    # Another thread is reconnecting; wait for the new socket.
                    time.sleep(0.1)
                    continue
                try:
                    data, addr = sock.recvfrom(BUFFER_SIZE)
                except OSError:
                    self.reconnect(sock)
                    continue

                # A malformed datagram must not stop the receiver.
                decoded_data = data.decode('utf-8', errors='replace')
                print(f"收到来自 {addr} 的消息: {decoded_data}")

                # Handle special commands.
                if decoded_data in ("打开助手", "关闭助手"):
                    try:
                        self.toggle_microphone(decoded_data == "打开助手")
                    except Exception as e:
                        # Thread boundary: report and keep receiving.
                        print(f"处理消息出错: {e}")
        except KeyboardInterrupt:
            # Only reachable when this method runs in the main thread.
            print("手动停止接收消息。")

    def send_heartbeat(self):
        """Send "heartbeat" every ``heartbeat_interval`` seconds, reconnecting on socket errors."""
        heartbeat_message = "heartbeat"
        while True:
            sock = self.client_socket
            if sock is not None:
                try:
                    sock.sendto(heartbeat_message.encode('utf-8'), (self.SERVER_HOST, self.SERVER_PORT))
                    print(f"发送心跳消息: {heartbeat_message}")
                except OSError as e:
                    print(f"发送心跳出错: {e}")
                    self.reconnect(sock)
            time.sleep(self.heartbeat_interval)

    def reconnect(self, failed_socket=None):
        """Close the current socket and connect again, retrying every 3 seconds.

        Args:
            failed_socket: The socket the caller saw fail.  When given and a
                different socket is already installed, another thread has
                reconnected in the meantime and nothing is done.  When
                omitted, a reconnect is always performed.
        """
        with self._reconnect_lock:
            current = self.client_socket
            if failed_socket is not None and current is not None and current is not failed_socket:
                return

            print("尝试重新连接服务器...")
            if current is not None:
                current.close()
                self.client_socket = None
            while not self.connect():
                print("重新连接失败,3 秒后重试...")
                time.sleep(3)
            print("重新连接成功。")


class WakeWordDetector:
    """Listens on the microphone for the wake word and notifies the server."""

    def __init__(self):
        """Create the Porcupine engine and the UDP client."""
        # Wake-word configuration.
        keyword_paths = [resource_path('hello.ppn')]
        model_file = resource_path('porcupine_params_zh.pv')
        wake_words = ["你好"]
        sensitivities = [0.9]
        access_key = KEY

        # Audio-stream parameters and state.  These are set before the UDP
        # client is created because its receive thread may call
        # open_PyAudio() / close_PyAudio() as soon as it starts.
        self.sample_rate = 16000
        self.frame_length = 512
        self.isOpen = False
        self.p = None
        self.stream = None
        # Guards p / stream / isOpen: the stream is read in the detection
        # loop and opened/closed from the UDP receive thread.
        self._audio_lock = threading.Lock()

        # Create the Porcupine instance.
        self.porcupine = pvporcupine.create(
            access_key=access_key,
            keyword_paths=keyword_paths,
            model_path=model_file,
            keywords=wake_words,
            sensitivities=sensitivities
        )

        # Create the UDP client (connects and starts its threads).
        self.client_tool = UDPClientTool(wake_word_detector=self)

    def open_PyAudio(self):
        """Open the microphone input stream; does nothing if it is already open."""
        with self._audio_lock:
            if self.isOpen:
                # Opening again would leak the current PyAudio instance and stream.
                return
            audio = pyaudio.PyAudio()
            try:
                stream = audio.open(format=pyaudio.paInt16,
                                    channels=1,
                                    rate=self.sample_rate,
                                    input=True,
                                    frames_per_buffer=self.frame_length)
            except Exception:
                audio.terminate()
                raise
            self.p = audio
            self.stream = stream
            self.isOpen = True
        print("开启麦克风...")

    def close_PyAudio(self):
        """Stop and release the microphone stream; safe to call when it was never opened."""
        with self._audio_lock:
            self.isOpen = False
            stream, self.stream = self.stream, None
            audio, self.p = self.p, None
            if stream is not None:
                if stream.is_active():
                    stream.stop_stream()
                stream.close()
            if audio is not None:
                audio.terminate()
        print("关闭麦克风...")

    def _read_frame(self):
        """Return one frame of raw audio bytes, or None while the microphone is closed."""
        with self._audio_lock:
            if not self.isOpen:
                return None
            # send_message() blocks the detection loop on network I/O, so the
            # input buffer can overflow; drop the lost samples instead of raising.
            return self.stream.read(self.frame_length, exception_on_overflow=False)

    def start_detection(self):
        """Run the detection loop until interrupted with Ctrl-C.

        Each detected wake word sends "打开助手" to the server.  The
        microphone and the Porcupine instance are released on exit.
        """
        print("正在等待唤醒词...")
        frame_format = "h" * self.frame_length
        try:
            while True:
                raw = self._read_frame()
                if raw is None:
                    # Microphone closed: idle briefly instead of spinning the CPU.
                    time.sleep(0.05)
                    continue
                pcm = struct.unpack_from(frame_format, raw)
                result = self.porcupine.process(pcm)
                if result >= 0:
                    print("检测到唤醒词!")
                    message = "打开助手"
                    self.client_tool.send_message(message)
        except KeyboardInterrupt:
            print("程序已停止。")
        finally:
            self.close_PyAudio()
            self.porcupine.delete()


if __name__ == "__main__":
    detector = WakeWordDetector()
    detector.start_detection()