1. Project environment
A server with Hive, Hadoop, and HBase installed
IntelliJ IDEA for development
2. How it works
Using Spark MLlib, each user id and the actions that user performed on the news list (already prepared in Hive through table creation and queries) are scored with a combination of a time-decay weight and a per-action weight. An item-based recall algorithm then uses these scores to recommend the topK most similar article ids to each user id.
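In formulas (matching the implementation further below): for every logged action of user u on article i,
    rate(u, i) = Σ actionWeight(action) * timeWeight(date)
summed over that user's actions on the article; article-to-article similarity sim(i, j) is the cosine similarity between the rating columns of i and j; and the recall score is
    pre_rate(u, j) = Σ_i rate(u, i) * sim(i, j) / Σ_i rate(u, i)
over the articles i the user has rated that are similar to j, from which the topK articles per user are kept.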
3. Implementation
pom.xml dependencies
Project coordinates: com.qf.bigdata:recommend:1.0 (modelVersion 4.0.0). The property block declares the values 2.11.12, 2.3.9, 2.10.1, 3.2.0, 2.6, 2.4.5, compile, 1.2.3, 1.3.6 and 2.8.1, which back the ${scala.version}, ${spark.version}, ${hadoop.version}, ${hbase.version}, ${json.version}, ${scope.type}, ${maven-assembly-plugin.version} and ${scala-maven-plugin.version} placeholders used below.
Dependencies:
com.alibaba:fastjson:${json.version}
org.apache.spark:spark-core_2.11:${spark.version} (${scope.type})
org.apache.spark:spark-sql_2.11:${spark.version} (${scope.type})
org.apache.spark:spark-hive_2.11:${spark.version} (${scope.type})
org.apache.spark:spark-mllib_2.11:${spark.version} (${scope.type})
mysql:mysql-connector-java:5.1.47
log4j:log4j:1.2.17 (${scope.type})
commons-codec:commons-codec:1.6
org.scala-lang:scala-library:${scala.version} (${scope.type})
org.scala-lang:scala-reflect:${scala.version} (${scope.type})
com.github.scopt:scopt_2.11:4.0.0-RC2
org.apache.spark:spark-avro_2.11:${spark.version}
org.apache.hive:hive-jdbc:2.3.7 (${scope.type}; excludes javax.mail:mail and org.eclipse.jetty.aggregate:*)
org.apache.hadoop:hadoop-client:${hadoop.version} (${scope.type})
org.apache.hbase:hbase-server:${hbase.version} (${scope.type})
org.apache.hbase:hbase-client:${hbase.version} (${scope.type})
org.apache.hbase:hbase-hadoop2-compat:${hbase.version} (${scope.type})
org.jpmml:jpmml-sparkml:1.5.9
Repository: alimaven, http://maven.aliyun.com/nexus/content/groups/public/ (release/snapshot updatePolicy: never).
Build: sourceDirectory src/main/scala, testSourceDirectory src/test/scala. Plugins:
maven-assembly-plugin ${maven-assembly-plugin.version}: descriptorRef jar-with-dependencies; execution make-assembly bound to the package phase with goal single.
scala-maven-plugin ${scala-maven-plugin.version}: execution scala-compile-first at process-resources with goals add-source and compile; goals compile and testCompile run with -dependencyfile ${project.build.directory}/.scala_dependencies.
maven-archetype-plugin 2.2.
build-helper-maven-plugin 1.8: add-source at generate-sources adds src/main/java; add-test-source at generate-test-sources adds src/test/java.
Project file layout
Remember to copy core-site.xml, hdfs-site.xml, and hive-site.xml from the cluster's configuration directories (core-site.xml and hdfs-site.xml live under Hadoop's etc/hadoop) into the project's resources directory.
3.1 Basic argument parsing: a simple match on whether the job runs in the dev or prod environment, so that the matching jars are loaded for each environment.
package com.qf.bigdata.conf
import org.slf4j.LoggerFactory
case class Config(
env:String = "",
hbaseZK:String = "",
hbasePort:String = "2181",
hFileTmpPath:String = "",
tableName:String = "recommend:news-cf",
irisPath:String = "",
proxyUser:String = "root",
topK:Int = 10
)
object Config {
private val logger = LoggerFactory.getLogger(Config.getClass.getSimpleName)
def parseConfig(obj: Object, args: Array[String]): Config = {
//1. Get the program name
val programName = obj.getClass.getSimpleName.replace("$", "")
//2. Works like the shell getopts command
//2.1 Build the option parser
val parser = new scopt.OptionParser[Config](s"ItemCF ${programName}") {
head(programName, "v1.0")
opt[String]('e', "env").required().action((x, config) => config.copy(env = x)).text("dev or prod")
opt[String]('x', "proxyUser").optional().action((x, config) => config.copy(proxyUser = x)).text("proxy username")
programName match {
case "ItemCF" => {
logger.info(s"ItemCF is staring ---------------------------->")
opt[String]('z', "hbaseZK").required().action((x, config) => config.copy(hbaseZK = x)).text("hbaseZK")
opt[String]('p', "hbasePort").required().action((x, config) => config.copy(hbasePort = x)).text("hbasePort")
opt[String]('f', "hFileTmpPath").required().action((x, config) => config.copy(hFileTmpPath = x)).text("hFileTmpPath")
opt[String]('t', "tableName").required().action((x, config) => config.copy(tableName = x)).text("tableName")
opt[Int]('k', "topK").required().action((x, config) => config.copy(topK = x)).text("topK")
}
case _ =>
}
}
//2.2 Parse the arguments
parser.parse(args, Config()) match {
case Some(conf) => conf
case None => {
logger.error(s"cannot parse args")
System.exit(-1)
null
}
}
}
}
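To sanity-check the parser outside the cluster, it can be driven with the same flags that the spark-submit command in section 4 passes. A minimal sketch with placeholder values; it assumes a throwaway ConfigDemo object sitting next to Config and com.qf.bigata.ItemCF being on the classpath (so the ItemCF-specific options get registered):
import com.qf.bigata.ItemCF
object ConfigDemo {
  def main(args: Array[String]): Unit = {
    // placeholder values; in production these come from the spark-submit command line
    val conf: Config = Config.parseConfig(ItemCF, Array(
      "-e", "dev", "-x", "root",
      "-z", "node01,node02,node03", "-p", "2181",
      "-f", "/tmp/hbase/hfile", "-t", "recommend:news-cf", "-k", "10"))
    println(conf) // prints the populated Config case class
  }
}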
package com.qf.bigdata.utils
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory
//Spark utility class: mainly responsible for building and configuring the SparkSession
object SparkUtils {
private val logger = LoggerFactory.getLogger(SparkUtils.getClass.getSimpleName)
//Obtain a SparkSession
def getSparkSession(env: String,appName: String) : SparkSession = {
val conf = new SparkConf()
.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
.set("spark.sql.hive.metastore.version","1.2.1")
.set("spark.sql.cbo.enabled","true")
.set("spark.hadoop.dfs.client.block.write.replica-datanode-on=failuer.enable","true")
.set("spark.hadoop.dfs.client.block.write.replica-datanode-on=failure.policy","NEVER")
env match {
case "prod" =>{ //环境类型匹配,设置appName和加载hive的mavenjar包
conf.setAppName(appName+"_prod")
SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
}
case "dev" =>{
conf.setMaster("local[*]").setAppName(appName+"_dev").set("spark.sql.hive.metastore.jars","maven")
SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
}
case _ =>{
logger.error("not match env")
System.exit(-1)
null
}
}
}
}
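A quick local check of the helper (a sketch; it assumes the cluster XML files copied into resources above are on the classpath so the dev-mode Hive support can reach the metastore):
val spark = SparkUtils.getSparkSession("dev", "itemcf-local-check")
spark.sql("show databases").show() // should list dwb_news if hive-site.xml is picked up
spark.stop()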
3.2 The weighting algorithm for the behaviour table:
3.2.1 Create an enumeration holding the user-action constants
package com.qf.bigdata.constant
object Action extends Enumeration {
type Action = Value
val CLICK = Value("点击")
val SHARE = Value("分享")
val COMMENT = Value("评论")
val COLLECT = Value("收藏")
val LIKE = Value("点赞")
//Print every enumeration value
def showAll = this.values.foreach(println)
//Look up an enumeration value by its name
def withNameOpt(name:String):Option[Value] = this.values.find(_.toString == name)
}
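For example, mapping a raw action string from the logs back to the enumeration (the string 浏览 is just an illustrative stand-in for any untracked action; such values return None and later fall through to the 0.0f default weight in RateUDF):
Action.withNameOpt("点击") // Some(点击)
Action.withNameOpt("浏览") // None -- not one of the five tracked actions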
package com.qf.bigdata.constant
//Constants class: shared project constants can live here
object Constant {
//Defines the effective lifetime of a news article: an article carries its full value within the first 100 days
//beyond that window, the older the action, the faster its time value decays
val ARTICLE_AGING_TIME = 100
}
3.2.2 A utility class for the time weight
It simply converts a date string into a java.util.Date; the time weight mainly cares about whether the action falls within the last 100 days.
package com.qf.bigdata.utils
import java.text.SimpleDateFormat
import java.util.{Calendar, Date}
//Date/time utility class
object DateUtils {
//Number of days between the given date and today
def diffDay2Now(date: String) = {
//Today's calendar
val now: Calendar = Calendar.getInstance()
//Today's timestamp in milliseconds
val today: Long = now.getTimeInMillis
//Timestamp of when the action happened
val current: Long = string2date(date).getTime
//Difference in days
val between: Long = (today - current) / (1000 * 3600 * 24)
//Return the result
Integer.parseInt(String.valueOf(between))
}
private val date_format = "yyyyMMdd"
//Convert a yyyyMMdd string into a Date
def string2date(date:String):Date = {
val fmt = new SimpleDateFormat(date_format)
fmt.parse(date)
}
}
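For instance, with an arbitrary example date (a sketch; the second result obviously depends on the day the job runs):
DateUtils.string2date("20240315") // 2024-03-15 00:00:00 as a java.util.Date
DateUtils.diffDay2Now("20240315") // whole days between 2024-03-15 and today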
The utility object below computes the actual weights: the action weight, the time weight, and the combined rating.
package com.qf.bigdata.udfs
import com.qf.bigdata.constant.{Action, Constant}
import com.qf.bigdata.utils.DateUtils
//Weighting/UDF utility object
object RateUDF {
//Normalisation function: whatever d is, the result stays between 0 and 1
private def sigmoid(d: Double): Double = 1 / (1 + Math.exp(1.0 - d))
//Weight of a user action
def getActionWeight(action: String): Float = {
Action.withNameOpt(action) match {
case Some(Action.CLICK) => 0.1f
case Some(Action.LIKE) => 0.15f
case Some(Action.COLLECT) => 0.2f
case Some(Action.SHARE) => 0.25f
case Some(Action.COMMENT) => 0.3f
case _ => 0.0f
}
}
//Compute the time weight of an action
//sigmoid: normalises the time weight into the 0-1 range
//sigmoid(x) = 1 / (1 + Math.exp(1 - x))
//w = sigmoid((ARTICLE_AGING_TIME - diffDays - 7) * 0.8)
//date: the time the action happened
def getDateWeight(date: String) ={
try{
//Difference between the value window (100 days) and how many days ago the action happened
var interval:Int =Constant.ARTICLE_AGING_TIME - DateUtils.diffDay2Now(date)
if(interval < 0) interval = 1 //the action is older than the article's optimal value window
val x:Double = interval.toDouble - 7
sigmoid(x * 0.8).toFloat
}catch {
case e:Exception =>e.printStackTrace()
0.0f
}
}
//Compute the rating from the action and its time
def action2rate(action:String,date:String) :Float = {
//action weight * time weight
val rate = getActionWeight(action)* getDateWeight(date)
rate
}
}
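To get a feel for the numbers (worked by hand from the formulas above):
// action2rate(action, date) = getActionWeight(action) * getDateWeight(date)
// a click 5 days ago:   interval = 100 - 5 = 95, x = 88 -> sigmoid(0.8 * 88) ≈ 1.0,   rate ≈ 0.1  * 1.0   = 0.1
// a share 105 days ago: interval clamped to 1,   x = -6 -> sigmoid(0.8 * -6) ≈ 0.003, rate ≈ 0.25 * 0.003 ≈ 0.0008
// so recent, heavy actions dominate the final (uid, aid) rating, which is the sum of these per-action rates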
---------------------------------------------HBase utility class, mainly responsible for bulk-loading data into HBase---------------------------
package com.qf.bigdata.utils
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, KeyValue, TableName}
import org.apache.hadoop.hbase.client.{Admin, Connection, ConnectionFactory, Table}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory
class HbaseUtils(spark:SparkSession, hbaseZk:String, hbasePort:String) {
private val logger = LoggerFactory.getLogger(HbaseUtils.getClass.getSimpleName)
def hbaseConfig():Configuration = {
val configuration: Configuration = HBaseConfiguration.create()
configuration.set(HConstants.ZOOKEEPER_QUORUM, hbaseZk)
configuration.set(HConstants.ZOOKEEPER_CLIENT_PORT, hbasePort)
configuration
}
def getConnection(config:Configuration):Connection = ConnectionFactory.createConnection(config)
def deletePath(path:String) = {
val hdfsPath = new Path(path)
val fs: FileSystem = FileSystem.get(new URI(path), new Configuration())
if (fs.exists(hdfsPath)) {
fs.delete(hdfsPath, true)
}
}
def loadHFile2Hbase(hfileRDD:RDD[(ImmutableBytesWritable, KeyValue)], tableName:String, hFileTmpPath:String) = {
//1. Build the actual HFile output path
val hFilePath = s"${hFileTmpPath}/${String.valueOf(System.currentTimeMillis())}"
//2. Get the HBase configuration
val configuration: Configuration = hbaseConfig()
//3. Set the target HBase table
configuration.set(TableOutputFormat.OUTPUT_TABLE, tableName)
//4. Create the connection
val connection: Connection = getConnection(configuration)
logger.warn(s"hbase connection created successfully!!!")
//5. Get the Admin and Table handles
val admin: Admin = connection.getAdmin
val table: Table = connection.getTable(TableName.valueOf(tableName))
//6. Create the MapReduce job that carries the output configuration
val job: Job = Job.getInstance()
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
job.setOutputFormatClass(classOf[HFileOutputFormat2])
//7. If the HFile staging path already exists, delete it
deletePath(hFilePath)
//8. Write the RDD out as HFiles to that HDFS path
hfileRDD.coalesce(10)
.saveAsNewAPIHadoopFile(
hFilePath,
classOf[ImmutableBytesWritable],
classOf[KeyValue],
classOf[HFileOutputFormat2],
job.getConfiguration
)
//9. Create the incremental (bulk) load helper
val loader = new LoadIncrementalHFiles(configuration)
//10. Bulk-load the HFiles into HBase
loader.doBulkLoad(
new Path(hFilePath),
admin,
table,
connection.getRegionLocator(TableName.valueOf(tableName))
)
logger.info(s"load hFile 2 hbase successful, hFilePath:${hFilePath}")
}
}
object HbaseUtils {
def apply(spark: SparkSession, hbaseZk: String, hbasePort: String): HbaseUtils = new HbaseUtils(spark, hbaseZk, hbasePort)
}
3.2.3 Query the Hive tables and convert them into matrices (essentially DataFrame-to-matrix and matrix-to-DataFrame conversions)
package com.qf.bigata.transformer
import com.qf.bigdata.udfs.RateUDF
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
class ModelData(spark:SparkSession, env:String) {
// Register the rating UDF
spark.udf.register("action2rate", RateUDF.action2rate _)
def generateEachAction() = {
spark.sql(
//1. Read the raw action data and register a temp view
s"""
|select uid, aid, action, action_date from dwb_news.user_acticle_action
|""".stripMargin).createOrReplaceTempView("source_data")
//2. Compute the per-action rating
spark.sql(
s"""
|select uid, aid, action, action_date,
|action2rate(action, action_date) as rate
|from source_data
|""".stripMargin).createOrReplaceTempView("source_data_rate")
}
def getUserRatingData():DataFrame = {
//1. Generate the per-action rating view
generateEachAction()
//2. Aggregate the ratings per (uid, aid)
spark.sql(
s"""
|select
|cast(uid as bigint) as uid,
|cast(aid as bigint) as aid,
|cast(sum(rate) as double) as rate
|from source_data_rate group by uid, aid order by uid
|""".stripMargin)
}
def joinRateDFAndSimDF(trainning: Dataset[Row], simDF:Dataset[Row]) = {
//1. Register the rating and similarity views
trainning.createOrReplaceTempView("user_rate")
simDF.createOrReplaceTempView("sim_item")
//2. Run the join
spark.sql(
s"""
|select
|t1.uid, t1.aid, t1.rate,
|t2.aid as aid2, t2.sim_aid, t2.sim, t1.rate * t2.sim as rsp
|from user_rate as t1 left join sim_item as t2 on t1.aid = t2.aid
|where t2.sim is not null
|""".stripMargin)
}
def recommendAllUser(joinDF: DataFrame, topK: Int) = {
joinDF.createOrReplaceTempView("rate_sim")
spark.sql(
s"""
|with t1 as(-- the user's predicted rating for each similar article
|select uid, sim_aid, sum(rsp) / sum(rate) as pre_rate
|from rate_sim group by uid, sim_aid
|),
|t2 as ( -- drop the articles the user has already interacted with
|select t1.* from t1
|left join user_rate as ur on t1.uid = ur.uid and t1.sim_aid = ur.aid
|where ur.rate is null
|),
|t3 as ( -- rank the candidates per user
|select
|uid, sim_aid, pre_rate,
|row_number() over(partition by uid order by pre_rate desc) as rank
|from t2
|)
|select
|cast(uid as int) as uid,
|cast(sim_aid as int) as sim_aid,
|cast(pre_rate as double) as pre_rate
|from t3 where rank <= ${topK}
|""".stripMargin)
}
}
package com.qf.bigdata.transformer
import com.qf.bigata.transformer.ModelData
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.slf4j.LoggerFactory
import scala.collection.mutable.ListBuffer
//The core class of this project: it handles the various data-shape conversions
class ItemCFModelData(spark:SparkSession, env:String) extends ModelData(spark, env) {
//Convert the recommendation DataFrame into the (rowkey, KeyValue) RDD needed for the HBase bulk load
def itemcf2RDD(convertDF:DataFrame) = {
convertDF.rdd.sortBy(x => x.get(0).toString).flatMap(row => {
//1. Extract the raw values: the uid and its "aid:score" recommendation list
val uid:String = row.get(0).toString
val items:String = row.getAs[Seq[Row]](1).map(item =>{
item.getInt(0).toString + ":"+item.getDouble(1).formatted("%.4f")
}).mkString(",")
//2. Buffer that holds the result
val listBuffer = new ListBuffer[(ImmutableBytesWritable, KeyValue)]
//3. Build the KeyValue: rowkey = uid, column family f1, qualifier itemcf
val kv = new KeyValue(Bytes.toBytes(uid),Bytes.toBytes("f1"),Bytes.toBytes("itemcf"),Bytes.toBytes(items))
//4. Append the (rowkey, KeyValue) pair to the buffer
listBuffer.append((new ImmutableBytesWritable(Bytes.toBytes(uid)), kv))
listBuffer
})
}
def recommendDataConvert(recommendDF:DataFrame) = {
import spark.implicits._
recommendDF.rdd.map(row => (row.getInt(0),(row.getInt(1),row.getDouble(2))))
.groupByKey().mapValues(sp =>{
//collect one user's (aid, pre_rate) pairs into a Seq and sort them by score
sp.toSeq.sortBy(_._2)
}).toDF("uid","recommendactions")
}
private val logger = LoggerFactory.getLogger(ItemCFModelData.getClass.getSimpleName)
//Score the test set: join the predicted ratings with the users' actual ratings so the model can be evaluated
def predictTestData(joinDF: DataFrame, test: Dataset[Row]) = {
//1. Register temp views
joinDF.createOrReplaceTempView("rate_sim")
test.createOrReplaceTempView("test_data")
//2. Run the SQL
spark.sql(
s"""
|with t1 as( -- the user's predicted rating for each similar article (predicted value)
|select uid, sim_aid, sum(rsp) / sum(rate) as pre_rate
|from rate_sim group by uid, sim_aid
|),
|t2 as ( -- the user's actual rating from the test data (ground truth)
|select uid, aid, rate from test_data
|)
|select t2.*, t1.pre_rate from t2 inner join t1 on t2.aid = t1.sim_aid and t1.uid = t2.uid
|where t1.pre_rate is not null
|""".stripMargin)
}
def simMatrix2DF(simMatrix: CoordinateMatrix) = {
//1. Pull the matrix entries out as an RDD
val transformerRDD: RDD[(String, String, Double)] = simMatrix.entries.map {
case MatrixEntry(row: Long, column: Long, sim: Double) => (row.toString, column.toString, sim)
}
//2. RDD --> DataFrame
val simDF: DataFrame = spark.createDataFrame(transformerRDD).toDF("aid", "sim_aid", "sim")
//3. columnSimilarities only returns the upper triangle, so append the mirrored pairs to make the table symmetric
simDF.union(simDF.select("sim_aid", "aid", "sim"))
}
def rateDF2Matrix(df: DataFrame) = {
//1. Row --> MatrixEntry
val matrixRDD: RDD[MatrixEntry] = df.rdd.map {
case Row(uid: Long, aid: Long, rate: Double) => MatrixEntry(uid, aid, rate)
}
//2. Return the distributed coordinate matrix
new CoordinateMatrix(matrixRDD)
}
}
object ItemCFModelData {
def apply(spark: SparkSession, env: String): ItemCFModelData = new ItemCFModelData(spark, env)
}
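A tiny local run of the DataFrame -> matrix -> similarity round trip (a sketch with made-up ratings; it assumes ItemCFModelData is imported and uses a plain local SparkSession rather than the Hive-enabled one so it can run anywhere):
import org.apache.spark.sql.SparkSession
object MatrixDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("itemcf-matrix-demo").getOrCreate()
    import spark.implicits._
    val md = ItemCFModelData(spark, "dev")
    // toy (uid, aid, rate) ratings for two users and three articles
    val toyRate = Seq((1L, 101L, 0.30), (1L, 102L, 0.10), (2L, 101L, 0.25), (2L, 103L, 0.20)).toDF("uid", "aid", "rate")
    val simMatrix = md.rateDF2Matrix(toyRate).toRowMatrix().columnSimilarities()
    md.simMatrix2DF(simMatrix).show() // (aid, sim_aid, sim) rows, mirrored in both directions
    spark.stop()
  }
}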
3.2.4 The main entry (driver) class
package com.qf.bigata
import com.qf.bigdata.transformer.ItemCFModelData
import com.qf.bigdata.conf.Config
import com.qf.bigdata.utils.SparkUtils
import org.apache.log4j.{Level, Logger}
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.slf4j.LoggerFactory
object ItemCF {
private val logger = LoggerFactory.getLogger(ItemCF.getClass.getSimpleName)
def main(args: Array[String]): Unit = {
//1. Setup
Logger.getLogger("org").setLevel(Level.WARN)
val params:Config = Config.parseConfig(ItemCF, args)
System.setProperty("HADOOP_USER_NAME", params.proxyUser)
logger.warn("job is running, please wait for a moment")
val spark:SparkSession = SparkUtils.getSparkSession(params.env, "itemcf app")
import spark.implicits._
//2. Base data processing
//2.1 Create the ItemCF model-data helper
val modelData = ItemCFModelData(spark, params.env)
//2.2 Convert the raw data into (uid, aid, rate) ratings
val rateDF:DataFrame = modelData.getUserRatingData()
logger.warn("rateDF ---------------------------------------->")
rateDF.show()
//2.3 Split the data into two parts: training data and test data
val Array(training, test) = rateDF.randomSplit(Array(0.6, 0.4))
training.cache()
//2.4 Convert the DataFrame into a coordinate matrix of the source ratings
val rateMatrix = modelData.rateDF2Matrix(training)
//2.5 Compute the similarity matrix -- under the hood this is column-wise cosine similarity
val simMatrix: CoordinateMatrix = rateMatrix.toRowMatrix().columnSimilarities()
//2.6 Convert the similarity matrix into a DataFrame
val simDF = modelData.simMatrix2DF(simMatrix)
logger.warn("simDF ---------------------------------------->")
simDF.show()
//2.7 Join the training rating DF with the similarity DF
val joinDF = modelData.joinRateDFAndSimDF(training, simDF)
logger.warn("joinDF ---------------------------------------->")
joinDF.show()
training.unpersist()
joinDF.cache()
//2.8 Use the test data and the joined data to predict article ratings
val predictDF = modelData.predictTestData(joinDF, test)
logger.warn("predictDF ---------------------------------------->")
predictDF.show()
joinDF.unpersist()
//2.9 Evaluate how good the recommendations are
//2.9.1 Create the evaluator
val evaluator = new RegressionEvaluator()
.setLabelCol("rate") // actual value
.setPredictionCol("pre_rate") // predicted value
//2.9.2 Compute the error (RMSE)
// val rmse: Double = evaluator.setMetricName("rmse").evaluate(predictDF)
// logger.warn(s"itemcf rmse:${rmse}")
//2.10 Take the topK recommendations for every user
val recommendDF = modelData.recommendAllUser(joinDF, params.topK)
logger.warn("recommendDF ---------------------------------------->")
recommendDF.show()
//2.11 Store a copy of the result in HDFS (as a Hive table) first, then in HBase
recommendDF.write.mode(SaveMode.Overwrite).format("ORC").saveAsTable("dwb_news.itemcf")
//2.12 Save the data to HBase
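// A hedged sketch of this last step, wired from the helper classes above (it assumes
// com.qf.bigdata.utils.HbaseUtils is imported and that the target table params.tableName
// already exists in HBase with column family f1, as expected by itemcf2RDD):
val hfileRDD = modelData.itemcf2RDD(modelData.recommendDataConvert(recommendDF))
HbaseUtils(spark, params.hbaseZK, params.hbasePort).loadHFile2Hbase(hfileRDD, params.tableName, params.hFileTmpPath)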
}
}
4. Testing
Package the project as a jar and copy it to the server.
Run it on the server with:
${SPARK_HOME}/bin/spark-submit \
--jars /usr/local/hive-1.2.1/auxlib/hudi-spark-bundle_2.11-0.5.2-incubating.jar \
--conf spark.sql.hive.convertMetastoreParquet=false \
--conf spark.executor.heartbeatInterval=120s \
--conf spark.network.timeout=600s \
--conf spark.sql.catalogImplementation=hive \
--conf spark.yarn.submit.waitAppCompletion=false \
--name itemcf \
--conf spark.task.cpus=1 \
--conf spark.executor.cores=4 \
--conf spark.sql.shuffle.partitions=50 \
--master yarn \
--deploy-mode cluster \
--driver-memory 512M \
--executor-memory 3G \
--num-executors 1 \
--class com.qf.bigata.ItemCF \
/data/jar/recommend-1.0-jar-with-dependencies.jar \
-e prod -x root -z 192.168.10.101 -p 2181 -f /tmp/hbase -t itemcf -k 3
The second-to-last line is the jar file.
The last line holds the program arguments:
-e  run mode: prod or dev
-x  the proxy user
-z  the HBase ZooKeeper quorum address
-p  the HBase ZooKeeper client port (2181 by default)
-f  the temporary directory for the generated HFiles
-t  the HBase table name to write to
-k  how many recommendations to keep per user (the top 3 are enough here)
Special note: this project is not finished yet; it is still in the testing stage.
For error logs, check the YARN container logs under Hadoop's logs/userlogs directory.



