栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

spark部署TF、 Torch深度学习模型

spark部署TF、 Torch深度学习模型

模型训练 TF(2.1)
import tensorflow as tf
from tensorflow.keras import layers,models
import tensorflow as tf
import numpy as np
import pandas as pd
import os


def predict():
    """Load the saved single-input Keras model and print one prediction.

    Reads ``linear.h5`` from the working directory (written by ``train``)
    and predicts on a single 2-feature row.
    """
    # Bug fix: the pandas class is DataFrame, not Dataframe (AttributeError).
    x = pd.DataFrame({'x0': [0.1], 'x1': [0.2]})
    # x = np.random.randn(1, 2)
    model = tf.keras.models.load_model('linear.h5')
    y = model.predict(x)
    print("y:", y)

def train():
    """Build, fit and persist a one-layer linear model on random data.

    Saves the model as ``linear.h5`` and then runs ``predict`` on it.
    """
    n_samples = 800
    features = tf.random.uniform([n_samples, 2], minval=-10, maxval=10)
    targets = tf.random.normal([n_samples, 1], mean=0.0, stddev=2.0)

    tf.keras.backend.clear_session()
    # Name the input/output layers explicitly so they can be addressed
    # by name when the model is reloaded for serving.
    inputs = layers.Input(shape=(2,), name="inputs")
    outputs = layers.Dense(1, name="outputs")(inputs)
    linear = models.Model(inputs=inputs, outputs=outputs)
    linear.summary()

    linear.compile(optimizer="rmsprop", loss="mse", metrics=["mae"])
    linear.fit(features, targets, batch_size=512, epochs=2)
    linear.save('linear.h5')

    predict()

def predict1():
    """Load the multi-input Keras model (``linear1.h5``) and print one prediction."""
    # Bug fix: the pandas class is DataFrame, not Dataframe (AttributeError).
    x = pd.DataFrame({'x1': [0.1], 'x2': [0.2]})
    # Multi-input Keras models take a dict keyed by input-layer name.
    x = {k: v.values for k, v in x.items()}
    model = tf.keras.models.load_model('linear1.h5')
    y = model.predict(x)
    print("y:", y)

def train1():
    """Train a two-input linear model (named inputs ``x1``/``x2``) and save it.

    Saves the model as ``linear1.h5`` and then runs ``predict1`` on it.
    """
    n = 800
    # Bug fix: the pandas class is DataFrame, not Dataframe (AttributeError).
    X = pd.DataFrame({'x1': np.random.randn(n), 'x2': np.random.randn(n)})
    # Feed the model a dict keyed by input-layer name.
    X = {k: v.values for k, v in X.items()}
    Y = tf.random.normal([n, 1], mean=0.0, stddev=2.0)

    # One named Input per feature column so serving code can map columns
    # to inputs by name.
    x1 = layers.Input(shape=(1,), name="x1")
    x2 = layers.Input(shape=(1,), name="x2")

    inputs = tf.keras.layers.Concatenate(axis=1)([x1, x2])
    outputs = layers.Dense(1, name="outputs")(inputs)
    linear = models.Model(inputs=[x1, x2], outputs=outputs)
    linear.summary()

    linear.compile(optimizer="rmsprop", loss="mse", metrics=["mae"])
    linear.fit(X, Y, batch_size=512, epochs=2)
    linear.save('linear1.h5')
    predict1()
train1()
  • 这里提供了两种构造方式,其实都一样,个人喜欢用train1()
Torch
import torch
from torch import nn
import pandas as pd
import numpy as np

class Linear(nn.Module):
    """A single fully-connected layer: y = W.x + b."""

    def __init__(self, input_dim, output_dim):
        super(Linear, self).__init__()
        # Attribute name 'linear' must stay stable: it determines the
        # state_dict keys used by save/load.
        self.linear = nn.Linear(input_dim, output_dim)

    def forward(self, x):
        """Apply the affine transform to x and return the output tensor."""
        return self.linear(x)

def predict():
    """Restore weights from ``linear.pth`` and print a prediction for one row."""
    linear = Linear(2, 1)
    linear.load_state_dict(torch.load('linear.pth'))
    linear.eval()  # inference mode
    # Bug fix: the pandas class is DataFrame, not Dataframe (AttributeError).
    x = pd.DataFrame({'x1': [0.1], 'x2': [0.2]})
    y = linear(torch.from_numpy(x.values).type(torch.float32)).data.numpy()
    print(y)

def train():
    """Instantiate the model, persist its weights, then run ``predict``.

    NOTE(review): no optimization step is performed here — the saved
    weights are the random initialization; presumably the fitting loop
    was omitted for brevity in the tutorial. Confirm before reuse.
    """
    model = Linear(2, 1)
    torch.save(model.state_dict(), 'linear.pth')
    predict()

train()
模型部署 环境打包
  • 先压缩python环境, 比如我的环境为envs文件夹下的py3,则直接cd envs
zip -r -q py3.zip ./py3/

可以上传至hdfs或者在本地

编写TF线上预测代码

使用pandas_udf接口,数据分布预测我们直接对dataframe进行操作,用此接口可以将spark的dataframe与python中的dataframe之间的转换更高效。

from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from pyspark.sql.functions import pandas_udf, PandasUDFType
import pyspark.sql.functions as F
from pyspark.sql.types import *
import os
#不使用GPU
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

# Syntax fix: the scraped chain lost its line continuations; wrap the
# builder chain in parentheses so it is one expression.
# Arrow makes Spark <-> pandas DataFrame conversion in pandas_udf efficient.
conf = (
    SparkConf()
    .setAppName("dataframe")
    .set("spark.sql.execution.arrow.pyspark.enabled", "true")
)

spark = SparkSession.builder.config(conf=conf).getOrCreate()

@pandas_udf(IntegerType(), PandasUDFType.SCALAR)
def add_one(x):
    """Scalar pandas UDF: increment every element of the incoming series by one."""
    print(type(x), x)  # debug: shows each batch arrives as a pandas Series
    return x + 1

@pandas_udf("uid long, aid long, score float", PandasUDFType.GROUPED_MAP)
def age_predict(df):
    """Grouped-map UDF: score one pandas partition with the saved TF model.

    Imports are done inside the UDF so each executor resolves them from the
    shipped python environment archive.
    """
    import pandas as pd
    import numpy as np
    import tensorflow as tf
    tf_model = tf.keras.models.load_model("linear.h5")
    # Fix: the original assigned x twice (array form then dict form), so the
    # dict silently won — but linear.h5 (from train()) is a single-input
    # model and takes the DataFrame/array form. Keep that one active.
    # NOTE(review): assumes df's feature columns match the model's expected
    # input width/order — confirm against the json schema.
    x = df
    # For the multi-input model from train1() (linear1.h5) use instead:
    # x = {k: v.values for k, v in df.items()}
    df['score'] = tf_model.predict(x)
    return df.loc[:, ['uid', 'aid', 'score']]

if __name__ == "__main__":
    # Two partitions -> two parallel model instances scoring batches.
    df = spark.read.format("json").load("hdfs:///tmp/predict.json").repartition(2)

    # Group by physical partition id so each partition is scored as one batch.
    res = df.groupby(F.spark_partition_id()).apply(age_predict)
    print('file num:', res.count())

    hdfspath = "hdfs:///tmp/tmp1"

    # res.repartition(1).rdd.map(lambda x: ' '.join(map(str, x))).saveAsTextFile(hdfspath)
    # coalesce repartition
    # Syntax fix: the chained writer lost its line continuations in the
    # scrape; wrap the chain in parentheses.
    (res.repartition(1)
        .write.mode("overwrite")
        .option("header", "false")
        .option("delimiter", " ")
        .csv(hdfspath))

    spark.stop()
编写Torch线上预测代码
from __future__ import print_function
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from pyspark.sql.functions import pandas_udf, PandasUDFType
import pyspark.sql.functions as F
from pyspark.sql.types import *
import pandas as pd
import torch
from torch import nn

class Linear(nn.Module):
    """A single fully-connected layer: y = W.x + b.

    Mirrors the class used at training time so the saved state_dict
    (keys under 'linear.') loads cleanly.
    """

    def __init__(self, input_dim, output_dim):
        super(Linear, self).__init__()
        self.linear = nn.Linear(input_dim, output_dim)

    def forward(self, x):
        """Apply the affine transform to x and return the output tensor."""
        return self.linear(x)


# Syntax fix: the scraped chain lost its line continuations; wrap the
# builder chain in parentheses so it is one expression.
conf = (
    SparkConf()
    .setAppName("dataframe")
    .set("spark.sql.execution.arrow.pyspark.enabled", "true")
)

spark = SparkSession.builder.config(conf=conf).getOrCreate()

@pandas_udf("uid long, aid long, score float", PandasUDFType.GROUPED_MAP)
def age_predict(df):
    """Grouped-map UDF: score one pandas partition with the saved Torch model."""
    linear = Linear(2, 1)
    linear.load_state_dict(torch.load('linear.pth'))
    linear.eval()  # inference mode
    # NOTE(review): df.values feeds ALL columns of the partition to a
    # 2-feature model, yet the output schema includes uid/aid — this only
    # works if the json rows have exactly the columns the model expects;
    # confirm the input schema before relying on this.
    df['score'] = linear(torch.from_numpy(df.values).type(torch.float32)).detach().numpy()
    return df.loc[:, ['uid', 'aid', 'score']]

if __name__ == "__main__":
    # Two partitions -> two parallel model instances scoring batches.
    df = spark.read.format("json").load("hdfs:///tmp/predict.json").repartition(2)

    # Group by physical partition id so each partition is scored as one batch.
    res = df.groupby(F.spark_partition_id()).apply(age_predict)
    print('file num:', res.count())

    hdfspath = "hdfs:///tmp/tmp1"

    # res.repartition(1).rdd.map(lambda x: ' '.join(map(str, x))).saveAsTextFile(hdfspath)
    # coalesce repartition
    # Syntax fix: the chained writer lost its line continuations in the
    # scrape; wrap the chain in parentheses.
    (res.repartition(1)
        .write.mode("overwrite")
        .option("header", "false")
        .option("delimiter", " ")
        .csv(hdfspath))

    spark.stop()
spark任务提交
# Syntax fix: the scraped command lost its backslash continuations;
# restore them so the command is a single invocation.
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --driver-memory 10G \
  --executor-memory 20G \
  --num-executors 40 \
  --executor-cores 2 \
  --archives hdfs:///tmp/py3.zip#py3 \
  --conf spark.yarn.maxAppAttempts=100 \
  --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
  --conf spark.yarn.executor.failuresValidityInterval=1h \
  --conf spark.hadoop.fs.hdfs.impl.disable.cache=true \
  --conf spark.sql.execution.arrow.pyspark.enabled=true \
  --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=py3/py3/bin/python \
  --files linear.h5 \
  tfctr.py
吃水不忘挖井人
  • https://www.jianshu.com/p/df0a189ff28b
  • https://www.jianshu.com/p/fc60c967c8b8
  • https://zhuanlan.zhihu.com/p/148216347
  • https://blog.csdn.net/lsshlsw/article/details/79932643
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/673994.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号