飞腾派坐姿检测

分享作者:Mr_Fang
作者昵称:Mr_Fang
评测品牌:萤火工场
评测型号:CEK8903-E2KQD4-30
发布时间:2026-04-09 09:53:13
概要
使用飞腾派实现坐姿检测
开源口碑分享内容

硬件

飞腾派、USB 免驱摄像头

实现原理

PaddleHub 实现人体骨骼关键点识别

根据关键点判断坐姿状态:

  1. 判断歪头 根据 head_top 坐标和 right_shoulder、left_shoulder 中心点的连线偏移角度,来判定头部倾斜角度
  2. 判断低头 根据头顶到脖子的距离和脖子到胸腔的距离进行比较

部署模型

  • 创建虚拟环境
python3 -m venv ~/paddle_env
  • 激活虚拟环境
source ~/paddle_env/bin/activate
  • 安装 PaddlePaddle
pip install paddlepaddle -i https://pypi.tuna.tsinghua.edu.cn/simple
  • 安装 PaddleHub
pip install --upgrade paddlehub -i https://pypi.tuna.tsinghua.edu.cn/simple
  • 使用 PaddleHub 安装骨骼关键点识别模型
hub install human_pose_estimation_resnet50_mpii

程序开发

import glob
import json
import math
import os
import signal
import sys
import tempfile
import time
from collections import deque
from contextlib import suppress
from datetime import datetime

import cv2
import paddlehub as hub

# --- Configuration -----------------------------------------------------------
CAMERA_DEVICE = "/dev/video0"  # V4L2 device node of the USB camera
INTERVAL = 15                  # seconds between two posture detections
MQTT_TOPIC = "ctrl/lyh"        # NOTE(review): defined but never used in this file — confirm intent
SAVE_DIR = "./imgs"            # directory for annotated snapshot images
MAX_SAVED_IMAGES = 5           # keep at most this many snapshots on disk

# Posture status codes returned by detect_pose_from_frame().
STATUS_FAIL = 0      # failure / error
STATUS_NORMAL = 1    # normal posture
STATUS_HEAD_DOWN = 2 # head lowered
STATUS_HEAD_TILT = 3 # head tilted sideways

# Main-loop run flag and a bounded history of saved snapshot paths.
running = True
saved_image_paths = deque(maxlen=MAX_SAVED_IMAGES)
print("[系统] 正在加载模型...")
try:
    # PaddleHub MPII skeleton keypoint model; loading may take a while on the board.
    pose_estimation = hub.Module(name="human_pose_estimation_resnet50_mpii")
    print("[系统] 模型加载完成")
except Exception as e:
    # Without the model the program is useless — exit immediately.
    print(f"[系统] 模型加载失败: {e}")
    sys.exit(1)
def Center_Point(point_1, point_2):
    """Return the midpoint [x, y] of two 2-D points.

    Simplified from the original ``|a-b|/2 + min(a, b)`` form, which is
    algebraically identical to the arithmetic mean of the coordinates.
    """
    return [(point_1[0] + point_2[0]) / 2, (point_1[1] + point_2[1]) / 2]


def Cal_Ang(point_1, point_2, point_3):
    """Return the angle (in degrees) at ``point_2`` of the triangle
    formed by the three points.

    Falls back to 90.0 when a side adjacent to the vertex has zero
    length (degenerate triangle), avoiding a division by zero.
    """
    side_a = math.hypot(point_2[0] - point_3[0], point_2[1] - point_3[1])
    side_b = math.hypot(point_1[0] - point_3[0], point_1[1] - point_3[1])
    side_c = math.hypot(point_1[0] - point_2[0], point_1[1] - point_2[1])

    if side_a == 0 or side_c == 0:
        return 90.0

    # Law of cosines; clamp to [-1, 1] against floating-point drift.
    cosine = (side_a ** 2 + side_c ** 2 - side_b ** 2) / (2 * side_a * side_c)
    cosine = min(1.0, max(-1.0, cosine))
    return math.degrees(math.acos(cosine))
