栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

sparkSql数据离线处理--整理记录

sparkSql数据离线处理--整理记录

sparkSql数据离线处理

前言:本文作为本人学习sparkSql离线数据抽取,离线数据处理的学习整理记录,文中参考博客均附上原文链接。

一、Hive环境准备 1、配置文件准备:

/opt/hive/conf/hive-site.xml:(2021/12/31修改,添加了&useSSL=false&useUnicode=true&characterEncoding=utf8支持中文编码)




  
    javax.jdo.option.ConnectionURL
    jdbc:mysql://localhost:3306/hive_demo?createDatabaseIfNotExist=true&useSSL=false&useUnicode=true&characterEncoding=utf8
    hive的元数据库 
  
  
    javax.jdo.option.ConnectionDriverName
    com.mysql.jdbc.Driver
    mysql的驱动jar包 
  
  
    javax.jdo.option.ConnectionUserName
    root
    设定数据库的用户名 
  
  
    javax.jdo.option.ConnectionPassword
    xxx
    设定数据库的密码
   

   
      hive.exec.max.dynamic.partitions
      100000
      在所有执行MR的节点上,最大一共可以创建多少个动态分区
   
   
      hive.exec.max.dynamic.partitions.pernode
      100000
      在所有执行MR的节点上,最大可以创建多少个动态分区
  

若要在idea环境下运行要把

hdfs-site.xml

core-site.xml

hive-site.xml

放到resources文件夹中

否则hive.exec.max.dynamic.partitions.pernode,hive.exec.max.dynamic.partitions

配置不生效

2、hosts设置

若在不同网络环境下

需设置本地hosts

设置的内容为集群主机名

Ubuntu的hosts文件在 /etc 下

参考资料:(10条消息) java.lang.IllegalArgumentException: java.net.UnknownHostException: xxx_小健的博客-CSDN博客

3、远程连接服务开启

hive --service metastore

参考资料:(13条消息) hive的几种启动方式_lbl的博客-CSDN博客_hive启动

4、其他

mysql服务启动

service mysqld start

防火墙关闭

systemctl stop firewalld

二、IDEA环境准备 1、pom.xml文件


    
        sparkDome1
        org.example
        1.0-SNAPSHOT
    
    4.0.0

    HiveAndMysql

    
        8
        8
        2.7.7
        2.1.1
        2.11
    

    
        
        
            org.apache.hadoop
            hadoop-client
            ${hadoop.version}
        
        
            org.apache.hadoop
            hadoop-common
            ${hadoop.version}
        
        
            org.apache.hadoop
            hadoop-hdfs
            ${hadoop.version}
        
        
        
            org.scala-lang
            scala-library
            2.11.0
        
        
        
            org.apache.spark
            spark-core_${scala.version}
            ${spark.version}
        
        
            org.apache.spark
            spark-sql_${scala.version}
            ${spark.version}
        
        
            org.apache.spark
            spark-streaming_${scala.version}
            ${spark.version}
        
        
            org.apache.spark
            spark-mllib_${scala.version}
            ${spark.version}
        
        
        
            org.apache.spark
            spark-hive_${scala.version}
            ${spark.version}
        
        
            mysql
            mysql-connector-java
            5.1.48
        
    

    
        
            
                org.scala-tools
                maven-scala-plugin
                2.15.2
                
                    
                        scala-compile
                        
                            compile
                        
                        
                            
                            
                                **
   object ShopTest {

  def main(args: Array[String]): Unit = {
	//设置用户名,防止因为权限不足无法创建文件
    System.setProperty("HADOOP_USER_NAME", "root")
    //获取实例对象
    val spark = SparkSession.builder()
      .appName("ShopTest")
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "hdfs://xx.xxx.x.x:8020/user/hive/warehouse")
      .config("hive.metastore.uris", "thrift://xx.xxx.x.x:9083")
      .enableHiveSupport()
      .getOrCreate()
    //jdbc连接配置
    val mysqlMap = Map(
      "url" -> "jdbc:mysql://xx.xxx.x.x:3306/clown_db?useSSL=false",
      "user" -> "root",
      "password" -> "xxx",
      "driver" -> "com.mysql.jdbc.Driver"
    )
    //使用jdbc抽取mysql表数据
    val inputTable = spark.read.format("jdbc")
      .options(mysqlMap)
      .option("dbtable", "EcData_tb_1")
      .load()
    
    //    inputTable.show()
      
    //将mysql表数据创建为临时表
    inputTable.createOrReplaceTempView("inputTable")
    //hive动态分区开启
    spark.sqlContext.sql("set hive.exec.dynamic.partition = true")
    //hive分区模式设置,默认为strict严格模式,若设置分区必须要有一个静态分区
    //需要设置为nonstrict模式,可以都是动态分区
    spark.sqlContext.sql("set hive.exec.dynamic.partition.mode = nonstrict")
    //hive分区数设置,目前版本已无法在程序中设置,参考上文Hive环境准备-配置文件准备
    spark.sqlContext.sql("set hive.exec.max.dynamic.partitions.pernode = 10000")
    spark.sqlContext.sql("set hive.exec.max.dynamic.partitions = 10000")

    // mysql表结构,通过desc table tb_name;命令可获取

    
    
    //于hive数据库,ods层中创建表
    spark.sqlContext.sql(
      """
        |create table if not exists clown_test_db.ShopTest_ods_tb_1
        |(
        | InvoiceNo string ,
        | StockCode string ,
        | Description string ,
        | Quantity int ,
        | InvoiceDate string ,
        | UnitPrice double ,
        | CustomerID int ,
        | Country string
        |)
        |partitioned by (country_pid string,customer_pid int)
        |row format delimited
        |fields terminated by 't'  //本数据中字段值存在','不能用','作为分隔符
        |lines terminated by 'n'
        |stored as textfile
        |""".stripMargin)
      
    //使用sql-insert into 语句将mysql数据全部导入hive表中
    spark.sqlContext.sql(
      """
        |insert into table clown_test_db.ShopTest_ods_tb_1 partition(country_pid,customer_pid)
        |select *,Country,CustomerID from inputTable
        |""".stripMargin)

  }

}
2、增量抽取
import java.text.SimpleDateFormat

