栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 面试经验 > 面试问答

如何在一个类中实现asyncio websockets?

面试问答 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

如何在一个类中实现asyncio websockets?

如何编写异步程序?

  1. 您应该使用定义异步功能
    async
  2. 您应该使用调用异步函数
    await
  3. 您需要事件循环才能启动异步程序

所有其他内容与常规Python程序几乎相同。

import asynciofrom websockets import connectclass EchoWebsocket:    async def __aenter__(self):        self._conn = connect("wss://echo.websocket.org")        self.websocket = await self._conn.__aenter__()     return self    async def __aexit__(self, *args, **kwargs):        await self._conn.__aexit__(*args, **kwargs)    async def send(self, message):        await self.websocket.send(message)    async def receive(self):        return await self.websocket.recv()async def main():    async with EchoWebsocket() as echo:        await echo.send("Hello!")        print(await echo.receive())  # "Hello!"if __name__ == '__main__':    loop = asyncio.get_event_loop()    loop.run_until_complete(main())

输出:

Hello!

如您所见,代码与您编写的几乎相同。

唯一的区别是

websockets.connect
设计成异步上下文管理器(它使用
__aenter__
__aexit__
)。有必要释放连接,这也将帮助您在类初始化期间进行异步操作(因为我们没有的异步版本
__init__
)。

我建议您以相同的方式组织课程。但是,如果由于某些原因您确实不想使用上下文管理器,则可以使用新

__await__
方法进行异步初始化,并使用其他一些异步函数来释放连接:

import sysimport asynciofrom websockets import connectclass EchoWebsocket:    def __await__(self):        # see: http://stackoverflow.com/a/33420721/1113207        return self._async_init().__await__()    async def _async_init(self):        self._conn = connect("wss://echo.websocket.org")        self.websocket = await self._conn.__aenter__()        return self    async def close(self):        await self._conn.__aexit__(*sys.exc_info())    async def send(self, message):        await self.websocket.send(message)    async def receive(self):        return await self.websocket.recv()async def main():    echo = await EchoWebsocket()    try:        await echo.send("Hello!")        print(await echo.receive())  # "Hello!"    finally:        await echo.close()if __name__ == '__main__':    loop = asyncio.get_event_loop()    loop.run_until_complete(main())

websockets
您可以在docs中找到许多使用示例。



转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/653149.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号