整体结构
Config
package com.fuwei.bigdata.profile.conf
import org.slf4j.LoggerFactory
import scopt.OptionParser
/** Parsed command-line options shared by the profile jobs.
  *
  * All fields default to "empty" so scopt only needs to fill in the
  * options each job actually declares in [[Config.parseConfig]].
  */
case class Config(
env:String = "", // "dev" or "prod" — selects the Spark run mode (-e)
username:String = "", // ClickHouse user (LabelGenerator only)
password:String = "", // ClickHouse password (LabelGenerator only)
url:String = "", // ClickHouse JDBC url (LabelGenerator only)
cluster:String = "", // ClickHouse cluster id (LabelGenerator only)
startDate:String = "", // start of date range — no CLI flag in this chunk; TODO confirm which job consumes it
endDate:String = "", // end of date range — no CLI flag in this chunk; TODO confirm which job consumes it
proxyUser:String = "", // user to impersonate when submitting the job
topK:Int = 25 // how many top items to keep; no CLI flag in this chunk
)
object Config{
  private val logger = LoggerFactory.getLogger("Config")

  /** Parses the command line into a [[Config]].
    *
    * @param obj  the calling job object; its class name selects which
    *             job-specific options are registered
    * @param args raw program arguments
    * @return the parsed Config; on parse failure the JVM exits with -1
    */
  def parseConfig(obj:Object,args:Array[String]):Config = {
    // 1. Derive the program name from the caller's class.  A Scala object's
    //    runtime class ends with '$' (e.g. "LabelGenerator$"), which we strip.
    //    The original "\$" was an invalid string escape; "\\$" is the
    //    regex-escaped dollar sign.
    val programName: String = obj.getClass.getSimpleName.replaceAll("\\$", "")
    // 2. Build the argument parser.
    val parser = new OptionParser[Config]("spark sql "+programName) {
      // 2.1 Usage header.
      head(programName,"v1.0")
      // 2.2 Options common to every job.
      opt[String]('e',"env").required().action((x,config) => config.copy(env = x)).text("dev or prod")
      // Short flag changed from 'n' to 'x': 'n' collided with the
      // "username" option below (scopt rejects duplicate short options),
      // and the submit script passes the proxy user as "-x".
      opt[String]('x',name = "proxyUser").required().action((x,config) => config.copy(proxyUser = x)).text("proxy username")
      // 2.3 Job-specific options.
      programName match {
        case "LabelGenerator" =>
          logger.info("LabelGenerator")
          opt[String]('n', "username").required().action((x, config) => config.copy(username = x)).text("username")
          opt[String]('p', "password").required().action((x, config) => config.copy(password = x)).text("password")
          opt[String]('u', "url").required().action((x, config) => config.copy(url = x)).text("url")
          opt[String]('c', "cluster").required().action((x, config) => config.copy(cluster = x)).text("cluster")
        case _ =>
      }
    }
    // 3. Run the parser; abort the process when the arguments are invalid.
    parser.parse(args,Config()) match {
      case Some(conf) => conf
      case None =>
        logger.error("can not parse args")
        System.exit(-1)
        null // unreachable — System.exit above terminates the JVM
    }
  }
}
LabelGenerator
package com.fuwei.bigdata.profile

// Fixed: imports pointed at com.qf.bigdata.* while the classes are declared
// under com.fuwei.bigdata.*; the Spark type is DataFrame, not "Dataframe".
import com.fuwei.bigdata.profile.conf.Config
import com.fuwei.bigdata.profile.utils.{SparkUtils, TableUtils}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.slf4j.LoggerFactory
object LabelGenerator {
  private val logger = LoggerFactory.getLogger(LabelGenerator.getClass.getSimpleName)

