function integrity

This commit is contained in:
Tang1705
2020-07-10 22:58:44 +08:00
parent aaa3799fe2
commit b79ae2e115
18 changed files with 4283 additions and 337 deletions

View File

@@ -1,4 +1,5 @@
# 摄像头实时人脸识别
import threading
import dlib
import numpy as np
@@ -9,8 +10,11 @@ import time
import facenet
from PIL import Image, ImageDraw, ImageFont
from Post import post
from model import create_model
import smile_detection
start_time = 0
# 1. Dlib 正向人脸检测器
# detector = dlib.get_frontal_face_detector()
@@ -20,10 +24,10 @@ detector = cv2.dnn.readNetFromCaffe("data/data_opencv/deploy.prototxt.txt",
"data/data_opencv/res10_300x300_ssd_iter_140000.caffemodel")
# 2. Dlib 人脸 landmark 特征点检测器
predictor = dlib.shape_predictor('data/data_dlib/shape_predictor_68_face_landmarks.dat')
# predictor = dlib.shape_predictor('data/data_dlib/shape_predictor_68_face_landmarks.dat')
# 3. Dlib Resnet 人脸识别模型,提取 128D 的特征矢量
face_reco_model = dlib.face_recognition_model_v1("data/data_dlib/dlib_face_recognition_resnet_model_v1.dat")
# face_reco_model = dlib.face_recognition_model_v1("data/data_dlib/dlib_face_recognition_resnet_model_v1.dat")
nn4_small2_pretrained = create_model()
nn4_small2_pretrained.load_weights('weights/nn4.small2.v1.h5')
@@ -35,6 +39,7 @@ class Face_Recognizer:
self.features_known_list = []
# 存储录入人脸名字
self.loaded = False
self.name_known_cnt = 0
self.name_known_list = []
@@ -44,6 +49,7 @@ class Face_Recognizer:
# 存储当前摄像头中捕获到的所有人脸的坐标名字
self.pos_camera_list = []
self.name_camera_list = []
self.type_camera_list = []
# 存储当前摄像头中捕获到的人脸数
self.faces_cnt = 0
# 存储当前摄像头中捕获到的人脸特征
@@ -55,31 +61,38 @@ class Face_Recognizer:
# 从 "features_all.csv" 读取录入人脸特征
def get_face_database(self):
if os.path.exists("data/data_faces_from_camera/"):
self.metadata = facenet.load_metadata("data/data_faces_from_camera/")
self.name_known_cnt = self.metadata.shape[0]
self.embedded = np.zeros((self.metadata.shape[0], 128))
for i, m in enumerate(self.metadata):
for j, n in enumerate(m):
img = facenet.load_image(n.image_path())
# img = align_image(img)
img = cv2.resize(img, (96, 96))
# scale RGB values to interval [0,1]
img = (img / 255.).astype(np.float32)
# obtain embedding vector for image
self.embedded[i] = nn4_small2_pretrained.predict(np.expand_dims(img, axis=0))[0]
# self.embedded[i] = self.embedded[i] / len(m)
self.name_known_list.append('')
if self.loaded:
return 1
else:
print('##### Warning #####', '\n')
print("'features_all.csv' not found!")
print(
"Please run 'get_faces_from_camera.py' before 'face_reco_from_camera.py'",
'\n')
print('##### End Warning #####')
return 0
if os.path.exists("data/data_faces_from_camera/"):
self.metadata = facenet.load_metadata("data/data_faces_from_camera/")
self.name_known_cnt = self.metadata.shape[0]
self.embedded = np.zeros((self.metadata.shape[0], 128))
for i, m in enumerate(self.metadata):
for j, n in enumerate(m):
for k, p in enumerate(n):
img = facenet.load_image(p.image_path())
# img = align_image(img)
img = cv2.resize(img, (96, 96))
# scale RGB values to interval [0,1]
img = (img / 255.).astype(np.float32)
# obtain embedding vector for image
self.embedded[i] = nn4_small2_pretrained.predict(np.expand_dims(img, axis=0))[0]
# self.embedded[i] = self.embedded[i] / len(m)
path = p.image_path().replace("\\", "/")
self.name_known_list.append(path.split('/')[-2])
self.type_camera_list.append(path.split('/')[-3])
self.loaded = True
return 1
else:
print('##### Warning #####', '\n')
print("'features_all.csv' not found!")
print(
"Please run 'get_faces_from_camera.py' before 'face_reco_from_camera.py'",
'\n')
print('##### End Warning #####')
return 0
# 计算两个128D向量间的欧式距离
# @staticmethod
@@ -106,23 +119,25 @@ class Face_Recognizer:
def draw_name(self, img_rd):
# 在人脸框下面写人脸名字
img_with_name = img_rd
font = ImageFont.truetype("simsun.ttc", 30, index=1)
img = Image.fromarray(cv2.cvtColor(img_rd, cv2.COLOR_BGR2RGB))
draw = ImageDraw.Draw(img)
for i in range(self.faces_cnt):
# cv2.putText(img_rd, self.name_camera_list[i], self.pos_camera_list[i], font, 0.8, (0, 255, 255), 1, cv2.LINE_AA)
draw.text(xy=self.pos_camera_list[i], text=self.name_camera_list[i], font=font)
img_with_name = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR)
if self.name_camera_list[i] != 'unknown':
# cv2.putText(img_rd, self.name_camera_list[i], self.pos_camera_list[i], font, 0.8, (0, 255, 255), 1, cv2.LINE_AA)
draw.text(xy=self.pos_camera_list[i], text=self.name_camera_list[i], font=font)
img_with_name = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR)
return img_with_name
# 修改显示人名
def modify_name_camera_list(self):
# TODO 数据库 ID
# Default known name: person_1, person_2, person_3
self.name_known_list[0] = '唐麒'.encode('utf-8').decode()
self.name_known_list[1] = '段海燕'.encode('utf-8').decode()
# Default known name: 1, 2, person_3
self.name_known_list[0] = '1'.encode('utf-8').decode()
self.name_known_list[1] = 'Tony Blair'.encode('utf-8').decode()
# self.name_known_list[2] = '唐保生'.encode('utf-8').decode()
# self.name_known_list[3] = '唐麒'.encode('utf-8').decode()
# self.name_known_list[3] = '1'.encode('utf-8').decode()
# self.name_known_list[4] ='xx'.encode('utf-8').decode()
# 处理获取的视频流,进行人脸识别
@@ -180,7 +195,7 @@ class Face_Recognizer:
# 确定人名的位置坐标
# 先默认所有人不认识,是 unknown
# Set the default names of faces with "unknown"
self.name_camera_list.append("陌生人")
self.name_camera_list.append("unknown")
# 每个捕获人脸的名字坐标
box = faces[0, 0, k, 3:7] * np.array([w, h, w, h])
@@ -188,62 +203,84 @@ class Face_Recognizer:
self.pos_camera_list.append(tuple(
[int(startX + 5), int(startY - 30)]))
height = (endY - startY)
width = (endX - startX)
# height = (endY - startY)
# width = (endX - startX)
img_blank = np.zeros((height, width, 3), np.uint8)
for ii in range(height):
for jj in range(width):
img_blank[ii][jj] = img_rd[startY + ii][startX + jj]
# img_blank = np.zeros((height, width, 3), np.uint8)
img_blank = img_rd[startY:endY, startX:endX]
try:
# for ii in range(height):
# for jj in range(width):
# img_blank[ii][jj] = img_rd[startY + ii][startX + jj]
img = cv2.resize(img_blank, (96, 96))
img = (img / 255.).astype(np.float32)
img = nn4_small2_pretrained.predict(np.expand_dims(img, axis=0))[0]
img = cv2.resize(img_blank, (96, 96))
img = (img / 255.).astype(np.float32)
img = nn4_small2_pretrained.predict(np.expand_dims(img, axis=0))[0]
# 5. 对于某张人脸,遍历所有存储的人脸特征
e_distance_list = []
for i in range(0, len(self.embedded)):
e_distance_list.append(facenet.distance(self.embedded[i], img))
# for i in range(len(self.features_known_list)):
# # 如果 person_X 数据不为空
# if str(self.features_known_list[i][0]) != '0.0':
# # print("with person", str(i + 1), "the e distance: ", end='')
# e_distance_tmp = self.return_euclidean_distance(self.features_camera_list[k],
# self.features_known_list[i])
# # print(e_distance_tmp)
# e_distance_list.append(e_distance_tmp)
# else:
# # 空数据 person_X
# e_distance_list.append(999999999)
# # 6. 寻找出最小的欧式距离匹配
similar_person_num = e_distance_list.index(min(e_distance_list))
# print("Minimum e distance with person", self.name_known_list[similar_person_num])
# print(min(e_distance_list))
if min(e_distance_list) < 0.58:
self.name_camera_list[k] = self.name_known_list[similar_person_num % 8]
# print("May be person " + str(self.name_known_list[similar_person_num]))
else:
pass
# print("Unknown person")
# 矩形框
for kk, d in enumerate(faces):
# 绘制矩形框
if self.name_camera_list[k] != '陌生人':
cv2.rectangle(img_rd, tuple([startX, startY]), tuple([endX, endY]),
(0, 255, 0), 2)
cv2.rectangle(img_rd, tuple([startX, startY - 35]), tuple([endX, startY]),
(0, 255, 0), cv2.FILLED)
# 5. 对于某张人脸,遍历所有存储的人脸特征
e_distance_list = []
for i in range(0, len(self.embedded)):
e_distance_list.append(facenet.distance(self.embedded[i], img))
# for i in range(len(self.features_known_list)):
# # 如果 person_X 数据不为空
# if str(self.features_known_list[i][0]) != '0.0':
# # print("with person", str(i + 1), "the e distance: ", end='')
# e_distance_tmp = self.return_euclidean_distance(self.features_camera_list[k],
# self.features_known_list[i])
# # print(e_distance_tmp)
# e_distance_list.append(e_distance_tmp)
# else:
# # 空数据 person_X
# e_distance_list.append(999999999)
# # 6. 寻找出最小的欧式距离匹配
similar_person_num = e_distance_list.index(min(e_distance_list))
# print("Minimum e distance with person", self.name_known_list[similar_person_num])
# print(min(e_distance_list))
if min(e_distance_list) < 0.58:
self.name_camera_list[k] = self.name_known_list[similar_person_num % 8]
if self.type_camera_list[similar_person_num % 8] == 'elder':
mode = smile_detection.smile_detect(img_blank)
if mode == 'happy':
cv2.imwrite('smile_detection.jpg', img_rd)
cv2.rectangle(img_rd, tuple([startX, startY - 70]),
tuple([endX, startY - 35]),
(0, 215, 255), cv2.FILLED)
cv2.putText(img_rd, 'happy', (startX + 5, startY - 45), cv2.FONT_ITALIC, 1,
(255, 255, 255), 1, cv2.LINE_AA)
# t = threading.Thread(target=post(elder_id=self.name_camera_list[k], event=0,
# imagePath='smile_detection.jpg'))
# t.start()
# print("May be person " + str(self.name_known_list[similar_person_num]))
elif min(e_distance_list) > 0.75:
self.name_camera_list[k] = '陌生人'
cv2.imwrite('stranger_detection.jpg', img_rd)
# t = threading.Thread(target=post(event=2, imagePath='stranger_detection.jpg'))
# t.start()
else:
cv2.rectangle(img_rd, tuple([startX, startY]), tuple([endX, endY]),
(0, 0, 255), 2)
cv2.rectangle(img_rd, tuple([startX, startY - 35]), tuple([endX, startY]),
(0, 0, 255), cv2.FILLED)
pass
# print("Unknown person")
# 矩形框
for kk, d in enumerate(faces):
# 绘制矩形框
if self.name_camera_list[k] == '陌生人':
cv2.rectangle(img_rd, tuple([startX, startY]), tuple([endX, endY]),
(0, 0, 255), 2)
cv2.rectangle(img_rd, tuple([startX, startY - 35]), tuple([endX, startY]),
(0, 0, 255), cv2.FILLED)
elif self.name_camera_list[k] != 'unknown':
cv2.rectangle(img_rd, tuple([startX, startY]), tuple([endX, endY]),
(0, 255, 0), 2)
cv2.rectangle(img_rd, tuple([startX, startY - 35]), tuple([endX, startY]),
(0, 255, 0), cv2.FILLED)
except:
continue
# print('\n')
# self.faces_cnt = faces.shape[2]
# if len(self.name_camera_list) > 0:
# 7. 在这里更改显示的人名
self.modify_name_camera_list()
# self.modify_name_camera_list()
# 8. 写名字
# self.draw_name(img_rd)
img_with_name = self.draw_name(img_rd)
@@ -269,6 +306,10 @@ class Face_Recognizer:
def main():
# Calibration_on = Calibration()
# scale = Calibration_on.run()
# print(scale)
Face_Recognizer_con = Face_Recognizer()
Face_Recognizer_con.run()

View File