import org.apache.spark.sql.{SaveMode, SparkSession}


   object ShopTest2 {

  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME","root")

    val spark = SparkSession.builder()
      .appName("ShopTest2")
      .master("local[*]")
      .config("spark.sql.warehouse.dir","hdfs://xx.xxx.x.x:8020/user/hive/warehouse")
      .config("hive.metastore.uris","thrift://xx.xxx.x.x:9083")
      .enableHiveSupport()
      .getOrCreate()
    
    

	//隐式转换,sql方法导入
    import spark.implicits._
    import org.apache.spark.sql.functions._
    
    spark.sqlContext.sql("set hive.exec.dynamic.partition = true")
    spark.sqlContext.sql("set hive.exec.dynamic.partition.mode = nonstrict")
    
    //直接通过sql语句获取到hive ods层中的表数据
    val inputData = spark.sqlContext.sql("select * from clown_test_db.ShopTest_ods_tb_1")
    
    //设置时间条件
    val timeStr = "2011/01/01 00:00"
    val timeTemp = new SimpleDateFormat("yyyy/MM/dd HH:mm").parse(timeStr).getTime//单位为ms
    println(timeTemp)
    
    //未转换前的数据格式为:12/8/2010 9:53
    val timeFormat = inputData
      .withColumn("InvoiceDate",unix_timestamp($"InvoiceDate","MM/dd/yyyy HH:mm"))//时间戳获取,单位为s
      .where(s"InvoiceDate>$timeTemp/1000")//增量条件判断
      .withColumn("InvoiceDate",from_unixtime($"InvoiceDate","yyyy/MM/dd HH:mm"))//时间格式转换
      .where("Country='United Kingdom' or Country = 'Finland'")//筛选出国家名为United Kingdom 或 Finland的数据
    
    //由于该ods层表与目标dwd层表结构相同,直接用like语句创建结构相同的dwd表
    spark.sqlContext.sql(
      """
        |create table if not exists clown_dwd_db.shoptest_dwd_tb_1
        |like clown_test_db.ShopTest_ods_tb_1
        |""".stripMargin)


    //使用sparkSql算子将数据由ods表数据增量抽取到dwd表中
    timeFormat.write.format("hive")
      .mode(SaveMode.Append)
      .insertInto("clown_dwd_db.shoptest_dwd_tb_1")
  }

}
3、数据清洗
import org.apache.spark.sql.{SaveMode, SparkSession}


