基于MTCNN算法的人脸检测

论文:《Joint Face Detection and Alignment using Multi-task Cascaded Convolutional Networks》

论文网址:https://arxiv.org/abs/1604.02878v1

一、总体框架

MTCNN通过不同的卷积神经网络,实现对人脸的识别以及人脸关键点检测。总的框架如下:

                                                                    图1 Pipeline

如图1所示为MTCNN的整体框架(检测实现流程—测试流程)。

给定一张图片,需要将其resize成不同大小的图片,建立图像金字塔。这些不同size的图片是下面三个stage的输入。

stage1:首先使用全卷积网络(P-Net)获取候选框和他们的回归向量。然后使用估计的bounding box回归向量去标定候选框。然后再使用非极大值抑制(NMS)去合并高度重叠的候选框;

stage2:这一层使用一个提炼网络(Refine Network, R-Net)。所有stage1中的候选框传入R-Net,使用边界框回归(bounding box regression)以及NMS,使得消除更多假的候选框(false candidates);

stage3:stage3使用一个输出网络(O-Net),该阶段与stage2相似。但是在这个阶段,我们的目标是更加详细地描述人脸。尤其是该网络将要输出5个人脸标记位置(facial landmarks' positions)。

二、CNN结构:

 许多的论文都设计CNN用于人脸检测。但是,这些论文都受到以下几个原因的限制:

1)许多filter缺乏权重多样性限制了他们产生有判别力的描述。

2)相比于其他的多分类物体检测和分类任务,人脸检测是一个具有挑战性的二分类任务,因此可能需要更少数量的filters但是需要对人脸更具有辨别力。为了这个目的,我们减少filter的数量,将5×5的filter变成3×3的filter,减少计算量,尽管增加filter的深度能够获取到更好的性能。通过这些改进,我们可以得到更好的性能,但是运行时间变少。CNN的结构如图2所示。

                                        图2 CNN的结构(MP-max polling,Conv—convolution, 卷积和池化的step分别为1和2)

三、训练

使用三个tasks训练CNN detector,分别为:人脸/非人脸分类,边界框回归以及人脸标记定位。

1)人脸分类

学习目标可以表述为二分类任务。对于每个样本$x_{i}$,我们使用交叉熵损失函数(cross-entropy loss):

$L_{i}^{det}=-(y_{i}^{det}log(p_{i})+(1-y_{i}^{det})(1-log(p_{i})))$    (1)

其中,$p_{i}$是神经网络输出的概率,表示了一个样本是人脸的概率。$y_{i}^{det}in left {0,  1 ight }$,表示ground truth的标签。

2)边界框回归

对于每一个候选框,我们需要预测它与最近的ground truth的偏移,包括:左上坐标、高度和宽度。学习目标可以表述为回归问题,对于每一个样本$x_{i}$,我们使用欧式损失(Euclidean loss):

$L_{i}^{box}=left | hat{y}_{i}^{box} - y_{i}^{box} ight |_{2}^{2}$    (2)

其中,$hat{y}_{i}^{box}$回归目标是从神经网络获得的(即网络的输出),$y_{i}^{box}$是ground truth。有4个坐标,包括:左上、高度和宽度,因此$y_{i}^{box}in mathbb{R}^{4}$。

3)人脸标记定位

和边界框回归任务相似,人脸标记定位可以表示为回归任务问题,使用最小化欧式损失:

$L_{i}^{landmark}=left | hat{y}_{i}^{landmark} - y_{i}^{landmark} ight |_{2}^{2}$   (3)

其中,$hat{y}_{i}^{landmark}$是从神经网络输出获得的人脸标记坐标,$ y_{i}^{landmark}$是ground truth。因为有5个人脸标记,包括:左眼睛、右眼睛、鼻子、嘴巴左边界和嘴巴右边界,因此$y_{i}^{landmark}in mathbb{R}^{10}$。

4)多数据源训练

因为我们在不同的CNN中执行不同的任务,所以在训练过程中,使用不同类型的训练图像数据,例如:人脸、非人脸和部分人脸数据。所以,一些损失函数(1-3公式)不会使用。例如,对于背景区域,我们仅仅计算$L_{i}^{det}$,另外两个损失设置为0,这个可以使用采样类型指示器实现。总的学习目标可以表示为:

$minsum _{i=1}^{N}sum _{jin det,box,landmark}alpha _{j}eta _{i}^{j}L_{i}^{j}$    (4)

其中,$N$为训练样本的数量。$alpha_{j}$表示人物的重要性(分别设置为:P-Net和R-Net中,$alpha_{det}=1,alpha_{box}=0.5,alpha_{landmark}=0.5$;O-Net为了获得更加精确的人脸标记点定位,在O-Net中,$alpha_{det}=1,alpha_{box}=0.5,alpha_{landmark}=1$)。$eta _{i}^{j}in left { 0,1 ight }$为采样类型指示器。使用随机梯度下降算法(SGD)训练CNNs。

5)在线困难样本挖掘

 不同于在原始分类训练完成之后执行传统的困难样本挖掘,我们采用在线困难样挖掘适应训练过程。

特别地,我们对前向传播过程计算出的损失进行分类,然后只采用其中的70%作为困难样本。然后我们在后向传播过程中,只计算困难样本的梯度。这也就意味着我们忽略简单样本,这些简单样本对增强训练过程的探测功能不太有帮助。

6)训练数据

因为我们联合执行人脸检测和人脸对齐,因此我们在训练过程中使用四种不同的数据标记。分别为:

6.1负样本:与图片中任何一个ground truth的IOU小于0.3的区域;

6.2正样本:与图片中任何一个ground truth的IOU大于0.65的区域;

6.3部分人脸:IOU介于0.4和0.65之间;

6.4标记人脸:标记5个人脸标记位置的图片;

其中,负样本和正样本用于人脸分类任务(即判别是人脸还是非人脸);正样本和部分人脸用于边界框回归;人脸编辑样本用于人脸标记定位。每一个网络的训练数据可以如下表示:

①P-Net:从WIDER FACE数据集中随机裁剪获取正样本、负样本和部分人脸样本。然后,从CelebA数据裁剪人脸标记数据,需要resize成12×12;

②R-Net:将框架第一阶段的输出的proposal作为R-Net的输入,需要resize成24×24;

③O-Net :输入是经过第二步筛选和refine过的人脸框,同样从原图抠出后统一resize到48*48,成批输入ONet。

后面阶段都是在前面阶段的基础上对训练结果进行调整。

四、测试阶段

如第一节的总体架构,首先使图像生成图像金字塔,生成多尺度的图像,然后输入P-Net(因为P-Net是全卷积网络,该网络的输出的featuremap上的每一个特征点都对应于输入图像上的12×12的区域,因此)。PNet由于尺寸很小,所以可以很快的选出候选区域,但是准确率不高,不同尺度上的判断出来的人脸检测框,然后采用NMS算法,合并候选框,然后根据候选框提取图像,之后缩放到24*24的大小,作为RNet的输入,RNet可以精确的选取边框,一般最后只剩几个边框,最后缩放到48*48的大小,输入ONet,判断后选框是不是人脸,ONet虽然速度较慢,但是由于经过前两个网络,已经得到了高概率的边框,所以输入ONet的图像较少,然后ONet输出精确的边框和关键点信息,只是在第三个阶段上才显示人脸特征定位;前两个阶段只是分类,不显示人脸定点的结果。

参考:https://blog.csdn.net/wfei101/article/details/79935037

五、项目实践

参考项目地址:GitHub

根据参考项目做一些调整,模型实现。

数据集下载:

这里使用的数据集是WIDER FACE以及CelebA

代码讲解如下:

参考:https://www.ctolib.com/LeslieZhoa-tensorflow-MTCNN.html

主要代码理解如下:

生成P-Net数据:

gen_12net_data.py

# coding: utf-8

"""
截取pos,neg,part三种类型图片并resize成12x12大小作为PNet的输入
"""
import os
import cv2
import numpy as np
npr = np.random
from tqdm import  tqdm
from utils import IOU 

# face的id对应label的txt
anno_file = '../data/wider_face_train.txt'
# 图片地址
im_dir = '../data/WIDER_train/images'
# pos,part,neg裁剪图片放置位置
pos_save_dir = '../data/12/positive'
part_save_dir = '../data/12/part'
neg_save_dir = '../data/12/negative'
# PNet数据地址
save_dir = '../data/12'

if not os.path.exists(save_dir):
    os.mkdir(save_dir)
if not os.path.exists(pos_save_dir):
    os.mkdir(pos_save_dir)
if not os.path.exists(part_save_dir):
    os.mkdir(part_save_dir)
if not os.path.exists(neg_save_dir):
    os.mkdir(neg_save_dir)
    
f1 = open(os.path.join(save_dir, 'pos_12.txt'), 'w')
f2 = open(os.path.join(save_dir, 'neg_12.txt'), 'w')
f3 = open(os.path.join(save_dir, 'part_12.txt'), 'w')

with open(anno_file, 'r') as f:
    annotations = f.readlines()