  /** End-to-end job: builds base user-profile features in Hive and
    * mirrors them into ClickHouse.
    */
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    // 1. Parse command-line arguments.
    val params: Config = Config.parseConfig(LabelGenerator, args)
    // 2. Obtain a SparkSession for the requested environment (dev/prod).
    val spark: SparkSession = SparkUtils.getSparkSession(params.env, LabelGenerator.getClass.getSimpleName)
    import spark.implicits._
    // 3. Load the phone-prefix location data.  The file is tab-separated:
    //    the original option("sep", "t") split on the letter 't'.
    val df: DataFrame = spark.read.option("sep", "\t").csv("src/main/resources/phoneinfo.txt").toDF("prefix", "phone", "province", "city", "isp", "post_code", "city_code", "area_code", "types")
    df.createOrReplaceTempView("phone_info") // temp view joined below
    // 4.1 Base user attributes: normalise mobile to its last 11 digits and
    //     keep only the e-mail domain suffix.
    val userSql =
      """
        |select
        |t1.distinct_id as uid,
        |t1.gender,
        |t1.age,
        |case when length(t1.mobile) >= 11 then substring(t1.mobile,-11,length(t1.mobile)) else '' end as mobile,
        |case when size(split(t1.email,'@')) = 2 then split(t1.email,'@')[1] else '' end email_suffix,
        |t2.model
        |from ods_news.user_ro as t1 left join dwb_news.user_base_info as t2
        |on t1.distinct_id = t2.uid
        |""".stripMargin
    val userDF: DataFrame = spark.sql(userSql)
    userDF.createOrReplaceTempView("user_info")
    // 4.2 Resolve province/city by joining on the 7-digit phone prefix.
    val baseFeatureSql =
      """
        |select
        |t1.uid,
        |t1.gender,
        |t1.age,
        |t1.email_suffix,
        |t1.model,
        |concat(ifnull(t2.province,''),ifnull(t2.city,'')) as region
        |from user_info as t1 left join phone_info as t2
        |on
        |t2.phone = substring(t1.mobile,0,7)
        |""".stripMargin
    // 4.3 Make sure the Hive target table exists.
    spark.sql(
      """
        |create table if not exists dws_news.user_profile_base(
        |uid string,
        |gender string,
        |age string,
        |email_suffix string,
        |model string,
        |region string
        |)
        |stored as parquet
        |""".stripMargin)
    // 4.4 Run the feature query; cached because the result is written
    //     twice (Hive and ClickHouse).
    val baseFeaturedDF: DataFrame = spark.sql(baseFeatureSql)
    baseFeaturedDF.cache()
    // Persist the features into the Hive table (HDFS).
    baseFeaturedDF.write.mode(SaveMode.Overwrite).saveAsTable("dws_news.user_profile_base")
    // 5. (Re)create the ClickHouse table; meta = (column list, placeholders).
    val meta = TableUtils.getClickHouseUserProfilebaseTable(baseFeaturedDF, params)
    // 6. Insert into ClickHouse.
    // 6.1 INSERT statement with one '?' per column.
    val insertCHSql =
      s"""
         |insert into ${TableUtils.USER_PROFILE_CLICKHOUSE_DATAbase}.${TableUtils.USER_PROFILE_CLICKHOUSE_TABLE}(${meta._1}) values(${meta._2})
         |""".stripMargin
    logger.warn(insertCHSql)
    // 6.2 Write each Spark partition through its own JDBC connection.
    baseFeaturedDF.foreachPartition(partition => {
      TableUtils.insertbaseFeaturedTable(partition, insertCHSql, params)
    })
    baseFeaturedDF.unpersist() // release the cached data
    // 7. Release resources.
    spark.stop()
    logger.info("job has success")
  }
}
ClickHouseUtils
package com.fuwei.bigdata.profile.utils
import ru.yandex.clickhouse.ClickHouseDataSource
import ru.yandex.clickhouse.settings.ClickHouseProperties
object ClickHouseUtils {

  /** Creates a ClickHouse JDBC data source with the given credentials.
    *
    * @param username ClickHouse user
    * @param password ClickHouse password
    * @param url      JDBC url, e.g. jdbc:clickhouse://host:port
    */
  def getDataSource(username: String, password: String, url: String): ClickHouseDataSource = {
    // Ensure the driver class is registered before building the source.
    Class.forName("ru.yandex.clickhouse.ClickHouseDriver")
    val properties = new ClickHouseProperties()
    properties.setUser(username)
    properties.setPassword(password)
    val dataSource = new ClickHouseDataSource(url, properties)
    dataSource
  }