object ShopTest3 {
 
def main(args: Array[String]): Unit = {
   System.setProperty("HADOOP_USER_NAME","root")

   val spark = SparkSession.builder()
     .appName("ShopTest3")
     .master("local[*]")
     .config("spark.sql.warehouse.dir","hdfs://xx.xxx.x.x:8020/user/hive/warehouse")
     .config("hive.metastore.uris","thrift://xx.xxx.x.x:9083")
     .enableHiveSupport()
     .getOrCreate()

   import spark.implicits._
   import org.apache.spark.sql.functions._

   spark.sqlContext.sql("set hive.exec.dynamic.partition = true")
   spark.sqlContext.sql("set hive.exec.dynamic.partition.mode = nonstrict")

   val data = spark.sqlContext.sql("select * from clown_dwd_db.shoptest_dwd_tb_1")

   spark.sqlContext.sql(
     """
       |create table if not exists clown_dwd_db.shopTest_dwd_tb_3
       |(
       | InvoiceNo string ,
       | StockCode string ,
       | Description string ,
       | Quantity int ,
       | InvoiceDate string ,
       | UnitPrice double ,
       | CustomerID int ,
       | Country string
       |)
       |partitioned by (country_pid string)
       |row format delimited
       |fields terminated by 't'
       |lines terminated by 'n'
       |stored as textfile
       |""".stripMargin)

  //使用na.fill对缺失值进行填充
  //使用na.drop对缺失值进行剔除
   data.na.fill(
     Map(
       "Country"->"Country_Null",
       "CustomerID"->0
     )
   )
     .na.drop(
     Seq("UnitPrice","Quantity")
   )      .selectExpr("InvoiceNo","StockCode","Description","Quantity","InvoiceDate","UnitPrice","CustomerID","Country","Country")//由于数据中存在分区表字段,且该字段关联数据已改变,需要重新进行赋值
     .limit(10000)
     .write
     .format("hive")
     .mode(SaveMode.Append)
     .insertInto("clown_dwd_db.shopTest_dwd_tb_3")

  }
}
4、指标计算
import org.apache.spark.sql.SparkSession


object ShopTest4 {
  
  def main(args: Array[String]): Unit = {

    System.setProperty("HADOOP_USER_NAME", "root")
    
    val spark = SparkSession.builder()
      .appName("ShopTest4")
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "hdfs://xx.xxx.x.x:8020/user/hive/warehouse")
      .config("hive.metastore.uris", "thrift://xx.xxx.x.x:9083")
      .enableHiveSupport()
      .getOrCreate()
    
    import spark.implicits._
    import org.apache.spark.sql.functions._
    
    spark.sqlContext.sql("set hive.exec.dynamic.partition = true")
    spark.sqlContext.sql("set hive.exec.dynamic.partition.mode = nonstrict")
    
    val data = spark.sqlContext.sql("select * from clown_dwd_db.shopTest_dwd_tb_3")
    
    
    
        data.dropDuplicates("CustomerID","Country")//去重
          .withColumn("x",lit(1))//添加一列数据都为1
          .groupBy("Country")//聚合国家字段
          .sum("x")//对1数据字段进行累加
          .show(20)
    
    
    
    data.withColumn("x", $"Quantity" * $"UnitPrice")//添加销售额字段,值为数量*单价
      .groupBy("Country")//聚合国家字段
      .sum("x")//计算总销售额
      .withColumn("sum(x)", round($"sum(x)", 2))//对结果字段进行四舍五入到两位,但round会对最后一位0省略,最好使用其他函数
    


    
    data.groupBy("StockCode")//聚合商品编码字段
      .sum("Quantity")//计算销量
      .coalesce(1)//将spark分区设置为1,防止后面排序混乱
      .orderBy(desc("sum(Quantity)"))//由大到小排序
      .show(10)
    
    
    data.withColumn("InvoiceDate",substring_index($"InvoiceDate","/",2))//由于数据在增量抽取阶段已进行时间格式转换,可直接进行切割得出 年份/月份 的格式,substring_index与split不同
      .withColumn("x",$"Quantity"*$"UnitPrice")//计算销售额
      .groupBy("InvoiceDate")//对月份进行聚合
      .sum("x")//计算总销售额
      .coalesce(1)//设置spark分区为1
      .orderBy(desc("InvoiceDate"))//由大到小排序
      .withColumn("sum(x)",round($"sum(x)",2))//四舍五入到2位
      .show(100)
    
    
    data.select(col("Description"))//商品将描述字段单独查询
      .flatMap(x=>x.toString().split("\W"))//进行flatMap 切割后展平,切割\W为正则匹配模式,匹配所有符号
      .withColumn("x",lit(1))//增加1的数据列
      .groupBy("value")//展平后字段名为value,进行聚合
      .sum("x")//累加1数据
      .where("value != '' ")//筛除空白数据
      .coalesce(1)//设置spark分区为1
      .orderBy(desc("sum(x)"))//由大到小排序
      .show(300)//展示300条

  }

}
import org.apache.spark.sql.SparkSession



object ShopTest5 {
  
  def main(args: Array[String]): Unit = {

    System.setProperty("HADOOP_USER_NAME", "root")
    
    val spark = SparkSession.builder()
      .appName("ShopTest5")
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "hdfs://xx.xxx.x.x:8020/user/hive/warehouse")
      .config("hive.metastore.uris", "thrift://xx.xxx.x.x:9083")
      .enableHiveSupport()
      .getOrCreate()
    
