栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Python

Tornado搭建WebSocket持续推送信息

Python 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Tornado搭建WebSocket持续推送信息

前言
  • tornado version 6.2
  • 逻辑:
    • 启动项目时创建一个子线程
    • 在子线程中进行消息推送服务(阻塞式)
  • 多用户连接时需要修改部分逻辑
代码
  • main文件
import threading, asyncio
import tornado.web
import tornado.ioloop
import tornado.httpserver

from myWebSocket import MyWs


def mkApp():
    urls = [
        (r"/ws", MyWs),
    ]
    return tornado.web.Application(urls, debug=True)


async def admin():
    app = mkApp()
    app.listen(address="127.0.0.1", port=8888)
    await asyncio.Event().wait()


def sm(func, loop):
    asyncio.set_event_loop(loop)
    loop.run_until_complete(func())


def main():
    new_loop = asyncio.new_event_loop()
    threading.Thread(target=sm, args=(MyWs.send_message, new_loop)).start()
    asyncio.run(admin())


if __name__ == '__main__':
    main()

  • myWebSocket文件
import asyncio, datetime
from tornado.websocket import WebSocketHandler
from typing import Optional, Awaitable, Union

from queue import Empty, Full
from multiprocessing import Queue


async def create_msg():
	"""制作消息,假设耗时1s"""
	msg = str(datetime.datetime.now())
	await asyncio.sleep(1)
	return msg


class WSBase(WebSocketHandler):
	queue = Queue(1)   # 创建容量为1的队列
	cli_users = set()  # 存储用户信息

    def data_received(self, chunk: bytes) -> Optional[Awaitable[None]]:
        pass
    
    def check_origin(self, origin):
        """允许跨域请求"""
        return True
    
	def open(self):
		"""建立连接"""
		self.cli_users.add(self)
		print(f"建立连接-[{self.request.remote_ip}]-[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}]")
		try:  # 在队列中放入一个信号,队列不阻塞
            self.queue.put(True, block=False)
        except Full:
            pass
	
	def on_close(self):
		"""连接断开"""
		self.cli_users.remove(self)
		print(f"断开连接-[{self.request.remote_ip}]-[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}]")
		try:  # 清空队列,队列不阻塞
            self.queue.get(block=False)
        except Empty:
            pass

    def on_message(self, message):
	    """接收客户端信息"""
        print("client:", message)
		       
	@classmethod
    async def send_msg(cls):
        """发送过程中可能遇到用户列表被改变"""
        try:
            for user in cls.cli_users:
                msg = await create_msg()
                user.write_message(msg)  # 接收字符串or字典(字典会自动转为json)
        except Exception as e:
            pass

    @classmethod
    async def send_message(cls):
    	"""每隔5秒推送一次"""
        while True:
        	# 获取队列中的信号,阻塞式获取->即: 一直阻塞直到获取到信号
        	status = cls.queue.get(block=True)
            if status:
                cls.queue.put(True)
			# 异步操作,取最大耗时:5s
            await asyncio.gather(cls.send_msg(), asyncio.sleep(5))


if __name__ == '__main__':
    pass
    
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/1036954.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号