  /** Converts a DataFrame column spec ("name type,name type,...") into the
    * equivalent ClickHouse DDL column list, e.g.
    * "age integer,gender string" -> "age Int32,gender String".
    *
    * BUG FIX: the return type was Unit, so the computed column list was
    * silently discarded and the caller received "()".
    */
  def df2TypeName2CH(dfCol: String): String = {
    dfCol.split(",").map(line => {
      val fields: Array[String] = line.split(" ")
      val fName: String = fields(0)
      val fType: String = dfType2chType(fields(1)) // Spark type -> ClickHouse type
      fName + " " + fType
    }).mkString(",")
  }

  /** Maps a Spark SQL simple type name onto the matching ClickHouse type.
    * Unknown types fall back to String.
    */
  def dfType2chType(fieldType: String): String = {
    fieldType.toLowerCase() match {
      case "string" => "String"
      case "integer" => "Int32"
      case "long" => "Int64"
      case "bigint" => "Int64"
      case "double" => "Float64"
      case "float" => "Float32"
      case "timestamp" => "Datetime"
      case _ => "String"
    }
  }
}
SparkUtils(这个连接以后可以通用)
package com.fuwei.bigdata.profile.utils
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory
object SparkUtils {
  private val logger = LoggerFactory.getLogger(SparkUtils.getClass.getSimpleName)

  /** Builds a Hive-enabled SparkSession for a given environment.
    *
    * @param env     "prod" (cluster master from spark-submit) or "dev"
    *                (local[*] with maven-resolved metastore jars); any
    *                other value aborts the process
    * @param appName logical application name; suffixed with the env
    */
  def getSparkSession(env: String, appName: String): SparkSession = {
    // Settings shared by both environments.
    val sparkConf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.sql.hive.metastore.version", "1.2.1")
      .set("spark.sql.cbo.enabled", "true")
      .set("spark.hadoop.dfs.client.block.write.replace-datanode-on-failure.enable", "true")
      .set("spark.hadoop.dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER")

    // Builds the Hive-enabled session from a fully-configured SparkConf.
    def buildSession(conf: SparkConf): SparkSession =
      SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()

    env match {
      case "prod" =>
        buildSession(sparkConf.setAppName(appName + "_prod"))
      case "dev" =>
        buildSession(
          sparkConf
            .setMaster("local[*]")
            .setAppName(appName + "_dev")
            .set("spark.sql.hive.metastore.jars", "maven"))
      case _ =>
        logger.error("not match env")
        System.exit(-1)
        null // unreachable — System.exit above terminates the JVM
    }
  }
}
TableUtils
package com.fuwei.bigdata.profile.utils

// Fixed: Config is declared under com.fuwei.bigdata.*, not com.qf.bigdata.*;
// the Spark type is DataFrame, not "Dataframe".
import com.fuwei.bigdata.profile.conf.Config
import org.apache.spark.sql.types.{IntegerType, LongType, StringType}
import org.apache.spark.sql.{DataFrame, Row}
import org.slf4j.LoggerFactory
import ru.yandex.clickhouse.{ClickHouseConnection, ClickHouseDataSource}

import java.sql.PreparedStatement
object TableUtils {

  private val logger = LoggerFactory.getLogger(TableUtils.getClass.getSimpleName)

  /** ClickHouse database the profile is written into. */
  val USER_PROFILE_CLICKHOUSE_DATAbase = "app_news"
  /** ClickHouse table the profile is written into. */
  val USER_PROFILE_CLICKHOUSE_TABLE = "user_profile_base"