def detect_pose_from_frame(frame):
    """Classify the sitting posture visible in one BGR frame.

    The frame is written to a unique temporary JPEG (the PaddleHub API
    accepts file paths only), run through the pose model, and the five
    head/torso keypoints are evaluated for head-down and head-tilt.

    Returns:
        Tuple ``(status_code, message, keypoints)`` — ``status_code`` is
        one of the STATUS_* constants, ``message`` a human-readable
        string, and ``keypoints`` the raw model data dict (None on
        failure).
    """
    temp_path = None
    try:
        # mkstemp gives a unique path, avoiding the race of a fixed
        # /tmp/pose_temp.jpg when several instances run concurrently.
        fd, temp_path = tempfile.mkstemp(suffix=".jpg")
        os.close(fd)
        cv2.imwrite(temp_path, frame)

        result = pose_estimation.keypoint_detection(
            paths=[temp_path],
            visualization=False
        )

        if not result:
            return STATUS_FAIL, "未检测到人体", None

        data = result[0]['data']

        left_shoulder = data.get('left_shoulder')
        right_shoulder = data.get('right_shoulder')
        head_top = data.get('head_top')
        upper_neck = data.get('upper_neck')
        thorax = data.get('thorax')

        if not all([left_shoulder, right_shoulder, head_top, upper_neck, thorax]):
            return STATUS_FAIL, "关键点不完整", None

        # Head-down heuristic: triggers when the vertical head-to-neck span
        # exceeds 3x the neck-to-thorax span (threshold per original tuning).
        head_neck = math.fabs(head_top[1] - upper_neck[1])
        neck_thorax = math.fabs(thorax[1] - upper_neck[1])
        is_head_down = (head_neck / 3.0 > neck_thorax)

        # Head-tilt: angle between head-top and the shoulder line, measured
        # at the shoulder midpoint; outside the 80-100 degree band = tilted.
        center_point = Center_Point(left_shoulder, right_shoulder)
        head_angle = Cal_Ang(head_top, center_point, right_shoulder)
        is_head_tilt = (head_angle <= 80 or head_angle >= 100)

        if is_head_tilt:
            return STATUS_HEAD_TILT, "歪头", data
        elif is_head_down:
            return STATUS_HEAD_DOWN, "低头", data
        else:
            return STATUS_NORMAL, "正常", data

    except Exception as e:
        return STATUS_FAIL, f"检测异常: {str(e)}", None
    finally:
        # Always remove the temp file — the original leaked it whenever
        # keypoint_detection raised before the cleanup line.
        if temp_path is not None and os.path.exists(temp_path):
            os.remove(temp_path)