@@ -17,6 +17,7 @@ action_map = {'look_ahead': '请看前方', 'blink': '请眨眼', 'open_mouth':
'smile': '请笑一笑', 'rise_head': '请抬头',
'bow_head': '请低头', 'look_left': '请看左边',
'look_right': '请看右边', 'over': '录入完成'}
people_type_dict = {'0': 'elder', '1': 'worker', '2': 'volunteer'}
# Dlib 正向人脸检测器
# detector = dlib.get_frontal_face_detector()
@@ -27,7 +28,7 @@ detector = cv2.dnn.readNetFromCaffe("data/data_opencv/deploy.prototxt.txt",
class Face_Register:
def __init__(self):
def __init__(self, people_type, id):
self.init = False
self.path_photos_from_camera = "data/data_faces_from_camera/"
self.font = cv2.FONT_ITALIC
@@ -47,18 +48,24 @@ class Face_Register:
self.frame_start_time = 0
self.fps = 0
def speak(self, text, rate=2):
self.people_type = people_type_dict[str(people_type)]
self.id = id
def speak(self):
text = action_map[action_list[self.index]]
speak = win32com.client.Dispatch('Sapi.SpVoice')
# speak.Voice = speak.GetVoices('Microsoft Zira')
speak.Volume = 100
speak.Rate = rate
speak.Rate = 2
speak.Speak(text)
# 新建保存人脸图像文件
def pre_work_mkdir(self):
if os.path.isdir(self.path_photos_from_camera):
pass
if os.path.isdir(self.path_photos_from_camera + '/' + self.people_type):
pass
else:
os.mkdir(self.path_photos_from_camera + '/' + self.people_type)
else:
os.mkdir(self.path_photos_from_camera)
@@ -71,7 +78,8 @@ class Face_Register:
if os.path.isfile("data/features_all.csv"):
os.remove("data/features_all.csv")
# 如果有之前录入的人脸, 在之前 person_x 的序号按照 person_x+1 开始录入
# 如果有之前录入的人脸, 在之前 person_x 的序号按照 person_x+1 开始录入 (v1)
# 根据传入的人员类型和 id 录入 (v2)
def check_existing_faces_cnt(self):
if os.listdir("data/data_faces_from_camera/"):
# 获取已录入的最后一个人脸序号
@@ -81,8 +89,8 @@ class Face_Register:
person_num_list.append(int(person.split('_')[-1]))
self.existing_faces_cnt = max(person_num_list)
# 如果第一次存储或者没有之前录入的人脸, 按照 person_1 开始录入
# Start from person_1
# 如果第一次存储或者没有之前录入的人脸, 按照 1 开始录入
# Start from 1
else:
self.existing_faces_cnt = 0
@@ -127,7 +135,7 @@ class Face_Register:
# self.pre_work_del_old_face_folders()
# 3. 检查 "/data/data_faces_from_camera" 中已有人脸文件
self.check_existing_faces_cnt()
# self.check_existing_faces_cnt()
while stream.isOpened():
self.faces_cnt = 0
@@ -143,8 +151,9 @@ class Face_Register:
# 4. 按下 'n' 新建存储人脸的文件夹
if kk == ord('n'):
self.existing_faces_cnt += 1
current_face_dir = self.path_photos_from_camera + "person_" + str(self.existing_faces_cnt)
# self.existing_faces_cnt += 1
current_face_dir = self.path_photos_from_camera + '/' + self.people_type + '/' + self.id
# current_face_dir = self.path_photos_from_camera + "person_" + str(self.existing_faces_cnt)
os.makedirs(current_face_dir)
print('\n')
print("新建的人脸文件夹 / Create folders: ", current_face_dir)
@@ -174,12 +183,13 @@ class Face_Register:
height = (endY - startY)
width = (endX - startX)
hh = int(height / 2)
ww = int(width / 2)
# hh = int(height / 2)
# ww = int(width / 2)
# 6. 判断人脸矩形框是否超出 480x640
if (endX + ww) > 640 or (endY + hh > 480) or (startX - ww < 0) or (
startY - hh < 0):
if endX > 640 or endY > 480 or startX < 0 or startY < 0:
# if (endX + ww) > 640 or (endY + hh > 480) or (startX - ww < 0) or (
# startY - hh < 0):
cv2.putText(img_rd, "OUT OF RANGE", (20, 300), self.font, 0.8, (0, 0, 255), 1, cv2.LINE_AA)
color_rectangle = (0, 0, 255)
save_flag = 0
@@ -190,8 +200,8 @@ class Face_Register:
save_flag = 1
cv2.rectangle(img_rd,
tuple([startX - ww, startY - hh]),
tuple([endX + ww, endY + hh]),
tuple([startX , startY]),
tuple([endX , endY ]),
color_rectangle, 2)
# 7. 根据人脸大小生成空的图像
@@ -208,12 +218,13 @@ class Face_Register:
if self.index <= 7:
for ii in range(height):
for jj in range(width):
img_blank[ii][jj] = img_rd[startY + ii][startX + jj]
img_blank[ii][jj] = img_rd[startY + ii][startX + jj]
cv2.imwrite(current_face_dir + "/img_face_" + str(self.ss_cnt) + ".jpg", img_blank)
print("写入本地 / Save into",
str(current_face_dir) + "/img_face_" + str(self.ss_cnt) + ".jpg")
self.index += 1
self.speak(action_map[action_list[self.index]], 2)
if self.index < len(action_list) - 1:
self.index += 1
self.speak()
else:
print("请先按 'N' 来建文件夹, 按 'S' / Please press 'N' and press 'S'")
# self.faces_cnt = len(faces)
@@ -231,7 +242,7 @@ class Face_Register:
cv2.imshow("camera", img_rd)
if not self.init:
self.speak(self.speak(action_map[action_list[self.index]], 2))
self.speak()
self.init = True
def run(self):
@@ -243,7 +254,7 @@ class Face_Register:
def main():
Face_Register_con = Face_Register()
Face_Register_con = Face_Register(people_type=1, id='3')
Face_Register_con.run()

View File

@@ -10,6 +10,7 @@ from GFmatrix import GF
np.seterr(invalid='ignore')
# config = {'morning': [80, 220, 125, 125]}
@@ -26,6 +27,8 @@ class Calibration:
self.frame_start_time = 0
self.fps = 0
self.scale = 1
# 获取处理之后 stream 的帧数
def update_fps(self):
now = time.time()
@@ -249,7 +252,7 @@ class Calibration:
def decode(self, frame, feature_point):
points = []
position = []
frame=aug(frame)
frame = aug(frame)
color_map = np.zeros((frame.shape[0], frame.shape[1]))
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2LAB)
l, a, b = cv2.split(frame)
@@ -319,8 +322,8 @@ class Calibration:
position.append([i, j])
except:
pass
cv2.imshow('lab', frame)
cv2.waitKey(0) # 按0退出
# cv2.imshow('lab', frame)
# cv2.waitKey(0) # 按0退出
return points, position
def process(self, stream):
@@ -372,10 +375,10 @@ class Calibration:
world_distance = math.sqrt((feature_points[index + 1][0] * 2 - feature_points[index][0] * 2) ** 2 +
(feature_points[index + 1][1] * 2 - feature_points[index][1] * 2) ** 2)
scale = world_distance / pixel_distance
print(pixel_distance)
print(feature_points[index + 1][0] - feature_points[index][0],
feature_points[index + 1][1] - feature_points[index][1])
self.scale = world_distance / pixel_distance
# print(pixel_distance)
# print(feature_points[index + 1][0] - feature_points[index][0],
# feature_points[index + 1][1] - feature_points[index][1])
# print(distance)
# for i in range(index - 1, index + 2):
# print(distance[i])
@@ -389,10 +392,10 @@ class Calibration:
# point_size, point_color, thickness)
for point in featurepoints_position:
cv2.circle(img_rd, (int(point[1]), int(point[0])), point_size, point_color, thickness)
cv2.namedWindow("image")
cv2.imshow('image', img_rd)
cv2.waitKey(0) # 按0退出
return scale
# cv2.namedWindow("image")
# cv2.imshow('image', img_rd)
# cv2.waitKey(0) # 按0退出
self.draw_note(img_rd)
self.update_fps()
cv2.imshow("image", img_rd)
@@ -404,10 +407,13 @@ class Calibration:
cap.release()
cv2.destroyAllWindows()
return self.scale
def main():
Calibration_on = Calibration()
Calibration_on.run()
scale = Calibration_on.run()
print(scale)
if __name__ == "__main__":

View File

@@ -0,0 +1,362 @@
import math
import time
import cv2
import numpy as np
from auto_whiteBalance import aug
from GFmatrix import GF
np.seterr(invalid='ignore')
# config = {'morning': [80, 220, 125, 125]}
class Calibration:
def __init__(self):
    """Initialise rendering, search, and calibration state."""
    # Pixel-to-world scale factor; recomputed by process().
    self.scale = 1
    # FPS bookkeeping for the on-screen overlay.
    self.fps = 0
    self.frame_start_time = 0
    self.frame_time = 0
    # 8-neighbourhood offsets consumed by bfs8 (row offsets in nx, column offsets in ny).
    self.nx = [-1, -1, -1, 0, 0, 1, 1, 1]
    self.ny = [1, 0, -1, 1, -1, 1, 0, -1]
    # Visited mask for the grid-point flood fill; resized per frame in process().
    self.flag = np.zeros((1024, 1280))
    # Font used by the draw_note HUD.
    self.font = cv2.FONT_ITALIC
# Refresh the FPS estimate of the processed stream.
def update_fps(self):
    """Update self.fps from the wall-clock time elapsed since the previous call."""
    current = time.time()
    elapsed = current - self.frame_start_time
    self.frame_time = elapsed
    # Guard against a zero interval (two calls within timer resolution).
    if elapsed:
        self.fps = 1.0 / elapsed
    self.frame_start_time = current
# Overlay help text (title, FPS, key bindings) onto the cv2 window frame.
def draw_note(self, img_rd):
    """Draw the calibration HUD onto img_rd in place."""
    captions = [
        ("Calibration", (20, 40), 1, (255, 255, 255)),
        ("FPS: " + str(self.fps.__round__(2)), (20, 100), 0.8, (0, 255, 0)),
        ("S: Calibrate Current Frame", (20, 400), 0.8, (255, 255, 255)),
        ("Q: Quit", (20, 450), 0.8, (255, 255, 255)),
    ]
    for text, origin, size, color in captions:
        cv2.putText(img_rd, text, origin, self.font, size, color, 1, cv2.LINE_AA)
"""
get_candidate_points:
将RGB图像转为单通道
计算每一个像素点横向和纵向的单通道色彩强度差,选择值最大的通道为该像素点的差值,将差值大于选定阈值的点视为候选点
在选定的窗口大小中进行计算,减少处理数据的数量
返回存有候选点数据的矩阵
-1为候选点
-3为非候选点
"""
def get_candidate_points(self, frame):
candidate = np.zeros((frame.shape[0], frame.shape[1])) # 初始化图像点位置矩阵为0
b, g, r = cv2.split(frame)
i = int(frame.shape[0] / 2)
while i < int(7 * frame.shape[0] / 8):
j = 0
while j < int(frame.shape[1] / 2):
sum = [0, 0, 0] # 存储r,g,b三个通道的差值结果
tempxy = [0, 0, 0, 0, 0, 0] # 存储三个通道纵向和横向的临时求和结果
k = -6
while k < 7: # 十字掩码长度选择为菱形对角线长度的一半
tempxy[0] = tempxy[0] + b[i + k][j] # b通道水平方向
tempxy[1] = tempxy[1] + b[i][j + k] # b通道铅直方向
tempxy[2] = tempxy[2] + g[i + k][j] # g通道水平方向
tempxy[3] = tempxy[3] + g[i][j + k] # g通道铅直方向
tempxy[4] = tempxy[4] + r[i + k][j] # r通道水平方向
tempxy[5] = tempxy[5] + r[i][j + k] # r通道铅直方向
k = k + 1
sum[0] = sum[0] + abs(tempxy[0] - tempxy[1]) # r通道差值
sum[1] = sum[1] + abs(tempxy[2] - tempxy[3]) # g通道差值
sum[2] = sum[2] + abs(tempxy[4] - tempxy[5]) # b通道差值
d = max(sum[0], sum[1], sum[2]) # 选择差值最大的通道
# print(d)
if d > 1300: # tq和zyy人工学习调参选阈值阈值增大候选点集中于球体中央
candidate[i][j] = -1 # -1标记为候选点
# point_size = 1
# point_color = (0, 0, 255)
# thickness = 0 # 可以为 0 、4、8
#
# cv2.circle(frame, (j,i), point_size, point_color, thickness)
else:
candidate[i][j] = -3 # -3标记为非候选点
j = j + 1
i = i + 1
# cv2.imshow('1', frame)
# cv2.waitKey(0) # 按0退出
return candidate
"""
get_grid_points:
将RGB图像转为灰度图像
根据真正的角点具有严格的中心对称性,将相关系数大于选定阈值的点选做特征点
并根据P1和P2类型点的特征左右或上下为模式元素将特征点分类为两类特征点
白色背景的灰度值高于颜色元素的灰度值
圆邻域采用SUSAN角点检测法的圆邻域直径为7
1为角点
2为非角点
"""
def get_grid_points(self, frame, candidate):
GrayImage = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
gridpoints = np.zeros((frame.shape[0], frame.shape[1]))
size = 7
counter = 0
circle_neighborhood = np.zeros((size, size))
for i in range(0, size):
for j in range(0, size):
if i < (size - 1) / 2:
if j >= (size - 1) / 2 - i - (size - 3) / 4 and j <= (size - 1) / 2 + i + + (size - 3) / 4:
counter += 1
circle_neighborhood[i][j] = 1
elif i >= (size + 5) / 4 and i <= (3 * size - 5) / 4:
counter += 1
circle_neighborhood[i][j] = 1
else:
if j >= (size - 1) / 2 - (size - 1 - i) - (size - 3) / 4 and j <= (size - 1) / 2 + (
size - 1 - i) + (
size - 3) / 4:
counter += 1
circle_neighborhood[i][j] = 1
i = int(frame.shape[0] / 2)
while i < int(7 * frame.shape[0] / 8):
j = 0
while j < int(frame.shape[1] / 2):
"""
element元素说明
便于计算圆形邻域的相关系数引入变量element
第一个元素M_{Ci} * M_{Ci}'之和
第二个元素M_{Ci}之和
第三个元素M_{Ci}'之和
第四个元素M_{Ci}^2之和
第五个元素:(M_{Ci}')^2之和
"""
element = [0, 0, 0, 0, 0]
if candidate[i][j] == -1:
# 计算直径为7的圆邻域的色彩强度与数据结构中数组和矩阵的关系相似
p = -3
while p < 4:
q = -3
while q < 4:
if circle_neighborhood[p][q] == 1:
# 以圆心像素点为0,0
# 计算其余点的坐标和旋转180度后的坐标
# 原坐标
imgx = i + p
imgy = j + q
# 旋转180度后的坐标
imgxp = i - p
imgyp = j - q
element[0] = element[0] + int(GrayImage[imgx][imgy]) * int(GrayImage[imgxp][imgyp])
element[1] = element[1] + int(GrayImage[imgx][imgy])
element[2] = element[2] + int(GrayImage[imgxp][imgyp])
element[3] = element[3] + int(GrayImage[imgx][imgy] ** 2)
element[4] = element[4] + int(GrayImage[imgxp][imgyp] ** 2)
q = q + 1
p = p + 1
pc = (counter * element[0] - element[1] * element[2]) / (
np.sqrt(counter * element[3] - element[1] ** 2) * np.sqrt(
counter * element[4] - element[2] ** 2))
if pc > 0.5: # 相关系数足够大的点被判断为特征点(对称系数为0的为特征点——A Twofold...)
gridpoints[i][j] = 1
# point_size = 1
# point_color = (0, 0, 255)
# thickness = 0 # 可以为 0 、4、8
#
# cv2.circle(frame, (j, i), point_size, point_color, thickness)
else:
gridpoints[i][j] = 0
j = j + 1
i = i + 1
# cv2.imshow('2', frame)
# cv2.waitKey(0) # 按0退出
return gridpoints
"""
bfs8:
8邻域广度优先搜索
确定唯一特征点位置
"""
def bfs8(self, frame, g, x, y):
counter = 1
queue = [[x, y]]
ans = [0, 0]
self.flag[x][y] = 1
ans[0] = 1.0 * x
ans[1] = 1.0 * y
while len(queue) > 0:
current = queue.pop(0)
self.flag[current[0]][current[1]] = 1
i = 0
while i < 8:
temp = [0, 0]
temp[0] = current[0] + self.nx[i]
temp[1] = current[1] + self.ny[i]
if temp[0] < 0 or temp[0] > frame.shape[0] or temp[1] < 0 or temp[1] > frame.shape[1]:
i = i + 1
continue
if self.flag[int(temp[0])][int(temp[1])] or g[int(temp[0])][int(temp[1])] == 0:
i = i + 1
continue
self.flag[int(temp[0])][int(temp[1])] = 1
queue.append(temp)
ans[0] = ans[0] + 1.0 * temp[0]
ans[1] = ans[1] + 1.0 * temp[1]
counter = counter + 1
i = i + 1
ans[0] = ans[0] / counter
ans[1] = ans[1] / counter
return ans
"""
get_feature_point:
调用8邻域深度优先搜索
确定单一特征点位置
"""
def get_feature_point(self, frame, gps):
q = []
i = int(frame.shape[0] / 2)
while i < int(7 * frame.shape[0] / 8):
j = 0
while j < int(frame.shape[1] / 2):
if self.flag[i][j] == 1 or gps[i][j] == 0:
j = j + 1
continue
temp = self.bfs8(frame, gps, i, j)
q.append(temp)
j = j + 1
i = i + 1
return q
"""
decode:
根据颜色解码
确定特征点在矩阵的位置
"""
def decode(self, frame, feature_point):
points = []
position = []
frame = aug(frame)
color_map = np.zeros((frame.shape[0], frame.shape[1]))
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2LAB)
l, a, b = cv2.split(frame)
for i in range(0, frame.shape[0]):
for j in range(0, frame.shape[1]):
if l[i][j] < 100:
color_map[i][j] = 3
frame[i][j] = np.array([0, 0, 0])
elif l[i][j] > 220:
frame[i][j] = np.array([255, 255, 255])
color_map[i][j] = 255
else:
if b[i][j] < 120:
color_map[i][j] = 0
frame[i][j] = np.array([255, 0, 0])
else:
if a[i][j] > 125:
color_map[i][j] = 1
frame[i][j] = np.array([0, 0, 255])
else:
color_map[i][j] = 2
frame[i][j] = np.array([0, 255, 0])
# cv2.imshow('lab', frame)
# cv2.waitKey(0) # 按0退出
data = GF(1, 1, 1, 1, 1, 1)
# map = np.zeros((4, 4, 4, 4, 4, 4, 2))
map = np.zeros((4096, 2))
i = 0
while i < 64:
j = 1
while j < 62:
index = data[i][j - 1] * 4 ** 5 + data[i][j] * 4 ** 4 + data[i][j + 1] * 4 ** 3 + data[i + 1][
j - 1] * 4 ** 2 + data[i + 1][j] * 4 + data[i + 1][j + 1]
map[index] = [i, j]
j = j + 1
i = i + 1
for point in feature_point:
if color_map[int(point[0])][int(point[1]) + 9] != 255 and color_map[int(point[0])][
int(point[1]) - 9] != 255:
pass
else:
i = int(point[0])
j = int(point[1])
index = int(color_map[i - 9][j - 18]) * 4 ** 5 + int(color_map[i - 9][j]) * 4 ** 4 + int(
color_map[i - 9][j + 18]) * 4 ** 3 + int(color_map[i + 9][j - 18]) * 4 ** 2 + int(
color_map[i + 9][j]) * 4 + int(color_map[i + 9][j + 18])
# point_size = 1
# point_color = (255, 255, 255)
# thickness = 0 # 可以为 0 、4、8
# print(color_map[i - 9][j - 18], color_map[i - 9][j], color_map[i - 9][j + 18], color_map[i + 9][j - 18],
# color_map[i + 9][j], color_map[i + 9][j + 18])
# print(index)
#
# cv2.circle(frame, (int(j - 18), int(i - 9)), point_size, point_color, thickness)
# cv2.circle(frame, (int(j), int(i - 9)), point_size, point_color, thickness)
# cv2.circle(frame, (int(j + 18), int(i - 9)), point_size, point_color, thickness)
# cv2.circle(frame, (int(j - 18), int(i + 9)), point_size, point_color, thickness)
# cv2.circle(frame, (int(j), int(i + 9)), point_size, point_color, thickness)
# cv2.circle(frame, (int(j + 18), int(i + 9)), point_size, point_color, thickness)
try:
if map[index][0] < 10 and map[index][1] < 8:
points.append(list(map[int(index)]))
position.append([i, j])
except:
pass
# cv2.imshow('lab', frame)
# cv2.waitKey(0) # 按0退出
return points, position
def process(self, img_rd):
    """Run the full calibration pipeline on one frame and update self.scale.

    Pipeline: candidate pixels -> symmetric grid points -> blob centroids
    -> colour decode -> pixel/world distance ratio.
    """
    # Reset the visited mask to the frame's dimensions.
    self.flag = np.zeros((img_rd.shape[0], img_rd.shape[1]))
    # Candidate matrix: -1 candidate, -3 non-candidate.
    candidate = self.get_candidate_points(frame=img_rd)
    # Grid-point matrix: 1 corner, 0 non-corner.
    gridpoints = self.get_grid_points(frame=img_rd, candidate=candidate)
    # List of feature-point centroids [row, col].
    featurepoints_position = self.get_feature_point(frame=img_rd, gps=gridpoints)
    feature_points, position = self.decode(frame=img_rd, feature_point=featurepoints_position)
    # Use the middle pair of decoded points to compute the scale.
    # NOTE(review): assumes at least index+2 decoded points and a non-zero
    # pixel distance; otherwise this raises — confirm caller guarantees.
    index = len(feature_points) // 2
    pixel_distance = math.sqrt((position[index + 1][0] - position[index][0]) ** 2 +
                               (position[index + 1][1] - position[index][1]) ** 2)
    # Pattern cells are 2 world units apart, hence the factor of 2.
    world_distance = math.sqrt((feature_points[index + 1][0] * 2 - feature_points[index][0] * 2) ** 2 +
                               (feature_points[index + 1][1] * 2 - feature_points[index][1] * 2) ** 2)
    self.scale = world_distance / pixel_distance
    # Draw the detected feature points.
    point_size = 1
    point_color = (0, 0, 255)
    thickness = 0  # may be 0, 4 or 8
    for point in featurepoints_position:
        cv2.circle(img_rd, (int(point[1]), int(point[0])), point_size, point_color, thickness)
    self.draw_note(img_rd)
    self.update_fps()
