基于FCN的人脸检测

由于本人深度学习环境安装在windows上,因此下面是在windows系统上实现的。仅供自己学习记录。

使用caffe训练模型,首先需要准备数据。

正样本:对于人脸检测项目,正样本就是人脸的图片。制作正样本需要将人脸从图片中裁剪出来(数据源已经标注出人脸在图片中的坐标)。裁剪完成之后,需要check一下数据是否制作的没问题。

负样本:随机进行裁剪,使用IOU确定是正样本还是负样本。比如:IOU<0.3为负样本,最好是拿没有人脸的图片。

1、caffe数据源准备:

caffe支持LMDB数据,在训练模型时首先需要将训练集、验证集转换成LMDB数据。

首先需要准备两个txt文件:train.txt和test.txt。格式如下: 

/path/to/folder/image_x.jpg 0 (即图片样本所在的路径和标签。文本后面的标签,对于二分类时,为0和1。本例中,0表示人脸数据,1表示非人脸数据。)

可以使用脚本来获取txt文档。简单写个脚本(获取train.txt)如下:

(txt文档中应该只需要相对路径,如train.txt的格式如下:xxxx.jpg label ,下面的代码有点问题)——2019年7月28日更新

import os

full_train_path = r"C:UsersAdministratorDesktopFaceDetection	rain.txt"
full_val_path = r"C:UsersAdministratorDesktopFaceDetectionval.txt"

train_txt = open(full_train_path, 'w')
val_txt = open(full_val_path, 'w')