num = len(annotations)
print('总共的图片数: %d' % num)
# 记录pos, neg, part三类生成数
p_idx = 0
n_idx = 0
d_idx = 0
# 记录读取图片数
idx = 0
for annotation in tqdm(annotations):  # 进度条显示
    annotation = annotation.strip().split(' ')
    im_path = annotation[0]
    box = list(map(float, annotation[1:]))
    boxes = np.array(box, dtype=np.float32).reshape(-1, 4)  # numpy.array.reshape -> 4列, 每一行是box
    
    img = cv2.imread(os.path.join(im_dir, im_path+'.jpg'))
    idx += 1
    height, width, channel = img.shape
    
    neg_num = 0
    # 先采样一定数量neg图片
    while neg_num < 50:
        # 随机选取截取图像大小
        size = npr.randint(12, min(width, height)/2)
        # 随机选取左上坐标
        nx = npr.randint(0, width-size)
        ny = npr.randint(0, height-size)
        # 截取box
        crop_box = np.array([nx, ny, nx+size, ny+size])
        # 计算iou值
        Iou = IOU(crop_box, boxes)
        # 截取图片并resize成12x12大小
        cropped_im = img[ny:ny+size, nx:nx+size, :]  # cv2.imread读取的图片第一维度是y
        resized_im = cv2.resize(cropped_im, (12, 12), interpolation=cv2.INTER_LINEAR)  # P-Net的训练输入图像大小为12 × 12

        # iou值小于0.3判定为neg图像
        if np.max(Iou) < 0.3:
            save_file = os.path.join(neg_save_dir, '%s.jpg' %n_idx)  # neg的图片的绝对路径
            f2.write(neg_save_dir+'/%s.jpg'%n_idx+' 0
')  # neg_12.txt文件保存neg的图片的绝对路径
            cv2.imwrite(save_file, resized_im)  # 将截取的图片保存
            n_idx += 1
            neg_num += 1
    
    for box in boxes:  # 以每个box为基础选取截图
        # 左上右下坐标
        x1, y1, x2, y2 = box
        w = x2 - x1 + 1
        h = y2 - y1 + 1
        # 舍去图像过小和box在图片外的图像
        if max(w, h) < 20 or x1 < 0 or y1 < 0:
            continue
        for i in range(5):  # 每个box附近截取5个截图用于判断是否为negative训练样本
            size = npr.randint(12, min(width, height)/2)

            # 随机生成的关于x1, y1的偏移量,并且保证x1+delta_x>0,y1+delta_y>0
            delta_x = npr.randint(max(-size, -x1), w)
            delta_y = npr.randint(max(-size, -y1), h)
            # 截取后的左上角坐标
            # 这里面是获取negative的截图, 所以可以(最好是)随意选取, 因此左上角坐标和偏移量都是随意选取的.
            nx1 = int(max(0, x1+delta_x))
            ny1 = int(max(0, y1+delta_y))
            # 排除大于图片尺度的
            if nx1 + size > width or ny1 + size > height:
                continue
            crop_box = np.array([nx1, ny1, nx1+size, ny1+size])
            Iou = IOU(crop_box, boxes)
            cropped_im = img[ny1:ny1+size, nx1:nx1+size, :]
            resized_im = cv2.resize(cropped_im, (12, 12), interpolation=cv2.INTER_LINEAR)
            
            if np.max(Iou) < 0.3:
                save_file = os.path.join(neg_save_dir, '%s.jpg' %n_idx)
                f2.write(neg_save_dir+'/%s.jpg'%n_idx+' 0
')
                cv2.imwrite(save_file, resized_im)
                n_idx += 1
        for i in range(20):  # 每个box附近截取20个截图用于判断是否为positive或者是part训练样本
            # 这里是截取positive和part图片, 目的是需要截取box附近的图片, 因此下面size的大小也需要接近w, h. 不然取不到positive、part的几率大.
            size = npr.randint(int(min(w, h)*0.8), np.ceil(1.25*max(w, h)))

            # 除去尺度小的box
            # 注意:w, h是box的尺寸. width、height是整个训练图片的尺寸.
            if w < 5:
                continue
            # 在box附近截取图片, 偏移量取值, 稍微小一点好.
            delta_x = npr.randint(-w*0.2, w*0.2)
            delta_y = npr.randint(-h*0.2, h*0.2)
            # 截取图像左上坐标计算是先计算x1+w/2表示的中心坐标,再+delta_x偏移量,再-size/2,
            nx1 = int(max(x1+w/2+delta_x-size/2, 0))
            ny1 = int(max(y1+h/2+delta_y-size/2, 0))
            nx2 = nx1 + size
            ny2 = ny1 + size
            
            # 排除超出的图像
            if nx2 > width or ny2 > height:
                continue
            crop_box = np.array([nx1, ny1, nx2, ny2])
            # 人脸框相对于截取图片的偏移量并做归一化处理
            # 这里训练数据使用相对于人脸框归一化处理的offset, 实际测试时得到的也是归一化的offset. 因此训练就是获取归一化的offset.
            offset_x1 = (x1-nx1)/float(size)
            offset_y1 = (y1-ny1)/float(size)
            offset_x2 = (x2-nx2)/float(size)
            offset_y2 = (y2-ny2)/float(size)
            
            cropped_im = img[ny1:ny2, nx1:nx2, :]
            resized_im = cv2.resize(cropped_im, (12, 12), interpolation=cv2.INTER_LINEAR)
            # box扩充一个维度作为iou输入
            box_ = box.reshape(1, -1)  # 这里是每一个box, 对每一个box和截取的图像进行IOU计算
            iou = IOU(crop_box, box_)
            if iou >= 0.65:
                save_file = os.path.join(pos_save_dir, '%s.jpg'%p_idx)
                f1.write(pos_save_dir+'/%s.jpg'%p_idx+' 1 %.2f %.2f %.2f %.2f
'%(offset_x1,
                         offset_y1, offset_x2, offset_y2))
                cv2.imwrite(save_file, resized_im)
                p_idx += 1
            elif iou >= 0.4:
                save_file = os.path.join(part_save_dir, '%s.jpg'%d_idx)
                f3.write(part_save_dir+'/%s.jpg'%d_idx+' -1 %.2f %.2f %.2f %.2f
'%(offset_x1,
                         offset_y1, offset_x2, offset_y2))
                cv2.imwrite(save_file, resized_im)
                d_idx += 1

print('%s 个图片已处理,pos:%s  part: %s neg:%s' %(idx, p_idx, d_idx, n_idx))
f1.close()
f2.close()
f3.close()
View Code

生成landmark数据 :

gen_landmark_aug.py

# coding: utf-8

import os
import random
import sys
import cv2
import numpy as np
npr = np.random
import argparse
from tqdm import tqdm
from utils import IOU
from BBox_utils import getDataFromTxt, BBox
data_dir = '../data'