    import spark.implicits._
    import org.apache.spark.sql.functions._
    
    spark.sqlContext.sql("set hive.exec.dynamic.partition = true")
    spark.sqlContext.sql("set hive.exec.dynamic.partition.mode = nonstrict")
    
    val data = spark.sqlContext.sql("select * from clown_dwd_db.shopTest_dwd_tb_3")
    
    data.createOrReplaceTempView("dataTable")
    
    

   //对去重后的Country,CustomerID进行聚合统计即可得出各个国家的客户数
   spark.sqlContext.sql(
     """
       |select Country,count(distinct Country,CustomerID) from dataTable group by Country
       |""".stripMargin)
     .show()

    
    
    spark.sqlContext.sql(
      """
        |select Country ,round(sum(Quantity*UnitPrice),2)
        |from dataTable
        |group by Country
        |""".stripMargin)
      .show()
    
    
    
    spark.sqlContext.sql(
      """
        |select StockCode,round(sum(Quantity*UnitPrice),2) as xl
        |from dataTable
        |group by StockCode
        |order by xl desc
        |""".stripMargin)
      .show(10)
    
    
    
    //group by执行优先度可能高于 as 重命名,因此as后的名字无法用于group by 聚合
    spark.sqlContext.sql(
      """
        |select substring_index(InvoiceDate,"/",2) as time,round(sum(Quantity*UnitPrice),2) as sum
        |from dataTable
        |group by substring_index(InvoiceDate,"/",2)
        |order by substring_index(InvoiceDate,"/",2)
        |""".stripMargin)
      .show()
    
    
    
    //目前认为该题用sql解法没有必要
      //- - 
  }

}
四、其他 1、hive分区的增删改查

参考资料:(15条消息) HIve学习:Hive分区修改_u011047968的专栏-CSDN博客_hive修改分区

hive表新增分区:[]内的不必要

alter table tb_name add partition (pid1 = ‘’,pid2 = ) [location ‘xxx’] 

多个分区

alter table tb_name add partition (pid1 = ‘’,pid2 = ) partition (pid1 = ‘’,pid2 = ) [location ‘xxx’] 

hive表修改分区:

alter table tb_name partition(pid1='') rename to partition(pid1='');
alter table tb_name partition(pid1='') set location 'hdfs://master:8020/....';  
alter table tb_name partition column (pid1 string);

hive表删除分区:

alter table tb_name drop partition (pid1 = ‘’,pid2 = )[ partition (pid1 = ‘’,pid2 = )…] 

hive分区值查询:

show partitions tb_name;
2、spark打包运行

命令:

spark-submit --class ShopTest4 --master spark://xx.xxx.x.x:7077 --executor-memory 512m --total-executor-cores 1 untitled-1.0-SNAPSHOT.jar

若使用了jdbc连接,需要指明驱动jar包 mysql-connector-java-5.1.48.jar

spark-submit --jars mysql-connector-java-5.1.48.jar --class ShopTest --master spark://xx.xxx.x.x:7077 --executor-memory 512m --total-executor-cores 1 untitled-1.0-SNAPSHOT.jar

或者将mysql驱动放至 $‘spark_home’/jars 目录下

3、时间格式

时间模式字符串用来指定时间格式。在此模式中,所有的 ASCII 字母被保留为模式字母,定义如下:

字母描述示例
G纪元标记AD
y四位年份2001
M月份July or 07
d一个月的日期10
hA.M./P.M. (1~12)格式小时12
H一天中的小时 (0~23)22
m分钟数30
s秒数55
S毫秒数234
E星期几Tuesday
D一年中的日子360
F一个月中第几周的周几2 (second Wed. in July)
w一年中第几周40
W一个月中第几周1
aA.M./P.M. 标记PM
k一天中的小时(1~24)24
KA.M./P.M. (0~11)格式小时10
z时区Eastern Standard Time
文字定界符Delimiter
"单引号`
4、Scala正则表达式

Scala 的正则表达式继承了 Java 的语法规则,Java 则大部分使用了 Perl 语言的规则。

下表我们给出了常用的一些正则表达式规则:(注意:需要转义,算子中写为,sql语句中写为\)

