import torch
import torch.nn as nn
import torch.nn.functional as F
class TransH(nn.Module):

    def __init__(self, ent_tot, rel_tot, dim = 100, p_norm = 1, norm_flag = True, margin = None, epsilon = None):
        super(TransH, self).__init__()
        self.ent_tot = ent_tot
        self.rel_tot = rel_tot
        self.dim = dim
        self.margin = margin
        self.epsilon = epsilon
        self.norm_flag = norm_flag
        self.p_norm = p_norm
        self.ent_embeddings = nn.Embedding(self.ent_tot, self.dim)
        self.rel_embeddings = nn.Embedding(self.rel_tot, self.dim)
        # each relation has its own hyperplane normal vector
        self.norm_vector = nn.Embedding(self.rel_tot, self.dim)
        # parameter initialization
        if margin is None or epsilon is None:
            nn.init.xavier_uniform_(self.ent_embeddings.weight.data)
            nn.init.xavier_uniform_(self.rel_embeddings.weight.data)
            nn.init.xavier_uniform_(self.norm_vector.weight.data)
        else:
            self.embedding_range = nn.Parameter(
                torch.Tensor([(self.margin + self.epsilon) / self.dim]), requires_grad = False
            )
            nn.init.uniform_(
                tensor = self.ent_embeddings.weight.data,
                a = -self.embedding_range.item(),
                b = self.embedding_range.item()
            )
            nn.init.uniform_(
                tensor = self.rel_embeddings.weight.data,
                a = -self.embedding_range.item(),
                b = self.embedding_range.item()
            )
            nn.init.uniform_(
                tensor = self.norm_vector.weight.data,
                a = -self.embedding_range.item(),
                b = self.embedding_range.item()
            )
        if margin is not None:
            self.margin = nn.Parameter(torch.Tensor([margin]))
            self.margin.requires_grad = False
            self.margin_flag = True
        else:
            self.margin_flag = False
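    # TransH (Wang et al., 2014) projects entities onto a relation-specific
    # hyperplane with unit normal w_r before translating:
    #     h_perp = h - (w_r^T h) w_r,   t_perp = t - (w_r^T t) w_r
    #     f_r(h, t) = || h_perp + d_r - t_perp ||_p
    # _transfer below computes the projections; _calc computes the norm.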
    def _calc(self, h, t, r, mode):
        if self.norm_flag:
            h = F.normalize(h, 2, -1)
            r = F.normalize(r, 2, -1)
            t = F.normalize(t, 2, -1)
        if mode != 'normal':
            # reshape to [num_neg + 1 (or 1), batch_size, dim] so that one
            # relation row broadcasts against all of its candidate entities
            h = h.view(-1, r.shape[0], h.shape[-1])
            t = t.view(-1, r.shape[0], t.shape[-1])
            r = r.view(-1, r.shape[0], r.shape[-1])
        if mode == 'head_batch':
            score = h + (r - t)
        else:
            score = (h + r) - t
        # p-norm over the embedding dim, flattened to [batch_size * (num_neg + 1)]
        score = torch.norm(score, self.p_norm, -1).flatten()
        return score
    def _transfer(self, e, norm):
        # L2-normalize the hyperplane normal so it has unit length
        norm = F.normalize(norm, p = 2, dim = -1)  # [batch_size * (num_neg + 1), dim]
        if e.shape[0] != norm.shape[0]:
            # entity and relation batches differ (head_batch / tail_batch mode):
            # broadcast the projection over the candidate dimension
            e = e.view(-1, norm.shape[0], norm.shape[-1])
            norm = norm.view(-1, norm.shape[0], norm.shape[-1])
            e = e - torch.sum(e * norm, -1, True) * norm
            return e.view(-1, e.shape[-1])
        else:
            # project e onto the hyperplane: e - (e . w) w
            return e - torch.sum(e * norm, -1, True) * norm
    def forward(self, data):
        batch_h = data['batch_h']
        batch_t = data['batch_t']
        batch_r = data['batch_r']
        mode = data['mode']
        # look up the embeddings
        h = self.ent_embeddings(batch_h)
        t = self.ent_embeddings(batch_t)
        r = self.rel_embeddings(batch_r)
        r_norm = self.norm_vector(batch_r)
        # project head and tail entities onto the relation's hyperplane
        h = self._transfer(h, r_norm)
        t = self._transfer(t, r_norm)
        score = self._calc(h, t, r, mode)
        if self.margin_flag:
            return self.margin - score
        else:
            return score
    # regularization term
    def regularization(self, data):
        batch_h = data['batch_h']
        batch_t = data['batch_t']
        batch_r = data['batch_r']
        # look up the embeddings
        h = self.ent_embeddings(batch_h)
        t = self.ent_embeddings(batch_t)
        r = self.rel_embeddings(batch_r)
        r_norm = self.norm_vector(batch_r)
        regul = (torch.mean(h ** 2) +
                 torch.mean(t ** 2) +
                 torch.mean(r ** 2) +
                 torch.mean(r_norm ** 2)) / 4
        return regul
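    # Note: this penalty only enters the objective if the wrapping strategy uses it;
    # in OpenKE's NegativeSampling, passing regul_rate > 0 adds
    # regul_rate * model.regularization(data) to the loss (an assumption that this
    # script follows the standard OpenKE pipeline).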
    def predict(self, data):
        score = self.forward(data)
        if self.margin_flag:
            # forward returned margin - score, so undo that to recover the raw distance
            score = self.margin - score
            return score.cpu().data.numpy()
        else:
            return score.cpu().data.numpy()
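
# A minimal smoke test for the class above (hypothetical entity/relation counts,
# not part of the original pipeline): it checks the score shape and that
# _transfer's projection is orthogonal to the normalized relation normal.
toy = TransH(ent_tot = 10, rel_tot = 3, dim = 8)
toy_data = {
    'batch_h': torch.LongTensor([0, 1, 2]),
    'batch_t': torch.LongTensor([3, 4, 5]),
    'batch_r': torch.LongTensor([0, 1, 2]),
    'mode': 'normal'
}
assert toy(toy_data).shape == (3,)  # one score per triple
e = toy.ent_embeddings(toy_data['batch_h'])
w = toy.norm_vector(toy_data['batch_r'])
w_unit = F.normalize(w, p = 2, dim = -1)
proj = toy._transfer(e, w)
# the projected embedding has no component along the hyperplane normal
assert torch.allclose((proj * w_unit).sum(-1), torch.zeros(3), atol = 1e-5)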
# The pipeline below follows the OpenKE API (TrainDataLoader, TestDataLoader,
# MarginLoss, NegativeSampling, Trainer); the imports are assumed to come from it.
from openke.config import Trainer
from openke.module.loss import MarginLoss
from openke.module.strategy import NegativeSampling
from openke.data import TrainDataLoader, TestDataLoader

train_dataloader = TrainDataLoader(
    in_path = "./benchmarks/FB15K237_tiny/",
    nbatches = 100,
    threads = 8,
    # negative sampling mode
    sampling_mode = 'normal',
    # use the Bernoulli strategy to build negative samples
    bern_flag = 1,
    # number of negatives built by replacing entities / relations
    neg_ent = 25,
    neg_rel = 0
)
# dataloader for test
test_dataloader = TestDataLoader("./benchmarks/FB15K237_tiny/", "link")

# the dataloader must exist before the model, since it supplies the entity/relation counts
transh = TransH(train_dataloader.get_ent_tot(), train_dataloader.get_rel_tot())

loss = MarginLoss()
model = NegativeSampling(transh, loss, batch_size = train_dataloader.batch_size)
trainer = Trainer(model = model, data_loader = train_dataloader)
trainer.run()
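
# Evaluation sketch (an assumption that this pipeline uses OpenKE, whose Tester
# exposes run_link_prediction for MR/MRR/Hits@N over the test set):
from openke.config import Tester

tester = Tester(model = transh, data_loader = test_dataloader, use_gpu = False)
tester.run_link_prediction(type_constrain = False)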