def main(args):
    """
    用于处理带有landmark的数据
    """
    size = args.input_size
    # 是否对图像变换
    argument = True
    if size == 12:
        net = 'PNet'
    elif size == 24:
        net = 'RNet'
    elif size == 48:
        net = 'ONet'
    image_id = 0
    # 数据输出路径
    OUTPUT = os.path.join(data_dir, str(size))
    if not os.path.exists(OUTPUT):
        os.mkdir(OUTPUT)
    # 图片处理后输出路径
    dstdir = os.path.join(OUTPUT, 'train_%s_landmark_aug' %(net))
    if not os.path.exists(dstdir):
        os.mkdir(dstdir)
    # label记录txt
    ftxt = os.path.join(data_dir, 'trainImageList.txt')  # trainImageList.txt记录了CelebA数据的路径以及关键点信息.
    # 记录label的txt
    f = open(os.path.join(OUTPUT, 'landmark_%d_aug.txt' %(size)), 'w')
    # 获取图像路径,box,关键点
    data = getDataFromTxt(ftxt, data_dir)
    idx = 0
    for (imgPath, box, landmarkGt) in tqdm(data):
        # 存储人脸图片和关键点
        F_imgs = []
        F_landmarks = []
        img = cv2.imread(imgPath)
        
        img_h, img_w, img_c = img.shape
        gt_box = np.array([box.left, box.top, box.right, box.bottom])
        # 人脸图片
        f_face = img[box.top:box.bottom+1, box.left:box.right+1]
        # resize成网络输入大小
        f_face = cv2.resize(f_face, (size, size))
        
        landmark = np.zeros((5, 2))
        for index, one in enumerate(landmarkGt):
            # 关键点相对于左上坐标偏移量并归一化
            rv = ((one[0]-gt_box[0])/(gt_box[2]-gt_box[0]), (one[1]-gt_box[1])/(gt_box[3]-gt_box[1]))
            landmark[index] = rv
        F_imgs.append(f_face)
        F_landmarks.append(landmark.reshape(10))
        landmark = np.zeros((5, 2))
        if argument:
            # 对图像变换
            idx = idx+1
            x1, y1, x2, y2 = gt_box
            gt_w = x2 - x1 + 1
            gt_h = y2 - y1 + 1
            # 除去过小的人脸图像
            if max(gt_w, gt_h) < 40 or x1 < 0 or y1 < 0:
                continue
            for i in range(10):
                # 随机裁剪图像大小
                # 每张图片截取10个, x下面计算方法类似于在positive和part的截图过程.
                box_size = npr.randint(int(min(gt_w, gt_h)*0.8), np.ceil(1.25*max(gt_w, gt_h)))
                # 随机左上坐标偏移量
                delta_x = npr.randint(-gt_w*0.2, gt_w*0.2)
                delta_y = npr.randint(-gt_h*0.2, gt_h*0.2)
                # 计算左上坐标
                nx1 = int(max(x1+gt_w/2-box_size/2+delta_x, 0))
                ny1 = int(max(y1+gt_h/2-box_size/2+delta_y, 0))
                nx2 = nx1 + box_size
                ny2 = ny1 + box_size
                # 除去超过边界的
                if nx2 > img_w or ny2 > img_h:
                    continue
                # 裁剪边框, 图片
                crop_box = np.array([nx1, ny1, nx2, ny2])
                cropped_im = img[ny1:ny2+1, nx1:nx2+1, :]
                resized_im = cv2.resize(cropped_im, (size, size))
                iou = IOU(crop_box, np.expand_dims(gt_box, 0))  # 扩展数组形状. -> 1 * 1 * 4
                # 只保留pos图像
                if iou > 0.65:
                    F_imgs.append(resized_im)
                    # 关键点相对偏移
                    for index, one in enumerate(landmarkGt):
                        rv = ((one[0]-nx1)/box_size, (one[1]-ny1)/box_size)
                        landmark[index] = rv
                    F_landmarks.append(landmark.reshape(10))
                    landmark = np.zeros((5, 2))
                    landmark_ = F_landmarks[-1].reshape(-1, 2)
                    box = BBox([nx1, ny1, nx2, ny2])
                    # 镜像
                    if random.choice([0, 1]) > 0:
                        face_flipped, landmark_flipped = flip(resized_im, landmark_)
                        face_flipped = cv2.resize(face_flipped, (size, size))
                        F_imgs.append(face_flipped)
                        F_landmarks.append(landmark_flipped.reshape(10))
                    # 逆时针翻转
                    if random.choice([0, 1]) > 0:
                        face_rotated_by_alpha, landmark_rorated = rotate(img, box, box.reprojectLandmark(landmark_), 5)
                        # 关键点偏移
                        landmark_rorated = box.projectLandmark(landmark_rorated)
                        face_rotated_by_alpha = cv2.resize(face_rotated_by_alpha, (size, size))
                        F_imgs.append(face_rotated_by_alpha)
                        F_landmarks.append(landmark_rorated.reshape(10))
                        
                        # 左右翻转
                        face_flipped, landmark_flipped = flip(face_rotated_by_alpha, landmark_rorated)
                        face_flipped = cv2.resize(face_flipped, (size, size))
                        F_imgs.append(face_flipped)
                        F_landmarks.append(landmark_flipped.reshape(10))
                    # 顺时针翻转
                    if random.choice([0, 1]) > 0:
                        face_rotated_by_alpha, landmark_rorated = rotate(img, box, box.reprojectLandmark(landmark_), -5)
                        # 关键点偏移
                        landmark_rorated = box.projectLandmark(landmark_rorated)
                        face_rotated_by_alpha = cv2.resize(face_rotated_by_alpha, (size, size))
                        F_imgs.append(face_rotated_by_alpha)
                        F_landmarks.append(landmark_rorated.reshape(10))
                        
                        # 左右翻转
                        face_flipped, landmark_flipped = flip(face_rotated_by_alpha, landmark_rorated)
                        face_flipped = cv2.resize(face_flipped, (size, size))
                        F_imgs.append(face_flipped)
                        F_landmarks.append(landmark_flipped.reshape(10))

        F_imgs, F_landmarks = np.asarray(F_imgs), np.asarray(F_landmarks)
        for i in range(len(F_imgs)):
            # 剔除数据偏移量在[0,1]之间
            if np.sum(np.where(F_landmarks[i] <= 0, 1, 0)) > 0:
                continue
            if np.sum(np.where(F_landmarks[i] >= 1, 1, 0)) > 0:
                continue
            cv2.imwrite(os.path.join(dstdir, '%d.jpg' %(image_id)), F_imgs[i])
            landmarks = list(map(str, list(F_landmarks[i])))
            f.write(os.path.join(dstdir, '%d.jpg' %(image_id))+' -2 '+' '.join(landmarks)+'
')
            image_id += 1
    f.close()
    return F_imgs, F_landmarks


def flip(face, landmark):
    # 镜像
    face_flipped_by_x = cv2.flip(face, 1)
    landmark_ = np.asarray([(1-x, y) for (x, y) in landmark])
    landmark_[[0, 1]] = landmark_[[1, 0]]
    landmark_[[3, 4]] = landmark_[[4, 3]]
    return (face_flipped_by_x, landmark_)


def rotate(img, box, landmark, alpha):
    # 旋转
    center = ((box.left+box.right)/2, (box.top+box.bottom)/2)
    rot_mat = cv2.getRotationMatrix2D(center, alpha, 1)
    img_rotated_by_alpha = cv2.warpAffine(img, rot_mat, (img.shape[1], img.shape[0]))
    landmark_ = np.asarray([(rot_mat[0][0]*x+rot_mat[0][1]*y+rot_mat[0][2],
                            rot_mat[1][0]*x+rot_mat[1][1]*y+rot_mat[1][2]) for (x, y) in landmark])
    face = img_rotated_by_alpha[box.top:box.bottom+1, box.left:box.right+1]
    return (face, landmark_)


def parse_arguments(argv):

    parser = argparse.ArgumentParser()
    parser.add_argument('input_size', type=int,
                        help='The input size for specific net')
    
    return parser.parse_args(argv)


if __name__ == '__main__':
    main(parse_arguments(sys.argv[1:]))
View Code

合并生成P-Net训练使用的数据:

gen_imglist_pnet.py

# coding: utf-8

import numpy as np
npr = np.random
import os
data_dir = '../data/'


"""
将pos, part, neg, landmark四者混在一起
"""

size = 12
with open(os.path.join(data_dir, '12/pos_12.txt'), 'r') as f:
    pos = f.readlines()
with open(os.path.join(data_dir, '12/neg_12.txt'), 'r') as f:
    neg = f.readlines()
with open(os.path.join(data_dir, '12/part_12.txt'), 'r') as f:
    part = f.readlines()
with open(os.path.join(data_dir, '12/landmark_12_aug.txt'), 'r') as f:
    landmark = f.readlines()
dir_path = os.path.join(data_dir, '12')
if not os.path.exists(dir_path):
    os.makedirs(dir_path)
with open(os.path.join(dir_path, 'train_pnet_landmark.txt'), 'w') as f:
    nums = [len(neg), len(pos), len(part)]
    base_num = 250000
    print('neg数量:{} pos数量:{} part数量:{} 基数:{}'.format(len(neg), len(pos), len(part), base_num))
    if len(neg) > base_num*3:
        neg_keep = npr.choice(len(neg), size=base_num*3, replace=True)
    else:
        neg_keep = npr.choice(len(neg), size=len(neg), replace=True)
    sum_p = len(neg_keep)//3  # pos : part : neg = 1 : 1 : 3
    pos_keep = npr.choice(len(pos), sum_p, replace=True)
    part_keep = npr.choice(len(part), sum_p, replace=True)
    print('neg数量:{} pos数量:{} part数量:{}'.format(len(neg_keep), len(pos_keep), len(part_keep)))
    for i in pos_keep:
        f.write(pos[i])
    for i in neg_keep:
        f.write(neg[i])
    for i in part_keep:
        f.write(part[i])
    for item in landmark:
        f.write(item) 
View Code

将训练数据转换成TFRecords个数文件:

gen_tfrecords.py

# coding: utf-8

import os
import random
import sys
import time
import tensorflow as tf
import cv2
from tqdm import tqdm
import argparse


def main(args):
    """
    生成tfrecords文件
    """
    size = args.input_size
    # 数据存放地址
    dataset_dir = '../data/'
    # tfrecord存放地址
    output_dir = os.path.join(dataset_dir, str(size)+'/tfrecord')
    if not os.path.exists(output_dir):
        os.mkdir(output_dir)
    # pnet只生成一个混合的tfrecords, rnet和onet要分别生成4个
    if size == 12:
        net = 'PNet'
        tf_filenames = [os.path.join(output_dir, 'train_%s_landmark.tfrecord' % net)]
        items = ['12/train_pnet_landmark.txt']
    elif size == 24:
        net = 'RNet'
        tf_filename1 = os.path.join(output_dir, 'pos_landmark.tfrecord')
        item1 = '%d/pos_%d.txt' % (size, size)
        tf_filename2 = os.path.join(output_dir, 'part_landmark.tfrecord')
        item2 = '%d/part_%d.txt' % (size, size)
        tf_filename3 = os.path.join(output_dir, 'neg_landmark.tfrecord')
        item3 = '%d/neg_%d.txt' % (size, size)
        tf_filename4 = os.path.join(output_dir, 'landmark_landmark.tfrecord')
        item4 = '%d/landmark_%d_aug.txt' % (size, size)
        tf_filenames = [tf_filename1, tf_filename2, tf_filename3, tf_filename4]
        items = [item1, item2, item3, item4]
    elif size == 48:
        net = 'ONet'
        tf_filename1 = os.path.join(output_dir, 'pos_landmark.tfrecord')
        item1 = '%d/pos_%d.txt' % (size, size)
        tf_filename2 = os.path.join(output_dir, 'part_landmark.tfrecord')
        item2 = '%d/part_%d.txt' % (size, size)
        tf_filename3 = os.path.join(output_dir, 'neg_landmark.tfrecord')
        item3 = '%d/neg_%d.txt' % (size, size)
        tf_filename4 = os.path.join(output_dir, 'landmark_landmark.tfrecord')
        item4 = '%d/landmark_%d_aug.txt' % (size, size)
        tf_filenames = [tf_filename1, tf_filename2, tf_filename3, tf_filename4]
        items = [item1, item2, item3, item4]
    
    if tf.gfile.Exists(tf_filenames[0]):
        print('tfrecords文件早已生成,无需此操作')
        return
    # 获取数据
    for tf_filename, item in zip(tf_filenames, items):
        print('开始读取数据')
        dataset = get_dataset(dataset_dir, item)
        tf_filename = tf_filename+'_shuffle'
        random.shuffle(dataset)  # 数据进行打乱
        print('开始转换tfrecords')
        with tf.python_io.TFRecordWriter(tf_filename) as tfrecord_writer:
            for image_example in tqdm(dataset):
                filename = image_example['filename']
                try:
                    _add_to_tfrecord(filename, image_example, tfrecord_writer)
                except:
                    print(filename)
    print('完成转换')


def get_dataset(dir, item):
    """
    从txt获取数据
    参数:
      dir:存放数据目录
      item:txt目录
    返回值:
      包含label,box,关键点的data
    """
    dataset_dir = os.path.join(dir, item)
    imagelist = open(dataset_dir, 'r')
    dataset = []
    for line in tqdm(imagelist.readlines()):  # 进度条显示
        info = line.strip().split(' ')
        data_example = dict()
        bbox = dict()
        data_example['filename'] = info[0]
        data_example['label'] = int(info[1])
        # neg的box默认为0,part,pos的box只包含人脸框,landmark的box只包含关键点
        bbox['xmin'] = 0
        bbox['ymin'] = 0
        bbox['xmax'] = 0
        bbox['ymax'] = 0
        bbox['xlefteye'] = 0
        bbox['ylefteye'] = 0
        bbox['xrighteye'] = 0
        bbox['yrighteye'] = 0
        bbox['xnose'] = 0
        bbox['ynose'] = 0
        bbox['xleftmouth'] = 0
        bbox['yleftmouth'] = 0
        bbox['xrightmouth'] = 0
        bbox['yrightmouth'] = 0        
        if len(info) == 6:  # 长度为6, 说明只有人脸框标记(6-2)
            bbox['xmin'] = float(info[2])
            bbox['ymin'] = float(info[3])
            bbox['xmax'] = float(info[4])
            bbox['ymax'] = float(info[5])
        if len(info) == 12:  # 长度为12, 说明是人脸关键点关键点(12-2)
            bbox['xlefteye'] = float(info[2])
            bbox['ylefteye'] = float(info[3])
            bbox['xrighteye'] = float(info[4])
            bbox['yrighteye'] = float(info[5])
            bbox['xnose'] = float(info[6])
            bbox['ynose'] = float(info[7])
            bbox['xleftmouth'] = float(info[8])
            bbox['yleftmouth'] = float(info[9])
            bbox['xrightmouth'] = float(info[10])
            bbox['yrightmouth'] = float(info[11])
        data_example['bbox'] = bbox
        dataset.append(data_example)
    return dataset


def _add_to_tfrecord(filename, image_example, tfrecord_writer):
    """
    转换成tfrecord文件
    参数:
      filename:图片文件名
      image_example:数据
      tfrecord_writer:写入文件
    """
    image_data, height, width = _process_image_withoutcoder(filename)
    example = _convert_to_example_simple(image_example, image_data)
    tfrecord_writer.write(example.SerializeToString())


def _process_image_withoutcoder(filename):
    """
    读取图片文件,返回图片大小
    """
    image = cv2.imread(filename)
    image_data = image.tostring()
    assert len(image.shape) == 3
    height = image.shape[0]
    width = image.shape[1]
    assert image.shape[2] == 3
    return image_data, height, width


# 不同类型数据的转换
def _int64_feature(value):
    if not isinstance(value, list):
        value = [value]
    return tf.train.Feature(int64_list=tf.train.Int64List(value=value))


def _float_feature(value):
    if not isinstance(value, list):
        value = [value]
    return tf.train.Feature(float_list=tf.train.FloatList(value=value))


def _bytes_feature(value):
    if not isinstance(value, list):
        value = [value]
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=value))


def _convert_to_example_simple(image_example, image_buffer):
    """
    转换成tfrecord接受形式
    """
    class_label = image_example['label']
    bbox = image_example['bbox']
    roi = [bbox['xmin'], bbox['ymin'], bbox['xmax'], bbox['ymax']]
    landmark = [bbox['xlefteye'], bbox['ylefteye'], bbox['xrighteye'], bbox['yrighteye'], bbox['xnose'], bbox['ynose'],
                bbox['xleftmouth'], bbox['yleftmouth'], bbox['xrightmouth'], bbox['yrightmouth']]

    example = tf.train.Example(features=tf.train.Features(feature={
        'image/encoded': _bytes_feature(image_buffer),
        'image/label': _int64_feature(class_label),
        'image/roi': _float_feature(roi),
        'image/landmark': _float_feature(landmark)
    }))
    return example


def parse_arguments(argv):

    parser = argparse.ArgumentParser()

    parser.add_argument('input_size', type=int,
                        help='The input size for specific net')
    
    return parser.parse_args(argv)


if __name__ == '__main__':
    main(parse_arguments(sys.argv[1:]))
View Code

训练:

train_model.py

# coding: utf-8


import os
import sys
from datetime import datetime
import numpy as np
import tensorflow as tf
import config as FLAGS
import random
import cv2


