
Wrapping SQL Jobs in Spark


SQL ON HADOOP

1. SparkSession
Hive support --> lets Spark operate on the data registered in Hive's metastore

spark.sql(......)
ThriftServer <==> beeline
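The ThriftServer/beeline pairing above can be exercised roughly as follows; host, port, and user are placeholders, and the start script ships with the Spark distribution:

```shell
# Start Spark's Thrift JDBC/ODBC server, then connect to it with beeline.
$SPARK_HOME/sbin/start-thriftserver.sh --master local[2]
beeline -u jdbc:hive2://localhost:10000 -n hdfs
```

Queries typed in beeline then run against the same Hive metastore the Spark job uses.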

2. hive -e
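A minimal sketch of this mode, assuming the hive CLI is installed and the ods_access table built later in this article exists:

```shell
# -e runs a single statement and exits; -f would run a script file instead.
hive -e "select count(*) from csz_test.ods_access where d='20211123'"
```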

SQL is configured through a UI ==> stored somewhere ==> MySQL

Develop a generic SQL-execution job (for a single SQL statement, or a whole class of them)

raw ⇒ ods ⇒ statistical analysis across various dimensions (which may involve a series of SQL statements)

access_ods: aggregate by province and by isp

How should the MySQL table rzdata_sql be designed?

id int
sql varchar(....)     the business SQL
series varchar        business category, e.g. access
index int             (ordering of the SQL statements within a series)
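The field list above can be turned into DDL roughly as follows. This is a sketch: the varchar lengths are made up, `index` is quoted because it is a MySQL reserved word, and the db/tbl columns are added because the SQLTemplate case class later in this article reads them:

```sql
-- Hypothetical DDL for the template table described above; lengths are illustrative.
CREATE TABLE rzdata_sql (
  id      INT PRIMARY KEY AUTO_INCREMENT,
  `sql`   VARCHAR(2000) COMMENT 'business SQL, may contain the {day} placeholder',
  series  VARCHAR(50)   COMMENT 'business category, e.g. access',
  `index` INT           COMMENT 'execution order within a series',
  db      VARCHAR(50)   COMMENT 'target Hive database (used by the ORC version of the job)',
  tbl     VARCHAR(50)   COMMENT 'target Hive table (used by the ORC version of the job)'
);
```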

1. Hive table: processing Text-format data

Create the Hive table

create table ods_access(
ip string,
proxy_ip string,
response_time bigint,
referer string,
method string,
url string,
http_code string,
request_size bigint,
response_size bigint,
cache string,
province string,
city string,
isp string,
http string,
domain string,
path string,
year string,
month string,
day string)
partitioned by (d string) 
row format delimited fields terminated by '\t'
location '/csz-project/hive_test/ods_access';

Upload the access.log file

218.30.86.0	-	86107	"-"	GET	http://www.ruoze.ke.qq.com/a&b=asds/sd.html	200	732	6848	MISS	北京	北京市	电信	http	www.ruoze.ke.qq.com	/a&b=asds/sd.html	2020	10	1
218.0.0.0	-	60232	"-"	POST	https://www.sina.com.cn/article/details/81201864	500	143	5488	MISS	浙江省	宁波市	电信	https	www.sina.com.cn	/article/details/81201864	2020	10	1
119.0.0.0	-	13249	"-"	POST	http://www.ruoze.ke.qq.com/s?ie=utf-8&f=8&rsv_bp=1&tn=0200	500	367	8913	MISS	贵州省	黔东南	电信	http	www.ruoze.ke.qq.com	/s?ie=utf-8&f=8&rsv_bp=1&tn=0200	2020	10	1

Load the data

load data local inpath '/data/csz-project/spark-project/data/access.log' into table ods_access PARTITION (d='20211123');

select * from ods_access where d = "20211123";

Two more tables are needed

dws_access_domain_traffic   aggregated by domain

create table dws_access_domain_traffic   (
domain string,
traffics  string)
partitioned by (d string) 
row format delimited fields terminated by '\t'
location '/csz-project/hive_test/dws_access_domain_traffic';


dws_access_province_traffic	aggregated by province
create table dws_access_province_traffic	(
province string,
traffics  string)
partitioned by (d string) 
row format delimited fields terminated by '\t'
location '/csz-project/hive_test/dws_access_province_traffic';

A somewhat crude implementation

insert overwrite table csz_test.dws_access_domain_traffic partition (d='20211123')
select domain, sum(response_size) as traffics from csz_test.ods_access where d = '20211123' group by domain;

insert overwrite table csz_test.dws_access_province_traffic partition (d='20211123')
select province, sum(response_size) as traffics from csz_test.ods_access where d = '20211123' group by province;

These statements can be stored directly in the DB (MySQL), but the partition value must be replaced with the {day} placeholder

insert overwrite table csz_test.dws_access_domain_traffic partition (d='{day}')
select domain,sum(response_size) traffics from csz_test.ods_access where d ='{day}'  group by domain

insert overwrite table csz_test.dws_access_province_traffic partition (d='{day}')
select province,sum(response_size) traffics from csz_test.ods_access where d ='{day}'  group by province
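The {day} substitution can be illustrated outside Spark; here with sed, while the Scala job later does the same thing with String.replace:

```shell
# Substitute the {day} placeholder, mirroring the String.replace call in the Scala job.
TEMPLATE="select province, sum(response_size) traffics from csz_test.ods_access where d ='{day}' group by province"
echo "$TEMPLATE" | sed "s/{day}/20211123/g"
```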

You can use IDEA's Database tool window to connect to MySQL

(Side note: on CDH, the process configuration files are all under /var/run/cloudera-scm-agent/process.)

package com.cityos.spark.SQLtest

//import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession

object SQLApp {

  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME","hdfs")
    val spark : SparkSession  = SparkSession.builder()
      .appName(this.getClass.getCanonicalName)
      .master("local[2]")
      .enableHiveSupport()
      .getOrCreate()


//    spark.sql("select * from csz_test.ods_access limit 100").show()

    import spark.implicits._

    val templates = spark.read.format("jdbc")
      .option("url", "")
      .option("dbtable", "tracking.rzdata_sql")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("user", "root")
      .option("password", "").load()
      .filter('series === "access")
      .orderBy('index.asc)
      .as[SQLTemplate]
      .collect()
    // execute the templates in parallel
    templates.par.map(x =>{
      val str = x.sql.replace("{day}", "20211123")
//      logError(str)
      spark.sql(str)
    })

    spark.stop()
  }
}


Launch it: execution is very slow, because the underlying storage is row-oriented (plain text).

Package the jar, upload it to the server, and put it under the lib directory

spark-submit \
--master local[2] \
--name SQLApp \
--class com.cityos.spark.SQLtest.SQLApp \
--jars /data/csz-project/spark-project/lib/mysql-connector-java-5.1.47.jar \
--driver-class-path /data/csz-project/spark-project/lib/mysql-connector-java-5.1.47.jar \
/data/csz-project/spark-project/lib/cityos-spark-sparksql-1.0.jar

2. Hive table: processing ORC-format data

Create the Hive table

create table ods_access2(
ip string,
proxy_ip string,
response_time bigint,
referer string,
method string,
url string,
http_code string,
request_size bigint,
response_size bigint,
cache string,
province string,
city string,
isp string,
http string,
domain string,
path string,
year string,
month string,
day string)
partitioned by (d string) 
row format delimited fields terminated by '\t'
stored as orc
location '/csz-project/hive_test/ods_access2';

load data local inpath '/data/csz-project/spark-project/data/access.log' into table ods_access2 PARTITION (d='20211123');

(Note: load data only moves the file into the table directory without converting it, so a plain-text file loaded into an ORC table cannot be read back. Populate the ORC table with an insert ... select from the text table instead, as described below.)

dws_access_domain_traffic2   aggregated by domain

create table dws_access_domain_traffic2   (
domain string,
traffics  string)
partitioned by (d string) 
row format delimited fields terminated by '\t'
stored as orc
location '/csz-project/hive_test/dws_access_domain_traffic2';


dws_access_province_traffic2	aggregated by province
create table dws_access_province_traffic2	(
province string,
traffics  string)
partitioned by (d string) 
row format delimited fields terminated by '\t'
stored as orc
location '/csz-project/hive_test/dws_access_province_traffic2';


To convert Text to ORC, simply insert the text table's data into the ORC table, listing the columns by name

This needs tuning: it produces too many small files.
Also note that in production, the SQL stored in MySQL is not allowed to keep the insert part; only the select is kept.
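A sketch of that conversion for one partition, inserting from the text table into the ORC table with the columns listed explicitly (small-file compaction is a separate tuning step):

```sql
-- Copy the text-format ods_access into the ORC table for one partition.
insert overwrite table csz_test.ods_access2 partition (d='20211123')
select ip, proxy_ip, response_time, referer, method, url, http_code,
       request_size, response_size, cache, province, city, isp,
       http, domain, path, year, month, day
from csz_test.ods_access
where d = '20211123';
```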

3. Modify the SQL statements (insertInto resolves columns by position, so the partition column d must come last)

select domain, sum(response_size) as traffics, d from csz_test.ods_access2 where d = '{day}' group by domain, d

select province, sum(response_size) as traffics, d from csz_test.ods_access2 where d = '{day}' group by province, d

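Stored in rzdata_sql, the two SELECT-only statements above would look roughly like the rows below. The ids and ordering are illustrative; the db/tbl columns tell the job where insertInto should write:

```sql
-- Illustrative template rows; the {day} placeholder is substituted at run time.
INSERT INTO rzdata_sql (id, `sql`, series, `index`, db, tbl) VALUES
  (1, 'select domain, sum(response_size) as traffics, d from csz_test.ods_access2 where d = ''{day}'' group by domain, d',
   'access2', 1, 'csz_test', 'dws_access_domain_traffic2'),
  (2, 'select province, sum(response_size) as traffics, d from csz_test.ods_access2 where d = ''{day}'' group by province, d',
   'access2', 2, 'csz_test', 'dws_access_province_traffic2');
```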
package com.cityos.spark.SQLtest

case class SQLTemplate(id: Int, sql: String, series: String, index: Int, db: String, tbl: String)

package com.cityos.spark.SQLtest

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{SaveMode, SparkSession}

object SQLApp extends Logging{

  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME","hdfs")
    val spark : SparkSession  = SparkSession.builder()
      .appName(this.getClass.getCanonicalName)
      .config("hive.exec.dynamic.partition.mode","nonstrict")
      .master("local[2]")
      .enableHiveSupport()
      .getOrCreate()

//    spark.sql("insert overwrite table csz_test.dws_access_domain_traffic2 partition (d='20211123') select domain,sum(response_size) traffics from csz_test.ods_access2 where d ='20211123'  group by domain").show()

    val time = spark.conf.get("spark.time","20211123")
    val series = spark.conf.get("spark.series","access2")

    import spark.implicits._

    val templates = spark.read.format("jdbc")
      .option("url", "jdbc:mysql:")
      .option("dbtable", "tracking.rzdata_sql")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("user", "root")
      .option("password", "").load()
      .filter('series === series)
      .orderBy('index.asc)
      .as[SQLTemplate]
      .collect()
    // execute the templates in parallel
    templates.par.map(x =>{
      val str = x.sql.replace("{day}", time)
      logError("sql : "  +  str)
      // needs tuning: too many small files
      spark.sql(str).write.mode(SaveMode.Overwrite)
        .insertInto(s"${x.db}.${x.tbl}")
    })

    spark.stop()
  }
}
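Since this version reads spark.time and spark.series from the Spark conf, a submission might look like the sketch below; paths follow the earlier command and the conf values are illustrative:

```shell
spark-submit \
--master local[2] \
--name SQLApp \
--class com.cityos.spark.SQLtest.SQLApp \
--conf spark.time=20211123 \
--conf spark.series=access2 \
--jars /data/csz-project/spark-project/lib/mysql-connector-java-5.1.47.jar \
--driver-class-path /data/csz-project/spark-project/lib/mysql-connector-java-5.1.47.jar \
/data/csz-project/spark-project/lib/cityos-spark-sparksql-1.0.jar
```

Keys prefixed with spark. are passed through to the application, which is why spark.conf.get can read them with a default fallback.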


