一.模型结构
实现一个简单的孪生网络(结构如上图),做语义相似度:
1.从上图可看出整体的结构相对较简单,左右两边基本一致。A句和B句分别进入左右两个结构。输入到网络中的是 token embedding + position embedding
2.再经过cnn-encoder进行编码
3.多头注意力层,self-attention的输入:一个是本句cnn-encoder的输出;一个是另一句的cnn-encoder的输出。作为两句的交互层
4.将cnn-encoder的输出和self-attention的输出进行cat连接
5.接一个fc层
6.一个平均池化层
7.最后用余弦相似度(cosine similarity)计算两句的匹配程度
cnn-encoder结构如下:
二.程序:(完整项目见:https://github.com/jiangnanboy/semantic_matching/tree/master/model1)
# 搭建模型 class Encoder(nn.Module): def __init__(self, input_dim, hid_dim, n_layers, kernel_size, dropout, max_length=30): super(Encoder, self).__init__() #for kernel in kernel_size: assert kernel_size % 2 == 1,'kernel size must be odd!' # 卷积核size为奇数,方便序列两边pad处理 self.scale = torch.sqrt(torch.FloatTensor([0.5])).to(DEVICE) # 确保整个网络的方差不会发生显著变化 self.tok_embedding = nn.Embedding(input_dim, hid_dim) # token编码 self.pos_embedding = nn.Embedding(max_length, hid_dim) # token的位置编码 #self.emb2hid = nn.Linear(emb_dim, hid_dim) # 线性层,从emb_dim转为hid_dim #self.hid2emb = nn.Linear(hid_dim, emb_dim) # 线性层,从hid_dim转为emb_dim # 卷积块 self.convs = nn.ModuleList([nn.Conv1d(in_channels=hid_dim, out_channels=2*hid_dim, # 卷积后输出的维度,这里2*hid_dim是为了后面的glu激活函数 kernel_size=kernel_size, padding=(kernel_size - 1)//2) # 序列两边补0个数,保持维度不变 for _ in range(n_layers)]) ''' 利用不同size的卷积核进行特征提取 self.conv_1 = nn.ModuleList([nn.Conv1d(in_channels=hid_dim, out_channels=2*hid_dim, # 卷积后输出的维度,这里2*hid_dim是为了后面的glu激活函数 kernel_size=kernel_size[0], padding=(kernel_size[0] - 1)//2) # 序列两边补0个数,保持维度不变 for _ in range(n_layers)]) self.conv_2 = nn.ModuleList([nn.Conv1d(in_channels=hid_dim, out_channels=2*hid_dim, # 卷积后输出的维度,这里2*hid_dim是为了后面的glu激活函数 kernel_size=kernel_size[1], padding=(kernel_size[1] - 1)//2) # 序列两边补0个数,保持维度不变 for _ in range(n_layers)]) self.conv_3 = nn.ModuleList([nn.Conv1d(in_channels=hid_dim, out_channels=2*hid_dim, # 卷积后输出的维度,这里2*hid_dim是为了后面的glu激活函数 kernel_size=kernel_size[2], padding=(kernel_size[2] - 1)//2) # 序列两边补0个数,保持维度不变 for _ in range(n_layers)]) # 几个卷积模块转换维度 self.convhid2hid = nn.Linear(len(kernel_size) * hid_dim, hid_dim) ''' self.dropout = nn.Dropout(dropout) def forward(self, src): # src: [batch_size, src_len] batch_size = src.shape[0] src_len = src.shape[1] # 创建token位置信息 pos = torch.arange(src_len).unsqueeze(0).repeat(batch_size, 1).to(DEVICE) # [batch_size, src_len] # 对token与其位置进行编码 tok_embedded = self.tok_embedding(src) # [batch_size, src_len, emb_dim] pos_embedded = 
self.pos_embedding(pos.long()) # [batch_size, src_len, emb_dim] # 对token embedded和pos_embedded逐元素加和 embedded = self.dropout(tok_embedded + pos_embedded) # [batch_size, src_len, emb_dim] # embedded经过一线性层,将emb_dim转为hid_dim,作为卷积块的输入 #conv_input = self.emb2hid(embedded) # [batch_size, src_len, hid_dim] # 转变维度,卷积在输入数据的最后一维进行 conv_input = embedded.permute(0, 2, 1) # [batch_size, hid_dim, src_len] # 以下进行卷积块 for i, conv in enumerate(self.convs): # 进行卷积 conved = conv(self.dropout(conv_input)) # [batch_size, 2*hid_dim, src_len] # 进行激活glu conved = F.glu(conved, dim=1) # [batch_size, hid_dim, src_len] # 进行残差连接 conved = (conved + conv_input) * self.scale # [batch_size, hid_dim, src_len] # 作为下一个卷积块的输入 conv_input = conved # 经过一线性层,将hid_dim转为emb_dim,作为enocder的卷积输出的特征 #conved = self.hid2emb(conved.permute(0, 2, 1)) # [batch_size, src_len, emb_dim] ''' 利用不同size的卷积核进行特征提取 # 第一个kernel_size conved_input = conv_input for i, conv in enumerate(self.conv_1): # 进行卷积 conved1 = conv(self.dropout(conved_input)) # [batch_size, 2*hid_dim, src_len] # 进行激活glu conved1 = F.glu(conved1, dim=1) # [batch_size, hid_dim, src_len] # 进行残差连接 conved1 = (conved1 + conved_input) * self.scale # [batch_size, hid_dim, src_len] # 作为下一个卷积块的输入 conved_input = conved1 combine_conv_module = conved1 # 第二个kernel_size conved_input = conv_input for i, conv in enumerate(self.conv_2): # 进行卷积 conved2 = conv(self.dropout(conved_input)) # [batch_size, 2*hid_dim, src_len] # 进行激活glu conved2 = F.glu(conved2, dim=1) # [batch_size, hid_dim, src_len] # 进行残差连接 conved2 = (conved2 + conved_input) * self.scale # [batch_size, hid_dim, src_len] # 作为下一个卷积块的输入 conved_input = conved2 combine_conv_module = torch.cat([combine_conv_module, conved2], dim = 1) # 第三个kernel_size conved_input = conv_input for i, conv in enumerate(self.conv_3): # 进行卷积 conved3 = conv(self.dropout(conved_input)) # [batch_size, 2*hid_dim, src_len] # 进行激活glu conved3 = F.glu(conved3, dim=1) # [batch_size, hid_dim, src_len] # 进行残差连接 conved3 = (conved3 + conved_input) * 
self.scale # [batch_size, hid_dim, src_len] # 作为下一个卷积块的输入 conved_input = conved3 combine_conv_module = torch.cat([combine_conv_module, conved3], dim = 1) conved = self.convhid2hid(combine_conv_module.permute(0, 2, 1)) # [batch_size, src_len, hid_dim] ''' # 又是一个残差连接,逐元素加和输出,作为encoder的联合输出特征 combined = (conved.permute(0, 2, 1) + embedded) * self.scale # [batch_size, src_len, emb_dim] return conved, combined ''' 多头注意力multi-head attention ''' class MultiHeadAttentionLayer(nn.Module): def __init__(self, hid_dim, n_heads, dropout): super(MultiHeadAttentionLayer, self).__init__() assert hid_dim % n_heads == 0 self.hid_dim = hid_dim self.n_heads = n_heads self.head_dim = hid_dim // n_heads self.fc_q = nn.Linear(hid_dim, hid_dim) self.fc_k = nn.Linear(hid_dim, hid_dim) self.fc_v = nn.Linear(hid_dim, hid_dim) self.fc_o = nn.Linear(hid_dim, hid_dim) self.dropout = nn.Dropout(dropout) self.scale = torch.sqrt(torch.FloatTensor([self.hid_dim])).to(DEVICE) # 缩放因子 def forward(self, query, key, value, mask=None): ''' query: [batch_size, query_len, hid_dim] key: [batch_size, key_len, hid_dim] value: [batch_size, value_len, hid_dim] ''' batch_size = query.shape[0] Q = self.fc_q(query) # [batch_size, query_len, hid_dim] K = self.fc_k(key) # [batch_size, key_len, hid_dim] V = self.fc_v(value) # [batch_size, value_len, hid_dim] Q = Q.view(batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3) # [batch_size, n_heads, query_len, head_dim] K = K.view(batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3) # [batch_size, n_heads, key_len, head_dim] V = V.view(batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3) # [batch_size, n_heads, value_len, head_dim] # [batch_size, n_heads, query_len, head_dim] * [batch_size, n_heads, head_dim, key_len] energy = torch.matmul(Q, K.permute(0, 1, 3, 2)) / self.scale # [batch_size, n_heads, query_len, key_len] if mask != None: energy = energy.masked_fill(mask == 0, -1e10) attention = torch.softmax(energy, dim=-1) # 
[batch_size, n_heads, query_len, key_len] # [batch_size, n_heads, query_len, key_len] * [batch_size, n_heads, value_len, head_dim] x = torch.matmul(self.dropout(attention), V) # [batch_size, n_heads, query_len, head_dim] x = x.permute(0, 2, 1, 3).contiguous() # [batch_size, query_len, n_heads, head_dim] x = x.view(batch_size, -1, self.hid_dim) # [batch_size, query_len, hid_dim] x = self.fc_o(x) # [batch_size, query_len, hid_dim] return x, attention class SiameseNetwork(nn.Module): def __init__(self, EncoderA, hid_dim, n_heads, dropout): super(SiameseNetwork, self).__init__() self.EncoderA = EncoderA #self.EncoderB = EncoderB #self.dropout = nn.Dropout(dropout) # 多头 self.self_attention = MultiHeadAttentionLayer(hid_dim, n_heads, dropout) self.fcA = nn.Linear(2 * hid_dim, hid_dim) self.fcB = nn.Linear(2 * hid_dim, hid_dim) self.fc_out = nn.Linear(5 * hid_dim, 2) def calculate_attention(self, convedA, convedB): ''' convedA:[batch_size, len, hid_dim] convedB:[batch_size, len, hid_dim] ''' energy = torch.matmul(convedA, convedB.permute(0, 2, 1)) # [batch_size, trg_len, src_len] attention = F.softmax(energy, dim=2) # [batch_size, trg_len, src_len] attention_encoding = torch.matmul(attention, convedB) # [batch_size, trg_len, hid_dim] return attention, attention_encoding def forward(self, sentA, sentB): convedA, combinedA = self.EncoderA(sentA) convedB, combinedB = self.EncoderA(sentB) # 普通attention #attentionA, attended_encodingA = self.calculate_attention(combinedB, combinedA) #attentionB, attended_encodingB = self.calculate_attention(combinedA, combinedB) # 多头attention,来自transformer模型中 self_attentionA, attentionA = self.self_attention(combinedB, combinedA, combinedA) self_attentionB, attentionB = self.self_attention(combinedA, combinedB, combinedB) combinedA = torch.cat([self_attentionA, combinedA], dim=2) # [batch_size, len, 2 * hid_dim] combinedB = torch.cat([self_attentionB, combinedB], dim=2) # [batch_size, len, 2 * hid_dim] combinedA = self.fcA(combinedA) # 
[batch_size, len, hid_dim] combinedB = self.fcB(combinedB) # [batch_size, len, hid_dim] combinedA = F.avg_pool1d(combinedA.permute(0, 2, 1), combinedA.shape[1]).squeeze(2) # [batch_size, emb_dim] combinedB = F.avg_pool1d(combinedB.permute(0, 2, 1), combinedB.shape[1]).squeeze(2) # [batch_size, emb_dim] similarity = torch.cosine_similarity(combinedA, combinedB, dim=1) # 直接计算和学习相似度 # 以下是做二分类 # [p, q, p+q, p-q, p*q] #fc_out = self.fc_out(torch.cat([combinedA, combinedB, combinedA+combinedB, combinedA-combinedB, combinedA*combinedB], dim=1)) # 【batch_size, 2】 return similarity