# get train.txt
for file in os.listdir(r"C:UsersAdministratorDesktopFaceDetection	rain	rain"):
    for figure in os.listdir(r"C:UsersAdministratorDesktopFaceDetection	rain	rain\" + file):
        train_txt.writelines(file + r"/" + figure + " " + file + "
")

train_txt.close()

#get val.txt
for val_file in os.listdir(r"C:UsersAdministratorDesktopFaceDetection	rainval"):
    if val_file.find("faceimage") != -1:
        val_txt.writelines(val_file + " " + "0" + "
")
    else:
        val_txt.writelines(val_file + " " + "1" + "
")

val_txt.close()

 2、制作LMDB数据源:

分类问题使用LMDB数据,回归问题使用HDF5数据。

使用caffe自带的脚本文件,制作LMDB数据源。

convert_imageset使用格式:
convert_imageset --参数(如:resize、shuffle等) 数据源路径 数据源的txt 需要输出的lmdb路径。
cd C:Program Filescaffe-windowsscriptsuild	oolsRelease
convert_imageset.exe --resize_height=227 --resize_width=227 --shuffle C:UsersAdministratorDesktopFaceDetection rain rain C:UsersAdministratorDesktopFaceDetection rain rain rain.txt C:UsersAdministratorDesktopFaceDetection rain_lmdb convert_imageset.exe --resize_height=227 --resize_width=227 --shuffle C:UsersAdministratorDesktopFaceDetection rainval C:UsersAdministratorDesktopFaceDetection rainvalval.txt C:UsersAdministratorDesktopFaceDetectionval_lmdb

 3、训练ALEXNET网络:

3.1配置caffe文件:

1、train.prototxt

配置caffe格式的ALEXNET网络结构。

2、solver.prototxt

①net:指定网络配置文件路径:

②test_iter:设置一次测试需要测试的batch数。最好是test_iter * batch_size = 样本总个数。

③base_lr:基础学习率。最终总的学习率为base_lr * lr_mult(train.prototxt中每一层指定的)。学习率不能太大,太大会

注:windows版本中,配置文件的路径使用“/”,如:source: "C:/Users/Administrator/Desktop/FaceDetection/train_lmdb"

3.2编写脚本,训练模型,得到模型(_iter_36000.caffemodel):

cd C:Program Filescaffe-windowsscriptsuild	oolsRelease
caffe.exe train --solver=C:UsersAdministratorDesktopFaceDetectionsolver.prototxt

训练过程如下:

 4、人脸识别检测算法框架:

4.1滑动窗口:

对输入的图片,画出不同的227*227的窗口(到目前为止,只支持固定大小的图片---卷积神经网络,最后的全连接层,参数固定。后面讲到全卷积网络,可以输入任意大小图片)。

为了检测不同尺寸图片中的人脸。需要进行多尺度scale变换。

FCN全卷积网络。得到heatmap,heatmap每一个点,代表了原图的每一个区域,其值为该区域是人脸的概率值。通过前向传播forward_all() ,得到heatmap。

设置阈值α,比如当α>0.9时,保存框。这样的结果可能得到多个框。可以使用NMS(非极大值抑制)得到最终的一个框。

4.2将训练时的全连接的Alexnet网络进行转换成全卷积网络fcn的模型:

可以根据caffe官网示例操作:https://nbviewer.jupyter.org/github/BVLC/caffe/blob/master/examples/net_surgery.ipynb

首先需要将原先全连接网络的deploy.prototxt文件中的全连接层(InnerProduct)转换层卷积网络层(Convolution),并计算设定kernel size。

然后使用以下代码转换成全卷积网络fcn的模型(full_conv.caffemodel)

    net = caffe.Net(r"C:UsersAdministratorDesktopFaceDetectiondeploy.prototxt",
                    r"C:UsersAdministratorDesktopFaceDetectionmodel2\_iter_36000.caffemodel",
                    caffe.TEST)
    params = ['fc6', 'fc7', 'fc8_flickr']

    fc_params = {pr: (net.params[pr][0].data, net.params[pr][1].data) for pr in params}

    for fc in params:
        print("{} weights are {} dimensional and biases are {} dimensional".format(fc, fc_params[fc][0].shape, fc_params[fc][1].shape))

    net_fully_conv = caffe.Net(r"C:UsersAdministratorDesktopFaceDetectiondeploy_full_conv.prototxt",
                               r"C:UsersAdministratorDesktopFaceDetectionmodel2\_iter_36000.caffemodel",
                               caffe.TEST)
    params_fully_conv = ['fc6-conv', 'fc7-conv', 'fc8-conv']

    conv_params = {pr:(net_fully_conv.params[pr][0].data, net_fully_conv.params[pr][1].data) for pr in params_fully_conv}
    for conv in params_fully_conv:
        print("{} weights are {} dimensional and biases are {} dimensional".format(conv, conv_params[conv][0].shape, conv_params[conv][1].shape))

    for pr, pr_conv in zip(params, params_fully_conv):
        conv_params[pr_conv][0].flat = fc_params[pr][0].flat
        conv_params[pr_conv][1][...] = fc_params[pr][1]
    net_fully_conv.save(r"C:UsersAdministratorDesktopFaceDetectionfull_conv.caffemodel")

4.3使用训练好的模型,编码实现人脸检测:

import os
import sys
import numpy as np
import math
import cv2
import random

caffe_root = r"C:Program Filescaffe-windowspython"
sys.path.insert(0, caffe_root + 'python')
os.environ['GLOG_minloglevel'] = '2'
import caffe

class Point(object):
    def __init__(self, x, y):
        self.x = x
        self.y = y

class Rect(object):
    def __init__(self, p1, p2):
        """Store the top, bottom, left, right values for points
        p1, p2 are the left-top and right-bottom points of the rectangle"""
        self.left = min(p1.x, p2.x)
        self.right = max(p1.x, p2.x)
        self.bottom = min(p1.y, p2.y)
        self.top = max(p1.y, p2.y)

    def __str__(self):
        return "Rect[%d, %d, %d, %d]" %(self.left, self.top, self.right, self.bottom)

def calcDistance(x1, y1, x2, y2):
    dist = math.sqrt((x2 - x1) ** 2 + (y2 - y1) ** 2)
    return dist

def range_overlap(a_min, a_max, b_min, b_max):
    """Judge whether there is intersection on one dimension"""
    return (a_min <= b_max) and (a_max >= b_min)

def rect_overlaps(r1, r2):
    """Judge whether the two rectangles have intersection"""
    return range_overlap(r1.left, r1.right, r2.left, r2.right) and range_overlap(r1.bottom, r1.top, r2.bottom, r2.top)

def rect_merge(r1, r2, mergeThresh):
    """Calculate the merge area of two rectangles"""
    if rect_overlaps(r1, r2):
        SI = abs(min(r1.right, r2.right) - max(r1.left, r2.left)) * abs(min(r1.top, r2.top) - max(r1.bottom, r2.bottom))
        SA = abs(r1.right - r1.left) * abs(r1.top - r1.bottom)
        SB = abs(r2.right - r2.left) * abs(r2.top - r2.bottom)
        S = SA + SB - SI

        ratio = float(SI) / float(S)

        if ratio > mergeThresh:
            return 1
    return 0

def generateBoundingBox(featureMap, scale):
    boundingBox = []
    """We can calculate the stride from the architecture of the alexnet"""
    stride = 32
    """We need to get the boundingbox whose size is 227 * 227. When we trained the alexnet,
    we also resize the size of the input image to 227 * 227 in caffe"""
    cellSize = 227

    for (x, y), prob in np.ndenumerate(featureMap):
        if(prob >= 0.50):
            """Get the bounding box: we record the left-bottom and right-top coordinates"""
            boundingBox.append([float(stride * y) / scale, float(stride * x) / scale, float(stride * y + cellSize - 1) / scale,
                               float(stride * x + cellSize - 1) / scale, prob])
    return boundingBox

def nms_average(boxes, groupThresh = 2, overlapThresh=0.2):
    rects = []

    for i in range(len(boxes)):
        if boxes[i][4] > 0.2:
            """The box in here, we record the left-bottom coordinates(y, x) and the height and width"""
            rects.append([boxes[i, 0], boxes[i, 1], boxes[i, 2] - boxes[i, 0], boxes[i, 3] - boxes[i, 1]])

    rects, weights = cv2.groupRectangles(rects, groupThresh, overlapThresh)

    rectangles = []
    for i in range(len(rects)):
        testRect = Rect(Point(rects[i, 0], rects[i, 1]), Point(rects[i, 0] + rects[i, 2], rects[i, 1] + rects[i, 3]))
        rectangles.append(testRect)
    clusters = []
    for rect in rectangles:
        matched = 0
        for cluster in clusters:
            if (rect_merge(rect, cluster, 0.2)):
                matched = 1
                cluster.left = (cluster.left + rect.left) / 2
                cluster.right = (cluster.right + rect.right) / 2
                cluster.bottom = (cluster.bottom + rect.bottom) / 2
                cluster.top = (cluster.top + rect.top) / 2
        if (not matched):
            clusters.append(rect)

    result_boxes = []
    for i in range(len(clusters)):
        result_boxes.append([clusters[i].left, clusters[i].bottom, clusters[i].right, clusters[i].top, 1])

    return result_boxes

def face_detection(imgFlie):
    net_fully_conv = caffe.Net(r"C:UsersAdministratorDesktopFaceDetectiondeploy_full_conv.prototxt",
                               r"C:UsersAdministratorDesktopFaceDetectionfull_conv.caffemodel",
                               caffe.TEST)

    scales = []
    factor = 0.793700526

    img = cv2.imread(imgFlie)
    print(img.shape)

    largest = min(2, 4000 / max(img.shape[0:2]))
    scale = largest
    minD = largest * min(img.shape[0:2])
    while minD >= 227:
        scales.append(scale)
        scale *= factor
        minD *= factor
    total_boxes = []

    for scale in scales:
        scale_img = cv2.resize(img, (int(img.shape[0] * scale), int(img.shape[1] * scale)))
        cv2.imwrite(r"C:UsersAdministratorDesktopFaceDetectionscale_img.jpg", scale_img)
        im = caffe.io.load_image(r"C:UsersAdministratorDesktopFaceDetectionscale_img.jpg")

        """Change the test input data size of the scaled image size """
        net_fully_conv.blobs['data'].reshape(1, 3, scale_img.shape[1], scale_img.shape[0])
        transformer = caffe.io.Transformer({'data': net_fully_conv.blobs['data'].data.shape})
        transformer.set_transpose('data', (2, 0, 1))
        transformer.set_channel_swap('data', (2, 1, 0))
        transformer.set_raw_scale('data', 255.0)

        out = net_fully_conv.forward_all(data=np.asarray([transformer.preprocess('data', im)]))
        print(out['prob'][0, 1].shape)

        boxes = generateBoundingBox(out['prob'][0, 1], scale)

        if (boxes):
            total_boxes.extend(boxes)
    print(total_boxes)
    boxes_nms = np.array(total_boxes)
    true_boxes = nms_average(boxes_nms, 1, 0.2)

    if (not true_boxes == []):
        (x1, y1, x2, y2) = true_boxes[0][:-1]
        cv2.rectangle(img, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0))
        win = cv2.namedWindow('face detection', flags=0)
        cv2.imshow('face detection', img)
        cv2.waitKey(0)

if __name__ == "__main__":
    img = r"C:UsersAdministratorDesktopFaceDetection	mp9055.jpg"
    face_detection(img)

因为电脑配置实在是太低了,所以训练了好久,电脑开着跑了好几天,也没有训练很多次。所以模型训练的不是很好。本例中,经过调参,发现生成boundingbox时,prob设置为大于等于0.5得到的结果较好。结果如下:

另外,也是用过tensorflow写过训练代码,但是由于电脑太差,训练速度太慢、精度太差。待以后慢慢再进一步学习。

2、tensorflow实现

2.1将数据转换成TFRecord文件,便于后续训练,代码如下:

import tensorflow as tf
import numpy as np
import os
import cv2
from PIL import Image

class0_path = "/home/sxj/DL/face_detect/train/train/0/"
class1_path = "/home/sxj/DL/face_detect/train/train/1/"
tf_output_dir = "/home/sxj/DL/face_detect/data/"
tf_filename = "/home/sxj/DL/face_detect/data/train.tfrecord"
SAMPLER_PER_FILES = 5000

# 将数据集转换成TFRecords格式
def int64_feature(value):
    if not isinstance(value, list):
        value = [value]
    return tf.train.Feature(int64_list=tf.train.Int64List(value=value))

def bytes_feature(value):
    if not isinstance(value, list):
        value = [value]
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=value))


def get_output_filename(tf_output_dir, dataset_name, fdx):
    return "%s/%s_%03d.tfrecord"

# 将标签为0的数据集进行转换
i = 0
total_files = len(os.listdir(class0_path))
train_writer = tf.python_io.TFRecordWriter(tf_filename)

for img in os.listdir(class0_path):
    print("转换图片进度%d/%d" % (i + 1, total_files))

    # 获取图片
    file_name = class0_path + img
    img_raw = Image.open(file_name)
    img_raw = img_raw.resize((227, 227))
    img_data = img_raw.tobytes()
    # img_data = tf.gfile.FastGFile(file_name, 'rb').read()

    # 将图片数据封装出example
    example = tf.train.Example(features=tf.train.Features(feature={
        'label': int64_feature(value=0),
        'image/encoded': bytes_feature(value=img_data)
    }))

    # 序列化
    train_writer.write(example.SerializeToString())

    i += 1

print("数据集 转换成功")

# 将标签为1的数据集进行转换
i = 0
total_files = len(os.listdir(class1_path))

for img in os.listdir(class1_path):
    print("转换图片进度%d/%d" % (i + 1, total_files))

    # 获取图片
    file_name = class1_path + img
    # img_data = tf.gfile.FastGFile(file_name, 'rb').read()
    img_raw = Image.open(file_name)
    img_raw = img_raw.resize((227, 227))
    img_data = img_raw.tobytes()

    # 将图片数据封装出example
    example = tf.train.Example(features=tf.train.Features(feature={
        'label': int64_feature(value=1),
        'image/encoded': bytes_feature(value=img_data)
    }))

    # 序列化
    train_writer.write(example.SerializeToString())

    i += 1

train_writer.close()
View Code

2.2 TFRecord文件读取

import os
import sys
import numpy as np
import math
import cv2
import random
import tensorflow as tf
import matplotlib.pyplot as plt

slim = tf.contrib.slim


batch_size = 32
img_size = 227
num_bathes = 100
train_step = 1000001
model_path = "/home/sxj/DL/insightface/alex_model"
model_name = 'Alex'
addr = '/home/sxj/DL/face_detect/data/train.tfrecords'


lr_steps = [40000, 60000, 80000]
lr_values = [0.004, 0.002, 0.0012, 0.0004]


class Point(object):
    def __init__(self, x, y):
        self.x = x
        self.y = y


class Rect(object):
    def __init__(self, p1, p2):
        """Store the top, bottom, left, right values for points
        p1, p2 are the left-top and right-bottom points of the rectangle"""
        self.left = min(p1.x, p2.x)
        self.right = max(p1.x, p2.x)
        self.bottom = min(p1.y, p2.y)
        self.top = max(p1.y, p2.y)

    def __str__(self):
        return "Rect[%d, %d, %d, %d]" %(self.left, self.top, self.right, self.bottom)


def calcDistance(x1, y1, x2, y2):
    dist = math.sqrt((x2 - x1) ** 2 + (y2 - y1) ** 2)
    return dist


def range_overlap(a_min, a_max, b_min, b_max):
    """Judge whether there is intersection on one dimension"""
    return (a_min <= b_max) and (a_max >= b_min)


def rect_overlaps(r1, r2):
    """Judge whether the two rectangles have intersection"""
    return range_overlap(r1.left, r1.right, r2.left, r2.right) and range_overlap(r1.bottom, r1.top, r2.bottom, r2.top)


def rect_merge(r1, r2, mergeThresh):
    """Calculate the merge area of two rectangles"""
    if rect_overlaps(r1, r2):
        SI = abs(min(r1.right, r2.right) - max(r1.left, r2.left)) * abs(min(r1.top, r2.top) - max(r1.bottom, r2.bottom))
        SA = abs(r1.right - r1.left) * abs(r1.top - r1.bottom)
        SB = abs(r2.right - r2.left) * abs(r2.top - r2.bottom)
        S = SA + SB - SI

        ratio = float(SI) / float(S)

        if ratio > mergeThresh:
            return 1
    return 0


def softmax(a, b):
    a0 = math.exp(a)
    a1 = math.exp(b)
    return a0/(a0 + a1)


def generateBoundingBox(featureMap, scale):
    boundingBox = []
    """We can calculate the stride from the architecture of the alexnet"""
    stride = 32
    """We need to get the boundingbox whose size is 227 * 227. When we trained the alexnet,
    we also resize the size of the input image to 227 * 227 in caffe"""
    cellSize = 227
    # print(featureMap.shape[0])
    # featureMap = tf.nn.softmax(featureMap)
    # print(featureMap[0][0][0])
    # print(featureMap[0][0][1])
    for x in range(featureMap.shape[0]):
        for y in range(featureMap.shape[1]):
            prob = softmax(featureMap[x][y][0], featureMap[x][y][1])
            if prob > 0.4:
                print("success")
                boundingBox.append(
                    [float(stride * y) / scale, float(stride * x) / scale, float(stride * y + cellSize - 1) / scale,
                     float(stride * x + cellSize - 1) / scale, 1])
    return boundingBox

    # for (x, prob) in np.ndenumerate(featureMap):
        # if x[2] == 0
        # print(x)
        # print(prob)
        # print("aaaaaaaaaaa")

    # for x, y in zip(range(featureMap.get_shape().as_list()[0]), range(featureMap.get_shape().as_list()[1])):
    # for x in range(featureMap.get_shape().as_list()[0]):
    #     for y in range(featureMap.get_shape().as_list()[1]):
    #         # print(x)
    #         # print(y)
    #         prob = tf.nn.softmax([featureMap[x, y][0], featureMap[x, y][1]])
    #         print(sess.run(prob[0]))
    #         print(prob[1])
    #         print("----")
    #         if (prob[0] >= 0.50):
    #             """Get the bounding box: we record the left-bottom and right-top coordinates"""
    #             boundingBox.append(
    #                 [float(stride * y) / scale, float(stride * x) / scale, float(stride * y + cellSize - 1) / scale,
    #                  float(stride * x + cellSize - 1) / scale, prob])
    #     # prob = tf.nn.softmax([featureMap[x, y][0], featureMap[x, y][1]])
    #     # if(prob[0] >= 0.50):
    #     #     """Get the bounding box: we record the left-bottom and right-top coordinates"""
    #     #     boundingBox.append([float(stride * y) / scale, float(stride * x) / scale, float(stride * y + cellSize - 1) / scale,
    #     #                        float(stride * x + cellSize - 1) / scale, prob])
    # return boundingBox


def nms_average(boxes, groupThresh = 2, overlapThresh=0.2):
    rects = []

    for i in range(len(boxes)):
        if boxes[i][4] > 0.2:
            """The box in here, we record the left-bottom coordinates(y, x) and the height and width"""
            rects.append([boxes[i, 0], boxes[i, 1], boxes[i, 2] - boxes[i, 0], boxes[i, 3] - boxes[i, 1]])

    rects, weights = cv2.groupRectangles(rects, groupThresh, overlapThresh)

    rectangles = []
    for i in range(len(rects)):
        testRect = Rect(Point(rects[i, 0], rects[i, 1]), Point(rects[i, 0] + rects[i, 2], rects[i, 1] + rects[i, 3]))
        rectangles.append(testRect)
    clusters = []
    for rect in rectangles:
        matched = 0
        for cluster in clusters:
            if (rect_merge(rect, cluster, 0.2)):
                matched = 1
                cluster.left = (cluster.left + rect.left) / 2
                cluster.right = (cluster.right + rect.right) / 2
                cluster.bottom = (cluster.bottom + rect.bottom) / 2
                cluster.top = (cluster.top + rect.top) / 2
        if (not matched):
            clusters.append(rect)

    result_boxes = []
    for i in range(len(clusters)):
        result_boxes.append([clusters[i].left, clusters[i].bottom, clusters[i].right, clusters[i].top, 1])

    return result_boxes


def print_tensor_info(tensor):
    print("tensor name:", tensor.op.name, "-tensor shape:", tensor.get_shape().as_list())


def read_single_tfrecord(addr, _batch_size, shape):
    filename_queue = tf.train.string_input_producer([addr], shuffle=True)

    reader = tf.TFRecordReader()
    _, serialized_example = reader.read(filename_queue)

    features = tf.parse_single_example(serialized_example,
                                       features={
                                        'image': tf.FixedLenFeature([], tf.string),
                                        'label': tf.FixedLenFeature([], tf.int64)})
    img = tf.decode_raw(features['image'], tf.uint8)
    label = tf.cast(features['label'], tf.int32)
    img = tf.reshape(img, [shape, shape, 3])
    # img = augmentation(img)
    img=(tf.cast(img, tf.float32)-127.5)/128
    min_after_dequeue = 10000
    batch_size = _batch_size
    capacity = min_after_dequeue + 10 * batch_size
    image_batch, label_batch = tf.train.shuffle_batch([img, label],
                                                        batch_size=batch_size,
                                                        capacity=capacity,
                                                        min_after_dequeue=min_after_dequeue,
                                                        num_threads=4)

    label_batch = tf.reshape(label_batch, [batch_size])

    return image_batch, label_batch


def Network(images, is_train):
    # input 227*227
    with tf.variable_scope('vgg_16'):
        with slim.arg_scope([slim.conv2d, slim.max_pool2d], padding='SAME'):
            conv1 = slim.conv2d(images, 96, [11, 11], stride=[4, 4], scope='conv_1')  # 55*55*96
            print_tensor_info(conv1)
            pool1 = slim.max_pool2d(conv1, [3, 3], stride=[2, 2], scope='pool_1')  # 27*27*96
            print_tensor_info(pool1)

            conv2 = slim.conv2d(pool1, 256, [5, 5], stride=[1, 1], scope='conv_2')
            print_tensor_info(conv2)
            pool2 = slim.max_pool2d(conv2, [3, 3], stride=[2, 2], scope='pool_2')
            print_tensor_info(pool2)

            conv3 = slim.conv2d(pool2, 384, [3, 3], stride=[1, 1], scope='conv_3')
            print_tensor_info(conv3)

            conv4 = slim.conv2d(conv3, 384, [3, 3], stride=[1, 1], scope='conv_4')
            print_tensor_info(conv4)

            conv5 = slim.conv2d(conv4, 256, [3, 3], stride=[1, 1], scope='conv_5')
            pool5 = slim.max_pool2d(conv5, [3, 3], stride=[2, 2], scope='pool_5')
            print_tensor_info(pool5)

            conv6 = slim.conv2d(pool5, 256, [3, 3], stride=[1, 1], scope='conv_6')
            print_tensor_info(conv6)

            output = slim.conv2d(conv6, 2, [8, 8], stride=[8, 8], scope='output')
            print_tensor_info(output)
            if is_train:
                output = tf.reshape(output, [-1, 2])
                print_tensor_info(output)
            else:
                output = tf.squeeze(output, axis=0)
                output = tf.nn.softmax(output)
                print_tensor_info(output)
            return output

# def test():
#     reader = tf.TFRecordReader()
#     filename_queue = tf.train.string_input_producer([addr])
#
#     _, serialized_example = reader.read(filename_queue)
#
#     features = tf.parse_single_example(serialized_example,
#                                        features={
#                                            'image': tf.FixedLenFeature([], tf.string),
#                                            'label': tf.FixedLenFeature([], tf.int64),
#                                        })
#
#     image = tf.decode_raw(features['image'], tf.uint8)
#     # image = image.set_shape([227, 227, 3])
#     image = tf.reshape(image, [227, 227, 3])
#     label = tf.cast(features['label'], tf.int32)
#     [img_batch, label_batch] = tf.train.shuffle_batch([image, label],
#                                                       batch_size=32,
#                                                       capacity=64,
#                                                       min_after_dequeue=32)
#
#     sess = tf.Session()
#     coord = tf.train.Coordinator()
#     threads = tf.train.start_queue_runners(sess=sess, coord=coord)
#
#     for i in range(10):
#         print(sess.run([img_batch, label_batch]))
#         print(tf.shape(image))


def train():
    image = tf.placeholder(tf.float32, [batch_size, img_size, img_size, 3], name='image')
    label = tf.placeholder(tf.int32, [batch_size], name='label')
    logit = Network(image, True)  # [batch, 2]
    train_loss = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(logits=logit, labels=label))

    train_images, train_labels = read_single_tfrecord(addr, batch_size, img_size)

    with tf.name_scope('loss'):
        tf.summary.scalar('train_loss', train_loss)

    global_step = tf.Variable(name='global_step', initial_value=0, trainable=False)
    inc_op = tf.assign_add(global_step, 1, name='increment_global_step')

    scale = int(128.0/batch_size)
    _lr_steps = [scale*s for s in lr_steps]
    _lr_values = [v/scale for v in lr_values]
    lr = tf.train.piecewise_constant(global_step, boundaries=_lr_steps, values=_lr_values, name='lr_schedule')

    update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
    with tf.control_dependencies(update_ops):
        train_op = tf.train.MomentumOptimizer(learning_rate=lr, momentum=0.9).minimize(train_loss)

    with tf.name_scope('accuracy'):
        # label = tf.one_hot(label, 2)
        train_accuracy = tf.reduce_mean(
            tf.cast(tf.equal(tf.to_int32(tf.argmax(tf.nn.softmax(logit), axis=1)), label), tf.float32))
        tf.summary.scalar('train_accuracy', train_accuracy)

    saver = tf.train.Saver(max_to_keep=5)
    merged = tf.summary.merge_all()

    with tf.Session() as sess:
        sess.run((tf.global_variables_initializer(),
                  tf.local_variables_initializer()))
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(sess=sess, coord=coord)

        writer_train = tf.summary.FileWriter("/home/sxj/DL/insightface/alex_model/%s" % (model_name), sess.graph)

        try:

            for i in range(1, train_step):
                image_batch, label_batch = sess.run([train_images, train_labels])
                sess.run([train_op, inc_op], feed_dict={image: image_batch, label: label_batch})
                if (i % 100 == 0):
                    summary = sess.run(merged, feed_dict={image: image_batch, label: label_batch})
                    writer_train.add_summary(summary, i)
                if (i % 1000 == 0):
                    print('次数', i)
                    print('train_accuracy',
                          sess.run(train_accuracy, feed_dict={image: image_batch, label: label_batch}))
                    print('train_loss', sess.run(train_loss,
                                                 {image: image_batch, label: label_batch}))
                    if (i % 10000 == 0):
                        saver.save(sess, os.path.join(model_path, model_name), global_step=i)

        except tf.errors.OutOfRangeError:
            print("finished")
        finally:
            coord.request_stop()
            writer_train.close()

        # for i in range(1, train_step):
        #     image_batch, label_batch = sess.run([train_images, train_labels])
        #
        #     sess.run([train_op, inc_op], feed_dict={image: image_batch, label: label_batch})
        # #     if (i % 100 == 0):
        # #         summary = sess.run(merged, feed_dict={image: image_batch, label: label_batch})
        # #         writer_train.add_summary(summary, i)
        #     if (i % 10 == 0):
        #         print('次数', i)
        #         print('train_accuracy',
        #               sess.run(train_accuracy, feed_dict={image: image_batch, label: label_batch}))
        #         print('train_loss', sess.run(train_loss,
        #                                      {image: image_batch, label: label_batch}))
        #         if (i % 10000 == 0):
        #             saver.save(sess, os.path.join(model_path, model_name), global_step=i)
        # # coord.join(threads)


