1226 exam: Hive, Spark SQL, Spark RDD, Spark DataFrame basic operations

1

hdfs dfs -mkdir /app/data/allprovinces

hdfs dfs -put /opt/1226/products.txt /app/data/allprovinces/

hdfs dfs -put /opt/1226/allprovinces.txt /app/data/allprovinces/

hdfs dfs -mkdir

hdfs dfs -mv /app/data/allprovinces/products.txt /app/data/events/products

2

# the Hive mapping below expects the table in the `exam` namespace
create_namespace 'exam'
create 'exam:province_market','market','info'


with t as (
    select name, market, price
    from products
    where province = '山西'
)
select name,
       case when count(price) <= 2
            then round(sum(price) / count(price), 3)
            else round((sum(price) - max(price) - min(price)) / (count(price) - 2), 3)
       end as pavg
from t
group by name;
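The CASE logic above (a plain mean when there are at most two prices, otherwise a mean with one max and one min dropped) can be sanity-checked in plain Python; the helper name and sample prices here are invented for illustration:

```python
def trimmed_avg(prices):
    """Mean price; with more than two samples, drop one max and one min first."""
    if len(prices) <= 2:
        return round(sum(prices) / len(prices), 3)
    return round((sum(prices) - max(prices) - min(prices)) / (len(prices) - 2), 3)

print(trimmed_avg([3.0, 5.0]))       # plain mean: 4.0
print(trimmed_avg([1.0, 2.0, 9.0]))  # only the middle value survives: 2.0
```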
 


2. Create the HBase data table

3. In Hive, create the database market, and create three external tables in the market database

hive --service metastore

hive --service hiveserver2

create database market;
use market;
create external table ex_allprovinces(
    name string,
    abbr string
)
row format delimited fields terminated by '\t'
location '/app/data/allprovinces/';

create external table ex_products(
    name string,
    price float,
    craw_time string,
    market string,
    province string,
    city string
)
row format delimited fields terminated by '\t'
location '/app/data/events/products';

3c. Hive external table mapped to the HBase table

create external table ex_province_market(
   rowkey string,
   marketCount string,
   provinceName string
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties
("hbase.columns.mapping" = ":key,market:count,info:name")
tblproperties ("hbase.table.name" = "exam:province_market");
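The `hbase.columns.mapping` string pairs Hive columns with HBase cells purely by position: the first Hive column becomes the row key, the rest map to `family:qualifier` cells in order. A plain Python sketch of that positional contract (column names copied from the DDL above):

```python
# Positional pairing of Hive columns to HBase cells, as the storage handler does.
hive_cols = ["rowkey", "marketCount", "provinceName"]
hbase_cells = ":key,market:count,info:name".split(",")
pairs = list(zip(hive_cols, hbase_cells))
for col, cell in pairs:
    print(f"{col} -> {cell}")
```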

4d. Statement that inserts into the HBase-backed table

with t1 as (
    select province, count(distinct market) as con
    from ex_products
    where province != ""
    group by province
)
insert into table ex_province_market
select t2.abbr, t1.con, t1.province
from t1
inner join ex_allprovinces t2 on t1.province = t2.name;


select * from ex_province_market;
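The INSERT...SELECT above counts distinct markets per province and keys each row by the province's abbreviation via an inner join. The same logic in plain Python, with invented sample rows (column order follows the ex_products DDL):

```python
from collections import defaultdict

# Invented sample rows: (name, price, craw_time, market, province, city)
products = [
    ("apple", "3.2", "2016-01-01", "m1", "山西", "太原"),
    ("pear",  "2.1", "2016-01-01", "m2", "山西", "太原"),
    ("apple", "3.0", "2016-01-02", "m1", "山西", "大同"),
]
allprovinces = {"山西": "SX"}  # province name -> abbreviation, invented

markets = defaultdict(set)     # province -> distinct markets
for name, price, craw_time, market, province, city in products:
    if province:               # mirrors the province != "" filter
        markets[province].add(market)

# Inner join on province name, emitting (abbr, count, name) like the INSERT...SELECT
rows = [(allprovinces[p], len(ms), p) for p, ms in markets.items() if p in allprovinces]
print(rows)  # [('SX', 2, '山西')]
```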

4. In Spark-Shell, load products.txt into an RDD, use the RDD to count the agricultural product varieties, and print the result to the console.

// RDD question 1
val rdd = sc.textFile("hdfs://192.168.10.136:9000/app/data/events/products/products.txt")
rdd.map(_.split("\t")).filter(_.size == 6).map(x => {
  (x(4), x(0))                       // (province, product name)
}).groupByKey().map(x => (x._1, x._2.toArray.distinct.size))
  .repartition(1).sortBy(_._2, false).take(3).foreach(println)
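What the pipeline does (group lines by province, count distinct product names, take the top three) can be mirrored in plain Python; the sample lines below are invented but follow the tab-separated, 6-field layout of products.txt:

```python
from collections import defaultdict

# Invented sample lines in the products.txt layout (tab-separated, 6 fields)
lines = [
    "apple\t3.2\t2016-01-01\tm1\t山西\t太原",
    "pear\t2.1\t2016-01-01\tm1\t山西\t太原",
    "apple\t3.0\t2016-01-01\tm2\t河北\t石家庄",
]

varieties = defaultdict(set)   # province -> distinct product names
for line in lines:
    fields = line.split("\t")
    if len(fields) == 6:       # mirrors the length == 6 filter
        varieties[fields[4]].add(fields[0])

top3 = sorted(((p, len(s)) for p, s in varieties.items()), key=lambda x: -x[1])[:3]
print(top3)  # [('山西', 2), ('河北', 1)]
```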

// RDD question 2
val lines = sc.textFile("hdfs://192.168.10.136:9000/app/data/events/products/products.txt")
val tmp = lines.distinct.filter(_.split("\t").length == 6)

tmp.map(line => {
  val fields: Array[String] = line.split("\t")
  ((fields(4), fields(3)), 1)        // ((province, market), 1)
}).reduceByKey(_ + _).map({ case ((province, market), nums) => (province, (market, nums)) })
  .groupByKey().mapValues(x => x.toList.sortBy(_._2)(Ordering.Int.reverse).take(3)).foreach(println)
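This second pipeline deduplicates lines, counts records per (province, market) pair, then keeps the three busiest markets of each province. A plain-Python equivalent, again with invented sample lines:

```python
from collections import Counter, defaultdict

# Invented sample lines; two distinct records share market m1 in 山西
lines = [
    "apple\t3.2\t2016-01-01\tm1\t山西\t太原",
    "pear\t2.1\t2016-01-01\tm1\t山西\t太原",
    "plum\t4.5\t2016-01-01\tm2\t山西\t太原",
]

counts = Counter()
for line in set(lines):            # set() plays the role of .distinct
    f = line.split("\t")
    if len(f) == 6:
        counts[(f[4], f[3])] += 1  # ((province, market), 1) then reduceByKey

per_province = defaultdict(list)
for (province, market), n in counts.items():
    per_province[province].append((market, n))

top3 = {p: sorted(ms, key=lambda x: -x[1])[:3] for p, ms in per_province.items()}
print(top3)  # {'山西': [('m1', 2), ('m2', 1)]}
```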

5. In Spark-Shell, load products.txt into a DataFrame and compute the price trend of each agricultural product in Shanxi (山西) province, i.e. the mean price per day, and print the result to the console.

// spark-shell pre-imports spark.implicits._, which provides .toDF on the RDD
val df: DataFrame = sc.textFile("hdfs://192.168.10.136:9000/app/data/events/products/products.txt")
  .map(_.split("\t")).filter(_.length == 6).map(x => {
  (x(0), x(1), x(2), x(3), x(4), x(5))
}).toDF("name", "price", "craw_time", "market", "province", "city")

df.where("province='山西'").groupBy("name").agg(
  sum($"price").as("sum"),
  min($"price").as("min"),
  count($"price").as("count"),
  max($"price").as("max")
).withColumn("pavg",
  when($"count" > 2, ($"sum" - $"max" - $"min") / ($"count" - 2)).otherwise($"sum" / $"count"))
  .show()

The null values here are because some varieties have only one price, so count minus 2 goes negative and the result ends up null; the question didn't specify this case, so I didn't handle it specially, keeping the original behaviour.
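The note above can be checked in plain Python: without the count-greater-than-2 guard, a variety with a single price divides by -1, and one with exactly two prices divides by zero, which Spark SQL renders as NULL by default. A minimal sketch (helper name invented):

```python
# Unguarded version of (sum - max - min) / (count - 2); helper name invented.
def trimmed_avg_unguarded(prices):
    denom = len(prices) - 2
    if denom == 0:
        return None               # stand-in for Spark SQL's NULL on division by zero
    return (sum(prices) - max(prices) - min(prices)) / denom

print(trimmed_avg_unguarded([5.0]))       # denominator is -1
print(trimmed_avg_unguarded([5.0, 7.0]))  # 0 / 0 -> None
```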

Source: https://www.mshxw.com/it/682614.html