def run(self, frame):
    """Calibrate on a single frame and return the pixel-to-world scale."""
    # Camera capture is handled by the caller; this only processes the frame.
    self.process(frame)
    return self.scale

View File

@@ -0,0 +1,230 @@
# 进行人脸录入 / face register
# 录入多张人脸 / support multi-faces
import datetime
import dlib
import numpy as np
import cv2
import os
import shutil # 读写文件
import time
import win32com.client
from PIL import ImageDraw, ImageFont
from PIL import Image
# Ordered prompts the person being registered is asked to perform.
action_list = ['look_ahead', 'look_left', 'look_right', 'rise_head', 'bow_head', 'blink', 'open_mouth', 'smile', 'over']
# Spoken/displayed Chinese prompt for each action (runtime strings — do not translate).
action_map = {'look_ahead': '请看前方', 'blink': '请眨眼', 'open_mouth': '请张嘴',
              'smile': '请笑一笑', 'rise_head': '请抬头',
              'bow_head': '请低头', 'look_left': '请看左边',
              'look_right': '请看右边', 'over': '录入完成'}
# Maps the numeric people_type constructor argument to its folder name.
people_type_dict = {'0': 'elder', '1': 'worker', '2': 'volunteer'}
# Dlib frontal face detector (replaced by the OpenCV DNN detector below)
# detector = dlib.get_frontal_face_detector()
# OpenCV DNN face detector (ResNet-10 SSD, Caffe weights)
detector = cv2.dnn.readNetFromCaffe("data/data_opencv/deploy.prototxt.txt",
                                    "data/data_opencv/res10_300x300_ssd_iter_140000.caffemodel")
class Face_Register:
def __init__(self, people_type, id):
    """Set up registration state.

    people_type: key of people_type_dict ('0' elder / '1' worker / '2' volunteer,
    int or str). id: identifier used as the face folder name.
    NOTE(review): `id` shadows the builtin — kept for interface compatibility.
    """
    self.init = False  # whether the first voice prompt has been spoken
    self.path_photos_from_camera = "data/data_faces_from_camera/"
    self.font = cv2.FONT_ITALIC
    self.existing_faces_cnt = 0  # counter of already-registered faces
    self.ss_cnt = 0  # per-person image counter while capturing
    self.faces_cnt = 0  # faces detected in the current frame
    # Flag controlling whether the current frame may be saved.
    self.save_flag = 1
    # Guards that 'n' (create folder) was pressed before 's' (save).
    self.press_n_flag = 0
    # Index of the current action prompt in action_list.
    self.index = 0
    # FPS bookkeeping.
    self.frame_time = 0
    self.frame_start_time = 0
    self.fps = 0
    self.people_type = people_type_dict[str(people_type)]
    self.id = id
def speak(self):
    """Speak the current action prompt via Windows SAPI text-to-speech."""
    prompt = action_map[action_list[self.index]]
    voice = win32com.client.Dispatch('Sapi.SpVoice')
    # voice.Voice = voice.GetVoices('Microsoft Zira')
    voice.Volume = 100  # full volume
    voice.Rate = 2      # slightly faster than the default speech rate
    voice.Speak(prompt)
# Create the folder tree that captured face images are saved into.
def pre_work_mkdir(self):
    """Ensure <path_photos_from_camera>/<people_type>/ exists.

    Fixes the original logic, which created only the base directory when
    it was missing and never the people_type subdirectory, so a first run
    on a clean checkout had nowhere to save images.
    """
    os.makedirs(self.path_photos_from_camera + '/' + self.people_type,
                exist_ok=True)
# Wipe previously stored face-image folders (and the cached feature CSV).
def pre_work_del_old_face_folders(self):
    """Delete every face folder under path_photos_from_camera, plus
    data/features_all.csv when present."""
    for folder in os.listdir(self.path_photos_from_camera):
        shutil.rmtree(self.path_photos_from_camera + folder)
    if os.path.isfile("data/features_all.csv"):
        os.remove("data/features_all.csv")
# v1: continue numbering from the last person_x folder found on disk.
# v2: folders are keyed by people type and id passed to the constructor.
def check_existing_faces_cnt(self):
    """Set self.existing_faces_cnt to the highest person_<n> index found
    under data/data_faces_from_camera/, or 0 when none exist.

    Folders that do not end in a numeric suffix (the v2
    <people_type>/<id> layout, e.g. 'elder') are ignored instead of
    crashing int() as the original did.
    """
    person_list = os.listdir("data/data_faces_from_camera/")
    person_num_list = [int(person.split('_')[-1]) for person in person_list
                       if person.split('_')[-1].isdigit()]
    # Start from 1 on the next capture when nothing numeric was found.
    self.existing_faces_cnt = max(person_num_list) if person_num_list else 0
# Refresh the FPS estimate of the processed stream.
def update_fps(self):
    """Update self.fps from the time elapsed since the previous call.

    Adds the zero-interval guard (two calls within timer resolution made
    the original raise ZeroDivisionError), matching the guarded version
    used by the Calibration class.
    """
    now = time.time()
    self.frame_time = now - self.frame_start_time
    if self.frame_time != 0:
        self.fps = 1.0 / self.frame_time
    self.frame_start_time = now
# Overlay captions on the cv2 window frame.
def draw_note(self, img_rd):
    """Draw the registration HUD and the current action prompt; returns the
    annotated frame (a new array — PIL round-trip copies the image).

    ASCII captions use cv2.putText; the Chinese action prompt needs a CJK
    font, so it is drawn through PIL with simsun.ttc instead.
    """
    # cv2.putText(img_rd, action_map[action_list[index]].encode('utf-8').decode(), (20, 250), self.font, 1,
    #             (0, 255, 0), 1, cv2.LINE_AA)
    cv2.putText(img_rd, "Face Register", (20, 40), self.font, 1, (255, 255, 255), 1, cv2.LINE_AA)
    cv2.putText(img_rd, "FPS: " + str(self.fps.__round__(2)), (20, 100), self.font, 0.8, (0, 255, 0), 1,
                cv2.LINE_AA)
    cv2.putText(img_rd, "Faces: " + str(self.faces_cnt), (20, 140), self.font, 0.8, (0, 255, 0), 1, cv2.LINE_AA)
    cv2.putText(img_rd, "N: Create face folder", (20, 350), self.font, 0.8, (255, 255, 255), 1, cv2.LINE_AA)
    cv2.putText(img_rd, "S: Save current face", (20, 400), self.font, 0.8, (255, 255, 255), 1, cv2.LINE_AA)
    cv2.putText(img_rd, "Q: Quit", (20, 450), self.font, 0.8, (255, 255, 255), 1, cv2.LINE_AA)
    # SimSun TrueType collection, face index 1 — assumes the font file is on
    # the font search path (Windows) — NOTE(review): confirm deployment.
    font = ImageFont.truetype("simsun.ttc", 30, index=1)
    img_rd = Image.fromarray(cv2.cvtColor(img_rd, cv2.COLOR_BGR2RGB))
    draw = ImageDraw.Draw(img_rd)
    # Indices 0..7 are the capture actions; anything past that shows the
    # 'over' (done) prompt at action_list[8].
    if self.index <= 7:
        draw.text((20, 230), text=action_map[action_list[self.index]].encode('utf-8').decode(), font=font,
                  fill=(0, 255, 0))
    else:
        draw.text((20, 230), text=action_map[action_list[8]].encode('utf-8').decode(), font=font,
                  fill=(0, 255, 0))
    img_rd = cv2.cvtColor(np.array(img_rd), cv2.COLOR_RGB2BGR)
    return img_rd
