数据源:数据源通过sqoop同步到hbase中。
标签存储:hbase:用于存储标签。mysql:存储标签元数据。es:标签检索。
标签开发:采用spark引擎进行开发。
一、数据导入
将数据同步至hbase中。有以下几种方式。1、sqoop直接迁移。2将数据迁移至hdfs中,然后通过hbase提供的importtsv工具将数据导入至hbase中,这里也有两种方式,默认的方式是直接put的形式,还有一种是bulkload的方式,将文件转换成hfile后再加载到hbase中。
二、标签开发
1、开发一个工具类 提供两个方法(1)向hbase写入数据。(2)读取hbase中的数据
object HbaseTools{
def read(spark:SparkSession,zkHost:String,zkPort:String,table:String,family:String,fields:Seq[String]):Dataframe={
val sc:SparkContext=spark.sparkContext
//1、设置hbase配置信息 zk地址和端口号
val conf=HbaseConfiguration.create()
conf.set("hbase.zookeeper.quorum",zkHost)
conf.set("hbase.zookeeper.property.clientPort",zkPort)
//2、设置表的名称
conf.set(TableInputFormat.Input_Table,table)
//3、设置读取的列簇和列名称
val scan:Scan=new Scan()
//4、设置列簇
val cfBytes:Array[Byte]=Bytes.toBytes(family)
scan.addFamily(cfBytes)
//5、设置列名称
fields.foreach{
field=>
scan.addColumn(cfBytes,Bytes.toBytes(filed))
}
//6、设置scan过滤数据
conf.set(TableInputFormat.SCAN,base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray()))
val hbaseRdd=sc.newApiHadoopRDD(conf,classOf(TableInputFormat),classOf(ImmutableBytesWritable),classOf(Result))
)
//将rdd转化成dataframe
val rowsRdd=hbaseRdd.map{case (_,result)=>
val values:Seq[String]=fields.map{field=>
val value=result.getValue(cfBytes,Bytes.toBytes(field))
Bytes.toString(value)
}
//将values转换成row
Row.fromSeq(values)
}
//自定义schema
val schema:StructType=StructType(fields.map{filed=>
StructField(filed,StringType,nullable=true)
})
//返回df
spark.createDataframe(rowsRdd,schema)
}
def write(dataframe:Dataframe,zkHost:String,zkPort:String,table:String,family:String,rowkeyColumn:String):Unit={
val conf=HbaseConfiguration.create()
conf.set("hbase.zookeeper.quorum",zkHost)
conf.set("hbase.zookeeper.property.clientPort",zkPort)
//2、设置表的名称
conf.set(TableOutputFormat.Output_Table,table)
//将datafram转换为rdd
val cfbytes=Bytes.toBytes(family)
val columns:Array[String]=datatrame.columns
val datasrdd=dataframe.rdd.map{row=>
val rowkey=row.getAs[String](rowkeyColumn)
val rkBytes:Array[Byte]=Bytes.toBytes(rowkey)
//构建put对象
val put=new Put(rkBytes)
columns.foreach{column=>
val value=Bytes.toBytes(row.getAs[String](column))
put.addColumn(cfbytes,Bytes.toBytes(column),value))
}
(new ImmutableBytesWriteable(rkBytes,put))
}
datasrdd.saveAsNewApiHadoopFile(
"输出路径",
classOf(ImmutableBytesWriteable),
classOf(Put),
classOf(TableOutputFormat),
conf
)
}
}
2、开发第一个标签用户性别(核心代码)
//1、获取sparksession对象
//2、根据标签id从mysql中读取标签数据
//依据标签id获取对应mysql中标签元数据
val tableTage:String="(select id,name,rule,level from tbl_basic_tag where id =318
union
select id,name,rule,level from tbl_basic_tag where pid=318) AS tag_table "
val basicTageDf=spark.read.format("jdbc").option("driver","com.mysql.jdbc.Driver").option("url","jdbc:mysql://主机名:3306/?useUnicode=true&characterEncoding=UTF-8")
.option("dbtable",tagTable)
.option("user","root")
.option("password","123456")
.load()
//3、解析rule,从hbase中读取业务数据
val tagRule:String=basicTageDf.filter($"level"===4).head().getAs[String]("rule")
//解析rule,存入map中
val tagRuleMap=tagRule.split("\n")
.map{line=>
val array(key,value)=line.trim.split("=")
(key,value)
}.toMap
//判断数据源
if("hbase".equals(tagRuleMap("inType").toLowerCase)){
val hbasemeta=Hbasemeta.getmetaData(tagRuleMap)//这里自己封装一个hebasemeta类 写一个伴生对象 这里省略
businessDf=HbaseTools.read(spark,hbasemeta.zkHosts,hbasemeta.zkPort,hbasemeta.hbaseTable,hbasemeta.FieldName.split(","))
}
else{
System.exit(-1)
}
//获取属性标签
val attrTagRuleDf=basicTageDf.filter($"level"===5)
.select($"rule",$"name")
//将属性标签与业务数据进行关联
val modelDf=businessDf.join(attrTagRuleDf,businessDf("gender")===attrTagRuleDf("rule"),inner)
.select( $"id".as ("userId"),$"name".as("gender"))
//4、结果写入hbase
HbaseTools.write(modelDf,"主机名","2181","tbl_profile","user","userId")
//结束
spark.stop()
当我们开发第二个标签的时候,我们会发现在进行规则匹配的过程中会产生很多重复的代码,我们可以将规则匹配这个过程封装成一个工具类(TagTools)并且设计一个标签模板。
工具类:
object TagTools{
//将dataframe转换成map 为了后续使用广播变量避免join
def convertMap(tagDf:Dataframe):Map(String,String){
import tagDf.sparkSession.implicits._
tagDf.filter($"level"===5)
.select($"rule",$"name")
.as[(String,String)]
.rdd
.collectAsMap
.toMap
}
//打标签,使用广播变量
def ruleMatchTag(dataframe:Dataframe,field:String,tagDf:Dataframe):dataframe{
val spark=dataframe.sparkSession
import spark.implicits._
//获取规则map
val attrTagRuleMap:Map[String,String]=convertMap(tagDf)
//将map集合数据封装成广播变量
val attrTagRuleMapBroadCast =spark.sparkContext.broadcast(attrTagRuleMap:Map)
//自定义UDF函数打标签
val field_to_tag=udf(
(field:String)=>attrTagRuleMapBroadCast.value(field)
)
}
//计算标签
val modelDf:Dataframe=dataframe
.select($"id".as("userId"),filed_to_tag(col(field).as(field)))
modelDf
}
标签模板:
采用模板设计模式,创建一个标签模型开发基类
trait BasicModel extends Logging{
//变量声明
val spark:SparkSession = _
//1.初始化,构建sparksession对象
def init():Unit={
val sparkConf=new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[*]")
.set("spark.sql.shuffle.partitions","4")
.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
.rigisterKryoClasses(
Array(classOf[ImmutableBytesWritable],classOf[Result],classOf[Put])
spark=SparkSession.builder()
.config(sparkConf)
//启用与hive集成
.enableHiveSupport()
.config("hive.metastore.uris","thrift://bigdata-cdh01.itcast.cn:9083")
.config("spark.sql.warehouse.dir","xxxx")
.getOrCreate()
)
}
//2.获取标签数据
def getTagData(tagId:Long):dataframe={
val tagTable=s"(select id,name,rule,level from tbl_basci_tag
where id=$tagId
union
select id,name,rule,level from tbl_basci_tag where pid=$tagId) as tag_table
"
val basicTagDf=spark.read
.format("jdbc")
.option("driver","com.mysql.jdbc.Driver")
.option("url","数据库地址")
.option("dbtable",tagTable)
.option("user","root")
.option("password","123456")
.load()
basicTagDf
}
//3.获取业务数据
def getBusinessData(tagDf:Dataframe):dataframe={
import tagDf.sparkSession.implicits._
//获取业务标签规则
val tagRule=tagDf.filter($"level"===4)
.head()
.getAs[String]("rule")
//解析rule封装成map
val tagRuleMap=tagRule.split("\n")
.map(line=>
val array(key,value)=line.trim.split("=")
(key,value).toMap
)
//判断数据源,读取业务数据
val businessDf=null
if("hbase".equals(tagRuleMap("inType").toLowerCase)){
val hbasemeta=Hbasemeta.getmetaData(tagRuleMap)
businessDf=HbaseTools.read(spark,hbasemeta.zkHosts,hbasemeta.zkPort,hbasemeta.hbaseTable,hbasemeta.FieldName.split(","))
}
else{
System.exit(-1)
}
businessDf
}
//4.打标签
def doTag(getBusinessDf:Dataframe,tagDf:Dataframe):Dataframe={
}
//5.标签结果保存至hbase
def saveTag(modelDf:Dataframe):Unit={
HbaseTools.write(modelDf,"主机名","2181","tbl_profile","user","userId")
}
//6、关闭资源
def close():Unit={
if(null!=spark){
spark.stop()
}
}
//执行顺序
def executeModel(tagId:Long){
init()
try{
//获取标签数据
val tagDf:Dataframe=getTagData(tagId){
tagDf.persist(StorageLevel.MEMORY_AND_DISK)
tagDf.count()
//获取业务数据
val businessDf=getBusinessData(tagDf)
//计算标签
val modelDf=doTag(businessDf,tagDf)
//保存标签
saveTag(modelDf)
tagDf.unpersist()
}catch{
}
finally{
close()
}
}
}
}
}
开发政治面貌标签
//用户的政治面貌标签
class PoliticalModel extends BasicModel{
override def doTag(businessDf:Dataframe,tagDf:Dataframe):Dataframe={
//调用工具类,计算标签
val modelDf=Tagtools.ruleMatchTag(businessDf,"politicalface",tagDf)
modelDf
}
}
object PoliticalModel{
def main(agrs:Array[String]):Unit={
//创建标签模型
val tagModel=new PoliticalModel()
tagModel.execute(328l)
}
}
通过以上代码可以发现代码当中存在很多硬编码的部分,所以我们需要将以上代码进行重构
1、抽象出一个配置文件config.properties,将集群等配置信息写在配置文件里
2、重构sparksession
将是否是本地模式、是否继承hive也写在配置文件config.properties当中,
将spark应用参数放在单独的另一个配置文件当中spark.config(这里注意.config文件里的value值需要写双引号,.properties不需要)
//通过以下方式遍历配置文件的属性,利用typesafe的config库
val config=ConfigFactory.load("spark.conf")
for(entry->config.entrySet.asScala){
val resource=entry.getValue.origin().resource()
//判断文件的来源
if("spark.conf".equals(resource)){
//获取key
entry.getKey()
//获取value
entry.getValue()
}
}
重构标签基类
为什么要重构标签基类?我们发现给到一个标签类,我们好像并不知道这个标签类是规则匹配类型还是统计类型标签。
public enum ModelType{
MATCH,//规则匹配
ML,//机器学习类
STATISTICS//统计类
}
abstract class AbstractModel(modelName:String,ModelType:String)extends Logging{
//将之前写的BasicModel类所有代码拷贝到此处即可
}
自定义外部数据源
什么意思?举个例子,自定义外部数据源可以让我们使用
spark.format(“hbase”)
.option(“xxxx”)
.option(“xxxx”)
.load()
的方式进行读取或者写入数据,大家知道,默认的sparksql是提供mysql、csv、json等格式,默认是没有hbase的,我们需要自定义外部数据源。
自定义数据源的方法如下:我们要实现两个类,然后分别实现对应的接口。
class HbaseRelation extends baseRelation with TableScan with InsertableRelation with Serializable{
//相当于sparksession
override def sqlContext:SqlContext=
//schema信息
override def schema:StructType=
//加载数据
override def buildScan(): RDD[ROW]=
//写入数据
override def insert(data:Dataframe,override:Boolean):Unit=
}
class DefaultSource extends RelationProvider with CreatableRelationProvider{
override def createRelation(sqlContext:SQLContext,
parameters:Map[String,String]
):baseRelation
override def createRelation (sqlContext:SQLContext,
mode:SaveMode,
parameters:Map[String,String]
data:Dataframe
):baseRelation
}
我们只需要将之前的工具类读取、写入hbase的方法修改修改,添加到HbaseRelation中即可。
那么做完以上操作还需要注册数据源,让我们可以用spark.format(“hbase”)的形式进行操作,如果不注册数据源,format里面我们只能写(“HbaseRelation的完整类路径”)
那么怎么进行注册呢?
1、需要让我们的DefaultSource类继承DataSourceRegister接口,实现shortName方法。
2、在项目resource目录下创建meta-INF/services 二级目录,在目录下创建一个文件,文件全名称就是datasourceregister这个类的全名称(文本文件),文件里拷贝defaultsouce类的全限定名。
年龄段标签开发
步骤
一、在标签平台进行注册
新建四级标签(年龄段)
新建五级标签(50后、60后、。。。。20后)
二、代码开发
class AgeRangeModel extends AbstractModel("年龄段标签",ModelType.STATISTICS){
override def doTag(businessDf:Dataframe,tagDf:Dataframe):Dataframe={
import businessDf.sparkSession.implicits._
import org.apache.spark.sql.functions._
//自定义udf函数,解析tagDf的rule 将“1990.1.1-1999.12.31”解析为二元组
val rule_to_tuple=udf(
(rule:String)=>{
val Array(start,end)=rule.split("-").map(_.toInt)
(start,end)
}
)
}
//针对属性标签中的规则rule使用udf函数提取start和end
attrDf=tagDf.filter($"level"===5)
.select($"name",rule_to_tuple($"rule").as("rules"))
.select($"name",$"rules._1".as("start"),$"rules._2".as("end"))
//使用业务数据与属性标签规则数据进行join关联
val dataModel=bussinessDf
.select($"id",regexp_replace($"birthday","-","").cast(IntegerType).as("bornDate"))
.join(attDf)
.where($"bornDate".beetween($"start",$"end"))
.select($"id".as("userId"),$"name".as("ageRange"))
dataModel
}
object AgeRangeModel {
def main():Unit{
val tagModel=new AgeRangeModel().execute(338l)
}
}



