训练的技能点
Spark GraphX API Spark GraphX PageRank Spark GraphX Pregel
需求概述 探索航班飞行网图数据 构建航班飞行网图 使用Spark GraphX完成下列任务 统计航班飞行网图中机场的数量 统计航班飞行网图中航线的数量 计算最长的飞行航线(Point to Point) 找出最繁忙的机场 找出最重要的飞行航线(PageRank) 找出最便宜的飞行航线(SSSP)
下载数据(USA Flight Dataset) https://drive.google.com/file/d/0B7Yoht-ttAeuaWdGZkRsSkVkN00/view 数据格式 文件格式为CSV,字段之间分隔符为“,” 依次为:#日、周#、航空公司、飞机注册号、航班号、起飞机场编号、起飞机场、到达机场编号、到达机场、预计起飞时间(时分)、起飞时间、起飞延迟(分钟)、到达预计时间、到达时间、到达延迟(分钟)、预计飞行时间、飞行距离
问题分析2:构建航班飞行网图创建属性图Graph[VD,ED] 装载CSV为RDD,每个机场作为顶点。关键字段:起飞机场编号、起飞机场、到达机场编号、到达机场、飞行距离 初始化顶点集airports:RDD[(VertexId,String)],顶点属性为机场名称 初始化边集lines:RDD[Edge],边属性为飞行距离
问题分析3:统计航班飞行网图中机场与航线的数量机场数量 求顶点个数:Graph.numVertices 航线数量 求边的个数:Graph.numEdges
问题分析4:计算最长的飞行航线最大的边属性 对triplets按飞行距离排序(降序)并取第一个
问题分析5:找出最繁忙的机场哪个机场到达航班最多 计算顶点的入度并排序
问题分析6:找出最重要的飞行航线PageRank 收敛误差:0.05
问题分析7:找出最便宜的飞行航线定价模型 price = 180.0 + distance * 0.15 SSSP问题 从初始指定的源点到达任意点的最短距离 pregel 初始化源点(0)与其它顶点(Double.PositiveInfinity) 初始消息(Double.PositiveInfinity) vprog函数计算最小值 sendMsg函数计算进行是否下一个迭代 mergeMsg函数合并接受的消息,取最小值
技能总结
Spark GraphX API vertices、edges、triplets、 numEdges、numVertices inDegrees、outDegrees、degrees mapVertices、mapEdges、mapTriplets Spark GraphX PageRank Spark GraphX Pregel
USAFlightDatset-SparkTutorial-Edureka.csv-互联网文档类资源-CSDN下载资源地址 代码实现package cn.kgc.spark.graphX
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
object GraphXFlights {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder()
.appName(this.getClass.getName)
.master("local[4]")
.getOrCreate()
val sc: SparkContext = spark.sparkContext
//数据加载 csv
val lines: RDD[String] = sc.textFile("file:///D:\mypro\hangban\data\USA Flight Datset - Spark Tutorial - Edureka.csv")
.repartition(1)
//获取首行
val firstline: String = lines.first()
// lines.filter(_ != firstline).take(10).foreach(println)
//去掉表头 思路,和首行一样的 就去掉
val flights: RDD[Array[String]] = lines.filter(_ != firstline).map(_.split(","))
// 通过机场构建点集合
val airports: RDD[(Long, String)] = flights.flatMap(x => Array((x(5).toLong, x(6)), (x(7).toLong, x(8))))
//通过航线构建边集合
val airlines: RDD[Edge[Int]] = flights.map(x => (x(5).toLong, x(7).toLong, x(16).toInt)).distinct()
.map(x => Edge(x._1, x._2, x._3))
//通过飞机场和航线构建图结构
val graph = Graph(airports, airlines, "unknow")
//1统计航班飞行网图中机场的数量
val numAirports: Long = graph.numVertices
println("1统计航班飞行网图中机场的数量:")
println(numAirports)
//2统计航班飞行网图中航线的数量
val numRoutes = graph.numEdges
println("2统计航班飞行网图中航线的数量:")
println(numRoutes)
//3计算最长的飞行航线(Point to Point)
println("3计算最长的飞行航线:")
graph.triplets.sortBy(_.attr, false).take(1).foreach(println)
// ((12173,HNL),(12478,JFK),4983)
//4找出最繁忙的机场
println("4找出最繁忙的机场: 入和出")
graph.inDegrees.sortBy(_._2, false).take(1).foreach(println)
graph.outDegrees.sortBy(_._2, false).take(1).foreach(println)
// (10397,152)
// (10397,153)
//5找出最重要的飞行航线(PageRank)
println("5找出最重要的飞行航线(PageRank)")
graph.pageRank(0.05).vertices.takeOrdered(3)(Ordering.by(_._2)).foreach(println)
//
// (11695,0.29483238699643666)
// (14905,0.29766715072369493)
// (11041,0.29766715072369493)
//6找出最便宜的飞行航线(SSSP)
println("6找出最便宜的飞行航线(SSSP)")
graph.mapEdges(e => 180.0 * e.attr * 0.15)
//赋给源点一个初值,0.0 其他点赋无穷大
.mapVertices((id, attr) => if (id == 12478) 0.0 else Double.PositiveInfinity)
.pregel(Double.PositiveInfinity)(
//接受消息时的处理函数
(id, dist, new_dist) => math.min(dist, new_dist),
//发送消息的处理函数
triplet => {
//如果当前点的价格+边长点的价格<小于目标点已有的价格就发送消息
if (triplet.srcAttr + triplet.attr < triplet.dstAttr)
Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
else Iterator.empty
},
//合并消息的函数,求最小值
(a, b) => Math.min(a, b)
//取价格最便宜的三个遍历
).edges.takeOrdered(3)(Ordering.by(_.attr)).foreach(println)
}
}