# def face_detection(imgFlie):
#
#     scales = []
#     factor = 0.793700526
#
#     img = cv2.imread(imgFlie)
#
#     image_size = [227, 227]
#     # image = tf.placeholder(tf.float32, [1, 227, 227, 3], name='image')
#     # a = Network(image)
#     largest = min(2, 4000 / max(img.shape[0:2]))
#     scale = largest
#     minD = largest * min(img.shape[0:2])
#     while minD >= 227:
#         scales.append(scale)
#         scale *= factor
#         minD *= factor
#     total_boxes = []
#
#
#     with tf.Session() as sess:
#         # saver = tf.train.Saver()
#         # saver.restore(sess, "/home/sxj/DL/insightface/alex_model/Alex-500000")
#         sess.run((tf.global_variables_initializer(),
#                   tf.local_variables_initializer()))
#
#         for scale in scales:
#             tf.reset_default_graph()
#             scale_img = cv2.resize(img, (int(img.shape[0] * scale), int(img.shape[1] * scale)))
#             # print(scale_img.shape[0], scale_img.shape[1], scale_img.shape[2])
#             if scale_img.shape[0] < 227:
#                 continue
#             cv2.imwrite(r"/home/sxj/DL/face_detect/scale_img.jpg", scale_img)
#
#             im = cv2.imread(r"/home/sxj/DL/face_detect/scale_img.jpg")
#             im = np.array(im)
#             print(im.shape)
#
#             image = tf.placeholder(tf.float32, [1, im.shape[0], im.shape[1], im.shape[2]])
#
#             im = im.reshape(1, im.shape[0], im.shape[1], im.shape[2])
#             logit = Network(image, False)
#             saver = tf.train.Saver()
#             saver.restore(sess, "/home/sxj/DL/insightface/alex_model/Alex-10000")
#             a = sess.run(logit, feed_dict={image: im})
#
#             boxes = generateBoundingBox(sess, a, scale)
#
#             if (boxes):
#                 total_boxes.extend(boxes)
#         # print(total_boxes)
#         boxes_nms = np.array(total_boxes)
#         true_boxes = nms_average(boxes_nms, 1, 0.2)
#
#         if (not true_boxes == []):
#             (x1, y1, x2, y2) = true_boxes[0][:-1]
#             cv2.rectangle(img, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0))
#             win = cv2.namedWindow('face detection', flags=0)
#             cv2.imshow('face detection', img)
#             cv2.waitKey(0)

