栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

世界新冠疫情数countrydata.csv 表,实战分析

世界新冠疫情数countrydata.csv 表,实战分析

一、环境要求 Hadoop+Hive+Spark+Hbase 开发环境。

四、功能要求

1.数据准备  请在 HDFS 中创建目录/app/data/exam,并将 countrydata.csv 传到该目录。 2.在 Spark-Shell 中,加载 HDFS 文件系统 countrydata.csv 文件,并使用 RDD 完成以下 统计计算。
[root@gree2 exam]# hdfs dfs -put ./countrydata.csv /app/data/exam


scala> val countryRdd=sc.textFile("/app/data/exam/countrydata.csv")
①统计每个国家在数据截止统计时的累计确诊人数。
scala> countryRdd.map(x=>x.split(",")).map(x=>(x(4),x(1).toInt)).reduceByKey((x,y)=>if(x>y) x else y).collect.foreach(println)
②统计全世界在数据截止统计时的总感染人数。
scala> countryRdd.map(x=>x.split(",")).map(x=>(x(4),x(1).toInt)).reduceByKey((x,y)=>if(x>y) x else y).reduce((x,y)=>("count",(x._2+y._2)))
res73: (String, Int) = (count,10785407)
③统计每个大洲中每日新增确诊人数最多的国家及确诊人数,并输出 20200408 这一天各 大洲当日新增确诊人数最多的国家及确诊人数。
scala> countryRdd.map(x=>x.split(",")).map(x=>((x(3),x(6)),(x(2).toInt,x(4)))).reduceByKey((a,b)=>if(a._1>b._1) a else b).filter(x=>x._1._1=="20200408").collect.foreach(println)
④统计每个大洲中每日累计确诊人数最多的国家及确诊人数,并输出 20200607 这一天各 大洲当日累计确诊人数最多的国家及确诊人数。
scala> countryRdd.map(x=>x.split(","))
.map(x=>((x(3),x(6)),(x(1).toInt,x(4))))
.reduceByKey((a,b)=>if(a._1>b._1) a else b)
.filter(x=>x._1._1=="20200607")
.collect.foreach(println)

⑤统计每个大洲每月累计确诊人数,显示 202006 这个月每个大洲的累计确诊人数。
scala> countryRdd.map(x=>x.split(",")).map(x=>((x(6),x(3).substring(0,6)),x(2).toInt)).reduceByKey(_+_).filter(x=>x._1._2=="202006").collect.foreach(println)

3.创建 Hbase 数据表   在 Hbase 中创建命名空间(namespace)exam,在该命名空间下创建 covid19_world 表, 使用大洲和统计日期的组合作为 RowKey(如“亚洲 20200520”),该表下有 1 个列族 record。record 列族用于统计疫情数据(每个大洲当日新增确诊人数最多的国家 record:maxIncreaseCountry 及其新增确诊人数 record:maxIncreaseCount)。 4.请在 Hive 中创建数据库 exam,在该数据库中创建外部表 ex_exam_record 指向 /app/data/exam 下的疫情数据 ;创建外部表 ex_exam_covid19_record 映射至 Hbase 中的 exam:covid19_world 表的 record 列族 

5. 使用 ex_exam_record 表中的数据 

①统计每个大洲中每日新增确诊人数最多的国家,将 continent 和 recordDate 合并成 rowkey,并保存到 ex_exam_covid19_record 表中。 

②完成统计后,在 Hbase Shell 中遍历 exam:covid19_world 表中的前 20 条数据。 

//3.创建 Hbase 数据表 
 在 Hbase 中创建命名空间(namespace)exam,在该命名空间下创建 covid19_world 表, 使用大洲和统计日期的组合作为 RowKey(如“亚洲 20200520”),该表下有 1 个列族 record。record 列族用于统计疫情数据(每个大洲当日新增确诊人数最多的国家 record:maxIncreaseCountry 及其新增确诊人数 record:maxIncreaseCount)。

hbase(main):018:0> create 'examtest:covid19_world','record'


create external table ex_exam_covid_record(
    id string    ,
   confirmedCount int,
confirmedIncr int ,
recordDate string  ,
countryName string ,
countryShortCode string,
continent string
)
row format delimited fields terminated by ","
stored as textfile location "/app/data/country";

select *
from ex_exam_covid_record;

//请在 Hive 中创建数据库 exam,在该数据库中创建外部表 ex_exam_record 指向 /app/data/exam 下的疫情数据 ;创建外部表 ex_exam_covid19_record 映射至 Hbase 中的 exam:covid19_world 表的 record 列族 

create external table   ex_exam_covid19_record(
    key string,
    maxIncreaseCountry  string,
    maxIncreaseCount int
)
stored by "org.apache.hadoop.hive.hbase.HbaseStorageHandler"
with serdeproperties ("hbase.columns.mapping"=":key,record:maxIncreaseCountry,record:maxIncreaseCount")
tblproperties ("hbase.table.name"="examtest:covid19_world");




//5. 使用 ex_exam_record 表中的数据 

①统计每个大洲中每日新增确诊人数最多的国家,将 continent 和 recordDate 合并成 rowkey,并保存到 ex_exam_covid19_record 表中。 

②完成统计后,在 Hbase Shell 中遍历 exam:covid19_world 表中的前 20 条数据。


insert into ex_exam_covid19_record
select t.rowkey,t.countryName,t.confirmedIncr
       from
(select concat(continent,recordDate) rowkey,countryName,/confirm/iedIncr,
       row_number() over (partition by countryName order by confirmedIncr desc ) max
from ex_exam_covid_record group by concat(continent,recordDate) ,countryName,/confirm/iedIncr) t
where t.max=1
;

 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/698392.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号