def train(net_factory, prefix, end_epoch, base_dir, display, base_lr):
    """
    训练模型
    """
    size = int(base_dir.split('/')[-1])  # 获取得到网络大小(因为base_dir保存的路径为:../data/12, ../data/24, ../data/48)

    # 论文中的alpha, 代表了任务的重要性. 和论文中保持一致.
    if size == 12:
        net = 'PNet'
        radio_cls_loss = 1.0; radio_bbox_loss = 0.5; radio_landmark_loss = 0.5;
    elif size == 24:
        net = 'RNet'
        radio_cls_loss = 1.0; radio_bbox_loss = 0.5; radio_landmark_loss = 0.5;
    elif size == 48:
        net = 'ONet'
        radio_cls_loss = 1.0; radio_bbox_loss = 0.5; radio_landmark_loss = 1;
        
    if net == 'PNet':
        # 计算一共多少组数据
        label_file = os.path.join(base_dir, 'train_pnet_landmark.txt')
        f = open(label_file, 'r')
   
        num = len(f.readlines())
        dataset_dir = os.path.join(base_dir, 'tfrecord/train_PNet_landmark.tfrecord_shuffle')
        # 从tfrecord读取数据
        image_batch, label_batch, bbox_batch, landmark_batch = read_single_tfrecord(dataset_dir, FLAGS.batch_size, net)
    else:
        # 计算一共多少组数据
        label_file1 = os.path.join(base_dir, 'pos_%d.txt' % size)
        f1 = open(label_file1, 'r')
        label_file2 = os.path.join(base_dir, 'part_%d.txt' % size)
        f2 = open(label_file2, 'r')
        label_file3 = os.path.join(base_dir, 'neg_%d.txt' % size)
        f3 = open(label_file3, 'r')
        label_file4 = os.path.join(base_dir, 'landmark_%d_aug.txt' % size)
        f4 = open(label_file4, 'r')
   
        num = len(f1.readlines())+len(f2.readlines())+len(f3.readlines())+len(f4.readlines())
    
        pos_dir = os.path.join(base_dir, 'tfrecord/pos_landmark.tfrecord_shuffle')
        part_dir = os.path.join(base_dir, 'tfrecord/part_landmark.tfrecord_shuffle')
        neg_dir = os.path.join(base_dir, 'tfrecord/neg_landmark.tfrecord_shuffle')
        landmark_dir = os.path.join(base_dir, 'tfrecord/landmark_landmark.tfrecord_shuffle')
        dataset_dirs = [pos_dir, part_dir, neg_dir, landmark_dir]
        # 各数据占比
        # 目的是使每一个batch的数据占比都相同
        # 训练数据的比例, pos : part : landmark, neg = 1 : 1 : 1 : 3.
        pos_radio, part_radio, landmark_radio, neg_radio = 1.0/6, 1.0/6, 1.0/6, 3.0/6
        pos_batch_size = int(np.ceil(FLAGS.batch_size*pos_radio))
        assert pos_batch_size != 0, "Batch Size 有误 "
        part_batch_size = int(np.ceil(FLAGS.batch_size*part_radio))
        assert part_batch_size != 0, "BBatch Size 有误 "
        neg_batch_size = int(np.ceil(FLAGS.batch_size*neg_radio))
        assert neg_batch_size != 0, "Batch Size 有误 "
        landmark_batch_size = int(np.ceil(FLAGS.batch_size*landmark_radio))
        assert landmark_batch_size != 0, "Batch Size 有误 "
        batch_sizes = [pos_batch_size, part_batch_size, neg_batch_size, landmark_batch_size]
        image_batch, label_batch, bbox_batch, landmark_batch = read_multi_tfrecords(dataset_dirs, batch_sizes, net)

    # 定义占位符, 训练时使用, 后续将读取的tfrecords数据传入.
    input_image = tf.placeholder(tf.float32, shape=[FLAGS.batch_size, size, size, 3], name='input_image')
    label = tf.placeholder(tf.float32, shape=[FLAGS.batch_size], name='label')
    bbox_target = tf.placeholder(tf.float32, shape=[FLAGS.batch_size, 4], name='bbox_target')
    landmark_target = tf.placeholder(tf.float32, shape=[FLAGS.batch_size, 10], name='landmark_target')
    # 图像色相变换
    input_image = image_color_distort(input_image)
    cls_loss_op, bbox_loss_op, landmark_loss_op, L2_loss_op, accuracy_op = net_factory(input_image,
                                                                                       label,
                                                                                       bbox_target,
                                                                                       landmark_target,
                                                                                       training=True)

    # 计算训练损失, 论文中公式实现.
    total_loss_op = radio_cls_loss*cls_loss_op+radio_bbox_loss*bbox_loss_op+radio_landmark_loss*landmark_loss_op+L2_loss_op
    train_op, lr_op = optimize(base_lr, total_loss_op, num)

    # 将变量添加到tensorboard, 实现可视化.
    tf.summary.scalar("cls_loss", cls_loss_op)  # cls_loss
    tf.summary.scalar("bbox_loss", bbox_loss_op)  # bbox_loss
    tf.summary.scalar("landmark_loss", landmark_loss_op)  # landmark_loss
    tf.summary.scalar("cls_accuracy", accuracy_op)  # cls_acc
    tf.summary.scalar("total_loss", total_loss_op)  # cls_loss, bbox loss, landmark loss and L2 loss add together
    summary_op = tf.summary.merge_all()
    logs_dir = "../graph/%s" % net
    if not os.path.exists(logs_dir):  # if os.path.exists(logs_dir) == False:
        os.mkdir(logs_dir)
    # 模型训练
    init = tf.global_variables_initializer()
    sess = tf.Session()

    saver = tf.train.Saver(max_to_keep=3)
    sess.run(init)
    # 模型的graph
    writer = tf.summary.FileWriter(logs_dir, sess.graph)
    # 使用 tf.train.Coordinator()来创建一个线程管理器(协调器)对象, 管理线程.
    coord = tf.train.Coordinator()
    # 启动QueueRunner
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    i = 0
    
    MAX_STEP = int(num / FLAGS.batch_size + 1) * end_epoch
    epoch = 0
    sess.graph.finalize()
    try:
        for step in range(MAX_STEP):
            i = i + 1
            if coord.should_stop():
                break
            image_batch_array, label_batch_array, bbox_batch_array, landmark_batch_array = sess.run([image_batch,
                                                                                                     label_batch,
                                                                                                     bbox_batch,
                                                                                                     landmark_batch])
            # 随机翻转图像
            image_batch_array, landmark_batch_array = random_flip_images(image_batch_array,
                                                                         label_batch_array,
                                                                         landmark_batch_array)

            _, _, summary = sess.run([train_op, lr_op, summary_op],
                                     feed_dict={input_image: image_batch_array,
                                                label: label_batch_array,
                                                bbox_target: bbox_batch_array,
                                                landmark_target: landmark_batch_array})
            # 训练过程
            if (step+1) % display == 0:
                cls_loss, bbox_loss, landmark_loss, L2_loss, lr, acc = sess.run([cls_loss_op,
                                                                                 bbox_loss_op,
                                                                                 landmark_loss_op,
                                                                                 L2_loss_op,
                                                                                 lr_op,
                                                                                 accuracy_op],
                                                                                feed_dict={input_image: image_batch_array,
                                                                                           label: label_batch_array,
                                                                                           bbox_target: bbox_batch_array,
                                                                                           landmark_target: landmark_batch_array})

                total_loss = radio_cls_loss*cls_loss + radio_bbox_loss*bbox_loss + radio_landmark_loss*landmark_loss + L2_loss
                print('epoch: %d/%d' % (epoch+1, end_epoch))
                print("Step: %d/%d, accuracy: %3f, cls loss: %4f, bbox loss: %4f, Landmark loss :%4f, L2 loss: %4f, Total Loss: %4f, lr:%f"
                      % (step+1, MAX_STEP, acc, cls_loss, bbox_loss, landmark_loss, L2_loss, total_loss, lr))

            # 每一次epoch保留一次模型
            if i * FLAGS.batch_size > num:
                epoch = epoch + 1
                i = 0
                path_prefix = saver.save(sess, prefix, global_step=epoch)
            writer.add_summary(summary, global_step=step)
    except tf.errors.OutOfRangeError:
        print("完成!!!")
    finally:
        coord.request_stop()
        writer.close()
    coord.join(threads)
    sess.close()


def optimize(base_lr, loss, data_num):
    """
    参数优化
    """
    lr_factor = 0.1
    global_step = tf.Variable(0, trainable=False)
    # 计算训练次数 data_num / batch 为整个训练集完成一次训练需要的次数. 再乘以epoch(整个数据集训练次数), 即为总的训练次数.
    # 这里使用阶梯式的学习率lr, 所以lr也区分三个. base_lr * lr_factor ^ x  --->  x=(0, 1, 2, 3)
    boundaries = [int(epoch * data_num / FLAGS.batch_size) for epoch in FLAGS.LR_EPOCH]
    lr_values = [base_lr * (lr_factor ** x) for x in range(0, len(FLAGS.LR_EPOCH) + 1)]
    lr_op = tf.train.piecewise_constant(global_step, boundaries, lr_values)
    # 使用momentum优化器
    optimizer = tf.train.MomentumOptimizer(lr_op, 0.9)
    train_op = optimizer.minimize(loss, global_step)
    return train_op, lr_op


def read_single_tfrecord(tfrecord_file, batch_size, net):
    """
    读取tfrecord数据
    """
    filename_queue = tf.train.string_input_producer([tfrecord_file], shuffle=True)
    reader = tf.TFRecordReader()
    _, serialized_example = reader.read(filename_queue)
    image_features = tf.parse_single_example(serialized_example,
                                             features={
                                                       'image/encoded': tf.FixedLenFeature([], tf.string),
                                                       'image/label': tf.FixedLenFeature([], tf.int64),
                                                       'image/roi': tf.FixedLenFeature([4], tf.float32),
                                                       'image/landmark': tf.FixedLenFeature([10], tf.float32)})

    if net == 'PNet':
        image_size = 12
    elif net == 'RNet':
        image_size = 24
    elif net == 'ONet':
        image_size = 48

    # _bytes_feature将原始图像进行转换保存到tfrecords文件, tf.decode_raw将原来编码为字符串类型的变量重新变回来原始图像数据
    image = tf.decode_raw(image_features['image/encoded'], tf.uint8)
    image = tf.reshape(image, [image_size, image_size, 3])
    # 将值规划在[-1,1]内
    image = (tf.cast(image, tf.float32)-127.5)/128  # 上面将数据转换成uint8, 即8位无符号整型(0-255).
    
    label = tf.cast(image_features['image/label'], tf.float32)
    roi = tf.cast(image_features['image/roi'], tf.float32)
    landmark = tf.cast(image_features['image/landmark'], tf.float32)
    image, label, roi, landmark = tf.train.batch([image, label, roi, landmark],
                                                 batch_size=batch_size,
                                                 num_threads=2,
                                                 capacity=batch_size)

    # tf.train.batch获取一个batch的数据, 所以下面将数据的第一维reshape成batch_size.
    label = tf.reshape(label, [batch_size])
    roi = tf.reshape(roi, [batch_size, 4])
    landmark = tf.reshape(landmark, [batch_size, 10])
    return image, label, roi, landmark


def read_multi_tfrecords(tfrecord_files, batch_sizes, net):
    """
    读取多个tfrecord文件放一起
    """
    pos_dir, part_dir, neg_dir, landmark_dir = tfrecord_files
    pos_batch_size, part_batch_size, neg_batch_size, landmark_batch_size = batch_sizes
   
    pos_image, pos_label, pos_roi, pos_landmark = read_single_tfrecord(pos_dir, pos_batch_size, net)
  
    part_image, part_label, part_roi, part_landmark = read_single_tfrecord(part_dir, part_batch_size, net)
  
    neg_image, neg_label, neg_roi, neg_landmark = read_single_tfrecord(neg_dir, neg_batch_size, net)

    landmark_image, landmark_label, landmark_roi, landmark_landmark = read_single_tfrecord(landmark_dir, landmark_batch_size, net)

    images = tf.concat([pos_image, part_image, neg_image, landmark_image], 0, name="concat/image")
   
    labels = tf.concat([pos_label, part_label, neg_label, landmark_label], 0, name="concat/label")
 
    assert isinstance(labels, object)

    rois = tf.concat([pos_roi, part_roi, neg_roi, landmark_roi], 0, name="concat/roi")
    
    landmarks = tf.concat([pos_landmark, part_landmark, neg_landmark, landmark_landmark], 0, name="concat/landmark")
    return images, labels, rois, landmarks


