算法小结: 给定一组观察者序列y1,y2,y3…,假设预测值是x1,x2,x3,保序回归是要最小化以下的函数:
sum(wi(yi-xi)^2),其中x1,x2,x3是有序的序列,wi是正数值一般默认为1.0,保序回归可以看做是有顺序约束下的最小二乘问题,预测的函数一般是一个分段线性函数,训练返回的模型可以用来预测已知或者未知特征值的标签。
spark的实现:MLlib使用的是PAVA(Pool Adjacent Violators Algorithm)算法,并且是分布式的PAVA算法。首先在每个分区的样本集序列运行PAVA算法,保证局部有序,然后再对整个样本集运行PAVA算法,保证全局有序。
应用的场景:
1.不用的药物用量和病人应激性反应的程度,正常来说,随着药物用量的增加,病人的应激性反应越大
2.图片赛马数量和图片平均点击率的关系,一般来说,图片的赛马数量越多,图片的平均点击越大
评判的依据:依然可以使用mse或者rmse作为评判的依据
以下为官方代码实例 +最终评判的mse值:
from pyspark import SparkConf
from pyspark.sql import SparkSession
import traceback
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.regression import IsotonicRegression
appname = "test" # 任务名称
master = "local" # 单机模式设置
'''
local: 所有计算都运行在一个线程当中,没有任何并行计算,通常我们在本机执行一些测试代码,或者练手,就用这种模式。
local[K]: 指定使用几个线程来运行计算,比如local[4]就是运行4个worker线程。通常我们的cpu有几个core,就指定几个线程,最大化利用cpu的计算能力
local[*]: 这种模式直接帮你按照cpu最多cores来设置线程数了。
'''
# spark_driver_host = "10.0.0.248"
try:
# conf = SparkConf().setAppName(appname).setMaster(master).set("spark.driver.host", spark_driver_host) # 集群
conf = SparkConf().setAppName(appname).setMaster(master) # 本地
spark = SparkSession.builder.config(conf=conf).getOrCreate()
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import *
spark = SparkSession.builder.appName('isotonicregression').master('local').getOrCreate()
# $example on$
# Loads data.
datapath = '''D://spark/spark/spark-2.3.0-bin-hadoop2.7/''';
dataset = spark.read.format("libsvm")
.load(datapath + "data/mllib/sample_isotonic_regression_libsvm_data.txt")
# print(dataset.printSchema)
# dataset.show(10,False)
(trainSet,testSet) = dataset.randomSplit([0.2,0.8])
# trainSet.show()
# Trains an isotonic regression model.
model = IsotonicRegression().fit(trainSet)
print("Boundaries in increasing order: %sn" % str(model.boundaries))
print("Predictions associated with the boundaries: %sn" % str(model.predictions))
# Makes predictions.
prediction = model.transform(testSet)
# $example off$
# 计算MSE
df = prediction.select("*", pow(col('prediction') - col('label'), 2).alias('pow2val'))
dfmean = df.groupBy().mean('pow2val')
dfMSE = dfmean.withColumnRenamed("avg(pow2val)", "mse")
dfMSE.show()
#计算RMSE
dfMSE.select(sqrt(col('mse')).alias('rmse')).show()
spark.stop()
print('计算成功!')
except:
traceback.print_exc() # 返回出错信息
print('连接出错!')
该算法一般可以作为其他算法的补充,附一张线性回归和保序回归的例子:
参考文献:
1.https://www.cnblogs.com/mstk/p/7019449.html
2.https://blog.csdn.net/liulingyuan6/article/details/53471302
3.https://zhuanlan.zhihu.com/p/88623159