def save_annotated_image(frame, keypoints_data, status_code):
    """Save a copy of ``frame`` annotated with skeleton keypoints, lines,
    status text and a timestamp into SAVE_DIR.

    Also prunes SAVE_DIR down to the newest MAX_SAVED_IMAGES snapshots.

    Returns:
        The path of the saved image file.
    """
    os.makedirs(SAVE_DIR, exist_ok=True)
    annotated = frame.copy()

    # Drawing colors (BGR).
    color_keypoint = (0, 255, 0)      # green  - keypoints
    color_line = (255, 0, 0)          # blue   - skeleton lines
    color_text = (0, 0, 255)          # red    - text overlays

    # Keypoints of interest (any may be missing from the model output).
    keypoint_names = (
        'head_top', 'upper_neck', 'thorax',
        'left_shoulder', 'right_shoulder',
        'left_elbow', 'right_elbow',
        'left_wrist', 'right_wrist',
    )
    keypoints_map = {name: keypoints_data.get(name) for name in keypoint_names}

    # Skeleton connectivity (pairs of keypoint names).
    connections = [
        ('head_top', 'upper_neck'),
        ('upper_neck', 'thorax'),
        ('thorax', 'left_shoulder'),
        ('thorax', 'right_shoulder'),
        ('left_shoulder', 'left_elbow'),
        ('left_elbow', 'left_wrist'),
        ('right_shoulder', 'right_elbow'),
        ('right_elbow', 'right_wrist'),
    ]

    # Draw skeleton lines (skip any edge with a missing endpoint).
    for p1_name, p2_name in connections:
        p1 = keypoints_map.get(p1_name)
        p2 = keypoints_map.get(p2_name)
        if p1 and p2:
            pt1 = (int(p1[0]), int(p1[1]))
            pt2 = (int(p2[0]), int(p2[1]))
            cv2.line(annotated, pt1, pt2, color_line, 2)

    # Draw keypoints: filled green dot with a thin black outline.
    for name, point in keypoints_map.items():
        if point:
            x, y = int(point[0]), int(point[1])
            cv2.circle(annotated, (x, y), 4, color_keypoint, -1)
            cv2.circle(annotated, (x, y), 4, (0, 0, 0), 1)

    # Status text overlay.
    status_texts = {0: "FAIL", 1: "NORMAL", 2: "HEAD DOWN", 3: "HEAD TILT"}
    text = f"Status: {status_code} ({status_texts.get(status_code, 'UNKNOWN')})"
    cv2.putText(annotated, text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, color_text, 2)

    # Timestamp overlay.
    timestamp = datetime.now().strftime("%H:%M:%S")
    cv2.putText(annotated, timestamp, (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color_text, 1)

    # Persist the annotated frame.
    filename = f"pose_{datetime.now().strftime('%H%M%S')}_{status_code}.jpg"
    filepath = os.path.join(SAVE_DIR, filename)
    cv2.imwrite(filepath, annotated)
    saved_image_paths.append(filepath)

    # Prune old snapshots, keeping the MAX_SAVED_IMAGES most recent.
    # getmtime (not getctime) — on Linux ctime is metadata-change time,
    # not creation time, so mtime orders the files reliably.
    all_files = sorted(glob.glob(os.path.join(SAVE_DIR, "pose_*.jpg")),
                       key=os.path.getmtime, reverse=True)
    for old_file in all_files[MAX_SAVED_IMAGES:]:
        # Best-effort delete; ignore only filesystem errors, not everything.
        with suppress(OSError):
            os.remove(old_file)

    print(f"[图像] 已保存: {filepath}")
    return filepath
def main():
    """Open the camera and run the periodic posture-detection loop.

    Grabs frames continuously, runs detection every INTERVAL seconds,
    saves an annotated snapshot when keypoints are available, and
    releases the camera on exit — including on Ctrl+C.
    """
    print(f"[系统] 正在打开摄像头: {CAMERA_DEVICE}")
    cap = cv2.VideoCapture(CAMERA_DEVICE)

    if not cap.isOpened():
        # Fall back to the default device index if the named node fails.
        print(f"[系统] 无法打开 {CAMERA_DEVICE},尝试索引 0...")
        cap = cv2.VideoCapture(0)
        if not cap.isOpened():
            print("[系统] 无法打开任何摄像头")
            return

    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    print(f"[系统] 摄像头就绪 ({width}x{height})")
    print(f"[系统] 检测间隔: {INTERVAL}秒")
    print("=" * 60)
    print("按 Ctrl+C 停止")
    print("=" * 60)

    last_detection_time = 0
    frame_count = 0

    try:
        while running:
            ret, frame = cap.read()
            if not ret:
                time.sleep(0.1)
                continue

            frame_count += 1
            current_time = time.time()

            # Run a detection only once every INTERVAL seconds; in between,
            # frames are still read so the driver buffer stays fresh.
            if current_time - last_detection_time >= INTERVAL:
                timestamp = datetime.now().strftime("%H:%M:%S")

                status_code, status_text, keypoints_data = detect_pose_from_frame(frame)

                if keypoints_data is not None:
                    save_annotated_image(frame, keypoints_data, status_code)

                # The original used end="" here, which jammed successive
                # status reports onto one line; terminate the line normally.
                print(f"[{timestamp}] 帧{frame_count} | 姿势: {status_code}({status_text})")

                last_detection_time = current_time

            time.sleep(0.1)
    except KeyboardInterrupt:
        # No signal handler ever clears `running`, so Ctrl+C arrives as
        # KeyboardInterrupt; catch it so cleanup below still runs.
        pass
    finally:
        cap.release()
        print("[系统] 已停止")


# Entry point: start the detection loop only when run as a script.
if __name__ == "__main__":
    main()

运行结果

全部评论
暂无评论
0/144