前言
- tornado version 6.2
- 逻辑:
- 启动项目时创建一个子线程
- 在子线程中进行消息推送服务(阻塞式)
- 多用户连接时需要修改部分逻辑
代码
import threading, asyncio
import tornado.web
import tornado.ioloop
import tornado.httpserver
from myWebSocket import MyWs
def mkApp():
urls = [
(r"/ws", MyWs),
]
return tornado.web.Application(urls, debug=True)
async def admin():
app = mkApp()
app.listen(address="127.0.0.1", port=8888)
await asyncio.Event().wait()
def sm(func, loop):
asyncio.set_event_loop(loop)
loop.run_until_complete(func())
def main():
new_loop = asyncio.new_event_loop()
threading.Thread(target=sm, args=(MyWs.send_message, new_loop)).start()
asyncio.run(admin())
if __name__ == '__main__':
main()
import asyncio, datetime
from tornado.websocket import WebSocketHandler
from typing import Optional, Awaitable, Union
from queue import Empty, Full
from multiprocessing import Queue
async def create_msg():
"""制作消息,假设耗时1s"""
msg = str(datetime.datetime.now())
await asyncio.sleep(1)
return msg
class WSBase(WebSocketHandler):
queue = Queue(1) # 创建容量为1的队列
cli_users = set() # 存储用户信息
def data_received(self, chunk: bytes) -> Optional[Awaitable[None]]:
pass
def check_origin(self, origin):
"""允许跨域请求"""
return True
def open(self):
"""建立连接"""
self.cli_users.add(self)
print(f"建立连接-[{self.request.remote_ip}]-[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}]")
try: # 在队列中放入一个信号,队列不阻塞
self.queue.put(True, block=False)
except Full:
pass
def on_close(self):
"""连接断开"""
self.cli_users.remove(self)
print(f"断开连接-[{self.request.remote_ip}]-[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}]")
try: # 清空队列,队列不阻塞
self.queue.get(block=False)
except Empty:
pass
def on_message(self, message):
"""接收客户端信息"""
print("client:", message)
@classmethod
async def send_msg(cls):
"""发送过程中可能遇到用户列表被改变"""
try:
for user in cls.cli_users:
msg = await create_msg()
user.write_message(msg) # 接收字符串or字典(字典会自动转为json)
except Exception as e:
pass
@classmethod
async def send_message(cls):
"""每隔5秒推送一次"""
while True:
# 获取队列中的信号,阻塞式获取->即: 一直阻塞直到获取到信号
status = cls.queue.get(block=True)
if status:
cls.queue.put(True)
# 异步操作,取最大耗时:5s
await asyncio.gather(cls.send_msg(), asyncio.sleep(5))
if __name__ == '__main__':
pass