# Detect faces in one frame.
def process(self, img_rd):
    """Detect faces in img_rd with the DNN detector, draw bounding boxes
    and the HUD, speak the first prompt once, and return the annotated frame.

    Image saving from the v1 key-driven flow is currently commented out.
    """
    self.faces_cnt = 0
    (h, w) = img_rd.shape[:2]
    # 300x300 BGR mean-subtracted blob, the input the SSD model was trained on.
    blob = cv2.dnn.blobFromImage(cv2.resize(img_rd, (300, 300)), 1.0,
                                 (300, 300), (104.0, 177.0, 123.0))
    detector.setInput(blob)
    faces = detector.forward()
    # # Create the per-person folder (v1 flow, disabled):
    # if self.index == 0:
    #     # self.existing_faces_cnt += 1
    #     current_face_dir = self.path_photos_from_camera + '/' + self.people_type + '/' + self.id
    #     # current_face_dir = self.path_photos_from_camera + "person_" + str(self.existing_faces_cnt)
    #     os.makedirs(current_face_dir)
    #     # print('\n')
    #     # print("新建的人脸文件夹 / Create folders: ", current_face_dir)
    #
    #     self.ss_cnt = 0  # reset the per-person image counter
    #     self.index = 0
    #     self.press_n_flag = 1  # 'n' has been pressed
    # Faces detected
    if faces.shape[2] != 0:
        # Draw a rectangle for each detection.
        for i in range(0, faces.shape[2]):
            # Detection confidence for this candidate box.
            confidence = faces[0, 0, i, 2]
            # filter out weak detections by ensuring the `confidence` is
            # greater than the minimum confidence
            if confidence < 0.5:
                continue
            self.faces_cnt += 1
            # compute the (x, y)-coordinates of the bounding box for the
            # object
            box = faces[0, 0, i, 3:7] * np.array([w, h, w, h])
            (startX, startY, endX, endY) = box.astype("int")
            height = (endY - startY)
            width = (endX - startX)
            # hh = int(height / 2)
            # ww = int(width / 2)
            # Reject boxes outside the 480x640 frame.
            # NOTE(review): bounds are hard-coded to a 640x480 capture —
            # confirm the camera resolution.
            if endX > 640 or endY > 480 or startX < 0 or startY < 0:
                # if (endX + ww) > 640 or (endY + hh > 480) or (startX - ww < 0) or (
                #         startY - hh < 0):
                cv2.putText(img_rd, "OUT OF RANGE", (20, 300), self.font, 0.8, (0, 0, 255), 1, cv2.LINE_AA)
                color_rectangle = (0, 0, 255)
                save_flag = 0
                print("请调整位置 / Please adjust your position")
            else:
                color_rectangle = (0, 255, 0)
                save_flag = 1
            cv2.rectangle(img_rd, tuple([startX, startY]), tuple([endX, endY]), color_rectangle, 2)
            # Blank image sized to the face box (save_flag / img_blank are
            # currently unused leftovers of the disabled save flow below).
            # img_blank = np.zeros((int(height * 2), width * 2, 3), np.uint8)
            img_blank = np.zeros((height, width, 3), np.uint8)
            # if save_flag:
            #     # Save the captured face locally
            #     # (requires that 'n' was pressed first to create the folder)
            #     if self.press_n_flag:
            #         self.ss_cnt += 1
            #
            #         if self.index <= 7:
            #             for ii in range(height):
            #                 for jj in range(width):
            #                     img_blank[ii][jj] = img_rd[startY + ii][startX + jj]
            #             cv2.imwrite(current_face_dir + "/img_face_" + str(self.ss_cnt) + ".jpg", img_blank)
            #             print("写入本地 / Save into",
            #                   str(current_face_dir) + "/img_face_" + str(self.ss_cnt) + ".jpg")
            #             if self.index < len(action_list) - 1:
            #                 self.index += 1
            #                 self.speak()
            #     else:
            #         print("请先按 'N' 来建文件夹, 按 'S' / Please press 'N' and press 'S'")
            #     # self.faces_cnt = len(faces)
    # Add the caption overlay to the window.
    img_rd = self.draw_note(img_rd)
    self.update_fps()
    # Speak the first action prompt exactly once.
    if not self.init:
        self.speak()
        self.init = True
    return img_rd
    def take_photo(self, frame):
        """Capture an enrollment photo from *frame*.

        Stub: not implemented yet — the commented-out save logic currently
        lives inside process().  TODO implement and return the annotated frame.
        """
        pass
    def run(self, frame):
        """Run the face-registration pipeline on one frame and return the result."""
        return self.process(frame)

View File

@@ -0,0 +1,194 @@
import math
import sys
import threading
import time
import cv2
import os
from sys import platform
import argparse
import numpy as np
from PIL import ImageDraw, ImageFont
from PIL import Image
import queue
import frame_process
import algorithm_fall
# Import Openpose (Windows/Ubuntu/OSX)
# dir_path = os.path.dirname(os.path.realpath(__file__))
dir_path = 'D:\\BJTU\\Python\\openpose-master\\build-2017'
try:
# Windows Import
if platform == "win32":
# Change these variables to point to the correct folder (Release/x64 etc.)
sys.path.append(dir_path + '\\python\\openpose\\Release')
os.environ['PATH'] = os.environ['PATH'] + ';' + dir_path + '\\x64\\Release;' + dir_path + '\\bin;'
import pyopenpose as op
else:
# Change these variables to point to the correct folder (Release/x64 etc.)
sys.path.append('../../python')
# If you run `make install` (default path is `/usr/local/python` for Ubuntu), you can also access the OpenPose/python module from there. This will install OpenPose and the python library at your desired installation path. Ensure that this is in your python path in order to use it.
# sys.path.append('/usr/local/python')
from openpose import pyopenpose as op
except ImportError as e:
print(
'Error: OpenPose library could not be found. Did you enable `BUILD_PYTHON` in CMake and have this Python script in the right folder?')
raise e
class Fall_Detection:
    """Detects falls in a video stream.

    Primary path: OpenPose body keypoints — a fall is flagged from sudden
    center-of-mass acceleration or a wider-than-tall pose held over several
    frames.  Fallback path (when no keypoints are found): frame differencing
    delegated to ``algorithm_fall.fall_detect``.
    """

    def __init__(self):
        """Configure and start the OpenPose wrapper, then reset tracking state."""
        self.parser = argparse.ArgumentParser()
        self.args = self.parser.parse_known_args()
        self.params = dict()
        self.params["model_folder"] = "D:\\BJTU\\Python\\openpose-master\\models\\"
        self.params['net_resolution'] = "-1x160"
        # Fold any extra "--flag [value]" command-line arguments into params.
        for i in range(0, len(self.args[1])):
            curr_item = self.args[1][i]
            if i != len(self.args[1]) - 1:
                next_item = self.args[1][i + 1]
            else:
                next_item = "1"
            if "--" in curr_item and "--" in next_item:
                key = curr_item.replace('-', '')
                if key not in self.params:
                    self.params[key] = "1"
            elif "--" in curr_item and "--" not in next_item:
                key = curr_item.replace('-', '')
                if key not in self.params:
                    self.params[key] = next_item
        # Starting OpenPose
        self.opWrapper = op.WrapperPython()
        self.opWrapper.configure(self.params)
        self.opWrapper.start()
        # All per-stream tracking state is (re)set in re_init() so callers can
        # reuse the OpenPose wrapper across video sources (removes the field
        # duplication the original had between __init__ and re_init).
        self.re_init()

    def re_init(self):
        """Reset per-stream tracking state without touching the OpenPose wrapper."""
        self.frame_start_time = 0  # timestamp of the last keypoint frame (0 = none yet)
        self.v0 = 0                # previous center-of-mass speed
        self.width0 = []           # previous bbox width (1-element list)
        self.height0 = []          # previous bbox height (1-element list)
        self.center0 = []          # previous center [x, y]
        self.couter = 0            # consecutive fall-evidence frames (sic: "counter")
        self.error = 0             # noisy-frame tolerance counter
        self.xList = queue.Queue(maxsize=10)
        self.yList = queue.Queue(maxsize=10)
        self.prevX = 0.0
        self.prevY = 0.0
        self.centerV = 0
        self.alert = 0
        self.firstFrame = None     # baseline frame for the differencing fallback

    def run(self, frame):
        """Run fall detection on one BGR frame.

        Returns the annotated frame resized to 640x480.
        """
        height = []
        width = []
        center = []
        # Process the image through OpenPose
        datum = op.Datum()
        datum.cvInputData = frame
        self.opWrapper.emplaceAndPop([datum])
        img_rd = datum.cvOutputData
        # fall judge
        try:
            # key points have been identified: track bbox shape and center motion
            x = datum.poseKeypoints[0][:, 0]
            y = datum.poseKeypoints[0][:, 1]
            width.append(np.max(x[np.nonzero(x)]) - np.min(x[np.nonzero(x)]))
            height.append(np.max(y[np.nonzero(y)]) - np.min(y[np.nonzero(y)]))
            center.append(np.mean(x[np.nonzero(x)]))
            center.append(np.mean(y[np.nonzero(y)]))
            if self.frame_start_time == 0:
                # first frame with a detection: just record the baseline
                self.center0 = center.copy()
                self.width0 = width.copy()
                self.height0 = height.copy()
                self.frame_start_time = time.time()
            else:
                diff = np.array([center[0] - self.center0[0], center[1] - self.center0[1]])
                dist = math.sqrt(np.sum((diff * 10 ** (-4)) ** 2))
                now = time.time()
                v = dist / (now - self.frame_start_time)
                a = (v ** 2 - self.v0 ** 2) / (2 * dist)
                if (abs(a) > 0.2) and \
                        (np.subtract(np.array(width), np.array(height)) > np.subtract(np.array(self.width0), np.array(
                            self.height0)) and np.subtract(np.array(width), np.array(height)) > 0):
                    # sudden acceleration while the box is getting wider than tall
                    self.couter += 1
                elif (width > height and (x[8] != 0 or x[9] != 0 or x[12] != 0) and v < 1):
                    # lying pose (wider than tall) with hip keypoints visible and low speed
                    self.couter += 1
                else:
                    # tolerate one noisy frame before resetting the evidence counter
                    if self.error == 0:
                        self.error += 1
                    else:
                        self.couter = 0
                        self.error = 0
                if self.couter > 3:
                    font = ImageFont.truetype("simsun.ttc", 30, index=1)
                    img_rd = Image.fromarray(cv2.cvtColor(datum.cvOutputData, cv2.COLOR_BGR2RGB))
                    draw = ImageDraw.Draw(img_rd)
                    draw.text((10, 10), text="Fall Detected", font=font,
                              fill=(255, 0, 0))
                    img_rd = cv2.cvtColor(np.array(img_rd), cv2.COLOR_RGB2BGR)
                    cv2.imwrite('fall_detection.jpg', frame)
                    # event posting to the server is currently disabled
                # update baselines for the next frame
                self.frame_start_time = now
                self.v0 = v
                self.width0 = width.copy()
                self.height0 = height.copy()
            self.firstFrame = None
        except Exception:
            # no keypoints this frame: fall back to frame differencing
            gray = frame_process.preprocess_frame(frame)
            if self.firstFrame is None:
                self.firstFrame = gray
            cnts = frame_process.get_contours(self.firstFrame, gray)
            defined_min_area = 3000
            frame, self.alert = algorithm_fall.fall_detect(cnts, defined_min_area, frame, self.prevX, self.prevY,
                                                           self.xList, self.yList, self.centerV, self.alert)
            img_rd = frame
        frame = cv2.resize(img_rd, (640, 480))
        return frame

View File

