197 lines
7.2 KiB
Python
197 lines
7.2 KiB
Python
import math
|
|
import sys
|
|
import threading
|
|
import time
|
|
import cv2
|
|
import os
|
|
from sys import platform
|
|
import argparse
|
|
import numpy as np
|
|
from Post import post
|
|
|
|
from PIL import ImageDraw, ImageFont
|
|
from PIL import Image
|
|
|
|
import queue
|
|
import frame_process
|
|
import algorithm_fall
|
|
|
|
# Import Openpose (Windows/Ubuntu/OSX)
|
|
# dir_path = os.path.dirname(os.path.realpath(__file__))
|
|
dir_path = 'D:\\BJTU\\Python\\openpose-master\\build-2017'
|
|
try:
|
|
# Windows Import
|
|
if platform == "win32":
|
|
# Change these variables to point to the correct folder (Release/x64 etc.)
|
|
sys.path.append(dir_path + '\\python\\openpose\\Release')
|
|
os.environ['PATH'] = os.environ['PATH'] + ';' + dir_path + '\\x64\\Release;' + dir_path + '\\bin;'
|
|
import pyopenpose as op
|
|
else:
|
|
# Change these variables to point to the correct folder (Release/x64 etc.)
|
|
sys.path.append('../../python')
|
|
# If you run `make install` (default path is `/usr/local/python` for Ubuntu), you can also access the OpenPose/python module from there. This will install OpenPose and the python library at your desired installation path. Ensure that this is in your python path in order to use it.
|
|
# sys.path.append('/usr/local/python')
|
|
from openpose import pyopenpose as op
|
|
except ImportError as e:
|
|
print(
|
|
'Error: OpenPose library could not be found. Did you enable `BUILD_PYTHON` in CMake and have this Python script in the right folder?')
|
|
raise e
|
|
|
|
|
|
class Fall_Detection:
|
|
def __init__(self):
|
|
self.parser = argparse.ArgumentParser()
|
|
self.args = self.parser.parse_known_args()
|
|
self.params = dict()
|
|
self.params["model_folder"] = "D:\\BJTU\\Python\\openpose-master\\models\\"
|
|
self.params['net_resolution'] = "-1x160"
|
|
|
|
for i in range(0, len(self.args[1])):
|
|
curr_item = self.args[1][i]
|
|
if i != len(self.args[1]) - 1:
|
|
next_item = self.args[1][i + 1]
|
|
else:
|
|
next_item = "1"
|
|
if "--" in curr_item and "--" in next_item:
|
|
key = curr_item.replace('-', '')
|
|
if key not in self.params: self.params[key] = "1"
|
|
elif "--" in curr_item and "--" not in next_item:
|
|
key = curr_item.replace('-', '')
|
|
if key not in self.params: self.params[key] = next_item
|
|
|
|
# Starting OpenPose
|
|
self.opWrapper = op.WrapperPython()
|
|
self.opWrapper.configure(self.params)
|
|
self.opWrapper.start()
|
|
|
|
# init variables
|
|
self.frame_start_time = 0
|
|
self.v0 = 0
|
|
self.width0 = []
|
|
self.height0 = []
|
|
self.center0 = []
|
|
self.couter = 0
|
|
self.error = 0
|
|
|
|
self.xList = queue.Queue(maxsize=10)
|
|
self.yList = queue.Queue(maxsize=10)
|
|
self.prevX = 0.0
|
|
self.prevY = 0.0
|
|
self.centerV = 0
|
|
self.centerSpeed = 0
|
|
self.alert = 0
|
|
self.firstFrame = None
|
|
|
|
def re_init(self):
|
|
self.frame_start_time = 0
|
|
self.v0 = 0
|
|
self.width0 = []
|
|
self.height0 = []
|
|
self.center0 = []
|
|
self.couter = 0
|
|
self.error = 0
|
|
|
|
self.xList = queue.Queue(maxsize=10)
|
|
self.yList = queue.Queue(maxsize=10)
|
|
self.prevX = 0.0
|
|
self.prevY = 0.0
|
|
self.centerV = 0
|
|
self.centerSpeed = 0
|
|
self.alert = 0
|
|
self.firstFrame = None
|
|
|
|
def run(self, frame):
|
|
height = []
|
|
width = []
|
|
center = []
|
|
|
|
# Process Image
|
|
datum = op.Datum()
|
|
datum.cvInputData = frame
|
|
self.opWrapper.emplaceAndPop([datum])
|
|
img_rd = datum.cvOutputData
|
|
|
|
# fall judge
|
|
try:
|
|
# key points have been identified
|
|
x = datum.poseKeypoints[0][:, 0]
|
|
y = datum.poseKeypoints[0][:, 1]
|
|
|
|
width.append(np.max(x[np.nonzero(x)]) - np.min(x[np.nonzero(x)]))
|
|
height.append(np.max(y[np.nonzero(y)]) - np.min(y[np.nonzero(y)]))
|
|
|
|
center.append(np.mean(x[np.nonzero(x)]))
|
|
center.append(np.mean(y[np.nonzero(y)]))
|
|
|
|
if self.frame_start_time == 0:
|
|
self.center0 = center.copy()
|
|
self.width0 = width.copy()
|
|
self.height0 = height.copy()
|
|
self.frame_start_time = time.time()
|
|
else:
|
|
diff = np.array([center[0] - self.center0[0], center[1] - self.center0[1]])
|
|
dist = math.sqrt(np.sum((diff * 10 ** (-4)) ** 2))
|
|
now = time.time()
|
|
v = dist / (now - self.frame_start_time)
|
|
a = (v ** 2 - self.v0 ** 2) / (2 * dist)
|
|
|
|
# print(v, abs(a))
|
|
if (abs(a) > 0.2) and \
|
|
(np.subtract(np.array(width), np.array(height)) > np.subtract(np.array(self.width0), np.array(
|
|
self.height0)) and np.subtract(np.array(width), np.array(height)) > 0):
|
|
self.couter += 1
|
|
# print("alarm by v and a")
|
|
elif (width > height and (x[8] != 0 or x[9] != 0 or x[12] != 0) and v < 1):
|
|
self.couter += 1
|
|
# print("alarm by w and h")
|
|
else:
|
|
if self.error == 0:
|
|
self.error += 1
|
|
else:
|
|
self.couter = 0
|
|
self.error = 0
|
|
|
|
if self.couter > 3:
|
|
font = ImageFont.truetype("simsun.ttc", 30, index=1)
|
|
img_rd = Image.fromarray(cv2.cvtColor(datum.cvOutputData, cv2.COLOR_BGR2RGB))
|
|
draw = ImageDraw.Draw(img_rd)
|
|
draw.text((10, 10), text="Fall Detected", font=font,
|
|
fill=(255, 0, 0))
|
|
img_rd = cv2.cvtColor(np.array(img_rd), cv2.COLOR_RGB2BGR)
|
|
cv2.imwrite('fall_detection.jpg', frame)
|
|
t = threading.Thread(target=post(event=3, imagePath='fall_detection.jpg'))
|
|
t.setDaemon(False)
|
|
t.start()
|
|
# status = post(event=3, imagePath='fall_detection.jpg')
|
|
# print("fall")
|
|
|
|
# update variables
|
|
self.frame_start_time = now
|
|
self.v0 = v
|
|
self.width0 = width.copy()
|
|
self.height0 = height.copy()
|
|
# if width > height:
|
|
# print("alarm")
|
|
self.firstFrame = None
|
|
except Exception as e:
|
|
gray = frame_process.preprocess_frame(frame)
|
|
|
|
if self.firstFrame is None:
|
|
self.firstFrame = gray
|
|
pass
|
|
|
|
frameDelta = cv2.absdiff(self.firstFrame, gray)
|
|
|
|
cnts = frame_process.get_contours(self.firstFrame, gray)
|
|
|
|
defined_min_area = 3000
|
|
frame, self.alert = algorithm_fall.fall_detect(cnts, defined_min_area, frame, self.prevX, self.prevY,
|
|
self.xList, self.yList, self.centerV, self.alert)
|
|
|
|
img_rd = frame
|
|
# cv2.imshow("OpenPose 1.6.0 - Tutorial Python API", frame)
|
|
|
|
frame = cv2.resize(img_rd, (640, 480))
|
|
# cv2.imshow("OpenPose 1.6.0 - Tutorial Python API", img_rd)
|
|
return frame
|
|
# http://zhuooyu.cn:8000/api/person/old/10 |