硬件
飞腾派、USB 免驱摄像头

实现原理
PaddleHub 实现人体骨骼关键点识别
根据关键点判断坐姿状态:
- 判断歪头 根据 head_top 坐标和 right_shoulder、left_shoulder 中心点的连线偏移角度,来判定头部倾斜角度
- 判断低头 根据头顶到脖子的距离和脖子到胸腔的距离进行比较
部署模型
- 创建虚拟环境
python3 -m venv ~/paddle_env- 激活虚拟环境
source ~/paddle_env/bin/activate- 安装 PaddlePaddle
pip install paddlepaddle -i https://pypi.tuna.tsinghua.edu.cn/simple- 安装 PaddleHub
pip install --upgrade paddlehub -i https://pypi.tuna.tsinghua.edu.cn/simple- 使用 PaddleHub 安装骨骼关键点识别模型
hub install human_pose_estimation_resnet50_mpii程序开发
import os
import sys
import time
import math
import signal
import json
import glob
from collections import deque
from datetime import datetime
import cv2
import paddlehub as hub
CAMERA_DEVICE = "/dev/video0"
INTERVAL = 15
MQTT_TOPIC = "ctrl/lyh"
SAVE_DIR = "./imgs"
MAX_SAVED_IMAGES = 5
STATUS_FAIL = 0 # 失败/异常
STATUS_NORMAL = 1 # 正常
STATUS_HEAD_DOWN = 2 # 低头
STATUS_HEAD_TILT = 3 # 歪头/头部倾斜
running = True
saved_image_paths = deque(maxlen=MAX_SAVED_IMAGES)print("[系统] 正在加载模型...")
try:
pose_estimation = hub.Module(name="human_pose_estimation_resnet50_mpii")
print("[系统] 模型加载完成")
except Exception as e:
print(f"[系统] 模型加载失败: {e}")
sys.exit(1)def Center_Point(point_1, point_2):
"""计算中心点"""
center_x = math.fabs((point_1[0] - point_2[0]) / 2) + min(point_1[0], point_2[0])
center_y = math.fabs((point_1[1] - point_2[1]) / 2) + min(point_1[1], point_2[1])
return [center_x, center_y]
def Cal_Ang(point_1, point_2, point_3):
"""计算夹角"""
a = math.sqrt((point_2[0] - point_3[0])**2 + (point_2[1] - point_3[1])**2)
b = math.sqrt((point_1[0] - point_3[0])**2 + (point_1[1] - point_3[1])**2)
c = math.sqrt((point_1[0] - point_2[0])**2 + (point_1[1] - point_2[1])**2)
if a * c == 0:
return 90.0
cos_value = (b*b - a*a - c*c) / (-2 * a * c)
cos_value = max(-1.0, min(1.0, cos_value))
return math.degrees(math.acos(cos_value))def detect_pose_from_frame(frame):
"""
检测单帧图像,返回状态码
0: 失败, 1: 正常, 2: 低头, 3: 歪头
"""
try:
temp_path = "/tmp/pose_temp.jpg"
cv2.imwrite(temp_path, frame)
result = pose_estimation.keypoint_detection(
paths=[temp_path],
visualization=False
)
if os.path.exists(temp_path):
os.remove(temp_path)
if not result or len(result) == 0:
return STATUS_FAIL, "未检测到人体", None
data = result[0]['data']
left_shoulder = data.get('left_shoulder')
right_shoulder = data.get('right_shoulder')
head_top = data.get('head_top')
upper_neck = data.get('upper_neck')
thorax = data.get('thorax')
if not all([left_shoulder, right_shoulder, head_top, upper_neck, thorax]):
return STATUS_FAIL, "关键点不完整", None
head_neck = math.fabs(head_top[1] - upper_neck[1])
neck_thorax = math.fabs(thorax[1] - upper_neck[1])
is_head_down = (head_neck / 3.0 > neck_thorax)
center_point = Center_Point(left_shoulder, right_shoulder)
head_angle = Cal_Ang(head_top, center_point, right_shoulder)
is_head_tilt = (head_angle <= 80 or head_angle >= 100)
if is_head_tilt:
return STATUS_HEAD_TILT, "歪头", data
elif is_head_down:
return STATUS_HEAD_DOWN, "低头", data
else:
return STATUS_NORMAL, "正常", data
except Exception as e:
return STATUS_FAIL, f"检测异常: {str(e)}", None
def save_annotated_image(frame, keypoints_data, status_code):
"""
保存带标注的图像,带关键点和连线
"""
global saved_image_paths
os.makedirs(SAVE_DIR, exist_ok=True)
annotated = frame.copy()
# 颜色定义
color_keypoint = (0, 255, 0) # 绿色 - 关键点
color_line = (255, 0, 0) # 蓝色 - 连线
color_text = (0, 0, 255) # 红色 - 文字
# 关键点映射
keypoints_map = {
'head_top': keypoints_data.get('head_top'),
'upper_neck': keypoints_data.get('upper_neck'),
'thorax': keypoints_data.get('thorax'),
'left_shoulder': keypoints_data.get('left_shoulder'),
'right_shoulder': keypoints_data.get('right_shoulder'),
'left_elbow': keypoints_data.get('left_elbow'),
'right_elbow': keypoints_data.get('right_elbow'),
'left_wrist': keypoints_data.get('left_wrist'),
'right_wrist': keypoints_data.get('right_wrist'),
}
# 骨架连接关系
connections = [
('head_top', 'upper_neck'),
('upper_neck', 'thorax'),
('thorax', 'left_shoulder'),
('thorax', 'right_shoulder'),
('left_shoulder', 'left_elbow'),
('left_elbow', 'left_wrist'),
('right_shoulder', 'right_elbow'),
('right_elbow', 'right_wrist'),
]
# 绘制连线
for p1_name, p2_name in connections:
p1 = keypoints_map.get(p1_name)
p2 = keypoints_map.get(p2_name)
if p1 and p2:
pt1 = (int(p1[0]), int(p1[1]))
pt2 = (int(p2[0]), int(p2[1]))
cv2.line(annotated, pt1, pt2, color_line, 2)
# 绘制关键点
for name, point in keypoints_map.items():
if point:
x, y = int(point[0]), int(point[1])
cv2.circle(annotated, (x, y), 4, color_keypoint, -1)
cv2.circle(annotated, (x, y), 4, (0, 0, 0), 1)
# 添加状态文字
status_texts = {0: "FAIL", 1: "NORMAL", 2: "HEAD DOWN", 3: "HEAD TILT"}
text = f"Status: {status_code} ({status_texts.get(status_code, 'UNKNOWN')})"
cv2.putText(annotated, text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, color_text, 2)
# 添加时间戳
timestamp = datetime.now().strftime("%H:%M:%S")
cv2.putText(annotated, timestamp, (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color_text, 1)
# 保存文件
filename = f"pose_{datetime.now().strftime('%H%M%S')}_{status_code}.jpg"
filepath = os.path.join(SAVE_DIR, filename)
cv2.imwrite(filepath, annotated)
saved_image_paths.append(filepath)
# 清理旧文件
all_files = sorted(glob.glob(os.path.join(SAVE_DIR, "pose_*.jpg")),
key=os.path.getctime, reverse=True)
for old_file in all_files[MAX_SAVED_IMAGES:]:
try:
os.remove(old_file)
except:
pass
print(f"[图像] 已保存: {filepath}")
return filepathdef main():
global running
print(f"[系统] 正在打开摄像头: {CAMERA_DEVICE}")
cap = cv2.VideoCapture(CAMERA_DEVICE)
if not cap.isOpened():
print(f"[系统] 无法打开 {CAMERA_DEVICE},尝试索引 0...")
cap = cv2.VideoCapture(0)
if not cap.isOpened():
print("[系统] 无法打开任何摄像头")
return
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
print(f"[系统] 摄像头就绪 ({int(cap.get(3))}x{int(cap.get(4))})")
print(f"[系统] 检测间隔: {INTERVAL}秒")
print("=" * 60)
print("按 Ctrl+C 停止")
print("=" * 60)
last_detection_time = 0
frame_count = 0
while running:
ret, frame = cap.read()
if not ret:
time.sleep(0.1)
continue
frame_count += 1
current_time = time.time()
if current_time - last_detection_time >= INTERVAL:
timestamp = datetime.now().strftime("%H:%M:%S")
status_code, status_text, keypoints_data = detect_pose_from_frame(frame)
if keypoints_data is not None:
save_annotated_image(frame, keypoints_data, status_code)
print(f"[{timestamp}] 帧{frame_count} | 姿势: {status_code}({status_text})", end="")
last_detection_time = current_time
time.sleep(0.1)
cap.release()
print("[系统] 已停止")
if __name__ == "__main__":
main()运行结果


开源社区