@@ -0,0 +1,258 @@
# 摄像头实时人脸识别
import threading
from datetime import datetime
import dlib
import numpy as np
import cv2
import pandas as pd
import os
import time
import facenet
from model import create_model
from PIL import Image, ImageDraw, ImageFont
from Post import post
import smile_detection
start_time = 0
# 1. Dlib 正向人脸检测器
# detector = dlib.get_frontal_face_detector()
# OpenCV DNN face detector
# detector = cv2.dnn.readNetFromCaffe("data/data_opencv/deploy.prototxt.txt",
# "data/data_opencv/res10_300x300_ssd_iter_140000.caffemodel")
# 2. Dlib 人脸 landmark 特征点检测器
# predictor = dlib.shape_predictor('data/data_dlib/shape_predictor_68_face_landmarks.dat')
# 3. Dlib Resnet 人脸识别模型,提取 128D 的特征矢量
# face_reco_model = dlib.face_recognition_model_v1("data/data_dlib/dlib_face_recognition_resnet_model_v1.dat")
# nn4_small2 = create_model()
# nn4_small2.load_weights('weights/nn4.small2.v1.h5')
class Face_Recognizer:
    """Real-time face recognition with smile detection for elders and
    stranger flagging.

    Uses an OpenCV DNN face detector and a FaceNet (nn4.small2) embedding
    network; embeddings of enrolled faces are lazily loaded from
    ``data/data_faces_from_camera/``.
    """

    def __init__(self, detector, nn4_small2):
        # models: OpenCV DNN face detector + FaceNet embedding network
        self.detector = detector
        self.nn4_small2 = nn4_small2
        # 128-D embeddings of enrolled faces
        self.features_known_list = []
        # lazy-load guard for the enrolled-face database
        self.loaded = False
        self.name_known_cnt = 0
        self.name_known_list = []
        self.metadata = []
        self.embedded = []
        # per-frame results: positions / names / types of detected faces
        self.pos_camera_list = []
        self.name_camera_list = []
        self.type_camera_list = []
        # number of faces detected in the current frame
        self.faces_cnt = 0
        # embeddings of faces in the current frame
        self.features_camera_list = []
        # FPS bookkeeping
        self.fps = 0
        self.frame_start_time = 0

    def get_face_database(self):
        """Lazily load enrolled face images and compute their embeddings.

        Returns 1 on success (or if already loaded), 0 when the enrollment
        directory is missing.
        """
        if self.loaded:
            return 1
        if os.path.exists("data/data_faces_from_camera/"):
            self.metadata = facenet.load_metadata("data/data_faces_from_camera/")
            self.name_known_cnt = self.metadata.shape[0]
            self.embedded = np.zeros((self.metadata.shape[0], 128))
            for i, m in enumerate(self.metadata):
                for j, n in enumerate(m):
                    for k, p in enumerate(n):
                        img = facenet.load_image(p.image_path())
                        img = cv2.resize(img, (96, 96))
                        # scale RGB values to interval [0,1]
                        img = (img / 255.).astype(np.float32)
                        # obtain embedding vector for image
                        # NOTE(review): embedded[i] is overwritten for every image
                        # of person i, so only the last image's embedding survives
                        # — possibly intentional given the commented-out averaging.
                        self.embedded[i] = self.nn4_small2.predict(np.expand_dims(img, axis=0))[0]
                        # directory layout assumed: .../<type>/<name>/<image>
                        path = p.image_path().replace("\\", "/")
                        self.name_known_list.append(path.split('/')[-2])
                        self.type_camera_list.append(path.split('/')[-3])
            self.loaded = True
            return 1
        print('##### Warning #####', '\n')
        print("'features_all.csv' not found!")
        print(
            "Please run 'get_faces_from_camera.py' before 'face_reco_from_camera.py'",
            '\n')
        print('##### End Warning #####')
        return 0

    def update_fps(self):
        """Update the FPS estimate from the time since the previous call."""
        now = time.time()
        self.frame_time = now - self.frame_start_time
        # guard against a zero interval on very fast consecutive calls
        if self.frame_time > 0:
            self.fps = 1.0 / self.frame_time
        self.frame_start_time = now

    def draw_note(self, img_rd):
        """Draw the face-count overlay onto *img_rd* in place."""
        font = cv2.FONT_ITALIC
        cv2.putText(img_rd, "Faces: " + str(self.faces_cnt), (20, 40), font, 0.8, (0, 255, 0), 1, cv2.LINE_AA)

    def draw_name(self, img_rd):
        """Draw each recognized name under its face box; returns a new image."""
        img_with_name = img_rd
        # PIL is used here because cv2.putText cannot render CJK glyphs
        font = ImageFont.truetype("simsun.ttc", 30, index=1)
        img = Image.fromarray(cv2.cvtColor(img_rd, cv2.COLOR_BGR2RGB))
        draw = ImageDraw.Draw(img)
        for i in range(self.faces_cnt):
            if self.name_camera_list[i] != 'unknown':
                draw.text(xy=self.pos_camera_list[i], text=self.name_camera_list[i], font=font)
                img_with_name = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR)
        return img_with_name

    def modify_name_camera_list(self):
        """Map database folder names to display names.

        TODO: pull these identifiers from the database instead of hard-coding.
        """
        self.name_known_list[0] = '1'.encode('utf-8').decode()
        self.name_known_list[1] = 'Tony Blair'.encode('utf-8').decode()

    def process(self, img_rd):
        """Detect faces in *img_rd*, label known people, run smile detection on
        elders and flag strangers.  Returns the annotated frame.
        """
        img_with_name = img_rd
        # make sure the enrolled-face database is loaded
        if self.get_face_database():
            # reuse draw_note instead of duplicating the putText call
            self.draw_note(img_rd)
            # reset per-frame state
            self.features_camera_list = []
            self.faces_cnt = 0
            self.pos_camera_list = []
            self.name_camera_list = []
            (h, w) = img_rd.shape[:2]
            blob = cv2.dnn.blobFromImage(cv2.resize(img_rd, (300, 300)), 1.0,
                                         (300, 300), (104.0, 177.0, 123.0))
            self.detector.setInput(blob)
            faces = self.detector.forward()
            if faces.shape[2] != 0:
                # iterate over every raw detection
                for k in range(0, faces.shape[2]):
                    confidence = faces[0, 0, k, 2]
                    # filter out weak detections by ensuring the confidence is
                    # greater than the minimum confidence
                    if confidence < 0.5:
                        continue
                    self.faces_cnt += 1
                    # FIX: per-face lists must be indexed by the number of kept
                    # faces, not by k — filtered detections have no list entry,
                    # so indexing by k raised IndexError (swallowed by except).
                    idx = self.faces_cnt - 1
                    # default every face to unknown
                    self.name_camera_list.append("unknown")
                    # name label position: just above the face box
                    box = faces[0, 0, k, 3:7] * np.array([w, h, w, h])
                    (startX, startY, endX, endY) = box.astype("int")
                    self.pos_camera_list.append(tuple(
                        [int(startX + 5), int(startY - 30)]))
                    img_blank = img_rd[startY:endY, startX:endX]
                    try:
                        img = cv2.resize(img_blank, (96, 96))
                        img = (img / 255.).astype(np.float32)
                        img = self.nn4_small2.predict(np.expand_dims(img, axis=0))[0]
                        # compare this face against every stored embedding
                        e_distance_list = []
                        for i in range(0, len(self.embedded)):
                            e_distance_list.append(facenet.distance(self.embedded[i], img))
                        similar_person_num = e_distance_list.index(min(e_distance_list))
                        if min(e_distance_list) < 0.58:
                            # NOTE(review): "% 8" assumes 8 images per enrolled
                            # person in the listing order — confirm enrollment.
                            self.name_camera_list[idx] = self.name_known_list[similar_person_num % 8]
                            if self.type_camera_list[similar_person_num % 8] == 'elder':
                                mode = smile_detection.smile_detect(img_blank)
                                if mode == 'happy':
                                    cv2.imwrite('smile_detection.jpg', img_rd)
                                    cv2.rectangle(img_rd, tuple([startX, startY - 70]),
                                                  tuple([endX, startY - 35]),
                                                  (0, 215, 255), cv2.FILLED)
                                    cv2.putText(img_rd, 'happy', (startX + 5, startY - 45), cv2.FONT_ITALIC, 1,
                                                (255, 255, 255), 1, cv2.LINE_AA)
                                    # event posting to the server is currently disabled
                        elif min(e_distance_list) > 0.75:
                            # clearly far from every enrolled face: stranger
                            self.name_camera_list[idx] = '陌生人'
                            cv2.imwrite('stranger_detection.jpg', img_rd)
                        # distances in (0.58, 0.75] are ambiguous: keep "unknown"
                        if self.name_camera_list[idx] == '陌生人':
                            cv2.rectangle(img_rd, tuple([startX, startY]), tuple([endX, endY]),
                                          (0, 0, 255), 2)
                            cv2.rectangle(img_rd, tuple([startX, startY - 35]), tuple([endX, startY]),
                                          (0, 0, 255), cv2.FILLED)
                        elif self.name_camera_list[idx] != 'unknown':
                            cv2.rectangle(img_rd, tuple([startX, startY]), tuple([endX, endY]),
                                          (0, 255, 0), 2)
                            cv2.rectangle(img_rd, tuple([startX, startY - 35]), tuple([endX, startY]),
                                          (0, 255, 0), cv2.FILLED)
                    except Exception:
                        # face crop may be empty at the frame edge, or the
                        # embedding may fail — skip this face
                        continue
                img_with_name = self.draw_name(img_rd)
            else:
                img_with_name = img_rd
        return img_with_name

    def run(self, frame):
        """Process a single frame and return the annotated result."""
        img_with_name = self.process(frame)
        return img_with_name

View File

@@ -0,0 +1,238 @@
from oldcare.track.centroidtracker import CentroidTracker
from oldcare.track.trackableobject import TrackableObject
from imutils.video import FPS
import numpy as np
import imutils
import argparse
import time
import dlib
import cv2
# 全局变量
# prototxt_file_path = 'data/data_opencv/MobileNetSSD_deploy.prototxt'
# # Contains the Caffe deep learning model files.
# # Well be using a MobileNet Single Shot Detector (SSD),
# # “Single Shot Detectors for object detection”.
# model_file_path = 'data/data_opencv/MobileNetSSD_deploy.caffemodel'
skip_frames = 30  # number of frames to skip between full detections (tracking runs in between)
# Hyperparameters
# minimum probability to filter weak detections
minimum_confidence = 0.80
# The 21 object classes the MobileNet-SSD model can recognize
CLASSES = ["background", "aeroplane", "bicycle", "bird", "boat",
           "bottle", "bus", "car", "cat", "chair",
           "cow", "diningtable", "dog", "horse", "motorbike",
           "person", "pottedplant", "sheep", "sofa", "train",
           "tvmonitor"]
# 加载物体识别模型
# net = cv2.dnn.readNetFromCaffe(prototxt_file_path, model_file_path)
class Intrusion_Detection():
    def __init__(self, net):
        """Wrap a MobileNet-SSD person detector with centroid tracking to flag
        people crossing into the restricted area.

        :param net: a cv2.dnn network loaded from the MobileNetSSD Caffe files
        """
        self.net = net
        # initialize the frame dimensions (we'll set them as soon as we read
        # the first frame from the video)
        self.W = None
        self.H = None
        # instantiate our centroid tracker, then initialize a list to store
        # each of our dlib correlation trackers, followed by a dictionary to
        # map each unique object ID to a TrackableObject
        self.ct = CentroidTracker(maxDisappeared=40, maxDistance=50)
        self.trackers = []
        self.trackableObjects = {}
        # initialize the total number of frames processed thus far, along
        # with the total number of objects that have moved either up or down
        self.totalFrames = 0
        self.totalDown = 0
        self.totalUp = 0
        # start the frames per second throughput estimator
        self.fps = FPS().start()
