NLP采用Bert进行简单文本情感分类

参照当Bert遇上Kerashttps://spaces.ac.cn/archives/6736此示例准确率达到95.5%+
https://github.com/CyberZHG/keras-bert/blob/master/README.zh-CN.md

示例实现

# ! -*- coding:utf-8 -*-

import json
import numpy as np
import pandas as pd
from random import choice
from keras_bert import load_trained_model_from_checkpoint, Tokenizer
import codecs

maxlen = 100
config_path = 'model/bert_config.json'
checkpoint_path = 'model/bert_model.ckpt'
dict_path = 'model/vocab.txt'

token_dict = {}

with codecs.open(dict_path, 'r', 'utf8') as reader:
    for line in reader:
        token = line.strip()
        token_dict[token] = len(token_dict)


class OurTokenizer(Tokenizer):

    def __init__(self, token_dict):
        super(OurTokenizer, self).__init__(token_dict)

    def _tokenize(self, text):
        R = []
        for c in text:
            if c in self._token_dict:
                R.append(c)
            elif self._is_space(c):
                R.append('[unused1]')  # space类用未经训练的[unused1]表示
            else:
                R.append('[UNK]')  # 剩余的字符是[UNK]
        return R


tokenizer = OurTokenizer(token_dict)

neg = pd.read_excel('neg.xls', header=None)
pos = pd.read_excel('pos.xls', header=None)

data = []

for d in neg[0]:
    data.append((d, 0))

for d in pos[0]:
    data.append((d, 1))

# 按照9:1的比例划分训练集和验证集
random_order = list(range(len(data)))
np.random.shuffle(random_order)
train_data = [data[j] for i, j in enumerate(random_order) if i % 10 != 0]
valid_data = [data[j] for i, j in enumerate(random_order) if i % 10 == 0]


def seq_padding(X, padding=0):
    L = [len(x) for x in X]
    ML = max(L)
    return np.array([
        np.concatenate([x, [padding] * (ML - len(x))]) if len(x) < ML else x for x in X
    ])


class data_generator:

    def __init__(self, data, batch_size=32):
        self.data = data
        self.batch_size = batch_size
        self.steps = len(self.data) // self.batch_size
        if len(self.data) % self.batch_size != 0:
            self.steps += 1

    def __len__(self):
        return self.steps

    def __iter__(self):
        while True:
            idxs = list(range(len(self.data)))
            np.random.shuffle(idxs)
            X1, X2, Y = [], [], []
            for i in idxs:
                d = self.data[i]
                text = d[0][:maxlen]
                x1, x2 = tokenizer.encode(first=text)
                y = d[1]
                X1.append(x1)
                X2.append(x2)
                Y.append([y])
                if len(X1) == self.batch_size or i == idxs[-1]:
                    X1 = seq_padding(X1)
                    X2 = seq_padding(X2)
                    Y = seq_padding(Y)
                    yield [X1, X2], Y
                    [X1, X2, Y] = [], [], []


from keras.layers import *
from keras.models import Model
from keras.optimizers import Adam

bert_model = load_trained_model_from_checkpoint(config_path, checkpoint_path, seq_len=None)

for l in bert_model.layers:
    l.trainable = False

x1_in = Input(shape=(None,))
x2_in = Input(shape=(None,))

x = bert_model([x1_in, x2_in])

x = Lambda(lambda x: x[:, 0])(x)
p = Dense(1, activation='sigmoid')(x)

model = Model([x1_in, x2_in], p)
model.compile(
    loss='binary_crossentropy',
    optimizer=Adam(1e-5),  # 用足够小的学习率
    metrics=['accuracy']
)
model.summary()

train_D = data_generator(train_data)
valid_D = data_generator(valid_data)

test = [train_data[0]]
test_D = data_generator(test)

model.fit_generator(
    train_D.__iter__(),
    steps_per_epoch=len(train_D),
    epochs=1,
    validation_data=valid_D.__iter__(),
    validation_steps=len(valid_D)
)
 
#保存模型权重值
model.save('model.h5')

原示例存在的问题

模型在保持完之后再进行加载时提示存在自定义层和激活方法的问题,暂没找到解决办法,如有知道办法的小伙伴请留言私信

问题解决

# ! -*- coding:utf-8 -*-

import json
import numpy as np
import pandas as pd
from random import choice
from keras_bert import load_trained_model_from_checkpoint, Tokenizer, get_custom_objects
import re, os
import codecs
from keras.models import load_model

maxlen = 100
config_path = 'model/bert_config.json'
checkpoint_path = 'model/bert_model.ckpt'
dict_path = 'model/vocab.txt'

token_dict = {}

