Prerequisites for this article:
- Basic PyTorch operations
- Word2Vec
PyTorch Implementation: Skip-Gram
This article implements Skip-Gram, one of the Word2Vec architectures, in PyTorch. The implementation is based on the post "PyTorch 实现 Word2Vec"; if you find it hard to follow, I also recommend the same author's simpler version, "Word2Vec 的 PyTorch 实现(乞丐版)", as well as their Word2Vec walkthrough, "Word2Vec".
The code for this article has been put on Colab; open it, set the runtime to GPU, and you can reproduce the results (access from mainland China requires a VPN).
If you cannot get around the firewall, you probably won't see the Open in Colab badge.
Preparing
First, import the required packages:
import torch
from torch import nn
from torch.nn import functional as F
from torch.utils import data as tud
from torch import optim
from collections import Counter
import numpy as np
Counter will be used later to count words when building the vocabulary.
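For example (a quick illustration, not part of the original code):
print(Counter("the cat sat on the mat the end".split()).most_common(3))
# e.g. [('the', 3), ('cat', 1), ('sat', 1)]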
Set up the GPU:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print("Current Device:", device)
If a GPU is available, device will be 'cuda'; otherwise it will be 'cpu'.
Define the remaining hyperparameters:
MAX_VOCAB = 10000
window = 3
negative_sample = 15
hidden = 128
batch_size = 256
epochs = 2
lr = 1e-3
dtype = torch.FloatTensor
# set random seed to ensure result is reproducible
def set_random():
    import random
    np.random.seed(1116)
    torch.manual_seed(1116)
    random.seed(1116)
set_random()
MAX_VOCAB is the maximum number of words in the vocabulary. window is the number of words on each side of the center word, not counting the center word itself. negative_sample is the number of negative samples drawn for each context word in the window, so a total of window * 2 * negative_sample words are negatively sampled per center word.
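As a quick sanity check of those numbers, with the values defined above (this snippet is only an illustration, not part of the training code):
# each center word has 2 * window = 6 context words
context_per_center = 2 * window
# and 6 * negative_sample = 90 negative samples in total
negatives_per_center = context_per_center * negative_sample
print(context_per_center, negatives_per_center)  # 6 90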
Getting Information from Text
Load the text file, build the vocabulary, and initialize the statistics we will need later:
with open('./drive/My Drive/Colab Notebooks/text8.train.txt', 'r') as f:
    text = f.read()
text = text.lower().split()
# we can only keep MAX_VOCAB - 1 real words, because <UNK> also takes one slot
vocab = dict(Counter(text).most_common(MAX_VOCAB - 1))
# the count of <UNK> is the text length minus the counts of all other words
vocab['<UNK>'] = len(text) - np.sum(list(vocab.values()))
# save the word-to-index and index-to-word mappings
word2idx = {word: i for i, word in enumerate(vocab.keys())}
idx2word = {i: word for i, word in enumerate(vocab.keys())}
word_count = np.array([count for count in vocab.values()], dtype=np.float32)
word_freqs = word_count / np.sum(word_count)
# 3/4 power, following the original word2vec paper
word_freqs = word_freqs ** (3./4.)
Dataset download: https://pan.baidu.com/s/1j52-cQiIvHpbTGW312f4aw (extraction code: af3p)
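The word_freqs ** (3./4.) line above implements the negative-sampling distribution from the original word2vec paper: raising the unigram frequencies to the 3/4 power flattens the distribution a little, so rare words are sampled relatively more often. A small illustration with made-up numbers (not from the dataset):
freqs = np.array([0.9, 0.09, 0.01], dtype=np.float32)
adjusted = freqs ** (3. / 4.)
print(adjusted / adjusted.sum())  # roughly [0.83, 0.15, 0.03] versus the original [0.9, 0.09, 0.01]
Note that torch.multinomial does not require its weights to sum to one, which is why the code above can skip renormalizing word_freqs.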
DataSet
Create a dataset class dedicated to the embedding model:
class EmbeddingDataset(tud.Dataset):
    def __init__(self, text, word2idx, word_freqs):
        super(EmbeddingDataset, self).__init__()
        self.text_encoded = [word2idx.get(word, word2idx['<UNK>']) for word in text]
        self.text_encoded = torch.LongTensor(self.text_encoded)
        self.word2idx = word2idx
        self.word_freqs = torch.Tensor(word_freqs)

    def __len__(self):
        return len(self.text_encoded)

    def __getitem__(self, idx):
        center_word = self.text_encoded[idx]
        # get the words in the window, excluding the center word
        pos_idx = [i for i in range(idx - window, idx)] + [i for i in range(idx + 1, idx + window + 1)]
        pos_idx = [i % len(self.text_encoded) for i in pos_idx]
        pos_words = self.text_encoded[pos_idx]
        # copy the sampling weights and zero out the window words and the center word,
        # so negative sampling can never pick them
        neg_mask = self.word_freqs.clone()
        neg_mask[pos_words] = 0
        neg_mask[center_word] = 0
        neg_words = torch.multinomial(neg_mask, negative_sample * pos_words.shape[0], True)
        # check whether any negative sample collides with a positive word
        if len(set(pos_words.numpy().tolist()) & set(neg_words.numpy().tolist())) > 0:
            print('Need to resample.')
        return center_word, pos_words, neg_words
The methods we need to implement ourselves are __init__, __len__, and __getitem__.
Once this dataset is created, it can later be loaded with the DataLoader from torch.utils.data.
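A quick shape check on a single sample can confirm the dataset behaves as expected (a sketch using the objects defined above; the index 100 is arbitrary):
sample_dataset = EmbeddingDataset(text, word2idx, word_freqs)
center, pos, neg = sample_dataset[100]
print(center.shape)  # torch.Size([])   -> a single center-word index
print(pos.shape)     # torch.Size([6])  -> window * 2 context words
print(neg.shape)     # torch.Size([90]) -> window * 2 * negative_sample negatives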
Note: if the number of negative samples you set is fairly large, do not use code like the following:
while len(set(pos_words.numpy().tolist()) & set(neg_words.numpy().tolist())) > 0:
    neg_words = torch.multinomial(self.word_freqs, negative_sample * pos_words.shape[0], True)
    print("Negative sample false")
This makes the number of sampling passes explode. At first I was puzzled about why training was so slow, and then I found that the sampling was being repeated many times over.
Because negative sampling draws words according to their frequencies, there is a very high probability of drawing a word that already appears in the window, which causes problems when computing the loss. The correct approach is the one used above: make a copy of the word probabilities, set the probabilities of the window words and the center word to zero in that copy, and only then sample. That way, even with torch.multinomial, a word inside the window can never be drawn.
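As a small illustration of why the masking works (toy weights, unrelated to the real data), torch.multinomial never draws an index whose weight is zero:
weights = torch.tensor([0.5, 0.0, 0.3, 0.0, 0.2])
samples = torch.multinomial(weights, 20, replacement=True)
print(samples)  # only ever contains indices 0, 2 and 4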
Skip-Gram
Next, define the Word2Vec model; the loss function is written directly inside it:
class Word2Vec(nn.Module):
    def __init__(self, vocab_size, hidden):
        super(Word2Vec, self).__init__()
        self.vocab_size = vocab_size
        self.hidden = hidden
        # two embedding tables: one for center words, one for context words
        self.in_embedding = nn.Embedding(self.vocab_size, self.hidden)
        self.out_embedding = nn.Embedding(self.vocab_size, self.hidden)

    def forward(self, input_labels, pos_labels, neg_labels):
        input_embedding = self.in_embedding(input_labels)     # [batch, hidden]
        pos_embedding = self.out_embedding(pos_labels)        # [batch, window * 2, hidden]
        neg_embedding = self.out_embedding(neg_labels)        # [batch, window * 2 * k, hidden]
        # dimensions must line up for torch.bmm
        input_embedding = input_embedding.unsqueeze(2)        # [batch, hidden, 1]
        pos_dot = torch.bmm(pos_embedding, input_embedding)   # [batch, window * 2, 1]
        neg_dot = torch.bmm(neg_embedding, -input_embedding)  # [batch, window * 2 * k, 1]
        pos_dot = pos_dot.squeeze(2)                          # [batch, window * 2]
        neg_dot = neg_dot.squeeze(2)                          # [batch, window * 2 * k]
        pos_loss = F.logsigmoid(pos_dot).sum(1)
        neg_loss = F.logsigmoid(neg_dot).sum(1)
        loss = neg_loss + pos_loss
        return -loss

    def get_input_embedding(self):
        # return the input embedding weights for evaluation
        return self.in_embedding.weight.detach()
When manipulating tensors, always keep track of what each dimension means through every transformation.
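For example, the unsqueeze + bmm pattern used in forward can be verified on dummy tensors (toy shapes, purely for illustration):
context = torch.randn(4, 6, 128)  # [batch, window * 2, hidden]
center = torch.randn(4, 128)      # [batch, hidden]
print(torch.bmm(context, center.unsqueeze(2)).shape)  # torch.Size([4, 6, 1])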
In the blog post I referenced, the loss is also written directly into the model. The results are the same either way, but I personally don't recommend it: if you want a reusable code template, this design forces extra changes.
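If you do want a reusable template, one option is to have forward return only the dot products and compute the loss outside the model. A minimal sketch (the function name and the split are my own, not taken from the referenced post):
def negative_sampling_loss(pos_dot, neg_dot):
    # pos_dot: [batch, window * 2], neg_dot: [batch, window * 2 * k],
    # i.e. what forward would return if it stopped right after the squeeze(2) calls
    pos_loss = F.logsigmoid(pos_dot).sum(1)
    neg_loss = F.logsigmoid(neg_dot).sum(1)
    return -(pos_loss + neg_loss).mean()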
Training and Saving
Instantiate the classes defined above and create the optimizer:
dataset = EmbeddingDataset(text, word2idx=word2idx, word_freqs=word_freqs)
dataloader = tud.DataLoader(dataset, batch_size=batch_size, shuffle=True)
word2vec = Word2Vec(MAX_VOCAB, hidden).to(device)
optimizer = optim.Adam(word2vec.parameters(), lr=lr)
print("Step in one epoch:{}".format(len(dataloader)))
During training, tqdm can be used to estimate the remaining time:
from time import time
from tqdm.notebook import tqdm

start = time()
for epoch in range(epochs):
    for step, (input_label, pos_label, neg_label) in enumerate(tqdm(dataloader)):
        input_label = input_label.long().to(device)
        pos_label = pos_label.long().to(device)
        neg_label = neg_label.long().to(device)
        # the usual three steps: zero the gradients, backpropagate, update
        optimizer.zero_grad()
        loss = word2vec(input_label, pos_label, neg_label).mean()
        loss.backward()
        optimizer.step()
        if step % 1000 == 0 and step != 0:
            end = time()
            print("epoch:{}, step:{}, loss:{}, in time:{:.2f}s".format(epoch, step, loss.item(), end - start))
            start = time()
My training loss ended up fluctuating around 18 to 19, and the whole run took about an hour on Colab.
Save the model:
torch.save(word2vec.state_dict(), './drive/My Drive/embedding-{}.th'.format(hidden))
embedding_weights = word2vec.get_input_embedding().cpu()
Note that when you save the model, you must also save word2idx. Word2Vec is essentially a table lookup, so besides the model weights you also need the mapping from words to table (weight) indices, i.e. word2idx.
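A minimal sketch of saving and restoring both (the JSON path is a placeholder of my own, not from the original post):
import json
# save the word-to-index mapping next to the model weights
with open('./drive/My Drive/word2idx-{}.json'.format(hidden), 'w') as f:
    json.dump(word2idx, f)
# later: rebuild the lookup table from the two files
with open('./drive/My Drive/word2idx-{}.json'.format(hidden), 'r') as f:
    word2idx = json.load(f)
word2vec = Word2Vec(MAX_VOCAB, hidden).to(device)
word2vec.load_state_dict(torch.load('./drive/My Drive/embedding-{}.th'.format(hidden), map_location=device))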
Here embedding_weights has been moved off the GPU; we will use it shortly for a quick check of how well our trained Word2Vec performs.
Evaluation
Pick the ten most similar words for a few query words to check how well Word2Vec was trained:
from scipy.spatial.distance import cosine

def find_nearest(word):
    index = word2idx[word]
    embedding = embedding_weights[index]
    cos_dis = np.array([cosine(e, embedding) for e in embedding_weights])
    return [idx2word[i] for i in cos_dis.argsort()[:10]]

for ie_words in ['two', 'man', 'computers', 'machine']:
    print('word:{} is similar to {}'.format(ie_words, find_nearest(ie_words)))
Console output:
word:two is similar to ['two', 'three', 'four', 'five', 'zero', 'six', 'seven', 'one', 'eight', 'nine']
word:man is similar to ['man', 'woman', 'young', 'god', 'men', 'person', 'girl', 'soul', 'goddess', 'son']
word:computers is similar to ['computers', 'computer', 'devices', 'hardware', 'machines', 'applications', 'systems', 'components', 'electronic', 'computing']
word:machine is similar to ['machine', 'machines', 'device', 'program', 'memory', 'computer', 'engine', 'ibm', 'computers', 'programming']
The results are actually quite decent: for a given word, we can generally find words whose surface semantics are close to it.