def face_detection(imgFlie):
    scales = []
    factor = 0.793700526

    img = cv2.imread(imgFlie)

    # image = tf.placeholder(tf.float32, name='image')
    largest = min(2, 4000 / max(img.shape[0:2]))
    scale = largest
    minD = largest * min(img.shape[0:2])
    while minD >= 227:
        scales.append(scale)
        scale *= factor
        minD *= factor
    total_boxes = []

    for scale in scales:
        image = tf.placeholder(tf.float32, name='image')
        scale_img = cv2.resize(img, (int(img.shape[0] * scale), int(img.shape[1] * scale)))

        cv2.imwrite(r"/home/sxj/DL/face_detect/scale_img.jpg", scale_img)

        im = cv2.imread(r"/home/sxj/DL/face_detect/scale_img.jpg")

        image_reshape = tf.reshape(image, [1, im.shape[0], im.shape[1], 3])
        logit = Network(image_reshape, False)
        with tf.Session() as sess:
            sess.run((tf.global_variables_initializer(),
                      tf.local_variables_initializer()))

            saver = tf.train.Saver()
            saver.restore(sess, "/home/sxj/DL/insightface/alex_model/Alex-80000")
            input = sess.run(logit, feed_dict={image: im})

            boxes = generateBoundingBox(input, scale)

            if (boxes):
                total_boxes.extend(boxes)

        tf.reset_default_graph()
    print(total_boxes)

    boxes_nms = np.array(total_boxes)
    true_boxes = nms_average(boxes_nms, 1, 0.2)

    if (not true_boxes == []):
        (x1, y1, x2, y2) = true_boxes[0][:-1]
        cv2.rectangle(img, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0))
        # win = cv2.namedWindow('face detection', flags=0)
        plt.imshow(img)
        plt.show()
        # cv2.imshow('face detection', img)
        # cv2.waitKey(0)

    # with tf.Session() as sess:
    #     sess.run((tf.global_variables_initializer(),
    #               tf.local_variables_initializer()))
    #
    #     for scale in scales:
    #         print("aaaaaaaaaaa")
    #         scale_img = cv2.resize(img, (int(img.shape[0] * scale), int(img.shape[1] * scale)))
    #         # print(scale_img.shape[0], scale_img.shape[1], scale_img.shape[2])
    #
    #         cv2.imwrite(r"/home/sxj/DL/face_detect/scale_img.jpg", scale_img)
    #
    #         im = cv2.imread(r"/home/sxj/DL/face_detect/scale_img.jpg")
    #
    #         image_reshape = tf.reshape(image, [1, im.shape[0], im.shape[1], 3])
    #         logit = Network(image_reshape, False)
    #
    #         saver = tf.train.Saver()
    #         saver.restore(sess, "/home/sxj/DL/insightface/alex_model/Alex-1000")
    #         a = sess.run(logit, feed_dict={image: im})
    #
    #         boxes = generateBoundingBox(sess, a, scale)
    #
    #         if (boxes):
    #             total_boxes.extend(boxes)
    #
    #
    #     # print(total_boxes)
    #     boxes_nms = np.array(total_boxes)
    #     true_boxes = nms_average(boxes_nms, 1, 0.2)
    #
    #     if (not true_boxes == []):
    #         (x1, y1, x2, y2) = true_boxes[0][:-1]
    #         cv2.rectangle(img, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0))
    #         win = cv2.namedWindow('face detection', flags=0)
    #         cv2.imshow('face detection', img)
    #         cv2.waitKey(0)

