栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 面试经验 > 面试问答

使用asyncio创建两个并发异步任务

面试问答 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

使用asyncio创建两个并发异步任务

您使用的方式不

ThreadPoolExecutor
正确,您真的不想在这里使用它。相反,您需要设置使用者和生产者以使用队列来处理套接字和管道以在它们之间发送消息。

  • 对于每种连接类型,创建一个协程以创建连接,然后将该单个连接传递给该连接的使用者和生产者 任务 (使用创建

    asyncio.create_task()
    )。用于使用
    asyncio.wait()
    来运行两个任务
    return_when=asyncio.FIRST_COMPLETED
    ,因此当两个任务之一完成“提前”(例如失败)时,您可以取消任何仍在运行的任务。

  • 使用队列将消息从一个使用方传递到另一个连接的生成方。

  • sys.stdin
    sys.stdout
    阻塞 流,不只是读,写他们!请参阅https://gist.github.com/nathan-hoad/8966377,以了解尝试设置非阻塞STDIO流的要点,以及此异步问题,要求提供非阻塞流功能。

  • 不要使用全局套接字连接,当然不要使用两个单独的

    async with
    语句。您的
    send_to_socket()
    方法实际上将 关闭 套接字,因为在
    async with connection as web_socket:
    发送第一条消息时上下文管理器会退出,这会导致
    socket_receiver
    代码问题(假定套接字无限期保持打开状态)。

  • 不要在这里使用线程!您的连接完全由asyncio进行管理,因此,线程连接会因此而脚。

  • asyncio.Executor()
    实例只能与常规可调用对象一起使用, 而不能 与协程一起使用。
    Executor.submit()
    声明它需要一个可调用的,在协程中传递
    executor.submit(send_to_pipe(message))
    executor.submit(send_to_socket(message))
    会导致异常,因为协程不是可调用的。您可能没有看到异常消息,因为该异常在另一个线程中引发。

这就是

socket_receiver()
协程失败的原因。它当然 开始,
但是尝试发送消息失败。当我在本地模拟的websocket服务器上运行您的代码时,将显示警告:

    RuntimeWarning: coroutine 'send_to_socket' was never awaited  executor.submit(send_to_socket(message))

当不等待协程时,该协程中的代码将永远不会执行。将协程包装成一个输出stderr(

try: callable(), except Exception:traceback.print_exc(file=sys.stderr))
)异常的包装:

    Traceback (most recent call last):  File "soq52219672.py", line 15, in log_exception    callable()TypeError: 'coroutine' object is not callable

执行程序应仅用于集成无法转换为使用协程的代码。执行者管理该代码使其与

asyncio
任务并行运行而不会受到干扰。如果该代码想与
asyncio
任务交互,始终使用
asyncio.run_coroutine_threadsafe()
asyncio.call_soon_threadsafe()
跨越边界调用,则应格外小心。请参见
并发和多线程 部分

这是一个示例,该示例

stdio()
基于基于Nathan Hoad主题的主题,以及在支持对stdio作为管道的支持有限的Windows的基础上,如何重写您的代码以使用使用者/生产者模式:

import asyncioimport jsonimport osimport sysimport websocketsasync def socket_consumer(socket, outgoing):    # take messages from the web socket and push them into the queue    async for message in socket:        await outgoing.put(message)async def socket_producer(socket, incoming):    # take messages from the queue and send them to the socket    while True:        message = await incoming.get()        jsonmessage = json.dumps(message)        await socket.send(jsonmessage)async def connect_socket(incoming, outgoing):    header = {"Authorization": r"Basic XXXX="}    uri = 'wss://XXXXXXXX'    async with websockets.connect(uri, extra_headers=header) as websocket:        # create tasks for the consumer and producer. The asyncio loop will        # manage these independently        consumer_task = asyncio.create_task(socket_consumer(websocket, outgoing))        producer_task = asyncio.create_task(socket_producer(websocket, incoming))        # start both tasks, but have the loop return to us when one of them        # has ended. We can then cancel the remainder        done, pending = await asyncio.wait( [consumer_task, producer_task], return_when=asyncio.FIRST_COMPLETED        )        for task in pending: task.cancel()        # force a result check; if there was an exception it'll be re-raised        for task in done: task.result()# pipe supportasync def stdio(loop=None):    if loop is None:        loop = asyncio.get_running_loop()    if sys.platform == 'win32':        # no support for asyncio stdio yet on Windows, see https://bugs.python.org/issue26832        # use an executor to read from stdio and write to stdout        class Win32StdinReader: def __init__(self):     self.stdin = sys.stdin.buffer  async def readline():     # a single call to sys.stdin.readline() is thread-safe     return await loop.run_in_executor(None, self.stdin.readline)        class Win32StdoutWriter: def __init__(self):     self.buffer = []     self.stdout = sys.stdout.buffer def write(self, data):     self.buffer.append(data) async def drain(self):     data, self.buffer = self.buffer, []     # a single call to sys.stdout.writelines() is thread-safe     return await loop.run_in_executor(None, sys.stdout.writelines, data)        return Win32StdinReader(), Win32StdoutWriter()    reader = asyncio.StreamReader()    await loop.connect_read_pipe(        lambda: asyncio.StreamReaderProtocol(reader),        sys.stdin    )    writer_transport, writer_protocol = await loop.connect_write_pipe(        asyncio.streams.FlowControlMixin,        os.fdopen(sys.stdout.fileno(), 'wb')    )    writer = asyncio.streams.StreamWriter(writer_transport, writer_protocol, None, loop)    return reader, writerasync def pipe_consumer(pipereader, outgoing):    # take messages from the pipe and push them into the queue    while True:        message = await pipereader.readline()        if not message: break        await outgoing.put(message.depre('utf8'))async def pipe_producer(pipewriter, incoming):    # take messages from the queue and send them to the pipe    while True:        jsonmessage = await incoming.get()        message = json.loads(jsonmessage)        type = int(message.get('header', {}).get('messageID', -1))        # 1 is DENM message, 2 is CAM message        if type in {1, 2}: pipewriter.write(jsonmessage.enpre('utf8') + b'n') await pipewriter.drain()async def connect_pipe(incoming, outgoing):    reader, writer = await stdio()    # create tasks for the consumer and producer. The asyncio loop will    # manage these independently    consumer_task = asyncio.create_task(pipe_consumer(reader, outgoing))    producer_task = asyncio.create_task(pipe_producer(writer, incoming))    # start both tasks, but have the loop return to us when one of them    # has ended. We can then cancel the remainder    done, pending = await asyncio.wait(        [consumer_task, producer_task],        return_when=asyncio.FIRST_COMPLETED    )    for task in pending:        task.cancel()    # force a result check; if there was an exception it'll be re-raised    for task in done:        task.result()async def main():    pipe_to_socket = asyncio.Queue()    socket_to_pipe = asyncio.Queue()    socket_coro = connect_socket(pipe_to_socket, socket_to_pipe)    pipe_coro = connect_pipe(socket_to_pipe, pipe_to_socket)    await asyncio.gather(socket_coro, pipe_coro)if __name__ == '__main__':    asyncio.run(main())

然后从两个任务开始,一个任务管理套接字,另一个任务管理STDIO管道。两家公司都分别为其消费者和生产者启动了2个任务。有两个队列将消息从一个的消费者发送到另一个的生产者。



转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/441296.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号