with codecs.open(dict_path, 'r', 'utf8') as reader:
    for line in reader:
        token = line.strip()
        token_dict[token] = len(token_dict)


class OurTokenizer(Tokenizer):

    def __init__(self, token_dict):
        super(OurTokenizer, self).__init__(token_dict)

    def _tokenize(self, text):
        R = []
        for c in text:
            if c in self._token_dict:
                R.append(c)
            elif self._is_space(c):
                R.append('[unused1]')  # space类用未经训练的[unused1]表示
            else:
                R.append('[UNK]')  # 剩余的字符是[UNK]
        return R


tokenizer = OurTokenizer(token_dict)

neg = pd.read_excel('neg.xls', header=None)
pos = pd.read_excel('pos.xls', header=None)

data = []

for d in neg[0]:
    data.append((d, 0))

for d in pos[0]:
    data.append((d, 1))

# 按照9:1的比例划分训练集和验证集
random_order = list(range(len(data)))
np.random.shuffle(random_order)
train_data = [data[j] for i, j in enumerate(random_order) if i % 10 != 0]
valid_data = [data[j] for i, j in enumerate(random_order) if i % 10 == 0]


def seq_padding(X, padding=0):
    L = [len(x) for x in X]
    ML = max(L)
    return np.array([
        np.concatenate([x, [padding] * (ML - len(x))]) if len(x) < ML else x for x in X
    ])


class data_generator:

    def __init__(self, data, batch_size=32):
        self.data = data
        self.batch_size = batch_size
        self.steps = len(self.data) // self.batch_size
        if len(self.data) % self.batch_size != 0:
            self.steps += 1

    def __len__(self):
        return self.steps

    def __iter__(self):
        while True:
            idxs = list(range(len(self.data)))
            np.random.shuffle(idxs)
            X1, X2, Y = [], [], []
            for i in idxs:
                d = self.data[i]
                text = d[0][:maxlen]
                x1, x2 = tokenizer.encode(first=text)
                y = d[1]
                X1.append(x1)
                X2.append(x2)
                Y.append([y])
                if len(X1) == self.batch_size or i == idxs[-1]:
                    X1 = seq_padding(X1)
                    X2 = seq_padding(X2)
                    Y = seq_padding(Y)
                    yield [X1, X2], Y
                    [X1, X2, Y] = [], [], []


from keras.layers import *
from keras.models import Model
import keras.backend as K
from keras.optimizers import Adam

bert_model = load_trained_model_from_checkpoint(config_path, checkpoint_path, seq_len=None)

for l in bert_model.layers:
    l.trainable = False

x1_in = Input(shape=(None,))
x2_in = Input(shape=(None,))

x = bert_model([x1_in, x2_in])

print(bert_model.layers)

x = Lambda(lambda x: x[:, 0])(x)
p = Dense(1, activation='sigmoid')(x)

model = Model([x1_in, x2_in], p)
model.compile(
    loss='binary_crossentropy',
    optimizer=Adam(1e-5),  # 用足够小的学习率
    metrics=['accuracy']
)
model.summary()
train_D = data_generator(train_data)
valid_D = data_generator(valid_data)

'''
model.fit_generator(
    train_D.__iter__(),
    steps_per_epoch=len(train_D),
    epochs=5,
    validation_data=valid_D.__iter__(),
    validation_steps=len(valid_D)
)

model.save('save_path.h5')
'''


# 定义生成器将数据集解析为
class data_token_generator:

    def __init__(self, data, batch_size=32):
        self.data = data
        self.batch_size = batch_size
        self.steps = len(self.data)  # self.batch_size
        if len(self.data) % self.batch_size != 0:
            self.steps += 1
            
    def __len__(self):
        return self.steps

    def get_data(self):
        idxs = list(range(len(self.data)))
        np.random.shuffle(idxs)
        X1, X2, Y = [], [], []
        for i in idxs:
            d = self.data[i]
            text = d[0][:maxlen]
            print(text)
            x1, x2 = tokenizer.encode(first=text)
            y = d[1]
            X1.append(x1)
            X2.append(x2)
            Y.append([y])
        X1 = seq_padding(X1)
        X2 = seq_padding(X2)
        Y = seq_padding(Y)
        return X1, X2, Y


new_model = load_model('save_path.h5', custom_objects=get_custom_objects())
test_T = data_token_generator(valid_data[0:10])
X_test1, X_test2, Y_test = test_T.get_data()
print(Y_test)
print(new_model.predict([X_test1, X_test2]))

我的实现

# ! -*- coding:utf-8 -*-
import numpy as np
import pandas as pd
from random import choice
from keras_bert import load_trained_model_from_checkpoint, Tokenizer, get_checkpoint_paths
import codecs
from keras.layers import *
from keras.models import Model
from keras.optimizers import Adam

