栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

pyspark 实现随机森林并且连接端口

pyspark 实现随机森林并且连接端口

模型建立

import json
import socket
import findspark 
findspark.init('/usr/local/spark') #之前找到的spark存放路径
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark import SparkContext,SparkConf
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql import functions
from pyspark.sql.functions import avg
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
import pandas as pd

def fill_with_mean(df, exclude=()):
    """Replace nulls in each non-excluded column of *df* with that column's mean.

    Args:
        df: Spark DataFrame whose nulls should be filled.
        exclude: Column names to leave untouched; any container supporting
            ``in`` works (set, list, tuple). Defaults to an empty tuple so no
            mutable object is shared between calls.

    Returns:
        A DataFrame with nulls filled by the per-column mean, or *df* itself
        when there is no column (or no usable mean) to fill with.
    """
    targets = [c for c in df.columns if c not in exclude]
    if not targets:
        # Nothing to aggregate: DataFrame.agg() refuses an empty expression list.
        return df

    stats = df.agg(*(avg(c).alias(c) for c in targets))
    # A column without any non-null value has a None mean; drop those entries
    # because None is not an accepted fill value.
    means = {
        name: value
        for name, value in stats.first().asDict().items()
        if value is not None
    }
    if not means:
        return df
    return df.na.fill(means)

# --- Load the CSV (first line is the header) through a Spark session. ---
filename = "input/data.csv"
spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
df = (
    spark.read.format("csv")
    .option("header", "true")
    .load(filename)
)

# Cast every column to double, keeping the original names and order.
df = df.select([df[name].cast('Double').alias(name) for name in df.columns])

# Drop the "_c0" column, remove duplicate rows, then fill the remaining nulls
# with column means; the label column is excluded from the filling.
df = df.drop("_c0").drop_duplicates()
df = fill_with_mean(df, ["SeriousDlqin2yrs"])

# Pack the ten predictor columns into a single 'features' vector column.
dfInfoAssembler = VectorAssembler(
    inputCols=[
        'RevolvingUtilizationOfUnsecuredLines',
        'age',
        'NumberOfTime30-59DaysPastDueNotWorse',
        'DebtRatio',
        'MonthlyIncome',
        'NumberOfOpenCreditLinesAndLoans',
        'NumberOfTimes90DaysLate',
        'NumberRealEstateLoansOrLines',
        'NumberOfTime60-89DaysPastDueNotWorse',
        'NumberOfDependents',
    ],
    outputCol='features',
)
df = dfInfoAssembler.transform(df).select(['features', 'SeriousDlqin2yrs'])

# 75/25 random split; the forest is fitted on the training part only.
training, test = df.randomSplit([0.75, 0.25])
rfModel = RandomForestClassifier(labelCol='SeriousDlqin2yrs').fit(training)

def rfpredict(x):
    """Score a single sample with the fitted random forest.

    Args:
        x: Iterable of numeric feature values, given in the same order as the
            input columns of ``dfInfoAssembler``.

    Returns:
        The value of the model's 'probability' column for the sample.

    Raises:
        ValueError: If the number of values does not match the number of
            feature columns.
    """
    # Reuse the assembler's own column list so prediction and training
    # cannot drift apart.
    feature_cols = dfInfoAssembler.getInputCols()

    # The training columns were cast to double; do the same here so integer
    # JSON values are not inferred as a different column type.
    row = tuple(float(value) for value in x)
    if len(row) != len(feature_cols):
        raise ValueError(
            'expected %d feature values, got %d' % (len(feature_cols), len(row))
        )

    # Keep this a DataFrame (no .collect()): the assembler and the model both
    # transform DataFrames, not lists of rows.
    sample_df = spark.createDataFrame([row], feature_cols)
    assembled = dfInfoAssembler.transform(sample_df)
    scored = rfModel.transform(assembled)
    return scored.select('probability').limit(1).collect()[0][0]

通过 Socket 端口对外提供预测服务

import json
import socket
class Server:
    """Blocking TCP server that answers one JSON prediction request per connection.

    A client sends a JSON array of feature values; the server replies with the
    first element of the predictor's result, rounded to two decimals and
    encoded as JSON.
    """

    def __init__(self, host='192.168.19.135', port=11223, predictor=None):
        """Create the listening socket and bind it.

        Args:
            host: Address to bind to (defaults to the original fixed address).
            port: TCP port to bind to.
            predictor: Optional callable taking the decoded feature list and
                returning an indexable result; when omitted, the module-level
                ``rfpredict`` is used.
        """
        self.buffer_size = 1024
        self.predictor = predictor
        self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server.bind((host, port))  # bind the address and port

    def predict(self, client):
        """Read one request from *client*, run the model, and send the result back.

        Only a single ``recv`` of at most ``buffer_size`` bytes is read.
        """
        recvData = client.recv(self.buffer_size).decode()
        print(recvData)
        # json.loads no longer accepts an ``encoding`` argument (removed in
        # Python 3.9); the payload is already a decoded str here.
        X = json.loads(recvData)

        # Resolve the default lazily so the class can be defined before
        # rfpredict exists.
        predictor = self.predictor if self.predictor is not None else rfpredict
        pred = round(float(predictor(X)[0]), 2)

        sentData = json.dumps(pred, ensure_ascii=False)
        # sendall keeps writing until the whole reply is sent.
        client.sendall(sentData.encode())
        print('已发送预测结果:' + str(sentData))

    def start(self):
        """Accept clients one at a time until an exception or interrupt occurs."""
        print('启动预测服务器')
        self.server.listen(1)  # start listening
        try:
            while True:
                client, addr = self.server.accept()  # wait for a client
                try:
                    self.predict(client)
                finally:
                    # Close the client socket even when the request fails.
                    client.close()
        except BaseException as e:
            # BaseException on purpose: Ctrl-C (KeyboardInterrupt) should also
            # end in the orderly shutdown below.
            print(repr(e))
        finally:
            print("关闭预测服务器")
            self.server.close()  # close the listening socket


if __name__ == '__main__':
    # Script entry point: build the prediction server and run its accept loop.
    Server().start()

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/701569.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号