栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Python

小黑笔记:transe模型

Python 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

小黑笔记:transe模型

1.数据集准备
import openke
from openke.data import TrainDataLoader,TestDataLoader

train_dataloader = TrainDataLoader(
    in_path = "./benchmarks/FB15K237_tiny/",
    nbatches = 100,
    threads = 8,
    # 负采样
    sampling_mode = 'normal',
    # bern构建负样本方式
    bern_flag = 1,
    # 负样本同replace entities
    neg_ent = 25,
    neg_rel = 0
)
# dataloader for test
test_dataloader = TestDataLoader("./benchmarks/FB15K237_tiny/", "link")
2.模型构建

import torch
import torch.nn as nn
import torch.nn.functional as F
class TransE(nn.Module):
    def __init__(self,ent_tot,rel_tot,dim = 100,p_norm = 1,norm_flag = True,margin = None,epsilon = None):
        super(TransE,self).__init__()
        self.ent_tot = ent_tot
        self.rel_tot = rel_tot
        # embedding维度
        self.dim = dim
        self.margin = margin
        # 初始化为None
        self.epsilon = epsilon
        self.norm_flag = norm_flag
        # scoring function计算距离使用L1 norm
        self.p_norm = p_norm
        # embedding矩阵 [e,d]
        self.ent_embeddings = nn.Embedding(self.ent_tot,self.dim)
        # embedding矩阵,[r,d]
        self.rel_embeddings = nn.Embedding(self.rel_tot,self.dim)
        # 参数初始化
        if margin == None or epsilon == None:
            nn.init.xavier_uniform_(self.ent_embeddings.weight.data)
            nn.init.xavier_uniform_(self.rel_embeddings.weight.data)
        else:
            self.embedding_range = nn.Parameter(
                torch.Tensor([(self.margin + self.epsilon) / self.dim]),requires_grad = False
            )
            nn.init.uniform_(
                tensor = self.ent_embeddings.weight.data,
                a = -self.embedding_range.item(),
                b = self.embedding_range.item()
            )
            nn.init.uniform_(
                tensor = self.rel_embeddings.weight.data,
                a = -self.embedding_range.item(),
                b = self.embedding_range.item()
            )
        # 容忍系数margin
        if margin != None:
            self.margin = nn.Parameter(torch.Tensor([margin]))
            self.margin.requires_grad = False
            self.margin_flag = True
        else:
            # 执行
            self.margin_flag = False
    # 计算得分函数
    def _calc(self,h,t,r,mode):
        if self.norm_flag:
            h = F.normalize(h,2,-1)
            r = F.normalize(r,2,-1)
            t = F.normalize(t,2,-1)
        if mode != 'normal':
            h = h.view(-1,r.shape[0],h.shape[-1])
            t = t.view(-1,r.shape[0],t.shape[-1])
            r = r.view(-1,r.shape[0],r.shape[-1])
        if mode == 'head_batch':
            score = h + (r - t)
        else:
            score = (h + r) - t
        score = torch.norm(score,self.p_norm,-1).flatten()
        return score
    def forward(self,data):
        batch_h = data['batch_h']
        batch_t = data['batch_t']
        batch_r = data['batch_r']
        mode = data['mode']
        h = self.ent_embeddings(batch_h)
        t = self.ent_embeddings(batch_t)
        r = self.rel_embeddings(batch_r)
        # 计算score
        score = self._calc(h,t,r,mode)
        if self.margin_flag:
            return self.margin - score
        else:
            return score
    # 正则化函数
    def regularization(self,data):
        batch_h = data['batch_h']
        batch_t = data['batch_t']
        batch_r = data['batch_r']
        h = self.ent_embeddings(batch_h)
        t = self.ent_embeddings(batch_t)
        r = self.ent_embeddings(batch_r)
        regul = (torch.mean(h ** 2) + torch.mean(t ** 2) + torch.mean(r ** 2)) / 3
        return regul
    def predict(self,data):
        score = self.forward(data)
        if self.margin_flag:
            score = self.margin - score
            return score.cpu().data.numpy()
        else:
            return score.cpu().data.numpy()
    def save_checkpoint(self, path):
        torch.save(self.state_dict(), path)