# 评价文本最大长度
maxlen = 100
dict_path = 'model/vocab.txt'
token_dict = {}
EPOCHS = 30
BATCH_SIZE = 128

# 初始化令牌字典
with codecs.open(dict_path, 'r', 'utf8') as reader:
    for line in reader:
        token = line.strip()
        # print(token, len(token_dict))
        token_dict[token] = len(token_dict)


# 定义令牌解析器
class OurTokenizer(Tokenizer):

    def _tokenize(self, text):
        R = []
        for c in text:
            if c in self._token_dict:
                R.append(c)
            elif self._is_space(c):
                R.append('[unused1]')  # space类用未经训练的[unused1]表示
            else:
                R.append('[UNK]')  # 剩余的字符是[UNK]
        return R


# 初始化令牌解析器
tokenizer = OurTokenizer(token_dict)

# 读取数据集
neg = pd.read_excel('neg.xls', header=None)
pos = pd.read_excel('pos.xls', header=None)

data = []

for d in neg[0]:
    data.append((d, 0))

for d in pos[0]:
    data.append((d, 1))

# 按照9:1的比例划分训练集和验证集
random_order = list(range(len(data)))
np.random.shuffle(random_order)
train_data = [data[j] for i, j in enumerate(random_order) if i % 10 != 0]
valid_data = [data[j] for i, j in enumerate(random_order) if i % 10 == 0]


# 令牌序列长度补全
def seq_padding(X, padding=0):
    L = [len(x) for x in X]
    ML = max(L)
    t = [
        np.concatenate([x, [padding] * (ML - len(x))]) if len(x) < ML else x for x in X
    ]
    return  t


# 定义生成器将数据集解析为
class data_token_generator:

    def __init__(self, data, batch_size=32, print_text=False):
        self.data = data
        self.batch_size = batch_size
        self.steps = len(self.data)  # self.batch_size
        self.print_text = print_text
        if len(self.data) % self.batch_size != 0:
            self.steps += 1
            
        # bert中文模型路径
        paths = get_checkpoint_paths('model')
        # bert中文模型加载
        self.bert_model = load_trained_model_from_checkpoint(paths.config, paths.checkpoint, seq_len=None)

        for l in self.bert_model.layers:
            l.trainable = True    
            
    def __len__(self):
        return self.steps

    def get_data(self):
        data_x = []
        data_y = []
        idxs = list(range(len(self.data)))
        # 随机
        np.random.shuffle(idxs)
        indices, segments, Y = [], [], []
        for i in idxs:
            d = self.data[i]
            # 截取数据
            text = d[0][:maxlen]
            if self.print_text:
                print(text)
            # 生成指标及段
            indice, segment = tokenizer.encode(first=text)
            y = d[1]
            # 数据放入数组中
            indices.append(indice)
            segments.append(segment)
            Y.append([y])
            # 转化成批次
            if len(indices) == self.batch_size or i == idxs[-1]:
                indices = seq_padding(indices)
                segments = seq_padding(segments)
                Y = seq_padding(Y)
                # 产生词向量
                x = self.bert_model.predict([np.array(indices), np.array(segments)])
                
                j_idxs = list(range(len(x)))
                for j in j_idxs:
                    data_x.append(x[j])
                    data_y.append(Y[j])
                
                print(len(data_y))
                [indices, segments, Y] = [], [], []

        return np.array(data_x), np.array(data_y)

    
# 定义二分类网络
x_in = Input(shape=(None, 768))
x = Lambda(lambda x: x[:, 0])(x_in)
p = Dense(1, activation='sigmoid')(x)

model = Model(x_in, p)
model.compile(
    loss='binary_crossentropy',
    optimizer=Adam(1e-5),  # 用足够小的学习率
    metrics=['accuracy']
)
# 打印模型结构
model.summary()

# 开始训练
print('Training -----------')

train_T = data_token_generator(train_data)
train_x, train_y = train_T.get_data()
valid_T = data_token_generator(valid_data)
validation_data = valid_T.get_data()
model.fit(
    train_x,
    train_y,
    epochs=EPOCHS,
    batch_size=BATCH_SIZE,
    validation_data=validation_data
)
 
model.save('new_model.h5')    

# 加载模型验证
import keras

test_T = data_token_generator(valid_data[0:10], print_text=True)
X_test, Y_test = test_T.get_data()
print(Y_test)
new_model = keras.models.load_model('new_model.h5')
y = new_model.predict(X_test)
print(y)

采用哈工大版权重,准确率在80%左右

相关依赖

中文版权重

原文地址:https://www.cnblogs.com/gmhappy/p/11863936.html