# loop over frames from the video stream
def process(self,frame):
# grab the next frame and handle if we are reading from either
# VideoCapture or VideoStream
rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
# if the frame dimensions are empty, set them
if self.W is None or self.H is None:
(self.H, self.W) = frame.shape[:2]
# initialize the current status along with our list of bounding
# box rectangles returned by either (1) our object detector or
# (2) the correlation trackers
status = "Waiting"
rects = []
# check to see if we should run a more computationally expensive
# object detection method to aid our tracker
if self.totalFrames % skip_frames == 0:
# set the status and initialize our new set of object trackers
status = "Detecting"
trackers = []
# convert the frame to a blob and pass the blob through the
# network and obtain the detections
blob = cv2.dnn.blobFromImage(frame, 0.007843, (self.W, self.H), 127.5)
self.net.setInput(blob)
detections = self.net.forward()
# loop over the detections
for i in np.arange(0, detections.shape[2]):
# extract the confidence (i.e., probability) associated
# with the prediction
confidence = detections[0, 0, i, 2]
# filter out weak detections by requiring a minimum
# confidence
if confidence > minimum_confidence:
# extract the index of the class label from the
# detections list
idx = int(detections[0, 0, i, 1])
# if the class label is not a person, ignore it
if CLASSES[idx] != "person":
continue
# compute the (x, y)-coordinates of the bounding box
# for the object
box = detections[0, 0, i, 3:7] * np.array([self.W, self.H, self.W, self.H])
(startX, startY, endX, endY) = box.astype("int")
# construct a dlib rectangle object from the bounding
# box coordinates and then start the dlib correlation
# tracker
tracker = dlib.correlation_tracker()
rect = dlib.rectangle(startX, startY, endX, endY)
tracker.start_track(rgb, rect)
# add the tracker to our list of trackers so we can
# utilize it during skip frames
trackers.append(tracker)
# otherwise, we should utilize our object *trackers* rather than
# object *detectors* to obtain a higher frame processing throughput
else:
# loop over the trackers
for tracker in self.trackers:
# set the status of our system to be 'tracking' rather
# than 'waiting' or 'detecting'
status = "Tracking"
# update the tracker and grab the updated position
tracker.update(rgb)
pos = tracker.get_position()
# unpack the position object
startX = int(pos.left())
startY = int(pos.top())
endX = int(pos.right())
endY = int(pos.bottom())
# draw a rectangle around the people
cv2.rectangle(frame, (startX, startY), (endX, endY),
(0, 255, 0), 2)
# add the bounding box coordinates to the rectangles list
rects.append((startX, startY, endX, endY))
# draw a horizontal line in the center of the frame -- once an
# object crosses this line we will determine whether they were
# moving 'up' or 'down'
# cv2.line(frame, (0, H // 2), (W, H // 2), (0, 255, 255), 2)
# use the centroid tracker to associate the (1) old object
# centroids with (2) the newly computed object centroids
objects = self.ct.update(rects)
# loop over the tracked objects
for (objectID, centroid) in objects.items():
# check to see if a trackable object exists for the current
# object ID
to = self.trackableObjects.get(objectID, None)
# if there is no existing trackable object, create one
if to is None:
to = TrackableObject(objectID, centroid)
# otherwise, there is a trackable object so we can utilize it
# to determine direction
else:
# the difference between the y-coordinate of the *current*
# centroid and the mean of *previous* centroids will tell
# us in which direction the object is moving (negative for
# 'up' and positive for 'down')
y = [c[1] for c in to.centroids]
direction = centroid[1] - np.mean(y)
to.centroids.append(centroid)
# check to see if the object has been counted or not
if not to.counted:
# if the direction is negative (indicating the object
# is moving up) AND the centroid is above the center
# line, count the object
if direction < 0 and centroid[1] < self.H // 2:
self.totalUp += 1
to.counted = True
# if the direction is positive (indicating the object
# is moving down) AND the centroid is below the
# center line, count the object
elif direction > 0 and centroid[1] > self.H // 2:
self.totalDown += 1
to.counted = True
current_time = time.strftime('%Y-%m-%d %H:%M:%S',
time.localtime(time.time()))
event_desc = '有人闯入禁止区域!!!'
event_location = '院子'
print('[EVENT] %s, 院子, 有人闯入禁止区域!!!'
% (current_time))
cv2.imwrite('intrusion.jpg', frame)
# todo insert into database
# command = '%s inserting.py --event_desc %s--event_type4 - -event_location % s' % \
# (python_path, event_desc, event_location)
# p = subprocess.Popen(command, shell=True)
# store the trackable obj ect in our dictionary
self.trackableObjects[objectID] = to
# draw both the ID of the object and the centroid of the
# object on the output frame
text = "ID {}".format(objectID)
cv2.putText(frame, text, (centroid[0] - 10, centroid[1] - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
cv2.circle(frame, (centroid[0], centroid[1]), 4,
(0, 255, 0), -1)
# construct a tuple of information we will be displaying on the
# frame
info = [
# ("Up", totalUp),
# ("Down", totalDown),
("Status", status),
]
# loop over the info tuples and draw them on our frame
for (i, (k, v)) in enumerate(info):
text = "{}: {}".format(k, v)
cv2.putText(frame, text, (10, self.H - ((i * 20) + 20)),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)
# show the output frame
frame = cv2.resize(frame, (640, 480))
# increment the total number of frames processed thus far and
# then update the FPS counter
self.totalFrames += 1

View File

@@ -0,0 +1,245 @@
# 摄像头实时人脸识别
import threading
import dlib
import numpy as np
import cv2
import pandas as pd
import os
import time
import facenet
from PIL import Image, ImageDraw, ImageFont
from Calibration import Calibration
from Post import post
from model import create_model
import smile_detection
start_time = 0
# 1. Dlib 正向人脸检测器
# detector = dlib.get_frontal_face_detector()
# OpenCV DNN face detector
# detector = cv2.dnn.readNetFromCaffe("data/data_opencv/deploy.prototxt.txt",
# "data/data_opencv/res10_300x300_ssd_iter_140000.caffemodel")
# 2. Dlib 人脸 landmark 特征点检测器
# predictor = dlib.shape_predictor('data/data_dlib/shape_predictor_68_face_landmarks.dat')
# 3. Dlib Resnet 人脸识别模型,提取 128D 的特征矢量
# face_reco_model = dlib.face_recognition_model_v1("data/data_dlib/dlib_face_recognition_resnet_model_v1.dat")
# nn4_small2 = create_model()
# nn4_small2.load_weights('weights/nn4.small2.v1.h5')
class Interaction_Detection:
    """Recognizes faces and checks whether a volunteer is close enough to an
    elder (volunteer-interaction detection).

    Uses an OpenCV DNN face detector and a FaceNet (nn4.small2) embedding
    network; embeddings of enrolled faces are lazily loaded from
    ``data/data_faces_from_camera/``.
    """

    def __init__(self, detector, nn4_small2):
        # models: OpenCV DNN face detector + FaceNet embedding network
        self.detector = detector
        self.nn4_small2 = nn4_small2
        # 128-D embeddings of enrolled faces
        self.features_known_list = []
        # lazy-load guard for the enrolled-face database
        self.loaded = False
        self.name_known_cnt = 0
        self.name_known_list = []
        self.type_known_list = []
        self.metadata = []
        self.embedded = []
        # per-frame results: positions / names / types of detected faces
        self.pos_camera_list = []
        self.name_camera_list = []
        self.type_camera_list = []
        # number of faces detected in the current frame
        self.faces_cnt = 0
        # embeddings of faces in the current frame
        self.features_camera_list = []
        # FPS bookkeeping
        self.fps = 0
        self.frame_start_time = 0

    def get_face_database(self):
        """Lazily load enrolled face images and compute their embeddings.

        Returns 1 on success (or if already loaded), 0 when the enrollment
        directory is missing.
        """
        if self.loaded:
            return 1
        if os.path.exists("data/data_faces_from_camera/"):
            self.metadata = facenet.load_metadata("data/data_faces_from_camera/")
            self.name_known_cnt = self.metadata.shape[0]
            self.embedded = np.zeros((self.metadata.shape[0], 128))
            for i, m in enumerate(self.metadata):
                for j, n in enumerate(m):
                    for k, p in enumerate(n):
                        img = facenet.load_image(p.image_path())
                        img = cv2.resize(img, (96, 96))
                        # scale RGB values to interval [0,1]
                        img = (img / 255.).astype(np.float32)
                        # obtain embedding vector for image
                        # NOTE(review): embedded[i] is overwritten for every image
                        # of person i, so only the last image's embedding survives.
                        self.embedded[i] = self.nn4_small2.predict(np.expand_dims(img, axis=0))[0]
                        # directory layout assumed: .../<type>/<name>/<image>
                        path = p.image_path().replace("\\", "/")
                        self.name_known_list.append(path.split('/')[-2])
                        self.type_known_list.append(path.split('/')[-3])
            self.loaded = True
            return 1
        print('##### Warning #####', '\n')
        print("'features_all.csv' not found!")
        print(
            "Please run 'get_faces_from_camera.py' before 'face_reco_from_camera.py'",
            '\n')
        print('##### End Warning #####')
        return 0

    def update_fps(self):
        """Update the FPS estimate from the time since the previous call."""
        now = time.time()
        self.frame_time = now - self.frame_start_time
        # guard against a zero interval on very fast consecutive calls
        if self.frame_time > 0:
            self.fps = 1.0 / self.frame_time
        self.frame_start_time = now

    def draw_note(self, img_rd):
        """Draw the face-count overlay onto *img_rd* in place."""
        font = cv2.FONT_ITALIC
        cv2.putText(img_rd, "Faces: " + str(self.faces_cnt), (20, 40), font, 0.8, (0, 255, 0), 1, cv2.LINE_AA)

    def draw_name(self, img_rd):
        """Draw each recognized name under its face box; returns a new image."""
        img_with_name = img_rd
        # PIL is used here because cv2.putText cannot render CJK glyphs
        font = ImageFont.truetype("simsun.ttc", 30, index=1)
        img = Image.fromarray(cv2.cvtColor(img_rd, cv2.COLOR_BGR2RGB))
        draw = ImageDraw.Draw(img)
        for i in range(self.faces_cnt):
            if self.name_camera_list[i] != 'unknown':
                draw.text(xy=self.pos_camera_list[i], text=self.name_camera_list[i], font=font)
                img_with_name = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR)
        return img_with_name

    def modify_name_camera_list(self):
        """Map database folder names to display names.

        TODO: pull these identifiers from the database instead of hard-coding.
        """
        self.name_known_list[0] = '1'.encode('utf-8').decode()
        self.name_known_list[1] = 'Tony Blair'.encode('utf-8').decode()

    def process(self, img_rd, scale):
        """Recognize faces in *img_rd* and check volunteer-to-person distance.

        :param img_rd: BGR frame
        :param scale: pixel-to-real-distance scale from Calibration
        :return: the annotated frame
        """
        img_with_name = img_rd
        # make sure the enrolled-face database is loaded
        if self.get_face_database():
            self.draw_note(img_rd)
            # reset per-frame state
            self.features_camera_list = []
            self.faces_cnt = 0
            self.pos_camera_list = []
            self.name_camera_list = []
            self.type_camera_list = []
            (h, w) = img_rd.shape[:2]
            blob = cv2.dnn.blobFromImage(cv2.resize(img_rd, (300, 300)), 1.0,
                                         (300, 300), (104.0, 177.0, 123.0))
            self.detector.setInput(blob)
            faces = self.detector.forward()
            if faces.shape[2] != 0:
                # iterate over every raw detection
                for k in range(0, faces.shape[2]):
                    confidence = faces[0, 0, k, 2]
                    # filter out weak detections by ensuring the confidence is
                    # greater than the minimum confidence
                    if confidence < 0.5:
                        continue
                    self.faces_cnt += 1
                    # FIX: per-face lists must be indexed by the number of kept
                    # faces, not by k — filtered detections have no list entry.
                    idx = self.faces_cnt - 1
                    # default every face to unknown
                    self.name_camera_list.append("unknown")
                    # FIX: type_camera_list was reset but never populated per
                    # face, so the index assignment below always raised
                    # IndexError and the face was silently skipped.
                    self.type_camera_list.append("unknown")
                    # name label position: just above the face box
                    box = faces[0, 0, k, 3:7] * np.array([w, h, w, h])
                    (startX, startY, endX, endY) = box.astype("int")
                    self.pos_camera_list.append(tuple(
                        [int(startX + 5), int(startY - 30)]))
                    img_blank = img_rd[startY:endY, startX:endX]
                    try:
                        img = cv2.resize(img_blank, (96, 96))
                        img = (img / 255.).astype(np.float32)
                        img = self.nn4_small2.predict(np.expand_dims(img, axis=0))[0]
                        # compare this face against every stored embedding
                        e_distance_list = []
                        for i in range(0, len(self.embedded)):
                            e_distance_list.append(facenet.distance(self.embedded[i], img))
                        similar_person_num = e_distance_list.index(min(e_distance_list))
                        if min(e_distance_list) < 0.58:
                            # NOTE(review): "% 8" assumes 8 images per enrolled
                            # person in the listing order — confirm enrollment.
                            self.name_camera_list[idx] = self.name_known_list[similar_person_num % 8]
                            self.type_camera_list[idx] = self.type_known_list[similar_person_num % 8]
                        # draw a green box around recognized faces
                        if self.name_camera_list[idx] != 'unknown':
                            cv2.rectangle(img_rd, tuple([startX, startY]), tuple([endX, endY]),
                                          (0, 255, 0), 2)
                            cv2.rectangle(img_rd, tuple([startX, startY - 35]), tuple([endX, startY]),
                                          (0, 255, 0), cv2.FILLED)
                    except Exception:
                        # face crop may be empty at the frame edge — skip it
                        continue
                img_with_name = self.draw_name(img_rd)
                # distance check between the volunteer and everyone else
                if 'volunteer' in self.type_camera_list:
                    index = self.type_camera_list.index('volunteer')
                    pos_vol = self.pos_camera_list[index]
                    for i in range(0, len(self.type_camera_list)):
                        if i != index:
                            # FIX: distance must compare two *positions*; the
                            # original passed the type string
                            # self.type_camera_list[i], which raised TypeError.
                            d = scale * np.sqrt(facenet.distance(pos_vol, self.pos_camera_list[i]))
                            if d < 50:
                                # TODO: interaction detected — event posting to
                                # the server is not implemented yet
                                pass
            else:
                img_with_name = img_rd
        # Update stream FPS
        self.update_fps()
        return img_with_name

    def run(self, frame, scale):
        """Process one frame and return the annotated result.

        FIX: the original discarded process()'s return value; returning it is
        backward-compatible (the caller previously received None).
        """
        return self.process(frame, scale)

View File

@@ -0,0 +1,203 @@
import os
import queue
import threading
import cv2
import sys
import platform
import subprocess as sp
from datetime import datetime
from model import create_model
from websocket import create_connection
from Camera_Beside_Com import Face_Register
from Camera_In_Room import Face_Recognizer
from Calibration_On_Desk import Calibration
from Camera_In_Hall import Fall_Detection
from Camera_On_Desk import Interaction_Detection
from Camera_In_Yard import Intrusion_Detection

# Import Openpose (Windows/Ubuntu/OSX)
# dir_path = os.path.dirname(os.path.realpath(__file__))
dir_path = 'D:\\BJTU\\Python\\openpose-master\\build-2017'
try:
    # Windows Import
    # FIX: the original `from sys import platform` shadowed the `platform`
    # module imported above, which made `platform.system()` in
    # Live.read_frame raise AttributeError — use sys.platform directly.
    if sys.platform == "win32":
        # Change these variables to point to the correct folder (Release/x64 etc.)
        sys.path.append(dir_path + '\\python\\openpose\\Release')
        os.environ['PATH'] = os.environ['PATH'] + ';' + dir_path + '\\x64\\Release;' + dir_path + '\\bin;'
        import pyopenpose as op
    else:
        # NOTE(review): this appends the same Windows path as the win32 branch;
        # the sibling module uses '../../python' here — likely a copy-paste slip.
        sys.path.append(dir_path + '\\python\\openpose\\Release')
        # If you run `make install` (default path is `/usr/local/python` for Ubuntu), you can also access the OpenPose/python module from there. This will install OpenPose and the python library at your desired installation path. Ensure that this is in your python path in order to use it.
        # sys.path.append('/usr/local/python')
        from openpose import pyopenpose as op
except ImportError as e:
    print(
        'Error: OpenPose library could not be found. Did you enable `BUILD_PYTHON` in CMake and have this Python script in the right folder?')
    raise e
class Live(object):
def __init__(self):
self.cap = cv2.VideoCapture(0)
# opencv dnn 人脸检测器
self.detector = cv2.dnn.readNetFromCaffe("data/data_opencv/deploy.prototxt.txt",
"data/data_opencv/res10_300x300_ssd_iter_140000.caffemodel")
# facenet model
self.nn4_small2 = create_model()
self.nn4_small2.load_weights('weights/nn4.small2.v1.h5')
# websocket 连接服务器
self.ws = create_connection("ws://192.144.229.49:8000/api/websocket/cameraLink")
self.frame_queue = queue.Queue()
self.rtmpUrl = "rtmp://39.97.124.237:1984/wodelive"
self.camera_path = "test4.mp4"
# 人脸搜集
self.Face_Register_on = Face_Register(people_type=1, id='3')
# 微笑检测
self.Face_Recognizer_on = Face_Recognizer(self.detector, self.nn4_small2)
# 与义工交互 距离标定和颜色标定
self.scale = -1
self.Calibration_on = Calibration()
self.Interaction_on = Interaction_Detection(detector=self.detector, nn4_small2=self.nn4_small2)
# 摔倒检测
self.transfer_flag = False
self.Fall_Detection_on = Fall_Detection()
# 入侵检测
self.net = cv2.dnn.readNetFromCaffe('data/data_opencv/MobileNetSSD_deploy.prototxt',
'data/data_opencv/MobileNetSSD_deploy.caffemodel')
self.Intrusion_Detection_on = Intrusion_Detection(self.net)
# 视频记录
self.fourcc = cv2.VideoWriter_fourcc('M', 'J', 'P', 'G')
self.out = cv2.VideoWriter('output.avi', self.fourcc, 20, (640, 480))
# Get video information
self.fps = 20 # 设置帧速率
# ffmpeg command
self.command = ['ffmpeg',
'-y',
'-f', 'rawvideo',
'-vcodec', 'rawvideo',
'-pix_fmt', 'bgr24',
'-s', "{}x{}".format(480, 640),
'-r', str(self.fps),
'-i', '-',
'-c:v', 'libx264',
'-pix_fmt', 'yuv420p',
'-preset', 'slow',
'-f', 'flv',
self.rtmpUrl]
self.recieve = None
self.pretodo = ''
self.transfer_flag = False
def read_frame(self):
# 根据不同的操作系统,设定读取哪个摄像头
if platform.system() == 'Linux': # 如果是Linux系统
cap = cv2.VideoCapture(10) # 绑定编号为10的摄像头
cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('M', 'J', 'P', 'G'))
cap.set(3, 640) # 设置摄像头画面的宽
cap.set(4, 480) # 设置摄像头画面的高
elif platform.system() == 'Darwin': # 如果是苹果的OS X系统
cap = cv2.VideoCapture(0) # 绑定编号为0的摄像头
cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('M', 'J', 'P', 'G'))
cap.set(3, 640)
cap.set(4, 480)
else: # windows系统
cap = cv2.VideoCapture(0) # 绑定编号为0的摄像头
cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('M', 'J', 'P', 'G'))
cap.set(3, 640)
cap.set(4, 480)
# read webcamera
counter = 0
pre = datetime.now()
while cap.isOpened():
counter += 1
if counter <= 10:
continue
ret, frame = cap.read()
if not ret:
self.cap = cv2.VideoCapture(0)
ret, frame = self.cap.read()
self.transfer_flag = False
recieve = self.ws.recv()
self.recieve = eval(recieve)
# put frame into queue
now = datetime.now()
if (now - pre).total_seconds() > 0.15:
self.frame_queue.put(frame)
pre = now
def push_frame(self):
    """Consume frames from the queue, apply the currently selected analysis,
    and pipe the result to ffmpeg for RTMP streaming.

    Dispatch is driven by ``self.recieve`` (the last command parsed from the
    websocket by ``read_frame``). NOTE(review): ``self.recieve`` is
    initialized to None, so this assumes a command arrives before the first
    frame is processed -- confirm with the websocket protocol.
    """
    # Guard against thread start-up ordering: wait until the ffmpeg
    # command line has been set before opening the pipe.
    while True:
        if len(self.command) > 0:
            # Pipe configuration: frames are written raw to ffmpeg's stdin.
            p = sp.Popen(self.command, stdin=sp.PIPE)
            break
    while True:
        if not self.frame_queue.empty():
            frame = self.frame_queue.get()
            if self.recieve.todo == 'reboot':
                pass
            elif self.recieve.todo == 'entering':
                # Start a face-registration session for a new person.
                data = self.recieve.data
                people_type = data.type  # 0 = elderly, 1 = staff, 2 = volunteer
                id = data.id
                self.pretodo = 'entering'
                self.Face_Register_on = Face_Register(people_type, id)
            elif self.recieve.todo == 'takePhoto':
                if self.pretodo == 'entering':
                    # Photo taken as part of face registration.
                    frame = self.Face_Register_on.take_photo(frame)
                elif self.pretodo == '2':
                    # Photo taken for interaction-detection calibration.
                    self.scale = self.Calibration_on.run(frame)
            elif self.recieve.todo == 'change':
                # Switch the active function: 0 none, 1 smile detection,
                # 2 interaction detection, 3 fall detection,
                # 4 restricted-area intrusion.
                fuc = self.recieve.data.fuc
                if fuc == 0:
                    self.out.write(frame)
                elif fuc == 1:
                    frame = self.Face_Recognizer_on.run(frame)
                elif fuc == 2:
                    self.pretodo = '2'
                    if self.scale == -1:
                        # Not calibrated yet: keep running calibration.
                        frame = self.Calibration_on.run(frame)
                    else:
                        self.Interaction_on.process(frame)
                elif fuc == 3:
                    if not self.transfer_flag:
                        # Switch the capture source to the canned fall-test
                        # video exactly once (read_frame picks it up).
                        self.cap = cv2.VideoCapture("test4.mp4")
                        self.transfer_flag = True
                        self.Fall_Detection_on.re_init()
                    frame = self.Fall_Detection_on.run(frame)
                elif fuc == 4:
                    frame = self.Intrusion_Detection_on.process(frame)
            # frame = Fall_Detection_on.run(frame=frame)
            p.stdin.write(frame.tobytes())
