SQL ON HADOOP
1. SparkSession hive 支持 --> 能够去操作 Hive 对应的 metastore 里面的数据 spark.sql(......) ThriftServer <==> beeline / 2. hive -e
SQL通过UI配置 ==> 存储到某个地方 ==> mysql
开发一个通用的SQL执行作业(一个或者一类)
raw ⇒ ods ⇒ 各种维度的统计分析(可能会涉及到一系列的SQL)
access_ods
统计 province
isp
MySQL表 如何设计rzdata_sql
id int, sql varchar(....) 业务sql, series varchar 业务分类(如 access), index int 业务内的sql执行顺序

1. hive表 Text格式数据处理
hive建表
create table ods_access( ip string, proxy_ip string, response_time bigint, referer string, method string, url string, http_code string, request_size bigint, response_size bigint, cache string, province string, city string, isp string, http string, domain string, path string, year string, month string, day string) partitioned by (d string) row format delimited fields terminated by '\t' location '/csz-project/hive_test/ods_access';
上传access.log文件
218.30.86.0 - 86107 "-" GET http://www.ruoze.ke.qq.com/a&b=asds/sd.html 200 732 6848 MISS 北京 北京市 电信 http www.ruoze.ke.qq.com /a&b=asds/sd.html 2020 10 1 218.0.0.0 - 60232 "-" POST https://www.sina.com.cn/article/details/81201864 500 143 5488 MISS 浙江省 宁波市 电信 https www.sina.com.cn /article/details/81201864 2020 10 1 119.0.0.0 - 13249 "-" POST http://www.ruoze.ke.qq.com/s?ie=utf-8&f=8&rsv_bp=1&tn=0200 500 367 8913 MISS 贵州省 黔东南 电信 http www.ruoze.ke.qq.com /s?ie=utf-8&f=8&rsv_bp=1&tn=0200 2020 10 1
导入数据
load data local inpath '/data/csz-project/spark-project/data/access.log' into table ods_access PARTITION (d='20211123'); select * from ods_access where d = "20211123";
还需要两个表
dws_access_domain_traffic 根据域名统计 create table dws_access_domain_traffic ( domain string, traffics string) partitioned by (d string) row format delimited fields terminated by '\t' location '/csz-project/hive_test/dws_access_domain_traffic'; dws_access_province_traffic 根据省份统计 create table dws_access_province_traffic ( province string, traffics string) partitioned by (d string) row format delimited fields terminated by '\t' location '/csz-project/hive_test/dws_access_province_traffic';
low一点的实现方法
insert overwrite table csz_test.dws_access_domain_traffic partition (d='20211123') select domain,sum(response_size) as traffics from csz_test.ods_access where d ='20211123' group by domain insert overwrite table csz_test.dws_access_province_traffic partition (d='20211123') select province,sum(response_size) as traffics from csz_test.ods_access where d ='20211123' group by province
可以直接写入DB ,但是分区要改为{day}
insert overwrite table csz_test.dws_access_domain_traffic partition (d='{day}')
select domain,sum(response_size) traffics from csz_test.ods_access where d ='{day}' group by domain
insert overwrite table csz_test.dws_access_province_traffic partition (d='{day}')
select province,sum(response_size) traffics from csz_test.ods_access where d ='{day}' group by province
可以使用IDEA的databases链接mysql
(这里补充一个, CDH文件的配置都在/var/run/cloudera-scm-agent/process 内)
package com.cityos.spark.SQLtest
//import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
object SQLApp {

  /** Generic SQL-runner job: reads the ordered SQL templates of the "access"
    * series from a MySQL table, substitutes the {day} placeholder, and
    * executes each statement against Hive through a SparkSession.
    */
  def main(args: Array[String]): Unit = {
    // Run as the hdfs user so writes to the Hive warehouse are permitted.
    System.setProperty("HADOOP_USER_NAME", "hdfs")

    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getCanonicalName)
      .master("local[2]")
      .enableHiveSupport()
      .getOrCreate()

    import spark.implicits._

    // Day partition to process. Default preserved, but overridable via the
    // first program argument so the job is reusable for other dates.
    val day = if (args.nonEmpty) args(0) else "20211123"