def image_color_distort(inputs):
    inputs = tf.image.random_contrast(inputs, lower=0.5, upper=1.5)
    inputs = tf.image.random_brightness(inputs, max_delta=0.2)
    inputs = tf.image.random_hue(inputs,max_delta= 0.2)
    inputs = tf.image.random_saturation(inputs,lower = 0.5, upper= 1.5)

    return inputs


def random_flip_images(image_batch,label_batch,landmark_batch):
    '''随机翻转图像'''
    if random.choice([0,1]) > 0:
        num_images = image_batch.shape[0]
        fliplandmarkindexes = np.where(label_batch==-2)[0]
        flipposindexes = np.where(label_batch==1)[0]
        
        flipindexes = np.concatenate((fliplandmarkindexes,flipposindexes))
          
        for i in flipindexes:
            cv2.flip(image_batch[i],1,image_batch[i])        
        
           
        for i in fliplandmarkindexes:
            landmark_ = landmark_batch[i].reshape((-1,2))
            landmark_ = np.asarray([(1-x, y) for (x, y) in landmark_])
            landmark_[[0, 1]] = landmark_[[1, 0]]
            landmark_[[3, 4]] = landmark_[[4, 3]]       
            landmark_batch[i] = landmark_.ravel()
        
    return image_batch,landmark_batch
View Code

train.py

# coding: utf-8

from model import P_Net, R_Net, O_Net
import argparse
import os
import sys
import config as FLAGS
from train_model import train
net_factorys = [P_Net, R_Net, O_Net]


def main(args):
    size = args.input_size
    base_dir = os.path.join('../data/', str(size))
    
    if size == 12:
        net = 'PNet'
        net_factory = net_factorys[0]
        end_epoch = FLAGS.end_epoch[0]
    elif size == 24:
        net = 'RNet'
        net_factory = net_factorys[1]
        end_epoch = FLAGS.end_epoch[1]
    elif size == 48:
        net = 'ONet'
        net_factory = net_factorys[2]
        end_epoch = FLAGS.end_epoch[2]
    model_path = os.path.join('../model/', net)
    if not os.path.exists(model_path):
        os.mkdir(model_path)
    prefix = os.path.join(model_path, net)
    display = FLAGS.display
    lr = FLAGS.lr
    train(net_factory, prefix, end_epoch, base_dir, display, lr)


def parse_arguments(argv):

    parser = argparse.ArgumentParser()

    parser.add_argument('input_size', type=int,
                        help='The input size for specific net')
    
    return parser.parse_args(argv)


if __name__ == '__main__':
    main(parse_arguments(sys.argv[1:]))
View Code

其中,模型文件:

model.py

# coding: utf-8

# In[1]:


import tensorflow as tf
slim = tf.contrib.slim
import numpy as np
# 只把70%数据用作参数更新
num_keep_radio = 0.7


def P_Net(inputs, label=None, bbox_target=None, landmark_target=None, training=True):
    """
    PNet的结构
    """
    with tf.variable_scope('PNet'):
        # 使用tensorflow slim构建神经网络
        with slim.arg_scope([slim.conv2d], activation_fn=prelu,
                            weights_initializer=slim.xavier_initializer(),
                            weights_regularizer=slim.l2_regularizer(0.0005),
                            padding='VALID'):
            net = slim.conv2d(inputs, 10, 3, scope='conv1')  # 第一层:输出为10, kernel_size为3
            net = slim.max_pool2d(net, kernel_size=[2, 2], stride=2, padding='SAME', scope='pool1')
            net = slim.conv2d(net, 16, 3, scope='conv2')
            net = slim.conv2d(net, 32, 3, scope='conv3')
            # 二分类输出通道数为2
            conv4_1 = slim.conv2d(net, 2, 1, activation_fn=tf.nn.softmax, scope='conv4_1')  # 二分类预测是不是人脸框
            bbox_pred = slim.conv2d(net, 4, 1, activation_fn=None, scope='conv4_2')  # 4回归获取人脸框坐标
            landmark_pred = slim.conv2d(net, 10, 1, activation_fn=None, scope='conv4_3')  # 10回归获取人脸特征点坐标
            
            if training:
                # 删除维度1, 2, size为1的维度, 即:[batch 1 1 2] -> [batch, 2]
                cls_prob = tf.squeeze(conv4_1, [1, 2], name='cls_prob')
                cls_loss = cls_ohem(cls_prob, label)
                
                bbox_pred = tf.squeeze(bbox_pred, [1, 2], name='bbox_pred')  # [batch, 4]
                bbox_loss = bbox_ohem(bbox_pred, bbox_target, label)
                
                landmark_pred = tf.squeeze(landmark_pred, [1, 2], name='landmark_pred')  # [batch, 10]
                landmark_loss = landmark_ohem(landmark_pred, landmark_target, label)
                
                accuracy = cal_accuracy(cls_prob, label)
                L2_loss = tf.add_n(slim.losses.get_regularization_losses())
                return cls_loss, bbox_loss, landmark_loss, L2_loss, accuracy
            else:
                # 测试时batch_size=1
                cls_pro_test = tf.squeeze(conv4_1, axis=0)
                bbox_pred_test = tf.squeeze(bbox_pred, axis=0)
                landmark_pred_test = tf.squeeze(landmark_pred, axis=0)
                return cls_pro_test, bbox_pred_test, landmark_pred_test


def R_Net(inputs, label=None, bbox_target=None, landmark_target=None, training=True):
    """
    RNet的结构
    """
    with tf.variable_scope('RNet'):
        with slim.arg_scope([slim.conv2d],
                            activation_fn=prelu,
                            weights_initializer=slim.xavier_initializer(),
                            weights_regularizer=slim.l2_regularizer(0.0005),
                            padding='VALID'):
            net = slim.conv2d(inputs, 28, 3, scope='conv1')
            net = slim.max_pool2d(net, kernel_size=[3, 3], stride=2, padding='SAME', scope='pool1')
            net = slim.conv2d(net, 48, 3, scope='conv2')
            net = slim.max_pool2d(net, kernel_size=[3, 3], stride=2, scope='pool2')
            net = slim.conv2d(net, 64, 2, scope='conv3')
            fc_flatten = slim.flatten(net)
            fc1 = slim.fully_connected(fc_flatten, num_outputs=128, scope='fc1')
            
            cls_prob = slim.fully_connected(fc1, num_outputs=2,activation_fn=tf.nn.softmax, scope='cls_fc')
            bbox_pred = slim.fully_connected(fc1, num_outputs=4, activation_fn=None, scope='bbox_fc')
            landmark_pred = slim.fully_connected(fc1, num_outputs=10, activation_fn=None, scope='landmark_fc')
            if training:
                cls_loss = cls_ohem(cls_prob, label)
                bbox_loss = bbox_ohem(bbox_pred, bbox_target, label)
                landmark_loss = landmark_ohem(landmark_pred, landmark_target, label)
                
                accuracy = cal_accuracy(cls_prob, label)
                L2_loss = tf.add_n(slim.losses.get_regularization_losses())
                return cls_loss, bbox_loss, landmark_loss, L2_loss, accuracy
            else:
                return cls_prob, bbox_pred, landmark_pred


def O_Net(inputs, label=None, bbox_target=None, landmark_target=None, training=True):
    """
    ONet结构
    """
    with tf.variable_scope('ONet'):
        with slim.arg_scope([slim.conv2d],
                            activation_fn=prelu,
                            weights_initializer=slim.xavier_initializer(),
                            weights_regularizer=slim.l2_regularizer(0.0005),
                            padding='VALID'):
            net = slim.conv2d(inputs, 32, 3, scope='conv1')
            net = slim.max_pool2d(net, kernel_size=[3, 3], stride=2, padding='SAME', scope='pool1')
            net = slim.conv2d(net, 64, 3, scope='conv2')
            net = slim.max_pool2d(net, kernel_size=[3, 3], stride=2, scope='pool2')
            net = slim.conv2d(net, 64, 3, scope='conv3')
            net = slim.max_pool2d(net, kernel_size=[2, 2], stride=2, padding='SAME', scope='pool3')
            net = slim.conv2d(net, 128, 2, scope='conv4')
            fc_flatten = slim.flatten(net)
            fc1 = slim.fully_connected(fc_flatten, num_outputs=256, scope='fc1')
            
            cls_prob = slim.fully_connected(fc1, num_outputs=2, activation_fn=tf.nn.softmax, scope='cls_fc')
            bbox_pred = slim.fully_connected(fc1, num_outputs=4, activation_fn=None, scope='bbox_fc')
            landmark_pred = slim.fully_connected(fc1, num_outputs=10, activation_fn=None, scope='landmark_fc')
            if training:
                cls_loss = cls_ohem(cls_prob, label)
                bbox_loss = bbox_ohem(bbox_pred, bbox_target, label)
                landmark_loss = landmark_ohem(landmark_pred, landmark_target, label)
                
                accuracy = cal_accuracy(cls_prob, label)
                L2_loss = tf.add_n(slim.losses.get_regularization_losses())
                return cls_loss, bbox_loss, landmark_loss, L2_loss, accuracy
            else:
                return cls_prob, bbox_pred, landmark_pred


def prelu(inputs):
    """
    prelu函数定义
    """
    alphas = tf.get_variable('alphas', shape=inputs.get_shape()[-1], dtype=tf.float32,
                             initializer=tf.constant_initializer(0.25))
    pos = tf.nn.relu(inputs)
    neg = alphas*(inputs-abs(inputs))*0.5
    return pos+neg


def cls_ohem(cls_prob, label):
    """
    计算类别损失
    参数:
      cls_prob:预测类别,是否有人
      label:真实值
    返回值:
      损失
    """
    zeros = tf.zeros_like(label)

    # neg: 0, pos: 1, part: -1
    # negatives and positives are used for face classification tasks
    # 这里只把pos的label置1, neg和part的label置0.
    # neg: label->0, pos: label->1, part: 0
    label_filter_invalid = tf.where(tf.less(label, 0), zeros, label)
    num_cls_prob = tf.size(cls_prob)  # 计算类别的size=batch*2
    cls_prob_reshape = tf.reshape(cls_prob, [num_cls_prob, -1])  # 将类别数组转换成1维的
    label_int = tf.cast(label_filter_invalid, tf.int32)  # 将置0, 1的数组转换成int32的
    num_row = tf.to_int32(cls_prob.get_shape()[0])  # 获取batch数
    # 对应某一batch而言,batch*2为非人类别概率,batch*2+1为人概率类别,indices为对应 cls_prob_reshape
    # 应该的真实值,后续用交叉熵计算损失
    row = tf.range(num_row) * 2  # 生成每一个类别的基址:(0, 2, 4, 6, ..., (num_row - 1) * 2)
    # 以上面为基址, 即每个样本的neg类别, label_int为是neg还是pos. 训练样本的label_int=0, neg; label_int=1, pos.
    indices_ = row + label_int
    # 获取真实标签对应的概率, indices_显示了实际标签的类别. 是neg还是pos.
    label_prob = tf.squeeze(tf.gather(cls_prob_reshape, indices_))
    loss = -tf.log(label_prob+1e-10)  # 这里有点疑问, 交叉熵损失函数公式不是这样的吧??????
    zeros = tf.zeros_like(label_prob, dtype=tf.float32)
    ones = tf.ones_like(label_prob, dtype=tf.float32)
    # 统计neg和pos的数量loss, 这里筛选neg和pos的loss用于后续训练
    # label小于0(即part: -1)-> 0, 否则:pos、part均为1.
    # 上面全部计算了所有的
    valid_inds = tf.where(label < zeros, zeros, ones)
    num_valid = tf.reduce_sum(valid_inds)
    # 选取70%的数据
    keep_num = tf.cast(num_valid*num_keep_radio, dtype=tf.int32)
    # 只选取neg, pos的70%损失
    # loss * valid_inds 数组想乘只保留valid_inds为1的元素
    loss = loss * valid_inds
    loss, _ = tf.nn.top_k(loss, k=keep_num)
    return tf.reduce_mean(loss)


