src.face_detection_trace 源代码

import json
import os
from collections import defaultdict

import itertools
import logging
import threading
from datetime import datetime
from queue import Queue
import cv2
import face_recognition
import numpy as np

from src.face_encoding import FaceEncodingFactory
from src.face_detection import FaceDetectionFactory
from src.util import Util, LimitList, FrameBox

"""
人脸自动录入和识别,并生成事件形式的打卡记录

自动录入:未识别头像自动采集到指定文件夹,如果是公司员工,修改文件夹名称将未识别头像变成公司内部员工
打卡记录:通过人脸检测,跟踪,识别(特征值提取),比对底库,等生成人员打卡记录

帮助:python face_detection_trace.py -h
optional arguments:
  -c CONFIG_PATH, --config_path :配置文件路径
  -fe FACE_ENCODING, --face_encoding :人脸特征值提取算法
  -fd FACE_DETECTION, --face_detection :人脸检测算法
  -pimg PERSON_IMAGE_DIR, --person_image_dir :人脸图片保存路径
  -vimg VIDEO_IMAGE_DIR, --video_image_dir :视频图片保存路径

使用示例:python face_detection_trace.py -c config.json 
"""

# Root logger: INFO and above, timestamped "time - logger - level - message" records.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Module logger; its records are additionally written to a log file one
# directory above the current working directory.
logger = logging.getLogger(__name__)

handler = logging.FileHandler("../log.txt")
handler.setLevel(logging.INFO)
logger.addHandler(handler)


class Person(object):
    """
    A person seen by a camera: either a known employee or a stranger.

    :cvar object face_encoding: encoder shared by all persons; must provide
        ``encoding_frame_box(frame_box)``. Assigned by the start-up code.
    :cvar str img_dir: root directory under which face images are stored
    :ivar str ipc_name: camera name
    :ivar bool is_new: whether this person was first seen in this run
    :ivar str person_name: name of the person
    :ivar frames_box_limit: only set for new persons; a ``LimitList`` holding
        the frame boxes collected for this person
    """
    __unknow_max_id = 0
    face_encoding = None
    img_dir = ""

    def __init__(self, person_name, img_files, ipc_name, is_new=False, new_face_frame_max=10):
        """
        Load the person's face images and compute one encoding per image.

        :param person_name: name of the person
        :param img_files: paths of the face images belonging to this person
        :param ipc_name: camera name
        :param is_new: True for a person discovered at runtime
        :param new_face_frame_max: capacity passed to ``LimitList`` for new persons
        """
        self.ipc_name = ipc_name
        self.is_new = is_new
        self.person_name = person_name
        assert np.all([os.path.exists(img) for img in img_files]), "img file is error"
        frames_box = [FrameBox(*FrameBox.parse_file(img_file)) for img_file in img_files]
        encodings = [Person.face_encoding.encoding_frame_box(frame_box) for frame_box in frames_box]
        self.__encodings = encodings
        if is_new:
            # New persons start without encodings; they are collected through new_frame_box().
            self.frames_box_limit = LimitList(new_face_frame_max)
            self.__encodings = []
        logger.info("add new persion %s" % (str([np.sum(encoding) for encoding in self.encodings_valid()])))

    def new_frame_box(self, frame_box):
        """
        Add a frame box to ``frames_box_limit`` (new persons only).

        :param frame_box: FrameBox holding an image and the face position inside it
        """
        if not self.is_new:
            return
        # NOTE(review): the encoding is stored only when LimitList.append returns a
        # truthy value. If LimitList.append returns None (like list.append) no
        # encoding is ever recorded for new persons - confirm against src.util.
        if self.frames_box_limit.append(frame_box):
            self.__encodings.append(Person.face_encoding.encoding_frame_box(frame_box))

    def encodings_valid(self):
        """
        Return the usable encodings.

        :return: the stored encodings that are neither None nor empty
        """
        return [x for x in self.__encodings if x is not None and len(x)]

    @staticmethod
    def new_unknow_person(ipc_name):
        """
        Create a new stranger.

        :param ipc_name: camera name
        :return: Person instance flagged as new, with a generated name and no images
        """
        return Person(Person.get_unknow_name(), list(), ipc_name=ipc_name, is_new=True)

    @staticmethod
    def get_unknow_name() -> str:
        """
        Generate the next stranger name.

        :return: "unknow" followed by a zero-padded, increasing counter
        """
        Person.__unknow_max_id += 1
        return "unknow{0:03d}".format(Person.__unknow_max_id)

    def save(self):
        """
        Write the images held in ``frames_box_limit`` to
        ``<img_dir>/<ipc_name>/<person_name>/`` and clear the ``is_new`` flag.
        Does nothing for persons that are not new.
        """
        if not self.is_new:
            return
        # os.path.join keeps the path correct whether or not img_dir ends with a separator.
        dir_path = os.path.join(Person.img_dir, self.ipc_name, self.person_name)
        os.makedirs(dir_path, exist_ok=True)
        for frame_box in self.frames_box_limit:
            cv2.imwrite(os.path.join(dir_path, frame_box.name), frame_box.img)
        self.is_new = False

    @staticmethod
    def get_camera_person_files(cameras_dir):
        """
        Collect the face image files of every person, grouped by camera.

        :param cameras_dir: root directory containing one sub-directory per camera
        :return: dict keyed by camera name; each value is whatever
            ``Util.get_dirs_files`` returns for that camera directory
            (presumably person name -> image list; verify against src.util)
        """
        assert os.path.exists(cameras_dir) and os.path.isdir(cameras_dir), "dir is illegal"
        camera_person_dict = defaultdict(dict)
        for camera_dir in os.listdir(cameras_dir):
            # Test the same location that is handed to Util below, so a root
            # directory given without a trailing separator still works.
            if os.path.isdir(os.path.join(cameras_dir, camera_dir)):
                camera_person_dict[camera_dir] = Util.get_dirs_files(cameras_dir + "/" + camera_dir + "/")
        return camera_person_dict