    // Load the SQL templates of the "access" series from MySQL, honouring
    // the intra-series execution order stored in the `index` column.
    val templates = spark.read.format("jdbc")
      .option("url", "")                        // TODO: fill in the JDBC URL
      .option("dbtable", "tracking.rzdata_sql")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("user", "root")
      .option("password", "")
      .load()
      .filter('series === "access")
      .orderBy('index.asc)
      .as[SQLTemplate]
      .collect()

    // Side-effecting execution: use foreach — the original used map but
    // discarded its results.
    // NOTE(review): .par runs the statements concurrently, which conflicts
    // with the orderBy above if later statements depend on earlier ones.
    templates.par.foreach { template =>
      val statement = template.sql.replace("{day}", day)
      spark.sql(statement)
    }

    spark.stop()
  }
}
启动运行,速度非常慢,因为底层是行式存储
打包上传服务器,放入lib目录下
spark-submit --master local[2] --name SQLApp --class com.cityos.spark.SQLtest.SQLApp --jars /data/csz-project/spark-project/lib/mysql-connector-java-5.1.47.jar --driver-class-path /data/csz-project/spark-project/lib/mysql-connector-java-5.1.47.jar /data/csz-project/spark-project/lib/cityos-spark-sparksql-1.0.jar

2. hive表 ORC格式数据处理
hive建表
create table ods_access2( ip string, proxy_ip string, response_time bigint, referer string, method string, url string, http_code string, request_size bigint, response_size bigint, cache string, province string, city string, isp string, http string, domain string, path string, year string, month string, day string) partitioned by (d string) row format delimited fields terminated by '\t' stored as orc location '/csz-project/hive_test/ods_access2'; load data local inpath '/data/csz-project/spark-project/data/access.log' into table ods_access2 PARTITION (d='20211123'); dws_access_domain_traffic 根据域名统计 create table dws_access_domain_traffic2 ( domain string, traffics string) partitioned by (d string) row format delimited fields terminated by '\t' stored as orc location '/csz-project/hive_test/dws_access_domain_traffic2'; dws_access_province_traffic 根据省份统计 create table dws_access_province_traffic2 ( province string, traffics string) partitioned by (d string) row format delimited fields terminated by '\t' stored as orc location '/csz-project/hive_test/dws_access_province_traffic2';
如何将text转换为ORC,只要将text表数据插入ORC,带字段名就行
这里需要调优,文件太多了
但是生产上面MYSQL里不允许保留insert字段,只保留select
select domain,sum(response_size),d traffics from csz_test.ods_access2 where d ='{day}' group by domain,d
select province,sum(response_size),d traffics from csz_test.ods_access2 where d ='{day}' group by province,d
package com.cityos.spark.SQLtest
/** One row of the tracking.rzdata_sql template table.
  *
  * @param id     primary key of the template row
  * @param sql    the business SQL text; may contain a {day} placeholder that
  *               the runner substitutes with the target partition day
  * @param series business series name used to select a group of statements
  * @param index  execution order of this statement within its series
  * @param db     target Hive database the result is inserted into
  * @param tbl    target Hive table the result is inserted into
  */
final case class SQLTemplate(id: Int, sql: String, series: String, index: Int, db: String, tbl: String)
package com.cityos.spark.SQLtest
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{SaveMode, SparkSession}
object SQLApp extends Logging {

  /** Generic SQL-runner job (ORC version): reads the ordered SQL templates of
    * one business series from MySQL, substitutes the {day} placeholder, and
    * writes each result into its target Hive table.
    *
    * Runtime parameters (passed as --conf):
    *   spark.time   partition day to process, default "20211123"
    *   spark.series template series to run,   default "access2"
    */
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "hdfs")

    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getCanonicalName)
      // Valid values are "strict"/"nonstrict"; the original "constrict" is a
      // typo, and dynamic-partition inserts require "nonstrict".
      .config("hive.exec.dynamic.partition.mode", "nonstrict")
      .master("local[2]")
      .enableHiveSupport()
      .getOrCreate()

    val time = spark.conf.get("spark.time", "20211123")
    val series = spark.conf.get("spark.series", "access2")

    import spark.implicits._

    // Load the SQL templates of the requested series from MySQL, honouring
    // the intra-series execution order stored in the `index` column.
    val templates = spark.read.format("jdbc")
      .option("url", "jdbc:mysql:")             // TODO: complete the JDBC URL
      .option("dbtable", "tracking.rzdata_sql")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("user", "root")
      .option("password", "")
      .load()
      .filter('series === series)
      .orderBy('index.asc)
      .as[SQLTemplate]
      .collect()

    // Side-effecting execution: use foreach — the original used map but
    // discarded its results.
    // NOTE(review): .par runs the statements concurrently, which conflicts
    // with the orderBy above if later statements depend on earlier ones.
    templates.par.foreach { template =>
      val statement = template.sql.replace("{day}", time)
      logError("sql : " + statement)
      // TODO tune: this insert produces too many small files (coalesce first).
      spark.sql(statement).write.mode(SaveMode.Overwrite)
        .insertInto(s"${template.db}.${template.tbl}")
    }

    spark.stop()
  }
}



