前言:本文作为本人学习sparkSql离线数据抽取,离线数据处理的学习整理记录,文中参考博客均附上原文链接。
一、Hive环境准备 1、配置文件准备:/opt/hive/conf/hive-site.xml:(2021/12/31修改,添加了&useSSL=false&useUnicode=true&characterEncoding=utf8支持中文编码)
javax.jdo.option.ConnectionURL jdbc:mysql://localhost:3306/hive_demo?createDatabaseIfNotExist=true&useSSL=false&useUnicode=true&characterEncoding=utf8 hive的元数据库 javax.jdo.option.ConnectionDriverName com.mysql.jdbc.Driver mysql的驱动jar包 javax.jdo.option.ConnectionUserName root 设定数据库的用户名 javax.jdo.option.ConnectionPassword xxx 设定数据库的密码 hive.exec.max.dynamic.partitions 100000 在所有执行MR的节点上,最大一共可以创建多少个动态分区 hive.exec.max.dynamic.partitions.pernode 100000 在所有执行MR的节点上,最大可以创建多少个动态分区
若要在idea环境下运行要把
hdfs-site.xml
core-site.xml
hive-site.xml
放到resources文件夹中
否则hive.exec.max.dynamic.partitions.pernode,hive.exec.max.dynamic.partitions
配置不生效
2、hosts设置若在不同网络环境下
需设置本地hosts
设置的内容为集群主机名
Ubuntu的hosts文件在 /etc 下
参考资料:(10条消息) java.lang.IllegalArgumentException: java.net.UnknownHostException: xxx_小健的博客-CSDN博客
3、远程连接服务开启hive --service metastore
参考资料:(13条消息) hive的几种启动方式_lbl的博客-CSDN博客_hive启动
4、其他mysql服务启动
service mysqld start
防火墙关闭
systemctl stop firewalld
二、IDEA环境准备 1、pom.xml文件2、增量抽取sparkDome1 org.example 1.0-SNAPSHOT 4.0.0 HiveAndMysql 8 8 2.7.7 2.1.1 2.11 org.apache.hadoop hadoop-client ${hadoop.version} org.apache.hadoop hadoop-common ${hadoop.version} org.apache.hadoop hadoop-hdfs ${hadoop.version} org.scala-lang scala-library 2.11.0 org.apache.spark spark-core_${scala.version} ${spark.version} org.apache.spark spark-sql_${scala.version} ${spark.version} org.apache.spark spark-streaming_${scala.version} ${spark.version} org.apache.spark spark-mllib_${scala.version} ${spark.version} org.apache.spark spark-hive_${scala.version} ${spark.version} mysql mysql-connector-java 5.1.48 org.scala-tools maven-scala-plugin 2.15.2 scala-compile compile ** object ShopTest { def main(args: Array[String]): Unit = { //设置用户名,防止因为权限不足无法创建文件 System.setProperty("HADOOP_USER_NAME", "root") //获取实例对象 val spark = SparkSession.builder() .appName("ShopTest") .master("local[*]") .config("spark.sql.warehouse.dir", "hdfs://xx.xxx.x.x:8020/user/hive/warehouse") .config("hive.metastore.uris", "thrift://xx.xxx.x.x:9083") .enableHiveSupport() .getOrCreate() //jdbc连接配置 val mysqlMap = Map( "url" -> "jdbc:mysql://xx.xxx.x.x:3306/clown_db?useSSL=false", "user" -> "root", "password" -> "xxx", "driver" -> "com.mysql.jdbc.Driver" ) //使用jdbc抽取mysql表数据 val inputTable = spark.read.format("jdbc") .options(mysqlMap) .option("dbtable", "EcData_tb_1") .load() // inputTable.show() //将mysql表数据创建为临时表 inputTable.createOrReplaceTempView("inputTable") //hive动态分区开启 spark.sqlContext.sql("set hive.exec.dynamic.partition = true") //hive分区模式设置,默认为strict严格模式,若设置分区必须要有一个静态分区 //需要设置为nonstrict模式,可以都是动态分区 spark.sqlContext.sql("set hive.exec.dynamic.partition.mode = nonstrict") //hive分区数设置,目前版本已无法在程序中设置,参考上文Hive环境准备-配置文件准备 spark.sqlContext.sql("set hive.exec.max.dynamic.partitions.pernode = 10000") spark.sqlContext.sql("set hive.exec.max.dynamic.partitions = 10000") // mysql表结构,通过desc table tb_name;命令可获取 //于hive数据库,ods层中创建表 spark.sqlContext.sql( """ |create table if not exists clown_test_db.ShopTest_ods_tb_1 |( | InvoiceNo string , | StockCode string , | Description string , | Quantity int , | InvoiceDate string , | UnitPrice double , | CustomerID int , | Country string |) |partitioned by (country_pid string,customer_pid int) |row format delimited |fields terminated by 't' //本数据中字段值存在','不能用','作为分隔符 |lines terminated by 'n' |stored as textfile |""".stripMargin) //使用sql-insert into 语句将mysql数据全部导入hive表中 spark.sqlContext.sql( """ |insert into table clown_test_db.ShopTest_ods_tb_1 partition(country_pid,customer_pid) |select *,Country,CustomerID from inputTable |""".stripMargin) } }
import java.text.SimpleDateFormat
import org.apache.spark.sql.{SaveMode, SparkSession}
object ShopTest2 {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME","root")
val spark = SparkSession.builder()
.appName("ShopTest2")
.master("local[*]")
.config("spark.sql.warehouse.dir","hdfs://xx.xxx.x.x:8020/user/hive/warehouse")
.config("hive.metastore.uris","thrift://xx.xxx.x.x:9083")
.enableHiveSupport()
.getOrCreate()
//隐式转换,sql方法导入
import spark.implicits._
import org.apache.spark.sql.functions._
spark.sqlContext.sql("set hive.exec.dynamic.partition = true")
spark.sqlContext.sql("set hive.exec.dynamic.partition.mode = nonstrict")
//直接通过sql语句获取到hive ods层中的表数据
val inputData = spark.sqlContext.sql("select * from clown_test_db.ShopTest_ods_tb_1")
//设置时间条件
val timeStr = "2011/01/01 00:00"
val timeTemp = new SimpleDateFormat("yyyy/MM/dd HH:mm").parse(timeStr).getTime//单位为ms
println(timeTemp)
//未转换前的数据格式为:12/8/2010 9:53
val timeFormat = inputData
.withColumn("InvoiceDate",unix_timestamp($"InvoiceDate","MM/dd/yyyy HH:mm"))//时间戳获取,单位为s
.where(s"InvoiceDate>$timeTemp/1000")//增量条件判断
.withColumn("InvoiceDate",from_unixtime($"InvoiceDate","yyyy/MM/dd HH:mm"))//时间格式转换
.where("Country='United Kingdom' or Country = 'Finland'")//筛选出国家名为United Kingdom 或 Finland的数据
//由于该ods层表与目标dwd层表结构相同,直接用like语句创建结构相同的dwd表
spark.sqlContext.sql(
"""
|create table if not exists clown_dwd_db.shoptest_dwd_tb_1
|like clown_test_db.ShopTest_ods_tb_1
|""".stripMargin)
//使用sparkSql算子将数据由ods表数据增量抽取到dwd表中
timeFormat.write.format("hive")
.mode(SaveMode.Append)
.insertInto("clown_dwd_db.shoptest_dwd_tb_1")
}
}
3、数据清洗
import org.apache.spark.sql.{SaveMode, SparkSession}
object ShopTest3 {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME","root")
val spark = SparkSession.builder()
.appName("ShopTest3")
.master("local[*]")
.config("spark.sql.warehouse.dir","hdfs://xx.xxx.x.x:8020/user/hive/warehouse")
.config("hive.metastore.uris","thrift://xx.xxx.x.x:9083")
.enableHiveSupport()
.getOrCreate()
import spark.implicits._
import org.apache.spark.sql.functions._
spark.sqlContext.sql("set hive.exec.dynamic.partition = true")
spark.sqlContext.sql("set hive.exec.dynamic.partition.mode = nonstrict")
val data = spark.sqlContext.sql("select * from clown_dwd_db.shoptest_dwd_tb_1")
spark.sqlContext.sql(
"""
|create table if not exists clown_dwd_db.shopTest_dwd_tb_3
|(
| InvoiceNo string ,
| StockCode string ,
| Description string ,
| Quantity int ,
| InvoiceDate string ,
| UnitPrice double ,
| CustomerID int ,
| Country string
|)
|partitioned by (country_pid string)
|row format delimited
|fields terminated by 't'
|lines terminated by 'n'
|stored as textfile
|""".stripMargin)
//使用na.fill对缺失值进行填充
//使用na.drop对缺失值进行剔除
data.na.fill(
Map(
"Country"->"Country_Null",
"CustomerID"->0
)
)
.na.drop(
Seq("UnitPrice","Quantity")
) .selectExpr("InvoiceNo","StockCode","Description","Quantity","InvoiceDate","UnitPrice","CustomerID","Country","Country")//由于数据中存在分区表字段,且该字段关联数据已改变,需要重新进行赋值
.limit(10000)
.write
.format("hive")
.mode(SaveMode.Append)
.insertInto("clown_dwd_db.shopTest_dwd_tb_3")
}
}
4、指标计算
import org.apache.spark.sql.SparkSession
object ShopTest4 {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "root")
val spark = SparkSession.builder()
.appName("ShopTest4")
.master("local[*]")
.config("spark.sql.warehouse.dir", "hdfs://xx.xxx.x.x:8020/user/hive/warehouse")
.config("hive.metastore.uris", "thrift://xx.xxx.x.x:9083")
.enableHiveSupport()
.getOrCreate()
import spark.implicits._
import org.apache.spark.sql.functions._
spark.sqlContext.sql("set hive.exec.dynamic.partition = true")
spark.sqlContext.sql("set hive.exec.dynamic.partition.mode = nonstrict")
val data = spark.sqlContext.sql("select * from clown_dwd_db.shopTest_dwd_tb_3")
data.dropDuplicates("CustomerID","Country")//去重
.withColumn("x",lit(1))//添加一列数据都为1
.groupBy("Country")//聚合国家字段
.sum("x")//对1数据字段进行累加
.show(20)
data.withColumn("x", $"Quantity" * $"UnitPrice")//添加销售额字段,值为数量*单价
.groupBy("Country")//聚合国家字段
.sum("x")//计算总销售额
.withColumn("sum(x)", round($"sum(x)", 2))//对结果字段进行四舍五入到两位,但round会对最后一位0省略,最好使用其他函数
data.groupBy("StockCode")//聚合商品编码字段
.sum("Quantity")//计算销量
.coalesce(1)//将spark分区设置为1,防止后面排序混乱
.orderBy(desc("sum(Quantity)"))//由大到小排序
.show(10)
data.withColumn("InvoiceDate",substring_index($"InvoiceDate","/",2))//由于数据在增量抽取阶段已进行时间格式转换,可直接进行切割得出 年份/月份 的格式,substring_index与split不同
.withColumn("x",$"Quantity"*$"UnitPrice")//计算销售额
.groupBy("InvoiceDate")//对月份进行聚合
.sum("x")//计算总销售额
.coalesce(1)//设置spark分区为1
.orderBy(desc("InvoiceDate"))//由大到小排序
.withColumn("sum(x)",round($"sum(x)",2))//四舍五入到2位
.show(100)
data.select(col("Description"))//商品将描述字段单独查询
.flatMap(x=>x.toString().split("\W"))//进行flatMap 切割后展平,切割\W为正则匹配模式,匹配所有符号
.withColumn("x",lit(1))//增加1的数据列
.groupBy("value")//展平后字段名为value,进行聚合
.sum("x")//累加1数据
.where("value != '' ")//筛除空白数据
.coalesce(1)//设置spark分区为1
.orderBy(desc("sum(x)"))//由大到小排序
.show(300)//展示300条
}
}
import org.apache.spark.sql.SparkSession
object ShopTest5 {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "root")
val spark = SparkSession.builder()
.appName("ShopTest5")
.master("local[*]")
.config("spark.sql.warehouse.dir", "hdfs://xx.xxx.x.x:8020/user/hive/warehouse")
.config("hive.metastore.uris", "thrift://xx.xxx.x.x:9083")
.enableHiveSupport()
.getOrCreate()
import spark.implicits._
import org.apache.spark.sql.functions._
spark.sqlContext.sql("set hive.exec.dynamic.partition = true")
spark.sqlContext.sql("set hive.exec.dynamic.partition.mode = nonstrict")
val data = spark.sqlContext.sql("select * from clown_dwd_db.shopTest_dwd_tb_3")
data.createOrReplaceTempView("dataTable")
//对去重后的Country,CustomerID进行聚合统计即可得出各个国家的客户数
spark.sqlContext.sql(
"""
|select Country,count(distinct Country,CustomerID) from dataTable group by Country
|""".stripMargin)
.show()
spark.sqlContext.sql(
"""
|select Country ,round(sum(Quantity*UnitPrice),2)
|from dataTable
|group by Country
|""".stripMargin)
.show()
spark.sqlContext.sql(
"""
|select StockCode,round(sum(Quantity*UnitPrice),2) as xl
|from dataTable
|group by StockCode
|order by xl desc
|""".stripMargin)
.show(10)
//group by执行优先度可能高于 as 重命名,因此as后的名字无法用于group by 聚合
spark.sqlContext.sql(
"""
|select substring_index(InvoiceDate,"/",2) as time,round(sum(Quantity*UnitPrice),2) as sum
|from dataTable
|group by substring_index(InvoiceDate,"/",2)
|order by substring_index(InvoiceDate,"/",2)
|""".stripMargin)
.show()
//目前认为该题用sql解法没有必要
//- -
}
}
四、其他
1、hive分区的增删改查
参考资料:(15条消息) HIve学习:Hive分区修改_u011047968的专栏-CSDN博客_hive修改分区
hive表新增分区:[]内的不必要
alter table tb_name add partition (pid1 = ‘’,pid2 = ) [location ‘xxx’]
多个分区
alter table tb_name add partition (pid1 = ‘’,pid2 = ) partition (pid1 = ‘’,pid2 = ) [location ‘xxx’]
hive表修改分区:
alter table tb_name partition(pid1='') rename to partition(pid1=''); alter table tb_name partition(pid1='') set location 'hdfs://master:8020/....'; alter table tb_name partition column (pid1 string);
hive表删除分区:
alter table tb_name drop partition (pid1 = ‘’,pid2 = )[ partition (pid1 = ‘’,pid2 = )…]
hive分区值查询:
show partitions tb_name;2、spark打包运行
命令:
spark-submit --class ShopTest4 --master spark://xx.xxx.x.x:7077 --executor-memory 512m --total-executor-cores 1 untitled-1.0-SNAPSHOT.jar
若使用了jdbc连接,需要指明驱动jar包 mysql-connector-java-5.1.48.jar
spark-submit --jars mysql-connector-java-5.1.48.jar --class ShopTest --master spark://xx.xxx.x.x:7077 --executor-memory 512m --total-executor-cores 1 untitled-1.0-SNAPSHOT.jar
或者将mysql驱动放至 $‘spark_home’/jars 目录下
3、时间格式时间模式字符串用来指定时间格式。在此模式中,所有的 ASCII 字母被保留为模式字母,定义如下:
| 字母 | 描述 | 示例 |
|---|---|---|
| G | 纪元标记 | AD |
| y | 四位年份 | 2001 |
| M | 月份 | July or 07 |
| d | 一个月的日期 | 10 |
| h | A.M./P.M. (1~12)格式小时 | 12 |
| H | 一天中的小时 (0~23) | 22 |
| m | 分钟数 | 30 |
| s | 秒数 | 55 |
| S | 毫秒数 | 234 |
| E | 星期几 | Tuesday |
| D | 一年中的日子 | 360 |
| F | 一个月中第几周的周几 | 2 (second Wed. in July) |
| w | 一年中第几周 | 40 |
| W | 一个月中第几周 | 1 |
| a | A.M./P.M. 标记 | PM |
| k | 一天中的小时(1~24) | 24 |
| K | A.M./P.M. (0~11)格式小时 | 10 |
| z | 时区 | Eastern Standard Time |
| ’ | 文字定界符 | Delimiter |
| " | 单引号 | ` |
Scala 的正则表达式继承了 Java 的语法规则,Java 则大部分使用了 Perl 语言的规则。
下表我们给出了常用的一些正则表达式规则:(注意:需要转义,算子中写为,sql语句中写为\)
| 表达式 | 匹配规则 |
|---|---|
| ^ | 匹配输入字符串开始的位置。 |
| $ | 匹配输入字符串结尾的位置。 |
| . | 匹配除"rn"之外的任何单个字符。 |
| […] | 字符集。匹配包含的任一字符。例如,"[abc]“匹配"plain"中的"a”。 |
| [^…] | 反向字符集。匹配未包含的任何字符。例如,"[^abc]“匹配"plain"中"p”,“l”,“i”,“n”。 |
| A | 匹配输入字符串开始的位置(无多行支持) |
| z | 字符串结尾(类似$,但不受处理多行选项的影响) |
| Z | 字符串结尾或行尾(不受处理多行选项的影响) |
| re* | 重复零次或更多次 |
| re+ | 重复一次或更多次 |
| re? | 重复零次或一次 |
| re{ n} | 重复n次 |
| re{ n,} | |
| re{ n, m} | 重复n到m次 |
| a|b | 匹配 a 或者 b |
| (re) | 匹配 re,并捕获文本到自动命名的组里 |
| (?: re) | 匹配 re,不捕获匹配的文本,也不给此分组分配组号 |
| (?> re) | 贪婪子表达式 |
| w | 匹配字母或数字或下划线或汉字 |
| W | 匹配任意不是字母,数字,下划线,汉字的字符 |
| s | 匹配任意的空白符,相等于 [tnrf] |
| S | 匹配任意不是空白符的字符 |
| d | 匹配数字,类似 [0-9] |
| D | 匹配任意非数字的字符 |
| G | 当前搜索的开头 |
| n | 换行符 |
| b | 通常是单词分界位置,但如果在字符类里使用代表退格 |
| B | 匹配不是单词开头或结束的位置 |
| t | 制表符 |
| Q | 开始引号:Q(a+b)*3E 可匹配文本 “(a+b)*3”。 |
| E | 结束引号:Q(a+b)*3E 可匹配文本 “(a+b)*3”。 |
正则表达式实例
| 实例 | 描述 |
|---|---|
| . | 匹配除"rn"之外的任何单个字符。 |
| [Rr]uby | 匹配 “Ruby” 或 “ruby” |
| rub[ye] | 匹配 “ruby” 或 “rube” |
| [aeiou] | 匹配小写字母 :aeiou |
| [0-9] | 匹配任何数字,类似 [0123456789] |
| [a-z] | 匹配任何 ASCII 小写字母 |
| [A-Z] | 匹配任何 ASCII 大写字母 |
| [a-zA-Z0-9] | 匹配数字,大小写字母 |
| [^aeiou] | 匹配除了 aeiou 其他字符 |
| [^0-9] | 匹配除了数字的其他字符 |
| d | 匹配数字,类似: [0-9] |
| D | 匹配非数字,类似: [^0-9] |
| s | 匹配空格,类似: [ trnf] |
| S | 匹配非空格,类似: [^ trnf] |
| w | 匹配字母,数字,下划线,类似: [A-Za-z0-9_] |
| W | 匹配非字母,数字,下划线,类似: [^A-Za-z0-9_] |
| ruby? | 匹配 “rub” 或 “ruby”: y 是可选的 |
| ruby* | 匹配 “rub” 加上 0 个或多个的 y。 |
| ruby+ | 匹配 “rub” 加上 1 个或多个的 y。 |
| d{3} | 刚好匹配 3 个数字。 |
| d{3,} | 匹配 3 个或多个数字。 |
| d{3,5} | 匹配 3 个、4 个或 5 个数字。 |
| Dd+ | 无分组: + 重复 d |
| (Dd)+/ | 分组: + 重复 Dd 对 |
| ([Rr]uby(, )?)+ | 匹配 “Ruby”、“Ruby, ruby, ruby”,等等 |
常用可以应用正则的函数:
.split("")切割字符串
.regexp_extract(string subject, string pattern, int index) 将字符串subject按照pattern正则表达式的规则拆分,返回index指定的字符
.regexp_replace(string A, string B, string C) 将字符串A中的符合Java正则表达式B的部分替换为C
.equals("")匹配
5、SQL like与rlikelike为通配符匹配,不是正则
%:匹配零个及多个任意字符
_:与任意单字符匹配
[]:匹配一个范围
[^]:排除一个范围
rlike为正则匹配
regexp与rlike功能相似
参考资料:(15条消息) sparksql 正则匹配总结_Andrew LD-CSDN博客_spark 正则表达式
6、中文数据关于csv文件若包含中文,可在读取时设置option参数
val inputData = spark.sqlContext.read.format("csv")
.option("sep","t")
.option("encoding","GBK")
.option("header","true")
.load("file:///C:\Users\61907\Desktop\BigData\Spark\sparkDome1\HiveAndMysql\src\main\resources\cov19.csv")
jdbc读取数据库数据时,若有中文需设置jdbc连接参数
&useUnicode=true&characterEncoding=utf8
// jdbc中文编码设置
val mysqlMap = Map(
"url"->"jdbc:mysql://xx.xxx.x.x:3306/clown_db?useSSL=false&useUnicode=true&characterEncoding=utf8",
"user"->"root",
"password"->"xxx",
"driver"->"com.mysql.jdbc.Driver"
)
关于hive中存储中文数据,中文注释,中文分区(索引)
Ⅰ~Ⅲ参考资料:
(16条消息) hive设置中文编码格式utf-8_2020xyz的博客-CSDN博客_hive建表指定编码格式
(16条消息) hive修改使用utf8编码支持中文字符集_那又怎样?的博客-CSDN博客_hive默认字符集编码
Ⅰ.元数据库设置元数据库需设置为utf-8编码
##创建hive元数据库hive,并指定utf-8编码格式 mysql>create database hive DEFAULT CHARSET utf8 COLLATE utf8_general_ci; ##修改已存在的hive元数据库,字符编码格式为utf-8 mysql>alter database hive character set utf8; ##进入hive元数据库 mysql>use hive; ##查看元数据库字符编码格式 mysql>show variables like 'character_set_database';Ⅱ.相关表设置
1).修改字段注释字符集
mysql>alter table COLUMNS_V2 modify column COMMENT varchar(256) character set utf8;
2).修改表注释字符集
mysql>alter table TABLE_PARAMS modify column PARAM_VALUE varchar(4000) character set utf8;
类似的,PARAM_KEY若需要中文也可设置为utf8
3).修改分区表参数,以支持分区能够用中文表示
mysql>alter table PARTITION_PARAMS modify column PARAM_VALUE varchar(4000) character set utf8; mysql>alter table PARTITION_KEYS modify column PKEY_COMMENT varchar(4000) character set utf8;
另外,PARTITIONS表中存放分区名的字段也需要修改为utf8
mysql>alter table PARTITIONS modify column PART_name varchar(4000) character set utf8;
4).修改索引注解
mysql>alter table INDEX_PARAMS modify column PARAM_VALUE varchar(250) character set utf8;Ⅲ.hive-site.xml配置文件设置
需要在jdbc连接中设置支持中文编码
&useSSL=false&useUnicode=true&characterEncoding=utf8
其中&需要使用&转义
参考资料:(16条消息) 【已解决】The reference to entity “useSSL” must end with the ‘;’ delimiter_清宵尚温的博客-CSDN博客
/opt/hive/conf/hive-site.xml:
Ⅳ.未解决问题javax.jdo.option.ConnectionURL jdbc:mysql://localhost:3306/hive_demo?createDatabaseIfNotExist=true&useSSL=false&useUnicode=true&characterEncoding=utf8 hive的元数据库 javax.jdo.option.ConnectionDriverName com.mysql.jdbc.Driver mysql的驱动jar包 javax.jdo.option.ConnectionUserName root 设定数据库的用户名 javax.jdo.option.ConnectionPassword xxx 设定数据库的密码 hive.exec.max.dynamic.partitions 100000 在所有执行MR的节点上,最大一共可以创建多少个动态分区 hive.exec.max.dynamic.partitions.pernode 100000 在所有执行MR的节点上,最大可以创建多少个动态分区
hdfs文件系统中显示
虽然正常显示中文但在文件夹中会出现
Path does not exist on HDFS or WebHDFS is disabled. Please check your path or enable WebHDFS
可能是中文路径导致的错误,但该错误目前未影响到分区表的正常操作,具体影响仍需实验。
Ⅴ.暴力脚本- -参考资料:(16条消息) hive分区字段含中文导致的报错_一定要努力努力再努力的博客-CSDN博客_hive分区字段是中文
alter database hive_meta default character set utf8; alter table BUCKETING_COLS default character set utf8; alter table CDS default character set utf8; alter table COLUMNS_V2 default character set utf8; alter table DATAbase_PARAMS default character set utf8; alter table DBS default character set utf8; alter table FUNCS default character set utf8; alter table FUNC_RU default character set utf8; alter table GLOBAL_PRIVS default character set utf8; alter table PARTITIONS default character set utf8; alter table PARTITION_KEYS default character set utf8; alter table PARTITION_KEY_VALS default character set utf8; alter table PARTITION_PARAMS default character set utf8; -- alter table PART_COL_STATS default character set utf8; alter table ROLES default character set utf8; alter table SDS default character set utf8; alter table SD_PARAMS default character set utf8; alter table SEQUENCE_TABLE default character set utf8; alter table SERDES default character set utf8; alter table SERDE_PARAMS default character set utf8; alter table SKEWED_COL_NAMES default character set utf8; alter table SKEWED_COL_VALUE_LOC_MAP default character set utf8; alter table SKEWED_STRING_LIST default character set utf8; alter table SKEWED_STRING_LIST_VALUES default character set utf8; alter table SKEWED_VALUES default character set utf8; alter table SORT_COLS default character set utf8; alter table TABLE_PARAMS default character set utf8; alter table TAB_COL_STATS default character set utf8; alter table TBLS default character set utf8; alter table VERSION default character set utf8; alter table BUCKETING_COLS convert to character set utf8; alter table CDS convert to character set utf8; alter table COLUMNS_V2 convert to character set utf8; alter table DATAbase_PARAMS convert to character set utf8; alter table DBS convert to character set utf8; alter table FUNCS convert to character set utf8; alter table FUNC_RU convert to character set utf8; alter table GLOBAL_PRIVS convert to character set utf8; alter table PARTITIONS convert to character set utf8; alter table PARTITION_KEYS convert to character set utf8; alter table PARTITION_KEY_VALS convert to character set utf8; alter table PARTITION_PARAMS convert to character set utf8; -- alter table PART_COL_STATS convert to character set utf8; alter table ROLES convert to character set utf8; alter table SDS convert to character set utf8; alter table SD_PARAMS convert to character set utf8; alter table SEQUENCE_TABLE convert to character set utf8; alter table SERDES convert to character set utf8; alter table SERDE_PARAMS convert to character set utf8; alter table SKEWED_COL_NAMES convert to character set utf8; alter table SKEWED_COL_VALUE_LOC_MAP convert to character set utf8; alter table SKEWED_STRING_LIST convert to character set utf8; alter table SKEWED_STRING_LIST_VALUES convert to character set utf8; alter table SKEWED_VALUES convert to character set utf8; alter table SORT_COLS convert to character set utf8; alter table TABLE_PARAMS convert to character set utf8; alter table TAB_COL_STATS convert to character set utf8; alter table TBLS convert to character set utf8; alter table VERSION convert to character set utf8; -- alter table PART_COL_STATS convert to character set utf8; SET character_set_client = utf8 ; -- SET character_set_connection = utf8 ; -- alter table PART_COL_STATS convert to character set utf8; SET character_set_database = utf8 ; SET character_set_results = utf8 ; SET character_set_server = utf8 ; -- SET collation_connection = utf8 ; -- SET collation_database = utf8 ; -- SET collation_server = utf8 ; SET NAMES 'utf8';
只复制了博客中修改表字段的部分
看看就好,最好还是根据需求修改。
Ⅵ.实例import org.apache.spark.sql.{SaveMode, SparkSession}
object CNHivePartitionTest {
def main(args: Array[String]): Unit = {
System.setProperty("hadoop.home.dir", "D:\BaiduNetdiskDownload\hadoop-2.7.3")
System.setProperty("HADOOP_USER_NAME", "root")
val spark = SparkSession.builder()
.appName("Cov19DataDome4")
.master("local[*]")
.config("spark.sql.warehouse.dir", "hdfs://xx.xxx.x.x:8020/user/hive/warehouse")
.config("hive.metastore.uris", "thrift://xx.xxx.x.x:9083")
.enableHiveSupport()
.getOrCreate()
import spark.implicits._
import org.apache.spark.sql.functions._
spark.sqlContext.sql("set hive.exec.dynamic.partition = true")
spark.sqlContext.sql("set hive.exec.dynamic.partition.mode = nonstrict")
val mysqlMap = Map(
"url" -> "jdbc:mysql://xx.xxx.x.x:3306/clown_db?useSSL=false&useUnicode=true&characterEncoding=utf8",
"user" -> "root",
"password" -> "xxx",
"driver" -> "com.mysql.jdbc.Driver"
)
val mysqlData = spark.read.format("jdbc")
.options(mysqlMap)
.option("dbtable","tc_hotel2")
.load()
spark.sqlContext.sql(
"""
|create table if not exists clown_test_db.CNTest
|(
| `hname` string,
| `hbrand` string,
| `province` string,
| `city` string,
| `starlevel` string,
| `rating` string,
| `comment_count` string,
| `price` string
|)
|partitioned by (pid string)
|row format delimited
|fields terminated by 't'
|lines terminated by 'n'
|stored as textfile
|""".stripMargin)
mysqlData
.select(col("*"),col("province"))
.write
.format("hive")
.mode(SaveMode.Append)
.insertInto("clown_test_db.CNTest")
}
}
7、表连接join/union
参考资料:https://blog.csdn.net/m0_37809146/article/details/91282446
val tb1 = spark.read.format("jdbc")
.options(mysqlMap)
.option("dbtable", "cov19_test_tb")
.load()
val tb2 = spark.read.format("jdbc")
.options(mysqlMap)
.option("dbtable", "cov19_test_tb_2")
.load()
.withColumnRenamed("", "")
tb1.join(tb2, Seq("provinceName", "cityName"), "inner")
// .show(100)
tb1.join(tb2, Seq("provinceName", "cityName"), "right")
// .show(100)
tb1.join(tb2, Seq("provinceName", "cityName"), "left")
// .show(100)
val testTb1 = tb1.withColumnRenamed("cityName", "tb1CN")
val testTb2 = tb2.withColumnRenamed("cityName", "tb1CN")
//默认 inner连接,进行连接的条件字段必须两边表都存在
testTb1.join(testTb2, "tb1CN")
// .show()
tb1.join(tb2, Seq("provinceName", "cityName"), "right_outer")
// .show(100)
tb1.join(tb2, Seq("provinceName", "cityName"), "left_outer")
// .show(100)
tb1.join(tb2, Seq("provinceName", "cityName"), "outer")
// .show(100)
tb1.join(tb2, Seq("provinceName", "cityName"), "full")
// .show(100)
tb1.join(tb2, Seq("provinceName", "cityName"), "full_outer")
// .show(100)
tb1.join(tb2, Seq("provinceName", "cityName"), "left_semi")
.show(100)
tb1.join(tb2, Seq("provinceName", "cityName"), "left_anti")
.show(100)
8、自定义UDF,UDAF函数
Spark 2.4.0编程指南–Spark SQL UDF和UDAF-阿里云开发者社区 (aliyun.com)
(17条消息) Spark 2.3.0 用户自定义聚合函数UserDefinedAggregateFunction和Aggregator_L-CSDN博客
(17条消息) UDF和UDAF,UDTF的区别_山海-CSDN博客_udf和udtf区别
[(17条消息) Spark] 自定义函数 udf & pandas_udf_風の唄を聴け的博客-CSDN博客_pandas spark udf
9、数据集获取UCI机器学习知识库:包括近300个不同大小和类型的数据集,可用于分类、回归、聚类和推荐系统任务。数据集列表位于:http://archive.ics.uci.edu/ml/
Amazon AWS公开数据集:包含的通常是大型数据集,可通过Amazon S3访问。这些数据集包括人类基因组项目、Common Craw网页语料库、维基百科数据和Google Books Ngrams。相关信息可参见:http://aws.amazon.com/publicdatasets/
Kaggle:这里集合了Kaggle举行的各种机器学习竞赛所用的数据集。它们覆盖分类、回归、排名、推荐系统以及图像分析领域,可从
Competitions区域下载:http://www.kaggle.com/competitions
KDnuggets:这里包含一个详细的公开数据集列表,其中一些上面提到过的。该列表位于:http://www.kdnuggets.com/datasets/index.html
10、数仓分层概念参考资料:(10条消息) 数据仓库–数据分层(ETL、ODS、DW、APP、DIM)_hello_java_lcl的博客-CSDN博客_dim层
五、实战复盘 1、2022/1/3 题目: 数据源:csv文件(未修改)
mysql表格(增加脏数据)
环境准备:1.mysql数据表格 2.hive目标表 3.pom文件
完成速度:3h+
遇到问题: 1.data->mysql,数据保存SaveMode.Overwrite 保存至mysql数据库,不仅会覆盖数据格式,字段名也会被覆盖
在做题途中遇到了保存SaveMode.Append失败的错误,修改为Overwrite 不报错,原因不明
是否解决: ✔
出现错误
Unknown column 'sum' in 'field list'
原因是字段名与mysql数据库目标表中的字段名不同
修改字段名相同即可
.withColumnRenamed("sum","total_price")
在hive中是否有相同特性?
2.Join等表连接的使用Join,union仍不熟悉 select子查询也比较生疏
是否解决: ✔?
join理解下图足够
union联合要求字段相同 否则报错
3.Date计算参考资料:https://blog.csdn.net/wybshyy/article/details/52064337
使用datediff不需要转换时间格式
是否解决: ✔
参考资料:
(18条消息) Spark-SQL常用内置日期时间函数_绿萝蔓蔓绕枝生-CSDN博客_sparksql 时间函数
(18条消息) sparksql 时间函数_OH LEI``-CSDN博客_sparksql时间函数
datediff 计算两个时间差天数 结果返回一个整数
对时间格式可能有要求例如‘2021/1/4‘这样的时间格式无法被计算(sql中,算子貌似没有这个问题)
sql写法:
spark.sql(
"""
|select datediff('2021-1-4','2020-12-30')
|""".stripMargin).show()
算子写法:
.withColumn("o",datediff(col("delivery_date"),col("order_date")))
months_between计算两个时间差月数 结果返回一个浮点数
sql写法:
spark.sql(
"""
|select months_between('2021-1-4','2020-12-30')
|""".stripMargin).show()
返回:0.16129032
若想返回整数月份可以将天数删除:
spark.sql(
"""
|select months_between('2021-1','2020-12')
|""".stripMargin).show()
返回:1.0
算子写法:
.withColumn("o",months_between(col("delivery_date"),col("order_date")))
直接用时间戳相减通过计算也可以
spark.sql(
"""
|select (unix_timestamp('2022/1/1','yyyy/MM/dd') - unix_timestamp('2021/12/31','yyyy/MM/dd'))/60/60/24
|""".stripMargin).show()



