- Background
- Method
  - Human skeleton extraction
  - Spatial relationship analysis
In campus security, student safety comes first. In this post I will walk through a small demo: corridor railing safety detection.
The pipeline has two steps: first, extract the human skeleton; second, analyze the spatial relationship between key skeleton parts and the railing.
For human skeleton extraction, see https://github.com/Daniil-Osokin/lightweight-human-pose-estimation.pytorch
For the spatial relationship analysis, the keypoint 0-1 segment (nose to neck, i.e., the neck) is used as the basis for judging intersection with the railing, as shown in the figure below.
The railing's position in the image is set manually: since the camera scene is fixed, the railing never moves. In my test video, the railing segment is [(280, 374), (500, 0)].
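Since these coordinates are hard-coded, it is worth checking them once against an actual frame. Below is a minimal calibration sketch (my own helper, not part of the repo; the video path is a placeholder) that overlays the railing segment on the first frame so the endpoints can be verified by eye:

import cv2

# Hypothetical calibration helper: draw the hard-coded railing segment
# on the first video frame to verify its endpoints visually.
RAIL = ((280, 374), (500, 0))  # railing endpoints from my test video

cap = cv2.VideoCapture('your_video.mp4')  # placeholder path
ok, frame = cap.read()
cap.release()
if not ok:
    raise IOError('Could not read a frame for calibration')

cv2.line(frame, RAIL[0], RAIL[1], (0, 0, 255), 2)
cv2.imshow('railing check', frame)
cv2.waitKey(0)
cv2.destroyAllWindows()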
We then judge the spatial relationship between the two segments, i.e., whether the neck segment and the railing segment intersect; as soon as they intersect, a warning is raised. For details, see https://py1995.blog.csdn.net/article/details/120652439
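In short, this is the classic cross-product straddle test: two segments intersect when the endpoints of each lie strictly on opposite sides of the other's supporting line. Here is a standalone sketch of the same logic as the Intersect helper in demo.py below, applied to the railing above and a made-up neck segment:

def segments_intersect(l1, l2):
    # Each segment is [x1, y1, x2, y2]. straddles(a, b) checks whether b's
    # endpoints lie strictly on opposite sides of a's supporting line.
    def straddles(a, b):
        v0 = (a[0] - a[2], a[1] - a[3])
        v1 = (a[0] - b[0], a[1] - b[1])
        v2 = (a[0] - b[2], a[1] - b[3])
        return (v0[0] * v1[1] - v0[1] * v1[0]) * (v0[0] * v2[1] - v0[1] * v2[0]) < 0
    return straddles(l1, l2) and straddles(l2, l1)

rail = [280, 374, 500, 0]    # railing segment from my test video
neck = [380, 150, 400, 220]  # hypothetical neck segment crossing the railing
print(segments_intersect(rail, neck))  # True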
The modified Pose class in pose.py:
import cv2
import numpy as np

from modules.keypoints import BODY_PARTS_KPT_IDS, BODY_PARTS_PAF_IDS
from modules.one_euro_filter import OneEuroFilter


class Pose:
    num_kpts = 18
    kpt_names = ['nose', 'neck',
                 'r_sho', 'r_elb', 'r_wri', 'l_sho', 'l_elb', 'l_wri',
                 'r_hip', 'r_knee', 'r_ank', 'l_hip', 'l_knee', 'l_ank',
                 'r_eye', 'l_eye',
                 'r_ear', 'l_ear']
    sigmas = np.array([.26, .79, .79, .72, .62, .79, .72, .62, 1.07, .87, .89, 1.07, .87, .89, .25, .25, .35, .35],
                      dtype=np.float32) / 10.0
    vars = (sigmas * 2) ** 2
    last_id = -1
    color = [0, 224, 255]

    def __init__(self, keypoints, confidence):
        super().__init__()
        self.keypoints = keypoints
        self.confidence = confidence
        self.bbox = Pose.get_bbox(self.keypoints)
        self.id = None
        self.filters = [[OneEuroFilter(), OneEuroFilter()] for _ in range(Pose.num_kpts)]

    @staticmethod
    def get_bbox(keypoints):
        found_keypoints = np.zeros((np.count_nonzero(keypoints[:, 0] != -1), 2), dtype=np.int32)
        found_kpt_id = 0
        for kpt_id in range(Pose.num_kpts):
            if keypoints[kpt_id, 0] == -1:
                continue
            found_keypoints[found_kpt_id] = keypoints[kpt_id]
            found_kpt_id += 1
        bbox = cv2.boundingRect(found_keypoints)
        return bbox

    def update_id(self, id=None):
        self.id = id
        if self.id is None:
            self.id = Pose.last_id + 1
            Pose.last_id += 1

    def draw(self, img):
        assert self.keypoints.shape == (Pose.num_kpts, 2)

        LineSegment = []
        for part_id in range(len(BODY_PARTS_PAF_IDS) - 2):
            if part_id not in [12]:  # only keep part 12, the neck-nose pair (kpt ids [1, 0])
                continue
            kpt_a_id = BODY_PARTS_KPT_IDS[part_id][0]
            global_kpt_a_id = self.keypoints[kpt_a_id, 0]
            if global_kpt_a_id != -1:
                x_a, y_a = self.keypoints[kpt_a_id]
                cv2.circle(img, (int(x_a), int(y_a)), 3, Pose.color, -1)
            kpt_b_id = BODY_PARTS_KPT_IDS[part_id][1]
            global_kpt_b_id = self.keypoints[kpt_b_id, 0]
            if global_kpt_b_id != -1:
                x_b, y_b = self.keypoints[kpt_b_id]
                cv2.circle(img, (int(x_b), int(y_b)), 3, Pose.color, -1)
            if global_kpt_a_id != -1 and global_kpt_b_id != -1:
                cv2.line(img, (int(x_a), int(y_a)), (int(x_b), int(y_b)), Pose.color, 2)
                LineSegment.append([x_a, y_a, x_b, y_b])
        return LineSegment
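With this change, draw only renders part 12 of BODY_PARTS_KPT_IDS, the neck-nose pair (keypoint ids [1, 0]), and returns that segment's endpoints instead of drawing the whole skeleton. A quick sanity-check sketch with made-up keypoints (every other joint is marked -1, i.e., not found):

import numpy as np
from modules.pose import Pose  # the modified class above

kpts = np.full((Pose.num_kpts, 2), -1, dtype=np.int32)  # all joints "not found"
kpts[0] = (310, 90)   # nose (hypothetical coordinates)
kpts[1] = (315, 140)  # neck
pose = Pose(kpts, confidence=1.0)

canvas = np.zeros((480, 640, 3), dtype=np.uint8)
print(pose.draw(canvas))  # [[315, 140, 310, 90]] -- neck first, since kpt pair 12 is [1, 0]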
The modified demo.py:
import argparse
import cv2
import numpy as np
import torch
import os
from models.with_mobilenet import PoseEstimationWithMobileNet
from modules.keypoints import extract_keypoints, group_keypoints
from modules.load_state import load_state
from modules.pose import Pose, track_poses
from val import normalize, pad_width
from cv2 import VideoWriter, VideoWriter_fourcc, imread, resize
import glob
class ImageReader(object):
    def __init__(self, file_names):
        self.file_names = file_names
        self.max_idx = len(file_names)

    def __iter__(self):
        self.idx = 0
        return self

    def __next__(self):
        if self.idx == self.max_idx:
            raise StopIteration
        img = cv2.imread(self.file_names[self.idx], cv2.IMREAD_COLOR)
        if img is None:  # imread returns None when the file cannot be read
            raise IOError('Image {} cannot be read'.format(self.file_names[self.idx]))
        self.idx = self.idx + 1
        return img
class VideoReader(object):
    def __init__(self, file_name):
        self.file_name = file_name
        try:  # OpenCV needs int to read from webcam
            self.file_name = int(file_name)
        except ValueError:
            pass

    def __iter__(self):
        self.cap = cv2.VideoCapture(self.file_name)
        if not self.cap.isOpened():
            raise IOError('Video {} cannot be opened'.format(self.file_name))
        return self

    def __next__(self):
        was_read, img = self.cap.read()
        if not was_read:
            raise StopIteration
        return img
def infer_fast(net, img, net_input_height_size, stride, upsample_ratio, cpu,
               pad_value=(0, 0, 0), img_mean=np.array([128, 128, 128], np.float32), img_scale=np.float32(1/256)):
    height, width, _ = img.shape
    scale = net_input_height_size / height

    scaled_img = cv2.resize(img, (0, 0), fx=scale, fy=scale, interpolation=cv2.INTER_LINEAR)
    scaled_img = normalize(scaled_img, img_mean, img_scale)
    min_dims = [net_input_height_size, max(scaled_img.shape[1], net_input_height_size)]
    padded_img, pad = pad_width(scaled_img, stride, pad_value, min_dims)

    tensor_img = torch.from_numpy(padded_img).permute(2, 0, 1).unsqueeze(0).float()
    if not cpu:
        tensor_img = tensor_img.cuda()

    stages_output = net(tensor_img)

    stage2_heatmaps = stages_output[-2]
    heatmaps = np.transpose(stage2_heatmaps.squeeze().cpu().data.numpy(), (1, 2, 0))
    heatmaps = cv2.resize(heatmaps, (0, 0), fx=upsample_ratio, fy=upsample_ratio, interpolation=cv2.INTER_CUBIC)

    stage2_pafs = stages_output[-1]
    pafs = np.transpose(stage2_pafs.squeeze().cpu().data.numpy(), (1, 2, 0))
    pafs = cv2.resize(pafs, (0, 0), fx=upsample_ratio, fy=upsample_ratio, interpolation=cv2.INTER_CUBIC)

    return heatmaps, pafs, scale, pad
def mkdir(path):
    if not os.path.exists(path):
        os.mkdir(path)


def Intersect(l1, l2):
    # Cross-product straddle test: segments l1 and l2 (each [x1, y1, x2, y2])
    # intersect iff each segment's endpoints lie strictly on opposite sides
    # of the other segment's supporting line.
    v1 = (l1[0] - l2[0], l1[1] - l2[1])
    v2 = (l1[0] - l2[2], l1[1] - l2[3])
    v0 = (l1[0] - l1[2], l1[1] - l1[3])
    a = v0[0] * v1[1] - v0[1] * v1[0]
    b = v0[0] * v2[1] - v0[1] * v2[0]

    l1, l2 = l2, l1  # repeat the test with the roles swapped
    v1 = (l1[0] - l2[0], l1[1] - l2[1])
    v2 = (l1[0] - l2[2], l1[1] - l2[3])
    v0 = (l1[0] - l1[2], l1[1] - l1[3])
    c = v0[0] * v1[1] - v0[1] * v1[0]
    d = v0[0] * v2[1] - v0[1] * v2[0]

    return a * b < 0 and c * d < 0
def write(images, outimg=None, fps=5, size=None, is_color=True, outvid='demo.avi'):
    fourcc = cv2.VideoWriter_fourcc('m', 'p', '4', 'v')
    vid = None
    for image in images:
        if image.split('.')[-1] != 'jpg':
            continue
        img = imread(image)
        if vid is None:
            if size is None:
                size = img.shape[1], img.shape[0]
            vid = VideoWriter(outvid, fourcc, float(fps), size, is_color)
        if size[0] != img.shape[1] or size[1] != img.shape[0]:
            img = resize(img, size)
        vid.write(img)
    if vid is not None:
        vid.release()
    return vid
def run_demo(net, image_provider, height_size, cpu, track, smooth):
    net = net.eval()
    if not cpu:
        net = net.cuda()

    stride = 8
    upsample_ratio = 4
    num_keypoints = Pose.num_kpts
    previous_poses = []
    delay = 1
    # fps = 20
    # size = (544, 960)
    # video_writer = cv2.VideoWriter("passageway2.avi", cv2.VideoWriter_fourcc('I', '4', '2', '0'), fps, size)
    mkdir('test_results')
    for i, img in enumerate(image_provider):
        orig_img = img.copy()
        heatmaps, pafs, scale, pad = infer_fast(net, img, height_size, stride, upsample_ratio, cpu)

        total_keypoints_num = 0
        all_keypoints_by_type = []
        for kpt_idx in range(num_keypoints):  # 19th for bg
            total_keypoints_num += extract_keypoints(heatmaps[:, :, kpt_idx], all_keypoints_by_type, total_keypoints_num)

        pose_entries, all_keypoints = group_keypoints(all_keypoints_by_type, pafs)
        for kpt_id in range(all_keypoints.shape[0]):
            all_keypoints[kpt_id, 0] = (all_keypoints[kpt_id, 0] * stride / upsample_ratio - pad[1]) / scale
            all_keypoints[kpt_id, 1] = (all_keypoints[kpt_id, 1] * stride / upsample_ratio - pad[0]) / scale
        current_poses = []
        for n in range(len(pose_entries)):
            if len(pose_entries[n]) == 0:
                continue
            pose_keypoints = np.ones((num_keypoints, 2), dtype=np.int32) * -1
            for kpt_id in range(num_keypoints):
                if pose_entries[n][kpt_id] != -1.0:  # keypoint was found
                    pose_keypoints[kpt_id, 0] = int(all_keypoints[int(pose_entries[n][kpt_id]), 0])
                    pose_keypoints[kpt_id, 1] = int(all_keypoints[int(pose_entries[n][kpt_id]), 1])
            pose = Pose(pose_keypoints, pose_entries[n][18])
            current_poses.append(pose)

        if track:
            track_poses(previous_poses, current_poses, smooth=smooth)
            previous_poses = current_poses

        LineSegment = []
        for pose in current_poses:
            LineSegment += pose.draw(img)  # collect the neck segment of every detected pose

        # Coordinates of the railing segment
        targetLine = [280, 374, 500, 0]
        for ls in LineSegment:
            if Intersect(targetLine, ls):
                img = cv2.line(img, (targetLine[0], targetLine[1]), (targetLine[2], targetLine[3]), (0, 0, 255), 4)
                h, w = img.shape[:2]  # shape is (height, width, channels)
                font = cv2.FONT_HERSHEY_SIMPLEX
                img = cv2.putText(img, 'WARNING', (w // 2, h // 2), font, 1.2, (0, 0, 255), 2)
                cv2.imwrite(f'./test_results/{str(i).zfill(5)}.jpg', img)
                break

        img = cv2.addWeighted(orig_img, 0.6, img, 0.4, 0)
        cv2.imshow('Lightweight Human Pose Estimation Python Demo', img)
        cv2.imwrite(f'./test_results/{str(i).zfill(5)}.jpg', img)
        key = cv2.waitKey(delay)
        if key == 27:  # esc
            return
        elif key == 112:  # 'p': toggle pause
            if delay == 1:
                delay = 0
            else:
                delay = 1
    fps = 25
    imglist = sorted(glob.glob('test_results/*.jpg'))  # sort so frames are stitched in order
    write(imglist, fps=fps, outvid='2.mp4')


# python demo.py --checkpoint-path ./ckpts/checkpoint_iter_370000.pth --video D:202110human_action_recognition1.mp4
if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description='''Lightweight human pose estimation python demo.
                       This is just for quick results preview.
                       Please, consider c++ demo for the best performance.''')
    parser.add_argument('--checkpoint-path', type=str, required=True, help='path to the checkpoint')
    parser.add_argument('--height-size', type=int, default=412, help='network input layer height size')
    parser.add_argument('--video', type=str, default='', help='path to video file or camera id')
    parser.add_argument('--images', nargs='+', default='', help='path to input image(s)')
    parser.add_argument('--cpu', action='store_true', help='run network inference on cpu')
    parser.add_argument('--track', type=int, default=1, help='track pose id in video')
    parser.add_argument('--smooth', type=int, default=1, help='smooth pose keypoints')
    args = parser.parse_args()

    if args.video == '' and args.images == '':
        raise ValueError('Either --video or --images has to be provided')

    net = PoseEstimationWithMobileNet()
    checkpoint = torch.load(args.checkpoint_path, map_location='cpu')
    load_state(net, checkpoint)

    frame_provider = ImageReader(args.images)
    if args.video != '':
        frame_provider = VideoReader(args.video)
    else:
        args.track = 0

    run_demo(net, frame_provider, args.height_size, args.cpu, args.track, args.smooth)