  /** Writes one Spark partition into ClickHouse through a batched INSERT.
    *
    * Rows are queued with addBatch and flushed either every `batchSize`
    * rows or when the last flush happened more than 3 s ago; a final
    * executeBatch after the loop drains the remainder.
    *
    * @param partition   rows of one DataFrame partition
    * @param insertCHSql INSERT statement with one '?' per column
    * @param params      connection settings (username/password/url)
    */
  def insertbaseFeaturedTable(partition: Iterator[Row], insertCHSql: String, params: Config): Unit = {
    // 1. One connection per partition — this runs on the executors.
    val dataSource: ClickHouseDataSource = ClickHouseUtils.getDataSource(params.username, params.password, params.url)
    val connection: ClickHouseConnection = dataSource.getConnection
    val ps: PreparedStatement = connection.prepareStatement(insertCHSql)
    var batchCount = 0
    val batchSize = 2000
    var lastBatchTime = System.currentTimeMillis()
    // 2. Bind each row's values to the placeholders, dispatching on field type.
    partition.foreach(row => {
      var index = 1 // JDBC parameter indices are 1-based
      row.schema.fields.foreach(field => {
        field.dataType match {
          case StringType => ps.setString(index, row.getAs[String](field.name))
          case LongType => ps.setLong(index, row.getAs[Long](field.name))
          case IntegerType => ps.setInt(index, row.getAs[Int](field.name))
          case _ => logger.error(s"type is err,${field.dataType}")
        }
        index += 1
      })
      // 3. Queue the row.
      ps.addBatch()
      batchCount += 1
      // 4. Flush on batch size or on the 3-second timeout.
      val currentTime = System.currentTimeMillis()
      if (batchCount >= batchSize || lastBatchTime < currentTime - 3000) {
        ps.executeBatch()
        logger.warn(s"send data to clickhouse, batchNum:${batchCount},batchTime:${(currentTime - lastBatchTime) / 1000} s")
        batchCount = 0
        lastBatchTime = currentTime
      }
    })
    // 5. Drain whatever is still queued after the loop.
    ps.executeBatch()
    logger.warn(s"send data to clickhouse, batchNum:${batchCount},batchTime:${(System.currentTimeMillis() - lastBatchTime) / 1000} s")
    // 6. Release resources.
    ps.close()
    connection.close()
  }

