yolo11/main/UploadAlarmMsg.py

159 lines
5.2 KiB
Python
Raw Normal View History

2025-04-16 16:19:53 +08:00
import requests
from typing import Optional, Dict, Any
import logging
import json
from datetime import datetime
import time
# 配置日志系统
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('alarm_upload.log', encoding='utf-8'),
logging.StreamHandler()
]
)
# 审计日志独立配置(记录原始请求响应数据)
audit_logger = logging.getLogger('audit')
audit_logger.setLevel(logging.INFO)
audit_handler = logging.FileHandler('alarm_audit.log', encoding='utf-8')
audit_handler.setFormatter(logging.Formatter('%(message)s'))
audit_logger.addHandler(audit_handler)
def upload_alarm(
camera_id: str,
detection_status: Dict[str, Any],
content: str,
timeout: int = 10
) -> Optional[Dict]:
"""
增强版安全报警信息上传函数
Parameters:
camera_id: 摄像头唯一标识ID
detection_status: 检测状态字典需包含
{
"has_head": bool,
"has_helmet": bool,
"has_safevest": bool,
"timestamp": str # 格式YYYY-MM-DD HH:MM:SS
}
content: 报警内容描述
timeout: 请求超时时间
Returns:
服务器响应字典成功时None失败时
"""
# 参数验证
required_keys = ['has_head', 'has_helmet', 'has_safevest', 'timestamp']
if not all(key in detection_status for key in required_keys):
logging.error(f"参数错误:缺失必要检测状态字段,需要包含:{required_keys}")
return None
if not camera_id or not content:
logging.error("参数错误camera_id 和 content 不能为空")
return None
# 构造请求参数
base_url = "http://192.168.110.229:38090/api/services/isas/VideoElectronicFence/UploadAlarmMsg"
params = {
"id": camera_id,
"content": f"[{detection_status['timestamp']}] {content}",
"details": json.dumps({
"camera_id": camera_id,
"detection_status": detection_status,
# "system_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}, ensure_ascii=False)
}
# 审计日志数据
audit_data = {
"camera_id": camera_id,
# "request_time": time.strftime("%Y-%m-%d %H:%M:%S"),
"request_params": params,
"response_status": None,
"response_content": None,
"exception": None
}
try:
# 发送GET请求
response = requests.get(
base_url,
params=params,
timeout=timeout
)
audit_data["response_status"] = response.status_code
# HTTP状态码检查
if response.status_code != 200:
logging.error(f"HTTP错误 | 摄像头: {camera_id} | 状态码: {response.status_code}")
audit_data["exception"] = f"HTTP错误{response.status_code}"
return None
# 解析JSON响应
try:
json_data = response.json()
audit_data["response_content"] = json_data
except ValueError:
logging.error(f"响应解析失败 | 摄像头: {camera_id}")
audit_data["exception"] = "无效的JSON响应"
return None
# 检查业务逻辑成功状态
if json_data.get("success", False):
logging.info(f"报警上传成功 | 摄像头: {camera_id} | 内容: {content}")
return json_data
else:
error_msg = json_data.get("result", {}).get("message", "未知错误")
logging.error(f"业务逻辑错误 | 摄像头: {camera_id} | 错误: {error_msg}")
audit_data["exception"] = error_msg
return None
except requests.exceptions.Timeout:
error_msg = f"请求超时 | 摄像头: {camera_id} | 超时时间: {timeout}s"
logging.error(error_msg)
audit_data["exception"] = error_msg
except requests.exceptions.RequestException as e:
error_msg = f"网络请求异常 | 摄像头: {camera_id} | 错误: {str(e)}"
logging.error(error_msg)
audit_data["exception"] = str(e)
except Exception as e:
error_msg = f"未知错误 | 摄像头: {camera_id} | 错误: {str(e)}"
logging.error(error_msg)
audit_data["exception"] = str(e)
finally:
# 记录审计日志
audit_logger.info(json.dumps(audit_data, ensure_ascii=False))
return None
# 测试用例
if __name__ == "__main__":
# 模拟检测状态
test_status = {
"has_head": True,
"has_helmet": False,
"has_safevest": True,
# "timestamp": "2023-08-20 15:30:00"
}
# 测试正常上传
result = upload_alarm(
camera_id="1236",
detection_status=test_status,
content="安全警报:未佩戴安全帽"
)
print("上传结果:", result)
# 测试错误参数
result = upload_alarm(
camera_id="",
detection_status=test_status,
content=""
)
print("错误测试结果:", result)