def run_bechmark():
    with tf.Graph().as_default():
        image_size = 227
        # 以高斯分布产生一些图片
        images = tf.Variable(tf.random_normal([batch_size, image_size, image_size, 3],
                                              dtype=tf.float32, stddev=0.1))
        Network(images)
        init = tf.global_variables_initializer()
        sess = tf.Session()
        sess.run(init)


if __name__ == "__main__":
    train()
    # img = "/home/sxj/DL/face_detect/c.jpg"
    # face_detection(img)
View Code

训练和测试代码:

import os
import sys
import numpy as np
import math
import cv2
import random
import tensorflow as tf
import matplotlib.pyplot as plt

# 因为自己电脑安装的cv2显示图片有问题,这里使用matplotlib来显示图片

slim = tf.contrib.slim


batch_size = 32
img_size = 227
num_bathes = 100
train_step = 1000001
model_path = "/home/sxj/DL/insightface/alex_model"
model_name = 'Alex'
addr = '/home/sxj/DL/face_detect/data/train.tfrecords'


lr_steps = [40000, 60000, 80000]
lr_values = [0.004, 0.002, 0.0012, 0.0004]


class Point(object):
    def __init__(self, x, y):
        self.x = x
        self.y = y


class Rect(object):
    def __init__(self, p1, p2):
        """Store the top, bottom, left, right values for points
        p1, p2 are the left-top and right-bottom points of the rectangle"""
        self.left = min(p1.x, p2.x)
        self.right = max(p1.x, p2.x)
        self.bottom = min(p1.y, p2.y)
        self.top = max(p1.y, p2.y)

    def __str__(self):
        return "Rect[%d, %d, %d, %d]" %(self.left, self.top, self.right, self.bottom)