  /** (Re)creates the ClickHouse target table from the DataFrame schema.
    *
    * @return (comma-separated column names, comma-separated '?'
    *         placeholders) ready to splice into an INSERT statement
    */
  def getClickHouseUserProfilebaseTable(baseFeaturedDF: DataFrame, params: Config): (String, String) = {
    // Fold the schema into ("uid,gender,...", "uid string,gender string,...", "?,?,...").
    // foldLeft takes ONE initial value: the original foldLeft("","","")
    // relied on deprecated auto-tupling — pass the tuple explicitly.
    val (fieldNames, fieldTypes, placeholders) = baseFeaturedDF.schema.fields.foldLeft(("", "", ""))(
      (acc, f) => {
        if (acc._1.nonEmpty && acc._2.nonEmpty && acc._3.nonEmpty) {
          // Not the first field: prepend a separator to each accumulator.
          (acc._1 + "," + f.name, acc._2 + "," + f.name + " " + f.dataType.simpleString, acc._3 + ",?")
        } else {
          (f.name, f.name + " " + f.dataType.simpleString, "?")
        }
      }
    )
    // Translate Spark type names into a ClickHouse column DDL list.
    val chCol = ClickHouseUtils.df2TypeName2CH(fieldTypes)
    val cluster: String = params.cluster // NOTE(review): read but unused — was "ON CLUSTER" DDL intended? confirm
    // DDL statements.  The "exisths" typo is fixed — ClickHouse rejected
    // the original CREATE DATABASE statement outright.
    val createCHDatabaseSql =
      s"""
         |create database if not exists ${USER_PROFILE_CLICKHOUSE_DATAbase}
         |""".stripMargin
    val createCHTableSql =
      s"""
         |create table ${USER_PROFILE_CLICKHOUSE_DATAbase}.${USER_PROFILE_CLICKHOUSE_TABLE}(${chCol})
         |ENGINE = MergeTree()
         |ORDER BY(uid)
         |""".stripMargin
    val dropCHTableSql =
      s"""
         |drop table if exists ${USER_PROFILE_CLICKHOUSE_DATAbase}.${USER_PROFILE_CLICKHOUSE_TABLE}
         |""".stripMargin
    // Execute: create database, drop stale table, create fresh table.
    val dataSource: ClickHouseDataSource = ClickHouseUtils.getDataSource(params.username, params.password, params.url)
    val connection: ClickHouseConnection = dataSource.getConnection
    logger.warn(createCHDatabaseSql)
    var ps: PreparedStatement = connection.prepareStatement(createCHDatabaseSql)
    ps.execute()
    ps.close() // close each statement — the original leaked the first two
    logger.warn(dropCHTableSql)
    ps = connection.prepareStatement(dropCHTableSql)
    ps.execute()
    ps.close()
    logger.warn(createCHTableSql)
    ps = connection.prepareStatement(createCHTableSql)
    ps.execute()
    ps.close()
    connection.close()
    logger.info("success!!!!!!!!!")
    (fieldNames, placeholders)
  }
}
xml
4.0.0 com.fuwei.bigdata user-profile1.0-SNAPSHOT 2.11.12 2.3.9 2.10.1 3.2.0 2.6 2.4.5 compile 1.2.3 com.alibaba fastjson${json.version} org.apache.spark spark-core_2.11${spark.version} ${scope.type} org.apache.spark spark-sql_2.11${spark.version} ${scope.type} org.apache.spark spark-hive_2.11${spark.version} ${scope.type} mysql mysql-connector-java5.1.47 log4j log4j1.2.17 ${scope.type} commons-codec commons-codec1.6 org.scala-lang scala-library${scala.version} ${scope.type} org.scala-lang scala-reflect${scala.version} ${scope.type} com.github.scopt scopt_2.114.0.0-RC2 org.apache.hudi hudi-spark-bundle_2.110.5.2-incubating ${scope.type} org.apache.spark spark-avro_2.11${spark.version} com.hankcs hanlpportable-1.7.8 org.apache.spark spark-mllib_2.11${spark.version} ${scope.type} org.apache.hive hive-jdbc1.2.1 ${scope.type} javax.mail mailorg.eclipse.jetty.aggregate *ru.yandex.clickhouse clickhouse-jdbc0.2.4 alimaven http://maven.aliyun.com/nexus/content/groups/public/ never never src/main/scala src/test/scala org.apache.maven.plugins maven-assembly-plugin${maven-assembly-plugin.version} jar-with-dependencies make-assembly package single net.alchim31.maven scala-maven-plugin${scala-maven-plugin.version} compile testCompile -dependencyfile ${project.build.directory}/.scala_dependencies org.apache.maven.plugins maven-archetype-plugin2.2
测试
##1. 将core-site.xml、yarn-site.xml、hive-site.xml拷贝到工程resources目录下
##2. clean and package
##3. hive metastore服务必须开
##4. yarn/hdfs必须要开
##5. clickhouse/chproxy也要打开
##6. 编写提交jar包的spark脚本
# Submit the profile job to YARN.
# Fixes: added the missing "\" line continuations, corrected the config key
# casing (spark.sql.hive.convertMetastoreParquet), and aligned --class with
# the declared package (com.fuwei, not com.qf).
${SPARK_HOME}/bin/spark-submit \
--jars /data/apps/hive-1.2.1/auxlib/hudi-spark-bundle_2.11-0.5.2-incubating.jar \
--conf spark.sql.hive.convertMetastoreParquet=false \
--conf spark.executor.heartbeatInterval=120s \
--conf spark.network.timeout=600s \
--conf spark.sql.catalogImplementation=hive \
--conf spark.yarn.submit.waitAppCompletion=false \
--name log2hudi \
--conf spark.task.cpus=1 \
--conf spark.executor.cores=4 \
--conf spark.sql.shuffle.partitions=50 \
--master yarn \
--deploy-mode cluster \
--driver-memory 1G \
--executor-memory 3G \
--num-executors 1 \
--class com.fuwei.bigdata.profile.LabelGenerator \
/data/jar/user-profile.jar \
-e prod -u jdbc:clickhouse://10.206.0.4:8321 -n fw-insert -p fw-001 -x root -c 1
##7. 通过clickhouse-client去测试
clickhouse-client --host 10.206.0.4 --port 9999 --password qwert