class Track(object):
    """
    Tracker for one face; a thin wrapper around an OpenCV tracker.

    :cvar int __id: counter used to hand out track ids
    :ivar str ipc_name: camera name
    :ivar object tracker: the wrapped OpenCV tracker instance
    :ivar face_img: crop of the face taken from the latest image
    :ivar img: latest full image
    :ivar encoding: face encoding of the tracked face
    :ivar list __history: ring of flags describing the most recent frames
    :ivar object match_person: the person this track is associated with
    :ivar callable event_call_back: callback used to report events
        (called with type 0 on creation and type 1 from update_img)
    """
    __id = 0

    def __init__(self, ipc_name, tracker, img, box, encoding, persons, event_call_back, history=5):
        """
        Create a track, bind it to a person and report the "enter" event (type 0).

        :param ipc_name: camera name
        :param tracker: OpenCV tracker instance
        :param img: image the face was detected in
        :param box: face position, indexed as (x, y, w, h) by the crop below
        :param encoding: face encoding of the detected face
        :param persons: list of known persons; may be extended with a new stranger
        :param event_call_back: callable(type, ipc_name, track_id, img, box, person_name)
        :param history: number of slots in the history ring
        """
        self.ipc_name = ipc_name
        self.__id = Track.__id = Track.__id + 1
        self.tracker = tracker
        self.face_img = img[(box[1]):(box[1] + box[3]), (box[0]):(box[0] + box[2])]
        self.img = img
        self.encoding = encoding
        self.__history = [False] * history
        self.__history_iter = itertools.cycle(range(history))
        self.match_person = None
        self.__history[next(self.__history_iter)] = True
        self.__init_tracker(img, box)
        self.find_person(persons)
        if self.match_person is None:
            # find_person() gives up when there is no usable encoding; without a
            # person the event below (and update_img later) would fail on None.
            self.match_person = Person.new_unknow_person(ipc_name=self.ipc_name)
            persons.append(self.match_person)
        self.event_call_back = event_call_back
        self.event_call_back(0, self.ipc_name, self.__id, self.face_img, box, self.match_person.person_name)

    def __init_tracker(self, img, box):
        """
        Initialise the wrapped OpenCV tracker.

        :param img: image
        :param box: face position inside the image
        """
        self.tracker.init(img, tuple(box))

    def update_img(self, img, box, encoding):
        """
        Refresh the track with a newly detected face.

        :param img: image
        :param box: face position
        :param encoding: face encoding
        """
        self.face_img = img[box[1]:box[1] + box[3], box[0]:box[0] + box[2]]
        self.match_person.new_frame_box(FrameBox(img, box))
        self.encoding = encoding
        self.img = img
        slot = next(self.__history_iter)
        # Report the "out" event (type 1) when the slot about to be cleared is
        # the only one still set.
        # NOTE(review): every call clears one slot and nothing sets a slot after
        # __init__, so a track stops being alive after `history` updates - confirm
        # that this is the intended life cycle.
        if self.alive() and self.__history[slot] == 1 and sum(self.__history) == 1:
            self.event_call_back(1, self.ipc_name, self.id, self.img, box, self.match_person.person_name)
        self.__history[slot] = False

    def update(self, img):
        """
        Feed a new image to the wrapped tracker.

        :param img: image
        :return: whatever ``tracker.update(img)`` returns
        """
        return self.tracker.update(img)

    def find_person(self, persons, tolerance=0.6):
        """
        Associate this track with the closest person in ``persons``.

        If no person is closer than ``tolerance`` a new stranger is created and
        appended to ``persons``. Nothing happens when the track has no encoding.

        :param persons: list of all known persons
        :param tolerance: maximum face distance to count as the same person
        """
        if self.encoding is None or len(self.encoding) == 0:
            return
        # Per person: smallest distance to any of their encodings (1.0 if they have none).
        person_dist = [min(face_recognition.face_distance(person.encodings_valid(), self.encoding), default=1.0)
                       for person in persons]
        logger.info("match_person self.encodings %s " % (str(np.sum(self.encoding))))
        if min(person_dist, default=1.0) < tolerance:
            self.match_person = persons[np.argmin(person_dist)]
        else:
            self.match_person = Person.new_unknow_person(ipc_name=self.ipc_name)
            persons.append(self.match_person)

    def alive(self):
        """
        Tell whether the track is still active.

        :return: True while at least one history slot is set
        """
        return sum(self.__history) > 0

    @property
    def id(self):
        """Identifier assigned to this track on creation."""
        return self.__id
