栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

SparkMLlib线性回归算法案例

SparkMLlib线性回归算法案例

SparkMLlib线性回归算法案例
    • 一、SparkMLlib线性回归模型
    • 二、案例实现

一、SparkMLlib线性回归模型

MLlib的线性回归模型采用随机梯度下降算法来优化目标函数。MLlib实现了分布式的随机梯度下降算法,其分布方法是:在每次迭代中,随机抽取一定比例的样本作为当前迭代的计算样本;对计算样本中的每一个样本分别计算梯度(分布式计算每个样本的梯度);然后再通过聚合函数对样本的梯度进行累加,得到该样本的平均梯度及损失;最后根据最新的梯度及上次迭代的权重进行权重的更新。

MLlib线性回归模型方程:

线性回归模型的损失函数是:

二、案例实现
  1. 数据格式说明:

  2. 测试数据所在位置:测试数据集

  3. 具体实现代码如下:

    package com.spark.ml
    
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionModel, LinearRegressionWithSGD}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object LinearRegression {
    
      def main(args: Array[String]) {
        // 构建Spark对象
        val conf = new SparkConf().setAppName("LinearRegressionWithSGD").setMaster("local[*]")
        val sc = new SparkContext(conf)
        Logger.getRootLogger.setLevel(Level.WARN)
    
        //读取样本数据
        val data_path1 = "testdatas/lpsa.data"
        val data = sc.textFile(data_path1)
        val examples = data.map { line =>
          val parts = line.split(',')
          LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
        }.cache()
        // 触发Job实现缓存
        val numExamples = examples.count()
    
        // 新建线性回归模型,并设置训练参数
        val numIterations = 200
        val stepSize = 1
        val miniBatchFraction = 0.5
        val model = LinearRegressionWithSGD.train(examples, numIterations, stepSize, miniBatchFraction)
        val weights = model.weights // 权重
        val intercept = model.intercept // 偏置
        println("权重:" + weights.toArray + "-,偏置:" + intercept)
    
        // 对样本进行测试
        predictByModel(model,examples,numExamples)
    
        // 模型保存
        val ModelPath = "testdatas/model/LinearRegressionModel"
        model.save(sc, ModelPath)
      }
    
      def predictByModel(model: LinearRegressionModel,examples:RDD[LabeledPoint],numExamples:Long): Unit ={
        // 对样本进行测试
        val prediction = model.predict(examples.map(_.features))
        val predictionAndLabel = prediction.zip(examples.map(_.label))
        val print_predict = predictionAndLabel.take(20)
        println("prediction" + "t" + "label")
        for (i <- 0 to print_predict.length - 1) {
          println(print_predict(i)._1 + "t" + print_predict(i)._2)
        }
        // 计算测试误差
        val loss = predictionAndLabel.map {
          case (p, l) =>
            val err = p - l
            err * err
        }.reduce(_ + _)
        val rmse = math.sqrt(loss / numExamples)
        println(s"Test RMSE = $rmse.")
      }
    }
    
    

    注意:可不断调整模型参数以便于得到最优的模型

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/651857.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号