package cn.kgc.graphxdemo
import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Edge, EdgeTriplet, Graph, GraphLoader}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
/**
 * Minimal GraphX walkthrough: builds a small graph from in-memory vertex and edge
 * RDDs, then loads a second graph from an edge-list file, printing the triplets of
 * each graph.
 */
object GraphDemo1 {

  /**
   * Runs the demo on a local Spark master and prints the results to stdout.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().appName("sparkgraph")
      .master("local[*]")
      .getOrCreate()
    try {
      val sc: SparkContext = spark.sparkContext
      // Vertices as (vertex id, Int attribute) pairs.
      val vertices: RDD[(Long, Int)] = sc.makeRDD(Seq((1L, 1), (2L, 2), (3L, 3)))
      // Directed edges 1 -> 2 and 2 -> 3, each carrying the attribute 1.
      val edges: RDD[Edge[Int]] = sc.makeRDD(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1)))
      val graph: Graph[Int, Int] = Graph(vertices, edges)
      println(graph)
      // Collect before printing: RDD.foreach runs on the executors, so its output
      // order is partition-dependent and it would not appear on the driver console
      // when run on a cluster. The data here is tiny, so collect() is safe.
      graph.triplets.collect().foreach(println)
      println("----------------------------------------")
      // Build a second graph from an edge-list file, path relative to the working directory.
      val graph2: Graph[Int, Int] = GraphLoader.edgeListFile(sc, "in/graph.txt")
      graph2.triplets.collect().foreach(println)
    } finally {
      // Release the Spark context even if one of the jobs above fails.
      spark.stop()
    }
  }
}
构建用户合作关系属性图
顶点属性:
  用户名
  职业
边属性:
  合作关系
package cn.kgc.graphxdemo
import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
/**
 * Builds a property graph of user collaborations: each vertex carries a
 * (user name, occupation) pair and each edge carries a relationship label.
 * Prints the triplets, then the vertices and the edges.
 */
object GraphDemo2 {

  /**
   * Runs the demo on a local Spark master and prints the results to stdout.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().appName("sparkgraph")
      .master("local[*]")
      .getOrCreate()
    try {
      val sc: SparkContext = spark.sparkContext
      // Vertices: (vertex id, (user name, occupation)).
      val users: RDD[(Long, (String, String))] = sc.makeRDD(
        Array(
          (3L, ("rxin", "student")),
          (7L, ("jgonzal", "postdoc")),
          (5L, ("franklin", "professor")),
          (2L, ("istoica", "professor"))
        )
      )
      // Four directed edges: (source id, destination id, relationship label).
      // NOTE(review): "Advisior" looks like a typo of "Advisor"; it is runtime data,
      // so it is kept unchanged here - confirm before correcting.
      val relations: RDD[Edge[String]] = sc.makeRDD(
        Array(
          Edge(3L, 7L, "Collaborator"),
          Edge(5L, 3L, "Advisior"),
          Edge(2L, 5L, "Colleague"),
          Edge(5L, 7L, "PI")
        )
      )
      val graph: Graph[(String, String), String] = Graph(users, relations)
      // Collect before printing: RDD.foreach runs on the executors, so its output
      // order is partition-dependent and it would not appear on the driver console
      // when run on a cluster. The data here is tiny, so collect() is safe.
      graph.triplets.collect().foreach(println)
      println("--------------------------------")
      graph.vertices.collect().foreach(println)
      graph.edges.collect().foreach(println)
    } finally {
      // Release the Spark context even if one of the jobs above fails.
      spark.stop()
    }
  }
}
构建用户社交网络关系图
顶点属性:用户名、年龄
边属性:打 call 次数
任务:找出年龄大于 30 岁的用户
package cn.kgc.graphxdemo
import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Edge, Graph, VertexRDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
/**
 * Builds a small social-network graph whose vertices carry (user name, age) and
 * whose edges carry an Int weight, then prints: the users older than 30, the
 * triplets whose edge weight exceeds 5, the edge and vertex counts, and the
 * total, in- and out-degree of every vertex.
 */
object GraphDemo3 {

  /**
   * Runs the demo on a local Spark master and prints the results to stdout.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().appName("sparkgraph")
      .master("local[*]")
      .getOrCreate()
    try {
      val sc: SparkContext = spark.sparkContext
      // Vertices: (vertex id, (user name, age)).
      val users: RDD[(Long, (String, Int))] = sc.makeRDD(
        Array(
          (1L, ("Alice", 28)),
          (2L, ("Bob", 27)),
          (3L, ("Charlie", 65)),
          (4L, ("David", 42)),
          (5L, ("Ed", 55)),
          (6L, ("Fran", 50))
        )
      )
      // Directed edges: (source id, destination id, Int weight).
      val edges: RDD[Edge[Int]] = sc.makeRDD(
        Array(
          Edge(2L, 1L, 7),
          Edge(3L, 2L, 4),
          Edge(4L, 1L, 1),
          Edge(2L, 4L, 2),
          Edge(5L, 2L, 2),
          Edge(5L, 3L, 8),
          Edge(3L, 6L, 3),
          Edge(5L, 6L, 3)
        )
      )
      val graph: Graph[(String, Int), Int] = Graph(users, edges)

      // Users older than 30. The pattern match is equivalent to
      // filter(x => x._2._2 > 30) but names the field being tested.
      val olderUsers: VertexRDD[(String, Int)] =
        graph.vertices.filter { case (_, (_, age)) => age > 30 }
      // Collect before printing: RDD.foreach runs on the executors, so its output
      // order is partition-dependent and it would not appear on the driver console
      // when run on a cluster. The data here is tiny, so collect() is safe.
      olderUsers.collect().foreach(println)
      println("-----------------------------")

      // Triplets (source vertex, destination vertex, edge attribute) with weight > 5.
      graph.triplets
        .filter(_.attr > 5)
        .collect()
        .foreach(t => println(s"${t.srcAttr._1}喜欢 ${t.dstAttr._1} 爱的有多深: ${t.attr}"))

      val edgesNum: Long = graph.numEdges
      val verticesNum: Long = graph.numVertices
      // Explicit tuple: passing two arguments to println relies on deprecated auto-tupling.
      println((edgesNum, verticesNum))

      println("----------------度-------------------")
      // Total degree of each vertex (in-degree plus out-degree).
      val degrees: VertexRDD[Int] = graph.degrees
      degrees.collect().foreach(println)
      println("----------入度----------")
      // In-degree: number of incoming edges per vertex.
      val degreesin: VertexRDD[Int] = graph.inDegrees
      degreesin.collect().foreach(println)
      println("------------出度---------------")
      // Out-degree: number of outgoing edges per vertex.
      val degreesout: VertexRDD[Int] = graph.outDegrees
      degreesout.collect().foreach(println)
    } finally {
      // Release the Spark context even if one of the jobs above fails.
      spark.stop()
    }
  }
}