Building a Plain Seq2Seq Model for Language Translation


Sequence-to-sequence (seq2seq) models are powerful architectures for tasks that transform one sequence into another, such as machine translation. These models employ an encoder-decoder architecture, where the encoder processes the input sequence and the decoder generates an output sequence based on the encoder’s output. The attention mechanism was developed for seq2seq models, and understanding how seq2seq works helps clarify the rationale behind attention. In this post, you will explore how to build and train a plain seq2seq model with LSTM for language translation. Specifically:

  • How to implement an encoder-decoder architecture with LSTM cells in PyTorch
  • How to train the model using sentence pairs from a dataset
  • How to generate a variable-length sequence with a seq2seq model

Let’s get started.

Building a Plain Seq2Seq Model for Language Translation
Photo by David Emrich. Some rights reserved.

Overview

This post is divided into five parts; they are:

  • Preparing the Dataset for Training
  • Implementing the Seq2Seq Model with LSTM
  • Training the Seq2Seq Model
  • Using the Seq2Seq Model
  • Improving the Seq2Seq Model

Preparing the Dataset for Training

In a previous post, you learned how to build a transformer model for translating French sentences to English. In this post, you will reuse the same dataset and build a seq2seq model for the same task.

The seq2seq model consists of two main components: an encoder and a decoder. The encoder processes the input sequence (French sentences) and generates a fixed-size representation, known as the context vector. The decoder then uses this context vector to generate the output sequence (English sentences) one token at a time.

To train such a model, you need a dataset of sentence pairs; the model learns to translate from these examples. You can source your own dataset. In this post, you will use the Anki dataset, which can be downloaded from https://www.manythings.org/anki/, or use the copy hosted by Google:

import os

import requests

if not os.path.exists("fra-eng.zip"):

    url = "http://storage.googleapis.com/download.tensorflow.org/data/fra-eng.zip"

    response = requests.get(url)

    with open("fra-eng.zip", "wb") as f:

        f.write(response.content)

This is how you can use the requests library to download a file in Python. This zip file contains only one file, fra.txt, which is a plain text file. Each line consists of an English sentence, followed by a tab character, and then a corresponding sentence in French.

To make the data useful for training, it needs to be normalized. First, the French sentences are in Unicode, and some characters have multiple possible representations, so you should normalize them to a canonical form such as NFKC. You may also want to convert the text to lowercase to reduce the vocabulary size (otherwise the model treats the same word in different cases as different words). You can read the sentence pairs and perform the normalization as follows:

import unicodedata

import zipfile

def normalize(line):

    """Normalize a line of text and split into two at the tab character"""

    line = unicodedata.normalize("NFKC", line.strip().lower())

    eng, fra = line.split("\t")

    return eng.lower().strip(), fra.lower().strip()

text_pairs = []

with zipfile.ZipFile("fra-eng.zip", "r") as zip_ref:

    for line in zip_ref.read("fra.txt").decode("utf-8").splitlines():

        eng, fra = normalize(line)

        text_pairs.append((eng, fra))
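To see what this normalization buys you, here is a quick sketch (the sample strings are illustrative) showing how NFKC folds compatibility characters into canonical equivalents:

```python
import unicodedata

# NFKC maps "compatibility" characters to canonical ones, so visually
# identical strings yield identical token sequences downstream
samples = ["ﬁn", "１２３", "Ça va ?"]  # "fi" ligature, full-width digits, mixed case
normalized = [unicodedata.normalize("NFKC", s.lower()) for s in samples]
print(normalized)  # ['fin', '123', 'ça va ?']
```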

The model you will build is a seq2seq model using LSTM, a recurrent neural network that can handle variable-length sequences. It cannot consume words directly; they must be tokenized and encoded into numerical form first. You can create a dictionary as a tokenizer to map each word in the vocabulary to a unique integer, or use a more advanced technique such as Byte Pair Encoding (BPE), which handles unknown words more gracefully by recognizing subword units. Let's create separate tokenizers for English and French:


import os

import tokenizers

if os.path.exists("en_tokenizer.json") and os.path.exists("fr_tokenizer.json"):

    en_tokenizer = tokenizers.Tokenizer.from_file("en_tokenizer.json")

    fr_tokenizer = tokenizers.Tokenizer.from_file("fr_tokenizer.json")

else:

    en_tokenizer = tokenizers.Tokenizer(tokenizers.models.BPE())

    fr_tokenizer = tokenizers.Tokenizer(tokenizers.models.BPE())

    # Configure pre-tokenizer to split on whitespace and punctuation, add space at beginning of the sentence

    en_tokenizer.pre_tokenizer = tokenizers.pre_tokenizers.ByteLevel(add_prefix_space=True)

    fr_tokenizer.pre_tokenizer = tokenizers.pre_tokenizers.ByteLevel(add_prefix_space=True)

    # Configure decoder: So that word boundary symbol "Ġ" will be removed

    en_tokenizer.decoder = tokenizers.decoders.ByteLevel()

    fr_tokenizer.decoder = tokenizers.decoders.ByteLevel()

    # Train BPE for English and French using the same trainer

    VOCAB_SIZE = 8000

    trainer = tokenizers.trainers.BpeTrainer(

        vocab_size=VOCAB_SIZE,

        special_tokens=["[start]", "[end]", "[pad]"],

        show_progress=True

    )

    en_tokenizer.train_from_iterator([x[0] for x in text_pairs], trainer=trainer)

    fr_tokenizer.train_from_iterator([x[1] for x in text_pairs], trainer=trainer)

    en_tokenizer.enable_padding(pad_id=en_tokenizer.token_to_id("[pad]"), pad_token="[pad]")

    fr_tokenizer.enable_padding(pad_id=fr_tokenizer.token_to_id("[pad]"), pad_token="[pad]")

    # Save the trained tokenizers

    en_tokenizer.save("en_tokenizer.json", pretty=True)

    fr_tokenizer.save("fr_tokenizer.json", pretty=True)

Here, the BPE tokenizer comes from the tokenizers library. The trained tokenizers are saved to en_tokenizer.json and fr_tokenizer.json for future use. To train a BPE tokenizer, you need to specify the maximum vocabulary size. The code above sets it to 8000, which is small (this dataset has around 15,000 unique words in English and 30,000 unique words in French). You can increase the vocabulary size if you think the model is not translating well. A few details deserve special attention in the BPE setup above:

  • The pre-tokenizer splits the text on whitespace and punctuation by default. You also add a space at the beginning of each sentence so that every word is prefixed by a space. This helps reuse vocabulary entries regardless of a word's position in the sentence.
  • Three special tokens are added to the vocabulary before training: [start], [end], and [pad]. The [pad] token, in particular, is set as the padding token used to fill shorter sentences up to a longer sequence length.

The BPE tokenizers are trained from the dataset, as stored in the list of string pairs text_pairs. The same trainer is used for both languages, but the tokenizers themselves are separate.

After the tokenizers are trained, you can test them on a few sentences:

import random

# Test the tokenizer

print("Sample tokenization:")

en_sample, fr_sample = random.choice(text_pairs)

encoded = en_tokenizer.encode(en_sample)

print(f"Original: {en_sample}")

print(f"Tokens: {encoded.tokens}")

print(f"IDs: {encoded.ids}")

print(f"Decoded: {en_tokenizer.decode(encoded.ids)}")

print()

encoded = fr_tokenizer.encode("[start] " + fr_sample + " [end]")

print(f"Original: {fr_sample}")

print(f"Tokens: {encoded.tokens}")

print(f"IDs: {encoded.ids}")

print(f"Decoded: {fr_tokenizer.decode(encoded.ids)}")

print()

The output would be like the following:

Sample tokenization:

Original: it happens to all of us.

Tokens: ['Ġit', 'Ġhappens', 'Ġto', 'Ġall', 'Ġof', 'Ġus', '.']

IDs: [124, 1689, 80, 208, 128, 238, 12]

Decoded: it happens to all of us.

Original: ça nous arrive à tous.

Tokens: ['[start]', 'Ġça', 'Ġnous', 'Ġarrive', 'ĠÃł', 'Ġtous', '.', 'Ġ', '[end]']

IDs: [0, 220, 159, 1621, 123, 392, 14, 74, 1]

Decoded: ça nous arrive à tous.

Implementing the Seq2Seq Model with LSTM

Traditionally, handling a sequence of arbitrary length using a neural network requires a recurrent neural network (RNN) architecture. It is a type of neural network where a module maintains a hidden state and updates it as it processes the sequence.
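To make the recurrence explicit, here is a minimal sketch (with made-up dimensions) using nn.LSTMCell, which exposes the per-step state update that nn.LSTM performs internally:

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
input_dim, hidden_dim, seq_len = 8, 16, 5
cell = nn.LSTMCell(input_dim, hidden_dim)

x = torch.randn(1, seq_len, input_dim)  # a batch of one sequence, 5 steps
h = torch.zeros(1, hidden_dim)          # hidden state
c = torch.zeros(1, hidden_dim)          # cell state
for t in range(seq_len):
    # the same module is applied at every step; only the states change
    h, c = cell(x[:, t, :], (h, c))
print(h.shape)  # torch.Size([1, 16])
```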

Several modules can be used to implement an RNN. LSTM is one of them. Building a simple LSTM encoder for the input sequence is straightforward:


import torch

import torch.nn as nn

class EncoderLSTM(nn.Module):

    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_layers=1, dropout=0.1):

        super().__init__()

        self.vocab_size = vocab_size

        self.embedding_dim = embedding_dim

        self.hidden_dim = hidden_dim

        self.num_layers = num_layers

        self.embedding = nn.Embedding(vocab_size, embedding_dim)

        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers,

                            batch_first=True, dropout=dropout if num_layers > 1 else 0)

    def forward(self, input_seq):

        embedded = self.embedding(input_seq)

        outputs, (hidden, cell) = self.lstm(embedded)

        return outputs, hidden, cell

LSTM is special in that it maintains two internal states, named hidden and cell in the code above. In PyTorch, you don't need to implement the recurrent structure yourself; the nn.LSTM module handles it.

In the implementation above, you implemented the encoder part of the seq2seq model as a class derived from nn.Module. You expect to pass on a 2D tensor of integer IDs as a batch of input sequences. This input will be converted into a 3D tensor by replacing each token ID with an embedding vector. In the forward() function above, the variable embedded is a 3D tensor of shape (batch_size, seq_len, embedding_dim). This is then processed by the LSTM module. The output of the LSTM module is a 3D tensor of shape (batch_size, seq_len, hidden_dim), which corresponds to the hidden states of the LSTM at each step while processing the input sequence. The final hidden state and cell state are also returned.

Note that you created the LSTM module with batch_first=True, which means the first dimension of the input tensor is the batch size. This is a common convention for language data. The module also defaults num_layers to 1. A multi-layer LSTM is generally more powerful, but it makes the model larger and requires longer training time.
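You can sanity-check these shapes with a short sketch using standalone layers (the sizes here are illustrative, not the ones used for training):

```python
import torch
import torch.nn as nn

vocab_size, embedding_dim, hidden_dim = 100, 32, 64
embedding = nn.Embedding(vocab_size, embedding_dim)
lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers=1, batch_first=True)

input_seq = torch.randint(0, vocab_size, (4, 10))  # batch of 4, sequence length 10
embedded = embedding(input_seq)                    # (4, 10, 32)
outputs, (hidden, cell) = lstm(embedded)
print(outputs.shape)  # torch.Size([4, 10, 64]): a hidden state per step
print(hidden.shape)   # torch.Size([1, 4, 64]): (num_layers, batch, hidden_dim)
```

Note that even with batch_first=True, the hidden and cell states keep the (num_layers, batch_size, hidden_dim) layout.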

Creating the decoder part of the seq2seq model is similar, except that you also need to produce the output:


class DecoderLSTM(nn.Module):

    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_layers=1, dropout=0.1):

        super().__init__()

        self.vocab_size = vocab_size

        self.embedding_dim = embedding_dim

        self.hidden_dim = hidden_dim

        self.num_layers = num_layers

        self.embedding = nn.Embedding(vocab_size, embedding_dim)

        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers,

                            batch_first=True, dropout=dropout if num_layers > 1 else 0)

        self.out = nn.Linear(hidden_dim, vocab_size)

    def forward(self, input_seq, hidden, cell):

        embedded = self.embedding(input_seq)

        output, (hidden, cell) = self.lstm(embedded, (hidden, cell))

        prediction = self.out(output)

        return prediction, hidden, cell

The decoder LSTM is similar to the encoder LSTM. In the forward() method, the input sequence is the partial target sequence, and the hidden and cell states are the last hidden and cell states from the encoder’s LSTM module. When the decoder’s LSTM module is called, the encoder’s hidden and cell states are used. If not provided, the hidden and cell states are initialized to zeros, as in the encoder.

The input to the forward method is a 2D tensor of token IDs. This needs to be converted into a 3D tensor by the embedding layer before the LSTM module can consume it. The output of the LSTM module is a sequence of hidden states. They should be converted into a logit vector by a linear layer to predict the next token.

The design of the decoder module expects you to pass in a partial target sequence of shape (batch_size, seq_len). The forward() method returns a tensor of shape (batch_size, seq_len, vocab_size): the output from the LSTM module, transformed by the linear layer into logits. You take the last position along the sequence length dimension as the prediction of the next token. You need to call the decoder module multiple times to generate the entire target sequence.
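A small sketch of a single decoder call, with zero states standing in for the encoder's output (illustrative sizes):

```python
import torch
import torch.nn as nn

vocab_size, embedding_dim, hidden_dim = 100, 32, 64
embedding = nn.Embedding(vocab_size, embedding_dim)
lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
out = nn.Linear(hidden_dim, vocab_size)

partial = torch.randint(0, vocab_size, (1, 3))  # a partial target of 3 tokens
hidden = torch.zeros(1, 1, hidden_dim)          # would come from the encoder
cell = torch.zeros(1, 1, hidden_dim)
output, (hidden, cell) = lstm(embedding(partial), (hidden, cell))
prediction = out(output)                        # (1, 3, 100): logits per position
next_token = prediction[:, -1, :].argmax(dim=-1)  # only the last position matters
print(prediction.shape, next_token.shape)
```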

To build a complete seq2seq model, you need to connect the encoder and decoder modules. This is how you can do it:


class Seq2SeqLSTM(nn.Module):

    def __init__(self, encoder, decoder):

        super().__init__()

        self.encoder = encoder

        self.decoder = decoder

    def forward(self, input_seq, target_seq):

        batch_size, target_len = target_seq.shape

        device = target_seq.device

        outputs = []

        _enc_out, hidden, cell = self.encoder(input_seq)

        dec_in = target_seq[:, :1]

        for t in range(target_len-1):

            pred, hidden, cell = self.decoder(dec_in, hidden, cell)

            pred = pred[:, -1:, :]

            outputs.append(pred)

            dec_in = torch.cat([dec_in, pred.argmax(dim=2)], dim=1)

        outputs = torch.cat(outputs, dim=1)

        return outputs

This module just connects the encoder and decoder modules. The forward() method is created to help train the model. It takes the input sequence (in English) and the target sequence (in French) as input. The English sentence will be converted into “context vectors” using the encoder. The encoder also outputs a processed sequence, but it is not used.

The decoder initializes its LSTM module with the context vector provided by the encoder, then processes a partial target sequence to produce a next-token prediction. Initially, the decoder begins with the special token [start]. It iteratively produces one more token at a time until the length of the target sequence is filled.

Note that, apart from the initial [start] token, the model above does not read the content of the target sequence; it only uses its length to control the number of iterations. Also note that the same decoder is called multiple times within a single call to the forward() method.
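To confirm the control flow end to end, here is a runnable sketch with tiny stand-in encoder and decoder modules (all sizes are illustrative):

```python
import torch
import torch.nn as nn

# Minimal stand-ins mirroring the encoder/decoder structure above
class Enc(nn.Module):
    def __init__(self, vocab, emb, hid):
        super().__init__()
        self.embedding = nn.Embedding(vocab, emb)
        self.lstm = nn.LSTM(emb, hid, batch_first=True)
    def forward(self, x):
        out, (h, c) = self.lstm(self.embedding(x))
        return out, h, c

class Dec(nn.Module):
    def __init__(self, vocab, emb, hid):
        super().__init__()
        self.embedding = nn.Embedding(vocab, emb)
        self.lstm = nn.LSTM(emb, hid, batch_first=True)
        self.out = nn.Linear(hid, vocab)
    def forward(self, x, h, c):
        out, (h, c) = self.lstm(self.embedding(x), (h, c))
        return self.out(out), h, c

enc, dec = Enc(50, 16, 32), Dec(60, 16, 32)
src = torch.randint(0, 50, (2, 7))  # batch of 2 source sequences, length 7
tgt = torch.randint(0, 60, (2, 5))  # target sequences, length 5

_, h, c = enc(src)
dec_in = tgt[:, :1]                 # start with the first target token
outputs = []
for _ in range(tgt.shape[1] - 1):
    pred, h, c = dec(dec_in, h, c)
    pred = pred[:, -1:, :]          # keep only the newest prediction
    outputs.append(pred)
    dec_in = torch.cat([dec_in, pred.argmax(dim=2)], dim=1)
outputs = torch.cat(outputs, dim=1)
print(outputs.shape)  # torch.Size([2, 4, 60]): one logit vector per remaining position
```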

Training the Seq2Seq Model

To train the above model for English-to-French translation, you need to create a dataset object such that you can iterate over the dataset in batches and in random order. You already collected the data in the previous section and stored it as text_pairs. PyTorch provides a Dataset class to help you shuffle and batch the data. This is how you can create a dataset object:


import torch

from torch.utils.data import Dataset, DataLoader

class TranslationDataset(Dataset):

    def __init__(self, text_pairs):

        self.text_pairs = text_pairs

    def __len__(self):

        return len(self.text_pairs)

    def __getitem__(self, idx):

        eng, fra = self.text_pairs[idx]

        return eng, "[start] " + fra + " [end]"

def collate_fn(batch):

    en_str, fr_str = zip(*batch)

    en_enc = en_tokenizer.encode_batch(en_str, add_special_tokens=True)

    fr_enc = fr_tokenizer.encode_batch(fr_str, add_special_tokens=True)

    en_ids = [enc.ids for enc in en_enc]

    fr_ids = [enc.ids for enc in fr_enc]

    return torch.tensor(en_ids), torch.tensor(fr_ids)

BATCH_SIZE = 32

dataset = TranslationDataset(text_pairs)

dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn)

You can try to print one sample from the dataset:

for en_ids, fr_ids in dataloader:

    print(f"English: {en_ids}")

    print(f"French: {fr_ids}")

    break

The dataloader object is an iterable that scans the entire dataset in a random order. It returns a tuple of two tensors, each of shape (batch_size, seq_len). You will see that the two tensors you printed are integers, as token IDs are represented by integers.

The dataloader is created with the collate_fn() function. A PyTorch DataLoader only collects elements from the dataset object into a list; each element in this case is a tuple of two strings. The collate function converts the strings into token IDs using the BPE tokenizers and then builds PyTorch tensors from them.

The next step is to create a model. It is straightforward:

...

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

enc_vocab = en_tokenizer.get_vocab_size()

dec_vocab = fr_tokenizer.get_vocab_size()

emb_dim = 256

hidden_dim = 256

num_layers = 1

dropout = 0.1

encoder = EncoderLSTM(enc_vocab, emb_dim, hidden_dim, num_layers, dropout).to(device)

decoder = DecoderLSTM(dec_vocab, emb_dim, hidden_dim, num_layers, dropout).to(device)

model = Seq2SeqLSTM(encoder, decoder).to(device)

print(model)

This will print:

Seq2SeqLSTM(

  (encoder): EncoderLSTM(

    (embedding): Embedding(8000, 256)

    (lstm): LSTM(256, 256, batch_first=True)

  )

  (decoder): DecoderLSTM(

    (embedding): Embedding(8000, 256)

    (lstm): LSTM(256, 256, batch_first=True)

    (out): Linear(in_features=256, out_features=8000, bias=True)

  )

)

So you can see that the model is very simple. In fact, it has only about 7 million parameters, yet it is large enough to require a sizable amount of training time.
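You can verify the count yourself by rebuilding the same layer sizes and summing the parameter tensors (a sketch using the 8000-token vocabulary and 256-dimensional layers from above):

```python
import torch.nn as nn

# The same layers and sizes as the printed model above
model = nn.ModuleDict({
    "enc_emb": nn.Embedding(8000, 256),
    "enc_lstm": nn.LSTM(256, 256, batch_first=True),
    "dec_emb": nn.Embedding(8000, 256),
    "dec_lstm": nn.LSTM(256, 256, batch_first=True),
    "dec_out": nn.Linear(256, 8000),
})
n_params = sum(p.numel() for p in model.parameters())
print(f"{n_params:,}")  # 7,204,672
```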

The code for training is as follows:


import torch.optim as optim

optimizer = optim.Adam(model.parameters(), lr=0.001)

loss_fn = nn.CrossEntropyLoss(ignore_index=fr_tokenizer.token_to_id("[pad]"))

N_EPOCHS = 30

for epoch in range(N_EPOCHS):

    model.train()

    epoch_loss = 0

    for en_ids, fr_ids in dataloader:

        # Move the "sentences" to device

        en_ids = en_ids.to(device)

        fr_ids = fr_ids.to(device)

        # zero the grad, then forward pass

        optimizer.zero_grad()

        outputs = model(en_ids, fr_ids)

        # compute the loss: compare 3D logits to 2D targets

        loss = loss_fn(outputs.reshape(-1, dec_vocab), fr_ids[:, 1:].reshape(-1))

        loss.backward()

        optimizer.step()

        epoch_loss += loss.item()

    print(f"Epoch {epoch+1}/{N_EPOCHS}; Avg loss {epoch_loss/len(dataloader)}; Latest loss {loss.item()}")

    torch.save(model.state_dict(), f"seq2seq-epoch-{epoch+1}.pth")

    # Test

    if (epoch+1) % 5 != 0:

        continue

    model.eval()

    epoch_loss = 0

    with torch.no_grad():

        for en_ids, fr_ids in dataloader:

            en_ids = en_ids.to(device)

            fr_ids = fr_ids.to(device)

            outputs = model(en_ids, fr_ids)

            loss = loss_fn(outputs.reshape(-1, dec_vocab), fr_ids[:, 1:].reshape(-1))

            epoch_loss += loss.item()

    print(f"Eval loss: {epoch_loss/len(dataloader)}")

This is a simple training loop; many techniques for better training, such as a train-test split of the data, early stopping, and gradient clipping, are not implemented. It reads the dataset in batches, runs the model forward and backward, and then updates the model parameters.

The loss function used is cross-entropy, since the model predicts the next token among the vocabulary. Because the model generates the entire output sequence to match the length of the target sequence, the loss function can compare the sequences in one shot rather than computing the loss token by token. However, the tensors here are batches of sequences, padded to match the longest sequence in the batch. A sequence should terminate with the [end] token, and the positions of padding tokens should be excluded from the overall loss calculation. That's why the ignore_index parameter is used when creating the loss function with nn.CrossEntropyLoss().
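The effect of ignore_index can be demonstrated in isolation (a sketch with a made-up 10-token vocabulary and a pad ID of 2):

```python
import torch
import torch.nn as nn

PAD = 2
logits = torch.randn(6, 10)                        # 6 positions, vocab size 10
targets = torch.tensor([4, 7, 1, PAD, PAD, PAD])   # last three positions are padding

loss_masked = nn.CrossEntropyLoss(ignore_index=PAD)(logits, targets)
# identical to averaging the loss over only the three real tokens
loss_manual = nn.functional.cross_entropy(logits[:3], targets[:3])
print(torch.isclose(loss_masked, loss_manual))  # tensor(True)
```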

If you have a separate test set, you can use that for evaluation. In the above, you reused the training data for evaluation once every 5 epochs in the latter half of the for-loop. Remember to toggle the model between model.train() and model.eval() for the correct training/inference behavior.

Using the Seq2Seq Model

In the code above, you saved the model at the end of each epoch using torch.save(). When you have the model file, you can load it back using:

...

encoder = EncoderLSTM(enc_vocab, emb_dim, hidden_dim, num_layers, dropout).to(device)

decoder = DecoderLSTM(dec_vocab, emb_dim, hidden_dim, num_layers, dropout).to(device)

model = Seq2SeqLSTM(encoder, decoder).to(device)

model.load_state_dict(torch.load("seq2seq-epoch-30.pth"))

With a trained model, you can use it to generate translations. However, you do not use the same forward() method as in the training. Instead, you use a loop to call the decoder multiple times until the target sequence is generated.

Below is an implementation that translates a few random sentences from the original dataset:


import random

model.eval()

N_SAMPLES = 5

MAX_LEN = 60

with torch.no_grad():

    start_token = fr_tokenizer.token_to_id("[start]")  # a plain int, so pred_ids stays a list of IDs

    for en, true_fr in random.sample(text_pairs, N_SAMPLES):

        en_ids = torch.tensor(en_tokenizer.encode(en).ids).unsqueeze(0).to(device)

        _output, hidden, cell = model.encoder(en_ids)

        pred_ids = [start_token]

        for _ in range(MAX_LEN):

            decoder_input = torch.tensor(pred_ids).unsqueeze(0).to(device)

            output, hidden, cell = model.decoder(decoder_input, hidden, cell)

            output = output[:, -1, :].argmax(dim=1)

            pred_ids.append(output.item())

            # stop if the predicted token is the end token

            if pred_ids[-1] == fr_tokenizer.token_to_id("[end]"):

                break

        # Decode the predicted IDs

        pred_fr = fr_tokenizer.decode(pred_ids)

        print(f"English: {en}")

        print(f"French: {true_fr}")

        print(f"Predicted: {pred_fr}")

        print()

First, switch the model into evaluation mode and run it under the torch.no_grad() context. This saves time and memory.

You pick a few samples from the dataset using random.sample(). The input sentence (English) is tokenized and encoded into the tensor en_ids. It is a 2D tensor of shape (1, seq_len), as the model always expects a batch of sequences, even if the batch size is 1.

You run the English sentence through the model’s encoder to extract the context vector, which represents the final state of the LSTM module. Then, you start with the special token [start] and generate the French sentence in a loop.

This is a typical loop to use the seq2seq model. You expect the model to generate the [end] token eventually; otherwise, you will stop the generation when the length of the generated sequence reaches the maximum length. In each iteration of the loop, you create a new input tensor for the decoder. Then the decoder will generate one extra token, as the last token in the decoder’s output sequence. This output is a logit vector of the size of the vocabulary. You take the token with the highest probability as the next token, via the argmax() method in PyTorch.

The list pred_ids accumulates the generated token IDs. Each iteration of the loop builds the decoder's input tensor from this list. When the loop terminates, you run the tokenizer's decoder to convert the token IDs back into a sentence.

When you run the code above, you may see the following output:

English: it was his silence that made her angry.

French: ce fut son silence qui la mit en colère.

Predicted: ce fut son silence qui qui mit colère colère.

English: you're the teacher.

French: tu es le professeur.

Predicted: c'est professeur.

Improving the Seq2Seq Model

The above outlines how you can build a plain seq2seq model with LSTM for translation. As you can see from above, the output is not perfect. There are several ways to improve it:

  • Improve the tokenizer: The vocabulary size used is small, which may limit the model’s ability to understand word meanings. You can improve the model by incorporating a larger vocabulary. But this may require more training data.
  • Use a larger model: One layer of LSTM is used above, and you may see an improvement if you use more layers. You can also add dropout to the LSTM module to prevent overfitting when more layers are used.
  • Improve the training: Split the dataset into training and test sets, and use the test set to evaluate the model. This makes it easier to determine which epoch produced the best model, so you can use it for inference or stop training early. You can also tell whether the model has converged by monitoring the loss on the test set.
  • Experiment with a different decoder model: The decoder above runs the entire partial target sequence with the encoder's state as the initial state. Alternatively, you can pass only the last token to the decoder to generate the next token. The difference is that the latter applies the carried-over state directly to the newest token, while the former mutates the states by re-scanning the previously generated sequence. A recurrent neural network tends to "forget" the initial state (i.e., the context vector) when the sequence is long.
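The last idea can be sketched as follows: feed only the newest token into the decoder and let the carried-over hidden and cell states remember the history (tiny stand-in layers, not the trained model):

```python
import torch
import torch.nn as nn

vocab, emb_dim, hid_dim = 60, 16, 32
embedding = nn.Embedding(vocab, emb_dim)
lstm = nn.LSTM(emb_dim, hid_dim, batch_first=True)
out = nn.Linear(hid_dim, vocab)

# the encoder's final states would normally seed these
hidden = torch.zeros(1, 1, hid_dim)
cell = torch.zeros(1, 1, hid_dim)

token = torch.tensor([[0]])  # a hypothetical [start] token ID
pred_ids = []
for _ in range(5):
    o, (hidden, cell) = lstm(embedding(token), (hidden, cell))
    # feed only the newest token back in; the states carry the history
    token = out(o[:, -1, :]).argmax(dim=-1, keepdim=True)
    pred_ids.append(token.item())
print(len(pred_ids))  # 5
```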

For completeness, below is the complete code you created in this post:


import random

import os

import re

import unicodedata

import zipfile

import requests

import torch

import torch.nn as nn

import torch.optim as optim

import tokenizers

import tqdm

#

# Data preparation

#

# Download dataset provided by Anki: https://www.manythings.org/anki/ with requests

if not os.path.exists("fra-eng.zip"):

    url = "http://storage.googleapis.com/download.tensorflow.org/data/fra-eng.zip"

    response = requests.get(url)

    with open("fra-eng.zip", "wb") as f:

        f.write(response.content)

# Normalize text

# each line of the file is in the format "<english>\t<french>"

# We convert text to lowercase and normalize unicode (NFKC)

def normalize(line):

    """Normalize a line of text and split into two at the tab character"""

    line = unicodedata.normalize("NFKC", line.strip().lower())

    eng, fra = line.split("\t")

    return eng.lower().strip(), fra.lower().strip()

text_pairs = []

with zipfile.ZipFile("fra-eng.zip", "r") as zip_ref:

    for line in zip_ref.read("fra.txt").decode("utf-8").splitlines():

        eng, fra = normalize(line)

        text_pairs.append((eng, fra))

#

# Tokenization with BPE

#

if os.path.exists("en_tokenizer.json") and os.path.exists("fr_tokenizer.json"):

    en_tokenizer = tokenizers.Tokenizer.from_file("en_tokenizer.json")

    fr_tokenizer = tokenizers.Tokenizer.from_file("fr_tokenizer.json")

else:

    en_tokenizer = tokenizers.Tokenizer(tokenizers.models.BPE())

    fr_tokenizer = tokenizers.Tokenizer(tokenizers.models.BPE())

    # Configure pre-tokenizer to split on whitespace and punctuation, add space at beginning of the sentence

    en_tokenizer.pre_tokenizer = tokenizers.pre_tokenizers.ByteLevel(add_prefix_space=True)

    fr_tokenizer.pre_tokenizer = tokenizers.pre_tokenizers.ByteLevel(add_prefix_space=True)

    # Configure decoder: So that word boundary symbol "Ġ" will be removed

    en_tokenizer.decoder = tokenizers.decoders.ByteLevel()

    fr_tokenizer.decoder = tokenizers.decoders.ByteLevel()

    # Train BPE for English and French using the same trainer

    VOCAB_SIZE = 8000

    trainer = tokenizers.trainers.BpeTrainer(

        vocab_size=VOCAB_SIZE,

        special_tokens=["[start]", "[end]", "[pad]"],

        show_progress=True

    )

    en_tokenizer.train_from_iterator([x[0] for x in text_pairs], trainer=trainer)

    fr_tokenizer.train_from_iterator([x[1] for x in text_pairs], trainer=trainer)

    en_tokenizer.enable_padding(pad_id=en_tokenizer.token_to_id("[pad]"), pad_token="[pad]")

    fr_tokenizer.enable_padding(pad_id=fr_tokenizer.token_to_id("[pad]"), pad_token="[pad]")

    # Save the trained tokenizers

    en_tokenizer.save("en_tokenizer.json", pretty=True)

    fr_tokenizer.save("fr_tokenizer.json", pretty=True)

# Test the tokenizer

print("Sample tokenization:")

en_sample, fr_sample = random.choice(text_pairs)

encoded = en_tokenizer.encode(en_sample)

print(f"Original: {en_sample}")

print(f"Tokens: {encoded.tokens}")

print(f"IDs: {encoded.ids}")

print(f"Decoded: {en_tokenizer.decode(encoded.ids)}")

print()

encoded = fr_tokenizer.encode("[start] " + fr_sample + " [end]")

print(f"Original: {fr_sample}")

print(f"Tokens: {encoded.tokens}")

print(f"IDs: {encoded.ids}")

print(f"Decoded: {fr_tokenizer.decode(encoded.ids)}")

print()

#
# Create PyTorch dataset for the BPE-encoded translation pairs
#

class TranslationDataset(torch.utils.data.Dataset):
    def __init__(self, text_pairs):
        self.text_pairs = text_pairs

    def __len__(self):
        return len(self.text_pairs)

    def __getitem__(self, idx):
        eng, fra = self.text_pairs[idx]
        return eng, "[start] " + fra + " [end]"

def collate_fn(batch):
    en_str, fr_str = zip(*batch)
    en_enc = en_tokenizer.encode_batch(en_str, add_special_tokens=True)
    fr_enc = fr_tokenizer.encode_batch(fr_str, add_special_tokens=True)
    en_ids = [enc.ids for enc in en_enc]
    fr_ids = [enc.ids for enc in fr_enc]
    return torch.tensor(en_ids), torch.tensor(fr_ids)

BATCH_SIZE = 32
dataset = TranslationDataset(text_pairs)
dataloader = torch.utils.data.DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn)

# Test the dataset
for en_ids, fr_ids in dataloader:
    print(f"English: {en_ids}")
    print(f"French: {fr_ids}")
    break

#
# Create LSTM seq2seq model for translation
#

class EncoderLSTM(nn.Module):
    """A stacked LSTM encoder with an embedding layer"""

    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_layers=1, dropout=0.1):
        """
        Plain LSTM is used. No bidirectional LSTM.

        Args:
            vocab_size: The size of the input vocabulary
            embedding_dim: The dimension of the embedding vector
            hidden_dim: The dimension of the hidden state
            num_layers: The number of recurrent layers (layers of stacked LSTM)
            dropout: The dropout rate, applied to all LSTM layers except the last one
        """
        super().__init__()
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers,
                            batch_first=True, dropout=dropout if num_layers > 1 else 0)

    def forward(self, input_seq):
        # input seq = [batch_size, seq_len] -> embedded = [batch_size, seq_len, embedding_dim]
        embedded = self.embedding(input_seq)
        # outputs = [batch_size, seq_len, hidden_dim]
        # hidden = cell = [n_layers, batch_size, hidden_dim]
        outputs, (hidden, cell) = self.lstm(embedded)
        return outputs, hidden, cell
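To see these shapes concretely, here is a standalone sanity check (a sketch with arbitrary small dimensions, separate from the tutorial code) that pushes a dummy batch through the same embedding-plus-LSTM stack the encoder uses:

```python
import torch
import torch.nn as nn

# Hypothetical small dimensions, for illustration only
vocab_size, emb_dim, hidden_dim, num_layers = 100, 8, 16, 2
embedding = nn.Embedding(vocab_size, emb_dim)
lstm = nn.LSTM(emb_dim, hidden_dim, num_layers, batch_first=True)

tokens = torch.randint(0, vocab_size, (4, 10))   # [batch_size, seq_len]
outputs, (hidden, cell) = lstm(embedding(tokens))

print(outputs.shape)  # torch.Size([4, 10, 16]) -> [batch_size, seq_len, hidden_dim]
print(hidden.shape)   # torch.Size([2, 4, 16])  -> [num_layers, batch_size, hidden_dim]
print(cell.shape)     # torch.Size([2, 4, 16])
```

Note that `outputs` covers every time step of the top LSTM layer, while `hidden` and `cell` hold only the final states of every layer; the seq2seq model passes the latter two to the decoder as the context.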

class DecoderLSTM(nn.Module):
    """A stacked LSTM decoder with an embedding layer and a linear output head"""

    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_layers=1, dropout=0.1):
        super().__init__()
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers,
                            batch_first=True, dropout=dropout if num_layers > 1 else 0)
        # project the LSTM hidden state onto the vocabulary
        self.out = nn.Linear(hidden_dim, vocab_size)

    def forward(self, input_seq, hidden, cell):
        # input seq = [batch_size, seq_len] -> embedded = [batch_size, seq_len, embedding_dim]
        # hidden = cell = [n_layers, batch_size, hidden_dim]
        embedded = self.embedding(input_seq)
        # output = [batch_size, seq_len, hidden_dim]
        output, (hidden, cell) = self.lstm(embedded, (hidden, cell))
        # prediction = [batch_size, seq_len, vocab_size]
        prediction = self.out(output)
        return prediction, hidden, cell

class Seq2SeqLSTM(nn.Module):
    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, input_seq, target_seq):
        """Given the input sequence and the target length, predict each next token"""
        # input seq = [batch_size, seq_len]
        # target seq = [batch_size, seq_len]
        batch_size, target_len = target_seq.shape
        # storing output logits
        outputs = []
        # encoder forward pass; hidden and cell carry the context to the decoder
        _enc_out, hidden, cell = self.encoder(input_seq)
        # start decoding from the [start] token
        dec_in = target_seq[:, :1]
        # decoder forward pass, one token at a time
        for t in range(target_len - 1):
            # latest token and hidden states -> next token logits
            pred, hidden, cell = self.decoder(dec_in, hidden, cell)
            # store the prediction: pred = [batch_size, 1, vocab_size]
            outputs.append(pred)
            # feed the predicted token back as the next input; only the latest
            # token is needed since hidden and cell already carry the history
            dec_in = pred.argmax(dim=2)
        outputs = torch.cat(outputs, dim=1)
        return outputs
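Note that this forward pass runs free: at each step the decoder's own greedy prediction becomes the next input. A common alternative during training is teacher forcing, where the ground-truth token is fed instead, which typically stabilizes early training. The snippet below (a standalone sketch with made-up tensors, not part of the tutorial model) contrasts the two ways of building the next decoder input:

```python
import torch

# Hypothetical logits from one decoder step: [batch_size, 1, vocab_size]
pred = torch.randn(4, 1, 100)
target_seq = torch.randint(0, 100, (4, 12))  # [batch_size, seq_len]
t = 0  # current decoding step

# Free running (as in the forward pass above): feed back the model's own guess
dec_in_free = pred.argmax(dim=2)             # [batch_size, 1]

# Teacher forcing: feed the ground-truth token at position t+1 instead
dec_in_forced = target_seq[:, t + 1:t + 2]   # [batch_size, 1]

print(dec_in_free.shape, dec_in_forced.shape)  # both torch.Size([4, 1])
```

In practice the two are often mixed, choosing teacher forcing per step with some probability; either way the decoder input stays a single token per step because the LSTM state carries the history.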

# Initialize model parameters
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
enc_vocab = len(en_tokenizer.get_vocab())
dec_vocab = len(fr_tokenizer.get_vocab())
emb_dim = 256
hidden_dim = 256
num_layers = 2
dropout = 0.1

# Create model
encoder = EncoderLSTM(enc_vocab, emb_dim, hidden_dim, num_layers, dropout).to(device)
decoder = DecoderLSTM(dec_vocab, emb_dim, hidden_dim, num_layers, dropout).to(device)
model = Seq2SeqLSTM(encoder, decoder).to(device)

print(model)
print("Model created with:")
print(f"  Input vocabulary size: {enc_vocab}")
print(f"  Output vocabulary size: {dec_vocab}")
print(f"  Embedding dimension: {emb_dim}")
print(f"  Hidden dimension: {hidden_dim}")
print(f"  Number of layers: {num_layers}")
print(f"  Dropout: {dropout}")
print(f"  Total parameters: {sum(p.numel() for p in model.parameters() if p.requires_grad)}")

# Train unless a saved model exists
if os.path.exists("seq2seq.pth"):
    model.load_state_dict(torch.load("seq2seq.pth", map_location=device))
else:
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    loss_fn = nn.CrossEntropyLoss(ignore_index=fr_tokenizer.token_to_id("[pad]"))
    N_EPOCHS = 30
    for epoch in range(N_EPOCHS):
        model.train()
        epoch_loss = 0
        for en_ids, fr_ids in tqdm.tqdm(dataloader, desc="Training"):
            # Move the "sentences" to device
            en_ids = en_ids.to(device)
            fr_ids = fr_ids.to(device)
            # zero the grad, then forward pass
            optimizer.zero_grad()
            outputs = model(en_ids, fr_ids)
            # compute the loss: flatten the 3D logits and the 2D targets; the
            # target is shifted by one since the model predicts the next token
            loss = loss_fn(outputs.reshape(-1, dec_vocab), fr_ids[:, 1:].reshape(-1))
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
        print(f"Epoch {epoch+1}/{N_EPOCHS}; Avg loss {epoch_loss/len(dataloader)}; Latest loss {loss.item()}")
        torch.save(model.state_dict(), f"seq2seq-epoch-{epoch+1}.pth")
        # Evaluate every 5 epochs
        if (epoch+1) % 5 != 0:
            continue
        model.eval()
        epoch_loss = 0
        with torch.no_grad():
            for en_ids, fr_ids in tqdm.tqdm(dataloader, desc="Evaluating"):
                en_ids = en_ids.to(device)
                fr_ids = fr_ids.to(device)
                outputs = model(en_ids, fr_ids)
                loss = loss_fn(outputs.reshape(-1, dec_vocab), fr_ids[:, 1:].reshape(-1))
                epoch_loss += loss.item()
        print(f"Eval loss: {epoch_loss/len(dataloader)}")
    # Save the final model
    torch.save(model.state_dict(), "seq2seq.pth")
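The reshape in the loss computation deserves a note: `nn.CrossEntropyLoss` expects 2D logits of shape `[N, num_classes]` and 1D integer targets of shape `[N]`, so the 3D decoder output and the shifted target sequence are both flattened first. A standalone sketch with hypothetical shapes:

```python
import torch
import torch.nn as nn

# Hypothetical shapes matching the training loop above
batch_size, seq_len, vocab = 4, 11, 100
logits = torch.randn(batch_size, seq_len, vocab)          # model outputs
targets = torch.randint(0, vocab, (batch_size, seq_len))  # shifted targets

# CrossEntropyLoss expects [N, C] logits and [N] class indices,
# so both tensors are flattened over the batch and time dimensions
loss_fn = nn.CrossEntropyLoss()
loss = loss_fn(logits.reshape(-1, vocab), targets.reshape(-1))
print(loss.shape)  # torch.Size([]) -- a scalar
```

With `ignore_index` set to the `[pad]` token ID, as in the training loop, positions that are only padding contribute nothing to the loss.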

# Test for a few samples
model.eval()
N_SAMPLES = 5
MAX_LEN = 60
with torch.no_grad():
    start_id = fr_tokenizer.token_to_id("[start]")
    end_id = fr_tokenizer.token_to_id("[end]")
    for en, true_fr in random.sample(text_pairs, N_SAMPLES):
        en_ids = torch.tensor(en_tokenizer.encode(en).ids).unsqueeze(0).to(device)
        _output, hidden, cell = model.encoder(en_ids)
        pred_ids = [start_id]
        for _ in range(MAX_LEN):
            # feed only the latest token; hidden and cell carry the history
            decoder_input = torch.tensor([[pred_ids[-1]]]).to(device)
            output, hidden, cell = model.decoder(decoder_input, hidden, cell)
            pred_ids.append(output[:, -1, :].argmax(dim=1).item())
            # early stop if the predicted token is the end token
            if pred_ids[-1] == end_id:
                break
        # Decode the predicted IDs
        pred_fr = fr_tokenizer.decode(pred_ids)
        print(f"English: {en}")
        print(f"French: {true_fr}")
        print(f"Predicted: {pred_fr}")
        print()


Summary

In this post, you learned about building and training a seq2seq model with LSTM for English to French translation. Specifically, you learned about:

  • How encoder-decoder architectures work with LSTM cells
  • How to prepare the dataset for training a seq2seq model
  • How to implement and train the complete translation model in PyTorch

The implementation is deliberately simple, but it illustrates the core mechanism of a seq2seq model: encode the input into a context, then decode from that context one token at a time.