def bbox_ohem(bbox_pred, bbox_target, label):
    """
    计算box的损失
    """
    zeros_index = tf.zeros_like(label, dtype=tf.float32)
    ones_index = tf.ones_like(label, dtype=tf.float32)
    # 保留pos和part的数据
    valid_inds = tf.where(tf.equal(tf.abs(label), 1), ones_index, zeros_index)
    # 计算平方差损失
    square_error = tf.square(bbox_pred-bbox_target)
    square_error = tf.reduce_sum(square_error, axis=1)
    # 保留的数据的个数
    num_valid = tf.reduce_sum(valid_inds)
    keep_num = tf.cast(num_valid, dtype=tf.int32)
    # 保留pos和part部分的损失
    square_error = square_error*valid_inds
    square_error, _ = tf.nn.top_k(square_error, k=keep_num)
    return tf.reduce_mean(square_error)


def landmark_ohem(landmark_pred, landmark_target, label):
    """
    计算关键点损失
    """
    ones = tf.ones_like(label, dtype=tf.float32)
    zeros = tf.zeros_like(label, dtype=tf.float32)
    # 只保留landmark数据
    valid_inds = tf.where(tf.equal(label, -2), ones, zeros)
    # 计算平方差损失
    square_error = tf.square(landmark_pred-landmark_target)
    square_error = tf.reduce_sum(square_error, axis=1)
    # 保留数据个数
    num_valid = tf.reduce_sum(valid_inds)
    keep_num = tf.cast(num_valid, dtype=tf.int32)
    # 保留landmark部分数据损失
    square_error = square_error*valid_inds
    square_error, _ = tf.nn.top_k(square_error, k=keep_num)
    return tf.reduce_mean(square_error)


def cal_accuracy(cls_prob, label):
    """
    计算分类准确率
    """
    # 预测最大概率的类别,0代表无人,1代表有人
    pred = tf.argmax(cls_prob, axis=1)
    label_int = tf.cast(label, tf.int64)
    # 保留label>=0的数据,即pos和neg的数据
    cond = tf.where(tf.greater_equal(label_int, 0))
    picked = tf.squeeze(cond)
    # 获取pos和neg的label值
    label_picked = tf.gather(label_int, picked)
    pred_picked = tf.gather(pred, picked)
    # 计算准确率
    accuracy_op = tf.reduce_mean(tf.cast(tf.equal(label_picked, pred_picked), tf.float32))
    return accuracy_op
View Code

生成下一个网络的输入:

gen_hard_example.py

# coding: utf-8


import sys
from utils import *
import numpy as np
import argparse
import os
import pickle
import cv2
from tqdm import tqdm
from loader import TestLoader
sys.path.append('../')
from train.model import P_Net, R_Net, O_Net
import train.config as config
from detection.detector import Detector
from detection.fcn_detector import FcnDetector
from detection.MtcnnDetector import MtcnnDetector


def main(args):
    """
    通过PNet或RNet生成下一个网络的输入
    """
    size = args.input_size
    batch_size = config.batches
    min_face_size = config.min_face
    stride = config.stride
    thresh = config.thresh
    # 模型地址
    model_path = ['../model/PNet/', '../model/RNet/', '../model/ONet']
    if size == 12:
        net = 'PNet'
        save_size = 24
    elif size == 24:
        net = 'RNet'
        save_size = 48
    # 图片数据地址
    base_dir = '../data/WIDER_train/'
    # 处理后的图片存放地址
    data_dir = '../data/%d' % save_size
    neg_dir = os.path.join(data_dir, 'negative')
    pos_dir = os.path.join(data_dir, 'positive')
    part_dir = os.path.join(data_dir, 'part')
    for dir_path in [neg_dir, pos_dir, part_dir]:
        if not os.path.exists(dir_path):
            os.makedirs(dir_path)
    detectors = [None, None, None]
    PNet = FcnDetector(P_Net, model_path[0])
    detectors[0] = PNet
    if net == 'RNet':
        RNet = Detector(R_Net, 24, batch_size[1], model_path[1])
        detectors[1] = RNet
    basedir = '../data/'
    filename = '../data/wider_face_train_bbx_gt.txt'
    # 读取文件的image和box对应函数在utils中
    data = read_annotation(base_dir, filename)
    mtcnn_detector = MtcnnDetector(detectors, min_face_size=min_face_size,
                                   stride=stride, threshold=thresh)
    save_path = data_dir
    save_file = os.path.join(save_path, 'detections.pkl')
    if not os.path.exists(save_file):
        # 将data制作成迭代器
        print('载入数据')
        test_data = TestLoader(data['images'])
        detectors, _ = mtcnn_detector.detect_face(test_data)
        print('完成识别')

        with open(save_file, 'wb') as f:
            pickle.dump(detectors, f, 1)
    print('开始生成图像')
    save_hard_example(save_size, data, neg_dir, pos_dir, part_dir, save_path)


def save_hard_example(save_size, data, neg_dir, pos_dir, part_dir, save_path):
    """
    将网络识别的box用来裁剪原图像作为下一个网络的输入
    """
    im_idx_list = data['images']
    gt_boxes_list = data['bboxes']
    num_of_images = len(im_idx_list)

    neg_label_file = "../data/%d/neg_%d.txt" % (save_size, save_size)
    neg_file = open(neg_label_file, 'w')

    pos_label_file = "../data/%d/pos_%d.txt" % (save_size, save_size)
    pos_file = open(pos_label_file, 'w')

    part_label_file = "../data/%d/part_%d.txt" % (save_size, save_size)
    part_file = open(part_label_file, 'w')
    # read detect result
    det_boxes = pickle.load(open(os.path.join(save_path, 'detections.pkl'), 'rb'))
    # print(len(det_boxes), num_of_images)
   
    assert len(det_boxes) == num_of_images, "弄错了"

    n_idx = 0
    p_idx = 0
    d_idx = 0
    image_done = 0
    
    for im_idx, dets, gts in tqdm(zip(im_idx_list, det_boxes, gt_boxes_list)):
        gts = np.array(gts, dtype=np.float32).reshape(-1, 4)
        image_done += 1

        if dets.shape[0] == 0:
            continue
        img = cv2.imread(im_idx)
        # 转换成正方形
        dets = convert_to_square(dets)
        dets[:, 0:4] = np.round(dets[:, 0:4])
        neg_num = 0
        for box in dets:
            x_left, y_top, x_right, y_bottom, _ = box.astype(int)
            width = x_right - x_left + 1
            height = y_bottom - y_top + 1

            # 除去过小的box框
            if width < 20 or x_left < 0 or y_top < 0 or x_right > img.shape[1] - 1 or y_bottom > img.shape[0] - 1:
                continue

            Iou = IOU(box, gts)
            cropped_im = img[y_top:y_bottom + 1, x_left:x_right + 1, :]  # 截取图片得到box.
            resized_im = cv2.resize(cropped_im, (save_size, save_size),
                                    interpolation=cv2.INTER_LINEAR)

            # 划分种类, 选取60张neg人脸框用于后续网络的训练.
            if np.max(Iou) < 0.3 and neg_num < 60:
                save_file = os.path.join(neg_dir, "%s.jpg" % n_idx)
                neg_file.write(save_file + ' 0
')
                cv2.imwrite(save_file, resized_im)
                n_idx += 1
                neg_num += 1
            else:
                idx = np.argmax(Iou)  # 获取IOU最大的索引
                assigned_gt = gts[idx]  # 得到IOU最大的人脸框
                x1, y1, x2, y2 = assigned_gt

                # 偏移量
                offset_x1 = (x1 - x_left) / float(width)
                offset_y1 = (y1 - y_top) / float(height)
                offset_x2 = (x2 - x_right) / float(width)
                offset_y2 = (y2 - y_bottom) / float(height)

                # pos和part
                if np.max(Iou) >= 0.65:
                    save_file = os.path.join(pos_dir, "%s.jpg" % p_idx)
                    pos_file.write(save_file + ' 1 %.2f %.2f %.2f %.2f
' % (
                        offset_x1, offset_y1, offset_x2, offset_y2))
                    cv2.imwrite(save_file, resized_im)
                    p_idx += 1

                elif np.max(Iou) >= 0.4:
                    save_file = os.path.join(part_dir, "%s.jpg" % d_idx)
                    part_file.write(save_file + ' -1 %.2f %.2f %.2f %.2f
' % (
                        offset_x1, offset_y1, offset_x2, offset_y2))
                    cv2.imwrite(save_file, resized_im)
                    d_idx += 1
    neg_file.close()
    part_file.close()
    pos_file.close()


def parse_arguments(argv):

    parser = argparse.ArgumentParser()

    parser.add_argument('input_size', type=int,
                        help='The input size for specific net')
    
    return parser.parse_args(argv)


if __name__ == '__main__':
    main(parse_arguments(sys.argv[1:]))
View Code

训练过程如下:

source activate tensorflow

进入到preprocess目录下:
python gen_12net_data.py生成三种pnet数据


python gen_landmark_aug.py 12 生成pnet的landmark数据


python gen_imglist_pnet.py整理到一起


python gen_tfrecords.py 12生成tfrecords文件

进入到train目录下:

python train.py 12 训练pnet

tensorboard显示loss:

进入到preprocess目录:
python gen_hard_example.py 12 生成三种rnet数据,
python gen_landmark_aug.py 24 生成rnet的landmark数据,
python gen_tfrecords.py 24生成tfrecords文件
将目录cd到train上python train.py 24 训练rnet

将目录cd到preprocess上,
python gen_hard_example.py 24 生成三种onet数据,
python gen_landmark_aug.py 48 生成onet的landmark数据,
python gen_tfrecords.py 48生成tfrecords文件
将目录cd到train上python train.py 48 训练onet

测试文件:

test.py

# coding: utf-8

import sys
from detection.MtcnnDetector import MtcnnDetector
from detection.detector import Detector
from detection.fcn_detector import FcnDetector
from train.model import P_Net, R_Net, O_Net
import cv2
import os
import numpy as np
import train.config as config


test_mode = config.test_mode
thresh = config.thresh
min_face_size = config.min_face
stride = config.stride
detectors = [None, None, None]
# 模型放置位置
model_path = ['model/PNet/', 'model/RNet/', 'model/ONet']
batch_size = config.batches
PNet = FcnDetector(P_Net, model_path[0])
detectors[0] = PNet


