栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

# spark进阶(九):GraphX使用

# spark进阶(九):GraphX使用

GraphX是Spark中的一个分布式图计算框架,是对Spark RDD的扩展。这里所说的图并不是图片,而是一个抽象的关系网。例如,社交应用微信、QQ、微博等用户之间的好友、关注等存在错综复杂的联系,这种联系构成了一张巨大的关系网,我们把这个关系网称为图。GraphX目前适用于微信、微博、社交网络、电子商务等类型的产品,也越来越多地应用于推荐领域的人群划分、年龄预测、标签推理等。

Vertices、edges、triplets是Spark GraphX中3个最重要的概念。

  • Vertices对应的RDD名称为VertexRDD,VertexRDD继承自RDD[(VertexId, VD) ],RDD的类型是VertexId和VD,其中VD是属性的类型,也就是说,VertexRDD有ID和顶点属性。

  • Edges对应的是EdgeRDD,EdgeRDD继承的RDD的类型是Edge[ED],属性有3个:源顶点的ID、目标顶点的ID、边属性。

  • Triplets的属性有源顶点ID、源顶点属性、边属性、目标顶点ID、目标顶点属性,Triplets其实是对Vertices和Edges做了Join操作

一、简单使用

其实顶点和边都是RDD,通过顶点和边之间的关系构建的图其三元组关系也是一个RDD,都适用RDD的一些操作

object GraphX {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder
      .appName("SparkGraphXExample")
      .master("local[*]")
      .getOrCreate()

    val sc = spark.sparkContext
    sc.setLogLevel("WARN")

    val users:RDD[(VertexId, (String, String))] = sc.parallelize(Array(
      (3L, ("zhangsan", "student")),
      (2L, ("lisi", "prof")),
      (5L, ("wangwu", "prof")),
      (7L, ("zhaosi", "postdoc")),
    ))

    val relationships: RDD[Edge[String]] = sc.parallelize(Array(
      Edge(3L, 7L, "collab"),
      Edge(5L, 3L, "advisor"),
      Edge(2L, 5L, "colleague"),
      Edge(5L, 7L, "pi"),
      Edge(6L, 7L, "pi")
    ))

    val defaultUser = ("ffzs", "Missing")
    val graph = Graph(users, relationships, defaultUser)

    // 图中获取顶点,进行过滤操作
    val filteredVert = graph.vertices
      .filter(_._2._2 == "postdoc")
      .count
    println(f"博士后节点的数量为: $filteredVert")

    // 图中获取边,进行过滤操作
    val filteredEdge = graph.edges
      .filter(edge => edge.srcId > edge.dstId)
      .count
    println(f"途中边起始ID大于终止ID的数量为: $filteredEdge")

    // 对图中的三元组进行操作 (start vertices, end vertices, relation)
    graph.triplets.foreach(println)

    // 计算每个顶点的入度
    println("每个顶点的入度情况:")
    val inDegrees = graph.inDegrees
    inDegrees.foreach(println)

    // 计算每个顶点的出度
    println("每个顶点的出度情况:")
    val outDegrees = graph.outDegrees
    outDegrees.foreach(println)

    // 计算每个顶点的度: 入度 + 出度
    val degrees = graph.degrees

    // 对图中所有边关系进行翻转
    val reverseGraph = graph.reverse

    // 通过subgraph对图进行过滤获得新的图
    println("通过subgraph对图进行过滤获得新的图")
    val subGraph = graph.subgraph(
      vpred = (id, attr) => attr._2 == "prof"
    )
    subGraph.triplets.foreach(println)

    // groupEdges 对相同起始终止的边进行合并
    println("groupEdges 对相同起始终止的边进行合并")
    val grouped = graph.groupEdges(merge = (a, b) => a + b)
    grouped.edges.collect().foreach(println)

    // mapVertices 更改顶点属性
    println("通过 mapVertices 更改顶点属性")
    graph.mapVertices((id, attr) => (attr._1, if (attr._2=="student") "student" else "teacher"))
      .vertices
      .foreach(println)

    // mapEdges 更改顶点属性
    println("通过 mapEdges 更改顶点属性")
    graph.mapEdges(edge => if (edge.attr == "pi") "friend" else edge.attr)
      .edges.foreach(println)
  }
}

  • 计算图中每一个顶点的入度和出度情况
  • 对图进行一些过滤生成新的图
  • 更改图中的属性
  • 对图中重复边进行聚合
博士后节点的数量为: 1
途中边起始ID大于终止ID的数量为: 1
((5,(wangwu,prof)),(7,(zhaosi,postdoc)),pi)
((3,(zhangsan,student)),(7,(zhaosi,postdoc)),collab)
((2,(lisi,prof)),(5,(wangwu,prof)),colleague)
((5,(wangwu,prof)),(3,(zhangsan,student)),advisor)
((6,(ffzs,Missing)),(7,(zhaosi,postdoc)),pi)
每个顶点的入度情况:
(5,1)
(7,3)
(3,1)
每个顶点的出度情况:
(5,2)
(6,1)
(3,1)
(2,1)
通过subgraph对图进行过滤获得新的图
((2,(lisi,prof)),(5,(wangwu,prof)),colleague)
groupEdges 对相同起始终止的边进行合并
Edge(3,7,collab)
Edge(5,3,advisor)
Edge(2,5,colleague)
Edge(5,7,pi)
Edge(6,7,pi)
通过 mapVertices 更改顶点属性
(7,(zhaosi,teacher))
(2,(lisi,teacher))
(6,(ffzs,teacher))
(3,(zhangsan,student))
(5,(wangwu,teacher))
通过 mapEdges 更改顶点属性
Edge(5,7,friend)
Edge(5,3,advisor)
Edge(3,7,collab)
Edge(2,5,colleague)
Edge(6,7,friend)
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/312734.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号