表达式匹配规则
^匹配输入字符串开始的位置。
$匹配输入字符串结尾的位置。
.匹配除"rn"之外的任何单个字符。
[…]字符集。匹配包含的任一字符。例如,"[abc]“匹配"plain"中的"a”。
[^…]反向字符集。匹配未包含的任何字符。例如,"[^abc]“匹配"plain"中"p”,“l”,“i”,“n”。
A匹配输入字符串开始的位置(无多行支持)
z字符串结尾(类似$,但不受处理多行选项的影响)
Z字符串结尾或行尾(不受处理多行选项的影响)
re*重复零次或更多次
re+重复一次或更多次
re?重复零次或一次
re{ n}重复n次
re{ n,}
re{ n, m}重复n到m次
a|b匹配 a 或者 b
(re)匹配 re,并捕获文本到自动命名的组里
(?: re)匹配 re,不捕获匹配的文本,也不给此分组分配组号
(?> re)贪婪子表达式
w匹配字母或数字或下划线或汉字
W匹配任意不是字母,数字,下划线,汉字的字符
s匹配任意的空白符,相等于 [tnrf]
S匹配任意不是空白符的字符
d匹配数字,类似 [0-9]
D匹配任意非数字的字符
G当前搜索的开头
n换行符
b通常是单词分界位置,但如果在字符类里使用代表退格
B匹配不是单词开头或结束的位置
t制表符
Q开始引号:Q(a+b)*3E 可匹配文本 “(a+b)*3”。
E结束引号:Q(a+b)*3E 可匹配文本 “(a+b)*3”。

正则表达式实例

实例描述
.匹配除"rn"之外的任何单个字符。
[Rr]uby匹配 “Ruby” 或 “ruby”
rub[ye]匹配 “ruby” 或 “rube”
[aeiou]匹配小写字母 :aeiou
[0-9]匹配任何数字,类似 [0123456789]
[a-z]匹配任何 ASCII 小写字母
[A-Z]匹配任何 ASCII 大写字母
[a-zA-Z0-9]匹配数字,大小写字母
[^aeiou]匹配除了 aeiou 其他字符
[^0-9]匹配除了数字的其他字符
d匹配数字,类似: [0-9]
D匹配非数字,类似: [^0-9]
s匹配空格,类似: [ trnf]
S匹配非空格,类似: [^ trnf]
w匹配字母,数字,下划线,类似: [A-Za-z0-9_]
W匹配非字母,数字,下划线,类似: [^A-Za-z0-9_]
ruby?匹配 “rub” 或 “ruby”: y 是可选的
ruby*匹配 “rub” 加上 0 个或多个的 y。
ruby+匹配 “rub” 加上 1 个或多个的 y。
d{3}刚好匹配 3 个数字。
d{3,}匹配 3 个或多个数字。
d{3,5}匹配 3 个、4 个或 5 个数字。
Dd+无分组: + 重复 d
(Dd)+/分组: + 重复 Dd 对
([Rr]uby(, )?)+匹配 “Ruby”、“Ruby, ruby, ruby”,等等

常用可以应用正则的函数:

.split("")切割字符串

.regexp_extract(string subject, string pattern, int index) 将字符串subject按照pattern正则表达式的规则拆分,返回index指定的字符

.regexp_replace(string A, string B, string C) 将字符串A中的符合Java正则表达式B的部分替换为C

.equals("")匹配

5、SQL like与rlike

like为通配符匹配,不是正则

%:匹配零个及多个任意字符

_:与任意单字符匹配

[]:匹配一个范围

[^]:排除一个范围

rlike为正则匹配

regexp与rlike功能相似

参考资料:(15条消息) sparksql 正则匹配总结_Andrew LD-CSDN博客_spark 正则表达式

6、中文数据

关于csv文件若包含中文,可在读取时设置option参数

val inputData = spark.sqlContext.read.format("csv")
  .option("sep","t")
  .option("encoding","GBK")
  .option("header","true")
 
.load("file:///C:\Users\61907\Desktop\BigData\Spark\sparkDome1\HiveAndMysql\src\main\resources\cov19.csv")

jdbc读取数据库数据时,若有中文需设置jdbc连接参数

&useUnicode=true&characterEncoding=utf8

//    jdbc中文编码设置
    val mysqlMap = Map(
      "url"->"jdbc:mysql://xx.xxx.x.x:3306/clown_db?useSSL=false&useUnicode=true&characterEncoding=utf8",
      "user"->"root",
      "password"->"xxx",
      "driver"->"com.mysql.jdbc.Driver"
    )

关于hive中存储中文数据,中文注释,中文分区(索引)

Ⅰ~Ⅲ参考资料:

(16条消息) hive设置中文编码格式utf-8_2020xyz的博客-CSDN博客_hive建表指定编码格式

(16条消息) hive修改使用utf8编码支持中文字符集_那又怎样?的博客-CSDN博客_hive默认字符集编码

Ⅰ.元数据库设置

元数据库需设置为utf-8编码

##创建hive元数据库hive,并指定utf-8编码格式
mysql>create database hive DEFAULT CHARSET utf8 COLLATE utf8_general_ci;


##修改已存在的hive元数据库,字符编码格式为utf-8
mysql>alter database hive character set utf8;     


##进入hive元数据库
mysql>use hive;

