yolo11/main/yolo_processor.py
2025-04-16 16:20:42 +08:00

144 lines
5.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import cv2
import numpy as np
import msgpack
from datetime import datetime
from PIL import ImageFont, ImageDraw, Image
# IoU above which a helmet / safety-vest / smoke box is attributed to a person.
_ASSOCIATION_IOU_THRESHOLD = 0.1

# Display labels for classes that are reported on their own, independently of
# any detected person.
_INDEPENDENT_CLASS_LABELS = {
    "animal": "异物入侵",
    "cellphone": "玩手机",
    "fire": "起火",
}


def _calculate_iou(box_a, box_b):
    """Return the intersection-over-union of two (x1, y1, x2, y2) boxes.

    Returns 0.0 when the union area is not positive (e.g. both boxes are
    degenerate after integer truncation) instead of dividing by zero.
    """
    inter_w = max(0, min(box_a[2], box_b[2]) - max(box_a[0], box_b[0]))
    inter_h = max(0, min(box_a[3], box_b[3]) - max(box_a[1], box_b[1]))
    inter_area = inter_w * inter_h
    area_a = (box_a[2] - box_a[0]) * (box_a[3] - box_a[1])
    area_b = (box_b[2] - box_b[0]) * (box_b[3] - box_b[1])
    union_area = area_a + area_b - inter_area
    if union_area <= 0:
        return 0.0
    return inter_area / float(union_area)


def _normalized_bbox(x1, y1, x2, y2, width, height):
    """Return the box as top-left corner plus size, scaled by the frame size."""
    return {
        "x_min": x1 / width,
        "y_min": y1 / height,
        "width": (x2 - x1) / width,
        "height": (y2 - y1) / height,
    }


def _person_status_label(person_box, types, helmet_boxes, safevest_boxes, smoke_boxes):
    """Return the status label for one person box from the overlapping attribute boxes."""

    def overlaps_any(candidates):
        return any(
            _calculate_iou(person_box, candidate) > _ASSOCIATION_IOU_THRESHOLD
            for candidate in candidates
        )

    # Smoking takes precedence over any dress-code evaluation.
    if overlaps_any(smoke_boxes):
        return "吸烟"

    violations = []
    if "helmet" in types and not overlaps_any(helmet_boxes):
        violations.append("未戴安全帽")
    if "safevest" in types and not overlaps_any(safevest_boxes):
        violations.append("未穿工服")
    if violations:
        return "违规:" + "".join(violations)
    if "helmet" in types or "safevest" in types:
        return "着装规范"
    return "人员"


def process_frame_with_yolo(frame, channel_index, camera_configs, yolo_detector, redis_client):
    """Run detection on one frame, derive per-person status and publish the result.

    Args:
        frame: Image array; only ``frame.shape[:2]`` (height, width) is read here.
        channel_index: Index into ``camera_configs`` for this frame's camera.
        camera_configs: Sequence of dicts providing ``'box'`` (ROI as fractions
            of the frame: x_min, y_min, x_max, y_max), ``'types'`` (class names
            to keep) and ``'channel'``.
        yolo_detector: Object whose ``process_frame(frame, roi)`` returns
            ``(annotated_frame, results)``. ``results`` is read through
            ``boxes.xyxy``, ``boxes.conf``, ``boxes.cls`` and ``names``
            (presumably an Ultralytics-style result - confirm against the
            detector implementation).
        redis_client: Object with ``publish(channel, payload)``.

    Returns:
        The annotated frame returned by ``yolo_detector.process_frame``.

    Serialization/publish failures are printed and swallowed so that one
    failed send does not interrupt frame processing.
    """
    config = camera_configs[channel_index]
    roi = config['box']
    types = config['types']
    height, width = frame.shape[:2]

    # Convert the fractional ROI to pixel (x, y, w, h).
    x_min = int(roi[0] * width)
    y_min = int(roi[1] * height)
    x_max = int(roi[2] * width)
    y_max = int(roi[3] * height)
    roi_converted = (x_min, y_min, x_max - x_min, y_max - y_min)

    # Run detection and report how long it took.
    start_time = datetime.now()
    frame_with_boxes, results = yolo_detector.process_frame(frame, roi_converted)
    process_time = (datetime.now() - start_time).total_seconds() * 1000
    print(f"\n通道 {channel_index + 1} 检测结果 (处理时间: {process_time: .2f}ms):")

    person_boxes = []
    # Boxes that only matter in relation to a person, keyed by class name.
    attribute_boxes = {"helmet": [], "safevest": [], "smoke": []}
    other_detections = []

    boxes = getattr(results, 'boxes', None)
    if boxes is not None and len(boxes) > 0:
        # Step 1: bucket every detection by class, keeping only configured types.
        for box, conf, cls in zip(boxes.xyxy, boxes.conf, boxes.cls):
            class_name = results.names[int(cls)]
            if class_name not in types:
                continue
            x1, y1, x2, y2 = map(int, box)
            if class_name == "person":
                person_boxes.append((box, conf, (x1, y1, x2, y2)))
            elif class_name in attribute_boxes:
                attribute_boxes[class_name].append((x1, y1, x2, y2))
            elif class_name in _INDEPENDENT_CLASS_LABELS:
                class_name_cn = _INDEPENDENT_CLASS_LABELS[class_name]
                other_detections.append({
                    "class": class_name_cn,
                    "confidence": float(conf),
                    "bbox": _normalized_bbox(x1, y1, x2, y2, width, height),
                })
                print(f"- 独立检测: {class_name_cn}, 置信度: {conf: .2f}, 位置: {box.tolist()}")

    # Step 2: one entry per person, labelled by the attribute boxes overlapping it.
    # The type filter above guarantees an attribute list is non-empty only when
    # its class is configured, and person_boxes only when "person" is.
    detections = []
    for box, conf, (x1, y1, x2, y2) in person_boxes:
        status_label = _person_status_label(
            (x1, y1, x2, y2),
            types,
            attribute_boxes["helmet"],
            attribute_boxes["safevest"],
            attribute_boxes["smoke"],
        )
        detections.append({
            "class": status_label,
            "confidence": float(conf),
            "bbox": _normalized_bbox(x1, y1, x2, y2, width, height),
        })
        print(f"- 状态: {status_label}, 置信度: {conf:.2f}, 位置: {box}")

    # Independent classes are reported after the per-person entries.
    detections.extend(other_detections)
    if not detections:
        print("- 未检测到任何目标")

    # Serialize and publish the result.
    data = {
        "channel": str(config['channel']),
        "detections": detections,
        "image_size": [width, height]
    }
    try:
        serialized_data = msgpack.packb(data)
        redis_client.publish('detection_result_channel', serialized_data)
        print(f"[Redis] 通道 {config['channel']} 数据发送成功")
    except Exception as e:
        # Best-effort delivery: report and keep the video pipeline running.
        print(f"[Redis] 发送失败: {str(e)}")
    return frame_with_boxes