Dark Dwarf Blog background

简单机器翻译实现

简单机器翻译实现

在复习了 Encoder-Decoder 后,自己搓了一个简单的机器翻译,用 30k 的 cmn-eng.txt 数据集训练的,下面简单讲解一下。

1. 准备工作

a.a. 总体设计

在开始实现之前,我们先定义一些 tag,这些 tag 能够让我们的模型更好地确认翻译的边界和方向:

  1. 起始、结束标记:<sos><eos>
  2. 未知词标记:<unk>
  3. 填充标记:<pad>

同时为了用一个模型实现双语翻译的效果,我们引入 lang-tag,让模型知道当前需要翻译的语句是什么语言

  • 语言标记:<en><zh>

b.b. 词汇表搭建

机器翻译首先要做的是搭建词汇表,将文本转换为数字。我们将原有数据 tokenize,用转换后的 token 构建词汇表。

我们的 Vocabulary 类的主要方法为 buildencodedecode

from collections import Counter

class Vocabulary:
  def __init__(self, lang_tags = ['en', 'cn']):
    self.word2idx = {
        '<pad>': 0,
        '<sos>': 1,
        '<eos>': 2,
        '<unk>': 3
    }
    self.special_tokens = ['<pad>', '<sos>', '<eos>', '<unk>'] # Initialize with common special tokens

    for lang in lang_tags:
      tag = '<' + lang + '>'
      self.word2idx[tag] = len(self.word2idx)
      self.special_tokens.append(tag) # Add language tags to special tokens

    self.idx2word = {i: w for w, i in self.word2idx.items()}
    self.word_count = Counter()
    self.n_words = len(self.word2idx)

  def add_sentence(self, tokens):
    for word in tokens:
      if word not in self.word2idx:
        self.word_count[word] += 1

  def build(self, min_count=2):
    for word, count in self.word_count.items():
      if count > min_count:
        self.word2idx[word] = self.n_words
        self.idx2word[self.n_words] = word
        self.n_words += 1

  def encode(self, tokens, add_sos=True, add_eos=True):
    indices = []
    if add_sos:
      indices.append(self.word2idx['<sos>'])

    for word in tokens:
      idx = self.word2idx.get(word, self.word2idx['<unk>'])
      indices.append(idx)

    if add_eos:
      indices.append(self.word2idx['<eos>'])

    return indices

  def decode(self, indices, skip_special=True):
    words = []
    for idx in indices:
      word = self.idx2word.get(idx, '<unk>')
      if skip_special and word in self.special_tokens:
        continue
      words.append(word)

    return words

vocab = Vocabulary()
for src_tokens, tgt_tokens in lang_tagged_data:
  vocab.add_sentence(src_tokens)
  vocab.add_sentence(tgt_tokens)

vocab.build(min_count=1)

2. 数据集构建

在构建完数据集后,我们将 token 进行 encode,然后构建 dataset:

indexed_pairs = []
for src_tokens, tgt_tokens in lang_tagged_data:
  src_indices = vocab.encode(src_tokens)
  tgt_indices = vocab.encode(tgt_tokens)
  indexed_pairs.append((src_indices, tgt_indices))

注意在构建时要加上 padding:

from torch.nn.utils.rnn import pad_sequence

class TranslationDataset(Dataset):
  def __init__(self, indexed_pairs):
    self.pairs = indexed_pairs

  def __len__(self):
    return len(self.pairs)

  def __getitem__(self, idx):
    return self.pairs[idx]

def collate_batch(batch):
  src_batch = [torch.LongTensor(pair[0]) for pair in batch]
  tgt_batch = [torch.LongTensor(pair[1]) for pair in batch]

  src_lengths = torch.LongTensor([len(s) for s in src_batch])
  tgt_lengths = torch.LongTensor([len(t) for t in tgt_batch])

  src_padded = pad_sequence(src_batch, batch_first=True, padding_value=0)
  tgt_padded = pad_sequence(tgt_batch, batch_first=True, padding_value=0)

  return src_padded, tgt_padded, src_lengths, tgt_lengths

batch_size = 128

train_size = int(0.9 * len(indexed_pairs))
train_data = indexed_pairs[:train_size]
val_data = indexed_pairs[train_size:]

train_dataset = TranslationDataset(train_data)
val_dataset = TranslationDataset(val_data)
train_loader = DataLoader(train_dataset, batch_size, shuffle=True, collate_fn=collate_batch)
val_loader = DataLoader(val_dataset, batch_size, shuffle=False, collate_fn=collate_batch)