##查看元数据库字符编码格式
mysql>show variables like 'character_set_database';  

Ⅱ.相关表设置

1).修改字段注释字符集

mysql>alter table COLUMNS_V2 modify column COMMENT varchar(256) character set utf8;

2).修改表注释字符集

mysql>alter table TABLE_PARAMS modify column PARAM_VALUE varchar(4000) character set utf8;

类似的,PARAM_KEY若需要中文也可设置为utf8

3).修改分区表参数,以支持分区能够用中文表示

mysql>alter table PARTITION_PARAMS modify column PARAM_VALUE varchar(4000) character set utf8;


mysql>alter table PARTITION_KEYS modify column PKEY_COMMENT varchar(4000) character set utf8;

另外,PARTITIONS表中存放分区名的字段也需要修改为utf8

mysql>alter table PARTITIONS modify column PART_name varchar(4000) character set utf8;

4).修改索引注解

mysql>alter table INDEX_PARAMS modify column PARAM_VALUE varchar(250) character set utf8;

Ⅲ.hive-site.xml配置文件设置

需要在jdbc连接中设置支持中文编码

&useSSL=false&useUnicode=true&characterEncoding=utf8

其中&需要使用&转义

参考资料:(16条消息) 【已解决】The reference to entity “useSSL” must end with the ‘;’ delimiter_清宵尚温的博客-CSDN博客

/opt/hive/conf/hive-site.xml:




  
    javax.jdo.option.ConnectionURL
    jdbc:mysql://localhost:3306/hive_demo?createDatabaseIfNotExist=true&useSSL=false&useUnicode=true&characterEncoding=utf8
    hive的元数据库 
  
  
    javax.jdo.option.ConnectionDriverName
    com.mysql.jdbc.Driver
    mysql的驱动jar包 
  
  
    javax.jdo.option.ConnectionUserName
    root
    设定数据库的用户名 
  
  
    javax.jdo.option.ConnectionPassword
    xxx
    设定数据库的密码
   

   
      hive.exec.max.dynamic.partitions
      100000
      在所有执行MR的节点上,最大一共可以创建多少个动态分区
   
   
      hive.exec.max.dynamic.partitions.pernode
      100000
      在所有执行MR的节点上,最大可以创建多少个动态分区
  

Ⅳ.未解决问题

hdfs文件系统中显示

虽然正常显示中文但在文件夹中会出现

Path does not exist on HDFS or WebHDFS is disabled. Please check your path or enable WebHDFS

可能是中文路径导致的错误,但该错误目前未影响到分区表的正常操作,具体影响仍需实验。

Ⅴ.暴力脚本- -

参考资料:(16条消息) hive分区字段含中文导致的报错_一定要努力努力再努力的博客-CSDN博客_hive分区字段是中文

alter database hive_meta default character set utf8;
alter table BUCKETING_COLS default character set utf8;
alter table CDS default character set utf8;
alter table COLUMNS_V2 default character set utf8;
alter table DATAbase_PARAMS default character set utf8;
alter table DBS default character set utf8;
alter table FUNCS default character set utf8;
alter table FUNC_RU default character set utf8;
alter table GLOBAL_PRIVS default character set utf8;
alter table PARTITIONS default character set utf8;
alter table PARTITION_KEYS default character set utf8;
alter table PARTITION_KEY_VALS default character set utf8;
alter table PARTITION_PARAMS default character set utf8;
-- alter table PART_COL_STATS default character set utf8;
alter table ROLES default character set utf8;
alter table SDS default character set utf8;
alter table SD_PARAMS default character set utf8;
alter table SEQUENCE_TABLE default character set utf8;
alter table SERDES default character set utf8;
alter table SERDE_PARAMS default character set utf8;
alter table SKEWED_COL_NAMES default character set utf8;
alter table SKEWED_COL_VALUE_LOC_MAP default character set utf8;
alter table SKEWED_STRING_LIST default character set utf8;
alter table SKEWED_STRING_LIST_VALUES default character set utf8;
alter table SKEWED_VALUES default character set utf8;
alter table SORT_COLS default character set utf8;
alter table TABLE_PARAMS default character set utf8;
alter table TAB_COL_STATS default character set utf8;
alter table TBLS default character set utf8;
alter table VERSION default character set utf8;
alter table BUCKETING_COLS convert to character set utf8;
alter table CDS convert to character set utf8;
alter table COLUMNS_V2 convert to character set utf8;
alter table DATAbase_PARAMS convert to character set utf8;
alter table DBS convert to character set utf8;
alter table FUNCS convert to character set utf8;
alter table FUNC_RU convert to character set utf8;
alter table GLOBAL_PRIVS convert to character set utf8;
alter table PARTITIONS convert to character set utf8;
alter table PARTITION_KEYS convert to character set utf8;
alter table PARTITION_KEY_VALS convert to character set utf8;
alter table PARTITION_PARAMS convert to character set utf8;
-- alter table PART_COL_STATS convert to character set utf8;
alter table ROLES convert to character set utf8;
alter table SDS convert to character set utf8;
alter table SD_PARAMS convert to character set utf8;
alter table SEQUENCE_TABLE convert to character set utf8;
alter table SERDES convert to character set utf8;
alter table SERDE_PARAMS convert to character set utf8;
alter table SKEWED_COL_NAMES convert to character set utf8;
alter table SKEWED_COL_VALUE_LOC_MAP convert to character set utf8;
alter table SKEWED_STRING_LIST convert to character set utf8;
alter table SKEWED_STRING_LIST_VALUES convert to character set utf8;
alter table SKEWED_VALUES convert to character set utf8;
alter table SORT_COLS convert to character set utf8;
alter table TABLE_PARAMS convert to character set utf8;
alter table TAB_COL_STATS convert to character set utf8;
alter table TBLS convert to character set utf8;
alter table VERSION convert to character set utf8;
-- alter table PART_COL_STATS convert to character set utf8;
SET character_set_client = utf8 ;
-- SET character_set_connection = utf8 ;