'''
model = TransE(train_dataloader.get_ent_tot(),train_dataloader.get_rel_tot())
example = list(train_dataloader)[0]
for key in example:
    if type(example[key]) != str:
        example[key] = torch.LongTensor(example[key])
model(example)
'''
pass
3.损失函数构建

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import numpy as np
class MarginLoss(nn.Module):
    def __init__(self,adv_temperature = None,margin = 6.0):
        super(MarginLoss,self).__init__()
        self.margin = nn.Parameter(torch.Tensor([margin]))
        self.margin.requires_grad = False
        if adv_temperature != None:
            self.adv_temperature = nn.Parameter(torch.Tensor([adv_temperature]))
            self.adv_temperatrue.requires_grad = False
            self.adv_flag = True
        else:
            self.adv_flag = False
    def get_weights(self,n_score):
        return F.softmax(-n_score * self.adv_temperature,dim = -1).detach()
    def forward(self,p_score,n_score):
        if self.adv_flag:
            return (self.get_weights(n_score) * torch.max(p_score - n_score,self.margin)).sum(dim = -1).mean() + self.margin
        else:
            return (torch.max(p_score - n_score,-self.margin)).mean() + self.margin
    def predict(self,p_score,n_score):
        score = self.forward(p_score,n_score)
        return score.cpu().data.numpy()
'''
model = TransE(train_dataloader.get_ent_tot(),train_dataloader.get_rel_tot())
example = list(train_dataloader)[0]
for key in example:
    if type(example[key]) != str:
        example[key] = torch.LongTensor(example[key])
loss = MarginLoss()
negativeSampling = NegativeSampling(model,loss,batch_size=train_dataloader.batch_size)
negativeSampling(example)
'''
pass
4.负采样构建

class NegativeSampling(nn.Module):
    def __init__(self,model = None,loss = None,batch_size = 256,regul_rate = 0.0,l3_regul_rate = 0.0):
        super(NegativeSampling,self).__init__()
        self.model = model
        self.loss = loss
        self.batch_size = batch_size
        # 正则化系数
        self.regul_rate = regul_rate
        self.l3_regul_rate = l3_regul_rate
    # batch中样本排序,[正,正,正....(batch_size个),负,负....(batch_size*negative_num个)]
    # 第i个正样本对应负样本[i + batch_size,i + 2 * batch_size...i + (negative_num * batch_size)]
    # 从batch中获取正样本
    def _get_positive_score(self,score):
        positive_score = score[:self.batch_size]
        positive_score = positive_score.view(-1,self.batch_size).permute(1,0)
        return positive_score
    def _get_negative_score(self,score):
        negative_score = score[self.batch_size:]
        # 将[i1,j1,k1,i2,j2,k2,i3,j3,k3]
        # 整理成 [i1,j1,k1
        #        i2,j2,k2
        #        i3,j3,k3] 再转转置即可
        negative_score = negative_score.view(-1,self.batch_size).permute(1,0)
        return negative_score
    def forward(self,data):
        # 调用模型
        score = self.model(data)
        # 正样本
        p_score = self._get_positive_score(score)
        # 负样本
        n_score = self._get_negative_score(score)
        # 得到标量loss
        loss_res = self.loss(p_score,n_score)
        # loss加上正则项
        if self.regul_rate != 0:
            loss_res += self.regul_rate * self.model.regularization(data)
        if self.l3_regul_rate != 0:
            loss_res += self.l3_regul_rate * self.model.l3_regularization()
        return loss_res
    def save_checkpoint(self, path):
        torch.save(self.state_dict(), path)