def calcDistance(x1, y1, x2, y2):
    dist = math.sqrt((x2 - x1) ** 2 + (y2 - y1) ** 2)
    return dist


def range_overlap(a_min, a_max, b_min, b_max):
    """Judge whether there is intersection on one dimension"""
    return (a_min <= b_max) and (a_max >= b_min)


def rect_overlaps(r1, r2):
    """Judge whether the two rectangles have intersection"""
    return range_overlap(r1.left, r1.right, r2.left, r2.right) and range_overlap(r1.bottom, r1.top, r2.bottom, r2.top)


def rect_merge(r1, r2, mergeThresh):
    """Calculate the merge area of two rectangles"""
    if rect_overlaps(r1, r2):
        SI = abs(min(r1.right, r2.right) - max(r1.left, r2.left)) * abs(min(r1.top, r2.top) - max(r1.bottom, r2.bottom))
        SA = abs(r1.right - r1.left) * abs(r1.top - r1.bottom)
        SB = abs(r2.right - r2.left) * abs(r2.top - r2.bottom)
        S = SA + SB - SI

        ratio = float(SI) / float(S)

        if ratio > mergeThresh:
            return 1
    return 0


def softmax(a, b):
    a0 = math.exp(a)
    a1 = math.exp(b)
    return a0/(a0 + a1)