-- alter table PART_COL_STATS convert to character set utf8;
SET character_set_database = utf8 ;
SET character_set_results = utf8 ;
SET character_set_server = utf8 ;
-- SET collation_connection = utf8 ;
-- SET collation_database = utf8 ;
-- SET collation_server = utf8 ;
SET NAMES 'utf8';

只复制了博客中修改表字段的部分

看看就好,最好还是根据需求修改。

Ⅵ.实例
import org.apache.spark.sql.{SaveMode, SparkSession}

object CNHivePartitionTest {

  def main(args: Array[String]): Unit = {

    System.setProperty("hadoop.home.dir", "D:\BaiduNetdiskDownload\hadoop-2.7.3")
    System.setProperty("HADOOP_USER_NAME", "root")

    val spark = SparkSession.builder()
      .appName("Cov19DataDome4")
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "hdfs://xx.xxx.x.x:8020/user/hive/warehouse")
      .config("hive.metastore.uris", "thrift://xx.xxx.x.x:9083")
      .enableHiveSupport()
      .getOrCreate()

    import spark.implicits._
    import org.apache.spark.sql.functions._

    spark.sqlContext.sql("set hive.exec.dynamic.partition = true")
    spark.sqlContext.sql("set hive.exec.dynamic.partition.mode = nonstrict")

    val mysqlMap = Map(
      "url" -> "jdbc:mysql://xx.xxx.x.x:3306/clown_db?useSSL=false&useUnicode=true&characterEncoding=utf8",
      "user" -> "root",
      "password" -> "xxx",
      "driver" -> "com.mysql.jdbc.Driver"
    )

    val mysqlData = spark.read.format("jdbc")
      .options(mysqlMap)
      .option("dbtable","tc_hotel2")
      .load()

    spark.sqlContext.sql(
      """
        |create table if not exists clown_test_db.CNTest
        |(
        |  `hname` string,
        |  `hbrand` string,
        |  `province` string,
        |  `city` string,
        |  `starlevel` string,
        |  `rating` string,
        |  `comment_count` string,
        |  `price` string
        |)
        |partitioned by (pid string)
        |row format delimited
        |fields terminated by 't'
        |lines terminated by 'n'
        |stored as textfile
        |""".stripMargin)

    mysqlData
      .select(col("*"),col("province"))
      .write
      .format("hive")
      .mode(SaveMode.Append)
      .insertInto("clown_test_db.CNTest")
  }

}
7、表连接join/union

参考资料:https://blog.csdn.net/m0_37809146/article/details/91282446

val tb1 = spark.read.format("jdbc")
  .options(mysqlMap)
  .option("dbtable", "cov19_test_tb")
  .load()

val tb2 = spark.read.format("jdbc")
  .options(mysqlMap)
  .option("dbtable", "cov19_test_tb_2")
  .load()
  .withColumnRenamed("", "")


tb1.join(tb2, Seq("provinceName", "cityName"), "inner")
//      .show(100)

tb1.join(tb2, Seq("provinceName", "cityName"), "right")
//      .show(100)

tb1.join(tb2, Seq("provinceName", "cityName"), "left")
//      .show(100)

val testTb1 = tb1.withColumnRenamed("cityName", "tb1CN")
val testTb2 = tb2.withColumnRenamed("cityName", "tb1CN")

//默认 inner连接,进行连接的条件字段必须两边表都存在
testTb1.join(testTb2, "tb1CN")
//      .show()



