This commit is contained in:
2024-08-10 19:46:55 +08:00
commit 2233526534
798 changed files with 35282 additions and 0 deletions

View File

@@ -0,0 +1,399 @@
# Billion-scale Commodity Embedding for E-commerce Recommendation in Alibaba
这篇论文是阿里巴巴在18年发表于KDD的关于召回阶段的工作。该论文提出的方法是在基于图嵌入的方法上通过引入side information来解决实际问题中的数据稀疏和冷启动问题。
## 动机
在电商领域,推荐已经是不可或缺的一部分,旨在为用户的喜好提供有趣的物品,并且成为淘宝和阿里巴巴收入的重要引擎。尽管学术界和产业界的各种推荐方法都取得了成功,如协同过滤、基于内容的方法和基于深度学习的方法,但由于用户和项目的数十亿规模,传统的方法已经不能满足于实际的需求,主要的问题体现在三个方面:
- 可扩展性:现有的推荐方法无法扩展到在拥有十亿的用户和二十亿商品的淘宝中。
- 稀疏性:存在大量的物品与用户的交互行为稀疏。即用户的交互到多集中于以下部分商品,存在大量商品很少被用户交互。
- 冷启动:在淘宝中,每分钟会上传很多新的商品,由于这些商品没有用户行为的信息(点击、购买等),无法进行很好的预测。
针对于这三个方面的问题, 本文设计了一个两阶段的推荐框架:**召回阶段和排序阶段**这也是推荐领域最常见的模型架构。而本文提及的EGES模型主要是解决了匹配阶段的问题通过用户行为计算商品间两两的相似性然后根基相似性选出topK的商品输入到排序阶段。
为了学习更好的商品向量表示本文通过用户的行为历史中构造一个item-item 图然后应用随机游走方法在item-item 图为每个item获取到一个序列然后通过Skip-Gram的方式为每个item学习embedding(这里的item序列类似于语句其中每个item类比于句子中每个word),这种方式被称为图嵌入方法(Graph Embedding)。文中提出三个具体模型来学习更好的物品embedding更好的服务于召回阶段。
## 思路
根据上述所面临的三个问题本文针对性的提出了三个模型予以解决Base Graph EmbeddingBGEGraph Embedding with Side InformationGESEnhanced Graph Embedding with Side InformationEGES
考虑可扩展性的问题,图嵌入的随机游走方式可以在物品图上捕获**物品之间高阶相似性**即Base Graph EmbeddingBGE方法。其不同于CF方法除了考虑物品的共现还考虑到了行为的序列信息。
考虑到稀疏性和冷启物品问题在图嵌入的基础上考虑了节点的属性信息。希望具有相似属性的物品可以在空间上相似即希望通过头部物品提高属性信息的泛化能力进而帮助尾部和冷启物品获取更加准确的embedding即Graph Embedding with Side InformationGES方法。
考虑到不同属性信息对于学习embedding的贡献不同因此在聚合不同的属性信息时动态的学习不同属性对于学习节点的embedding所参与的重要性权重即Enhanced Graph Embedding with Side InformationEGES
## 模型结构与原理
文中所提出的方法是基于经典的图嵌入模型DeepWalk进行改进其目标是通过物品图G学习一个映射函数$f:V -> R^d$ 将图上节点映射成一个embedding。具体的步骤包括两步1.通过随机游走为图上每个物品生成序列2.通过Skip-Gram算法学习每个物品的embedding。因此对于该方法优化的目标是在给定的上下文物品的前提下最大化物品v的条件概率即物品v对于一个序列里面的其他物品要尽可能的相似。接下来看一些每个模型具体内容。
### 构建物品图
在介绍三个模型之前我们首先需要构建好item-item图。由于基于CF的方法仅考虑物品之间的共现忽略了行为的序列信息(即序列中相邻的物品之间的语义信息)因此item-item图的构建方式如下图所示。
<div align=center>
<img src="https://ryluo.oss-cn-chengdu.aliyuncs.com/图片image-20220328133138263.png" style="zoom:80%;"/>
</div>
首先根据用户的session行为序列构建网络结构即序列中相邻两个item之间在存在边并且是有向带权图。物品图边上的权重为所有用户行为序列中两个 item 共现的次数,最终构造出来简单的有向有权图。
值得注意的是,本文通过行为序列中物品的共现来表示其中的**语义信息**,并将这种语义信息理解为**物品之间的相似性**并将共现频次作为相似性的一个度量值。其次基于用户的历史行为序列数据一般不太可能取全量的历史序列数据一方面行为数据量过大一方面用户的兴趣会随时间发生演变因此在处理行为序列时会设置了一个窗口来截断历史序列数据切分出来的序列称为session。
由于实际中会存在一些现实因素,数据中会有一些噪音,需要特殊处理,主要分为三个方面:
- 从行为方面考虑用户在点击后停留的时间少于1秒可以认为是误点需要移除。
- 从用户方面考虑淘宝场景中会有一些过度活跃用户。本文对活跃用户的定义是三月内购买商品数超过1000或者点击数超过3500就可以认为是一个无效用户需要去除。
- 从商品方面考虑存在一些商品频繁的修改即ID对应的商品频繁更新这使得这个ID可能变成一个完全不同的商品这就需要移除与这个ID相关的这个商品。
在构建完item-item图之后接下来看看三个模型的具体内容。
### 图嵌入(BGE)
对于图嵌入模型第一步先进行随机游走得到物品序列第二部通过skip-gram为图上节点生成embedding。那么对于随机游走的思想如何利用随机游走在图中生成的序列不同于DeepWalk中的随机游走本文的采样策略使用的是带权游走策略不同权重的游走到的概率不同其本质上就是node2vec传统的node2vec方法可以直接支持有向带权图。因此在给定图的邻接矩阵M后(表示节点之间的边权重),随机游走中每次转移的概率为:
<div align=center>
<img src="https://ryluo.oss-cn-chengdu.aliyuncs.com/图片image-20220328144516898.png" style="zoom:80%;"/>
</div>
其中$M_{ij}$为边$e_{ij}$上的权重,$N_{+}(v_i)$表示节点$v_i$所有邻居节点集合并且随机游走的转移概率的对每个节点所有邻接边权重的归一化结果。在随即游走之后每个item得到一个序列如下图所示
<div align=center>
<img src="https://cdn.jsdelivr.net/gh/swallown1/blogimages@main/images/image-20220418142135912.png" style="zoom:47%;"/>
</div>
然后类似于word2vec为每个item学习embedding于是优化目标如下
<div align=center>
<img src="https://ryluo.oss-cn-chengdu.aliyuncs.com/图片image-20220328144931957.png" style="zoom:77%;"/>
</div>
其中w 为窗口大小。考虑独立性假设的话,上面的式子可以进一步化简:
<div align=center>
<img src="https://ryluo.oss-cn-chengdu.aliyuncs.com/图片image-20220328145101109.png" style="zoom:77%;"/>
</div>
这样看起来就很直观了,在已知物品 i 时,最大化序列中(上下文)其他物品 j 的条件概率。为了近似计算采样了Negative sampling上面的优化目标可以化简得到如下式子
<div align=center>
<img src="https://ryluo.oss-cn-chengdu.aliyuncs.com/图片image-20220328145318718.png" style="zoom:80%;"/>
</div>
其中$N(v_i)'$表示负样本集合,负采样个数越多,结果越好。
### 基于side information的图嵌入GES
尽管BGE将行为序列关系编码进物品的embedding中从而从用户行为中捕捉高阶相似性。但是这里有个问题对于新加入的商品由于未和用户产生过交互所以不会出现在item-item图上进而模型无法学习到其embedding即无法解决冷启动问题。
为了解决冷启问题本文通过使用side information 类别,店铺, 价格等加入模型的训练过程中使得模型最终的泛化能力体现在商品的side information上。这样通过**side information学习到的embedding来表示具体的商品**使得相似side information的物品可以得到在空间上相近的表示进而来增强 BGE。
那么对于每个商品如何通过side information的embedidng来表示呢对于随机游走之后得到的商品序列其中每个每个商品由其id和属性(品牌,价格等)组成。用公式表示,对于序列中的每一个物品可以得到$W^0_V,...W_V^n$,n+1个向量表示$W^0_V$表示物品v剩下是side information的embedding。然后将所有的side information聚合成一个整体来表示物品聚合方式如下
$$H_v = \frac{1}{n+1}\sum_{s=0}^n W^s_v$$
其中,$H_v$是商品 v 的聚合后的 embedding 向量。
### 增强型EGSEGES
尽管 GES 相比 BGE 在性能上有了提升但是在聚合多个属性向量得到商品的embedding的过程中不同 side information的聚合依然存在问题。在GES中采用 average-pooling 是在假设不同种类的 side information 对商品embedding的贡献是相等的但实际中却并非如此。例如购买 Iphone 的用户更可能倾向于 Macbook 或者 Ipad相比于价格属性品牌属性相对于苹果类商品具有更重要的影响。因此根据实际现状不同类型的 side information 对商品的表示是具有不同的贡献值的。
针对上述问题作者提出了weight pooling方法来聚合不同类型的 side information。具体地EGES 与 GES 的区别在聚合不同类型 side information计算不同的权重根据权重聚合 side information 得到商品的embedding如下图所示
<div align=center>
<img src="https://ryluo.oss-cn-chengdu.aliyuncs.com/图片image-20220328154950289.png" style="zoom:80%;"/>
</div>
其中 $a_i$ 表示每个side information 用于计算权重的参数向量最终通过下面的公式得到商品的embedding
$$H_v = \frac{\sum_{j=0}^n e^{a_v^j} W_v^j}{\sum_{j=0}^n e^{a_v^j}}$$
这里对参数 $a_v^j$ 先做指数变换目的是为了保证每个边界信息的贡献都能大于0然后通过归一化为每个特征得到一个o-1之内的权重。最终物品的embedding通过权重进行加权聚合得到进而优化损失函数
$$L(v,u,y)=-[ylog( \sigma (H_v^TZ_u)) + (1-y)log(1 - \sigma(H_v^TZ_u))]$$
y是标签符号等于1时表示正样本等于0时表示负样本。$H_v$表示商品 v 的最终的隐层表示,$Z_u$表示训练数据中的上下文节点的embedding。
以上就是这三个模型主要的区别下面是EGES的伪代码。
<div align=center>
<img src="https://ryluo.oss-cn-chengdu.aliyuncs.com/图片image-20220328155406291.png" style="zoom:80%;"/>
</div>
其中**WeightedSkipGram**函数为带权重的SkipGram算法。
<div align=center>
<img src="https://ryluo.oss-cn-chengdu.aliyuncs.com/图片image-20220328155533704.png" style="zoom:80%;"/>
</div>
## 代码实现
下面我们简单的来看一下模型代码的实现,参考的内容在[这里](https://github.com/wangzhegeek/EGES)其中实验使用的是jd 2019年比赛中提供的数据。
### 构建物品图
首先对用户的下单(type=2)行为序列进行session划分其中30分钟没有产生下一个行为划分为一个session。
```python
def cnt_session(data, time_cut=30, cut_type=2):
# 商品属性 id 被交互时间 商品种类
sku_list = data['sku_id']
time_list = data['action_time']
type_list = data['type']
session = []
tmp_session = []
for i, item in enumerate(sku_list):
# 两个商品之间如果被交互的时间大于1小时划分成不同的session
if type_list[i] == cut_type or (i < len(sku_list)-1 and \
(time_list[i+1] - time_list[i]).seconds/60 > time_cut) or i == len(sku_list)-1:
tmp_session.append(item)
session.append(tmp_session)
tmp_session = []
else:
tmp_session.append(item)
return session # 返回多个session list
```
获取到所有session list之后(这里不区分具体用户)对于session长度不超过1的去除(没有意义)。
接下来就是构建图主要是先计算所有session中相邻的物品共现频次(通过字典计算)。然后通过入度节点、出度节点以及权重分别转化成list通过network来构建有向图。
```python
node_pair = dict()
# 遍历所有session list
for session in session_list_all:
for i in range(1, len(session)):
# 将session共现的item存到node_pair中用于构建item-item图
# 将共现次数所谓边的权重即node_pair的key为边(src_node,dst_node),value为边的权重(共现次数)
if (session[i - 1], session[i]) not in node_pair.keys():
node_pair[(session[i - 1], session[i])] = 1
else:
node_pair[(session[i - 1], session[i])] += 1
in_node_list = list(map(lambda x: x[0], list(node_pair.keys())))
out_node_list = list(map(lambda x: x[1], list(node_pair.keys())))
weight_list = list(node_pair.values())
graph_list = list([(i,o,w) for i,o,w in zip(in_node_list,out_node_list,weight_list)])
# 通过 network 构建图结构
G = nx.DiGraph().add_weighted_edges_from(graph_list)
```
### 随机游走
先是基于构建的图进行随机游走其中p和q是参数用于控制采样的偏向于DFS还是BFS其实也就是node2vec。
```python
walker = RandomWalker(G, p=args.p, q=args.q)
print("Preprocess transition probs...")
walker.preprocess_transition_probs()
```
对于采样的具体过程是根据边的归一化权重作为采样概率进行采样。其中关于如何通过AliasSampling来实现概率采样的可以[参考](https://blog.csdn.net/haolexiao/article/details/65157026)具体的是先通过计算create_alias_table然后根据边上两个节点的alias计算边的alias。其中可以看到这里计算alias_table是根据边的归一化权重。
```python
def preprocess_transition_probs(self):
"""预处理随即游走的转移概率"""
G = self.G
alias_nodes = {}
for node in G.nodes():
# 获取每个节点与邻居节点边上的权重
unnormalized_probs = [G[node][nbr].get('weight', 1.0)
for nbr in G.neighbors(node)]
norm_const = sum(unnormalized_probs)
# 对每个节点的邻居权重进行归一化
normalized_probs = [
float(u_prob)/norm_const for u_prob in unnormalized_probs]
# 根据权重创建alias表
alias_nodes[node] = create_alias_table(normalized_probs)
alias_edges = {}
for edge in G.edges():
# 获取边的alias
alias_edges[edge] = self.get_alias_edge(edge[0], edge[1])
self.alias_nodes = alias_nodes
self.alias_edges = alias_edges
return
```
在构建好Alias之后进行带权重的随机游走。
```python
session_reproduce = walker.simulate_walks(num_walks=args.num_walks,
walk_length=args.walk_length, workers=4,verbose=1)
```
其中这里的随机游走是根据p和q的值来选择是使用Deepwalk还是node2vec。
```python
def _simulate_walks(self, nodes, num_walks, walk_length,):
walks = []
for _ in range(num_walks):
# 打乱所有起始节点
random.shuffle(nodes)
for v in nodes:
# 根据p和q选择随机游走或者带权游走
if self.p == 1 and self.q == 1:
walks.append(self.deepwalk_walk(
walk_length=walk_length, start_node=v))
else:
walks.append(self.node2vec_walk(
walk_length=walk_length, start_node=v))
return walks
```
### 加载side information并构造训练正样本
主要是将目前所有的sku和其对应的side infromation进行left join没有的特征用0补充。然后对所有的特征进行labelEncoder()
```python
sku_side_info = pd.merge(all_skus, product_data, on='sku_id', how='left').fillna(0) # 为商品加载side information
for feat in sku_side_info.columns:
if feat != 'sku_id':
lbe = LabelEncoder()
# 对side information进行编码
sku_side_info[feat] = lbe.fit_transform(sku_side_info[feat])
else:
sku_side_info[feat] = sku_lbe.transform(sku_side_info[feat])
```
通过图中的公式可以知道优化目标是让在一个窗口内的物品尽可能相似采样若干负样本使之与目标物品不相似。因此需要将一个窗口内的所有物品与目标物品组成pair作为训练正样本。这里不需要采样负样本负样本是通过tf中的sample softmax方法自动进行采样。
```python
def get_graph_context_all_pairs(walks, window_size):
all_pairs = []
for k in range(len(walks)):
for i in range(len(walks[k])):
# 通过窗口的方式采取正样本具体的是让随机游走序列的起始item与窗口内的每个item组成正样本对
for j in range(i - window_size, i + window_size + 1):
if i == j or j < 0 or j >= len(walks[k]):
continue
else:
all_pairs.append([walks[k][i], walks[k][j]])
return np.array(all_pairs, dtype=np.int32)
```
#### EGES模型
构造完数据之后在funrec的基础上实现了EGES模型
```python
def EGES(side_information_columns, items_columns, merge_type = "weight", share_flag=True,
l2_reg=0.0001, seed=1024):
# side_information 所对应的特征
feature_columns = list(set(side_information_columns))
# 获取输入层,查字典
feature_encode = FeatureEncoder(feature_columns, linear_sparse_feature=None)
# 输入的值
feature_inputs_list = list(feature_encode.feature_input_layer_dict.values())
# item id 获取输入层的值
items_Map = FeatureMap(items_columns)
items_inputs_list = list(items_Map.feature_input_layer_dict.values())
# 正样本的id在softmax中需要传入正样本的id
label_columns = [DenseFeat('label_id', 1)]
label_Map = FeatureMap(label_columns)
label_inputs_list = list(label_Map.feature_input_layer_dict.values())
# 通过输入的值查side_information的embedding返回所有side_information的embedding的list
side_embedding_list = process_feature(side_information_columns, feature_encode)
# 拼接 N x num_feature X Dim
side_embeddings = Concatenate(axis=1)(side_embedding_list)
# items_inputs_list[0] 为了查找每个item 用于计算权重的 aplha 向量
eges_inputs = [side_embeddings, items_inputs_list[0]]
merge_emb = EGESLayer(items_columns[0].vocabulary_size, merge_type=merge_type,
l2_reg=l2_reg, seed=seed)(eges_inputs) # B * emb_dim
label_idx = label_Map.feature_input_layer_dict[label_columns[0].name]
softmaxloss_inputs = [merge_emb,label_idx]
item_vocabulary_size = items_columns[0].vocabulary_size
all_items_idx = EmbeddingIndex(list(range(item_vocabulary_size)))
all_items_embeddings = feature_encode.embedding_layers_dict[side_information_columns[0].name](all_items_idx)
if share_flag:
softmaxloss_inputs.append(all_items_embeddings)
output = SampledSoftmaxLayer(num_items=item_vocabulary_size, share_flage=share_flag,
emb_dim=side_information_columns[0].embedding_dim,num_sampled=10)(softmaxloss_inputs)
model = Model(feature_inputs_list + items_inputs_list + label_inputs_list, output)
model.__setattr__("feature_inputs_list", feature_inputs_list)
model.__setattr__("label_inputs_list", label_inputs_list)
model.__setattr__("merge_embedding", merge_emb)
model.__setattr__("item_embedding", get_item_embedding(all_items_embeddings, items_Map.feature_input_layer_dict[items_columns[0].name]))
return model
```
其中EGESLayer为聚合每个item的多个side information的方法其中根据merge_type可以选择average-pooling或者weight-pooling
```python
class EGESLayer(Layer):
def __init__(self,item_nums, merge_type="weight",l2_reg=0.001,seed=1024, **kwargs):
super(EGESLayer, self).__init__(**kwargs)
self.item_nums = item_nums
self.merge_type = merge_type #聚合方式
self.l2_reg = l2_reg
self.seed = seed
def build(self, input_shape):
if not isinstance(input_shape, list) or len(input_shape) < 2:
raise ValueError('`EGESLayer` layer should be called \
on a list of at least 2 inputs')
self.feat_nums = input_shape[0][1]
if self.merge_type == "weight":
self.alpha_embeddings = self.add_weight(
name='alpha_attention',
shape=(self.item_nums, self.feat_nums),
dtype=tf.float32,
initializer=tf.keras.initializers.RandomUniform(minval=-1, maxval=1, seed=self.seed),
regularizer=l2(self.l2_reg))
def call(self, inputs, **kwargs):
if self.merge_type == "weight":
stack_embedding = inputs[0] # (B * num_feate * embedding_size)
item_input = inputs[1] # (B * 1)
alpha_embedding = tf.nn.embedding_lookup(self.alpha_embeddings, item_input) #(B * 1 * num_feate)
alpha_emb = tf.exp(alpha_embedding)
alpha_i_sum = tf.reduce_sum(alpha_emb, axis=-1)
merge_embedding = tf.squeeze(tf.matmul(alpha_emb, stack_embedding),axis=1) / alpha_i_sum
else:
stack_embedding = inputs[0] # (B * num_feate * embedding_size)
merge_embedding = tf.squeeze(tf.reduce_mean(alpha_emb, axis=1),axis=1) # (B * embedding_size)
return merge_embedding
def compute_output_shape(self, input_shape):
return input_shape
def get_config(self):
config = {"merge_type": self.merge_type, "seed": self.seed}
base_config = super(EGESLayer, self).get_config()
base_config.update(config)
return base_config
```
至此已经从原理到代码详细的介绍了关于EGES的内容。
## 参考
[Billion-scale Commodity Embedding for E-commerce Recommendation in Alibaba](https://arxiv.org/abs/1803.02349)
[深度学习中不得不学的Graph Embedding方法](https://zhuanlan.zhihu.com/p/64200072)
[【Embedding】EGES阿里在图嵌入领域中的探索](https://blog.csdn.net/qq_27075943/article/details/106244434)
[推荐系统遇上深度学习(四十六)-阿里电商推荐中亿级商品的embedding策略](https://www.jianshu.com/p/229b686535f1)

View File

@@ -0,0 +1,577 @@
# Graph Convolutional Neural Networks for Web-Scale Recommender Systems
该论文是斯坦福大学和Pinterest公司与2018年联合发表与KDD上的一篇关于GCN成功应用于工业级推荐系统的工作。该论文提到的PinSage模型是在GraphSAGE的理论基础进行了更改以适用于实际的工业场景。下面将简单介绍一下GraphSAGE的原理以及Pinsage的核心和细节。
## GraphSAGE原理
GraphSAGE提出的前提是因为基于直推式(transductive)学习的图卷积网络无法适应工业界的大多数业务场景。我们知道的是基于直推式学习的图卷积网络是通过拉普拉斯矩阵直接为图上的每个节点学习embedding表示每次学习是针对于当前图上所有的节点。然而在实际的工业场景中图中的结构和节点都不可能是固定的会随着时间的变化而发生改变。例如在Pinterest公司的场景下每分钟都会上传新的照片素材同时也会有新用户不断的注册那么图上的节点会不断的变化。在这样的场景中直推式学习的方法就需要不断的重新训练才能够为新加入的节点学习embedding导致在实际场景中无法投入使用。
在这样的背景下,斯坦福大学提出了一种归纳(inductive)学习的GCN方法——GraphSAGE即**通过聚合邻居信息的方式为给定的节点学习embedding**。不同于直推式(transductive)学习GraphSAGE是通过学习聚合节点邻居生成节点Embedding的函数的方式为任意节点学习embedding进而将GCN扩展成归纳学习任务。
对于想直接应用GCN或者GraphSAGE的我们而言不用非要去理解其背后晦涩难懂的数学原理可以仅从公式的角度来理解GraphSAGE的具体操作。
<div align=center>
<img src="https://cdn.jsdelivr.net/gh/swallown1/blogimages@main/images/image-20220423094435223.png" style="zoom:90%;"/>
</div>
上面这个公式可以非常直观的让我们理解GraphSAGE的原理。
- $h_v^0$表示图上节点的初始化表示,等同于节点自身的特征。
- $h_v^k$表示第k层卷积后的节点表示其来源于两个部分
- 第一部分来源于节点v的邻居节点集合$N(v)$利用邻居节点的第k-1层卷积后的特征$h_u^{k-1}$进行 $\sum_{u \in N(v)} \frac{h_u^{k-1}}{|N(v)|}$ )后,在进行线性变换。这里**借助图上的边将邻居节点的信息通过边关系聚合到节点表示中(简称卷积操作)**。
- 第二部分来源于节点v的第k-1成卷积后的特征$h_v^{k-1}$,进行线性变换。总的来说图卷积的思想是**在对自身做多次非线性变换时,同时利用边关系聚合邻居节点信息。**
- 最后一次卷积结果作为节点的最终表示$Z_v$,以用于下游任务(节点分类,链路预测或节点召回)。
可以发现相比传统的方法(MLPCNNDeepWalk 或 EGES)GCN或GraphSAGE存在一些优势
1. 相比于传统的深度学习方法(MLP,CNN)GCN在对自身节点进行非线性变换时同时考虑了图中的邻接关系。从CNN的角度理解GCN通过堆叠多层结构在图结构数据上拥有更大的**感受野**,利用更加广域内的信息。
2. 相比于图嵌入学习方法(DeepWalkEGES)GCN在学习节点表示的过程中在利用节点自身的属性信息之外更好的利用图结构上的边信息。相比于借助随机采样的方式来使用边信息GCN的方式能从全局的角度利用的邻居信息。此外类似于GraphSAGE这种归纳(inductive)学习的GCN方法通过学习聚合节点邻居生成节点Embedding的函数的方式更适用于图结构和节点会不断变化的工业场景。
在采样得到目标节点的邻居集之后那么如何聚合邻居节点的信息来更新目标节点的嵌入表示呢下面就来看看GraphSAGE中提及的四个聚合函数。
## GraphSAGE的采样和聚合
通过上面的公式可以知道得到节点的表示主要依赖于两部分其中一部分其邻居节点。因此对于GraphSAGE的关键主要分为两步Sample采样和Aggregate聚合。其中Sample的作用是从庞大的邻居节点中选出用于聚合的邻居节点集合$N(v)$以达到降低迭代计算复杂度而聚合操作就是如何利用邻居节点的表示来更新节点v的表示已达到聚合作用。具体的过程如下伪代码所示
<div align=center>
<img src="https://cdn.jsdelivr.net/gh/swallown1/blogimages@main/images/image-20220406135753358.png" style="zoom:90%;"/>
</div>
GraphSAGE的minibatch算法的思路是针对Batch内的所有节点通过采样和聚合节点为每一个节点学习一个embedding。
#### 邻居采样
GraphSAGE的具体采样过程是首先根据中心节点集合$B^k$对集合中每个中心节点通过随机采样的方式对其邻居节点采样固定数量S个(如果邻居节点数量大于S采用无放回抽样如果小于S则采用有放回抽样),形成的集合表示为$B^{k-1}$以此类推每次都是为前一个得到的集合的每个节点随机采样S个邻居最终得到第k层的所有需要参与计算的节点集合$B^{0}$。值得注意的有两点:**为什么需要采样并且固定采样数量S** **为什么第k层所采样的节点集合表示为$B^0$**
进行邻居采样并固定采样数量S主要是因为1. 采样邻居节点避免了在全图的搜索以及使用全部邻居节点所导致计算复杂度高的问题2. 可以通过采样使得部分节点更同质化即两个相似的节点具有相同表达形式。3. 采样固定数量是保持每个batch的计算占用空间是固定的方便进行批量训练。
第k层所采样的节点集合表示为$B^0$主要是因为采样和聚合过程是相反的即采样时我们是从中心节点组层进行采样而聚合的过程是从中心节点的第k阶邻居逐层聚合得到前一层的节点表示。因此可以认为聚合阶段是将k阶邻居的信息聚合到k-1阶邻居上k-1阶邻居的信息聚合到k-2阶邻居上....1阶邻居的信息聚合到中心节点上的过程。
#### 聚合函数
如何对于采样到的节点集进行聚合介绍的4种方式Mean 聚合、Convolutional 聚合、LSTM聚合以及Pooling聚合。由于邻居节点是无序的所以希望构造的聚合函数具有**对称性(即输出的结果不因输入排序的不同而改变)**,同时拥有**较强的表达能力**。
- Mean 聚合:首先会对邻居节点按照**element-wise**进行均值聚合然后将当前节点k-1层得到特征$h_v^{k-1}$与邻居节点均值聚合后的特征 $MEAN(h_u^k | u\in N(v))$**分别**送入全连接网络后**相加**得到结果。
- Convolutional 聚合这是一种基于GCN聚合方式的变种首先对邻居节点特征和自身节点特征求均值得到的聚合特征送入到全连接网络中。与Mean不同的是这里**只经过一个全连接层**。
- LSTM聚合由于LSTM可以捕捉到序列信息因此相比于Mean聚合这种聚合方式的**表达能力更强**但由于LSTM对于输入是有序的因此该方法不具备**对称性**。作者对于无序的节点进行随机排列以调整LSTM所需的有序性。
- Pooling聚合对于邻居节点和中心节点进行一次非线性转化将结果进行一次基于**element-wise**的**最大池化**操作。该种方式具有**较强的表达能力**的同时还具有**对称性**。
综上可以发现GraphSAGE之所以可以用于大规模的工业场景主要是因为模型主要是通过学习聚合函数通过归纳式的学习方法为节点学习特征表示。接下来看看PinSAGE 的主要内容。
## PinSAGE
### 背景
PinSAGE 模型是Pinterest 在GraphSAGE 的基础上实现的可以应用于实际工业场景的召回算法。Pinterest 公司的主要业务是采用瀑布流的形式向用户展现图片无需用户翻页新的图片会自动加载。因此在Pinterest网站上有大量的图片(被称为pins)而用户可以将喜欢的图片分类即将pins钉在画板 boards上。可以发现基于这样的场景pin相当于普通推荐场景中item用户**钉**的行为可以认为是用于的交互行为。于是PinSAGE 模型主要应用的思路是基于GraphSAGE 的原理学习到聚合方法,并为每个图片(pin)学习一个向量表示然后基于pin的向量表示做**item2item的召回**。
可以知道的是PinSAGE 是在GraphSAGE的基础上进行改进以适应实际的工业场景因此除了改进卷积操作中的邻居采样策略以及聚合函数的同时还有一些工程技巧上的改进使得在大数据场景下能更快更好的进行模型训练。因此在了解GraphSAGE的原理后我们详细的了解一下本文的主要改进以及与GraphSAGE的区别。
### 重要性采样
在实际场景当中一个item可能被数以百万千万的用户交互过所以不可能聚合所有邻居节点是不可行的只可能是采样部分邻居进行信息聚合。但是如果采用GraphSAGE中随机采样的方法由于采样的邻居有限(这里是相对于所有节点而言)会存在一定的偏差。因此PinSAGE 在采样中考虑了更加重要的邻居节点,即卷积时只注重部分重要的邻居节点信息,已达到高效计算的同时又可以消除偏置。
PinSAGE使用重要性采样方法即需要为每个邻居节点计算一个重要性权重根据权重选取top-t的邻居作为聚合时的邻居集合。其中计算重要性的过程是以目标节点为起点进行random-walk采样结束之后计算所有节点访问数的L1-normalized作为重要性权重同时这个权重也会在聚合过程中加以使用(**加权聚合**)。
这里对于**计算权重之后如何得到top-t的邻居节点**原文并没有直接的叙述。这里可以有两种做法第一种就是直接采用重要权重这种方法言简意赅比较直观。第二种做法就是对游走得到的所有邻居进行随机抽样而计算出的权重可以用于聚合阶段。个人理解第二种做法的可行性出于两点原因其一是这样方法可以避免存在一些item由于权重系数低永远不会被选中的问题其二可能并不是将所有重要性的邻居进行聚合更合理毕竟重要性权重是通过随机采样而得到的具有一定的随机性。当然以上两种方法都是可行的方案可以通过尝试看看具体哪种方法会更有效。
### 聚合函数
PinSAGE中提到的Convolve算法单层图卷积操作相当于GraphSAGE算法的聚合过程在实际执行过程中通过对每一层执行一次图卷积操作以得到不同阶邻居的信息具体过程如下图所示
<div align=center>
<img src="https://cdn.jsdelivr.net/gh/swallown1/blogimages@main/images/image-20220406202027832.png" style="zoom:110%;"/>
</div>
上述的单层图卷积过程如下三步:
1. 聚合邻居: 先将所有的邻居节点经过一次非线性转化(一层DNN),再由聚合函数(Pooling聚合) $\gamma$(如元素平均,**加权和**等将所有邻居信息聚合成目标节点的embedding。这里的加权聚合采用的是通过random-walk得到的重要性权重。
2. 更新当前节点的embedding将目标节点当前的向量 $z_u$ 与步骤1中聚合得到的邻居向量 $n_u$ 进行拼接,在通过一次非线性转化。
3. 归一化操作:对目标节点向量 $z_u$ 归一化。
Convolve算法的聚合方法与GraphSAGE的Pooling聚合函数相同主要区别在于对更新得到的向量 $z_u$ 进行归一化操作,**可以使训练更稳定,以及在近似查找最近邻的应用中更有效率。**
### 基于**mini-batch**堆叠多层图卷积
与GraphSAGE类似采用的是基于mini-batch 的方式进行训练。之所以这么做的原因是因为什么呢在实际的工业场景中由于用户交互图非常庞大无法对于所有的节点同时学习一个embedding因此需要从原始图上寻找与 mini-batch 节点相关的子图。具体地是说对于mini-batch内的所有节点会通过采样的方式逐层的寻找相关邻居节点再通过对每一层的节点做一次图卷积操作以从k阶邻居节点聚合信息。
<div align=center>
<img src="https://cdn.jsdelivr.net/gh/swallown1/blogimages@main/images/image-20220406204431024.png" style="zoom:60%;"/>
</div>
如上图所示对于batch内的所有节点(图上最顶层的6个节点)依次根据权重采样得到batch内所有节点的一阶邻居(图上第二层的所有节点);然后对于所有一阶邻居再次进行采样,得到所有二阶邻居(图上的最后一层)。节点采样阶段完成之后,与采样的顺序相反进行聚合操作。首先对二阶邻居进行单次图卷积,将二阶节点信息聚合已更新一阶节点的向量表示(其中小方块表示的是一层非线性转化)其次对一阶节点再次进行图卷积操作将一阶节点的信息聚合已更新batch内所有节点的向量表示。仅此对于一个batch内的所有的样本通过卷积操作学习到一个embedding而每一个batch的学习过程中仅**利用与mini-batch内相关节点的子图结构。**
### **训练过程**
PinSage在训练时采用的是 Margin Hinge Loss 损失函数主要的思想是最大化正例embedding之间的相关性同时还要保证负例之间相关性相比正例之间的相关性小于某个阈值(Margin)。具体的公式如下:
<div align=center>
<img src="https://cdn.jsdelivr.net/gh/swallown1/blogimages@main/images/image-20220406210833675.png" style="zoom:100%;"/>
</div>
其中$Z_p$是学习得到的目标节点embedding$Z_i$是与目标节点相关item的embedding$Z_{n_k}$是与目标节点不相关item的embedding$\Delta$为margin值具体大小需要调参。那么对于相关节点i以及不相关节点nk具体都是如何定义的这对于召回模型的训练意义重大让我们看看具体是如何定义的。
对于正样本而言,文中的定义是如果用户在点击的 item q之后立即点击了 item i即认为 < q, i >构成正样本对。直观的我们很好理解这句话不过在参考DGL中相关代码实现时发现这部分的内容和原文中有一定的出入。具体地代码中将所有的训练样本构造成用户-项目二部图然后对batch内的每个 item q根据item-user-item的元路径进行随机游走得到被同一个用户交互过的 item i因此组成<q,i>正样本对。对于负样本部分,相对来说更为重要,因此内容相对比较多,将在下面的负样本生成部分详细介绍。
这里还有一个比较重要的细节需要注意,由于模型是用于 item to item的召回因此优化目标是与正样本之间的表示尽可能的相近与负样本之间的表示尽可能的远。而图卷积操作会使得具有邻接关系的节点表示具有同质性因此结合这两点就需要在构建图结构的时要将**训练样本之间可能存在的边在二部图上删除**,避免因为边的存在使得因卷积操作而导致的信息泄露。
### 工程技巧
由于PinSAGE是一篇工业界的论文其中会涉及与实际工程相关的内容这里在了解完算法思想之后再从实际落地的角度看看PinSAGE给我们介绍的工程技巧。
**负样本的生成**
召回模型最主要的任务是从候选集合中选出用户可能感兴趣的item直观的理解就是让模型将用户喜欢的和不喜欢的进行区分。然而由于候选集合的庞大数量许多item之间十分相似导致模型划分出来用户喜欢的item中会存在一些难以区分的item(即与用户非常喜欢item比较相似的那一部分)。因此对于召回模型不仅能区分用户喜欢和不喜欢的 item同时还能区分与用户喜欢的 item 十分相似的那一部分item。那么如果做到呢这主要是交给 easy negative examples 和 hard negative examples 两种负样本给模型学习。
- easy 负样本这里对于mini-batch内的所有pair(训练样本对)会共享500负样本这500个样本从batch之外的所有节点中随机采样得到。这么做可以减少在每个mini-batch中因计算所有节点的embedding所需的时间文中指出这和为每个item采样一定数量负样本无差异。
- hard 负样本这里使用hard 负样本的原因是根据实际场景的问题出发模型需要从20亿的物品item集合中识别出最相似的1000个即模型需要从2百万 item 中识别出最相似的那一个 item。也就是说模型的区分能力不够细致为了解决这个问题加入了一些hard样本。对于hard 负样本,应该是与 q 相似 以及和 i 不相似的物品,具体地的生成方式是将图上的节点计算相对节点 q 的个性化PageRank分值根据分值的排序随机从2000~5000的位置选取节点作为负样本。
负样本的构建是召回模型的中关键的内容,在各家公司的工作都予以体现,具体的大家可以参考 Facebook 发表的[《Embedding-based Retrieval in Facebook Search》](https://arxiv.org/pdf/2006.11632v1.pdf)
**渐进式训练(Curriculum training)**
由于hard 负样本的加入模型的训练时间加长由于与q过于相似导致loss比较小导致梯度更新的幅度比较小训练起来比较慢那么渐进式训练就是为了来解决这个问题。
如何渐进式先在第一轮训练使用easy 负样本,帮助模型先快速收敛(先让模型有个最基本的分辨能力)到一定范围然后在逐步分加入hard负样本(方式是在第n轮训练时给每个物品的负样本集合增加n-1个 hard 负样本),以调整模型细粒度的区分能力(让模型能够区分相似的item)。
**节点特征(side information)**
这里与EGES的不同这里的边信息不是端到端训练得到而是通过事前的预处理得到的。对于每个节点(即 pin),都会有一个图片和一点文本信息。因此对于每个节点使用图片的向量、文字的向量以及节点的度拼接得到。这里其实也解释了为什么在图卷积操作时,会先进行一个非线性转化,其实就是将不同空间的特征进行转化(融合)。
**构建 mini-batch**
不同于常规的构建方式PinSAGE中构建mini-batch的方式是基于生产者消费者模式。什么意思的就是将CPU和GPU分开工作让CPU负责取特征重建索引邻接列表负采样等工作让GPU进行矩阵运算即CPU负责生产每个batch所需的所有数据GPU则根据CPU生产的数据进行消费(运算)。这样由于考虑GPU的利用率无法将所有特征矩阵放在GPU只能存在CPU中然而每次查找会导致非常耗时通过上面的方式使得图卷积操作过程中就没有GPU与CPU的通信需求。
**多GPU训练超大batch**
前向传播过程中各个GPU等分minibatch共享一套模型参数反向传播时将每个GPU中的参数梯度都聚合到一起同步执行SGD。为了保证因海量数据而使用的超大batchsize的情况下模型快速收敛以及泛化精度采用warmup过程即在第一个epoch中将学习率线性提升到最高后面的epoch中再逐步指数下降。
**使用MapReduce高效推断**
在模型训练结束之后需要为所有节点计算一个embedding如果按照训练过程中的前向传播过程来生成会存在大量重复的计算。因为当计算一个节点的embedding的时候其部分邻居节点已经计算过了同时如果该节点作为其他节点邻居时也会被再次计算。针对这个问题本文采用MapReduce的方法进行推断。该过程主要分为两步具体如下图所示
<div align=center>
<img src="https://cdn.jsdelivr.net/gh/swallown1/blogimages@main/images/image-20220407132111547.png" style="zoom:60%;"/>
</div>
1. 将item的embedding进行聚合即利用item的图片、文字和度等信息的表示进行join(拼接)在通过一层dense后得到item的低维向量。
2. 然后根据item来匹配其一阶邻居(join)然后根据item进行pooling(其实就是GroupBy pooling)得到一次图卷积操作。通过堆叠多次直接得到全量的embedding。
其实这块主要就是通过MapReduce的大数据处理能力直接对全量节点进行一次运算得到其embedding避免了分batch所导致的重复计算。
## 代码解析
了解完基本的原理之后最关键的还是得解析源码以证实上面讲的细节的准确性。下面基于DGL中实现的代码看看模型中的一些细节。
### 数据处理
在弄清楚模型之前最重要的就是知道送入模型的数据到底是什么养的以及PinSAGE相对于GraphSAGE最大的区别就在于如何采样邻居如何构建负样本等。
首先需要明确的是,无论是**邻居采样**还是**样本的构造**都发生在图结构上因此最主要的是需要先构建一个user和item组成的二部图。
```python
# ratings是所有的用户交互
# 过滤掉为出现在交互中的用户和项目
distinct_users_in_ratings = ratings['user_id'].unique()
distinct_movies_in_ratings = ratings['movie_id'].unique()
users = users[users['user_id'].isin(distinct_users_in_ratings)]
movies = movies[movies['movie_id'].isin(distinct_movies_in_ratings)]
# 将电影特征分组 genres (a vector), year (a category), title (a string)
genre_columns = movies.columns.drop(['movie_id', 'title', 'year'])
movies[genre_columns] = movies[genre_columns].fillna(False).astype('bool')
movies_categorical = movies.drop('title', axis=1)
## 构建图
graph_builder = PandasGraphBuilder()
graph_builder.add_entities(users, 'user_id', 'user') # 添加user类型节点
graph_builder.add_entities(movies_categorical, 'movie_id', 'movie') # 添加movie类型节点
# 构建用户-电影的无向图
graph_builder.add_binary_relations(ratings, 'user_id', 'movie_id', 'watched')
graph_builder.add_binary_relations(ratings, 'movie_id', 'user_id', 'watched-by')
g = graph_builder.build()
```
在构建完原图之后,需要将交互数据(ratings)分成训练集和测试集,然后根据测试集从原图中抽取出与训练集中相关节点的子图。
```python
# train_test_split_by_time 根据时间划分训练集和测试集
# 将用户的倒数第二次交互作为验证,最后一次交互用作测试
# train_indices 为用于训练的用户与电影的交互
train_indices, val_indices, test_indices = train_test_split_by_time(ratings, 'timestamp', 'user_id')
# 只使用训练交互来构建图形,测试集相关的节点不应该出现在训练过程中。
# 从原图中提取与训练集相关节点的子图
train_g = build_train_graph(g, train_indices, 'user', 'movie', 'watched', 'watched-by')
```
### 正负样本采样
在得到训练图结构之后为了进行PinSAGE提出的item2item召回任务需要构建相应的训练样本。对于训练样本主要是构建正样本对和负样本对前面我们已经提到了正样本对是基于 item to user to item的随即游走得到的对于负样本DGL的实现主要是随机采样即只有easy sample未实现hard sample。具体地DGL中主要是通过sampler_module.ItemToItemBatchSampler方法进行采样主要代码如下
```python
class ItemToItemBatchSampler(IterableDataset):
def __init__(self, g, user_type, item_type, batch_size):
self.g = g
self.user_type = user_type
self.item_type = item_type
self.user_to_item_etype = list(g.metagraph()[user_type][item_type])[0]
self.item_to_user_etype = list(g.metagraph()[item_type][user_type])[0]
self.batch_size = batch_size
def __iter__(self):
while True:
# 随机采样batch_size个节点作为head 即论文中的q
heads = torch.randint(0, self.g.number_of_nodes(self.item_type), (self.batch_size,))
# 本次元路径表示从item游走到user再从user游走到item总共二跳取出二跳节点(电影节点)作为tails(即正样本)
# 得到与heads被同一个用户消费过的其他item做正样本
# 这么做可能存在问题,
# 1. 这种游走肯定会使正样本集中于少数热门item
# 2. 如果item只被一个用户消费过二跳游走岂不是又回到起始item这种case还是要处理的
tails = dgl.sampling.random_walk(
self.g,
heads,
metapath=[self.item_to_user_etype, self.user_to_item_etype])[0][:, 2]
# 随机采样做负样本, 没有hard negative
# 这么做会存在被同一个用户交互过的movie也会作为负样本
neg_tails = torch.randint(0, self.g.number_of_nodes(self.item_type), (self.batch_size,))
mask = (tails != -1)
yield heads[mask], tails[mask], neg_tails[mask]
```
上面的样本采样过程只是一个简单的示例,如果面对实际问题,需要自己来重新完成这部分的内容。
### 邻居节点采样
再得到训练样本之后接下来主要是在训练图上为heads节点采用其邻居节点。在DGL中主要是通过sampler_module.NeighborSampler来实现具体地通过**sample_blocks**方法回溯生成各层卷积需要的block即所有的邻居集合。其中需要注意的几个地方基于随机游走的重要邻居采样DGL已经实现具体参考**[dgl.sampling.PinSAGESampler](https://docs.dgl.ai/generated/dgl.sampling.PinSAGESampler.html)**其次避免信息泄漏代码中先将head → tails,head → neg_tails从frontier中先删除再生成block。
```python
class NeighborSampler(object): # 图卷积的邻居采样
def __init__(self, g, user_type, item_type, random_walk_length, random_walk_restart_prob,
num_random_walks, num_neighbors, num_layers):
self.g = g
self.user_type = user_type
self.item_type = item_type
self.user_to_item_etype = list(g.metagraph()[user_type][item_type])[0]
self.item_to_user_etype = list(g.metagraph()[item_type][user_type])[0]
# 每层都有一个采样器,根据随机游走来决定某节点邻居的重要性(主要的实现已封装在PinSAGESampler中)
# 可以认为经过多次游走,落脚于某邻居节点的次数越多,则这个邻居越重要,就更应该优先作为邻居
self.samplers = [
dgl.sampling.PinSAGESampler(g, item_type, user_type, random_walk_length,
random_walk_restart_prob, num_random_walks, num_neighbors)
for _ in range(num_layers)]
def sample_blocks(self, seeds, heads=None, tails=None, neg_tails=None):
"""根据随机游走得到的重要性权重,进行邻居采样"""
blocks = []
for sampler in self.samplers:
frontier = sampler(seeds) # 通过随机游走进行重要性采样,生成中间状态
if heads is not None:
# 如果是在训练需要将heads->tails 和 head->neg_tails这些待预测的边都去掉
eids = frontier.edge_ids(torch.cat([heads, heads]), torch.cat([tails, neg_tails]), return_uv=True)
if len(eids) > 0:
old_frontier = frontier
frontier = dgl.remove_edges(old_frontier, eids)
# 只保留seeds这些节点将frontier压缩成block
# 并设置block的input/output nodes
block = compact_and_copy(frontier, seeds)
# 本层的输入节点就是下一层的seeds
seeds = block.srcdata[dgl.NID]
blocks.insert(0, block)
return blocks
```
其次**sample_from_item_pairs**方法是通过上面得到的heads, tails, neg_tails分别构建基于正样本对以及基于负样本对的item-item图。由heads→tails生成的pos_graph用于计算pairwise loss中的pos_score由heads→neg_tails生成的neg_graph用于计算pairwise loss中的neg_score。
```python
class NeighborSampler(object): # 图卷积的邻居采样
def __init__(self, g, user_type, item_type, random_walk_length, ....):
pass
def sample_blocks(self, seeds, heads=None, tails=None, neg_tails=None):
pass
def sample_from_item_pairs(self, heads, tails, neg_tails):
# 由heads->tails构建positive graph num_nodes设置成原图中所有item节点
pos_graph = dgl.graph(
(heads, tails),
num_nodes=self.g.number_of_nodes(self.item_type))
# 由heads->neg_tails构建negative graphnum_nodes设置成原图中所有item节点
neg_graph = dgl.graph(
(heads, neg_tails),
num_nodes=self.g.number_of_nodes(self.item_type))
# 去除heads, tails, neg_tails以外的节点将大图压缩成小图避免与本轮训练不相关节点的结构也传入模型提升计算效率
pos_graph, neg_graph = dgl.compact_graphs([pos_graph, neg_graph])
# 压缩后的图上的节点是原图中的编号
# 注意这时pos_graph与neg_graph不是分开编号的两个图它们来自于同一幅由heads, tails, neg_tails组成的大图
# pos_graph和neg_graph中的节点相同都是heads+tails+neg_tails即这里的seedspos_graph和neg_graph只是边不同而已
seeds = pos_graph.ndata[dgl.NID] # 字典 不同类型节点为一个tensor为每个节点的id值
blocks = self.sample_blocks(seeds, heads, tails, neg_tails)
return pos_graph, neg_graph, blocks
```
### PinSAGE
在得到所有所需的数据之后,看看模型结构。其中主要分为三个部分:**节点特征映射****多层卷积模块 **和 **给边打分**
```python
class PinSAGEModel(nn.Module):
def __init__(self, full_graph, ntype, textsets, hidden_dims, n_layers):
super().__init__()
# 负责将节点上的各种特征都映射成向量,并聚合在一起,形成这个节点的原始特征向量
self.proj = layers.LinearProjector(full_graph, ntype, textsets, hidden_dims)
# 负责多层图卷积得到各节点最终的embedding
self.sage = layers.SAGENet(hidden_dims, n_layers)
# 负责根据首尾两端的节点的embedding计算边上的得分
self.scorer = layers.ItemToItemScorer(full_graph, ntype)
def forward(self, pos_graph, neg_graph, blocks):
""" pos_graph, neg_graph, blocks 的最后一层都对应batch中 heads+tails+neg_tails 这些节点
"""
# 得到batch中heads+tails+neg_tails这些节点的最终embedding
h_item = self.get_repr(blocks)
# 得到heads->tails这些边上的得分
pos_score = self.scorer(pos_graph, h_item)
# 得到heads->neg_tails这些边上的得分
neg_score = self.scorer(neg_graph, h_item)
# pos_graph与neg_graph边数相等因此neg_score与pos_score相减
# 返回margin hinge loss这里的margin是1
return (neg_score - pos_score + 1).clamp(min=0)
def get_repr(self, blocks):
"""
通过self.sage经过多层卷积得到输出节点上的卷积结果再加上这些输出节点上原始特征的映射结果
得到输出节点上最终的向量表示
"""
h_item = self.proj(blocks[0].srcdata) # 将输入节点上的原始特征映射成hidden_dims长的向量
h_item_dst = self.proj(blocks[-1].dstdata) # 将输出节点上的原始特征映射成hidden_dims长的向量
return h_item_dst + self.sage(blocks, h_item)
```
**节点特征映射:**由于节点使用到了多种类型int,float array,text的原始特征这里使用了一个DNN层来融合成固定的长度。
```python
class LinearProjector(nn.Module):
def __init__(self, full_graph, ntype, textset, hidden_dims):
super().__init__()
self.ntype = ntype
# 初始化参数,这里为全图中所有节点特征初始化
# 如果特征类型是float就定义一个nn.Linear线性变化为指定维度
# 如果特征类型是int就定义Embedding矩阵将id型特征转化为向量
self.inputs = _init_input_modules(full_graph, ntype, textset, hidden_dims)
def forward(self, ndata):
projections = []
for feature, data in ndata.items():
# NID是计算子图中节点、边在原图中的编号没必要用做特征
if feature == dgl.NID:
continue
module = self.inputs[feature] # 根据特征名取出相应的特征转化器
# 对文本属性进行处理
if isinstance(module, (BagOfWords, BagOfWordsPretrained)):
length = ndata[feature + '__len']
result = module(data, length)
else:
result = module(data) # look_up
projections.append(result)
# 将每个特征都映射后的hidden_dims长的向量element-wise相加
return torch.stack(projections, 1).sum(1) # [nodes, hidden_dims]
```
**多层卷积模块:**根据采样得到的节点blocks然后通过进行逐层卷积得到各节点最终的embedding。
```python
class SAGENet(nn.Module):
def __init__(self, hidden_dims, n_layers):
"""g : 二部图"""
super().__init__()
self.convs = nn.ModuleList()
for _ in range(n_layers):
self.convs.append(WeightedSAGEConv(hidden_dims, hidden_dims, hidden_dims))
def forward(self, blocks, h):
# 这里根据邻居节点进逐层聚合
for layer, block in zip(self.convs, blocks):
h_dst = h[:block.number_of_nodes('DST/' + block.ntypes[0])] #前一次卷积的结果
h = layer(block, (h, h_dst), block.edata['weights'])
return h
```
其中WeightedSAGEConv为根据邻居权重的聚合函数。
```python
class WeightedSAGEConv(nn.Module):
def __init__(self, input_dims, hidden_dims, output_dims, act=F.relu):
super().__init__()
self.act = act
self.Q = nn.Linear(input_dims, hidden_dims)
self.W = nn.Linear(input_dims + hidden_dims, output_dims)
self.reset_parameters()
self.dropout = nn.Dropout(0.5)
def reset_parameters(self):
gain = nn.init.calculate_gain('relu')
nn.init.xavier_uniform_(self.Q.weight, gain=gain)
nn.init.xavier_uniform_(self.W.weight, gain=gain)
nn.init.constant_(self.Q.bias, 0)
nn.init.constant_(self.W.bias, 0)
def forward(self, g, h, weights):
"""
g : 基于batch的子图
h : 节点特征
weights : 边的权重
"""
h_src, h_dst = h # 邻居节点特征,自身节点特征
with g.local_scope():
# 将src节点上的原始特征映射成hidden_dims长存储于'n'字段
g.srcdata['n'] = self.act(self.Q(self.dropout(h_src)))
g.edata['w'] = weights.float()
# src节点上的特征'n'乘以边上的权重,构成消息'm'
# dst节点将所有接收到的消息'm'相加起来存入dst节点的'n'字段
g.update_all(fn.u_mul_e('n', 'w', 'm'), fn.sum('m', 'n'))
# 将边上的权重w拷贝成消息'm'
# dst节点将所有接收到的消息'm'相加起来存入dst节点的'ws'字段
g.update_all(fn.copy_e('w', 'm'), fn.sum('m', 'ws'))
# 邻居节点的embedding的加权和
n = g.dstdata['n']
ws = g.dstdata['ws'].unsqueeze(1).clamp(min=1) # 边上权重之和
# 先将邻居节点的embedding做加权平均
# 再拼接上一轮卷积后dst节点自身的embedding
# 再经过线性变化与非线性激活得到这一轮卷积后各dst节点的embedding
z = self.act(self.W(self.dropout(torch.cat([n / ws, h_dst], 1))))
# 本轮卷积后各dst节点的embedding除以模长进行归一化
z_norm = z.norm(2, 1, keepdim=True)
z_norm = torch.where(z_norm == 0, torch.tensor(1.).to(z_norm), z_norm)
z = z / z_norm
return z
```
**给边打分:** 经过SAGENet得到了batch内所有节点的embedding这时需要根据学习到的embedding为pos_graph和neg_graph中的每个边打分即计算正样本对和负样本的內积。具体逻辑是根据两端节点embedding的点积然后加上两端节点的bias。
```python
class ItemToItemScorer(nn.Module):
def __init__(self, full_graph, ntype):
super().__init__()
n_nodes = full_graph.number_of_nodes(ntype)
self.bias = nn.Parameter(torch.zeros(n_nodes, 1))
def _add_bias(self, edges):
bias_src = self.bias[edges.src[dgl.NID]]
bias_dst = self.bias[edges.dst[dgl.NID]]
# 边上两顶点的embedding的点积再加上两端节点的bias
return {'s': edges.data['s'] + bias_src + bias_dst}
def forward(self, item_item_graph, h):
"""
item_item_graph : 每个边 为 pair 对
h : 每个节点隐层状态
"""
with item_item_graph.local_scope():
item_item_graph.ndata['h'] = h
# 边两端节点的embedding做点积保存到s
item_item_graph.apply_edges(fn.u_dot_v('h', 'h', 's'))
# 为每个边加上偏置,即加上两个顶点的偏置
item_item_graph.apply_edges(self._add_bias)
# 算出来的得分为 pair 的预测得分
pair_score = item_item_graph.edata['s']
return pair_score
```
### 训练过程
介绍完“数据处理”和“PinSAGE模块”之后接下来就是通过训练过程将上述两部分串起来详细的见代码
```python
def train(dataset, args):
#从dataset中加载数据和原图
g = dataset['train-graph']
...
device = torch.device(args.device)
# 为节点随机初始化一个id用于做embedding
g.nodes[user_ntype].data['id'] = torch.arange(g.number_of_nodes(user_ntype))
g.nodes[item_ntype].data['id'] = torch.arange(g.number_of_nodes(item_ntype))
# 负责采样出batch_size大小的节点列表: heads, tails, neg_tails
batch_sampler = sampler_module.ItemToItemBatchSampler(
g, user_ntype, item_ntype, args.batch_size)
# 由一个batch中的heads,tails,neg_tails构建训练这个batch所需要的
# pos_graph,neg_graph 和 blocks
neighbor_sampler = sampler_module.NeighborSampler(
g, user_ntype, item_ntype, args.random_walk_length,
args.random_walk_restart_prob, args.num_random_walks, args.num_neighbors,
args.num_layers)
# 每次next()返回: pos_graph,neg_graph和blocks做训练之用
collator = sampler_module.PinSAGECollator(neighbor_sampler, g, item_ntype, textset)
dataloader = DataLoader(
batch_sampler,
collate_fn=collator.collate_train,
num_workers=args.num_workers)
# 每次next()返回blocks做训练中测试之用
dataloader_test = DataLoader(
torch.arange(g.number_of_nodes(item_ntype)),
batch_size=args.batch_size,
collate_fn=collator.collate_test,
num_workers=args.num_workers)
dataloader_it = iter(dataloader)
# 准备模型
model = PinSAGEModel(g, item_ntype, textset, args.hidden_dims, args.num_layers).to(device)
opt = torch.optim.Adam(model.parameters(), lr=args.lr)
# 训练过程
for epoch_id in range(args.num_epochs):
model.train()
for batch_id in tqdm.trange(args.batches_per_epoch):
pos_graph, neg_graph, blocks = next(dataloader_it)
for i in range(len(blocks)):
blocks[i] = blocks[i].to(device)
pos_graph = pos_graph.to(device)
neg_graph = neg_graph.to(device)
loss = model(pos_graph, neg_graph, blocks).mean()
opt.zero_grad()
loss.backward()
opt.step()
```
至此DGL PinSAGE example的主要实现代码已经全部介绍完了感兴趣的可以去官网对照源代码自行学习。
## 参考
[Graph Convolutional Neural Networks for Web-Scale Recommender Systems](https://arxiv.org/abs/1806.01973)
[PinSAGE 召回模型及源码分析(1): PinSAGE 简介](https://zhuanlan.zhihu.com/p/275942839)
[全面理解PinSage](https://zhuanlan.zhihu.com/p/133739758)
[[论文笔记]PinSAGE——Graph Convolutional Neural Networks for Web-Scale Recommender Systems](https://zhuanlan.zhihu.com/p/461720302)