import collections
import math
import torch
from torch import nn
from d2l import torch as d2l
class Seq2SeqEncoder(d2l.Encoder):
"""用于序列到序列学习的循环神经网络编码器。"""
def __init__(self, vocab_size, embed_size, num_hiddens, num_layers,
dropout=0, **kwargs):
# 初始化父类(继承自d2l.Encoder框架)
super(Seq2SeqEncoder, self).__init__(**kwargs)
# 创建词嵌入层:将词汇索引转换为密集向量
# vocab_size: 输入语言词汇表大小(如10000个词)
# embed_size: 每个词向量的维度(如512维)
self.embedding = nn.Embedding(vocab_size, embed_size)
# 创建GRU循环神经网络层
# embed_size: 输入特征维度(与嵌入层输出一致)
# num_hiddens: 隐藏层神经元数(如24个)
# num_layers: GRU堆叠层数(如2层)
# dropout: 层间随机失活率(防止过拟合)
self.rnn = nn.GRU(embed_size, num_hiddens, num_layers,
dropout=dropout)
def forward(self, X, *args):
# 输入X形状:(batch_size, seq_length) 如(3,8)
# 嵌入层处理:将词索引转换为密集向量
# 输出形状变为:(batch_size, seq_length, embed_size) 如(3,8,512)
X = self.embedding(X)
# 维度置换:调整张量维度顺序以适应RNN输入要求
# permute(1,0,2)将维度从(batch,seq,embed)变为(seq,batch,embed)
# 因为PyTorch的RNN层要求输入形状为(seq_len, batch_size, input_size)
X = X.permute(1, 0, 2)
# 通过GRU网络处理时序数据
# output: 所有时间步的隐藏状态,形状(seq_len, batch_size, num_hiddens*D)
# D=1(单向)或2(双向),此处默认单向
# state: 最后一个时间步的隐藏状态,形状(num_layers, batch_size, num_hiddens)
# 当num_layers=2时包含两个隐藏层的最终状态
output, state = self.rnn(X)
# 返回编码器输出和最终状态(供解码器初始化用)
return output, state
encoder = Seq2SeqEncoder(vocab_size=10, embed_size=8, num_hiddens=16,
num_layers=2)
encoder.eval()
X = torch.zeros((4, 7), dtype=torch.long)
output, state = encoder(X)
output.shape
torch.Size([7, 4, 16])
state.shape
torch.Size([2, 4, 16])
class Seq2SeqDecoder(d2l.Decoder):
"""用于序列到序列学习的循环神经网络解码器。"""
def __init__(self, vocab_size, embed_size, num_hiddens, num_layers,
dropout=0, **kwargs):
# 初始化父类(继承自d2l.Decoder框架)
super(Seq2SeqDecoder, self).__init__(**kwargs)
# 创建词嵌入层:将目标语言词索引转换为密集向量
# vocab_size: 目标语言词汇表大小(如25000个词)
# embed_size: 每个词向量的维度(需与编码器设置一致)
self.embedding = nn.Embedding(vocab_size, embed_size)
# 创建带上下文拼接的GRU网络:
# 输入维度是embed_size + num_hiddens(将编码器上下文信息与当前词向量拼接)
# num_hiddens: 隐藏层维度需与编码器一致(如24)
# num_layers: GRU层数需与编码器一致(如2层)
self.rnn = nn.GRU(embed_size + num_hiddens, num_hiddens, num_layers,
dropout=dropout)
# 输出全连接层:将隐藏状态映射到目标语言词汇表维度
self.dense = nn.Linear(num_hiddens, vocab_size)
def init_state(self, enc_outputs, *args):
# 从编码器输出中提取最终隐藏状态作为解码器初始状态
# enc_outputs是元组(output, state),其中state包含各层的最终隐藏状态
# 取[1]索引获取state,该状态是(num_layers, batch_size, num_hiddens)
return enc_outputs[1]
def forward(self, X, state):
# 输入X形状:(batch_size, seq_length) 如(3,7)
# 词嵌入处理,输出形状变为:(batch_size, seq_length, embed_size)
X = self.embedding(X)
# 维度置换:调整为(seq_length, batch_size, embed_size)
# 与编码器一致,满足PyTorch GRU输入格式要求
X = X.permute(1, 0, 2)
# 上下文扩展:获取编码器最后时刻最后一层的隐藏状态
# state[-1]的shape是(num_layers, batch_size, num_hiddens),取最后一层
# repeat扩展至(seq_length, batch_size, num_hiddens)匹配时间步数
context = state[-1].repeat(X.shape[0], 1, 1)
# 拼接词向量与上下文信息
# 在特征维度拼接,因此输入维度变为embed_size+num_hiddens
X_and_context = torch.cat((X, context), 2)
# GRU前向传播(同时传递初始状态)
# output形状:(seq_length, batch_size, num_hiddens)
# state形状:(num_layers, batch_size, num_hiddens)
output, state = self.rnn(X_and_context, state)
# 全连接层预测词概率分布
# 先permute恢复为(batch_size, seq_length, vocab_size)
output = self.dense(output).permute(1, 0, 2)
return output, state
decoder = Seq2SeqDecoder(vocab_size=10, embed_size=8, num_hiddens=16,
num_layers=2)
decoder.eval()
state = decoder.init_state(encoder(X))
output, state = decoder(X, state)
output.shape, state.shape
(torch.Size([4, 7, 10]), torch.Size([2, 4, 16]))
def sequence_mask(X, valid_len, value=0):
"""在序列中屏蔽不相关的项。"""
# 获取当前批次的最大序列长度(X的形状是(batch_size, seq_length))
# 例如:X是形状(2,3)的张量时,maxlen=3
maxlen = X.size(1)
# 创建序列位置索引矩阵
# torch.arange生成[0,1,2,...,maxlen-1]的行向量(通过[None,:]增加第0维度)
# 与valid_len[:,None](列向量)进行广播比较,生成布尔掩码矩阵
# 例如valid_len=[1,2]时,生成mask=[[True,False,False],[True,True,False]]
mask = torch.arange((maxlen), dtype=torch.float32,
device=X.device)[None, :] < valid_len[:, None]
# 将掩码取反后赋值为指定值
# ~mask表示需要屏蔽的位置,例如mask为False的位置会被填充value
X[~mask] = value
return X # 返回处理后的张量,如输入[[1,2,3],[4,5,6]]变为[[1,0,0],[4,5,0]]
"""
应用场景说明:
1. 自然语言处理:处理不同长度句子时屏蔽padding位置
2. 时间序列预测:仅对有效历史数据进行处理
3. 注意力机制:防止关注到无效位置
关键记忆点:
1. 维度操作技巧:
- [None,:]将行向量变为(1, maxlen)
- valid_len[:,None]将列向量变为(batch_size, 1)
- 两者广播后得到(batch_size, maxlen)的布尔矩阵
2. 设备一致性原则:
- mask.device = X.device 确保张量在同一设备(CPU/GPU)
3. 逆向思维运用:
- 通过~mask取反选择需要屏蔽的位置
"""
'\n应用场景说明:\n1. 自然语言处理:处理不同长度句子时屏蔽padding位置\n2. 时间序列预测:仅对有效历史数据进行处理\n3. 注意力机制:防止关注到无效位置\n\n关键记忆点:\n1. 维度操作技巧:\n - [None,:]将行向量变为(1, maxlen)\n - valid_len[:,None]将列向量变为(batch_size, 1)\n - 两者广播后得到(batch_size, maxlen)的布尔矩阵\n\n2. 设备一致性原则:\n - mask.device = X.device 确保张量在同一设备(CPU/GPU)\n\n3. 逆向思维运用:\n - 通过~mask取反选择需要屏蔽的位置\n\n'
X = torch.ones(2, 3, 4)
sequence_mask(X, torch.tensor([1, 2]), value=-1)
tensor([[[ 1., 1., 1., 1.],
[-1., -1., -1., -1.],
[-1., -1., -1., -1.]],
[[ 1., 1., 1., 1.],
[ 1., 1., 1., 1.],
[-1., -1., -1., -1.]]])
class MaskedSoftmaxCELoss(nn.CrossEntropyLoss):
"""带遮蔽的softmax交叉熵损失函数"""
def forward(self, pred, label, valid_len):
# 初始化权重矩阵:创建与标签形状相同的全1张量
# 原理:所有时间步初始权重为1,后续通过valid_len屏蔽padding位置
# 记忆点:始终与label形状保持一致(batch_size, seq_length)
weights = torch.ones_like(label)
# 应用序列遮蔽:将超出有效长度的位置权重置0
# 关键:sequence_mask函数
# 效果:例如valid_len=[2,4]时,保留前2/4个时间步的权重
weights = sequence_mask(weights, valid_len)
# 禁用默认的损失聚合方式:保留每个时间步的损失值
# 原理:父类CrossEntropyLoss默认会做mean/sum,这里需要逐个位置计算
self.reduction = 'none'
# 计算未加权的交叉熵损失(注意维度调整)
# permute(0,2,1)将pred从(batch, seq, vocab)转为(batch, vocab, seq)
# 原因:CrossEntropyLoss要求class维度在第二位置
unweighted_loss = super(MaskedSoftmaxCELoss,
self).forward(pred.permute(0, 2, 1), label)
# 应用权重并沿序列维度取平均
# 数学原理:加权平均公式 (sum(loss * weight)) / sum(weight)
# 实现技巧:mean(dim=1)对每个样本的序列维度做平均
weighted_loss = (unweighted_loss * weights).mean(dim=1)
return weighted_loss
"""
预测输出:[B,T,V] → permute → [B,V,T]
交叉熵计算:与标签[B,T]对齐 → 得到未加权损失[B,T]
应用遮蔽权重[B,T] → 加权平均得到最终损失[B]
"""
loss = MaskedSoftmaxCELoss()
loss(torch.ones(3, 4, 10), torch.ones((3, 4), dtype=torch.long),
torch.tensor([4, 2, 0]))
tensor([2.3026, 1.1513, 0.0000])
import torch.optim as optim
def train_seq2seq(net, data_iter, lr, num_epochs, tgt_vocab, device):
"""训练序列到序列模型"""
# 初始化模型参数(Xavier初始化策略)
def xavier_init_weights(m):
# 原理:Xavier初始化保持各层激活值的方差稳定
if type(m) == nn.Linear:
nn.init.xavier_uniform_(m.weight) # 全连接层权重初始化
if type(m) == nn.GRU:
for param in m._flat_weights:
if param.ndim == 2:
nn.init.xavier_uniform_(param) # GRU的权重矩阵初始化
# 应用初始化函数到所有网络层
net.apply(xavier_init_weights)
# 将模型移动到指定设备(CPU/GPU)
net.to(device)
# 设置优化器(Adam自适应学习率算法)
optimizer = optim.Adam(net.parameters(), lr=lr)
# 定义带遮蔽的交叉熵损失函数(忽略padding位置)
loss = d2l.MaskedSoftmaxCELoss()
# 切换到训练模式(影响dropout等层的状态)
net.train()
# 可视化工具初始化(监控训练过程)
animator = d2l.Animator(xlabel='epoch', ylabel='loss', xlim=[0, num_epochs])
# 训练循环
for epoch in range(num_epochs):
timer = d2l.Timer() # 计时工具
metric = d2l.Accumulator(2) # 累计损失和有效token数
for batch in data_iter:
# 数据预处理:将批量数据移动到指定设备
X, X_valid_len, Y, Y_valid_len = [x.to(device) for x in batch]
# 构建解码器输入:添加<bos>起始符
bos = torch.tensor([tgt_vocab['<bos>']] * Y.shape[0], device=device).reshape(-1, 1)
dec_input = torch.cat([bos, Y[:, :-1]], 1) # 拼接起始符与右移后的目标序列
# 前向传播:获取模型输出(编码器-解码器联合计算)
output = net(X, dec_input, X_valid_len)
# 处理网络输出:解包可能存在的元组结构(如包含注意力权重)
if isinstance(output, tuple):
Y_hat, _ = output # 取预测结果部分
else:
Y_hat = output
# 计算损失值(自动处理序列遮蔽)
l = loss(Y_hat, Y, Y_valid_len)
# 反向传播与梯度裁剪(防止梯度爆炸)
l.sum().backward() # 损失求和后反向传播
d2l.grad_clipping(net, 1) # 梯度裁剪阈值设为1(策略参考网页3的训练技巧)
# 参数更新
optimizer.step() # 执行优化器更新步骤
optimizer.zero_grad() # 清空梯度(重要!防止梯度累积)
# 累计统计指标
metric.add(l.sum(), l.numel()) # 记录总损失和有效token数
# 周期日志输出
if (epoch + 1) % 10 == 0:
# 计算平均损失和吞吐量
print(f'loss {metric[0] / metric[1]:.3f}, {metric[1] / timer.stop():.1f} tokens/sec on {str(device)}')
animator.add(epoch + 1, (metric[0] / metric[1],)) # 更新训练曲线
embed_size, num_hiddens, num_layers, dropout = 32, 32, 2, 0.1
batch_size, num_steps = 64, 10
lr, num_epochs, device = 0.005, 300, d2l.try_gpu()
train_iter, src_vocab, tgt_vocab = d2l.load_data_nmt(batch_size, num_steps)
encoder = Seq2SeqEncoder(len(src_vocab), embed_size, num_hiddens, num_layers,
dropout)
decoder = Seq2SeqDecoder(len(tgt_vocab), embed_size, num_hiddens, num_layers,
dropout)
net = d2l.EncoderDecoder(encoder, decoder)
train_seq2seq(net, train_iter, lr, num_epochs, tgt_vocab, device)
def predict_seq2seq(net, src_sentence, src_vocab, tgt_vocab, num_steps,
device, save_attention_weights=False):
"""序列到序列模型的预测"""
# 设置模型为评估模式(关闭dropout等训练专用层)
net.eval() # 原理:预测时不更新参数,保持计算图稳定
# 将源语句转换为词索引序列,并添加结束符<eos>
src_tokens = src_vocab[src_sentence.lower().split(' ')] + [
src_vocab['<eos>']] # 注意:lower()统一为小写,split()分词处理
# 创建有效长度张量(实际序列长度)
enc_valid_len = torch.tensor([len(src_tokens)], device=device) # 用于编码器掩码
# 截断/填充处理:统一为固定长度num_steps
src_tokens = d2l.truncate_pad(src_tokens, num_steps, src_vocab['<pad>']) # 填充<pad>符号
# 添加批量维度(batch_size=1)并转换为设备张量
enc_X = torch.unsqueeze(
torch.tensor(src_tokens, dtype=torch.long, device=device), dim=0) # 形状从(seq_len,)变为(1, seq_len)
# 编码器前向传播(获取上下文向量)
enc_outputs = net.encoder(enc_X, enc_valid_len) # 输出包含编码器各时间步状态
# 初始化解码器状态(使用编码器最终隐藏状态)
dec_state = net.decoder.init_state(enc_outputs, enc_valid_len) # 状态传递原理
# 构建解码器初始输入(<bos>符号)
dec_X = torch.unsqueeze(
torch.tensor([tgt_vocab['<bos>']], dtype=torch.long, device=device),
dim=0) # 形状(1,1),启动解码过程
# 初始化输出序列和注意力权重容器
output_seq, attention_weight_seq = [], []
# 循环生成目标序列(最多num_steps步)
for _ in range(num_steps):
# 解码器前向传播(Y形状:(1,1, vocab_size))
Y, dec_state = net.decoder(dec_X, dec_state) # 当前时间步预测
# 选择概率最高的词作为下一时间步输入(贪心搜索)
dec_X = Y.argmax(dim=2) # dim=2选择词汇维度,得到预测词索引
# 提取预测词索引(去除批量维度)
pred = dec_X.squeeze(dim=0).type(torch.int32).item() # 转为Python标量
# 保存注意力权重(用于可视化)
if save_attention_weights:
attention_weight_seq.append(net.decoder.attention_weights) # 注意力机制扩展点
# 遇到结束符则终止生成
if pred == tgt_vocab['<eos>']: # 提前终止条件
break
# 收集预测结果
output_seq.append(pred) # 存储词索引序列
# 将索引序列转换为目标语言词汇
return ' '.join(tgt_vocab.to_tokens(output_seq)), attention_weight_seq # 最终输出字符串
def bleu(pred_seq, label_seq, k):
"""计算BLEU分数,评估机器翻译质量"""
# 将预测序列和参考序列分割为词列表(处理基础单元)
pred_tokens, label_tokens = pred_seq.split(' '), label_seq.split(' ')
# 获取预测序列和参考序列的长度(用于后续长度惩罚计算)
len_pred, len_label = len(pred_tokens), len(label_tokens)
# 初始化长度惩罚因子(解决短句得分虚高问题)
# 原理:当预测长度<参考长度时施加惩罚
score = math.exp(min(0, 1 - len_label / len_pred)) # exp(min(0, 1 - r/c))
# 遍历1到k元语法(n-gram)计算匹配度
for n in range(1, k + 1):
num_matches = 0 # 有效匹配次数计数器
label_subs = collections.defaultdict(int) # 存储参考n-gram的出现次数
# 生成参考序列的所有n-gram并统计出现次数(用于修正精确度)
for i in range(len_label - n + 1):
ngram = ''.join(label_tokens[i:i + n]) # 拼接n-gram作为字典键
label_subs[ngram] += 1 # 记录参考中n-gram出现次数
# 检测预测序列中的n-gram匹配(考虑重复情况)
for i in range(len_pred - n + 1):
ngram = ''.join(pred_tokens[i:i + n])
if label_subs[ngram] > 0: # 当参考中存在该n-gram时
num_matches += 1 # 增加有效匹配计数
label_subs[ngram] -= 1 # 避免重复匹配
# 计算当前n-gram的修正精确度(分子:有效匹配数,分母:预测n-gram总数)
precision = num_matches / (len_pred - n + 1)
# 加权几何平均(权重系数为0.5^n,参考网页4的加权策略)
score *= math.pow(precision, math.pow(0.5, n))
return score # 最终BLEU得分(范围0-1,值越大质量越好)
engs = ['go .', "i lost .", 'he\'s calm .', 'i\'m home .']
fras = ['va !', 'j\'ai perdu .', 'il est calme .', 'je suis chez moi .']
for eng, fra in zip(engs, fras):
translation, attention_weight_seq = predict_seq2seq(
net, eng, src_vocab, tgt_vocab, num_steps, device)
print(f'{eng} => {translation}, bleu {bleu(translation, fra, k=2):.3f}')
go . => le les viens viens je je je je trouve je, bleu 0.000
i lost . => le cours je trouve trouve trouve trouve courez oublie-le oublie-le, bleu 0.000
he's calm . => le cours je trouve trouve trouve trouve courez oublie-le oublie-le, bleu 0.000
i'm home . => le les les j’ai j’ai parti oublie-le oublie-le soyez soyez, bleu 0.000