# Model building
import json
import socket
import findspark
findspark.init('/usr/local/spark') #之前找到的spark存放路径
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark import SparkContext,SparkConf
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql import functions
from pyspark.sql.functions import avg
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
import pandas as pd
def fill_with_mean(df, exclude=None):
    """Replace nulls in each non-excluded column with that column's mean.

    Args:
        df: Spark DataFrame; ``avg`` is applied to every column that is not
            listed in ``exclude``.
        exclude: Optional iterable of column names to leave untouched.
            ``None`` (the default) excludes nothing.

    Returns:
        A DataFrame with nulls filled by the per-column mean. ``df`` itself
        is returned unchanged when there is nothing to fill.
    """
    # A fresh set per call avoids sharing one mutable default between calls.
    excluded = set(exclude) if exclude is not None else set()
    targets = [c for c in df.columns if c not in excluded]
    if not targets:
        # DataFrame.agg() refuses an empty expression list.
        return df

    stats = df.agg(*(avg(c).alias(c) for c in targets))
    # avg() yields None for a column with no non-null values (or an empty
    # frame); na.fill() does not accept None as a replacement value.
    means = {
        name: value
        for name, value in stats.first().asDict().items()
        if value is not None
    }
    if not means:
        return df
    return df.na.fill(means)
# --- Load the raw CSV through a Spark session -------------------------------
filename = "input/data.csv"
spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
df = spark.read.format("csv").option("header", "true").load(filename)

# --- Clean: cast every column to double, drop the "_c0" column and any
# duplicate rows, then mean-fill nulls everywhere except the label column.
df = df.select([df[c].cast('Double').alias(c) for c in df.columns])
df = df.drop("_c0").drop_duplicates()
df = fill_with_mean(df, ["SeriousDlqin2yrs"])

# --- Assemble the ten feature columns into a single 'features' vector -------
dfInfoAssembler = VectorAssembler(
    inputCols=[
        'RevolvingUtilizationOfUnsecuredLines',
        'age',
        'NumberOfTime30-59DaysPastDueNotWorse',
        'DebtRatio',
        'MonthlyIncome',
        'NumberOfOpenCreditLinesAndLoans',
        'NumberOfTimes90DaysLate',
        'NumberRealEstateLoansOrLines',
        'NumberOfTime60-89DaysPastDueNotWorse',
        'NumberOfDependents',
    ],
    outputCol='features',
)
df = dfInfoAssembler.transform(df).select(['features', 'SeriousDlqin2yrs'])

# --- Split 75/25 and fit the random forest on the training part -------------
training, test = df.randomSplit([0.75, 0.25])
rfModel = RandomForestClassifier(labelCol='SeriousDlqin2yrs').fit(training)
def rfpredict(x):
    """Score a single sample with the trained random forest.

    Args:
        x: Iterable of the ten feature values, in the order given by
            ``feature_names`` below. Each value must be convertible to
            ``float``.

    Returns:
        The value of the model's ``probability`` output column for the
        sample (callers index into it to pick one class).
    """
    feature_names = [
        'RevolvingUtilizationOfUnsecuredLines',
        'age',
        'NumberOfTime30-59DaysPastDueNotWorse',
        'DebtRatio',
        'MonthlyIncome',
        'NumberOfOpenCreditLinesAndLoans',
        'NumberOfTimes90DaysLate',
        'NumberRealEstateLoansOrLines',
        'NumberOfTime60-89DaysPastDueNotWorse',
        'NumberOfDependents',
    ]
    # The training columns were cast to double, so feed doubles here too
    # instead of letting schema inference mix long and double columns.
    row = tuple(float(value) for value in x)
    # The method is createDataFrame (capital F). The result must stay a
    # DataFrame: collecting it into a list would break transform() below.
    sample = spark.createDataFrame([row], feature_names)
    assembled = dfInfoAssembler.transform(sample)
    scored = rfModel.transform(assembled)
    return scored.select('probability').first()[0]
# Connecting to the external port
import json
import socket
class Server:
    """Blocking TCP server that answers each connection with one prediction.

    A client connects, sends one JSON document holding the feature values,
    and receives one JSON number back. Connections are served one at a time.
    """

    def __init__(self, host='192.168.19.135', port=11223, buffer_size=1024):
        """Create the listening socket and bind it.

        Args:
            host: Address to bind to.
            port: TCP port to bind to.
            buffer_size: Maximum number of bytes read from a client request.

        Raises:
            OSError: If the address cannot be bound.
        """
        self.buffer_size = buffer_size
        self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.server.bind((host, port))  # bind the port
        except OSError:
            # Do not leak the descriptor when the address is unavailable.
            self.server.close()
            raise

    def predict(self, client):  # run the model on one request
        """Read one JSON request from ``client`` and send the prediction back.

        Args:
            client: Connected socket. A single ``recv`` of at most
                ``buffer_size`` bytes is treated as the whole request.

        Raises:
            json.JSONDecodeError: If the payload is not valid JSON.
        """
        recvData = client.recv(self.buffer_size).decode()
        print(recvData)
        if not recvData:
            # The peer closed the connection without sending anything.
            return
        # json.loads() no longer accepts an ``encoding`` argument (removed
        # in Python 3.9); the payload is already decoded text.
        X = json.loads(recvData)
        pred = rfpredict(X)[0]
        pred = round(float(pred), 2)
        sentData = json.dumps(pred, ensure_ascii=False)
        # sendall() keeps writing until the full reply has been sent.
        client.sendall(sentData.encode())
        print('已发送预测结果:' + str(sentData))

    def start(self):
        """Accept clients one at a time until an exception stops the loop.

        Any exception (including ``KeyboardInterrupt``) is printed, after
        which the listening socket is closed.
        """
        print('启动预测服务器')
        self.server.listen(1)  # start listening
        try:
            while True:
                client, _addr = self.server.accept()  # wait for a client
                # The context manager closes the client socket even when
                # predict() raises.
                with client:
                    self.predict(client)
        except BaseException as e:
            print(repr(e))
        finally:
            print("关闭预测服务器")
            self.server.close()  # close the listening socket
if __name__ == "__main__":
    # Script entry point: constructing Server binds the socket, and start()
    # then blocks serving prediction requests.
    server = Server()
    server.start()