if test_mode in ["RNet", "ONet"]:
    RNet = Detector(R_Net, 24, batch_size[1], model_path[1])
    detectors[1] = RNet


if test_mode == "ONet":
    ONet = Detector(O_Net, 48, batch_size[2], model_path[2])
    detectors[2] = ONet

mtcnn_detector = MtcnnDetector(detectors=detectors, min_face_size=min_face_size,
                               stride=stride, threshold=thresh)
out_path = config.out_path
if config.input_mode == '1':
    # 选用图片
    path = config.test_dir
    # print(path)
    for item in os.listdir(path):
        img_path = os.path.join(path, item)
        img = cv2.imread(img_path)
        boxes_c, landmarks = mtcnn_detector.detect(img)
        for i in range(boxes_c.shape[0]):
            bbox = boxes_c[i, :4]
            score = boxes_c[i, 4]
            corpbbox = [int(bbox[0]), int(bbox[1]), int(bbox[2]), int(bbox[3])]
            # 画人脸框
            cv2.rectangle(img, (corpbbox[0], corpbbox[1]),
                          (corpbbox[2], corpbbox[3]), (255, 0, 0), 1)
            # 判别为人脸的置信度
            cv2.putText(img, '{:.2f}'.format(score), 
                        (corpbbox[0], corpbbox[1] - 2),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2)
        # 画关键点
        for i in range(landmarks.shape[0]):
            for j in range(len(landmarks[i])//2):
                cv2.circle(img, (int(landmarks[i][2*j]), int(int(landmarks[i][2*j+1]))), 2, (0, 0, 255))
        cv2.imshow('im', img)
        k = cv2.waitKey(0) & 0xFF
        if k == 27:        
            cv2.imwrite(out_path + item, img)
    cv2.destroyAllWindows()

if config.input_mode == '2':
    cap = cv2.VideoCapture(0)
    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    out = cv2.VideoWriter(out_path+'out.mp4', fourcc, 10, (640, 480))
    while True:
            t1 = cv2.getTickCount()
            ret, frame = cap.read()
            if ret:
                boxes_c, landmarks = mtcnn_detector.detect(frame)
                t2 = cv2.getTickCount()
                t = (t2-t1)/cv2.getTickFrequency()
                fps = 1.0/t
                for i in range(boxes_c.shape[0]):
                    bbox = boxes_c[i, :4]
                    score = boxes_c[i, 4]
                    corpbbox = [int(bbox[0]), int(bbox[1]), int(bbox[2]), int(bbox[3])]
                
                    # 画人脸框
                    cv2.rectangle(frame, (corpbbox[0], corpbbox[1]),
                          (corpbbox[2], corpbbox[3]), (255, 0, 0), 1)
                    # 画置信度
                    cv2.putText(frame, '{:.2f}'.format(score), 
                                (corpbbox[0], corpbbox[1] - 2), 
                                cv2.FONT_HERSHEY_SIMPLEX,
                                0.5, (0, 0, 255), 2)
                    # 画fps值
                cv2.putText(frame, '{:.4f}'.format(t) + " " + '{:.3f}'.format(fps), (10, 20),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 255), 2)
                # 画关键点
                for i in range(landmarks.shape[0]):
                    for j in range(len(landmarks[i])//2):
                        cv2.circle(frame, (int(landmarks[i][2*j]), int(int(landmarks[i][2*j+1]))), 2, (0, 0, 255))
                a = out.write(frame)
                cv2.imshow("result", frame)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
            else:
                break
    cap.release()
    out.release()
    cv2.destroyAllWindows()
View Code

其中使用到的模块:

detector.py

# coding: utf-8


import tensorflow as tf
import numpy as np


class Detector:
    """
    识别多组图片
    """
    def __init__(self, net_factory, data_size, batch_size, model_path):
        graph = tf.Graph()
        with graph.as_default():
            self.image_op = tf.placeholder(tf.float32, [None, data_size, data_size, 3])
            self.cls_prob, self.bbox_pred, self.landmark_pred = net_factory(self.image_op, training=False)
            self.sess = tf.Session()
            # 重载模型
            saver = tf.train.Saver()
            model_file = tf.train.latest_checkpoint(model_path)
            saver.restore(self.sess, model_file)
        self.data_size = data_size
        self.batch_size = batch_size

    def predict(self, databatch):
        scores = []
        batch_size = self.batch_size
        minibatch = []
        cur = 0
        # 所有数据总数
        n = databatch.shape[0]
        # 将数据整理成固定batch
        while cur < n:
            minibatch.append(databatch[cur:min(cur+batch_size, n), :, :, :])
            cur += batch_size
        cls_prob_list = []
        bbox_pred_list = []
        landmark_pred_list = []
        for idx, data in enumerate(minibatch):
            m = data.shape[0]
            real_size = self.batch_size
            # 最后一组数据不够一个batch的处理
            if m < batch_size:
                keep_inds = np.arange(m)
                gap = self.batch_size-m
                while gap >= len(keep_inds):
                    gap -= len(keep_inds)
                    keep_inds = np.concatenate((keep_inds, keep_inds))
                if gap != 0:
                    keep_inds = np.concatenate((keep_inds, keep_inds[:gap]))
                data = data[keep_inds]
                real_size = m
            cls_prob, bbox_pred, landmark_pred = self.sess.run([self.cls_prob, self.bbox_pred, self.landmark_pred],
                                                               feed_dict={self.image_op: data})
            
            cls_prob_list.append(cls_prob[:real_size])
            bbox_pred_list.append(bbox_pred[:real_size])
            landmark_pred_list.append(landmark_pred[:real_size])
        
        return np.concatenate(cls_prob_list, axis=0), np.concatenate(bbox_pred_list, axis=0), np.concatenate(landmark_pred_list, axis=0)
View Code

fcn_detector.py

# coding: utf-8

import tensorflow as tf
import sys
sys.path.append('../')
import train.config as config


class FcnDetector:
    """
    识别单张图片
    """
    def __init__(self, net_factory, model_path):
        graph = tf.Graph()
        with graph.as_default():
            self.image_op = tf.placeholder(tf.float32, name='input_image')
            self.width_op = tf.placeholder(tf.int32, name='image_width')
            self.height_op = tf.placeholder(tf.int32, name='image_height')
            image_reshape = tf.reshape(self.image_op, [1, self.height_op, self.width_op, 3])
            # 预测值
            self.cls_prob, self.bbox_pred, _ = net_factory(image_reshape, training=False)
            self.sess = tf.Session()
            # 重载模型
            saver = tf.train.Saver()
            model_file = tf.train.latest_checkpoint(model_path)
            saver.restore(self.sess, model_file)

    def predict(self, databatch):
        height, width, _ = databatch.shape
        cls_prob, bbox_pred = self.sess.run([self.cls_prob, self.bbox_pred],
                                            feed_dict={self.image_op: databatch,
                                                       self.width_op: width,
                                                       self.height_op: height})
        
        return cls_prob, bbox_pred
View Code

MtcnnDetector.py

# coding: utf-8


import cv2
import numpy as np
import sys

sys.path.append('../')
from preprocess.utils import *
from tqdm import tqdm


def py_nms(dets, thresh):
    """
    剔除太相似的box
    """
    x1 = dets[:, 0]
    y1 = dets[:, 1]
    x2 = dets[:, 2]
    y2 = dets[:, 3]
    scores = dets[:, 4]

    areas = (x2 - x1 + 1) * (y2 - y1 + 1)
    # 将概率值从大到小排列
    order = scores.argsort()[::-1]

    keep = []
    while order.size > 0:
        i = order[0]
        keep.append(i)
        xx1 = np.maximum(x1[i], x1[order[1:]])
        yy1 = np.maximum(y1[i], y1[order[1:]])
        xx2 = np.minimum(x2[i], x2[order[1:]])
        yy2 = np.minimum(y2[i], y2[order[1:]])

        w = np.maximum(0.0, xx2 - xx1 + 1)
        h = np.maximum(0.0, yy2 - yy1 + 1)
        inter = w * h
        
        ovr = inter / (areas[i] + areas[order[1:]] - inter+1e-10)
       
        # 保留小于阈值的下标,因为order[0]拿出来做比较了,所以inds+1是原来对应的下标
        inds = np.where(ovr <= thresh)[0]
        order = order[inds + 1]

    return keep


class MtcnnDetector:
    """
    来生成人脸的图像
    """
    def __init__(self, detectors,
                 min_face_size=20,
                 stride=2,
                 threshold=[0.6, 0.7, 0.7],
                 scale_factor=0.79  # 图像金字塔的缩小率
                 ):
        self.pnet_detector = detectors[0]
        self.rnet_detector = detectors[1]
        self.onet_detector = detectors[2]
        self.min_face_size = min_face_size
        self.stride = stride
        self.thresh = threshold
        self.scale_factor = scale_factor

    def detect_face(self, test_data):
        all_boxes = []
        landmarks = []
        batch_idx = 0
        num_of_img = test_data.size
        empty_array = np.array([])
        for databatch in tqdm(test_data):
            batch_idx += 1
            im = databatch
            if self.pnet_detector:
                boxes, boxes_c, landmark = self.detect_pnet(im)
                if boxes_c is None:
                    all_boxes.append(empty_array)
                    landmarks.append(empty_array)
                    continue
            if self.rnet_detector:
                boxes, boxes_c, landmark = self.detect_rnet(im, boxes_c)
                
                if boxes_c is None:
                    all_boxes.append(empty_array)
                    landmarks.append(empty_array)
                    continue

            if self.onet_detector:
                
                boxes, boxes_c, landmark = self.detect_onet(im, boxes_c)
               
                if boxes_c is None:
                    all_boxes.append(empty_array)
                    landmarks.append(empty_array)
                    continue

            all_boxes.append(boxes_c)
            landmark = [1]
            landmarks.append(landmark)
        return all_boxes, landmarks

    def detect_pnet(self, im):
        """
        通过PNet筛选box和landmark
        参数:
          im:输入图像[h,2,3]
        """
        h, w, c = im.shape
        net_size = 12
        # 人脸和输入图像的比率
        current_scale = float(net_size) / self.min_face_size
        im_resized = self.processed_image(im, current_scale)
        current_height, current_width, _ = im_resized.shape
        all_boxes = list()
        # 图像金字塔, 不断地去resize图片
        while min(current_height, current_width) > net_size:
            # 类别和box
            # 这里是测试流程, 输入是一张图片(size不一定是12*12)
            # 因此这里面输出得到的cls_cls_map形状是feature map(n * m * 2)
            # reg形状是是(n * m * 4)
            cls_cls_map, reg = self.pnet_detector.predict(im_resized)
            boxes = self.generate_bbox(cls_cls_map[:, :, 1], reg, current_scale, self.thresh[0])
            current_scale *= self.scale_factor  # 继续缩小图像做金字塔
            im_resized = self.processed_image(im, current_scale)
            current_height, current_width, _ = im_resized.shape
            
            if boxes.size == 0:
                continue
            # 非极大值抑制留下重复低的box
            keep = py_nms(boxes[:, :5], 0.5)
            boxes = boxes[keep]
            all_boxes.append(boxes)
        if len(all_boxes) == 0:
            return None, None, None
        all_boxes = np.vstack(all_boxes)

        # 将金字塔之后的box也进行非极大值抑制
        keep = py_nms(all_boxes[:, 0:5], 0.7)
        all_boxes = all_boxes[keep]
        boxes = all_boxes[:, :5]

        # box的长宽
        bbw = all_boxes[:, 2] - all_boxes[:, 0] + 1
        bbh = all_boxes[:, 3] - all_boxes[:, 1] + 1

        # 对应原图的box坐标和分数, 训练数据是相对于人脸框bbox的归一化的offset, 因此这里dx、dy也都是归一化的.
        boxes_c = np.vstack([all_boxes[:, 0] + all_boxes[:, 5] * bbw,  # all_boxes[:, 5]--> dx1
                             all_boxes[:, 1] + all_boxes[:, 6] * bbh,  # all_boxes[:, 6]--> dy1
                             all_boxes[:, 2] + all_boxes[:, 7] * bbw,  # all_boxes[:, 7]--> dx2
                             all_boxes[:, 3] + all_boxes[:, 8] * bbh,  # all_boxes[:, 8]--> dy2
                             all_boxes[:, 4]])
        boxes_c = boxes_c.T
        return boxes, boxes_c, None

    def detect_rnet(self, im, dets):
        """
        通过rent选择box
        参数:
          im:输入图像
          dets: PNet选择的box,是相对原图的绝对坐标
        返回值:
          box绝对坐标
        """
        h, w, c = im.shape
        # 将PNet的box变成包含它的正方形,可以避免信息损失
        dets = convert_to_square(dets)
        dets[:, 0:4] = np.round(dets[:, 0:4])
        # 调整超出图像的box
        [dy, edy, dx, edx, y, ey, x, ex, tmpw, tmph] = self.pad(dets, w, h)
        delete_size = np.ones_like(tmpw)*20
        ones = np.ones_like(tmpw)
        zeros = np.zeros_like(tmpw)
        num_boxes = np.sum(np.where((np.minimum(tmpw, tmph) >= delete_size), ones, zeros))
        cropped_ims = np.zeros((num_boxes, 24, 24, 3), dtype=np.float32)
        for i in range(num_boxes):
            # 将PNet生成的box相对与原图进行裁剪, 超出部分用0补
            if tmph[i] < 20 or tmpw[i] < 20:
                continue
            tmp = np.zeros((tmph[i], tmpw[i], 3), dtype=np.uint8)
            tmp[dy[i]:edy[i] + 1, dx[i]:edx[i] + 1, :] = im[y[i]:ey[i] + 1, x[i]:ex[i] + 1, :]
            cropped_ims[i, :, :, :] = (cv2.resize(tmp, (24, 24)) - 127.5) / 128
        cls_scores, reg, _ = self.rnet_detector.predict(cropped_ims)
        cls_scores = cls_scores[:, 1]
        keep_inds = np.where(cls_scores > self.thresh[1])[0]
        if len(keep_inds) > 0:
            boxes = dets[keep_inds]
            boxes[:, 4] = cls_scores[keep_inds]
            reg = reg[keep_inds]
        else:
            return None, None, None

        keep = py_nms(boxes, 0.6)
        boxes = boxes[keep]
        # 对PNet截取的图像的坐标进行校准,生成RNet的人脸框对于原图的绝对坐标
        boxes_c = self.calibrate_box(boxes, reg[keep])
        return boxes, boxes_c, None
    
    def detect_onet(self, im, dets):
        """
        将ONet的选框继续筛选基本和RNet差不多但多返回了landmark
        """
        h, w, c = im.shape
        dets = convert_to_square(dets)
        dets[:, 0:4] = np.round(dets[:, 0:4])
        [dy, edy, dx, edx, y, ey, x, ex, tmpw, tmph] = self.pad(dets, w, h)
        num_boxes = dets.shape[0]
        cropped_ims = np.zeros((num_boxes, 48, 48, 3), dtype=np.float32)
        for i in range(num_boxes):
            tmp = np.zeros((tmph[i], tmpw[i], 3), dtype=np.uint8)
            tmp[dy[i]:edy[i] + 1, dx[i]:edx[i] + 1, :] = im[y[i]:ey[i] + 1, x[i]:ex[i] + 1, :]
            cropped_ims[i, :, :, :] = (cv2.resize(tmp, (48, 48)) - 127.5) / 128

        cls_scores, reg, landmark = self.onet_detector.predict(cropped_ims)
        
        cls_scores = cls_scores[:, 1]
        keep_inds = np.where(cls_scores > self.thresh[2])[0]
        if len(keep_inds) > 0:
            boxes = dets[keep_inds]
            boxes[:, 4] = cls_scores[keep_inds]
            reg = reg[keep_inds]
            landmark = landmark[keep_inds]
        else:
            return None, None, None

        w = boxes[:, 2] - boxes[:, 0] + 1
        h = boxes[:, 3] - boxes[:, 1] + 1
        landmark[:, 0::2] = (np.tile(w, (5, 1)) * landmark[:, 0::2].T + np.tile(boxes[:, 0], (5, 1)) - 1).T
        landmark[:, 1::2] = (np.tile(h, (5, 1)) * landmark[:, 1::2].T + np.tile(boxes[:, 1], (5, 1)) - 1).T
        boxes_c = self.calibrate_box(boxes, reg)

        boxes = boxes[py_nms(boxes, 0.6)]
        keep = py_nms(boxes_c, 0.6)
        boxes_c = boxes_c[keep]
        landmark = landmark[keep]
        return boxes, boxes_c, landmark

    def processed_image(self, img, scale):
        """
        预处理数据,转化图像尺度并对像素归一到[-1, 1]
        """
        height, width, channels = img.shape
        new_height = int(height * scale)  
        new_width = int(width * scale)  
        new_dim = (new_width, new_height)
        img_resized = cv2.resize(img, new_dim, interpolation=cv2.INTER_LINEAR) 
        img_resized = (img_resized - 127.5) / 128
        return img_resized

    def generate_bbox(self, cls_map, reg, scale, threshold):
        """
         得到对应原图的box坐标,分类分数,box偏移量
         cls_map: n * m(输入是cls_cls_map[:, :, 1], 第一维, 人脸框的概率.)
         reg: n * m * 4
        """

        # pnet大致将图像size缩小2倍
        stride = 2

        cellsize = 12

        # 将置信度高的留下, 即为预测的人脸框. 二维的.
        t_index = np.where(cls_map > threshold)

        # 没有人脸, 这里也可以是t_index[1].size
        # 使用np.where(二维数组), 得到包括两个元素的列表, 第一个元素是第一维的坐标, 第二个元素是第二维的坐标.
        if t_index[0].size == 0:
            return np.array([])
        # 偏移量
        dx1, dy1, dx2, dy2 = [reg[t_index[0], t_index[1], i] for i in range(4)]

        reg = np.array([dx1, dy1, dx2, dy2])
        score = cls_map[t_index[0], t_index[1]]
        # 对应原图的box坐标,分类分数,box偏移量
        # 原始图片中回归框坐标需要经过反向运算,计算方式如下,其中cellSize=12,是因为12*12的图片进去后变成1*1
        # stride=2是因为几层卷积中只有一个stride为2,scale代表的是我们在哪个尺度金字塔的图像,
        boundingbox = np.vstack([np.round((stride * t_index[1]) / scale),
                                 np.round((stride * t_index[0]) / scale),
                                 np.round((stride * t_index[1] + cellsize) / scale),
                                 np.round((stride * t_index[0] + cellsize) / scale),
                                 score,
                                 reg])
        # shape[n,9]
        return boundingbox.T

    def pad(self, bboxes, w, h):
        """
        将超出图像的box进行处理
        参数:
          bboxes: 人脸框
          w, h: 图像长宽
        返回值:
          dy, dx : 为调整后的box的左上角坐标相对于原box左上角的坐标
          edy, edx : n为调整后的box右下角相对原box左上角的相对坐标
          y, x : 调整后的box在原图上左上角的坐标
          ey, ex : 调整后的box在原图上右下角的坐标
          tmph, tmpw: 原始box的长宽
        """
        # box的长宽
        tmpw, tmph = bboxes[:, 2] - bboxes[:, 0] + 1, bboxes[:, 3] - bboxes[:, 1] + 1
        num_box = bboxes.shape[0]

        dx, dy = np.zeros((num_box, )), np.zeros((num_box, ))
        edx, edy = tmpw.copy() - 1, tmph.copy() - 1
        # box左上右下的坐标
        x, y, ex, ey = bboxes[:, 0], bboxes[:, 1], bboxes[:, 2], bboxes[:, 3]
        # 找到超出右下边界的box并将ex, ey归为图像的w, h
        # edx, edy为调整后的box右下角相对原box左上角的相对坐标
        tmp_index = np.where(ex > w - 1)
        # w -1 + tmpw -1 - edx= ex -> edx = w + tmpw - ex - 2
        edx[tmp_index] = tmpw[tmp_index] + w - 2 - ex[tmp_index]
        ex[tmp_index] = w - 1

        tmp_index = np.where(ey > h - 1)
        # h -1 + tmph -1 - edy = ey -> edy = h + tmph - ey - 2
        edy[tmp_index] = tmph[tmp_index] + h - 2 - ey[tmp_index]
        ey[tmp_index] = h - 1
        # 找到超出左上角的box并将x,y归为0
        # dx, dy为调整后的box的左上角坐标相对于原box左上角的坐标
        tmp_index = np.where(x < 0)
        dx[tmp_index] = 0 - x[tmp_index]
        x[tmp_index] = 0

        tmp_index = np.where(y < 0)
        dy[tmp_index] = 0 - y[tmp_index]
        y[tmp_index] = 0

        return_list = [dy, edy, dx, edx, y, ey, x, ex, tmpw, tmph]
        return_list = [item.astype(np.int32) for item in return_list]

        return return_list

    def calibrate_box(self, bbox, reg):
        """
        校准box
        参数:
          bbox: PNet生成的box
          reg: RNet生成的box偏移值
        返回值:
          调整后的box是针对原图的绝对坐标
        """
        bbox_c = bbox.copy()
        w = bbox[:, 2] - bbox[:, 0] + 1
        w = np.expand_dims(w, 1)
        h = bbox[:, 3] - bbox[:, 1] + 1
        h = np.expand_dims(h, 1)
        reg_m = np.hstack([w, h, w, h])
        aug = reg_m * reg
        bbox_c[:, 0:4] = bbox_c[:, 0:4] + aug
        return bbox_c

    def detect(self, img):
        """
        用于测试当个图像的
        """
        boxes = None

        # PNet
        if self.pnet_detector:
            boxes, boxes_c, _ = self.detect_pnet(img)
            if boxes_c is None:
                return np.array([]), np.array([])

        # RNet
        if self.rnet_detector:
            boxes, boxes_c, _ = self.detect_rnet(img, boxes_c)
            if boxes_c is None:
                return np.array([]), np.array([])

        # ONet
        if self.onet_detector:
            boxes, boxes_c, landmark = self.detect_onet(img, boxes_c)
            if boxes_c is None:
                return np.array([]), np.array([])

        return boxes_c, landmark
View Code

测试验证过程:

python test.py

结果:

 

图片数据来源网络,仅供学习使用,如有侵权,请联系删除,谢谢!

参考:https://www.ctolib.com/LeslieZhoa-tensorflow-MTCNN.html

原文地址:https://www.cnblogs.com/xjlearningAI/p/12391312.html