def generateBoundingBox(featureMap, scale):
    boundingBox = []
    """We can calculate the stride from the architecture of the alexnet"""
    stride = 32
    """We need to get the boundingbox whose size is 227 * 227. When we trained the alexnet,
    we also resize the size of the input image to 227 * 227 in caffe"""
    cellSize = 227

    for x in range(featureMap.shape[0]):
        for y in range(featureMap.shape[1]):
            if featureMap[x][y] > 0.8:
                boundingBox.append(
                    [float(stride * y) / scale, float(stride * x) / scale, float(stride * y + cellSize - 1) / scale,
                     float(stride * x + cellSize - 1) / scale, featureMap[x][y]])

    return boundingBox


def nms_average(boxes, groupThresh = 2, overlapThresh=0.2):
    rects = []

    for i in range(len(boxes)):
        if boxes[i][4] > 0.2:
            """The box in here, we record the left-bottom coordinates(y, x) and the height and width"""
            rects.append([boxes[i, 0], boxes[i, 1], boxes[i, 2] - boxes[i, 0], boxes[i, 3] - boxes[i, 1]])

    rects, weights = cv2.groupRectangles(rects, groupThresh, overlapThresh)

    rectangles = []
    for i in range(len(rects)):
        testRect = Rect(Point(rects[i, 0], rects[i, 1]), Point(rects[i, 0] + rects[i, 2], rects[i, 1] + rects[i, 3]))
        rectangles.append(testRect)
    clusters = []
    for rect in rectangles:
        matched = 0
        for cluster in clusters:
            if (rect_merge(rect, cluster, 0.2)):
                matched = 1
                cluster.left = (cluster.left + rect.left) / 2
                cluster.right = (cluster.right + rect.right) / 2
                cluster.bottom = (cluster.bottom + rect.bottom) / 2
                cluster.top = (cluster.top + rect.top) / 2
        if (not matched):
            clusters.append(rect)

    result_boxes = []
    for i in range(len(clusters)):
        result_boxes.append([clusters[i].left, clusters[i].bottom, clusters[i].right, clusters[i].top, 1])

    return result_boxes


def print_tensor_info(tensor):
    print("tensor name:", tensor.op.name, "-tensor shape:", tensor.get_shape().as_list())


def read_single_tfrecord(addr, _batch_size, shape):
    filename_queue = tf.train.string_input_producer([addr], shuffle=True)

    reader = tf.TFRecordReader()
    _, serialized_example = reader.read(filename_queue)

    features = tf.parse_single_example(serialized_example,
                                       features={
                                        'image': tf.FixedLenFeature([], tf.string),
                                        'label': tf.FixedLenFeature([], tf.int64)})
    img = tf.decode_raw(features['image'], tf.uint8)
    label = tf.cast(features['label'], tf.int32)
    img = tf.reshape(img, [shape, shape, 3])
    # img = augmentation(img)
    img=(tf.cast(img, tf.float32)-127.5)/128
    min_after_dequeue = 10000
    batch_size = _batch_size
    capacity = min_after_dequeue + 10 * batch_size
    image_batch, label_batch = tf.train.shuffle_batch([img, label],
                                                        batch_size=batch_size,
                                                        capacity=capacity,
                                                        min_after_dequeue=min_after_dequeue,
                                                        num_threads=4)

    label_batch = tf.reshape(label_batch, [batch_size])

    return image_batch, label_batch