class CapDetectionTrack(threading.Thread):
    """
    Per-video worker that combines face detection and tracking.

    :cvar str video_imgs: path prefix for the event snapshots written by event_call_back
    :ivar bool is_start: True while the capture loop is running
    :ivar object face_detector: face detector, must provide ``detection(img)``
    :ivar object face_encoding: face encoder, must provide ``encoding(img, box)``
    :ivar __last_frame: most recent frame (realtime mode)
    :ivar Queue frame_queue: decoded frames waiting for processing (non-realtime mode)
    :ivar bool is_realtime: realtime mode keeps only the latest frame
    :ivar list tracks: active tracks, one per tracked face
    :ivar dict ipc_info: configuration of the video source
    :ivar list persons: known persons for this camera
    """
    video_imgs = None

    def __init__(self, ipc_info, is_realtime, face_detector, face_encoding, detection_freq, persons,
                 face_decector_lock, face_encoding_lock):
        """
        :param ipc_info: video source configuration ("name", "path", optional "save_stranger")
        :param is_realtime: process only the newest frame instead of every frame
        :param face_detector: face detector instance
        :param face_encoding: face encoder instance
        :param detection_freq: run detection on one frame out of this many, tracking on the rest
        :param persons: list of known persons for this camera
        :param face_decector_lock: lock guarding calls into the detector
        :param face_encoding_lock: lock guarding calls into the encoder
        """
        super(CapDetectionTrack, self).__init__()
        self.is_start = False
        self.face_detector = face_detector
        self.face_encoding = face_encoding
        self.face_detector_lock = face_decector_lock
        self.face_encoding_lock = face_encoding_lock
        self.__last_frame = None
        self.frame_queue = Queue(maxsize=60)
        self.detection_freq_iter = itertools.cycle(range(detection_freq))
        self.is_realtime = is_realtime
        self.tracks = list()
        self.ipc_info = ipc_info
        self.persons = persons

    @property
    def name(self):
        """Camera name taken from the source configuration."""
        return self.ipc_info["name"]

    @property
    def path(self):
        """Video path/URL taken from the source configuration."""
        return self.ipc_info["path"]

    @property
    def is_save_stranger(self):
        """Whether images of strangers are saved (config key "save_stranger", default off)."""
        return bool(self.ipc_info.get("save_stranger", 0))

    def run(self):
        """
        Start the decoding thread and the detection/tracking thread, wait for
        both, then save and release resources.
        """
        self.is_start = True
        cv_cap = cv2.VideoCapture(self.path)
        size = (int(cv_cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
                int(cv_cap.get(cv2.CAP_PROP_FRAME_HEIGHT)))
        out_file_path = "%s_output.avi" % self.path
        video_write = cv2.VideoWriter(
            out_file_path,
            cv2.VideoWriter_fourcc("M", "P", "4", "2"),  # codec
            cv_cap.get(cv2.CAP_PROP_FPS),
            size
        )
        cap_thread = threading.Thread(target=self.__start_capture, args=(cv_cap,))
        cap_thread.start()
        dec_thread = threading.Thread(target=self.__start_detection_trace, args=(video_write,))
        dec_thread.start()
        cap_thread.join()
        dec_thread.join()
        self.save_release_resouce()

    def __start_capture(self, cv_cap):
        """
        Decoding loop: read frames until the stream ends or is_start is cleared.

        Realtime mode overwrites the latest frame; otherwise every frame is queued.

        :param cv_cap: opened video capture
        """
        try:
            while self.is_start:
                ret, frame = cv_cap.read()
                if not ret:
                    break
                if self.is_realtime:
                    self.__last_frame = frame
                else:
                    self.frame_queue.put(frame)
        finally:
            # Signal the detection loop that no more frames will arrive.
            self.is_start = False
            if self.is_realtime:
                self.__last_frame = None
            else:
                self.frame_queue.put(None)  # end-of-stream sentinel
            cv_cap.release()

    def __get_last_frame(self):
        """
        Fetch the next frame to process.

        :return: the latest frame (realtime) or the next queued frame; None when
            there is no fresh frame / the end-of-stream sentinel was read
        """
        ret = self.__last_frame if self.is_realtime else self.frame_queue.get()
        self.__last_frame = None
        return ret

    def __start_detection_trace(self, video_write):
        """
        Detection/tracking loop: annotate every processed frame and write it out.

        :param video_write: writer receiving the annotated frames
        """
        try:
            while True:
                if self.is_realtime and not self.is_start:
                    break
                last_frame = self.__get_last_frame()
                if last_frame is None:
                    if self.is_realtime:
                        continue  # no fresh frame yet
                    # Non-realtime: stop only at the sentinel so that frames still
                    # queued when decoding finished are processed as well.
                    break
                if next(self.detection_freq_iter) == 0:
                    boxes = self.__face_dec(last_frame)
                else:
                    boxes = self.__face_track(last_frame)
                for box in boxes:
                    Util.draw_boxes(last_frame, list(box))
                video_write.write(last_frame)
        finally:
            video_write.release()

    def __face_dec(self, img):
        """
        Run face detection on a frame and update the tracks with the result.

        :param img: frame
        :return: detected face boxes (an empty list when the detector returns None)
        """
        with self.face_detector_lock:
            boxes = self.face_detector.detection(img)
        if boxes is None:
            boxes = []
        self.__face_upgrade_track(img, boxes)
        return boxes

    @staticmethod
    def event_call_back(type, ipc_name, track_id, img=None, box=None, person_name=None):
        """
        Event callback used by the tracks: saves a snapshot and logs the event.

        :param type: event type (0 enter, 1 out)
        :param ipc_name: camera name
        :param track_id: track id
        :param img: image to save; nothing is written when None
        :param box: face position inside the frame
        :param person_name: name of the person
        """
        img_path = "%snew_face_%s.png" % (CapDetectionTrack.video_imgs, track_id)
        if img is not None:
            cv2.imwrite(img_path, img)
        logger.info("lost person" + ",".join((ipc_name, str(type), datetime.now().strftime("%Y%m%d%H%M%S"),
                                              str(track_id), img_path, str(box), str(person_name))))

    def __face_upgrade_track(self, img, boxes):
        """
        Update the tracks with the faces detected in a frame.

        A face whose encoding matches an existing track refreshes that track;
        any other face gets a new track. Tracks that are no longer alive are
        dropped (their stranger images saved first when enabled).

        :param img: frame
        :param boxes: face boxes detected in the frame (may be None or empty)
        """
        tracks_map = {track.id: track for track in self.tracks}
        track_ids = [track.id for track in self.tracks]
        track_encodings = [track.encoding for track in self.tracks]

        faces = []  # (box, encoding) pairs that have a usable encoding
        if boxes is not None and len(boxes):
            with self.face_encoding_lock:
                encodings = [self.face_encoding.encoding(img, box) for box in boxes]
            # Filter pairwise in Python: stacking a mix of None and vectors into
            # one ndarray is rejected by recent NumPy versions.
            faces = [(np.asarray(box), np.asarray(encoding))
                     for box, encoding in zip(boxes, encodings)
                     if encoding is not None and len(encoding) > 0]

        new_trackers = []
        for box, encoding in faces:
            matched_ids = np.array(track_ids, int)[face_recognition.compare_faces(track_encodings, encoding)]
            if len(matched_ids) > 0:
                tracks_map[matched_ids[0]].update_img(img, box, encoding)
            else:
                new_trackers.append(Track(self.name, cv2.TrackerKCF_create(), img, box, encoding,
                                          self.persons, event_call_back=self.event_call_back))

        # Keep alive tracks only.
        if self.is_save_stranger:
            for tracker in self.tracks:
                if not tracker.alive() and tracker.match_person is not None:
                    tracker.match_person.save()
        self.tracks = [tracker for tracker in self.tracks if tracker.alive()]
        self.tracks.extend(new_trackers)

    def __face_track(self, img):
        """
        Advance every existing track with a frame.

        :param img: frame
        :return: one integer box per track, taken from the tracker update result
        """
        return [list(map(int, track.update(img)[1])) for track in self.tracks]

    def save_release_resouce(self):
        """
        Save the stranger images of the remaining tracks (when enabled) and drop
        the per-run state.
        """
        del self.detection_freq_iter
        if self.is_save_stranger:
            for tracker in self.tracks:
                if tracker.match_person is not None:
                    tracker.match_person.save()
        del self.tracks
        del self.frame_queue
class DetectionTracksCtl(object):
    """
    Controller that runs face detection/recognition for all video sources.

    :ivar object face_detector: face detector shared by all sources
    :ivar object face_encoding: face encoder shared by all sources
    """

    def __init__(self, face_detector, face_encoding):
        self.face_detector = face_detector
        self.face_encoding = face_encoding

    def start_all(self, ipc_infos, camera_persons):
        """
        Start one detection thread per video source and wait for all of them.

        :param ipc_infos: list of source configurations; each needs the keys
            "name", "realtime" and "detection_freq"
        :param camera_persons: mapping camera name -> list of known persons
        """
        threads = list()
        # One lock per shared component, common to all source threads.
        face_decector_lock = threading.Lock()
        face_encoding_lock = threading.Lock()
        for ipc_info in ipc_infos:
            ipc_name = ipc_info["name"]
            is_realtime = ipc_info["realtime"]
            detection_freq = ipc_info["detection_freq"]
            # Use the instances given to this controller, not module-level globals.
            thread = CapDetectionTrack(ipc_info, is_realtime, self.face_detector, self.face_encoding,
                                       detection_freq, camera_persons[ipc_name],
                                       face_decector_lock, face_encoding_lock)
            threads.append(thread)
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
        self.__after_all_stop()

    def __after_all_stop(self):
        """Hook executed once every source thread has finished."""
        print("__after_all_stop")
        # TODO: 01 use frame differences to decide when to run detection,
        #       02 use yield to pass parameters
        # ref: Human-detection-and-Tracking

    @staticmethod
    def background_subtraction(previous_frame, frame_resized_grayscale, min_area):
        """
        Tell whether two frames differ enough to be worth running detection.

        :param previous_frame: previous frame
        :param frame_resized_grayscale: current frame; passed to cv2.absdiff
            together with previous_frame
        :param min_area: minimum contour area that counts as a change
        :return: 1 if any changed region is larger than min_area, otherwise 0
        """
        frame_delta = cv2.absdiff(previous_frame, frame_resized_grayscale)
        thresh = cv2.threshold(frame_delta, 25, 255, cv2.THRESH_BINARY)[1]
        thresh = cv2.dilate(thresh, None, iterations=2)
        # findContours returns 3 values on OpenCV 3.x and 2 on other versions;
        # the contour list is always the second-to-last element.
        cnts = cv2.findContours(thresh.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)[-2]
        # Contours that are too small are ignored.
        return 1 if any(cv2.contourArea(c) > min_area for c in cnts) else 0
if __name__ == "__main__":
    # Script entry point: read the configuration, load the known persons and
    # run detection on every configured video source.
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("-c", "--config_path", help="the path of the config file(json format config)",
                        default="../config.json")
    parser.add_argument("-fe", "--face_encoding", help="face encoding method")
    parser.add_argument("-fd", "--face_detection", help="face detection method")
    parser.add_argument("-pimg", "--person_image_dir", help="person images save dir")
    parser.add_argument("-vimg", "--video_image_dir", help="video images save dir")
    args = parser.parse_args()

    config_path = args.config_path
    assert os.path.exists(config_path), "config file not exists"
    with open(config_path, "r") as config_file:
        config_json = json.load(config_file)

    # Command-line values take precedence over the config file, then the defaults.
    ipc_infos = config_json["ipcs"]
    face_encoding_config = args.face_encoding or config_json.get("face_encoding", "DLIB_REG")
    face_detection_config = args.face_detection or config_json.get("face_detection", "CV_CAS")
    person_image_dir_config = args.person_image_dir or config_json.get("person_image_dir", "../data/person_img/")
    video_image_dir_config = args.video_image_dir or config_json.get("video_image_dir", "../data/video_imgs/")
    os.makedirs(person_image_dir_config, exist_ok=True)
    os.makedirs(video_image_dir_config, exist_ok=True)

    CapDetectionTrack.video_imgs = video_image_dir_config
    face_encoding = FaceEncodingFactory.get_instance(face_encoding_config)
    Person.face_encoding = face_encoding
    Person.img_dir = person_image_dir_config

    # Load, per camera, the known persons together with their face images.
    camera_persons = defaultdict(list)
    camere_persons_files = Person.get_camera_person_files(person_image_dir_config)
    for camera_name, persons_map in camere_persons_files.items():
        for person_name, person_files in persons_map.items():
            camera_persons[camera_name].append(Person(person_name, person_files, camera_name))

    face_detector = FaceDetectionFactory.get_detection(face_detection_config)
    detection_track = DetectionTracksCtl(face_detector, face_encoding)
    detection_track.start_all(ipc_infos, camera_persons=camera_persons)