Transformer
1. 数据准备
import torch
import numpy as np
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as Data
# Encoder_input Decoder_input Decoder_output
sentences = [['我 是 学 生 P', 'S I am a student', 'I am a student E'], # S: 开始符号
['我 喜 欢 学 习', 'S I like learning P', 'I like learning P E'], # E: 结束符号
['我 是 男 生 P', 'S I am a boy', 'I am a boy E']] # P: 占位符号,如果当前句子不足固定长度用P占位
src_vocab = {'P': 0, '我': 1, '是': 2, '学': 3, '生': 4, '喜': 5, '欢': 6, '习': 7, '男': 8} # 词源字典 字:索引
src_idx2word = {src_vocab[key]: key for key in src_vocab}
src_vocab_size = len(src_vocab) # 字典字的个数
tgt_vocab = {'S': 0, 'E': 1, 'P': 2, 'I': 3, 'am': 4, 'a': 5, 'student': 6, 'like': 7, 'learning': 8, 'boy': 9}
idx2word = {tgt_vocab[key]: key for key in tgt_vocab} # 把目标字典转换成 索引:字的形式
tgt_vocab_size = len(tgt_vocab) # 目标字典尺寸
src_len = len(sentences[0][0].split(" ")) # Encoder输入的最大长度
tgt_len = len(sentences[0][1].split(" ")) # Decoder输入输出最大长度
# 把sentences 转换成字典索引
def make_data(sentences):
enc_inputs, dec_inputs, dec_outputs = [], [], []
for i in range(len(sentences)):
enc_input = [[src_vocab[n] for n in sentences[i][0].split()]]
dec_input = [[tgt_vocab[n] for n in sentences[i][1].split()]]
dec_output = [[tgt_vocab[n] for n in sentences[i][2].split()]]
enc_inputs.extend(enc_input)
dec_inputs.extend(dec_input)
dec_outputs.extend(dec_output)
return torch.LongTensor(enc_inputs), torch.LongTensor(dec_inputs), torch.LongTensor(dec_outputs)
enc_inputs, dec_inputs, dec_outputs = make_data(sentences)
# print(enc_inputs, enc_inputs.shape)
# 自定义数据集函数
class MyDataSet(Data.Dataset):
def __init__(self, enc_inputs, dec_inputs, dec_outputs):
super(MyDataSet, self).__init__()
self.enc_inputs = enc_inputs
self.dec_inputs = dec_inputs
self.dec_outputs = dec_outputs
def __len__(self):
return self.enc_inputs.shape[0]
def __getitem__(self, idx):
return self.enc_inputs[idx], self.dec_inputs[idx], self.dec_outputs[idx]
loader = Data.DataLoader(MyDataSet(enc_inputs, dec_inputs, dec_outputs), 2, True)
# 参数设置
d_model = 512 # 字 Embedding 的维度
d_ff = 2048 # 前向传播隐藏层维度 d_ff = d_model * 4
d_k = d_v = 64 # K(=Q), V的维度
n_layers = 6 # 有多少个encoder和decoder
n_heads = 8 # Multi-Head Attention设置为8 d_k * n_heads = d_model
sentences 里一共有三个训练数据,中文->英文。把Encoder_input、Decoder_input、Decoder_output转换成字典索引,例如”学”->3、”student”->6。再把数据转换成batch大小为2的分组数据,3句话一共可以分成两组,一组2句话、一组1句话。src_len表示中文句子固定最大长度,tgt_len 表示英文句子固定最大长度。
2. 定义位置信息
# Positional Embedding
class PositionalEncoding(nn.Module):
def __init__(self, d_model, dropout=0.1, max_len=5000):
super(PositionalEncoding, self).__init__()
self.dropout = nn.Dropout(p=dropout)
# pos_table 第一个维度对应第几个词 pos,第二个维度对应每个词的词嵌入维度 i,[max_len, d_model]
pos_table = np.array([
[pos / np.power(10000, 2 * i / d_model) for i in range(d_model)] # i 针对词嵌入的维度
if pos != 0 else np.zeros(d_model) for pos in range(max_len)]) # pos 针对第几个词
pos_table[1:, 0::2] = np.sin(pos_table[1:, 0::2]) # 字嵌入维度为偶数时
pos_table[1:, 1::2] = np.cos(pos_table[1:, 1::2]) # 字嵌入维度为奇数时
self.pos_table = torch.FloatTensor(pos_table).cuda() # enc_inputs: [seq_len, d_model]
def forward(self, enc_inputs): # enc_inputs: [batch_size, seq_len, d_model]
enc_inputs += self.pos_table[:enc_inputs.size(1), :] # 注意,tensor的size(n)表示第n的维度大小,与numpy的array.size不一样,np只是个数值
return self.dropout(enc_inputs.cuda())
生成位置信息矩阵pos_table,直接加上输入的enc_inputs上,得到带有位置信息的字向量,pos_table是一个固定值的矩阵。这里矩阵加法利用到了广播机制.
3. Mask掉停用词
# Mask掉停用词
def get_attn_pad_mask(seq_q, seq_k): # seq_q: [batch_size, seq_len] ,seq_k: [batch_size, seq_len]
batch_size, len_q = seq_q.size()
batch_size, len_k = seq_k.size()
pad_attn_mask = seq_k.data.eq(0).unsqueeze(1) # 判断 输入那些含有P(=0),用1标记 ,[batch_size, 1, len_k]
return pad_attn_mask.expand(batch_size, len_q, len_k) # 扩展成多维度, [batch_size, len_q, len_k]
# Decoder 输入 Mask
def get_attn_subsequence_mask(seq): # seq: [batch_size, tgt_len]
attn_shape = [seq.size(0), seq.size(1), seq.size(1)]
subsequence_mask = np.triu(np.ones(attn_shape), k=1) # 生成上三角矩阵,[batch_size, tgt_len, tgt_len], 数值为1
subsequence_mask = torch.from_numpy(subsequence_mask).byte() # [batch_size, tgt_len, tgt_len]
return subsequence_mask
Mask句子中没有实际意义的占位符,例如’我 是 学 生 P’ ,P对应句子没有实际意义,所以需要被Mask,Encoder_input 和Decoder_input占位符都需要被Mask。
用来Mask未来输入信息,返回的是一个上三角矩阵。比如我们在中英文翻译时候,会先把”我是学生”整个句子输入到Encoder中,得到最后一层的输出后,才会在Decoder输入“S I am a student”(s表示开始),但是“S I am a student”这个句子我们不会一起输入,而是在T0时刻先输入“S”预测,预测第一个词“I”;在下一个T1时刻,同时输入“S”和“I”到Decoder预测下一个单词“am”;然后在T2时刻把“S,I,am”同时输入到Decoder预测下一个单词“a”,依次把整个句子输入到Decoder,预测出“I am a student E”。
4. 计算注意力信息、残差和归一化、前馈神经网络
# 计算注意力信息、残差和归一化
class ScaledDotProductAttention(nn.Module):
def __init__(self):
super(ScaledDotProductAttention, self).__init__()
def forward(self, Q, K, V, attn_mask):
# Q: [batch_size, n_heads, len_q, d_k]
# K: [batch_size, n_heads, len_k, d_k]
# V: [batch_size, n_heads, len_v(=len_k), d_v]
# attn_mask: [batch_size, n_heads, seq_len, seq_len]
scores = torch.matmul(Q, K.transpose(-1, -2)) / np.sqrt(d_k) # scores : [batch_size, n_heads, len_q, len_k]
scores.masked_fill_(attn_mask, -1e9) # 如果时停用词P就等于 0
attn = nn.Softmax(dim=-1)(scores)
context = torch.matmul(attn, V) # [batch_size, n_heads, len_q, d_v]
return context, attn
class MultiHeadAttention(nn.Module):
def __init__(self):
super(MultiHeadAttention, self).__init__()
self.W_Q = nn.Linear(d_model, d_k * n_heads, bias=False)
self.W_K = nn.Linear(d_model, d_k * n_heads, bias=False)
self.W_V = nn.Linear(d_model, d_v * n_heads, bias=False)
self.fc = nn.Linear(n_heads * d_v, d_model, bias=False) # multi-head 最后concat后需要乘上一个W_O得到一个 总输出b
def forward(self, input_Q, input_K, input_V, attn_mask):
# input_Q: [batch_size, len_q, d_model]
# input_K: [batch_size, len_k, d_model]
# input_V: [batch_size, len_v(=len_k), d_model]
# attn_mask: [batch_size, seq_len, seq_len]
residual, batch_size = input_Q, input_Q.size(0)
# view()函数相当于reshape函数
Q = self.W_Q(input_Q).view(batch_size, -1, n_heads, d_k).transpose(1, 2) # Q: [batch_size, n_heads, len_q, d_k]
K = self.W_K(input_K).view(batch_size, -1, n_heads, d_k).transpose(1, 2) # K: [batch_size, n_heads, len_k, d_k]
V = self.W_V(input_V).view(batch_size, -1, n_heads, d_v).transpose(1, 2) # V: [batch_size, n_heads, len_v(=len_k), d_v]
attn_mask = attn_mask.unsqueeze(1).repeat(1, n_heads, 1, 1) # attn_mask : [batch_size, n_heads, seq_len, seq_len]
context, attn = ScaledDotProductAttention()(Q, K, V, attn_mask) # context: [batch_size, n_heads, len_q, d_v]
# attn: [batch_size, n_heads, len_q, len_k]
context = context.transpose(1, 2).reshape(batch_size, -1,
n_heads * d_v) # context: [batch_size, len_q, n_heads * d_v]
output = self.fc(context) # [batch_size, len_q, d_model]
return nn.LayerNorm(d_model).cuda()(output + residual), attn # ADD And Layer Norm
# 前馈神经网络
class PoswiseFeedForwardNet(nn.Module):
def __init__(self):
super(PoswiseFeedForwardNet, self).__init__()
self.fc = nn.Sequential(
nn.Linear(d_model, d_ff, bias=False),
nn.ReLU(),
nn.Linear(d_ff, d_model, bias=False))
def forward(self, inputs): # inputs: [batch_size, seq_len, d_model]
residual = inputs
output = self.fc(inputs)
return nn.LayerNorm(d_model).cuda()(output + residual) # [batch_size, seq_len, d_model]
计算注意力信息,W_Q, W_K, W_V矩阵拆分成8个小矩阵。注意传入的input_Q, input_K, input_V,在Encoder和Decoder的第一次调用传入的三个矩阵是相同的,但Decoder的第二次调用传入的三个矩阵input_Q 等于 input_K 不等于 input_V。
输入inputs ,经过两个全连接成,得到的结果再加上 inputs ,再做LayerNorm归一化。LayerNorm归一化可以理解层是把Batch中每一句话进行归一化。
5. Encoder
# 单个Encoder
class EncoderLayer(nn.Module):
def __init__(self):
super(EncoderLayer, self).__init__()
self.enc_self_attn = MultiHeadAttention() # 多头注意力机制
self.pos_ffn = PoswiseFeedForwardNet() # 前馈神经网络
def forward(self, enc_inputs, enc_self_attn_mask): # enc_inputs: [batch_size, src_len, d_model]
# 输入3个enc_inputs分别与W_q、W_k、W_v相乘得到Q、K、V # enc_self_attn_mask: [batch_size, src_len, src_len]
enc_outputs, attn = self.enc_self_attn(enc_inputs, enc_inputs, enc_inputs, # enc_outputs: [batch_size, src_len, d_model],
enc_self_attn_mask) # attn: [batch_size, n_heads, src_len, src_len]
enc_outputs = self.pos_ffn(enc_outputs) # enc_outputs: [batch_size, src_len, d_model]
return enc_outputs, attn
# 整个Encoder
class Encoder(nn.Module):
def __init__(self):
super(Encoder, self).__init__()
self.src_emb = nn.Embedding(src_vocab_size, d_model) # 把字转换字向量
self.pos_emb = PositionalEncoding(d_model) # 加入位置信息
self.layers = nn.ModuleList([EncoderLayer() for _ in range(n_layers)])
def forward(self, enc_inputs): # enc_inputs: [batch_size, src_len]
enc_outputs = self.src_emb(enc_inputs) # enc_outputs: [batch_size, src_len, d_model]
''' token embedding 与 positional embedding如何混合? '''
enc_outputs = self.pos_emb(enc_outputs) # enc_outputs: [batch_size, src_len, d_model]
enc_self_attn_mask = get_attn_pad_mask(enc_inputs, enc_inputs) # enc_self_attn_mask: [batch_size, src_len, src_len]
enc_self_attns = []
for layer in self.layers:
enc_outputs, enc_self_attn = layer(enc_outputs, enc_self_attn_mask) # enc_outputs : [batch_size, src_len, d_model],
# enc_self_attn : [batch_size, n_heads, src_len, src_len]
enc_self_attns.append(enc_self_attn)
return enc_outputs, enc_self_attns
- 第一步,中文字索引进行Embedding,转换成512维度的字向量。第二步,在子向量上面加上位置信息。第三步,Mask掉句子中的占位符号。第四步,通过6层的encoder(上一层的输出作为下一层的输入)。
6. Decoder
# 单个decoder
class DecoderLayer(nn.Module):
def __init__(self):
super(DecoderLayer, self).__init__()
self.dec_self_attn = MultiHeadAttention()
self.dec_enc_attn = MultiHeadAttention()
self.pos_ffn = PoswiseFeedForwardNet()
def forward(self, dec_inputs, enc_outputs, dec_self_attn_mask, dec_enc_attn_mask): # dec_inputs: [batch_size, tgt_len, d_model]
# enc_outputs: [batch_size, src_len, d_model]
# dec_self_attn_mask: [batch_size, tgt_len, tgt_len]
# dec_enc_attn_mask: [batch_size, tgt_len, src_len]
dec_outputs, dec_self_attn = self.dec_self_attn(dec_inputs, dec_inputs,
dec_inputs, dec_self_attn_mask) # dec_outputs: [batch_size, tgt_len, d_model]
# dec_self_attn: [batch_size, n_heads, tgt_len, tgt_len]
dec_outputs, dec_enc_attn = self.dec_enc_attn(dec_outputs, enc_outputs,
enc_outputs, dec_enc_attn_mask) # dec_outputs: [batch_size, tgt_len, d_model]
# dec_enc_attn: [batch_size, h_heads, tgt_len, src_len]
dec_outputs = self.pos_ffn(dec_outputs) # dec_outputs: [batch_size, tgt_len, d_model]
return dec_outputs, dec_self_attn, dec_enc_attn
# 整个Decoder
class Decoder(nn.Module):
def __init__(self):
super(Decoder, self).__init__()
self.tgt_emb = nn.Embedding(tgt_vocab_size, d_model)
self.pos_emb = PositionalEncoding(d_model)
self.layers = nn.ModuleList([DecoderLayer() for _ in range(n_layers)])
def forward(self, dec_inputs, enc_inputs, enc_outputs): # dec_inputs: [batch_size, tgt_len]
# enc_intpus: [batch_size, src_len]
# enc_outputs: [batsh_size, src_len, d_model]
dec_outputs = self.tgt_emb(dec_inputs) # [batch_size, tgt_len, d_model]
dec_outputs = self.pos_emb(dec_outputs).cuda() # [batch_size, tgt_len, d_model]
dec_self_attn_pad_mask = get_attn_pad_mask(dec_inputs, dec_inputs).cuda() # [batch_size, tgt_len, tgt_len]
dec_self_attn_subsequence_mask = get_attn_subsequence_mask(dec_inputs).cuda() # [batch_size, tgt_len, tgt_len]
# gt(a, b) a>b , return true
# >>> torch.gt(torch.tensor([[1, 2], [3, 4]]), torch.tensor([[1, 1], [4, 4]]))
# tensor([[False, True], [False, False]])
dec_self_attn_mask = torch.gt((dec_self_attn_pad_mask +
dec_self_attn_subsequence_mask), 0).cuda() # [batch_size, tgt_len, tgt_len]
dec_enc_attn_mask = get_attn_pad_mask(dec_inputs, enc_inputs) # [batc_size, tgt_len, src_len]
dec_self_attns, dec_enc_attns = [], []
for layer in self.layers: # dec_outputs: [batch_size, tgt_len, d_model]
# dec_self_attn: [batch_size, n_heads, tgt_len, tgt_len]
# dec_enc_attn: [batch_size, h_heads, tgt_len, src_len]
dec_outputs, dec_self_attn, dec_enc_attn = layer(dec_outputs, enc_outputs, dec_self_attn_mask, dec_enc_attn_mask)
dec_self_attns.append(dec_self_attn)
dec_enc_attns.append(dec_enc_attn)
return dec_outputs, dec_self_attns, dec_enc_attns
- 第一步,英文字索引进行Embedding,转换成512维度的字向量。第二步,在子向量上面加上位置信息。第三步,Mask掉句子中的占位符号和输出顺序细节见步骤3。第四步,通过6层的decoder(上一层的输出作为下一层的输入)。
7. Transformer
# Trasformer
class Transformer(nn.Module):
def __init__(self):
super(Transformer, self).__init__()
self.Encoder = Encoder().cuda()
self.Decoder = Decoder().cuda()
self.projection = nn.Linear(d_model, tgt_vocab_size, bias=False).cuda()
def forward(self, enc_inputs, dec_inputs): # enc_inputs: [batch_size, src_len]
# dec_inputs: [batch_size, tgt_len]
enc_outputs, enc_self_attns = self.Encoder(enc_inputs) # enc_outputs: [batch_size, src_len, d_model],
# enc_self_attns: [n_layers, batch_size, n_heads, src_len, src_len]
dec_outputs, dec_self_attns, dec_enc_attns = self.Decoder(
dec_inputs, enc_inputs, enc_outputs) # dec_outpus : [batch_size, tgt_len, d_model],
# dec_self_attns: [n_layers, batch_size, n_heads, tgt_len, tgt_len],
# dec_enc_attn : [n_layers, batch_size, tgt_len, src_len]
dec_logits = self.projection(dec_outputs) # dec_logits: [batch_size, tgt_len, tgt_vocab_size]
return dec_logits.view(-1, dec_logits.size(-1)), enc_self_attns, dec_self_attns, dec_enc_attns
- Trasformer的整体结构,输入数据先通过Encoder,再同个Decoder,最后把输出进行多分类,分类数为英文字典长度,也就是判断每一个字的概率。
8. 定义网络与训练测试
# 定义网络
model = Transformer().cuda()
criterion = nn.CrossEntropyLoss(ignore_index=0) #忽略 占位符 索引为0.
optimizer = optim.SGD(model.parameters(), lr=1e-3, momentum=0.99)
# 训练Transformer
for epoch in range(50):
for enc_inputs, dec_inputs, dec_outputs in loader: # enc_inputs : [batch_size, src_len]
# dec_inputs : [batch_size, tgt_len]
# dec_outputs: [batch_size, tgt_len]
enc_inputs, dec_inputs, dec_outputs = enc_inputs.cuda(), dec_inputs.cuda(), dec_outputs.cuda()
outputs, enc_self_attns, dec_self_attns, dec_enc_attns = model(enc_inputs, dec_inputs)
# outputs: [batch_size * tgt_len, tgt_vocab_size]
loss = criterion(outputs, dec_outputs.view(-1))
print('Epoch:', '%04d' % (epoch + 1), 'loss =', '{:.6f}'.format(loss))
optimizer.zero_grad()
loss.backward()
optimizer.step()
# 测试模型
def test(model, enc_input, start_symbol):
enc_outputs, enc_self_attns = model.Encoder(enc_input)
dec_input = torch.zeros(1, tgt_len).type_as(enc_input.data)
next_symbol = start_symbol
for i in range(0, tgt_len):
dec_input[0][i] = next_symbol
dec_outputs, _, _ = model.Decoder(dec_input, enc_input, enc_outputs)
projected = model.projection(dec_outputs)
prob = projected.squeeze(0).max(dim=-1, keepdim=False)[1]
next_word = prob.data[i]
next_symbol = next_word.item()
return dec_input
enc_inputs, _, _ = next(iter(loader))
predict_dec_input = test(model, enc_inputs[0].view(1, -1).cuda(), start_symbol=tgt_vocab["S"])
predict, _, _, _ = model(enc_inputs[0].view(1, -1).cuda(), predict_dec_input)
predict = predict.data.max(1, keepdim=True)[1]
print([src_idx2word[int(i)] for i in enc_inputs[0]], '->',
[idx2word[n.item()] for n in predict.squeeze()])
9. 一些细节
- positional embedding在Transformer和bert中方式不一样,positional embbeding的设计是一个课题
- 注意Multi-Head Self-Attention相比正常的Self-Attention只在最后concat的时候多出参数
- Wq,Wk,Wv是要训练的参数,其尺寸取决于输入的embedding向量长度
- 区分Layer Normalization与Batch Normalization?
- Feed Forward是一个两层的线性层,使用Relu函数
- Encoder与Decoder连接的attention,Encoder提供k和v,Decoder提供q
- Decoder的Multi-Head Self-Attention带Mask,带Mask是为了并行训练计算,实际测试是依次一个个输入。
10. 全部代码与测试
其他实现
1. 模型总览
首先,看一下Transformer模型架构图:
- Embedding,对于输入进行Token Embedding并加上Positional Embedding
- Encoder:由四个部分组成,包括Multi-Head Attention,Feed Forward,Residual Connection,Layer Normalization
- Decoder:由五部分组成,Masked Multi-Head Attention,Encoder-Decoder Attention,Feed Forward,Residual Connection,Layer Normalization
- Output:包括Linear和Softmax
以上是一个完整的Transformer的部件,下面开始手撸一个Transformer吧!
2. Config
下面是这个Demo所用的库文件以及一些超参的信息。单独实现一个Config类保存的原因是,方便日后复用。直接将模型部分复制,所用超参保存在新项目的Config类中即可。这里不过多赘述。
import torch
import torch.nn as nn
import numpy as np
import math
class Config(object):
def __init__(self):
self.vocab_size = 6 # 设置词库大小
self.d_model = 20 # 设置Embedding得到向量的维度
self.n_heads = 2 # 设置mutil-head数目
assert self.d_model % self.n_heads == 0
dim_k = d_model % n_heads
dim_v = d_model % n_heads
self.padding_size = 30 # 统一句子的长度,截长补短
self.UNK = 5 # 设置此表中出现OOV情况下的索引,跟vocab_size关联,一般放在词表的最后一项6-1=5,从0开始
self.PAD = 4 # 设置padding_idx
self.N = 6
self.p = 0.1 # Dropout比率
config = Config()
3. Embedding
Embedding部分接受原始的文本输入(batch_size*seq_len,例:[[1,3,10,5],[3,4,5],[5,3,1,1]]),叠加一个普通的Embedding层以及一个Positional Embedding层,输出最后结果。
在这一层中,输入的是一个list: [batch_size * seq_len],输出的是一个tensor:[batch_size * seq_len * d_model]
普通的 Embedding 层想说两点:
- 采用
torch.nn.Embedding
实现embedding操作。需要关注的一点是论文中提到的Mask机制,包括padding_mask以及sequence_mask(具体请见论文)。在文本输入之前,我们需要进行padding统一长度,padding_mask的实现可以借助torch.nn.Embedding
中的padding_idx
参数。 - 在padding过程中,短补长截
class Embedding(nn.Module):
def __init__(self,vocab_size):
super(Embedding, self).__init__()
# 一个普通的 embedding层,我们可以通过设置padding_idx=config.PAD 来实现论文中的 padding_mask
self.embedding = nn.Embedding(vocab_size,config.d_model,padding_idx=config.PAD)
def forward(self,x):
# 根据每个句子的长度,进行padding,短补长截
for i in range(len(x)):
if len(x[i]) < config.padding_size:
x[i].extend([config.UNK] * (config.padding_size - len(x[i]))) # 注意 UNK是你词表中用来表示oov的token索引,这里进行了简化,直接假设为6
else:
x[i] = x[i][:config.padding_size]
x = self.embedding(torch.tensor(x)) # batch_size * seq_len * d_model
return x
关于Positional Embedding,我们需要参考论文给出的公式。说一句题外话,在作者的实验中对比了Positional Embedding与单独采用一个Embedding训练模型对位置的感知两种方式,模型效果相差无几。
class Positional_Encoding(nn.Module):
def __init__(self,d_model):
super(Positional_Encoding,self).__init__()
self.d_model = d_model
def forward(self,seq_len,embedding_dim):
positional_encoding = np.zeros((seq_len,embedding_dim))
for pos in range(positional_encoding.shape[0]):
for i in range(positional_encoding.shape[1]):
positional_encoding[pos][i] = math.sin(pos/(10000**(2*i/self.d_model))) if i % 2 == 0 else math.cos(pos/(10000**(2*i/self.d_model)))
return torch.from_numpy(positional_encoding)
4. Encoder
Muti_head_Attention
这一部分是模型的核心内容,理论具体参考Bert详解。
Encoder 中的 Muti_head_Attention 不需要Mask。
为了避免模型信息泄露的问题,Decoder 中的 Muti_head_Attention 需要Mask。这一节中我们重点讲解Muti_head_Attention中Mask机制的实现。
forward
函数的参数从 x 变为 x,y:请读者观察模型架构,Decoder需要接受Encoder的输入作为公式中的V
,即我们参数中的y。在普通的自注意力机制中,我们在调用中设置y=x
即可。- requires_mask: 是否采用Mask机制,在Decoder中设置为True
class Mutihead_Attention(nn.Module):
def __init__(self,d_model,dim_k,dim_v,n_heads):
super(Mutihead_Attention, self).__init__()
self.dim_v = dim_v # value
self.dim_k = dim_k # key
self.n_heads = n_heads # multi-head nums
self.q = nn.Linear(d_model,dim_k)
self.k = nn.Linear(d_model,dim_k)
self.v = nn.Linear(d_model,dim_v)
self.o = nn.Linear(dim_v,d_model)
self.norm_fact = 1 / math.sqrt(d_model) #
def generate_mask(self,dim):
# 此处是 sequence mask ,防止 decoder窥视后面时间步的信息。
# padding mask 在数据输入模型之前完成。
matirx = np.ones((dim,dim))
mask = torch.Tensor(np.tril(matirx))
return mask==1
def forward(self,x,y,requires_mask=False):
assert self.dim_k % self.n_heads == 0 and self.dim_v % self.n_heads == 0
# size of x : [batch_size * seq_len * batch_size]
# 对 x 进行自注意力
Q = self.q(x).reshape(-1,x.shape[0],x.shape[1],self.dim_k // self.n_heads) # n_heads * batch_size * seq_len * dim_k
K = self.k(x).reshape(-1,x.shape[0],x.shape[1],self.dim_k // self.n_heads) # n_heads * batch_size * seq_len * dim_k
V = self.v(y).reshape(-1,y.shape[0],y.shape[1],self.dim_v // self.n_heads) # n_heads * batch_size * seq_len * dim_v
# print("Attention V shape : {}".format(V.shape))
attention_score = torch.matmul(Q,K.permute(0,1,3,2)) * self.norm_fact
if requires_mask:
mask = self.generate_mask(x.shape[1])
attention_score.masked_fill(mask,value=float("-inf")) # 注意这里的小Trick,不需要将Q,K,V 分别MASK,只MASKSoftmax之前的结果就好了
output = torch.matmul(attention_score,V).reshape(y.shape[0],y.shape[1],-1)
# print("Attention output shape : {}".format(output.shape))
output = self.o(output)
return output
Feed Forward
这一部分实现很简单,两个Linear中连接Relu即可,目的是为模型增添非线性信息,提高模型的拟合能力。
class Feed_Forward(nn.Module):
def __init__(self,input_dim,hidden_dim=2048):
super(Feed_Forward, self).__init__()
self.L1 = nn.Linear(input_dim,hidden_dim)
self.L2 = nn.Linear(hidden_dim,input_dim)
def forward(self,x):
output = nn.ReLU((self.L1(x)))
output = self.L2(output)
return output
Add & LayerNorm
这一节我们实现论文中提出的残差连接以及LayerNorm。
论文中关于这部分给出公式:
代码中的dropout,在论文中也有所解释,对输入layer_norm的tensor进行dropout,对模型的性能影响还是蛮大的。
代码中的参数sub_layer
,可以是Feed Forward,也可以是Muti_head_Attention。
class Add_Norm(nn.Module):
def __init__(self):
self.dropout = nn.Dropout(config.p)
super(Add_Norm, self).__init__()
def forward(self,x,sub_layer,**kwargs):
sub_output = sub_layer(x,**kwargs)
# print("{} output : {}".format(sub_layer,sub_output.size()))
x = self.dropout(x + sub_output)
layer_norm = nn.LayerNorm(x.size()[1:])
out = layer_norm(x)
return out
OK,Encoder中所有模块我们已经讲解完毕,接下来我们将其拼接作为Encoder
class Encoder(nn.Module):
def __init__(self):
super(Encoder, self).__init__()
self.positional_encoding = Positional_Encoding(config.d_model)
self.muti_atten = Mutihead_Attention(config.d_model,config.dim_k,config.dim_v,config.n_heads)
self.feed_forward = Feed_Forward(config.d_model)
self.add_norm = Add_Norm()
def forward(self,x): # batch_size * seq_len 并且 x 的类型不是tensor,是普通list
x += self.positional_encoding(x.shape[1],config.d_model)
# print("After positional_encoding: {}".format(x.size()))
output = self.add_norm(x,self.muti_atten,y=x)
output = self.add_norm(output,self.feed_forward)
return output
5. Decoder
在 Encoder 部分的讲解中,我们已经实现了大部分Decoder的模块。Decoder的Muti_head_Attention引入了Mask机制,Decoder与Encoder 中模块的拼接方式不同。以上两点读者在Coding的时候需要注意。
class Decoder(nn.Module):
def __init__(self):
super(Decoder, self).__init__()
self.positional_encoding = Positional_Encoding(config.d_model)
self.muti_atten = Mutihead_Attention(config.d_model,config.dim_k,config.dim_v,config.n_heads)
self.feed_forward = Feed_Forward(config.d_model)
self.add_norm = Add_Norm()
def forward(self,x,encoder_output): # batch_size * seq_len 并且 x 的类型不是tensor,是普通list
# print(x.size())
x += self.positional_encoding(x.shape[1],config.d_model)
# print(x.size())
# 第一个 sub_layer
output = self.add_norm(x,self.muti_atten,y=x,requires_mask=True)
# 第二个 sub_layer
output = self.add_norm(output,self.muti_atten,y=encoder_output,requires_mask=True)
# 第三个 sub_layer
output = self.add_norm(output,self.feed_forward)
return output
6. Transformer
至此,所有内容已经铺垫完毕,我们开始组装Transformer模型。论文中提到,Transformer中堆叠了6个我们上文中实现的Encoder 和 Decoder。这里笔者采用nn.Sequential
实现了堆叠操作。
Output模块的 Linear 和 Softmax 的实现也包含在下面的代码中
class Transformer_layer(nn.Module):
def __init__(self):
super(Transformer_layer, self).__init__()
self.encoder = Encoder()
self.decoder = Decoder()
def forward(self,x):
x_input,x_output = x
encoder_output = self.encoder(x_input)
decoder_output = self.decoder(x_output,encoder_output)
return (encoder_output,decoder_output)
class Transformer(nn.Module):
def __init__(self,N,vocab_size,output_dim):
super(Transformer, self).__init__()
self.embedding_input = Embedding(vocab_size=vocab_size)
self.embedding_output = Embedding(vocab_size=vocab_size)
self.output_dim = output_dim
self.linear = nn.Linear(config.d_model,output_dim)
self.softmax = nn.Softmax(dim=-1)
self.model = nn.Sequential(*[Transformer_layer() for _ in range(N)])
def forward(self,x):
x_input , x_output = x
x_input = self.embedding_input(x_input)
x_output = self.embedding_output(x_output)
_ , output = self.model((x_input,x_output))
output = self.linear(output)
output = self.softmax(output)
return output
7. 完整代码
import torch
import torch.nn as nn
import numpy as np
import math
class Config(object):
def __init__(self):
self.vocab_size = 6
self.d_model = 20
self.n_heads = 2
assert self.d_model % self.n_heads == 0
dim_k = self.d_model // self.n_heads
dim_v = self.d_model // self.n_heads
self.padding_size = 30
self.UNK = 5
self.PAD = 4
self.N = 6
self.p = 0.1
config = Config()
class Embedding(nn.Module):
def __init__(self,vocab_size):
super(Embedding, self).__init__()
# 一个普通的 embedding层,我们可以通过设置padding_idx=config.PAD 来实现论文中的 padding_mask
self.embedding = nn.Embedding(vocab_size,config.d_model,padding_idx=config.PAD)
def forward(self,x):
# 根据每个句子的长度,进行padding,短补长截
for i in range(len(x)):
if len(x[i]) < config.padding_size:
x[i].extend([config.UNK] * (config.padding_size - len(x[i]))) # 注意 UNK是你词表中用来表示oov的token索引,这里进行了简化,直接假设为6
else:
x[i] = x[i][:config.padding_size]
x = self.embedding(torch.tensor(x)) # batch_size * seq_len * d_model
return x
class Positional_Encoding(nn.Module):
def __init__(self,d_model):
super(Positional_Encoding,self).__init__()
self.d_model = d_model
def forward(self,seq_len,embedding_dim):
positional_encoding = np.zeros((seq_len,embedding_dim))
for pos in range(positional_encoding.shape[0]):
for i in range(positional_encoding.shape[1]):
positional_encoding[pos][i] = math.sin(pos/(10000**(2*i/self.d_model))) if i % 2 == 0 else math.cos(pos/(10000**(2*i/self.d_model)))
return torch.from_numpy(positional_encoding)
class Mutihead_Attention(nn.Module):
def __init__(self,d_model,dim_k,dim_v,n_heads):
super(Mutihead_Attention, self).__init__()
self.dim_v = dim_v
self.dim_k = dim_k
self.n_heads = n_heads
self.q = nn.Linear(d_model,dim_k)
self.k = nn.Linear(d_model,dim_k)
self.v = nn.Linear(d_model,dim_v)
self.o = nn.Linear(dim_v,d_model)
self.norm_fact = 1 / math.sqrt(d_model)
def generate_mask(self,dim):
# 此处是 sequence mask ,防止 decoder窥视后面时间步的信息。
# padding mask 在数据输入模型之前完成。
matirx = np.ones((dim,dim))
mask = torch.Tensor(np.tril(matirx))
return mask==1
def forward(self,x,y,requires_mask=False):
assert self.dim_k % self.n_heads == 0 and self.dim_v % self.n_heads == 0
# size of x : [batch_size * seq_len * batch_size]
# 对 x 进行自注意力
Q = self.q(x).reshape(-1,x.shape[0],x.shape[1],self.dim_k // self.n_heads) # n_heads * batch_size * seq_len * dim_k
K = self.k(x).reshape(-1,x.shape[0],x.shape[1],self.dim_k // self.n_heads) # n_heads * batch_size * seq_len * dim_k
V = self.v(y).reshape(-1,y.shape[0],y.shape[1],self.dim_v // self.n_heads) # n_heads * batch_size * seq_len * dim_v
# print("Attention V shape : {}".format(V.shape))
attention_score = torch.matmul(Q,K.permute(0,1,3,2)) * self.norm_fact
if requires_mask:
mask = self.generate_mask(x.shape[1])
# masked_fill 函数中,对Mask位置为True的部分进行Mask
attention_score.masked_fill(mask,value=float("-inf")) # 注意这里的小Trick,不需要将Q,K,V 分别MASK,只MASKSoftmax之前的结果就好了
output = torch.matmul(attention_score,V).reshape(y.shape[0],y.shape[1],-1)
# print("Attention output shape : {}".format(output.shape))
output = self.o(output)
return output
class Feed_Forward(nn.Module):
def __init__(self,input_dim,hidden_dim=2048):
super(Feed_Forward, self).__init__()
self.L1 = nn.Linear(input_dim,hidden_dim)
self.L2 = nn.Linear(hidden_dim,input_dim)
def forward(self,x):
output = nn.ReLU()(self.L1(x))
output = self.L2(output)
return output
class Add_Norm(nn.Module):
def __init__(self):
self.dropout = nn.Dropout(config.p)
super(Add_Norm, self).__init__()
def forward(self,x,sub_layer,**kwargs):
sub_output = sub_layer(x,**kwargs)
# print("{} output : {}".format(sub_layer,sub_output.size()))
x = self.dropout(x + sub_output)
layer_norm = nn.LayerNorm(x.size()[1:])
out = layer_norm(x)
return out
class Encoder(nn.Module):
def __init__(self):
super(Encoder, self).__init__()
self.positional_encoding = Positional_Encoding(config.d_model)
self.muti_atten = Mutihead_Attention(config.d_model,config.dim_k,config.dim_v,config.n_heads)
self.feed_forward = Feed_Forward(config.d_model)
self.add_norm = Add_Norm()
def forward(self,x): # batch_size * seq_len 并且 x 的类型不是tensor,是普通list
x += self.positional_encoding(x.shape[1],config.d_model)
# print("After positional_encoding: {}".format(x.size()))
output = self.add_norm(x,self.muti_atten,y=x)
output = self.add_norm(output,self.feed_forward)
return output
# 在 Decoder 中,Encoder的输出作为Query和KEy输出的那个东西。即 Decoder的Input作为V。此时是可行的
# 因为在输入过程中,我们有一个padding操作,将Inputs和Outputs的seq_len这个维度都拉成一样的了
# 我们知道,QK那个过程得到的结果是 batch_size * seq_len * seq_len .既然 seq_len 一样,那么我们可以这样操作
# 这样操作的意义是,Outputs 中的 token 分别对于 Inputs 中的每个token作注意力
class Decoder(nn.Module):
def __init__(self):
super(Decoder, self).__init__()
self.positional_encoding = Positional_Encoding(config.d_model)
self.muti_atten = Mutihead_Attention(config.d_model,config.dim_k,config.dim_v,config.n_heads)
self.feed_forward = Feed_Forward(config.d_model)
self.add_norm = Add_Norm()
def forward(self,x,encoder_output): # batch_size * seq_len 并且 x 的类型不是tensor,是普通list
# print(x.size())
x += self.positional_encoding(x.shape[1],config.d_model)
# print(x.size())
# 第一个 sub_layer
output = self.add_norm(x,self.muti_atten,y=x,requires_mask=True)
# 第二个 sub_layer
output = self.add_norm(x,self.muti_atten,y=encoder_output,requires_mask=True)
# 第三个 sub_layer
output = self.add_norm(output,self.feed_forward)
return output
class Transformer_layer(nn.Module):
def __init__(self):
super(Transformer_layer, self).__init__()
self.encoder = Encoder()
self.decoder = Decoder()
def forward(self,x):
x_input,x_output = x
encoder_output = self.encoder(x_input)
decoder_output = self.decoder(x_output,encoder_output)
return (encoder_output,decoder_output)
class Transformer(nn.Module):
def __init__(self,N,vocab_size,output_dim):
super(Transformer, self).__init__()
self.embedding_input = Embedding(vocab_size=vocab_size)
self.embedding_output = Embedding(vocab_size=vocab_size)
self.output_dim = output_dim
self.linear = nn.Linear(config.d_model,output_dim)
self.softmax = nn.Softmax(dim=-1)
self.model = nn.Sequential(*[Transformer_layer() for _ in range(N)])
def forward(self,x):
x_input , x_output = x
x_input = self.embedding_input(x_input)
x_output = self.embedding_output(x_output)
_ , output = self.model((x_input,x_output))
output = self.linear(output)
output = self.softmax(output)
return output
参考
更多实现可参考:
Transformer Q&A
Transformer如何解决梯度消失问题?
残差
为何Transformer中使用LN而不用BN?
BatchNorm是对一个batch-size样本内的每个特征做归一化,LayerNorm是对每个样本的所有特征做归一化。
形象点来说,假设有一个二维矩阵。行为batch-size,列为样本特征。那么BN就是竖着归一化,LN就是横着归一化。
它们的出发点都是让该层参数稳定下来,避免梯度消失或者梯度爆炸,方便后续的学习。但是也有侧重点。
一般来说,如果你的特征依赖于不同样本间的统计参数,那BN更有效。因为它抹杀了不同特征之间的大小关系,但是保留了不同样本间的大小关系。(CV领域)
而在NLP领域,LN就更加合适。因为它抹杀了不同样本间的大小关系,但是保留了一个样本内不同特征之间的大小关系。对于NLP或者序列任务来说,一条样本的不同特征,其实就是时序上字符取值的变化,样本内的特征关系是非常紧密的。
LN的作用是什么?
允许使用更大的学习率,加速训练。有一定的抗过拟合作用,使训练过程更加平稳
多头自注意力层中的“多头”如何理解,有什么作用?
有点类似于CNN的多个卷积核。通过三个线性层的映射,不同头中的Q、K、V是不一样的,而这三个线性层的权重是先初始化后续通过学习得到的。不同的权重可以捕捉到序列中不同的相关性。
Transformer是自回归模型还是自编码模型?
自回归模型。
所谓自回归,即使用当前自己预测的字符再去预测接下来的信息。Transformer在预测阶段(机器翻译任务)会先预测第一个字,然后在第一个预测的字的基础上接下来再去预测后面的字,是典型的自回归模型。Bert中的Mask任务是典型的自编码模型,即根据上下文字符来预测当前信息。
原论文中Q、K矩阵相乘为什么最后要除以√d_k
当√d_k特别小的时候,其实除不除无所谓。无论编码器还是解码器Q、K矩阵其实本质是一个相同的矩阵。Q、K相乘其实相等于Q乘以Q的转置,这样造成结果会很大或者很小。小了还好说,大的话会使得后续做softmax继续被放大造成梯度消失,不利于梯度反向传播。
原论中编码器与解码器的Embedding层的权重为什么要乘以√d_model
为了让embedding层的权重值不至于过小,乘以√d_model 后与位置编码的值差不多,可以保护原有向量空间不被破坏。
Transformer在训练与验证的时候有什么不同
Transformer在训练的时候是并行的,在验证的时候是串行的。这个问题与Transformer是否是自回归模型考察的是同一个知识点。
Transformer模型的计算复杂度是多少?
n是序列长度,d是embedding的长度。Transformer中最大的计算量就是多头自注意力层,这里的计算量主要就是QK相乘再乘上V,即两次矩阵相乘。QK相乘是矩阵【n d】乘以【d n】,这个复杂度就是d*n^2
Transformer中三个多头自注意力层分别有什么意义与作用?
Transformer中有三个多头自注意力层,编码器中有一个,解码器中有两个。
编码器中的多头自注意力层的作用是将原始文本序列信息做整合,转换后的文本序列中每个字符都与整个文本序列的信息相关(这也是Transformer中最创新的思想,尽管根据最新的综述研究表明,Transformer的效果非常好其实多头自注意力层并不占据绝大贡献)。示意图如下:
解码器的第一个多头自注意力层比较特殊,原论文给其起名叫Masked Multi-Head-Attention。其一方面也有上图介绍的作用,即对输入文本做整合(对与翻译任务来说,编码器的输入是翻译前的文本,解码器的输入是翻译后的文本)。另一个任务是做掩码,防止信息泄露。拓展解释一下就是在做信息整合的时候,第一个字符其实不应该看到后面的字符,第二个字符也只能看到第一个、第二个字符的信息,以此类推。
解码器的第二个多头自注意力层与编码器的第一个多头自注意力层功能是完全一样的。不过输入需要额外强调下,我们都知道多头自注意力层是通过计算QKV三个矩阵最后完成信息整合的。在这里,Q是解码器整合后的信息,KV两个矩阵是编码器整合后的信息,是两个完全相同的矩阵。QKV矩阵相乘后,翻译前与翻译后的文本也做了充分的交互整合。至此最终得到的向量矩阵用来做后续下游工作。
Transformer中的mask机制有什么作用
有两个作用。
- 对不等长的序列做padding补齐
- 掩码防止信息泄露