GraphX是Spark中的一个分布式图计算框架,是对Spark RDD的扩展。这里所说的图并不是图片,而是一个抽象的关系网。例如,社交应用微信、QQ、微博等用户之间的好友、关注等存在错综复杂的联系,这种联系构成了一张巨大的关系网,我们把这个关系网称为图。GraphX目前适用于微信、微博、社交网络、电子商务等类型的产品,也越来越多地应用于推荐领域的人群划分、年龄预测、标签推理等。
Vertices、edges、triplets是Spark GraphX中3个最重要的概念。
-
Vertices对应的RDD名称为VertexRDD,VertexRDD继承自RDD[(VertexId, VD) ],RDD的类型是VertexId和VD,其中VD是属性的类型,也就是说,VertexRDD有ID和顶点属性。
-
Edges对应的是EdgeRDD,EdgeRDD继承的RDD的类型是Edge[ED],属性有3个:源顶点的ID、目标顶点的ID、边属性。
-
Triplets的属性有源顶点ID、源顶点属性、边属性、目标顶点ID、目标顶点属性,Triplets其实是对Vertices和Edges做了Join操作
其实顶点和边都是RDD,通过顶点和边之间的关系构建的图其三元组关系也是一个RDD,都适用RDD的一些操作
object GraphX {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder
.appName("SparkGraphXExample")
.master("local[*]")
.getOrCreate()
val sc = spark.sparkContext
sc.setLogLevel("WARN")
val users:RDD[(VertexId, (String, String))] = sc.parallelize(Array(
(3L, ("zhangsan", "student")),
(2L, ("lisi", "prof")),
(5L, ("wangwu", "prof")),
(7L, ("zhaosi", "postdoc")),
))
val relationships: RDD[Edge[String]] = sc.parallelize(Array(
Edge(3L, 7L, "collab"),
Edge(5L, 3L, "advisor"),
Edge(2L, 5L, "colleague"),
Edge(5L, 7L, "pi"),
Edge(6L, 7L, "pi")
))
val defaultUser = ("ffzs", "Missing")
val graph = Graph(users, relationships, defaultUser)
// 图中获取顶点,进行过滤操作
val filteredVert = graph.vertices
.filter(_._2._2 == "postdoc")
.count
println(f"博士后节点的数量为: $filteredVert")
// 图中获取边,进行过滤操作
val filteredEdge = graph.edges
.filter(edge => edge.srcId > edge.dstId)
.count
println(f"途中边起始ID大于终止ID的数量为: $filteredEdge")
// 对图中的三元组进行操作 (start vertices, end vertices, relation)
graph.triplets.foreach(println)
// 计算每个顶点的入度
println("每个顶点的入度情况:")
val inDegrees = graph.inDegrees
inDegrees.foreach(println)
// 计算每个顶点的出度
println("每个顶点的出度情况:")
val outDegrees = graph.outDegrees
outDegrees.foreach(println)
// 计算每个顶点的度: 入度 + 出度
val degrees = graph.degrees
// 对图中所有边关系进行翻转
val reverseGraph = graph.reverse
// 通过subgraph对图进行过滤获得新的图
println("通过subgraph对图进行过滤获得新的图")
val subGraph = graph.subgraph(
vpred = (id, attr) => attr._2 == "prof"
)
subGraph.triplets.foreach(println)
// groupEdges 对相同起始终止的边进行合并
println("groupEdges 对相同起始终止的边进行合并")
val grouped = graph.groupEdges(merge = (a, b) => a + b)
grouped.edges.collect().foreach(println)
// mapVertices 更改顶点属性
println("通过 mapVertices 更改顶点属性")
graph.mapVertices((id, attr) => (attr._1, if (attr._2=="student") "student" else "teacher"))
.vertices
.foreach(println)
// mapEdges 更改顶点属性
println("通过 mapEdges 更改顶点属性")
graph.mapEdges(edge => if (edge.attr == "pi") "friend" else edge.attr)
.edges.foreach(println)
}
}
- 计算图中每一个顶点的入度和出度情况
- 对图进行一些过滤生成新的图
- 更改图中的属性
- 对图中重复边进行聚合
博士后节点的数量为: 1 途中边起始ID大于终止ID的数量为: 1 ((5,(wangwu,prof)),(7,(zhaosi,postdoc)),pi) ((3,(zhangsan,student)),(7,(zhaosi,postdoc)),collab) ((2,(lisi,prof)),(5,(wangwu,prof)),colleague) ((5,(wangwu,prof)),(3,(zhangsan,student)),advisor) ((6,(ffzs,Missing)),(7,(zhaosi,postdoc)),pi) 每个顶点的入度情况: (5,1) (7,3) (3,1) 每个顶点的出度情况: (5,2) (6,1) (3,1) (2,1) 通过subgraph对图进行过滤获得新的图 ((2,(lisi,prof)),(5,(wangwu,prof)),colleague) groupEdges 对相同起始终止的边进行合并 Edge(3,7,collab) Edge(5,3,advisor) Edge(2,5,colleague) Edge(5,7,pi) Edge(6,7,pi) 通过 mapVertices 更改顶点属性 (7,(zhaosi,teacher)) (2,(lisi,teacher)) (6,(ffzs,teacher)) (3,(zhangsan,student)) (5,(wangwu,teacher)) 通过 mapEdges 更改顶点属性 Edge(5,7,friend) Edge(5,3,advisor) Edge(3,7,collab) Edge(2,5,colleague) Edge(6,7,friend)