def Network(images, is_train):
    # input 227*227
    with tf.variable_scope('vgg_16'):
        with slim.arg_scope([slim.conv2d, slim.max_pool2d], padding='VALID'):
            conv1 = slim.conv2d(images, 96, [11, 11], stride=[4, 4], scope='conv_1')  # 55*55*96
            print_tensor_info(conv1)
            pool1 = slim.max_pool2d(conv1, [3, 3], stride=[2, 2], scope='pool_1')  # 27*27*96
            print_tensor_info(pool1)

            conv2 = slim.conv2d(pool1, 256, [5, 5], stride=[1, 1], scope='conv_2')
            print_tensor_info(conv2)
            pool2 = slim.max_pool2d(conv2, [3, 3], stride=[2, 2], scope='pool_2')
            print_tensor_info(pool2)

            conv3 = slim.conv2d(pool2, 384, [3, 3], stride=[1, 1], scope='conv_3')
            print_tensor_info(conv3)

            conv4 = slim.conv2d(conv3, 384, [3, 3], stride=[1, 1], scope='conv_4')
            print_tensor_info(conv4)

            conv5 = slim.conv2d(conv4, 256, [3, 3], stride=[1, 1], scope='conv_5')
            pool5 = slim.max_pool2d(conv5, [3, 3], stride=[2, 2], scope='pool_5')
            print_tensor_info(pool5)

            conv6 = slim.conv2d(pool5, 256, [2, 2], stride=[1, 1], scope='conv_6')
            print_tensor_info(conv6)

            output = slim.conv2d(conv6, 2, [1, 1], stride=[1, 1], scope='output')
            print_tensor_info(output)
            if is_train:
                output = tf.reshape(output, [-1, 2])
                print_tensor_info(output)
            else:
                output = tf.squeeze(output, axis=0)
                output = tf.nn.softmax(output)
                print_tensor_info(output)
            return output


def train():
    image = tf.placeholder(tf.float32, [batch_size, img_size, img_size, 3], name='image')
    label = tf.placeholder(tf.int32, [batch_size], name='label')
    logit = Network(image, True)  # [batch, 2]
    train_loss = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(logits=logit, labels=label))

    train_images, train_labels = read_single_tfrecord(addr, batch_size, img_size)

    with tf.name_scope('loss'):
        tf.summary.scalar('train_loss', train_loss)

    global_step = tf.Variable(name='global_step', initial_value=0, trainable=False)
    inc_op = tf.assign_add(global_step, 1, name='increment_global_step')

    scale = int(128.0/batch_size)
    _lr_steps = [scale*s for s in lr_steps]
    _lr_values = [v/scale for v in lr_values]
    lr = tf.train.piecewise_constant(global_step, boundaries=_lr_steps, values=_lr_values, name='lr_schedule')

    update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
    with tf.control_dependencies(update_ops):
        train_op = tf.train.MomentumOptimizer(learning_rate=lr, momentum=0.9).minimize(train_loss)

    with tf.name_scope('accuracy'):
        # label = tf.one_hot(label, 2)
        train_accuracy = tf.reduce_mean(
            tf.cast(tf.equal(tf.to_int32(tf.argmax(tf.nn.softmax(logit), axis=1)), label), tf.float32))
        tf.summary.scalar('train_accuracy', train_accuracy)

    saver = tf.train.Saver(max_to_keep=5)
    merged = tf.summary.merge_all()

    with tf.Session() as sess:
        sess.run((tf.global_variables_initializer(),
                  tf.local_variables_initializer()))
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(sess=sess, coord=coord)

        saver.restore(sess, '/home/sxj/DL/insightface/alex_model/Alex-40000')

        writer_train = tf.summary.FileWriter("/home/sxj/DL/insightface/alex_model/%s" % (model_name), sess.graph)

        try:

            for i in range(1, train_step):
                image_batch, label_batch = sess.run([train_images, train_labels])
                sess.run([train_op, inc_op], feed_dict={image: image_batch, label: label_batch})
                if (i % 100 == 0):
                    summary = sess.run(merged, feed_dict={image: image_batch, label: label_batch})
                    writer_train.add_summary(summary, i)
                if (i % 1000 == 0):
                    print('次数', i)
                    print('train_accuracy',
                          sess.run(train_accuracy, feed_dict={image: image_batch, label: label_batch}))
                    print('train_loss', sess.run(train_loss,
                                                 {image: image_batch, label: label_batch}))
                    if (i % 10000 == 0):
                        saver.save(sess, os.path.join(model_path, model_name), global_step=i)

        except tf.errors.OutOfRangeError:
            print("finished")
        finally:
            coord.request_stop()
            writer_train.close()



def face_detection(imgFlie):
    scales = []
    factor = 0.793700526

    img = cv2.imread(imgFlie)

    # image = tf.placeholder(tf.float32, name='image')
    largest = min(2, 4000 / max(img.shape[0:2]))
    scale = largest
    minD = largest * min(img.shape[0:2])
    while minD >= 227:
        scales.append(scale)
        scale *= factor
        minD *= factor
    total_boxes = []

    for scale in scales:
        print("scales")
        print(scale)
        image = tf.placeholder(tf.float32, name='image')
        scale_img = cv2.resize(img, (int(img.shape[0] * scale), int(img.shape[1] * scale)))

        image_reshape = tf.reshape(image, [1, scale_img.shape[0], scale_img.shape[1], 3])
        logit = Network(image_reshape, False)
        with tf.Session() as sess:
            sess.run((tf.global_variables_initializer(),
                      tf.local_variables_initializer()))

            saver = tf.train.Saver()
            saver.restore(sess, "/home/sxj/DL/insightface/alex_model/Alex-50000")
            input = sess.run(logit, feed_dict={image: scale_img})

            boxes = generateBoundingBox(input[:, :, 0], scale)

            if boxes:
                total_boxes.extend(boxes)

        tf.reset_default_graph()
    print(total_boxes)

    boxes_nms = np.array(total_boxes)
    true_boxes = nms_average(boxes_nms, 1, 0.2)

    if not true_boxes == []:
        (x1, y1, x2, y2) = true_boxes[0][:-1]
        cv2.rectangle(img, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0))
        plt.imshow(img)
        plt.show()



if __name__ == "__main__":
    train()
    # img = "/home/sxj/DL/face_detect/tmp9055.jpg"
    # face_detection(img)
View Code

训练过程如下:

测试结果:

tensorflow的实现代码的可能还有缺陷,实际跑出来效果不太好,还在优化中,欢迎各位大佬提出意见。

注:本人正在学习AI相关知识,本例只是通过视频学习加上自己动手操作实现人脸检测功能,仅供自己学习记录。

原文地址:https://www.cnblogs.com/xjlearningAI/p/11067200.html