tb1.join(tb2, Seq("provinceName", "cityName"), "right_outer")
//      .show(100)


tb1.join(tb2, Seq("provinceName", "cityName"), "left_outer")
//      .show(100)


tb1.join(tb2, Seq("provinceName", "cityName"), "outer")
//      .show(100)


tb1.join(tb2, Seq("provinceName", "cityName"), "full")
//      .show(100)


tb1.join(tb2, Seq("provinceName", "cityName"), "full_outer")
//      .show(100)


tb1.join(tb2, Seq("provinceName", "cityName"), "left_semi")
  .show(100)


tb1.join(tb2, Seq("provinceName", "cityName"), "left_anti")
  .show(100)



8、自定义UDF,UDAF函数

Spark 2.4.0编程指南–Spark SQL UDF和UDAF-阿里云开发者社区 (aliyun.com)

(17条消息) Spark 2.3.0 用户自定义聚合函数UserDefinedAggregateFunction和Aggregator_L-CSDN博客

(17条消息) UDF和UDAF,UDTF的区别_山海-CSDN博客_udf和udtf区别

[(17条消息) Spark] 自定义函数 udf & pandas_udf_風の唄を聴け的博客-CSDN博客_pandas spark udf

9、数据集获取

UCI机器学习知识库:包括近300个不同大小和类型的数据集,可用于分类、回归、聚类和推荐系统任务。数据集列表位于:http://archive.ics.uci.edu/ml/

Amazon AWS公开数据集:包含的通常是大型数据集,可通过Amazon S3访问。这些数据集包括人类基因组项目、Common Craw网页语料库、维基百科数据和Google Books Ngrams。相关信息可参见:http://aws.amazon.com/publicdatasets/

Kaggle:这里集合了Kaggle举行的各种机器学习竞赛所用的数据集。它们覆盖分类、回归、排名、推荐系统以及图像分析领域,可从

Competitions区域下载:http://www.kaggle.com/competitions

KDnuggets:这里包含一个详细的公开数据集列表,其中一些上面提到过的。该列表位于:http://www.kdnuggets.com/datasets/index.html

10、数仓分层概念

参考资料:(10条消息) 数据仓库–数据分层(ETL、ODS、DW、APP、DIM)_hello_java_lcl的博客-CSDN博客_dim层

五、实战复盘 1、2022/1/3 题目:

数据源:

csv文件(未修改)

mysql表格(增加脏数据)

环境准备:

1.mysql数据表格 2.hive目标表 3.pom文件

完成速度:

3h+

遇到问题: 1.data->mysql,数据保存

SaveMode.Overwrite 保存至mysql数据库,不仅会覆盖数据格式,字段名也会被覆盖

在做题途中遇到了保存SaveMode.Append失败的错误,修改为Overwrite 不报错,原因不明

是否解决: ✔

出现错误

Unknown column 'sum' in 'field list'

原因是字段名与mysql数据库目标表中的字段名不同

修改字段名相同即可

.withColumnRenamed("sum","total_price")

在hive中是否有相同特性?

2.Join等表连接的使用

Join,union仍不熟悉 select子查询也比较生疏

是否解决: ✔?

join理解下图足够

union联合要求字段相同 否则报错

3.Date计算

参考资料:https://blog.csdn.net/wybshyy/article/details/52064337

使用datediff不需要转换时间格式

是否解决: ✔

参考资料:

(18条消息) Spark-SQL常用内置日期时间函数_绿萝蔓蔓绕枝生-CSDN博客_sparksql 时间函数

(18条消息) sparksql 时间函数_OH LEI``-CSDN博客_sparksql时间函数

datediff 计算两个时间差天数 结果返回一个整数

对时间格式可能有要求例如‘2021/1/4‘这样的时间格式无法被计算(sql中,算子貌似没有这个问题)

sql写法:

spark.sql(
  """
    |select datediff('2021-1-4','2020-12-30')
    |""".stripMargin).show()

算子写法:

.withColumn("o",datediff(col("delivery_date"),col("order_date")))

months_between计算两个时间差月数 结果返回一个浮点数

sql写法:

spark.sql(
  """
    |select months_between('2021-1-4','2020-12-30')
    |""".stripMargin).show()

返回:0.16129032

若想返回整数月份可以将天数删除:

spark.sql(
  """
    |select months_between('2021-1','2020-12')
    |""".stripMargin).show()

返回:1.0

算子写法:

.withColumn("o",months_between(col("delivery_date"),col("order_date")))

直接用时间戳相减通过计算也可以

spark.sql(
  """
    |select (unix_timestamp('2022/1/1','yyyy/MM/dd') - unix_timestamp('2021/12/31','yyyy/MM/dd'))/60/60/24
    |""".stripMargin).show()
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/701646.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号