def run(self):
    """Start the capture (read_frame) and streaming (push_frame) workers
    on separate threads."""
    threads = [
        threading.Thread(target=self.read_frame),
        threading.Thread(target=self.push_frame),
    ]
    for thread in threads:
        # Non-daemon threads keep the process alive until they finish.
        # (Thread.setDaemon() is deprecated; assign the attribute instead.)
        thread.daemon = False
        thread.start()
# Script entry point: build the live streamer and start its worker threads.
if __name__ == "__main__":
    live = Live()
    live.run()

64
Class/detection/Post.py Normal file
View File

@@ -0,0 +1,64 @@
import requests
import datetime

# Backend endpoint that receives event reports.
url = "http://192.144.229.49:8000/api/event/list"

# Payload template for events tied to a specific elderly person.
data_type_one = {
    "oldperson_id": 1,
    # 0 emotion detection, 1 volunteer interaction, 2 stranger detection,
    # 3 fall detection, 4 restricted-area intrusion
    "event_type": 3,
    "event_date": "",      # event date
    "event_location": "",  # event location (optional)
    "event_desc": ""       # required: event description
}

# Payload template for events not tied to a specific person.
data_type_two = {
    # same event_type codes as above
    "event_type": 3,
    "event_date": "",      # event date
    "event_location": "",  # event location (optional)
    "event_desc": ""       # required: event description
}
def post(elder_id='None', event=-1, imagePath='None', volunteer='None'):
    """Report a detected event, with a snapshot image, to the backend.

    Parameters
    ----------
    elder_id : id of the elderly person involved (events 0 and 1 only).
    event : 0 emotion, 1 volunteer interaction, 2 stranger, 3 fall,
            4 restricted-area intrusion.
    imagePath : path of the snapshot file to upload.
    volunteer : volunteer name, used in the event-1 description.
    """
    # (location, description) per event code; events 0/1 are person-specific
    # and go out with data_type_one, the rest with data_type_two.
    per_person = {
        0: ('来自房间的摄像头', '老人笑了'),
        1: ('来自桌子上的摄像头', '老人与' + volunteer + '交互了'),
    }
    anonymous = {
        2: ('来自房间的摄像头', '有陌生人出现在房间了'),
        3: ('来自走廊的摄像头', '老人摔倒了'),
        4: ('来自院子的摄像头', '有人闯入禁止区域了'),
    }
    if event in per_person:
        data = data_type_one
        data['oldperson_id'] = elder_id
        location, desc = per_person[event]
    elif event in anonymous:
        data = data_type_two
        location, desc = anonymous[event]
    else:
        # Unknown event code: the original posted the anonymous template
        # untouched, so keep that behaviour.
        data = data_type_two
        location = desc = None
    if location is not None:
        data['event_type'] = event
        data['event_date'] = datetime.datetime.today().date()
        data['event_location'] = location
        data['event_desc'] = desc
    # Context manager ensures the snapshot file is always closed
    # (the original leaked the handle).
    with open(imagePath, 'rb') as file:
        status = requests.post(url, files={'file': file}, data=data)
    print(status)

View File

@@ -3,7 +3,7 @@ import cv2
def compute(img, min_percentile, max_percentile):
"""计算分位点,目的是去掉图1的直方图两头的异常情况"""
"""计算分位点,目的是去掉图的直方图两头的异常情况"""
max_percentile_pixel = np.percentile(img, max_percentile)
min_percentile_pixel = np.percentile(img, min_percentile)
@@ -15,8 +15,6 @@ def aug(src):
if get_lightness(src) > 130:
print("图片亮度足够,不做增强")
# 先计算分位点,去掉像素值中少数异常值,这个分位点可以自己配置。
# 比如1中直方图的红色在0到255上都有值但是实际上像素值主要在0到20内。
max_percentile_pixel, min_percentile_pixel = compute(src, 1, 99)

View File

@@ -0,0 +1,306 @@
# -*- coding: utf-8 -*-
'''
禁止区域检测主程序
摄像头对准围墙那一侧
用法:
python checkingfence.py
python checkingfence.py --filename tests/yard_01.mp4
'''
# import the necessary packages
from oldcare.track.centroidtracker import CentroidTracker
from oldcare.track.trackableobject import TrackableObject
from imutils.video import FPS
import numpy as np
import imutils
import argparse
import time
import dlib
import cv2
import os
import subprocess
# Timestamp for the start-up log line.
current_time = time.strftime('%Y-%m-%d %H:%M:%S',
                             time.localtime(time.time()))
print('[INFO] %s 禁止区域检测程序启动了.' % (current_time))

# Command-line arguments: an optional input video file; when omitted,
# the webcam is used instead.
ap = argparse.ArgumentParser()
ap.add_argument("-f", "--filename", required=False, default='',
                help="")
args = vars(ap.parse_args())

# Global configuration.
# Caffe model files for a MobileNet Single Shot Detector (SSD),
# i.e. "Single Shot Detectors for object detection".
prototxt_file_path = 'data/data_opencv/MobileNetSSD_deploy.prototxt'
model_file_path = 'data/data_opencv/MobileNetSSD_deploy.caffemodel'
# Directory where intrusion snapshots are saved.
output_fence_path = 'supervision/fence'
input_video = args['filename']
skip_frames = 30  # number of skipped frames between full detections

# your python path
# python_path = '/home/reed/anaconda3/envs/tensorflow/bin/python'
python_path = 'D://Python'

# todo: should become a command-line parameter
width = 400  # frames are resized to this width before processing

# Hyper-parameter: minimum probability to filter weak detections.
minimum_confidence = 0.80

# The 21 object classes the SSD model can recognize.
CLASSES = ["background", "aeroplane", "bicycle", "bird", "boat",
           "bottle", "bus", "car", "cat", "chair",
           "cow", "diningtable", "dog", "horse", "motorbike",
           "person", "pottedplant", "sheep", "sofa", "train",
           "tvmonitor"]

# If a video path was not supplied, grab a reference to the webcam.
if not input_video:
    print("[INFO] starting video stream...")
    vs = cv2.VideoCapture(0)
    time.sleep(2)
else:
    print("[INFO] opening video file...")
    vs = cv2.VideoCapture(input_video)

# Load the object-detection model.
print("[INFO] loading model...")
net = cv2.dnn.readNetFromCaffe(prototxt_file_path, model_file_path)

# Frame dimensions, set as soon as the first frame is read.
W = None
H = None

# Centroid tracker, the list of per-interval dlib correlation trackers,
# and a map from each unique object ID to its TrackableObject history.
ct = CentroidTracker(maxDisappeared=40, maxDistance=50)
trackers = []
trackableObjects = {}

# Total frames processed so far, plus counts of objects that have moved
# either up or down across the center line.
totalFrames = 0
totalDown = 0
totalUp = 0

