Improve the following code (you may modify it freely, but it must use a sequence-to-sequence model, an attention mechanism, and an encoder-decoder architecture). Complete it with an English grammar-correction corpus for training, produce the final corrected output, and report the grammar corrector's accuracy.
(The corpus needs to be a concrete corpus that can be found online, not a self-defined one.)
A good answer will definitely be accepted.
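One way to satisfy the "real corpus found online" requirement is a public grammatical-error-correction dataset. Below is a minimal sketch, assuming the Hugging Face `datasets` package and the publicly hosted JFLEG corpus (the hub id `jfleg` with `sentence`/`corrections` fields is an assumption about that entry); Lang-8, NUCLE/CoNLL-2014, or the BEA-2019 W&I+LOCNESS data are alternatives that require registration or a licence.

# Sketch: load source/target sentence pairs from JFLEG (assumed hub id "jfleg").
from datasets import load_dataset

def load_jfleg_pairs(split="validation"):
    data = load_dataset("jfleg", split=split)
    sources, targets = [], []
    for example in data:
        src = example["sentence"].strip()
        for corr in example["corrections"]:  # each source has several reference corrections
            corr = corr.strip()
            if src and corr:
                sources.append(src)
                targets.append(corr)
    return sources, targets

These pairs can then be fed to preprocess_data() in main() in place of the two hand-written sentences.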
#!/usr/bin/env python
# coding: utf-8
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from transformers import XLNetTokenizer
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize

# The NLTK resources used below must be present; download them once if needed.
nltk.download('punkt', quiet=True)
nltk.download('stopwords', quiet=True)
class GrammarCorrectionDataset(Dataset):
    """Wraps pre-tokenized, padded source/target tensors for the DataLoader."""
    def __init__(self, source_sentences, target_sentences, tokenizer):
        self.source_sentences = source_sentences
        self.target_sentences = target_sentences
        self.tokenizer = tokenizer

    def __len__(self):
        return len(self.source_sentences)

    def __getitem__(self, idx):
        # The sentences are already tokenized, indexed and padded in main(),
        # so we simply return the corresponding tensor rows; re-encoding here
        # would fail because the inputs are tensors, not strings.
        return {'source_tokens': self.source_sentences[idx],
                'target_tokens': self.target_sentences[idx]}
def preprocess_data(sentences, remove_stopwords=False):
    """Light text clean-up. Stop-word removal is off by default: for grammar
    correction the function words (articles, prepositions, ...) are exactly
    what the model must learn to fix, so dropping them is usually harmful."""
    stop_words = set(stopwords.words('english'))
    preprocessed_sentences = []
    for sentence in sentences:
        tokens = word_tokenize(sentence)
        if remove_stopwords:
            tokens = [token for token in tokens if token.lower() not in stop_words]
        preprocessed_sentences.append(' '.join(tokens))
    return preprocessed_sentences
class Encoder(nn.Module):
    def __init__(self, vocab_size, embed_size, enc_hidden_size, dec_hidden_size, dropout=0.2):
        super(Encoder, self).__init__()
        self.embed = nn.Embedding(vocab_size, embed_size)
        self.rnn = nn.GRU(embed_size, enc_hidden_size, batch_first=True, bidirectional=True)
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(enc_hidden_size * 2, dec_hidden_size)

    def forward(self, x, lengths):
        # Sort by length so the sequences can be packed, then restore the order.
        sorted_len, sorted_idx = lengths.sort(0, descending=True)
        x_sorted = x[sorted_idx.long()]
        embedded = self.dropout(self.embed(x_sorted))
        packed_embedded = nn.utils.rnn.pack_padded_sequence(embedded, sorted_len.long().cpu(),
                                                            batch_first=True)
        packed_out, hid = self.rnn(packed_embedded)
        # total_length keeps the padded width equal to the source width so the
        # attention mask built from the source still lines up.
        out, _ = nn.utils.rnn.pad_packed_sequence(packed_out, batch_first=True,
                                                  total_length=x.size(1))
        _, original_idx = sorted_idx.sort(0, descending=False)
        out = out[original_idx.long()].contiguous()        # [batch, src_len, enc_hidden*2]
        hid = hid[:, original_idx.long()].contiguous()
        # Concatenate the final forward/backward hidden states and project them
        # to the decoder's hidden size: [batch, dec_hidden].
        hid = torch.cat([hid[-2], hid[-1]], dim=1)
        hid = torch.tanh(self.fc(hid))
        return out, hid
class Attention(nn.Module):
    def __init__(self, enc_hidden_size, dec_hidden_size):
        super(Attention, self).__init__()
        self.enc_hidden_size = enc_hidden_size
        self.dec_hidden_size = dec_hidden_size
        self.attn = nn.Linear((enc_hidden_size * 2) + dec_hidden_size, dec_hidden_size)
        self.v = nn.Parameter(torch.rand(dec_hidden_size))

    def forward(self, hidden, encoder_outputs, mask):
        # hidden:          [batch, dec_hidden]
        # encoder_outputs: [batch, src_len, enc_hidden*2]  (batch_first, no transpose needed)
        src_len = encoder_outputs.shape[1]
        hidden = hidden.unsqueeze(1).repeat(1, src_len, 1)   # [batch, src_len, dec_hidden]
        energy = torch.tanh(self.attn(torch.cat((hidden, encoder_outputs), dim=2)))
        attention = torch.matmul(energy, self.v)             # [batch, src_len]
        attention = attention.masked_fill(mask == 0, -1e10)  # ignore padding positions
        return nn.functional.softmax(attention, dim=1)
class Decoder(nn.Module):
    def __init__(self, vocab_size, embed_size, enc_hidden_size, dec_hidden_size, dropout=0.2):
        super(Decoder, self).__init__()
        self.embed = nn.Embedding(vocab_size, embed_size)
        self.rnn = nn.GRU((enc_hidden_size * 2) + embed_size, dec_hidden_size, batch_first=True)
        self.attention = Attention(enc_hidden_size, dec_hidden_size)
        self.out = nn.Linear((enc_hidden_size * 2) + dec_hidden_size + embed_size, vocab_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, hidden, encoder_outputs, mask):
        # x: [batch] (current input token), hidden: [batch, dec_hidden]
        x = x.unsqueeze(1)                                   # [batch, 1]
        embedded = self.dropout(self.embed(x))               # [batch, 1, embed]
        a = self.attention(hidden, encoder_outputs, mask)    # [batch, src_len]
        a = a.unsqueeze(1)                                   # [batch, 1, src_len]
        weighted = torch.bmm(a, encoder_outputs)             # [batch, 1, enc_hidden*2]
        rnn_input = torch.cat((embedded, weighted), dim=2)
        output, hidden = self.rnn(rnn_input, hidden.unsqueeze(0))
        embedded = embedded.squeeze(1)
        output = output.squeeze(1)                           # [batch, dec_hidden]
        weighted = weighted.squeeze(1)                       # [batch, enc_hidden*2]
        prediction = self.out(torch.cat((output, weighted, embedded), dim=1))
        return prediction, hidden.squeeze(0)
class Seq2Seq(nn.Module):
def __init__(self, encoder, decoder):
super(Seq2Seq, self).__init__()
self.encoder = encoder
self.decoder = decoder
    def create_mask(self, x):
        # True for real tokens, False for padding (pad id 0 is used throughout).
        return (x != 0)
def forward(self, source, source_lengths, target, teacher_forcing_ratio=0.5):
batch_size = source.shape[0]
max_len = target.shape[1]
vocab_size = self.decoder.out.out_features
outputs = torch.zeros(batch_size, max_len, vocab_size).to(source.device)
        encoder_outputs, hidden = self.encoder(source, source_lengths)
        # The first target token (position 0) is used as the initial decoder input.
        x = target[:, 0]
        mask = self.create_mask(source)
        # Decode step by step, feeding back either the gold token (teacher
        # forcing) or the model's own best guess.
        for t in range(1, max_len):
            output, hidden = self.decoder(x, hidden, encoder_outputs, mask)
            outputs[:, t] = output
            teacher_force = torch.rand(1).item() < teacher_forcing_ratio
            best_guess = output.argmax(1)
            x = target[:, t] if teacher_force else best_guess
return outputs
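# A minimal greedy-decoding sketch (not in the original code) for producing the
# corrected output sentences the question asks for. `sos_id` is a hypothetical
# id for the token used to start decoding (the training loop above uses the
# first target token for this), and `max_len` bounds the generated length.
def greedy_correct(model, source_tensor, source_lengths, sos_id, max_len=50):
    model.eval()
    with torch.no_grad():
        encoder_outputs, hidden = model.encoder(source_tensor, source_lengths)
        mask = model.create_mask(source_tensor)
        x = torch.full((source_tensor.size(0),), sos_id, dtype=torch.long,
                       device=source_tensor.device)
        generated = []
        for _ in range(max_len):
            output, hidden = model.decoder(x, hidden, encoder_outputs, mask)
            x = output.argmax(1)               # greedy: keep the most likely token
            generated.append(x.unsqueeze(1))
    return torch.cat(generated, dim=1)         # [batch, max_len] of token ids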
def tokenize_sentences(sentences, tokenizer):
tokenized_sentences = []
for sentence in sentences:
tokens = tokenizer.tokenize(sentence)
tokenized_sentences.append(tokens)
return tokenized_sentences
def index_tokens(tokenized_sentences, tokenizer):
indexed_sentences = []
for tokens in tokenized_sentences:
indexed = tokenizer.convert_tokens_to_ids(tokens)
indexed_sentences.append(indexed)
return indexed_sentences
def pad_sentences(indexed_sentences, max_length):
padded_sentences = []
for indexed in indexed_sentences:
padded = indexed + [0] * (max_length - len(indexed))
padded_sentences.append(padded)
return padded_sentences
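# Hypothetical helper (not part of the original code) for turning a row of
# generated token ids back into readable text; tokenizer.decode is the standard
# transformers call for this. The manual padding id 0 is stripped first.
def ids_to_text(token_ids, tokenizer):
    token_ids = [int(i) for i in token_ids if int(i) != 0]
    return tokenizer.decode(token_ids, skip_special_tokens=True)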
def main():
# Set random seed for reproducibility
torch.manual_seed(42)
# Load the XLNet tokenizer
tokenizer = XLNetTokenizer.from_pretrained('xlnet-base-cased')
    # Toy placeholder pairs (ungrammatical -> corrected). For real training these
    # should come from a public GEC corpus such as JFLEG (see the loading sketch
    # above) rather than hand-written examples.
    source_sentences = ["I lik dogs", "The brown cat sat on the mat"]
    target_sentences = ["I like dogs", "The brown cat sat on the mat"]
preprocessed_source_sentences = preprocess_data(source_sentences)
preprocessed_target_sentences = preprocess_data(target_sentences)
# Tokenize and index the source and target sentences
tokenized_source_sentences = tokenize_sentences(preprocessed_source_sentences, tokenizer)
tokenized_target_sentences = tokenize_sentences(preprocessed_target_sentences, tokenizer)
indexed_source_sentences = index_tokens(tokenized_source_sentences, tokenizer)
indexed_target_sentences = index_tokens(tokenized_target_sentences, tokenizer)
# Pad the indexed sentences to a fixed length
max_length = max(max(len(s) for s in indexed_source_sentences),
max(len(s) for s in indexed_target_sentences))
padded_source_sentences = pad_sentences(indexed_source_sentences, max_length)
padded_target_sentences = pad_sentences(indexed_target_sentences, max_length)
# Convert the padded sentences to PyTorch tensors
source_tensor = torch.tensor(padded_source_sentences)
target_tensor = torch.tensor(padded_target_sentences)
# Create the dataset and data loader
dataset = GrammarCorrectionDataset(source_tensor, target_tensor, tokenizer)
data_loader = DataLoader(dataset, batch_size=2, shuffle=True)
# Define the model
vocab_size = len(tokenizer)
embed_size = 768
enc_hidden_size = 256
dec_hidden_size = 256
encoder = Encoder(vocab_size, embed_size, enc_hidden_size, dec_hidden_size)
decoder = Decoder(vocab_size, embed_size, enc_hidden_size, dec_hidden_size)
model = Seq2Seq(encoder, decoder)
# Define the optimizer and loss function
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss(ignore_index=0)
# Train the model
num_epochs = 10
for epoch in range(num_epochs):
for batch in data_loader:
source_tokens = batch['source_tokens']
target_tokens = batch['target_tokens']
source_lengths = torch.sum(source_tokens != 0, dim=1)
# Forward pass
outputs = model(source_tokens, source_lengths, target_tokens)
            # Skip position 0 (never predicted by the decoder) and flatten both
            # the logits and the targets for token-level cross-entropy.
            loss = criterion(outputs[:, 1:].reshape(-1, outputs.shape[2]),
                             target_tokens[:, 1:].reshape(-1))
# Backward and optimize
optimizer.zero_grad()
loss.backward()
optimizer.step()
# Print the loss every few epochs
if (epoch + 1) % 5 == 0:
print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')
print("Training complete!")
if __name__ == '__main__':
main()