栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

项目实战 航班飞行网图分析

项目实战 航班飞行网图分析

训练的技能点

Spark GraphX API Spark GraphX PageRank Spark GraphX Pregel

需求概述 探索航班飞行网图数据 构建航班飞行网图 使用Spark GraphX完成下列任务 统计航班飞行网图中机场的数量 统计航班飞行网图中航线的数量 计算最长的飞行航线(Point to Point) 找出最繁忙的机场 找出最重要的飞行航线(PageRank) 找出最便宜的飞行航线(SSSP)

下载数据(USA Flight Dataset) https://drive.google.com/file/d/0B7Yoht-ttAeuaWdGZkRsSkVkN00/view 数据格式 文件格式为CSV,字段之间分隔符为“,” 依次为:#日、周#、航空公司、飞机注册号、航班号、起飞机场编号、起飞机场、到达机场编号、到达机场、预计起飞时间(时分)、起飞时间、起飞延迟(分钟)、到达预计时间、到达时间、到达延迟(分钟)、预计飞行时间、飞行距离

问题分析2:构建航班飞行网图

创建属性图Graph[VD,ED] 装载CSV为RDD,每个机场作为顶点。关键字段:起飞机场编号、起飞机场、到达机场编号、到达机场、飞行距离 初始化顶点集airports:RDD[(VertexId,String)],顶点属性为机场名称 初始化边集lines:RDD[Edge],边属性为飞行距离

问题分析3:统计航班飞行网图中机场与航线的数量

机场数量 求顶点个数:Graph.numVertices 航线数量 求边的个数:Graph.numEdges

问题分析4:计算最长的飞行航线

最大的边属性 对triplets按飞行距离排序(降序)并取第一个

问题分析5:找出最繁忙的机场

哪个机场到达航班最多 计算顶点的入度并排序

问题分析6:找出最重要的飞行航线

PageRank 收敛误差:0.05

问题分析7:找出最便宜的飞行航线

定价模型 price = 180.0 + distance * 0.15 SSSP问题 从初始指定的源点到达任意点的最短距离 pregel 初始化源点(0)与其它顶点(Double.PositiveInfinity) 初始消息(Double.PositiveInfinity) vprog函数计算最小值 sendMsg函数计算进行是否下一个迭代 mergeMsg函数合并接受的消息,取最小值

技能总结

Spark GraphX API vertices、edges、triplets、 numEdges、numVertices inDegrees、outDegrees、degrees mapVertices、mapEdges、mapTriplets Spark GraphX PageRank Spark GraphX Pregel

USAFlightDatset-SparkTutorial-Edureka.csv-互联网文档类资源-CSDN下载资源地址

代码实现

package cn.kgc.spark.graphX

import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

object GraphXFlights {
  def main(args: Array[String]): Unit = {


    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getName)
      .master("local[4]")
      .getOrCreate()


    val sc: SparkContext = spark.sparkContext

    //数据加载   csv
    val lines: RDD[String] = sc.textFile("file:///D:\mypro\hangban\data\USA Flight Datset - Spark Tutorial - Edureka.csv")
      .repartition(1)
    //获取首行
    val firstline: String = lines.first()
    //    lines.filter(_ != firstline).take(10).foreach(println)
    //去掉表头 思路,和首行一样的 就去掉

    val flights: RDD[Array[String]] = lines.filter(_ != firstline).map(_.split(","))

    //    通过机场构建点集合
    val airports: RDD[(Long, String)] = flights.flatMap(x => Array((x(5).toLong, x(6)), (x(7).toLong, x(8))))

    //通过航线构建边集合
    val airlines: RDD[Edge[Int]] = flights.map(x => (x(5).toLong, x(7).toLong, x(16).toInt)).distinct()
      .map(x => Edge(x._1, x._2, x._3))
    //通过飞机场和航线构建图结构
    val graph = Graph(airports, airlines, "unknow")


    //1统计航班飞行网图中机场的数量
    val numAirports: Long = graph.numVertices
    println("1统计航班飞行网图中机场的数量:")
    println(numAirports)

    //2统计航班飞行网图中航线的数量
    val numRoutes = graph.numEdges
    println("2统计航班飞行网图中航线的数量:")

    println(numRoutes)

    //3计算最长的飞行航线(Point to Point)
    println("3计算最长的飞行航线:")
    graph.triplets.sortBy(_.attr, false).take(1).foreach(println)


    //      ((12173,HNL),(12478,JFK),4983)


    //4找出最繁忙的机场
    println("4找出最繁忙的机场: 入和出")
    graph.inDegrees.sortBy(_._2, false).take(1).foreach(println)
    graph.outDegrees.sortBy(_._2, false).take(1).foreach(println)


    //    (10397,152)
    //    (10397,153)

    //5找出最重要的飞行航线(PageRank)
    println("5找出最重要的飞行航线(PageRank)")
    graph.pageRank(0.05).vertices.takeOrdered(3)(Ordering.by(_._2)).foreach(println)

    //
    //    (11695,0.29483238699643666)
    //    (14905,0.29766715072369493)
    //    (11041,0.29766715072369493)

    //6找出最便宜的飞行航线(SSSP)

    println("6找出最便宜的飞行航线(SSSP)")
    graph.mapEdges(e => 180.0 * e.attr * 0.15)
      //赋给源点一个初值,0.0 其他点赋无穷大
      .mapVertices((id, attr) => if (id == 12478) 0.0 else Double.PositiveInfinity)
      .pregel(Double.PositiveInfinity)(
        //接受消息时的处理函数
        (id, dist, new_dist) => math.min(dist, new_dist),
        //发送消息的处理函数
        triplet => {
          //如果当前点的价格+边长点的价格<小于目标点已有的价格就发送消息
          if (triplet.srcAttr + triplet.attr < triplet.dstAttr)
            Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
          else Iterator.empty
        },
        //合并消息的函数,求最小值
        (a, b) => Math.min(a, b)
        //取价格最便宜的三个遍历
      ).edges.takeOrdered(3)(Ordering.by(_.attr)).foreach(println)


  }

}

 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/354552.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号