# Frames-per-second throughput estimator.
fps = FPS().start()
# loop over frames from the video stream
# Main processing loop: alternate between expensive SSD detection (every
# `skip_frames` frames) and cheap dlib correlation tracking in between.
while True:
    # Grab the next frame and handle if we are reading from either
    # VideoCapture or VideoStream.
    ret, frame = vs.read()

    # If we are viewing a video and we did not grab a frame then we
    # have reached the end of the video.
    if input_video and not ret:
        break

    if not input_video:
        # Mirror the live webcam image.
        frame = cv2.flip(frame, 1)

    # Resize the frame (the less data we have, the faster we can process
    # it), then convert the frame from BGR to RGB for dlib.
    frame = imutils.resize(frame, width=width)
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    # If the frame dimensions are empty, set them.
    if W is None or H is None:
        (H, W) = frame.shape[:2]

    # Initialize the current status along with our list of bounding
    # box rectangles returned by either (1) our object detector or
    # (2) the correlation trackers.
    status = "Waiting"
    rects = []

    # Check to see if we should run a more computationally expensive
    # object detection method to aid our tracker.
    if totalFrames % skip_frames == 0:
        # Set the status and initialize our new set of object trackers.
        status = "Detecting"
        trackers = []

        # Convert the frame to a blob and pass the blob through the
        # network and obtain the detections.
        blob = cv2.dnn.blobFromImage(frame, 0.007843, (W, H), 127.5)
        net.setInput(blob)
        detections = net.forward()

        # Loop over the detections.
        for i in np.arange(0, detections.shape[2]):
            # Extract the confidence (i.e., probability) associated
            # with the prediction.
            confidence = detections[0, 0, i, 2]

            # Filter out weak detections by requiring a minimum
            # confidence.
            if confidence > minimum_confidence:
                # Extract the index of the class label from the
                # detections list.
                idx = int(detections[0, 0, i, 1])

                # If the class label is not a person, ignore it.
                if CLASSES[idx] != "person":
                    continue

                # Compute the (x, y)-coordinates of the bounding box
                # for the object.
                box = detections[0, 0, i, 3:7] * np.array([W, H, W, H])
                (startX, startY, endX, endY) = box.astype("int")

                # Construct a dlib rectangle object from the bounding
                # box coordinates and then start the dlib correlation
                # tracker.
                tracker = dlib.correlation_tracker()
                rect = dlib.rectangle(startX, startY, endX, endY)
                tracker.start_track(rgb, rect)

                # Add the tracker to our list of trackers so we can
                # utilize it during skip frames.
                trackers.append(tracker)

    # Otherwise, we should utilize our object *trackers* rather than
    # object *detectors* to obtain a higher frame processing throughput.
    else:
        # Loop over the trackers.
        for tracker in trackers:
            # Set the status of our system to be 'tracking' rather
            # than 'waiting' or 'detecting'.
            status = "Tracking"

            # Update the tracker and grab the updated position.
            tracker.update(rgb)
            pos = tracker.get_position()

            # Unpack the position object.
            startX = int(pos.left())
            startY = int(pos.top())
            endX = int(pos.right())
            endY = int(pos.bottom())

            # Draw a rectangle around the people.
            cv2.rectangle(frame, (startX, startY), (endX, endY),
                          (0, 255, 0), 2)

            # Add the bounding box coordinates to the rectangles list.
            rects.append((startX, startY, endX, endY))

    # draw a horizontal line in the center of the frame -- once an
    # object crosses this line we will determine whether they were
    # moving 'up' or 'down'
    # cv2.line(frame, (0, H // 2), (W, H // 2), (0, 255, 255), 2)

    # Use the centroid tracker to associate the (1) old object
    # centroids with (2) the newly computed object centroids.
    objects = ct.update(rects)

    # Loop over the tracked objects.
    for (objectID, centroid) in objects.items():
        # Check to see if a trackable object exists for the current
        # object ID.
        to = trackableObjects.get(objectID, None)

        # If there is no existing trackable object, create one.
        if to is None:
            to = TrackableObject(objectID, centroid)

        # Otherwise, there is a trackable object so we can utilize it
        # to determine direction.
        else:
            # The difference between the y-coordinate of the *current*
            # centroid and the mean of *previous* centroids will tell
            # us in which direction the object is moving (negative for
            # 'up' and positive for 'down').
            y = [c[1] for c in to.centroids]
            direction = centroid[1] - np.mean(y)
            to.centroids.append(centroid)

            # Check to see if the object has been counted or not.
            if not to.counted:
                # If the direction is negative (indicating the object
                # is moving up) AND the centroid is above the center
                # line, count the object.
                if direction < 0 and centroid[1] < H // 2:
                    totalUp += 1
                    to.counted = True

                # If the direction is positive (indicating the object
                # is moving down) AND the centroid is below the center
                # line, count the object -- treated here as an intrusion
                # into the restricted area.
                elif direction > 0 and centroid[1] > H // 2:
                    totalDown += 1
                    to.counted = True

                    current_time = time.strftime('%Y-%m-%d %H:%M:%S',
                                                 time.localtime(time.time()))
                    event_desc = '有人闯入禁止区域!!!'
                    event_location = '院子'
                    print('[EVENT] %s, 院子, 有人闯入禁止区域!!!'
                          % (current_time))
                    # Save a snapshot of the intrusion to disk.
                    cv2.imwrite(os.path.join(output_fence_path,
                                             'snapshot_%s.jpg'
                                             % (time.strftime('%Y%m%d_%H%M%S'))), frame)

                    # todo: insert the event into the database
                    # command = '%s inserting.py --event_desc %s --event_type 4 --event_location %s' % \
                    #           (python_path, event_desc, event_location)
                    # p = subprocess.Popen(command, shell=True)

        # Store the trackable object in our dictionary.
        trackableObjects[objectID] = to

        # Draw both the ID of the object and the centroid of the
        # object on the output frame.
        text = "ID {}".format(objectID)
        cv2.putText(frame, text, (centroid[0] - 10, centroid[1] - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
        cv2.circle(frame, (centroid[0], centroid[1]), 4,
                   (0, 255, 0), -1)

    # Construct a tuple of information we will be displaying on the
    # frame.
    info = [
        # ("Up", totalUp),
        # ("Down", totalDown),
        ("Status", status),
    ]

    # Loop over the info tuples and draw them on our frame.
    for (i, (k, v)) in enumerate(info):
        text = "{}: {}".format(k, v)
        cv2.putText(frame, text, (10, H - ((i * 20) + 20)),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)

    # Show the output frame.
    frame = cv2.resize(frame, (640, 480))
    cv2.imshow("Prohibited Area", frame)
    k = cv2.waitKey(1) & 0xff
    # Esc key quits.
    if k == 27:
        break

    # Increment the total number of frames processed thus far and
    # then update the FPS counter.
    totalFrames += 1
    fps.update()
# stop the timer and display FPS information
# Stop the timer and display FPS information.
fps.stop()
print("[INFO] elapsed time: {:.2f}".format(fps.elapsed()))  # 14.19
print("[INFO] approx. FPS: {:.2f}".format(fps.fps()))  # 90.43

# Release the video source and close any open windows.
vs.release()
cv2.destroyAllWindows()

View File

@@ -1,36 +1,33 @@
from model import create_model
from keras import backend as K
from keras.models import Model
from keras.layers import Input, Layer
from data import triplet_generator
# from keras import backend as K
# from keras.layers import Layer
import numpy as np
import os.path
import cv2
from align import AlignDlib
alignment = AlignDlib('data/data_dlib/landmarks.dat')
class TripletLossLayer(Layer):
def __init__(self, alpha, **kwargs):
self.alpha = alpha
super(TripletLossLayer, self).__init__(**kwargs)
def triplet_loss(self, inputs):
a, p, n = inputs
p_dist = K.sum(K.square(a - p), axis=-1)
n_dist = K.sum(K.square(a - n), axis=-1)
return K.sum(K.maximum(p_dist - n_dist + self.alpha, 0), axis=0)
def call(self, inputs):
loss = self.triplet_loss(inputs)
self.add_loss(loss)
return loss
# class TripletLossLayer(Layer):
# def __init__(self, alpha, **kwargs):
# self.alpha = alpha
# super(TripletLossLayer, self).__init__(**kwargs)
#
# def triplet_loss(self, inputs):
# a, p, n = inputs
# p_dist = K.sum(K.square(a - p), axis=-1)
# n_dist = K.sum(K.square(a - n), axis=-1)
# return K.sum(K.maximum(p_dist - n_dist + self.alpha, 0), axis=0)
#
# def call(self, inputs):
# loss = self.triplet_loss(inputs)
# self.add_loss(loss)
# return loss
class IdentityMetadata():
def __init__(self, base, name, file):
def __init__(self, base, type, name, file):
# dataset base directory
self.base = base
# identity people type
self.type = type
# identity name
self.name = name
# image file name
@@ -40,20 +37,23 @@ class IdentityMetadata():
return self.image_path()
def image_path(self):
return os.path.join(self.base, self.name, self.file)
return os.path.join(self.base, self.type, self.name, self.file)
def load_metadata(path):
metadata = []
for i in sorted(os.listdir(path)):
person = []
for f in sorted(os.listdir(os.path.join(path, i))):
# Check file extension. Allow only jpg/jpeg' files.
ext = os.path.splitext(f)[1]
if ext == '.jpg' or ext == '.jpeg':
person.append(IdentityMetadata(path, i, f))
metadata.append(person)
return np.array(metadata,dtype=object)
type = []
for j in sorted(os.listdir(os.path.join(path, i))):
person = []
for k in sorted(os.listdir(os.path.join(path + i, j))):
# Check file extension. Allow only jpg/jpeg' files.
ext = os.path.splitext(k)[1]
if ext == '.jpg' or ext == '.jpeg':
person.append(IdentityMetadata(path, i, j, k))
type.append(person)
metadata.append(type)
return np.array(metadata, dtype=object)
def load_image(path):
@@ -63,108 +63,5 @@ def load_image(path):
return img[..., ::-1]
def align_image(img):
return alignment.align(96, img, alignment.getLargestFaceBoundingBox(img),
landmarkIndices=AlignDlib.OUTER_EYES_AND_NOSE)
def distance(emb1, emb2):
return np.sum(np.square(emb1 - emb2))
# if __name__ == '__main__':
# # nn4_small2 = create_model()
# #
# # # Input for anchor, positive and negative images
# # in_a = Input(shape=(96, 96, 3))
# # in_p = Input(shape=(96, 96, 3))
# # in_n = Input(shape=(96, 96, 3))
# #
# # # Output for anchor, positive and negative embedding vectors
# # # The nn4_small model instance is shared (Siamese network)
# # emb_a = nn4_small2(in_a)
# # emb_p = nn4_small2(in_p)
# # emb_n = nn4_small2(in_n)
# #
# # # Layer that computes the triplet loss from anchor, positive and negative embedding vectors
# # triplet_loss_layer = TripletLossLayer(alpha=0.2, name='triplet_loss_layer')([emb_a, emb_p, emb_n])
# #
# # # Model that can be trained with anchor, positive negative images
# # nn4_small2_train = Model([in_a, in_p, in_n], triplet_loss_layer)
# #
# # # triplet_generator() creates a generator that continuously returns
# # # ([a_batch, p_batch, n_batch], None) tuples where a_batch, p_batch
# # # and n_batch are batches of anchor, positive and negative RGB images
# # # each having a shape of (batch_size, 96, 96, 3).
# # generator = triplet_generator()
# #
# # nn4_small2_train.compile(loss=None, optimizer='adam')
# # nn4_small2_train.fit_generator(generator, epochs=1, steps_per_epoch=100)
#
# # Please note that the current implementation of the generator only generates
# # random image data. The main goal of this code snippet is to demonstrate
# # the general setup for model training. In the following, we will anyway
# # use a pre-trained model so we don't need a generator here that operates
# # on real training data. I'll maybe provide a fully functional generator
# # later.
# nn4_small2_pretrained = create_model()
# nn4_small2_pretrained.load_weights('weights/nn4.small2.v1.h5')
#
# metadata = load_metadata('images')
#
# # Initialize the OpenFace face alignment utility
#
#
# # # Load an image of Jacques Chirac
# # jc_orig = load_image(metadata[78].image_path())
# #
# # # Detect face and return bounding box
# # bb = alignment.getLargestFaceBoundingBox(jc_orig)
# #
# # # Transform image using specified face landmark indices and crop image to 96x96
# # jc_aligned = alignment.align(96, jc_orig, bb, landmarkIndices=AlignDlib.OUTER_EYES_AND_NOSE)
#
# embedded = np.zeros((metadata.shape[0], 128))
#
# for i, m in enumerate(metadata):
# img = load_image(m.image_path())
# # img = align_image(img)
# img = cv2.resize(img, (96, 96))
# # scale RGB values to interval [0,1]
# try:
# img = (img / 255.).astype(np.float32)
# # obtain embedding vector for image
# embedded[i] = nn4_small2_pretrained.predict(np.expand_dims(img, axis=0))[0]
# except:
# print(m.image_path)
#
# # show_pair(77, 78)
# # show_pair(77, 100)
# cap = cv2.VideoCapture(0)
# while cap.isOpened():
# flag, frame = cap.read()
# kk = cv2.waitKey(1)
# # 按下 q 键退出
# if kk == ord('q'):
# break
# else:
# try:
# # img = align_image(frame)
# frame = cv2.resize(frame, (96, 96))
# img = (frame / 255.).astype(np.float32)
# img = nn4_small2_pretrained.predict(np.expand_dims(img, axis=0))[0]
# d = []
# for i in range(0, len(embedded)):
# d.append(distance(embedded[i], img))
#
# name = ['Person_2', 'tbs', 'Person_1']
#
# print(name[d.index(min(d))])
# # if d < 1:
# # print("same face")
# # else:
# # print("different face")
# except Exception as e:
# print(e)
# cv2.imshow("normal", frame)
# cap.release()
# cv2.destroyAllWindows()

View File

@@ -2,6 +2,7 @@
# # It requires OpenCV installed for Python
import math
import sys
import threading
import time
import cv2
import os
@@ -9,6 +10,8 @@ from sys import platform
import argparse
import numpy as np
from Post import post
from PIL import ImageDraw, ImageFont
from PIL import Image
@@ -45,13 +48,14 @@ try:
cap = cv2.VideoCapture("test4.mp4")
_, frame = cap.read()
cv2.imwrite('fall_detection.jpg', frame)
# frame = cv2.resize(frame, (640, 480))
# cv2.imwrite('fall_detection.jpg', frame)
# Flags
parser = argparse.ArgumentParser()
parser.add_argument("--image_path",
default="fall_detection.jpg",
help="Process an image. Read all standard formats (jpg, png, bmp, etc.).")
# parser.add_argument("--image_path",
# default="fall_detection.jpg",
# help="Process an image. Read all standard formats (jpg, png, bmp, etc.).")
args = parser.parse_known_args()
# Custom Params (refer to include/openpose/flags.hpp for more parameters)
@@ -106,15 +110,16 @@ try:
if kk == ord('q'):
break
else:
cv2.imwrite('fall_detection.jpg', frame)
# cv2.imwrite('fall_detection.jpg', frame)
height = []
width = []
center = []
# Process Image
datum = op.Datum()
imageToProcess = cv2.imread(args[0].image_path)
datum.cvInputData = imageToProcess
# imageToProcess = cv2.imread(args[0].image_path)
datum.cvInputData = frame
# datum.cvInputData = imageToProcess
opWrapper.emplaceAndPop([datum])
img_rd = datum.cvOutputData
@@ -148,10 +153,11 @@ try:
# print(v, abs(a))
if (abs(a) > 0.2) and \
(np.subtract(np.array(width), np.array(height)) > np.subtract(np.array(width0),
np.array(height0))):
np.array(height0)) and np.subtract(np.array(width), np.array(height)) > 0):
couter += 1
# print(np.subtract(np.array(width), np.array(height)))
# print("alarm by v and a")
elif (width > height and (x[8] != 0 or x[9] != 0 or x[12] != 0) and v < 0.41):
elif (width > height and (x[8] != 0 or x[9] != 0 or x[12] != 0) and v < 1):
couter += 1
# print("alarm by w and h")
else:
@@ -168,7 +174,10 @@ try:
draw.text((10, 10), text="Fall Detected", font=font,
fill=(255, 0, 0))
img_rd = cv2.cvtColor(np.array(img_rd), cv2.COLOR_RGB2BGR)
cv2.imwrite('fall_detection.jpg', frame)
# t = threading.Thread(target=post(event=3, imagePath='fall_detection.jpg'))
# t.start()
# status = post(event=3, imagePath='fall_detection.jpg')
# print("fall")
# update variables
@@ -179,7 +188,7 @@ try:
# if width > height:
# print("alarm")
firstFrame = None
except:
except Exception as e:
text = ""
@@ -200,9 +209,11 @@ try:
# cv2.putText(frame, datetime.datetime.now().strftime("%A %d %B %Y %I:%M:%S%p"),
# (10, frame.shape[0] - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.35, (255, 255, 255), 1)
# cv2.imshow("Frame Delta", frameDelta)
cv2.imshow("OpenPose 1.6.0 - Tutorial Python API", frame)
img_rd = cv2.resize(frame, (640, 480))
cv2.imshow("OpenPose 1.6.0 - Tutorial Python API", img_rd)
continue
img_rd = cv2.resize(img_rd, (640, 480))
# cv2.resizeWindow("OpenPose 1.6.0 - Tutorial Python API", 640, 480)
cv2.imshow("OpenPose 1.6.0 - Tutorial Python API", img_rd)
cap.release()
cv2.destroyAllWindows()

View File

@@ -1,80 +1,50 @@
# -*- coding: utf-8 -*-
import json
import urllib.request
import urllib.error
import time
from statistics import mode
import cv2
from keras.models import load_model
import numpy as np
http_url = 'https://api-cn.faceplusplus.com/facepp/v3/detect'
key = "cW2DWAvBTk7lsoo8iCgR4TzWrYpVP_qP"
secret = "NYXhr6g0rTDtKNloY6xyGK1fUpyL_t5W"
filepath = r"C:\Users\tq\Desktop\1.jpg"
cap = cv2.VideoCapture(0)
cap.set(3, 480)
while cap.isOpened():
flag, img_rd = cap.read()
kk = cv2.waitKey(1)
# 按下 q 键退出
if kk == ord('q'):
break
else:
cv2.imwrite(r"C:\Users\tq\Desktop\1.jpg", img_rd)
boundary = '----------%s' % hex(int(time.time() * 1000))
data = []
data.append('--%s' % boundary)
data.append('Content-Disposition: form-data; name="%s"\r\n' % 'api_key')
data.append(key)
data.append('--%s' % boundary)
data.append('Content-Disposition: form-data; name="%s"\r\n' % 'api_secret')
data.append(secret)
data.append('--%s' % boundary)
fr = open(filepath, 'rb')
data.append('Content-Disposition: form-data; name="%s"; filename=" "' % 'image_file')
data.append('Content-Type: %s\r\n' % 'application/octet-stream')
data.append(fr.read())
fr.close()
data.append('--%s' % boundary)
data.append('Content-Disposition: form-data; name="%s"\r\n' % 'return_landmark')
data.append('1')
data.append('--%s' % boundary)
data.append('Content-Disposition: form-data; name="%s"\r\n' % 'return_attributes')
data.append(
"gender,age,smiling,headpose,facequality,blur,eyestatus,emotion,ethnicity,beauty,mouthstatus,eyegaze,skinstatus")
data.append('--%s--\r\n' % boundary)
from utils.datasets import get_labels
from utils.inference import detect_faces
from utils.inference import draw_text
from utils.inference import draw_bounding_box
from utils.inference import load_detection_model
from utils.preprocessor import preprocess_input
for i, d in enumerate(data):
if isinstance(d, str):
data[i] = d.encode('utf-8')
# parameters for loading data and images
detection_model_path = './data/data_opencv/res10_300x300_ssd_iter_140000.caffemodel'
emotion_model_path = 'data/trained_models/emotion_models/fer2013_mini_XCEPTION.102-0.66.hdf5'
emotion_labels = get_labels('fer2013')
http_body = b'\r\n'.join(data)
# hyper-parameters for bounding boxes shape
frame_window = 10
emotion_offsets = (20, 40)
# build http request
req = urllib.request.Request(url=http_url, data=http_body)
# loading models
face_detection = load_detection_model(detection_model_path)
emotion_classifier = load_model(emotion_model_path, compile=False)
# header
req.add_header('Content-Type', 'multipart/form-data; boundary=%s' % boundary)
# getting input model shapes for inference
emotion_target_size = emotion_classifier.input_shape[1:3]
try:
# post data to server
resp = urllib.request.urlopen(req, timeout=5)
# get response
qrcont = resp.read()
# if you want to load as json, you should decode first,
# for example: json.loads(qrount.decode('utf-8'))
try:
print(json.loads(qrcont.decode('utf-8'))['faces'][0]['attributes']['smile'])
print(json.loads(qrcont.decode('utf-8'))['faces'][0]['attributes']['emotion'])
except:
pass
faces = json.loads(qrcont.decode('utf-8'))['faces']
for face in faces:
rec = face['face_rectangle']
cv2.rectangle(img_rd, tuple([rec['left'], rec['top']]),
tuple([rec['left'] + rec['width'], rec['top'] + rec['height']]),
(0, 255, 0), 2)
cv2.imshow("camera", img_rd)
except urllib.error.HTTPError as e:
pass
cap.release()
cv2.destroyAllWindows()
# starting lists for calculating modes
emotion_window = []
def smile_detect(bgr_image):
    """Classify the dominant facial emotion in a BGR frame.

    The whole frame is treated as the face region, converted to grayscale,
    resized to the classifier's input size, and scored.

    Returns the emotion label string from the fer2013 label set.
    """
    gray_image = cv2.cvtColor(bgr_image, cv2.COLOR_BGR2GRAY)
    # numpy's shape is (rows, cols, channels) == (height, width, channels).
    # The original unpacked it as (x2, y2, _) and sliced [0:y2, 0:x2],
    # swapping height and width and silently cropping non-square frames.
    (height, width, channel) = bgr_image.shape
    gray_face = gray_image[0:height, 0:width]
    gray_face = cv2.resize(gray_face, (emotion_target_size))
    gray_face = preprocess_input(gray_face, True)
    # Add batch and channel axes -> (1, H, W, 1), as the model expects.
    gray_face = np.expand_dims(gray_face, 0)
    gray_face = np.expand_dims(gray_face, -1)
    emotion_prediction = emotion_classifier.predict(gray_face)
    emotion_label_arg = np.argmax(emotion_prediction)
    emotion_text = emotion_labels[emotion_label_arg]
    return emotion_text