A Spark-Based Recommendation System (Spark Recommendation Algorithm)

1. Project Environment

A server running Hive, Hadoop, and HBase

IntelliJ IDEA for development

2. How It Works

Spark's machine-learning library is applied to the user IDs and their actions on the news list, which have already been collected into queryable tables. Each action is scored with a behavior weight and a time-decay weight, a recall algorithm is built on top of these scores, and the top-K most similar article IDs are recommended to each user ID.
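
To make the scoring concrete, here is a small, self-contained Scala sketch of the idea (the weights and the sigmoid mirror the RateUDF code further below; the actions, dates, and values are made up):

object ScoringSketch {
  def main(args: Array[String]): Unit = {
    // behaviour weights, as used later in RateUDF
    val actionWeight = Map("点击" -> 0.1, "收藏" -> 0.2, "分享" -> 0.25)

    // time weight: an article keeps its value within a ~100-day window, normalised by a sigmoid
    def dateWeight(daysAgo: Int): Double = {
      val raw = 100 - daysAgo
      val interval = if (raw < 0) 1 else raw
      1.0 / (1.0 + math.exp(1.0 - (interval - 7) * 0.8))
    }

    // one (uid, aid) pair: the user clicked the article 90 days ago and shared it 5 days ago
    val rate = actionWeight("点击") * dateWeight(90) + actionWeight("分享") * dateWeight(5)
    println(f"rating for this (uid, aid) pair: $rate%.4f")
  }
}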

3. Code

pom.xml dependencies



<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.qf.bigdata</groupId>
    <artifactId>recommend</artifactId>
    <version>1.0</version>

    <properties>
        <scala.version>2.11.12</scala.version>
        <scala-maven-plugin.version>3.2.0</scala-maven-plugin.version>
        <maven-assembly-plugin.version>2.6</maven-assembly-plugin.version>

        <spark.version>2.4.5</spark.version>
        <scope.type>compile</scope.type>
        <json.version>1.2.3</json.version>
        <hbase.version>1.3.6</hbase.version>
        <hadoop.version>2.8.1</hadoop.version>
        <!-- the original listing also contained the values 2.3.9 and 2.10.1 here;
             the corresponding property names are not referenced elsewhere in this pom -->
    </properties>

    <dependencies>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${json.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>${scope.type}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>${scope.type}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>${scope.type}</scope>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
            <version>1.6</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>${scala.version}</version>
            <scope>${scope.type}</scope>
        </dependency>

        <dependency>
            <groupId>com.github.scopt</groupId>
            <artifactId>scopt_2.11</artifactId>
            <version>4.0.0-RC2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-avro_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>2.3.7</version>
            <scope>${scope.type}</scope>
            <exclusions>
                <exclusion>
                    <groupId>javax.mail</groupId>
                    <artifactId>mail</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.eclipse.jetty.aggregate</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-hadoop2-compat</artifactId>
            <version>${hbase.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.jpmml</groupId>
            <artifactId>jpmml-sparkml</artifactId>
            <version>1.5.9</version>
        </dependency>

    </dependencies>

    <repositories>
        <repository>
            <id>alimaven</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <releases>
                <updatePolicy>never</updatePolicy>
            </releases>
            <snapshots>
                <updatePolicy>never</updatePolicy>
            </snapshots>
        </repository>
    </repositories>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>${maven-assembly-plugin.version}</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>${scala-maven-plugin.version}</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-archetype-plugin</artifactId>
                <version>2.2</version>
            </plugin>

            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>build-helper-maven-plugin</artifactId>
                <version>1.8</version>
                <executions>
                    <!-- also compile any Java sources under src/main/java -->
                    <execution>
                        <id>add-source</id>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>add-source</goal>
                        </goals>
                        <configuration>
                            <sources>
                                <source>src/main/java</source>
                            </sources>
                        </configuration>
                    </execution>
                    <!-- and test sources under src/test/java -->
                    <execution>
                        <id>add-test-source</id>
                        <phase>generate-test-sources</phase>
                        <goals>
                            <goal>add-test-source</goal>
                        </goals>
                        <configuration>
                            <sources>
                                <source>src/test/java</source>
                            </sources>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Project structure

Remember to copy core-site.xml, hdfs-site.xml, and hive-site.xml from the server's Hadoop etc/hadoop directory into the project's resources folder.
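
The corresponding resources location, assuming the standard Maven directories declared in the pom.xml above (only the three *-site.xml files come from the original article; the rest of the layout is the usual convention):

src/main/resources/core-site.xml
src/main/resources/hdfs-site.xml
src/main/resources/hive-site.xml
src/main/scala/com/qf/bigdata/...   (the Scala sources shown in the sections below)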

3.1 Basic parameter configuration: a simple match on whether the environment is development or production, loading different settings and Maven jars for each environment.

package com.qf.bigdata.conf

import org.slf4j.LoggerFactory

case class Config(
                   env:String = "",
                   hbaseZK:String = "",
                   hbasePort:String = "2181",
                   hFileTmpPath:String = "",
                   tableName:String = "recommend:news-cf",
                   irisPath:String = "",
                   proxyUser:String = "root",
                   topK:Int = 10
                 )
object Config {

  private val logger = LoggerFactory.getLogger(Config.getClass.getSimpleName)

  
  //Parse the command-line arguments into a Config
  def parseConfig(obj: Object, args: Array[String]): Config = {
    //1. Get the program name
    val programName = obj.getClass.getSimpleName.replace("$", "")
    //2. Works like the shell getopts command
    //2.1 Build the parser
    val parser = new scopt.OptionParser[Config](s"ItemCF ${programName}") {
      head(programName, "v1.0")
      opt[String]('e', "env").required().action((x, config) => config.copy(env = x)).text("dev or prod")
      opt[String]('x', "proxyUser").optional().action((x, config) => config.copy(proxyUser = x)).text("proxy username")
      programName match {
        case "ItemCF" => {
          logger.info(s"ItemCF is staring ---------------------------->")
          opt[String]('z', "hbaseZK").required().action((x, config) => config.copy(hbaseZK = x)).text("hbaseZK")
          opt[String]('p', "hbasePort").required().action((x, config) => config.copy(hbasePort = x)).text("hbasePort")
          opt[String]('f', "hFileTmpPath").required().action((x, config) => config.copy(hFileTmpPath = x)).text("hFileTmpPath")
          opt[String]('t', "tableName").required().action((x, config) => config.copy(tableName = x)).text("tableName")
          opt[Int]('k', "topK").required().action((x, config) => config.copy(topK = x)).text("topK")

        }
        case _ =>
      }
    }
    //2.2 Parse the arguments
    parser.parse(args, Config()) match {
      case Some(conf) => conf
      case None => {
        logger.error(s"cannot parse args")
        System.exit(-1)
        null
      }
    }
  }
}
package com.qf.bigdata.utils

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory

//Spark utility class, mainly for configuring the SparkSession object
object SparkUtils {
  private val logger = LoggerFactory.getLogger(SparkUtils.getClass.getSimpleName)
  //Build a SparkSession object
  def getSparkSession(env: String,appName: String) : SparkSession = {
    val conf = new SparkConf()
      .set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
      .set("spark.sql.hive.metastore.version","1.2.1")
      .set("spark.sql.cbo.enabled","true")
      .set("spark.hadoop.dfs.client.block.write.replica-datanode-on=failuer.enable","true")
      .set("spark.hadoop.dfs.client.block.write.replica-datanode-on=failure.policy","NEVER")
    env match {
      case "prod" =>{   //环境类型匹配,设置appName和加载hive的mavenjar包
        conf.setAppName(appName+"_prod")
        SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
      }
      case "dev" =>{
        conf.setMaster("local[*]").setAppName(appName+"_dev").set("spark.sql.hive.metastore.jars","maven")
        SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
      }
      case _ =>{
        logger.error("not match env")
        System.exit(-1)
        null
      }
    }
  }
}
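
A minimal usage sketch of the two helpers above, run locally in dev mode (the object name and the test query are invented for illustration):

package com.qf.bigdata

import com.qf.bigdata.conf.Config
import com.qf.bigdata.utils.SparkUtils

object DevSmokeTest {
  def main(args: Array[String]): Unit = {
    // only -e is required for a program whose name is not "ItemCF"
    val conf = Config.parseConfig(DevSmokeTest, Array("-e", "dev"))
    val spark = SparkUtils.getSparkSession(conf.env, "itemcf-smoke-test")
    spark.sql("show databases").show()
    spark.stop()
  }
}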

3.2 The weighting algorithm for the action table:

3.2.1 Create an enumeration holding the user-behavior constants

package com.qf.bigdata.constant

object Action extends Enumeration {
  type Action = Value
  val CLICK = Value("点击")
  val SHARE = Value("分享")
  val COMMENT = Value("评论")
  val COLLECT = Value("收藏")
  val LIKE = Value("点赞")

  //Print all enumeration values
  def showAll = this.values.foreach(println)

  //Look up an enumeration value by its constant name
  def withNameOpt(name:String):Option[Value] = this.values.find(_.toString == name)
}
package com.qf.bigdata.constant
//Constants class: shared project-wide constants can live here
object Constant {
  //Effective lifetime of a news article: an article carries its full value for the first
  //100 days; beyond that window its time value decays, and the older the action, the faster the decay
  val ARTICLE_AGING_TIME = 100
}

3.2.2 A utility class for the time weight

It simply converts a date string into a Date; the time weight is mainly concerned with the last 100 days.

package com.qf.bigdata.utils

import java.text.SimpleDateFormat
import java.util.{Calendar, Date}

//Date/time utility class
object DateUtils {
  //Number of days between the given date and today
  def diffDay2Now(date: String) = {
    //Today as a Calendar
    val now: Calendar = Calendar.getInstance()
    //Today in milliseconds
    val today: Long = now.getTimeInMillis
    //The time at which the action happened
    val current: Long = string2date(date).getTime
    //Difference in days
    val between: Long = (today - current) / (1000 * 3600 * 24)
    //Return the result as an Int
    Integer.parseInt(String.valueOf(between))
  }
  private val date_format = "yyyyMMdd"
  //Convert a date string into a Date
  def string2date(date:String):Date = {
    val fmt = new SimpleDateFormat(date_format)
    fmt.parse(date)
  }
}

The utility object below computes the behavior weight and the time weight, and combines them into a rating:

package com.qf.bigdata.udfs

import com.qf.bigdata.constant.{Action, Constant}
import com.qf.bigdata.utils.DateUtils

//UDF helpers for computing ratings
object RateUDF {
  //Normalisation: whatever d is, the result stays between 0 and 1
  private def sigmoid(d: Double): Double = 1 / (1 + Math.exp(1.0 - d))

  //Weight of a user behaviour; unknown behaviours score 0
  def getActionWeight(action: String): Float = {
    Action.withNameOpt(action) match {
      case Some(Action.CLICK) => 0.1f
      case Some(Action.LIKE) => 0.15f
      case Some(Action.COLLECT) => 0.2f
      case Some(Action.SHARE) => 0.25f
      case Some(Action.COMMENT) => 0.3f
      case _ => 0.0f
    }
  }

  //Time weight of a behaviour
  //sigmoid normalises the time weight into the range 0-1:
  //sigmoid(x) = 1 / (1 + Math.exp(1 - x))
  //w = sigmoid((AGING_TIME - daysAgo - 7) * 0.8)
  //date: the date on which the behaviour happened
  def getDateWeight(date: String) ={
    try{
      //Difference between the article aging window and the age (in days) of the behaviour
      var interval:Int =Constant.ARTICLE_AGING_TIME - DateUtils.diffDay2Now(date)
      if(interval < 0) interval = 1 //the behaviour happened after the article's peak-value window had passed
      val x:Double = interval.toDouble - 7
      sigmoid(x * 0.8).toFloat
    }catch {
      case e:Exception =>e.printStackTrace()
        0.0f
    }
  }
  //Compute the rating from the behaviour and its date
  def action2rate(action:String,date:String) :Float = {
    //behaviour weight * time weight
    val rate = getActionWeight(action)* getDateWeight(date)
    rate
  }

}
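
A quick local check of the weighting functions above (hypothetical demo object, no Spark needed; the date is made up):

import com.qf.bigdata.udfs.RateUDF

object RateUDFDemo {
  def main(args: Array[String]): Unit = {
    // a "点击" (click) on 2021-10-01: behaviour weight * time weight
    println(RateUDF.action2rate("点击", "20211001"))
    // unknown behaviours fall through to 0.0
    println(RateUDF.action2rate("未知", "20211001"))
  }
}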

---------------------------------------------HBase utility class, mainly for bulk-loading the result data into HBase---------------------------

package com.qf.bigdata.utils

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, KeyValue, TableName}
import org.apache.hadoop.hbase.client.{Admin, Connection, ConnectionFactory, Table}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles, TableOutputFormat}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory



//HBase utility: builds the HBase configuration/connection and bulk-loads HFiles into a table
class HbaseUtils(spark:SparkSession, hbaseZk:String, hbasePort:String) {

  private val logger = LoggerFactory.getLogger(HbaseUtils.getClass.getSimpleName)

  
  //Build the HBase client configuration
  def hbaseConfig():Configuration = {
    val configuration: Configuration = HBaseConfiguration.create()
    configuration.set(HConstants.ZOOKEEPER_QUORUM, hbaseZk)
    configuration.set(HConstants.ZOOKEEPER_CLIENT_PORT, hbasePort)
    configuration
  }

  
  //Create an HBase connection from the configuration
  def getConnection(config:Configuration):Connection = ConnectionFactory.createConnection(config)

  
  //Delete an HDFS path if it already exists
  def deletePath(path:String) = {
    val hdfsPath = new Path(path)
    val fs: FileSystem = FileSystem.get(new URI(path), new Configuration())
    if (fs.exists(hdfsPath)) {
      fs.delete(hdfsPath, true)
    }
  }

  
  //Write the HFile RDD to HDFS and bulk-load it into the given HBase table
  def loadHFile2Hbase(hfileRDD:RDD[(ImmutableBytesWritable, KeyValue)], tableName:String, hFileTmpPath:String) = {
    //1. Build the actual output path
    val hFilePath = s"${hFileTmpPath}/${String.valueOf(System.currentTimeMillis())}"
    //2. Get the HBase configuration
    val configuration: Configuration = hbaseConfig()
    //3. Set the HBase table to write to
    configuration.set(TableOutputFormat.OUTPUT_TABLE, tableName)
    //4. Get a connection
    val connection: Connection = getConnection(configuration)
    logger.warn(s"hbase connection create successful!!!")
    //5. Get the admin and the table
    val admin: Admin = connection.getAdmin
    val table: Table = connection.getTable(TableName.valueOf(tableName))
    //6. Set up the job
    val job: Job = Job.getInstance()
    job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setMapOutputValueClass(classOf[KeyValue])
    job.setOutputFormatClass(classOf[HFileOutputFormat2])
    //7. If the HFile path already exists, delete it
    deletePath(hFilePath)
    //8. Write the RDD to the HDFS path as HFiles
    hfileRDD.coalesce(10)
      .saveAsNewAPIHadoopFile(
        hFilePath,
        classOf[ImmutableBytesWritable],
        classOf[KeyValue],
        classOf[HFileOutputFormat2],
        job.getConfiguration
      )
    //9. Get the incremental-load helper
    val loader = new LoadIncrementalHFiles(configuration)
    //10. Bulk-load the HFiles into HBase
    loader.doBulkLoad(
      new Path(hFilePath),
      admin,
      table,
      connection.getRegionLocator(TableName.valueOf(tableName))
    )
    logger.info(s"load hFile 2 hbase successful, hFilePath:${hFilePath}")
  }
}

object HbaseUtils {
  def apply(spark: SparkSession, hbaseZk: String, hbasePort: String): HbaseUtils = new HbaseUtils(spark, hbaseZk, hbasePort)
}

3.2.3 Query the database and turn the tables into matrices (essentially conversions between DataFrames and matrices)

package com.qf.bigdata.transformer

import com.qf.bigdata.udfs.RateUDF
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}


//Base model-data class: builds the user rating data from the Hive tables
class ModelData(spark:SparkSession, env:String) {

  // Register the UDF
  spark.udf.register("action2rate", RateUDF.action2rate _)

  
  //Score every individual user action and register the result as a temp view
  def generateEachAction() = {
    //1. Read the raw data and register it as a temp view
    spark.sql(
      s"""
         |select uid, aid, action, action_date from dwb_news.user_acticle_action
         |""".stripMargin).createOrReplaceTempView("source_data")
    //2. Compute the per-action rating
    spark.sql(
      s"""
         |select uid, aid, action, action_date,
         |action2rate(action, action_date) as rate
         |from source_data
         |""".stripMargin).createOrReplaceTempView("source_data_rate")

  }

  
  //Aggregate the per-action ratings into one rating per (uid, aid)
  def getUserRatingData(): DataFrame = {
    //1. Generate the per-action rating table
    generateEachAction()
    //2. Sum up the ratings
    spark.sql(
      s"""
         |select
         |cast(uid as bigint) as uid,
         |cast(aid as bigint) as aid,
         |cast(sum(rate) as double) as rate
         |from source_data_rate group by uid, aid order by uid
         |""".stripMargin)
  }

  
  //Join the rating DataFrame with the item-similarity DataFrame
  def joinRateDFAndSimDF(trainning: Dataset[Row], simDF:Dataset[Row]) = {
    //1. Register the rating and similarity temp views
    trainning.createOrReplaceTempView("user_rate")
    simDF.createOrReplaceTempView("sim_item")

    //2. Run the join
    spark.sql(
      s"""
         |select
         |t1.uid, t1.aid, t1.rate,
         |t2.aid as aid2, t2.sim_aid, t2.sim, t1.rate * t2.sim as rsp
         |from user_rate as t1 left join sim_item as t2 on t1.aid = t2.aid
         |where t2.sim is not null
         |""".stripMargin)
  }

  
  //Produce the top-K recommendations for every user
  def recommendAllUser(joinDF: DataFrame, topK: Int) = {
    joinDF.createOrReplaceTempView("rate_sim")
    spark.sql(
      s"""
         |with t1 as(-- predicted rating of each user for the similar articles
         |select uid, sim_aid, sum(rsp) / sum(rate) as pre_rate
         |from rate_sim group by uid, sim_aid
         |),
         |t2 as ( -- drop articles the user has already read
         |select t1.* from t1
         |left join user_rate as ur on t1.uid = ur.uid and t1.sim_aid = ur.aid
         |where ur.rate is null
         |),
         |t3 as ( -- rank by predicted rating
         |select
         |uid, sim_aid, pre_rate,
         |row_number() over(partition by uid order by pre_rate desc) as rank
         |from t2
         |)
         |select
         |cast(uid as int) as uid,
         |cast(sim_aid as int) as sim_aid,
         |cast(pre_rate as double) as pre_rate
         |from t3 where rank <= ${topK}
         |""".stripMargin)
  }
}
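
A tiny worked example of the sum(rsp) / sum(rate) prediction used above (made-up numbers): if a user rated article A at 0.4 and article B at 0.2, and both are similar to an unread article X with sim(A, X) = 0.5 and sim(B, X) = 0.25, then pre_rate = (0.4 * 0.5 + 0.2 * 0.25) / (0.4 + 0.2) = 0.25 / 0.6 ≈ 0.417; in other words, the user's existing ratings act as weights on the similarities.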

package com.qf.bigdata.transformer

import scala.collection.mutable.ListBuffer

import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.slf4j.LoggerFactory


//The key class of this project: it handles the various data conversions
class ItemCFModelData(spark: SparkSession, env: String) extends ModelData(spark, env) {

  //Convert the recommendation DataFrame into the (rowkey, KeyValue) RDD expected by the HBase bulk load
  def itemcf2RDD(convertDF: DataFrame) = {
    convertDF.rdd.sortBy(x => x.get(0).toString).flatMap(row => {
      //1. Pull out the raw values
      val uid: String = row.get(0).toString
      //"aid:score,aid:score,..." for this user
      val items: String = row.getAs[Seq[Row]](1).map(item => {
        item.getInt(0).toString + ":" + item.getDouble(1).formatted("%.4f")
      }).mkString(",")
      //2. Collection that will hold the result
      val listBuffer = new ListBuffer[(ImmutableBytesWritable, KeyValue)]
      //3. Build the KeyValue: rowkey = uid, column family f1, qualifier itemcf
      val kv = new KeyValue(Bytes.toBytes(uid), Bytes.toBytes("f1"), Bytes.toBytes("itemcf"), Bytes.toBytes(items))
      //4. Append it to the buffer
      listBuffer.append((new ImmutableBytesWritable(Bytes.toBytes(uid)), kv))
      listBuffer
    })
  }
  

   //Group the top-K rows per user into (uid, Seq[(aid, score)]) for writing to HBase
   def recommendDataConvert(recommendDF: DataFrame) = {
     import spark.implicits._
     recommendDF.rdd.map(row => (row.getInt(0), (row.getInt(1), row.getDouble(2))))
       .groupByKey().mapValues(sp => {
       var seq: Seq[(Int, Double)] = Seq[(Int, Double)]()
       sp.foreach(tp => {
         seq :+= ((tp._1, tp._2))
       })
       seq.sortBy(_._2)
     }).toDF("uid", "recommendactions")
   }

  private val logger = LoggerFactory.getLogger(ItemCFModelData.getClass.getSimpleName)


 //Below: the Hive-side queries and the DataFrame <-> matrix conversions

  
  //Predict ratings for the test data so the model can be evaluated
  def predictTestData(joinDF: DataFrame, test: Dataset[Row]) = {
    //1. Register the temp views
    joinDF.createOrReplaceTempView("rate_sim")
    test.createOrReplaceTempView("test_data")

    //2. Run the query
    spark.sql(
      s"""
         |with t1 as( -- predicted rating of each user for the similar articles
         |select uid, sim_aid, sum(rsp) / sum(rate) as pre_rate
         |from rate_sim group by uid, sim_aid
         |),
         |t2 as ( -- the user's actual rating of the original article
         |select uid, aid, rate from test_data
         |)
         |select t2.*, t1.pre_rate from t2 inner join t1 on t2.aid = t1.sim_aid and t1.uid = t2.uid
         |where t1.pre_rate is not null
         |""".stripMargin)
  }

  
  //Convert the similarity CoordinateMatrix into a DataFrame
  def simMatrix2DF(simMatrix: CoordinateMatrix) = {
    //1. Pull the entries out of the matrix as an RDD
    val transformerRDD: RDD[(String, String, Double)] = simMatrix.entries.map {
      case MatrixEntry(row: Long, column: Long, sim: Double) => (row.toString, column.toString, sim)
    }
    //2. rdd --> dataframe
    val simDF: DataFrame = spark.createDataFrame(transformerRDD).toDF("aid", "sim_aid", "sim")
    //3. columnSimilarities only returns the upper triangle, so union the mirrored pairs to make the matrix symmetric
    simDF.union(simDF.select("sim_aid", "aid", "sim"))
  }


  
  //Convert the rating DataFrame into a distributed coordinate matrix
  def rateDF2Matrix(df: DataFrame) = {
    //1. Row --> MatrixEntry
    val matrixRDD: RDD[MatrixEntry] = df.rdd.map {
      case Row(uid: Long, aid: Long, rate: Double) => MatrixEntry(uid, aid, rate)
    }
    //2. Return the distributed matrix
    new CoordinateMatrix(matrixRDD)
  }

}

object ItemCFModelData {
  def apply(spark: SparkSession, env: String): ItemCFModelData = new ItemCFModelData(spark, env)
}

3.2.4 Main: the job's entry-point class

package com.qf.bigdata

import com.qf.bigdata.conf.Config
import com.qf.bigdata.transformer.ItemCFModelData
import com.qf.bigdata.utils.SparkUtils
import org.apache.log4j.{Level, Logger}
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.slf4j.LoggerFactory


object ItemCF {

  private val logger = LoggerFactory.getLogger(ItemCF.getClass.getSimpleName)

  def main(args: Array[String]): Unit = {
    //1. Preparation
    Logger.getLogger("org").setLevel(Level.WARN)
    val params:Config = Config.parseConfig(ItemCF, args)
    System.setProperty("HADOOP_USER_NAME", params.proxyUser)
    logger.warn("job is running, please wait for a moment")
    val spark:SparkSession = SparkUtils.getSparkSession(params.env, "itemcf app")
    import spark.implicits._

    //2. Base data processing
    //2.1 Get the ItemCF model-data object
    val modelData = ItemCFModelData(spark, params.env)

    //2.2 Convert the raw data into (uid, aid, rate)
    val rateDF:DataFrame = modelData.getUserRatingData()
    logger.warn("rateDF ---------------------------------------->")
    rateDF.show()

    //2.3 Split the data into two parts: training data and test data
    val Array(training, test) = rateDF.randomSplit(Array(0.6, 0.4))
    training.cache()
    //2.4 Convert the training DataFrame into a coordinate matrix of the source data
    val rateMatrix = modelData.rateDF2Matrix(training)
    //2.5 Compute the similarity matrix -- under the hood this is cosine similarity between columns
    val simMatrix: CoordinateMatrix = rateMatrix.toRowMatrix().columnSimilarities()
    //2.6 Convert the similarity matrix into a DataFrame
    val simDF = modelData.simMatrix2DF(simMatrix)
    logger.warn("simDF ---------------------------------------->")
    simDF.show()
    //2.7 Join the training rating DataFrame with the similarity DataFrame
    val joinDF = modelData.joinRateDFAndSimDF(training, simDF)
    logger.warn("joinDF ---------------------------------------->")
    joinDF.show()
    training.unpersist()
    joinDF.cache()
    //2.8 Use the test data and the joined data to predict article ratings
    val predictDF = modelData.predictTestData(joinDF, test)
    logger.warn("predictDF ---------------------------------------->")
    predictDF.show()
    joinDF.unpersist()

    //2.9 Evaluate how good the recommendations are
    //2.9.1 Create the evaluator
    val evaluator = new RegressionEvaluator()
      .setLabelCol("rate") // actual value
      .setPredictionCol("pre_rate") // predicted value

    //2.9.2 Compute the error
    // val rmse: Double = evaluator.setMetricName("rmse").evaluate(predictDF)
    // logger.warn(s"itemcf rmse:${rmse}")

    //2.10 Take the top-K per user
    val recommendDF = modelData.recommendAllUser(joinDF, params.topK)
    logger.warn("recommendDF ---------------------------------------->")
    recommendDF.show()

    //2.11 Keep a copy of the result in HDFS (as a Hive table) first, then write to HBase
    recommendDF.write.mode(SaveMode.Overwrite).format("ORC").saveAsTable("dwb_news.itemcf")

    //2.12 Save the data to HBase
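    // A possible sketch of this unfinished step (not from the original article): reuse the
    // helpers already defined above -- recommendDataConvert / itemcf2RDD on the model data and
    // HbaseUtils.loadHFile2Hbase (would need: import com.qf.bigdata.utils.HbaseUtils).
    // val convertDF = modelData.recommendDataConvert(recommendDF)
    // val hfileRDD = modelData.itemcf2RDD(convertDF)
    // HbaseUtils(spark, params.hbaseZK, params.hbasePort)
    //   .loadHFile2Hbase(hfileRDD, params.tableName, params.hFileTmpPath)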
  }
}

4. Testing

Package the project into a jar and upload it to the server.

Run it on the server:

${SPARK_HOME}/bin/spark-submit \
--jars /usr/local/hive-1.2.1/auxlib/hudi-spark-bundle_2.11-0.5.2-incubating.jar \
--conf spark.sql.hive.convertMetastoreParquet=false \
--conf spark.executor.heartbeatInterval=120s \
--conf spark.network.timeout=600s \
--conf spark.sql.catalogImplementation=hive \
--conf spark.yarn.submit.waitAppCompletion=false \
--name itemcf \
--conf spark.task.cpus=1 \
--conf spark.executor.cores=4 \
--conf spark.sql.shuffle.partitions=50 \
--master yarn \
--deploy-mode cluster \
--driver-memory 512M \
--executor-memory 3G \
--num-executors 1 \
--class com.qf.bigdata.ItemCF \
/data/jar/recommend-1.0-jar-with-dependencies.jar \
-e prod -x root -z 192.168.10.101 -p 2181 -f /tmp/hbase -t itemcf -k 3

The second-to-last line is the jar file.

The last line holds the program arguments:

-e  run mode: prod (production) or dev (development)

-x  the proxy user

-z  the HBase ZooKeeper quorum host

-p  the ZooKeeper client port used by HBase; the default is 2181

-f  the temporary directory for the HFiles

-t  the HBase table to write to

-k  the number of recommendations to keep per user (the top 3 is enough here)

Note: this project is not finished yet and is still being tested.

For error logs, check the YARN container logs under Hadoop's logs/userlogs directory.
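
If the job was submitted to YARN, the aggregated container logs can also be pulled with the yarn CLI, for example (the application id is a placeholder): yarn logs -applicationId application_1630000000000_0001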
