Smart_Report/smart_report.py

258 lines
12 KiB
Python
Raw Permalink Normal View History

2025-04-17 14:15:10 +08:00
import os
import json
import time
import requests
import argparse
import sys
from datetime import datetime
def get_base_path():
"""获取应用程序的基础路径兼容PyInstaller打包后的情况"""
if getattr(sys, 'frozen', False):
# 如果是打包后的应用程序
return os.path.dirname(sys.executable)
else:
# 如果是直接运行的脚本
return os.path.dirname(os.path.abspath(__file__))
def save_to_file(file, content, is_question=False):
"""保存对话内容到文件"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
if is_question:
file.write(f"\n[{timestamp}] Question:\n{content}\n\n[{timestamp}] Answer:\n")
else:
file.write(content)
def read_input_file(file_path):
"""读取输入文件支持txt和json格式自动处理不同编码"""
print(f"正在读取输入文件: {file_path}")
file_extension = os.path.splitext(file_path)[1].lower()
# 尝试不同的编码
encodings = ['utf-8', 'gbk', 'gb2312', 'gb18030', 'big5']
for encoding in encodings:
try:
with open(file_path, "r", encoding=encoding) as file:
if file_extension == '.json':
# 读取JSON文件
data = json.load(file)
# 将JSON数据转换为字符串形式
print(f"成功读取JSON文件 (编码: {encoding})")
return json.dumps(data, ensure_ascii=False, indent=2)
else:
# 默认作为文本文件读取
content = file.read()
print(f"成功读取文本文件,共{len(content)}字符 (编码: {encoding})")
return content
except UnicodeDecodeError:
# 如果是编码错误,尝试下一种编码
print(f"尝试使用 {encoding} 编码读取失败,尝试其他编码...")
continue
except Exception as e:
# 其他错误
print(f"读取文件错误: {str(e)}")
return ""
# 如果所有编码都失败
print("所有支持的编码都无法正确读取文件")
return ""
def main():
base_path = get_base_path()
error_file = os.path.join(base_path, "error.txt")
if os.path.exists(error_file):
os.remove(error_file)
print("=" * 50)
print("智能故障分析工具")
print("=" * 50)
# 配置
parser = argparse.ArgumentParser(description='处理故障报告并生成分析')
parser.add_argument('--input', '-i', type=str, default=os.path.join(base_path, "reportIn.json"),
help='输入文件路径支持txt和json格式 (默认: ./reportIn.json)')
parser.add_argument('--output', '-o', type=str, default=os.path.join(base_path, "reportOut.txt"),
help='输出文件路径 (默认: reportOut.txt)')
parser.add_argument('--json_output', '-j', type=str, default=os.path.join(base_path, "reportOut.json"),
help='JSON输出文件路径 (默认: reportOut.json)')
args = parser.parse_args()
print(f"输入文件: {args.input}")
print(f"输出文件: {args.output}")
print(f"JSON输出: {args.json_output}")
url = "https://api.deepseek.com/chat/completions"
headers = {
"Content-Type": "application/json",
"Authorization": "Bearer sk-e4fe82d0dba246c9bd347bc045739a94" # 替换为你的 API Key
}
input_file = args.input
file_content = read_input_file(input_file)
if not file_content:
print("无法读取输入文件或文件为空,程序终止")
error_msg = "无法读取输入文件或文件为空,程序终止"
print(error_msg)
with open(error_file, "w", encoding="utf-8") as f:
f.write(error_msg)
return # 返回错误码print("无法读取输入文件或文件为空,程序终止")
# 打开文件用于保存对话
print("正在准备分析...")
with open(args.output, "a", encoding="utf-8") as file:
# 获取用户输入
question = "以下文档是一次高速铁路沿线故障后铁路各装置发出的故障报告需要说明的是故障前即正常运行时的数据不在文件里设定的定值和阈值等也不在文件里本文档里所有的数据都是在故障后测得的。文档所表征的故障虽然是一次故障但其可能由一系列断路器跳闸组成如在第一次跳闸后成功进行重合闸但由于故障未清除导致二次跳闸等。每次跳闸的时间可以通过故障报告中的“故障时间”来判断重合闸标志成功是标志在本次故障后进行了合闸操作但后面可能有后续跳闸意味着合闸失败。现在我希望你将其总结为四个自然段第一个自然段为故障原因需要注意该故障是由单一原因导致如鸟害金属性短路等请结合这些故障原因的故障表现异同给出一个你认为的故障原因并对该单一原因加以说明但同时需要考虑到数据可能不充分因此可以共列出1-3个你认为的实际原因、故障位置等第二个自然段为检修建议需要注意我们设置的定值、闭锁逻辑与配合关系都是正确的因此只用说明如何排查这一单一故障就可以了。第三个自然段为表格形式的SOE列表需要注意的是报告中的SOE是分别基于对应报告的“故障时间”的我希望你将其换算成格式为hh:mm:ss:ms的绝对时间换算方法是通过表头的“故障时间”加上“事件”里的相对时间多少ms请用markdown格式展示第一列为事件序号第二列为绝对时间第三列为事件描述。但需要注意的是可能来自于不同装置的SOE有微小的时序区别因为他们的时间与测量不可能完全精确请自行将其综合保证同一时间戳只有一行将来自不同装置同一时间戳的信息融合不同时间戳的信息不一样且互不包含。第四个自然段为故障关键参数同样用markdown展示第一列为参数名称第二列为数值第三列为物理意义当有多个值冲突的时候选择你认为更正确的那一个不用说明原因。前两个自然段各100字左右后两个自然段没有字数限制但需要确保每个自然段只给一个表格且表格各行的物理意义没有重复。请在回答中明确标注每个自然段'第一自然段''第二自然段''第三自然段''第四自然段'开头。"+file_content
# 保存问题
save_to_file(file, question, is_question=True)
# 准备请求数据
data = {
"model": "deepseek-reasoner",
"messages": [
{
"role": "user",
"content": question
}
],
"stream": True,
"max_tokens": 2048,
"temperature": 0.6,
"top_p": 0.7,
"top_k": 50,
"frequency_penalty": 0.5,
"n": 1,
"response_format": {
"type": "text"
}
}
print("正在连接API服务器...")
output_text = ''
try:
# 发送流式请求
response = requests.post(url, json=data, headers=headers, stream=True)
response.raise_for_status() # 检查响应状态
print("已连接到API服务器正在生成分析结果...")
print("-" * 50)
# 处理流式响应
for line in response.iter_lines():
if line:
line = line.decode('utf-8')
if line.startswith('data: '):
if line == 'data: [DONE]':
continue
try:
content = json.loads(line[6:]) # 去掉 'data: ' 前缀
if content['choices'][0]['delta'].get('content'):
chunk = content['choices'][0]['delta']['content']
print(chunk, end='', flush=True)
output_text = output_text + chunk
file.write(chunk)
file.flush()
except json.JSONDecodeError:
continue
# 添加分隔符
print("\n" + "-" * 50)
print("分析结果生成完毕")
file.write("\n----------------------------------------\n")
file.flush()
try:
print("正在处理段落...")
# 完全重写的段落划分逻辑,基于明确的段落标记
text = output_text.strip()
# 初始化段落内容
fault_reason = ""
maintenance_suggestion = ""
soe_list = ""
fault_data = ""
# 查找各段落的起始位置
first_start = text.find("第一自然段")
second_start = text.find("第二自然段")
third_start = text.find("第三自然段")
fourth_start = text.find("第四自然段")
print(f"段落标记位置: 第一({first_start}), 第二({second_start}), 第三({third_start}), 第四({fourth_start})")
# 提取各段落内容
if first_start != -1:
if second_start != -1:
fault_reason = text[first_start:second_start].strip()
else:
fault_reason = text[first_start:].strip()
if second_start != -1:
if third_start != -1:
maintenance_suggestion = text[second_start:third_start].strip()
else:
maintenance_suggestion = text[second_start:].strip()
if third_start != -1:
if fourth_start != -1:
soe_list = text[third_start:fourth_start].strip()
else:
soe_list = text[third_start:].strip()
if fourth_start != -1:
fault_data = text[fourth_start:].strip()
# 移除段落标题
fault_reason = fault_reason.replace("第一自然段", "", 1).strip()
maintenance_suggestion = maintenance_suggestion.replace("第二自然段", "", 1).strip()
soe_list = soe_list.replace("第三自然段", "", 1).strip()
fault_data = fault_data.replace("第四自然段", "", 1).strip()
# 创建JSON结构
output_json = {
"fault_reason": fault_reason,
"maintenance_suggestion": maintenance_suggestion,
"soe_list": soe_list,
"fault_data": fault_data
}
# 保存JSON到文件
with open(args.json_output, 'w', encoding='utf-8') as json_file:
json.dump(output_json, json_file, ensure_ascii=False, indent=2)
print(f"JSON输出已保存到 {args.json_output}")
except Exception as e:
print(f"处理输出时发生错误: {str(e)}")
# 保存原始输出以便调试
raw_file_path = f"{args.json_output}.raw"
with open(raw_file_path, 'w', encoding='utf-8') as raw_file:
raw_file.write(output_text)
print(f"原始输出已保存到 {raw_file_path}")
except requests.RequestException as e:
error_msg = f"请求错误: {str(e)}"
print(error_msg)
file.write(f"\n{error_msg}\n")
file.flush()
with open(error_file, "w", encoding="utf-8") as f:
f.write(error_msg)
print("=" * 50)
print("分析任务已完成")
print("=" * 50)
return 0 # 返回成功码
# 移除了等待输入
if __name__ == "__main__":
try:
main()
except Exception as e:
error_msg = f"程序执行失败: {str(e)}"
print(error_msg)
with open(os.path.join(get_base_path(), "error.txt"), "w", encoding="utf-8") as f:
f.write(error_msg)