3. Seq2Seq 模型

然后是最关键的模型构建部分。我们采用带有 Bahdanau Attention 的 Encoder-Decoder 模型架构。

a.a. Encoder

我们使用一个双向 LSTM 作为 Encoder。Encoder 工作流程如下:

  1. 使用 Embedding 层获取词嵌入向量。
  2. 将向量输入 LSTM,获得 output,hidden,cell 信息。
  3. 使用线性层学习怎么将双向 LSTM 的结果进行组合,最后输出 hidden_state 长度的 output。

具体代码如下:

class Encoder(nn.Module):
  def __init__(self, vocab_size, embed_dim, hidden_dim, num_layers=2, dropout=0.3):
    super(Encoder, self).__init__()
    self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
    self.dropout = nn.Dropout(dropout)

    self.lstm = nn.LSTM(
      embed_dim,
      hidden_dim,
      num_layers,
      batch_first=True,
      dropout=dropout if num_layers > 1 else 0,
      bidirectional=True
    )

    self.fc_hidden = nn.Linear(hidden_dim * 2, hidden_dim)
    self.fc_cell = nn.Linear(hidden_dim * 2, hidden_dim)

  def forward(self, src, src_lengths):
    embedded = self.dropout(self.embedding(src))

    packed_embedded = pack_padded_sequence(
      embedded, src_lengths.cpu(), batch_first=True, enforce_sorted=False
    )

    packed_outputs, (hidden, cell) = self.lstm(packed_embedded)

    outputs, _ = pad_packed_sequence(packed_outputs, batch_first=True)
    hidden = self._combine_bidirectional(hidden)
    cell = self._combine_bidirectional(cell)

    return outputs, hidden, cell

  def _combine_bidirectional(self, state):
    # state: (num_layers * 2, batch_size, hidden_dim)
    num_layers = state.shape[0] // 2
    batch_size = state.shape[1]
    hidden_dim = state.shape[2]
    state = state.reshape(num_layers, 2, batch_size, hidden_dim)
    state = torch.cat([state[:, 0, :, :], state[:, 1, :, :]], dim=2)
    state = torch.tanh(self.fc_hidden(state))
    return state

b.b. 注意力层

根据 Bahdanau Attention 计算公式:

eij=vaTtanh(Wasi1+Uahj)e_{ij} = v_a^T \, \text{tanh}\,(W_a s_{i-1} + U_a h_j)

我们的 hidden 就是 Decoder 前一次的状态,Encoder 的输出 encoder_output 作为参数输入,只需要引入全连接层对它们进行变换即可。最后和 Encoder 一样,专门引入一个全连接层来学习如何组合这些状态:

class BahdanauAttention(nn.Module):
  def __init__(self, hidden_dim, encoder_dim):
    super(BahdanauAttention, self).__init__()
    self.hidden_dim = hidden_dim
    self.encoder_dim = encoder_dim

    # Attention layers
    self.attn_hidden = nn.Linear(hidden_dim, hidden_dim)
    self.attn_encoder = nn.Linear(encoder_dim, hidden_dim)
    self.attn_combine = nn.Linear(hidden_dim, 1, bias=False)

  def forward(self, hidden, encoder_outputs, mask=None):
    hidden_proj = self.attn_hidden(hidden).unsqueeze(1)
    encoder_proj = self.attn_encoder(encoder_outputs)
    energy = torch.tanh(hidden_proj + encoder_proj)
    attention_scores = self.attn_combine(energy).squeeze(2)
    attention_scores = attention_scores.masked_fill(mask, -1e10)

    attention_weights = F.softmax(attention_scores, dim=1)
    context = torch.bmm(attention_weights.unsqueeze(1), encoder_outputs).squeeze(1)

    return context, attention_weights

c.c. Decoder

Decoder 是一个单向 LSTM 层,我们前面的 Attention 生成的上下文向量要用来指导它生成结果。最后我们将上下文向量、当前输出结果和当前嵌入向量拼接在一起作为生成最终结果的全连接层的输入。这是一个重要的技巧,可以保证最终预测时有丰富的信息:

class Decoder(nn.Module):
  def __init__(self, vocab_size, embed_dim, hidden_dim, encoder_dim, num_layers=2, dropout=0.3):
    super(Decoder, self).__init__()
    self.vocab_size = vocab_size
    self.hidden_dim = hidden_dim

    self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
    self.dropout = nn.Dropout(dropout)

    self.attention = BahdanauAttention(hidden_dim, encoder_dim)

    self.lstm = nn.LSTM(
      embed_dim + encoder_dim,
      hidden_dim,
      num_layers,
      batch_first=True,
      dropout=dropout if num_layers > 1 else 0
    )

    self.fc = nn.Linear(hidden_dim + encoder_dim + embed_dim, vocab_size)

  def forward(self, tgt, hidden, cell, encoder_outputs, src_mask=None):
    embedded = self.dropout(self.embedding(tgt))  # (batch_size, 1, embed_dim)

    context, attention_weights = self.attention(
      hidden[-1], encoder_outputs, src_mask
    )

    lstm_input = torch.cat([embedded, context.unsqueeze(1)], dim=2)

    output, (hidden, cell) = self.lstm(lstm_input, (hidden, cell))

    prediction_input = torch.cat([
      output.squeeze(1),      # (batch_size, hidden_dim)
      context,                # (batch_size, encoder_dim)
      embedded.squeeze(1)     # (batch_size, embed_dim)
    ], dim=1)

    prediction = self.fc(prediction_input)

    return prediction, hidden, cell, attention_weights

d.d. 最终 Seq2Seq 模型

i.i. 模型架构

最后我们把这些组件组装起来,就得到了最终的 Seq2Seq 模型:

class Seq2Seq(nn.Module):
  def __init__(self, vocab_size, embed_dim, hidden_dim, num_layers, dropout):
    super().__init__()
    encoder_dim = hidden_dim * 2  # Bidirectional encoder

    self.encoder = Encoder(vocab_size, embed_dim, hidden_dim, num_layers, dropout)
    self.decoder = Decoder(vocab_size, embed_dim, hidden_dim, encoder_dim, num_layers, dropout)
    self.vocab_size = vocab_size

ii.ii. forward

Seq2Seq 模型的总体流程如下:

  1. 调用 Encoder 获得 encoder_output
  2. 之后循环调用 Decoder,并将上一次 Decoder 的 output 作为输入。

同时在处理 Decoder 的下一个 input 时,使用了 Teacher forcing 这个技巧:我们可以先直接把要翻译的下一个词作为 input、而不是用 Decoder 前一次的输入,这样可以让 Decoder 先学习正确的预测方式;之后等训练进行了一段时间、模型学习到了预测的方法之后,适当降低 Teacher forcing 比率、让模型学会不依赖提示自己生成正确结果:

def create_mask(self, src, pad_idx):
  return (src == pad_idx)

def forward(self, src, src_lengths, tgt, teacher_forcing_ratio=0.5):
  batch_size = src.shape[0]
  tgt_len = tgt.shape[1]

  encoder_outputs, hidden, cell = self.encoder(src, src_lengths)
  src_mask = self.create_mask(src, pad_idx=0)
  outputs = torch.zeros(batch_size, tgt_len - 1, self.vocab_size).to(src.device)

  # First decoder input is <sos>
  decoder_input = tgt[:, 0].unsqueeze(1)

  for t in range(1, tgt_len):
    output, hidden, cell, _ = self.decoder(
      decoder_input, hidden, cell, encoder_outputs, src_mask
    )

    outputs[:, t - 1] = output

    # Teacher forcing
    use_teacher_forcing = random.random() < teacher_forcing_ratio
    if use_teacher_forcing:
      decoder_input = tgt[:, t].unsqueeze(1)
    else:
      decoder_input = output.argmax(1).unsqueeze(1)

  return outputs

Teacher-forcing 的调度也很简单,每个一段时间下降一些即可:

class TeacherForcingScheduler:
  def __init__(self, initial_ratio=1.0, final_ratio=0.5, decay_epochs=10):
    self.initial_ratio = initial_ratio
    self.final_ratio = final_ratio
    self.decay_epochs = decay_epochs

  def get_ratio(self, epoch):
    if epoch >= self.decay_epochs:
      return self.final_ratio

    # Linear decay
    ratio = self.initial_ratio - (self.initial_ratio - self.final_ratio) * (epoch / self.decay_epochs)
    return ratio

iii.iii. inference

之后我们会使用训练好的 Seq2Seq 模型进行推断、通过推断结果进行整理得到翻译结果。推断过程和前向推理过程类似:我们获取 Encoder 的 output,然后一个一个发给 Decoder 生成对应结果:

def inference(self, src, src_lengths, sos_idx, eos_idx, max_len, device, pad_idx=0):
  self.eval()
  batch_size = src.shape[0]

  with torch.no_grad():
    encoder_outputs, hidden, cell = self.encoder(src, src_lengths)

    src_mask = self.create_mask(src, pad_idx)

    # Start with <sos>
    decoder_input = torch.full((batch_size, 1), sos_idx, dtype=torch.long, device=device)
    generated_tokens = []

    for _ in range(max_len):
      output, hidden, cell, _ = self.decoder(
        decoder_input, hidden, cell, encoder_outputs, src_mask
      )

      predicted_token = output.argmax(1)

      if batch_size == 1 and predicted_token.item() == eos_idx:
        break

      generated_tokens.append(predicted_token.item() if batch_size == 1 else predicted_token)
      decoder_input = predicted_token.unsqueeze(1)

  return generated_tokens

4. 训练与评估

a.a. trainevaluate

Seq2Seq 的训练评估和普通模型类似,不过需要注意维度匹配问题:模型输出的维度为 (batch_size, seq_len, output_dim),而我们的 label 为 (batch_size, seq_len)nn.CrossEntropyLoss 需要的输入是二维张量 (N, C)、输出是一维张量 (N)。于是我们对 output 和原有的 target 进行 reshape:

output = model(src, src_lengths, tgt, teacher_forcing_ratio=teacher_forcing_ratio)
output_dim = output.shape[-1]
output = output.reshape(-1, output_dim)
target = tgt[:, 1:].reshape(-1)

之后就和一般的模型没有区别了:

def train_epoch(model, dataloader, optimizer, criterion, device, teacher_forcing_ratio=0.5, clip=1.0):
  model.train()
  epoch_loss = 0

  for src, tgt, src_lengths, _ in tqdm(dataloader, desc="Training", leave=False):
    src, tgt, src_lengths = src.to(device), tgt.to(device), src_lengths.to(device)

    optimizer.zero_grad()

    output = model(src, src_lengths, tgt, teacher_forcing_ratio=teacher_forcing_ratio)

    # Reshape for loss calculation
    output_dim = output.shape[-1]
    output = output.reshape(-1, output_dim)
    target = tgt[:, 1:].reshape(-1)

    loss = criterion(output, target)
    loss.backward()

    # Gradient clipping
    torch.nn.utils.clip_grad_norm_(model.parameters(), clip)

    optimizer.step()
    epoch_loss += loss.item()

  return epoch_loss / len(dataloader)

def evaluate(model, dataloader, criterion, device):
  model.eval()
  epoch_loss = 0

  with torch.no_grad():
    for src, tgt, src_lengths, _ in tqdm(dataloader, desc="Evaluating", leave=False):
      src, tgt, src_lengths = src.to(device), tgt.to(device), src_lengths.to(device)

      # Use teacher forcing for evaluation 
      output = model(src, src_lengths, tgt, teacher_forcing_ratio=1.0)

      output_dim = output.shape[-1]
      output_flat = output.reshape(-1, output_dim)
      target_flat = tgt[:, 1:].reshape(-1)

      loss = criterion(output_flat, target_flat)
      epoch_loss += loss.item()

  return epoch_loss / len(dataloader)

b.b. BLEU 分数计算

我们调用 nltk 库函数来计算 BLEU 分数:

def calculate_bleu(model, test_pairs, vocab, device, max_samples=500):
  """Separate function for BLEU calculation on a subset"""
  model.eval()
  all_targets = []
  all_predictions = []

  # Use only a subset for BLEU to save time
  test_pairs = test_pairs[:max_samples]

  for src_indices, tgt_indices in tqdm(test_pairs, desc="Calculating BLEU", leave=False):
    src = torch.LongTensor(src_indices).unsqueeze(0).to(device)
    src_lengths = torch.LongTensor([len(src_indices)]).to(device)

    predicted_indices = model.inference(
      src, src_lengths,
      vocab.word2idx['<sos>'],
      vocab.word2idx['<eos>'],
      max_len=50,
      device=device,
      pad_idx=vocab.word2idx['<pad>']
    )

    predicted_tokens = vocab.decode(predicted_indices, skip_special=True)
    target_tokens = vocab.decode(tgt_indices, skip_special=True)

    if predicted_tokens:  # Only add non-empty predictions
      all_targets.append([target_tokens])
      all_predictions.append(predicted_tokens)

  bleu_score = nltk.translate.bleu_score.corpus_bleu(
    all_targets, all_predictions,
    smoothing_function=nltk.translate.bleu_score.SmoothingFunction().method1
  )
  return bleu_score

5. 完整实现

下面是完整的 Jupyter Notebook: