栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Python

python对于grpc的简单操作(三)

Python 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

python对于grpc的简单操作(三)

今天介绍python对于grpc的流式传输,主要有三种,以及视频的流式传输

  1. 客户端流,服务端非流(单流)
  2. 服务端流,客户端非流(单流)
  3. 客户端流,服务端流(双流)

PS:其实无论是单流还是双流,都是在客户端和服务端建立长连接

(一)客户端流,服务端非流(单流)

上proto:

syntax = "proto3";   // 指定protobuf版本

package test;   // 此文件的标识符,不添加也可以,以防止协议消息类型之间的名称冲突

// 定义消息结构类型,字段编号,可不从1开始,但不能重复,request为数据结构名,可自由定义
message request {
	string message = 1;
}


// 定义消息结构类型,数据后面的编号不能重复,response为数据结构名,可自由定义
message response {
	string message = 1;
}

//定义服务,下面定义的这种为最简单的rpc服务,客户端发起请求,服务端返回结果,stream关键字用来定义流式传输
service StreamTest {
	rpc ClientStream (stream request) returns (response) {}
}


client.py

import logging
import time
import grpc
import test_pb2
import test_pb2_grpc


def send_stream():
    message_list = ['1', '2', '3', '4', '5', '6', '7']
    for i in message_list:
        # return #如果想主动断开,可以提前return
        yield test_pb2.request(message=i)
        time.sleep(1)


def run():
    with grpc.insecure_channel('localhost:10086') as channel:
        client = test_pb2_grpc.StreamTestStub(channel)  # 客户端使用Stub类发送请求,参数为频道,为了绑定链接
        response = client.ClientStream(send_stream())  # 需要将上面的send_stream传进来
        print('返回结果:', response.message)


if __name__ == '__main__':
    logging.basicConfig()
    run()

server.py

from concurrent import futures
import logging
import time
import grpc
import test_pb2
import test_pb2_grpc


class StreamTest(test_pb2_grpc.StreamTestServicer):
    def ClientStream(self, request_iterator, context):
        for i in request_iterator:
            # print(dir(request_iterator))
            print(i.message)
            # if i.message == '5':
            #     break
            #     return test_pb2.response(message='提前返回')

        return test_pb2.response(message='ok')

#如果想提前中断传输,客户端可以在接收函数在break或提前return
def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))  # 开启多线程
    test_pb2_grpc.add_StreamTestServicer_to_server(StreamTest(), server)  # 注册本地服务
    server.add_insecure_port('[::]:10086')  # 指定端口以及IP
    # server.add_insecure_port('0.0.0.0:10086')# 指定端口以及IP
    server.start()  # 启动服务器 start()是非阻塞的, 将实例化一个新线程来处理请求
    server.wait_for_termination()  # 阻塞调用线程,直到服务器终止


if __name__ == '__main__':
    logging.basicConfig()
    serve()

运行结果

(二)服务端流,客户端非流(单流)

上proto

syntax = "proto3";   // 指定protobuf版本

package test;   // 此文件的标识符,不添加也可以,以防止协议消息类型之间的名称冲突

// 定义消息结构类型,字段编号,可不从1开始,但不能重复,request为数据结构名,可自由定义
message request {
	string message = 1;
}


// 定义消息结构类型,数据后面的编号不能重复,response为数据结构名,可自由定义
message response {
	string message = 1;
}

//定义服务,下面定义的这种为最简单的rpc服务,客户端发起请求,服务端返回结果,stream关键字用来定义流式传输
service StreamTest {
	rpc ClientStream (stream request) returns (response) {}//客户端发流
	rpc ServerStream (request) returns (stream response) {}//服务端发流
}


client.py

import logging
import time
import grpc
import test_pb2
import test_pb2_grpc


def run():
    with grpc.insecure_channel('localhost:10086') as channel:
        client = test_pb2_grpc.StreamTestStub(channel)  # 客户端使用Stub类发送请求,参数为频道,为了绑定链接
        response = client.ServerStream(test_pb2.request(message='start'))  # 返回结果是一个迭代器
        for i in response:
            print('返回结果:', i.message)
            # if i.message == '5':#客户端主动断开连接可使用braek或return
            #     return


if __name__ == '__main__':
    logging.basicConfig()
    run()

server.py

from concurrent import futures
import logging
import time
import grpc
import test_pb2
import test_pb2_grpc


class StreamTest(test_pb2_grpc.StreamTestServicer):
    def ClientStream(self, request_iterator, context):
        for i in request_iterator:
            print(i.message)
        return test_pb2.response(message='ok')

    def ServerStream(self, request, context):
        print(request)
        message_list = ['1', '2', '3', '4', '5', '6', '7']
        for i in message_list:
            # if context.is_active():#context.is_active()判断连接状态

            # if i == '5':#服务端主动断开连接可使用braek或return
            #     return

            time.sleep(1)
            print('给客户端发送数据', i)
            yield test_pb2.response(message=i)
        print(context.is_active())


def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))  # 开启多线程
    test_pb2_grpc.add_StreamTestServicer_to_server(StreamTest(), server)  # 注册本地服务
    server.add_insecure_port('[::]:10086')  # 指定端口以及IP
    # server.add_insecure_port('0.0.0.0:10086')# 指定端口以及IP
    server.start()  # 启动服务器 start()是非阻塞的, 将实例化一个新线程来处理请求
    server.wait_for_termination()  # 阻塞调用线程,直到服务器终止


if __name__ == '__main__':
    logging.basicConfig()
    serve()

运行效果

(三)客户端流,服务端流(双流)

上proto

syntax = "proto3";   // 指定protobuf版本

package test;   // 此文件的标识符,不添加也可以,以防止协议消息类型之间的名称冲突

// 定义消息结构类型,字段编号,可不从1开始,但不能重复,request为数据结构名,可自由定义
message request {
	string message = 1;
}


// 定义消息结构类型,数据后面的编号不能重复,response为数据结构名,可自由定义
message response {
	string message = 1;
}

//定义服务,下面定义的这种为最简单的rpc服务,客户端发起请求,服务端返回结果,stream关键字用来定义流式传输
service StreamTest {
	rpc ClientStream (stream request) returns (response) {}//客户端发流
	rpc ServerStream (request) returns (stream response) {}//服务端发流
	rpc BothStream (stream request) returns (stream response) {}//双流
}


client.py

import logging
import time
import grpc
import test_pb2
import test_pb2_grpc


def send_stream():
    message_list = ['1', '2', '3', '4', '5', '6', '7']
    for i in message_list:
        time.sleep(1)
        yield test_pb2.request(message=i)


def run():
    with grpc.insecure_channel('localhost:10086') as channel:
        client = test_pb2_grpc.StreamTestStub(channel)  # 客户端使用Stub类发送请求,参数为频道,为了绑定链接
        response = client.BothStream(send_stream())  # 返回结果是一个迭代器
        for i in response:
            print('客户端收到', i.message)


if __name__ == '__main__':
    logging.basicConfig()
    run()

server.py

from concurrent import futures
import logging
import time
import grpc
import test_pb2
import test_pb2_grpc

from threading import Thread


def send(message):
    for i in message:
        yield test_pb2.response(message=i)


class StreamTest(test_pb2_grpc.StreamTestServicer):
    def ClientStream(self, request_iterator, context):
        for i in request_iterator:
            print(i.message)
        return test_pb2.response(message='ok')

    def ServerStream(self, request, context):
        print(request)
        message_list = ['1', '2', '3', '4', '5', '6', '7']
        for i in message_list:
            time.sleep(1)
            print('给客户端发送数据', i)
            yield test_pb2.response(message=i)

    def BothStream(self, request_iterator, context):

        for i in request_iterator:
        	#当然下面的yield也可以放在这,是可以同时接收和发送的
            print('服务端收到',i.message)

        for j in ['a', 'b', 'c', 'd', 'e', 'f', 'g']:
            time.sleep(1)
            yield test_pb2.response(message=j)


def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))  # 开启多线程
    test_pb2_grpc.add_StreamTestServicer_to_server(StreamTest(), server)  # 注册本地服务
    server.add_insecure_port('[::]:10086')  # 指定端口以及IP
    # server.add_insecure_port('0.0.0.0:10086')# 指定端口以及IP
    server.start()  # 启动服务器 start()是非阻塞的, 将实例化一个新线程来处理请求
    server.wait_for_termination()  # 阻塞调用线程,直到服务器终止


if __name__ == '__main__':
    logging.basicConfig()
    serve()

执行效果

(四)视频的流式传输

proto文件

syntax = "proto3";   // 指定protobuf版本

package test;   // 此文件的标识符,不添加也可以,以防止协议消息类型之间的名称冲突

// 定义消息结构类型,字段编号,可不从1开始,但不能重复,request为数据结构名,可自由定义
message request {
	bytes data = 1;
}


// 定义消息结构类型,数据后面的编号不能重复,response为数据结构名,可自由定义
message response {
	string message = 1;
}

//定义服务,下面定义的这种为最简单的rpc服务,客户端发起请求,服务端返回结果,stream关键字用来定义流式传输
service StreamTest {
	rpc ClientStream (stream request) returns (response) {}
}

client.py

import logging
import os
import grpc
import test_pb2
import test_pb2_grpc



def send_stream():
    with open('input.mp4', 'rb') as f:
        size = os.path.getsize('input.mp4') / 1024
        n = 0
        while True:
            content = f.read(1024)
            if content:
                n = n + 1
                print('传输进度:{}%'.format(round(n / size * 100, 2)))
                yield test_pb2.request(data=content)
            else:
                break


def run():
    with grpc.insecure_channel('localhost:10086', options=[
        ('grpc.max_send_message_length', 256 * 1024 * 1024),  # 修改数据传输的大小限制,因为图片数据可能较大
        ('grpc.max_receive_message_length', 256 * 1024 * 1024),
    ]) as channel:
        client = test_pb2_grpc.StreamTestStub(channel)  # 客户端使用Stub类发送请求,参数为频道,为了绑定链接
        response = client.ClientStream(send_stream())  # 需要将上面的send_stream传进来
        print('返回结果:', response.message)


if __name__ == '__main__':
    logging.basicConfig()
    run()

server.py

from concurrent import futures
import logging
import time

import grpc
import test_pb2
import test_pb2_grpc



class StreamTest(test_pb2_grpc.StreamTestServicer):
    def ClientStream(self, request_iterator, context):
        with open('output.mp4', 'wb') as f:
            for i in request_iterator:
                f.write(i.data)

        print('接收完成')

        return test_pb2.response(message='ok')


# 如果想提前中断传输,客户端可以在接收函数在break或提前return
def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10),options=[
        ('grpc.max_send_message_length', 256 * 1024 * 1024),  # 修改数据传输的大小限制,因为图片数据可能较大
        ('grpc.max_receive_message_length', 256 * 1024 * 1024),
    ])  # 开启多线程
    test_pb2_grpc.add_StreamTestServicer_to_server(StreamTest(), server)  # 注册本地服务
    server.add_insecure_port('[::]:10086')  # 指定端口以及IP
    # server.add_insecure_port('0.0.0.0:10086')# 指定端口以及IP
    server.start()  # 启动服务器 start()是非阻塞的, 将实例化一个新线程来处理请求
    server.wait_for_termination()  # 阻塞调用线程,直到服务器终止


if __name__ == '__main__':
    logging.basicConfig()
    serve()

执行效果

(五)结语

如果有什么错误的地方,还请大家批评指正。最后,希望小伙伴们都能有所收获。写这些,仅记录自己学习python操作grpc的过程

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/1037089.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号