5.模型训练
import torch
import torch.nn as nn
from torch.autograd import Variable
import torch.optim as optim
import os
import time
import sys
import datetime
import ctypes
import json
import numpy as np
import copy
from tqdm import tqdm
class Traniner(object):
    def __init__(self,model = None,
                 data_loader = None,
                 train_times = 1000,
                 alpha = 0.5,
                 use_gpu = True,
                 opt_method = 'sgd',
                 save_steps = None,
                 checkpoint_dir = None
                ):
        self.work_threads = 8
        self.train_times = train_times
        self.opt_method = opt_method
        self.optimizer = None
        # 避免过拟合
        self.weight_decay = 0
        self.alpha = alpha
        self.model = model
        # class TrainDataLoader:训练集
        self.data_loader = data_loader
        self.use_gpu = use_gpu
        self.save_steps = save_steps
        self.checkpoint_dir = checkpoint_dir
    def to_var(self,x,use_gpu):
        if use_gpu:
            return Variable(torch.from_numpy(x).cuda())
        else:
            return Variable(torch.from_numpy(x))
    def train_one_step(self,data):
        self.optimizer.zero_grad()
        loss = self.model({
            'batch_h':self.to_var(data['batch_h'],self.use_gpu),
            'batch_t':self.to_var(data['batch_t'],self.use_gpu),
            'batch_r':self.to_var(data['batch_r'],self.use_gpu),
            'batch_y':self.to_var(data['batch_y'],self.use_gpu),
            'mode':data['mode']
        })
        loss.backward()
        self.optimizer.step()
        return loss.item()
    def run(self):
        if self.use_gpu:
            self.model.cuda()
        if self.optimizer != None:
            pass
        elif self.opt_method == 'Adagrad' or self.opt_method == 'adagrad':
            self.optimizer = optim.Adagrad(
                self.model.parameters(),
                lr = self.alpha,
                lr_decay = self.lr_decay,
                weight_decay = self.weight_decay
            )
        elif self.opt_method == 'Adadelta' or self.opt_method == 'adadelta':
            self.optimizer = optim.Adadelta(
                self.model.parameters(),
                lr = self.alpha,
                weight_decay = self.weight_decay
            )
        elif self.opt_method == 'Adam' or self.opt_method == 'adam':
            self.optimizer = optim.Adam(
                self.model.parameters(),
                lr = self.alpha,
                weight_decay = self.weight_decay
            )
        else:
            self.optimizer = optim.SGD(
                self.model.parameters(),
                lr = self.alpha,
                weight_decay = self.weight_decay
            )
        print('Finish initializing...')
        # 循环1000个epoch
        training_range = tqdm(range(self.train_times))
        for epoch in training_range:
            res = 0.0
            for data in self.data_loader:
                loss = self.train_one_step(data)
                res += loss
            training_range.set_description('Epoch %d | loss: %f' % (epoch,res))
            if self.save_steps and self.checkpoint_dir and (epoch + 1) % self.save_steps == 0:
                print('Epoch %d has finished,saving...' % (epoch))
                self.model.save_checkpoint(os.path.join(self.checkpoint_dir + '-' + str(epoch) + '.ckpt'))

transe = TransE(train_dataloader.get_ent_tot(),train_dataloader.get_rel_tot())
train_dataloader = TrainDataLoader(
    in_path = "./benchmarks/FB15K237_tiny/",
    nbatches = 100,
    threads = 8,
    # 负采样
    sampling_mode = 'normal',
    # bern构建负样本方式
    bern_flag = 1,
    # 负样本同replace entities
    neg_ent = 25,
    neg_rel = 0
)
# dataloader for test
test_dataloader = TestDataLoader("./benchmarks/FB15K237_tiny/", "link")
loss = MarginLoss()
model = NegativeSampling(transe,loss,batch_size=train_dataloader.batch_size)
trainer = Traniner(model = model,data_loader = train_dataloader)
trainer.run()

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/738313.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号